Skip to content

Commit

Permalink
Merge tag 'v1.6.9' into release/1.11
Browse files Browse the repository at this point in the history
# Conflicts:
#	actor/actor.go
#	actor/manager/container.go
#	actor/manager/manager.go
#	actor/mock/mock_server.go
  • Loading branch information
wXwcoder committed Aug 9, 2024
2 parents 9bc7d82 + 7b77b05 commit 3ab2048
Show file tree
Hide file tree
Showing 12 changed files with 173 additions and 12 deletions.
6 changes: 5 additions & 1 deletion actor/actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ type Server interface {
// SaveState is impl by ServerImplBase, It saves the state cache of this actor instance to state store component by calling api of daprd.
// Save state is called at two places: 1. On invocation of this actor instance. 2. When new actor starts.
SaveState() error

// Activate called when actor created by actor manager
Activate(invokeName string) error
// Deactivate called before actor removed by actor manager
Deactivate() error

WithContext() ServerContext
}

Expand Down
34 changes: 33 additions & 1 deletion actor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ import "github.com/dapr/go-sdk/actor/codec/constant"

// ActorConfig is Actor's configuration struct.
type ActorConfig struct {
SerializerType string
SerializerType string
ActorIdleTimeout string
ActorScanInterval string
DrainOngingCallTimeout string
DrainBalancedActors bool
}

// Option is option function of ActorConfig.
Expand All @@ -30,6 +34,34 @@ func WithSerializerName(serializerType string) Option {
}
}

// WithActorIdleTimeout set actorIdleTimeout type of the actor as @actorIdleTimeout.
func WithActorIdleTimeout(actorIdleTimeout string) Option {
return func(config *ActorConfig) {
config.ActorIdleTimeout = actorIdleTimeout
}
}

// WithActorScanInterval set actorScanInterval type of the actor as @actorScanInterval.
func WithActorScanInterval(actorScanInterval string) Option {
return func(config *ActorConfig) {
config.ActorScanInterval = actorScanInterval
}
}

// WithDrainOngingCallTimeout set drainOngingCallTimeout type of the actor as @drainOngingCallTimeout.
func WithDrainOngingCallTimeout(drainOngingCallTimeout string) Option {
return func(config *ActorConfig) {
config.DrainOngingCallTimeout = drainOngingCallTimeout
}
}

// WithDrainBalancedActors set drainBalancedActors type of the actor as @drainBalancedActors.
func WithDrainBalancedActors(drainBalancedActors bool) Option {
return func(config *ActorConfig) {
config.DrainBalancedActors = drainBalancedActors
}
}

// GetConfigFromOptions get final ActorConfig set by @opts.
func GetConfigFromOptions(opts ...Option) *ActorConfig {
conf := &ActorConfig{
Expand Down
8 changes: 8 additions & 0 deletions actor/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,16 @@ func TestRegisterActorTimer(t *testing.T) {
t.Run("get config with option", func(t *testing.T) {
config := GetConfigFromOptions(
WithSerializerName("mockSerializerType"),
WithActorIdleTimeout("1m"),
WithActorScanInterval("10s"),
WithDrainOngingCallTimeout("10s"),
WithDrainBalancedActors(true),
)
assert.NotNil(t, config)
assert.Equal(t, "mockSerializerType", config.SerializerType)
assert.Equal(t, "1m", config.ActorIdleTimeout)
assert.Equal(t, "10s", config.ActorScanInterval)
assert.Equal(t, "10s", config.DrainOngingCallTimeout)
assert.Equal(t, true, config.DrainBalancedActors)
})
}
16 changes: 13 additions & 3 deletions actor/manager/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type ActorContainer interface {
Invoke(methodName string, param []byte) ([]reflect.Value, actorErr.ActorErr)
//nolint:staticcheck // SA1019 Deprecated: use ActorContainerContext instead.
GetActor() actor.Server
Deactivate() error
}

type ActorContainerContext interface {
Expand Down Expand Up @@ -79,12 +80,17 @@ func NewDefaultActorContainerContext(ctx context.Context, actorID string, impl a
impl.SetID(actorID)
daprClient, _ := dapr.NewClient()
// create state manager for this new actor
impl.SetStateManager(state.NewActorStateManagerContext(impl.Type(), actorID, state.NewDaprStateAsyncProvider(daprClient)))
// save state of this actor
err := impl.SaveState(ctx)
impl.SetStateManager(state.NewActorStateManager(impl.Type(), actorID, state.NewDaprStateAsyncProvider(daprClient)))

Check failure on line 83 in actor/manager/container.go

View workflow job for this annotation

GitHub Actions / Test on 1.21

cannot use state.NewActorStateManager(impl.Type(), actorID, state.NewDaprStateAsyncProvider(daprClient)) (value of type actor.StateManager) as actor.StateManagerContext value in argument to impl.SetStateManager: actor.StateManager does not implement actor.StateManagerContext (wrong type for method Add)

Check failure on line 83 in actor/manager/container.go

View workflow job for this annotation

GitHub Actions / Test on 1.22

cannot use state.NewActorStateManager(impl.Type(), actorID, state.NewDaprStateAsyncProvider(daprClient)) (value of type actor.StateManager) as actor.StateManagerContext value in argument to impl.SetStateManager: actor.StateManager does not implement actor.StateManagerContext (wrong type for method Add)
// move out for Activate param
/*err := impl.Activate(ctx)
if err != nil {
return nil, actorErr.ErrSaveStateFailed
}
// save state of this actor
err = impl.SaveState()
if err != nil {
return nil, actorErr.ErrSaveStateFailed
}*/
methodType, err := getAbsctractMethodMap(impl)
if err != nil {
log.Printf("failed to get absctract method map from registered provider, err = %s", err)
Expand Down Expand Up @@ -118,6 +124,10 @@ func (d *DefaultActorContainerContext) Invoke(ctx context.Context, methodName st
return returnValue, actorErr.Success
}

func (d *DefaultActorContainer) Deactivate() error {
return d.actor.Deactivate()
}

func (d *DefaultActorContainerContext) GetActor() actor.ServerContext {
return d.actor
}
61 changes: 57 additions & 4 deletions actor/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type ActorManager interface {
DeactivateActor(actorID string) actorErr.ActorErr
InvokeReminder(actorID, reminderName string, params []byte) actorErr.ActorErr
InvokeTimer(actorID, timerName string, params []byte) actorErr.ActorErr
InvokeActors(methodName string, request []byte) actorErr.ActorErr
KillAllActors() actorErr.ActorErr
}

type ActorManagerContext interface {
Expand Down Expand Up @@ -127,13 +129,22 @@ func (m *DefaultActorManagerContext) RegisterActorImplFactory(f actor.FactoryCon
}

// getAndCreateActorContainerIfNotExist will.
func (m *DefaultActorManagerContext) getAndCreateActorContainerIfNotExist(ctx context.Context, actorID string) (ActorContainerContext, actorErr.ActorErr) {
func (m *DefaultActorManagerContext) getAndCreateActorContainerIfNotExist(ctx context.Context, actorID, invokeName string) (ActorContainerContext, actorErr.ActorErr) {
val, ok := m.activeActors.Load(actorID)
if !ok {
newContainer, aerr := NewDefaultActorContainerContext(ctx, actorID, m.factory(), m.serializer)
if aerr != actorErr.Success {
return nil, aerr
}
err := newContainer.GetActor().Activate(invokeName)

Check failure on line 139 in actor/manager/manager.go

View workflow job for this annotation

GitHub Actions / Test on 1.21

newContainer.GetActor().Activate undefined (type actor.ServerContext has no field or method Activate)

Check failure on line 139 in actor/manager/manager.go

View workflow job for this annotation

GitHub Actions / Test on 1.22

newContainer.GetActor().Activate undefined (type actor.ServerContext has no field or method Activate)
if err != nil {
return nil, actorErr.ErrSaveStateFailed
}
// save state of this actor
err = newContainer.GetActor().SaveState()

Check failure on line 144 in actor/manager/manager.go

View workflow job for this annotation

GitHub Actions / Test on 1.21

not enough arguments in call to newContainer.GetActor().SaveState

Check failure on line 144 in actor/manager/manager.go

View workflow job for this annotation

GitHub Actions / Test on 1.22

not enough arguments in call to newContainer.GetActor().SaveState
if err != nil {
return nil, actorErr.ErrSaveStateFailed
}
m.activeActors.Store(actorID, newContainer)
val, _ = m.activeActors.Load(actorID)
}
Expand All @@ -146,7 +157,7 @@ func (m *DefaultActorManagerContext) InvokeMethod(ctx context.Context, actorID,
return nil, actorErr.ErrActorFactoryNotSet
}

actorContainer, aerr := m.getAndCreateActorContainerIfNotExist(ctx, actorID)
actorContainer, aerr := m.getAndCreateActorContainerIfNotExist(ctx, actorID, methodName)
if aerr != actorErr.Success {
return nil, aerr
}
Expand Down Expand Up @@ -187,6 +198,7 @@ func (m *DefaultActorManagerContext) DeactivateActor(_ context.Context, actorID
if !ok {
return actorErr.ErrActorIDNotFound
}
actor.(ActorContainer).Deactivate()

Check failure on line 201 in actor/manager/manager.go

View workflow job for this annotation

GitHub Actions / Test on 1.21

use of package actor not in selector

Check failure on line 201 in actor/manager/manager.go

View workflow job for this annotation

GitHub Actions / Test on 1.22

use of package actor not in selector
m.activeActors.Delete(actorID)
return actorErr.Success
}
Expand All @@ -201,7 +213,7 @@ func (m *DefaultActorManagerContext) InvokeReminder(ctx context.Context, actorID
log.Printf("failed to unmarshal reminder param, err: %v ", err)
return actorErr.ErrRemindersParamsInvalid
}
actorContainer, aerr := m.getAndCreateActorContainerIfNotExist(ctx, actorID)
actorContainer, aerr := m.getAndCreateActorContainerIfNotExist(ctx, actorID, reminderName)
if aerr != actorErr.Success {
return aerr
}
Expand All @@ -224,14 +236,55 @@ func (m *DefaultActorManagerContext) InvokeTimer(ctx context.Context, actorID, t
log.Printf("failed to unmarshal reminder param, err: %v ", err)
return actorErr.ErrTimerParamsInvalid
}
actorContainer, aerr := m.getAndCreateActorContainerIfNotExist(ctx, actorID)
actorContainer, aerr := m.getAndCreateActorContainerIfNotExist(ctx, actorID, timerName)
if aerr != actorErr.Success {
return aerr
}
_, aerr = actorContainer.Invoke(ctx, timerParams.CallBack, timerParams.Data)
return aerr
}

func (m *DefaultActorManager) InvokeActors(methodName string, request []byte) actorErr.ActorErr {
m.activeActors.Range(func(key, value interface{}) bool {

Check failure on line 248 in actor/manager/manager.go

View workflow job for this annotation

GitHub Actions / Test on 1.21

m.activeActors undefined (type *DefaultActorManager has no field or method activeActors)

Check failure on line 248 in actor/manager/manager.go

View workflow job for this annotation

GitHub Actions / Test on 1.22

m.activeActors undefined (type *DefaultActorManager has no field or method activeActors)
return func() bool {
go func() {
defer func() {
if err := recover(); err != nil {
log.Printf("InvokeActors recover, methodName:%s, request:%s", methodName, string(request))
}
}()
out, err := m.InvokeMethod(key.(string), methodName, request)
if err != actorErr.Success {
log.Printf("InvokeActors, methodName:%s, request:%s, out:%s, err:%v", methodName, string(request), string(out), err)
}
}()
return true
}()
})
return actorErr.Success
}

func (m *DefaultActorManager) KillAllActors() actorErr.ActorErr {
var actorIds []string
m.activeActors.Range(func(key, value interface{}) bool {

Check failure on line 269 in actor/manager/manager.go

View workflow job for this annotation

GitHub Actions / Test on 1.21

m.activeActors undefined (type *DefaultActorManager has no field or method activeActors)

Check failure on line 269 in actor/manager/manager.go

View workflow job for this annotation

GitHub Actions / Test on 1.22

m.activeActors undefined (type *DefaultActorManager has no field or method activeActors)
return func() bool {
actorIds = append(actorIds, key.(string))
return true
}()
})
for _, actorId := range actorIds {
func() {
defer func() {
if err := recover(); err != nil {
log.Printf("KillAllActors recover, actorId:%s", actorId)
}
}()
m.DeactivateActor(actorId)
}()
}
return actorErr.Success
}

func getAbsctractMethodMap(rcvr interface{}) (map[string]*MethodType, error) {
s := &Service{}
s.reflectType = reflect.TypeOf(rcvr)
Expand Down
16 changes: 16 additions & 0 deletions actor/mock/mock_factory_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ type ActorImpl struct {
actor.ServerImplBase
}

func (t *ActorImpl) Activate(invokeName string) error {
return nil
}

func (t *ActorImpl) Deactivate() error {
return nil
}

func (t *ActorImpl) Type() string {
return "testActorType"
}
Expand Down Expand Up @@ -70,6 +78,14 @@ type NotReminderCalleeActor struct {
actor.ServerImplBaseCtx
}

func (t *NotReminderCalleeActor) Activate() error {
return nil
}

func (t *NotReminderCalleeActor) Deactivate() error {
return nil
}

func (t *NotReminderCalleeActor) Type() string {
return "testActorNotReminderCalleeType"
}
8 changes: 8 additions & 0 deletions actor/mock/mock_server.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions actor/runtime/actor_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package runtime
import (
"context"
"encoding/json"
"fmt"
"sync"

"github.com/dapr/go-sdk/actor"
Expand Down Expand Up @@ -73,6 +74,10 @@ func (r *ActorRunTimeContext) RegisterActorFactory(f actor.FactoryContext, opt .
conf := config.GetConfigFromOptions(opt...)
actType := f().Type()
r.config.RegisteredActorTypes = append(r.config.RegisteredActorTypes, actType)
r.config.ActorIdleTimeout = conf.ActorIdleTimeout
r.config.ActorScanInterval = conf.ActorScanInterval
r.config.DrainOngingCallTimeout = conf.DrainOngingCallTimeout
r.config.DrainBalancedActors = conf.DrainBalancedActors
mng, ok := r.actorManagers.Load(actType)
if !ok {
newMng, err := manager.NewDefaultActorManagerContext(conf.SerializerType)
Expand Down Expand Up @@ -154,3 +159,20 @@ func (r *ActorRunTime) InvokeReminder(actorTypeName, actorID, reminderName strin
func (r *ActorRunTime) InvokeTimer(actorTypeName, actorID, timerName string, params []byte) actorErr.ActorErr {
return r.ctx.InvokeTimer(context.Background(), actorTypeName, actorID, timerName, params)
}

func (r *ActorRunTime) InvokeActors(actorType, methodName string, request []byte) actorErr.ActorErr {
mng, ok := r.actorManagers.Load(actorType)
if !ok {
return actorErr.ErrActorTypeNotFound
}
return mng.(manager.ActorManager).InvokeActors(methodName, request)
}

func (r *ActorRunTime) KillAllActors(actorTypeName string) actorErr.ActorErr {
fmt.Println("KillAllActors:", actorTypeName)
targetManager, ok := r.actorManagers.Load(actorTypeName)
if !ok {
return actorErr.ErrActorTypeNotFound
}
return targetManager.(manager.ActorManager).KillAllActors()
}
6 changes: 5 additions & 1 deletion client/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ import (
pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
)

var (
ErrEmpty = errors.New("get configuration item result empty")
)

type ConfigurationItem struct {
Value string
Version string
Expand All @@ -29,7 +33,7 @@ func (c *GRPCClient) GetConfigurationItem(ctx context.Context, storeName, key st
return nil, err
}
if len(items) == 0 {
return nil, nil
return nil, ErrEmpty
}

return items[key], nil
Expand Down
4 changes: 3 additions & 1 deletion service/common/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package common

import (
"encoding/json"
"net/http"
)

// TopicEvent is the content of the inbound topic message.
Expand Down Expand Up @@ -66,7 +67,8 @@ type InvocationEvent struct {
// Verb is the HTTP verb that was used to invoke this service.
Verb string `json:"-"`
// QueryString represents an encoded HTTP url query string in the following format: name=value&name2=value2
QueryString string `json:"-"`
QueryString string `json:"-"`
Request *http.Request `json:"-"`
}

// Content is a generic data content.
Expand Down
3 changes: 2 additions & 1 deletion service/http/health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ func (s *Server) AddHealthCheckHandler(route string, fn common.HealthCheckHandle
return
}

w.WriteHeader(http.StatusNoContent)
// fix check fail on k8s, http.StatusNoContent -> http.StatusOK
w.WriteHeader(http.StatusOK)
})))

return nil
Expand Down
1 change: 1 addition & 0 deletions service/http/invoke.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func (s *Server) AddServiceInvocationHandler(route string, fn common.ServiceInvo
Verb: r.Method,
QueryString: r.URL.RawQuery,
ContentType: r.Header.Get("Content-type"),
Request: r,
}

var err error
Expand Down

0 comments on commit 3ab2048

Please sign in to comment.