background
recently I have been thinking about how to design middleware and frameworks in a self-driving team, just like the back-end services in most web companies, which give a way to go upper and more abstract. currently many start-up ADS teams, especially Internet-based start-ups, have built in-house ADS frameworks and middleware to support their daily dev and make it easy to implement new features. here is an old topic, the redis task queue: how to design a robust data pipeline to support virtual testing of ADAS/ADS functions with physically collected road data.
the experience of putting lgsvl into swarm/k8s brought up the idea of micro-services, which are a great way to decouple a large piece of work into a few small, independent but communicating services. the same question comes up when handling large data sets, which is very common in ADS dev.
so the first idea is to decouple the data pipeline into a few micro-services: a reader service, a process service, a post-analysis service, etc.
then two questions immediately come up: which data/message type fits, and which network communication protocol fits. beyond these two basic questions, there is also a lot of work on message adapters among/inside services. previously, I designed a websocket and json solution, but it's too tedious to plug a ws-server/client into the front/end of each service, especially as the number of services grows.
taking a step back, a data pipeline is heavy data-IO work. is it really smart to split the work into a few pieces and then figure out the network communication among them? we increase the system complexity by introducing additional network communication modules, and the only benefit is decoupling a heavy data-IO job. moreover, the network modules need to consider caching, job scheduling and load balancing, as the data process service may take much longer than the reader service.
traditionally, heavy data-IO work is commonly run as batch processing, disregarding network issues, and it's better to run directly in memory/cache. so I went to rq.
interprocess communication in distributed system
MPI is the standard for IPC in HPC apps; of course there are also Linux IPC libs, which provide lower-level IPC APIs. MPI apps mostly run on high-performance computing clusters and have similar APIs, e.g. Allreduce, as Hadoop/MapReduce. the difference is that MPI/Allreduce doesn't tolerate failure: if any node fails, the whole MPI app fails. this is the fundamental difference between HPC and the distributed systems of today, which are really popular as the new infrastructure for cloud and AI.
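to make the Allreduce comparison concrete, here is a minimal mpi4py sketch (assuming mpi4py and an MPI runtime are installed, e.g. launched with `mpirun -n 4 python demo.py`); if any rank dies, the whole run dies, which is exactly the fault-tolerance gap mentioned above.

```python
# minimal Allreduce sketch with mpi4py (assumption: mpi4py + an MPI runtime are available)
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

local_value = rank + 1                            # each rank contributes its own partial value
total = comm.allreduce(local_value, op=MPI.SUM)   # every rank receives the global sum

print(f"rank {rank}: global sum = {total}")
```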
in distributed systems, there are a few ways to do interprocess communication:
- synchronous RESTful-style communication over network protocols such as TCP, UDP, websocket.
- async communication; there are different ways to implement async interprocess communication, one of which is a message queue. of course many languages, e.g. js and go, have light-weight libs/frameworks to support async communication between processes.
- rpc: thrift is an Apache project; grpc is highly efficient with protobuf, but it doesn't have good built-in service discovery/load balancing, which is a limitation in cloud-native applications; dubbo has a better design for service discovery and load balancing, and its message type by default is json.
all of these can be the corner-stone services in modern micro-service environments. the common micro-service frameworks, e.g. Spring Cloud, have interprocess communication components as well.
for data-hungry services, batch processing frameworks, e.g. Spring Batch or GNU Parallel, should also be considered.
rq
the following notes are taken from the rq docs.
Queues
a job is a Python object, namely a function that is invoked asynchronously in a worker process. enqueueing is simply pushing a reference to the function and its args onto a queue.
we can add as many Queue instances as we need to one Redis instance. the Queue instances don't know about each other, but since they are hosted in the same Redis instance, it is still possible to look up jobs bound to Queue1 from a worker of Queue2.
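a minimal sketch of enqueueing a function and of two queues sharing one Redis instance; the function add and the queue names are only for illustration:

```python
from redis import Redis
from rq import Queue
from rq.job import Job

def add(x, y):          # in real use this must live in an importable module, not __main__
    return x + y

conn = Redis()
q1 = Queue('queue1', connection=conn)
q2 = Queue('queue2', connection=conn)

job = q1.enqueue(add, 2, 3)   # pushes a reference to add and its args onto queue1

# both queues live in the same Redis keyspace, so a job enqueued on queue1
# can still be fetched by id from code that only knows the shared connection
same_job = Job.fetch(job.id, connection=conn)
```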
jobs
- timeout: specifies the max runtime of the job before it's interrupted and marked as failed.
- ttl: specifies the maximum queued time (in seconds) of the job before it's discarded. the default is None (infinite TTL).
- failure_ttl: specifies how long (in seconds) failed jobs are kept (defaults to 1 year).
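a minimal sketch of setting these options at enqueue time; convert_mf4 is a hypothetical handler assumed to be importable by the worker, and note that recent rq versions spell the runtime limit job_timeout:

```python
from redis import Redis
from rq import Queue
from pipeline.handlers import convert_mf4   # hypothetical, must be importable by the worker

q = Queue('mf4', connection=Redis())
job = q.enqueue(
    convert_mf4, '/data/run_001.mf4',
    job_timeout=600,     # max runtime: interrupted and marked failed after 10 min
    ttl=3600,            # discard if still queued after 1 h (None = infinite)
    failure_ttl=86400,   # keep the failed job for one day instead of the 1-year default
)
```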
all rq:job:* keys can be listed as in the sketch below, but the return value is a list of bytes objects, which need to be decoded as utf-8 for any further usage.
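a minimal redis-py sketch of that lookup:

```python
from redis import Redis

conn = Redis()
# rq stores every job as a Redis hash under the key "rq:job:<job_id>"
raw_keys = conn.keys('rq:job:*')                              # a list of bytes objects
job_ids = [k.decode('utf-8').rsplit(':', 1)[-1] for k in raw_keys]
print(job_ids)
```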
workers
workers read jobs from the given queues (the order is important) in an endless loop, waiting for new work to arrive when all jobs are done. each worker processes a single job at a time. by default, workers start working immediately and then wait for new jobs. another mode is burst mode, where the worker finishes all currently available work and quits as soon as all given queues are empty.
the rq worker shell script is a simple fetch-fork-execute loop.
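the same loop can also be started from Python instead of the rq worker script; a minimal sketch:

```python
from redis import Redis
from rq import Queue, Worker

conn = Redis()
# listen on two queues; the order matters, 'high' is always drained before 'default'
queues = [Queue('high', connection=conn), Queue('default', connection=conn)]

worker = Worker(queues, connection=conn)
worker.work(burst=True)   # burst mode: process everything available, then quit
```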
connections
when you want to use multiple connections, you should use Connection contexts or pass connections around explicitly.
Every job that is enqueued on a queue will know what connection it belongs to. The same goes for the workers.
within the Connection context, every newly created RQ object instance will have the connection argument set implicitly.
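a minimal sketch of the Connection context with two Redis instances (hosts/ports are only for illustration; newer rq releases deprecate Connection in favor of passing connection= explicitly):

```python
from redis import Redis
from rq import Connection, Queue, Worker

conn_a = Redis('localhost', 6379)   # illustration only
conn_b = Redis('localhost', 6380)   # illustration only

with Connection(conn_a):
    ingest_q = Queue('ingest')      # connection argument set implicitly from the context

with Connection(conn_b):
    process_q = Queue('process')    # bound to the second Redis instance
    Worker([process_q]).work(burst=True)
```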
this should be the way to handle distributed queues.
results
if a job returns a non-None value, the worker writes that return value back to the job's Redis hash under the result key. the job's Redis hash itself expires 500 seconds after the job is finished by default.
when an exception is thrown inside a job, it's caught by the worker, serialized and stored under the exc_info key. by default, jobs should execute within 180 seconds. after that, the worker kills the work horse and puts the job onto the failed queue, indicating the job timed out.
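a minimal sketch of reading the result and exc_info back; run_aeb is a hypothetical handler and a worker is assumed to be running:

```python
import time
from redis import Redis
from rq import Queue
from pipeline.handlers import run_aeb   # hypothetical, importable by the worker

q = Queue(connection=Redis())
job = q.enqueue(run_aeb, '/data/run_001_converted.npz', result_ttl=500)

time.sleep(5)                # crude wait; in practice poll get_status() in a loop
job.refresh()                # re-read the job's Redis hash
status = job.get_status()
if status == 'finished':
    print(job.result)        # the return value stored under the 'result' key
elif status == 'failed':
    print(job.exc_info)      # the serialized traceback stored under 'exc_info'
```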
job registries
each queue maintains a set of job registries, e.g. StartedJobRegistry, FinishedJobRegistry, etc. we can find these after logging in with redis-cli.
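besides redis-cli, the registries can also be inspected from Python; a minimal sketch:

```python
from redis import Redis
from rq import Queue
from rq.registry import StartedJobRegistry, FinishedJobRegistry

q = Queue('default', connection=Redis())

started = StartedJobRegistry(queue=q)
finished = FinishedJobRegistry(queue=q)

print(started.get_job_ids())    # jobs currently being executed
print(finished.get_job_ids())   # jobs finished and still within result_ttl
```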
version bug
when running the rq demo, it reported a version-related error; the workaround was to manually patch /rq/registry.py.
data pipeline for ADS function verification
- queues
each queue instance can be taken as a separate namespace in the Redis instance, so workers only process the jobs in their own queue. but if multiple queues are hosted in the same Redis instance, the Redis API can still find Queue A's jobs from Queue B's workers.
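a minimal sketch of how the pipeline's stage queues could be declared on one shared Redis instance; the stage names mf4, aeb and db come from the runner discussion below:

```python
from redis import Redis
from rq import Queue

conn = Redis()
# one queue per pipeline stage, all hosted in the same Redis instance
mf4_q = Queue('mf4', connection=conn)   # read/convert the recorded road data
aeb_q = Queue('aeb', connection=conn)   # run the AEB function on the converted data
db_q  = Queue('db',  connection=conn)   # persist the post-analysis results
```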
- jobs
if the handler function has return values, namely a status, rq stores them at job.result, the lifecycle of which can be controlled by result_ttl, etc. to control the order of running the jobs, jobs can be given depends_on, as in the sketch below.
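a minimal sketch of result_ttl and depends_on; the handler functions are hypothetical and assumed importable by the workers:

```python
from redis import Redis
from rq import Queue
from pipeline.handlers import read_mf4, run_aeb   # hypothetical handlers

conn = Redis()
mf4_q = Queue('mf4', connection=conn)
aeb_q = Queue('aeb', connection=conn)

read_job = mf4_q.enqueue(read_mf4, '/data/run_001.mf4', result_ttl=3600)
# run_aeb stays deferred until read_mf4 has finished successfully
aeb_job = aeb_q.enqueue(run_aeb, read_job.id, depends_on=read_job)
```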
- workers
- runners
since the output of the mf4 jobs is the input of the aeb jobs, we need runners to chain them; similarly for db. a sketch follows below.
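a minimal sketch of such a runner: it polls the finished mf4 jobs and feeds their results into the aeb queue (names and handlers are hypothetical; a production runner would also handle failures and persistence):

```python
import time
from redis import Redis
from rq import Queue
from rq.job import Job
from rq.registry import FinishedJobRegistry
from pipeline.handlers import run_aeb   # hypothetical handler, importable by the aeb workers

conn = Redis()
mf4_q = Queue('mf4', connection=conn)
aeb_q = Queue('aeb', connection=conn)

def run_forever(poll_interval=5):
    """Feed the output of finished mf4 jobs into the aeb queue."""
    seen = set()
    registry = FinishedJobRegistry(queue=mf4_q)
    while True:
        for job_id in registry.get_job_ids():
            if job_id in seen:
                continue
            mf4_job = Job.fetch(job_id, connection=conn)
            aeb_q.enqueue(run_aeb, mf4_job.result)   # mf4 output becomes aeb input
            seen.add(job_id)
        time.sleep(poll_interval)
```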
summary
rq is a good framework for a data pipeline at this level. for an even bigger and more complex system, rq may be just one middleware, and there should be an even larger framework on top. the software engineering ideas of framework and middleware in a large system gradually become the foundation of an ADS team.
in distributed systems, there are a few basic concepts: distributed consensus (popular choices are e.g. zookeeper, etcd), interprocess communication, distributed cache, etc. really cool to know.