Skip to content

Commit

Permalink
Move headers and delays into requestoption/ send option
Browse files Browse the repository at this point in the history
But keep codec as a property of the client, as this is a per method
option and you might want to store a client and use it repeatedly with
the codec set.
  • Loading branch information
jackkleeman committed Aug 21, 2024
1 parent 333963c commit acebb84
Show file tree
Hide file tree
Showing 10 changed files with 86 additions and 36 deletions.
6 changes: 3 additions & 3 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,11 @@ type Awakeable interface {
// CallClient represents all the different ways you can invoke a particular service/key/method tuple.
type CallClient interface {
// RequestFuture makes a call and returns a handle on a future response
RequestFuture(input any) ResponseFuture
RequestFuture(input any, opts ...options.RequestOption) ResponseFuture
// Request makes a call and blocks on getting the response which is stored in output
Request(input any, output any) error
Request(input any, output any, opts ...options.RequestOption) error
// Send makes a one-way call which is executed in the background
Send(input any, delay time.Duration)
Send(input any, opts ...options.SendOption)
}

// ResponseFuture is a handle on a potentially not-yet completed outbound call.
Expand Down
7 changes: 3 additions & 4 deletions examples/ticketreservation/user_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (u *userSession) AddTicket(ctx restate.ObjectContext, ticketId string) (boo
tickets = append(tickets, ticketId)

ctx.Set("tickets", tickets)
ctx.Object(UserSessionServiceName, userId, "ExpireTicket").Send(ticketId, 15*time.Minute)
ctx.Object(UserSessionServiceName, userId, "ExpireTicket").Send(ticketId, restate.WithDelay(15*time.Minute))

return true, nil
}
Expand All @@ -60,7 +60,7 @@ func (u *userSession) ExpireTicket(ctx restate.ObjectContext, ticketId string) (
}

ctx.Set("tickets", tickets)
ctx.Object(TicketServiceName, ticketId, "Unreserve").Send(nil, 0)
ctx.Object(TicketServiceName, ticketId, "Unreserve").Send(restate.Void{})

return void, nil
}
Expand Down Expand Up @@ -101,8 +101,7 @@ func (u *userSession) Checkout(ctx restate.ObjectContext, _ restate.Void) (bool,
ctx.Log().Info("payment details", "id", response.ID, "price", response.Price)

for _, ticket := range tickets {
call := ctx.Object(TicketServiceName, ticket, "MarkAsSold")
call.Send(nil, 0)
ctx.Object(TicketServiceName, ticket, "MarkAsSold").Send(restate.Void{})
}

ctx.Clear("tickets")
Expand Down
19 changes: 9 additions & 10 deletions facilitators.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package restate

import (
"errors"
"time"

"github.com/restatedev/sdk-go/internal/options"
)
Expand Down Expand Up @@ -58,11 +57,11 @@ func AwakeableAs[T any](ctx Context, options ...options.AwakeableOption) TypedAw
// TypedCallClient is an extension of [CallClient] which deals in typed values
type TypedCallClient[I any, O any] interface {
// RequestFuture makes a call and returns a handle on a future response
RequestFuture(input I) TypedResponseFuture[O]
RequestFuture(input I, opts ...options.RequestOption) TypedResponseFuture[O]
// Request makes a call and blocks on getting the response
Request(input I) (O, error)
Request(input I, opts ...options.RequestOption) (O, error)
// Send makes a one-way call which is executed in the background
Send(input I, delay time.Duration)
Send(input I, opts ...options.SendOption)
}

type typedCallClient[I any, O any] struct {
Expand All @@ -76,17 +75,17 @@ func NewTypedCallClient[I any, O any](client CallClient) TypedCallClient[I, O] {
return typedCallClient[I, O]{client}
}

func (t typedCallClient[I, O]) Request(input I) (output O, err error) {
err = t.inner.RequestFuture(input).Response(&output)
func (t typedCallClient[I, O]) Request(input I, opts ...options.RequestOption) (output O, err error) {
err = t.inner.RequestFuture(input, opts...).Response(&output)
return
}

func (t typedCallClient[I, O]) RequestFuture(input I) TypedResponseFuture[O] {
return typedResponseFuture[O]{t.inner.RequestFuture(input)}
func (t typedCallClient[I, O]) RequestFuture(input I, opts ...options.RequestOption) TypedResponseFuture[O] {
return typedResponseFuture[O]{t.inner.RequestFuture(input, opts...)}
}

func (t typedCallClient[I, O]) Send(input I, delay time.Duration) {
t.inner.Send(input, delay)
func (t typedCallClient[I, O]) Send(input I, opts ...options.SendOption) {
t.inner.Send(input, opts...)
}

// TypedResponseFuture is an extension of [ResponseFuture] which returns typed responses instead of accepting a pointer
Expand Down
26 changes: 23 additions & 3 deletions internal/options/options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package options

import "github.com/restatedev/sdk-go/encoding"
import (
"time"

"github.com/restatedev/sdk-go/encoding"
)

type AwakeableOptions struct {
Codec encoding.Codec
Expand Down Expand Up @@ -35,14 +39,30 @@ type SetOption interface {
}

type CallOptions struct {
Codec encoding.Codec
Headers map[string]string
Codec encoding.Codec
}

type CallOption interface {
BeforeCall(*CallOptions)
}

type RequestOptions struct {
Headers map[string]string
}

type RequestOption interface {
BeforeRequest(*RequestOptions)
}

type SendOptions struct {
Headers map[string]string
Delay time.Duration
}

type SendOption interface {
BeforeSend(*SendOptions)
}

type RunOptions struct {
Codec encoding.Codec
}
Expand Down
22 changes: 16 additions & 6 deletions internal/state/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,18 @@ type serviceCall struct {
}

// RequestFuture makes a call and returns a handle on the response
func (c *serviceCall) RequestFuture(input any) restate.ResponseFuture {
func (c *serviceCall) RequestFuture(input any, opts ...options.RequestOption) restate.ResponseFuture {
o := options.RequestOptions{}
for _, opt := range opts {
opt.BeforeRequest(&o)
}

bytes, err := encoding.Marshal(c.options.Codec, input)
if err != nil {
panic(c.machine.newCodecFailure(fmt.Errorf("failed to marshal RequestFuture input: %w", err)))
}

entry, entryIndex := c.machine.doCall(c.service, c.key, c.method, c.options.Headers, bytes)
entry, entryIndex := c.machine.doCall(c.service, c.key, c.method, o.Headers, bytes)

return decodingResponseFuture{
futures.NewResponseFuture(c.machine.suspensionCtx, entry, entryIndex, func(err error) any { return c.machine.newProtocolViolation(entry, err) }),
Expand Down Expand Up @@ -59,17 +64,22 @@ func (d decodingResponseFuture) Response(output any) (err error) {
}

// Request makes a call and blocks on the response
func (c *serviceCall) Request(input any, output any) error {
return c.RequestFuture(input).Response(output)
func (c *serviceCall) Request(input any, output any, opts ...options.RequestOption) error {
return c.RequestFuture(input, opts...).Response(output)
}

// Send runs a call in the background after delay duration
func (c *serviceCall) Send(input any, delay time.Duration) {
func (c *serviceCall) Send(input any, opts ...options.SendOption) {
o := options.SendOptions{}
for _, opt := range opts {
opt.BeforeSend(&o)
}

bytes, err := encoding.Marshal(c.options.Codec, input)
if err != nil {
panic(c.machine.newCodecFailure(fmt.Errorf("failed to marshal Send input: %w", err)))
}
c.machine.sendCall(c.service, c.key, c.method, c.options.Headers, bytes, delay)
c.machine.sendCall(c.service, c.key, c.method, o.Headers, bytes, o.Delay)
return
}

Expand Down
26 changes: 24 additions & 2 deletions options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package restate

import (
"time"

"github.com/restatedev/sdk-go/encoding"
"github.com/restatedev/sdk-go/internal/options"
)
Expand Down Expand Up @@ -77,13 +79,33 @@ type withHeaders struct {
headers map[string]string
}

var _ options.CallOption = withHeaders{}
var _ options.RequestOption = withHeaders{}
var _ options.SendOption = withHeaders{}

func (w withHeaders) BeforeRequest(opts *options.RequestOptions) {
opts.Headers = w.headers
}

func (w withHeaders) BeforeCall(opts *options.CallOptions) {
func (w withHeaders) BeforeSend(opts *options.SendOptions) {
opts.Headers = w.headers
}

// WithHeaders is an option to specify outgoing headers when making a call
func WithHeaders(headers map[string]string) withHeaders {
return withHeaders{headers}
}

type withDelay struct {
delay time.Duration
}

var _ options.SendOption = withDelay{}

func (w withDelay) BeforeSend(opts *options.SendOptions) {
opts.Delay = w.delay
}

// WithDelay is an [SendOption] to specify the duration to delay the request
func WithDelay(delay time.Duration) withDelay {
return withDelay{delay}
}
2 changes: 1 addition & 1 deletion test-services/kill.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func init() {
Handler("recursiveCall", restate.NewObjectHandler(
func(ctx restate.ObjectContext, _ restate.Void) (restate.Void, error) {
awakeable := ctx.Awakeable()
ctx.Object("AwakeableHolder", "kill", "hold").Send(awakeable.Id(), 0)
ctx.Object("AwakeableHolder", "kill", "hold").Send(awakeable.Id())
if err := awakeable.Result(restate.Void{}); err != nil {
return restate.Void{}, err
}
Expand Down
6 changes: 3 additions & 3 deletions test-services/nondeterministic.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func init() {
return invocationCounts[countKey]%2 == 1
}
incrementCounter := func(ctx restate.ObjectContext) {
ctx.Object("Counter", ctx.Key(), "add").Send(int64(1), 0)
ctx.Object("Counter", ctx.Key(), "add").Send(int64(1))
}

REGISTRY.AddDefinition(
Expand Down Expand Up @@ -63,9 +63,9 @@ func init() {
Handler("backgroundInvokeWithDifferentTargets", restate.NewObjectHandler(
func(ctx restate.ObjectContext, _ restate.Void) (restate.Void, error) {
if doLeftAction(ctx) {
ctx.Object("Counter", "abc", "get").Send(restate.Void{}, 0)
ctx.Object("Counter", "abc", "get").Send(restate.Void{})
} else {
ctx.Object("Counter", "abc", "reset").Send(restate.Void{}, 0)
ctx.Object("Counter", "abc", "reset").Send(restate.Void{})
}

// This is required to cause a suspension after the non-deterministic operation
Expand Down
4 changes: 2 additions & 2 deletions test-services/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func init() {
// We need to use []int because Golang takes the opinionated choice of treating []byte as Base64
func(ctx restate.Context, req ProxyRequest) (restate.Void, error) {
input := intArrayToByteArray(req.Message)
req.ToTarget(ctx).Send(input, 0)
req.ToTarget(ctx).Send(input)
return restate.Void{}, nil
})).
Handler("manyCalls", restate.NewServiceHandler(
Expand All @@ -58,7 +58,7 @@ func init() {
for _, req := range requests {
input := intArrayToByteArray(req.ProxyRequest.Message)
if req.OneWayCall {
req.ProxyRequest.ToTarget(ctx).Send(input, 0)
req.ProxyRequest.ToTarget(ctx).Send(input)
} else {
fut := req.ProxyRequest.ToTarget(ctx).RequestFuture(input)
if req.AwaitAtTheEnd {
Expand Down
4 changes: 2 additions & 2 deletions test-services/upgradetest.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ func init() {
return "", fmt.Errorf("executeComplex should not be invoked with version different from 1!")
}
awakeable := restate.AwakeableAs[string](ctx)
ctx.Object("AwakeableHolder", "upgrade", "hold").Send(awakeable.Id(), 0)
ctx.Object("AwakeableHolder", "upgrade", "hold").Send(awakeable.Id())
if _, err := awakeable.Result(); err != nil {
return "", err
}
ctx.Object("ListObject", "upgrade-test", "append").Send(version(), 0)
ctx.Object("ListObject", "upgrade-test", "append").Send(version())
return version(), nil
})))
}

0 comments on commit acebb84

Please sign in to comment.