-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathreactor_multithread.cpp
More file actions
103 lines (84 loc) · 3.34 KB
/
reactor_multithread.cpp
File metadata and controls
103 lines (84 loc) · 3.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#include <iostream>
#include <memory>
#include <unordered_map>
#include <vector>
#include <functional>
#include <unistd.h>
#include <cassert>
#include <thread>
using namespace std;
#include "dispatcher.h"
#include "tcp_listener.h"
#include "block_queue.h"
#include "connection_channel.h"
#include "acceptor_channel.h"
// Message callback for a connection channel: reads one line from the
// connection, logs it to stdout, and queues the same line to be written back
// (an echo).
//
// NOTE(review): the guard tests `buffer_out.eof()` while the data is obtained
// through `buffer_read_line()`; whether `buffer_out` is really the buffer that
// holds incoming data cannot be confirmed from this file — verify against
// TcpConnection.
void on_message(TcpConnection& conn) {
    // Guard: skip all work when the buffer reports eof.
    if (conn.buffer_out.eof()) {
        return;
    }

    std::string line = conn.buffer_read_line();
    // No newline is appended here; presumably the line read from the
    // connection still carries its own terminator — TODO confirm.
    std::cout << "Tcp Server on message: " << line;

    // Original author's design note: the reply is only buffered here. The
    // write handler is expected to run afterwards, because the write-ready
    // event is reported together with the read event. The more correct
    // strategy would be to attempt a direct send first and register for write
    // notifications with the dispatcher only when that send cannot proceed;
    // this would also help performance with level-triggered poll/epoll
    // dispatchers. `conn.blocking_send(line)` was the alternative being
    // experimented with.
    conn.buffer_write(line);
}
void on_connection_closed(TcpConnection& conn) {
cout << "Tcp Server on connection closed.\n";
}
void worker(const string &name, block_queue<ChannelOp> &work_queue) {
unordered_map<int, unique_ptr<Channel>> channel_map;
unique_ptr<Dispatcher> dispatcher{new EpollDispatcher()};
while (true) {
auto conn_op = work_queue.try_pop();
if (conn_op.has_value()) {
cout << "hi\n";
assert(conn_op->op == CHANNEL_ADD);
assert(conn_op->channel_type == CHANNEL_CONN);
auto *channel = new ConnectionChannel(conn_op->fd);
channel->message_callback = on_message;
channel->connection_closed_callback = on_connection_closed;
dispatcher->add(channel);
channel_map[conn_op->fd] = unique_ptr<Channel>(channel);
cout << "[worker " << name << "] new connection established: fd=" << conn_op->fd << endl;
}
vector<DispatcherEvent> events = move(dispatcher->dispatch());
for (auto &event : events) {
if (!channel_map.count(event.fd)) {
continue;
}
Channel *channel = channel_map[event.fd].get();
vector<ChannelOp> channel_ops{channel->on_event(event.revents)};
cout << "[worker " << name << "] handle event\n";
for (auto &op : channel_ops) {
assert(op.op == CHANNEL_REMOVE);
Channel *channel = channel_map[op.fd].get();
dispatcher->remove(channel);
channel_map.erase(op.fd);
close(op.fd);
cout << "[worker " << name << "] remove conn fd=" << op.fd << endl;
}
}
}
}
int main() {
block_queue<ChannelOp> work_queue;
unique_ptr<Dispatcher> dispatcher{new EpollDispatcher()};
unique_ptr<Channel> acceptor{new AcceptorChannel(8888)};
dispatcher->add(acceptor.get());
thread worker1{worker, "1", ref(work_queue)};
thread worker2{worker, "2", ref(work_queue)};
while (true) {
vector<DispatcherEvent> events = move(dispatcher->dispatch());
for (auto &event : events) {
vector<ChannelOp> channel_ops{acceptor->on_event(event.revents)};
for (auto &op : channel_ops) {
cout << "push op\n";
work_queue.push(op);
}
}
}
return 0;
}