Skip to content

Commit

Permalink
daemon: fix the context passed to the driver factory; fixes #253
Browse files Browse the repository at this point in the history
Signed-off-by: Denys Smirnov <denys@sourced.tech>
  • Loading branch information
Denys Smirnov committed Feb 22, 2019
1 parent 83166ea commit 6babafa
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 45 deletions.
4 changes: 2 additions & 2 deletions daemon/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type mockDriver struct {
MockStatus protocol.Status
}

func newMockDriver() (Driver, error) {
func newMockDriver(ctx context.Context) (Driver, error) {
return &mockDriver{
MockID: runtime.NewULID().String(),
MockStatus: protocol.Running,
Expand Down Expand Up @@ -63,7 +63,7 @@ func (d *mockDriver) Stop() error {
}

func newEchoDriver() *echoDriver {
d, _ := newMockDriver()
d, _ := newMockDriver(context.Background())
return &echoDriver{
Driver: d,
}
Expand Down
10 changes: 5 additions & 5 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,11 @@ func (d *Daemon) getDriverImage(rctx context.Context, language string) (runtime.

// newDriverPool, instance a new driver pool for the given language and image
// and should be called under a lock.
func (d *Daemon) newDriverPool(rctx context.Context, language string, image runtime.DriverImage) (*DriverPool, error) {
sp, _ := opentracing.StartSpanFromContext(rctx, "bblfshd.pool.newDriverPool")
func (d *Daemon) newDriverPool(ctx context.Context, language string, image runtime.DriverImage) (*DriverPool, error) {
sp, _ := opentracing.StartSpanFromContext(ctx, "bblfshd.pool.newDriverPool")
defer sp.Finish()

dp := NewDriverPool(func() (Driver, error) {
dp := NewDriverPool(func(ctx context.Context) (Driver, error) {
logrus.Debugf("spawning driver instance %q ...", image.Name())

opts := d.getDriverInstanceOptions()
Expand All @@ -186,7 +186,7 @@ func (d *Daemon) newDriverPool(rctx context.Context, language string, image runt
return nil, err
}

if err := driver.Start(rctx); err != nil {
if err := driver.Start(ctx); err != nil {
return nil, err
}

Expand All @@ -198,7 +198,7 @@ func (d *Daemon) newDriverPool(rctx context.Context, language string, image runt
"language": language,
})

if err := dp.Start(); err != nil {
if err := dp.Start(ctx); err != nil {
return nil, err
}

Expand Down
4 changes: 2 additions & 2 deletions daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,11 @@ func buildMockedDaemon(t *testing.T, images ...runtime.DriverImage) (*Daemon, st
parsedBuild, err := time.Parse(time.RFC3339, testBuildDate)
d := NewDaemon("foo", parsedBuild, r)

dp := NewDriverPool(func() (Driver, error) {
dp := NewDriverPool(func(ctx context.Context) (Driver, error) {
return newEchoDriver(), nil
})

err = dp.Start()
err = dp.Start(context.Background())
require.NoError(err)

d.pool["python"] = dp
Expand Down
28 changes: 14 additions & 14 deletions daemon/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type DriverPool struct {
}

// FactoryFunction is a factory function that creates new DriverInstance's.
type FactoryFunction func() (Driver, error)
type FactoryFunction func(ctx context.Context) (Driver, error)

// NewDriverPool creates and starts a new DriverPool. It takes as parameters
// a FactoryFunction, used to instantiate new drivers.
Expand All @@ -77,10 +77,10 @@ func NewDriverPool(factory FactoryFunction) *DriverPool {
}

// Start stats the driver pool.
func (dp *DriverPool) Start() error {
func (dp *DriverPool) Start(ctx context.Context) error {
target := dp.ScalingPolicy.Scale(0, 0)
if err := dp.setInstances(target); err != nil {
_ = dp.setInstances(0)
if err := dp.setInstances(ctx, target); err != nil {
_ = dp.setInstances(context.Background(), 0)
return err
}
dp.running = true
Expand All @@ -92,24 +92,24 @@ func (dp *DriverPool) Start() error {
// setInstances changes the number of running driver instances. Instances
// will be started or stopped as necessary to satisfy the new instance count.
// It blocks until the all required instances are started or stopped.
func (dp *DriverPool) setInstances(target int) error {
func (dp *DriverPool) setInstances(ctx context.Context, target int) error {
if target < 0 {
return ErrNegativeInstances.New()
}

n := target - dp.stats.instances.Value()
if n == 0 {
return nil
}
if n > 0 {
return dp.add(n)
} else if n < 0 {
return dp.del(-n)
return dp.add(ctx, n)
}

return nil
return dp.del(-n)
}

func (dp *DriverPool) add(n int) error {
func (dp *DriverPool) add(ctx context.Context, n int) error {
for i := 0; i < n; i++ {
d, err := dp.factory()
d, err := dp.factory(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -172,7 +172,7 @@ func (dp *DriverPool) doScaling() {
}

dp.Logger.Debugf("scaling driver pool from %d instance(s) to %d instance(s)", total, s)
if err := dp.setInstances(s); err != nil {
if err := dp.setInstances(context.TODO(), s); err != nil {
dp.Logger.Errorf("error re-scaling pool: %s", err)
}
}
Expand Down Expand Up @@ -294,7 +294,7 @@ func (dp *DriverPool) Stop() error {
dp.closed = true
dp.close <- struct{}{}
<-dp.close
if err := dp.setInstances(0); err != nil {
if err := dp.setInstances(context.Background(), 0); err != nil {
return err
}

Expand Down
60 changes: 38 additions & 22 deletions daemon/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ func TestDriverPoolClose_StartNoopClose(t *testing.T) {
require := require.New(t)
dp := NewDriverPool(newMockDriver)

err := dp.Start()
ctx := context.Background()
err := dp.Start(ctx)
require.NoError(err)

err = dp.Stop()
Expand All @@ -25,7 +26,7 @@ func TestDriverPoolClose_StartNoopClose(t *testing.T) {
err = dp.Stop()
require.True(ErrPoolClosed.Is(err))

err = dp.Execute(nil, 0)
err = dp.ExecuteCtx(ctx, nil)
require.True(ErrPoolClosed.Is(err))
}

Expand All @@ -34,7 +35,7 @@ func TestDriverPoolCurrent(t *testing.T) {

dp := NewDriverPool(newMockDriver)

err := dp.Start()
err := dp.Start(context.Background())
require.NoError(err)

require.Len(dp.Current(), 1)
Expand All @@ -43,12 +44,19 @@ func TestDriverPoolCurrent(t *testing.T) {
func TestDriverPoolExecute_Timeout(t *testing.T) {
require := require.New(t)

dp := NewDriverPool(func() (Driver, error) {
time.Sleep(time.Millisecond)
return newMockDriver()
dp := NewDriverPool(func(ctx context.Context) (Driver, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(time.Millisecond):
}
return newMockDriver(ctx)
})

err := dp.Execute(nil, time.Nanosecond)
ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
defer cancel()

err := dp.ExecuteCtx(ctx, nil)
require.True(err == context.DeadlineExceeded)
}

Expand All @@ -57,7 +65,7 @@ func TestDriverPoolState(t *testing.T) {

dp := NewDriverPool(newMockDriver)

err := dp.Start()
err := dp.Start(context.Background())
require.NoError(err)
require.Equal(dp.State().Wanted, 1)
require.Equal(dp.State().Running, 1)
Expand All @@ -72,28 +80,28 @@ func TestDriverPoolState(t *testing.T) {
func TestDiverPoolStart_FailingDriver(t *testing.T) {
require := require.New(t)

dp := NewDriverPool(func() (Driver, error) {
dp := NewDriverPool(func(ctx context.Context) (Driver, error) {
return nil, fmt.Errorf("driver error")
})

err := dp.Start()
err := dp.Start(context.Background())
require.EqualError(err, "driver error")
}

func TestDriverPoolExecute_Recovery(t *testing.T) {
require := require.New(t)

var called int
dp := NewDriverPool(func() (Driver, error) {
dp := NewDriverPool(func(ctx context.Context) (Driver, error) {
called++
return newMockDriver()
return newMockDriver(ctx)
})

err := dp.Start()
require.NoError(err)

ctx := context.Background()

err := dp.Start(ctx)
require.NoError(err)

for i := 0; i < 100; i++ {
err := dp.ExecuteCtx(ctx, func(_ context.Context, d Driver) error {
require.NotNil(d)
Expand Down Expand Up @@ -121,14 +129,16 @@ func TestDriverPoolExecute_Sequential(t *testing.T) {

dp := NewDriverPool(newMockDriver)

err := dp.Start()
ctx := context.Background()

err := dp.Start(ctx)
require.NoError(err)

for i := 0; i < 100; i++ {
err := dp.Execute(func(_ context.Context, d Driver) error {
err := dp.ExecuteCtx(ctx, func(_ context.Context, d Driver) error {
require.NotNil(d)
return nil
}, 0)
})

require.Nil(err)
require.Equal(dp.State().Running, 1)
Expand All @@ -143,18 +153,24 @@ func TestDriverPoolExecute_Parallel(t *testing.T) {

dp := NewDriverPool(newMockDriver)

err := dp.Start()
ctx := context.Background()

err := dp.Start(ctx)
require.NoError(err)

var wg sync.WaitGroup
wg.Add(100)
for i := 0; i < 100; i++ {
go func() {
err := dp.Execute(func(_ context.Context, _ Driver) error {
err := dp.ExecuteCtx(ctx, func(_ context.Context, _ Driver) error {
defer wg.Done()
time.Sleep(50 * time.Millisecond)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(50 * time.Millisecond):
}
return nil
}, 0)
})

require.Nil(err)
require.True(len(dp.Current()) >= 1)
Expand Down

0 comments on commit 6babafa

Please sign in to comment.