Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 1 addition & 13 deletions lib/temporal/metadata.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,16 @@ module Temporal
module Metadata
ACTIVITY_TYPE = :activity
WORKFLOW_TASK_TYPE = :workflow_task
WORKFLOW_TYPE = :workflow

class << self
include Concerns::Payloads

def generate(type, data, namespace = nil)
def generate(type, data, namespace)
case type
when ACTIVITY_TYPE
activity_metadata_from(data, namespace)
when WORKFLOW_TASK_TYPE
workflow_task_metadata_from(data, namespace)
when WORKFLOW_TYPE
workflow_metadata_from(data)
else
raise InternalError, 'Unsupported metadata type'
end
Expand Down Expand Up @@ -62,15 +59,6 @@ def workflow_task_metadata_from(task, namespace)
workflow_name: task.workflow_type.name
)
end

def workflow_metadata_from(event)
Metadata::Workflow.new(
name: event.workflow_type.name,
run_id: event.original_execution_run_id,
attempt: event.attempt,
headers: headers(event.header&.fields)
)
end
end
end
end
8 changes: 6 additions & 2 deletions lib/temporal/metadata/workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
module Temporal
module Metadata
class Workflow < Base
attr_reader :name, :run_id, :attempt, :headers
attr_reader :namespace, :id, :name, :run_id, :attempt, :headers

def initialize(name:, run_id:, attempt:, headers: {})
def initialize(namespace:, id:, name:, run_id:, attempt:, headers: {})
@namespace = namespace
@id = id
@name = name
@run_id = run_id
@attempt = attempt
Expand All @@ -20,6 +22,8 @@ def workflow?

def to_h
{
'namespace' => namespace,
'workflow_id' => id,
'workflow_name' => name,
'workflow_run_id' => run_id,
'attempt' => attempt
Expand Down
7 changes: 6 additions & 1 deletion lib/temporal/testing/temporal_override.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,12 @@ def start_locally(workflow, schedule, *input, **args)

execution_options = ExecutionOptions.new(workflow, options)
metadata = Metadata::Workflow.new(
name: workflow_id, run_id: run_id, attempt: 1, headers: execution_options.headers
namespace: execution_options.namespace,
id: workflow_id,
name: execution_options.name,
run_id: run_id,
attempt: 1,
headers: execution_options.headers
)
context = Temporal::Testing::LocalWorkflowContext.new(
execution, workflow_id, run_id, workflow.disabled_releases, metadata
Expand Down
6 changes: 5 additions & 1 deletion lib/temporal/testing/workflow_override.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ def execute_locally(*input)
run_id = SecureRandom.uuid
execution = WorkflowExecution.new
metadata = Temporal::Metadata::Workflow.new(
name: workflow_id, run_id: run_id, attempt: 1
namespace: nil,
id: workflow_id,
name: name, # Workflow class name
run_id: run_id,
attempt: 1
)
context = Temporal::Testing::LocalWorkflowContext.new(
execution, workflow_id, run_id, disabled_releases, metadata
Expand Down
22 changes: 19 additions & 3 deletions lib/temporal/workflow/executor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@
require 'temporal/workflow/state_manager'
require 'temporal/workflow/context'
require 'temporal/workflow/history/event_target'
require 'temporal/metadata'

module Temporal
class Workflow
class Executor
def initialize(workflow_class, history, config)
def initialize(workflow_class, history, metadata, config)
@workflow_class = workflow_class
@dispatcher = Dispatcher.new
@state_manager = StateManager.new(dispatcher)
@metadata = metadata
@history = history
@config = config
end
Expand All @@ -32,15 +34,29 @@ def run

private

attr_reader :workflow_class, :dispatcher, :state_manager, :history, :config
attr_reader :workflow_class, :dispatcher, :state_manager, :metadata, :history, :config

def execute_workflow(input, metadata)
def execute_workflow(input, workflow_started_event_attributes)
metadata = generate_workflow_metadata_from(workflow_started_event_attributes)
context = Workflow::Context.new(state_manager, dispatcher, workflow_class, metadata, config)

Fiber.new do
workflow_class.execute_in_context(context, input)
end.resume
end

# workflow_id and domain are confusingly not available on the WorkflowExecutionStartedEvent,
# so we have to fetch these from the DecisionTask's metadata
def generate_workflow_metadata_from(event_attributes)
Metadata::Workflow.new(
namespace: metadata.namespace,
id: metadata.workflow_id,
name: event_attributes.workflow_type.name,
run_id: event_attributes.original_execution_run_id,
attempt: event_attributes.attempt,
headers: event_attributes.header&.fields || {}
)
end
end
end
end
3 changes: 1 addition & 2 deletions lib/temporal/workflow/state_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
require 'temporal/workflow/command'
require 'temporal/workflow/command_state_machine'
require 'temporal/workflow/history/event_target'
require 'temporal/metadata'
require 'temporal/concerns/payloads'
require 'temporal/workflow/errors'

Expand Down Expand Up @@ -106,7 +105,7 @@ def apply_event(event)
History::EventTarget.workflow,
'started',
from_payloads(event.attributes.input),
Metadata.generate(Metadata::WORKFLOW_TYPE, event.attributes)
event.attributes
)

when 'WORKFLOW_EXECUTION_COMPLETED'
Expand Down
2 changes: 1 addition & 1 deletion lib/temporal/workflow/task_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def process

history = fetch_full_history
# TODO: For sticky workflows we need to cache the Executor instance
executor = Workflow::Executor.new(workflow_class, history, config)
executor = Workflow::Executor.new(workflow_class, history, metadata, config)

commands = middleware_chain.invoke(metadata) do
executor.run
Expand Down
2 changes: 2 additions & 0 deletions spec/fabricators/workflow_metadata_fabricator.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
require 'securerandom'

Fabricator(:workflow_metadata, from: :open_struct) do
namespace 'test-namespace'
id { SecureRandom.uuid }
name 'TestWorkflow'
run_id { SecureRandom.uuid }
attempt 1
Expand Down
2 changes: 1 addition & 1 deletion spec/unit/lib/temporal/activity/task_processor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
input: config.converter.to_payloads(input)
)
end
let(:metadata) { Temporal::Metadata.generate(Temporal::Metadata::ACTIVITY_TYPE, task) }
let(:metadata) { Temporal::Metadata.generate(Temporal::Metadata::ACTIVITY_TYPE, task, namespace) }
let(:activity_name) { 'TestActivity' }
let(:connection) { instance_double('Temporal::Connection::GRPC') }
let(:middleware_chain) { Temporal::Middleware::Chain.new }
Expand Down
5 changes: 5 additions & 0 deletions spec/unit/lib/temporal/metadata/workflow_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
let(:args) { Fabricate(:workflow_metadata) }

it 'sets the attributes' do
expect(subject.namespace).to eq(args.namespace)
expect(subject.id).to eq(args.id)
expect(subject.name).to eq(args.name)
expect(subject.run_id).to eq(args.run_id)
expect(subject.attempt).to eq(args.attempt)
Expand All @@ -25,6 +27,9 @@

it 'returns a hash' do
expect(subject.to_h).to eq({
'namespace' => subject.namespace,
'workflow_id' => subject.id,
'workflow_id' => subject.id,
'attempt' => subject.attempt,
'workflow_name' => subject.name,
'workflow_run_id' => subject.run_id
Expand Down
22 changes: 0 additions & 22 deletions spec/unit/lib/temporal/metadata_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,28 +46,6 @@
end
end

context 'with workflow type' do
let(:type) { described_class::WORKFLOW_TYPE }
let(:data) { Fabricate(:api_workflow_execution_started_event_attributes) }
let(:namespace) { nil }

it 'generates metadata' do
expect(subject.run_id).to eq(data.original_execution_run_id)
expect(subject.attempt).to eq(data.attempt)
expect(subject.headers).to eq({})
end

context 'with headers' do
let(:data) do
Fabricate(:api_workflow_execution_started_event_attributes, headers: { 'Foo' => 'Bar' })
end

it 'assigns headers' do
expect(subject.headers).to eq('Foo' => 'Bar')
end
end
end

context 'with unknown type' do
let(:type) { :unknown }
let(:data) { nil }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
workflow_id,
run_id,
[],
Temporal::Metadata::Workflow.new(name: workflow_id, run_id: run_id, attempt: 1)
Temporal::Metadata::Workflow.new(namespace: 'ruby-samples', id: workflow_id, name: 'HelloWorldWorkflow', run_id: run_id, attempt: 1)
)
end
let(:async_token) do
Expand Down
78 changes: 78 additions & 0 deletions spec/unit/lib/temporal/workflow/executor_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
require 'temporal/workflow/executor'
require 'temporal/workflow/history'
require 'temporal/workflow'

describe Temporal::Workflow::Executor do
subject { described_class.new(workflow, history, workflow_metadata, config) }

let(:workflow_started_event) { Fabricate(:api_workflow_execution_started_event, event_id: 1) }
let(:history) do
Temporal::Workflow::History.new([
workflow_started_event,
Fabricate(:api_workflow_task_scheduled_event, event_id: 2),
Fabricate(:api_workflow_task_started_event, event_id: 3),
Fabricate(:api_workflow_task_completed_event, event_id: 4)
])
end
let(:workflow) { TestWorkflow }
let(:workflow_metadata) { Fabricate(:workflow_metadata) }
let(:config) { Temporal::Configuration.new }

class TestWorkflow < Temporal::Workflow
def execute
'test'
end
end

describe '#run' do
it 'runs a workflow' do
allow(workflow).to receive(:execute_in_context).and_call_original

subject.run

expect(workflow)
.to have_received(:execute_in_context)
.with(
an_instance_of(Temporal::Workflow::Context),
nil
)
end

it 'returns a complete workflow decision' do
decisions = subject.run

expect(decisions.length).to eq(1)

decision_id, decision = decisions.first
expect(decision_id).to eq(history.events.length + 1)
expect(decision).to be_an_instance_of(Temporal::Workflow::Command::CompleteWorkflow)
expect(decision.result).to eq('test')
end

it 'generates workflow metadata' do
allow(Temporal::Metadata::Workflow).to receive(:new).and_call_original
payload = Temporal::Api::Common::V1::Payload.new(
metadata: { 'encoding' => 'xyz' },
data: 'test'.b
)
header =
Google::Protobuf::Map.new(:string, :message, Temporal::Api::Common::V1::Payload, { 'Foo' => payload })
workflow_started_event.workflow_execution_started_event_attributes.header =
Fabricate(:api_header, fields: header)

subject.run

event_attributes = workflow_started_event.workflow_execution_started_event_attributes
expect(Temporal::Metadata::Workflow)
.to have_received(:new)
.with(
namespace: workflow_metadata.namespace,
id: workflow_metadata.workflow_id,
name: event_attributes.workflow_type.name,
run_id: event_attributes.original_execution_run_id,
attempt: event_attributes.attempt,
headers: header
)
end
end
end