diff --git a/http2/client.cc b/http2/client.cc index d31a210..c12f96b 100644 --- a/http2/client.cc +++ b/http2/client.cc @@ -3,12 +3,12 @@ #include #include #include +#include #include #include #include #include #include -#include #include #include @@ -44,9 +44,7 @@ struct ClientSession { struct event_base* evbase; int requests_completed; - int requests_in_flight; int total_requests_submitted; - struct timeval start_time; std::map request_start_times; std::vector latencies; }; @@ -62,27 +60,31 @@ static ssize_t send_callback(nghttp2_session* session, const uint8_t* data, size } static int on_frame_recv_callback(nghttp2_session* session, const nghttp2_frame* frame, void* user_data) { - struct ClientSession* client = (struct ClientSession*)user_data; - - VLOG(2) << "Client received frame type " << static_cast(frame->hd.type) << " on stream " << frame->hd.stream_id; + VLOG(2) << "Received frame type " << static_cast(frame->hd.type) << " on stream " << frame->hd.stream_id; - if (frame->hd.type == NGHTTP2_DATA && (frame->hd.flags & NGHTTP2_FLAG_END_STREAM)) { - client->requests_completed++; - client->requests_in_flight--; + struct ClientSession* client = (struct ClientSession*)user_data; - auto it = client->request_start_times.find(frame->hd.stream_id); - if (it != client->request_start_times.end()) { - auto end_time = std::chrono::steady_clock::now(); - double duration = std::chrono::duration(end_time - it->second).count(); - client->latencies.push_back(duration); - client->request_start_times.erase(it); + if (frame->hd.type == NGHTTP2_DATA) { + if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) { + client->requests_completed++; + + auto it = client->request_start_times.find(frame->hd.stream_id); + if (it != client->request_start_times.end()) { + auto end_time = std::chrono::steady_clock::now(); + double duration = std::chrono::duration(end_time - it->second).count(); + client->latencies.push_back(duration); + client->request_start_times.erase(it); + } + + int loopexit_result = event_base_loopexit(client->evbase, NULL); + if (loopexit_result != 0) { + LOG(FATAL) << "event_base_loopexit() failed"; + } } - - event_base_loopexit(client->evbase, NULL); } else if (frame->hd.type == NGHTTP2_HEADERS && (frame->hd.flags & NGHTTP2_FLAG_END_STREAM)) { - // If server sent end stream on headers (e.g., error without body) - LOG(INFO) << "Client received END_STREAM on headers"; + LOG(FATAL) << "Received END_STREAM on headers unexpectedly"; } + return 0; } @@ -131,16 +133,18 @@ static void submit_request(struct ClientSession* client) { data_prd.source.ptr = bytes_sent; data_prd.read_callback = data_provider_callback; - int32_t stream_id = nghttp2_submit_request(client->session, NULL, hdrs, 4, &data_prd, bytes_sent); - if (stream_id < 0) { - LOG(FATAL) << "nghttp2_submit_request error"; - delete bytes_sent; + int32_t submit_result = nghttp2_submit_request(client->session, NULL, hdrs, 4, &data_prd, bytes_sent); + if (submit_result < 0) { + LOG(FATAL) << "nghttp2_submit_request() failed: " << nghttp2_strerror(submit_result); } else { - client->requests_in_flight++; client->total_requests_submitted++; - client->request_start_times[stream_id] = std::chrono::steady_clock::now(); + client->request_start_times[submit_result] = std::chrono::steady_clock::now(); + } + + int send_result = nghttp2_session_send(client->session); + if (send_result < 0) { + LOG(FATAL) << "nghttp2_session_send() failed: " << nghttp2_strerror(send_result); } - nghttp2_session_send(client->session); } static void readcb(struct bufferevent* bev, void* ptr) { @@ -148,27 +152,41 @@ static void readcb(struct bufferevent* bev, void* ptr) { struct evbuffer* input = bufferevent_get_input(bev); size_t datalen = evbuffer_get_length(input); - VLOG(2) << "Client readcb: got " << datalen << " bytes"; + VLOG(2) << "readcb(): got " << datalen << " bytes"; if (datalen == 0) return; unsigned char* data = evbuffer_pullup(input, -1); - ssize_t readlen = nghttp2_session_mem_recv(client->session, data, datalen); + ssize_t readlen = nghttp2_session_mem_recv2(client->session, data, datalen); if (readlen < 0) { - LOG(FATAL) << "Client: nghttp2_session_mem_recv error: " << nghttp2_strerror(static_cast(readlen)); - return; + LOG(FATAL) << "nghttp2_session_mem_recv2() failed: " << nghttp2_strerror(static_cast(readlen)); } - VLOG(2) << "Client readcb: consumed " << readlen << " bytes"; + VLOG(2) << "readcb(): consumed " << readlen << " bytes"; evbuffer_drain(input, readlen); - nghttp2_session_send(client->session); + + int send_result = nghttp2_session_send(client->session); + if (send_result < 0) { + LOG(FATAL) << "nghttp2_session_send() failed: " << nghttp2_strerror(send_result); + } } static void eventcb(struct bufferevent* bev, short events, void* ptr) { struct ClientSession* client = (struct ClientSession*)ptr; if (events & BEV_EVENT_CONNECTED) { + int fd = bufferevent_getfd(bev); + if (fd < 0) { + LOG(FATAL) << "bufferevent_getfd() failed"; + } + + int nodelay = 1; + int setopt_result = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof(nodelay)); + if (setopt_result != 0) { + LOG(FATAL) << "setsockopt(TCP_NODELAY) failed"; + } + nghttp2_session_callbacks* callbacks; nghttp2_session_callbacks_new(&callbacks); @@ -181,12 +199,15 @@ static void eventcb(struct bufferevent* bev, short events, void* ptr) { nghttp2_session_callbacks_del(callbacks); // Send connection preface and initial settings - nghttp2_settings_entry iv[1] = { - {NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, 128 * 1024 * 1024}}; - nghttp2_submit_settings(client->session, NGHTTP2_FLAG_NONE, iv, 1); - nghttp2_submit_window_update(client->session, NGHTTP2_FLAG_NONE, 0, 128 * 1024 * 1024); + nghttp2_settings_entry iv[2] = { + {NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, 15 * 1024 * 1024}, + {NGHTTP2_SETTINGS_MAX_FRAME_SIZE, 1 << 20}}; + int submit_settings_result = nghttp2_submit_settings(client->session, NGHTTP2_FLAG_NONE, iv, 2); + if (submit_settings_result < 0) { + LOG(FATAL) << "nghttp2_submit_settings() failed: " << nghttp2_strerror(submit_settings_result); + } - gettimeofday(&client->start_time, NULL); + nghttp2_session_set_local_window_size(client->session, NGHTTP2_FLAG_NONE, 0, 15 * 1024 * 1024); event_base_loopexit(client->evbase, NULL); @@ -205,7 +226,6 @@ static void BM_HTTP2Client(benchmark::State& state) { ClientSession client = {}; client.requests_completed = 0; - client.requests_in_flight = 0; client.total_requests_submitted = 0; struct event_base* base = event_base_new(); @@ -259,6 +279,12 @@ static void BM_HTTP2Client(benchmark::State& state) { BENCHMARK(BM_HTTP2Client) ->Args({0}) ->Args({1 << 10}) + ->Args({2 << 10}) + ->Args({4 << 10}) + ->Args({8 << 10}) + ->Args({16 << 10}) + ->Args({32 << 10}) + ->Args({64 << 10}) ->Args({128 << 10}); int main(int argc, char** argv) { diff --git a/http2/server.cc b/http2/server.cc index 753b8a8..d867ff4 100644 --- a/http2/server.cc +++ b/http2/server.cc @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -13,19 +14,18 @@ #include #include +#include "absl/flags/flag.h" +#include "absl/flags/parse.h" #include "absl/log/initialize.h" #include "absl/log/log.h" +ABSL_FLAG(int, port, 8080, "Port to listen on"); + #define MAKE_NV(name, value) \ { \ (uint8_t*)(name), (uint8_t*)(value), strlen(name), strlen(value), \ NGHTTP2_NV_FLAG_NONE} -#define PORT 8080 - -struct app_context; -struct stream_data; - struct ClientSession { struct bufferevent* bev; nghttp2_session* session; @@ -45,40 +45,64 @@ static ssize_t send_callback(nghttp2_session* session, const uint8_t* data, size struct ClientSession* client = (struct ClientSession*)user_data; struct bufferevent* bev = client->bev; struct evbuffer* output = bufferevent_get_output(bev); - evbuffer_add(output, data, length); + int add_result = evbuffer_add(output, data, length); + if (add_result < 0) { + LOG(FATAL) << "evbuffer_add() failed"; + } return (ssize_t)length; } static ssize_t echo_data_provider_callback(nghttp2_session* session, int32_t stream_id, uint8_t* buf, size_t length, uint32_t* data_flags, nghttp2_data_source* source, void* user_data); static int on_frame_recv_callback(nghttp2_session* session, const nghttp2_frame* frame, void* user_data) { + VLOG(2) << "Received frame type " << static_cast(frame->hd.type); + struct ClientSession* client = (struct ClientSession*)user_data; -// LOG(INFO) << "Server received frame type " << static_cast(frame->hd.type); + switch (frame->hd.type) { case NGHTTP2_HEADERS: if (frame->headers.cat == NGHTTP2_HCAT_REQUEST) { struct stream_data* sd = new stream_data; + sd->stream_id = frame->hd.stream_id; sd->req_body = evbuffer_new(); - nghttp2_session_set_stream_user_data(session, frame->hd.stream_id, sd); + + int set_stream_user_data_result = nghttp2_session_set_stream_user_data(session, frame->hd.stream_id, sd); + if (set_stream_user_data_result < 0) { + LOG(FATAL) << "nghttp2_session_set_stream_user_data() failed: " << nghttp2_strerror(set_stream_user_data_result); + } + + int send_result = nghttp2_session_send(session); // flush to output + if (send_result < 0) { + LOG(FATAL) << "nghttp2_session_send() failed: " << nghttp2_strerror(send_result); + } } break; case NGHTTP2_DATA: - // LOG(INFO) << "Server logic: DATA frame on stream " << frame->hd.stream_id << ", flags " << static_cast(frame->hd.flags); + VLOG(2) << "Received DATA frame on stream " << frame->hd.stream_id << ", flags " << static_cast(frame->hd.flags); + if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) { - // LOG(INFO) << "Server logic: END_STREAM seen"; + VLOG(1) << "Received END_STREAM"; + const nghttp2_nv hdrs[] = { MAKE_NV(":status", "200")}; struct stream_data* sd = (struct stream_data*)nghttp2_session_get_stream_user_data(session, frame->hd.stream_id); - if (sd) { - nghttp2_data_provider data_prd; - data_prd.source.ptr = sd; - data_prd.read_callback = echo_data_provider_callback; - int rv = nghttp2_submit_response(session, frame->hd.stream_id, hdrs, 1, &data_prd); - // LOG(INFO) << "nghttp2_submit_response returned " << rv; - nghttp2_session_send(session); // flush to output - } else { - LOG(ERROR) << "Server logic: No stream data found!"; + if (!sd) { + LOG(FATAL) << "No stream data found"; + } + + nghttp2_data_provider data_prd; + data_prd.source.ptr = sd; + data_prd.read_callback = echo_data_provider_callback; + + int submit_result = nghttp2_submit_response(session, frame->hd.stream_id, hdrs, 1, &data_prd); + if (submit_result < 0) { + LOG(FATAL) << "nghttp2_submit_response() failed: " << nghttp2_strerror(submit_result); + } + + int send_result = nghttp2_session_send(session); // flush to output + if (send_result < 0) { + LOG(FATAL) << "nghttp2_session_send() failed: " << nghttp2_strerror(send_result); } } break; @@ -89,8 +113,12 @@ static int on_frame_recv_callback(nghttp2_session* session, const nghttp2_frame* static int on_data_chunk_recv_callback(nghttp2_session* session, uint8_t flags, int32_t stream_id, const uint8_t* data, size_t len, void* user_data) { struct stream_data* sd = (struct stream_data*)nghttp2_session_get_stream_user_data(session, stream_id); if (sd) { - evbuffer_add(sd->req_body, data, len); + int add_result = evbuffer_add(sd->req_body, data, len); + if (add_result < 0) { + LOG(FATAL) << "evbuffer_add() failed"; + } } + return 0; } @@ -106,6 +134,7 @@ static ssize_t echo_data_provider_callback(nghttp2_session* session, int32_t str if (evbuffer_get_length(sd->req_body) == 0) { *data_flags |= NGHTTP2_DATA_FLAG_EOF; + evbuffer_free(sd->req_body); delete sd; nghttp2_session_set_stream_user_data(session, stream_id, NULL); @@ -134,43 +163,39 @@ static int on_begin_headers_callback(nghttp2_session* session, const nghttp2_fra return 0; } -// nghttp2 callbacks -static void setup_nghttp2_callbacks(nghttp2_session_callbacks* callbacks) { - nghttp2_session_callbacks_set_send_callback(callbacks, send_callback); - nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, on_frame_recv_callback); - nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks, on_data_chunk_recv_callback); - nghttp2_session_callbacks_set_on_stream_close_callback(callbacks, on_stream_close_callback); - nghttp2_session_callbacks_set_on_header_callback(callbacks, on_header_callback); - nghttp2_session_callbacks_set_on_begin_headers_callback(callbacks, on_begin_headers_callback); -} - -// Read from network static void readcb(struct bufferevent* bev, void* ptr) { struct ClientSession* client = (struct ClientSession*)ptr; - struct evbuffer* input = bufferevent_get_input(bev); + struct evbuffer* input = bufferevent_get_input(bev); size_t datalen = evbuffer_get_length(input); -// LOG(INFO) << "Server readcb: got " << datalen << " bytes"; + VLOG(2) << "readcb(): got " << datalen << " bytes"; + if (datalen == 0) return; unsigned char* data = evbuffer_pullup(input, -1); ssize_t readlen = nghttp2_session_mem_recv(client->session, data, datalen); if (readlen < 0) { - LOG(ERROR) << "Server: nghttp2_session_mem_recv error: " << nghttp2_strerror(static_cast(readlen)); - bufferevent_free(bev); - return; + LOG(FATAL) << "nghttp2_session_mem_recv() failed: " << nghttp2_strerror(static_cast(readlen)); } -// LOG(INFO) << "Server readcb: consumed " << readlen << " bytes"; + + VLOG(2) << "readcb(): consumed " << readlen << " bytes"; + evbuffer_drain(input, readlen); - nghttp2_session_send(client->session); + + int send_result = nghttp2_session_send(client->session); + if (send_result < 0) { + LOG(FATAL) << "nghttp2_session_send() failed: " << nghttp2_strerror(send_result); + } } static void eventcb(struct bufferevent* bev, short events, void* ptr) { struct ClientSession* client = (struct ClientSession*)ptr; + if (events & BEV_EVENT_ERROR) { LOG(ERROR) << "Error from bufferevent"; } + if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) { nghttp2_session_del(client->session); bufferevent_free(bev); @@ -178,9 +203,16 @@ static void eventcb(struct bufferevent* bev, short events, void* ptr) { } } -static void acceptcb(struct evconnlistener* listener, evutil_socket_t fd, struct sockaddr* a, int slen, void* p) { - struct app_context* app_ctx = (struct app_context*)p; +static void acceptcb(struct evconnlistener* listener, evutil_socket_t fd, struct sockaddr* addr, int slen, void* ptr) { + struct app_context* app_ctx = (struct app_context*)ptr; struct event_base* base = app_ctx->evbase; + + int nodelay = 1; + int setopt_result = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof(nodelay)); + if (setopt_result != 0) { + LOG(FATAL) << "setsockopt(TCP_NODELAY) failed"; + } + struct bufferevent* bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE); struct ClientSession* client = new ClientSession; @@ -189,52 +221,60 @@ static void acceptcb(struct evconnlistener* listener, evutil_socket_t fd, struct nghttp2_session_callbacks* callbacks; nghttp2_session_callbacks_new(&callbacks); - setup_nghttp2_callbacks(callbacks); + + nghttp2_session_callbacks_set_send_callback(callbacks, send_callback); + nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, on_frame_recv_callback); + nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks, on_data_chunk_recv_callback); + nghttp2_session_callbacks_set_on_stream_close_callback(callbacks, on_stream_close_callback); + nghttp2_session_callbacks_set_on_header_callback(callbacks, on_header_callback); + nghttp2_session_callbacks_set_on_begin_headers_callback(callbacks, on_begin_headers_callback); nghttp2_session_server_new(&client->session, callbacks, client); + nghttp2_session_callbacks_del(callbacks); nghttp2_settings_entry iv[2] = { - {NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 100}, - {NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, 128 * 1024 * 1024}}; + {NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, 15 * 1024 * 1024}, + {NGHTTP2_SETTINGS_MAX_FRAME_SIZE, 1 << 20}}; nghttp2_submit_settings(client->session, NGHTTP2_FLAG_NONE, iv, 2); - nghttp2_submit_window_update(client->session, NGHTTP2_FLAG_NONE, 0, 128 * 1024 * 1024); + + nghttp2_session_set_local_window_size(client->session, NGHTTP2_FLAG_NONE, 0, 15 * 1024 * 1024); + nghttp2_session_send(client->session); bufferevent_setcb(bev, readcb, NULL, eventcb, client); - bufferevent_enable(bev, EV_READ | EV_WRITE); + int enable_result = bufferevent_enable(bev, EV_READ | EV_WRITE); + if (enable_result != 0) { + LOG(FATAL) << "bufferevent_enable() failed"; + } } int main(int argc, char** argv) { + absl::ParseCommandLine(argc, argv); absl::InitializeLog(); - struct event_base* base; - struct evconnlistener* listener; - struct sockaddr_in sin; - struct app_context app_ctx; - - base = event_base_new(); + struct event_base* base = event_base_new(); if (!base) { - LOG(FATAL) << "Could not initialize libevent!"; - return 1; + LOG(FATAL) << "event_base_new() failed"; } + struct app_context app_ctx; app_ctx.evbase = base; + struct sockaddr_in sin; memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_addr.s_addr = htonl(0); - sin.sin_port = htons(PORT); + sin.sin_port = htons(absl::GetFlag(FLAGS_port)); - listener = evconnlistener_new_bind(base, acceptcb, (void*)&app_ctx, - LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE, -1, - (struct sockaddr*)&sin, sizeof(sin)); + struct evconnlistener* listener = evconnlistener_new_bind(base, acceptcb, (void*)&app_ctx, + LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE, -1, + (struct sockaddr*)&sin, sizeof(sin)); if (!listener) { - LOG(FATAL) << "Could not create a listener!"; - return 1; + LOG(FATAL) << "evconnlistener_new_bind() failed"; } - LOG(INFO) << "Listening on port " << PORT << "..."; + LOG(INFO) << "Listening on port " << absl::GetFlag(FLAGS_port) << "..."; event_base_dispatch(base);