Skip to content
This repository has been archived by the owner on Mar 5, 2024. It is now read-only.

sc handler optimisations #1396

Merged
merged 3 commits into from
Jun 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/blockchain_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ init(Args) ->
BWorkerOpts = [
{port, proplists:get_value(port, Args, 0)},
{base_dir, BaseDir},
{update_dir, proplists:get_value(update_dir, Args, undefined)}
{update_dir, proplists:get_value(update_dir, Args, undefined)},
{ets_cache, blockchain_worker:make_ets_table()}
],

BEventOpts = [],
Expand Down
48 changes: 47 additions & 1 deletion src/blockchain_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
%% ------------------------------------------------------------------
-export([
start_link/1,
make_ets_table/0,
blockchain/0, blockchain/1,
cached_blockchain/0,
num_consensus_members/0,
consensus_addrs/0,
integrate_genesis_block/1,
Expand Down Expand Up @@ -78,6 +80,9 @@
-define(WEEK_OLD_SECONDS, 7*24*60*60). %% a week's worth of seconds
-define(MAX_ATTEMPTS, 3).

-define(CACHE, worker_cache).
-define(CHAIN, chain).

-ifdef(TEST).
-define(SYNC_TIME, 1000).
-else.
Expand Down Expand Up @@ -121,7 +126,30 @@
%% API Function Definitions
%% ------------------------------------------------------------------
start_link(Args) ->
gen_server:start_link({local, ?SERVER}, ?SERVER, Args, [{hibernate_after, 5000}]).
Res = gen_server:start_link({local, ?SERVER}, ?SERVER, Args, [{hibernate_after, 5000}]),
case Res of
{ok, Pid} ->
%% if we have an ETS table reference, give ownership to the new process
%% we likely are the `heir', so we'll get it back if this process dies
case proplists:get_value(ets_cache, Args, not_found) of
not_found ->
%% should ever hit here but ok....
ok;
Tab ->
true = ets:give_away(Tab, Pid, undefined)
end;
_ ->
ok
end,
Res.

-spec make_ets_table() -> ets:tab().
make_ets_table() ->
ets:new(?CACHE,
[named_table,
public, %% public as ?MODULE:init needs to write chain to the table. TODO: move chain load out of init and make this table protected
{read_concurrency, true},
{heir, self(), undefined}]).

%%--------------------------------------------------------------------
%% @doc
Expand All @@ -135,6 +163,14 @@ blockchain() ->
blockchain(Chain) ->
gen_server:call(?SERVER, {blockchain, Chain}, infinity).

-spec cached_blockchain() -> blockchain:blockchain() | undefined.
cached_blockchain() ->
try ets:lookup_element(?CACHE, ?CHAIN, 2) of
X -> X
catch
_:_ -> undefined
end.

%%--------------------------------------------------------------------
%% @doc
%% @end
Expand Down Expand Up @@ -387,6 +423,7 @@ init(Args) ->
[ libp2p_swarm:listen(SwarmTID, Addr) || Addr <- ListenAddrs ]),
NewState = #state{swarm_tid = SwarmTID, blockchain = Blockchain,
gossip_ref = Ref},
true = ets:insert(?CACHE, {?CHAIN, Blockchain}),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be pretty easy to move this into a handle_continue/2 so we load the chain into the cache table before receiving any other messages post-init. would it make sense (perhaps later) to consolidate these "blockchain worker cache items" into a single shared globally readable cache, owned and updated by the worker, and consolidate the "current height" cache that's in review to merge into the txn mgr? any others doing similar work?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we def need to get loading the chain out of the init, using handle_continue would work well there. I dont want to address that in this PR as its dramatically increases the impact and thus the risk.

We can address in a follow up PR and that could also include replacing all blocking calls to the worker to return the chain with the new non blocking call. Extending the cache to also include the height and additional data does make sense too.

{Mode, Info} = get_sync_mode(NewState),
SnapshotTimerRef = schedule_snapshot_timer(),
case application:get_env(blockchain, disable_prewarm, false) of
Expand Down Expand Up @@ -420,6 +457,7 @@ handle_call({blockchain, NewChain}, _From, #state{swarm_tid = SwarmTID} = State)
notify({new_chain, NewChain}),
remove_handlers(SwarmTID),
{ok, GossipRef} = add_handlers(SwarmTID, NewChain),
true = ets:insert(?CACHE, {?CHAIN, NewChain}),
{reply, ok, State#state{blockchain = NewChain, gossip_ref = GossipRef}};
handle_call({new_ledger, Dir}, _From, #state{blockchain=Chain}=State) ->
%% We do this here so the same process that normally owns the ledger
Expand Down Expand Up @@ -486,6 +524,7 @@ handle_call({install_snapshot, Height, Hash, Snapshot, BinSnap}, _From,
set_resyncing(ChainHeight, LedgerHeight, NewChain)
end,
blockchain_lock:release(),
true = ets:insert(?CACHE, {?CHAIN, NewChain}),
{reply, ok, maybe_sync(State#state{mode = normal, sync_paused = false,
blockchain = NewChain, gossip_ref = GossipRef})};
true ->
Expand All @@ -506,6 +545,7 @@ handle_call({install_aux_snapshot, Snapshot}, _From,
{ok, GossipRef} = add_handlers(SwarmTID, NewChain),
notify({new_chain, NewChain}),
blockchain_lock:release(),
true = ets:insert(?CACHE, {?CHAIN, NewChain}),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't think that we need this one here, this is for the aux ledger.

{reply, ok, maybe_sync(State#state{mode = normal, sync_paused = false,
blockchain = NewChain, gossip_ref = GossipRef})};

Expand Down Expand Up @@ -538,16 +578,19 @@ handle_call({add_commit_hook, CF, HookIncFun, HookEndFun} , _From, #state{blockc
Ledger = blockchain:ledger(Chain),
{Ref, Ledger1} = blockchain_ledger_v1:add_commit_hook(CF, HookIncFun, HookEndFun, Ledger),
Chain1 = blockchain:ledger(Ledger1, Chain),
true = ets:insert(?CACHE, {?CHAIN, Chain1}),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nor here

{reply, Ref, State#state{blockchain = Chain1}};
handle_call({add_commit_hook, CF, HookIncFun, HookEndFun, Pred} , _From, #state{blockchain = Chain} = State) ->
Ledger = blockchain:ledger(Chain),
{Ref, Ledger1} = blockchain_ledger_v1:add_commit_hook(CF, HookIncFun, HookEndFun, Pred, Ledger),
Chain1 = blockchain:ledger(Ledger1, Chain),
true = ets:insert(?CACHE, {?CHAIN, Chain1}),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nor here

{reply, Ref, State#state{blockchain = Chain1}};
handle_call({remove_commit_hook, RefOrCF} , _From, #state{blockchain = Chain} = State) ->
Ledger = blockchain:ledger(Chain),
Ledger1 = blockchain_ledger_v1:remove_commit_hook(RefOrCF, Ledger),
Chain1 = blockchain:ledger(Ledger1, Chain),
true = ets:insert(?CACHE, {?CHAIN, Chain1}),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nor here

{reply, ok, State#state{blockchain = Chain1}};

handle_call(_Msg, _From, State) ->
Expand All @@ -557,6 +600,7 @@ handle_call(_Msg, _From, State) ->
handle_cast({load, BaseDir, GenDir}, #state{blockchain=undefined}=State) ->
{Blockchain, Ref} = load_chain(State#state.swarm_tid, BaseDir, GenDir),
{Mode, Info} = get_sync_mode(State#state{blockchain=Blockchain, gossip_ref=Ref}),
true = ets:insert(?CACHE, {?CHAIN, Blockchain}),
NewState = State#state{blockchain = Blockchain, gossip_ref = Ref, mode=Mode, snapshot_info=Info},
notify({new_chain, Blockchain}),
{noreply, NewState};
Expand Down Expand Up @@ -743,6 +787,7 @@ handle_info({'DOWN', RocksGCRef, process, RocksGCPid, Reason},
{noreply, State#state{rocksdb_gc_mref = undefined}};

handle_info({blockchain_event, {new_chain, NC}}, State) ->
true = ets:insert(?CACHE, {?CHAIN, NC}),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pretty sure this is duplicative

{noreply, State#state{blockchain = NC}};
handle_info(_Msg, State) ->
lager:warning("rcvd unknown info msg: ~p", [_Msg]),
Expand Down Expand Up @@ -805,6 +850,7 @@ integrate_genesis_block_(
blockchain = Chain,
gossip_ref = GossipRef
},
true = ets:insert(?CACHE, {?CHAIN, Chain}),
{Mode, SyncInfo} = get_sync_mode(S1),
{ok, S1#state{mode=Mode, snapshot_info=SyncInfo}}
end;
Expand Down
76 changes: 43 additions & 33 deletions src/grpc/blockchain_grpc_sc_server_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,49 +32,20 @@ close(_HandlerPid)->
-spec init(atom(), grpcbox_stream:t()) -> grpcbox_stream:t().
init(_RPC, StreamState)->
lager:debug("initiating grpc state channel server handler with state ~p", [StreamState]),
HandlerMod = application:get_env(blockchain, sc_packet_handler, undefined),
OfferLimit = application:get_env(blockchain, sc_pending_offer_limit, 5),
Blockchain = blockchain_worker:blockchain(),
Ledger = blockchain:ledger(Blockchain),
Self = self(),
case blockchain:config(?sc_version, Ledger) of
%% In this case only sc_version=2 is handling banners
%% version 1 never had them and banner will be removed form future versions
{ok, 2} ->
ActiveSCs =
e2qc:cache(
?MODULE,
active_list,
10,
fun() -> maps:to_list(blockchain_state_channels_server:get_actives()) end
),
case ActiveSCs of
[] ->
SCBanner = blockchain_state_channel_banner_v1:new(),
lager:debug("blockchain_grpc_sc_server_handler, empty banner: ~p", [SCBanner]),
Self ! {send_banner, SCBanner};
ActiveSCs ->
[{_SCID, {ActiveSC, _, _}}|_] = ActiveSCs,
SCBanner = blockchain_state_channel_banner_v1:new(ActiveSC),
Self ! {send_banner, SCBanner}
end;
_ ->
noop
end,
HandlerState = blockchain_state_channel_common:new_handler_state(Blockchain, Ledger, #{}, [], HandlerMod,OfferLimit, false),
HandlerState = grpcbox_stream:stream_handler_state(StreamState),
NewHandlerState = maybe_initialize_state(HandlerState),
grpcbox_stream:stream_handler_state(
StreamState,
HandlerState
NewHandlerState
).

-spec msg(blockchain_state_channel_v1:message(), grpcbox_stream:t()) -> grpcbox_stream:t().
msg(#blockchain_state_channel_message_v1_pb{msg = Msg}, StreamState) ->
lager:debug("grpc msg called with ~p and state ~p", [Msg, StreamState]),
HandlerState = grpcbox_stream:stream_handler_state(StreamState),
Chain = blockchain_state_channel_common:chain(HandlerState),

%% get our chain and only handle the request if the chain is up
%% if chain not up we have no way to return routing data so just return a 14/503
Chain = blockchain_worker:cached_blockchain(),
case is_chain_ready(Chain) of
false ->
{grpc_error,
Expand Down Expand Up @@ -133,3 +104,42 @@ is_chain_ready(undefined) ->
false;
is_chain_ready(_Chain) ->
true.

%% handler state if not initialized will be undefined otherwise will be a record
-spec maybe_initialize_state(
blockchain_state_channel_common:handler_state() | undefined) ->
blockchain_state_channel_common:handler_state().
maybe_initialize_state(undefined) ->
HandlerMod = application:get_env(blockchain, sc_packet_handler, undefined),
OfferLimit = application:get_env(blockchain, sc_pending_offer_limit, 5),
Blockchain = blockchain_worker:cached_blockchain(),
Ledger = blockchain:ledger(Blockchain),
Self = self(),
case blockchain_ledger_v1:config(?sc_version, Ledger) of
%% In this case only sc_version=2 is handling banners
%% version 1 never had them and banner will be removed form future versions
{ok, 2} ->
ActiveSCs =
e2qc:cache(
?MODULE,
active_list,
30,
fun() -> maps:to_list(blockchain_state_channels_server:get_actives()) end
),
case ActiveSCs of
[] ->
SCBanner = blockchain_state_channel_banner_v1:new(),
lager:debug("blockchain_grpc_sc_server_handler, empty banner: ~p", [SCBanner]),
Self ! {send_banner, SCBanner};
ActiveSCs ->
[{_SCID, {ActiveSC, _, _}}|_] = ActiveSCs,
SCBanner = blockchain_state_channel_banner_v1:new(ActiveSC),
Self ! {send_banner, SCBanner}
end;
_ ->
noop
end,
blockchain_state_channel_common:new_handler_state(#{}, [], HandlerMod,OfferLimit, false);
maybe_initialize_state(HandlerState) ->
HandlerState.

Loading