# high-concurrency ws log server

## background

previously, we designed a logger module that carries logs from ws to db. this post is a real implementation of the python solution.

## high-concurrency ws server online

### single server with multiple clients: a simple C++ example

  • the server starts first and waits for incoming client calls, periodically reporting its status: how many clients are still connected. once an incoming call is detected and accepted, the server creates a separate thread to handle that client, so it ends up with as many separate sessions as there are connected clients.

  • how to handle multiple clients? once an incoming client call is received and accepted, the main server thread creates a new thread and passes the client connection to it.

  • what if client threads need to access some global variables? a semaphore instance is helpful.

### how ws server handles multiple incoming connections

  • a socket object is often thought to represent a connection, but that is not entirely true, since sockets can be active or passive. a socket is put into passive/listen mode by listen() so it can wait for incoming connection requests. by definition, such a socket is not a connection; it just listens for connection requests.

  • accept() doesn’t change the state of the passive socket set up by listen(), but it returns an active/connected socket, which represents a real connection. after accept() has returned the connected socket object, it can be called again on the passive socket, again and again. this is known as the accept loop.

  • but calling accept() takes time; can’t it miss incoming connection requests? it won’t: there is a queue of pending connection requests, handled automatically by the TCP/IP stack of the OS. so while accept() can only deal with incoming connection requests one by one, requests arriving at a high rate are not missed as long as that queue, whose length is bounded by the backlog given to listen(), does not overflow.
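
a small experiment makes the pending queue visible. in this sketch the backlog of 16, the loopback address and the five clients are arbitrary choices for illustration: all clients connect and send before accept() is called even once, and the accept loop still picks up every one of them.

```python
import socket

BACKLOG = 16  # assumed length of the pending-connection queue; tune for real load

# socket() creates the socket, listen() switches it to passive mode
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
listener.listen(BACKLOG)

# five clients connect before accept() is ever called;
# the OS completes the handshakes and parks them in the pending queue
clients = [socket.create_connection(listener.getsockname()) for _ in range(5)]
for i, c in enumerate(clients):
    c.sendall(b"client-%d" % i)

# accept loop: each call returns a new connected socket,
# while the passive listener itself stays in listen mode
received = []
for _ in range(len(clients)):
    conn, _addr = listener.accept()
    with conn:
        received.append(conn.recv(1024))

print(sorted(received))

for c in clients:
    c.close()
listener.close()
```

if more clients than the backlog allows pile up before accept() runs, the OS may start refusing or delaying new connections, so a busy server should keep its accept loop short.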

## python env setup

the websockets module requires python 3.6 or newer, while the python version on the bare OS is 3.5, which gives:

```
File "/usr/lib/python3/dist-packages/websockets/compatibility.py", line 8
asyncio_ensure_future = asyncio.async # Python < 3.5
^
SyntaxError: invalid syntax
```
basically, [asyncio.async](https://stackoverflow.com/questions/51292523/why-does-asyncio-ensure-future-asyncio-async-raise-a-syntaxerror-invalid-synt) is the old name (meant for python < 3.5) of asyncio.ensure_future, and `async` has been a reserved keyword since python 3.7, so this line no longer parses on a newer interpreter. another dependent module, [discord.py](https://github.com/Rapptz/discord.py/issues/1396), also requires python < 3.5 here. pip then gives another error:
```
TypeError: unsupported operand type(s) for -=: 'Retry' and 'int'
```

the Retry error was fixed by adding the following lines to ~/.pip/pip.conf:

```
[global]
index-url = https://pypi.tuna.tsinghua.edu.cn/simple
```
on a bare system with python 3.5 pre-installed, I took the following steps:
```bash
sudo apt install software-properties-common
sudo add-apt-repository ppa:deadsnakes/ppa
sudo apt-get install python3.7
sudo update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.7 2
sudo apt-get install python3-websockets
sudo apt-get install python3-websocket
sudo apt-get install python3-sqlalchemy
sudo pip3 install threadpool
sudo apt-get remove --purge python3-websocket  # based on python3.5
sudo apt-get install python3-websocket  # based on python3.7
```

which gives another error:

```
/var/lib/dpkg/info/python3-websocket.postinst: 6: /var/lib/dpkg/info/python3-websocket.postinst: py3compile: not found
dpkg: error processing package python3-websocket (--configure):
subprocess installed post-installation script returned error exit status 127
Errors were encountered while processing:
python3-websocket
E: Sub-process /usr/bin/dpkg returned an error code (1)
```

which was then fixed by going to /var/lib/dpkg/info/ and deleting all python3-websocket.* files:

```bash
sudo rm /var/lib/dpkg/info/[package_name].*
sudo dpkg --configure -a
sudo apt-get update
```

everything looks good, but python still reports:

```
ModuleNotFoundError: No module named 'websockets'
```

I gave up setting things up with the bare python, created a new conda env, and ran the following inside it, clean and simple:

```bash
pip install websockets
pip install websocket-client  # rather than websocket
pip install threadpool
pip install sqlalchemy
pip install psycopg2
```
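
a ModuleNotFoundError after a seemingly successful install often means the package landed under a different interpreter than the one running the script. the following is a small sketch for checking that; the helper name `missing_modules` and the `REQUIRED` list are illustrative, with the import names assumed to match the packages installed above (websocket-client is imported as `websocket`).

```python
import importlib.util
import sys

# import names of the packages installed above (assumed list)
REQUIRED = ["websockets", "websocket", "threadpool", "sqlalchemy", "psycopg2"]


def missing_modules(names):
    """Return the top-level module names this interpreter cannot find."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    print("interpreter:", sys.executable)       # which python is actually running
    print("version:", sys.version.split()[0])
    print("missing:", missing_modules(REQUIRED))
```

running it with the same `python` command that starts the ws server shows whether that interpreter, and not some other one, can see the modules.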

during remote tests, if the ws server goes down unexpectedly, we need to kill the ws pid:

```bash
sudo netstat -tlnp  # find the specific listening port and its pid
kill $pid
```
or `kill $(lsof -t -i:$port)`
## websockets & websocket-client
the long-lived connection sample from [ws-client](https://github.com/websocket-client/websocket-client) is a good base. since we need to test many concurrent clients here, we add `threadpool`:
```python
import functools
import json
import threading
import time

import websocket  # provided by the websocket-client package
from threadpool import ThreadPool, makeRequests

def on_message(ws, message):
    print(message)

def on_error(ws, error):
    print(error)

def on_close(ws, *args):  # extra arguments differ between websocket-client versions
    print("### closed ###")

def on_open(ws, num):
    def run():
        # send three messages one second apart, then close the connection
        for i in range(3):
            time.sleep(1)
            message = "your_message"
            ws.send(json.dumps(message))
        time.sleep(1)
        ws.close()
        print("thread terminating...")
    threading.Thread(target=run, daemon=True).start()

def on_start(num):
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp("ws://localhost:8888/",
                                on_message=on_message,
                                on_error=on_error,
                                on_close=on_close)
    # bind num now; the library calls on_open(ws) only once the socket is open
    ws.on_open = functools.partial(on_open, num=num)
    ws.run_forever()

def threadpool_test():
    start_time = time.time()
    pool = ThreadPool(100)
    # one work request per client id, 100 concurrent clients in total
    requests = makeRequests(on_start, list(range(100)))
    for req in requests:
        pool.putRequest(req)
    pool.wait()
    print('%d second' % (time.time() - start_time))
```

in the ws-client source, we see:

  • on_open: callable object which is called when the websocket is opened. the library passes it one argument, the WebSocketApp object itself. a customized callback can still take more arguments if they are bound in advance, which is helpful.

  • on_message: callable object which is called when data is received. on_message has 2 arguments: the 1st is the WebSocketApp object, the 2nd is the utf-8 string we get from the server.
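
binding extra arguments to a callback can be sketched with `functools.partial`. to keep the example runnable without a server, `FakeApp` below is a hypothetical stand-in that only mimics how a library would call `on_open(app)` with a single argument; it is not part of websocket-client.

```python
import functools


class FakeApp:
    """Hypothetical stand-in for WebSocketApp: calls on_open(app) with one argument."""

    def __init__(self):
        self.on_open = None

    def open(self):
        if self.on_open:
            self.on_open(self)


opened = []


def on_open(app, num):
    # app is what the library passes in, num is the extra bound argument
    opened.append(num)


app = FakeApp()
# writing on_open(app, 7) here would run the callback immediately and store None;
# partial() stores a callable that still expects the app argument
app.on_open = functools.partial(on_open, num=7)
app.open()
print(opened)
```

a lambda such as `lambda ws: on_open(ws, 7)` would work as well; partial just keeps the bound value explicit.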

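we can implement a simple sqlalchemy orm db-writer and add it to the ws-server:

```python
import asyncio
import json

import websockets

# methods of the ws-server class; orm_ is the ORM model class and
# self.dbwriter_ the sqlalchemy writer, both defined elsewhere

# older websockets releases pass `path`; newer ones may call the handler
# with the connection only
async def process(self, websocket, path):
    raw_ = await websocket.recv()
    jdata = json.loads(raw_)
    orm_obj = orm_(jdata)
    try:
        self.dbwriter_.write(orm_obj)
        print(jdata, "write to db successfully")
    except Exception as e:
        self.dbwriter_.rollback()  # was dbwriter_.rollback(), a NameError
        print(e)

    greeting = "hello from server"
    await websocket.send(greeting)
    print(f"> {greeting}")

def run(self):
    # fall back to localhost:8867 when host/port are not configured
    if self.host and self.port:
        start_server = websockets.serve(self.process, self.host, self.port)
    else:
        start_server = websockets.serve(self.process, "localhost", 8867)
    asyncio.get_event_loop().run_until_complete(start_server)
    asyncio.get_event_loop().run_forever()
```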
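
the db-writer itself is not shown in this post. as a rough idea of the write()/rollback() shape the server relies on, here is a sketch that swaps sqlalchemy and postgres for the stdlib `sqlite3` module so it runs anywhere; the class name `DbWriter`, the `logs` table and its `level`/`message` columns are all hypothetical.

```python
import json
import sqlite3


class DbWriter:
    """Hypothetical writer exposing the write()/rollback() calls used by the server."""

    def __init__(self, path=":memory:"):
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS logs (level TEXT, message TEXT)"
        )

    def write(self, record):
        # record is a dict with "level" and "message" keys
        self.conn.execute(
            "INSERT INTO logs (level, message) VALUES (:level, :message)", record
        )
        self.conn.commit()

    def rollback(self):
        self.conn.rollback()

    def count(self):
        return self.conn.execute("SELECT COUNT(*) FROM logs").fetchone()[0]


writer = DbWriter()
writer.write(json.loads('{"level": "info", "message": "hello"}'))
try:
    writer.write({"level": "warn"})  # "message" is missing, so sqlite3 raises
except sqlite3.Error:
    writer.rollback()                # same recovery path as in process()
print(writer.count())
```

with sqlalchemy the same two methods would wrap a session's add/commit and rollback, but that mapping is an assumption about the original writer.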

## in summary

in reality, each ws-client is integrated into an upper-level application, which generates messages/logs and sends them to the ws-server, which in turn writes them to the db. thanks to asyncio, the performance has been good so far. in the future, we may need some buffering at the ws-server.
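
one possible shape for such a buffer is an `asyncio.Queue` drained by a single writer task that flushes in batches. this is only a sketch under assumptions: `batch_writer`, `STOP`, the batch size and the flush interval are made-up names and values, and `written.append` stands in for the real db write.

```python
import asyncio

STOP = object()  # sentinel that tells the writer to flush and exit


async def batch_writer(queue, write_batch, batch_size=50, flush_interval=0.5):
    """Collect records from the queue and pass them to write_batch in groups."""
    batch = []
    while True:
        timed_out = False
        item = None
        try:
            item = await asyncio.wait_for(queue.get(), timeout=flush_interval)
        except asyncio.TimeoutError:
            timed_out = True  # quiet period: flush whatever is buffered
        stopping = item is STOP
        if not timed_out and not stopping:
            batch.append(item)
        if batch and (timed_out or stopping or len(batch) >= batch_size):
            write_batch(batch)
            batch = []
        if stopping:
            return


async def demo():
    written = []                         # stands in for the db
    queue = asyncio.Queue(maxsize=1000)  # bounded, so producers feel back-pressure
    writer = asyncio.create_task(batch_writer(queue, written.append, batch_size=4))
    for i in range(10):                  # what process() would do per message
        await queue.put({"seq": i})
    await queue.put(STOP)
    await writer
    return written


batches = asyncio.run(demo())
print([len(b) for b in batches])
```

the handler would then only enqueue and return, while the db sees fewer, larger writes; the trade-off is that records still sitting in the buffer are lost if the server dies before a flush.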
