Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Header Synchronization Improvement #625

Merged
merged 1 commit into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions apps/arweave/include/ar_config.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@
%% The default number of chunks fetched from disk at a time during in-place repacking.
-define(DEFAULT_REPACK_BATCH_SIZE, 100).

%% default filtering value for the peer list (30days)
-define(CURRENT_PEERS_LIST_FILTER, 30*60*60*24).

%% The default rocksdb databases flush interval, 30 minutes.
-define(DEFAULT_ROCKSDB_FLUSH_INTERVAL_S, 1800).
%% The default rocksdb WAL sync interval, 1 minute.
Expand Down
5 changes: 3 additions & 2 deletions apps/arweave/src/ar_header_sync.erl
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ check_fork(Height, H, TXRoot) ->
end.

download_block(H, H2, TXRoot) ->
Peers = ar_peers:get_peers(lifetime),
Peers = ar_peers:get_peers(current),
case ar_storage:read_block(H) of
unavailable ->
download_block(Peers, H, H2, TXRoot);
Expand All @@ -492,7 +492,8 @@ download_block(H, H2, TXRoot) ->

download_block(Peers, H, H2, TXRoot) ->
Fork_2_0 = ar_fork:height_2_0(),
case ar_http_iface_client:get_block_shadow(Peers, H) of
Opts = #{ rand_min => length(Peers) },
case ar_http_iface_client:get_block_shadow(Peers, H, Opts) of
unavailable ->
?LOG_WARNING([
{event, ar_header_sync_failed_to_download_block_header},
Expand Down
5 changes: 5 additions & 0 deletions apps/arweave/src/ar_http.erl
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,12 @@ handle_info({gun_up, PID, _Protocol}, #state{ status_by_pid = StatusByPID } = St
[gen_server:reply(ReplyTo, {ok, PID}) || {ReplyTo, _} <- PendingRequests],
StatusByPID2 = maps:put(PID, {connected, MonitorRef, Peer}, StatusByPID),
prometheus_gauge:inc(outbound_connections),
ar_peers:connected_peer(Peer),
{noreply, State#state{ status_by_pid = StatusByPID2 }};
{connected, _MonitorRef, Peer} ->
?LOG_WARNING([{event, gun_up_pid_already_exists},
{peer, ar_util:format_peer(Peer)}]),
ar_peers:connected_peer(Peer),
{noreply, State}
end;

Expand Down Expand Up @@ -179,6 +181,7 @@ handle_info({gun_error, PID, Reason},
prometheus_gauge:dec(outbound_connections),
ok
end,
ar_peers:disconnected_peer(Peer),
gun:shutdown(PID),
?LOG_DEBUG([{event, connection_error}, {reason, io_lib:format("~p", [Reason])}]),
{noreply, State#state{ status_by_pid = StatusByPID2, pid_by_peer = PIDByPeer2 }}
Expand Down Expand Up @@ -208,6 +211,7 @@ handle_info({gun_down, PID, Protocol, Reason, _KilledStreams, _UnprocessedStream
prometheus_gauge:dec(outbound_connections),
ok
end,
ar_peers:disconnected_peer(Peer),
{noreply, State#state{ status_by_pid = StatusByPID2, pid_by_peer = PIDByPeer2 }}
end;

Expand All @@ -226,6 +230,7 @@ handle_info({'DOWN', _Ref, process, PID, Reason},
prometheus_gauge:dec(outbound_connections),
ok
end,
ar_peers:disconnected_peer(Peer),
{noreply, State#state{ status_by_pid = StatusByPID2, pid_by_peer = PIDByPeer2 }}
end;

Expand Down
65 changes: 43 additions & 22 deletions apps/arweave/src/ar_http_iface_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,22 @@

-module(ar_http_iface_client).

-export([send_block_json/3, send_block_binary/3, send_block_binary/4, send_tx_json/3,
send_tx_binary/3, send_block_announcement/2, get_block_shadow/2, get_block_shadow/3,
get_block/3, get_tx/2, get_txs/2, get_tx_from_remote_peer/2, get_tx_data/2,
get_wallet_list_chunk/2, get_wallet_list_chunk/3, get_wallet_list/2,
add_peer/1, get_info/1, get_info/2, get_peers/1, get_time/2, get_height/1,
get_block_index/3, get_sync_record/1, get_sync_record/3,
get_chunk_binary/3, get_mempool/1, get_sync_buckets/1,
get_recent_hash_list/1, get_recent_hash_list_diff/2, get_reward_history/3,
get_block_time_history/3,
push_nonce_limiter_update/3, get_vdf_update/1, get_vdf_session/1,
get_previous_vdf_session/1, get_cm_partition_table/1, cm_h1_send/2, cm_h2_send/2,
cm_publish_send/2, get_jobs/2, post_partial_solution/2,
get_pool_cm_jobs/2, post_pool_cm_jobs/2, post_cm_partition_table_to_pool/2]).
-export([send_block_json/3, send_block_binary/3, send_block_binary/4,
send_tx_json/3, send_tx_binary/3, send_block_announcement/2,
get_block/3, get_tx/2, get_txs/2, get_tx_from_remote_peer/2,
get_tx_data/2, get_wallet_list_chunk/2, get_wallet_list_chunk/3,
get_wallet_list/2, add_peer/1, get_info/1, get_info/2, get_peers/1,
get_time/2, get_height/1, get_block_index/3, get_sync_record/1,
get_sync_record/3, get_chunk_binary/3, get_mempool/1,
get_sync_buckets/1, get_recent_hash_list/1,
get_recent_hash_list_diff/2, get_reward_history/3,
get_block_time_history/3, push_nonce_limiter_update/3,
get_vdf_update/1, get_vdf_session/1, get_previous_vdf_session/1,
get_cm_partition_table/1, cm_h1_send/2, cm_h2_send/2,
cm_publish_send/2, get_jobs/2, post_partial_solution/2,
get_pool_cm_jobs/2, post_pool_cm_jobs/2,
post_cm_partition_table_to_pool/2]).
-export([get_block_shadow/2, get_block_shadow/3, get_block_shadow/4]).

-include_lib("arweave/include/ar.hrl").
-include_lib("arweave/include/ar_config.hrl").
Expand Down Expand Up @@ -122,20 +125,38 @@ get_block(Peer, H, TXIndices) ->
{B, Time, Size}
end.

%% @doc Retreive a block shadow by hash or height from one of the given peers.
get_block_shadow([], _ID) ->
unavailable;
%%--------------------------------------------------------------------
%% @doc get a block shadow using default parameter.
%% @end
%%--------------------------------------------------------------------
get_block_shadow(Peers, ID) ->
Peer = lists:nth(rand:uniform(min(5, length(Peers))), Peers),
case get_block_shadow(ID, Peer, binary) of
get_block_shadow(Peers, ID, #{}).

%%--------------------------------------------------------------------
%% @doc Retrieve a block shadow by hash or height from one of the given
%% peers. Some options can be modified like `rand_min',
%% `connect_timeout' and `timeout'.
%% @see get_block_shadow/4
%% @end
%%--------------------------------------------------------------------
get_block_shadow([], _ID, _Opts) ->
unavailable;
get_block_shadow(Peers, ID, Opts) ->
RandMin = maps:get(rand_min, Opts, 5),
Random = rand:uniform(min(RandMin, length(Peers))),
Peer = lists:nth(Random, Peers),
case get_block_shadow(ID, Peer, binary, Opts) of
not_found ->
get_block_shadow(Peers -- [Peer], ID);
get_block_shadow(Peers -- [Peer], ID, Opts);
{ok, B, Time, Size} ->
{Peer, B, Time, Size}
end.

%% @doc Retreive a block shadow by hash or height from the given peer.
get_block_shadow(ID, Peer, Encoding) ->
%%--------------------------------------------------------------------
%% @doc Retrieve a block shadow by hash or height from the given peer.
%% @end
%%--------------------------------------------------------------------
get_block_shadow(ID, Peer, Encoding, _Opts) ->
handle_block_response(Peer, Encoding,
ar_http:req(#{
method => get,
Expand Down Expand Up @@ -1097,7 +1118,7 @@ get_info(Peer) ->
timeout => 2 * 1000
})
of
{ok, {{<<"200">>, _}, _, JSON, _, _}} ->
{ok, {{<<"200">>, _}, _, JSON, _, _}} ->
case ar_serialize:json_decode(JSON, [return_maps]) of
{ok, JsonMap} ->
JsonMap;
Expand Down
2 changes: 1 addition & 1 deletion apps/arweave/src/ar_http_iface_middleware.erl
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ handle(<<"GET">>, [<<"peers">>], Req, _Pid) ->
[
list_to_binary(ar_util:format_peer(P))
||
P <- ar_peers:get_peers(lifetime),
P <- ar_peers:get_peers(current),
P /= ar_http_util:arweave_peer(Req),
ar_peers:is_public_peer(P)
]
Expand Down
159 changes: 151 additions & 8 deletions apps/arweave/src/ar_peers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
]).

-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
-export([set_tag/3, get_tag/2]).
-export([connected_peer/1, disconnected_peer/1, is_connected_peer/1]).
-export([get_connection_timestamp_peer/1]).

%% The frequency in seconds of re-resolving DNS of peers configured by domain names.
-define(STORE_RESOLVED_DOMAIN_S, 60).
Expand Down Expand Up @@ -149,25 +152,47 @@
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%%--------------------------------------------------------------------
%% @doc Return the list of peers in the given ranking order.
%% Rating is an estimate of the peer's effective throughput in bytes per millisecond.
%% Rating is an estimate of the peer's effective throughput in
%% bytes per millisecond.
%%
%% 'lifetime' considers all data ever received from this peer and is most useful when we care
%% more about identifying "good samaritans" rather than maximizing throughput (e.g. when
%% polling for new blocks are determing which peer's blocks to validated first).
%% `lifetime' considers all data ever received from this peer and
%% is most useful when we care more about identifying "good
%% samaritans" rather than maximizing throughput (e.g. when
%% polling for new blocks are determing which peer's blocks to
%% validated first).
%%
%% 'current' weights recently received data higher than old data and is most useful when we care
%% more about maximizing throughput (e.g. when syncing chunks).
%% `current' weights recently received data higher than old data
%% and is most useful when we care more about maximizing throughput
%% (e.g. when syncing chunks).
%%
%% @end
%%--------------------------------------------------------------------
get_peers(Ranking) ->
case catch ets:lookup(?MODULE, {peers, Ranking}) of
{'EXIT', _} ->
[];
[] ->
[];
[{{peers, lifetime}, Peers}] ->
Peers;
[{{peers, current}, Peers}] ->
filter_peers(Peers, {timestamp, ?CURRENT_PEERS_LIST_FILTER});
[{_, Peers}] ->
Peers
end.

filter_peers(Peers, {timestamp, Seconds})
when is_integer(Seconds) ->
Timefilter = erlang:system_time(seconds) - Seconds,
Tag = {connection, last},
Pattern = {{ar_tags, ?MODULE, '$1', Tag}, '$3'},
Guard = [{'>=', '$3', Timefilter}],
Select = ['$1'],
TaggedPeers = ets:select(?MODULE, [{Pattern, Guard, Select}]),
[ P || T <- TaggedPeers, P <- Peers, T =:= P ].

get_peer_performances(Peers) ->
lists:foldl(
fun(Peer, Map) ->
Expand Down Expand Up @@ -541,6 +566,15 @@ load_peers() ->
recalculate_total_rating(lifetime),
recalculate_total_rating(current),
?LOG_INFO([{event, polled_saved_peers}]),
ar:console("Polled saved peers.~n");
{ok, {_TotalRating, Records, Tags}} ->
?LOG_INFO([{event, polling_saved_peers}, {records, length(Records)}]),
ar:console("Polling saved peers...~n"),
load_peers(Records),
recalculate_total_rating(lifetime),
recalculate_total_rating(current),
[ ets:insert(?MODULE, {K, V}) || {K, V} <- Tags ],
?LOG_INFO([{event, polled_saved_peers}]),
ar:console("Polled saved peers.~n")
end.

Expand Down Expand Up @@ -904,13 +938,122 @@ store_peers() ->
[],
?MODULE
),
?LOG_INFO([{event, store_peers}, {total, Total}, {records, length(Records)}]),
ar_storage:write_term(peers, {Total, Records})
Tags = ets:foldl(fun ({{ar_tags, _, _, _}, _} = Tag, Acc) ->
[Tag|Acc];
(_, Acc) -> Acc
end, [], ?MODULE),
?LOG_INFO([{event, store_peers}
, {total, Total}
, {records, length(Records)}
, {tags, length(Tags)}]),
ar_storage:write_term(peers, {Total, Records, Tags})
end.

%%--------------------------------------------------------------------
%% @hidden
%% @doc internal function to tag a peer.
%% @end
%%--------------------------------------------------------------------
set_tag(Peer, Tag, Value) ->
ets:insert(?MODULE, {{ar_tags, ?MODULE, Peer, Tag}, Value}).

%%--------------------------------------------------------------------
%% @hidden
%% @doc internal function to get tag value set on a peer.
%% @end
%%--------------------------------------------------------------------
get_tag(Peer, Tag) ->
Pattern = {{ar_tags, ?MODULE, Peer, Tag}, '$1'},
Guard = [],
Select = ['$1'],
case ets:select(?MODULE, [{Pattern, Guard, Select}]) of
[] -> {error, not_found};
[V] -> {ok, V}
end.

%%--------------------------------------------------------------------
%% @doc defined a peer as connected (in HTTP sense).
%% @end
%%--------------------------------------------------------------------
connected_peer(Peer) ->
set_tag(Peer, {connection, last}, erlang:system_time(second)),
set_tag(Peer, {connection, active}, true).

%%--------------------------------------------------------------------
%% @doc defined a peer as disconnected (in HTTP sense).
%% @end
%%--------------------------------------------------------------------
disconnected_peer(Peer) ->
set_tag(Peer, {connection, active}, false).

%%--------------------------------------------------------------------
%% @doc returns peer's timestamp.
%% @end
%%--------------------------------------------------------------------
get_connection_timestamp_peer(Peer) ->
case get_tag(Peer, {connection, last}) of
{ok, V} -> V;
_ -> undefined
end.

%%--------------------------------------------------------------------
%% @doc returns the HTTP connection state of a peer.
%% @end
%%--------------------------------------------------------------------
is_connected_peer(Peer) ->
case get_tag(Peer, {connection, active}) of
{ok, V} -> V;
{error, _} -> false
end.

%%%===================================================================
%%% Tests.
%%%===================================================================
connected_peer_test() ->
Peer = {100, 117, 109, 98, 1234},

% drop all objects from the table to start with a clean state
ets:delete_all_objects(?MODULE),

% get all peers connected, it should returns nothing by
% default because the table is empty.
?assertEqual(undefined, get_connection_timestamp_peer(Peer)),

% manually add a new peer using set_ranked_peers function.
% the node is not connected because gun did not manage the
% connection in this test.
set_ranked_peers(lifetime, [Peer]),
set_ranked_peers(current, [Peer]),
?assertEqual(false, is_connected_peer(Peer)),
?assertEqual(undefined, get_connection_timestamp_peer(Peer)),

% force this peer to be connected using connected_peer
% function. A timestamp is created.
connected_peer(Peer),
Timestamp = get_connection_timestamp_peer(Peer),
?assertEqual(true, is_connected_peer(Peer)),
?assertEqual(Timestamp, get_connection_timestamp_peer(Peer)),
?assertNotEqual(undefined, get_connection_timestamp_peer(Peer)),
?assertEqual([Peer], get_peers(lifetime)),
?assertEqual([Peer], get_peers(current)),

% Now remove the connection to the peer. A timestamp must
% still be there.
disconnected_peer(Peer),
?assertEqual(false, is_connected_peer(Peer)),
?assertNotEqual(undefined, get_connection_timestamp_peer(Peer)),
?assertEqual([Peer], get_peers(lifetime)),
?assertEqual([Peer], get_peers(current)),

% let modify manually the timestamp to check get_peers/1
% function, and overwrite Peer timestamp with some defined
% values.
Time = erlang:system_time(second),
% Go above the limit
Limit = Time-((?CURRENT_PEERS_LIST_FILTER+10)*60*60*24),
set_tag(Peer, {connection, last}, Limit),
?assertEqual([], get_peers(current)),
?assertEqual([Peer], get_peers(lifetime)).

rotate_peer_ports_test() ->
Peer = {2, 2, 2, 2, 1},
Expand Down
5 changes: 4 additions & 1 deletion apps/arweave/test/ar_fork_recovery_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,10 @@ test_invalid_block_with_high_cumulative_difficulty() ->
%% Assert the nodes have continued building on the original fork.
[{H3, _, _} | _] = ar_test_node:wait_until_height(peer1, 2),
?assertNotEqual(B2#block.indep_hash, H3),
{_Peer, B3, _Time, _Size} = ar_http_iface_client:get_block_shadow(1, ar_test_node:peer_ip(peer1), binary),
{_Peer, B3, _Time, _Size} =
ar_http_iface_client:get_block_shadow(1,
ar_test_node:peer_ip(peer1),
binary, #{}),
?assertEqual(H2, B3#block.indep_hash).

fake_block_with_strong_cumulative_difficulty(B, PrevB, CDiff) ->
Expand Down
Loading
Loading