diff --git a/temporalcli/commands.gen.go b/temporalcli/commands.gen.go index 9ba527d2..2927646d 100644 --- a/temporalcli/commands.gen.go +++ b/temporalcli/commands.gen.go @@ -1250,6 +1250,7 @@ type TemporalServerStartDevCommand struct { SqlitePragma []string DynamicConfigValue []string LogConfig bool + SearchAttribute []string } func NewTemporalServerStartDevCommand(cctx *CommandContext, parent *TemporalServerCommand) *TemporalServerStartDevCommand { @@ -1278,6 +1279,7 @@ func NewTemporalServerStartDevCommand(cctx *CommandContext, parent *TemporalServ s.Command.Flags().StringArrayVar(&s.SqlitePragma, "sqlite-pragma", nil, "Specify SQLite pragma statements in pragma=value format.") s.Command.Flags().StringArrayVar(&s.DynamicConfigValue, "dynamic-config-value", nil, "Dynamic config value, as KEY=JSON_VALUE (string values need quotes).") s.Command.Flags().BoolVar(&s.LogConfig, "log-config", false, "Log the server config being used to stderr.") + s.Command.Flags().StringArrayVar(&s.SearchAttribute, "search-attribute", nil, "Search attributes to register, in key=type format.") s.Command.Run = func(c *cobra.Command, args []string) { if err := s.run(cctx, args); err != nil { cctx.Options.Fail(err) diff --git a/temporalcli/commands.server.go b/temporalcli/commands.server.go index 927bb6f7..7f7559cd 100644 --- a/temporalcli/commands.server.go +++ b/temporalcli/commands.server.go @@ -3,11 +3,20 @@ package temporalcli import ( "encoding/json" "fmt" + "strings" "github.com/google/uuid" "github.com/temporalio/cli/temporalcli/devserver" + "go.temporal.io/api/enums/v1" + "go.temporal.io/api/operatorservice/v1" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) +var defaultDynamicConfigValues = map[string]any{ + "system.forceSearchAttributesCacheRefreshOnRead": true, +} + func (t *TemporalServerStartDevCommand) run(cctx *CommandContext, args []string) error { // Have to assume "localhost" is 127.0.0.1 for server to work (it expects IP) if t.Ip == "localhost" { @@ -87,6 +96,22 @@ func (t *TemporalServerStartDevCommand) run(cctx *CommandContext, args []string) } } + // Apply set of default dynamic config values if not already present + for k, v := range defaultDynamicConfigValues { + if _, ok := opts.DynamicConfigValues[k]; !ok { + if opts.DynamicConfigValues == nil { + opts.DynamicConfigValues = map[string]any{} + } + opts.DynamicConfigValues[k] = v + } + } + + // Prepare search attributes for adding before starting server + searchAttrs, err := t.prepareSearchAttributes() + if err != nil { + return err + } + // If not using DB file, set persistent cluster ID if t.DbFilename == "" { opts.ClusterID = persistentClusterID() @@ -114,6 +139,11 @@ func (t *TemporalServerStartDevCommand) run(cctx *CommandContext, args []string) } defer s.Stop() + // Register search attributes + if err := t.registerSearchAttributes(cctx, searchAttrs, opts.Namespaces); err != nil { + return err + } + friendlyIP := t.Ip if friendlyIP == "127.0.0.1" { friendlyIP = "localhost" @@ -147,3 +177,57 @@ func persistentClusterID() string { _ = writeEnvConfigFile(file, map[string]map[string]string{"default": {"cluster-id": id}}) return id } + +func (t *TemporalServerStartDevCommand) prepareSearchAttributes() (map[string]enums.IndexedValueType, error) { + opts, err := stringKeysValues(t.SearchAttribute) + if err != nil { + return nil, fmt.Errorf("invalid search attributes: %w", err) + } + attrs := make(map[string]enums.IndexedValueType, len(opts)) + for k, v := range opts { + // Case-insensitive index type lookup + var valType enums.IndexedValueType + for valTypeName, valTypeOrd := range enums.IndexedValueType_shorthandValue { + if strings.EqualFold(v, valTypeName) { + valType = enums.IndexedValueType(valTypeOrd) + break + } + } + if valType == 0 { + return nil, fmt.Errorf("invalid search attribute value type %q", v) + } + attrs[k] = valType + } + return attrs, nil +} + +func (t *TemporalServerStartDevCommand) registerSearchAttributes( + cctx *CommandContext, + attrs map[string]enums.IndexedValueType, + namespaces []string, +) error { + if len(attrs) == 0 { + return nil + } + + conn, err := grpc.NewClient( + fmt.Sprintf("%v:%v", t.Ip, t.Port), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + return fmt.Errorf("failed creating client to register search attributes: %w", err) + } + defer conn.Close() + client := operatorservice.NewOperatorServiceClient(conn) + // Call for each namespace + for _, ns := range namespaces { + _, err := client.AddSearchAttributes(cctx, &operatorservice.AddSearchAttributesRequest{ + Namespace: ns, + SearchAttributes: attrs, + }) + if err != nil { + return fmt.Errorf("failed registering search attributes: %w", err) + } + } + return nil +} diff --git a/temporalcli/commands.server_test.go b/temporalcli/commands.server_test.go index 72462c56..b32952e9 100644 --- a/temporalcli/commands.server_test.go +++ b/temporalcli/commands.server_test.go @@ -11,7 +11,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/temporalio/cli/temporalcli/devserver" + "go.temporal.io/api/operatorservice/v1" "go.temporal.io/sdk/client" + "go.temporal.io/sdk/temporal" ) // TODO(cretz): To test: @@ -122,6 +124,82 @@ func TestServer_StartDev_ConcurrentStarts(t *testing.T) { wg.Wait() } +func TestServer_StartDev_WithSearchAttributes(t *testing.T) { + h := NewCommandHarness(t) + defer h.Close() + + // Start in background, then wait for client to be able to connect + port := strconv.Itoa(devserver.MustGetFreePort("127.0.0.1")) + resCh := make(chan *CommandResult, 1) + go func() { + resCh <- h.Execute( + "server", "start-dev", + "-p", port, + "--headless", + "--search-attribute", "search-attr-1=Int", + "--search-attribute", "search-attr-2=kEyWoRdLiSt", + ) + }() + + // Try to connect for a bit while checking for error + var cl client.Client + h.EventuallyWithT(func(t *assert.CollectT) { + select { + case res := <-resCh: + if res.Err != nil { + panic(res.Err) + } + default: + } + var err error + cl, err = client.Dial(client.Options{HostPort: "127.0.0.1:" + port}) + if !assert.NoError(t, err) { + return + } + // Confirm search attributes are present + resp, err := cl.OperatorService().ListSearchAttributes( + context.Background(), &operatorservice.ListSearchAttributesRequest{Namespace: "default"}) + if !assert.NoError(t, err) { + return + } + assert.Contains(t, resp.CustomAttributes, "search-attr-1") + assert.Contains(t, resp.CustomAttributes, "search-attr-2") + + }, 3*time.Second, 200*time.Millisecond) + defer cl.Close() + + // Do a workflow start with the search attributes + run, err := cl.ExecuteWorkflow( + context.Background(), + client.StartWorkflowOptions{ + TaskQueue: "my-task-queue", + TypedSearchAttributes: temporal.NewSearchAttributes( + temporal.NewSearchAttributeKeyInt64("search-attr-1").ValueSet(123), + temporal.NewSearchAttributeKeyKeywordList("search-attr-2").ValueSet([]string{"foo", "bar"}), + ), + }, + "MyWorkflow", + ) + h.NoError(err) + h.NotEmpty(run.GetRunID()) + + // Check that they are there + desc, err := cl.DescribeWorkflowExecution(context.Background(), run.GetID(), "") + h.NoError(err) + sa := desc.WorkflowExecutionInfo.SearchAttributes.IndexedFields + h.JSONEq("123", string(sa["search-attr-1"].Data)) + h.JSONEq(`["foo","bar"]`, string(sa["search-attr-2"].Data)) + + // Send an interrupt by cancelling context + h.CancelContext() + select { + case <-time.After(20 * time.Second): + h.Fail("didn't cleanup after 20 seconds") + case res := <-resCh: + h.NoError(res.Err) + } +} + type testLogger struct { t *testing.T } diff --git a/temporalcli/commandsmd/commands.md b/temporalcli/commandsmd/commands.md index e858f04c..443ec9ed 100644 --- a/temporalcli/commandsmd/commands.md +++ b/temporalcli/commandsmd/commands.md @@ -602,6 +602,7 @@ To persist Workflows across runs, use: * `--sqlite-pragma` (string[]) - Specify SQLite pragma statements in pragma=value format. * `--dynamic-config-value` (string[]) - Dynamic config value, as KEY=JSON_VALUE (string values need quotes). * `--log-config` (bool) - Log the server config being used to stderr. +* `--search-attribute` (string[]) - Search attributes to register, in key=type format. ### temporal task-queue: Manage Task Queues.