From 666ac259fd675d248d150d124f9aece3dda741ae Mon Sep 17 00:00:00 2001 From: Gimme <2429727684@qq.com> Date: Tue, 19 Apr 2022 17:59:57 +0800 Subject: [PATCH] feature:add state anno --- pkg/grpc/dapr/dapr_api_state.go | 20 +++++++++++++++++++- pkg/grpc/default_api/api_state.go | 10 +++++++++- pkg/runtime/state/compatibility.go | 14 +++++++++++++- pkg/runtime/state/factory.go | 1 + pkg/runtime/state/registry.go | 3 +++ 5 files changed, 45 insertions(+), 3 deletions(-) diff --git a/pkg/grpc/dapr/dapr_api_state.go b/pkg/grpc/dapr/dapr_api_state.go index b288adc94f..56086f81fd 100644 --- a/pkg/grpc/dapr/dapr_api_state.go +++ b/pkg/grpc/dapr/dapr_api_state.go @@ -323,10 +323,11 @@ func (d *daprGrpcAPI) ExecuteStateTransaction(ctx context.Context, request *dapr } func (d *daprGrpcAPI) getStateStore(name string) (state.Store, error) { + // check if the stateStores exists if d.stateStores == nil || len(d.stateStores) == 0 { return nil, status.Error(codes.FailedPrecondition, messages.ErrStateStoresNotConfigured) } - + // check name if d.stateStores[name] == nil { return nil, status.Errorf(codes.InvalidArgument, messages.ErrStateStoreNotFound, name) } @@ -334,17 +335,22 @@ func (d *daprGrpcAPI) getStateStore(name string) (state.Store, error) { } func StateItem2SetRequest(grpcReq *dapr_common_v1pb.StateItem, key string) *state.SetRequest { + // Set the key for the request req := &state.SetRequest{ Key: key, } + // check if the grpcReq exists if grpcReq == nil { return req } + // Assign the value of grpcReq property to req req.Metadata = grpcReq.Metadata req.Value = grpcReq.Value + // Check grpcReq.Etag if grpcReq.Etag != nil { req.ETag = &grpcReq.Etag.Value } + // Check grpcReq.Options if grpcReq.Options != nil { req.Options = state.SetStateOption{ Consistency: StateConsistencyToString(grpcReq.Options.Consistency), @@ -355,7 +361,9 @@ func StateItem2SetRequest(grpcReq *dapr_common_v1pb.StateItem, key string) *stat } func GetResponse2GetStateResponse(compResp *state.GetResponse) *dapr_v1pb.GetStateResponse { + // Initialize an element of type GetStateResponse resp := &dapr_v1pb.GetStateResponse{} + // check if the compResp exists if compResp != nil { resp.Etag = common.PointerToString(compResp.ETag) resp.Data = compResp.Data @@ -365,6 +373,7 @@ func GetResponse2GetStateResponse(compResp *state.GetResponse) *dapr_v1pb.GetSta } func StateConsistencyToString(c dapr_common_v1pb.StateOptions_StateConsistency) string { + // check switch c { case dapr_common_v1pb.StateOptions_CONSISTENCY_EVENTUAL: return "eventual" @@ -375,6 +384,7 @@ func StateConsistencyToString(c dapr_common_v1pb.StateOptions_StateConsistency) } func StateConcurrencyToString(c dapr_common_v1pb.StateOptions_StateConcurrency) string { + // check the StateOptions of StateOptions_StateConcurrency switch c { case dapr_common_v1pb.StateOptions_CONCURRENCY_FIRST_WRITE: return "first-write" @@ -391,6 +401,7 @@ func (d *daprGrpcAPI) wrapDaprComponentError(err error, format string, args ...i if !ok { return status.Errorf(codes.Internal, format, args...) } + // check the Kind of error switch e.Kind() { case state.ETagMismatch: return status.Errorf(codes.Aborted, format, args...) @@ -425,6 +436,7 @@ func generateGetStateTask(store state.Store, req *state.GetRequest, resultCh cha } } +// converting from BulkGetResponse to BulkStateItem func BulkGetResponse2BulkStateItem(compResp *state.BulkGetResponse) *dapr_v1pb.BulkStateItem { if compResp == nil { return &dapr_v1pb.BulkStateItem{} @@ -438,7 +450,9 @@ func BulkGetResponse2BulkStateItem(compResp *state.BulkGetResponse) *dapr_v1pb.B } } +// converting from GetResponse to BulkStateItem func GetResponse2BulkStateItem(compResp *state.GetResponse, key string) *dapr_v1pb.BulkStateItem { + // convert resp := &dapr_v1pb.BulkStateItem{} resp.Key = key if compResp != nil { @@ -449,7 +463,9 @@ func GetResponse2BulkStateItem(compResp *state.GetResponse, key string) *dapr_v1 return resp } +// converting from DeleteStateRequest to DeleteRequest func DeleteStateRequest2DeleteRequest(grpcReq *dapr_v1pb.DeleteStateRequest, key string) *state.DeleteRequest { + // convert req := &state.DeleteRequest{ Key: key, } @@ -469,7 +485,9 @@ func DeleteStateRequest2DeleteRequest(grpcReq *dapr_v1pb.DeleteStateRequest, key return req } +// converting from StateItem to DeleteRequest func StateItem2DeleteRequest(grpcReq *dapr_common_v1pb.StateItem, key string) *state.DeleteRequest { + //convert req := &state.DeleteRequest{ Key: key, } diff --git a/pkg/grpc/default_api/api_state.go b/pkg/grpc/default_api/api_state.go index 6e5df4fa23..fa324b17e1 100644 --- a/pkg/grpc/default_api/api_state.go +++ b/pkg/grpc/default_api/api_state.go @@ -29,15 +29,18 @@ import ( // GetState obtains the state for a specific key. func (a *api) GetState(ctx context.Context, in *runtimev1pb.GetStateRequest) (*runtimev1pb.GetStateResponse, error) { + // Check if the StateRequest is exists if in == nil { return &runtimev1pb.GetStateResponse{}, status.Error(codes.InvalidArgument, "GetStateRequest is nil") } + // convert request daprReq := &dapr_v1pb.GetStateRequest{ StoreName: in.GetStoreName(), Key: in.GetKey(), Consistency: dapr_common_v1pb.StateOptions_StateConsistency(in.GetConsistency()), Metadata: in.GetMetadata(), } + // Generate response by request resp, err := a.daprAPI.GetState(ctx, daprReq) if err != nil { return &runtimev1pb.GetStateResponse{}, err @@ -50,6 +53,7 @@ func (a *api) GetState(ctx context.Context, in *runtimev1pb.GetStateRequest) (*r } func (a *api) SaveState(ctx context.Context, in *runtimev1pb.SaveStateRequest) (*emptypb.Empty, error) { + // Check if the request is nil if in == nil { return &emptypb.Empty{}, status.Error(codes.InvalidArgument, "SaveStateRequest is nil") } @@ -66,12 +70,14 @@ func (a *api) GetBulkState(ctx context.Context, in *runtimev1pb.GetBulkStateRequ if in == nil { return &runtimev1pb.GetBulkStateResponse{}, status.Error(codes.InvalidArgument, "GetBulkStateRequest is nil") } + // convert request daprReq := &dapr_v1pb.GetBulkStateRequest{ StoreName: in.GetStoreName(), Keys: in.GetKeys(), Parallelism: in.GetParallelism(), Metadata: in.GetMetadata(), } + // Generate response by request resp, err := a.daprAPI.GetBulkState(ctx, daprReq) if err != nil { return &runtimev1pb.GetBulkStateResponse{}, err @@ -93,6 +99,7 @@ func (a *api) DeleteState(ctx context.Context, in *runtimev1pb.DeleteStateReques if in == nil { return &emptypb.Empty{}, status.Error(codes.InvalidArgument, "DeleteStateRequest is nil") } + // convert request daprReq := &dapr_v1pb.DeleteStateRequest{ StoreName: in.GetStoreName(), Key: in.GetKey(), @@ -107,6 +114,7 @@ func (a *api) DeleteBulkState(ctx context.Context, in *runtimev1pb.DeleteBulkSta if in == nil { return &emptypb.Empty{}, status.Error(codes.InvalidArgument, "DeleteBulkStateRequest is nil") } + // convert request daprReq := &dapr_v1pb.DeleteBulkStateRequest{ StoreName: in.GetStoreName(), States: convertStatesToDaprPB(in.States), @@ -118,6 +126,7 @@ func (a *api) ExecuteStateTransaction(ctx context.Context, in *runtimev1pb.Execu if in == nil { return &emptypb.Empty{}, status.Error(codes.InvalidArgument, "ExecuteStateTransactionRequest is nil") } + // convert request daprReq := &dapr_v1pb.ExecuteStateTransactionRequest{ StoreName: in.GetStoreName(), Operations: convertTransactionalStateOperationToDaprPB(in.Operations), @@ -134,7 +143,6 @@ func convertEtagToDaprPB(etag *runtimev1pb.Etag) *dapr_common_v1pb.Etag { } return &dapr_common_v1pb.Etag{Value: etag.GetValue()} } - func convertOptionsToDaprPB(op *runtimev1pb.StateOptions) *dapr_common_v1pb.StateOptions { if op == nil { return &dapr_common_v1pb.StateOptions{} diff --git a/pkg/runtime/state/compatibility.go b/pkg/runtime/state/compatibility.go index 39d20df939..3c84d2a3a7 100644 --- a/pkg/runtime/state/compatibility.go +++ b/pkg/runtime/state/compatibility.go @@ -39,27 +39,35 @@ type StoreConfiguration struct { keyPrefixStrategy string } +//Save StateConfiguration by storeName func SaveStateConfiguration(storeName string, metadata map[string]string) error { + // convert strategy := metadata[strategyKey] + // Change strategy to lowercase strategy = strings.ToLower(strategy) + //if strategy is "",use default values("none") if strategy == "" { strategy = strategyDefault } else { + // Check if the secret key is legitimate err := checkKeyIllegal(metadata[strategyKey]) if err != nil { return err } } - + // convert statesConfiguration[storeName] = &StoreConfiguration{keyPrefixStrategy: strategy} return nil } func GetModifiedStateKey(key, storeName, appID string) (string, error) { + // Check if the secret key is legitimate if err := checkKeyIllegal(key); err != nil { return "", err } + // Get stateConfiguration by storeName stateConfiguration := getStateConfiguration(storeName) + // Determine the keyPrefixStrategy type switch stateConfiguration.keyPrefixStrategy { case strategyNone: return key, nil @@ -76,6 +84,7 @@ func GetModifiedStateKey(key, storeName, appID string) (string, error) { } func GetOriginalStateKey(modifiedStateKey string) string { + // Split modifiedStateKey by daprSeparator("||") splits := strings.Split(modifiedStateKey, daprSeparator) if len(splits) <= 1 { return modifiedStateKey @@ -84,7 +93,9 @@ func GetOriginalStateKey(modifiedStateKey string) string { } func getStateConfiguration(storeName string) *StoreConfiguration { + // Get statesConfiguration by storeName c := statesConfiguration[storeName] + // If statesConfiguration is empty, strategyDefault("none") is provided if c == nil { c = &StoreConfiguration{keyPrefixStrategy: strategyDefault} statesConfiguration[storeName] = c @@ -94,6 +105,7 @@ func getStateConfiguration(storeName string) *StoreConfiguration { } func checkKeyIllegal(key string) error { + // Determine if the key contains daprSeparator if strings.Contains(key, daprSeparator) { return errors.Errorf("input key/keyPrefix '%s' can't contain '%s'", key, daprSeparator) } diff --git a/pkg/runtime/state/factory.go b/pkg/runtime/state/factory.go index 7592261da2..13cabd1144 100644 --- a/pkg/runtime/state/factory.go +++ b/pkg/runtime/state/factory.go @@ -25,6 +25,7 @@ type Factory struct { FactoryMethod func() state.Store } +// Create a new Factory type variable func NewFactory(name string, f func() state.Store) *Factory { return &Factory{ Name: name, diff --git a/pkg/runtime/state/registry.go b/pkg/runtime/state/registry.go index 9d7bc21809..a1190c978c 100644 --- a/pkg/runtime/state/registry.go +++ b/pkg/runtime/state/registry.go @@ -36,6 +36,7 @@ type stateRegistry struct { info *info.RuntimeInfo } +// Create a new Registry type variable func NewRegistry(info *info.RuntimeInfo) Registry { info.AddService(ServiceName) return &stateRegistry{ @@ -44,6 +45,7 @@ func NewRegistry(info *info.RuntimeInfo) Registry { } } +// Registration for multiple Factories func (r *stateRegistry) Register(fs ...*Factory) { for _, f := range fs { r.stores[f.Name] = f.FactoryMethod @@ -51,6 +53,7 @@ func (r *stateRegistry) Register(fs ...*Factory) { } } +// Loading components for a registered Factory func (r *stateRegistry) Create(name string) (state.Store, error) { if f, ok := r.stores[name]; ok { r.info.LoadComponent(ServiceName, name)