Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion lib/temporal/metadata.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ def generate_activity_metadata(task, namespace)
workflow_id: task.workflow_execution.workflow_id,
workflow_name: task.workflow_type.name,
headers: from_payload_map(task.header&.fields || {}),
heartbeat_details: from_details_payloads(task.heartbeat_details)
heartbeat_details: from_details_payloads(task.heartbeat_details),
scheduled_at: task.scheduled_time.to_time,
current_attempt_scheduled_at: task.current_attempt_scheduled_time.to_time
)
end

Expand Down
10 changes: 7 additions & 3 deletions lib/temporal/metadata/activity.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
module Temporal
module Metadata
class Activity < Base
attr_reader :namespace, :id, :name, :task_token, :attempt, :workflow_run_id, :workflow_id, :workflow_name, :headers, :heartbeat_details
attr_reader :namespace, :id, :name, :task_token, :attempt, :workflow_run_id, :workflow_id, :workflow_name, :headers, :heartbeat_details, :scheduled_at, :current_attempt_scheduled_at

def initialize(namespace:, id:, name:, task_token:, attempt:, workflow_run_id:, workflow_id:, workflow_name:, headers: {}, heartbeat_details:)
def initialize(namespace:, id:, name:, task_token:, attempt:, workflow_run_id:, workflow_id:, workflow_name:, headers: {}, heartbeat_details:, scheduled_at:, current_attempt_scheduled_at:)
@namespace = namespace
@id = id
@name = name
Expand All @@ -16,6 +16,8 @@ def initialize(namespace:, id:, name:, task_token:, attempt:, workflow_run_id:,
@workflow_name = workflow_name
@headers = headers
@heartbeat_details = heartbeat_details
@scheduled_at = scheduled_at
@current_attempt_scheduled_at = current_attempt_scheduled_at

freeze
end
Expand All @@ -32,7 +34,9 @@ def to_h
'workflow_run_id' => workflow_run_id,
'activity_id' => id,
'activity_name' => name,
'attempt' => attempt
'attempt' => attempt,
'scheduled_at' => scheduled_at.to_s,
'current_attempt_scheduled_at' => current_attempt_scheduled_at.to_s,
}
end
end
Expand Down
8 changes: 6 additions & 2 deletions lib/temporal/testing/local_workflow_context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ def execute_activity(activity_class, *input, **args)
workflow_id: workflow_id,
workflow_name: nil, # not yet used, but will be in the future
headers: execution_options.headers,
heartbeat_details: nil
heartbeat_details: nil,
scheduled_at: Time.now,
current_attempt_scheduled_at: Time.now,
)
context = LocalActivityContext.new(metadata)

Expand Down Expand Up @@ -108,7 +110,9 @@ def execute_local_activity(activity_class, *input, **args)
workflow_id: workflow_id,
workflow_name: nil, # not yet used, but will be in the future
headers: execution_options.headers,
heartbeat_details: nil
heartbeat_details: nil,
scheduled_at: Time.now,
current_attempt_scheduled_at: Time.now,
)
context = LocalActivityContext.new(metadata)

Expand Down
2 changes: 2 additions & 0 deletions spec/fabricators/activity_metadata_fabricator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@
workflow_name 'TestWorkflow'
headers { {} }
heartbeat_details nil
scheduled_at { Time.now }
current_attempt_scheduled_at { Time.now }
end
2 changes: 2 additions & 0 deletions spec/fabricators/grpc/activity_task_fabricator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
workflow_execution { Fabricate(:api_workflow_execution) }
current_attempt_scheduled_time { Google::Protobuf::Timestamp.new.tap { |t| t.from_time(Time.now) } }
started_time { Google::Protobuf::Timestamp.new.tap { |t| t.from_time(Time.now) } }
scheduled_time { Google::Protobuf::Timestamp.new.tap { |t| t.from_time(Time.now) } }
current_attempt_scheduled_time { Google::Protobuf::Timestamp.new.tap { |t| t.from_time(Time.now) } }
header do |attrs|
fields = (attrs[:headers] || {}).each_with_object({}) do |(field, value), h|
h[field] = Temporal.configuration.converter.to_payload(value)
Expand Down
6 changes: 5 additions & 1 deletion spec/unit/lib/temporal/metadata/activity_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
expect(subject.workflow_name).to eq(args.workflow_name)
expect(subject.headers).to eq(args.headers)
expect(subject.heartbeat_details).to eq(args.heartbeat_details)
expect(subject.scheduled_at).to eq(args.scheduled_at)
expect(subject.current_attempt_scheduled_at).to eq(args.current_attempt_scheduled_at)
end

it { is_expected.to be_frozen }
Expand All @@ -36,7 +38,9 @@
'namespace' => subject.namespace,
'workflow_id' => subject.workflow_id,
'workflow_name' => subject.workflow_name,
'workflow_run_id' => subject.workflow_run_id
'workflow_run_id' => subject.workflow_run_id,
'scheduled_at' => subject.scheduled_at.to_s,
'current_attempt_scheduled_at' => subject.current_attempt_scheduled_at.to_s,
})
end
end
Expand Down