Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor API server supervision tree to improve stability #27

Merged
merged 1 commit into from
Feb 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 67 additions & 29 deletions server/lib/bitcoin_rpc.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,39 +10,77 @@ defmodule BitcoinStream.RPC do
{port, opts} = Keyword.pop(opts, :port);
{host, opts} = Keyword.pop(opts, :host);
IO.puts("Starting Bitcoin RPC server on #{host} port #{port}")
GenServer.start_link(__MODULE__, {host, port, nil, nil}, opts)
GenServer.start_link(__MODULE__, {host, port, nil, nil, []}, opts)
end

@impl true
def init({host, port, status, _}) do
def init({host, port, status, _, listeners}) do
# start node monitoring loop
creds = rpc_creds();

send(self(), :check_status);
{:ok, {host, port, status, creds}}
{:ok, {host, port, status, creds, listeners}}
end

def handle_info(:check_status, state) do
# Do the desired work here
state = check_status(state)
Process.send_after(self(), :check_status, 60 * 1000)
{:noreply, state}
defp notify_listeners([]) do
true
end
defp notify_listeners([head | tail]) do
GenServer.reply(head, :mempool_synced);
notify_listeners(tail)
end

@impl true
def handle_call({:request, method, params}, _from, {host, port, status, creds}) do
def handle_info(:check_status, {host, port, status, creds, listeners}) do
# poll Bitcoin Core for current status
status = check_status({host, port, creds});
case status do
# if node is connected and finished with the initial block download
{:ok, %{"initialblockdownload" => false}} ->
IO.puts("Bitcoin Core connected and fully synced");
# notify all listening processes
notify_listeners(listeners);
Process.send_after(self(), :check_status, 60 * 1000);
{:noreply, {host, port, status, creds, []}}

{:ok, %{"initialblockdownload" => true}} ->
IO.puts("Bitcoin Core connected, waiting for initial block download");
Process.send_after(self(), :check_status, 60 * 1000);
{:noreply, {host, port, status, creds, listeners}}

_ ->
IO.puts("Waiting to connect to Bitcoin Core");
Process.send_after(self(), :check_status, 60 * 1000);
{:noreply, {host, port, status, creds, listeners}}
end
end

@impl true
def handle_call({:request, method, params}, _from, {host, port, status, creds, listeners}) do
case make_request(host, port, creds, method, params) do
{:ok, code, info} ->
{:reply, {:ok, code, info}, {host, port, status, creds}}
{:reply, {:ok, code, info}, {host, port, status, creds, listeners}}

{:error, reason} ->
{:reply, {:error, reason}, {host, port, status, creds}}
{:reply, {:error, reason}, {host, port, status, creds, listeners}}

error ->
{:reply, error, {host, port, status, creds, listeners}}
end
end

@impl true
def handle_call({:get_node_status}, _from, {host, port, status, creds}) do
{:reply, {:ok, status}, {host, port, status, creds}}
def handle_call(:get_node_status, _from, {host, port, status, creds, listeners}) do
{:reply, status, {host, port, status, creds, listeners}}
end

@impl true
def handle_call(:notify_on_ready, from, {host, port, status, creds, listeners}) do
{:noreply, {host, port, status, creds, [from | listeners]}}
end

def notify_on_ready(pid) do
GenServer.call(pid, :notify_on_ready, :infinity)
end

defp make_request(host, port, creds, method, params) do
Expand All @@ -68,29 +106,29 @@ defmodule BitcoinStream.RPC do
end

def request(pid, method, params) do
GenServer.call(pid, {:request, method, params}, 60000)
GenServer.call(pid, {:request, method, params}, 30000)
catch
:exit, reason ->
IO.puts("RPC request #{method} failed - probably timed out?")
IO.inspect(reason)
case reason do
{:timeout, _} -> {:error, :timeout}

_ -> {:error, reason}
end

error -> {:error, error}
end

def get_node_status(pid) do
GenServer.call(pid, {:get_node_status})
GenServer.call(pid, :get_node_status, 10000)
end

def check_status({host, port, status, creds}) do
with {:ok, 200, info} <- make_request(host, port, creds, "getblockchaininfo", []) do
{host, port, info, creds}
else
{:error, reason} ->
IO.puts("node status check failed");
IO.inspect(reason)
{host, port, status, creds}
err ->
IO.puts("node status check failed: (unknown reason)");
IO.inspect(err);
{host, port, status, creds}
defp check_status({host, port, creds}) do
case make_request(host, port, creds, "getblockchaininfo", []) do
{:ok, 200, info} -> {:ok, info}

{:ok, code, info} -> {:error, code, info}

_ -> {:error}
end
end

Expand Down
18 changes: 14 additions & 4 deletions server/lib/block_data.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ defmodule BitcoinStream.BlockData do
"""
use GenServer

alias BitcoinStream.Protocol.Block, as: BitcoinBlock

def start_link(opts) do
IO.puts("Starting block data link")
# load block
Expand Down Expand Up @@ -38,7 +36,19 @@ defmodule BitcoinStream.BlockData do
end

@impl true
def handle_cast({:json, {id, json}}, _state) do
{:noreply, {id, json}}
def handle_call({:json, {id, json}}, _from, _state) do
{:reply, :ok, {id, json}}
end

def get_json_block(pid) do
GenServer.call(pid, :json_block, 10000)
end

def get_block_id(pid) do
GenServer.call(pid, :block_id, 10000)
end

def set_json_block(pid, block_id, json) do
GenServer.call(pid, {:json, { block_id, json }}, 10000)
end
end
Loading