Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 48 additions & 15 deletions http2/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,46 @@ struct ClientSession {
int requests_completed;
int total_requests_submitted;
std::map<int32_t, std::chrono::steady_clock::time_point> request_start_times;
std::map<int32_t, size_t> response_sizes;
std::vector<double> latencies;
};

static void submit_request(struct ClientSession* client);

static void complete_request(struct ClientSession* client, int32_t stream_id) {
size_t response_size = 0;
auto response_size_it = client->response_sizes.find(stream_id);
if (response_size_it != client->response_sizes.end()) {
response_size = response_size_it->second;
client->response_sizes.erase(response_size_it);
}

size_t expected_size = payload.length();
int* bytes_sent = static_cast<int*>(nghttp2_session_get_stream_user_data(client->session, stream_id));
if (bytes_sent) {
expected_size = static_cast<size_t>(*bytes_sent);
}

if (response_size != expected_size) {
LOG(FATAL) << "Echoed response length mismatch on stream " << stream_id << ": expected " << expected_size << " bytes, got " << response_size << " bytes";
}

client->requests_completed++;

auto it = client->request_start_times.find(stream_id);
if (it != client->request_start_times.end()) {
auto end_time = std::chrono::steady_clock::now();
double duration = std::chrono::duration<double, std::milli>(end_time - it->second).count();
client->latencies.push_back(duration);
client->request_start_times.erase(it);
}

int loopexit_result = event_base_loopexit(client->evbase, NULL);
if (loopexit_result != 0) {
LOG(FATAL) << "event_base_loopexit() failed";
}
}

static ssize_t send_callback(nghttp2_session* session, const uint8_t* data, size_t length, int flags, void* user_data) {
struct ClientSession* client = (struct ClientSession*)user_data;
struct bufferevent* bev = client->bev;
Expand All @@ -66,28 +101,22 @@ static int on_frame_recv_callback(nghttp2_session* session, const nghttp2_frame*

if (frame->hd.type == NGHTTP2_DATA) {
if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
client->requests_completed++;

auto it = client->request_start_times.find(frame->hd.stream_id);
if (it != client->request_start_times.end()) {
auto end_time = std::chrono::steady_clock::now();
double duration = std::chrono::duration<double, std::milli>(end_time - it->second).count();
client->latencies.push_back(duration);
client->request_start_times.erase(it);
}

int loopexit_result = event_base_loopexit(client->evbase, NULL);
if (loopexit_result != 0) {
LOG(FATAL) << "event_base_loopexit() failed";
}
complete_request(client, frame->hd.stream_id);
}
} else if (frame->hd.type == NGHTTP2_HEADERS && (frame->hd.flags & NGHTTP2_FLAG_END_STREAM)) {
LOG(FATAL) << "Received END_STREAM on headers unexpectedly";
complete_request(client, frame->hd.stream_id);
}

return 0;
}

static int on_data_chunk_recv_callback(nghttp2_session* session, uint8_t flags, int32_t stream_id, const uint8_t* data, size_t len,
void* user_data) {
struct ClientSession* client = (struct ClientSession*)user_data;
client->response_sizes[stream_id] += len;
return 0;
}

static ssize_t data_provider_callback(nghttp2_session* session, int32_t stream_id, uint8_t* buf, size_t length, uint32_t* data_flags, nghttp2_data_source* source, void* user_data) {
int* bytes_sent = (int*)source->ptr;
size_t payload_len = payload.length();
Expand All @@ -111,6 +140,9 @@ static ssize_t data_provider_callback(nghttp2_session* session, int32_t stream_i
}

static int on_stream_close_callback(nghttp2_session* session, int32_t stream_id, uint32_t error_code, void* user_data) {
struct ClientSession* client = (struct ClientSession*)user_data;
client->response_sizes.erase(stream_id);

int* bytes_sent = (int*)nghttp2_session_get_stream_user_data(session, stream_id);
if (bytes_sent) {
delete bytes_sent;
Expand Down Expand Up @@ -192,6 +224,7 @@ static void eventcb(struct bufferevent* bev, short events, void* ptr) {

nghttp2_session_callbacks_set_send_callback(callbacks, send_callback);
nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, on_frame_recv_callback);
nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks, on_data_chunk_recv_callback);
nghttp2_session_callbacks_set_on_stream_close_callback(callbacks, on_stream_close_callback);

nghttp2_session_client_new(&client->session, callbacks, client);
Expand Down