Skip to content
This repository has been archived by the owner on Mar 5, 2024. It is now read-only.

Commit

Permalink
eliminate use of state chain and ledger
Browse files Browse the repository at this point in the history
  • Loading branch information
andymck committed Jun 24, 2022
1 parent 6834b37 commit ee9ae87
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 88 deletions.
26 changes: 13 additions & 13 deletions src/blockchain_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@
%% API Function Definitions
%% ------------------------------------------------------------------
start_link(Args) ->
Res = server:start_link({local, ?SERVER}, ?SERVER, Args, [{hibernate_after, 5000}]),
Res = gen_server:start_link({local, ?SERVER}, ?SERVER, Args, [{hibernate_after, 5000}]),
case Res of
{ok, Pid} ->
%% if we have an ETS table reference, give ownership to the new process
Expand All @@ -143,11 +143,11 @@ start_link(Args) ->
end,
Res.


-spec make_ets_table() -> ets:tab().
make_ets_table() ->
ets:new(?CACHE,
[named_table,
protected,
public, %% public as ?MODULE:init needs to write chain to the table. TODO: move chain load out of init and make this table protected
{heir, self(), undefined}]).

%%--------------------------------------------------------------------
Expand Down Expand Up @@ -422,7 +422,7 @@ init(Args) ->
[ libp2p_swarm:listen(SwarmTID, Addr) || Addr <- ListenAddrs ]),
NewState = #state{swarm_tid = SwarmTID, blockchain = Blockchain,
gossip_ref = Ref},
ok = ets:insert(?CACHE, {?CHAIN, Blockchain}),
true = ets:insert(?CACHE, {?CHAIN, Blockchain}),
{Mode, Info} = get_sync_mode(NewState),
SnapshotTimerRef = schedule_snapshot_timer(),
case application:get_env(blockchain, disable_prewarm, false) of
Expand Down Expand Up @@ -456,7 +456,7 @@ handle_call({blockchain, NewChain}, _From, #state{swarm_tid = SwarmTID} = State)
notify({new_chain, NewChain}),
remove_handlers(SwarmTID),
{ok, GossipRef} = add_handlers(SwarmTID, NewChain),
ok = ets:insert(?CACHE, {?CHAIN, NewChain}),
true = ets:insert(?CACHE, {?CHAIN, NewChain}),
{reply, ok, State#state{blockchain = NewChain, gossip_ref = GossipRef}};
handle_call({new_ledger, Dir}, _From, #state{blockchain=Chain}=State) ->
%% We do this here so the same process that normally owns the ledger
Expand Down Expand Up @@ -523,7 +523,7 @@ handle_call({install_snapshot, Height, Hash, Snapshot, BinSnap}, _From,
set_resyncing(ChainHeight, LedgerHeight, NewChain)
end,
blockchain_lock:release(),
ok = ets:insert(?CACHE, {?CHAIN, NewChain}),
true = ets:insert(?CACHE, {?CHAIN, NewChain}),
{reply, ok, maybe_sync(State#state{mode = normal, sync_paused = false,
blockchain = NewChain, gossip_ref = GossipRef})};
true ->
Expand All @@ -544,7 +544,7 @@ handle_call({install_aux_snapshot, Snapshot}, _From,
{ok, GossipRef} = add_handlers(SwarmTID, NewChain),
notify({new_chain, NewChain}),
blockchain_lock:release(),
ok = ets:insert(?CACHE, {?CHAIN, NewChain}),
true = ets:insert(?CACHE, {?CHAIN, NewChain}),
{reply, ok, maybe_sync(State#state{mode = normal, sync_paused = false,
blockchain = NewChain, gossip_ref = GossipRef})};

Expand Down Expand Up @@ -577,19 +577,19 @@ handle_call({add_commit_hook, CF, HookIncFun, HookEndFun} , _From, #state{blockc
Ledger = blockchain:ledger(Chain),
{Ref, Ledger1} = blockchain_ledger_v1:add_commit_hook(CF, HookIncFun, HookEndFun, Ledger),
Chain1 = blockchain:ledger(Ledger1, Chain),
ok = ets:insert(?CACHE, {?CHAIN, Chain1}),
true = ets:insert(?CACHE, {?CHAIN, Chain1}),
{reply, Ref, State#state{blockchain = Chain1}};
handle_call({add_commit_hook, CF, HookIncFun, HookEndFun, Pred} , _From, #state{blockchain = Chain} = State) ->
Ledger = blockchain:ledger(Chain),
{Ref, Ledger1} = blockchain_ledger_v1:add_commit_hook(CF, HookIncFun, HookEndFun, Pred, Ledger),
Chain1 = blockchain:ledger(Ledger1, Chain),
ok = ets:insert(?CACHE, {?CHAIN, Chain1}),
true = ets:insert(?CACHE, {?CHAIN, Chain1}),
{reply, Ref, State#state{blockchain = Chain1}};
handle_call({remove_commit_hook, RefOrCF} , _From, #state{blockchain = Chain} = State) ->
Ledger = blockchain:ledger(Chain),
Ledger1 = blockchain_ledger_v1:remove_commit_hook(RefOrCF, Ledger),
Chain1 = blockchain:ledger(Ledger1, Chain),
ok = ets:insert(?CACHE, {?CHAIN, Chain1}),
true = ets:insert(?CACHE, {?CHAIN, Chain1}),
{reply, ok, State#state{blockchain = Chain1}};

handle_call(_Msg, _From, State) ->
Expand All @@ -599,7 +599,7 @@ handle_call(_Msg, _From, State) ->
handle_cast({load, BaseDir, GenDir}, #state{blockchain=undefined}=State) ->
{Blockchain, Ref} = load_chain(State#state.swarm_tid, BaseDir, GenDir),
{Mode, Info} = get_sync_mode(State#state{blockchain=Blockchain, gossip_ref=Ref}),
ok = ets:insert(?CACHE, {?CHAIN, Blockchain}),
true = ets:insert(?CACHE, {?CHAIN, Blockchain}),
NewState = State#state{blockchain = Blockchain, gossip_ref = Ref, mode=Mode, snapshot_info=Info},
notify({new_chain, Blockchain}),
{noreply, NewState};
Expand Down Expand Up @@ -786,7 +786,7 @@ handle_info({'DOWN', RocksGCRef, process, RocksGCPid, Reason},
{noreply, State#state{rocksdb_gc_mref = undefined}};

handle_info({blockchain_event, {new_chain, NC}}, State) ->
ok = ets:insert(?CACHE, {?CHAIN, NC}),
true = ets:insert(?CACHE, {?CHAIN, NC}),
{noreply, State#state{blockchain = NC}};
handle_info(_Msg, State) ->
lager:warning("rcvd unknown info msg: ~p", [_Msg]),
Expand Down Expand Up @@ -849,7 +849,7 @@ integrate_genesis_block_(
blockchain = Chain,
gossip_ref = GossipRef
},
ok = ets:insert(?CACHE, {?CHAIN, Chain}),
true = ets:insert(?CACHE, {?CHAIN, Chain}),
{Mode, SyncInfo} = get_sync_mode(S1),
{ok, S1#state{mode=Mode, snapshot_info=SyncInfo}}
end;
Expand Down
7 changes: 3 additions & 4 deletions src/grpc/blockchain_grpc_sc_server_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,9 @@ init(_RPC, StreamState)->
msg(#blockchain_state_channel_message_v1_pb{msg = Msg}, StreamState) ->
lager:debug("grpc msg called with ~p and state ~p", [Msg, StreamState]),
HandlerState = grpcbox_stream:stream_handler_state(StreamState),
Chain = blockchain_state_channel_common:chain(HandlerState),

%% get our chain and only handle the request if the chain is up
%% if chain not up we have no way to return routing data so just return a 14/503
Chain = blockchain_worker:cached_blockchain(),
case is_chain_ready(Chain) of
false ->
{grpc_error,
Expand Down Expand Up @@ -116,7 +115,7 @@ maybe_initialize_state(undefined) ->
Blockchain = blockchain_worker:cached_blockchain(),
Ledger = blockchain:ledger(Blockchain),
Self = self(),
case blockchain:config(?sc_version, Ledger) of
case blockchain_ledger_v1:config(?sc_version, Ledger) of
%% In this case only sc_version=2 is handling banners
%% version 1 never had them and banner will be removed form future versions
{ok, 2} ->
Expand All @@ -140,7 +139,7 @@ maybe_initialize_state(undefined) ->
_ ->
noop
end,
blockchain_state_channel_common:new_handler_state(Blockchain, Ledger, #{}, [], HandlerMod,OfferLimit, false);
blockchain_state_channel_common:new_handler_state(#{}, [], HandlerMod,OfferLimit, false);
maybe_initialize_state(HandlerState) ->
HandlerState.

68 changes: 13 additions & 55 deletions src/state_channel/blockchain_state_channel_common.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,11 @@

-export([
new_handler_state/0,
new_handler_state/7,
ledger/1, ledger/2,
new_handler_state/5,
pending_packet_offers/1, pending_packet_offers/2,
offer_queue/1, offer_queue/2,
handler_mod/1, handler_mod/2,
pending_offer_limit/1, pending_offer_limit/2,
chain/1, chain/2,
streaming_initialized/1, streaming_initialized/2,
encode_pb/1, encode_pb/2,
state_channel/1, state_channel/2
]).
Expand All @@ -34,9 +31,6 @@
]).

-record(handler_state, {
streaming_initialized :: boolean(),
chain :: undefined | blockchain:blockchain(),
ledger :: undefined | blockchain_ledger_v1:ledger(),
pending_packet_offers = #{} :: #{binary() => {blockchain_state_channel_packet_offer_v1:offer(), pos_integer()}},
offer_queue = [] :: [{blockchain_state_channel_packet_offer_v1:offer(), pos_integer()}],
handler_mod = undefined :: atom(),
Expand All @@ -52,19 +46,14 @@
-spec new_handler_state() -> handler_state().
new_handler_state()->
#handler_state{}.
-spec new_handler_state(Chain :: blockchain:blockchain(),
Ledger :: undefined | blockchain_ledger_v1:ledger(),
PendingPacketOffers :: #{binary() => {blockchain_state_channel_packet_offer_v1:offer(), pos_integer()}},
-spec new_handler_state(PendingPacketOffers :: #{binary() => {blockchain_state_channel_packet_offer_v1:offer(), pos_integer()}},
OfferQueue :: [{blockchain_state_channel_packet_offer_v1:offer(), pos_integer()}],
HandlerMod :: atom(),
PendingOfferLimit :: undefined | pos_integer(),
EncodePB :: boolean()
) -> handler_state().
new_handler_state(Chain, Ledger, PendingPacketOffers, OfferQueue, HandlerMod, PendingOfferLimit, EncodePB)->
new_handler_state(PendingPacketOffers, OfferQueue, HandlerMod, PendingOfferLimit, EncodePB)->
#handler_state{
streaming_initialized = true,
chain = Chain,
ledger = Ledger,
pending_packet_offers = PendingPacketOffers,
offer_queue = OfferQueue,
handler_mod = HandlerMod,
Expand All @@ -76,18 +65,6 @@ new_handler_state(Chain, Ledger, PendingPacketOffers, OfferQueue, HandlerMod, Pe
%%
%% State getters
%%
-spec chain(handler_state()) -> undefined | blockchain:blockchain().
chain(#handler_state{chain=V}) ->
V.

-spec streaming_initialized(handler_state()) -> undefined | boolean().
streaming_initialized(#handler_state{streaming_initialized=V}) ->
V.

-spec ledger(handler_state()) -> undefined | blockchain_ledger_v1:ledger().
ledger(#handler_state{ledger=V}) ->
V.

-spec pending_packet_offers(handler_state()) -> #{binary() => {blockchain_state_channel_packet_offer_v1:offer(), pos_integer()}}.
pending_packet_offers(#handler_state{pending_packet_offers=V}) ->
V.
Expand Down Expand Up @@ -115,18 +92,6 @@ state_channel(#handler_state{state_channel=V}) ->
%%
%% State setters
%%
-spec chain(blockchain:blockchain(), handler_state()) -> handler_state().
chain(NewV, HandlerState) ->
HandlerState#handler_state{chain=NewV}.

-spec streaming_initialized(boolean(), handler_state()) -> handler_state().
streaming_initialized(NewV, HandlerState) ->
HandlerState#handler_state{streaming_initialized=NewV}.

-spec ledger(blockchain_ledger_v1:ledger(), handler_state()) -> handler_state().
ledger(NewV, HandlerState) ->
HandlerState#handler_state{ledger=NewV}.

-spec pending_packet_offers(#{binary() => {blockchain_state_channel_packet_offer_v1:offer(), pos_integer()}}, handler_state()) -> handler_state().
pending_packet_offers(NewV, HandlerState) ->
HandlerState#handler_state{pending_packet_offers=NewV}.
Expand Down Expand Up @@ -203,11 +168,12 @@ send_response(Pid, Resp) ->
handle_server_msg(
Msg,
#handler_state{
ledger = Ledger,
pending_packet_offers = PendingOffers,
pending_offer_limit = PendingOfferLimit
}=HandlerState
)->
Chain = blockchain_worker:cached_blockchain(),
Ledger = blockchain:ledger(Chain),
Time = erlang:system_time(millisecond),
PendingOfferCount = maps:size(PendingOffers),
case Msg of
Expand All @@ -221,21 +187,21 @@ handle_server_msg(
case maps:get(PacketHash, PendingOffers, undefined) of
undefined ->
lager:debug("sc_handler server got packet: ~p", [Packet]),
blockchain_state_channels_server:handle_packet(Packet, Time, HandlerState#handler_state.handler_mod, HandlerState#handler_state.ledger, self()),
blockchain_state_channels_server:handle_packet(Packet, Time, HandlerState#handler_state.handler_mod, Ledger, self()),
{ok, HandlerState};
{PendingOffer, PendingOfferTime} ->
case blockchain_state_channel_packet_v1:validate(Packet, PendingOffer) of
{error, packet_offer_mismatch} ->
%% might as well try it, it's free
blockchain_state_channels_server:handle_packet(Packet, Time, HandlerState#handler_state.handler_mod, HandlerState#handler_state.ledger, self()),
blockchain_state_channels_server:handle_packet(Packet, Time, HandlerState#handler_state.handler_mod, Ledger, self()),
lager:warning("packet failed to validate ~p against offer ~p", [Packet, PendingOffer]),
stop;
{error, Reason} ->
lager:warning("packet failed to validate ~p reason ~p", [Packet, Reason]),
stop;
true ->
lager:debug("sc_handler server got packet: ~p", [Packet]),
blockchain_state_channels_server:handle_packet(Packet, PendingOfferTime, HandlerState#handler_state.handler_mod, HandlerState#handler_state.ledger, self()),
blockchain_state_channels_server:handle_packet(Packet, PendingOfferTime, HandlerState#handler_state.handler_mod, Ledger, self()),
handle_next_offer(HandlerState#handler_state{pending_packet_offers=maps:remove(PacketHash, PendingOffers)})
end
end;
Expand Down Expand Up @@ -276,17 +242,8 @@ handle_server_msg(
end.

handle_client_msg(Msg, HandlerState) ->
%% get ledger if we don't yet have one
Ledger = case HandlerState#handler_state.ledger of
undefined ->
case blockchain_worker:blockchain() of
undefined ->
undefined;
Chain ->
blockchain:ledger(Chain)
end;
L -> L
end,
Chain = blockchain_worker:cached_blockchain(),
Ledger = blockchain:ledger(Chain),
case Msg of
{banner, Banner} ->
case blockchain_state_channel_banner_v1:sc(Banner) of
Expand Down Expand Up @@ -322,7 +279,7 @@ handle_client_msg(Msg, HandlerState) ->
lager:debug("sc_handler client got response: ~p", [Resp]),
blockchain_state_channels_client:response(Resp)
end,
HandlerState#handler_state{ledger=Ledger}.
HandlerState.


-spec handle_next_offer(State) -> {ok, State} | {ok, State, Msg :: any()} when
Expand All @@ -346,14 +303,15 @@ handle_offer(
Offer,
Time,
#handler_state{
ledger=Ledger,
pending_packet_offers=PendingOffers,
handler_mod=Mod,
encode_pb=MaybeEncodeMsg,
state_channel=CachedSC
}=HandlerState0
) ->
lager:debug("sc_handler server got offer: ~p", [Offer]),
Chain = blockchain_worker:cached_blockchain(),
Ledger = blockchain:ledger(Chain),
case blockchain_state_channels_server:handle_offer(Offer, Mod, Ledger, self()) of
ok ->
ReqDiff = blockchain_state_channel_offer_v1:req_diff(Offer),
Expand Down
23 changes: 7 additions & 16 deletions src/state_channel/blockchain_state_channel_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ init(server, _Conn, [_Path, Blockchain]) ->
Ledger = blockchain:ledger(Blockchain),
HandlerMod = application:get_env(blockchain, sc_packet_handler, undefined),
OfferLimit = application:get_env(blockchain, sc_pending_offer_limit, 5),
HandlerState = blockchain_state_channel_common:new_handler_state(Blockchain, Ledger, #{}, [], HandlerMod, OfferLimit, true),
case blockchain:config(?sc_version, Ledger) of
HandlerState = blockchain_state_channel_common:new_handler_state(#{}, [], HandlerMod, OfferLimit, true),
case blockchain_ledger_v1:config(?sc_version, Ledger) of
%% In this case only sc_version=2 is handling banners
%% version 1 never had them and banner will be removed form future versions
{ok, 2} ->
Expand Down Expand Up @@ -99,18 +99,9 @@ init(server, _Conn, [_Path, Blockchain]) ->
HandlerState :: any()
) -> libp2p_framed_stream:handle_data_result().
handle_data(client, Data, HandlerState) ->
Chain = blockchain_worker:cached_blockchain(),
Ledger = blockchain:ledger(Chain),
%% get ledger if we don't yet have one
Ledger =
case blockchain_state_channel_common:ledger(HandlerState) of
undefined ->
case blockchain_worker:blockchain() of
undefined ->
undefined;
Chain ->
blockchain:ledger(Chain)
end;
L -> L
end,
case blockchain_state_channel_message_v1:decode(Data) of
{banner, Banner} ->
case blockchain_state_channel_banner_v1:sc(Banner) of
Expand Down Expand Up @@ -146,9 +137,10 @@ handle_data(client, Data, HandlerState) ->
lager:debug("sc_handler client got response: ~p", [Resp]),
blockchain_state_channels_client:response(Resp)
end,
NewHandlerState = blockchain_state_channel_common:ledger(Ledger, HandlerState),
{noreply, NewHandlerState};
{noreply, HandlerState};
handle_data(server, Data, HandlerState) ->
Chain = blockchain_worker:cached_blockchain(),
Ledger = blockchain:ledger(Chain),
PendingOffers = blockchain_state_channel_common:pending_packet_offers(HandlerState),
PendingOfferLimit = blockchain_state_channel_common:pending_offer_limit(HandlerState),
Time = erlang:system_time(millisecond),
Expand All @@ -165,7 +157,6 @@ handle_data(server, Data, HandlerState) ->
NewHandlerState = blockchain_state_channel_common:offer_queue(CurOfferQueue ++ [{Offer, Time}], HandlerState),
{noreply, NewHandlerState};
{packet, Packet} ->
Ledger = blockchain_state_channel_common:ledger(HandlerState),
PacketHash = blockchain_helium_packet_v1:packet_hash(blockchain_state_channel_packet_v1:packet(Packet)),
case maps:get(PacketHash, PendingOffers, undefined) of
undefined ->
Expand Down

0 comments on commit ee9ae87

Please sign in to comment.