Optimize command formatting and representation#162
Open
zuiderkwast wants to merge 2 commits intoEricsson:mainfrom
Open
Optimize command formatting and representation#162zuiderkwast wants to merge 2 commits intoEricsson:mainfrom
zuiderkwast wants to merge 2 commits intoEricsson:mainfrom
Conversation
Build the RESP binary by appending directly instead of constructing
an iolist and converting it. This avoids the intermediate iolist
allocation and is faster for the full data path (cross-process copy
+ TCP send) which is the common case.
Benchmark results (cross-process copy + TCP send, OTP 28):
iolist_to_binary iolist bin-append
small (37B) 2105 ns/op 1128 888
pipeline (1.1KB) 2492 ns/op 2930 1226
Change redis_command() representation from a list of per-command
binaries to a single concatenated binary. Response classes are
precomputed at convert_to time and stored in the tuple, making
get_data and get_response_class simple field accesses.
New type:
{redis_command, {single, response_class()}, binary()}
{redis_command, {pipeline, [response_class()]}, binary()}
Also simplify resp_class to work on raw commands instead of
RESP-formatted binaries, and preserve the original response
class in add_asking (needed for sharded pub/sub commands like
SSUBSCRIBE that can receive ASK redirects).
Benchmark results (convert_to + cross-process copy + TCP send, OTP 28):
before after
single small 2053 ns/op 2372
pipeline 10 3584 ns/op 2712
large real 3823 ns/op 3050
Collaborator
Author
|
The failing tests are flaky tests, most of which will be fixed in the feature branch for single-process client. @WilliamVoong @bjosv PTAL |
Collaborator
Author
|
The benchmarks were run using some code directly on the command line, like this: rebar3 shell --eval '
Small = [<<"SET">>, <<"mykey">>, <<"myvalue">>],
Medium = [[<<"HSET">>, <<"hash:key">>, <<"field">>, crypto:strong_rand_bytes(64)] || _ <- lists:seq(1, 10)],
Large = amf_valkey_data:commands(),
N = 1000000,
Bench = fun(Label, Nn, Cmd) ->
Recv = spawn_link(fun() -> (fun F(0) -> receive {done,P} -> P ! ok end; F(I) -> receive {cmd,_} -> F(I-1) end end)(Nn) end),
T0 = erlang:monotonic_time(microsecond),
lists:foreach(fun(_) -> C = ered_command:convert_to(Cmd), Recv ! {cmd, C} end, lists:seq(1,Nn)),
Recv ! {done, self()}, receive ok -> ok end,
T1 = erlang:monotonic_time(microsecond),
io:format("~-20s ~.1f~n", [Label, (T1-T0)*1000/Nn])
end,
io:format("=== commit 2: flatten pipeline (convert_to + copy) ===~n"),
Bench("single small", N, Small),
Bench("pipeline 10", N div 10, Medium),
Bench("large real 7x5KB", N div 50, Large),
halt().
' 2>&1 | grep -E "===|small|pipeline|large"I have also a temporary module that does more-or-less the same: src/ered_command_bench.erl-module(ered_command_bench).
-export([run/0]).
run() ->
Small = [<<"SET">>, <<"mykey">>, <<"myvalue">>],
Medium = [[<<"HSET">>, <<"hash:key">>, <<"field">>, crypto:strong_rand_bytes(64)]
|| _ <- lists:seq(1, 10)],
Large = try amf_valkey_data:commands()
catch _:_ -> [[<<"MSET">> | [crypto:strong_rand_bytes(128) || _ <- lists:seq(1, 100)]]]
end,
BigVal = [<<"SET">>, <<"mykey">>, crypto:strong_rand_bytes(100000)],
N = 500000,
io:format("=== convert_to only ===~n"),
bench_convert("single small", N, Small),
bench_convert("pipeline 10", N div 10, Medium),
bench_convert("large real", N div 50, Large),
bench_convert("big value 100KB", N div 50, BigVal),
io:format("~n=== convert_to: iolist_to_binary vs iolist vs bin-append ===~n"),
bench_convert_variants("single small", N, Small),
bench_convert_variants("pipeline 10", N div 10, Medium),
bench_convert_variants("large real", N div 50, Large),
bench_convert_variants("big value 100KB", N div 50, BigVal),
io:format("~n=== convert_to + cross-process copy ===~n"),
bench_copy("single small", N, Small),
bench_copy("pipeline 10", N div 10, Medium),
bench_copy("large real", N div 50, Large),
bench_copy("big value 100KB", N div 50, BigVal),
io:format("~n=== convert_to + cross-process copy + port send ===~n"),
{ok, LSock} = gen_tcp:listen(0, [{active, false}, binary]),
{ok, Port} = inet:port(LSock),
spawn_link(fun() -> {ok, S} = gen_tcp:accept(LSock), drain(S) end),
{ok, Sock} = gen_tcp:connect("localhost", Port, [binary, {active, false}]),
bench_send("single small", N, Small, Sock),
bench_send("pipeline 10", N div 10, Medium, Sock),
bench_send("large real", N div 50, Large, Sock),
bench_send("big value 100KB", N div 50, BigVal, Sock),
gen_tcp:close(Sock),
gen_tcp:close(LSock),
io:format("~n=== binary vs iolist vs bin-append: cross-process copy + send ===~n"),
bench_three_variants(N, Small, "small (37B)"),
MedBin = ered_command:get_data(ered_command:convert_to(Medium)),
MedSize = iolist_size(MedBin),
bench_three_variants(N div 10, Medium, io_lib:format("pipeline 10 (~pB)", [MedSize])),
ok.
%% --- convert_to only ---
bench_convert(Label, N, Cmd) ->
_ = [ered_command:convert_to(Cmd) || _ <- lists:seq(1, 100)],
T0 = erlang:monotonic_time(microsecond),
repeat(fun() -> ered_command:convert_to(Cmd) end, N),
T1 = erlang:monotonic_time(microsecond),
io:format("~-20s ~.1f ns/op~n", [Label, (T1 - T0) * 1000 / N]).
bench_convert_variants(Label, N, Cmd) ->
T0 = erlang:monotonic_time(microsecond),
repeat(fun() -> ered_command:convert_to(Cmd) end, N),
T1 = erlang:monotonic_time(microsecond),
repeat(fun() -> make_iolist(Cmd) end, N),
T2 = erlang:monotonic_time(microsecond),
repeat(fun() -> make_bin_append(Cmd) end, N),
T3 = erlang:monotonic_time(microsecond),
io:format("~-20s iolist_to_binary: ~.1f iolist: ~.1f bin-append: ~.1f ns/op~n",
[Label, (T1 - T0) * 1000 / N, (T2 - T1) * 1000 / N, (T3 - T2) * 1000 / N]).
%% --- convert_to + send as message to another process ---
bench_copy(Label, N, Cmd) ->
Receiver = spawn_link(fun() -> sink(N) end),
T0 = erlang:monotonic_time(microsecond),
repeat(fun() ->
Converted = ered_command:convert_to(Cmd),
Receiver ! {cmd, Converted}
end, N),
Receiver ! {done, self()},
receive ok -> ok end,
T1 = erlang:monotonic_time(microsecond),
io:format("~-20s ~.1f ns/op~n", [Label, (T1 - T0) * 1000 / N]).
%% --- convert_to + send message + tcp send (mimics full path) ---
bench_send(Label, N, Cmd, Sock) ->
Sender = spawn_link(fun() -> tcp_sender(N, Sock) end),
T0 = erlang:monotonic_time(microsecond),
repeat(fun() ->
Converted = ered_command:convert_to(Cmd),
Data = ered_command:get_data(Converted),
Sender ! {send, Data}
end, N),
Sender ! {done, self()},
receive ok -> ok end,
T1 = erlang:monotonic_time(microsecond),
io:format("~-20s ~.1f ns/op~n", [Label, (T1 - T0) * 1000 / N]).
%% --- Compare binary vs iolist vs bin-append for cross-process copy + tcp send ---
bench_three_variants(N, Cmd, Label) ->
BinData = ered_command:get_data(ered_command:convert_to(Cmd)),
IoListData = make_iolist(Cmd),
BinAppendData = make_bin_append(Cmd),
{ok, LSock} = gen_tcp:listen(0, [{active, false}, binary]),
{ok, Port} = inet:port(LSock),
Socks = [begin
spawn_link(fun() -> {ok, S} = gen_tcp:accept(LSock), drain(S) end),
{ok, S} = gen_tcp:connect("localhost", Port, [binary, {active, false}]),
S
end || _ <- [1,2,3]],
[Sock1, Sock2, Sock3] = Socks,
T0 = bench_send_data(N, BinData, Sock1),
T1 = bench_send_data(N, IoListData, Sock2),
T2 = bench_send_data(N, BinAppendData, Sock3),
[gen_tcp:close(S) || S <- Socks],
gen_tcp:close(LSock),
io:format("~s~n iolist_to_binary: ~.1f ns/op~n iolist: ~.1f ns/op~n bin-append: ~.1f ns/op~n",
[Label, T0 * 1000 / N, T1 * 1000 / N, T2 * 1000 / N]).
bench_send_data(N, Data, Sock) ->
Sender = spawn_link(fun() -> tcp_sender(N, Sock) end),
T0 = erlang:monotonic_time(microsecond),
repeat(fun() -> Sender ! {send, Data} end, N),
Sender ! {done, self()},
receive ok -> ok end,
T1 = erlang:monotonic_time(microsecond),
T1 - T0.
%% Build the iolist that command_to_bin would produce before iolist_to_binary
make_iolist([E|_] = Cmds) when is_list(E) ->
[make_iolist_single(C) || C <- Cmds];
make_iolist(Cmd) ->
make_iolist_single(Cmd).
make_iolist_single(RawCmd) ->
Len = integer_to_list(length(RawCmd)),
Elements = [[$$, integer_to_list(size(Bin)), $\r, $\n, Bin, $\r, $\n] || Bin <- RawCmd],
[$*, Len, $\r, $\n, Elements].
%% Build binary by appending (no iolist intermediate)
make_bin_append([E|_] = Cmds) when is_list(E) ->
[make_bin_append_single(C) || C <- Cmds];
make_bin_append(Cmd) ->
make_bin_append_single(Cmd).
make_bin_append_single(RawCmd) ->
Len = integer_to_binary(length(RawCmd)),
Header = <<"*", Len/binary, "\r\n">>,
lists:foldl(fun(Bin, Acc) ->
Size = integer_to_binary(byte_size(Bin)),
<<Acc/binary, $$, Size/binary, "\r\n", Bin/binary, "\r\n">>
end, Header, RawCmd).
%% Helpers
sink(0) ->
receive {done, Pid} -> Pid ! ok end;
sink(N) ->
receive {cmd, _} -> sink(N - 1) end.
tcp_sender(0, _Sock) ->
receive {done, Pid} -> Pid ! ok end;
tcp_sender(N, Sock) ->
receive {send, Data} ->
gen_tcp:send(Sock, Data),
tcp_sender(N - 1, Sock)
end.
drain(Sock) ->
case gen_tcp:recv(Sock, 0, 100) of
{ok, _} -> drain(Sock);
{error, _} -> ok
end.
repeat(Fun, N) when N > 0 -> Fun(), repeat(Fun, N - 1);
repeat(_, 0) -> ok.Details |
Collaborator
|
I took a quick look and overall this looks great, very clever to convert the list of commands to a binary for cheaper copying between processes. I will review it in detail next week. |
Collaborator
Author
It was your idea to do this. 😄 (although I took it further) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This PR optimizes how commands are formatted and represented internally, reducing overhead in the hot path (command formatting → cross-process copy → TCP send).
Changes
Commit 1: Use binary append instead of iolist_to_binary in command_to_bin
Build the RESP binary by appending directly (
<<Acc/binary, ...>>) instead of constructing an intermediate iolist and converting it with iolist_to_binary. This avoids one allocation.Commit 2: Flatten pipeline to single binary with precomputed response class
New type:
Benchmark results
Results (convert_to + cross-process copy, ns/op, OTP 28):
Commit 1 (bin-append) is roughly neutral — small win on single/pipeline, small loss on large.
Commit 2 (flatten pipeline) is where the big gains are: ~38% faster for pipelines. The single binary is much cheaper to copy between processes than a list of binaries.