Skip to content

Commit

Permalink
Merge pull request #1147 from basho/feature/gc-start-end-key
Browse files Browse the repository at this point in the history
Specify start/end timestamp in riak-cs-admin gc command

Reviewed-by: shino
  • Loading branch information
borshop committed May 26, 2015
2 parents 8300983 + ffa5467 commit 49669e5
Show file tree
Hide file tree
Showing 12 changed files with 315 additions and 110 deletions.
16 changes: 8 additions & 8 deletions include/riak_cs_gc.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,25 @@
-type index_result_keys() :: keys().

-record(gc_batch_state, {
%% start of the current gc interval
%% start of the current gc batch
batch_start :: undefined | non_neg_integer(),
%% start of a range in riak-cs-gc bucket to be collected in this batch
start_key :: non_neg_integer(),
%% end of a range in riak-cs-gc bucket to be collected in this batch
end_key :: non_neg_integer(),
batch_count=0 :: non_neg_integer(),
%% Count of filesets skipped in this batch
batch_skips=0 :: non_neg_integer(),
batch=[] :: undefined | [index_result_keys()], % `undefined' only for testing
manif_count=0 :: non_neg_integer(),
block_count=0 :: non_neg_integer(),
%% used when moving from paused -> idle
interval_remaining :: undefined | non_neg_integer(),
leeway :: non_neg_integer(),
worker_pids=[] :: [pid()],
max_workers :: non_neg_integer(),
%% Used for paginated 2I querying of GC bucket
key_list_state :: undefined | gc_key_list_state(),
%% Options to use when start workers
bag_id :: binary(),
testing=false :: boolean()
bag_id :: binary()
}).

-record(gc_worker_state, {
Expand All @@ -69,8 +70,8 @@
current_riak_client :: undefined | riak_client(),
current_bag_id :: bag_id(),
%% start of the current gc interval
batch_start :: undefined | non_neg_integer(),
leeway :: non_neg_integer(),
start_key :: binary(),
end_key :: binary(),
%% Used for paginated 2I querying of GC bucket
continuation :: continuation()
}).
Expand Down Expand Up @@ -103,4 +104,3 @@
initial_delay :: non_neg_integer(),
timer_ref :: undefined | reference()
}).

2 changes: 1 addition & 1 deletion rel/files/riak-cs-admin
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ case "$1" in
$NODETOOL rpc riak_cs_gc_console "$@"
;;
*)
echo "Usage: $SCRIPT gc { batch [<leeway_seconds>] | status | pause | resume | cancel |"
echo "Usage: $SCRIPT gc { batch [<leeway_seconds>|--help] | status | pause | resume | cancel |"
echo " set-interval <interval_seconds> | set-leeway <leeway_seconds> }"
exit 1
;;
Expand Down
21 changes: 18 additions & 3 deletions riak_test/src/rtcs.erl
Original file line number Diff line number Diff line change
Expand Up @@ -436,13 +436,13 @@ riakcs_switchcmd(Path, N, Cmd) ->
lists:flatten(io_lib:format("~s-stanchion ~s", [riakcs_binpath(Path, N), Cmd])).

riakcs_gccmd(Path, N, Cmd) ->
lists:flatten(io_lib:format("~s-gc ~s", [riakcs_binpath(Path, N), Cmd])).
lists:flatten(io_lib:format("~s-admin gc ~s", [riakcs_binpath(Path, N), Cmd])).

riakcs_accesscmd(Path, N, Cmd) ->
lists:flatten(io_lib:format("~s-access ~s", [riakcs_binpath(Path, N), Cmd])).
lists:flatten(io_lib:format("~s-admin access ~s", [riakcs_binpath(Path, N), Cmd])).

riakcs_storagecmd(Path, N, Cmd) ->
lists:flatten(io_lib:format("~s-storage ~s", [riakcs_binpath(Path, N), Cmd])).
lists:flatten(io_lib:format("~s-admin storage ~s", [riakcs_binpath(Path, N), Cmd])).

stanchion_binpath(Prefix) ->
io_lib:format("~s/dev/stanchion/bin/stanchion", [Prefix]).
Expand Down Expand Up @@ -1191,3 +1191,18 @@ get_cmd_result(Port, WaitTime) ->
after WaitTime ->
{error, timeout}
end.

%% Copy from rts:iso8601/1
iso8601(Timestamp) when is_integer(Timestamp) ->
GregSec = Timestamp + 719528 * 86400,
Datetime = calendar:gregorian_seconds_to_datetime(GregSec),
{{Y,M,D},{H,I,S}} = Datetime,
io_lib:format("~4..0b~2..0b~2..0bT~2..0b~2..0b~2..0bZ",
[Y, M, D, H, I, S]).

reset_log(Node) ->
{ok, _Logs} = rpc:call(Node, gen_event, delete_handler,
[lager_event, riak_test_lager_backend, normal]),
ok = rpc:call(Node, gen_event, add_handler,
[lager_event, riak_test_lager_backend,
[rt_config:get(lager_level, info), false]]).
92 changes: 78 additions & 14 deletions riak_test/tests/gc_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
-define(TEST_KEY, "riak_test_key").
-define(TEST_KEY_MP, "riak_test_mp").
-define(TEST_KEY_BAD_STATE, "riak_test_key_bad_state").

-define(TIMESLICES, 5).

confirm() ->
NumNodes = 1,
Expand All @@ -58,22 +58,44 @@ confirm() ->
RiakIDs = rtcs:riak_id_per_cluster(NumNodes),
[repair_gc_bucket(ID) || ID <- RiakIDs],
ok = verify_gc_run2(hd(CSNodes)),

%% Determinisitc GC test

%% Create keys not to be deleted
setup_normal_obj([{"spam", 42}, {"ham", 65536}, {"egg", 7}], UserConfig),
timer:sleep(1000), %% Next timestamp...

%% Create keys to be deleted
Start = os:timestamp(),
[begin
setup_normal_obj([{"hop", 42}, {"step", 65536}, {"jump", 7}], UserConfig),
timer:sleep(2000)
end || _ <- lists:seq(0,?TIMESLICES) ],
End = os:timestamp(),

timer:sleep(1000), %% Next timestamp...
setup_normal_obj([{"spam", 42}, {"ham", 65536}, {"egg", 7}], UserConfig),

verify_partial_gc_run(hd(CSNodes), RiakNodes, Start, End),
pass.

setup_normal_obj(ObjSpecs, UserConfig) ->
%% Put and delete some objects
[begin
Block = crypto:rand_bytes(Size),
Key = ?TEST_KEY ++ Suffix,
erlcloud_s3:put_object(?TEST_BUCKET, Key, Block, UserConfig),
erlcloud_s3:delete_object(?TEST_BUCKET, Key, UserConfig)
end || {Suffix, Size} <- ObjSpecs].

setup_obj(RiakNodes, UserConfig) ->
%% Setup bucket
lager:info("User is valid on the cluster, and has no buckets"),
?assertEqual([{buckets, []}], erlcloud_s3:list_buckets(UserConfig)),
lager:info("creating bucket ~p", [?TEST_BUCKET]),
?assertEqual(ok, erlcloud_s3:create_bucket(?TEST_BUCKET, UserConfig)),

%% Put and delete some objects
[begin
Block = crypto:rand_bytes(Size),
Key = ?TEST_KEY ++ Suffix,
erlcloud_s3:put_object(?TEST_BUCKET, Key, Block, UserConfig),
erlcloud_s3:delete_object(?TEST_BUCKET, Key, UserConfig)
end || {Suffix, Size} <- [{"1", 100}, {"2", 200}, {"3", 0}]],
setup_normal_obj([{"1", 100}, {"2", 200}, {"3", 0}], UserConfig),

%% Put and delete, but modified to pretend it is in wrong state
SingleBlock = crypto:rand_bytes(400),
Expand All @@ -88,12 +110,7 @@ setup_obj(RiakNodes, UserConfig) ->
{ok, GCKey, UUID} = change_state_to_active(GCPbc, BKey, GCKeys),

%% Put and delete some more objects
[begin
Block = crypto:rand_bytes(Size),
Key = ?TEST_KEY ++ Suffix,
erlcloud_s3:put_object(?TEST_BUCKET, Key, Block, UserConfig),
erlcloud_s3:delete_object(?TEST_BUCKET, Key, UserConfig)
end || {Suffix, Size} <- [{"Z", 0}, {"Y", 150}, {"X", 1}]],
setup_normal_obj([{"Z", 0}, {"Y", 150}, {"X", 1}], UserConfig),

riakc_pb_socket:stop(GCPbc),
{GCKey, {BKey, UUID}}.
Expand Down Expand Up @@ -206,3 +223,50 @@ verify_riak_object_remaining_for_bad_key(RiakNodes, GCKey, {{Bucket, Key}, UUID}
lager:info("As expected, BAD manifest in GC bucket remains,"
" stand off orphan manfiests/blocks: ~p", [Manifest]),
ok.

verify_partial_gc_run(CSNode, RiakNodes,
{MegaSec0, Sec0, _},
{MegaSec1, Sec1, _}) ->
Start0 = MegaSec0 * 1000000 + Sec0,
End0 = MegaSec1 * 1000000 + Sec1,
Interval = erlang:max(1, (End0 - Start0) div ?TIMESLICES),
Starts = [ {Start0 + N * Interval, Start0 + (N+1) * Interval}
|| N <- lists:seq(0, ?TIMESLICES-1) ] ++
[{Start0 + ?TIMESLICES * Interval, End0}],

[begin
%% We have to clear log as the message 'Finished garbage
%% col...' has been output many times before, during this
%% test.
rtcs:reset_log(CSNode),

lager:debug("GC: (start, end) = (~p, ~p)", [S0, E0]),
S = rtcs:iso8601(S0),
E = rtcs:iso8601(E0),
BatchCmd = "batch -s " ++ S ++ " -e " ++ E,
rtcs:gc(1, BatchCmd),

true = rt:expect_in_log(CSNode,
"Finished garbage collection: \\d+ seconds, "
"\\d+ batch_count, 0 batch_skips, "
"\\d+ manif_count, \\d+ block_count")
end || {S0, E0} <- Starts],
lager:info("GC target period: (~p, ~p)", [Start0, End0]),
%% Reap!
timer:sleep(3000),
GCPbc = rtcs:pbc(RiakNodes, objects, ?TEST_BUCKET),
{ok, Keys} = riakc_pb_socket:list_keys(GCPbc, ?GC_BUCKET),
lager:debug("Keys: ~p", [Keys]),
StartKey = list_to_binary(integer_to_list(Start0)),
EndKey = list_to_binary(integer_to_list(End0)),
EndKeyHPF = fun(Key) -> EndKey < Key end,
StartKeyLPF = fun(Key) -> Key < StartKey end,
BPF = fun(Key) -> StartKey < Key andalso Key < EndKey end,

lager:debug("Remaining Keys: ~p", [Keys]),
lager:debug("HPF result: ~p", [lists:filter(EndKeyHPF, Keys)]),
lager:debug("LPF result: ~p", [lists:filter(StartKeyLPF, Keys)]),
?assertEqual(3, length(lists:filter(EndKeyHPF, Keys))),
?assertEqual(3, length(lists:filter(StartKeyLPF, Keys))),
?assertEqual([], lists:filter(BPF, Keys)),
ok.
22 changes: 14 additions & 8 deletions src/riak_cs_gc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@
set_leeway_seconds/1,
max_scheduled_delete_manifests/0,
move_manifests_to_gc_bucket/2,
timestamp/0]).
timestamp/0,
default_batch_end/2]).

%% export for repl debugging and testing
-export([get_active_manifests/3]).
Expand Down Expand Up @@ -254,14 +255,15 @@ gc_max_workers() ->
%% @doc Return the start of GC epoch represented as a binary.
%% This is the time that the GC daemon uses to begin collecting keys
%% from the `riak-cs-gc' bucket.
-spec epoch_start() -> binary().
-spec epoch_start() -> non_neg_integer().
epoch_start() ->
case application:get_env(riak_cs, epoch_start) of
undefined ->
?EPOCH_START;
{ok, EpochStart} ->
EpochStart
end.
Bin = case application:get_env(riak_cs, epoch_start) of
undefined ->
?EPOCH_START;
{ok, EpochStart} ->
EpochStart
end,
list_to_integer(binary_to_list(Bin)).

%% @doc Return the minimum number of seconds a file manifest waits in
%% the `scheduled_delete' state before being garbage collected.
Expand Down Expand Up @@ -303,6 +305,10 @@ timestamp() ->
timestamp(ErlangTime) ->
riak_cs_utils:second_resolution_timestamp(ErlangTime).

-spec default_batch_end(non_neg_integer(), non_neg_integer()) -> non_neg_integer().
default_batch_end(BatchStart, Leeway) ->
BatchStart - Leeway.

%%%===================================================================
%%% Internal functions
%%%===================================================================
Expand Down
70 changes: 56 additions & 14 deletions src/riak_cs_gc_batch.erl
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,40 @@ stop(Pid) ->

%% @doc Read the storage schedule and go to idle.

init([State]) ->
{ok, prepare, State, 0}.

has_batch_finished(?STATE{worker_pids=[],
batch=[],
key_list_state=KeyListState} = _State) ->
case KeyListState of
undefined -> true;
_ -> not riak_cs_gc_key_list:has_next(KeyListState)
init([#gc_batch_state{
batch_start=BatchStart,
start_key=StartKey,
end_key=EndKey,
leeway=Leeway,
max_workers=MaxWorkers} = State])
when
%% StartKey can be negative as <<"-">> is smaller than any numeric digits
0 =< StartKey andalso
%% EndKey cannot be like 300, or would collect all gc keys
StartKey =< EndKey ->
case riak_cs_gc:default_batch_end(BatchStart, Leeway) of
DefaultEndKey when EndKey =< DefaultEndKey andalso
1000000000 =< EndKey ->
%% StartKey < EndKey
%% EndKey <= BatchStart - Leeway
_ = lager:info("Starting garbage collection: "
"(start, end) = (~p, ~p), "
"leeway=~p, batch_start=~p, max_workers=~p",
[StartKey, EndKey, Leeway, BatchStart, MaxWorkers]),
{ok, prepare, State, 0};
DefaultEndKey ->
_ = lager:error("GC did not start: "
"End of GC target period was too recent (~p > ~p)",
[EndKey, DefaultEndKey]),
{stop, {error, invalid_gc_end_key}}
end;
has_batch_finished(_) ->
false.
init([#gc_batch_state{start_key=StartKey,
end_key=EndKey}]) ->
_ = lager:error("GC did not start due to wrong GC target period: "
"(start, end) = (~p, ~p)",
[StartKey, EndKey]),
{stop, {error, invalid_gc_start_key}}.


%% Asynchronous events

Expand Down Expand Up @@ -163,6 +185,14 @@ terminate(normal, _StateName, State) ->
State?STATE.batch_skips, State?STATE.manif_count,
State?STATE.block_count]),
riak_cs_gc_manager:finished(State);
terminate(cancel, _StateName, State) ->
_ = lager:warning("Garbage collection has been canceled: "
"~b seconds, ~p batch_count, ~p batch_skips, "
"~p manif_count, ~p block_count\n",
[elapsed(State?STATE.batch_start), State?STATE.batch_count,
State?STATE.batch_skips, State?STATE.manif_count,
State?STATE.block_count]),
ok;
terminate(_Reason, _StateName, _State) ->
ok.

Expand All @@ -174,8 +204,20 @@ code_change(_OldVsn, StateName, State, _Extra) ->
%%% Internal functions
%%%===================================================================

fetch_first_keys(?STATE{batch_start=BatchStart,
leeway=Leeway} = State) ->
has_batch_finished(?STATE{worker_pids=[],
batch=[],
key_list_state=KeyListState} = _State) ->
case KeyListState of
undefined -> true;
_ -> not riak_cs_gc_key_list:has_next(KeyListState)
end;
has_batch_finished(_) ->
false.

fetch_first_keys(?STATE{batch_start=_BatchStart,
start_key=StartKey,
end_key=EndKey,
leeway=_Leeway} = State) ->

%% [Fetch the first set of manifests for deletion]
%% this does not check out a worker from the riak connection pool;
Expand All @@ -186,7 +228,7 @@ fetch_first_keys(?STATE{batch_start=BatchStart,
%% connection, and avoids duplicating the configuration lookup
%% code.
{KeyListRes, KeyListState} =
riak_cs_gc_key_list:new(BatchStart, Leeway),
riak_cs_gc_key_list:new(StartKey, EndKey),
#gc_key_list_result{bag_id=BagId, batch=Batch} = KeyListRes,
_ = lager:debug("Initial batch keys: ~p", [Batch]),
State?STATE{batch=Batch,
Expand Down
Loading

0 comments on commit 49669e5

Please sign in to comment.