-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathEventLoop.cpp
More file actions
152 lines (127 loc) · 3.67 KB
/
EventLoop.cpp
File metadata and controls
152 lines (127 loc) · 3.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#include "EventLoop.h"
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include "utils/SocketUtils.h"
#include "utils/Logging.h"
__thread EventLoop *t_loopInThisThread=0;
int createEventFd(){
int evtfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if(evtfd < 0){
LOG<<"Failed in eventfd create. Threads can connect!";
abort();
}
return evtfd;
}
EventLoop::EventLoop(): looping_(false), epoller_(new Epoll()), wakeupFd_(createEventFd()), quit_(false), eventHandling_(false), callingPendingFunctors_(false), threadId_(CurrentThread::tid()), pwakeupEvent_(new Event(this, wakeupFd_)){
if(!t_loopInThisThread){
t_loopInThisThread=this;
}
// set wakeup eventfd
// set event of interest on wakeup fd.
pwakeupEvent_->setEvents(EPOLLIN | EPOLLET);
pwakeupEvent_->setReadHandler(std::bind(&EventLoop::readWakeupFd,this));
pwakeupEvent_->setConnHandler(std::bind(&EventLoop::handleConn,this));
// register wakeupEvent to epoll Kernel.
epoller_->epoll_add(pwakeupEvent_,0);
}
EventLoop::~EventLoop(){
close(wakeupFd_);
t_loopInThisThread = NULL;
}
void EventLoop::wakeup(){
// write something to eventfd and wakeup other thread epoll
uint64_t one = 1;
ssize_t n = writen(wakeupFd_, (char*)(&one), sizeof(one));
if(n != sizeof(one)){
LOG<<"EventLoop::wakeup() writes "<<n<<" bytes instead of 8";
}
}
void EventLoop::readWakeupFd(){
uint64_t one = 1;
ssize_t n = readn(wakeupFd_, &one, sizeof(one));
if(n != sizeof(one)){
LOG<<"EventLoop::readWakeupFd() writes "<<n<<" bytes instead of 8";
}
pwakeupEvent_->setEvents(EPOLLIN | EPOLLET);
}
void EventLoop::doPendingFunctors(){
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
MutexLockGuard lock(mutex_);
functors.swap(pendingFunctors_);
}
for(size_t i = 0; i<functors.size();++i) functors[i]();
callingPendingFunctors_ = false;
}
void EventLoop::handleConn(){
epoller_->epoll_mod(pwakeupEvent_,0);
}
void EventLoop::loop(){
assert(!looping_);
assert(isInLoopThread());
looping_ = true;
quit_ = false;
std::vector<SP_Event> ret;
while(!quit_){
ret.clear();
// get event from epoll in this thread
ret = epoller_->exec_epoll_wait();
eventHandling_ = true;
// do event in this thread
for(auto& it:ret) it->handleEvents();
eventHandling_ = false;
// do event which registered from other thread
doPendingFunctors();
epoller_->handleExpired();
}
looping_=false;
}
void EventLoop::quit(){
quit_=true;
if(!isInLoopThread()){
wakeup();
}
}
// runInLoop and queueInLoop is copy from muduo
void EventLoop::runInLoop(Functor&& callBack){
if(isInLoopThread()){
callBack();
}else{
queueInLoop(std::move(callBack));
}
}
void EventLoop::queueInLoop(Functor&& callBack){
// this function will be call by main thread.
// need to use mutex to ensure synchronization.
{
MutexLockGuard lock(mutex_);
pendingFunctors_.emplace_back(std::move(callBack));
}
// if call by other(main) thread
// or
// doing pending function(1).
//
// (1) when calling pending function,
// the function may be queueInLoop again.
// wakeup in advance,
// and after complete callingPendingFunctors,
// this thread will continue to do.
if(!isInLoopThread() || callingPendingFunctors_) wakeup();
}
void EventLoop::assertInLoopThread(){
assert(isInLoopThread());
}
void EventLoop::shutdown(std::shared_ptr<Event> event){
// half close for all thread which hold this socket
shutDownWR(event->getFd());
}
void EventLoop::removeFromEpoller(std::shared_ptr<Event> event){
epoller_->epoll_del(event);
}
void EventLoop::updateEpoller(std::shared_ptr<Event> event, int timeout){
epoller_->epoll_mod(event,timeout);
}
void EventLoop::addToEpoller(std::shared_ptr<Event> event, int timeout){
epoller_->epoll_add(event,timeout);
}