Skip to content

Commit

Permalink
Use 'RR' when a prunable vclock is replicated (#1866)
Browse files Browse the repository at this point in the history
* Use 'RR' when a prunable vclock is replicated

There may be some situations whereby a vector clock grows beyond the prescribed limits on the source cluster - in particular following read repair.

In these cases the new object needs to be replicated but with the same resulting vector clock (assuming no siblings).  If the same vector clock does not result on the sink - any full-sync operation may continuously detect the  delta, but not be able to resolve it (as the sink vnodes prune each time).

The 'rr' option will, on riak_kv_vnode, ensure pruning is bypassed so that we avoid pruning on a sink, if we have not pruned on a source.  The 'rr' option is only used when the clock is prunable (as otherwise the delta could occur in the reverse direction).

The 'rr' option also blocks some sibling constraint checks (e.g. maximum number of siblings.  However, as the most likely cause of it being applied is 'rr' on the src side - this is still generally a win for replication consistency.

* Switch logic to put_fsm

Already know bucket props at this point.  case only to be considered when `asis` - so should also work for riak_repl aae full-sync

* Lose a line
  • Loading branch information
martinsumner committed Jul 18, 2023
1 parent e80a069 commit ba58a97
Showing 1 changed file with 27 additions and 6 deletions.
33 changes: 27 additions & 6 deletions src/riak_kv_put_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -396,21 +396,39 @@ validate(timeout, StateData0 = #state{from = {raw, ReqId, _Pid},
{[{returnbody,true}], false}
end
end,
RObj = apply_updates(RObj0, Options),
RR =
case get_option(asis, Options) of
true ->
Clock = riak_object:vclock(RObj),
MaybePrunedClock =
vclock:prune(
Clock, riak_core_util:moment(), BucketProps),
case {length(MaybePrunedClock), length(Clock)} of
{PVL, UPVL} when PVL < UPVL ->
true;
_ ->
false
end;
_ ->
false
end,
PutCore = riak_kv_put_core:init(N, W, PW, NodeConfirms, DW,
AllowMult,
ReturnBody,
IdxType),
Options1 = lists:keydelete(sync_on_write, 1, Options),
Options2 = [{sync_on_write, SyncOnWrite}|Options1],
VNodeOpts = handle_options(Options2, VNodeOpts0),
Options3 = [{rr, RR}|Options2],
VNodeOpts = handle_options(Options3, VNodeOpts0),
StateData = StateData0#state{n=N,
w=W,
pw=PW, node_confirms=NodeConfirms, dw=DW,
allowmult=AllowMult,
precommit = Precommit,
postcommit = Postcommit,
req_id = ReqId,
robj = apply_updates(RObj0, Options),
robj = RObj,
putcore = PutCore,
vnode_options = VNodeOpts,
timeout = Timeout},
Expand Down Expand Up @@ -777,6 +795,8 @@ handle_options([{K, _V} = Opt|T], Acc)
when K == sloppy_quorum; K == n_val ->
%% Take these options as-is
handle_options(T, [Opt|Acc]);
handle_options([{rr, true}|T], Acc) ->
handle_options(T, [{rr, true}|Acc]);
handle_options([{_,_}|T], Acc) -> handle_options(T, Acc).

%% Invokes the hook and returns a tuple of
Expand Down Expand Up @@ -970,14 +990,15 @@ late_put_fsm_coordinator_ack(_Node) ->

-spec get_bucket_props(riak_object:bucket()) -> list().
get_bucket_props(Bucket) ->
{ok, DefaultProps} = application:get_env(riak_core,
default_bucket_props),
{ok, DefaultProps} = application:get_env(riak_core, default_bucket_props),
BucketProps = riak_core_bucket:get_bucket(Bucket),
%% typed buckets never fall back to defaults
case is_tuple(Bucket) of
false ->
lists:keymerge(1, lists:keysort(1, BucketProps),
lists:keysort(1, DefaultProps));
lists:keymerge(
1,
lists:keysort(1, BucketProps),
lists:keysort(1, DefaultProps));
true ->
BucketProps
end.
Expand Down

0 comments on commit ba58a97

Please sign in to comment.