Async Server
Description
This article walks through the theory and practice of building an async server that can handle multiple clients at the same time. We will start with a simple socket server that handles only one client and then gradually improve it.
Let’s get started…
Simple echo server
# server.py
import socket


def handle_connection(sock: socket.socket) -> None:
    # Echo everything back until the client closes its side.
    while True:
        received_data = sock.recv(4096)
        if not received_data:
            break
        sock.sendall(received_data)
    print('Client disconnected', sock.getpeername())
    sock.close()


def run_server(host: str, port: int) -> None:
    sock = socket.socket()
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.listen()
    print('Server is listening on {}:{}'.format(host, port))
    while True:
        # accept() blocks until a client connects.
        client_sock, addr = sock.accept()
        print('Connection from', addr)
        # The next accept() only happens after this client disconnects.
        handle_connection(client_sock)


if __name__ == '__main__':
    run_server('localhost', 3000)
Now, when we run it and connect to it, only one connection can be served at a time. The next client has to wait until the existing connection ends.
python3 server.py
# client
nc localhost 3000
hello
hello
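The one-client-at-a-time behaviour can also be checked from Python. The following is a minimal sketch rather than part of the server itself: it assumes the same echo logic, runs it in a background thread on an OS-chosen port (the names serve, first and second are only for illustration), and shows that the second client gets no reply until the first one disconnects.

```python
import socket
import threading


def handle_connection(sock: socket.socket) -> None:
    while True:
        data = sock.recv(4096)
        if not data:
            break
        sock.sendall(data)
    sock.close()


def serve(listener: socket.socket) -> None:
    # Same shape as run_server: one client at a time.
    while True:
        client_sock, _ = listener.accept()
        handle_connection(client_sock)


listener = socket.socket()
listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listener.bind(('127.0.0.1', 0))  # port 0: let the OS pick a free port
listener.listen()
address = listener.getsockname()
threading.Thread(target=serve, args=(listener,), daemon=True).start()

first = socket.create_connection(address)
second = socket.create_connection(address)

first.sendall(b'hello')
first_reply = first.recv(4096)

# The second client connects (the OS queues it) but gets no echo yet.
second.sendall(b'world')
second.settimeout(0.5)
try:
    second.recv(4096)
    second_was_blocked = False
except socket.timeout:
    second_was_blocked = True

# Once the first client disconnects, the server moves on to the second.
first.close()
second.settimeout(5)
second_reply = second.recv(4096)
second.close()
```

The second connection succeeds at the TCP level because the operating system queues it in the listen backlog, but the server never reads from it while it is busy inside handle_connection for the first client.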
Introducing Threads
Let’s use threads for multiple connections:
# server.py
import socket
import threading


def handle_connection(sock: socket.socket) -> None:
    while True:
        received_data = sock.recv(4096)
        if not received_data:
            break
        sock.sendall(received_data)
    print('Client disconnected', sock.getpeername())
    sock.close()


def run_server(host: str, port: int) -> None:
    sock = socket.socket()
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.listen()
    print('Server is listening on {}:{}'.format(host, port))
    while True:
        client_sock, addr = sock.accept()
        print('Connection from', addr)
        # Each client gets its own thread, so accept() can run again right away.
        thread = threading.Thread(target=handle_connection, args=(client_sock,))
        thread.start()


if __name__ == '__main__':
    run_server('localhost', 3000)
So, this lets us connect multiple clients, but each connection still needs its own thread, which is expensive.
Using Thread pools
To control the number of threads being used, we can use a thread pool:
import socket
from concurrent.futures import ThreadPoolExecutor

# At most 20 connections are served at the same time.
pool = ThreadPoolExecutor(max_workers=20)


def handle_connection(sock: socket.socket) -> None:
    while True:
        received_data = sock.recv(4096)
        if not received_data:
            break
        sock.sendall(received_data)
    print('Client disconnected', sock.getpeername())
    sock.close()


def run_server(host: str, port: int) -> None:
    sock = socket.socket()
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.listen()
    print('Server is listening on {}:{}'.format(host, port))
    while True:
        client_sock, addr = sock.accept()
        print('Connection from', addr)
        # The handler waits in the pool's queue if all workers are busy.
        pool.submit(handle_connection, client_sock)


if __name__ == '__main__':
    run_server('localhost', 3000)
With 20 clients connected, a 21st connection is accepted but not served until at least one of the existing connections ends and frees a worker.
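The saturation effect is easy to reproduce without sockets. The sketch below is only an illustration under assumed names (long_lived_client, late_client): it uses a pool of 2 workers instead of 20, occupies both with tasks that stand in for clients that stay connected, and checks that a third task stays queued until a worker is released.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

release = threading.Event()
both_busy = threading.Barrier(3)  # two workers plus the main thread


def long_lived_client() -> str:
    both_busy.wait()   # signal that this worker is occupied
    release.wait()     # stand-in for a client that stays connected
    return 'done'


def late_client() -> str:
    return 'served'


pool = ThreadPoolExecutor(max_workers=2)
first = pool.submit(long_lived_client)
second = pool.submit(long_lived_client)
both_busy.wait(timeout=5)  # from here on, both workers are blocked

third = pool.submit(late_client)
# The third task is neither running nor finished: it sits in the queue.
third_waiting = not third.running() and not third.done()

release.set()  # the "clients" disconnect and the workers become free
third_result = third.result(timeout=5)
pool.shutdown()
```

A larger max_workers only moves the limit; it does not remove it, which is the motivation for the next section.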
Let’s use IO multiplexing
The major problem with our server is that every read or write on a socket blocks program execution until it completes. How cool would it be if someone notified us when there is activity on a socket and ran a callback for it? Cool, huh…
Python provides an IO multiplexing module, selectors, that we can use. It offers several selector implementations, and DefaultSelector is an alias for the most efficient one available on the current platform.
To register a socket, we do:
sel = selectors.DefaultSelector()
sel.register(sock, selectors.EVENT_READ, data)
sel.select() returns a list of (key, events) tuples, and each tuple describes a ready socket. key is an object that holds the socket in key.fileobj and the auxiliary data in key.data. events is a bitmask of the events that are ready on the socket.
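To make the shape of these values concrete, here is a small sketch that uses socket.socketpair() so no port is needed. The variable names and the string passed as auxiliary data are only illustrative.

```python
import selectors
import socket

# socketpair() gives two connected sockets, so no listening port is needed.
left, right = socket.socketpair()

sel = selectors.DefaultSelector()
sel.register(right, selectors.EVENT_READ, data='my auxiliary data')

# Nothing has been sent yet, so no socket is ready.
ready_before = sel.select(timeout=0)

left.sendall(b'ping')
ready_after = sel.select(timeout=1)

key, events = ready_after[0]
is_readable = bool(events & selectors.EVENT_READ)
payload = key.fileobj.recv(4096)  # does not block: the selector said it is ready

sel.unregister(right)
sel.close()
left.close()
right.close()
```

In the server that follows, the auxiliary data slot carries a callback instead of a string, which is how the event loop knows what to run for each ready socket.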
So, server.py with selectors:
# server.py
import selectors
import socket

sel = selectors.DefaultSelector()
sockets = set()


def broadcast(broadcaster: socket.socket, message: bytes) -> None:
    # Send the message to every connected client except the sender.
    for sock in sockets:
        if broadcaster is not sock:
            sock.sendall(message)


def handle_connection(sock: socket.socket) -> None:
    received_data = sock.recv(4096)
    if received_data:
        broadcast(sock, received_data)
    else:
        print('Client disconnected:', sock.getpeername())
        # Forget the socket so broadcast() does not write to a closed one.
        sockets.discard(sock)
        sel.unregister(sock)
        sock.close()


def accept(sock: socket.socket) -> None:
    client_sock, addr = sock.accept()
    print('Client connected from {}'.format(addr))
    sockets.add(client_sock)
    sel.register(client_sock, selectors.EVENT_READ, handle_connection)


def setup_server_socket(host: str, port: int) -> None:
    sock = socket.socket()
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.listen()
    print('Server is listening on {}:{}'.format(host, port))
    sel.register(sock, selectors.EVENT_READ, accept)


def run_event_loop() -> None:
    while True:
        for key, _ in sel.select():
            # key.data holds the callback registered for this socket.
            cb = key.data
            sock = key.fileobj
            cb(sock)


if __name__ == '__main__':
    setup_server_socket('localhost', 3000)
    run_event_loop()
This approach also has a problem. We write to the socket directly, and if the socket's send buffer is full, sendall() blocks the whole event loop. This means handle_connection should be split into two functions: one that reads from the socket, and one that writes to it once the selector reports it as writable.
This works for simple applications, but it gets hard to manage in larger ones, where the logic ends up spread across many callbacks.
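One possible way to split reading and writing is sketched below. It is an assumption about how the split could look, not the article's server: incoming data is stored in a hypothetical per-socket buffer called outgoing, and the socket is additionally registered for EVENT_WRITE until that buffer has been flushed. The demo runs over a socketpair and echoes instead of broadcasting to stay short.

```python
import selectors
import socket

sel = selectors.DefaultSelector()
outgoing = {}  # hypothetical per-socket buffer of bytes waiting to be sent


def on_readable(sock: socket.socket) -> None:
    data = sock.recv(4096)
    if data:
        # Do not send here; queue the data and ask to be told when we can write.
        outgoing[sock] += data
        sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE)
    else:
        sel.unregister(sock)
        del outgoing[sock]
        sock.close()


def on_writable(sock: socket.socket) -> None:
    sent = sock.send(outgoing[sock])  # send() may accept only part of the buffer
    outgoing[sock] = outgoing[sock][sent:]
    if not outgoing[sock]:
        # Nothing left to write, so go back to watching for reads only.
        sel.modify(sock, selectors.EVENT_READ)


def run_once(timeout: float = 1.0) -> None:
    for key, events in sel.select(timeout):
        if events & selectors.EVENT_READ:
            on_readable(key.fileobj)
        if events & selectors.EVENT_WRITE and key.fileobj in outgoing:
            on_writable(key.fileobj)


# Demo over a socketpair instead of a listening socket.
client, server_side = socket.socketpair()
server_side.setblocking(False)
outgoing[server_side] = b''
sel.register(server_side, selectors.EVENT_READ)

client.settimeout(5)
client.sendall(b'ping')
run_once()  # readable: the data is queued
run_once()  # writable: the data is flushed
echoed = client.recv(4096)
watching = sel.get_key(server_side).events
```

Neither callback ever waits on the socket, but the logic for a single connection is now spread over two functions and a shared buffer, which is the bookkeeping that grows quickly in larger programs.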
Generators as Coroutines
We can use yield to return control back to the caller. This way we can achieve context switching and keep other code running around blocking statements. The easiest thing to do is to put a yield above every blocking statement.
About yield
yield is used to create generators. When a yield is executed, control returns to the caller. For example,
def create_counter():
    for i in range(10):
        yield i


counter = create_counter()
next(counter)  # gives 0
next(counter)  # gives 1
# ...
So, control returns to the caller whenever yield is executed. When the generator is called again, it resumes where it left off. For example, the first next(counter) returns 0 and control goes back to the calling location. When next(counter) is called again, the generator resumes right after the yield statement. It goes through the loop again, i becomes 1, yield is reached once more and it returns 1.
Let’s look at another example for more details:
def generator():
    print('started')
    print('before yielding 1')
    yield 1
    print('before yielding 2')
    yield 2


g = generator()
next(g)  # prints 'started', 'before yielding 1' and returns 1
next(g)  # prints 'before yielding 2' and returns 2
next(g)  # raises StopIteration as there is no more yield statement
Using generators in our problem
So, we can put a yield wherever there is blocking code. We also need an event loop that runs all the generators (or coroutines).
# server.py
from collections import deque
import socket


class EventLoop:
    def __init__(self):
        self.tasks_to_run = deque([])

    def create_task(self, coro):
        self.tasks_to_run.append(coro)

    def run(self):
        while self.tasks_to_run:
            task = self.tasks_to_run.popleft()
            try:
                print("running task", task)
                # Run the task up to its next yield.
                next(task)
            except StopIteration:
                print('STOPITERATION FOR ==> ', task)
                continue
            # The task is not finished, so queue it again.
            self.create_task(task)


def run_server(host='127.0.0.1', port=55555):
    sock = socket.socket()
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.listen()
    print("Server is running on {}:{}".format(host, port))
    while True:
        print("in run_server while loop")
        yield
        client_sock, addr = sock.accept()
        print('Connection from', addr)
        loop.create_task(handle_client(client_sock))


def handle_client(sock):
    print(sock)
    while True:
        yield
        received_data = sock.recv(4096)
        print(received_data)
        if not received_data:
            break
        yield
        sock.sendall(received_data)
    print('Client disconnected:', sock.getpeername())
    sock.close()


loop = EventLoop()

if __name__ == '__main__':
    loop.create_task(run_server('localhost', 3000))
    loop.run()
So, there is an event loop with two methods: one creates tasks and the other runs all of them in turn. It uses a deque as the task queue.
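The scheduling itself can be seen in isolation from sockets. In this sketch the same round-robin loop (without the print calls) runs two hypothetical worker generators, where each bare yield stands in for a blocking call, and records the order in which their steps execute.

```python
from collections import deque


class EventLoop:
    def __init__(self):
        self.tasks_to_run = deque()

    def create_task(self, coro):
        self.tasks_to_run.append(coro)

    def run(self):
        while self.tasks_to_run:
            task = self.tasks_to_run.popleft()
            try:
                next(task)            # run the task up to its next yield
            except StopIteration:
                continue              # the task finished, so drop it
            self.create_task(task)    # otherwise put it at the back of the queue


log = []


def worker(name, steps):
    # Hypothetical task: each yield stands in for a blocking call.
    for step in range(steps):
        log.append((name, step))
        yield


loop = EventLoop()
loop.create_task(worker('a', 2))
loop.create_task(worker('b', 3))
loop.run()

print(log)
# [('a', 0), ('b', 0), ('a', 1), ('b', 1), ('b', 2)]
```

The two workers alternate because each one goes to the back of the queue after every yield. Nothing here checks whether a task can actually make progress, which matters once the yields sit in front of real blocking socket calls.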
Flow
- We create the event loop loop and create a task, where run_server() returns a generator that gets added to the queue. Then we start the loop.
- The loop takes the leftmost task from the queue and runs it using next(), which executes the generator until the next yield. The first time the run_server generator runs, the socket gets set up, we enter the while loop and hit our first yield, which returns control back to the loop.
- The loop takes the same task again, as there is no other task, and continues executing it. Since it stopped at yield before, it now executes accept(), which blocks execution and waits for a connection.
- Once a client connects, control moves forward. One more task is created for the connection handler. Execution continues in the run_server loop: it goes back to the beginning of the loop and prints in run_server while loop, where yield comes again and control goes back to the event loop.
- The next task, the handler, is then executed. It goes on like that.
Did you see the problem here?
There is a problem. Although we get some sort of concurrency, every other task still has to wait for the running task to complete (or to reach its next yield).
So, suppose a client is connected and its handler task gets control. If the client writes something to the socket, it won’t be echoed back right away, because the yield before sock.sendall(received_data) hands control back to the event loop. run_server then takes control and blocks while waiting for a new connection. Only when a new connection is made does control go back to the handler, and then the data is written back.
Solving this issue with IO multiplexing
We can combine generators and IO multiplexing to improve our solution. We have to modify our EventLoop class.
# event loop class
from collections import deque
import selectors


class EventLoop:
    def __init__(self):
        self.tasks = deque([])
        self.sel = selectors.DefaultSelector()

    def create_task(self, task):
        self.tasks.append(task)

    def run(self):
        while True:
            print('tasks', self.tasks)
            if self.tasks:
                task = self.tasks.popleft()
                try:
                    # Run the task until it yields the operation it waits for.
                    op, sock = next(task)
                except StopIteration:
                    continue
                if op == 'read':
                    self.sel.register(sock, selectors.EVENT_READ, task)
                elif op == 'write':
                    self.sel.register(sock, selectors.EVENT_WRITE, task)
                else:
                    raise ValueError('Invalid event loop operation type', op)
            else:
                # No runnable task: wait until a registered socket is ready.
                for key, _ in self.sel.select():
                    sock = key.fileobj
                    task = key.data
                    self.sel.unregister(sock)
                    self.create_task(task)
So, the change we made to the EventLoop class is this: we take a task and run next on it. Now, the yield in the generator returns some information: the operation (read or write) and the socket it applies to.
According to the operation, we register the socket with the selector. If there is no task in the queue, we wait for activity on the registered sockets using the sel.select() method. For each ready socket we get the socket and its task, unregister the socket and put the task back into the queue, where it gets executed.
import socket

# All connected client sockets, used by broadcast().
sockets = set()


def run_server(host='127.0.0.1', port=55555):
    sock = socket.socket()
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.listen()
    print("Server is running on {}:{}".format(host, port))
    while True:
        print("in run_server while loop")
        # Wait until the listening socket has a pending connection.
        yield 'read', sock
        client_sock, addr = sock.accept()
        print('Connection from', addr)
        sockets.add(client_sock)
        loop.create_task(handle_client(client_sock))


def broadcast(sender_sock, message):
    for sock in sockets:
        if sock is not sender_sock:
            sock.sendall(message)


def handle_client(sock):
    print(sock)
    while True:
        yield 'read', sock
        received_data = sock.recv(4096)
        print(received_data)
        if not received_data:
            break
        yield 'write', sock
        broadcast(sock, received_data)
        # sock.sendall(received_data)
    print('Client disconnected:', sock.getpeername())
    # Stop broadcasting to this client before closing its socket.
    sockets.discard(sock)
    sock.close()


loop = EventLoop()

if __name__ == '__main__':
    loop.create_task(run_server('localhost', 3000))
    loop.run()
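The combined loop can be exercised without a listening port. The sketch below is a reduced variant under stated assumptions: it replaces the server with connected socketpair() sockets, uses hypothetical echo and client generators, and, unlike the loop above, stops once there is nothing queued and nothing registered so that the script terminates.

```python
from collections import deque
import selectors
import socket


class EventLoop:
    def __init__(self):
        self.tasks = deque()
        self.sel = selectors.DefaultSelector()

    def create_task(self, task):
        self.tasks.append(task)

    def run(self):
        # Assumption for this sketch: stop when nothing is queued or registered.
        while self.tasks or self.sel.get_map():
            if self.tasks:
                task = self.tasks.popleft()
                try:
                    op, sock = next(task)
                except StopIteration:
                    continue
                event = selectors.EVENT_READ if op == 'read' else selectors.EVENT_WRITE
                self.sel.register(sock, event, task)
            else:
                for key, _ in self.sel.select():
                    self.sel.unregister(key.fileobj)
                    self.create_task(key.data)


def echo(sock):
    while True:
        yield 'read', sock
        data = sock.recv(4096)
        if not data:
            break
        yield 'write', sock
        sock.sendall(data)
    sock.close()


def client(sock, message, replies):
    yield 'write', sock
    sock.sendall(message)
    yield 'read', sock
    replies.append(sock.recv(4096))
    sock.close()  # closing lets the echo task see end-of-stream and finish


replies = []
loop = EventLoop()
for message in (b'first', b'second'):
    server_end, client_end = socket.socketpair()
    loop.create_task(echo(server_end))
    loop.create_task(client(client_end, message, replies))
loop.run()
```

Both echo tasks make progress in the same pass because a task is resumed only after the selector reports its socket as ready, so no task blocks the others while it waits. The order in which the two replies arrive depends on the order the selector reports ready sockets, so it should not be relied upon.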