diff --git a/CODEOWNERS b/CODEOWNERS new file mode 100644 index 0000000..2b2dd76 --- /dev/null +++ b/CODEOWNERS @@ -0,0 +1,2 @@ +# Packages maintained by the storage team. +/future/ @mesosphere/mesosphere-storage diff --git a/README.md b/README.md index a6447a2..f89a042 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,7 @@ This project uses `go mod` to manage external dependencies. External libs are ve - [dcos](/dcos/): Common constants and helpers - [dcos/http/transport](/dcos/http/transport/README.md) : HTTP transport with JWT token support - [dcos/nodeutil](/dcos/nodeutil/README.md) : Interact with DC/OS services and variables +- [future](/future/) : Promise and Future types for Go. - [store](/store/README.md) : In-Memory key/value store. - [zkstore](/zkstore/README.md): ZK-based blob storage. - [elector](/elector/README.md): Leadership election. diff --git a/future/future.go b/future/future.go new file mode 100644 index 0000000..6e0272c --- /dev/null +++ b/future/future.go @@ -0,0 +1,668 @@ +package future + +import ( + "context" + "errors" + "sync" + "sync/atomic" + "time" +) + +// Interface yields a Future. +type Interface interface { + Future() Future +} + +// Func is a func adapter for Interface. +type Func func() Future + +// Future implements Interface for Func, returns nil if the receiving Func is nil. +func (f Func) Future() Future { + if f == nil { + return nil + } + return f() +} + +// sanity check +var _ = Interface(Func(nil)) + +// Future represents some value and/or error that may be available at some point in the future. +// It represents the "read-only" side of the Promise/Future pipe. +type Future interface { + Interface + Done() <-chan struct{} + Err() error + Value() interface{} + + // Block blocks until the cancellation closes, or else the future completes. + // Returns the value and error of the future (if completed). + Block(<-chan struct{}) (interface{}, error) + + // BlockContext blocks until the context is cancelled, or else the future completes. + // Returns the value and error of the future (if completed), or else a nil value and cancellation error. + BlockContext(context.Context) (interface{}, error) +} + +type future struct { + done chan struct{} + err error + v interface{} +} + +func (f *future) Done() <-chan struct{} { return f.done } + +func (f *future) Err() (err error) { + select { + case <-f.done: + err = f.err + default: + } + return +} + +func (f *future) Value() (v interface{}) { + select { + case <-f.done: + v = f.v + default: + } + return +} + +func (f *future) Block(cancel <-chan struct{}) (v interface{}, err error) { + select { + case <-f.done: + v, err = f.v, f.err + case <-cancel: + select { + case <-f.done: + v, err = f.v, f.err + default: + err = context.Canceled + } + } + return +} + +func (f *future) BlockContext(ctx context.Context) (v interface{}, err error) { + select { + case <-f.done: + v, err = f.v, f.err + case <-ctx.Done(): + select { + case <-f.done: + v, err = f.v, f.err + default: + // NOTE: ctx could be a Future-derived Context, in which case + // it could be convenient to extract the completion value of + // said future and return it here. However, we don't have a + // requirement for that, and it also means that we prefer one + // Future over another, which seems somewhat arbitrary. + // It could also be surprising to the caller, whom may not + // expect a value of the type yielded by the Context-mapped + // Future. The safest thing to do here is to simply leave the + // value of `v` unset. + err = ctx.Err() + } + } + return +} + +func (f *future) Future() Future { return f } + +// Completer logic is executed upon or after the completion of a Promise. +type Completer interface { + Complete(interface{}, error) +} + +// CompleterFunc is the func adapter for Completer. +type CompleterFunc func(interface{}, error) + +// Complete implements Completer for CompleterFunc. +func (f CompleterFunc) Complete(v interface{}, err error) { f(v, err) } + +// Apply implements Option for CompleterFunc +func (f CompleterFunc) Apply(o *Options) { o.completers = append(o.completers, f) } + +// If yields the receiving func if the argument if `true`, otherwise yields nil. +func (f CompleterFunc) If(b bool) (result CompleterFunc) { + if b { + result = f + } + return +} + +var _ Completer = CompleterFunc(nil) // sanity check + +// Do adapts a generic func to the completion interface. +func Do(f func()) CompleterFunc { + return func(interface{}, error) { f() } +} + +// Options captures partial configuration state of a Promise. +type Options struct { + // completers are configured at promise construction-time and are invoked + // sychronously upon promise completion. they *should not* block. + completers []Completer +} + +// Optional configuration that may be applied to Options. +type Optional interface { + Apply(*Options) +} + +// Option is the func adapter for Optional. +type Option func(*Options) + +// Apply implements Optional for Option. +func (f Option) Apply(o *Options) { f(o) } + +// Promise supports the composition of a deferred result, it represents the "write" side of +// the Promise/Future pipe. The completion of a promise is idempotent: once a promise is +// completed its state will no longer change. +type Promise interface { + Interface + Error(error) Promise + Value(interface{}) Promise + Cancel() Promise + + // Complete completes this promise. + Complete(interface{}, error) + + // OnComplete schedules a completion function that's executed after the + // promise is completed. The given completion func never blocks the caller + // of this method. + OnComplete(func(interface{}, error)) Promise + + // OnPanic installs an interceptor that fires for panics in OnComplete handlers. + OnPanic(func(Panic)) Promise + + // Completer converts this promise into a Completer. Completion of the returned + // object completes this promise. + Completer() CompleterFunc +} + +type promise struct { + f Future + complete func(interface{}, error) + onComplete func(func(interface{}, error)) + onPanic func(func(Panic)) + options Options +} + +func (p promise) Future() Future { return p.f } +func (p promise) Error(err error) Promise { p.complete(nil, err); return p } +func (p promise) Value(v interface{}) Promise { p.complete(v, nil); return p } +func (p promise) Complete(v interface{}, err error) { p.complete(v, err) } +func (p promise) OnComplete(f func(interface{}, error)) Promise { p.onComplete(f); return p } +func (p promise) OnPanic(f func(Panic)) Promise { p.onPanic(f); return p } + +func (p promise) Cancel() Promise { p.complete(nil, context.Canceled); return p } + +func (p promise) Completer() CompleterFunc { return p.complete } + +// WithCompletion configures a synchronous completion listener for a promise, invoked at +// the time the promise is completed on the same goroutine as the promise's Complete(). +func WithCompletion(c Completer) Optional { + return Option(func(o *Options) { + o.completers = append(o.completers, c) + }) +} + +// WithCompletionF is syntactic sugar for writing completion closures. +// See WithCompletion. +func WithCompletionF(c CompleterFunc) Optional { return WithCompletion(c) } + +// NewPromise configures and yields a unique promise. +func NewPromise(opts ...Optional) Promise { + var ( + f = &future{ + done: make(chan struct{}), + } + m sync.Mutex + callbacks []func(interface{}, error) + completed int32 + panicHandler func(Panic) + options Options + ) + for _, oo := range opts { + if oo != nil { + oo.Apply(&options) + } + } + complete := func(v interface{}, err error) { + if atomic.LoadInt32(&completed) == 1 { + return + } + + m.Lock() + if atomic.LoadInt32(&completed) == 1 { + m.Unlock() + return + } + + defer func() { + callbacks = callbacks[:0] + close(f.done) + // do this after channel close in case any of the callbacks + // want to observe the "done" chan of the future: doing things + // in this order presents a more consistent view of the future. + atomic.StoreInt32(&completed, 1) + m.Unlock() + }() + + f.v = v + f.err = err + + for i := len(options.completers); i > 0; { + i-- + cf := options.completers[i] + defer func() { + success := false + + // apply panic handling to synchronous completers + defer func() { + if success || panicHandler == nil { + return + } + x := recover() + p := Panic{ + Recovered: x, + Val: v, + Err: err, + } + panicHandler(p) + }() + + cf.Complete(v, err) + success = true + }() + } + + if len(callbacks) > 0 { + cb2 := callbacks[:] + go func() { + <-f.done // ensure that the promise has completed + success := false + defer func() { + if success || panicHandler == nil { + return + } + x := recover() + p := Panic{ + Recovered: x, + Val: v, + Err: err, + } + panicHandler(p) + }() + for _, fn := range cb2 { + fn(v, err) + } + success = true + }() + } + } + onComplete := func(fn func(interface{}, error)) { + if fn == nil { + return + } + if atomic.LoadInt32(&completed) == 1 { + go fn(f.v, f.err) + return + } + + m.Lock() + if atomic.LoadInt32(&completed) == 1 { + m.Unlock() + go fn(f.v, f.err) + return + } + + callbacks = append(callbacks, fn) + m.Unlock() + } + onPanic := func(fn func(Panic)) { + if atomic.LoadInt32(&completed) == 1 { + return // already complete, noop + } + m.Lock() + if atomic.LoadInt32(&completed) == 1 { + m.Unlock() + return // already complete, noop + } + + panicHandler = fn + m.Unlock() + } + return promise{ + f: f, + complete: complete, + onComplete: onComplete, + onPanic: onPanic, + options: options, + } +} + +var alwaysDone = func() (ch chan struct{}) { + ch = make(chan struct{}) + close(ch) + return +}() + +type futureFixture struct { + v interface{} + err error +} + +func (futureFixture) Done() <-chan struct{} { return alwaysDone } +func (f futureFixture) Err() error { return f.err } +func (f futureFixture) Value() interface{} { return f.v } +func (f futureFixture) Block(<-chan struct{}) (interface{}, error) { return f.v, f.err } +func (f futureFixture) BlockContext(context.Context) (interface{}, error) { return f.v, f.err } +func (f futureFixture) Future() Future { return f } + +// Error returns a Fixture that always reports the given error. +func Error(err error) Future { return Fixture(nil, err) } + +// Value returns a Fixture that always reports the given value. +func Value(v interface{}) Future { return Fixture(v, nil) } + +var nilFixture = futureFixture{} // intentionally empty, always + +// Nil is an optimization and shorthand for Fixture(nil, nil). +func Nil() Future { return nilFixture } + +// Fixture returns an already-completed future with the given value and error. +func Fixture(v interface{}, err error) Future { return futureFixture{v: v, err: err} } + +type futureContext struct { + f Future +} + +// contextValueKind is the type for keys used to retrieve values from a context.Context. +type contextValueKind uint8 + +const ( + // AsContextValue may be used as a context key for retrieving the value of + // a Future-derived Context. + AsContextValue contextValueKind = iota +) + +func (futureContext) Deadline() (_ time.Time, _ bool) { return } +func (f futureContext) Done() <-chan struct{} { return f.f.Done() } +func (f futureContext) Value(k interface{}) (v interface{}) { + if k != AsContextValue { + return + } + select { + case <-f.f.Done(): + v = f.f.Value() + default: + } + return +} +func (f futureContext) Err() (err error) { + // Err() must return a non-nil error value if Done() is closed. + // If the underlying promise completes successfully then there is no + // error value in the promise to return, so instead we return Canceled + // since it's matches the semantics of the context package. + select { + case <-f.f.Done(): + err = f.f.Err() + if err == nil { + err = context.Canceled + } + default: + } + return +} + +var _ = context.Context(&futureContext{}) // sanity check + +// Context returns a context derived from the given future; it has no deadline. +func Context(f Interface) context.Context { return futureContext{f: f.Future()} } + +// Merger performs an accumulative merge of 1 or more Future results. +type Merger interface { + Merge(interface{}, error) (interface{}, error) +} + +// MergeFunc performs an accumulative merge of 1 or more Future results. +// It is the func adapter for Merger. +type MergeFunc func(interface{}, error) (interface{}, error) + +// Merge implements Merger for MergeFunc. +func (f MergeFunc) Merge(v interface{}, err error) (interface{}, error) { return f(v, err) } + +var _ = Merger(MergeFunc(nil)) // sanity check + +// Discard is a Future merge-function that discards everything and always returns (nil, nil). +func Discard() MergeFunc { return func(interface{}, error) (_ interface{}, _ error) { return } } + +// FirstError is a Merger that tracks the first error its asked to Merge. +type FirstError struct{ error } + +// Merge implements Merger. +func (f *FirstError) Merge(_ interface{}, err error) (interface{}, error) { + if f.error == nil && err != nil { + f.error = err + } + return nil, f.error +} + +var _ = Merger(&FirstError{}) // sanity check + +// Block waits for all given futures to complete, returning the merged value of their results +// according to the specified merge-function. The given merge function SHALL NOT be invoked +// concurrently. +func Block(merge Merger, futures ...Interface) (v interface{}, err error) { + return BlockContext(context.Background(), merge, futures...) +} + +// BlockContext is like Block, but can be cancelled by the context. +func BlockContext(ctx context.Context, merge Merger, futures ...Interface) (v interface{}, err error) { + if merge == nil { + merge = Discard() + } + var ( + wg sync.WaitGroup + m sync.Mutex + ) + for i := range futures { + if futures[i] == nil { + continue + } + f := futures[i].Future() + wg.Add(1) + go func() { + defer wg.Done() + select { + case <-f.Done(): + case <-ctx.Done(): + select { + case <-f.Done(): + default: + m.Lock() + defer m.Unlock() + _, err = merge.Merge(nil, ctx.Err()) + return + } + } + + m.Lock() + defer m.Unlock() + v, err = merge.Merge(f.Value(), f.Err()) + }() + } + wg.Wait() + return +} + +// Join is like Block, except that it's non-blocking. The merged results are returned by Future. +func Join(merge Merger, futures ...Interface) Future { + return JoinContext(context.Background(), merge, futures...) +} + +// JoinContext is like BlockContext, except that it's non-blocking. The merged results are returned by Future. +func JoinContext(ctx context.Context, merge Merger, futures ...Interface) Future { + p := NewPromise() + go func() { + v, err := BlockContext(ctx, merge, futures...) + p.Complete(v, err) + }() + return p.Future() +} + +// Await returns a Future that completes upon cancelation of a context. +func Await(ctx context.Context) Future { + return ctxFuture{done: ctx.Done, err: ctx.Err} +} + +type ctxFuture struct { + done func() <-chan struct{} + err func() error +} + +func (c ctxFuture) Future() Future { return c } +func (c ctxFuture) Done() <-chan struct{} { return c.done() } +func (c ctxFuture) Err() error { return c.err() } +func (c ctxFuture) Value() interface{} { return nil } + +func (c ctxFuture) Block(done <-chan struct{}) (interface{}, error) { + select { + case <-c.done(): + return nil, c.err() + case <-done: + return nil, nil + } +} + +func (c ctxFuture) BlockContext(ctx context.Context) (interface{}, error) { + select { + case <-c.done(): + return nil, c.err() + case <-ctx.Done(): + // NOTE: ctx could be a Future-derived Context, in which case + // it could be convenient to extract the completion value of + // said future and return it here. However, we don't have a + // requirement for that, and it also means that we prefer one + // Future over another, which seems somewhat arbitrary. + // It could also be surprising to the caller, whom may not + // expect a value of the type yielded by the Context-mapped + // Future. The safest thing to do here is to simply leave the + // value of `v` unset. + return nil, ctx.Err() + } +} + +// AwaitChan returns a Future that completes when the given signal chan closes. +// No data is ever expected to flow over the "done" chan, it should be closed to +// complete the returned future. +// The Err() func of the returned future returns context.Canceled once the done +// chan is closed. +// Deprecated in favor of AwaitChanNoError. +func AwaitChan(done <-chan struct{}) Future { + return ctxFuture{ + done: func() <-chan struct{} { return done }, + err: func() (err error) { + select { + case <-done: + err = context.Canceled + default: + } + return + }, + } +} + +// AwaitChanNoError returns a Future that completes when the given signal chan closes. +// No data is ever expected to flow over the "done" chan, it should be closed to +// complete the returned future. +// The Err() func of the returned future returns nil, always (vs. the behavior of AwaitChan). +func AwaitChanNoError(done <-chan struct{}) Future { + return ctxFuture{ + done: func() <-chan struct{} { return done }, + err: func() (_ error) { return }, + } +} + +// Lazy returns the value of a factory func, invoking the factory func (and caching the +// result) the first time the value is needed and returning the cached result upon subsequent +// invocations. +func Lazy(f func() interface{}) func() interface{} { + var m sync.Once + var val interface{} + return func() interface{} { + m.Do(func() { + val = f() + }) + return val + } +} + +// Panic captures a recovered panic value and the completion value of +// a promise whose completion handler is panicking. +type Panic struct { + Recovered interface{} + Val interface{} + Err error +} + +// ErrFlattenInvalid is returned from Flatten if the given value does not implement Interface. +// See Flatten. +var ErrFlattenInvalid = errors.New("cannot flatten non-Future value") + +// Flatten deferences the value of `v` as an Interface, unless v is empty (nil) or err is +// not nil. If `v` is not actually an Interface then ErrFlattenInvalid is reported by +// the returned future. +func Flatten(v interface{}, err error) Interface { + if v == nil || err != nil { + return Fixture(v, err) + } + f, ok := v.(Future) + if !ok { + return Fixture(v, ErrFlattenInvalid) + } + return f +} + +// Mapper transforms the completion values of a Future. +type Mapper func(interface{}, error) Interface + +// Map returns an Interface that reports the mapped value of `f` as an Interface. +// Invocations of the Interface's Future() will block until `f` is ready. +func Map(ctx context.Context, f Interface, m Mapper) Interface { + // delayed eval allows this func to return without blocking. + return Func(func() Future { return m(f.Future().BlockContext(ctx)).Future() }) +} + +// FlatMap is a convenience func that invokes Map with Flatten as the mapping func. +func FlatMap(ctx context.Context, f Interface) Interface { return Map(ctx, f, Flatten) } + +// ErrorOf reduces the completion result of a future to just its error component, useful +// when the caller isn't interested in the value component of the future. Intended to be +// composed with Block and BlockContext. +func ErrorOf(_ interface{}, err error) error { return err } + +// ValueOf reduces the completion result of a future to just its value component, useful +// when the caller isn't interested in the error component of the future. Intended to be +// composed with Block and BlockContext. +func ValueOf(v interface{}, _ error) interface{} { return v } + +// HasCompleted returns true if the given future has reached a terminal state. +func HasCompleted(i Interface) bool { + f := i.Future() + select { + case <-f.Done(): + return true + default: + } + return false +} + +var completedPromise = func() Promise { p := NewPromise(); p.Complete(nil, nil); return p }() + +// EmptyPromise returns an already-completed promise with a nil value and error. +func EmptyPromise() Promise { return completedPromise } diff --git a/future/future_test.go b/future/future_test.go new file mode 100644 index 0000000..31cb133 --- /dev/null +++ b/future/future_test.go @@ -0,0 +1,126 @@ +package future + +import ( + "context" + "errors" + "testing" +) + +func TestFutureBlockChan(t *testing.T) { + closed := func() chan struct{} { + c := make(chan struct{}) + close(c) + return c + }() + errRandom := errors.New("some random error") + for ti, tc := range map[string]struct { + done chan struct{} + cancel <-chan struct{} + v interface{} + e error + // -- wants: + val interface{} + err error + }{ + "canceled": {cancel: closed}, + "done-with-val": {done: closed, v: "1", val: "1"}, + "done-with-err": {done: closed, e: errRandom, err: errRandom}, + "done-with-val+err": {done: closed, v: "1", val: "1", e: errRandom, err: errRandom}, + "done-with-val,canceled": {done: closed, v: "1", val: "1", cancel: closed}, + "done-with-err,canceled": {done: closed, e: errRandom, err: errRandom, cancel: closed}, + "done-with-val+err,canceled": {done: closed, v: "1", val: "1", e: errRandom, err: errRandom, cancel: closed}, + } { + t.Run(ti, func(t *testing.T) { + for range make([]struct{}, 20) { // loop exercises tie-breaking + f := &future{done: tc.done, v: tc.v, err: tc.e} + v, err := f.Block(tc.cancel) + + select { + case <-f.done: + if tc.val != v { + t.Fatalf("expected value %v instead of %v", tc.val, v) + } + if err != nil && tc.err != err { + t.Fatalf("expected error %v instead of %v", tc.err, err) + } + return + default: + } + select { + case <-tc.cancel: + if v != nil { + t.Fatalf("unexpected value: %v", v) + } + if err != context.Canceled { + t.Fatalf("unexpected error: %v", err) + } + default: + } + } + }) + } +} + +func TestFutureBlockContext(t *testing.T) { + canceled := func() context.Context { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + return ctx + }() + closed := func() chan struct{} { + c := make(chan struct{}) + close(c) + return c + }() + errRandom := errors.New("some random error") + for ti, tc := range map[string]struct { + done chan struct{} + ctx context.Context + v interface{} + e error + // -- wants: + val interface{} + err error + }{ + "canceled": {ctx: canceled}, + "done-with-val": {done: closed, v: "1", val: "1"}, + "done-with-err": {done: closed, e: errRandom, err: errRandom}, + "done-with-val+err": {done: closed, v: "1", val: "1", e: errRandom, err: errRandom}, + "done-with-val,canceled": {done: closed, v: "1", val: "1", ctx: canceled}, + "done-with-err,canceled": {done: closed, e: errRandom, err: errRandom, ctx: canceled}, + "done-with-val+err,canceled": {done: closed, v: "1", val: "1", e: errRandom, err: errRandom, ctx: canceled}, + } { + t.Run(ti, func(t *testing.T) { + for range make([]struct{}, 20) { // loop exercises tie-breaking + ctx := tc.ctx + if ctx == nil { + ctx = context.Background() + } + f := &future{done: tc.done, v: tc.v, err: tc.e} + v, err := f.BlockContext(ctx) + + select { + case <-f.done: + if tc.val != v { + t.Fatalf("expected value %v instead of %v", tc.val, v) + } + if err != nil && tc.err != err { + t.Fatalf("expected error %v instead of %v", tc.err, err) + } + return + default: + } + select { + case <-ctx.Done(): + if v != nil { + t.Fatalf("unexpected value: %v", v) + } + if err != context.Canceled { + t.Fatalf("unexpected error: %v", err) + } + default: + } + } + }) + } +} diff --git a/future/must.go b/future/must.go new file mode 100644 index 0000000..0a5aa69 --- /dev/null +++ b/future/must.go @@ -0,0 +1,13 @@ +package future + +// HandleMust may be overridden by downstream consumers w/ custom handlers for +// assertion failure. +var HandleMust = func(err error) { panic(err) } + +// Must returns the given value only if the given error is nil; otherwise invokes HandleMust. +func Must(v interface{}, err error) interface{} { + if err != nil { + HandleMust(err) + } + return v +}