Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 64 additions & 38 deletions http2/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
#include <event2/bufferevent.h>
#include <event2/event.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <nghttp2/nghttp2.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>

Expand Down Expand Up @@ -44,9 +44,7 @@ struct ClientSession {
struct event_base* evbase;

int requests_completed;
int requests_in_flight;
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removing for now

int total_requests_submitted;
struct timeval start_time;
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused

std::map<int32_t, std::chrono::steady_clock::time_point> request_start_times;
std::vector<double> latencies;
};
Expand All @@ -62,27 +60,31 @@ static ssize_t send_callback(nghttp2_session* session, const uint8_t* data, size
}

static int on_frame_recv_callback(nghttp2_session* session, const nghttp2_frame* frame, void* user_data) {
struct ClientSession* client = (struct ClientSession*)user_data;

VLOG(2) << "Client received frame type " << static_cast<int>(frame->hd.type) << " on stream " << frame->hd.stream_id;
VLOG(2) << "Received frame type " << static_cast<int>(frame->hd.type) << " on stream " << frame->hd.stream_id;

if (frame->hd.type == NGHTTP2_DATA && (frame->hd.flags & NGHTTP2_FLAG_END_STREAM)) {
client->requests_completed++;
client->requests_in_flight--;
struct ClientSession* client = (struct ClientSession*)user_data;

auto it = client->request_start_times.find(frame->hd.stream_id);
if (it != client->request_start_times.end()) {
auto end_time = std::chrono::steady_clock::now();
double duration = std::chrono::duration<double, std::milli>(end_time - it->second).count();
client->latencies.push_back(duration);
client->request_start_times.erase(it);
if (frame->hd.type == NGHTTP2_DATA) {
if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
client->requests_completed++;

auto it = client->request_start_times.find(frame->hd.stream_id);
if (it != client->request_start_times.end()) {
auto end_time = std::chrono::steady_clock::now();
double duration = std::chrono::duration<double, std::milli>(end_time - it->second).count();
client->latencies.push_back(duration);
client->request_start_times.erase(it);
}

int loopexit_result = event_base_loopexit(client->evbase, NULL);
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

adding return value checking for ones that may fail with an error

if (loopexit_result != 0) {
LOG(FATAL) << "event_base_loopexit() failed";
}
}

event_base_loopexit(client->evbase, NULL);
} else if (frame->hd.type == NGHTTP2_HEADERS && (frame->hd.flags & NGHTTP2_FLAG_END_STREAM)) {
// If server sent end stream on headers (e.g., error without body)
LOG(INFO) << "Client received END_STREAM on headers";
LOG(FATAL) << "Received END_STREAM on headers unexpectedly";
}

return 0;
}

Expand Down Expand Up @@ -131,44 +133,60 @@ static void submit_request(struct ClientSession* client) {
data_prd.source.ptr = bytes_sent;
data_prd.read_callback = data_provider_callback;

int32_t stream_id = nghttp2_submit_request(client->session, NULL, hdrs, 4, &data_prd, bytes_sent);
if (stream_id < 0) {
LOG(FATAL) << "nghttp2_submit_request error";
delete bytes_sent;
int32_t submit_result = nghttp2_submit_request(client->session, NULL, hdrs, 4, &data_prd, bytes_sent);
if (submit_result < 0) {
LOG(FATAL) << "nghttp2_submit_request() failed: " << nghttp2_strerror(submit_result);
} else {
client->requests_in_flight++;
client->total_requests_submitted++;
client->request_start_times[stream_id] = std::chrono::steady_clock::now();
client->request_start_times[submit_result] = std::chrono::steady_clock::now();
}

int send_result = nghttp2_session_send(client->session);
if (send_result < 0) {
LOG(FATAL) << "nghttp2_session_send() failed: " << nghttp2_strerror(send_result);
}
nghttp2_session_send(client->session);
}

static void readcb(struct bufferevent* bev, void* ptr) {
struct ClientSession* client = (struct ClientSession*)ptr;

struct evbuffer* input = bufferevent_get_input(bev);
size_t datalen = evbuffer_get_length(input);
VLOG(2) << "Client readcb: got " << datalen << " bytes";
VLOG(2) << "readcb(): got " << datalen << " bytes";

if (datalen == 0) return;

unsigned char* data = evbuffer_pullup(input, -1);
ssize_t readlen = nghttp2_session_mem_recv(client->session, data, datalen);
ssize_t readlen = nghttp2_session_mem_recv2(client->session, data, datalen);
if (readlen < 0) {
LOG(FATAL) << "Client: nghttp2_session_mem_recv error: " << nghttp2_strerror(static_cast<int>(readlen));
return;
LOG(FATAL) << "nghttp2_session_mem_recv2() failed: " << nghttp2_strerror(static_cast<int>(readlen));
}

VLOG(2) << "Client readcb: consumed " << readlen << " bytes";
VLOG(2) << "readcb(): consumed " << readlen << " bytes";

evbuffer_drain(input, readlen);
nghttp2_session_send(client->session);

int send_result = nghttp2_session_send(client->session);
if (send_result < 0) {
LOG(FATAL) << "nghttp2_session_send() failed: " << nghttp2_strerror(send_result);
}
}

static void eventcb(struct bufferevent* bev, short events, void* ptr) {
struct ClientSession* client = (struct ClientSession*)ptr;

if (events & BEV_EVENT_CONNECTED) {
int fd = bufferevent_getfd(bev);
if (fd < 0) {
LOG(FATAL) << "bufferevent_getfd() failed";
}

int nodelay = 1;
int setopt_result = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof(nodelay));
if (setopt_result != 0) {
LOG(FATAL) << "setsockopt(TCP_NODELAY) failed";
}

nghttp2_session_callbacks* callbacks;
nghttp2_session_callbacks_new(&callbacks);

Expand All @@ -181,12 +199,15 @@ static void eventcb(struct bufferevent* bev, short events, void* ptr) {
nghttp2_session_callbacks_del(callbacks);

// Send connection preface and initial settings
nghttp2_settings_entry iv[1] = {
{NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, 128 * 1024 * 1024}};
nghttp2_submit_settings(client->session, NGHTTP2_FLAG_NONE, iv, 1);
nghttp2_submit_window_update(client->session, NGHTTP2_FLAG_NONE, 0, 128 * 1024 * 1024);
nghttp2_settings_entry iv[2] = {
{NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, 15 * 1024 * 1024},
{NGHTTP2_SETTINGS_MAX_FRAME_SIZE, 1 << 20}};
int submit_settings_result = nghttp2_submit_settings(client->session, NGHTTP2_FLAG_NONE, iv, 2);
if (submit_settings_result < 0) {
LOG(FATAL) << "nghttp2_submit_settings() failed: " << nghttp2_strerror(submit_settings_result);
}

gettimeofday(&client->start_time, NULL);
nghttp2_session_set_local_window_size(client->session, NGHTTP2_FLAG_NONE, 0, 15 * 1024 * 1024);

event_base_loopexit(client->evbase, NULL);

Expand All @@ -205,7 +226,6 @@ static void BM_HTTP2Client(benchmark::State& state) {

ClientSession client = {};
client.requests_completed = 0;
client.requests_in_flight = 0;
client.total_requests_submitted = 0;

struct event_base* base = event_base_new();
Expand Down Expand Up @@ -259,6 +279,12 @@ static void BM_HTTP2Client(benchmark::State& state) {
BENCHMARK(BM_HTTP2Client)
->Args({0})
->Args({1 << 10})
->Args({2 << 10})
->Args({4 << 10})
->Args({8 << 10})
->Args({16 << 10})
->Args({32 << 10})
->Args({64 << 10})
->Args({128 << 10});

int main(int argc, char** argv) {
Expand Down
Loading