Skip to content
This repository has been archived by the owner on Mar 5, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1396 from helium/andymck/optimise-core-sc-server-…
Browse files Browse the repository at this point in the history
…handler

sc handler optimisations
  • Loading branch information
evanmcc authored Jun 24, 2022
2 parents 7edfbe1 + 1610f86 commit 5a00a4b
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 95 deletions.
3 changes: 2 additions & 1 deletion src/blockchain_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ init(Args) ->
BWorkerOpts = [
{port, proplists:get_value(port, Args, 0)},
{base_dir, BaseDir},
{update_dir, proplists:get_value(update_dir, Args, undefined)}
{update_dir, proplists:get_value(update_dir, Args, undefined)},
{ets_cache, blockchain_worker:make_ets_table()}
],

BEventOpts = [],
Expand Down
48 changes: 47 additions & 1 deletion src/blockchain_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
%% ------------------------------------------------------------------
-export([
start_link/1,
make_ets_table/0,
blockchain/0, blockchain/1,
cached_blockchain/0,
num_consensus_members/0,
consensus_addrs/0,
integrate_genesis_block/1,
Expand Down Expand Up @@ -78,6 +80,9 @@
-define(WEEK_OLD_SECONDS, 7*24*60*60). %% a week's worth of seconds
-define(MAX_ATTEMPTS, 3).

-define(CACHE, worker_cache).
-define(CHAIN, chain).

-ifdef(TEST).
-define(SYNC_TIME, 1000).
-else.
Expand Down Expand Up @@ -121,7 +126,30 @@
%% API Function Definitions
%% ------------------------------------------------------------------
start_link(Args) ->
gen_server:start_link({local, ?SERVER}, ?SERVER, Args, [{hibernate_after, 5000}]).
Res = gen_server:start_link({local, ?SERVER}, ?SERVER, Args, [{hibernate_after, 5000}]),
case Res of
{ok, Pid} ->
%% if we have an ETS table reference, give ownership to the new process
%% we likely are the `heir', so we'll get it back if this process dies
case proplists:get_value(ets_cache, Args, not_found) of
not_found ->
%% should ever hit here but ok....
ok;
Tab ->
true = ets:give_away(Tab, Pid, undefined)
end;
_ ->
ok
end,
Res.

-spec make_ets_table() -> ets:tab().
make_ets_table() ->
ets:new(?CACHE,
[named_table,
public, %% public as ?MODULE:init needs to write chain to the table. TODO: move chain load out of init and make this table protected
{read_concurrency, true},
{heir, self(), undefined}]).

%%--------------------------------------------------------------------
%% @doc
Expand All @@ -135,6 +163,14 @@ blockchain() ->
blockchain(Chain) ->
gen_server:call(?SERVER, {blockchain, Chain}, infinity).

-spec cached_blockchain() -> blockchain:blockchain() | undefined.
cached_blockchain() ->
try ets:lookup_element(?CACHE, ?CHAIN, 2) of
X -> X
catch
_:_ -> undefined
end.

%%--------------------------------------------------------------------
%% @doc
%% @end
Expand Down Expand Up @@ -387,6 +423,7 @@ init(Args) ->
[ libp2p_swarm:listen(SwarmTID, Addr) || Addr <- ListenAddrs ]),
NewState = #state{swarm_tid = SwarmTID, blockchain = Blockchain,
gossip_ref = Ref},
true = ets:insert(?CACHE, {?CHAIN, Blockchain}),
{Mode, Info} = get_sync_mode(NewState),
SnapshotTimerRef = schedule_snapshot_timer(),
case application:get_env(blockchain, disable_prewarm, false) of
Expand Down Expand Up @@ -420,6 +457,7 @@ handle_call({blockchain, NewChain}, _From, #state{swarm_tid = SwarmTID} = State)
notify({new_chain, NewChain}),
remove_handlers(SwarmTID),
{ok, GossipRef} = add_handlers(SwarmTID, NewChain),
true = ets:insert(?CACHE, {?CHAIN, NewChain}),
{reply, ok, State#state{blockchain = NewChain, gossip_ref = GossipRef}};
handle_call({new_ledger, Dir}, _From, #state{blockchain=Chain}=State) ->
%% We do this here so the same process that normally owns the ledger
Expand Down Expand Up @@ -486,6 +524,7 @@ handle_call({install_snapshot, Height, Hash, Snapshot, BinSnap}, _From,
set_resyncing(ChainHeight, LedgerHeight, NewChain)
end,
blockchain_lock:release(),
true = ets:insert(?CACHE, {?CHAIN, NewChain}),
{reply, ok, maybe_sync(State#state{mode = normal, sync_paused = false,
blockchain = NewChain, gossip_ref = GossipRef})};
true ->
Expand All @@ -506,6 +545,7 @@ handle_call({install_aux_snapshot, Snapshot}, _From,
{ok, GossipRef} = add_handlers(SwarmTID, NewChain),
notify({new_chain, NewChain}),
blockchain_lock:release(),
true = ets:insert(?CACHE, {?CHAIN, NewChain}),
{reply, ok, maybe_sync(State#state{mode = normal, sync_paused = false,
blockchain = NewChain, gossip_ref = GossipRef})};

Expand Down Expand Up @@ -538,16 +578,19 @@ handle_call({add_commit_hook, CF, HookIncFun, HookEndFun} , _From, #state{blockc
Ledger = blockchain:ledger(Chain),
{Ref, Ledger1} = blockchain_ledger_v1:add_commit_hook(CF, HookIncFun, HookEndFun, Ledger),
Chain1 = blockchain:ledger(Ledger1, Chain),
true = ets:insert(?CACHE, {?CHAIN, Chain1}),
{reply, Ref, State#state{blockchain = Chain1}};
handle_call({add_commit_hook, CF, HookIncFun, HookEndFun, Pred} , _From, #state{blockchain = Chain} = State) ->
Ledger = blockchain:ledger(Chain),
{Ref, Ledger1} = blockchain_ledger_v1:add_commit_hook(CF, HookIncFun, HookEndFun, Pred, Ledger),
Chain1 = blockchain:ledger(Ledger1, Chain),
true = ets:insert(?CACHE, {?CHAIN, Chain1}),
{reply, Ref, State#state{blockchain = Chain1}};
handle_call({remove_commit_hook, RefOrCF} , _From, #state{blockchain = Chain} = State) ->
Ledger = blockchain:ledger(Chain),
Ledger1 = blockchain_ledger_v1:remove_commit_hook(RefOrCF, Ledger),
Chain1 = blockchain:ledger(Ledger1, Chain),
true = ets:insert(?CACHE, {?CHAIN, Chain1}),
{reply, ok, State#state{blockchain = Chain1}};

handle_call(_Msg, _From, State) ->
Expand All @@ -557,6 +600,7 @@ handle_call(_Msg, _From, State) ->
handle_cast({load, BaseDir, GenDir}, #state{blockchain=undefined}=State) ->
{Blockchain, Ref} = load_chain(State#state.swarm_tid, BaseDir, GenDir),
{Mode, Info} = get_sync_mode(State#state{blockchain=Blockchain, gossip_ref=Ref}),
true = ets:insert(?CACHE, {?CHAIN, Blockchain}),
NewState = State#state{blockchain = Blockchain, gossip_ref = Ref, mode=Mode, snapshot_info=Info},
notify({new_chain, Blockchain}),
{noreply, NewState};
Expand Down Expand Up @@ -743,6 +787,7 @@ handle_info({'DOWN', RocksGCRef, process, RocksGCPid, Reason},
{noreply, State#state{rocksdb_gc_mref = undefined}};

handle_info({blockchain_event, {new_chain, NC}}, State) ->
true = ets:insert(?CACHE, {?CHAIN, NC}),
{noreply, State#state{blockchain = NC}};
handle_info(_Msg, State) ->
lager:warning("rcvd unknown info msg: ~p", [_Msg]),
Expand Down Expand Up @@ -805,6 +850,7 @@ integrate_genesis_block_(
blockchain = Chain,
gossip_ref = GossipRef
},
true = ets:insert(?CACHE, {?CHAIN, Chain}),
{Mode, SyncInfo} = get_sync_mode(S1),
{ok, S1#state{mode=Mode, snapshot_info=SyncInfo}}
end;
Expand Down
76 changes: 43 additions & 33 deletions src/grpc/blockchain_grpc_sc_server_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,49 +32,20 @@ close(_HandlerPid)->
-spec init(atom(), grpcbox_stream:t()) -> grpcbox_stream:t().
init(_RPC, StreamState)->
lager:debug("initiating grpc state channel server handler with state ~p", [StreamState]),
HandlerMod = application:get_env(blockchain, sc_packet_handler, undefined),
OfferLimit = application:get_env(blockchain, sc_pending_offer_limit, 5),
Blockchain = blockchain_worker:blockchain(),
Ledger = blockchain:ledger(Blockchain),
Self = self(),
case blockchain:config(?sc_version, Ledger) of
%% In this case only sc_version=2 is handling banners
%% version 1 never had them and banner will be removed form future versions
{ok, 2} ->
ActiveSCs =
e2qc:cache(
?MODULE,
active_list,
10,
fun() -> maps:to_list(blockchain_state_channels_server:get_actives()) end
),
case ActiveSCs of
[] ->
SCBanner = blockchain_state_channel_banner_v1:new(),
lager:debug("blockchain_grpc_sc_server_handler, empty banner: ~p", [SCBanner]),
Self ! {send_banner, SCBanner};
ActiveSCs ->
[{_SCID, {ActiveSC, _, _}}|_] = ActiveSCs,
SCBanner = blockchain_state_channel_banner_v1:new(ActiveSC),
Self ! {send_banner, SCBanner}
end;
_ ->
noop
end,
HandlerState = blockchain_state_channel_common:new_handler_state(Blockchain, Ledger, #{}, [], HandlerMod,OfferLimit, false),
HandlerState = grpcbox_stream:stream_handler_state(StreamState),
NewHandlerState = maybe_initialize_state(HandlerState),
grpcbox_stream:stream_handler_state(
StreamState,
HandlerState
NewHandlerState
).

-spec msg(blockchain_state_channel_v1:message(), grpcbox_stream:t()) -> grpcbox_stream:t().
msg(#blockchain_state_channel_message_v1_pb{msg = Msg}, StreamState) ->
lager:debug("grpc msg called with ~p and state ~p", [Msg, StreamState]),
HandlerState = grpcbox_stream:stream_handler_state(StreamState),
Chain = blockchain_state_channel_common:chain(HandlerState),

%% get our chain and only handle the request if the chain is up
%% if chain not up we have no way to return routing data so just return a 14/503
Chain = blockchain_worker:cached_blockchain(),
case is_chain_ready(Chain) of
false ->
{grpc_error,
Expand Down Expand Up @@ -133,3 +104,42 @@ is_chain_ready(undefined) ->
false;
is_chain_ready(_Chain) ->
true.

%% handler state if not initialized will be undefined otherwise will be a record
-spec maybe_initialize_state(
blockchain_state_channel_common:handler_state() | undefined) ->
blockchain_state_channel_common:handler_state().
maybe_initialize_state(undefined) ->
HandlerMod = application:get_env(blockchain, sc_packet_handler, undefined),
OfferLimit = application:get_env(blockchain, sc_pending_offer_limit, 5),
Blockchain = blockchain_worker:cached_blockchain(),
Ledger = blockchain:ledger(Blockchain),
Self = self(),
case blockchain_ledger_v1:config(?sc_version, Ledger) of
%% In this case only sc_version=2 is handling banners
%% version 1 never had them and banner will be removed form future versions
{ok, 2} ->
ActiveSCs =
e2qc:cache(
?MODULE,
active_list,
30,
fun() -> maps:to_list(blockchain_state_channels_server:get_actives()) end
),
case ActiveSCs of
[] ->
SCBanner = blockchain_state_channel_banner_v1:new(),
lager:debug("blockchain_grpc_sc_server_handler, empty banner: ~p", [SCBanner]),
Self ! {send_banner, SCBanner};
ActiveSCs ->
[{_SCID, {ActiveSC, _, _}}|_] = ActiveSCs,
SCBanner = blockchain_state_channel_banner_v1:new(ActiveSC),
Self ! {send_banner, SCBanner}
end;
_ ->
noop
end,
blockchain_state_channel_common:new_handler_state(#{}, [], HandlerMod,OfferLimit, false);
maybe_initialize_state(HandlerState) ->
HandlerState.

Loading

0 comments on commit 5a00a4b

Please sign in to comment.