Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv: handle future-time operations for conflict resolution requests #59693

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func EndTxn(
return result.Result{}, roachpb.NewTransactionStatusError("could not commit in one phase as requested")
}
if args.Commit && args.Poison {
return result.Result{}, errors.Errorf("cannot poison during a committing EndTxn request")
return result.Result{}, errors.AssertionFailedf("cannot poison during a committing EndTxn request")
}

key := keys.TransactionKey(h.Txn.Key, h.Txn.ID)
Expand Down Expand Up @@ -1089,11 +1089,11 @@ func mergeTrigger(
) (result.Result, error) {
desc := rec.Desc()
if !bytes.Equal(desc.StartKey, merge.LeftDesc.StartKey) {
return result.Result{}, errors.Errorf("LHS range start keys do not match: %s != %s",
return result.Result{}, errors.AssertionFailedf("LHS range start keys do not match: %s != %s",
desc.StartKey, merge.LeftDesc.StartKey)
}
if !desc.EndKey.Less(merge.LeftDesc.EndKey) {
return result.Result{}, errors.Errorf("original LHS end key is not less than the post merge end key: %s >= %s",
return result.Result{}, errors.AssertionFailedf("original LHS end key is not less than the post merge end key: %s >= %s",
desc.EndKey, merge.LeftDesc.EndKey)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func Migrate(

fn, ok := migrationRegistry[migrationVersion]
if !ok {
return result.Result{}, errors.Newf("migration for %s not found", migrationVersion)
return result.Result{}, errors.AssertionFailedf("migration for %s not found", migrationVersion)
}
pd, err := fn(ctx, readWriter, cArgs)
if err != nil {
Expand Down
38 changes: 18 additions & 20 deletions pkg/kv/kvserver/batcheval/cmd_push_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,27 +111,25 @@ func PushTxn(
if h.Txn != nil {
return result.Result{}, ErrTransactionUnsupported
}
if h.Timestamp.Less(args.PushTo) {
// Verify that the PushTxn's timestamp is not less than the timestamp that
// the request intends to push the transaction to. Transactions should not
// be pushed into the future or their effect may not be fully reflected in
// a future leaseholder's timestamp cache. This is analogous to how reads
// should not be performed at a timestamp in the future.
return result.Result{}, errors.Errorf("request timestamp %s less than PushTo timestamp %s", h.Timestamp, args.PushTo)
if h.WriteTimestamp().Less(args.PushTo) {
// Verify that the PushTxn's header timestamp (the one checked against
// the lease) is not less than the timestamp that the request intends to
// push the transaction to. Transactions should not be pushed into the
// future past the leaseholder's lease expiration or their effect may
// not be fully reflected in a future leaseholder's timestamp cache.
return result.Result{}, errors.AssertionFailedf("request timestamp %s less than PushTo timestamp %s",
h.Timestamp, args.PushTo)
}
if h.Timestamp.Less(args.PusheeTxn.WriteTimestamp) {
// This condition must hold for the timestamp cache access/update to be safe.
return result.Result{}, errors.Errorf("request timestamp %s less than pushee txn timestamp %s", h.Timestamp, args.PusheeTxn.WriteTimestamp)
}
now := cArgs.EvalCtx.Clock().Now()
// TODO(nvanbenschoten): remove this limitation. But when doing so,
// keep the h.Timestamp.Less(args.PushTo) check above.
if now.Less(h.Timestamp) {
// The batch's timestamp should have been used to update the clock.
return result.Result{}, errors.Errorf("request timestamp %s less than current clock time %s", h.Timestamp, now)
if h.WriteTimestamp().Less(args.PusheeTxn.MinTimestamp) {
// This condition must hold for the timestamp cache access in
// SynthesizeTxnFromMeta and the timestamp cache update in
// Replica.updateTimestampCache to be safe.
return result.Result{}, errors.AssertionFailedf("request timestamp %s less than pushee txn MinTimestamp %s",
h.Timestamp, args.PusheeTxn.MinTimestamp)
}
if !bytes.Equal(args.Key, args.PusheeTxn.Key) {
return result.Result{}, errors.Errorf("request key %s should match pushee txn key %s", args.Key, args.PusheeTxn.Key)
return result.Result{}, errors.AssertionFailedf("request key %s should match pushee txn key %s",
args.Key, args.PusheeTxn.Key)
}
key := keys.TransactionKey(args.PusheeTxn.Key, args.PusheeTxn.ID)

Expand Down Expand Up @@ -220,7 +218,7 @@ func PushTxn(
var reason string

switch {
case txnwait.IsExpired(now, &reply.PusheeTxn):
case txnwait.IsExpired(cArgs.EvalCtx.Clock().Now(), &reply.PusheeTxn):
reason = "pushee is expired"
// When cleaning up, actually clean up (as opposed to simply pushing
// the garbage in the path of future writers).
Expand Down Expand Up @@ -288,7 +286,7 @@ func PushTxn(
// timestamp beneath this timestamp.
reply.PusheeTxn.WriteTimestamp.Forward(args.PushTo)
default:
return result.Result{}, errors.Errorf("unexpected push type: %v", pushType)
return result.Result{}, errors.AssertionFailedf("unexpected push type: %v", pushType)
}

// If the transaction record was already present, persist the updates to it.
Expand Down
22 changes: 19 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_query_intent.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

func init() {
Expand Down Expand Up @@ -50,6 +51,24 @@ func QueryIntent(
h := cArgs.Header
reply := resp.(*roachpb.QueryIntentResponse)

ownTxn := false
if h.Txn != nil {
// Determine if the request is querying an intent in its own
// transaction. If not, the request is rejected as querying one
// transaction's intent from within another transaction is unsupported.
if h.Txn.ID == args.Txn.ID {
ownTxn = true
} else {
return result.Result{}, ErrTransactionUnsupported
}
}
if h.WriteTimestamp().Less(args.Txn.WriteTimestamp) {
// This condition must hold for the timestamp cache update in
// Replica.updateTimestampCache to be safe.
return result.Result{}, errors.AssertionFailedf("QueryIntent request timestamp %s less than txn WriteTimestamp %s",
h.Timestamp, args.Txn.WriteTimestamp)
}

// Read at the specified key at the maximum timestamp. This ensures that we
// see an intent if one exists, regardless of what timestamp it is written
// at.
Expand All @@ -66,9 +85,6 @@ func QueryIntent(
return result.Result{}, err
}

// Determine if the request is querying an intent in its own transaction.
ownTxn := h.Txn != nil && h.Txn.ID == args.Txn.ID

var curIntentPushed bool
if intent != nil {
// See comment on QueryIntentRequest.Txn for an explanation of this
Expand Down
11 changes: 6 additions & 5 deletions pkg/kv/kvserver/batcheval/cmd_query_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,14 @@ func QueryTxn(
if h.Txn != nil {
return result.Result{}, ErrTransactionUnsupported
}
if h.Timestamp.Less(args.Txn.WriteTimestamp) {
// This condition must hold for the timestamp cache access to be safe.
return result.Result{}, errors.Errorf("QueryTxn request timestamp %s less than txn timestamp %s",
h.Timestamp, args.Txn.WriteTimestamp)
if h.WriteTimestamp().Less(args.Txn.MinTimestamp) {
// This condition must hold for the timestamp cache access in
// SynthesizeTxnFromMeta to be safe.
return result.Result{}, errors.AssertionFailedf("QueryTxn request timestamp %s less than txn MinTimestamp %s",
h.Timestamp, args.Txn.MinTimestamp)
}
if !args.Key.Equal(args.Txn.Key) {
return result.Result{}, errors.Errorf("QueryTxn request key %s does not match txn key %s",
return result.Result{}, errors.AssertionFailedf("QueryTxn request key %s does not match txn key %s",
args.Key, args.Txn.Key)
}
key := keys.TransactionKey(args.Txn.Key, args.Txn.ID)
Expand Down
12 changes: 7 additions & 5 deletions pkg/kv/kvserver/batcheval/cmd_recover_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,15 @@ func RecoverTxn(
if h.Txn != nil {
return result.Result{}, ErrTransactionUnsupported
}
if h.Timestamp.Less(args.Txn.WriteTimestamp) {
// This condition must hold for the timestamp cache access/update to be safe.
return result.Result{}, errors.Errorf("RecoverTxn request timestamp %s less than txn timestamp %s",
h.Timestamp, args.Txn.WriteTimestamp)
if h.WriteTimestamp().Less(args.Txn.MinTimestamp) {
// This condition must hold for the timestamp cache access in
// SynthesizeTxnFromMeta and the timestamp cache update in
// Replica.updateTimestampCache to be safe.
return result.Result{}, errors.AssertionFailedf("RecoverTxn request timestamp %s less than txn MinTimestamp %s",
h.Timestamp, args.Txn.MinTimestamp)
}
if !args.Key.Equal(args.Txn.Key) {
return result.Result{}, errors.Errorf("RecoverTxn request key %s does not match txn key %s",
return result.Result{}, errors.AssertionFailedf("RecoverTxn request key %s does not match txn key %s",
args.Key, args.Txn.Key)
}
key := keys.TransactionKey(args.Txn.Key, args.Txn.ID)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_refresh_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func RefreshRange(
h := cArgs.Header

if h.Txn == nil {
return result.Result{}, errors.Errorf("no transaction specified to %s", args.Method())
return result.Result{}, errors.AssertionFailedf("no transaction specified to %s", args.Method())
}

// We're going to refresh up to the transaction's read timestamp.
Expand Down
3 changes: 1 addition & 2 deletions pkg/kv/kvserver/batcheval/cmd_revert_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

func init() {
Expand Down Expand Up @@ -68,7 +67,7 @@ func RevertRange(
ctx context.Context, readWriter storage.ReadWriter, cArgs CommandArgs, resp roachpb.Response,
) (result.Result, error) {
if cArgs.Header.Txn != nil {
return result.Result{}, errors.New("cannot execute RevertRange within a transaction")
return result.Result{}, ErrTransactionUnsupported
}
log.VEventf(ctx, 2, "RevertRange %+v", cArgs.Args)

Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/batcheval/cmd_subsume.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,15 @@ func Subsume(
desc := cArgs.EvalCtx.Desc()
if !bytes.Equal(desc.StartKey, args.RightDesc.StartKey) ||
!bytes.Equal(desc.EndKey, args.RightDesc.EndKey) {
return result.Result{}, errors.Errorf("RHS range bounds do not match: %s != %s",
return result.Result{}, errors.AssertionFailedf("RHS range bounds do not match: %s != %s",
args.RightDesc, desc)
}

// Sanity check that the requesting range is our left neighbor. The ordering
// of operations in the AdminMerge transaction should make it impossible for
// these ranges to be nonadjacent, but double check.
if !bytes.Equal(args.LeftDesc.EndKey, desc.StartKey) {
return result.Result{}, errors.Errorf("ranges are not adjacent: %s != %s",
return result.Result{}, errors.AssertionFailedf("ranges are not adjacent: %s != %s",
args.LeftDesc.EndKey, desc.StartKey)
}

Expand All @@ -97,13 +97,13 @@ func Subsume(
if err != nil {
return result.Result{}, errors.Errorf("fetching local range descriptor: %s", err)
} else if intent == nil {
return result.Result{}, errors.New("range missing intent on its local descriptor")
return result.Result{}, errors.AssertionFailedf("range missing intent on its local descriptor")
}
val, _, err := storage.MVCCGetAsTxn(ctx, readWriter, descKey, cArgs.Header.Timestamp, intent.Txn)
if err != nil {
return result.Result{}, errors.Errorf("fetching local range descriptor as txn: %s", err)
} else if val != nil {
return result.Result{}, errors.New("non-deletion intent on local range descriptor")
return result.Result{}, errors.AssertionFailedf("non-deletion intent on local range descriptor")
}

// We prevent followers of the RHS from being able to serve follower reads on
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/batcheval/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,18 @@ import (

// ErrTransactionUnsupported is returned when a non-transactional command is
// evaluated in the context of a transaction.
var ErrTransactionUnsupported = errors.New("not supported within a transaction")
var ErrTransactionUnsupported = errors.AssertionFailedf("not supported within a transaction")

// VerifyTransaction runs sanity checks verifying that the transaction in the
// header and the request are compatible.
func VerifyTransaction(
h roachpb.Header, args roachpb.Request, permittedStatuses ...roachpb.TransactionStatus,
) error {
if h.Txn == nil {
return errors.Errorf("no transaction specified to %s", args.Method())
return errors.AssertionFailedf("no transaction specified to %s", args.Method())
}
if !bytes.Equal(args.Header().Key, h.Txn.Key) {
return errors.Errorf("request key %s should match txn key %s", args.Header().Key, h.Txn.Key)
return errors.AssertionFailedf("request key %s should match txn key %s", args.Header().Key, h.Txn.Key)
}
statusPermitted := false
for _, s := range permittedStatuses {
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/intentresolver/intent_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,16 +357,18 @@ func (ir *IntentResolver) MaybePushTransactions(
log.Eventf(ctx, "pushing %d transaction(s)", len(pushTxns))

// Attempt to push the transaction(s).
pushTo := h.Timestamp.Next()
b := &kv.Batch{}
b.Header.Timestamp = ir.clock.Now()
b.Header.Timestamp.Forward(pushTo)
for _, pushTxn := range pushTxns {
b.AddRawRequest(&roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{
Key: pushTxn.Key,
},
PusherTxn: pusherTxn,
PusheeTxn: *pushTxn,
PushTo: h.Timestamp.Next(),
PushTo: pushTo,
PushType: pushType,
})
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/kvserverpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ go_library(
embed = [":kvserverpb_go_proto"],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb",
visibility = ["//visibility:public"],
deps = ["//pkg/roachpb"],
deps = [
"//pkg/roachpb",
"//pkg/util/hlc",
],
)

proto_library(
Expand Down
17 changes: 16 additions & 1 deletion pkg/kv/kvserver/kvserverpb/lease_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@

package kvserverpb

import roachpb "github.com/cockroachdb/cockroach/pkg/roachpb"
import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// IsValid returns whether the lease was valid at the time that the
// lease status was computed.
Expand All @@ -22,3 +25,15 @@ func (st LeaseStatus) IsValid() bool {
func (st LeaseStatus) OwnedBy(storeID roachpb.StoreID) bool {
return st.Lease.OwnedBy(storeID)
}

// Expiration returns the expiration of the lease.
func (st LeaseStatus) Expiration() hlc.Timestamp {
switch st.Lease.Type() {
case roachpb.LeaseExpiration:
return st.Lease.GetExpiration()
case roachpb.LeaseEpoch:
return st.Liveness.Expiration.ToTimestamp()
default:
panic("unexpected")
}
}
51 changes: 5 additions & 46 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1229,8 +1229,11 @@ func (r *Replica) checkExecutionCanProceed(
}
} else {
// If the request is a write or a consistent read, it requires the
// replica serving it to hold the range lease.
st, shouldExtend, err = r.leaseGoodToGoRLocked(ctx, now, ba.Timestamp)
// replica serving it to hold the range lease. We pass the write
// timestamp of the request because this is the maximum timestamp that
// the request will operate at, ignoring the uncertainty interval, which
// is already accounted for in LeaseStatus's stasis period handling.
st, shouldExtend, err = r.leaseGoodToGoRLocked(ctx, now, ba.WriteTimestamp())
if err != nil {
// If not, can we serve this request on a follower?
// TODO(nvanbenschoten): once we make this check cheaper
Expand Down Expand Up @@ -1471,50 +1474,6 @@ func (r *Replica) isNewerThanSplitRLocked(split *roachpb.SplitTrigger) bool {
r.mu.replicaID > rightDesc.ReplicaID
}

// TODO(nvanbenschoten): move endCmds to replica_send.go.

// endCmds holds necessary information to end a batch after Raft
// command processing.
type endCmds struct {
repl *Replica
g *concurrency.Guard
}

// move moves the endCmds into the return value, clearing and making
// a call to done on the receiver a no-op.
func (ec *endCmds) move() endCmds {
res := *ec
*ec = endCmds{}
return res
}

// done releases the latches acquired by the command and updates
// the timestamp cache using the final timestamp of each command.
//
// No-op if the receiver has been zeroed out by a call to move.
// Idempotent and is safe to call more than once.
func (ec *endCmds) done(
ctx context.Context, ba *roachpb.BatchRequest, br *roachpb.BatchResponse, pErr *roachpb.Error,
) {
if ec.repl == nil {
// The endCmds were cleared.
return
}
defer ec.move() // clear

// Update the timestamp cache if the request is not being re-evaluated. Each
// request is considered in turn; only those marked as affecting the cache are
// processed.
ec.repl.updateTimestampCache(ctx, ba, br, pErr)

// Release the latches acquired by the request and exit lock wait-queues.
// Must be done AFTER the timestamp cache is updated. ec.g is only set when
// the Raft proposal has assumed responsibility for the request.
if ec.g != nil {
ec.repl.concMgr.FinishReq(ec.g)
}
}

// maybeWatchForMerge checks whether a merge of this replica into its left
// neighbor is in its critical phase and, if so, arranges to block all requests,
// except for read-only requests that are older than `freezeStart`, until the
Expand Down
Loading