30386: importccl: correctly determine start span for resume r=mjibson a=mjibson

When writing the "done spans" list, use the min key instead of trying
to determine the table start key, and the max key at the end. This removes the
possibility of misusing the table span end key as described below. We
may end up changing TableDescriptor.TableSpan, but the change here is
guaranteed to be correct regardless of that.

--

The problem is that when importing a large number of tables (>=56),
regardless of pgdump or mysqldump, we have a table key decoding problem.

When determining split points, we merge two lists: 1) all of the sampled
keys, 2) all of the start and end keys of spans of all table and sequence
descriptors being added. Assume that 100 empty tables (i.e., a PGDUMP
file with 100 `CREATE TABLE` statements) are being imported. We start our
numbering at 55, so tables with ID 55 to 154 will be created. We start
with table ID 55 and get its descriptor span: `/Table/5{5-6}`, or in
bytes: `start: "\xbf", end: "\xc0"`. The end span is computed by making
the start key for a table and calling PrefixEnd on it. PrefixEnd doesn't
know anything about table IDs, it just works on bytes and finds the next
byte, hence `bf` -> `c0`. We can do this for all the tables. Great.

Now move on to the SST writer. When writing SSTs we do a check when done
writing them to see if we are at the end of a table's SSTs. If we are,
we mark the entire table range as done so its progress contribution
is complete (and it isn't imported again if we have to restart). This
works by taking the split point key and, assuming it's a valid table
key prefix, extracts out the table ID. However table ID 109 has an end
span that's not a valid table key prefix (because of the blind call to
PrefixEnd described above). For example, the following code:

```go
	for i := sqlbase.ID(107); i <= 111; i++ {
		desc := sqlbase.TableDescriptor{ID: i}
		fmt.Println(desc.TableSpan())
	}
```

prints

```
/Table/10{7-8}
/Table/10{8-9}
/Table/109{-/PrefixEnd}
/Table/11{0-1}
/Table/11{1-2}
```

So when the SST writer tries to figure out the table ID on table 109 it
encounters an error, because the end key is not a valid table prefix: it
was produced by the blind PrefixEnd rather than by
encoding.EncodeUvarintAscending, which switches to a multi-byte encoding
past that point.

Fixes cockroachdb#29348

Release note (bug fix): fix IMPORT of empty or small tables under rare
conditions.

Co-authored-by: Matt Jibson <matt.jibson@gmail.com>
craig[bot] and maddyblue committed Sep 19, 2018
2 parents e595a1a + ebc326a commit b903a24
Showing 3 changed files with 36 additions and 26 deletions.
pkg/ccl/importccl/import_stmt_test.go (16 additions, 3 deletions):

```diff
@@ -25,6 +25,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
@@ -602,6 +603,17 @@ COPY t (a, b, c) FROM stdin;
 			`SHOW TABLES`: {{"t"}},
 		},
 	},
+	{
+		name: "many tables",
+		typ:  "PGDUMP",
+		data: func() string {
+			var sb strings.Builder
+			for i := 1; i <= 100; i++ {
+				fmt.Fprintf(&sb, "CREATE TABLE t%d ();\n", i)
+			}
+			return sb.String()
+		}(),
+	},
 
 	// Error
 	{
@@ -1789,8 +1801,6 @@ func TestImportLivenessWithRestart(t *testing.T) {
 		t.Fatalf("not all rows were present. Expecting %d, had %d", rows, rowCount)
 	}
 
-	rescheduled := jobutils.GetJobPayload(t, sqlDB, jobID).Details.(*jobspb.Payload_Import).Import
-
 	// Verify that all write progress coalesced into a single span
 	// encompassing the entire table.
 	spans := rescheduledProgress.Details.(*jobspb.Progress_Import).Import.SpanProgress
@@ -1799,7 +1809,10 @@ func TestImportLivenessWithRestart(t *testing.T) {
 	}
 
 	// Ensure that an entire table range is marked as complete
-	tableSpan := rescheduled.Tables[0].Desc.TableSpan()
+	tableSpan := roachpb.Span{
+		Key:    keys.MinKey,
+		EndKey: keys.MaxKey,
+	}
 	if !tableSpan.EqualValue(spans[0]) {
 		t.Fatalf("expected entire table to be marked complete, had %s", spans[0])
 	}
```
pkg/ccl/importccl/sst_writer_proc.go (20 additions, 12 deletions):

```diff
@@ -273,19 +273,27 @@ func (sp *sstWriter) Run(ctx context.Context, wg *sync.WaitGroup) {
 				key := roachpb.Key(samples[i])
 				return key.Compare(span.End) >= 0
 			})
-			finished := roachpb.Span{EndKey: span.End}
-			// Special case: if we're processing the first span, we want
-			// to mark the table key itself as being complete. This
-			// means that at the end of importing, the table's entire
-			// key-space will be marked as complete.
-			if idx == 0 {
-				_, table, err := keys.DecodeTablePrefix(span.End)
-				if err != nil {
-					return errors.Wrapf(err, "expected a table key, had %s", span.End)
-				}
-				finished.Key = keys.MakeTablePrefix(uint32(table))
-			} else {
-				finished.Key = samples[idx-1]
-			}
+			var finished roachpb.Span
+			// Mark the processed span as done for resume. If it was the first or last
+			// span, use min or max key. This is easier than trying to correctly determine
+			// the table ID we imported and getting its start span because span.End
+			// might be (in the case of an empty table) the start key of the next table.
+			switch idx {
+			case 0:
+				finished = roachpb.Span{
+					Key:    keys.MinKey,
+					EndKey: span.End,
+				}
+			case len(samples):
+				finished = roachpb.Span{
+					Key:    samples[idx-1],
+					EndKey: keys.MaxKey,
+				}
+			default:
+				finished = roachpb.Span{
+					Key:    samples[idx-1],
+					EndKey: span.End,
+				}
+			}
 			var sg roachpb.SpanGroup
 			sg.Add(d.SpanProgress...)
```
pkg/testutils/jobutils/jobs_verification.go (0 additions, 11 deletions):

```diff
@@ -194,14 +194,3 @@ func GetJobProgress(t *testing.T, db *sqlutils.SQLRunner, jobID int64) *jobspb.P
 	}
 	return ret
 }
-
-// GetJobPayload loads the Payload message associated with the job.
-func GetJobPayload(t *testing.T, db *sqlutils.SQLRunner, jobID int64) *jobspb.Payload {
-	ret := &jobspb.Payload{}
-	var buf []byte
-	db.QueryRow(t, `SELECT payload FROM system.jobs WHERE id = $1`, jobID).Scan(&buf)
-	if err := protoutil.Unmarshal(buf, ret); err != nil {
-		t.Fatal(err)
-	}
-	return ret
-}
```
