diff --git a/examples/bin/worker b/examples/bin/worker index 79b6313c..cead588d 100755 --- a/examples/bin/worker +++ b/examples/bin/worker @@ -1,6 +1,6 @@ #!/usr/bin/env ruby require_relative '../init' -require_relative '../lib/cryptconverter' +require_relative '../lib/crypt_payload_codec' require 'temporal/worker' @@ -11,8 +11,10 @@ Dir[File.expand_path('../middleware/*.rb', __dir__)].each { |f| require f } if !ENV['USE_ENCRYPTION'].nil? Temporal.configure do |config| config.task_queue = 'crypt' - config.converter = Temporal::CryptConverter.new( - payload_converter: Temporal::Configuration::DEFAULT_CONVERTER + config.payload_codec = Temporal::Connection::Converter::Codec::Chain.new( + payload_codecs: [ + Temporal::CryptPayloadCodec.new + ] ) end end diff --git a/examples/lib/cryptconverter.rb b/examples/lib/crypt_payload_codec.rb similarity index 76% rename from examples/lib/cryptconverter.rb rename to examples/lib/crypt_payload_codec.rb index c968bfc9..72e6769d 100644 --- a/examples/lib/cryptconverter.rb +++ b/examples/lib/crypt_payload_codec.rb @@ -1,7 +1,8 @@ require 'openssl' +require 'temporal/connection/converter/codec/base' module Temporal - class CryptConverter < Temporal::Connection::Converter::Base + class CryptPayloadCodec < Temporal::Connection::Converter::Codec::Base CIPHER = 'aes-256-gcm'.freeze GCM_NONCE_SIZE = 12 GCM_TAG_SIZE = 16 @@ -10,26 +11,23 @@ class CryptConverter < Temporal::Connection::Converter::Base METADATA_ENCODING_KEY = 'encoding'.freeze METADATA_ENCODING = 'binary/encrypted'.freeze - def to_payloads(data) + def encode(payload) + return nil if payload.nil? + key_id = get_key_id key = get_key(key_id) - payloads = super(data) - - Temporalio::Api::Common::V1::Payloads.new( - payloads: payloads.payloads.map { |payload| encrypt_payload(payload, key_id, key) } - ) + encrypt_payload(payload, key_id, key) end + + def decode(payload) + return nil if payload.nil? - def from_payloads(payloads) - return nil if payloads.nil? - - payloads.payloads.map do |payload| - if payload.metadata[METADATA_ENCODING_KEY] == METADATA_ENCODING - payload = decrypt_payload(payload) - end - from_payload(payload) + if payload.metadata[METADATA_ENCODING_KEY] == METADATA_ENCODING + payload = decrypt_payload(payload) end + + payload end private diff --git a/examples/spec/integration/converter_spec.rb b/examples/spec/integration/converter_spec.rb index bc3f78a6..21878c1f 100644 --- a/examples/spec/integration/converter_spec.rb +++ b/examples/spec/integration/converter_spec.rb @@ -1,5 +1,5 @@ require 'workflows/hello_world_workflow' -require 'lib/cryptconverter' +require 'lib/crypt_payload_codec' require 'grpc/errors' describe 'Converter', :integration do @@ -8,8 +8,10 @@ Temporal.configure do |config| config.task_queue = 'crypt' - config.converter = Temporal::CryptConverter.new( - payload_converter: Temporal::Configuration::DEFAULT_CONVERTER + config.payload_codec = Temporal::Connection::Converter::Codec::Chain.new( + payload_codecs: [ + Temporal::CryptPayloadCodec.new + ] ) end @@ -65,8 +67,8 @@ completion_event = events[:EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED].first result = completion_event.workflow_execution_completed_event_attributes.result - converter = Temporal.configuration.converter + payload_codec = Temporal.configuration.payload_codec - expect(converter.from_payloads(result)&.first).to eq('Hello World, Tom') + expect(payload_codec.decodes(result).payloads.first.data).to eq('"Hello World, Tom"') end end diff --git a/lib/temporal/concerns/payloads.rb b/lib/temporal/concerns/payloads.rb index ad703542..5c771e21 100644 --- a/lib/temporal/concerns/payloads.rb +++ b/lib/temporal/concerns/payloads.rb @@ -2,13 +2,19 @@ module Temporal module Concerns module Payloads def from_payloads(payloads) + payloads = payload_codec.decodes(payloads) payload_converter.from_payloads(payloads) end def from_payload(payload) + payload = payload_codec.decode(payload) payload_converter.from_payload(payload) end + def from_payload_map_without_codec(payload_map) + payload_map.map { |key, value| [key, payload_converter.from_payload(value)] }.to_h + end + def from_result_payloads(payloads) from_payloads(payloads)&.first end @@ -30,11 +36,20 @@ def from_payload_map(payload_map) end def to_payloads(data) - payload_converter.to_payloads(data) + payloads = payload_converter.to_payloads(data) + payload_codec.encodes(payloads) end def to_payload(data) - payload_converter.to_payload(data) + payload = payload_converter.to_payload(data) + payload_codec.encode(payload) + end + + def to_payload_map_without_codec(data) + # skips the payload_codec step because search attributes don't use this pipeline + data.transform_values do |value| + payload_converter.to_payload(value) + end end def to_result_payloads(data) @@ -62,6 +77,10 @@ def to_payload_map(data) def payload_converter Temporal.configuration.converter end + + def payload_codec + Temporal.configuration.payload_codec + end end end end diff --git a/lib/temporal/configuration.rb b/lib/temporal/configuration.rb index 20fdd7cf..7ba12f28 100644 --- a/lib/temporal/configuration.rb +++ b/lib/temporal/configuration.rb @@ -7,6 +7,7 @@ require 'temporal/connection/converter/payload/json' require 'temporal/connection/converter/payload/proto_json' require 'temporal/connection/converter/composite' +require 'temporal/connection/converter/codec/chain' module Temporal class Configuration @@ -14,7 +15,7 @@ class Configuration Execution = Struct.new(:namespace, :task_queue, :timeouts, :headers, :search_attributes, keyword_init: true) attr_reader :timeouts, :error_handlers - attr_accessor :connection_type, :converter, :use_error_serialization_v2, :host, :port, :credentials, :identity, :logger, :metrics_adapter, :namespace, :task_queue, :headers, :search_attributes, :header_propagators + attr_accessor :connection_type, :converter, :use_error_serialization_v2, :host, :port, :credentials, :identity, :logger, :metrics_adapter, :namespace, :task_queue, :headers, :search_attributes, :header_propagators, :payload_codec # See https://docs.temporal.io/blog/activity-timeouts/ for general docs. # We want an infinite execution timeout for cron schedules and other perpetual workflows. @@ -45,6 +46,14 @@ class Configuration Temporal::Connection::Converter::Payload::JSON.new ] ).freeze + + # The Payload Codec is an optional step that happens between the wire and the Payload Converter: + # Temporal Server <--> Wire <--> Payload Codec <--> Payload Converter <--> User code + # which can be useful for transformations such as compression and encryption + # more info at https://docs.temporal.io/security#payload-codec + DEFAULT_PAYLOAD_CODEC = Temporal::Connection::Converter::Codec::Chain.new( + payload_codecs: [] + ).freeze def initialize @connection_type = :grpc @@ -55,6 +64,7 @@ def initialize @task_queue = DEFAULT_TASK_QUEUE @headers = DEFAULT_HEADERS @converter = DEFAULT_CONVERTER + @payload_codec = DEFAULT_PAYLOAD_CODEC @use_error_serialization_v2 = false @error_handlers = [] @credentials = :this_channel_is_insecure diff --git a/lib/temporal/connection/converter/codec/base.rb b/lib/temporal/connection/converter/codec/base.rb new file mode 100644 index 00000000..d8748909 --- /dev/null +++ b/lib/temporal/connection/converter/codec/base.rb @@ -0,0 +1,35 @@ +module Temporal + module Connection + module Converter + module Codec + class Base + def encodes(payloads) + return nil if payloads.nil? + + Temporalio::Api::Common::V1::Payloads.new( + payloads: payloads.payloads.map(&method(:encode)) + ) + end + + def decodes(payloads) + return nil if payloads.nil? + + Temporalio::Api::Common::V1::Payloads.new( + payloads: payloads.payloads.map(&method(:decode)) + ) + end + + def encode(payload) + # should return Temporalio::Api::Common::V1::Payload + raise NotImplementedError, 'codec converter needs to implement encode' + end + + def decode(payload) + # should return Temporalio::Api::Common::V1::Payload + raise NotImplementedError, 'codec converter needs to implement decode' + end + end + end + end + end +end diff --git a/lib/temporal/connection/converter/codec/chain.rb b/lib/temporal/connection/converter/codec/chain.rb new file mode 100644 index 00000000..fc1a16f8 --- /dev/null +++ b/lib/temporal/connection/converter/codec/chain.rb @@ -0,0 +1,36 @@ +require 'temporal/connection/converter/codec/base' + +module Temporal + module Connection + module Converter + module Codec + # Performs encoding/decoding on the payloads via the given payload codecs. When encoding + # the codecs are applied last to first meaning the earlier encodings wrap the later ones. + # When decoding, the codecs are applied first to last to reverse the effect. + class Chain < Base + def initialize(payload_codecs:) + @payload_codecs = payload_codecs + end + + def encode(payload) + payload_codecs.reverse_each do |payload_codec| + payload = payload_codec.encode(payload) + end + payload + end + + def decode(payload) + payload_codecs.each do |payload_codec| + payload = payload_codec.decode(payload) + end + payload + end + + private + + attr_reader :payload_codecs + end + end + end + end +end diff --git a/lib/temporal/connection/grpc.rb b/lib/temporal/connection/grpc.rb index 52c440fa..e7b7e95e 100644 --- a/lib/temporal/connection/grpc.rb +++ b/lib/temporal/connection/grpc.rb @@ -138,7 +138,7 @@ def start_workflow_execution( fields: to_payload_map(memo || {}) ), search_attributes: Temporalio::Api::Common::V1::SearchAttributes.new( - indexed_fields: to_payload_map(search_attributes || {}) + indexed_fields: to_payload_map_without_codec(search_attributes || {}) ), ) @@ -401,7 +401,7 @@ def signal_with_start_workflow_execution( fields: to_payload_map(memo || {}) ), search_attributes: Temporalio::Api::Common::V1::SearchAttributes.new( - indexed_fields: to_payload_map(search_attributes || {}) + indexed_fields: to_payload_map_without_codec(search_attributes || {}) ), ) diff --git a/lib/temporal/connection/serializer/continue_as_new.rb b/lib/temporal/connection/serializer/continue_as_new.rb index 9a6a7ecf..c2b484bb 100644 --- a/lib/temporal/connection/serializer/continue_as_new.rb +++ b/lib/temporal/connection/serializer/continue_as_new.rb @@ -43,7 +43,7 @@ def serialize_memo(memo) def serialize_search_attributes(search_attributes) return unless search_attributes - Temporalio::Api::Common::V1::SearchAttributes.new(indexed_fields: to_payload_map(search_attributes)) + Temporalio::Api::Common::V1::SearchAttributes.new(indexed_fields: to_payload_map_without_codec(search_attributes)) end end end diff --git a/lib/temporal/connection/serializer/start_child_workflow.rb b/lib/temporal/connection/serializer/start_child_workflow.rb index 3cc3a0aa..90d08c79 100644 --- a/lib/temporal/connection/serializer/start_child_workflow.rb +++ b/lib/temporal/connection/serializer/start_child_workflow.rb @@ -66,7 +66,7 @@ def serialize_parent_close_policy(parent_close_policy) def serialize_search_attributes(search_attributes) return unless search_attributes - Temporalio::Api::Common::V1::SearchAttributes.new(indexed_fields: to_payload_map(search_attributes)) + Temporalio::Api::Common::V1::SearchAttributes.new(indexed_fields: to_payload_map_without_codec(search_attributes)) end end end diff --git a/lib/temporal/connection/serializer/upsert_search_attributes.rb b/lib/temporal/connection/serializer/upsert_search_attributes.rb index 0af6b79f..e8aa652c 100644 --- a/lib/temporal/connection/serializer/upsert_search_attributes.rb +++ b/lib/temporal/connection/serializer/upsert_search_attributes.rb @@ -13,7 +13,7 @@ def to_proto upsert_workflow_search_attributes_command_attributes: Temporalio::Api::Command::V1::UpsertWorkflowSearchAttributesCommandAttributes.new( search_attributes: Temporalio::Api::Common::V1::SearchAttributes.new( - indexed_fields: to_payload_map(object.search_attributes || {}) + indexed_fields: to_payload_map_without_codec(object.search_attributes || {}) ), ) ) diff --git a/lib/temporal/workflow/execution_info.rb b/lib/temporal/workflow/execution_info.rb index 88d27cd7..e3f70021 100644 --- a/lib/temporal/workflow/execution_info.rb +++ b/lib/temporal/workflow/execution_info.rb @@ -3,7 +3,8 @@ module Temporal class Workflow - class ExecutionInfo < Struct.new(:workflow, :workflow_id, :run_id, :start_time, :close_time, :status, :history_length, :memo, :search_attributes, keyword_init: true) + class ExecutionInfo < Struct.new(:workflow, :workflow_id, :run_id, :start_time, :close_time, :status, + :history_length, :memo, :search_attributes, keyword_init: true) extend Concerns::Payloads STATUSES = [ @@ -13,11 +14,11 @@ class ExecutionInfo < Struct.new(:workflow, :workflow_id, :run_id, :start_time, Temporal::Workflow::Status::CANCELED, Temporal::Workflow::Status::TERMINATED, Temporal::Workflow::Status::CONTINUED_AS_NEW, - Temporal::Workflow::Status::TIMED_OUT, + Temporal::Workflow::Status::TIMED_OUT ] def self.generate_from(response) - search_attributes = response.search_attributes.nil? ? {} : self.from_payload_map(response.search_attributes.indexed_fields) + search_attributes = response.search_attributes.nil? ? {} : from_payload_map_without_codec(response.search_attributes.indexed_fields) new( workflow: response.type.name, workflow_id: response.execution.workflow_id, @@ -26,8 +27,8 @@ def self.generate_from(response) close_time: response.close_time&.to_time, status: Temporal::Workflow::Status::API_STATUS_MAP.fetch(response.status), history_length: response.history_length, - memo: self.from_payload_map(response.memo.fields), - search_attributes: search_attributes, + memo: from_payload_map(response.memo.fields), + search_attributes: search_attributes ).freeze end diff --git a/proto b/proto index e4246bbd..4c2f6a28 160000 --- a/proto +++ b/proto @@ -1 +1 @@ -Subproject commit e4246bbd59fd1f850bdd5be6a59d6d2f8e532d76 +Subproject commit 4c2f6a281fa3fde8b0a24447de3e0d0f47d230b4 diff --git a/spec/unit/lib/temporal/connection/converter/codec/base_spec.rb b/spec/unit/lib/temporal/connection/converter/codec/base_spec.rb new file mode 100644 index 00000000..22d0eecf --- /dev/null +++ b/spec/unit/lib/temporal/connection/converter/codec/base_spec.rb @@ -0,0 +1,71 @@ +require 'temporal/connection/converter/codec/chain' + +describe Temporal::Connection::Converter::Codec::Base do + let(:payloads) do + Temporalio::Api::Common::V1::Payloads.new( + payloads: [ + Temporalio::Api::Common::V1::Payload.new( + metadata: { 'encoding' => 'json/plain' }, + data: '{}'.b + ) + ] + ) + end + + let (:encoded_payload) do + Temporalio::Api::Common::V1::Payload.new( + metadata: { 'encoding' => 'binary/encrypted' }, + data: 'encrypted-payload'.b + ) + end + + let(:base_codec) { described_class.new } + + describe '#encodes' do + it 'returns nil if payloads is nil' do + expect(base_codec.encodes(nil)).to be_nil + end + + it 'encodes each payload in payloads' do + expect(base_codec).to receive(:encode).with(payloads.payloads[0]).and_return(encoded_payload) + base_codec.encodes(payloads) + end + + it 'returns a new Payloads object with the encoded payloads' do + encoded_payloads = Temporalio::Api::Common::V1::Payloads.new( + payloads: [Temporalio::Api::Common::V1::Payload.new( + metadata: { 'encoding' => 'json/plain' }, + data: 'encoded_payload'.b + )] + ) + + allow(base_codec).to receive(:encode).and_return(encoded_payloads.payloads[0]) + + expect(base_codec.encodes(payloads)).to eq(encoded_payloads) + end + end + + describe '#decodes' do + it 'returns nil if payloads is nil' do + expect(base_codec.decodes(nil)).to be_nil + end + + it 'decodes each payload in payloads' do + expect(base_codec).to receive(:decode).with(payloads.payloads[0]).and_return(payloads.payloads[0]) + base_codec.decodes(payloads) + end + + it 'returns a new Payloads object with the decoded payloads' do + decoded_payloads = Temporalio::Api::Common::V1::Payloads.new( + payloads: [Temporalio::Api::Common::V1::Payload.new( + metadata: { 'encoding' => 'json/plain' }, + data: 'decoded_payload'.b + )] + ) + + allow(base_codec).to receive(:decode).and_return(decoded_payloads.payloads[0]) + + expect(base_codec.decodes(payloads)).to eq(decoded_payloads) + end + end +end diff --git a/spec/unit/lib/temporal/connection/converter/codec/chain_spec.rb b/spec/unit/lib/temporal/connection/converter/codec/chain_spec.rb new file mode 100644 index 00000000..77c189db --- /dev/null +++ b/spec/unit/lib/temporal/connection/converter/codec/chain_spec.rb @@ -0,0 +1,60 @@ +require 'temporal/connection/converter/codec/chain' + +describe Temporal::Connection::Converter::Codec::Chain do + let(:codec1) { double('PayloadCodec1') } + let(:codec2) { double('PayloadCodec2') } + let(:codec3) { double('PayloadCodec3') } + + let(:payload_1) do + Temporalio::Api::Common::V1::Payload.new( + metadata: { 'encoding' => 'binary/plain' }, + data: 'payload_1'.b + ) + end + let(:payload_2) do + Temporalio::Api::Common::V1::Payload.new( + metadata: { 'encoding' => 'binary/plain' }, + data: 'payload_2'.b + ) + end + let(:payload_3) do + Temporalio::Api::Common::V1::Payload.new( + metadata: { 'encoding' => 'binary/plain' }, + data: 'payload_3'.b + ) + end + let(:payload_4) do + Temporalio::Api::Common::V1::Payload.new( + metadata: { 'encoding' => 'binary/plain' }, + data: 'payload_4'.b + ) + end + + subject { described_class.new(payload_codecs: [codec1, codec2, codec3]) } + + describe '#encode' do + it 'applies payload codecs in reverse order' do + expect(codec3).to receive(:encode).with(payload_1).and_return(payload_2) + expect(codec2).to receive(:encode).with(payload_2).and_return(payload_3) + expect(codec1).to receive(:encode).with(payload_3).and_return(payload_4) + + result = subject.encode(payload_1) + + expect(result.metadata).to eq(payload_4.metadata) + expect(result.data).to eq(payload_4.data) + end + end + + describe '#decode' do + it 'applies payload codecs in the original order' do + expect(codec1).to receive(:decode).with(payload_1).and_return(payload_2) + expect(codec2).to receive(:decode).with(payload_2).and_return(payload_3) + expect(codec3).to receive(:decode).with(payload_3).and_return(payload_4) + + result = subject.decode(payload_1) + + expect(result.metadata).to eq(payload_4.metadata) + expect(result.data).to eq(payload_4.data) + end + end +end diff --git a/spec/unit/lib/temporal/connection/serializer/upsert_search_attributes_spec.rb b/spec/unit/lib/temporal/connection/serializer/upsert_search_attributes_spec.rb index da5d8879..bc94128f 100644 --- a/spec/unit/lib/temporal/connection/serializer/upsert_search_attributes_spec.rb +++ b/spec/unit/lib/temporal/connection/serializer/upsert_search_attributes_spec.rb @@ -29,7 +29,7 @@ class TestDeserializer ) command_attributes = result.upsert_workflow_search_attributes_command_attributes expect(command_attributes).not_to be_nil - actual_attributes = TestDeserializer.from_payload_map(command_attributes&.search_attributes&.indexed_fields) + actual_attributes = TestDeserializer.from_payload_map_without_codec(command_attributes&.search_attributes&.indexed_fields) expect(actual_attributes).to eql(expected_attributes) end