Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Samples for message passing docs #361

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions message-passing-intro/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
### Message Passing Intro


### Steps to run this sample:
1) Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use).
2) Run the following command to start the worker
```
go run message-passing-intro/worker/main.go
```
3) Run the following command to start the example
```
go run message-passing-intro/starter/main.go
```
113 changes: 113 additions & 0 deletions message-passing-intro/starter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package main

import (
"context"
"log"

message "github.com/temporalio/samples-go/message-passing-intro"
"go.temporal.io/sdk/client"
)

func main() {
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

workflowOptions := client.StartWorkflowOptions{
ID: "message-passing-intro-workflow-ID",
TaskQueue: "message-passing-intro",
}

we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, message.GreetingWorkflow)
if err != nil {
log.Fatalln("Unable to execute workflow", err)
}

log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID())

supportedLangResult, err := c.QueryWorkflow(context.Background(), we.GetID(), we.GetRunID(), message.GetLanguagesQuery, message.GetLanguagesInput{IncludeUnsupported: false})
if err != nil {
log.Fatalf("Unable to query workflow: %v", err)
}
var supportedLang []message.Language
err = supportedLangResult.Get(&supportedLang)
if err != nil {
log.Fatalf("Unable to get query result: %v", err)
}
log.Println("Supported languages:", supportedLang)

langResult, err := c.QueryWorkflow(context.Background(), we.GetID(), we.GetRunID(), message.GetLanguageQuery, message.GetLanguagesInput{})
if err != nil {
log.Fatalf("Unable to query workflow: %v", err)
}
var currentLang message.Language
err = langResult.Get(&currentLang)
if err != nil {
log.Fatalf("Unable to get query result: %v", err)
}
log.Println("Current language:", currentLang)

updateHandle, err := c.UpdateWorkflow(context.Background(), client.UpdateWorkflowOptions{
WorkflowID: we.GetID(),
RunID: we.GetRunID(),
UpdateName: message.SetLanguageUpdate,
WaitForStage: client.WorkflowUpdateStageAccepted,
Args: []interface{}{message.Chinese},
})
if err != nil {
log.Fatalf("Unable to update workflow: %v", err)
}
var previousLang message.Language
err = updateHandle.Get(context.Background(), &previousLang)
if err != nil {
log.Fatalf("Unable to get update result: %v", err)
}

langResult, err = c.QueryWorkflow(context.Background(), we.GetID(), we.GetRunID(), message.GetLanguageQuery, message.GetLanguagesInput{})
if err != nil {
log.Fatalf("Unable to query workflow: %v", err)
}
err = langResult.Get(&currentLang)
if err != nil {
log.Fatalf("Unable to get query result: %v", err)
}
log.Printf("Language changed: %s -> %s", previousLang, currentLang)

updateHandle, err = c.UpdateWorkflow(context.Background(), client.UpdateWorkflowOptions{
WorkflowID: we.GetID(),
RunID: we.GetRunID(),
UpdateName: message.SetLanguageUpdate,
WaitForStage: client.WorkflowUpdateStageAccepted,
Args: []interface{}{message.English},
})
if err != nil {
log.Fatalf("Unable to update workflow: %v", err)
}
err = updateHandle.Get(context.Background(), &previousLang)
if err != nil {
log.Fatalf("Unable to get update result: %v", err)
}

langResult, err = c.QueryWorkflow(context.Background(), we.GetID(), we.GetRunID(), message.GetLanguageQuery, message.GetLanguagesInput{})
if err != nil {
log.Fatalf("Unable to query workflow: %v", err)
}
err = langResult.Get(&currentLang)
if err != nil {
log.Fatalf("Unable to get query result: %v", err)
}
log.Printf("Language changed: %s -> %s", previousLang, currentLang)

err = c.SignalWorkflow(context.Background(), we.GetID(), we.GetRunID(), message.ApproveSignal, message.ApproveInput{Name: ""})
if err != nil {
log.Fatalf("Unable to signal workflow: %v", err)
}

var wfresult string
if err = we.Get(context.Background(), &wfresult); err != nil {
log.Fatalf("unable get workflow result: %v", err)
}
log.Println("workflow result:", wfresult)
}
27 changes: 27 additions & 0 deletions message-passing-intro/worker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package main

import (
"log"

message "github.com/temporalio/samples-go/message-passing-intro"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)

func main() {
// The client and worker are heavyweight objects that should be created once per process.
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

w := worker.New(c, "message-passing-intro", worker.Options{})

w.RegisterWorkflow(message.GreetingWorkflow)

err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("Unable to start worker", err)
}
}
82 changes: 82 additions & 0 deletions message-passing-intro/workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package update

import (
"fmt"

"go.temporal.io/sdk/workflow"
"golang.org/x/exp/maps"
)

type Language string

const Chinese Language = "chinese"
const English Language = "english"
const French Language = "french"
const Spanish Language = "spanish"
const Portuguese Language = "portuguese"

const GetLanguagesQuery = "get-languages"
const GetLanguageQuery = "get-language"
const SetLanguageUpdate = "set-language"
const ApproveSignal = "approve"

type ApproveInput struct {
Name string
}

type GetLanguagesInput struct {
IncludeUnsupported bool
}

func GreetingWorkflow(ctx workflow.Context) (string, error) {
logger := workflow.GetLogger(ctx)
approverName := ""
language := English
greeting := map[Language]string{English: "Hello", Chinese: "你好,世界"}
err := workflow.SetQueryHandler(ctx, GetLanguagesQuery, func(input GetLanguagesInput) ([]Language, error) {
// 👉 A Query handler returns a value: it can inspect but must not mutate the Workflow state.
if input.IncludeUnsupported {
return []Language{Chinese, English, French, Spanish, Portuguese}, nil
} else {
// Range over map is a nondeterministic operation.
// It is OK to have a non-deterministic operation in a query function.
//workflowcheck:ignore
return maps.Keys(greeting), nil
}
})
if err != nil {
return "", err
}

err = workflow.SetQueryHandler(ctx, GetLanguageQuery, func(input GetLanguagesInput) (Language, error) {
return language, nil
})
if err != nil {
return "", err
}

err = workflow.SetUpdateHandlerWithOptions(ctx, SetLanguageUpdate, func(ctx workflow.Context, newLanguage Language) (Language, error) {
// 👉 An Update handler can mutate the Workflow state and return a value.
var previousLanguage Language
previousLanguage, language = language, newLanguage
return previousLanguage, nil
}, workflow.UpdateHandlerOptions{
Validator: func(ctx workflow.Context, newLanguage Language) error {
if _, ok := greeting[newLanguage]; !ok {
// 👉 In an Update validator you return any error to reject the Update.
return fmt.Errorf("%s unsupported language", newLanguage)
}
return nil
},
})
if err != nil {
return "", err
}
// Block until the language is approved
var approveInput ApproveInput
workflow.GetSignalChannel(ctx, ApproveSignal).Receive(ctx, &approveInput)
approverName = approveInput.Name
logger.Info("Received approval", "Approver", approverName)

return greeting[language], nil
}