Skip to content

Commit

Permalink
fix: store 2.9 entropy in all relevant storage modules
Browse files Browse the repository at this point in the history
  • Loading branch information
Lev Berman authored and JamesPiechota committed Jan 8, 2025
1 parent dc0c53f commit b36a8b1
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 41 deletions.
77 changes: 42 additions & 35 deletions apps/arweave/src/ar_entropy_storage.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
delete_record/2, generate_entropies/3, generate_missing_entropy/2, generate_entropy_keys/3,
shift_entropy_offset/2, store_entropy/8, record_chunk/6]).

-include_lib("arweave/include/ar.hrl").
-include_lib("arweave/include/ar_chunk_storage.hrl").
-include("../include/ar.hrl").
-include("../include/ar_chunk_storage.hrl").

-include_lib("eunit/include/eunit.hrl").

%% @doc Return true if the given sub-chunk bucket contains the 2.9 entropy.
Expand Down Expand Up @@ -139,16 +140,16 @@ store_entropy(Entropies,
RewardAddr,
SubChunkStartOffset,
Keys),
FindModule =
case ar_storage_module:get_strict(PaddedEndOffset, {replica_2_9, RewardAddr}) of
not_found ->
?LOG_WARNING([{event, failed_to_find_storage_module_for_2_9_entropy},
FindModules =
case ar_storage_module:get_all_packed(PaddedEndOffset, {replica_2_9, RewardAddr}) of
[] ->
?LOG_WARNING([{event, failed_to_find_storage_modules_for_2_9_entropy},
{padded_end_offset, PaddedEndOffset}]),
not_found;
{ok, StoreID} ->
{ok, StoreID}
StoreIDs ->
{ok, StoreIDs}
end,
case FindModule of
case FindModules of
not_found ->
PaddedEndOffset2 = shift_entropy_offset(PaddedEndOffset, 1),
store_entropy(Rest,
Expand All @@ -159,32 +160,38 @@ store_entropy(Entropies,
RewardAddr,
N,
WaitN);
{ok, StoreID2} ->
{ok, StoreIDs2} ->
From = self(),
spawn_link(fun() ->
StartTime = erlang:monotonic_time(),

record_entropy(ChunkEntropy,
PaddedEndOffset,
StoreID2,
RewardAddr),

EndTime = erlang:monotonic_time(),
ElapsedTime =
erlang:convert_time_unit(EndTime - StartTime,
native,
microsecond),
%% bytes per second
WriteRate =
case ElapsedTime > 0 of
true -> 1000000 * byte_size(ChunkEntropy) div ElapsedTime;
false -> 0
end,
prometheus_gauge:set(replica_2_9_entropy_store_rate,
[StoreID2],
WriteRate),
From ! {store_entropy_sub_chunk_written, WaitN + 1}
end),
WaitN2 = lists:foldl(fun(StoreID2, WaitNAcc) ->
spawn_link(fun() ->
StartTime = erlang:monotonic_time(),

record_entropy(ChunkEntropy,
PaddedEndOffset,
StoreID2,
RewardAddr),

EndTime = erlang:monotonic_time(),
ElapsedTime =
erlang:convert_time_unit(EndTime - StartTime,
native,
microsecond),
%% bytes per second
WriteRate =
case ElapsedTime > 0 of
true -> 1000000 * byte_size(ChunkEntropy) div ElapsedTime;
false -> 0
end,
prometheus_gauge:set(replica_2_9_entropy_store_rate,
[StoreID2],
WriteRate),
From ! {store_entropy_sub_chunk_written, WaitNAcc + 1}
end),
WaitNAcc + 1
end,
WaitN,
StoreIDs2
),
PaddedEndOffset2 = shift_entropy_offset(PaddedEndOffset, 1),
store_entropy(Rest,
PaddedEndOffset2,
Expand All @@ -193,7 +200,7 @@ store_entropy(Entropies,
Keys,
RewardAddr,
N + length(Keys),
WaitN + 1)
WaitN2)
end
end.

Expand Down
37 changes: 31 additions & 6 deletions apps/arweave/src/ar_storage_module.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@
-export([id/1, label/1, address_label/2, module_address/1,
module_packing_difficulty/1, packing_label/1, label_by_id/1, get_by_id/1,
get_range/1, module_range/1, module_range/2, get_packing/1, get_size/1,
get/2, get_strict/2, get_all/1, get_all/2, has_any/1, has_range/2, get_cover/3]).
get/2, get_strict/2, get_all/1, get_all/2, get_all_packed/2,
has_any/1, has_range/2, get_cover/3]).

-export([get_unique_sorted_intervals/1]).

-include_lib("arweave/include/ar.hrl").
-include_lib("arweave/include/ar_consensus.hrl").
-include_lib("arweave/include/ar_config.hrl").
-include_lib("eunit/include/eunit.hrl").

-include("../include/ar.hrl").
-include("../include/ar_consensus.hrl").
-include("../include/ar_config.hrl").

-include_lib("eunit/include/eunit.hrl").

Expand Down Expand Up @@ -228,6 +227,18 @@ get_all(Offset) ->
{ok, Config} = application:get_env(arweave, config),
get_all(Offset, Config#config.storage_modules, []).

%% @doc Return the list of identifiers of all configured storage modules
%% covering the given Offset and Packing. If a module is configured with
%% in-place repacking, pick the target packing (the one we are repacking to.)
get_all_packed(Offset, Packing) ->
{ok, Config} = application:get_env(arweave, config),
RepackInPlaceModulesStoreIDs = [
{{BucketSize, Bucket, TargetPacking}, ar_storage_module:id(Module)}
|| {{BucketSize, Bucket, _Packing} = Module, TargetPacking} <- Config#config.repack_in_place_storage_modules],
ModuleStoreIDs = [{Module, ar_storage_module:id(Module)}
|| Module <- Config#config.storage_modules],
get_all_packed(Offset, Packing, ModuleStoreIDs ++ RepackInPlaceModulesStoreIDs).

%% @doc Return the list of configured storage modules whose ranges intersect
%% the given interval.
get_all(Start, End) ->
Expand Down Expand Up @@ -340,6 +351,20 @@ get_all(Offset, [{BucketSize, Bucket, Packing} = StorageModule | StorageModules]
get_all(_Offset, [], FoundModules) ->
FoundModules.

get_all_packed(Offset, Packing,
[{{BucketSize, Bucket, Packing}, StoreID} | StorageModules]) ->
case Offset =< BucketSize * Bucket
orelse Offset > BucketSize * (Bucket + 1) + get_overlap(Packing) of
true ->
get_all_packed(Offset, Packing, StorageModules);
false ->
[StoreID | get_all_packed(Offset, Packing, StorageModules)]
end;
get_all_packed(Offset, Packing, [_Element | StorageModules]) ->
get_all_packed(Offset, Packing, StorageModules);
get_all_packed(_Offset, _Packing, []) ->
[].

get_all(Start, End, [{BucketSize, Bucket, Packing} = StorageModule | StorageModules], FoundModules) ->
case End =< BucketSize * Bucket
orelse Start >= BucketSize * (Bucket + 1) + get_overlap(Packing) of
Expand Down

0 comments on commit b36a8b1

Please sign in to comment.