Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

streamingccl: multi-node unit tests #85735

Merged
merged 1 commit into from
Aug 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func ingestionPlanHook(
}

// Create a new tenant for the replication stream
if _, err := sql.GetTenantRecord(ctx, p.ExecCfg(), nil, newTenantID.ToUint64()); err == nil {
if _, err := sql.GetTenantRecord(ctx, p.ExecCfg(), p.Txn(), newTenantID.ToUint64()); err == nil {
return errors.Newf("tenant with id %s already exists", newTenantID)
}
tenantInfo := &descpb.TenantInfoWithUsage{
Expand All @@ -129,7 +129,7 @@ func ingestionPlanHook(
State: descpb.TenantInfo_ADD,
},
}
if err := sql.CreateTenantRecord(ctx, p.ExecCfg(), nil, tenantInfo); err != nil {
if err := sql.CreateTenantRecord(ctx, p.ExecCfg(), p.Txn(), tenantInfo); err != nil {
return err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) {

if streamingKnobs, ok := sip.FlowCtx.TestingKnobs().StreamingTestingKnobs.(*sql.StreamingTestingKnobs); ok {
if streamingKnobs != nil && streamingKnobs.BeforeClientSubscribe != nil {
streamingKnobs.BeforeClientSubscribe(string(token), startTime)
streamingKnobs.BeforeClientSubscribe(addr, string(token), startTime)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func TestStreamIngestionProcessor(t *testing.T) {
}

lastClientStart := make(map[string]hlc.Timestamp)
streamingTestingKnobs := &sql.StreamingTestingKnobs{BeforeClientSubscribe: func(token string, clientStartTime hlc.Timestamp) {
streamingTestingKnobs := &sql.StreamingTestingKnobs{BeforeClientSubscribe: func(addr string, token string, clientStartTime hlc.Timestamp) {
lastClientStart[token] = clientStartTime
}}
out, err := runStreamIngestionProcessor(ctx, t, registry, kvDB,
Expand Down
120 changes: 100 additions & 20 deletions pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
Expand All @@ -42,6 +41,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -219,10 +219,6 @@ func createTenantStreamingClusters(
tenantArgs := base.TestTenantArgs{
TenantID: args.srcTenantID,
TestingKnobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
DisableSplitQueue: true,
DisableMergeQueue: true,
},
TenantTestingKnobs: &sql.TenantTestingKnobs{
AllowSplitAndScatter: true,
}},
Expand Down Expand Up @@ -283,6 +279,19 @@ func (c *tenantStreamingClusters) srcExec(exec srcInitExecFunc) {
exec(c.t, c.srcSysSQL, c.srcTenantSQL)
}

func createScatteredTable(t *testing.T, c *tenantStreamingClusters) {
// Create a source table with multiple ranges spread across multiple nodes
numRanges := 50
rowsPerRange := 20
c.srcTenantSQL.Exec(t, fmt.Sprintf(`
CREATE TABLE d.scattered (key INT PRIMARY KEY);
INSERT INTO d.scattered (key) SELECT * FROM generate_series(1, %d);
ALTER TABLE d.scattered SPLIT AT (SELECT * FROM generate_series(%d, %d, %d));
ALTER TABLE d.scattered SCATTER;
`, numRanges*rowsPerRange, rowsPerRange, (numRanges-1)*rowsPerRange, rowsPerRange))
c.srcSysSQL.CheckQueryResultsRetry(t, "SELECT count(distinct lease_holder) from crdb_internal.ranges", [][]string{{"4"}})
}

var defaultSrcClusterSetting = map[string]string{
`kv.rangefeed.enabled`: `true`,
`kv.closed_timestamp.target_duration`: `'1s'`,
Expand Down Expand Up @@ -361,7 +370,6 @@ func TestTenantStreamingSuccessfulIngestion(t *testing.T) {
// require.Error(t, err)

c.cutover(producerJobID, ingestionJobID, cutoverTime)
jobutils.WaitForJobToSucceed(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))

cleanupTenant := c.createDestTenantSQL(ctx)
defer func() {
Expand Down Expand Up @@ -559,7 +567,7 @@ func TestTenantStreamingCheckpoint(t *testing.T) {
lastClientStart := make(map[string]hlc.Timestamp)
args := defaultTenantStreamingClustersArgs
args.testingKnobs = &sql.StreamingTestingKnobs{
BeforeClientSubscribe: func(token string, clientStartTime hlc.Timestamp) {
BeforeClientSubscribe: func(addr string, token string, clientStartTime hlc.Timestamp) {
lastClientStart[token] = clientStartTime
},
}
Expand Down Expand Up @@ -734,7 +742,7 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.WithIssue(t, 85283, "flaky test due to multi-node")
skip.UnderRace(t, "takes too long with multiple nodes")

ctx := context.Background()
args := defaultTenantStreamingClustersArgs
Expand All @@ -743,15 +751,8 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
c, cleanup := createTenantStreamingClusters(ctx, t, args)
defer cleanup()

// Create a source table with multiple ranges spread across multiple nodes
numRanges := 50
rowsPerRange := 20
c.srcTenantSQL.Exec(t, fmt.Sprintf(`
CREATE TABLE d.scattered (key INT PRIMARY KEY);
INSERT INTO d.scattered (key) SELECT * FROM generate_series(1, %d);
ALTER TABLE d.scattered SPLIT AT (SELECT * FROM generate_series(%d, %d, %d));
ALTER TABLE d.scattered SCATTER;
`, numRanges*rowsPerRange, rowsPerRange, (numRanges-1)*rowsPerRange, rowsPerRange))
createScatteredTable(t, c)
srcScatteredData := c.srcTenantSQL.QueryStr(c.t, "SELECT * FROM d.scattered ORDER BY key")

producerJobID, ingestionJobID := c.startStreamReplication()
jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
Expand All @@ -768,6 +769,8 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
streamAddresses := progress.GetStreamIngest().StreamAddresses
require.Greater(t, len(streamAddresses), 1)

destroyedAddress := c.srcURL.String()

require.NoError(t, c.srcTenantConn.Close())
c.srcTenantServer.Stopper().Stop(ctx)
c.srcCluster.StopServer(0)
Expand Down Expand Up @@ -801,18 +804,26 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
c.destSysSQL.Exec(c.t, `SELECT crdb_internal.complete_stream_ingestion_job($1, $2)`, ingestionJobID, cutoverTime)
jobutils.WaitForJobToSucceed(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))

// The destroyed address should have been removed from the topology
progress = jobutils.GetJobProgress(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))
newStreamAddresses := progress.GetStreamIngest().StreamAddresses
require.Contains(t, streamAddresses, destroyedAddress)
require.NotContains(t, newStreamAddresses, destroyedAddress)

alternateCompareResult("SELECT * FROM d.t1")
alternateCompareResult("SELECT * FROM d.t2")
alternateCompareResult("SELECT * FROM d.x")
alternateCompareResult("SELECT * FROM d.scattered")

// We can't use alternateCompareResult because it'll try to contact the deceased
// n1 even if the lease holders for d.scattered have all moved to other nodes
dstScatteredData := c.destTenantSQL.QueryStr(c.t, "SELECT * FROM d.scattered ORDER BY key")
require.Equal(t, srcScatteredData, dstScatteredData)
}

func TestTenantStreamingCutoverOnSourceFailure(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderRaceWithIssue(t, 83867)

ctx := context.Background()
args := defaultTenantStreamingClustersArgs
c, cleanup := createTenantStreamingClusters(ctx, t, args)
Expand Down Expand Up @@ -913,3 +924,72 @@ func TestTenantStreamingDeleteRange(t *testing.T) {
checkDelRangeOnTable("t1", true /* embeddedInSST */)
checkDelRangeOnTable("t2", false /* embeddedInSST */)
}

func TestTenantStreamingMultipleNodes(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderRace(t, "takes too long with multiple nodes")

ctx := context.Background()
args := defaultTenantStreamingClustersArgs
args.srcNumNodes = 4
args.destNumNodes = 4

// Track the number of unique addresses that were connected to
clientAddresses := make(map[string]struct{})
var addressesMu syncutil.Mutex
args.testingKnobs = &sql.StreamingTestingKnobs{
BeforeClientSubscribe: func(addr string, token string, clientStartTime hlc.Timestamp) {
addressesMu.Lock()
defer addressesMu.Unlock()
clientAddresses[addr] = struct{}{}
},
}

c, cleanup := createTenantStreamingClusters(ctx, t, args)
defer cleanup()

createScatteredTable(t, c)

producerJobID, ingestionJobID := c.startStreamReplication()
jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))

c.srcExec(func(t *testing.T, sysSQL *sqlutils.SQLRunner, tenantSQL *sqlutils.SQLRunner) {
tenantSQL.Exec(t, "CREATE TABLE d.x (id INT PRIMARY KEY, n INT)")
tenantSQL.Exec(t, "INSERT INTO d.x VALUES (1, 1)")
})

c.destSysSQL.Exec(t, `PAUSE JOB $1`, ingestionJobID)
jobutils.WaitForJobToPause(t, c.destSysSQL, jobspb.JobID(ingestionJobID))
c.srcExec(func(t *testing.T, sysSQL *sqlutils.SQLRunner, tenantSQL *sqlutils.SQLRunner) {
tenantSQL.Exec(t, "INSERT INTO d.x VALUES (2, 2)")
})
c.destSysSQL.Exec(t, `RESUME JOB $1`, ingestionJobID)
jobutils.WaitForJobToRun(t, c.destSysSQL, jobspb.JobID(ingestionJobID))

c.srcExec(func(t *testing.T, sysSQL *sqlutils.SQLRunner, tenantSQL *sqlutils.SQLRunner) {
tenantSQL.Exec(t, "INSERT INTO d.x VALUES (3, 3)")
})

var cutoverTime time.Time
c.srcExec(func(t *testing.T, sysSQL *sqlutils.SQLRunner, tenantSQL *sqlutils.SQLRunner) {
sysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&cutoverTime)
})

c.cutover(producerJobID, ingestionJobID, cutoverTime)

cleanupTenant := c.createDestTenantSQL(ctx)
defer func() {
require.NoError(t, cleanupTenant())
}()

c.compareResult("SELECT * FROM d.t1")
c.compareResult("SELECT * FROM d.t2")
c.compareResult("SELECT * FROM d.x")
c.compareResult("SELECT * FROM d.scattered ORDER BY key")

// Since the data was distributed across multiple nodes, multiple nodes should've been connected to
require.Greater(t, len(clientAddresses), 1)
}
2 changes: 1 addition & 1 deletion pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1584,7 +1584,7 @@ type StreamingTestingKnobs struct {

// BeforeClientSubscribe allows observation of parameters about to be passed
// to a streaming client
BeforeClientSubscribe func(token string, startTime hlc.Timestamp)
BeforeClientSubscribe func(addr string, token string, startTime hlc.Timestamp)
}

var _ base.ModuleTestingKnobs = &StreamingTestingKnobs{}
Expand Down