Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add state comments #487

Merged
merged 3 commits into from
Apr 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion pkg/grpc/dapr/dapr_api_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,28 +323,34 @@ func (d *daprGrpcAPI) ExecuteStateTransaction(ctx context.Context, request *dapr
}

func (d *daprGrpcAPI) getStateStore(name string) (state.Store, error) {
// check if the stateStores exists
if d.stateStores == nil || len(d.stateStores) == 0 {
return nil, status.Error(codes.FailedPrecondition, messages.ErrStateStoresNotConfigured)
}

// check name
if d.stateStores[name] == nil {
return nil, status.Errorf(codes.InvalidArgument, messages.ErrStateStoreNotFound, name)
}
return d.stateStores[name], nil
}

func StateItem2SetRequest(grpcReq *dapr_common_v1pb.StateItem, key string) *state.SetRequest {
// Set the key for the request
req := &state.SetRequest{
Key: key,
}
// check if the grpcReq exists
if grpcReq == nil {
return req
}
// Assign the value of grpcReq property to req
req.Metadata = grpcReq.Metadata
req.Value = grpcReq.Value
// Check grpcReq.Etag
if grpcReq.Etag != nil {
req.ETag = &grpcReq.Etag.Value
}
// Check grpcReq.Options
if grpcReq.Options != nil {
req.Options = state.SetStateOption{
Consistency: StateConsistencyToString(grpcReq.Options.Consistency),
Expand All @@ -355,7 +361,9 @@ func StateItem2SetRequest(grpcReq *dapr_common_v1pb.StateItem, key string) *stat
}

func GetResponse2GetStateResponse(compResp *state.GetResponse) *dapr_v1pb.GetStateResponse {
// Initialize an element of type GetStateResponse
resp := &dapr_v1pb.GetStateResponse{}
// check if the compResp exists
if compResp != nil {
resp.Etag = common.PointerToString(compResp.ETag)
resp.Data = compResp.Data
Expand All @@ -365,6 +373,7 @@ func GetResponse2GetStateResponse(compResp *state.GetResponse) *dapr_v1pb.GetSta
}

func StateConsistencyToString(c dapr_common_v1pb.StateOptions_StateConsistency) string {
// check
switch c {
case dapr_common_v1pb.StateOptions_CONSISTENCY_EVENTUAL:
return "eventual"
Expand All @@ -375,6 +384,7 @@ func StateConsistencyToString(c dapr_common_v1pb.StateOptions_StateConsistency)
}

func StateConcurrencyToString(c dapr_common_v1pb.StateOptions_StateConcurrency) string {
// check the StateOptions of StateOptions_StateConcurrency
switch c {
case dapr_common_v1pb.StateOptions_CONCURRENCY_FIRST_WRITE:
return "first-write"
Expand All @@ -391,6 +401,7 @@ func (d *daprGrpcAPI) wrapDaprComponentError(err error, format string, args ...i
if !ok {
return status.Errorf(codes.Internal, format, args...)
}
// check the Kind of error
switch e.Kind() {
case state.ETagMismatch:
return status.Errorf(codes.Aborted, format, args...)
Expand Down Expand Up @@ -425,6 +436,7 @@ func generateGetStateTask(store state.Store, req *state.GetRequest, resultCh cha
}
}

// converting from BulkGetResponse to BulkStateItem
func BulkGetResponse2BulkStateItem(compResp *state.BulkGetResponse) *dapr_v1pb.BulkStateItem {
if compResp == nil {
return &dapr_v1pb.BulkStateItem{}
Expand All @@ -438,7 +450,9 @@ func BulkGetResponse2BulkStateItem(compResp *state.BulkGetResponse) *dapr_v1pb.B
}
}

// converting from GetResponse to BulkStateItem
func GetResponse2BulkStateItem(compResp *state.GetResponse, key string) *dapr_v1pb.BulkStateItem {
// convert
resp := &dapr_v1pb.BulkStateItem{}
resp.Key = key
if compResp != nil {
Expand All @@ -449,7 +463,9 @@ func GetResponse2BulkStateItem(compResp *state.GetResponse, key string) *dapr_v1
return resp
}

// converting from DeleteStateRequest to DeleteRequest
func DeleteStateRequest2DeleteRequest(grpcReq *dapr_v1pb.DeleteStateRequest, key string) *state.DeleteRequest {
// convert
req := &state.DeleteRequest{
Key: key,
}
Expand All @@ -469,7 +485,9 @@ func DeleteStateRequest2DeleteRequest(grpcReq *dapr_v1pb.DeleteStateRequest, key
return req
}

// converting from StateItem to DeleteRequest
func StateItem2DeleteRequest(grpcReq *dapr_common_v1pb.StateItem, key string) *state.DeleteRequest {
//convert
req := &state.DeleteRequest{
Key: key,
}
Expand Down
10 changes: 9 additions & 1 deletion pkg/grpc/default_api/api_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,18 @@ import (

// GetState obtains the state for a specific key.
func (a *api) GetState(ctx context.Context, in *runtimev1pb.GetStateRequest) (*runtimev1pb.GetStateResponse, error) {
// Check if the StateRequest is exists
if in == nil {
return &runtimev1pb.GetStateResponse{}, status.Error(codes.InvalidArgument, "GetStateRequest is nil")
}
// convert request
daprReq := &dapr_v1pb.GetStateRequest{
StoreName: in.GetStoreName(),
Key: in.GetKey(),
Consistency: dapr_common_v1pb.StateOptions_StateConsistency(in.GetConsistency()),
Metadata: in.GetMetadata(),
}
// Generate response by request
resp, err := a.daprAPI.GetState(ctx, daprReq)
if err != nil {
return &runtimev1pb.GetStateResponse{}, err
Expand All @@ -50,6 +53,7 @@ func (a *api) GetState(ctx context.Context, in *runtimev1pb.GetStateRequest) (*r
}

func (a *api) SaveState(ctx context.Context, in *runtimev1pb.SaveStateRequest) (*emptypb.Empty, error) {
// Check if the request is nil
if in == nil {
return &emptypb.Empty{}, status.Error(codes.InvalidArgument, "SaveStateRequest is nil")
}
Expand All @@ -67,12 +71,14 @@ func (a *api) GetBulkState(ctx context.Context, in *runtimev1pb.GetBulkStateRequ
if in == nil {
return &runtimev1pb.GetBulkStateResponse{}, status.Error(codes.InvalidArgument, "GetBulkStateRequest is nil")
}
// convert request
daprReq := &dapr_v1pb.GetBulkStateRequest{
StoreName: in.GetStoreName(),
Keys: in.GetKeys(),
Parallelism: in.GetParallelism(),
Metadata: in.GetMetadata(),
}
// Generate response by request
resp, err := a.daprAPI.GetBulkState(ctx, daprReq)
if err != nil {
return &runtimev1pb.GetBulkStateResponse{}, err
Expand All @@ -94,6 +100,7 @@ func (a *api) DeleteState(ctx context.Context, in *runtimev1pb.DeleteStateReques
if in == nil {
return &emptypb.Empty{}, status.Error(codes.InvalidArgument, "DeleteStateRequest is nil")
}
// convert request
daprReq := &dapr_v1pb.DeleteStateRequest{
StoreName: in.GetStoreName(),
Key: in.GetKey(),
Expand All @@ -108,6 +115,7 @@ func (a *api) DeleteBulkState(ctx context.Context, in *runtimev1pb.DeleteBulkSta
if in == nil {
return &emptypb.Empty{}, status.Error(codes.InvalidArgument, "DeleteBulkStateRequest is nil")
}
// convert request
daprReq := &dapr_v1pb.DeleteBulkStateRequest{
StoreName: in.GetStoreName(),
States: convertStatesToDaprPB(in.States),
Expand All @@ -119,6 +127,7 @@ func (a *api) ExecuteStateTransaction(ctx context.Context, in *runtimev1pb.Execu
if in == nil {
return &emptypb.Empty{}, status.Error(codes.InvalidArgument, "ExecuteStateTransactionRequest is nil")
}
// convert request
daprReq := &dapr_v1pb.ExecuteStateTransactionRequest{
StoreName: in.GetStoreName(),
Operations: convertTransactionalStateOperationToDaprPB(in.Operations),
Expand All @@ -135,7 +144,6 @@ func convertEtagToDaprPB(etag *runtimev1pb.Etag) *dapr_common_v1pb.Etag {
}
return &dapr_common_v1pb.Etag{Value: etag.GetValue()}
}

func convertOptionsToDaprPB(op *runtimev1pb.StateOptions) *dapr_common_v1pb.StateOptions {
if op == nil {
return &dapr_common_v1pb.StateOptions{}
Expand Down
14 changes: 13 additions & 1 deletion pkg/runtime/state/compatibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,27 +39,35 @@ type StoreConfiguration struct {
keyPrefixStrategy string
}

//Save StateConfiguration by storeName
func SaveStateConfiguration(storeName string, metadata map[string]string) error {
// convert
strategy := metadata[strategyKey]
// Change strategy to lowercase
strategy = strings.ToLower(strategy)
//if strategy is "",use default values("none")
if strategy == "" {
strategy = strategyDefault
} else {
// Check if the secret key is legitimate
err := checkKeyIllegal(metadata[strategyKey])
if err != nil {
return err
}
}

// convert
statesConfiguration[storeName] = &StoreConfiguration{keyPrefixStrategy: strategy}
return nil
}

func GetModifiedStateKey(key, storeName, appID string) (string, error) {
// Check if the secret key is legitimate
if err := checkKeyIllegal(key); err != nil {
return "", err
}
// Get stateConfiguration by storeName
stateConfiguration := getStateConfiguration(storeName)
// Determine the keyPrefixStrategy type
switch stateConfiguration.keyPrefixStrategy {
case strategyNone:
return key, nil
Expand All @@ -76,6 +84,7 @@ func GetModifiedStateKey(key, storeName, appID string) (string, error) {
}

func GetOriginalStateKey(modifiedStateKey string) string {
// Split modifiedStateKey by daprSeparator("||")
splits := strings.Split(modifiedStateKey, daprSeparator)
if len(splits) <= 1 {
return modifiedStateKey
Expand All @@ -84,7 +93,9 @@ func GetOriginalStateKey(modifiedStateKey string) string {
}

func getStateConfiguration(storeName string) *StoreConfiguration {
// Get statesConfiguration by storeName
c := statesConfiguration[storeName]
// If statesConfiguration is empty, strategyDefault("none") is provided
if c == nil {
c = &StoreConfiguration{keyPrefixStrategy: strategyDefault}
statesConfiguration[storeName] = c
Expand All @@ -94,6 +105,7 @@ func getStateConfiguration(storeName string) *StoreConfiguration {
}

func checkKeyIllegal(key string) error {
// Determine if the key contains daprSeparator
if strings.Contains(key, daprSeparator) {
return errors.Errorf("input key/keyPrefix '%s' can't contain '%s'", key, daprSeparator)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/runtime/state/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Factory struct {
FactoryMethod func() state.Store
}

// Create a new Factory type variable
func NewFactory(name string, f func() state.Store) *Factory {
return &Factory{
Name: name,
Expand Down
3 changes: 3 additions & 0 deletions pkg/runtime/state/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type stateRegistry struct {
info *info.RuntimeInfo
}

// Create a new Registry type variable
func NewRegistry(info *info.RuntimeInfo) Registry {
info.AddService(ServiceName)
return &stateRegistry{
Expand All @@ -44,13 +45,15 @@ func NewRegistry(info *info.RuntimeInfo) Registry {
}
}

// Registration for multiple Factories
func (r *stateRegistry) Register(fs ...*Factory) {
for _, f := range fs {
r.stores[f.Name] = f.FactoryMethod
r.info.RegisterComponent(ServiceName, f.Name)
}
}

// Loading components for a registered Factory
func (r *stateRegistry) Create(name string) (state.Store, error) {
if f, ok := r.stores[name]; ok {
r.info.LoadComponent(ServiceName, name)
Expand Down