Skip to content

Commit

Permalink
Manually trigger the garbage collector periodically.
Browse files Browse the repository at this point in the history
Several of the processes involved in mining "touch" chunks. The
way Erlang's memory managment works each time a process touches
a chunk it gets a reference to the full 100MiB recall range.

Also the default garbage collection frequency is based on the
amount of activity a given process does. So for some processes
the default GC may happen infrequently. This can lead to
the 100MiB recall ranges staying in memory long enough to
trigger memory issues.

By manually GC'ing periodically we ensure memory is released
regularly.
  • Loading branch information
JamesPiechota committed Feb 12, 2024
1 parent 40d4682 commit 2b0bb57
Show file tree
Hide file tree
Showing 8 changed files with 228 additions and 23 deletions.
2 changes: 1 addition & 1 deletion apps/arweave/src/ar_data_sync.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2691,7 +2691,7 @@ pack_and_store_chunk(Args, State) ->
{relative_offset, Offset},
{required_packing,
ar_chunk_storage:encode_packing(RequiredPacking)},
{packing, Packing2}]),
{packing, ar_chunk_storage:encode_packing(Packing2)}]),
ar_util:cast_after(600000, self(),
{expire_repack_chunk_request,
{AbsoluteOffset, RequiredPacking}}),
Expand Down
62 changes: 59 additions & 3 deletions apps/arweave/src/ar_mining_hash.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

-behaviour(gen_server).

-export([start_link/0, compute_h0/2, compute_h1/2, compute_h2/2]).
-export([start_link/0, compute_h0/2, compute_h1/2, compute_h2/2,
set_cache_limit/1, garbage_collect/0]).

-export([init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2]).

Expand All @@ -12,9 +13,13 @@
-include_lib("arweave/include/ar_mining.hrl").
-include_lib("eunit/include/eunit.hrl").

-define(SAMPLE_PROCESS_INTERVAL, 1000).

-record(state, {
hashing_threads = queue:new(),
hashing_thread_monitor_refs = #{}
hashing_thread_monitor_refs = #{},
chunks_seen = 0,
chunk_cache_limit = infinity
}).

%%%===================================================================
Expand All @@ -34,6 +39,12 @@ compute_h1(Worker, Candidate) ->
compute_h2(Worker, Candidate) ->
gen_server:cast(?MODULE, {compute, h2, Worker, Candidate}).

set_cache_limit(CacheLimit) ->
gen_server:cast(?MODULE, {set_cache_limit, CacheLimit}).

garbage_collect() ->
gen_server:cast(?MODULE, garbage_collect).

%%%===================================================================
%%% Generic server callbacks.
%%%===================================================================
Expand All @@ -46,22 +57,68 @@ init([]) ->
#state{},
lists:seq(1, Config#config.hashing_threads)
),
% ar_util:cast_after(?SAMPLE_PROCESS_INTERVAL, ?MODULE, sample_process),
{ok, State}.

handle_call(Request, _From, State) ->
?LOG_WARNING([{event, unhandled_call}, {module, ?MODULE}, {request, Request}]),
{reply, ok, State}.

handle_cast({set_cache_limit, CacheLimit}, State) ->
{noreply, State#state{ chunk_cache_limit = CacheLimit }};

handle_cast(sample_process, State) ->
[{binary, BinInfoBefore}] = process_info(self(), [binary]),
?LOG_DEBUG([{event, mining_hash_process_sample},{pid, self()}, {b, length(BinInfoBefore)},
{binary_before, BinInfoBefore}]),
queue:fold(
fun(Thread, _) ->
[{binary, BinInfoBefore2}] = process_info(Thread, [binary]),
?LOG_DEBUG([{event, mining_hash_thread_sample}, {thread, Thread}, {b, length(BinInfoBefore2)},
{binary_before, BinInfoBefore2}])
end,
ok,
State#state.hashing_threads
),
ar_util:cast_after(?SAMPLE_PROCESS_INTERVAL, ?MODULE, sample_process),
{noreply, State};

handle_cast({compute, HashType, Worker, Candidate},
#state{ hashing_threads = Threads } = State) ->
{Thread, Threads2} = pick_hashing_thread(Threads),
Thread ! {compute, HashType, Worker, Candidate},
{noreply, State#state{ hashing_threads = Threads2 }};

handle_cast(garbage_collect, State) ->
erlang:garbage_collect(self(),
[{async, {ar_mining_hash, self(), erlang:monotonic_time()}}]),
queue:fold(
fun(Thread, _) ->
erlang:garbage_collect(Thread,
[{async, {ar_mining_hash_worker, Thread, erlang:monotonic_time()}}])
end,
ok,
State#state.hashing_threads
),
{noreply, State};

handle_cast(Cast, State) ->
?LOG_WARNING([{event, unhandled_cast}, {module, ?MODULE}, {cast, Cast}]),
{noreply, State}.

handle_info({garbage_collect, {Name, Pid, StartTime}, GCResult}, State) ->
EndTime = erlang:monotonic_time(),
ElapsedTime = erlang:convert_time_unit(EndTime-StartTime, native, millisecond),
case GCResult == false orelse ElapsedTime > 100 of
true ->
?LOG_DEBUG([
{event, mining_debug_garbage_collect}, {process, Name}, {pid, Pid},
{gc_time, ElapsedTime}, {gc_result, GCResult}]);
false ->
ok
end,
{noreply, State};

handle_info({'DOWN', Ref, process, _, Reason},
#state{ hashing_thread_monitor_refs = HashingThreadRefs } = State) ->
case maps:is_key(Ref, HashingThreadRefs) of
Expand Down Expand Up @@ -122,7 +179,6 @@ hashing_thread() ->
hashing_thread()
end.


pick_hashing_thread(Threads) ->
{{value, Thread}, Threads2} = queue:out(Threads),
{Thread, queue:in(Thread, Threads2)}.
78 changes: 68 additions & 10 deletions apps/arweave/src/ar_mining_io.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

-behaviour(gen_server).

-export([start_link/0, set_largest_seen_upper_bound/1, get_partitions/0, get_partitions/1,
read_recall_range/4]).
-export([start_link/0, set_largest_seen_upper_bound/1,
get_partitions/0, get_partitions/1, read_recall_range/4, garbage_collect/0]).

-export([init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2]).

Expand All @@ -13,6 +13,8 @@
-include_lib("arweave/include/ar_mining.hrl").
-include_lib("eunit/include/eunit.hrl").

-define(SAMPLE_PROCESS_INTERVAL, 1000).

-record(state, {
partition_upper_bound = 0,
io_threads = #{},
Expand Down Expand Up @@ -55,6 +57,9 @@ get_partitions(PartitionUpperBound) ->
))
).

garbage_collect() ->
gen_server:cast(?MODULE, garbage_collect).

%%%===================================================================
%%% Generic server callbacks.
%%%===================================================================
Expand All @@ -69,9 +74,9 @@ init([]) ->
#state{},
get_io_channels()
),
% ar_util:cast_after(?SAMPLE_PROCESS_INTERVAL, ?MODULE, sample_process),
{ok, State}.


handle_call({set_largest_seen_upper_bound, PartitionUpperBound}, _From, State) ->
#state{ partition_upper_bound = CurrentUpperBound } = State,
case PartitionUpperBound > CurrentUpperBound of
Expand All @@ -89,22 +94,75 @@ handle_call({read_recall_range, WhichChunk, Worker, Candidate, RecallRangeStart}
#mining_candidate{ mining_address = MiningAddress } = Candidate,
PartitionNumber = ar_node:get_partition_number(RecallRangeStart),
RangeEnd = RecallRangeStart + ?RECALL_RANGE_SIZE,
case find_thread(PartitionNumber, MiningAddress, RangeEnd, RecallRangeStart, IOThreads) of
ThreadFound = case find_thread(PartitionNumber, MiningAddress, RangeEnd, RecallRangeStart, IOThreads) of
not_found ->
{reply, false, State};
false;
Thread ->
Thread ! {WhichChunk, {Worker, Candidate, RecallRangeStart}},
{reply, true, State}
end;
true
end,
{reply, ThreadFound, State};

handle_call(Request, _From, State) ->
?LOG_WARNING([{event, unhandled_call}, {module, ?MODULE}, {request, Request}]),
{reply, ok, State}.

handle_cast(sample_process, State) ->
[{binary, BinInfoBefore}] = process_info(self(), [binary]),
?LOG_DEBUG([{event, mining_io_process_sample},{pid, self()}, {b, length(BinInfoBefore)},
{binary_before, BinInfoBefore}]),
% [{binary, BinInfoBefore}] = process_info(self(), [binary]),
% garbage_collect(self()),
% [{binary, BinInfoAfter}] = process_info(self(), [binary]),
% ?LOG_DEBUG([{event, mining_io_process_sample},{pid, self()}, {b, length(BinInfoBefore)},
% {a, length(BinInfoAfter)}, {binary_before, BinInfoBefore}, {binary_after, BinInfoAfter}]),
maps:fold(
fun(_Key, Thread, _) ->
[{binary, BinInfoBefore2}] = process_info(Thread, [binary]),
?LOG_DEBUG([{event, mining_io_thread_sample}, {thread, Thread}, {b, length(BinInfoBefore2)},
{binary_before, BinInfoBefore2}])
% [{binary, BinInfoBefore2}] = process_info(Thread, [binary]),
% garbage_collect(self()),
% [{binary, BinInfoAfter2}] = process_info(Thread, [binary]),
% ?LOG_DEBUG([{event, mining_io_thread_sample}, {thread, Thread}, {b, length(BinInfoBefore2)},
% {a, length(BinInfoAfter2)}, {binary_before, BinInfoBefore2}, {binary_after, BinInfoAfter2}])
end,
ok,
State#state.io_threads
),
ar_util:cast_after(?SAMPLE_PROCESS_INTERVAL, ?MODULE, sample_process),
{noreply, State};

handle_cast(garbage_collect, State) ->
erlang:garbage_collect(self(),
[{async, {ar_mining_io, self(), erlang:monotonic_time()}}]),
maps:fold(
fun(_Key, Thread, _) ->
erlang:garbage_collect(Thread,
[{async, {ar_mining_io_worker, Thread, erlang:monotonic_time()}}])
end,
ok,
State#state.io_threads
),
{noreply, State};

handle_cast(Cast, State) ->
?LOG_WARNING([{event, unhandled_cast}, {module, ?MODULE}, {cast, Cast}]),
{noreply, State}.

handle_info({garbage_collect, {Name, Pid, StartTime}, GCResult}, State) ->
EndTime = erlang:monotonic_time(),
ElapsedTime = erlang:convert_time_unit(EndTime-StartTime, native, millisecond),
case GCResult == false orelse ElapsedTime > 100 of
true ->
?LOG_DEBUG([
{event, mining_debug_garbage_collect}, {process, Name}, {pid, Pid},
{gc_time, ElapsedTime}, {gc_result, GCResult}]);
false ->
ok
end,
{noreply, State};

handle_info({'DOWN', Ref, process, _, Reason},
#state{ io_thread_monitor_refs = IOThreadRefs } = State) ->
case maps:is_key(Ref, IOThreadRefs) of
Expand Down Expand Up @@ -228,9 +286,10 @@ read_range(WhichChunk, Worker, Candidate, RangeStart, StoreID) ->
StartTime = erlang:monotonic_time(),
Size = ?RECALL_RANGE_SIZE,
#mining_candidate{ mining_address = MiningAddress } = Candidate,
Intervals = get_packed_intervals(RangeStart, RangeStart + Size,
UniqueSize = Size, %% + (rand:uniform(100)*?DATA_CHUNK_SIZE),
Intervals = get_packed_intervals(RangeStart, RangeStart + UniqueSize,
MiningAddress, StoreID, ar_intervals:new()),
ChunkOffsets = ar_chunk_storage:get_range(RangeStart, Size, StoreID),
ChunkOffsets = ar_chunk_storage:get_range(RangeStart, UniqueSize, StoreID),
ChunkOffsets2 = filter_by_packing(ChunkOffsets, Intervals, StoreID),
NonceMax = max(0, (Size div ?DATA_CHUNK_SIZE - 1)),
read_range(WhichChunk, Worker, Candidate, RangeStart, 0, NonceMax, ChunkOffsets2),
Expand Down Expand Up @@ -322,4 +381,3 @@ find_thread3([Key | Keys], RangeEnd, RangeStart, Max, MaxKey) ->
end;
find_thread3([], _RangeEnd, _RangeStart, _Max, MaxKey) ->
MaxKey.

52 changes: 51 additions & 1 deletion apps/arweave/src/ar_mining_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
active_sessions = sets:new(),
diff = infinity,
chunk_cache_size_limit = 0,
gc_frequency_ms = undefined,
merkle_rebase_threshold = infinity,
is_pool_client = false
}).
Expand Down Expand Up @@ -157,6 +158,30 @@ handle_cast({post_solution, Solution}, State) ->
post_solution(Solution, State),
{noreply, State};

handle_cast(manual_garbage_collect, State) ->
%% Reading recall ranges from disk causes a large amount of binary data to be allocated and
%% references to that data is spread among all the different mining processes. Because of this
%% it can take the default garbage collection to clean up all references and deallocate the
%% memory - which in turn can cause memory to be exhausted.
%%
%% To address this the mining server will force a garbage collection on all mining processes
%% every time we process a few VDF steps. The exact number of VDF steps is determined by
%% the chunk cache size limit in order to roughly align garbage collection with when we
%% expect all references to a recall range's chunks to be evicted from the cache.
?LOG_DEBUG([{event, mining_debug_garbage_collect_start}]),
ar_mining_io:garbage_collect(),
ar_mining_hash:garbage_collect(),
erlang:garbage_collect(self(), [{async, erlang:monotonic_time()}]),
maps:foreach(
fun(_Partition, Worker) ->
ar_mining_worker:garbage_collect(Worker)
end,
State#state.workers
),
ar_util:cast_after(State#state.gc_frequency_ms, ?MODULE, manual_garbage_collect),
{noreply, State};


handle_cast(Cast, State) ->
?LOG_WARNING([{event, unhandled_cast}, {module, ?MODULE}, {cast, Cast}]),
{noreply, State}.
Expand All @@ -177,6 +202,19 @@ handle_info({event, nonce_limiter, Message}, State) ->
?LOG_DEBUG([{event, mining_debug_skipping_nonce_limiter}, {message, Message}]),
{noreply, State};

handle_info({garbage_collect, StartTime, GCResult}, State) ->
EndTime = erlang:monotonic_time(),
ElapsedTime = erlang:convert_time_unit(EndTime-StartTime, native, millisecond),
case GCResult == false orelse ElapsedTime > 100 of
true ->
?LOG_DEBUG([
{event, mining_debug_garbage_collect}, {process, ar_mining_server}, {pid, self()},
{gc_time, ElapsedTime}, {gc_result, GCResult}]);
false ->
ok
end,
{noreply, State};

handle_info(Message, State) ->
?LOG_WARNING([{event, unhandled_info}, {module, ?MODULE}, {message, Message}]),
{noreply, State}.
Expand Down Expand Up @@ -281,6 +319,7 @@ update_cache_limits(State) ->
end,
State#state.workers
),
ar_mining_hash:set_cache_limit(OverallCacheLimit),

ar:console(
"~nSetting the mining chunk cache size limit to ~B chunks "
Expand All @@ -296,7 +335,18 @@ update_cache_limits(State) ->
[IdealCacheLimit]);
false -> ok
end,
State#state{ chunk_cache_limit = NewCacheLimit }
GarbageCollectionFrequency = 2 * VDFQueueLimit * 1000,
case State#state.gc_frequency_ms == undefined of
true ->
%% This is the first time setting the garbage collection frequency, so kick
%% off the periodic call.
ar_util:cast_after(GarbageCollectionFrequency, ?MODULE, manual_garbage_collect);
false -> ok
end,
State#state{
chunk_cache_limit = NewCacheLimit,
gc_frequency_ms = GarbageCollectionFrequency
}
end.

distribute_output(Candidate, State) ->
Expand Down
Loading

0 comments on commit 2b0bb57

Please sign in to comment.