Skip to content

Commit

Permalink
do not merge: force parquet cloud storage tests
Browse files Browse the repository at this point in the history
This change forces all tests, including tests for `diff` and `end_time`
to run with the `cloudstorage` sink and `format=parquet` where possible.

Informs: #103129
Informs: #99028
Epic: CRDB-27372
Release note: None
  • Loading branch information
jayshrivastava committed Jun 13, 2023
1 parent 32d7706 commit d5d1c88
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 8 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/cdctest/nemeses.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func RunNemesis(f TestFeedFactory, db *gosql.DB, isSinkless bool) (Validator, er
}

withFormatParquet := ""
if rand.Intn(2) < 1 {
if rand.Intn(2) < 2 {
withFormatParquet = ", format=parquet"
}
foo, err := f.Feed(fmt.Sprintf(`CREATE CHANGEFEED FOR foo WITH updated, resolved, diff %s`, withFormatParquet))
Expand Down
12 changes: 6 additions & 6 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func TestChangefeedBasicQueryWrapped(t *testing.T) {
// Currently, parquet format (which may be injected by feed() call), doesn't
// know how to handle tuple types (cdc_prev); so, force JSON format.
foo := feed(t, f, `
CREATE CHANGEFEED WITH envelope='wrapped', format='json', diff
CREATE CHANGEFEED WITH envelope='wrapped', format='parquet', diff
AS SELECT b||a AS ba, event_op() AS op FROM foo`)
defer closeFeed(t, foo)

Expand Down Expand Up @@ -343,7 +343,7 @@ AS SELECT b||a AS ba, event_op() AS op FROM foo`)
})
}

cdcTest(t, testFn, feedTestForceSink("webhook"))
cdcTest(t, testFn, feedTestForceSink("cloudstorage"))
}

// Same test as TestChangefeedBasicQueryWrapped, but this time using AVRO.
Expand Down Expand Up @@ -577,7 +577,7 @@ func TestChangefeedDiff(t *testing.T) {
sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`)
sqlDB.Exec(t, `UPSERT INTO foo VALUES (0, 'updated')`)

foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH diff`)
foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH diff, format=parquet`)
defer closeFeed(t, foo)

// 'initial' is skipped because only the latest value ('updated') is
Expand Down Expand Up @@ -609,7 +609,7 @@ func TestChangefeedDiff(t *testing.T) {
})
}

cdcTest(t, testFn)
cdcTest(t, testFn, feedTestForceSink("cloudstorage"))
}

func TestChangefeedTenants(t *testing.T) {
Expand Down Expand Up @@ -7208,7 +7208,7 @@ func TestChangefeedEndTime(t *testing.T) {
sqlDB.Exec(t, "INSERT INTO foo VALUES (1), (2), (3)")

fakeEndTime := s.Server.Clock().Now().Add(int64(time.Hour), 0).AsOfSystemTime()
feed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH end_time = $1", fakeEndTime)
feed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH end_time = $1, format=parquet", fakeEndTime)
defer closeFeed(t, feed)

assertPayloads(t, feed, []string{
Expand All @@ -7225,7 +7225,7 @@ func TestChangefeedEndTime(t *testing.T) {
}))
}

cdcTest(t, testFn, feedTestEnterpriseSinks)
cdcTest(t, testFn, feedTestForceSink("cloudstorage"))
}

func TestChangefeedEndTimeWithCursor(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,9 @@ func randomSinkTypeWithOptions(options feedTestOptions) string {
sinkWeights[sinkType] = 0
}
}
if weight, ok := sinkWeights["cloudstorage"]; ok && weight != 0 {
sinkWeights = map[string]int{"cloudstorage": 1}
}
weightTotal := 0
for _, weight := range sinkWeights {
weightTotal += weight
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1076,7 +1076,7 @@ func (f *cloudFeedFactory) Feed(
}
}
randNum := rand.Intn(5)
if randNum < 3 {
if randNum < 0 {
parquetPossible = false
}
if parquetPossible {
Expand Down

0 comments on commit d5d1c88

Please sign in to comment.