98481: kvserver: revert recent changes to reproposals r=pavelkalinnikov a=tbg

Reverts #97606, #97564, #94825, #94633.

- Revert "kvserver: disable assertion 'finished proposal inserted'"
- Revert "kvserver: narrow down 'finishing a proposal with outstanding reproposal'"
- Revert "kvserver: fill gaps in comment near tryReproposeWithNewLeaseIndex"
- Revert "kvserver: hoist early return out of tryReproposeWithNewLeaseIndex"
- Revert "fixup! kvserver: prevent finished proposal from being present in proposals map"
- Revert "kvserver: prevent finished proposal from being present in proposals map"
- Revert "kvserver: improve reproposal assertions and documentation"

Closes #97973.

Epic: CRDB-25287
Release Note: none


98537: sql: check row level ttl change before truncating a table r=chengxiong-ruan a=chengxiong-ruan

Fixes: #93443

Release note (sql change): Fixed a bug where crdb panicked when a user tried to truncate a table that has an ongoing row-level TTL change. Truncating a table in this scenario is still unsupported, but a gentler unimplemented error is now returned instead of a panic.
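The SQL change itself is not among the diff excerpts shown below. As a rough illustration only, the following standalone sketch shows the shape of such a precondition check: validate the descriptor up front and return an ordinary error rather than panicking. All identifiers here (`tableDesc`, `HasOngoingTTLChange`, `checkCanTruncate`) are hypothetical stand-ins, not the actual CockroachDB code.

```go
package main

import "fmt"

// tableDesc is a hypothetical stand-in for the table descriptor that a
// TRUNCATE statement consults before it starts rewriting the table.
type tableDesc struct {
	Name                string
	HasOngoingTTLChange bool // e.g. a schema change adding or altering row-level TTL
}

// checkCanTruncate sketches the shape of the fix: verify the precondition up
// front and return a regular "unimplemented" error to the client instead of
// letting the truncate path panic on an unexpected descriptor state.
func checkCanTruncate(desc tableDesc) error {
	if desc.HasOngoingTTLChange {
		return fmt.Errorf(
			"unimplemented: cannot truncate table %q while a row-level TTL change is in progress",
			desc.Name,
		)
	}
	return nil
}

func main() {
	if err := checkCanTruncate(tableDesc{Name: "t", HasOngoingTTLChange: true}); err != nil {
		fmt.Println(err) // surfaced to the client rather than crashing the node
	}
}
```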

98575: cdc: use int64 for emitted bytes telemetry r=miretskiy a=jayshrivastava

Previously, the stored `emitted_bytes` field was an int32, which can hold a maximum value of roughly 2.1GB. That limit is too small because the logging period is 24h and changefeeds can emit far more than 2.1GB in 24h. This change updates the field to an int64, which solves the problem.

Epic: None
Release note: None
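For illustration, here is a minimal, standard-library-only sketch of the same counter pattern (atomic add on the hot path, swap-to-zero when the logger flushes). It is not the changefeed code itself, though the method names deliberately mirror the telemetry.go diff further down, and it shows why returning int64 rather than truncating to int32 matters once more than ~2.1GB accumulates.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// emittedBytesCounter mirrors the pattern in the changefeed telemetry logger:
// emitters add to an atomic counter, and the periodic logger swaps it back to
// zero at flush time, taking the accumulated total with it.
type emittedBytesCounter struct {
	emittedBytes atomic.Int64
}

func (c *emittedBytesCounter) recordEmittedBytes(numBytes int) {
	c.emittedBytes.Add(int64(numBytes))
}

// resetEmittedBytes returns the running total and resets it to zero. Keeping
// the return type (and the logged field) int64 is the essence of the fix: a
// 24h window can accumulate far more than the ~2.1GB an int32 can represent.
func (c *emittedBytesCounter) resetEmittedBytes() int64 {
	return c.emittedBytes.Swap(0)
}

func main() {
	var c emittedBytesCounter
	for i := 0; i < 3; i++ {
		c.recordEmittedBytes(1 << 30) // 1 GiB per call
	}
	fmt.Println(c.resetEmittedBytes()) // 3221225472, already past what int32 can hold
	fmt.Println(c.resetEmittedBytes()) // 0: the swap cleared the counter
}
```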

98582: ci: allow-list `BUILD_VCS_NUMBER` env var in cloud unit tests r=jlinder a=rickystewart

This job was filing issues linking to the wrong commit.

Epic: none
Release note: None
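For background, TeamCity exposes the checked-out commit as `BUILD_VCS_NUMBER`; if that variable is not forwarded into the test container, whatever files the GitHub issue has to fall back on some other ref and ends up linking the wrong commit. The sketch below only illustrates that kind of lookup, under stated assumptions: the `commitSHA` helper, the override variable name, and the fallback order are invented here and are not the actual issue-filing code.

```go
package main

import (
	"fmt"
	"os"
)

// commitSHA picks the commit to reference when filing a test-failure issue.
// The fallback chain is hypothetical; the point is that when BUILD_VCS_NUMBER
// is missing inside the container, the caller has nothing accurate to link.
func commitSHA() (string, error) {
	for _, key := range []string{"ISSUE_COMMIT_OVERRIDE", "BUILD_VCS_NUMBER"} {
		if sha := os.Getenv(key); sha != "" {
			return sha, nil
		}
	}
	return "", fmt.Errorf("no commit SHA found in the environment")
}

func main() {
	sha, err := commitSHA()
	if err != nil {
		fmt.Println("would link the branch tip instead of the tested commit:", err)
		return
	}
	fmt.Printf("would link https://github.com/cockroachdb/cockroach/commits/%s\n", sha)
}
```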

Co-authored-by: Tobias Grieger <tobias.b.grieger@gmail.com>
Co-authored-by: Chengxiong Ruan <chengxiongruan@gmail.com>
Co-authored-by: Jayant Shrivastava <jayants@cockroachlabs.com>
Co-authored-by: Ricky Stewart <rickybstewart@gmail.com>
5 people committed Mar 14, 2023
5 parents ca5ae38 + 7317008 + 05f5bf2 + b0508fe + 5fc67c1 commit 3b326c3
Showing 21 changed files with 310 additions and 264 deletions.
2 changes: 1 addition & 1 deletion build/teamcity/cockroach/nightlies/cloud_unit_tests.sh
@@ -8,6 +8,6 @@ source "$dir/teamcity-support.sh" # For $root
source "$dir/teamcity-bazel-support.sh" # For run_bazel

tc_start_block "Run cloud unit tests"
BAZEL_SUPPORT_EXTRA_DOCKER_ARGS="-e GITHUB_API_TOKEN -e GITHUB_REPO -e TC_BUILDTYPE_ID -e TC_BUILD_BRANCH -e TC_BUILD_ID -e TC_SERVER_URL -e GOOGLE_EPHEMERAL_CREDENTIALS -e GOOGLE_KMS_KEY_NAME -e GOOGLE_LIMITED_KEY_ID -e ASSUME_SERVICE_ACCOUNT -e GOOGLE_LIMITED_BUCKET -e ASSUME_SERVICE_ACCOUNT_CHAIN -e AWS_DEFAULT_REGION -e AWS_SHARED_CREDENTIALS_FILE -e AWS_CONFIG_FILE -e AWS_S3_BUCKET -e AWS_ASSUME_ROLE -e AWS_ROLE_ARN_CHAIN -e AWS_KMS_KEY_ARN -e AWS_S3_ENDPOINT -e AWS_KMS_ENDPOINT -e AWS_KMS_REGION -e AWS_ACCESS_KEY_ID -e AWS_SECRET_ACCESS_KEY -e AZURE_ACCOUNT_KEY -e AZURE_ACCOUNT_NAME -e AZURE_CONTAINER -e AZURE_CLIENT_ID -e AZURE_CLIENT_SECRET -e AZURE_TENANT_ID -e AZURE_VAULT_NAME -e AZURE_KMS_KEY_NAME -e AZURE_KMS_KEY_VERSION" \
BAZEL_SUPPORT_EXTRA_DOCKER_ARGS="-e GITHUB_API_TOKEN -e GITHUB_REPO -e TC_BUILDTYPE_ID -e TC_BUILD_BRANCH -e TC_BUILD_ID -e TC_SERVER_URL -e GOOGLE_EPHEMERAL_CREDENTIALS -e GOOGLE_KMS_KEY_NAME -e GOOGLE_LIMITED_KEY_ID -e ASSUME_SERVICE_ACCOUNT -e GOOGLE_LIMITED_BUCKET -e ASSUME_SERVICE_ACCOUNT_CHAIN -e AWS_DEFAULT_REGION -e AWS_SHARED_CREDENTIALS_FILE -e AWS_CONFIG_FILE -e AWS_S3_BUCKET -e AWS_ASSUME_ROLE -e AWS_ROLE_ARN_CHAIN -e AWS_KMS_KEY_ARN -e AWS_S3_ENDPOINT -e AWS_KMS_ENDPOINT -e AWS_KMS_REGION -e AWS_ACCESS_KEY_ID -e AWS_SECRET_ACCESS_KEY -e AZURE_ACCOUNT_KEY -e AZURE_ACCOUNT_NAME -e AZURE_CONTAINER -e AZURE_CLIENT_ID -e AZURE_CLIENT_SECRET -e AZURE_TENANT_ID -e AZURE_VAULT_NAME -e AZURE_KMS_KEY_NAME -e AZURE_KMS_KEY_VERSION -e BUILD_VCS_NUMBER" \
run_bazel build/teamcity/cockroach/nightlies/cloud_unit_tests_impl.sh "$@"
tc_end_block "Run cloud unit tests"
8 changes: 4 additions & 4 deletions pkg/ccl/changefeedccl/telemetry.go
@@ -69,8 +69,8 @@ func (ptl *periodicTelemetryLogger) recordEmittedBytes(numBytes int) {
ptl.sinkTelemetryData.emittedBytes.Add(int64(numBytes))
}

func (ptl *periodicTelemetryLogger) resetEmittedBytes() int {
return int(ptl.sinkTelemetryData.emittedBytes.Swap(0))
func (ptl *periodicTelemetryLogger) resetEmittedBytes() int64 {
return ptl.sinkTelemetryData.emittedBytes.Swap(0)
}

// recordEmittedBytes implements the telemetryLogger interface.
@@ -96,7 +96,7 @@ func (ptl *periodicTelemetryLogger) maybeFlushLogs() {
continuousTelemetryEvent := &eventpb.ChangefeedEmittedBytes{
CommonChangefeedEventDetails: ptl.changefeedDetails,
JobId: int64(ptl.job.ID()),
EmittedBytes: int32(ptl.resetEmittedBytes()),
EmittedBytes: ptl.resetEmittedBytes(),
LoggingInterval: loggingInterval,
}
log.StructuredEvent(ptl.ctx, continuousTelemetryEvent)
@@ -111,7 +111,7 @@ func (ptl *periodicTelemetryLogger) close() {
continuousTelemetryEvent := &eventpb.ChangefeedEmittedBytes{
CommonChangefeedEventDetails: ptl.changefeedDetails,
JobId: int64(ptl.job.ID()),
EmittedBytes: int32(ptl.resetEmittedBytes()),
EmittedBytes: ptl.resetEmittedBytes(),
LoggingInterval: loggingInterval,
Closing: true,
}
7 changes: 7 additions & 0 deletions pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go

(Generated file; diff not rendered.)

33 changes: 9 additions & 24 deletions pkg/kv/kvserver/apply/task.go
@@ -248,30 +248,15 @@ func (t *Task) ApplyCommittedEntries(ctx context.Context) error {

iter := t.dec.NewCommandIter()
for iter.Valid() {
err := t.applyOneBatch(ctx, iter)
if err != nil {
if errors.Is(err, ErrRemoved) {
// On ErrRemoved, we know that the replica has been destroyed and in
// particular, the Replica's proposals map has already been cleared out.
// But there may be unfinished proposals that are only known to the
// current Task (because we remove proposals we're about to apply from the
// map). To avoid leaking resources and/or leaving proposers hanging,
// finish them here. Note that it is important that we know that the
// proposals map is (and always will be, due to replicaGC setting the
// destroy status) empty at this point, since there is an invariant
// that all proposals in the map are unfinished, and the Task has only
// removed a subset[^1] of the proposals that might be finished below.
// But since it's empty, we can finish them all without having to
// check which ones are no longer in the map.
//
// NOTE: forEachCmdIter closes iter.
//
// [^1]: (*replicaDecoder).retrieveLocalProposals
if rejectErr := forEachCmdIter(ctx, iter, func(cmd Command, ctx context.Context) error {
return cmd.AckErrAndFinish(ctx, err)
}); rejectErr != nil {
return rejectErr
}
if err := t.applyOneBatch(ctx, iter); err != nil {
// If the batch threw an error, reject all remaining commands in the
// iterator to avoid leaking resources or leaving a proposer hanging.
//
// NOTE: forEachCmdIter closes iter.
if rejectErr := forEachCmdIter(ctx, iter, func(cmd Command, ctx context.Context) error {
return cmd.AckErrAndFinish(ctx, err)
}); rejectErr != nil {
return rejectErr
}
return err
}
1 change: 0 additions & 1 deletion pkg/kv/kvserver/replica.go
@@ -501,7 +501,6 @@ type Replica struct {
// Instead, the buffer internally holds a reference to mu and will use
// it appropriately.
proposalBuf propBuf

// proposals stores the Raft in-flight commands which originated at this
// Replica, i.e. all commands for which propose has been called, but which
// have not yet applied. A proposal is "pending" until it is "finalized",
3 changes: 1 addition & 2 deletions pkg/kv/kvserver/replica_application_decoder.go
@@ -111,8 +111,7 @@ func (d *replicaDecoder) retrieveLocalProposals(ctx context.Context) (anyLocal b
// criterion. While such proposals can be reproposed, only the first
// instance that gets applied matters and so removing the command is
// always what we want to happen.
!cmd.proposal.Supersedes(cmd.Cmd.MaxLeaseIndex)

cmd.Cmd.MaxLeaseIndex == cmd.proposal.command.MaxLeaseIndex
if shouldRemove {
// Delete the proposal from the proposals map. There may be reproposals
// of the proposal in the pipeline, but those will all have the same max
152 changes: 61 additions & 91 deletions pkg/kv/kvserver/replica_application_result.go
@@ -105,16 +105,17 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
cmd.response.Err = pErr
case kvserverbase.ProposalRejectionIllegalLeaseIndex:
// Reset the error as it's now going to be determined by the outcome of
// reproposing (or not).
// reproposing (or not); note that tryReproposeWithNewLeaseIndex will
// return `nil` if the entry is not eligible for reproposals.
//
// Note that if pErr remains nil, we will mark the proposal as non-local at
// the end of this block and return, so we're not hitting an NPE near the end
// of this method where we're attempting to reach into `cmd.proposal`.
// If pErr gets "reset" here as a result, we will mark the proposal as
// non-local at the end of this block and return, so we're not hitting an
// NPE near the end of this method where we're attempting to reach into
// `cmd.proposal`.
//
// This control flow is sketchy but it preserves existing behavior
// that would be too risky to change at the time of writing.
//
pErr = nil
// If we failed to apply at the right lease index, try again with a
// new one. This is important for pipelined writes, since they don't
// have a client watching to retry, so a failure to eventually apply
@@ -162,10 +163,7 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
//
// These many possible worlds are a major source of complexity, a
// reduction of which is postponed.
if !cmd.proposal.applied && !cmd.proposal.Supersedes(cmd.Cmd.MaxLeaseIndex) {
pErr = tryReproposeWithNewLeaseIndex(ctx, cmd, (*replicaReproposer)(r))
}

pErr = r.tryReproposeWithNewLeaseIndex(ctx, cmd)
if pErr != nil {
// An error from tryReproposeWithNewLeaseIndex implies that the current
// entry is not superseded (i.e. we don't have a reproposal at a higher
@@ -183,40 +181,29 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
// It is thus safe to signal the error back to the client, which is also
// the only sensible choice at this point.
//
// We also know that the proposal is not in the proposals map, since the
// command is local and wasn't superseded, which is the condition in
// retrieveLocalProposals for removing from the map. So we're not leaking
// a map entry here, which we assert against below (and which has coverage,
// at time of writing, through TestReplicaReproposalWithNewLeaseIndexError).
log.Infof(ctx, "failed to repropose %s at idx %d with new lease index: %s", cmd.ID, cmd.Index(), pErr)
cmd.response.Err = pErr

// This assertion can mis-fire if we artificially inject invalid LAIs
// during proposal, see the explanatory comment above. If, in the
// current app batch, we had two identical copies of an entry (which
// maps to a local proposal), and the LAI was stale, then both entries
// would be local. The first could succeed to repropose, which, if the
// propBuf was full, would immediately insert into the proposals map.
// Normally, when we then apply the second entry, it would be superseded
// and not hit the assertion. But, if we injected a stale LAI during
// this last reproposal, we could "accidentally" assign the same LAI
// again. The second entry would then try to repropose again, which is
// fine, but it could bump into the closed timestamp, get an error,
// enter this branch, and then trip the assertion.
// Note that the proposal may or may not be in the proposals map at this
// point. For example, if we artificially inject invalid LAIs during
// proposal, see the explanatory comment above. If, in the current app
// batch, we had two identical copies of an entry (which maps to a local
// proposal), and the LAI was stale, then both entries would be local.
// The first could succeed to repropose, which, if the propBuf was full,
// would immediately insert into the proposals map. Normally, when we
// then apply the second entry, it would be superseded and not hit the
// assertion. But, if we injected a stale LAI during this last
// reproposal, we could "accidentally" assign the same LAI again. The
// second entry would then try to repropose again, which is fine, but it
// could bump into the closed timestamp, get an error, and now we are in
// a situation where a reproposal attempt failed with the proposal being
// present in the map.
//
// For proposed simplifications, see:
// https://github.com/cockroachdb/cockroach/issues/97633
r.mu.RLock()
_, inMap := r.mu.proposals[cmd.ID]
r.mu.RUnlock()

if inMap {
log.Fatalf(ctx, "failed reproposal unexpectedly in proposals map: %+v", cmd)
}
log.Infof(ctx, "failed to repropose %s at idx %d with new lease index: %s", cmd.ID, cmd.Index(), pErr)
cmd.response.Err = pErr
} else {
// Unbind the entry's local proposal because we just succeeded in
// reproposing it or decided not to repropose. Either way, we don't want
// to acknowledge the client yet.
// Unbind the entry's local proposal because we just succeeded
// in reproposing it and we don't want to acknowledge the client
// yet.
cmd.proposal = nil
return
}
Expand All @@ -237,82 +224,65 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
}
}

// reproposer is used by tryReproposeWithNewLeaseIndex.
type reproposer interface {
trackEvaluatingRequest(context.Context, hlc.Timestamp) (hlc.Timestamp, TrackedRequestToken)
propose(context.Context, *ProposalData, TrackedRequestToken) *kvpb.Error
newNotLeaseHolderError(string) *kvpb.NotLeaseHolderError
}

type replicaReproposer Replica

var _ reproposer = (*replicaReproposer)(nil)

func (r *replicaReproposer) trackEvaluatingRequest(
ctx context.Context, wts hlc.Timestamp,
) (hlc.Timestamp, TrackedRequestToken) {
// NB: must not hold r.mu here, the propBuf acquires it itself.
return r.mu.proposalBuf.TrackEvaluatingRequest(ctx, wts)
}

func (r *replicaReproposer) propose(
ctx context.Context, p *ProposalData, tok TrackedRequestToken,
) *kvpb.Error {
return (*Replica)(r).propose(ctx, p, tok)
}

func (r *replicaReproposer) newNotLeaseHolderError(msg string) *kvpb.NotLeaseHolderError {
r.mu.RLock()
defer r.mu.RUnlock()
return kvpb.NewNotLeaseHolderError(
*r.mu.state.Lease,
r.store.StoreID(),
r.mu.state.Desc,
msg,
)
}

// tryReproposeWithNewLeaseIndex is used by prepareLocalResult to repropose
// commands that have gotten an illegal lease index error, and that we know
// could not have applied while their lease index was valid (that is, we
// observed all applied entries between proposal and the lease index becoming
// invalid, as opposed to skipping some of them by applying a snapshot).
//
// The caller must already have checked that the entry is local and not
// superseded, and that it was rejected with an illegal lease index error.
func tryReproposeWithNewLeaseIndex(
ctx context.Context, cmd *replicatedCmd, r reproposer,
// It is not intended for use elsewhere and is only a top-level function so that
// it can avoid the below_raft_protos check. Returns a nil error if the command
// has already been successfully applied or has been reproposed here or by a
// different entry for the same proposal that hit an illegal lease index error.
func (r *Replica) tryReproposeWithNewLeaseIndex(
ctx context.Context, cmd *replicatedCmd,
) *kvpb.Error {
// Note that we don't need to validate anything about the proposal's
// lease here - if we got this far, we know that everything but the
// index is valid at this point in the log.
p := cmd.proposal
if p.applied || cmd.Cmd.MaxLeaseIndex != p.command.MaxLeaseIndex {
// If the command associated with this rejected raft entry already
// applied then we don't want to repropose it. Doing so could lead
// to duplicate application of the same proposal.
//
// Similarly, if the command associated with this rejected raft
// entry has a different (larger) MaxLeaseIndex than the one we
// decoded from the entry itself, the command must have already
// been reproposed (this can happen if there are multiple copies
// of the command in the logs; see TestReplicaRefreshMultiple).
// We must not create multiple copies with multiple lease indexes,
// so don't repropose it again. This ensures that at any time,
// there is only up to a single lease index that has a chance of
// succeeding in the Raft log for a given command.
return nil
}

// We need to track the request again in order to protect its timestamp until
// it gets reproposed.
// TODO(andrei): Only track if the request consults the ts cache. Some
// requests (e.g. EndTxn) don't care about closed timestamps.
minTS, tok := r.trackEvaluatingRequest(ctx, p.Request.WriteTimestamp())
minTS, tok := r.mu.proposalBuf.TrackEvaluatingRequest(ctx, p.Request.WriteTimestamp())
defer tok.DoneIfNotMoved(ctx)

// NB: p.Request.Timestamp reflects the action of ba.SetActiveTimestamp.
if p.Request.AppliesTimestampCache() && p.Request.WriteTimestamp().LessEq(minTS) {
// The tracker wants us to forward the request timestamp, but we can't
// do that without re-evaluating, so give up. The error returned here
// will go back to DistSender, so send something it can digest.
return kvpb.NewError(r.newNotLeaseHolderError("reproposal failed due to closed timestamp"))
// will go back to DistSender, so send something it can digest.
err := kvpb.NewNotLeaseHolderError(
*r.mu.state.Lease,
r.store.StoreID(),
r.mu.state.Desc,
"reproposal failed due to closed timestamp",
)
return kvpb.NewError(err)
}
// Some tests check for this log message in the trace.
log.VEventf(ctx, 2, "retry: proposalIllegalLeaseIndex")

// Reset the command for reproposal.
prevMaxLeaseIndex := p.command.MaxLeaseIndex
prevEncodedCommand := p.encodedCommand
p.command.MaxLeaseIndex = 0
p.encodedCommand = nil
pErr := r.propose(ctx, p, tok.Move(ctx))
if pErr != nil {
// On error, reset the fields we zeroed out to their old value.
// This ensures that the proposal doesn't count as Superseded
// now.
p.command.MaxLeaseIndex = prevMaxLeaseIndex
p.encodedCommand = prevEncodedCommand
return pErr
}
log.VEventf(ctx, 2, "reproposed command %x", cmd.ID)
(Remaining changes in this file and the other changed files are not shown.)