Uniquely Managing Test Execution Resources using WebSockets

By tryexceptpass

Executing tests for simple applications is complicated. You have to think about the users, how they interact with it, how those interactions propagate through different components, as well as how to handle error situations gracefully. But things get even more complicated when you start looking at more extensive systems, like those with multiple external dependencies.

Dependencies come in various forms, including third-party modules, cloud services, compute resources, networks, and others.

This level of complexity is standard in almost all projects involving a large organization, whether delivering internal tools or external products.

It means you must put emphasis on developing test systems and mechanisms good enough to validate not just code, but those third-party dependencies as well. After all, they’re part of the final product, and failing to interact with them means the product fails.

What do you mean by test resource?

Today we discuss dependencies in the form of resources required to run your tests.

For example, when making a cloud application that works with multiple AWS services, you’ll likely have to manage compute and network resources. Given that the world does not work with an unlimited budget - monetary or otherwise - odds are you’ll have limitations in how many of those resources are available for testing.

Another example of resources relevant in a test environment is port assignments for various instances of a web service.

Maybe your automation includes stress testing a service load-balanced across several copies running in the same host, where each instance needs a different port assignment.

Perhaps you’re validating a simple application that takes a long time to test, so you want to parallelize execution across a limited set of resources in your lab or cloud infrastructure.

A network port, a hardware resource, a virtual host: regardless of their function, the one thing they have in common is that they are limited. That means we must figure out how to manage them to execute our tests properly.

Approaches to managing test resources

Whether running automated tests or merely doing things manually, you’ll need a system with which to request resources for testing. One that tells you which resources are in use, while providing a simple mechanism to return those resources when finished.

It doesn’t matter whether you’re talking about cloud or physical resources: I’ve found over the years that managing them in a large organization happens in multiple tiers.

Budgets are assigned at the organizational level and shared among departments, which give out smaller portions to specific teams. Then within a team, you dedicate some resources to development, others to manual and exploratory tests, and the rest to automation.

In terms of organizing those resources, some folks like the concept of test benches, while others prefer using pools.

A test bench includes a collection of resources configured in a specific way. Selecting a bench means selecting the entire list of resources assigned to it. Depending on what you’re doing, this can lead to test inefficiencies and lower utilization.

Using a pool makes things more fluid because you’re working with a finer granularity. However, your trade-off could be in more complex test setup and cleanup functions.

It’s up to you to apply engineering knowledge of the product under test, and of how the tests themselves work, to pick the most efficient configuration. However, I’ve found that resource pools tend to provide better utilization numbers than the test bench idea.

The discussion in this article leans towards the resource pool concepts.

Using sockets to manage resource pools

There are plenty of off-the-shelf systems to help you. Reserving resources isn’t new. You do it every day at the office when you schedule a conference room for a meeting!

These systems are usually better suited to longer-term reservations at the higher tier of departmental or team assignments. Finding one that’s easily extensible and configurable to meet your specific testing needs is hard. In the end, you might spend as much effort setting it up the way you want as you would building it yourself.

Working effectively with a pool of resources does require us to return them when we’re finished. But this is easier said than done in test systems, because you can’t be sure how the build you’re testing will behave.

The test might fail unexpectedly, the process could die, your database might stop accepting connections, or the host running your test function could fail. An almost endless number of possibilities come to mind, so it’s impossible to account for every situation.

In my time building these types of systems, I realized that using sockets to indicate a resource reservation is not a bad idea. If the socket is still open, the resource is in use; if it dies for whatever reason, we return the resource to the pool.
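To make the idea concrete, here is a minimal, framework-free sketch. It assumes a hypothetical in-memory `pool` dictionary and a hypothetical `handle_connection()` coroutine, and it uses an async generator to stand in for a real socket. The important part is the `finally` block: however the connection ends (cleanly, with an error, or through cancellation), the resource goes back to the pool.

```python
import asyncio

# Hypothetical pool: resource value -> holder (None means available)
pool = {7000: None, 7010: None}


async def handle_connection(client_id, messages):
    """Hold a resource for as long as the message stream stays open."""
    # Reserve the first available resource for this connection
    resource = next(r for r, holder in pool.items() if holder is None)
    pool[resource] = client_id
    try:
        async for _ in messages:
            pass  # A real server would process commands here
    finally:
        # Runs on clean close, on error, and on cancellation alike
        pool[resource] = None
    return resource


async def broken_connection():
    """Simulated socket that delivers one message and then drops."""
    yield 'ping'
    raise ConnectionResetError('client vanished')


async def main():
    try:
        await handle_connection('client-a', broken_connection())
    except ConnectionResetError:
        pass  # The connection died, but the resource was still released
    return pool


print(asyncio.run(main()))  # {7000: None, 7010: None}
```

The test code never has to remember to give the resource back; the lifetime of the connection is the lifetime of the reservation.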

I’ve used this mechanism effectively to manage hundreds of mundane and complex resources with test infrastructure running dozens of executions in parallel.

Why use WebSockets?

But how would you put that together? You don’t need anything fancy, and you don’t want anything custom, especially when it comes to port assignments for the sockets.

In large networks, custom ports require special exceptions and changes to IT infrastructure to enable routing. Those changes will not only delay your rollout but also create yet another problem to worry about: are we routing things correctly?

Instead, I’ve chosen to do this over WebSockets because connections initiate from standard HTTP/S, which is already routed and working fine in IT infrastructure. Otherwise, your organization would be unable to access the internet.
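For context on why this routes so easily: a WebSocket connection starts life as an ordinary HTTP `GET` carrying an `Upgrade: websocket` header, and the server confirms the upgrade by hashing the client’s `Sec-WebSocket-Key` with a fixed GUID defined in RFC 6455. The sketch below computes the resulting `Sec-WebSocket-Accept` value with only the standard library. Libraries like websockets already do this for you, so it’s purely illustrative.

```python
import base64
import hashlib

# Fixed GUID defined by RFC 6455 for the opening handshake
WEBSOCKET_GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'


def accept_key(sec_websocket_key: str) -> str:
    """Compute the Sec-WebSocket-Accept header for a client's handshake key."""
    digest = hashlib.sha1((sec_websocket_key + WEBSOCKET_GUID).encode('ascii')).digest()
    return base64.b64encode(digest).decode('ascii')


# Sample handshake request from RFC 6455:
#   GET /chat HTTP/1.1
#   Upgrade: websocket
#   Connection: Upgrade
#   Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
print(accept_key('dGhlIHNhbXBsZSBub25jZQ=='))  # s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```

Everything before the server’s `101 Switching Protocols` reply is plain HTTP, which is why proxies and firewalls that already pass web traffic usually need no special configuration.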

How to build it

You might remember from a little while back when we used Sanic to build a webhook receiver or a log viewer.

It’s a versatile Python package with a lot of good defaults that can handle asynchronous functions. It works well in a situation like this, serving both as a standard REST API that handles HTTP requests and as a WebSocket server.

We’re going to track the resources in-memory using a simple Python list of dictionaries. It’s sufficient for short-lived testing in a small system.

If the server isn’t up and available, you can’t test, so it doesn’t matter if you can tell whether the resources are in use or not. For a more long-lived service, check out the next steps at the end of this article.

This time around, the system has two pieces: the server that manages resource pools, and a client used by the test functions to request resources from the server.

The WebSocket Server

Let’s get the easy stuff out of the way first.

The server needs a default port to run on, and the Sanic application to work with:

from sanic import Sanic

DEFAULT_PORT = 8000
# Naming the app works on older Sanic releases and is required on newer ones
app = Sanic('resource_server')

It has to track the available resources in a way that allows us to distinguish between pools and know whether they’re in use or not.

RESOURCE_AVAILABLE = 'available'
RESOURCE_IN_USE = 'in_use'
RESOURCES = [
    {'value': 7000, 'status': RESOURCE_AVAILABLE, 'type': 'port'},
    {'value': 7010, 'status': RESOURCE_AVAILABLE, 'type': 'port'},
    {'value': 7020, 'status': RESOURCE_AVAILABLE, 'type': 'port'},
    {'value': 'some_vm.example.com', 'status': RESOURCE_AVAILABLE, 'type': 'compute'},
    {'value': '10.10.10.1', 'status': RESOURCE_AVAILABLE, 'type': 'compute'},
]

Here we’re saying that each resource has a type and a value to distinguish it from others. The value is what we return to the client: it has to contain enough information for our test functions to use it, and it has to be unique.

The type delineates the pools. In our example, we have two types: compute and port, meaning we have a pool of network ports and a pool of compute hosts. A test function could do something like request('port') when asking for a new port resource. I like these semantics because it’s easy for a human to understand what’s going on by reading the test.

The server also needs to know which clients are currently connected, and it should handle a queue for each resource type.

resource_clients = {}
resource_queues = {}

The point of having a resource_clients dictionary is to track the IP address and port number that a client used to connect to our server. Later you’ll see that each client generates a unique identifier that we can use as the key in this structure.

The resource_queues dict has one entry per pool, listing all the clients waiting for a resource from that pool. When a client connects and there isn’t a resource it can use immediately, it goes into this structure.

Whenever a resource becomes available, we’ll check the queues to see if any clients are waiting for it and automatically assign it to them.

I’m using lists as queues in this example because I don’t need anything fancy. But you should also know about the deque data structure provided in the built-in collections module. It has extra capabilities useful for making queues, like efficiently appending and popping elements at both ends, whereas list.pop(0) has to shift every remaining element.
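Here is a small sketch of that hand-off using `collections.deque`. It assumes a simplified state where each type maps to a list of free values and a deque of waiting client IDs; the `free`, `waiting`, `assignments` and `resource_released()` names are hypothetical and not the server’s actual structures.

```python
from collections import deque

# Hypothetical simplified state: free values and waiting clients per pool
free = {'port': []}
waiting = {'port': deque(['client-a', 'client-b'])}
assignments = {}


def resource_released(resource_type, value):
    """Hand a released value to the longest-waiting client, or return it to the pool."""
    queue = waiting.get(resource_type)
    if queue:
        client_id = queue.popleft()  # FIFO: first in line is served first
        assignments[client_id] = value
    else:
        free.setdefault(resource_type, []).append(value)


resource_released('port', 7000)
resource_released('port', 7010)
resource_released('port', 7020)
print(assignments)  # {'client-a': 7000, 'client-b': 7010}
print(free)         # {'port': [7020]}
```

Once the queue is empty, released values simply go back into the free list for the next request.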

Given the previous notes, it becomes straightforward to find an available resource by using a list comprehension that checks its status and type:

available_resources = [r for r in RESOURCES if r['status'] == RESOURCE_AVAILABLE and r['type'] == resource_type]
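If you only ever need the first match, a generator expression passed to `next()` stops at the first available resource instead of building the whole list. This is a sketch of an alternative, not what the full server code uses; the `RESOURCES` list here is a trimmed, hypothetical variant of the example pool with one port already in use.

```python
RESOURCE_AVAILABLE = 'available'
RESOURCE_IN_USE = 'in_use'

# Trimmed variant of the example pool; 7000 is already reserved
RESOURCES = [
    {'value': 7000, 'status': RESOURCE_IN_USE, 'type': 'port'},
    {'value': 7010, 'status': RESOURCE_AVAILABLE, 'type': 'port'},
    {'value': 'some_vm.example.com', 'status': RESOURCE_AVAILABLE, 'type': 'compute'},
]


def get_resource(resource_type):
    """Return the first available resource of the given type, or None."""
    return next(
        (r for r in RESOURCES if r['status'] == RESOURCE_AVAILABLE and r['type'] == resource_type),
        None,  # Default returned when the pool is exhausted
    )


print(get_resource('port')['value'])  # 7010
print(get_resource('gpu'))            # None
```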

Reserving a resource is as easy as changing its status and adding a key to track the client that requested it:

resource['status'] = RESOURCE_IN_USE
resource['used_by'] = {
    'client_id': client_id,
    'ip': resource_clients[client_id]['ip'],
    'port': resource_clients[client_id]['port'],
}

Returning one to the pool is also a simple status update and dict key removal:

resource['status'] = RESOURCE_AVAILABLE
del resource['used_by']

Now on to the more exciting parts that focus on using Sanic for this type of solution. Let’s use the app object we made earlier to put together a status endpoint that answers on HTTP.

It’s a way of validating that the server is online and routable from our clients. We already know how to do this from previous articles.

from sanic import response


@app.get('/status', version=1)
async def status(request):
    """Check that the API server is up and running"""
    return response.json({'status': 'up'})

Because it bypasses any internal server dependencies, an endpoint like this serves as a “ping.” Part of my routine is to make sure that any API server I build has a mechanism like this. I can’t tell you how many times it’s helped me find problems unrelated to my code.
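On the client side, a quick pre-flight check against this endpoint can save you from a confusing failure later. The sketch below uses only the standard library; the `server_is_up()` name and the two-second timeout are assumptions. Note that Sanic’s `version=1` argument prefixes the route, so the endpoint answers at `/v1/status`.

```python
import json
import urllib.request


def server_is_up(base_url, timeout=2.0):
    """Return True if GET {base_url}/v1/status answers with {'status': 'up'}."""
    try:
        with urllib.request.urlopen(f'{base_url}/v1/status', timeout=timeout) as resp:
            body = json.loads(resp.read())
    except (OSError, ValueError):
        # URLError, HTTPError and timeouts are all OSError subclasses;
        # a non-JSON body raises a ValueError subclass
        return False
    return isinstance(body, dict) and body.get('status') == 'up'
```

For example, calling `server_is_up('http://resources.example.com:8000')` before requesting resources turns an unreachable server into a clear `False` instead of a hung test.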

Using Sanic to make the WebSocket endpoint is no more complicated than the previous function. It’s one of the reasons I love this module:

@app.websocket('/request')
async def websocket_handler(request, ws):
    """Handle websocket connections"""
    client_ip = request.headers.get('X-Forwarded-For', ws.remote_address[0])
    logging.info(f"{client_ip}:{ws.remote_address[1]} - New Connection")

We have more work to do in this handler function around validating a tiny communications standard between the client and server, and the actual handling of resource commands, but we’ll talk about that later.

The request parameter is just like the HTTP request because a WebSocket connection is an “upgraded” HTTP connection. The ws parameter contains the actual socket object that we’re using to communicate, and it holds information about the client, like the IP address and port used in the connection.

In the real world, it’s highly unlikely that your Sanic application runs barebones on the network. It’s probably behind some load balancer, reverse proxy, or even an organizational proxy set up by your IT department.

We can’t blindly trust the value in ws.remote_address because it may always be the proxy’s address, or 127.0.0.1 if you’re using a local reverse proxy.

In proxy configurations, it’s common to set the X-Forwarded-For header to the IP address of the originating client when forwarding requests, so we check for that first.
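One wrinkle worth knowing: when a request passes through several proxies, `X-Forwarded-For` holds a comma-separated list such as `203.0.113.7, 10.0.0.2`, with the originating client first. The handler above would store that whole string as the IP. A small helper like the hypothetical `client_address()` below keeps only the first entry. Keep in mind that clients can set this header themselves, so only rely on it when a proxy you control writes it.

```python
def client_address(headers, remote_address):
    """Return the originating client IP, preferring X-Forwarded-For when present.

    `headers` is any mapping with .get() (such as Sanic's request.headers);
    `remote_address` is the (ip, port) tuple of the direct peer.
    """
    forwarded = headers.get('X-Forwarded-For')
    if forwarded:
        # The left-most entry is the original client; later entries are proxies
        first = forwarded.split(',')[0].strip()
        if first:
            return first
    return remote_address[0]


print(client_address({'X-Forwarded-For': '203.0.113.7, 10.0.0.2'}, ('127.0.0.1', 50000)))  # 203.0.113.7
print(client_address({}, ('192.0.2.10', 50000)))  # 192.0.2.10
```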

The tricky piece is that we want to automatically release resources whenever a client’s WebSocket disconnects. That means we have to add a little custom code into how Sanic manages the connection. The mechanism for doing that is the WebSocketProtocol class.

import asyncio
from sanic.websocket import WebSocketProtocol


class ResourceServerProtocol(WebSocketProtocol):
    def connection_lost(self, exc):
        super().connection_lost(exc)
        if self.websocket is not None:
            # Release resources
            asyncio.run_coroutine_threadsafe(release_client_resources(self.websocket.client_id), asyncio.get_event_loop())
            # Remove the client from the connected clients
            resource_clients.pop(self.websocket.client_id, None)

In the Sanic architecture, a WebSocketProtocol defines the server behavior when handling WebSocket connections. In our case, we’re subclassing the protocol to add two more steps after a connection is lost.

Note that we still call the connection_lost method of the base class using super(). Once Sanic finishes doing its thing, we release the resources held by the client that’s disconnecting, remove it from any queues and drop it from the list of connected clients.

Because Sanic makes use of asyncio, all our endpoints and most of the supporting functions are also async. Since connection_lost() is a regular synchronous method, it can’t await release_client_resources() (defined further down) directly. Instead, we use asyncio.run_coroutine_threadsafe() to schedule it as a task in the event loop.
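asyncio invokes protocol callbacks like `connection_lost()` on the event loop’s own thread, so another way to schedule the cleanup is `loop.create_task()`. `run_coroutine_threadsafe()` is designed for submitting work from a *different* thread, though it also works here. The sketch below shows the pattern with only the standard library; `FakeProtocol` and `release()` are hypothetical stand-ins for the Sanic protocol and `release_client_resources()`.

```python
import asyncio

released = []


async def release(client_id):
    """Stand-in for release_client_resources(): some async cleanup work."""
    await asyncio.sleep(0)  # Pretend to do some I/O
    released.append(client_id)


class FakeProtocol:
    """Hypothetical protocol with a synchronous connection_lost() callback."""

    def __init__(self, client_id):
        self.client_id = client_id
        self.cleanup_task = None

    def connection_lost(self, exc):
        # We can't await in a sync method, so hand the coroutine to the running loop;
        # keeping a reference to the task stops it from being garbage collected early
        loop = asyncio.get_running_loop()
        self.cleanup_task = loop.create_task(release(self.client_id))


async def main():
    protocol = FakeProtocol('client-a')
    protocol.connection_lost(None)  # asyncio would call this when the socket closes
    await protocol.cleanup_task     # Let the scheduled cleanup finish
    return released


print(asyncio.run(main()))  # ['client-a']
```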

If you have questions about asyncio and how to work with it, we have a few articles on the topic. You can start with Threaded Asynchronous Magic and How to Wield It.

The snippet below puts all the server code together:

import logging
import json
import asyncio

import websockets
from sanic import Sanic, response
from sanic.websocket import WebSocketProtocol

DEFAULT_PORT = 8000
RESOURCE_AVAILABLE = 'available'
RESOURCE_IN_USE = 'in_use'

# Sanic REST application
app = Sanic('resource_server')

# Resource pool
RESOURCES = [
    {'value': 7000, 'status': RESOURCE_AVAILABLE, 'type': 'port'},
    {'value': 7010, 'status': RESOURCE_AVAILABLE, 'type': 'port'},
    {'value': 7020, 'status': RESOURCE_AVAILABLE, 'type': 'port'},
    {'value': 'some_vm.example.com', 'status': RESOURCE_AVAILABLE, 'type': 'compute'},
    {'value': '10.10.10.1', 'status': RESOURCE_AVAILABLE, 'type': 'compute'},
]

# Tracks the connected websocket clients
resource_clients = {}

# Tracks the queues for resource types
resource_queues = {}


def get_resource(resource_type):
    """Find the first resource of a given type that's available"""
    available_resources = [r for r in RESOURCES if r['status'] == RESOURCE_AVAILABLE and r['type'] == resource_type]
    if len(available_resources) > 0:
        return available_resources[0]


def reserve_resource(resource, client_id):
    """Reserve a specific resource for the given client"""
    logging.info(f"{client_id} - Reserving {resource['value']}")
    resource['used_by'] = {
        'client_id': client_id,
        'ip': resource_clients[client_id]['ip'],
        'port': resource_clients[client_id]['port'],
    }
    resource['status'] = RESOURCE_IN_USE


async def release_resource(resource):
    """Return a resource to the pool"""
    logging.info(f"Releasing {resource['value']}")
    resource['status'] = RESOURCE_AVAILABLE
    del resource['used_by']
    await resource_availability_change(resource['type'])


async def release_client_resources(client_id):
    """Release everything reserved by the specified client"""
    logging.info(f"{client_id} - Releasing all resources")
    # Since the client disconnected, drop it from every queue first so a
    # released resource can't be handed straight back to it
    for resource_type, q in resource_queues.items():
        resource_queues[resource_type] = [client for client in q if client['client_id'] != client_id]
    reserved_resources = [r for r in RESOURCES if 'used_by' in r and r['used_by']['client_id'] == client_id]
    for resource in reserved_resources:
        await release_resource(resource)


async def resource_availability_change(resource_type):
    """A resource is now available, check if someone in queue can use it"""
    # Make a new queue if we didn't have one for this resource type and exit
    if resource_type not in resource_queues:
        resource_queues[resource_type] = []
        return
    q = resource_queues[resource_type]
    # Serve waiting clients in order; iterate over a copy because we remove from q
    for request in list(q):
        if request['client_id'] not in resource_clients:
            # The client disconnected while waiting
            q.remove(request)
            continue
        resource = get_resource(resource_type)
        if resource is None:
            break  # Nothing left to hand out
        reserve_resource(resource, request['client_id'])
        # Send it to the client
        await resource_clients[request['client_id']]['client'].send(json.dumps({'status': 200, 'resource': resource['value']}))
        # Remove the client from the waiting list
        q.remove(request)


def queue_for_resource(request, client_id):
    """Add this client_id to the queue for the requested resource"""
    resource_type = request['resource_type']
    # Make a new queue if we didn't have one already
    if resource_type not in resource_queues:
        resource_queues[resource_type] = []
    resource_queues[resource_type].append({'client_id': client_id})


@app.get('/status', version=1)
async def status(request):
    """Serve as a status check to verify the API is up and running"""
    return response.json({'status': 'up'})


@app.websocket('/request')
async def websocket_handler(request, ws):
    """Handle websocket connections"""
    # Find client information
    client_ip = request.headers.get('X-Forwarded-For', ws.remote_address[0])
    logging.info(f"{client_ip}:{ws.remote_address[1]} - New Connection")
    # Store the unique identifier for this client in the websocket protocol for this connection
    setattr(ws, 'client_id', None)
    try:
        # Wait for client registration
        message = json.loads(await ws.recv())
        # The only request allowed at this point is a 'register' command
        if message.get('command') != 'register' or 'client_id' not in message:
            return
        # Grab the client_id from the request
        ws.client_id = message['client_id']
        logging.info(f"{client_ip}:{ws.remote_address[1]} - Registered as {ws.client_id}")
        # Store the client connection information in-memory
        resource_clients[ws.client_id] = {'ip': client_ip, 'port': ws.remote_address[1], 'client': ws}
        # Tell the client they have registered successfully
        await ws.send(json.dumps({'status': 200}))
        # Wait for resource requests
        async for message in ws:
            try:
                message = json.loads(message)
                logging.debug(f"{ws.client_id} - Received {message}")
                # All resource requests must have a 'command' and 'client_id' field
                if 'command' not in message or 'client_id' not in message:
                    await ws.send(json.dumps({'status': 400, 'code': 'MissingParameterError', 'message': 'Missing command or client_id'}))
                    continue
                # We shouldn't receive a request from a different client on the same socket
                if message['client_id'] != ws.client_id:
                    await ws.send(json.dumps({'status': 422, 'code': 'InvalidParameterError', 'message': 'Invalid client_id'}))
                    continue
                if message['command'] == 'request':
                    # Requesting a new resource
                    logging.info(f"{ws.client_id} - Requested {message['resource_type']} resource")
                    # Attempt to retrieve a resource from the pool
                    resource = get_resource(message['resource_type'])
                    if resource is None:
                        # Couldn't find one, get in queue
                        logging.info(f"{ws.client_id} - Queued for {message['resource_type']}")
                        queue_for_resource(message, ws.client_id)
                    else:
                        # Found a resource, pass it along to the client
                        logging.info(f"{ws.client_id} - Found {message['resource_type']} '{resource['value']}' available")
                        reserve_resource(resource, ws.client_id)
                        await ws.send(json.dumps({'status': 200, 'resource': resource['value']}))
                else:
                    # Answer unknown commands so the client isn't left waiting for a reply
                    await ws.send(json.dumps({'status': 400, 'code': 'UnknownCommandError', 'message': f"Unknown command {message['command']}"}))
            except Exception:
                # There was an unexpected error trying to handle the message, log it and tell the client
                logging.exception(f"{ws.client_id} - Exception when handling message {message}")
                await ws.send(json.dumps({'status': 500, 'code': 'InternalError', 'message': 'An unexpected internal error occurred'}))
        logging.info(f"{ws.client_id} - Connection Closed")
    except websockets.exceptions.ConnectionClosed as e:
        # We should never see this exception because of how Sanic handles the WebSocketProtocol, but keeping it here just in case
        # Disconnection is handled within the ResourceServerProtocol class
        logging.info(f"{ws.client_id} - Connection Closed '{e.reason}'")


class ResourceServerProtocol(WebSocketProtocol):
    def connection_lost(self, exc):
        super().connection_lost(exc)
        # client_id is missing if the connection dropped before the handler ran
        client_id = getattr(self.websocket, 'client_id', None)
        if client_id is not None:
            # Release resources
            asyncio.run_coroutine_threadsafe(release_client_resources(client_id), asyncio.get_event_loop())
            # Remove the client
            resource_clients.pop(client_id, None)
            logging.info(f'{client_id} - Removed Client')


if __name__ == '__main__':
    # Setup logging configuration
    logging.basicConfig(format='[%(levelname)s] %(asctime)s - %(funcName)s: %(message)s', level=logging.INFO)
    logging.info("Starting server...")
    app.run(host='0.0.0.0', port=DEFAULT_PORT, protocol=ResourceServerProtocol)

Things to note here:

  • The app.run() call at the bottom has to tell Sanic to use the ResourceServerProtocol class we defined earlier instead of its default.
  • We added checks into websocket_handler() that implement our client-server protocol. As you can see, we expect all messages from our clients to contain a client_id to identify themselves, and a command with their server request.
  • Apart from the initial register message, there’s only one command the client can send (request), which it uses to ask for a resource. This command requires the resource_type parameter that we use to look up and reserve items from our pools.
  • It’s easy to add more commands in the future.
  • All responses to the client contain a status field that uses standard HTTP status codes:
    • 500 for server errors.
    • 4xx for errors in the request.
    • 200 for successful requests.
  • The handler’s inner try block ends in a generic except Exception clause. It’s not something you’d typically do, but in our case, we don’t want a failure in the handler code to take down the client’s session. Instead, we want to tell the client that we had an error. Notice that we log the stack trace using logging.exception().
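The checks in the bullets above boil down to a small validation step. The sketch below pulls them into a pure function, a hypothetical `validate_message()` helper rather than part of the server code, which makes the protocol easy to unit test without opening a socket. The `InvalidJSONError` code and the `resource_type` check are illustrative additions, not something the server above sends.

```python
import json


def validate_message(raw, expected_client_id):
    """Check one client message against the resource protocol.

    Returns (message, None) when valid, or (None, error_response) otherwise.
    """
    try:
        message = json.loads(raw)
    except ValueError:
        return None, {'status': 400, 'code': 'InvalidJSONError', 'message': 'Message is not valid JSON'}
    if not isinstance(message, dict) or 'command' not in message or 'client_id' not in message:
        return None, {'status': 400, 'code': 'MissingParameterError', 'message': 'Missing command or client_id'}
    if message['client_id'] != expected_client_id:
        return None, {'status': 422, 'code': 'InvalidParameterError', 'message': 'Invalid client_id'}
    if message['command'] == 'request' and 'resource_type' not in message:
        return None, {'status': 400, 'code': 'MissingParameterError', 'message': 'Missing resource_type'}
    return message, None


ok, err = validate_message('{"command": "request", "client_id": "a", "resource_type": "port"}', 'a')
print(ok['resource_type'], err)  # port None
_, err = validate_message('{"command": "request", "client_id": "b"}', 'a')
print(err['status'])  # 422
```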

The Client module

It’s time to look at the client code. We don’t need Sanic for this part because we’re not running the server. Instead, let’s use the websockets module - which is what Sanic uses under the covers to provide its server functionality.

Because we’re building a module for use by our test functions, I always like to spend a little time thinking about how I expect to use it before writing the code.

You’ll want to build an interface that’s simple and intuitive. Picture something that requests a resource like this:

from resources import ResourceClient

client = ResourceClient(SERVER_ADDRESS)
host_for_testing = client.request('compute')
run_test(host_for_testing)

To implement that, we need a module like this:

import logging
import json
import uuid
import asyncio

import websockets

DEFAULT_PORT = 8000


class ResourceClientError(Exception):
    pass


class ResourceConnectionError(ResourceClientError):
    pass


class RegistrationError(ResourceConnectionError):
    pass


class ResourceClientProtocol(websockets.WebSocketClientProtocol):
    def connection_lost(self, exc):
        super().connection_lost(exc)
        logging.warning('Lost connection to resource server')


class ResourceClient():
    def __init__(self, address="127.0.0.1", port=DEFAULT_PORT, client_id=None):
        self.server = f'{address}:{port}'
        # Generate the ID per instance; a uuid4() default argument would be
        # evaluated only once and shared by every client in the process
        self.client_id = client_id or str(uuid.uuid4())
        self.client = None
        self.__closing = False
        self.assigned_resources = {}
        self._loop = asyncio.get_event_loop()
        self._loop.run_until_complete(self._connect())

    @property
    def is_connected(self):
        return False if self.client is None else self.client.open

    async def _connect(self):
        """Connect to a websocket server running the resource manager and register a new client"""
        # Use our protocol class so dropped connections are logged
        self.client = await websockets.client.connect(f'ws://{self.server}/request', create_protocol=ResourceClientProtocol)
        await self.client.send(json.dumps({'command': 'register', 'client_id': self.client_id}))
        response = json.loads(await self.client.recv())
        logging.debug(f'Connected to Resource server {self.server}')
        # Check that registration was successful
        if response['status'] != 200:
            raise RegistrationError(f"{response['status']} {response['code']} - {response['message']}")
        logging.info(f'Registered as Resource client {self.client_id}')
        setattr(self.client, 'resource_client', self)

    async def _request(self, resource_type):
        """Asynchronous request for a resource of the given type"""
        command = {'command': 'request', 'resource_type': resource_type, 'client_id': self.client_id}
        logging.info(f'Requesting {resource_type}')
        await self.client.send(json.dumps(command))
        # This blocks while we wait in the server's queue
        response = json.loads(await self.client.recv())
        if response['status'] != 200:
            raise ResourceClientError(f"{response['status']} {response.get('code')} - {response.get('message')}")
        self.assigned_resources.setdefault(resource_type, []).append(response['resource'])
        logging.info(f"Received {resource_type} assignment '{response['resource']}'")
        return response['resource']

    def request(self, resource_type):
        """Request a new resource and wait for assignment"""
        return self._loop.run_until_complete(self._request(resource_type))

Notice that we’re making use of proper exception handling here. We have a generic ResourceClientError exception that others can inherit from, so consumers of our code can catch all errors generated by our module by using that class.

Since the websockets module is asynchronous, we’re using asyncio concepts again. This time though, we want to wrap the async calls into synchronous functions.

The main reason is that we want our code to block while it waits in the queue for a resource assignment. We did something similar when building the korv module in an earlier article.

To wrap the functions, we’re putting the execution steps in methods prefixed with underscores (_), and calling them with run_until_complete() from the non-underscored methods exposed to users.
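Stripped of the networking, the pattern looks like this: an underscore-prefixed coroutine does the work, and a public method drives it to completion on an event loop the object owns. The `SyncWrapper` class and its `_fetch()` coroutine are hypothetical. This sketch creates its own loop with `asyncio.new_event_loop()`, a small departure from the client’s `get_event_loop()` that avoids relying on an implicit default loop.

```python
import asyncio


class SyncWrapper:
    """Hypothetical example of exposing coroutines through blocking methods."""

    def __init__(self):
        # Own a private loop so every call runs on the same one
        self._loop = asyncio.new_event_loop()

    async def _fetch(self, name):
        """The asynchronous implementation; pretends to wait on the network."""
        await asyncio.sleep(0.01)
        return f'{name}-assigned'

    def fetch(self, name):
        """Blocking wrapper: returns only after _fetch() completes."""
        return self._loop.run_until_complete(self._fetch(name))

    def close(self):
        self._loop.close()


wrapper = SyncWrapper()
print(wrapper.fetch('port'))  # port-assigned
wrapper.close()
```

One caveat with this design: `run_until_complete()` can’t be called while another event loop is already running in the same thread, so a wrapper like this suits plain synchronous test code rather than async tests.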

Note that there’s also a custom protocol class (ResourceClientProtocol) for our socket handler that logs a warning if the connection to the server drops unexpectedly.

The concept is the same as described with the server protocol earlier. Just add a step at the end of the connection_lost() function. You can expand on it more in the future if you want to do smarter things when this happens, like stopping test execution.

Instantiating the class makes a unique identifier for the client.

I’ve used the built-in uuid module as a solution for this before, and I prefer it over other means because the output is usually an easy-to-parse string that looks like 7b616c94-0ed8-43d8-8902-d905f070c1c9. But you can try other things like a timestamp, or even the built-in secrets module, which is designed for generating cryptographically strong random values.
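One caveat when generating the identifier: Python evaluates default arguments once, when the function is defined, so a signature like `client_id=str(uuid.uuid4())` gives every instance created in the same process the same ID. Generating it inside the function avoids that. The two classes below are minimal hypothetical stand-ins for the client, and `secrets.token_hex()` is shown as the alternative mentioned above.

```python
import secrets
import uuid


class SharedIdClient:
    def __init__(self, client_id=str(uuid.uuid4())):  # Evaluated once, at class definition
        self.client_id = client_id


class UniqueIdClient:
    def __init__(self, client_id=None):
        # Evaluated on every call, so each instance gets a fresh ID
        self.client_id = client_id or str(uuid.uuid4())


print(SharedIdClient().client_id == SharedIdClient().client_id)  # True
print(UniqueIdClient().client_id == UniqueIdClient().client_id)  # False
print(len(secrets.token_hex(16)))  # 32 (two hex characters per random byte)
```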

As you can see, sending data over the socket is done with send(), and we use await to give up control while waiting for the data to go out or for responses to arrive.

The client is also smart enough to keep a dictionary of the resources it’s been assigned in its assigned_resources property. It makes it easy to retrieve them later in case we need to do something special with them.

Using with pytest fixtures

We can’t talk about testing without talking about pytest, so I wanted to include at least a small blurb on why this type of system pairs well with pytest mechanisms like fixtures.

Fixtures are a great way of reusing test utilities and functions. They help you run setup and teardown for your tests, and they help optimize execution such that you only need to run those steps once for all your test functions that use the same fixture.

Here’s an example of how to integrate:

import pytest
import requests
from resources import ResourceClient

RESOURCE_SERVER = 'resources.example.com'


@pytest.fixture(scope='module')
def get_test_resources():
    # Connect to the resource server
    client = ResourceClient(RESOURCE_SERVER)
    # Request a host and a port in which to start a new API session
    compute = client.request('compute')
    port = client.request('port')
    yield compute, port
    # Do some teardown (clean_resources() is a placeholder for your own cleanup)
    clean_resources()


def test_1(get_test_resources):
    compute, port = get_test_resources
    ssh_session = ssh_to_host(compute)
    ssh_session.execute(['start_api_server', port])


def test_2(get_test_resources):
    compute, port = get_test_resources
    response = requests.get(f"https://{compute}:{port}/v1/status")
    assert response.status_code == 200

We’re running two different tests, which validate two different things.

The first one validates that some shell command properly executes on a compute virtual machine using a fictitious ssh_to_host() function.

The second one checks that there’s a REST API listening on the given port of that virtual machine.

Because we defined get_test_resources() as a fixture with a module-level scope, both of the test functions use its output, but only run it once. The way pytest executes is as follows:

  1. Run get_test_resources() until its yield statement.
  2. Run test_1() and pass the tuple response from the yield statement.
  3. Run test_2() and pass the tuple response from the yield statement.
  4. Return control to get_test_resources() and the steps after the yield statement.
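To see why setup and teardown run only once, here is a simplified model of what pytest does with a module-scoped yield fixture. It’s an illustration using a hypothetical `fake_fixture()` generator and two plain functions standing in for tests, not pytest’s actual implementation: the generator is advanced to its `yield` once, the yielded value is shared by every test, and the generator is resumed after the last test.

```python
calls = []


def fake_fixture():
    """Stands in for get_test_resources(): setup, yield, teardown."""
    calls.append('setup')
    yield ('some_vm.example.com', 7000)
    calls.append('teardown')


def first_test(resources):
    calls.append(f'first_test got {resources[0]}')


def second_test(resources):
    calls.append(f'second_test got port {resources[1]}')


def run_module(fixture, tests):
    """Simplified model of a module-scoped yield fixture."""
    gen = fixture()
    value = next(gen)  # Step 1: run the fixture until its yield
    try:
        for test in tests:  # Steps 2 and 3: each test receives the same value
            test(value)
    finally:
        next(gen, None)  # Step 4: resume after the yield, even if a test raised


run_module(fake_fixture, [first_test, second_test])
print(calls)
# ['setup', 'first_test got some_vm.example.com', 'second_test got port 7000', 'teardown']
```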

Next steps

This solution works just fine if you don’t have much to keep track of, assuming the resources are always dedicated to testing. I’ve successfully run a system like it to track hundreds of in-memory resources used by parallel test runs that took several hours.

But of course, there’s a trade-off. While this approach is simple to manage, it leaves you with other problems to think about, like “disabling” resources for maintenance, or recovering from a server crash.

You can mitigate those and many other problems by tracking the resource pool in an external database.

It enables you to query the state of your pool even if the system is down. You can disable and re-enable resources from a separate interface during maintenance, and the persistent state simplifies recovering from a crash.
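CouchDB is my preference, for the reasons in the next few paragraphs, but the core idea doesn’t depend on it. As a swapped-in illustration that runs anywhere, here is a sketch using Python’s built-in `sqlite3` module; the table layout and function names are assumptions. Reserving is a conditional `UPDATE` that only succeeds if the row is still available, so two processes sharing the store can’t both claim it, and a `disabled` status takes an idle resource out of rotation for maintenance.

```python
import sqlite3

# Hypothetical schema; values are stored as text so ports and hostnames share a column
SCHEMA = """
CREATE TABLE IF NOT EXISTS resources (
    value   TEXT PRIMARY KEY,
    type    TEXT NOT NULL,
    status  TEXT NOT NULL DEFAULT 'available',  -- available, in_use or disabled
    used_by TEXT
)
"""


def open_pool(path=':memory:'):
    """Open (or create) the pool; pass a file path to make it survive restarts."""
    conn = sqlite3.connect(path)
    conn.execute(SCHEMA)
    return conn


def add_resource(conn, value, resource_type):
    with conn:  # Commits on success, rolls back on error
        conn.execute('INSERT OR IGNORE INTO resources (value, type) VALUES (?, ?)', (str(value), resource_type))


def reserve(conn, resource_type, client_id):
    """Reserve the first available resource of a type; return its value or None."""
    with conn:
        row = conn.execute(
            "SELECT value FROM resources WHERE type = ? AND status = 'available' ORDER BY value LIMIT 1",
            (resource_type,),
        ).fetchone()
        if row is None:
            return None
        # The status check makes this a compare-and-set: it fails if someone
        # else reserved the row between the SELECT and the UPDATE
        cur = conn.execute(
            "UPDATE resources SET status = 'in_use', used_by = ? WHERE value = ? AND status = 'available'",
            (client_id, row[0]),
        )
        return row[0] if cur.rowcount == 1 else None


def release_client(conn, client_id):
    """Return everything a client holds, e.g. when its WebSocket closes."""
    with conn:
        conn.execute(
            "UPDATE resources SET status = 'available', used_by = NULL WHERE status = 'in_use' AND used_by = ?",
            (client_id,),
        )


def set_disabled(conn, value, disabled=True):
    """Take an idle resource out of rotation for maintenance, or put it back."""
    old, new = ('available', 'disabled') if disabled else ('disabled', 'available')
    with conn:
        cur = conn.execute('UPDATE resources SET status = ? WHERE value = ? AND status = ?', (new, str(value), old))
    return cur.rowcount == 1


conn = open_pool()
for port in (7000, 7010):
    add_resource(conn, port, 'port')
set_disabled(conn, 7000)                  # 7000 is down for maintenance
print(reserve(conn, 'port', 'client-a'))  # 7010
print(reserve(conn, 'port', 'client-b'))  # None: the pool is exhausted
release_client(conn, 'client-a')
print(reserve(conn, 'port', 'client-b'))  # 7010
```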

As always, there are many options for choosing the best database for this type of scenario, but I find NoSQL is well suited for it.

It’s fast, schemaless, and generally scales well. The flexible schema also makes it easy to add attributes to each resource later, allowing more complex queries like: give me the next available compute that’s running Ubuntu.

To be more specific, I prefer CouchDB for this implementation because of its replication capabilities, giving us even better availability.

You can use the python-cloudant module to interact with it.

Summarizing

I hope this article gave you something to think about when approaching resource management in your test infrastructure. Dealing with out-of-sync resource state because of unplanned scenarios consumes a lot of time, but using sockets as a reservation tool reduces the chances of that happening considerably. While it does add network dependencies, I find that the automated way of returning unused resources to a pool when a connection breaks is worth it.

WebSockets are versatile and easy to route. Python interacts with them very well through asynchronous modules like sanic and websockets. Don’t forget to consider them in your next network-related project.