Async Server

Description

This article walks through the theory and practice of building an async server that can handle multiple clients at the same time. We will start with a simple socket server that handles only one client and then gradually improve it.

Let’s get started…

Simple echo server

# server.py
 
import socket
 
def handle_connection(sock: socket.socket):
    while True:
        received_data = sock.recv(4096)
        if not received_data:
            break
        sock.sendall(received_data)
 
    print('Client disconnected', sock.getpeername())
    sock.close()
 
def run_server(host: str, port: int) -> None:
    sock = socket.socket()
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.listen()
    print('Server is listening on {}:{}'.format(host, port))
    while True:
        client_sock, addr = sock.accept()
        print('Connection from', addr)
        handle_connection(client_sock)
 
if __name__ == '__main__':
    run_server('localhost', 3000)

Now, when we run it and connect to it, only one connection can be served at a time. A new connection has to wait for the existing one to end first.

python3 server.py
 
# client
nc localhost 3000
hello
hello
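
If you open a second client in another terminal while the first is still connected, the new connection gets no echo until the first one closes, because the server is still stuck inside handle_connection for the first socket. Roughly, the second terminal looks like this (your output may differ slightly):

# second client, in another terminal, while the first is still connected
nc localhost 3000
hello
# ...no echo comes back until the first client disconnects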

Introducing Threads

Let’s use threads for multiple connections:

 
# server.py
 
import socket
import threading
 
def handle_connection(sock: socket.socket):
    while True:
        received_data = sock.recv(4096)
        if not received_data:
            break
        sock.sendall(received_data)
 
    print('Client disconnected', sock.getpeername())
    sock.close()
 
def run_server(host: str, port: int) -> None:
    sock = socket.socket()
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.listen()
    print('Server is listening on {}:{}'.format(host, port))
    while True:
        client_sock, addr = sock.accept()
        print('Connection from', addr)
        thread = threading.Thread(target=handle_connection, args=[client_sock])
        thread.start()
 
if __name__ == '__main__':
    run_server('localhost', 3000)
 

So, this lets us connect multiple clients, but each connection still needs its own thread, which is expensive.

Using Thread pools

To control the number of threads being used, we can use a thread pool.

# server.py
 
import socket
from concurrent.futures import ThreadPoolExecutor
 
pool = ThreadPoolExecutor(max_workers=20)
 
def handle_connection(sock: socket.socket):
    while True:
        received_data = sock.recv(4096)
        if not received_data:
            break
        sock.sendall(received_data)
 
    print('Client disconnected', sock.getpeername())
    sock.close()
 
def run_server(host: str, port: int) -> None:
    sock = socket.socket()
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.listen()
    print('Server is listening on {}:{}'.format(host, port))
    while True:
        client_sock, addr = sock.accept()
        print('Connection from', addr)
        pool.submit(handle_connection, client_sock)
 
if __name__ == '__main__':
    run_server('localhost', 3000)

If there are already 20 active connections, a new connection has to wait until at least one of them ends.

Let’s use IO multiplexing

The major problem with our socket code is that every read or write can block, stalling program execution while it waits. How cool would it be if something notified us when there is an update on a socket and we just ran a callback? Cool, huh…

Python provides an IO multiplexing module, selectors, that we can use. It provides different selector implementations, and DefaultSelector picks the most efficient one available on the platform.

To register a socket, we do

sel = selectors.DefaultSelector()
 
sel.register(sock, selectors.EVENT_READ, data)

sel.select() returns a list of (key, events) tuples, and each tuple describes a ready socket.

  • key is an object holding the socket object (key.fileobj) and the auxiliary data (key.data).
  • events is a bitmask of the events ready on the socket (see the minimal loop sketch below).
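
Putting these pieces together, the polling loop looks roughly like this. A minimal sketch, assuming each socket was registered on sel with a callback function as its data (the full server below does exactly that):

while True:
    for key, events in sel.select():
        if events & selectors.EVENT_READ:   # events is a bitmask, so test the bit we care about
            callback = key.data             # whatever we passed as data when registering
            callback(key.fileobj)           # key.fileobj is the ready socket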

So, server.py with selectors:

# server.py
 
import selectors
import socket
 
sockets = set()
 
def broadcast(broadcaster, message):
    for sock in sockets:
        if broadcaster is not sock:
            sock.sendall(message)
 
def handle_connection(sock: socket.socket):
    received_data = sock.recv(4096)
    if received_data:
        broadcast(sock, received_data)
    else:
        print('Client disconnected:', sock.getpeername())
        sockets.discard(sock)
        sel.unregister(sock)
        sock.close()
 
sel = selectors.DefaultSelector()
 
def accept(sock: socket.socket):
    client_sock, addr = sock.accept()
    print('Client connected from {}'.format(addr))
    sockets.add(client_sock)
    sel.register(client_sock, selectors.EVENT_READ, handle_connection)
 
def setup_server_socket(host: str, port: int):
    sock = socket.socket()
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.listen()
    print('Server is listening on {}:{}'.format(host, port))
    sel.register(sock, selectors.EVENT_READ, accept)
 
def run_event_loop():
    while True:
        for key, _ in sel.select():
            cb = key.data
            sock = key.fileobj
            cb(sock)
 
if __name__ == '__main__':
    setup_server_socket('localhost', 3000)
    run_event_loop()

This approach also has a problem. We write to the socket directly, and if the kernel's send buffer is full, the write blocks the whole loop. That means handle_connection should really be broken into two functions, one for reading from the socket and one for writing to it.
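
A rough sketch of that split might look like the following. This is an assumption about how you could structure it, not part of the article's final version: the outgoing dict and the handle_read / handle_write names are illustrative, and sel.modify() is used to switch the registered event between reads and writes. In accept() you would then register handle_read instead of handle_connection.

outgoing = {}  # hypothetical per-socket buffer of bytes waiting to be written

def handle_read(sock: socket.socket):
    data = sock.recv(4096)
    if not data:
        sel.unregister(sock)
        sock.close()
        return
    outgoing[sock] = outgoing.get(sock, b'') + data
    # we now want to write, so ask the selector to wake us when the socket is writable
    sel.modify(sock, selectors.EVENT_WRITE, handle_write)

def handle_write(sock: socket.socket):
    sent = sock.send(outgoing[sock])        # send() may write only part of the buffer
    outgoing[sock] = outgoing[sock][sent:]
    if not outgoing[sock]:
        # buffer flushed, go back to waiting for incoming data
        sel.modify(sock, selectors.EVENT_READ, handle_read)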

This works for simple applications, but for larger applications hand-writing split callbacks like this quickly becomes unmanageable.

Generators as Coroutines

We can use yield to return control back to the caller. This gives us a way to context-switch and keep running code instead of sitting on a blocking statement. The easiest thing we can do is put a yield right above every blocking statement.

About yield

yield is used to create generators. Whenever a yield executes, control returns to the caller. For example,

def create_counter():
    for i in range(10):
        yield i
 
counter = create_counter()
 
next(counter) # gives 0
next(counter) # gives 1
#...

So, control returns to the caller whenever a yield executes. In the context of a generator, when the generator is called again, it resumes right where it left off. For example, next(counter) returns 0 and control goes back to the calling location. When next(counter) is called again, the generator resumes just after the yield statement: it goes through the loop again, i becomes 1, yield executes again, and 1 is returned.

Let's look at another example in more detail:

def generator():
    print('started')
    print('before yielding 1')
    yield 1
    print('before yielding 2')
    yield 2
 
g = generator()
next(g) # prints 'started', 'before yielding 1' and returns 1
next(g) # prints 'before yielding 2' and returns 2
next(g) # raises StopIteration since there are no more yield statements
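
This next()/StopIteration pattern is exactly what our event loop below will rely on: keep driving the generator until it is exhausted. A small sketch:

g = generator()
while True:
    try:
        next(g)              # run until the next yield
    except StopIteration:    # no more yields: the generator is finished
        break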

Using generator in our problem

So, we can put a yield wherever there is blocking code. We also need an event loop that runs all the generators (or coroutines).

# server.py
from collections import deque
import socket
 
class EventLoop:
    def __init__(self):
        self.tasks_to_run = deque([])
 
    def create_task(self, coro):
        self.tasks_to_run.append(coro)
 
    def run(self):
        while self.tasks_to_run:
            task = self.tasks_to_run.popleft()
            try:
                print("running task", task)
                next(task)
            except StopIteration:
                print('STOPITERATION FOR ==> ', task)
                continue
            self.create_task(task)
 
def run_server(host='127.0.0.1', port=55555):
    sock = socket.socket()
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.listen()
    print("Server is running on {}:{}".format(host, port))
    while True:
        print("in run_server while loop")
        yield
        client_sock, addr = sock.accept()
        print('Connection from', addr)
        loop.create_task(handle_client(client_sock))
 
def handle_client(sock):
    print(sock)
    while True:
        yield
        received_data = sock.recv(4096)
        print(received_data)
        if not received_data:
            break
        yield
        sock.sendall(received_data)
 
    print('Client disconnected:', sock.getpeername())
    sock.close()
 
loop = EventLoop()
 
if __name__ == '__main__':
    loop.create_task(run_server('localhost', 3000))
    loop.run()

So, there is an event loop with two methods: one creates tasks and one runs them all sequentially. It uses a deque as the task queue.
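
deque gives us cheap appends on one end and pops on the other, so tasks run in FIFO order. For instance:

from collections import deque
 
tasks = deque()
tasks.append('task 1')   # enqueue on the right
tasks.append('task 2')
tasks.popleft()          # 'task 1' comes out first: FIFO order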

Flow

  • We create the event loop loop and create a task: run_server() returns a generator that gets added to the queue. Then we start the loop.
  • The loop takes the leftmost task from the queue and runs it using next(), which executes the generator until the next yield. The first time the run_server generator runs, the socket gets set up, we enter the while loop, and we hit our first yield, which returns control back to the loop.
  • The loop picks up the same task again (there is no other task) and continues executing it. It resumes right after the yield, so now it executes accept(), which blocks and waits for a connection.
  • Once a client connects, execution moves forward. One more task is created, for the connection handler. Execution then continues in the run_server loop: it goes back to the beginning of the loop, prints in run_server while loop, hits the yield again, and control goes back to the event loop.
  • The next task, the handler, is then executed. And it continues like that.

Did you see the problem here?

There is a problem. Although there is some sort of concurrency, another task still has to wait for the previous task to complete (or to reach its next yield).

So, if a client is connected and the handler task has control, and the client writes something to the socket, it won't be written back straight away, because

        yield
        sock.sendall(received_data)

hands control back to the event loop, run_server takes over, and it waits for a new connection. Only when a new connection is made does control go back to the handler, and only then is the data written back.

Solving this issue with IO multiplexing

We can combine generators and IO multiplexing to improve our solution. We have to modify our EventLoop class.

# event loop class
 
import selectors
from collections import deque
 
class EventLoop:
    def __init__(self):
        self.tasks = deque([])
        self.sel = selectors.DefaultSelector()
 
    def create_task(self, task):
        self.tasks.append(task)
 
    def run(self):
        while True:
            print('tasks', self.tasks)
            if self.tasks:
                task = self.tasks.popleft()
                try:
                    op, sock = next(task)
                except StopIteration:
                    continue
 
                if op == 'read':
                    self.sel.register(sock, selectors.EVENT_READ, task)
                elif op == 'write':
                    self.sel.register(sock, selectors.EVENT_WRITE, task)
                else:
                    raise ValueError('Invalid event loop operation type', op)
            else:
                for key, _ in self.sel.select():
                    sock = key.fileobj
                    task = key.data
                    self.sel.unregister(sock)
                    self.create_task(task)

So, the change we made in the EventLoop class is that we take a task and call next() on it. Now, the yield in the generator returns some information: the operation (read or write) and the particular socket.

Depending on the operation code, we register the socket with the selector. Then, if there is no task in the queue, we wait for an update on some socket using the sel.select() method. When a socket becomes ready, we get it and its task, unregister the socket, and put the same task back into the queue, where it gets executed again.

import socket
 
sockets = set()
 
def run_server(host='127.0.0.1', port=55555):
    sock = socket.socket()
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.listen()
    print("Server is running on {}:{}".format(host, port))
    while True:
        print("in run_server while loop")
        yield 'read', sock
        client_sock, addr = sock.accept()
        print('Connection from', addr)
        sockets.add(client_sock)
        loop.create_task(handle_client(client_sock))
 
def broadcast(sender_sock, message):
    for sock in sockets:
        if sock is not sender_sock:
            sock.sendall(message)
 
def handle_client(sock):
    print(sock)
    while True:
        yield 'read', sock
        received_data = sock.recv(4096)
        print(received_data)
        if not received_data:
            break
        yield 'write', sock
        broadcast(sock, received_data)
        # sock.sendall(received_data)
 
    print('Client disconnected:', sock.getpeername())
    sockets.discard(sock)
    sock.close()
 
loop = EventLoop()
 
if __name__ == '__main__':
    loop.create_task(run_server('localhost', 3000))
    loop.run()
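
To try it out, run the server and connect a couple of clients; anything one client types should show up on the other. A quick manual test (your output may vary):

python3 server.py
 
# client 1 (terminal A)
nc localhost 3000
hello from A
 
# client 2 (terminal B)
nc localhost 3000
hello from A        <- broadcast from client 1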