distsqlrun: turn "flow already registered" into an error
Two flows with the same id sometimes seem to get scheduled on the same
node, which used to cause a panic. How that can happen is currently
unclear; cosmic rays? This patch turns the panic into an RPC or query
error (depending on sync/async flow), adds some more sanity checks, and
attaches a representation of the flow to the error; the error also
invites users to report on cockroachdb#12876.

Touches cockroachdb#12876.
andreimatei committed Aug 23, 2017
1 parent e8a1b41 commit 015c672
Showing 9 changed files with 188 additions and 29 deletions.
16 changes: 16 additions & 0 deletions pkg/sql/distsql_physical_planner.go
@@ -31,6 +31,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
 	"github.com/cockroachdb/cockroach/pkg/sql/parser"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
+	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/encoding"
 	"github.com/cockroachdb/cockroach/pkg/util/envutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -364,6 +365,21 @@ type planningCtx struct {
 	nodeAddresses map[roachpb.NodeID]string
 }
 
+// sanityCheckAddresses returns an error if the same address is used by two
+// nodes.
+func (p *planningCtx) sanityCheckAddresses() error {
+	inverted := make(map[string]roachpb.NodeID)
+	for nodeID, addr := range p.nodeAddresses {
+		if otherNodeID, ok := inverted[addr]; ok {
+			return util.UnexpectedWithIssueErrorf(
+				12876,
+				"different nodes with the same address: %d and %d", nodeID, otherNodeID)
+		}
+		inverted[addr] = nodeID
+	}
+	return nil
+}
+
 // physicalPlan is a partial physical plan which corresponds to a planNode
 // (partial in that it can correspond to a planNode subtree and not necessarily
 // to the entire planNode for a given query).
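
A note on the helper used above: util.UnexpectedWithIssueErrorf wraps a should-never-happen condition in an error that points the user at a GitHub issue. A minimal sketch of such a helper, assuming a plain fmt-based implementation (the real one in pkg/util may format its message differently):

package util

import "fmt"

// UnexpectedWithIssueErrorf reports a condition that should never
// happen and asks the user to add a report to the given GitHub issue.
// Sketch only; the actual pkg/util implementation may differ.
func UnexpectedWithIssueErrorf(issue int, format string, args ...interface{}) error {
	return fmt.Errorf(
		"unexpected error: %s\nplease report this at "+
			"https://github.com/cockroachdb/cockroach/issues/%d",
		fmt.Sprintf(format, args...), issue)
}
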
13 changes: 12 additions & 1 deletion pkg/sql/distsql_running.go
@@ -99,6 +99,10 @@ func (dsp *distSQLPlanner) initRunners() {
 //
 // Note that errors that happen while actually running the flow are reported to
 // recv, not returned by this function.
+// TODO(andrei): Some errors occurring during the "starting" phase are also
+// reported to recv instead of being returned (see the flow.Start() call for the
+// local flow). Perhaps we should push all errors to recv and have this function
+// not return anything.
 func (dsp *distSQLPlanner) Run(
 	planCtx *planningCtx,
 	txn *client.Txn,
@@ -108,6 +112,10 @@ func (dsp *distSQLPlanner) Run(
 ) error {
 	ctx := planCtx.ctx
 
+	if err := planCtx.sanityCheckAddresses(); err != nil {
+		return err
+	}
+
 	flows := plan.GenerateFlowSpecs()
 
 	if logPlanDiagram {
@@ -195,7 +203,10 @@ func (dsp *distSQLPlanner) Run(
 		return err
 	}
 	// TODO(radu): this should go through the flow scheduler.
-	flow.Start(ctx, func() {})
+	if err := flow.Start(ctx, func() {}); err != nil {
+		log.Fatalf(ctx, "unexpected error from syncFlow.Start(): %s "+
+			"The error should have gone to the consumer.", err)
+	}
 	flow.Wait()
 	flow.Cleanup(ctx)
 
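
With this change, Run fails fast with a query error instead of crashing when the planner's address map is inconsistent. The check itself is a plain map inversion; here is a self-contained sketch of the same pattern with simplified types (the real code uses roachpb.NodeID and the planningCtx shown above):

package main

import "fmt"

// checkUniqueAddresses returns an error as soon as two node IDs map to
// the same address, mirroring planningCtx.sanityCheckAddresses.
func checkUniqueAddresses(nodeAddresses map[int]string) error {
	inverted := make(map[string]int, len(nodeAddresses))
	for nodeID, addr := range nodeAddresses {
		if otherNodeID, ok := inverted[addr]; ok {
			return fmt.Errorf(
				"different nodes with the same address %q: %d and %d",
				addr, nodeID, otherNodeID)
		}
		inverted[addr] = nodeID
	}
	return nil
}

func main() {
	addrs := map[int]string{1: "10.0.0.1:26257", 2: "10.0.0.1:26257"}
	fmt.Println(checkUniqueAddresses(addrs)) // reports the collision
}
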
4 changes: 3 additions & 1 deletion pkg/sql/distsqlplan/aggregator_funcs_test.go
@@ -57,7 +57,9 @@ func runTestFlow(
 	if err != nil {
 		t.Fatal(err)
 	}
-	flow.Start(ctx, func() {})
+	if err := flow.Start(ctx, func() {}); err != nil {
+		t.Fatal(err)
+	}
 	flow.Wait()
 	flow.Cleanup(ctx)
 
32 changes: 28 additions & 4 deletions pkg/sql/distsqlrun/flow.go
@@ -132,6 +132,9 @@ type Flow struct {
 	// Cancel function for ctx. Call this to cancel the flow (safe to be called
 	// multiple times).
 	ctxCancel context.CancelFunc
+
+	// spec is the request that produced this flow. Only used for debugging.
+	spec *FlowSpec
 }
 
 func newFlow(flowCtx FlowCtx, flowReg *flowRegistry, syncFlowConsumer RowReceiver) *Flow {
@@ -283,6 +286,8 @@ func (f *Flow) makeProcessor(ps *ProcessorSpec, inputs []RowSource) (Processor,
 }
 
 func (f *Flow) setup(ctx context.Context, spec *FlowSpec) error {
+	f.spec = spec
+
 	// First step: setup the input synchronizers for all processors.
 	inputSyncs := make([][]RowSource, len(spec.Processors))
 	for pIdx, ps := range spec.Processors {
@@ -347,7 +352,12 @@
 }
 
 // Start starts the flow (each processor runs in their own goroutine).
-func (f *Flow) Start(ctx context.Context, doneFn func()) {
+//
+// Generally if errors are encountered during the setup part, they're returned.
+// But if the flow is a synchronous one, then no error is returned; instead the
+// setup error is pushed to the syncFlowConsumer. If an error is returned, a
+// subsequent call to f.Wait() will not block.
+func (f *Flow) Start(ctx context.Context, doneFn func()) error {
 	f.doneFn = doneFn
 	log.VEventf(
 		ctx, 1, "starting (%d processors, %d startables)", len(f.processors), len(f.startables),
@@ -358,18 +368,32 @@
 
 	// Once we call RegisterFlow, the inbound streams become accessible; we must
 	// set up the WaitGroup counter before.
-	f.waitGroup.Add(len(f.inboundStreams) + len(f.processors))
-
-	f.flowRegistry.RegisterFlow(f.ctx, f.id, f, f.inboundStreams, flowStreamDefaultTimeout)
+	// The counter will be further incremented below to account for the
+	// processors.
+	f.waitGroup.Add(len(f.inboundStreams))
+
+	if err := f.flowRegistry.RegisterFlow(
+		f.ctx, f.id, f, f.inboundStreams, flowStreamDefaultTimeout,
+	); err != nil {
+		if f.syncFlowConsumer != nil {
+			// For sync flows, the error goes to the consumer.
+			f.syncFlowConsumer.Push(nil /* row */, ProducerMetadata{Err: err})
+			f.syncFlowConsumer.ProducerDone()
+		} else {
+			return err
+		}
+	}
 	if log.V(1) {
 		log.Infof(f.ctx, "registered flow %s", f.id.Short())
 	}
 	for _, s := range f.startables {
 		s.start(f.ctx, &f.waitGroup, f.ctxCancel)
 	}
+	f.waitGroup.Add(len(f.processors))
 	for _, p := range f.processors {
 		go p.Run(f.ctx, &f.waitGroup)
 	}
+	return nil
 }
 
 // Wait waits for all the goroutines for this flow to exit. If the context gets
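
The sync-flow branch above leans on the RowReceiver contract: a producer can push an error in-band as metadata and then close the stream with ProducerDone, so a registration failure surfaces as the query's error rather than as a panic or a dropped RPC. A stripped-down sketch of that hand-off, with a simplified stand-in for the real RowReceiver interface:

package main

import (
	"errors"
	"fmt"
)

// metadata mimics distsqlrun.ProducerMetadata: errors travel in-band.
type metadata struct{ err error }

// receiver is a cut-down stand-in for distsqlrun.RowReceiver.
type receiver interface {
	push(row []string, meta metadata)
	producerDone()
}

// errSink records the first error pushed to it, the way a sync flow's
// consumer turns a setup error into the query error.
type errSink struct {
	firstErr error
	done     bool
}

func (s *errSink) push(row []string, meta metadata) {
	if meta.err != nil && s.firstErr == nil {
		s.firstErr = meta.err
	}
}

func (s *errSink) producerDone() { s.done = true }

func main() {
	sink := &errSink{}
	var consumer receiver = sink
	// What Flow.Start does when RegisterFlow fails on a sync flow:
	consumer.push(nil /* row */, metadata{err: errors.New("flow already registered")})
	consumer.producerDone()
	fmt.Println(sink.firstErr) // the query reports this error
}
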
21 changes: 19 additions & 2 deletions pkg/sql/distsqlrun/flow_registry.go
@@ -21,6 +21,7 @@ import (
 
 	"golang.org/x/net/context"
 
+	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -130,18 +131,33 @@ func (fr *flowRegistry) releaseEntryLocked(id FlowID) {
 //
 // inboundStreams are all the remote streams that will be connected into this
 // flow. If any of them is not connected within timeout, errors are propagated.
+// The inboundStreams are expected to have been initialized with their
+// WaitGroups (the group should have been incremented). RegisterFlow takes
+// responsibility for calling Done() on that WaitGroup; this responsibility will
+// be forwarded onward by ConnectInboundStream. In case this method returns an
+// error, the WaitGroup will be decremented.
 func (fr *flowRegistry) RegisterFlow(
 	ctx context.Context,
 	id FlowID,
 	f *Flow,
 	inboundStreams map[StreamID]*inboundStreamInfo,
 	timeout time.Duration,
-) {
+) (retErr error) {
 	fr.Lock()
 	defer fr.Unlock()
+	defer func() {
+		if retErr != nil {
+			for _, stream := range inboundStreams {
+				stream.waitGroup.Done()
+			}
+		}
+	}()
 	entry := fr.getEntryLocked(id)
 	if entry.flow != nil {
-		panic("flow already registered")
+		return util.UnexpectedWithIssueErrorf(
+			12876,
+			"flow already registered: flowID: %v.\nCurrent flow: %+v\nExisting flow: %+v",
+			id, f.spec, entry.flow.spec)
 	}
 	// Take a reference that will be removed by UnregisterFlow.
 	entry.refCount++
@@ -187,6 +203,7 @@
 			}
 		})
 	}
+	return nil
 }
 
 // UnregisterFlow removes a flow from the registry. Any subsequent
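
The retErr naming plus deferred cleanup above implements a WaitGroup ownership hand-off: the caller does Add for each inbound stream before registering, RegisterFlow assumes responsibility for the matching Done calls, and on failure it releases the counts itself so that the caller's eventual Wait cannot hang. A minimal sketch of the pattern with hypothetical names:

package main

import (
	"errors"
	"fmt"
	"sync"
)

// register takes ownership of one WaitGroup count per stream. On error
// it releases those counts before returning, so callers can still Wait.
func register(streams []string, wg *sync.WaitGroup, fail bool) (retErr error) {
	defer func() {
		if retErr != nil {
			for range streams {
				wg.Done() // hand the counts back on failure
			}
		}
	}()
	if fail {
		return errors.New("flow already registered")
	}
	// On success some later stage (ConnectInboundStream in the real
	// code) inherits the duty of calling Done once per stream.
	for range streams {
		go wg.Done() // stand-in for the real stream lifecycle
	}
	return nil
}

func main() {
	streams := []string{"s1", "s2"}
	var wg sync.WaitGroup
	wg.Add(len(streams)) // caller increments before registering
	if err := register(streams, &wg, true /* fail */); err != nil {
		fmt.Println("registration failed:", err)
	}
	wg.Wait() // does not hang: register released the counts
	fmt.Println("done")
}
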
43 changes: 35 additions & 8 deletions pkg/sql/distsqlrun/flow_registry_test.go
@@ -91,7 +91,11 @@ func TestFlowRegistry(t *testing.T) {
 	}
 
 	ctx := context.Background()
-	reg.RegisterFlow(ctx, id1, f1, nil /* inboundStreams */, flowStreamDefaultTimeout)
+	if err := reg.RegisterFlow(
+		ctx, id1, f1, nil /* inboundStreams */, flowStreamDefaultTimeout,
+	); err != nil {
+		t.Fatal(err)
+	}
 
 	if f := lookupFlow(reg, id1, 0); f != f1 {
 		t.Error("couldn't lookup previously registered flow")
@@ -107,7 +111,11 @@
 
 	go func() {
 		time.Sleep(jiffy)
-		reg.RegisterFlow(ctx, id1, f1, nil /* inboundStreams */, flowStreamDefaultTimeout)
+		if err := reg.RegisterFlow(
+			ctx, id1, f1, nil /* inboundStreams */, flowStreamDefaultTimeout,
+		); err != nil {
+			t.Error(err)
+		}
 	}()
 
 	if f := lookupFlow(reg, id1, 10*jiffy); f != f1 {
@@ -138,7 +146,11 @@ func TestFlowRegistry(t *testing.T) {
 	}()
 
 	time.Sleep(jiffy)
-	reg.RegisterFlow(ctx, id2, f2, nil /* inboundStreams */, flowStreamDefaultTimeout)
+	if err := reg.RegisterFlow(
+		ctx, id2, f2, nil /* inboundStreams */, flowStreamDefaultTimeout,
+	); err != nil {
+		t.Fatal(err)
+	}
 	wg.Wait()
 
 	// -- Multiple lookups, with the first one failing. --
@@ -163,14 +175,22 @@
 	}()
 
 	wg1.Wait()
-	reg.RegisterFlow(ctx, id3, f3, nil /* inboundStreams */, flowStreamDefaultTimeout)
+	if err := reg.RegisterFlow(
+		ctx, id3, f3, nil /* inboundStreams */, flowStreamDefaultTimeout,
+	); err != nil {
+		t.Fatal(err)
+	}
 	wg2.Wait()
 
 	// -- Lookup with huge timeout, register in the meantime. --
 
 	go func() {
 		time.Sleep(jiffy)
-		reg.RegisterFlow(ctx, id4, f4, nil /* inboundStreams */, flowStreamDefaultTimeout)
+		if err := reg.RegisterFlow(
+			ctx, id4, f4, nil /* inboundStreams */, flowStreamDefaultTimeout,
+		); err != nil {
+			t.Error(err)
+		}
 	}()
 
 	// This should return in a jiffy.
@@ -198,7 +218,11 @@ func TestStreamConnectionTimeout(t *testing.T) {
 	inboundStreams := map[StreamID]*inboundStreamInfo{
 		streamID1: {receiver: consumer, waitGroup: wg},
 	}
-	reg.RegisterFlow(context.TODO(), id1, f1, inboundStreams, jiffy)
+	if err := reg.RegisterFlow(
+		context.TODO(), id1, f1, inboundStreams, jiffy,
+	); err != nil {
+		t.Fatal(err)
+	}
 
 	testutils.SucceedsSoon(t, func() error {
 		si, err := lookupStreamInfo(reg, id1, streamID1)
@@ -288,8 +312,11 @@ func TestHandshake(t *testing.T) {
 		inboundStreams := map[StreamID]*inboundStreamInfo{
 			streamID: {receiver: consumer, waitGroup: wg},
 		}
-		reg.RegisterFlow(
-			context.TODO(), flowID, f1, inboundStreams, time.Hour /* timeout */)
+		if err := reg.RegisterFlow(
+			context.TODO(), flowID, f1, inboundStreams, time.Hour /* timeout */,
+		); err != nil {
+			t.Fatal(err)
+		}
 	}
 
 	// If the consumer is supposed to be connected early, then we connect the
32 changes: 21 additions & 11 deletions pkg/sql/distsqlrun/flow_scheduler.go
@@ -71,34 +71,42 @@ func (fs *flowScheduler) canRunFlow(_ *Flow) bool {
 }
 
 // runFlowNow starts the given flow; does not wait for the flow to complete.
-func (fs *flowScheduler) runFlowNow(ctx context.Context, f *Flow) {
+func (fs *flowScheduler) runFlowNow(ctx context.Context, f *Flow) error {
 	fs.mu.numRunning++
 	fs.metrics.FlowStart()
-	f.Start(ctx, func() { fs.flowDoneCh <- f })
+	if err := f.Start(ctx, func() { fs.flowDoneCh <- f }); err != nil {
+		return err
+	}
 	// TODO(radu): we could replace the WaitGroup with a structure that keeps a
 	// refcount and automatically runs Cleanup() when the count reaches 0.
 	go func() {
 		f.Wait()
 		f.Cleanup(ctx)
 	}()
+	return nil
 }
 
 // ScheduleFlow is the main interface of the flow scheduler: it runs or enqueues
 // the given flow.
+//
+// If the flow can start immediately, errors encountered when starting the flow
+// are returned. If the flow is enqueued, these errors will later be ignored.
 func (fs *flowScheduler) ScheduleFlow(ctx context.Context, f *Flow) error {
-	return fs.stopper.RunTask(ctx, "distsqlrun.flowScheduler: scheduling flow", func(ctx context.Context) {
-		fs.mu.Lock()
-		defer fs.mu.Unlock()
+	return fs.stopper.RunTaskWithErr(
+		ctx, "distsqlrun.flowScheduler: scheduling flow", func(ctx context.Context) error {
+			fs.mu.Lock()
+			defer fs.mu.Unlock()
 
-		if fs.canRunFlow(f) {
-			fs.runFlowNow(ctx, f)
-		} else {
+			if fs.canRunFlow(f) {
+				return fs.runFlowNow(ctx, f)
+			}
 			fs.mu.queue.PushBack(&flowWithCtx{
 				ctx:  ctx,
 				flow: f,
 			})
-		}
-	})
+			return nil
+
+		})
 }
 
 // Start launches the main loop of the scheduler.
@@ -127,7 +135,9 @@ func (fs *flowScheduler) Start() {
 				// Note: we use the flow's context instead of the worker
 				// context, to ensure that logging etc is relative to the
 				// specific flow.
-				fs.runFlowNow(n.ctx, n.flow)
+				if err := fs.runFlowNow(n.ctx, n.flow); err != nil {
+					log.Errorf(n.ctx, "error starting queued flow: %s", err)
+				}
 			}
 		}
 
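
Note the asymmetry this creates: ScheduleFlow can only return a start error for a flow it runs synchronously; once a flow is queued, its eventual start error has no caller left to return to and is merely logged by the worker loop. A compact, heavily simplified sketch of that run-now-or-enqueue shape (hypothetical types, no locking or stopper):

package main

import (
	"errors"
	"fmt"
)

type flow struct{ name string }

// start stands in for Flow.Start; it can fail.
func (f *flow) start() error {
	if f.name == "dup" {
		return errors.New("flow already registered")
	}
	return nil
}

type scheduler struct {
	running, maxRunning int
	queue               []*flow
}

// schedule runs the flow immediately if there is capacity, returning
// any start error to the caller; otherwise it only enqueues.
func (s *scheduler) schedule(f *flow) error {
	if s.running < s.maxRunning {
		s.running++
		return f.start()
	}
	s.queue = append(s.queue, f)
	return nil // a later start error will only be logged
}

// drain is the worker-loop half: queued flows start here, where their
// errors have no caller to reach.
func (s *scheduler) drain() {
	for _, f := range s.queue {
		if err := f.start(); err != nil {
			fmt.Println("error starting queued flow:", err)
		}
	}
	s.queue = nil
}

func main() {
	s := &scheduler{maxRunning: 1}
	fmt.Println(s.schedule(&flow{name: "a"}))   // runs now: <nil>
	fmt.Println(s.schedule(&flow{name: "dup"})) // queued: <nil>
	s.drain()                                   // logs the start error
}
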
7 changes: 5 additions & 2 deletions pkg/sql/distsqlrun/server.go
@@ -343,7 +343,10 @@ func (ds *ServerImpl) RunSyncFlow(stream DistSQL_RunSyncFlowServer) error {
 		ctx, ctxCancel := context.WithCancel(ctx)
 		defer ctxCancel()
 		mbox.start(ctx, &f.waitGroup, ctxCancel)
-		f.Start(ctx, func() {})
+		if err := f.Start(ctx, func() {}); err != nil {
+			log.Fatalf(ctx, "unexpected error from syncFlow.Start(): %s "+
+				"The error should have gone to the consumer.", err)
+		}
 		f.Wait()
 		f.Cleanup(ctx)
 	}); err != nil {
@@ -361,7 +364,7 @@ func (ds *ServerImpl) SetupFlow(
 	// Note: the passed context will be canceled when this RPC completes, so we
 	// can't associate it with the flow.
 	ctx = ds.AnnotateCtx(context.Background())
-	ctx, f, err := ds.setupFlow(ctx, parentSpan, req, nil)
+	ctx, f, err := ds.setupFlow(ctx, parentSpan, req, nil /* syncFlowConsumer */)
 	if err == nil {
 		err = ds.flowScheduler.ScheduleFlow(ctx, f)
 	}
(The diff for the ninth changed file was not loaded.)
