This repository was archived by the owner on Jan 11, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathTransmitterPool.cpp
More file actions
79 lines (69 loc) · 2.17 KB
/
TransmitterPool.cpp
File metadata and controls
79 lines (69 loc) · 2.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#include <iostream>
#include <chrono>
#include <thread>
#include "common.h"
#include "TransmitterPool.h"
#include "NanomsgTransmitter.h"
#include "ZeroMQTransmitter.h"
#include "VMATransmitter.h"
#include "RsocketsTransmitter.h"
namespace transport {
// Default constructor
TransmitterPool::TransmitterPool(Transport transport, const std::string &address, const size_t port = 24242, const size_t threads = 1) {
switch (transport) {
case Transport::TRANSPORT_NANOMSG:
cout << "starting " << threads << " Nanomsg transmitter threads" << endl;
for (size_t i=0; i< threads; i++) {
transmitters.emplace_back(std::thread(NanomsgTransmitter(i, address, port +i, *this)));
}
break;
case Transport::TRANSPORT_ZMQ:
cout << "starting " << threads << " ZeroMQ transmitter threads" << endl;
for (size_t i=0; i< threads; i++) {
transmitters.emplace_back(std::thread(ZeroMQTransmitter(i, address, port +i, *this)));
}
break;
case Transport::TRANSPORT_VMA:
cout << "starting " << threads << " VMA transmitter threads" << endl;
for (size_t i=0; i< threads; i++) {
transmitters.emplace_back(std::thread(VMATransmitter(i, address, port +i, *this)));
}
break;
case Transport::TRANSPORT_RDMACM:
cout << "starting " << threads << " Rsockets transmitter threads" << endl;
for (size_t i=0; i< threads; i++) {
transmitters.emplace_back(std::thread(RsocketsTransmitter(i, address, port +i, *this)));
}
break;
}
/*cout << "Starting Statistic thread" << endl;
std::thread([this]() {
while (true)
{
print_stats();
std::this_thread::sleep_for(interval);
}
}).detach();*/
}
TransmitterPool::~TransmitterPool() {
for (auto &transmitter: transmitters)
transmitter.join();
}
TransmitterPool &
TransmitterPool::Instance(Transport transport, const std::string &address, const size_t port, const size_t threads) {
static TransmitterPool pool{transport, address, port, threads};
return pool;
}
size_t
TransmitterPool::size() {
return transmitters.size();
}
void
TransmitterPool::send(MessageItem &msg) {
msgQueue.enqueue(msg);
}
void
TransmitterPool::shutdown() {
msgQueue.enqueue(std::make_pair<void *, size_t>(nullptr, 0));
}
}