Skip to content

Commit

Permalink
Merge #60836 #60992
Browse files Browse the repository at this point in the history
60836: opt: support UPDATE with partial UNIQUE WITHOUT INDEX constraints r=mgartner a=mgartner

This commit add uniqueness checks for partial `UNIQUE WITHOUT INDEX`
constraints during `UPDATE` statements.

As partial of this change, I discovered that #60535 introduced a
regression where columns not required by uniqueness checks are not
pruned. I've left TODOs in the column pruning tests and plan on fixing
this in a follow-up PR.

There is no release note because these constraints are gated behind the
experimental_enable_unique_without_index_constraints session variable.

Release note: None

60992: kv: make RaftCommand.ClosedTimestamp nullable r=nvanbenschoten a=nvanbenschoten

Fixes #60852.
Fixes #60833.
Fixes #58298.
Fixes #59428.
Fixes #60756.
Fixes #60848.
Fixes #60849.

In #60852 and related issues, we saw that the introduction of a non-nullable `RaftCommand.ClosedTimestamp`, coupled with the `ClosedTimestampFooter` encoding strategy we use, led to encoded `RaftCommand` protos with their ClosedTimestamp field set twice. This is ok from a correctness perspective, at least as protobuf is concerned, but it led to a subtle interaction where the process of passing through sideloading (`maybeInlineSideloadedRaftCommand(maybeSideloadEntriesImpl(e))`) would reduce the size of an encoded RaftCommand by 3 bytes (the encoded size of an empty `hlc.Timestamp`). This was resulting in an `uncommittedSize` leak in Raft, which was eventually stalling on its `MaxUncommittedEntriesSize` limit.

This commit fixes this issue by making `RaftCommand.ClosedTimestamp` nullable. With the field marked as nullable, it will no longer be encoded as an empty timestamp when unset, ensuring that when the encoded `ClosedTimestampFooter` is appended, it contains the only instance of the `ClosedTimestamp` field.

cc. @cockroachdb/bulk-io 

Co-authored-by: Marcus Gartner <marcus@cockroachlabs.com>
Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
  • Loading branch information
3 people committed Feb 23, 2021
3 parents 0edd7e8 + e5d499a + e46511a commit a660767
Show file tree
Hide file tree
Showing 16 changed files with 1,246 additions and 273 deletions.
16 changes: 8 additions & 8 deletions pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3356,15 +3356,15 @@ func TestProposalOverhead(t *testing.T) {
defer tc.Stopper().Stop(ctx)

db := tc.Server(0).DB()
// NB: the expected overhead reflects the space overhead currently
// present in Raft commands. This test will fail if that overhead
// changes. Try to make this number go down and not up. It slightly
// undercounts because our proposal filter is called before
// maxLeaseIndex is filled in. The difference between the user and system
// overhead is that users ranges do not have rangefeeds on by default whereas
// system ranges do.
// NB: the expected overhead reflects the space overhead currently present
// in Raft commands. This test will fail if that overhead changes. Try to
// make this number go down and not up. It slightly undercounts because our
// proposal filter is called before MaxLeaseIndex or ClosedTimestamp are
// filled in. The difference between the user and system overhead is that
// users ranges do not have rangefeeds on by default whereas system ranges
// do.
const (
expectedUserOverhead uint32 = 45
expectedUserOverhead uint32 = 42
)
t.Run("user-key overhead", func(t *testing.T) {
userKey := tc.ScratchRange(t)
Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/kvserverpb/proposer_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

var maxRaftCommandFooterSize = (&RaftCommandFooter{
var maxMaxLeaseFooterSize = (&MaxLeaseFooter{
MaxLeaseIndex: math.MaxUint64,
}).Size()

Expand All @@ -28,10 +28,10 @@ var maxClosedTimestampFooterSize = (&ClosedTimestampFooter{
},
}).Size()

// MaxRaftCommandFooterSize returns the maximum possible size of an
// encoded RaftCommandFooter proto.
func MaxRaftCommandFooterSize() int {
return maxRaftCommandFooterSize
// MaxMaxLeaseFooterSize returns the maximum possible size of an encoded
// MaxLeaseFooter proto.
func MaxMaxLeaseFooterSize() int {
return maxMaxLeaseFooterSize
}

// MaxClosedTimestampFooterSize returns the maximmum possible size of an encoded
Expand Down
301 changes: 160 additions & 141 deletions pkg/kv/kvserver/kvserverpb/proposer_kv.pb.go

Large diffs are not rendered by default.

23 changes: 17 additions & 6 deletions pkg/kv/kvserver/kvserverpb/proposer_kv.proto
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,12 @@ message RaftCommand {
// updated accordingly. Managing retry of proposals becomes trickier as
// well as that uproots whatever ordering was originally envisioned.
//
// This field is set through RaftCommandFooter hackery.
// This field is set through MaxLeaseFooter hackery. Unlike with the
// ClosedTimestamp, which needs to be nullable in this proto (see comment),
// there are no nullability concerns with this field. This is because
// max_lease_index is a primitive type, so it does not get encoded when zero.
// This alone ensures that the field is not encoded twice in the combined
// RaftCommand+MaxLeaseFooter proto.
uint64 max_lease_index = 4;

// The closed timestamp carried by this command. Once a follower is told to
Expand All @@ -255,8 +260,11 @@ message RaftCommand {
// in a command-specific way. If the value is not zero, the value is greater
// or equal to that of the previous commands (and all before it).
//
// This field is set through ClosedTimestampFooter hackery.
util.hlc.Timestamp closed_timestamp = 17 [(gogoproto.nullable) = false];
// This field is set through ClosedTimestampFooter hackery. Unlike in the
// ClosedTimestampFooter, the field is nullable here so that it does not get
// encoded when empty. This prevents the field from being encoded twice in the
// combined RaftCommand+ClosedTimestampFooter proto.
util.hlc.Timestamp closed_timestamp = 17;

reserved 3;

Expand Down Expand Up @@ -284,19 +292,22 @@ message RaftCommand {
reserved 1, 2, 10001 to 10014;
}

// RaftCommandFooter contains a subset of the fields in RaftCommand. It is used
// MaxLeaseFooter contains a subset of the fields in RaftCommand. It is used
// to optimize a pattern where most of the fields in RaftCommand are marshaled
// outside of a heavily contended critical section, except for the fields in the
// footer, which are assigned and marhsaled inside of the critical section and
// appended to the marshaled byte buffer. This minimizes the memory allocation
// and marshaling work performed under lock.
message RaftCommandFooter {
message MaxLeaseFooter {
uint64 max_lease_index = 4;
}

// ClosedTimestampFooter is similar to RaftCommandFooter, allowing the proposal
// ClosedTimestampFooter is similar to MaxLeaseFooter, allowing the proposal
// buffer to fill in the closed_timestamp field after most of the proto has been
// marshaled already.
message ClosedTimestampFooter {
// NOTE: unlike in RaftCommand, there's no reason to make this field nullable.
// If we don't want to include the field, we don't need to append the encoded
// footer to an encoded RaftCommand buffer.
util.hlc.Timestamp closed_timestamp = 17 [(gogoproto.nullable) = false];
}
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/replica_application_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ func (b *replicaAppBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error
cmd.raftCmd.ReplicatedEvalResult = kvserverpb.ReplicatedEvalResult{}
cmd.raftCmd.WriteBatch = nil
cmd.raftCmd.LogicalOpLog = nil
cmd.raftCmd.ClosedTimestamp.Reset()
cmd.raftCmd.ClosedTimestamp = nil
} else {
// Assert that we're not writing under the closed timestamp. We can only do
// these checks on IsIntentWrite requests, since others (for example,
Expand Down Expand Up @@ -639,7 +639,7 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch(
//
// Alternatively if we discover that the RHS has already been removed
// from this store, clean up its data.
splitPreApply(ctx, b.batch, res.Split.SplitTrigger, b.r, cmd.raftCmd.ClosedTimestamp)
splitPreApply(ctx, b.r, b.batch, res.Split.SplitTrigger, cmd.raftCmd.ClosedTimestamp)

// The rangefeed processor will no longer be provided logical ops for
// its entire range, so it needs to be shut down and all registrations
Expand Down Expand Up @@ -823,13 +823,13 @@ func (b *replicaAppBatch) stageTrivialReplicatedEvalResult(
if leaseAppliedIndex := cmd.leaseIndex; leaseAppliedIndex != 0 {
b.state.LeaseAppliedIndex = leaseAppliedIndex
}
if cts := cmd.raftCmd.ClosedTimestamp; !cts.IsEmpty() {
if cts := cmd.raftCmd.ClosedTimestamp; cts != nil && !cts.IsEmpty() {
if cts.Less(b.state.ClosedTimestamp) {
log.Fatalf(ctx,
"closed timestamp regressing from %s to %s when applying command %x",
b.state.ClosedTimestamp, cts, cmd.idKey)
}
b.state.ClosedTimestamp = cts
b.state.ClosedTimestamp = *cts
if clockTS, ok := cts.TryToClockTimestamp(); ok {
b.maxTS.Forward(clockTS)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type ProposalData struct {
quotaAlloc *quotapool.IntAlloc

// tmpFooter is used to avoid an allocation.
tmpFooter kvserverpb.RaftCommandFooter
tmpFooter kvserverpb.MaxLeaseFooter

// ec.done is called after command application to update the timestamp
// cache and optionally release latches and exits lock wait-queues.
Expand Down
9 changes: 7 additions & 2 deletions pkg/kv/kvserver/replica_proposal_buf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (pc proposalCreator) newProposal(ba roachpb.BatchRequest) (*ProposalData, [
func (pc proposalCreator) encodeProposal(p *ProposalData) []byte {
cmdLen := p.command.Size()
needed := raftCommandPrefixLen + cmdLen +
kvserverpb.MaxRaftCommandFooterSize() +
kvserverpb.MaxMaxLeaseFooterSize() +
kvserverpb.MaxClosedTimestampFooterSize()
data := make([]byte, raftCommandPrefixLen, needed)
encodeRaftCommandPrefix(data, raftVersionStandard, p.idKey)
Expand Down Expand Up @@ -716,7 +716,12 @@ func TestProposalBufferClosedTimestamp(t *testing.T) {
type reqType int
checkClosedTS := func(t *testing.T, r *testProposerRaft, exp hlc.Timestamp) {
require.Len(t, r.lastProps, 1)
require.Equal(t, exp, r.lastProps[0].ClosedTimestamp)
if exp.IsEmpty() {
require.Nil(t, r.lastProps[0].ClosedTimestamp)
} else {
require.NotNil(t, r.lastProps[0].ClosedTimestamp)
require.Equal(t, exp, *r.lastProps[0].ClosedTimestamp)
}
}

// The lease that the proposals are made under.
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (r *Replica) propose(

// Make sure the maximum lease index is unset. This field will be set in
// propBuf.Insert and its encoded bytes will be appended to the encoding
// buffer as a RaftCommandFooter.
// buffer as a MaxLeaseFooter.
p.command.MaxLeaseIndex = 0

// Determine the encoding style for the Raft command.
Expand Down Expand Up @@ -320,7 +320,7 @@ func (r *Replica) propose(
// Allocate the data slice with enough capacity to eventually hold the two
// "footers" that are filled later.
needed := preLen + cmdLen +
kvserverpb.MaxRaftCommandFooterSize() +
kvserverpb.MaxMaxLeaseFooterSize() +
kvserverpb.MaxClosedTimestampFooterSize()
data := make([]byte, preLen, needed)
// Encode prefix with command ID, if necessary.
Expand Down
17 changes: 8 additions & 9 deletions pkg/kv/kvserver/stateloader/stateloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,24 +298,23 @@ func (rsl StateLoader) LoadMVCCStats(
// keys. We now deem those keys to be "legacy" because they have been replaced
// by the range applied state key.
//
// TODO(andrei): closedTimestamp is a pointer to avoid an allocation when
// putting it in RangeAppliedState. RangeAppliedState.ClosedTimestamp is made
// non-nullable (see comments on the field), this argument should be taken by
// value.
// TODO(andrei): closedTS is a pointer to avoid an allocation when putting it in
// RangeAppliedState. RangeAppliedState.ClosedTimestamp is made non-nullable
// (see comments on the field), this argument should be taken by value.
func (rsl StateLoader) SetRangeAppliedState(
ctx context.Context,
readWriter storage.ReadWriter,
appliedIndex, leaseAppliedIndex uint64,
newMS *enginepb.MVCCStats,
closedTimestamp *hlc.Timestamp,
closedTS *hlc.Timestamp,
) error {
as := enginepb.RangeAppliedState{
RaftAppliedIndex: appliedIndex,
LeaseAppliedIndex: leaseAppliedIndex,
RangeStats: newMS.ToPersistentStats(),
}
if closedTimestamp != nil && !closedTimestamp.IsEmpty() {
as.ClosedTimestamp = closedTimestamp
if closedTS != nil && !closedTS.IsEmpty() {
as.ClosedTimestamp = closedTS
}
// The RangeAppliedStateKey is not included in stats. This is also reflected
// in C.MVCCComputeStats and ComputeStatsForRange.
Expand Down Expand Up @@ -498,15 +497,15 @@ func (rsl StateLoader) SetMVCCStats(

// SetClosedTimestamp overwrites the closed timestamp.
func (rsl StateLoader) SetClosedTimestamp(
ctx context.Context, readWriter storage.ReadWriter, closedTS hlc.Timestamp,
ctx context.Context, readWriter storage.ReadWriter, closedTS *hlc.Timestamp,
) error {
as, err := rsl.LoadRangeAppliedState(ctx, readWriter)
if err != nil {
return err
}
return rsl.SetRangeAppliedState(
ctx, readWriter, as.RaftAppliedIndex, as.LeaseAppliedIndex,
as.RangeStats.ToStatsPtr(), &closedTS)
as.RangeStats.ToStatsPtr(), closedTS)
}

// SetLegacyRaftTruncatedState overwrites the truncated state.
Expand Down
7 changes: 3 additions & 4 deletions pkg/kv/kvserver/store_split.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@ import (
// split commit.
func splitPreApply(
ctx context.Context,
r *Replica,
readWriter storage.ReadWriter,
split roachpb.SplitTrigger,
r *Replica,
// The closed timestamp used to initialize the RHS.
closedTS hlc.Timestamp,
initClosedTS *hlc.Timestamp,
) {
// Sanity check that the store is in the split.
//
Expand Down Expand Up @@ -123,7 +122,7 @@ func splitPreApply(
}

// Persist the closed timestamp.
if err := rsl.SetClosedTimestamp(ctx, readWriter, closedTS); err != nil {
if err := rsl.SetClosedTimestamp(ctx, readWriter, initClosedTS); err != nil {
log.Fatalf(ctx, "%s", err)
}

Expand Down
65 changes: 55 additions & 10 deletions pkg/sql/logictest/testdata/logic_test/unique
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,6 @@ CREATE TABLE uniq_enum (
UNIQUE WITHOUT INDEX (s, j)
)

statement ok
CREATE TABLE uniq_partial_index (
i INT,
UNIQUE WITHOUT INDEX (i),
UNIQUE INDEX (i) WHERE i > 0
)

statement ok
CREATE TABLE other (k INT, v INT, w INT NOT NULL, x INT, y INT)

Expand Down Expand Up @@ -437,6 +430,51 @@ r s i j
eu-west bar 2 2
us-west foo 1 1

# Set a to the same value it already has.
statement ok
UPDATE uniq_partial SET a = 1 WHERE a = 1 AND b = 1

# Set a to an existing value.
statement error pgcode 23505 pq: duplicate key value violates unique constraint "unique_a"\nDETAIL: Key \(a\)=\(1\) already exists\.
UPDATE uniq_partial SET a = 1 WHERE a = 2

# Make b of (1, -1) positive so that it conflicts with (1, 1)
statement error pgcode 23505 pq: duplicate key value violates unique constraint "unique_a"\nDETAIL: Key \(a\)=\(1\) already exists\.
UPDATE uniq_partial SET b = 10 WHERE a = 1 AND b = -1

# Set a to NULL.
statement ok
UPDATE uniq_partial SET a = NULL, b = 10 WHERE a = 1 AND b = -1

# Update two existing, non-conflicting rows resulting in a conflict.
statement error pgcode 23505 pq: duplicate key value violates unique constraint "unique_a"\nDETAIL: Key \(a\)=\(10\) already exists\.
UPDATE uniq_partial SET a = 10 WHERE a IS NULL AND b = 5

# Set a to a non-existing value.
statement ok
UPDATE uniq_partial SET a = 10 WHERE a = 9 AND b = 9

# Set a to a value that would conflict if it was a non-partial unique constraint.
statement ok
UPDATE uniq_partial SET a = 1 WHERE b = -7

query II colnames,rowsort
SELECT * FROM uniq_partial
----
a b
1 1
1 -3
1 -7
2 2
5 5
6 6
7 7
9 -9
10 9
NULL 5
NULL 5
NULL 10


# -- Tests with UPSERT --
subtest Upsert
Expand Down Expand Up @@ -616,11 +654,18 @@ eu-west bar 2 2
# Ensure that we do not choose a partial index as the arbiter when there is a
# UNIQUE WITHOUT INDEX constraint.
statement ok
INSERT INTO uniq_partial_index VALUES (-1) ON CONFLICT (i) WHERE i > 0 DO UPDATE SET i = 1;
INSERT INTO uniq_partial_index VALUES (-1) ON CONFLICT (i) WHERE i > 0 DO UPDATE SET i = 2
CREATE TABLE uniq_partial_index_and_constraint (
i INT,
UNIQUE WITHOUT INDEX (i),
UNIQUE INDEX (i) WHERE i > 0
)

statement ok
INSERT INTO uniq_partial_index_and_constraint VALUES (-1) ON CONFLICT (i) WHERE i > 0 DO UPDATE SET i = 1;
INSERT INTO uniq_partial_index_and_constraint VALUES (-1) ON CONFLICT (i) WHERE i > 0 DO UPDATE SET i = 2

query I colnames
SELECT * FROM uniq_partial_index
SELECT * FROM uniq_partial_index_and_constraint
----
i
2
Expand Down
Loading

0 comments on commit a660767

Please sign in to comment.