diff --git a/examples/spec/integration/call_failing_activity_workflow_spec.rb b/examples/spec/integration/call_failing_activity_workflow_spec.rb index c39853a6..090dd312 100644 --- a/examples/spec/integration/call_failing_activity_workflow_spec.rb +++ b/examples/spec/integration/call_failing_activity_workflow_spec.rb @@ -1,11 +1,6 @@ require 'workflows/call_failing_activity_workflow' describe CallFailingActivityWorkflow, :integration do - - class TestDeserializer - include Temporal::Concerns::Payloads - end - it 'correctly re-raises an activity-thrown exception in the workflow' do workflow_id = SecureRandom.uuid expected_message = "a failure message" diff --git a/examples/spec/integration/converter_spec.rb b/examples/spec/integration/converter_spec.rb index 0a97f075..1b479377 100644 --- a/examples/spec/integration/converter_spec.rb +++ b/examples/spec/integration/converter_spec.rb @@ -3,10 +3,8 @@ require 'grpc/errors' describe 'Converter', :integration do - around(:each) do |example| - task_queue = Temporal.configuration.task_queue - - Temporal.configure do |config| + let(:config) do + Temporal.configuration.dup.tap do |config| config.task_queue = 'crypt' config.payload_codec = Temporal::Connection::Converter::Codec::Chain.new( payload_codecs: [ @@ -14,61 +12,54 @@ ] ) end - - example.run - ensure - Temporal.configure do |config| - config.task_queue = task_queue - config.payload_codec = Temporal::Configuration::DEFAULT_PAYLOAD_CODEC - end end + let(:client) { Temporal::Client.new(config) } it 'can encrypt payloads' do - workflow_id, run_id = run_workflow(HelloWorldWorkflow, 'Tom') + workflow_id = SecureRandom.uuid + run_id = client.start_workflow(HelloWorldWorkflow, 'Tom', options: { workflow_id: workflow_id }) begin - wait_for_workflow_completion(workflow_id, run_id) + client.await_workflow_result(HelloWorldWorkflow, workflow_id: workflow_id, run_id: run_id) rescue GRPC::DeadlineExceeded raise "Encrypted-payload workflow didn't run. Make sure you run USE_ENCRYPTION=1 ./bin/worker and try again." end - result = fetch_history(workflow_id, run_id) + history = client.get_workflow_history(namespace: config.namespace, workflow_id: workflow_id, run_id: run_id) - events = result.history.events.group_by(&:event_type) + events = history.events.group_by(&:type) - events[:EVENT_TYPE_WORKFLOW_EXECUTION_STARTED].map do |event| - input = event.workflow_execution_started_event_attributes.input + events['WORKFLOW_EXECUTION_STARTED'].map do |event| + input = event.attributes.input input.payloads.each do |payload| expect(payload.metadata['encoding']).to eq('binary/encrypted') end end - events[:EVENT_TYPE_ACTIVITY_TASK_SCHEDULED].map do |event| - input = event.activity_task_scheduled_event_attributes.input + events['ACTIVITY_TASK_SCHEDULED'].map do |event| + input = event.attributes.input input.payloads.each do |payload| expect(payload.metadata['encoding']).to eq('binary/encrypted') end end - events[:EVENT_TYPE_ACTIVITY_TASK_COMPLETED].map do |event| - result = event.activity_task_completed_event_attributes.result + events['ACTIVITY_TASK_COMPLETED'].map do |event| + result = event.attributes.result result.payloads.each do |payload| expect(payload.metadata['encoding']).to eq('binary/encrypted') end end - events[:EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED].map do |event| - result = event.workflow_execution_completed_event_attributes.result + events['WORKFLOW_EXECUTION_COMPLETED'].map do |event| + result = event.attributes.result result.payloads.each do |payload| expect(payload.metadata['encoding']).to eq('binary/encrypted') end end - completion_event = events[:EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED].first - result = completion_event.workflow_execution_completed_event_attributes.result - - payload_codec = Temporal.configuration.payload_codec + completion_event = events['WORKFLOW_EXECUTION_COMPLETED'].first + result = completion_event.attributes.result - expect(payload_codec.decodes(result).payloads.first.data).to eq('"Hello World, Tom"') + expect(config.converter.from_payloads(result)&.first).to eq('Hello World, Tom') end end diff --git a/lib/temporal/activity/task_processor.rb b/lib/temporal/activity/task_processor.rb index 35f4bbc6..82d9af70 100644 --- a/lib/temporal/activity/task_processor.rb +++ b/lib/temporal/activity/task_processor.rb @@ -2,7 +2,6 @@ require 'temporal/error_handler' require 'temporal/errors' require 'temporal/activity/context' -require 'temporal/concerns/payloads' require 'temporal/connection/retryer' require 'temporal/connection' require 'temporal/metric_keys' @@ -10,13 +9,12 @@ module Temporal class Activity class TaskProcessor - include Concerns::Payloads def initialize(task, task_queue, namespace, activity_lookup, middleware_chain, config, heartbeat_thread_pool) @task = task @task_queue = task_queue @namespace = namespace - @metadata = Metadata.generate_activity_metadata(task, namespace) + @metadata = Metadata.generate_activity_metadata(task, namespace, config.converter) @task_token = task.task_token @activity_name = task.activity_type.name @activity_class = activity_lookup.find(activity_name) @@ -38,7 +36,7 @@ def process end result = middleware_chain.invoke(metadata) do - activity_class.execute_in_context(context, from_payloads(task.input)) + activity_class.execute_in_context(context, config.converter.from_payloads(task.input)) end # Do not complete asynchronous activities, these should be completed manually diff --git a/lib/temporal/client.rb b/lib/temporal/client.rb index f1e6d3a7..f22bd203 100644 --- a/lib/temporal/client.rb +++ b/lib/temporal/client.rb @@ -249,7 +249,7 @@ def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, nam case closed_event.type when 'WORKFLOW_EXECUTION_COMPLETED' payloads = closed_event.attributes.result - return ResultConverter.from_result_payloads(payloads) + return config.converter.from_result_payloads(payloads) when 'WORKFLOW_EXECUTION_TIMED_OUT' raise Temporal::WorkflowTimedOut when 'WORKFLOW_EXECUTION_TERMINATED' @@ -257,7 +257,7 @@ def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, nam when 'WORKFLOW_EXECUTION_CANCELED' raise Temporal::WorkflowCanceled when 'WORKFLOW_EXECUTION_FAILED' - raise Temporal::Workflow::Errors.generate_error(closed_event.attributes.failure) + raise Temporal::Workflow::Errors.generate_error(closed_event.attributes.failure, config.converter) when 'WORKFLOW_EXECUTION_CONTINUED_AS_NEW' new_run_id = closed_event.attributes.new_execution_run_id # Throw to let the caller know they're not getting the result @@ -328,7 +328,7 @@ def reset_workflow(namespace, workflow_id, run_id, strategy: nil, workflow_task_ # for reference # @param details [String, Array, nil] optional details to be stored in history def terminate_workflow(workflow_id, namespace: nil, run_id: nil, reason: nil, details: nil) - namespace ||= Temporal.configuration.namespace + namespace ||= config.namespace connection.terminate_workflow_execution( namespace: namespace, @@ -353,7 +353,7 @@ def fetch_workflow_execution_info(namespace, workflow_id, run_id) run_id: run_id ) - Workflow::ExecutionInfo.generate_from(response.workflow_execution_info) + Workflow::ExecutionInfo.generate_from(response.workflow_execution_info, config.converter) end # Manually complete an activity @@ -550,11 +550,6 @@ def connection @connection ||= Temporal::Connection.generate(config.for_connection) end - class ResultConverter - extend Concerns::Payloads - end - private_constant :ResultConverter - private attr_reader :config @@ -596,6 +591,5 @@ def validate_filter(filter, *allowed_filters) raise ArgumentError, 'Only one filter is allowed' if filter.size > 1 end - end end diff --git a/lib/temporal/concerns/payloads.rb b/lib/temporal/concerns/payloads.rb deleted file mode 100644 index 5c771e21..00000000 --- a/lib/temporal/concerns/payloads.rb +++ /dev/null @@ -1,86 +0,0 @@ -module Temporal - module Concerns - module Payloads - def from_payloads(payloads) - payloads = payload_codec.decodes(payloads) - payload_converter.from_payloads(payloads) - end - - def from_payload(payload) - payload = payload_codec.decode(payload) - payload_converter.from_payload(payload) - end - - def from_payload_map_without_codec(payload_map) - payload_map.map { |key, value| [key, payload_converter.from_payload(value)] }.to_h - end - - def from_result_payloads(payloads) - from_payloads(payloads)&.first - end - - def from_details_payloads(payloads) - from_payloads(payloads)&.first - end - - def from_signal_payloads(payloads) - from_payloads(payloads)&.first - end - - def from_query_payloads(payloads) - from_payloads(payloads)&.first - end - - def from_payload_map(payload_map) - payload_map.map { |key, value| [key, from_payload(value)] }.to_h - end - - def to_payloads(data) - payloads = payload_converter.to_payloads(data) - payload_codec.encodes(payloads) - end - - def to_payload(data) - payload = payload_converter.to_payload(data) - payload_codec.encode(payload) - end - - def to_payload_map_without_codec(data) - # skips the payload_codec step because search attributes don't use this pipeline - data.transform_values do |value| - payload_converter.to_payload(value) - end - end - - def to_result_payloads(data) - to_payloads([data]) - end - - def to_details_payloads(data) - to_payloads([data]) - end - - def to_signal_payloads(data) - to_payloads([data]) - end - - def to_query_payloads(data) - to_payloads([data]) - end - - def to_payload_map(data) - data.transform_values(&method(:to_payload)) - end - - private - - def payload_converter - Temporal.configuration.converter - end - - def payload_codec - Temporal.configuration.payload_codec - end - end - end -end diff --git a/lib/temporal/configuration.rb b/lib/temporal/configuration.rb index 9deb4226..e2a9e899 100644 --- a/lib/temporal/configuration.rb +++ b/lib/temporal/configuration.rb @@ -9,14 +9,15 @@ require 'temporal/connection/converter/payload/proto_json' require 'temporal/connection/converter/composite' require 'temporal/connection/converter/codec/chain' +require 'temporal/converter_wrapper' module Temporal class Configuration - Connection = Struct.new(:type, :host, :port, :credentials, :identity, keyword_init: true) + Connection = Struct.new(:type, :host, :port, :credentials, :identity, :converter, keyword_init: true) Execution = Struct.new(:namespace, :task_queue, :timeouts, :headers, :search_attributes, keyword_init: true) attr_reader :timeouts, :error_handlers, :capabilities - attr_accessor :connection_type, :converter, :use_error_serialization_v2, :host, :port, :credentials, :identity, + attr_accessor :connection_type, :use_error_serialization_v2, :host, :port, :credentials, :identity, :logger, :metrics_adapter, :namespace, :task_queue, :headers, :search_attributes, :header_propagators, :payload_codec, :legacy_signals, :no_signals_in_first_task @@ -114,13 +115,23 @@ def timeouts=(new_timeouts) @timeouts = DEFAULT_TIMEOUTS.merge(new_timeouts) end + def converter + @converter_wrapper ||= ConverterWrapper.new(@converter, @payload_codec) + end + + def converter=(new_converter) + @converter = new_converter + @converter_wrapper = nil + end + def for_connection Connection.new( type: connection_type, host: host, port: port, credentials: credentials, - identity: identity || default_identity + identity: identity || default_identity, + converter: converter ).freeze end diff --git a/lib/temporal/connection.rb b/lib/temporal/connection.rb index b70bcbed..9c4de0e8 100644 --- a/lib/temporal/connection.rb +++ b/lib/temporal/connection.rb @@ -13,7 +13,7 @@ def self.generate(configuration) credentials = configuration.credentials identity = configuration.identity - connection_class.new(host, port, identity, credentials) + connection_class.new(host, port, identity, credentials, configuration.converter) end end end diff --git a/lib/temporal/connection/grpc.rb b/lib/temporal/connection/grpc.rb index 06be6a6d..f519f12a 100644 --- a/lib/temporal/connection/grpc.rb +++ b/lib/temporal/connection/grpc.rb @@ -14,13 +14,10 @@ require 'temporal/connection/serializer/backfill' require 'temporal/connection/serializer/schedule' require 'temporal/connection/serializer/workflow_id_reuse_policy' -require 'temporal/concerns/payloads' module Temporal module Connection class GRPC - include Concerns::Payloads - HISTORY_EVENT_FILTER = { all: Temporalio::Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, close: Temporalio::Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT @@ -57,10 +54,11 @@ class GRPC CONNECTION_TIMEOUT_SECONDS = 60 - def initialize(host, port, identity, credentials, options = {}) + def initialize(host, port, identity, credentials, converter, options = {}) @url = "#{host}:#{port}" @identity = identity @credentials = credentials + @converter = converter @poll = true @poll_mutex = Mutex.new @poll_request = nil @@ -134,20 +132,20 @@ def start_workflow_execution( task_queue: Temporalio::Api::TaskQueue::V1::TaskQueue.new( name: task_queue ), - input: to_payloads(input), + input: converter.to_payloads(input), workflow_execution_timeout: execution_timeout, workflow_run_timeout: run_timeout, workflow_task_timeout: task_timeout, request_id: SecureRandom.uuid, header: Temporalio::Api::Common::V1::Header.new( - fields: to_payload_map(headers || {}) + fields: converter.to_payload_map(headers || {}) ), cron_schedule: cron_schedule, memo: Temporalio::Api::Common::V1::Memo.new( - fields: to_payload_map(memo || {}) + fields: converter.to_payload_map(memo || {}) ), search_attributes: Temporalio::Api::Common::V1::SearchAttributes.new( - indexed_fields: to_payload_map_without_codec(search_attributes || {}) + indexed_fields: converter.to_payload_map_without_codec(search_attributes || {}) ) ) @@ -229,8 +227,8 @@ def respond_workflow_task_completed(namespace:, task_token:, commands:, binary_c namespace: namespace, identity: identity, task_token: task_token, - commands: Array(commands).map { |(_, command)| Serializer.serialize(command) }, - query_results: query_results.transform_values { |value| Serializer.serialize(value) }, + commands: Array(commands).map { |(_, command)| Serializer.serialize(command, converter) }, + query_results: query_results.transform_values { |value| Serializer.serialize(value, converter) }, binary_checksum: binary_checksum, sdk_metadata: if new_sdk_flags_used.any? Temporalio::Api::Sdk::V1::WorkflowTaskCompletedMetadata.new( @@ -249,7 +247,7 @@ def respond_workflow_task_failed(namespace:, task_token:, cause:, exception:, bi identity: identity, task_token: task_token, cause: cause, - failure: Serializer::Failure.new(exception).to_proto, + failure: Serializer::Failure.new(exception, converter).to_proto, binary_checksum: binary_checksum ) client.respond_workflow_task_failed(request) @@ -277,7 +275,7 @@ def record_activity_task_heartbeat(namespace:, task_token:, details: nil) request = Temporalio::Api::WorkflowService::V1::RecordActivityTaskHeartbeatRequest.new( namespace: namespace, task_token: task_token, - details: to_details_payloads(details), + details: converter.to_details_payloads(details), identity: identity ) client.record_activity_task_heartbeat(request) @@ -292,7 +290,7 @@ def respond_activity_task_completed(namespace:, task_token:, result:) namespace: namespace, identity: identity, task_token: task_token, - result: to_result_payloads(result) + result: converter.to_result_payloads(result) ) client.respond_activity_task_completed(request) end @@ -304,7 +302,7 @@ def respond_activity_task_completed_by_id(namespace:, activity_id:, workflow_id: workflow_id: workflow_id, run_id: run_id, activity_id: activity_id, - result: to_result_payloads(result) + result: converter.to_result_payloads(result) ) client.respond_activity_task_completed_by_id(request) end @@ -315,7 +313,7 @@ def respond_activity_task_failed(namespace:, task_token:, exception:) namespace: namespace, identity: identity, task_token: task_token, - failure: Serializer::Failure.new(exception, serialize_whole_error: serialize_whole_error).to_proto + failure: Serializer::Failure.new(exception, converter, serialize_whole_error: serialize_whole_error).to_proto ) client.respond_activity_task_failed(request) end @@ -327,7 +325,7 @@ def respond_activity_task_failed_by_id(namespace:, activity_id:, workflow_id:, r workflow_id: workflow_id, run_id: run_id, activity_id: activity_id, - failure: Serializer::Failure.new(exception).to_proto + failure: Serializer::Failure.new(exception, converter).to_proto ) client.respond_activity_task_failed_by_id(request) end @@ -336,7 +334,7 @@ def respond_activity_task_canceled(namespace:, task_token:, details: nil) request = Temporalio::Api::WorkflowService::V1::RespondActivityTaskCanceledRequest.new( namespace: namespace, task_token: task_token, - details: to_details_payloads(details), + details: converter.to_details_payloads(details), identity: identity ) client.respond_activity_task_canceled(request) @@ -358,7 +356,7 @@ def signal_workflow_execution(namespace:, workflow_id:, run_id:, signal:, input: run_id: run_id ), signal_name: signal, - input: to_signal_payloads(input), + input: converter.to_signal_payloads(input), identity: identity ) client.signal_workflow_execution(request) @@ -377,9 +375,9 @@ def signal_with_start_workflow_execution( search_attributes: nil ) proto_header_fields = if headers.nil? - to_payload_map({}) + converter.to_payload_map({}) elsif headers.instance_of?(Hash) - to_payload_map(headers) + converter.to_payload_map(headers) else # Preserve backward compatability for headers specified using proto objects warn '[DEPRECATION] Specify headers using a hash rather than protobuf objects' @@ -397,7 +395,7 @@ def signal_with_start_workflow_execution( task_queue: Temporalio::Api::TaskQueue::V1::TaskQueue.new( name: task_queue ), - input: to_payloads(input), + input: converter.to_payloads(input), workflow_execution_timeout: execution_timeout, workflow_run_timeout: run_timeout, workflow_task_timeout: task_timeout, @@ -407,9 +405,9 @@ def signal_with_start_workflow_execution( ), cron_schedule: cron_schedule, signal_name: signal_name, - signal_input: to_signal_payloads(signal_input), + signal_input: converter.to_signal_payloads(signal_input), memo: Temporalio::Api::Common::V1::Memo.new( - fields: to_payload_map(memo || {}) + fields: converter.to_payload_map(memo || {}) ), search_attributes: Temporalio::Api::Common::V1::SearchAttributes.new( indexed_fields: to_payload_map_without_codec(search_attributes || {}) @@ -456,7 +454,7 @@ def terminate_workflow_execution( run_id: run_id ), reason: reason, - details: to_details_payloads(details) + details: converter.to_details_payloads(details) ) client.terminate_workflow_execution(request) @@ -792,7 +790,7 @@ def pause_schedule(namespace:, schedule_id:, should_pause:, note: nil) private - attr_reader :url, :identity, :credentials, :options, :poll_mutex, :poll_request + attr_reader :url, :identity, :credentials, :converter, :options, :poll_mutex, :poll_request def client @client ||= Temporalio::Api::WorkflowService::V1::WorkflowService::Stub.new( diff --git a/lib/temporal/connection/serializer.rb b/lib/temporal/connection/serializer.rb index 46070c66..b31c1005 100644 --- a/lib/temporal/connection/serializer.rb +++ b/lib/temporal/connection/serializer.rb @@ -33,9 +33,9 @@ module Serializer Workflow::QueryResult::Failure => Serializer::QueryFailure, }.freeze - def self.serialize(object) + def self.serialize(object, converter) serializer = SERIALIZERS_MAP[object.class] - serializer.new(object).to_proto + serializer.new(object, converter).to_proto end end end diff --git a/lib/temporal/connection/serializer/base.rb b/lib/temporal/connection/serializer/base.rb index 9fcd49c5..79e8767a 100644 --- a/lib/temporal/connection/serializer/base.rb +++ b/lib/temporal/connection/serializer/base.rb @@ -6,8 +6,9 @@ module Temporal module Connection module Serializer class Base - def initialize(object) + def initialize(object, converter) @object = object + @converter = converter end def to_proto @@ -16,7 +17,7 @@ def to_proto private - attr_reader :object + attr_reader :object, :converter end end end diff --git a/lib/temporal/connection/serializer/complete_workflow.rb b/lib/temporal/connection/serializer/complete_workflow.rb index beb3b0ed..8eaa3ed4 100644 --- a/lib/temporal/connection/serializer/complete_workflow.rb +++ b/lib/temporal/connection/serializer/complete_workflow.rb @@ -1,18 +1,15 @@ require 'temporal/connection/serializer/base' -require 'temporal/concerns/payloads' module Temporal module Connection module Serializer class CompleteWorkflow < Base - include Concerns::Payloads - def to_proto Temporalio::Api::Command::V1::Command.new( command_type: Temporalio::Api::Enums::V1::CommandType::COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, complete_workflow_execution_command_attributes: Temporalio::Api::Command::V1::CompleteWorkflowExecutionCommandAttributes.new( - result: to_result_payloads(object.result) + result: converter.to_result_payloads(object.result) ) ) end diff --git a/lib/temporal/connection/serializer/continue_as_new.rb b/lib/temporal/connection/serializer/continue_as_new.rb index 6573c8ec..989ff2a9 100644 --- a/lib/temporal/connection/serializer/continue_as_new.rb +++ b/lib/temporal/connection/serializer/continue_as_new.rb @@ -1,13 +1,10 @@ require 'temporal/connection/serializer/base' require 'temporal/connection/serializer/retry_policy' -require 'temporal/concerns/payloads' module Temporal module Connection module Serializer class ContinueAsNew < Base - include Concerns::Payloads - def to_proto Temporalio::Api::Command::V1::Command.new( command_type: Temporalio::Api::Enums::V1::CommandType::COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION, @@ -15,10 +12,10 @@ def to_proto Temporalio::Api::Command::V1::ContinueAsNewWorkflowExecutionCommandAttributes.new( workflow_type: Temporalio::Api::Common::V1::WorkflowType.new(name: object.workflow_type), task_queue: Temporalio::Api::TaskQueue::V1::TaskQueue.new(name: object.task_queue), - input: to_payloads(object.input), + input: converter.to_payloads(object.input), workflow_run_timeout: object.timeouts[:run], workflow_task_timeout: object.timeouts[:task], - retry_policy: Temporal::Connection::Serializer::RetryPolicy.new(object.retry_policy).to_proto, + retry_policy: Temporal::Connection::Serializer::RetryPolicy.new(object.retry_policy, converter).to_proto, header: serialize_headers(object.headers), memo: serialize_memo(object.memo), search_attributes: serialize_search_attributes(object.search_attributes), @@ -31,19 +28,19 @@ def to_proto def serialize_headers(headers) return unless headers - Temporalio::Api::Common::V1::Header.new(fields: to_payload_map(headers)) + Temporalio::Api::Common::V1::Header.new(fields: converter.to_payload_map(headers)) end def serialize_memo(memo) return unless memo - Temporalio::Api::Common::V1::Memo.new(fields: to_payload_map(memo)) + Temporalio::Api::Common::V1::Memo.new(fields: converter.to_payload_map(memo)) end def serialize_search_attributes(search_attributes) return unless search_attributes - Temporalio::Api::Common::V1::SearchAttributes.new(indexed_fields: to_payload_map_without_codec(search_attributes)) + Temporalio::Api::Common::V1::SearchAttributes.new(indexed_fields: converter.to_payload_map_without_codec(search_attributes)) end end end diff --git a/lib/temporal/connection/serializer/fail_workflow.rb b/lib/temporal/connection/serializer/fail_workflow.rb index a6ef9ea0..2bedb688 100644 --- a/lib/temporal/connection/serializer/fail_workflow.rb +++ b/lib/temporal/connection/serializer/fail_workflow.rb @@ -10,7 +10,7 @@ def to_proto command_type: Temporalio::Api::Enums::V1::CommandType::COMMAND_TYPE_FAIL_WORKFLOW_EXECUTION, fail_workflow_execution_command_attributes: Temporalio::Api::Command::V1::FailWorkflowExecutionCommandAttributes.new( - failure: Failure.new(object.exception).to_proto + failure: Failure.new(object.exception, converter).to_proto ) ) end diff --git a/lib/temporal/connection/serializer/failure.rb b/lib/temporal/connection/serializer/failure.rb index ddfeb2e3..383c9fa3 100644 --- a/lib/temporal/connection/serializer/failure.rb +++ b/lib/temporal/connection/serializer/failure.rb @@ -1,11 +1,9 @@ require 'temporal/connection/serializer/base' -require 'temporal/concerns/payloads' module Temporal module Connection module Serializer class Failure < Base - include Concerns::Payloads def initialize(error, serialize_whole_error: false, max_bytes: 200_000) @serialize_whole_error = serialize_whole_error @@ -15,7 +13,7 @@ def initialize(error, serialize_whole_error: false, max_bytes: 200_000) def to_proto if @serialize_whole_error - details = to_details_payloads(object) + details = converter.to_details_payloads(object) if details.payloads.first.data.size > @max_bytes Temporal.logger.error( "Could not serialize exception because it's too large, so we are using a fallback that may not "\ @@ -25,10 +23,10 @@ def to_proto ) # Fallback to a more conservative serialization if the payload is too big to avoid # sending a huge amount of data to temporal and putting it in the history. - details = to_details_payloads(object.message) + details = converter.to_details_payloads(object.message) end else - details = to_details_payloads(object.message) + details = converter.to_details_payloads(object.message) end Temporalio::Api::Failure::V1::Failure.new( message: object.message, diff --git a/lib/temporal/connection/serializer/record_marker.rb b/lib/temporal/connection/serializer/record_marker.rb index b29040f3..99fddb8c 100644 --- a/lib/temporal/connection/serializer/record_marker.rb +++ b/lib/temporal/connection/serializer/record_marker.rb @@ -1,12 +1,9 @@ require 'temporal/connection/serializer/base' -require 'temporal/concerns/payloads' module Temporal module Connection module Serializer class RecordMarker < Base - include Concerns::Payloads - def to_proto Temporalio::Api::Command::V1::Command.new( command_type: Temporalio::Api::Enums::V1::CommandType::COMMAND_TYPE_RECORD_MARKER, @@ -14,7 +11,7 @@ def to_proto Temporalio::Api::Command::V1::RecordMarkerCommandAttributes.new( marker_name: object.name, details: { - 'data' => to_details_payloads(object.details) + 'data' => converter.to_details_payloads(object.details) } ) ) diff --git a/lib/temporal/connection/serializer/schedule_activity.rb b/lib/temporal/connection/serializer/schedule_activity.rb index 10b26570..f751b3ea 100644 --- a/lib/temporal/connection/serializer/schedule_activity.rb +++ b/lib/temporal/connection/serializer/schedule_activity.rb @@ -1,13 +1,10 @@ require 'temporal/connection/serializer/base' require 'temporal/connection/serializer/retry_policy' -require 'temporal/concerns/payloads' module Temporal module Connection module Serializer class ScheduleActivity < Base - include Concerns::Payloads - def to_proto Temporalio::Api::Command::V1::Command.new( command_type: Temporalio::Api::Enums::V1::CommandType::COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK, @@ -15,13 +12,13 @@ def to_proto Temporalio::Api::Command::V1::ScheduleActivityTaskCommandAttributes.new( activity_id: object.activity_id.to_s, activity_type: Temporalio::Api::Common::V1::ActivityType.new(name: object.activity_type), - input: to_payloads(object.input), + input: converter.to_payloads(object.input), task_queue: Temporalio::Api::TaskQueue::V1::TaskQueue.new(name: object.task_queue), schedule_to_close_timeout: object.timeouts[:schedule_to_close], schedule_to_start_timeout: object.timeouts[:schedule_to_start], start_to_close_timeout: object.timeouts[:start_to_close], heartbeat_timeout: object.timeouts[:heartbeat], - retry_policy: Temporal::Connection::Serializer::RetryPolicy.new(object.retry_policy).to_proto, + retry_policy: Temporal::Connection::Serializer::RetryPolicy.new(object.retry_policy, converter).to_proto, header: serialize_headers(object.headers) ) ) diff --git a/lib/temporal/connection/serializer/signal_external_workflow.rb b/lib/temporal/connection/serializer/signal_external_workflow.rb index 5cc640fd..ff229ddb 100644 --- a/lib/temporal/connection/serializer/signal_external_workflow.rb +++ b/lib/temporal/connection/serializer/signal_external_workflow.rb @@ -1,12 +1,9 @@ require 'temporal/connection/serializer/base' -require 'temporal/concerns/payloads' module Temporal module Connection module Serializer class SignalExternalWorkflow < Base - include Concerns::Payloads - def to_proto Temporalio::Api::Command::V1::Command.new( command_type: Temporalio::Api::Enums::V1::CommandType::COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION, @@ -15,7 +12,7 @@ def to_proto namespace: object.namespace, execution: serialize_execution(object.execution), signal_name: object.signal_name, - input: to_signal_payloads(object.input), + input: converter.to_signal_payloads(object.input), control: "", # deprecated child_workflow_only: object.child_workflow_only ) diff --git a/lib/temporal/connection/serializer/start_child_workflow.rb b/lib/temporal/connection/serializer/start_child_workflow.rb index 90d08c79..47a1fb1c 100644 --- a/lib/temporal/connection/serializer/start_child_workflow.rb +++ b/lib/temporal/connection/serializer/start_child_workflow.rb @@ -1,13 +1,11 @@ require 'temporal/connection/serializer/base' require 'temporal/connection/serializer/retry_policy' require 'temporal/connection/serializer/workflow_id_reuse_policy' -require 'temporal/concerns/payloads' module Temporal module Connection module Serializer class StartChildWorkflow < Base - include Concerns::Payloads PARENT_CLOSE_POLICY = { terminate: Temporalio::Api::Enums::V1::ParentClosePolicy::PARENT_CLOSE_POLICY_TERMINATE, @@ -24,11 +22,11 @@ def to_proto workflow_id: object.workflow_id.to_s, workflow_type: Temporalio::Api::Common::V1::WorkflowType.new(name: object.workflow_type), task_queue: Temporalio::Api::TaskQueue::V1::TaskQueue.new(name: object.task_queue), - input: to_payloads(object.input), + input: converter.to_payloads(object.input), workflow_execution_timeout: object.timeouts[:execution], workflow_run_timeout: object.timeouts[:run], workflow_task_timeout: object.timeouts[:task], - retry_policy: Temporal::Connection::Serializer::RetryPolicy.new(object.retry_policy).to_proto, + retry_policy: Temporal::Connection::Serializer::RetryPolicy.new(object.retry_policy, converter).to_proto, parent_close_policy: serialize_parent_close_policy(object.parent_close_policy), header: serialize_headers(object.headers), cron_schedule: object.cron_schedule, @@ -44,7 +42,7 @@ def to_proto def serialize_headers(headers) return unless headers - Temporalio::Api::Common::V1::Header.new(fields: to_payload_map(headers)) + Temporalio::Api::Common::V1::Header.new(fields: converter.to_payload_map(headers)) end def serialize_memo(memo) @@ -66,7 +64,7 @@ def serialize_parent_close_policy(parent_close_policy) def serialize_search_attributes(search_attributes) return unless search_attributes - Temporalio::Api::Common::V1::SearchAttributes.new(indexed_fields: to_payload_map_without_codec(search_attributes)) + Temporalio::Api::Common::V1::SearchAttributes.new(indexed_fields: converter.to_payload_map_without_codec(search_attributes)) end end end diff --git a/lib/temporal/connection/serializer/upsert_search_attributes.rb b/lib/temporal/connection/serializer/upsert_search_attributes.rb index e8aa652c..b1b0395a 100644 --- a/lib/temporal/connection/serializer/upsert_search_attributes.rb +++ b/lib/temporal/connection/serializer/upsert_search_attributes.rb @@ -1,19 +1,16 @@ require 'temporal/connection/serializer/base' -require 'temporal/concerns/payloads' module Temporal module Connection module Serializer class UpsertSearchAttributes < Base - include Concerns::Payloads - def to_proto Temporalio::Api::Command::V1::Command.new( command_type: Temporalio::Api::Enums::V1::CommandType::COMMAND_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES, upsert_workflow_search_attributes_command_attributes: Temporalio::Api::Command::V1::UpsertWorkflowSearchAttributesCommandAttributes.new( search_attributes: Temporalio::Api::Common::V1::SearchAttributes.new( - indexed_fields: to_payload_map_without_codec(object.search_attributes || {}) + indexed_fields: converter.to_payload_map_without_codec(object.search_attributes || {}) ), ) ) diff --git a/lib/temporal/converter_wrapper.rb b/lib/temporal/converter_wrapper.rb new file mode 100644 index 00000000..a8a10d91 --- /dev/null +++ b/lib/temporal/converter_wrapper.rb @@ -0,0 +1,79 @@ +# This class provides convenience methods for accessing the converter. it is backwards compatible +# with Temporal::Connection::Converter::Base interface, however it adds new methods specific to +# different conversion scenarios. + +module Temporal + class ConverterWrapper + def initialize(converter, payload_codec) + @converter = converter + @payload_codec = payload_codec + end + + def from_payloads(payloads) + payloads = payload_codec.decodes(payloads) + converter.from_payloads(payloads) + end + + def from_payload(payload) + payload = payload_codec.decode(payload) + converter.from_payload(payload) + end + + def from_payload_map_without_codec(payload_map) + payload_map.map { |key, value| [key, converter.from_payload(value)] }.to_h + end + + def from_result_payloads(payloads) + from_payloads(payloads)&.first + end + + def from_details_payloads(payloads) + from_payloads(payloads)&.first + end + + def from_signal_payloads(payloads) + from_payloads(payloads)&.first + end + + def from_payload_map(payload_map) + payload_map.map { |key, value| [key, from_payload(value)] }.to_h + end + + def to_payloads(data) + payloads = converter.to_payloads(data) + payload_codec.encodes(payloads) + end + + def to_payload(data) + payload = converter.to_payload(data) + payload_codec.encode(payload) + end + + def to_payload_map_without_codec(data) + # skips the payload_codec step because search attributes don't use this pipeline + data.transform_values do |value| + converter.to_payload(value) + end + end + + def to_result_payloads(data) + to_payloads([data]) + end + + def to_details_payloads(data) + to_payloads([data]) + end + + def to_signal_payloads(data) + to_payloads([data]) + end + + def to_payload_map(data) + data.transform_values(&method(:to_payload)) + end + + private + + attr_reader :converter, :payload_codec + end +end diff --git a/lib/temporal/metadata.rb b/lib/temporal/metadata.rb index 7be46b31..e9f2d952 100644 --- a/lib/temporal/metadata.rb +++ b/lib/temporal/metadata.rb @@ -2,15 +2,12 @@ require 'temporal/metadata/activity' require 'temporal/metadata/workflow' require 'temporal/metadata/workflow_task' -require 'temporal/concerns/payloads' module Temporal module Metadata class << self - include Concerns::Payloads - - def generate_activity_metadata(task, namespace) + def generate_activity_metadata(task, namespace, converter) Metadata::Activity.new( namespace: namespace, id: task.activity_id, @@ -20,8 +17,8 @@ def generate_activity_metadata(task, namespace) workflow_run_id: task.workflow_execution.run_id, workflow_id: task.workflow_execution.workflow_id, workflow_name: task.workflow_type.name, - headers: from_payload_map(task.header&.fields || {}), - heartbeat_details: from_details_payloads(task.heartbeat_details), + headers: converter.from_payload_map(task.header&.fields || {}), + heartbeat_details: converter.from_details_payloads(task.heartbeat_details), scheduled_at: task.scheduled_time.to_time, current_attempt_scheduled_at: task.current_attempt_scheduled_time.to_time, heartbeat_timeout: task.heartbeat_timeout.seconds @@ -44,7 +41,8 @@ def generate_workflow_task_metadata(task, namespace) # @param event [Temporal::Workflow::History::Event] Workflow started history event # @param task_metadata [Temporal::Metadata::WorkflowTask] workflow task metadata - def generate_workflow_metadata(event, task_metadata) + # @param converter [Temporal::ConverterWrapper] + def generate_workflow_metadata(event, task_metadata, converter) Metadata::Workflow.new( name: event.attributes.workflow_type.name, id: task_metadata.workflow_id, @@ -54,9 +52,9 @@ def generate_workflow_metadata(event, task_metadata) attempt: event.attributes.attempt, namespace: task_metadata.namespace, task_queue: event.attributes.task_queue.name, - headers: from_payload_map(event.attributes.header&.fields || {}), + headers: converter.from_payload_map(event.attributes.header&.fields || {}), run_started_at: event.timestamp, - memo: from_payload_map(event.attributes.memo&.fields || {}), + memo: converter.from_payload_map(event.attributes.memo&.fields || {}), ) end end diff --git a/lib/temporal/workflow/errors.rb b/lib/temporal/workflow/errors.rb index 42157376..832c2ac3 100644 --- a/lib/temporal/workflow/errors.rb +++ b/lib/temporal/workflow/errors.rb @@ -3,11 +3,9 @@ module Temporal class Workflow class Errors - extend Concerns::Payloads - # Convert a failure returned from the server to an Error to raise to the client # failure: Temporalio::Api::Failure::V1::Failure - def self.generate_error(failure, default_exception_class = StandardError) + def self.generate_error(failure, converter, default_exception_class = StandardError) case failure.failure_info when :application_failure_info @@ -25,7 +23,7 @@ def self.generate_error(failure, default_exception_class = StandardError) end begin details = failure.application_failure_info.details - exception_or_message = from_details_payloads(details) + exception_or_message = converter.from_details_payloads(details) # v1 serialization only supports StandardErrors with a single "message" argument. # v2 serialization supports complex errors using our converters to serialize them. # enable v2 serialization in activities with Temporal.configuration.use_error_serialization_v2 @@ -59,7 +57,7 @@ def self.generate_error(failure, default_exception_class = StandardError) TimeoutError.new("Timeout type: #{failure.timeout_failure_info.timeout_type.to_s}") when :canceled_failure_info # TODO: Distinguish between different entity cancellations - StandardError.new(from_payloads(failure.canceled_failure_info.details)) + StandardError.new(converter.from_payloads(failure.canceled_failure_info.details)) else StandardError.new(failure.message) end diff --git a/lib/temporal/workflow/execution_info.rb b/lib/temporal/workflow/execution_info.rb index e3f70021..79c8648f 100644 --- a/lib/temporal/workflow/execution_info.rb +++ b/lib/temporal/workflow/execution_info.rb @@ -1,11 +1,9 @@ -require 'temporal/concerns/payloads' require 'temporal/workflow/status' module Temporal class Workflow class ExecutionInfo < Struct.new(:workflow, :workflow_id, :run_id, :start_time, :close_time, :status, :history_length, :memo, :search_attributes, keyword_init: true) - extend Concerns::Payloads STATUSES = [ Temporal::Workflow::Status::RUNNING, @@ -17,8 +15,8 @@ class ExecutionInfo < Struct.new(:workflow, :workflow_id, :run_id, :start_time, Temporal::Workflow::Status::TIMED_OUT ] - def self.generate_from(response) - search_attributes = response.search_attributes.nil? ? {} : from_payload_map_without_codec(response.search_attributes.indexed_fields) + def self.generate_from(response, converter) + search_attributes = response.search_attributes.nil? ? {} : converter.from_payload_map_without_codec(response.search_attributes.indexed_fields) new( workflow: response.type.name, workflow_id: response.execution.workflow_id, @@ -27,7 +25,7 @@ def self.generate_from(response) close_time: response.close_time&.to_time, status: Temporal::Workflow::Status::API_STATUS_MAP.fetch(response.status), history_length: response.history_length, - memo: from_payload_map(response.memo.fields), + memo: converter.from_payload_map(response.memo.fields), search_attributes: search_attributes ).freeze end diff --git a/lib/temporal/workflow/executor.rb b/lib/temporal/workflow/executor.rb index 5c8eaf9e..d45c5c04 100644 --- a/lib/temporal/workflow/executor.rb +++ b/lib/temporal/workflow/executor.rb @@ -71,7 +71,7 @@ def process_query(query) end def execute_workflow(input, workflow_started_event) - metadata = Metadata.generate_workflow_metadata(workflow_started_event, task_metadata) + metadata = Metadata.generate_workflow_metadata(workflow_started_event, task_metadata, config.converter) context = Workflow::Context.new(state_manager, dispatcher, workflow_class, metadata, config, query_registry, track_stack_trace) diff --git a/lib/temporal/workflow/state_manager.rb b/lib/temporal/workflow/state_manager.rb index 2cf82159..17e2fad6 100644 --- a/lib/temporal/workflow/state_manager.rb +++ b/lib/temporal/workflow/state_manager.rb @@ -4,7 +4,6 @@ require 'temporal/workflow/command_state_machine' require 'temporal/workflow/history/event_target' require 'temporal/workflow/history/size' -require 'temporal/concerns/payloads' require 'temporal/workflow/errors' require 'temporal/workflow/sdk_flags' require 'temporal/workflow/signal' @@ -12,8 +11,6 @@ module Temporal class Workflow class StateManager - include Concerns::Payloads - SIDE_EFFECT_MARKER = 'SIDE_EFFECT'.freeze RELEASE_MARKER = 'RELEASE'.freeze @@ -24,6 +21,7 @@ class UnsupportedMarkerType < Temporal::InternalError; end def initialize(dispatcher, config) @dispatcher = dispatcher + @converter = config.converter @commands = [] @marker_ids = Set.new @releases = {} @@ -167,7 +165,7 @@ def history_size private - attr_reader :commands, :dispatcher, :command_tracker, :marker_ids, :side_effects, :releases, :config + attr_reader :commands, :dispatcher, :converter, :command_tracker, :marker_ids, :side_effects, :releases, :config def use_signals_first(raw_events) # The presence of SAVE_FIRST_TASK_SIGNALS implies HANDLE_SIGNALS_FIRST @@ -257,7 +255,7 @@ def apply_event(event) dispatch( History::EventTarget.workflow, 'started', - from_payloads(event.attributes.input), + converter.from_payloads(event.attributes.input), event ) @@ -294,16 +292,15 @@ def apply_event(event) when 'ACTIVITY_TASK_COMPLETED' state_machine.complete - dispatch(history_target, 'completed', from_result_payloads(event.attributes.result)) + dispatch(history_target, 'completed', converter.from_result_payloads(event.attributes.result)) when 'ACTIVITY_TASK_FAILED' state_machine.fail - dispatch(history_target, 'failed', - Temporal::Workflow::Errors.generate_error(event.attributes.failure, ActivityException)) + dispatch(history_target, 'failed', generate_error(event.attributes.failure, ActivityException)) when 'ACTIVITY_TASK_TIMED_OUT' state_machine.time_out - dispatch(history_target, 'failed', Temporal::Workflow::Errors.generate_error(event.attributes.failure)) + dispatch(history_target, 'failed', generate_error(event.attributes.failure)) when 'ACTIVITY_TASK_CANCEL_REQUESTED' state_machine.requested @@ -317,7 +314,7 @@ def apply_event(event) when 'ACTIVITY_TASK_CANCELED' state_machine.cancel dispatch(history_target, 'failed', - Temporal::ActivityCanceled.new(from_details_payloads(event.attributes.details))) + Temporal::ActivityCanceled.new(converter.from_details_payloads(event.attributes.details))) when 'TIMER_STARTED' state_machine.start @@ -354,13 +351,13 @@ def apply_event(event) when 'MARKER_RECORDED' state_machine.complete - handle_marker(event.id, event.attributes.marker_name, from_details_payloads(event.attributes.details['data'])) + handle_marker(event.id, event.attributes.marker_name, converter.from_details_payloads(event.attributes.details['data'])) when 'WORKFLOW_EXECUTION_SIGNALED' # relies on Signal#== for matching in Dispatcher signal_target = Signal.new(event.attributes.signal_name) dispatch(signal_target, 'signaled', event.attributes.signal_name, - from_signal_payloads(event.attributes.input)) + converter.from_signal_payloads(event.attributes.input)) when 'WORKFLOW_EXECUTION_TERMINATED' # todo @@ -385,15 +382,15 @@ def apply_event(event) when 'CHILD_WORKFLOW_EXECUTION_COMPLETED' state_machine.complete - dispatch(history_target, 'completed', from_result_payloads(event.attributes.result)) + dispatch(history_target, 'completed', converter.from_result_payloads(event.attributes.result)) when 'CHILD_WORKFLOW_EXECUTION_FAILED' state_machine.fail - dispatch(history_target, 'failed', Temporal::Workflow::Errors.generate_error(event.attributes.failure)) + dispatch(history_target, 'failed', generate_error(event.attributes.failure)) when 'CHILD_WORKFLOW_EXECUTION_CANCELED' state_machine.cancel - dispatch(history_target, 'failed', Temporal::Workflow::Errors.generate_error(event.attributes.failure)) + dispatch(history_target, 'failed', generate_error(event.attributes.failure)) when 'CHILD_WORKFLOW_EXECUTION_TIMED_OUT' state_machine.time_out @@ -506,6 +503,10 @@ def track_release(release_name) schedule(Command::RecordMarker.new(name: RELEASE_MARKER, details: release_name)) end end + + def generate_error(failure, default_exception_class = StandardError) + Temporal::Workflow::Errors.generate_error(failure, converter, default_exception_class) + end end end end diff --git a/spec/config/temporal.rb b/spec/config/temporal.rb index 0d868ffe..5bb173c0 100644 --- a/spec/config/temporal.rb +++ b/spec/config/temporal.rb @@ -1,5 +1,10 @@ +require 'temporal/converter_wrapper' +require 'temporal/configuration' + +$converter = Temporal::ConverterWrapper.new(Temporal::Configuration::DEFAULT_CONVERTER) + RSpec.configure do |config| config.before(:each) do Temporal.configuration.error_handlers.clear end -end \ No newline at end of file +end diff --git a/spec/fabricators/grpc/activity_task_fabricator.rb b/spec/fabricators/grpc/activity_task_fabricator.rb index 6d2a531d..1154ff06 100644 --- a/spec/fabricators/grpc/activity_task_fabricator.rb +++ b/spec/fabricators/grpc/activity_task_fabricator.rb @@ -6,7 +6,7 @@ activity_id { SecureRandom.uuid } task_token { |attrs| attrs[:task_token] || SecureRandom.uuid } activity_type { Fabricate(:api_activity_type) } - input { Temporal.configuration.converter.to_payloads(nil) } + input { $converter.to_payloads(nil) } workflow_type { Fabricate(:api_workflow_type) } workflow_execution { Fabricate(:api_workflow_execution) } current_attempt_scheduled_time { Google::Protobuf::Timestamp.new.tap { |t| t.from_time(Time.now) } } @@ -14,10 +14,7 @@ scheduled_time { Google::Protobuf::Timestamp.new.tap { |t| t.from_time(Time.now) } } current_attempt_scheduled_time { Google::Protobuf::Timestamp.new.tap { |t| t.from_time(Time.now) } } header do |attrs| - fields = (attrs[:headers] || {}).each_with_object({}) do |(field, value), h| - h[field] = Temporal.configuration.converter.to_payload(value) - end - Temporalio::Api::Common::V1::Header.new(fields: fields) + Temporalio::Api::Common::V1::Header.new(fields: $converter.to_payload_map(attrs[:headers] || {})) end heartbeat_timeout { Google::Protobuf::Duration.new } end diff --git a/spec/fabricators/grpc/application_failure_fabricator.rb b/spec/fabricators/grpc/application_failure_fabricator.rb index 9d1396d8..800bf65a 100644 --- a/spec/fabricators/grpc/application_failure_fabricator.rb +++ b/spec/fabricators/grpc/application_failure_fabricator.rb @@ -1,7 +1,3 @@ -require 'temporal/concerns/payloads' -class TestDeserializer - include Temporal::Concerns::Payloads -end # Simulates Temporal::Connection::Serializer::Failure Fabricator(:api_application_failure, from: Temporalio::Api::Failure::V1::Failure) do transient :error_class, :backtrace @@ -10,7 +6,7 @@ class TestDeserializer application_failure_info do |attrs| Temporalio::Api::Failure::V1::ApplicationFailureInfo.new( type: attrs[:error_class], - details: TestDeserializer.new.to_details_payloads(attrs[:message]), + details: $converter.to_details_payloads(attrs[:message]), ) end end diff --git a/spec/fabricators/grpc/history_event_fabricator.rb b/spec/fabricators/grpc/history_event_fabricator.rb index 4562d7ef..f2c861b2 100644 --- a/spec/fabricators/grpc/history_event_fabricator.rb +++ b/spec/fabricators/grpc/history_event_fabricator.rb @@ -1,13 +1,6 @@ require 'securerandom' -require 'temporal/concerns/payloads' -class TestSerializer - extend Temporal::Concerns::Payloads -end - -include Temporal::Concerns::Payloads - -Fabricator(:api_history_event, from: Temporalio::Api::History::V1::HistoryEvent) do +Fabricator(:api_history_event, from: Temporal::Api::History::V1::HistoryEvent) do event_id { 1 } event_time { Time.now } end @@ -17,9 +10,10 @@ class TestSerializer event_type { Temporalio::Api::Enums::V1::EventType::EVENT_TYPE_WORKFLOW_EXECUTION_STARTED } event_time { Time.now } workflow_execution_started_event_attributes do |attrs| - header_fields = to_payload_map(attrs[:headers] || {}) - header = Temporalio::Api::Common::V1::Header.new(fields: header_fields) - indexed_fields = attrs[:search_attributes] ? to_payload_map(attrs[:search_attributes]) : nil + header = Temporal::Api::Common::V1::Header.new( + fields: $converter.to_payload_map(attrs[:headers] || {}) + ) + indexed_fields = attrs[:search_attributes] ? $converter.to_payload_map(attrs[:search_attributes]) : nil Temporalio::Api::History::V1::WorkflowExecutionStartedEventAttributes.new( workflow_type: Fabricate(:api_workflow_type), @@ -142,7 +136,7 @@ class TestSerializer event_type { Temporalio::Api::Enums::V1::EventType::EVENT_TYPE_ACTIVITY_TASK_CANCELED } activity_task_canceled_event_attributes do |attrs| Temporalio::Api::History::V1::ActivityTaskCanceledEventAttributes.new( - details: TestSerializer.to_details_payloads('ACTIVITY_ID_NOT_STARTED'), + details: $converter.to_details_payloads('ACTIVITY_ID_NOT_STARTED'), scheduled_event_id: attrs[:event_id] - 2, started_event_id: nil, identity: 'test-worker@test-host' diff --git a/spec/fabricators/grpc/memo_fabricator.rb b/spec/fabricators/grpc/memo_fabricator.rb index 38f764f2..8d1ab215 100644 --- a/spec/fabricators/grpc/memo_fabricator.rb +++ b/spec/fabricators/grpc/memo_fabricator.rb @@ -1,7 +1,7 @@ Fabricator(:memo, from: Temporalio::Api::Common::V1::Memo) do fields do Google::Protobuf::Map.new(:string, :message, Temporalio::Api::Common::V1::Payload).tap do |m| - m['foo'] = Temporal.configuration.converter.to_payload('bar') + m['foo'] = $converter.to_payload('bar') end end end diff --git a/spec/fabricators/grpc/payload_fabricator.rb b/spec/fabricators/grpc/payload_fabricator.rb index badd8f36..0580f4e6 100644 --- a/spec/fabricators/grpc/payload_fabricator.rb +++ b/spec/fabricators/grpc/payload_fabricator.rb @@ -1,3 +1,25 @@ +require 'temporal/connection/converter/payload/nil' + Fabricator(:api_payload, from: Temporalio::Api::Common::V1::Payload) do metadata { Google::Protobuf::Map.new(:string, :bytes) } end + +Fabricator(:api_payload_nil, from: :api_payload) do + metadata do + Google::Protobuf::Map.new(:string, :bytes).tap do |m| + m['encoding'] = Temporal::Connection::Converter::Payload::Nil::ENCODING + end + end +end + +Fabricator(:api_payload_bytes, from: :api_payload) do + transient :bytes + + metadata do + Google::Protobuf::Map.new(:string, :bytes).tap do |m| + m['encoding'] = Temporal::Connection::Converter::Payload::Bytes::ENCODING + end + end + + data { |attrs| attrs.fetch(:bytes, 'foobar') } +end diff --git a/spec/fabricators/grpc/payloads_fabricator.rb b/spec/fabricators/grpc/payloads_fabricator.rb new file mode 100644 index 00000000..d4c95514 --- /dev/null +++ b/spec/fabricators/grpc/payloads_fabricator.rb @@ -0,0 +1,9 @@ +Fabricator(:api_payloads, from: Temporal::Api::Common::V1::Payloads) do + transient :payloads_array + + payloads do |attrs| + Google::Protobuf::RepeatedField.new(:message, Temporal::Api::Common::V1::Payload).tap do |m| + m.concat(Array(attrs.fetch(:payloads_array, Fabricate(:api_payload)))) + end + end +end diff --git a/spec/fabricators/grpc/search_attributes_fabricator.rb b/spec/fabricators/grpc/search_attributes_fabricator.rb index 16a33675..78833c8e 100644 --- a/spec/fabricators/grpc/search_attributes_fabricator.rb +++ b/spec/fabricators/grpc/search_attributes_fabricator.rb @@ -1,7 +1,7 @@ Fabricator(:search_attributes, from: Temporalio::Api::Common::V1::SearchAttributes) do indexed_fields do Google::Protobuf::Map.new(:string, :message, Temporalio::Api::Common::V1::Payload).tap do |m| - m['foo'] = Temporal.configuration.converter.to_payload('bar') + m['foo'] = $converter.to_payload('bar') end end end diff --git a/spec/fabricators/grpc/workflow_execution_started_event_attributes_fabricator.rb b/spec/fabricators/grpc/workflow_execution_started_event_attributes_fabricator.rb index 172bd7a5..743db4e7 100644 --- a/spec/fabricators/grpc/workflow_execution_started_event_attributes_fabricator.rb +++ b/spec/fabricators/grpc/workflow_execution_started_event_attributes_fabricator.rb @@ -11,9 +11,6 @@ attempt 1 task_queue { Fabricate(:api_task_queue) } header do |attrs| - fields = (attrs[:headers] || {}).each_with_object({}) do |(field, value), h| - h[field] = Temporal.configuration.converter.to_payload(value) - end - Temporalio::Api::Common::V1::Header.new(fields: fields) + Temporalio::Api::Common::V1::Header.new(fields: $converter.to_payload_map(attrs[:headers] || {})) end end diff --git a/spec/unit/lib/temporal/activity/task_processor_spec.rb b/spec/unit/lib/temporal/activity/task_processor_spec.rb index e4ccdb2a..6999ba60 100644 --- a/spec/unit/lib/temporal/activity/task_processor_spec.rb +++ b/spec/unit/lib/temporal/activity/task_processor_spec.rb @@ -17,7 +17,7 @@ input: config.converter.to_payloads(input) ) end - let(:metadata) { Temporal::Metadata.generate_activity_metadata(task, namespace) } + let(:metadata) { Temporal::Metadata.generate_activity_metadata(task, namespace, config.converter) } let(:workflow_name) { task.workflow_type.name } let(:activity_name) { 'TestActivity' } let(:connection) { instance_double('Temporal::Connection::GRPC') } @@ -40,7 +40,7 @@ .and_return(connection) allow(Temporal::Metadata) .to receive(:generate_activity_metadata) - .with(task, namespace) + .with(task, namespace, config.converter) .and_return(metadata) allow(Temporal::Activity::Context).to receive(:new).with(connection, metadata, config, heartbeat_thread_pool).and_return(context) diff --git a/spec/unit/lib/temporal/client_spec.rb b/spec/unit/lib/temporal/client_spec.rb index 1dd4995d..a292af7c 100644 --- a/spec/unit/lib/temporal/client_spec.rb +++ b/spec/unit/lib/temporal/client_spec.rb @@ -480,11 +480,7 @@ class NamespacedWorkflow < Temporal::Workflow ['string', 'a result'], ].each do |(type, expected_result)| it "completes and returns a #{type}" do - payload = Temporalio::Api::Common::V1::Payloads.new( - payloads: [ - Temporal.configuration.converter.to_payload(expected_result) - ], - ) + payload = $converter.to_result_payloads(expected_result) completed_event = Fabricate(:workflow_completed_event, result: payload) response = Fabricate(:workflow_execution_history, events: [completed_event]) expect(connection) diff --git a/spec/unit/lib/temporal/configuration_spec.rb b/spec/unit/lib/temporal/configuration_spec.rb index c1024e34..20e6f367 100644 --- a/spec/unit/lib/temporal/configuration_spec.rb +++ b/spec/unit/lib/temporal/configuration_spec.rb @@ -62,4 +62,30 @@ def inject!(_); end expect(subject.for_connection).to have_attributes(identity: new_identity) end end + + describe '#converter' do + it 'wraps the provided converter' do + converter_wrapper = subject.converter + + expect(converter_wrapper).to be_a(Temporal::ConverterWrapper) + expect(converter_wrapper.send(:converter)).to eq(described_class::DEFAULT_CONVERTER) + end + end + + describe '#converter=' do + let(:converter) { instance_double(Temporal::Connection::Converter::Composite) } + + it 'resets the wrapper when converter has changed' do + old_converter_wrapper = subject.converter + + expect(old_converter_wrapper).to be_a(Temporal::ConverterWrapper) + expect(old_converter_wrapper.send(:converter)).to eq(described_class::DEFAULT_CONVERTER) + + subject.converter = converter + new_converter_wrapper = subject.converter + + expect(new_converter_wrapper).to be_a(Temporal::ConverterWrapper) + expect(new_converter_wrapper.send(:converter)).to eq(converter) + end + end end \ No newline at end of file diff --git a/spec/unit/lib/temporal/connection/serializer/continue_as_new_spec.rb b/spec/unit/lib/temporal/connection/serializer/continue_as_new_spec.rb index 046b066c..474e5bf1 100644 --- a/spec/unit/lib/temporal/connection/serializer/continue_as_new_spec.rb +++ b/spec/unit/lib/temporal/connection/serializer/continue_as_new_spec.rb @@ -19,7 +19,7 @@ search_attributes: {'foo-search-attribute': 'qux'}, ) - result = described_class.new(command).to_proto + result = described_class.new(command, config.converter).to_proto expect(result).to be_an_instance_of(Temporalio::Api::Command::V1::Command) expect(result.command_type).to eql( diff --git a/spec/unit/lib/temporal/connection/serializer/failure_spec.rb b/spec/unit/lib/temporal/connection/serializer/failure_spec.rb index 4242554e..b8b72dab 100644 --- a/spec/unit/lib/temporal/connection/serializer/failure_spec.rb +++ b/spec/unit/lib/temporal/connection/serializer/failure_spec.rb @@ -6,9 +6,11 @@ class TestDeserializer end describe Temporal::Connection::Serializer::Failure do + let(:config) { Temporal::Configuration.new } + describe 'to_proto' do it 'produces a protobuf' do - result = described_class.new(StandardError.new('test')).to_proto + result = described_class.new(StandardError.new('test'), config.converter).to_proto expect(result).to be_an_instance_of(Temporalio::Api::Failure::V1::Failure) end diff --git a/spec/unit/lib/temporal/connection/serializer/retry_policy_spec.rb b/spec/unit/lib/temporal/connection/serializer/retry_policy_spec.rb index 211f807f..a4ee8ff0 100644 --- a/spec/unit/lib/temporal/connection/serializer/retry_policy_spec.rb +++ b/spec/unit/lib/temporal/connection/serializer/retry_policy_spec.rb @@ -2,6 +2,8 @@ require 'temporal/connection/serializer/retry_policy' describe Temporal::Connection::Serializer::RetryPolicy do + let(:config) { Temporal::Configuration.new } + describe 'to_proto' do let(:example_policy) do Temporal::RetryPolicy.new( @@ -14,7 +16,7 @@ end it 'converts to proto' do - proto = described_class.new(example_policy).to_proto + proto = described_class.new(example_policy, config.converter).to_proto expect(proto.initial_interval.seconds).to eq(1) expect(proto.backoff_coefficient).to eq(1.5) expect(proto.maximum_interval.seconds).to eq(5) diff --git a/spec/unit/lib/temporal/connection/serializer/start_child_workflow_spec.rb b/spec/unit/lib/temporal/connection/serializer/start_child_workflow_spec.rb index 2e72951c..37d4b755 100644 --- a/spec/unit/lib/temporal/connection/serializer/start_child_workflow_spec.rb +++ b/spec/unit/lib/temporal/connection/serializer/start_child_workflow_spec.rb @@ -3,6 +3,7 @@ require 'temporal/connection/serializer/start_child_workflow' describe Temporal::Connection::Serializer::StartChildWorkflow do + let(:config) { Temporal::Configuration.new } let(:example_command) do Temporal::Workflow::Command::StartChildWorkflow.new( workflow_id: SecureRandom.uuid, @@ -24,7 +25,7 @@ command.parent_close_policy = :invalid expect do - described_class.new(command).to_proto + described_class.new(command, config.converter).to_proto end.to raise_error(Temporal::Connection::ArgumentError) do |e| expect(e.message).to eq("Unknown parent_close_policy '#{command.parent_close_policy}' specified") end @@ -40,7 +41,7 @@ command = example_command command.parent_close_policy = policy_name - result = described_class.new(command).to_proto + result = described_class.new(command, config.converter).to_proto attribs = result.start_child_workflow_execution_command_attributes expect(attribs.parent_close_policy).to eq(expected_parent_close_policy) end diff --git a/spec/unit/lib/temporal/connection/serializer/upsert_search_attributes_spec.rb b/spec/unit/lib/temporal/connection/serializer/upsert_search_attributes_spec.rb index bc94128f..58d59a28 100644 --- a/spec/unit/lib/temporal/connection/serializer/upsert_search_attributes_spec.rb +++ b/spec/unit/lib/temporal/connection/serializer/upsert_search_attributes_spec.rb @@ -3,11 +3,9 @@ require 'temporal/connection/serializer/upsert_search_attributes' require 'temporal/workflow/command' -class TestDeserializer - extend Temporal::Concerns::Payloads -end - describe Temporal::Connection::Serializer::UpsertSearchAttributes do + let(:config) { Temporal::Configuration.new } + it 'produces a protobuf that round-trips' do expected_attributes = { 'CustomStringField' => 'moo', @@ -22,14 +20,14 @@ class TestDeserializer search_attributes: expected_attributes ) - result = described_class.new(command).to_proto + result = described_class.new(command config.converter).to_proto expect(result).to be_an_instance_of(Temporalio::Api::Command::V1::Command) expect(result.command_type).to eql( :COMMAND_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES ) command_attributes = result.upsert_workflow_search_attributes_command_attributes expect(command_attributes).not_to be_nil - actual_attributes = TestDeserializer.from_payload_map_without_codec(command_attributes&.search_attributes&.indexed_fields) + actual_attributes = config.converter.from_payload_map_without_codec(command_attributes&.search_attributes&.indexed_fields) expect(actual_attributes).to eql(expected_attributes) end diff --git a/spec/unit/lib/temporal/converter_wrapper_spec.rb b/spec/unit/lib/temporal/converter_wrapper_spec.rb new file mode 100644 index 00000000..24cdf55f --- /dev/null +++ b/spec/unit/lib/temporal/converter_wrapper_spec.rb @@ -0,0 +1,105 @@ +require 'temporal/converter_wrapper' +require 'temporal/connection/converter/payload/bytes' +require 'temporal/connection/converter/payload/nil' +require 'temporal/connection/converter/composite' + +describe Temporal::ConverterWrapper do + subject { described_class.new(converter) } + let(:converter) do + Temporal::Connection::Converter::Composite.new(payload_converters: [ + Temporal::Connection::Converter::Payload::Bytes.new, + Temporal::Connection::Converter::Payload::Nil.new + ]) + end + let(:payloads) { Fabricate(:api_payloads, payloads_array: [payload_bytes, payload_nil]) } + let(:payload_bytes) { Fabricate(:api_payload_bytes, bytes: 'test-payload') } + let(:payload_nil) { Fabricate(:api_payload_nil) } + + describe '#from_payloads' do + it 'converts' do + expect(subject.from_payloads(payloads)).to eq(['test-payload', nil]) + end + end + + describe '#from_payload' do + it 'converts' do + expect(subject.from_payload(payload_bytes)).to eq('test-payload') + end + end + + describe '#from_result_payloads' do + it 'converts' do + expect(subject.from_result_payloads(payloads)).to eq('test-payload') + end + end + + describe '#from_details_payloads' do + it 'converts first payload' do + expect(subject.from_details_payloads(payloads)).to eq('test-payload') + end + end + + describe '#from_signal_payloads' do + it 'converts first payload' do + expect(subject.from_signal_payloads(payloads)).to eq('test-payload') + end + end + + describe '#from_payload_map' do + let(:payload_map) do + Google::Protobuf::Map.new(:string, :message, Temporal::Api::Common::V1::Payload).tap do |m| + m['first'] = payload_bytes + m['second'] = payload_nil + end + end + + it 'converts first payload' do + expect(subject.from_payload_map(payload_map)) + .to eq('first' => 'test-payload', 'second' => nil) + end + end + + describe '#to_payloads' do + it 'converts' do + expect(subject.to_payloads(['test-payload'.b, nil])).to eq(payloads) + end + end + + describe '#to_payload' do + it 'converts' do + expect(subject.to_payload('test-payload'.b)).to eq(payload_bytes) + end + end + + describe '#to_result_payloads' do + let(:payloads) { Fabricate(:api_payloads, payloads_array: [payload_bytes]) } + + it 'converts' do + expect(subject.to_result_payloads('test-payload'.b)).to eq(payloads) + end + end + + describe '#to_details_payloads' do + let(:payloads) { Fabricate(:api_payloads, payloads_array: [payload_bytes]) } + + it 'converts' do + expect(subject.to_details_payloads('test-payload'.b)).to eq(payloads) + end + end + + describe '#to_signal_payloads' do + let(:payloads) { Fabricate(:api_payloads, payloads_array: [payload_bytes]) } + + it 'converts' do + expect(subject.to_signal_payloads('test-payload'.b)).to eq(payloads) + end + end + + describe '#to_payload_map' do + let(:payload_map) { { first: payload_bytes, second: payload_nil } } + + it 'converts' do + expect(subject.to_payload_map(first: 'test-payload'.b, second: nil)).to eq(payload_map) + end + end +end diff --git a/spec/unit/lib/temporal/grpc_spec.rb b/spec/unit/lib/temporal/grpc_spec.rb index ee3c1fcb..e0a26942 100644 --- a/spec/unit/lib/temporal/grpc_spec.rb +++ b/spec/unit/lib/temporal/grpc_spec.rb @@ -4,6 +4,7 @@ describe Temporal::Connection::GRPC do let(:identity) { 'my-identity' } let(:binary_checksum) { 'v1.0.0' } + let(:config) { Temporal::Configuration.new } let(:grpc_stub) { double('grpc stub') } let(:grpc_operator_stub) { double('grpc stub') } let(:namespace) { 'test-namespace' } @@ -11,7 +12,7 @@ let(:run_id) { SecureRandom.uuid } let(:now) { Time.now} - subject { Temporal::Connection::GRPC.new(nil, nil, identity, :this_channel_is_insecure) } + subject { Temporal::Connection::GRPC.new(nil, nil, identity, :this_channel_is_insecure, config.converter) } class TestDeserializer extend Temporal::Concerns::Payloads diff --git a/spec/unit/lib/temporal/metadata_spec.rb b/spec/unit/lib/temporal/metadata_spec.rb index cd21fb76..a77ffca6 100644 --- a/spec/unit/lib/temporal/metadata_spec.rb +++ b/spec/unit/lib/temporal/metadata_spec.rb @@ -1,8 +1,10 @@ require 'temporal/metadata' describe Temporal::Metadata do + let(:config) { Temporal::Configuration.new } + describe '.generate_activity_metadata' do - subject { described_class.generate_activity_metadata(data, namespace) } + subject { described_class.generate_activity_metadata(data, namespace, config.converter) } let(:data) { Fabricate(:api_activity_task) } let(:namespace) { 'test-namespace' } @@ -46,7 +48,7 @@ end context '.generate_workflow_metadata' do - subject { described_class.generate_workflow_metadata(event, task_metadata) } + subject { described_class.generate_workflow_metadata(event, task_metadata, config.converter) } let(:event) { Temporal::Workflow::History::Event.new(Fabricate(:api_workflow_execution_started_event)) } let(:task_metadata) { Fabricate(:workflow_task_metadata) } let(:namespace) { nil } diff --git a/spec/unit/lib/temporal/workflow/errors_spec.rb b/spec/unit/lib/temporal/workflow/errors_spec.rb index 53d86b68..5fa8fda4 100644 --- a/spec/unit/lib/temporal/workflow/errors_spec.rb +++ b/spec/unit/lib/temporal/workflow/errors_spec.rb @@ -27,6 +27,8 @@ def initialize(foo, bar) end describe Temporal::Workflow::Errors do + let(:config) { Temporal::Configuration.new } + describe '.generate_error' do it "instantiates properly when the client has the error" do message = "An error message" @@ -38,7 +40,7 @@ def initialize(foo, bar) error_class: SomeError.to_s ) - e = Temporal::Workflow::Errors.generate_error(failure) + e = Temporal::Workflow::Errors.generate_error(failure, config.converter) expect(e).to be_a(SomeError) expect(e.message).to eq(message) expect(e.backtrace).to eq(stack_trace) @@ -68,7 +70,7 @@ def initialize(foo, bar) error_class: 'NonexistentError', ) - e = Temporal::Workflow::Errors.generate_error(failure) + e = Temporal::Workflow::Errors.generate_error(failure, config.converter) expect(e).to be_a(StandardError) expect(e.message).to eq("NonexistentError: An error message") expect(e.backtrace).to eq(stack_trace) @@ -94,7 +96,7 @@ def initialize(foo, bar) error_class: ErrorWithTwoArgs.to_s, ) - e = Temporal::Workflow::Errors.generate_error(failure) + e = Temporal::Workflow::Errors.generate_error(failure, config.converter) expect(e).to be_a(StandardError) expect(e.message).to eq("ErrorWithTwoArgs: An error message") expect(e.backtrace).to eq(stack_trace) @@ -127,7 +129,7 @@ def initialize(foo, bar) error_class: ErrorThatRaisesInInitialize.to_s, ) - e = Temporal::Workflow::Errors.generate_error(failure) + e = Temporal::Workflow::Errors.generate_error(failure, config.converter) expect(e).to be_a(StandardError) expect(e.message).to eq("ErrorThatRaisesInInitialize: An error message") expect(e.backtrace).to eq(stack_trace) diff --git a/spec/unit/lib/temporal/workflow/execution_info_spec.rb b/spec/unit/lib/temporal/workflow/execution_info_spec.rb index ad3368f2..d49d52c4 100644 --- a/spec/unit/lib/temporal/workflow/execution_info_spec.rb +++ b/spec/unit/lib/temporal/workflow/execution_info_spec.rb @@ -1,7 +1,8 @@ require 'temporal/workflow/execution_info' describe Temporal::Workflow::ExecutionInfo do - subject { described_class.generate_from(api_info) } + subject { described_class.generate_from(api_info, config.converter) } + let(:config) { Temporal::Configuration.new } let(:api_info) { Fabricate(:api_workflow_execution_info, workflow: 'TestWorkflow', workflow_id: '') } describe '.generate_for' do @@ -25,7 +26,7 @@ it 'deserializes if search_attributes is nil' do api_info.search_attributes = nil - result = described_class.generate_from(api_info) + result = described_class.generate_from(api_info, config.converter) expect(result.search_attributes).to eq({}) end end diff --git a/spec/unit/lib/temporal/workflow/state_manager_spec.rb b/spec/unit/lib/temporal/workflow/state_manager_spec.rb index bad27d22..bac9404e 100644 --- a/spec/unit/lib/temporal/workflow/state_manager_spec.rb +++ b/spec/unit/lib/temporal/workflow/state_manager_spec.rb @@ -7,6 +7,7 @@ require 'temporal/errors' describe Temporal::Workflow::StateManager do + let(:config) { Temporal::Configuration.new } describe '#schedule' do class MyWorkflow < Temporal::Workflow; end @@ -24,7 +25,7 @@ class MyWorkflow < Temporal::Workflow; end ) ].each do |terminal_command| it "fails to validate if #{terminal_command.class} is not the last command scheduled" do - state_manager = described_class.new(Temporal::Workflow::Dispatcher.new, Temporal::Configuration.new) + state_manager = described_class.new(Temporal::Workflow::Dispatcher.new, config) next_command = Temporal::Workflow::Command::RecordMarker.new( name: Temporal::Workflow::StateManager::RELEASE_MARKER,