Skip to content

Commit

Permalink
sql: expose mvcc timestamps to SQL
Browse files Browse the repository at this point in the history
Fixes cockroachdb#50102.

This PR adds introspection into the KV layer's concurrency control from
the SQL level. In particular, we now expose the MVCC HLC timestamp of a
row as a special system column on every table. This system column is
exposed as a decimal, and is computed as `wall time * 10^10 + logical time`.

To accomplish this, this PR adds planning and execution infrastructure
for implicit system columns that can be produced by the execution layer
for a particular row.

Release note (sql change): Expose the MVCC timestamp of each row as a
system column on tables. This column is named
`crdb_internal_mvcc_timestamp` and is accessible only in a limited set
of contexts.
  • Loading branch information
rohany committed Jul 20, 2020
1 parent b54e948 commit 55193b5
Show file tree
Hide file tree
Showing 70 changed files with 2,105 additions and 1,102 deletions.
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2685,7 +2685,7 @@ func TestRestoreAsOfSystemTimeGCBounds(t *testing.T) {
ctx, tc, sqlDB, _, cleanupFn := BackupRestoreTestSetup(t, singleNode, numAccounts, InitNone)
defer cleanupFn()
const dir = "nodelocal://0/"
preGC := tree.TimestampToDecimal(tc.Server(0).Clock().Now()).String()
preGC := tree.TimestampToDecimalDatum(tc.Server(0).Clock().Now()).String()

gcr := roachpb.GCRequest{
// Bogus span to make it a valid request.
Expand All @@ -2701,7 +2701,7 @@ func TestRestoreAsOfSystemTimeGCBounds(t *testing.T) {
t.Fatal(err)
}

postGC := tree.TimestampToDecimal(tc.Server(0).Clock().Now()).String()
postGC := tree.TimestampToDecimalDatum(tc.Server(0).Clock().Now()).String()

lateFullTableBackup := dir + "/tbl-after-gc"
sqlDB.Exec(t, `BACKUP data.bank TO $1 WITH revision_history`, lateFullTableBackup)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func (e *jsonEncoder) EncodeResolvedTimestamp(
_ context.Context, _ string, resolved hlc.Timestamp,
) ([]byte, error) {
meta := map[string]interface{}{
`resolved`: tree.TimestampToDecimal(resolved).Decimal.String(),
`resolved`: tree.TimestampToDecimalDatum(resolved).Decimal.String(),
}
var jsonEntries interface{}
if e.wrapped {
Expand Down
12 changes: 12 additions & 0 deletions pkg/ccl/logictestccl/testdata/logic_test/partitioning
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ TABLE ok1
├── a int not null
├── b int not null
├── c int
├── crdb_internal_mvcc_timestamp decimal [hidden] [system]
└── INDEX primary
├── a int not null
├── b int not null
Expand Down Expand Up @@ -465,6 +466,7 @@ TABLE ok2
├── a int not null
├── b int not null
├── c int
├── crdb_internal_mvcc_timestamp decimal [hidden] [system]
└── INDEX primary
├── a int not null
├── b int not null
Expand Down Expand Up @@ -507,6 +509,7 @@ TABLE ok3
├── a int not null
├── b int not null
├── c int
├── crdb_internal_mvcc_timestamp decimal [hidden] [system]
└── INDEX primary
├── a int not null
├── b int not null
Expand Down Expand Up @@ -552,6 +555,7 @@ TABLE ok4
├── a int not null
├── b int not null
├── c int
├── crdb_internal_mvcc_timestamp decimal [hidden] [system]
└── INDEX primary
├── a int not null
├── b int not null
Expand Down Expand Up @@ -590,6 +594,7 @@ TABLE ok5
├── a int not null
├── b int not null
├── c int
├── crdb_internal_mvcc_timestamp decimal [hidden] [system]
└── INDEX primary
├── a int not null
├── b int not null
Expand Down Expand Up @@ -653,6 +658,7 @@ TABLE ok6
├── a int not null
├── b int not null
├── c int
├── crdb_internal_mvcc_timestamp decimal [hidden] [system]
└── INDEX primary
├── a int not null
└── b int not null
Expand Down Expand Up @@ -688,6 +694,7 @@ TABLE ok7
├── a int not null
├── b int not null
├── c int
├── crdb_internal_mvcc_timestamp decimal [hidden] [system]
└── INDEX primary
├── a int not null
└── b int not null
Expand Down Expand Up @@ -729,6 +736,7 @@ TABLE ok8
├── a int not null
├── b int not null
├── c int
├── crdb_internal_mvcc_timestamp decimal [hidden] [system]
└── INDEX primary
├── a int not null
└── b int not null
Expand Down Expand Up @@ -772,6 +780,7 @@ TABLE ok9
├── a int not null
├── b int not null
├── c int
├── crdb_internal_mvcc_timestamp decimal [hidden] [system]
└── INDEX primary
├── a int not null
└── b int not null
Expand Down Expand Up @@ -817,6 +826,7 @@ TABLE ok10
├── a int not null
├── b int not null
├── c int
├── crdb_internal_mvcc_timestamp decimal [hidden] [system]
└── INDEX primary
├── a int not null
└── b int not null
Expand Down Expand Up @@ -871,6 +881,7 @@ TABLE ok11
├── a int not null
├── b int not null
├── c int not null
├── crdb_internal_mvcc_timestamp decimal [hidden] [system]
└── INDEX primary
├── a int not null
├── b int not null
Expand Down Expand Up @@ -916,6 +927,7 @@ TABLE ok12
├── a int not null
├── b int not null
├── c int
├── crdb_internal_mvcc_timestamp decimal [hidden] [system]
└── INDEX primary
├── a int not null
├── b int not null
Expand Down
6 changes: 6 additions & 0 deletions pkg/ccl/logictestccl/testdata/logic_test/zone
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ EXPLAIN (OPT, CATALOG) SELECT * FROM t
TABLE t
├── k int not null
├── v string
├── crdb_internal_mvcc_timestamp decimal [hidden] [system]
├── FAMILY fam_0_k_v (k, v)
├── INDEX primary
│ ├── k int not null
Expand Down Expand Up @@ -82,6 +83,7 @@ EXPLAIN (OPT, CATALOG) SELECT * FROM t
TABLE t
├── k int not null
├── v string
├── crdb_internal_mvcc_timestamp decimal [hidden] [system]
├── FAMILY fam_0_k_v (k, v)
├── INDEX primary
│ ├── k int not null
Expand Down Expand Up @@ -145,6 +147,7 @@ EXPLAIN (OPT, CATALOG) SELECT * FROM t
TABLE t
├── k int not null
├── v string
├── crdb_internal_mvcc_timestamp decimal [hidden] [system]
├── FAMILY fam_0_k_v (k, v)
├── INDEX primary
│ ├── k int not null
Expand Down Expand Up @@ -247,6 +250,7 @@ EXPLAIN (OPT, CATALOG) SELECT * FROM t
TABLE t
├── k int not null
├── v string
├── crdb_internal_mvcc_timestamp decimal [hidden] [system]
├── FAMILY fam_0_k_v (k, v)
├── INDEX primary
│ ├── k int not null
Expand Down Expand Up @@ -300,6 +304,7 @@ EXPLAIN (OPT, CATALOG) SELECT * FROM t
TABLE t
├── k int not null
├── v string
├── crdb_internal_mvcc_timestamp decimal [hidden] [system]
├── FAMILY fam_0_k_v (k, v)
├── INDEX primary
│ ├── k int not null
Expand Down Expand Up @@ -401,6 +406,7 @@ EXPLAIN (OPT, CATALOG) SELECT * FROM t
TABLE t
├── k int not null
├── v string
├── crdb_internal_mvcc_timestamp decimal [hidden] [system]
├── FAMILY fam_0_k_v (k, v)
├── INDEX primary
│ ├── k int not null
Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,8 +632,7 @@ func NewColOperator(
// still responsible for doing the cancellation check on their own while
// performing long operations.
result.Op = colexec.NewCancelChecker(result.Op)
returnMutations := core.TableReader.Visibility == execinfra.ScanVisibilityPublicAndNotPublic
result.ColumnTypes = core.TableReader.Table.ColumnTypesWithMutations(returnMutations)
result.ColumnTypes = scanOp.ResultTypes
case core.Aggregator != nil:
if err := checkNumIn(inputs, 1); err != nil {
return r, err
Expand Down
86 changes: 74 additions & 12 deletions pkg/sql/colfetcher/cfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"sort"
"strings"

"github.com/cockroachdb/apd/v2"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
Expand All @@ -32,6 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -93,6 +95,15 @@ type cTableInfo struct {
// id pair at the start of the key.
knownPrefixLength int

// The following fields contain MVCC metadata for each row and may be
// returned to users of cFetcher immediately after NextBatch returns.
//
// rowLastModified is the timestamp of the last time any family in the row
// was modified in any way.
rowLastModified hlc.Timestamp
// timestampOutputIdx controls at what row ordinal to write the timestamp.
timestampOutputIdx int

keyValTypes []*types.T
extraTypes []*types.T

Expand Down Expand Up @@ -143,6 +154,10 @@ func (m colIdxMap) get(c sqlbase.ColumnID) (int, bool) {
return 0, false
}

// noTimestampColumn is a sentinel value to denote that the MVCC timestamp
// column is not part of the output.
const noTimestampColumn = -1

// cFetcher handles fetching kvs and forming table rows for an
// arbitrary number of tables.
// Usage:
Expand Down Expand Up @@ -186,6 +201,11 @@ type cFetcher struct {
// when beginning a new scan.
traceKV bool

// mvccDecodeStrategy controls whether or not MVCC timestamps should
// be decoded from KV's fetched. It is set if any of the requested tables
// are required to produce an MVCC timestamp system column.
mvccDecodeStrategy row.MVCCDecodingStrategy

// fetcher is the underlying fetcher that provides KVs.
fetcher *row.KVFetcher

Expand Down Expand Up @@ -221,6 +241,11 @@ type cFetcher struct {
// colvecs is a slice of the ColVecs within batch, pulled out to avoid
// having to call batch.Vec too often in the tight loop.
colvecs []coldata.Vec

// timestampCol is the underlying ColVec for the timestamp output column,
// or nil if the timestamp column was not requested. It is pulled out from
// colvecs to avoid having to cast the vec to decimal on every write.
timestampCol []apd.Decimal
}

// adapter is a utility struct that helps with memory accounting.
Expand Down Expand Up @@ -267,22 +292,20 @@ func (rf *cFetcher) Init(
sort.Sort(m)
colDescriptors := tableArgs.Cols
table := &cTableInfo{
spans: tableArgs.Spans,
desc: tableArgs.Desc,
colIdxMap: m,
index: tableArgs.Index,
isSecondaryIndex: tableArgs.IsSecondaryIndex,
cols: colDescriptors,
spans: tableArgs.Spans,
desc: tableArgs.Desc,
colIdxMap: m,
index: tableArgs.Index,
isSecondaryIndex: tableArgs.IsSecondaryIndex,
cols: colDescriptors,
timestampOutputIdx: noTimestampColumn,
}

typs := make([]*types.T, len(colDescriptors))
for i := range typs {
typs[i] = colDescriptors[i].Type
}

rf.machine.batch = allocator.NewMemBatch(typs)
rf.machine.colvecs = rf.machine.batch.ColVecs()

var err error

var neededCols util.FastIntSet
Expand All @@ -294,10 +317,24 @@ func (rf *cFetcher) Init(
// The idx-th column is required.
neededCols.Add(int(col))
table.neededColsList = append(table.neededColsList, int(col))
// If this column is the timestamp column, set up the output index.
sysColKind := sqlbase.GetSystemColumnKindFromColumnID(col)
if sysColKind == sqlbase.SystemColumnKind_MVCCTIMESTAMP {
table.timestampOutputIdx = idx
rf.mvccDecodeStrategy = row.MVCCDecodingRequired
}
}
}
sort.Ints(table.neededColsList)

rf.machine.batch = allocator.NewMemBatch(typs)
rf.machine.colvecs = rf.machine.batch.ColVecs()
// If the fetcher is requested to produce a timestamp column, pull out the
// column as a decimal and save it.
if table.timestampOutputIdx != noTimestampColumn {
rf.machine.timestampCol = rf.machine.colvecs[table.timestampOutputIdx].Decimal()
}

table.knownPrefixLength = len(sqlbase.MakeIndexKeyPrefix(codec, table.desc.TableDesc(), table.index.ID))

var indexColumnIDs []sqlbase.ColumnID
Expand All @@ -309,6 +346,15 @@ func (rf *cFetcher) Init(
}

table.neededValueColsByIdx = tableArgs.ValNeededForCol.Copy()

// If system columns are requested, they are present in ValNeededForCol.
// However, we don't want to include them in neededValueColsByIdx, because
// the handling of system columns is separate from the standard value
// decoding process.
if table.timestampOutputIdx != noTimestampColumn {
table.neededValueColsByIdx.Remove(table.timestampOutputIdx)
}

neededIndexCols := 0
nIndexCols := len(indexColumnIDs)
if cap(table.indexColOrdinals) >= nIndexCols {
Expand Down Expand Up @@ -590,7 +636,7 @@ func (rf *cFetcher) nextBatch(ctx context.Context) (coldata.Batch, error) {
case stateInvalid:
return nil, errors.New("invalid fetcher state")
case stateInitFetch:
moreKeys, kv, newSpan, err := rf.fetcher.NextKV(ctx)
moreKeys, kv, newSpan, err := rf.fetcher.NextKV(ctx, rf.mvccDecodeStrategy)
if err != nil {
return nil, colexecerror.NewStorageError(err)
}
Expand Down Expand Up @@ -628,6 +674,9 @@ func (rf *cFetcher) nextBatch(ctx context.Context) (coldata.Batch, error) {
rf.machine.batch.ResetInternalBatch()
rf.shiftState()
case stateDecodeFirstKVOfRow:
// Reset MVCC metadata for the table, since this is the first KV of a row.
rf.table.rowLastModified = hlc.Timestamp{}

// foundNull is set when decoding a new index key for a row finds a NULL value
// in the index key. This is used when decoding unique secondary indexes in order
// to tell whether they have extra columns appended to the key.
Expand Down Expand Up @@ -732,6 +781,10 @@ func (rf *cFetcher) nextBatch(ctx context.Context) (coldata.Batch, error) {
if rf.traceKV {
log.VEventf(ctx, 2, "fetched: %s -> %s", prettyKey, prettyVal)
}
// Update the MVCC values for this row.
if rf.table.rowLastModified.Less(rf.machine.nextKV.Value.Timestamp) {
rf.table.rowLastModified = rf.machine.nextKV.Value.Timestamp
}
if len(rf.table.desc.Families) == 1 {
rf.machine.state[0] = stateFinalizeRow
rf.machine.state[1] = stateInitFetch
Expand All @@ -740,7 +793,7 @@ func (rf *cFetcher) nextBatch(ctx context.Context) (coldata.Batch, error) {
rf.machine.state[0] = stateFetchNextKVWithUnfinishedRow
case stateSeekPrefix:
for {
moreRows, kv, _, err := rf.fetcher.NextKV(ctx)
moreRows, kv, _, err := rf.fetcher.NextKV(ctx, rf.mvccDecodeStrategy)
if err != nil {
return nil, colexecerror.NewStorageError(err)
}
Expand Down Expand Up @@ -772,7 +825,7 @@ func (rf *cFetcher) nextBatch(ctx context.Context) (coldata.Batch, error) {
rf.shiftState()

case stateFetchNextKVWithUnfinishedRow:
moreKVs, kv, _, err := rf.fetcher.NextKV(ctx)
moreKVs, kv, _, err := rf.fetcher.NextKV(ctx, rf.mvccDecodeStrategy)
if err != nil {
return nil, colexecerror.NewStorageError(err)
}
Expand Down Expand Up @@ -823,6 +876,11 @@ func (rf *cFetcher) nextBatch(ctx context.Context) (coldata.Batch, error) {
log.VEventf(ctx, 2, "fetched: %s -> %s", prettyKey, prettyVal)
}

// Update the MVCC values for this row.
if rf.table.rowLastModified.Less(rf.machine.nextKV.Value.Timestamp) {
rf.table.rowLastModified = rf.machine.nextKV.Value.Timestamp
}

if familyID == rf.table.maxColumnFamilyID {
// We know the row can't have any more keys, so finalize the row.
rf.machine.state[0] = stateFinalizeRow
Expand All @@ -833,6 +891,10 @@ func (rf *cFetcher) nextBatch(ctx context.Context) (coldata.Batch, error) {
}

case stateFinalizeRow:
// Populate the row with the buffered MVCC information.
if rf.table.timestampOutputIdx != noTimestampColumn {
rf.machine.timestampCol[rf.machine.rowIdx] = tree.TimestampToDecimal(rf.table.rowLastModified)
}
// We're finished with a row. Bump the row index, fill the row in with
// nulls if necessary, emit the batch if necessary, and move to the next
// state.
Expand Down
Loading

0 comments on commit 55193b5

Please sign in to comment.