Skip to content

Commit

Permalink
Merge pull request #22 from smola/fix-scaling-policy
Browse files Browse the repository at this point in the history
pool: use a new scaling policy instance per pool
  • Loading branch information
smola authored Jun 2, 2017
2 parents 3576f19 + be3ba72 commit fc0811c
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 9 deletions.
11 changes: 8 additions & 3 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
)

var (
// DefaultScalingPolicy is the default ScalingPolicy.
DefaultScalingPolicy = MovingAverage(10, MinMax(1, 10, AIMD(1, 0.5)))
// DefaultPoolTimeout is the time a request to the DriverPool can wait
// before getting a driver assigned.
DefaultPoolTimeout = time.Second * 5
Expand Down Expand Up @@ -234,6 +232,12 @@ type ScalingPolicy interface {
Scale(total, load int) int
}

// DefaultScalingPolicy returns a new instance of the default scaling policy.
// Instances returned by this function should not be reused.
func DefaultScalingPolicy() ScalingPolicy {
return MovingAverage(10, MinMax(1, 10, AIMD(1, 0.5)))
}

type movingAverage struct {
ScalingPolicy
loads []float64
Expand All @@ -242,7 +246,8 @@ type movingAverage struct {
}

// MovingAverage computes a moving average of the load and forwards it to the
// underlying scaling policy.
// underlying scaling policy. This policy is stateful and not thread-safe, do not
// reuse its instances for multiple pools.
func MovingAverage(window int, p ScalingPolicy) ScalingPolicy {
return &movingAverage{
ScalingPolicy: p,
Expand Down
8 changes: 4 additions & 4 deletions pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestDriverPoolStartNoopClose(t *testing.T) {
return &mockDriver{}, nil
}

dp, err := StartDriverPool(DefaultScalingPolicy, DefaultPoolTimeout, new)
dp, err := StartDriverPool(DefaultScalingPolicy(), DefaultPoolTimeout, new)
require.NoError(err)
require.NotNil(dp)

Expand All @@ -56,7 +56,7 @@ func TestDriverPoolStartFailingDriver(t *testing.T) {
return nil, fmt.Errorf("driver error")
}

dp, err := StartDriverPool(DefaultScalingPolicy, DefaultPoolTimeout, new)
dp, err := StartDriverPool(DefaultScalingPolicy(), DefaultPoolTimeout, new)
require.EqualError(err, "driver error")
require.Nil(dp)
}
Expand All @@ -74,7 +74,7 @@ func TestDriverPoolSequential(t *testing.T) {
}, nil
}

dp, err := StartDriverPool(DefaultScalingPolicy, DefaultPoolTimeout, new)
dp, err := StartDriverPool(DefaultScalingPolicy(), DefaultPoolTimeout, new)
require.NoError(err)
require.NotNil(dp)

Expand Down Expand Up @@ -103,7 +103,7 @@ func TestDriverPoolParallel(t *testing.T) {
}, nil
}

dp, err := StartDriverPool(DefaultScalingPolicy, time.Second*10, new)
dp, err := StartDriverPool(DefaultScalingPolicy(), time.Second*10, new)
require.NoError(err)
require.NotNil(dp)

Expand Down
2 changes: 1 addition & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (s *Server) AddDriver(lang string, img string) error {
return ErrRuntime.Wrap(err)
}

dp, err := StartDriverPool(DefaultScalingPolicy, DefaultPoolTimeout, func() (Driver, error) {
dp, err := StartDriverPool(DefaultScalingPolicy(), DefaultPoolTimeout, func() (Driver, error) {
return ExecDriver(s.rt, image)
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestNewServerMockedDriverParallelClients(t *testing.T) {
require.NoError(err)

s := NewServer(r)
dp, err := StartDriverPool(DefaultScalingPolicy, DefaultPoolTimeout, func() (Driver, error) {
dp, err := StartDriverPool(DefaultScalingPolicy(), DefaultPoolTimeout, func() (Driver, error) {
return &echoDriver{}, nil
})
require.NoError(err)
Expand Down

0 comments on commit fc0811c

Please sign in to comment.