Skip to content

Commit

Permalink
Various improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
jackkleeman committed Jul 16, 2024
1 parent a162391 commit 1830197
Show file tree
Hide file tree
Showing 18 changed files with 1,099 additions and 609 deletions.
140 changes: 140 additions & 0 deletions context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package restate

import (
"context"
"log/slog"
"time"

"github.com/restatedev/sdk-go/internal/futures"
"github.com/restatedev/sdk-go/internal/rand"
)

type Context interface {
RunContext

// Rand returns a random source which will give deterministic results for a given invocation
// The source wraps the stdlib rand.Rand but with some extra helper methods
// This source is not safe for use inside .Run()
Rand() *rand.Rand

// Sleep for the duration d
Sleep(d time.Duration)
// After is an alternative to Context.Sleep which allows you to complete other tasks concurrently
// with the sleep. This is particularly useful when combined with Context.Select to race between
// the sleep and other Selectable operations.
After(d time.Duration) After

// Service gets a Service accessor by name where service
// must be another service known by restate runtime
Service(service, method string) CallClient[[]byte, []byte]

// Object gets a Object accessor by name where object
// must be another object known by restate runtime and
// key is any string representing the key for the object
Object(object, key, method string) CallClient[[]byte, []byte]

// Run runs the function (fn), storing final results (including terminal errors)
// durably in the journal, or otherwise for transient errors stopping execution
// so Restate can retry the invocation. Replays will produce the same value, so
// all non-deterministic operations (eg, generating a unique ID) *must* happen
// inside Run blocks.
// Note: use the RunAs helper function to serialise non-[]byte return values
Run(fn func(RunContext) ([]byte, error)) ([]byte, error)

// Awakeable returns a Restate awakeable; a 'promise' to a future
// value or error, that can be resolved or rejected by other services.
// Note: use the AwakeableAs helper function to deserialise the []byte value
Awakeable() Awakeable[[]byte]
// ResolveAwakeable allows an awakeable (not necessarily from this service) to be
// resolved with a particular value.
// Note: use the ResolveAwakeableAs helper function to provide a value to be serialised
ResolveAwakeable(id string, value []byte)
// ResolveAwakeable allows an awakeable (not necessarily from this service) to be
// rejected with a particular error.
RejectAwakeable(id string, reason error)

// Select returns an iterator over blocking Restate operations (sleep, call, awakeable)
// which allows you to safely run them in parallel. The Selector will store the order
// that things complete in durably inside Restate, so that on replay the same order
// can be used. This avoids non-determinism. It is *not* safe to use goroutines or channels
// outside of Context.Run functions, as they do not behave deterministically.
Select(futs ...futures.Selectable) Selector
}

type CallClient[I any, O any] interface {
// RequestFuture makes a call and returns a handle on a future response
RequestFuture(input I) (ResponseFuture[O], error)
// Request makes a call and blocks on getting the response
Request(input I) (O, error)
SendClient[I]
}

type SendClient[I any] interface {
// Send makes a one-way call which is executed in the background
Send(input I, delay time.Duration) error
}

type ResponseFuture[O any] interface {
// Response blocks on the response to the call
// It is *not* safe to call this in a goroutine - use Context.Select if you
// want to wait on multiple results at once.
Response() (O, error)
futures.Selectable
}

// Selector is an iterator over a list of blocking Restate operations that are running
// in the background.
type Selector interface {
// Remaining returns whether there are still operations that haven't been returned by Select().
// There will always be exactly the same number of results as there were operations
// given to Context.Select
Remaining() bool
// Select blocks on the next completed operation
Select() futures.Selectable
}

// RunContext methods are the only methods safe to call from inside a .Run()
type RunContext interface {
context.Context

// Log obtains a handle on a slog.Logger which already has some useful fields (invocationID and method)
// By default, this logger will not output messages if the invocation is currently replaying
// The log handler can be set with `.WithLogger()` on the server object
Log() *slog.Logger
}

// After is a handle on a Sleep operation which allows you to do other work concurrently
// with the sleep.
type After interface {
// Done blocks waiting on the remaining duration of the sleep.
// It is *not* safe to call this in a goroutine - use Context.Select if you
// want to wait on multiple results at once.
Done()
futures.Selectable
}

type ObjectContext interface {
Context
KeyValueStore
// Key retrieves the key for this virtual object invocation. This is a no-op and is
// always safe to call.
Key() string
}

type KeyValueStore interface {
// Set sets a byte array against a key
// Note: Use SetAs helper function to seamlessly store
// a value of specific type.
Set(key string, value []byte)
// Get gets value (bytes array) associated with key
// If key does not exist, this function return a nil bytes array
// Note: Use GetAs helper function to seamlessly get value
// as specific type.
Get(key string) []byte
// Clear deletes a key
Clear(key string)
// ClearAll drops all stored state associated with key
ClearAll()
// Keys returns a list of all associated key
Keys() []string
}
212 changes: 184 additions & 28 deletions encoding/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,24 @@ package encoding

import (
"encoding/json"
"fmt"
"reflect"

"google.golang.org/protobuf/proto"
)

type Void struct{}

type Codec interface {
Marshal(v any) ([]byte, error)
Unmarshal(data []byte, v any) error
}

type PayloadCodec interface {
Codec
InputPayload() *InputPayload
OutputPayload() *OutputPayload
}
type InputPayload struct {
Required bool `json:"required"`
ContentType *string `json:"contentType,omitempty"`
Expand All @@ -18,53 +32,195 @@ type OutputPayload struct {
JsonSchema interface{} `json:"jsonSchema,omitempty"`
}

type JSONDecoder[I any] struct{}
type VoidCodec struct{}

func (j JSONDecoder[I]) InputPayload() *InputPayload {
return &InputPayload{Required: true, ContentType: proto.String("application/json")}
var _ PayloadCodec = VoidCodec{}

func (j VoidCodec) InputPayload() *InputPayload {
return &InputPayload{}
}

func (j JSONDecoder[I]) Decode(data []byte) (input I, err error) {
err = json.Unmarshal(data, &input)
return
func (j VoidCodec) OutputPayload() *OutputPayload {
return &OutputPayload{}
}

type JSONEncoder[O any] struct{}
func (j VoidCodec) Unmarshal(data []byte, input any) (err error) {
return nil
}

func (j JSONEncoder[O]) OutputPayload() *OutputPayload {
return &OutputPayload{ContentType: proto.String("application/json")}
func (j VoidCodec) Marshal(output any) ([]byte, error) {
return nil, nil
}

func (j JSONEncoder[O]) Encode(output O) ([]byte, error) {
return json.Marshal(output)
type PairCodec struct {
Input PayloadCodec
Output PayloadCodec
}

type MessagePointer[I any] interface {
proto.Message
*I
var _ PayloadCodec = PairCodec{}

func (w PairCodec) InputPayload() *InputPayload {
return w.Input.InputPayload()
}

type ProtoDecoder[I any, IP MessagePointer[I]] struct{}
func (w PairCodec) OutputPayload() *OutputPayload {
return w.Output.OutputPayload()
}

func (p ProtoDecoder[I, IP]) InputPayload() *InputPayload {
return &InputPayload{Required: true, ContentType: proto.String("application/proto")}
func (w PairCodec) Unmarshal(data []byte, v any) error {
return w.Input.Unmarshal(data, v)
}

func (p ProtoDecoder[I, IP]) Decode(data []byte) (input IP, err error) {
// Unmarshal expects a non-nil pointer to a proto.Message implementing struct
// hence we must have a type parameter for the struct itself (I) and here we allocate
// a non-nil pointer of type IP
input = IP(new(I))
err = proto.Unmarshal(data, input)
return
func (w PairCodec) Marshal(v any) ([]byte, error) {
return w.Output.Marshal(v)
}

type ProtoEncoder[O proto.Message] struct{}
func MergeCodec(base, overlay PayloadCodec) PayloadCodec {
switch {
case base == nil && overlay == nil:
return nil
case base == nil:
return overlay
case overlay == nil:
return base
}

basePair, baseOk := base.(PairCodec)
overlayPair, overlayOk := overlay.(PairCodec)

switch {
case baseOk && overlayOk:
return PairCodec{
Input: MergeCodec(basePair.Input, overlayPair.Input),
Output: MergeCodec(basePair.Output, overlayPair.Output),
}
case baseOk:
return PairCodec{
Input: MergeCodec(basePair.Input, overlay),
Output: MergeCodec(basePair.Output, overlay),
}
case overlayOk:
return PairCodec{
Input: MergeCodec(base, overlayPair.Input),
Output: MergeCodec(base, overlayPair.Output),
}
default:
// just two non-pairs; keep base
return base
}
}

func PartialVoidCodec[I any, O any]() PayloadCodec {
var input I
var output O
_, inputVoid := any(input).(Void)
_, outputVoid := any(output).(Void)
switch {
case inputVoid && outputVoid:
return VoidCodec{}
case inputVoid:
return PairCodec{Input: VoidCodec{}, Output: nil}
case outputVoid:
return PairCodec{Input: nil, Output: VoidCodec{}}
default:
return nil
}
}

type BinaryCodec struct{}

var _ PayloadCodec = BinaryCodec{}

func (j BinaryCodec) InputPayload() *InputPayload {
return &InputPayload{Required: true, ContentType: proto.String("application/octet-stream")}
}

func (j BinaryCodec) OutputPayload() *OutputPayload {
return &OutputPayload{ContentType: proto.String("application/octet-stream")}
}

func (j BinaryCodec) Unmarshal(data []byte, input any) (err error) {
switch input := input.(type) {
case *[]byte:
*input = data
return nil
default:
return fmt.Errorf("BinaryCodec.Unmarshal called with a type that is not *[]byte")
}
}

func (p ProtoEncoder[O]) OutputPayload() *OutputPayload {
func (j BinaryCodec) Marshal(output any) ([]byte, error) {
switch output := output.(type) {
case []byte:
return output, nil
default:
return nil, fmt.Errorf("BinaryCodec.Marshal called with a type that is not []byte")
}
}

type JSONCodec struct{}

var _ PayloadCodec = JSONCodec{}

func (j JSONCodec) InputPayload() *InputPayload {
return &InputPayload{Required: true, ContentType: proto.String("application/json")}
}

func (j JSONCodec) OutputPayload() *OutputPayload {
return &OutputPayload{ContentType: proto.String("application/json")}
}

func (j JSONCodec) Unmarshal(data []byte, input any) (err error) {
return json.Unmarshal(data, &input)
}

func (j JSONCodec) Marshal(output any) ([]byte, error) {
return json.Marshal(output)
}

type ProtoCodec struct{}

var _ PayloadCodec = ProtoCodec{}

func (p ProtoCodec) InputPayload() *InputPayload {
return &InputPayload{Required: true, ContentType: proto.String("application/proto")}
}

func (p ProtoCodec) OutputPayload() *OutputPayload {
return &OutputPayload{ContentType: proto.String("application/proto")}
}

func (p ProtoEncoder[O]) Encode(output O) ([]byte, error) {
return proto.Marshal(output)
func (p ProtoCodec) Unmarshal(data []byte, input any) (err error) {
switch input := input.(type) {
case proto.Message:
// called with a *Message
return proto.Unmarshal(data, input)
default:
// we must support being called with a **Message where *Message is nil because this is the result of new(I) where I is a proto.Message
// and calling with new(I) is really the only generic approach.
value := reflect.ValueOf(input)
if value.Kind() != reflect.Pointer || value.IsNil() || value.Elem().Kind() != reflect.Pointer {
return fmt.Errorf("ProtoCodec.Unmarshal called with neither a proto.Message nor a non-nil pointer to a type that implements proto.Message.")
}
elem := value.Elem() // hopefully a *Message
if elem.IsNil() {
// allocate a &Message and swap this in
elem.Set(reflect.New(elem.Type().Elem()))
}
switch elemI := elem.Interface().(type) {
case proto.Message:
return proto.Unmarshal(data, elemI)
default:
return fmt.Errorf("ProtoCodec.Unmarshal called with neither a proto.Message nor a non-nil pointer to a type that implements proto.Message.")
}
}
}

func (p ProtoCodec) Marshal(output any) (data []byte, err error) {
switch output := output.(type) {
case proto.Message:
return proto.Marshal(output)
default:
return nil, fmt.Errorf("ProtoCodec.Marshal called with a type that is not a proto.Message")
}
}
Loading

0 comments on commit 1830197

Please sign in to comment.