moduo 2

The several classes defined in Moduo: TcpServer, Acceptor, TcpConnection, EventLoop, epoller, channel, eventLoopThread, eventLoopThreadPool.

1) TcpServer construction

each TcpServer suppose have multi- tcp connections, the acceptor works as the main I/O thread, which listen the server-side I/O socket, and handle all client input connections at first, (later will assign the connection task to each worker thread from pool). so the input argument “loop” during TcpServer construction is the main I/O eventLoop.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
TcpServer::TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const string& nameArg,
Option option)
: loop_(CHECK_NOTNULL(loop)), ipPort_(listenAddr.toIpPort()),
name_(nameArg),
acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),
threadPool_(new EventLoopThreadPool(loop, name_)), connectionCallback_(defaultConnectionCallback),
messageCallback_(defaultMessageCallback),
nextConnId_(1)
{
acceptor_->setNewConnectionCallback(
boost::bind(&TcpServer::newConnection, this, _1, _2));
}
```
one advantage of bind/callback is to import functors to different class domain. and server has always one I/O socket, but client-socket-fd suppose be a lot.
## 2) TcpServer newConnection
Each new tcp connection, will assign a worker thread to execute the speical callback functor for this new connection, How to schedule thread in threadpool is implemented by round-robin, which are implemented as:
```c
ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));
EventLoopThreadPool::getNextLoop()

does each worker thread run EventLoop.loop(), which will call epoll_wait(), and execute all active channels? suppose no.

3) TcpConnection construction

each new client connection will reponse to a new socket fd, and a new channel. (channel is actually the container of socket fd); and the functors on this connection is also imported to channel callbacks. basically from outside, we only see channel objects, TcpConnection object is the inner class.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
TcpConnection::TcpConnection(EventLoop* loop,
const string& nameArg,
int sockfd,
const InetAddress& localAddr,
const InetAddress& peerAddr)
: loop_(CHECK_NOTNULL(loop)),
name_(nameArg),
state_(kConnecting),
reading_(true),
socket_(new Socket(sockfd)),
channel_(new Channel(loop, sockfd)),
localAddr_(localAddr),
peerAddr_(peerAddr),
highWaterMark_(64*1024*1024)
{
channel_->setReadCallback(
boost::bind(&TcpConnection::handleRead, this, _1));
channel_->setWriteCallback(
boost::bind(&TcpConnection::handleWrite, this));
channel_->setCloseCallback(
boost::bind(&TcpConnection::handleClose, this));
channel_->setErrorCallback(
boost::bind(&TcpConnection::handleError, this));
LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this
<< " fd=" << sockfd;
socket_->setKeepAlive(true);
}

so where are channel (read/write/error/close) handleEvent callbacks triggered? it is during the eventloop.loop, after epoller return the active events, based on the status of each revent, special handleEvent is called.

4) eventLoop construction

during constrution of eventLoop, a new poller is created based on this eventLoop itself, also a new wakeupfd and a new wakeupChannel. the purpose of wakeup fd/channel is to immediately wake up the work thread, rather than waiting till PollTime. and the wakeupfd bind handleRead, in which read one byte to make this wakeupfd I/O readable, which then is ready for I/O.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
EventLoop::EventLoop()
: looping_(false),
quit_(false),
eventHandling_(false),
callingPendingFunctors_(false),
iteration_(0),
threadId_(CurrentThread::tid()),
poller_(Poller::newDefaultPoller(this)),
timerQueue_(new TimerQueue(this)),
wakeupFd_(createEventfd()),
wakeupChannel_(new Channel(this, wakeupFd_)),
currentActiveChannel_(NULL)
{
LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_;
if (t_loopInThisThread)
{
LOG_FATAL << "Another EventLoop " << t_loopInThisThread
<< " exists in this thread " << threadId_;
}
else
{
t_loopInThisThread = this;
}
wakeupChannel_->setReadCallback(
boost::bind(&EventLoop::handleRead, this)); // we are always reading the wakeupfd
wakeupChannel_->enableReading();
}

“activeChannels” suppose to be a class variable, which is shared by eventLoop objects, but it’s ok to keep a copy for each eventLoop object to avoid multi-thread competing. and suppose epoll_wait() is thread-safe, so later during construction of eventLoopThreadPool, multi eventLoopThreads won’t conflict with “active channels”

5) epoller construction

epoller object is created during eventloop construction. since the three interface of epoll instance are thread-safe, they look like global funcs.

epoll_create(), return an epfd referring to the new epoll instance, this epfd is used by all subsequent calsl to the epoll interface.

epoll_wait(), return all ready events on the epoll instance referred by epfd.

epoll_ctl(), traverse the red-black tree strucutre to return the existing fd, or add new fd to the tree.

6) channel construction

channel is the container of one fd, and is related to one eventLoop. channel is not responsible to create/delete fd, the real owner of each fd is TcpConnection or acceptor. Channel object works like a pipe to send fd from inner object TcpConnection to the eventLoop. the advantage here is eventLoop is independent from connections.

7) channel:update()

1
2
3
4
channel::update() -->
loop->updateChannel() -->
poller->updateChannel() -->
//poller maintain a channel list, call epoll_ctl to add/return/delete the requiring channel

8) one epoll + threadpool vs per thread per epoll

the first method, one global epoll listens all new connections, and send each connection callback to a new thread to execute. method 2, to listen the server I/O socket need to bind a unique epoll, in Moduo which is the acceptor epoll. then all client connection socket will be dealt with their own worker epoll.