diff --git a/pkg/base/BUILD.bazel b/pkg/base/BUILD.bazel index b9cc043c76be..5d8a683eaea3 100644 --- a/pkg/base/BUILD.bazel +++ b/pkg/base/BUILD.bazel @@ -28,7 +28,6 @@ go_library( "//pkg/settings/cluster", "//pkg/util", "//pkg/util/envutil", - "//pkg/util/errorutil", "//pkg/util/humanizeutil", "//pkg/util/metric", "//pkg/util/mon", diff --git a/pkg/base/node_id.go b/pkg/base/node_id.go index b0ef38f948af..2f41c3aefeb4 100644 --- a/pkg/base/node_id.go +++ b/pkg/base/node_id.go @@ -17,7 +17,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util" - "github.com/cockroachdb/cockroach/pkg/util/errorutil" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" ) @@ -233,15 +232,6 @@ func (c *SQLIDContainer) OptionalNodeID() (roachpb.NodeID, bool) { return (*NodeIDContainer)(c).Get(), true } -// OptionalNodeIDErr is like OptionalNodeID, but returns an error (referring to -// the optionally supplied GitHub issues) if the ID is not present. -func (c *SQLIDContainer) OptionalNodeIDErr(issue int) (roachpb.NodeID, error) { - if (*NodeIDContainer)(c).standaloneSQLInstance { - return 0, errorutil.UnsupportedWithMultiTenancy(issue) - } - return (*NodeIDContainer)(c).Get(), nil -} - // SQLInstanceID returns the wrapped SQLInstanceID. func (c *SQLIDContainer) SQLInstanceID() SQLInstanceID { return SQLInstanceID((*NodeIDContainer)(c).Get()) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go index 3029c603e43d..630613dd7128 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go @@ -82,7 +82,9 @@ func startDistIngestion( client, replicatedTime, streamProgress.Checkpoint, - initialScanTimestamp)(ctx, dsp) + initialScanTimestamp, + dsp.GatewayID(), + )(ctx, dsp) if err != nil { return err } @@ -128,6 +130,7 @@ func makePlan( previousReplicatedTime hlc.Timestamp, checkpoint jobspb.StreamIngestionCheckpoint, initialScanTimestamp hlc.Timestamp, + gatewayID base.SQLInstanceID, ) func(context.Context, *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) { return func(ctx context.Context, dsp *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) { jobID := ingestionJob.ID() @@ -185,14 +188,9 @@ func makePlan( execinfrapb.Ordering{}, ) - gatewayNodeID, err := execCtx.ExecCfg().NodeInfo.NodeID.OptionalNodeIDErr(48274) - if err != nil { - return nil, nil, err - } - // The ResultRouters from the previous stage will feed in to the // StreamIngestionFrontier processor. - p.AddSingleGroupStage(ctx, base.SQLInstanceID(gatewayNodeID), + p.AddSingleGroupStage(ctx, gatewayID, execinfrapb.ProcessorCoreUnion{StreamIngestionFrontier: streamIngestionFrontierSpec}, execinfrapb.PostProcessSpec{}, streamIngestionResultTypes)