diff --git a/lib/temporal/metadata.rb b/lib/temporal/metadata.rb index e39f8da9..c467d25e 100644 --- a/lib/temporal/metadata.rb +++ b/lib/temporal/metadata.rb @@ -8,19 +8,16 @@ module Temporal module Metadata ACTIVITY_TYPE = :activity WORKFLOW_TASK_TYPE = :workflow_task - WORKFLOW_TYPE = :workflow class << self include Concerns::Payloads - def generate(type, data, namespace = nil) + def generate(type, data, namespace) case type when ACTIVITY_TYPE activity_metadata_from(data, namespace) when WORKFLOW_TASK_TYPE workflow_task_metadata_from(data, namespace) - when WORKFLOW_TYPE - workflow_metadata_from(data) else raise InternalError, 'Unsupported metadata type' end @@ -62,15 +59,6 @@ def workflow_task_metadata_from(task, namespace) workflow_name: task.workflow_type.name ) end - - def workflow_metadata_from(event) - Metadata::Workflow.new( - name: event.workflow_type.name, - run_id: event.original_execution_run_id, - attempt: event.attempt, - headers: headers(event.header&.fields) - ) - end end end end diff --git a/lib/temporal/metadata/workflow.rb b/lib/temporal/metadata/workflow.rb index 86d14de4..f70c7f2f 100644 --- a/lib/temporal/metadata/workflow.rb +++ b/lib/temporal/metadata/workflow.rb @@ -3,9 +3,11 @@ module Temporal module Metadata class Workflow < Base - attr_reader :name, :run_id, :attempt, :headers + attr_reader :namespace, :id, :name, :run_id, :attempt, :headers - def initialize(name:, run_id:, attempt:, headers: {}) + def initialize(namespace:, id:, name:, run_id:, attempt:, headers: {}) + @namespace = namespace + @id = id @name = name @run_id = run_id @attempt = attempt @@ -20,6 +22,8 @@ def workflow? def to_h { + 'namespace' => namespace, + 'workflow_id' => id, 'workflow_name' => name, 'workflow_run_id' => run_id, 'attempt' => attempt diff --git a/lib/temporal/testing/temporal_override.rb b/lib/temporal/testing/temporal_override.rb index de61b591..d4c58310 100644 --- a/lib/temporal/testing/temporal_override.rb +++ b/lib/temporal/testing/temporal_override.rb @@ -89,7 +89,12 @@ def start_locally(workflow, schedule, *input, **args) execution_options = ExecutionOptions.new(workflow, options) metadata = Metadata::Workflow.new( - name: workflow_id, run_id: run_id, attempt: 1, headers: execution_options.headers + namespace: execution_options.namespace, + id: workflow_id, + name: execution_options.name, + run_id: run_id, + attempt: 1, + headers: execution_options.headers ) context = Temporal::Testing::LocalWorkflowContext.new( execution, workflow_id, run_id, workflow.disabled_releases, metadata diff --git a/lib/temporal/testing/workflow_override.rb b/lib/temporal/testing/workflow_override.rb index a36e843e..eb9a9fd4 100644 --- a/lib/temporal/testing/workflow_override.rb +++ b/lib/temporal/testing/workflow_override.rb @@ -28,7 +28,11 @@ def execute_locally(*input) run_id = SecureRandom.uuid execution = WorkflowExecution.new metadata = Temporal::Metadata::Workflow.new( - name: workflow_id, run_id: run_id, attempt: 1 + namespace: nil, + id: workflow_id, + name: name, # Workflow class name + run_id: run_id, + attempt: 1 ) context = Temporal::Testing::LocalWorkflowContext.new( execution, workflow_id, run_id, disabled_releases, metadata diff --git a/lib/temporal/workflow/executor.rb b/lib/temporal/workflow/executor.rb index c81703cb..313e6977 100644 --- a/lib/temporal/workflow/executor.rb +++ b/lib/temporal/workflow/executor.rb @@ -4,14 +4,16 @@ require 'temporal/workflow/state_manager' require 'temporal/workflow/context' require 'temporal/workflow/history/event_target' +require 'temporal/metadata' module Temporal class Workflow class Executor - def initialize(workflow_class, history, config) + def initialize(workflow_class, history, metadata, config) @workflow_class = workflow_class @dispatcher = Dispatcher.new @state_manager = StateManager.new(dispatcher) + @metadata = metadata @history = history @config = config end @@ -32,15 +34,29 @@ def run private - attr_reader :workflow_class, :dispatcher, :state_manager, :history, :config + attr_reader :workflow_class, :dispatcher, :state_manager, :metadata, :history, :config - def execute_workflow(input, metadata) + def execute_workflow(input, workflow_started_event_attributes) + metadata = generate_workflow_metadata_from(workflow_started_event_attributes) context = Workflow::Context.new(state_manager, dispatcher, workflow_class, metadata, config) Fiber.new do workflow_class.execute_in_context(context, input) end.resume end + + # workflow_id and domain are confusingly not available on the WorkflowExecutionStartedEvent, + # so we have to fetch these from the DecisionTask's metadata + def generate_workflow_metadata_from(event_attributes) + Metadata::Workflow.new( + namespace: metadata.namespace, + id: metadata.workflow_id, + name: event_attributes.workflow_type.name, + run_id: event_attributes.original_execution_run_id, + attempt: event_attributes.attempt, + headers: event_attributes.header&.fields || {} + ) + end end end end diff --git a/lib/temporal/workflow/state_manager.rb b/lib/temporal/workflow/state_manager.rb index 693f1305..e88850fd 100644 --- a/lib/temporal/workflow/state_manager.rb +++ b/lib/temporal/workflow/state_manager.rb @@ -3,7 +3,6 @@ require 'temporal/workflow/command' require 'temporal/workflow/command_state_machine' require 'temporal/workflow/history/event_target' -require 'temporal/metadata' require 'temporal/concerns/payloads' require 'temporal/workflow/errors' @@ -106,7 +105,7 @@ def apply_event(event) History::EventTarget.workflow, 'started', from_payloads(event.attributes.input), - Metadata.generate(Metadata::WORKFLOW_TYPE, event.attributes) + event.attributes ) when 'WORKFLOW_EXECUTION_COMPLETED' diff --git a/lib/temporal/workflow/task_processor.rb b/lib/temporal/workflow/task_processor.rb index f2d0a3c2..1701e18d 100644 --- a/lib/temporal/workflow/task_processor.rb +++ b/lib/temporal/workflow/task_processor.rb @@ -32,7 +32,7 @@ def process history = fetch_full_history # TODO: For sticky workflows we need to cache the Executor instance - executor = Workflow::Executor.new(workflow_class, history, config) + executor = Workflow::Executor.new(workflow_class, history, metadata, config) commands = middleware_chain.invoke(metadata) do executor.run diff --git a/spec/fabricators/workflow_metadata_fabricator.rb b/spec/fabricators/workflow_metadata_fabricator.rb index 5a609bb8..f5393765 100644 --- a/spec/fabricators/workflow_metadata_fabricator.rb +++ b/spec/fabricators/workflow_metadata_fabricator.rb @@ -1,6 +1,8 @@ require 'securerandom' Fabricator(:workflow_metadata, from: :open_struct) do + namespace 'test-namespace' + id { SecureRandom.uuid } name 'TestWorkflow' run_id { SecureRandom.uuid } attempt 1 diff --git a/spec/unit/lib/temporal/activity/task_processor_spec.rb b/spec/unit/lib/temporal/activity/task_processor_spec.rb index bb5159c7..0a582883 100644 --- a/spec/unit/lib/temporal/activity/task_processor_spec.rb +++ b/spec/unit/lib/temporal/activity/task_processor_spec.rb @@ -14,7 +14,7 @@ input: config.converter.to_payloads(input) ) end - let(:metadata) { Temporal::Metadata.generate(Temporal::Metadata::ACTIVITY_TYPE, task) } + let(:metadata) { Temporal::Metadata.generate(Temporal::Metadata::ACTIVITY_TYPE, task, namespace) } let(:activity_name) { 'TestActivity' } let(:connection) { instance_double('Temporal::Connection::GRPC') } let(:middleware_chain) { Temporal::Middleware::Chain.new } diff --git a/spec/unit/lib/temporal/metadata/workflow_spec.rb b/spec/unit/lib/temporal/metadata/workflow_spec.rb index 6bbe3af6..562c3fb8 100644 --- a/spec/unit/lib/temporal/metadata/workflow_spec.rb +++ b/spec/unit/lib/temporal/metadata/workflow_spec.rb @@ -6,6 +6,8 @@ let(:args) { Fabricate(:workflow_metadata) } it 'sets the attributes' do + expect(subject.namespace).to eq(args.namespace) + expect(subject.id).to eq(args.id) expect(subject.name).to eq(args.name) expect(subject.run_id).to eq(args.run_id) expect(subject.attempt).to eq(args.attempt) @@ -25,6 +27,9 @@ it 'returns a hash' do expect(subject.to_h).to eq({ + 'namespace' => subject.namespace, + 'workflow_id' => subject.id, + 'workflow_id' => subject.id, 'attempt' => subject.attempt, 'workflow_name' => subject.name, 'workflow_run_id' => subject.run_id diff --git a/spec/unit/lib/temporal/metadata_spec.rb b/spec/unit/lib/temporal/metadata_spec.rb index 18134e26..f4c86df6 100644 --- a/spec/unit/lib/temporal/metadata_spec.rb +++ b/spec/unit/lib/temporal/metadata_spec.rb @@ -46,28 +46,6 @@ end end - context 'with workflow type' do - let(:type) { described_class::WORKFLOW_TYPE } - let(:data) { Fabricate(:api_workflow_execution_started_event_attributes) } - let(:namespace) { nil } - - it 'generates metadata' do - expect(subject.run_id).to eq(data.original_execution_run_id) - expect(subject.attempt).to eq(data.attempt) - expect(subject.headers).to eq({}) - end - - context 'with headers' do - let(:data) do - Fabricate(:api_workflow_execution_started_event_attributes, headers: { 'Foo' => 'Bar' }) - end - - it 'assigns headers' do - expect(subject.headers).to eq('Foo' => 'Bar') - end - end - end - context 'with unknown type' do let(:type) { :unknown } let(:data) { nil } diff --git a/spec/unit/lib/temporal/testing/local_workflow_context_spec.rb b/spec/unit/lib/temporal/testing/local_workflow_context_spec.rb index 29b4b2a3..70747841 100644 --- a/spec/unit/lib/temporal/testing/local_workflow_context_spec.rb +++ b/spec/unit/lib/temporal/testing/local_workflow_context_spec.rb @@ -12,7 +12,7 @@ workflow_id, run_id, [], - Temporal::Metadata::Workflow.new(name: workflow_id, run_id: run_id, attempt: 1) + Temporal::Metadata::Workflow.new(namespace: 'ruby-samples', id: workflow_id, name: 'HelloWorldWorkflow', run_id: run_id, attempt: 1) ) end let(:async_token) do diff --git a/spec/unit/lib/temporal/workflow/executor_spec.rb b/spec/unit/lib/temporal/workflow/executor_spec.rb new file mode 100644 index 00000000..a639bdca --- /dev/null +++ b/spec/unit/lib/temporal/workflow/executor_spec.rb @@ -0,0 +1,78 @@ +require 'temporal/workflow/executor' +require 'temporal/workflow/history' +require 'temporal/workflow' + +describe Temporal::Workflow::Executor do + subject { described_class.new(workflow, history, workflow_metadata, config) } + + let(:workflow_started_event) { Fabricate(:api_workflow_execution_started_event, event_id: 1) } + let(:history) do + Temporal::Workflow::History.new([ + workflow_started_event, + Fabricate(:api_workflow_task_scheduled_event, event_id: 2), + Fabricate(:api_workflow_task_started_event, event_id: 3), + Fabricate(:api_workflow_task_completed_event, event_id: 4) + ]) + end + let(:workflow) { TestWorkflow } + let(:workflow_metadata) { Fabricate(:workflow_metadata) } + let(:config) { Temporal::Configuration.new } + + class TestWorkflow < Temporal::Workflow + def execute + 'test' + end + end + + describe '#run' do + it 'runs a workflow' do + allow(workflow).to receive(:execute_in_context).and_call_original + + subject.run + + expect(workflow) + .to have_received(:execute_in_context) + .with( + an_instance_of(Temporal::Workflow::Context), + nil + ) + end + + it 'returns a complete workflow decision' do + decisions = subject.run + + expect(decisions.length).to eq(1) + + decision_id, decision = decisions.first + expect(decision_id).to eq(history.events.length + 1) + expect(decision).to be_an_instance_of(Temporal::Workflow::Command::CompleteWorkflow) + expect(decision.result).to eq('test') + end + + it 'generates workflow metadata' do + allow(Temporal::Metadata::Workflow).to receive(:new).and_call_original + payload = Temporal::Api::Common::V1::Payload.new( + metadata: { 'encoding' => 'xyz' }, + data: 'test'.b + ) + header = + Google::Protobuf::Map.new(:string, :message, Temporal::Api::Common::V1::Payload, { 'Foo' => payload }) + workflow_started_event.workflow_execution_started_event_attributes.header = + Fabricate(:api_header, fields: header) + + subject.run + + event_attributes = workflow_started_event.workflow_execution_started_event_attributes + expect(Temporal::Metadata::Workflow) + .to have_received(:new) + .with( + namespace: workflow_metadata.namespace, + id: workflow_metadata.workflow_id, + name: event_attributes.workflow_type.name, + run_id: event_attributes.original_execution_run_id, + attempt: event_attributes.attempt, + headers: header + ) + end + end +end \ No newline at end of file