Skip to content

Commit

Permalink
Python Instance should resort to default serializer if nothing is spe…
Browse files Browse the repository at this point in the history
…cified (apache#180)
  • Loading branch information
srkukarni authored and sijie committed Mar 4, 2018
1 parent 14b0fe4 commit 75ce665
Showing 1 changed file with 16 additions and 6 deletions.
22 changes: 16 additions & 6 deletions pulsar-functions/runtime/src/main/python/python_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
# This is the message that the consumers put on the queue for the function thread to process
InternalMessage = namedtuple('InternalMessage', 'message topic serde consumer')
InternalQuitMessage = namedtuple('InternalQuitMessage', 'quit')
DEFAULT_SERIALIZER = "pulsarfunction.serde.IdentitySerDe"

# We keep track of the following metrics
class Stats(object):
Expand Down Expand Up @@ -128,7 +129,8 @@ def run(self):
)

for topic in self.instance_config.function_config.inputs:
serde_kclass = util.import_class(os.path.dirname(self.user_code), "pulsarfunction.IdentitySerDe", try_internal=True)
global DEFAULT_SERIALIZER
serde_kclass = util.import_class(os.path.dirname(self.user_code), DEFAULT_SERIALIZER, try_internal=True)
self.input_serdes[topic] = serde_kclass()
Log.info("Setting up consumer for topic %s with subname %s" % (topic, subscription_name))
self.consumers[topic] = self.pulsar_client.subscribe(
Expand All @@ -137,11 +139,7 @@ def run(self):
message_listener=partial(self.message_listener, topic, self.input_serdes[topic])
)

# See if we need to setup output producers/output serializers
if self.instance_config.function_config.outputSerdeClassName != None and \
len(self.instance_config.function_config.outputSerdeClassName) > 0:
serde_kclass = util.import_class(os.path.dirname(self.user_code), self.instance_config.function_config.outputSerdeClassName, try_internal=True)
self.output_serde = serde_kclass()
# See if we need to setup output producers
function_kclass = util.import_class(os.path.dirname(self.user_code), self.instance_config.function_config.className, try_internal=True)
self.function_class = function_kclass()
if self.instance_config.function_config.sinkTopic != None and \
Expand Down Expand Up @@ -199,6 +197,8 @@ def done_producing(self, consumer, orig_message, result, sent_message):
def process_result(self, output, msg):
if output is not None and self.producer is not None:
output_bytes = None
if self.output_serde is None:
self.setup_output_serde()
try:
output_bytes = self.output_serde.serialize(output)
except:
Expand All @@ -213,6 +213,16 @@ def process_result(self, output, msg):
elif self.auto_ack and self.atleast_once:
msg.consumer.acknowledge(msg.message)

def setup_output_serde(self):
if self.instance_config.function_config.outputSerdeClassName != None and \
len(self.instance_config.function_config.outputSerdeClassName) > 0:
serde_kclass = util.import_class(os.path.dirname(self.user_code), self.instance_config.function_config.outputSerdeClassName, try_internal=True)
self.output_serde = serde_kclass()
else:
global DEFAULT_SERIALIZER
serde_kclass = util.import_class(os.path.dirname(self.user_code), DEFAULT_SERIALIZER, try_internal=True)
self.output_serde = serde_kclass()

def message_listener(self, topic, serde, consumer, message):
item = InternalMessage(message, topic, serde, consumer)
self.queue.put(item, True)
Expand Down

0 comments on commit 75ce665

Please sign in to comment.