diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go
index f20ea06cae81..dfa1216ee705 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go
@@ -120,7 +120,7 @@ func ingestionPlanHook(
 		}
 
 		// Create a new tenant for the replication stream
-		if _, err := sql.GetTenantRecord(ctx, p.ExecCfg(), nil, newTenantID.ToUint64()); err == nil {
+		if _, err := sql.GetTenantRecord(ctx, p.ExecCfg(), p.Txn(), newTenantID.ToUint64()); err == nil {
 			return errors.Newf("tenant with id %s already exists", newTenantID)
 		}
 		tenantInfo := &descpb.TenantInfoWithUsage{
@@ -129,7 +129,7 @@ func ingestionPlanHook(
 				State: descpb.TenantInfo_ADD,
 			},
 		}
-		if err := sql.CreateTenantRecord(ctx, p.ExecCfg(), nil, tenantInfo); err != nil {
+		if err := sql.CreateTenantRecord(ctx, p.ExecCfg(), p.Txn(), tenantInfo); err != nil {
 			return err
 		}
 
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
index 64658d04df68..10f3b676bc2c 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
@@ -328,7 +328,7 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) {
 
 		if streamingKnobs, ok := sip.FlowCtx.TestingKnobs().StreamingTestingKnobs.(*sql.StreamingTestingKnobs); ok {
 			if streamingKnobs != nil && streamingKnobs.BeforeClientSubscribe != nil {
-				streamingKnobs.BeforeClientSubscribe(string(token), startTime)
+				streamingKnobs.BeforeClientSubscribe(addr, string(token), startTime)
 			}
 		}
 
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go
index f3ec19e117a8..9137af0fac83 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go
@@ -286,7 +286,7 @@ func TestStreamIngestionProcessor(t *testing.T) {
 	}
 
 	lastClientStart := make(map[string]hlc.Timestamp)
-	streamingTestingKnobs := &sql.StreamingTestingKnobs{BeforeClientSubscribe: func(token string, clientStartTime hlc.Timestamp) {
+	streamingTestingKnobs := &sql.StreamingTestingKnobs{BeforeClientSubscribe: func(addr string, token string, clientStartTime hlc.Timestamp) {
 		lastClientStart[token] = clientStartTime
 	}}
 	out, err := runStreamIngestionProcessor(ctx, t, registry, kvDB,
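The planning change above swaps a nil transaction for the planner's own p.Txn(), so the tenant-existence check and the tenant-record creation run in a single transaction rather than as two independent operations. Below is a minimal sketch of that check-then-create pattern using database/sql; the `tenants` table, the helper name, and the driver are hypothetical stand-ins, since the real GetTenantRecord/CreateTenantRecord take the planner's internal *kv.Txn, not a *sql.Tx.

```go
// Sketch only: check-then-create inside one transaction, mirroring how the
// diff threads p.Txn() into both calls instead of passing nil. The schema and
// function are illustrative, not CockroachDB's internal API.
package tenantsketch

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
)

func createTenantRecord(ctx context.Context, db *sql.DB, id uint64) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op once Commit succeeds

	// The existence check and the insert share the same transaction, so no
	// concurrent statement can create the tenant between the two steps.
	var existing uint64
	err = tx.QueryRowContext(ctx,
		`SELECT id FROM tenants WHERE id = $1`, id).Scan(&existing)
	if err == nil {
		return fmt.Errorf("tenant with id %d already exists", id)
	} else if !errors.Is(err, sql.ErrNoRows) {
		return err
	}

	if _, err := tx.ExecContext(ctx,
		`INSERT INTO tenants (id, active) VALUES ($1, true)`, id); err != nil {
		return err
	}
	return tx.Commit()
}
```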
"github.com/stretchr/testify/require" ) @@ -219,10 +219,6 @@ func createTenantStreamingClusters( tenantArgs := base.TestTenantArgs{ TenantID: args.srcTenantID, TestingKnobs: base.TestingKnobs{ - Store: &kvserver.StoreTestingKnobs{ - DisableSplitQueue: true, - DisableMergeQueue: true, - }, TenantTestingKnobs: &sql.TenantTestingKnobs{ AllowSplitAndScatter: true, }}, @@ -283,6 +279,19 @@ func (c *tenantStreamingClusters) srcExec(exec srcInitExecFunc) { exec(c.t, c.srcSysSQL, c.srcTenantSQL) } +func createScatteredTable(t *testing.T, c *tenantStreamingClusters) { + // Create a source table with multiple ranges spread across multiple nodes + numRanges := 50 + rowsPerRange := 20 + c.srcTenantSQL.Exec(t, fmt.Sprintf(` + CREATE TABLE d.scattered (key INT PRIMARY KEY); + INSERT INTO d.scattered (key) SELECT * FROM generate_series(1, %d); + ALTER TABLE d.scattered SPLIT AT (SELECT * FROM generate_series(%d, %d, %d)); + ALTER TABLE d.scattered SCATTER; + `, numRanges*rowsPerRange, rowsPerRange, (numRanges-1)*rowsPerRange, rowsPerRange)) + c.srcSysSQL.CheckQueryResultsRetry(t, "SELECT count(distinct lease_holder) from crdb_internal.ranges", [][]string{{"4"}}) +} + var defaultSrcClusterSetting = map[string]string{ `kv.rangefeed.enabled`: `true`, `kv.closed_timestamp.target_duration`: `'1s'`, @@ -361,7 +370,6 @@ func TestTenantStreamingSuccessfulIngestion(t *testing.T) { // require.Error(t, err) c.cutover(producerJobID, ingestionJobID, cutoverTime) - jobutils.WaitForJobToSucceed(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID)) cleanupTenant := c.createDestTenantSQL(ctx) defer func() { @@ -559,7 +567,7 @@ func TestTenantStreamingCheckpoint(t *testing.T) { lastClientStart := make(map[string]hlc.Timestamp) args := defaultTenantStreamingClustersArgs args.testingKnobs = &sql.StreamingTestingKnobs{ - BeforeClientSubscribe: func(token string, clientStartTime hlc.Timestamp) { + BeforeClientSubscribe: func(addr string, token string, clientStartTime hlc.Timestamp) { lastClientStart[token] = clientStartTime }, } @@ -734,7 +742,7 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.WithIssue(t, 85283, "flaky test due to multi-node") + skip.UnderRace(t, "takes too long with multiple nodes") ctx := context.Background() args := defaultTenantStreamingClustersArgs @@ -743,15 +751,8 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) { c, cleanup := createTenantStreamingClusters(ctx, t, args) defer cleanup() - // Create a source table with multiple ranges spread across multiple nodes - numRanges := 50 - rowsPerRange := 20 - c.srcTenantSQL.Exec(t, fmt.Sprintf(` - CREATE TABLE d.scattered (key INT PRIMARY KEY); - INSERT INTO d.scattered (key) SELECT * FROM generate_series(1, %d); - ALTER TABLE d.scattered SPLIT AT (SELECT * FROM generate_series(%d, %d, %d)); - ALTER TABLE d.scattered SCATTER; - `, numRanges*rowsPerRange, rowsPerRange, (numRanges-1)*rowsPerRange, rowsPerRange)) + createScatteredTable(t, c) + srcScatteredData := c.srcTenantSQL.QueryStr(c.t, "SELECT * FROM d.scattered ORDER BY key") producerJobID, ingestionJobID := c.startStreamReplication() jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID)) @@ -768,6 +769,8 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) { streamAddresses := progress.GetStreamIngest().StreamAddresses require.Greater(t, len(streamAddresses), 1) + destroyedAddress := c.srcURL.String() + require.NoError(t, c.srcTenantConn.Close()) 
@@ -361,7 +370,6 @@ func TestTenantStreamingSuccessfulIngestion(t *testing.T) {
 	// require.Error(t, err)
 
 	c.cutover(producerJobID, ingestionJobID, cutoverTime)
-	jobutils.WaitForJobToSucceed(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))
 
 	cleanupTenant := c.createDestTenantSQL(ctx)
 	defer func() {
@@ -559,7 +567,7 @@ func TestTenantStreamingCheckpoint(t *testing.T) {
 	lastClientStart := make(map[string]hlc.Timestamp)
 	args := defaultTenantStreamingClustersArgs
 	args.testingKnobs = &sql.StreamingTestingKnobs{
-		BeforeClientSubscribe: func(token string, clientStartTime hlc.Timestamp) {
+		BeforeClientSubscribe: func(addr string, token string, clientStartTime hlc.Timestamp) {
 			lastClientStart[token] = clientStartTime
 		},
 	}
@@ -734,7 +742,7 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
-	skip.WithIssue(t, 85283, "flaky test due to multi-node")
+	skip.UnderRace(t, "takes too long with multiple nodes")
 
 	ctx := context.Background()
 	args := defaultTenantStreamingClustersArgs
@@ -743,15 +751,8 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
 	c, cleanup := createTenantStreamingClusters(ctx, t, args)
 	defer cleanup()
 
-	// Create a source table with multiple ranges spread across multiple nodes
-	numRanges := 50
-	rowsPerRange := 20
-	c.srcTenantSQL.Exec(t, fmt.Sprintf(`
-  CREATE TABLE d.scattered (key INT PRIMARY KEY);
-  INSERT INTO d.scattered (key) SELECT * FROM generate_series(1, %d);
-  ALTER TABLE d.scattered SPLIT AT (SELECT * FROM generate_series(%d, %d, %d));
-  ALTER TABLE d.scattered SCATTER;
-  `, numRanges*rowsPerRange, rowsPerRange, (numRanges-1)*rowsPerRange, rowsPerRange))
+	createScatteredTable(t, c)
+	srcScatteredData := c.srcTenantSQL.QueryStr(c.t, "SELECT * FROM d.scattered ORDER BY key")
 
 	producerJobID, ingestionJobID := c.startStreamReplication()
 	jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
@@ -768,6 +769,8 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
 	streamAddresses := progress.GetStreamIngest().StreamAddresses
 	require.Greater(t, len(streamAddresses), 1)
 
+	destroyedAddress := c.srcURL.String()
+
 	require.NoError(t, c.srcTenantConn.Close())
 	c.srcTenantServer.Stopper().Stop(ctx)
 	c.srcCluster.StopServer(0)
@@ -801,18 +804,26 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
 	c.destSysSQL.Exec(c.t, `SELECT crdb_internal.complete_stream_ingestion_job($1, $2)`, ingestionJobID, cutoverTime)
 	jobutils.WaitForJobToSucceed(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))
 
+	// The destroyed address should have been removed from the topology
+	progress = jobutils.GetJobProgress(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))
+	newStreamAddresses := progress.GetStreamIngest().StreamAddresses
+	require.Contains(t, streamAddresses, destroyedAddress)
+	require.NotContains(t, newStreamAddresses, destroyedAddress)
+
 	alternateCompareResult("SELECT * FROM d.t1")
 	alternateCompareResult("SELECT * FROM d.t2")
 	alternateCompareResult("SELECT * FROM d.x")
-	alternateCompareResult("SELECT * FROM d.scattered")
+
+	// We can't use alternateCompareResult because it'll try to contact the deceased
+	// n1 even if the lease holders for d.scattered have all moved to other nodes
+	dstScatteredData := c.destTenantSQL.QueryStr(c.t, "SELECT * FROM d.scattered ORDER BY key")
+	require.Equal(t, srcScatteredData, dstScatteredData)
 }
 
 func TestTenantStreamingCutoverOnSourceFailure(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
-	skip.UnderRaceWithIssue(t, 83867)
-
 	ctx := context.Background()
 	args := defaultTenantStreamingClustersArgs
 	c, cleanup := createTenantStreamingClusters(ctx, t, args)
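The new topology assertion hinges on capturing the address list before the node is killed and re-reading it from job progress after cutover. In isolation, the pair of testify checks behaves like this sketch, where the addresses are made-up placeholders rather than values from a real test cluster:

```go
// Self-contained illustration of the before/after membership checks used
// above; the addresses are stand-ins, not output from the streaming job.
package main

import (
	"testing"

	"github.com/stretchr/testify/require"
)

func TestDestroyedAddressPruned(t *testing.T) {
	destroyedAddress := "127.0.0.1:26257"

	// Captured before stopping the node: the soon-to-die address is present.
	before := []string{"127.0.0.1:26257", "127.0.0.1:26258", "127.0.0.1:26259"}
	// Re-read from job progress after cutover: the address has been pruned.
	after := []string{"127.0.0.1:26258", "127.0.0.1:26259"}

	require.Contains(t, before, destroyedAddress)
	require.NotContains(t, after, destroyedAddress)
}
```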
@@ -913,3 +924,72 @@ func TestTenantStreamingDeleteRange(t *testing.T) {
 	checkDelRangeOnTable("t1", true /* embeddedInSST */)
 	checkDelRangeOnTable("t2", false /* embeddedInSST */)
 }
+
+func TestTenantStreamingMultipleNodes(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	skip.UnderRace(t, "takes too long with multiple nodes")
+
+	ctx := context.Background()
+	args := defaultTenantStreamingClustersArgs
+	args.srcNumNodes = 4
+	args.destNumNodes = 4
+
+	// Track the number of unique addresses that were connected to
+	clientAddresses := make(map[string]struct{})
+	var addressesMu syncutil.Mutex
+	args.testingKnobs = &sql.StreamingTestingKnobs{
+		BeforeClientSubscribe: func(addr string, token string, clientStartTime hlc.Timestamp) {
+			addressesMu.Lock()
+			defer addressesMu.Unlock()
+			clientAddresses[addr] = struct{}{}
+		},
+	}
+
+	c, cleanup := createTenantStreamingClusters(ctx, t, args)
+	defer cleanup()
+
+	createScatteredTable(t, c)
+
+	producerJobID, ingestionJobID := c.startStreamReplication()
+	jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
+	jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))
+
+	c.srcExec(func(t *testing.T, sysSQL *sqlutils.SQLRunner, tenantSQL *sqlutils.SQLRunner) {
+		tenantSQL.Exec(t, "CREATE TABLE d.x (id INT PRIMARY KEY, n INT)")
+		tenantSQL.Exec(t, "INSERT INTO d.x VALUES (1, 1)")
+	})
+
+	c.destSysSQL.Exec(t, `PAUSE JOB $1`, ingestionJobID)
+	jobutils.WaitForJobToPause(t, c.destSysSQL, jobspb.JobID(ingestionJobID))
+	c.srcExec(func(t *testing.T, sysSQL *sqlutils.SQLRunner, tenantSQL *sqlutils.SQLRunner) {
+		tenantSQL.Exec(t, "INSERT INTO d.x VALUES (2, 2)")
+	})
+	c.destSysSQL.Exec(t, `RESUME JOB $1`, ingestionJobID)
+	jobutils.WaitForJobToRun(t, c.destSysSQL, jobspb.JobID(ingestionJobID))
+
+	c.srcExec(func(t *testing.T, sysSQL *sqlutils.SQLRunner, tenantSQL *sqlutils.SQLRunner) {
+		tenantSQL.Exec(t, "INSERT INTO d.x VALUES (3, 3)")
+	})
+
+	var cutoverTime time.Time
+	c.srcExec(func(t *testing.T, sysSQL *sqlutils.SQLRunner, tenantSQL *sqlutils.SQLRunner) {
+		sysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&cutoverTime)
+	})
+
+	c.cutover(producerJobID, ingestionJobID, cutoverTime)
+
+	cleanupTenant := c.createDestTenantSQL(ctx)
+	defer func() {
+		require.NoError(t, cleanupTenant())
+	}()
+
+	c.compareResult("SELECT * FROM d.t1")
+	c.compareResult("SELECT * FROM d.t2")
+	c.compareResult("SELECT * FROM d.x")
+	c.compareResult("SELECT * FROM d.scattered ORDER BY key")
+
+	// Since the data was distributed across multiple nodes, multiple nodes should've been connected to
+	require.Greater(t, len(clientAddresses), 1)
+}
diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go
index 9b36733e5925..844bbd9ba5fe 100644
--- a/pkg/sql/exec_util.go
+++ b/pkg/sql/exec_util.go
@@ -1584,7 +1584,7 @@ type StreamingTestingKnobs struct {
 
 	// BeforeClientSubscribe allows observation of parameters about to be passed
 	// to a streaming client
-	BeforeClientSubscribe func(token string, startTime hlc.Timestamp)
+	BeforeClientSubscribe func(addr string, token string, startTime hlc.Timestamp)
 }
 
 var _ base.ModuleTestingKnobs = &StreamingTestingKnobs{}
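The exec_util.go change widens the knob so tests can observe which address each partition connects to. A minimal, dependency-free sketch of consuming that shape follows; it uses the standard library's sync.Mutex where the test uses syncutil.Mutex, and a placeholder type where the real signature takes hlc.Timestamp:

```go
// Sketch of the widened BeforeClientSubscribe shape. timestamp stands in for
// hlc.Timestamp; the knob wiring itself is CockroachDB-internal.
package main

import (
	"fmt"
	"sync"
)

type timestamp struct{} // placeholder for hlc.Timestamp

func main() {
	var mu sync.Mutex
	clientAddresses := make(map[string]struct{})

	// Matches func(addr string, token string, startTime hlc.Timestamp).
	beforeClientSubscribe := func(addr, token string, _ timestamp) {
		// Processors may invoke the knob concurrently, so guard the shared
		// map, just as TestTenantStreamingMultipleNodes does.
		mu.Lock()
		defer mu.Unlock()
		clientAddresses[addr] = struct{}{}
	}

	beforeClientSubscribe("10.0.0.1:26257", "token-a", timestamp{})
	beforeClientSubscribe("10.0.0.2:26257", "token-b", timestamp{})
	fmt.Println("unique addresses:", len(clientAddresses)) // prints 2
}
```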