Skip to content

Commit

Permalink
Add metadata query command
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Jan 22, 2025
1 parent ced2fdc commit f908422
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 21 deletions.
18 changes: 14 additions & 4 deletions temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,15 @@ func (v *NexusEndpointConfigOptions) buildFlags(cctx *CommandContext, f *pflag.F
f.StringVar(&v.TargetUrl, "target-url", "", "An external Nexus Endpoint that receives forwarded Nexus requests. May be used as an alternative to `--target-namespace` and `--target-task-queue`.")
}

type QueryModifiersOptions struct {
RejectCondition StringEnum
}

func (v *QueryModifiersOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) {
v.RejectCondition = NewStringEnum([]string{"not_open", "not_completed_cleanly"}, "")
f.Var(&v.RejectCondition, "reject-condition", "Optional flag for rejecting Queries based on Workflow state. Accepted values: not_open, not_completed_cleanly.")
}

type TemporalCommand struct {
Command cobra.Command
Env string
Expand Down Expand Up @@ -2916,6 +2925,7 @@ type TemporalWorkflowMetadataCommand struct {
Parent *TemporalWorkflowCommand
Command cobra.Command
WorkflowReferenceOptions
QueryModifiersOptions
}

func NewTemporalWorkflowMetadataCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowMetadataCommand {
Expand All @@ -2931,6 +2941,7 @@ func NewTemporalWorkflowMetadataCommand(cctx *CommandContext, parent *TemporalWo
}
s.Command.Args = cobra.NoArgs
s.WorkflowReferenceOptions.buildFlags(cctx, s.Command.Flags())
s.QueryModifiersOptions.buildFlags(cctx, s.Command.Flags())
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
Expand All @@ -2944,8 +2955,8 @@ type TemporalWorkflowQueryCommand struct {
Command cobra.Command
PayloadInputOptions
WorkflowReferenceOptions
Name string
RejectCondition StringEnum
QueryModifiersOptions
Name string
}

func NewTemporalWorkflowQueryCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowQueryCommand {
Expand All @@ -2962,10 +2973,9 @@ func NewTemporalWorkflowQueryCommand(cctx *CommandContext, parent *TemporalWorkf
s.Command.Args = cobra.NoArgs
s.Command.Flags().StringVar(&s.Name, "name", "", "Query Type/Name. Required. Aliased as \"--type\".")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "name")
s.RejectCondition = NewStringEnum([]string{"not_open", "not_completed_cleanly"}, "")
s.Command.Flags().Var(&s.RejectCondition, "reject-condition", "Optional flag for rejecting Queries based on Workflow state. Accepted values: not_open, not_completed_cleanly.")
s.PayloadInputOptions.buildFlags(cctx, s.Command.Flags())
s.WorkflowReferenceOptions.buildFlags(cctx, s.Command.Flags())
s.QueryModifiersOptions.buildFlags(cctx, s.Command.Flags())
s.Command.Flags().SetNormalizeFunc(aliasNormalizer(map[string]string{
"type": "name",
}))
Expand Down
43 changes: 33 additions & 10 deletions temporalcli/commands.workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
deploymentpb "go.temporal.io/api/deployment/v1"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/query/v1"
sdkpb "go.temporal.io/api/sdk/v1"
"go.temporal.io/api/update/v1"
workflowpb "go.temporal.io/api/workflow/v1"
"go.temporal.io/api/workflowservice/v1"
Expand All @@ -26,6 +27,8 @@ import (
"google.golang.org/protobuf/types/known/fieldmaskpb"
)

const metadataQueryName = "__temporal_workflow_metadata"

func (c *TemporalWorkflowCancelCommand) run(cctx *CommandContext, args []string) error {
cl, err := c.Parent.ClientOptions.dialClient(cctx)
if err != nil {
Expand Down Expand Up @@ -205,7 +208,7 @@ func (c *TemporalWorkflowUpdateOptionsCommand) run(cctx *CommandContext, args []

func (c *TemporalWorkflowMetadataCommand) run(cctx *CommandContext, _ []string) error {
return queryHelper(cctx, c.Parent, PayloadInputOptions{},
"__user_metadata", StringEnum{Value: ""}, c.WorkflowReferenceOptions)
metadataQueryName, c.RejectCondition, c.WorkflowReferenceOptions)
}

func (c *TemporalWorkflowQueryCommand) run(cctx *CommandContext, args []string) error {
Expand Down Expand Up @@ -642,14 +645,34 @@ func queryHelper(cctx *CommandContext,
return cctx.Printer.PrintStructured(result, printer.StructuredOptions{})
}

cctx.Printer.Println(color.MagentaString("Query result:"))
output := struct {
QueryResult json.RawMessage `cli:",cardOmitEmpty"`
}{}
output.QueryResult, err = cctx.MarshalFriendlyJSONPayloads(result.QueryResult)
if err != nil {
return fmt.Errorf("failed to marshal query result: %w", err)
}
if queryType == metadataQueryName {
cctx.Printer.Println(color.MagentaString("Metadata:"))
var metadata sdkpb.WorkflowMetadata
UnmarshalProtoJSONWithOptions(result.QueryResult.Payloads[0].Data, &metadata, true)
output := struct {
WorkflowType string
QueryDefinitions []*sdkpb.WorkflowInteractionDefinition `cli:",cardOmitEmpty"`
SignalDefinitions []*sdkpb.WorkflowInteractionDefinition `cli:",cardOmitEmpty"`
UpdateDefinitions []*sdkpb.WorkflowInteractionDefinition `cli:",cardOmitEmpty"`
CurrentDetails string `cli:",cardOmitEmpty"`
}{
WorkflowType: metadata.GetDefinition().GetType(),
QueryDefinitions: metadata.GetDefinition().GetQueryDefinitions(),
SignalDefinitions: metadata.GetDefinition().GetSignalDefinitions(),
UpdateDefinitions: metadata.GetDefinition().GetUpdateDefinitions(),
CurrentDetails: metadata.GetCurrentDetails(),
}
return cctx.Printer.PrintStructured(output, printer.StructuredOptions{})
} else {
cctx.Printer.Println(color.MagentaString("Query result:"))
output := struct {
QueryResult json.RawMessage `cli:",cardOmitEmpty"`
}{}
output.QueryResult, err = cctx.MarshalFriendlyJSONPayloads(result.QueryResult)
if err != nil {
return fmt.Errorf("failed to marshal query result: %w", err)
}

return cctx.Printer.PrintStructured(output, printer.StructuredOptions{})
return cctx.Printer.PrintStructured(output, printer.StructuredOptions{})
}
}
89 changes: 89 additions & 0 deletions temporalcli/commands.workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1148,3 +1148,92 @@ func (s *SharedServerSuite) testStackWorkflow(json bool) {
s.Error(res.Err)
s.Contains(res.Err.Error(), "query was rejected, workflow has status: Completed")
}

func (s *SharedServerSuite) TestWorkflow_MetadataJSON() {
s.testWorkflowMetadata(true)
}

func (s *SharedServerSuite) TestWorkflow_Metadata() {
s.testWorkflowMetadata(false)
}

func (s *SharedServerSuite) testWorkflowMetadata(json bool) {
// Make workflow wait for signal and then return it
s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) {
done := false
workflow.Go(ctx, func(ctx workflow.Context) {
_ = workflow.Await(ctx, func() bool {
return done
})
})
workflow.SetQueryHandlerWithOptions(ctx, "my-query", func(arg string) (string, error) {
return "hi", nil
}, workflow.QueryHandlerOptions{Description: "q-desc"})
workflow.SetUpdateHandlerWithOptions(ctx, "my-update",
func(ctx workflow.Context, arg string) (string, error) { return "hi", nil },
workflow.UpdateHandlerOptions{Description: "upd-desc"})
workflow.SetCurrentDetails(ctx, "current-deets")
workflow.GetSignalChannelWithOptions(ctx, "my-signal",
workflow.SignalChannelOptions{Description: "sig-desc"}).Receive(ctx, nil)
done = true
return nil, nil

})

// Start the workflow
run, err := s.Client.ExecuteWorkflow(
s.Context,
client.StartWorkflowOptions{
TaskQueue: s.Worker().Options.TaskQueue,
StaticSummary: "summie",
StaticDetails: "deets",
},
DevWorkflow,
"ignored",
)
s.NoError(err)

args := []string{
"workflow", "metadata",
"--address", s.Address(),
"-w", run.GetID(),
}
if json {
args = append(args, "-o", "json")
}

res := s.Execute(args...)
s.NoError(res.Err)
if !json {
s.ContainsOnSameLine(res.Stdout.String(), "QueryDefinitions", "my-query", "q-desc")
s.ContainsOnSameLine(res.Stdout.String(), "SignalDefinitions", "my-signal", "sig-desc")
s.ContainsOnSameLine(res.Stdout.String(), "UpdateDefinitions", "my-update", "upd-desc")
s.ContainsOnSameLine(res.Stdout.String(), "CurrentDetails", "current-deets")
} else {
s.Contains(res.Stdout.String(), "queryDefinitions")
s.ContainsOnSameLine(res.Stdout.String(), "name", "my-query")
s.Contains(res.Stdout.String(), "signalDefinitions")
s.ContainsOnSameLine(res.Stdout.String(), "name", "my-signal")
s.Contains(res.Stdout.String(), "updateDefinitions")
s.ContainsOnSameLine(res.Stdout.String(), "name", "my-update")
s.ContainsOnSameLine(res.Stdout.String(), "currentDetails", "current-deets")
}

// Unblock the workflow with a signal
s.NoError(s.Client.SignalWorkflow(s.Context, run.GetID(), "", "my-signal", nil))
s.NoError(run.Get(s.Context, nil))

// Ensure query is rejected when using not open rejection condition
args = []string{
"workflow", "metadata",
"--address", s.Address(),
"-w", run.GetID(),
"--reject-condition", "not_open",
}
if json {
args = append(args, "-o", "json")
}
res = s.Execute(args...)
s.Error(res.Err)
s.Contains(res.Err.Error(), "query was rejected, workflow has status: Completed")
}
19 changes: 12 additions & 7 deletions temporalcli/commandsgen/commands.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2899,6 +2899,7 @@ commands:
```
option-sets:
- workflow-reference
- query-modifiers

- name: temporal workflow update-options
summary: Change Workflow Execution Options
Expand Down Expand Up @@ -2984,20 +2985,14 @@ commands:
option-sets:
- payload-input
- workflow-reference
- query-modifiers
options:
- name: name
type: string
description: Query Type/Name.
required: true
aliases:
- type
- name: reject-condition
type: string-enum
description: |
Optional flag for rejecting Queries based on Workflow state.
enum-values:
- not_open
- not_completed_cleanly

- name: temporal workflow reset
summary: Move Workflow Execution history point
Expand Down Expand Up @@ -3814,3 +3809,13 @@ option-sets:
An external Nexus Endpoint that receives forwarded Nexus requests.
May be used as an alternative to `--target-namespace` and
`--target-task-queue`.
- name: query-modifiers
options:
- name: reject-condition
type: string-enum
description: |
Optional flag for rejecting Queries based on Workflow state.
enum-values:
- not_open
- not_completed_cleanly

0 comments on commit f908422

Please sign in to comment.