From 7c2920fde22112c6330ad51ed2eb471d48c08f03 Mon Sep 17 00:00:00 2001
From: Michael Demoret
Date: Tue, 14 Dec 2021 10:45:48 -0800
Subject: [PATCH] Adding retry to from-kafka and fixing dropna

---
 .pylintrc                                   |   3 +-
 morpheus/pipeline/general_stages.py         |  45 +----
 .../pipeline/inference/inference_stage.py   |  15 +-
 morpheus/pipeline/input/from_cloudtrail.py  |  62 +++----
 morpheus/pipeline/input/from_file.py        |  21 +--
 morpheus/pipeline/input/from_kafka.py       | 172 +++++++++++-------
 morpheus/pipeline/messages.py               |   2 +-
 morpheus/pipeline/output/serialize.py       |  10 +-
 morpheus/pipeline/output/to_kafka.py        |  38 ++--
 morpheus/pipeline/output/validation.py      |  23 +--
 morpheus/pipeline/postprocess/timeseries.py |  25 ++-
 morpheus/pipeline/preprocess/autoencoder.py |  17 +-
 morpheus/pipeline/preprocessing.py          |  61 +------
 13 files changed, 209 insertions(+), 285 deletions(-)

diff --git a/.pylintrc b/.pylintrc
index dec5efa3d2..6804420e18 100644
--- a/.pylintrc
+++ b/.pylintrc
@@ -3,7 +3,7 @@
 # A comma-separated list of package or module names from where C extensions may
 # be loaded. Extensions are loading into the active Python interpreter and may
 # run arbitrary code.
-extension-pkg-whitelist=
+extension-pkg-allow-list=neo,morpheus._lib
 
 # Specify a score threshold to be exceeded before program exits with error.
 fail-under=10
@@ -603,4 +603,3 @@ valid-metaclass-classmethod-first-arg=cls
 # "BaseException, Exception".
 overgeneral-exceptions=BaseException, Exception
-
diff --git a/morpheus/pipeline/general_stages.py b/morpheus/pipeline/general_stages.py
index b97dfc0c23..ed2b67758e 100644
--- a/morpheus/pipeline/general_stages.py
+++ b/morpheus/pipeline/general_stages.py
@@ -18,6 +18,7 @@
 import cupy as cp
 import neo
+from neo.core import operators as ops
 from tqdm import tqdm
 
 import cudf
@@ -148,26 +149,7 @@ def _build_single(self, seg: neo.Segment, input_stream: StreamPair) -> StreamPai
 
         # Store all messages until on_complete is called and then push them
         def node_fn(input: neo.Observable, output: neo.Subscriber):
-            delayed_messages = []
-
-            def obs_on_next(x):
-
-                delayed_messages.append(x)
-
-            def obs_on_error(x):
-                output.on_error(x)
-
-            def obs_on_completed():
-
-                # Now push all the messages
-                for x in delayed_messages:
-                    output.on_next(x)
-
-                output.on_completed()
-
-            obs = neo.Observer.make_observer(obs_on_next, obs_on_error, obs_on_completed)
-
-            input.subscribe(obs)
+            input.pipe(ops.to_list(), ops.flatten()).subscribe(output)
 
         node = seg.make_node_full(self.unique_name, node_fn)
         seg.make_edge(input_stream[0], node)
@@ -468,30 +450,13 @@ def filter(self, x: MultiResponseProbsMessage) -> typing.List[MultiResponseProbs
 
     def _build_single(self, seg: neo.Segment, input_stream: StreamPair) -> StreamPair:
 
-        # Reduce messages to only have detections
-        stream = seg.make_node(self.unique_name, self.filter)
-
-        seg.make_edge(input_stream[0], stream)
-
         # Convert list back to single MultiResponseProbsMessage
         def flatten_fn(input: neo.Observable, output: neo.Subscriber):
-            def obs_on_next(x: typing.List):
-
-                for y in x:
-                    output.on_next(y)
-
-            def obs_on_error(x):
-                output.on_error(x)
-
-            def obs_on_completed():
-                output.on_completed()
-
-            obs = neo.Observer.make_observer(obs_on_next, obs_on_error, obs_on_completed)
-            input.subscribe(obs)
+            input.pipe(ops.map(self.filter), ops.flatten()).subscribe(output)
 
-        flattened = seg.make_node_full(self.unique_name + "-flatten", flatten_fn)
-        seg.make_edge(stream, flattened)
+        flattened = seg.make_node_full(self.unique_name, flatten_fn)
+        seg.make_edge(input_stream[0], flattened)
 
         stream = flattened
 
         return stream, MultiResponseProbsMessage
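For reference, the `ops.to_list()` + `ops.flatten()` pair that replaces DelayStage's hand-written observer buffers every upstream item and replays the buffer once the source completes. A minimal sketch of that pattern in isolation (the `neo` operator names are taken from this patch; everything around them is illustrative only):

    from neo.core import operators as ops

    def delay_node_fn(input, output):
        # Collect every emitted item into one list, then re-emit the items
        # one at a time once the upstream observable completes.
        input.pipe(ops.to_list(), ops.flatten()).subscribe(output)
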
diff --git a/morpheus/pipeline/inference/inference_stage.py b/morpheus/pipeline/inference/inference_stage.py
index f0195a0d53..98aaa444b7 100644
--- a/morpheus/pipeline/inference/inference_stage.py
+++ b/morpheus/pipeline/inference/inference_stage.py
@@ -21,6 +21,7 @@
 import cupy as cp
 import neo
+from neo.core import operators as ops
 from tornado.ioloop import IOLoop
 
 from morpheus.config import Config
@@ -252,7 +253,7 @@ def py_inference_fn(input: neo.Observable, output: neo.Subscriber):
 
             outstanding_requests = 0
 
-            def obs_on_next(x: MultiInferenceMessage):
+            def on_next(x: MultiInferenceMessage):
                 nonlocal outstanding_requests
 
                 batches = self._split_batches(x, self._max_batch_size)
@@ -283,17 +284,9 @@ def set_output_fut(resp: ResponseMemoryProbs, b, f: neo.Future):
                 for f in fut_list:
                     f.result()
 
-                output.on_next(output_message)
+                return output_message
 
-            def obs_on_error(x):
-                output.on_error(x)
-
-            def obs_on_completed():
-                output.on_completed()
-
-            obs = neo.Observer.make_observer(obs_on_next, obs_on_error, obs_on_completed)
-
-            input.subscribe(obs)
+            input.pipe(ops.map(on_next)).subscribe(output)
 
             assert outstanding_requests == 0, "Not all inference requests were completed"
diff --git a/morpheus/pipeline/input/from_cloudtrail.py b/morpheus/pipeline/input/from_cloudtrail.py
index 96e6f57944..e390afc55d 100644
--- a/morpheus/pipeline/input/from_cloudtrail.py
+++ b/morpheus/pipeline/input/from_cloudtrail.py
@@ -22,6 +22,7 @@
 import neo
 import numpy as np
 import pandas as pd
+from neo.core import operators as ops
 
 from morpheus._lib.common import FiberQueue
 from morpheus.config import Config
@@ -418,44 +419,29 @@ def _post_build_single(self, seg: neo.Segment, out_pair: StreamPair) -> StreamPa
         out_stream = out_pair[0]
         out_type = out_pair[1]
 
-        # At this point, we have batches of filenames to process. Make a node for processing batches of filenames into
-        # batches of dataframes
-        filenames_to_df = seg.make_node(
-            self.unique_name + "-filetodf",
-            partial(
-                self.files_to_dfs_per_user,
-                file_type=self._file_type,
-                userid_column_name=self._user_column_name,
-                feature_columns=None,  # Use None here to leave all columns in
-                userid_filter=self._userid_filter,
-                repeat_count=self._repeat_count))
-        seg.make_edge(out_stream, filenames_to_df)
-
-        # Now group the batch of dataframes into a single df, split by user, and send a single UserMessageMeta per user
-        df_to_meta = seg.make_node(self.unique_name + "-usersplit", self._build_user_metadata)
-        seg.make_edge(filenames_to_df, df_to_meta)
-
-        # Finally, flatten to a single stream
-        def flatten_fn(input: neo.Observable, output: neo.Subscriber):
-            def obs_on_next(x: typing.List):
-
-                for y in x:
-                    output.on_next(y)
-
-            def obs_on_error(x):
-                output.on_error(x)
-
-            def obs_on_completed():
-                output.on_completed()
-
-            obs = neo.Observer.make_observer(obs_on_next, obs_on_error, obs_on_completed)
-
-            input.subscribe(obs)
-
-        flattened = seg.make_node_full(self.unique_name + "-post", flatten_fn)
-        seg.make_edge(df_to_meta, flattened)
-
-        out_stream = flattened
+        def node_fn(input: neo.Observable, output: neo.Subscriber):
+
+            input.pipe(
+                # At this point, we have batches of filenames to process. Make a node for processing batches of
+                # filenames into batches of dataframes
+                ops.map(
+                    partial(
+                        self.files_to_dfs_per_user,
+                        file_type=self._file_type,
+                        userid_column_name=self._user_column_name,
+                        feature_columns=None,  # Use None here to leave all columns in
+                        userid_filter=self._userid_filter,
+                        repeat_count=self._repeat_count)),
+                # Now group the batch of dataframes into a single df, split by user, and send a single UserMessageMeta
+                # per user
+                ops.map(self._build_user_metadata),
+                # Finally flatten to single meta
+                ops.flatten()).subscribe(output)
+
+        post_node = seg.make_node_full(self.unique_name + "-post", node_fn)
+        seg.make_edge(out_stream, post_node)
+
+        out_stream = post_node
         out_type = UserMessageMeta
 
         return super()._post_build_single(seg, (out_stream, out_type))
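The `from_cloudtrail.py` rewrite above chains `ops.map` over a `functools.partial`-bound callable with `ops.flatten()`, which together behave like a flat-map: one input batch produces a list whose elements are then emitted individually. A hedged sketch of the same shape (the `load_frames` helper is hypothetical, not Morpheus code):

    from functools import partial
    from neo.core import operators as ops

    # Hypothetical one-to-many step: one batch of filenames -> a list of frames
    def load_frames(filenames, file_type):
        return [(name, file_type) for name in filenames]

    def node_fn(input, output):
        input.pipe(
            ops.map(partial(load_frames, file_type="json")),  # bind kwargs up front
            ops.flatten()                                     # emit list items one at a time
        ).subscribe(output)
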
diff --git a/morpheus/pipeline/input/from_file.py b/morpheus/pipeline/input/from_file.py
index 7a744c043c..82ea2b70a3 100644
--- a/morpheus/pipeline/input/from_file.py
+++ b/morpheus/pipeline/input/from_file.py
@@ -17,6 +17,7 @@
 
 import neo
 import typing_utils
+from neo.core import operators as ops
 
 import morpheus._lib.stages as neos
 from morpheus.config import Config
@@ -110,23 +111,11 @@ def _post_build_single(self, seg: neo.Segment, out_pair: StreamPair) -> StreamPa
 
         # Convert our list of dataframes into the desired type. Flatten if necessary
         if (typing_utils.issubtype(out_type, typing.List)):
-            def flatten_fn(input: neo.Observable, output: neo.Subscriber):
-                def obs_on_next(x: typing.List):
 
-                    for y in x:
-                        output.on_next(y)
+            def node_fn(input: neo.Observable, output: neo.Subscriber):
 
-                def obs_on_error(x):
-                    output.on_error(x)
-
-                def obs_on_completed():
-                    output.on_completed()
-
-                obs = neo.Observer.make_observer(obs_on_next, obs_on_error, obs_on_completed)
-
-                input.subscribe(obs)
+                input.pipe(ops.flatten()).subscribe(output)
 
-            flattened = seg.make_node_full(self.unique_name + "-post", flatten_fn)
+            flattened = seg.make_node_full(self.unique_name + "-post", node_fn)
             seg.make_edge(out_stream, flattened)
             out_stream = flattened
             out_type = typing.get_args(out_type)[0]
@@ -139,7 +128,7 @@ def _generate_frames(self):
             self._filename,
             self._file_type,
             filter_nulls=True,
-            df_type="pandas",
+            df_type="cudf",
         )
 
         count = 0
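The `from_kafka.py` diff that follows adds a bounded retry with exponential backoff around the first watermark-offset call. Stripped of the Kafka specifics, the control flow looks like this (illustrative sketch; `connect` stands in for `consumer.get_watermark_offsets`):

    import logging
    import time

    logger = logging.getLogger(__name__)

    def connect_with_retry(connect, max_attempts=5):
        attempts = 0
        while attempts < max_attempts:
            try:
                return connect()  # e.g. the first watermark-offset request
            except RuntimeError:
                attempts += 1
                if attempts >= max_attempts:
                    logger.exception("Max attempts (%d) reached", max_attempts)
                    raise
                logger.warning("Attempt #%d/%d failed, retrying", attempts, max_attempts, exc_info=True)
                time.sleep(2.0**attempts)  # exponential backoff: 2s, 4s, 8s, ...
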
diff --git a/morpheus/pipeline/input/from_kafka.py b/morpheus/pipeline/input/from_kafka.py
index fd1d78e14f..c9e0304891 100644
--- a/morpheus/pipeline/input/from_kafka.py
+++ b/morpheus/pipeline/input/from_kafka.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
 import time
 import weakref
 
@@ -26,6 +27,8 @@
 from morpheus.pipeline.pipeline import SingleOutputSource
 from morpheus.pipeline.pipeline import StreamPair
 
+logger = logging.getLogger(__name__)
+
 
 class KafkaSourceStage(SingleOutputSource):
     """
@@ -124,93 +127,134 @@ def _source_generator(self, s: neo.Subscriber):
             # weakref.finalize(self, lambda c=consumer: _close_consumer(c))
             tp = ck.TopicPartition(self._topic, 0, 0)
 
-            # blocks for consumer thread to come up
-            consumer.get_watermark_offsets(tp)
-
-            if npartitions is None:
-
-                kafka_cluster_metadata = consumer.list_topics(self._topic)
-
-                if self._engine == "cudf":  # pragma: no cover
-                    npartitions = len(kafka_cluster_metadata[self._topic.encode('utf-8')])
-                else:
-                    npartitions = len(kafka_cluster_metadata.topics[self._topic].partitions)
-
-            positions = [0] * npartitions
-
-            tps = []
-            for partition in range(npartitions):
-                tps.append(ck.TopicPartition(self._topic, partition))
-
-            while s.is_subscribed():
-                try:
-                    committed = consumer.committed(tps, timeout=1)
-                except ck.KafkaException:
-                    pass
-                else:
-                    for tp in committed:
-                        positions[tp.partition] = tp.offset
-                    break
-
-            while s.is_subscribed():
-                out = []
-
-                if self._refresh_partitions:
-                    kafka_cluster_metadata = consumer.list_topics(self._topic)
-
-                    if self._engine == "cudf":  # pragma: no cover
-                        new_partitions = len(kafka_cluster_metadata[self._topic.encode('utf-8')])
-                    else:
-                        new_partitions = len(kafka_cluster_metadata.topics[self._topic].partitions)
-
-                    if new_partitions > npartitions:
-                        positions.extend([-1001] * (new_partitions - npartitions))
-                        npartitions = new_partitions
-
-                for partition in range(npartitions):
-
-                    tp = ck.TopicPartition(self._topic, partition, 0)
-
-                    try:
-                        low, high = consumer.get_watermark_offsets(tp, timeout=0.1)
-                    except (RuntimeError, ck.KafkaException):
-                        continue
-
-                    started = True
-
-                    if 'auto.offset.reset' in consumer_params.keys():
-                        if consumer_params['auto.offset.reset'] == 'latest' and positions[partition] == -1001:
-                            positions[partition] = high
-
-                    current_position = positions[partition]
-
-                    lowest = max(current_position, low)
-
-                    if high > lowest + self._max_batch_size:
-                        high = lowest + self._max_batch_size
-                    if high > lowest:
-                        out.append((consumer_params, self._topic, partition, self._keys, lowest, high - 1))
-                        positions[partition] = high
-
-                consumer_params['auto.offset.reset'] = 'earliest'
-
-                if (out):
-                    for part in out:
-
-                        meta = self._kafka_params_to_messagemeta(part)
-
-                        # Once the meta goes out of scope, commit it
-                        def commit(topic, part_no, keys, lowest, offset):
-                            # topic, part_no, _, _, offset = part[1:]
-                            _tp = ck.TopicPartition(topic, part_no, offset + 1)
-                            consumer.commit(offsets=[_tp], asynchronous=True)
-
-                        weakref.finalize(meta, commit, *part[1:])
-
-                        # Push the message meta
-                        s.on_next(meta)
-                else:
-                    time.sleep(self._poll_interval)
+            attempts = 0
+            max_attempts = 5
+
+            # Attempt to connect to the cluster. Try 5 times before giving up
+            while attempts < max_attempts:
+                try:
+                    # blocks for consumer thread to come up
+                    consumer.get_watermark_offsets(tp)
+
+                    logger.debug("Connected to Kafka source at '%s' on attempt #%d/%d",
+                                 self._consumer_conf["bootstrap.servers"],
+                                 attempts + 1,
+                                 max_attempts)
+
+                    break
+                except (RuntimeError, ck.KafkaException):
+                    attempts += 1
+
+                    # Raise the error if we hit the max
+                    if (attempts >= max_attempts):
+                        logger.exception(("Error while getting Kafka watermark offsets. Max attempts (%d) reached. "
+                                          "Check the bootstrap_servers parameter ('%s')"),
+                                         max_attempts,
+                                         self._consumer_conf["bootstrap.servers"])
+                        raise
+                    else:
+                        logger.warning("Error while getting Kafka watermark offsets. Attempt #%d/%d",
+                                       attempts,
+                                       max_attempts,
+                                       exc_info=True)
+
+                        # Exponential backoff
+                        time.sleep(2.0**attempts)
+
+            try:
+                if npartitions is None:
+
+                    kafka_cluster_metadata = consumer.list_topics(self._topic)
+
+                    if self._engine == "cudf":  # pragma: no cover
+                        npartitions = len(kafka_cluster_metadata[self._topic.encode('utf-8')])
+                    else:
+                        npartitions = len(kafka_cluster_metadata.topics[self._topic].partitions)
+
+                positions = [0] * npartitions
+
+                tps = []
+                for partition in range(npartitions):
+                    tps.append(ck.TopicPartition(self._topic, partition))
+
+                while s.is_subscribed():
+                    try:
+                        committed = consumer.committed(tps, timeout=1)
+                    except ck.KafkaException:
+                        pass
+                    else:
+                        for tp in committed:
+                            positions[tp.partition] = tp.offset
+                        break
+
+                while s.is_subscribed():
+                    out = []
+
+                    if self._refresh_partitions:
+                        kafka_cluster_metadata = consumer.list_topics(self._topic)
+
+                        if self._engine == "cudf":  # pragma: no cover
+                            new_partitions = len(kafka_cluster_metadata[self._topic.encode('utf-8')])
+                        else:
+                            new_partitions = len(kafka_cluster_metadata.topics[self._topic].partitions)
+
+                        if new_partitions > npartitions:
+                            positions.extend([-1001] * (new_partitions - npartitions))
+                            npartitions = new_partitions
+
+                    for partition in range(npartitions):
+
+                        tp = ck.TopicPartition(self._topic, partition, 0)
+
+                        try:
+                            low, high = consumer.get_watermark_offsets(tp, timeout=0.1)
+                        except (RuntimeError, ck.KafkaException):
+                            continue
+
+                        if 'auto.offset.reset' in consumer_params.keys():
+                            if consumer_params['auto.offset.reset'] == 'latest' and positions[partition] == -1001:
+                                positions[partition] = high
+
+                        current_position = positions[partition]
+
+                        lowest = max(current_position, low)
+
+                        if high > lowest + self._max_batch_size:
+                            high = lowest + self._max_batch_size
+                        if high > lowest:
+                            out.append((consumer_params, self._topic, partition, self._keys, lowest, high - 1))
+                            positions[partition] = high
+
+                    consumer_params['auto.offset.reset'] = 'earliest'
+
+                    if (out):
+                        for part in out:
+
+                            meta = self._kafka_params_to_messagemeta(part)
+
+                            # Once the meta goes out of scope, commit it
+                            def commit(topic, part_no, keys, lowest, offset):
+                                # topic, part_no, _, _, offset = part[1:]
+                                try:
+                                    _tp = ck.TopicPartition(topic, part_no, offset + 1)
+                                    consumer.commit(offsets=[_tp], asynchronous=True)
+                                except Exception:
+                                    logger.exception(("Error occurred in `from-kafka` stage with "
+                                                      "broker '%s' while committing message: %d"),
+                                                     self._consumer_conf["bootstrap.servers"],
+                                                     offset)
+
+                            weakref.finalize(meta, commit, *part[1:])
+
+                            # Push the message meta
+                            s.on_next(meta)
+                    else:
+                        time.sleep(self._poll_interval)
+            except Exception:
+                logger.exception("Error occurred in `from-kafka` stage with broker '%s' while processing messages",
+                                 self._consumer_conf["bootstrap.servers"])
+                raise
 
         finally:
             # Close the consumer and call on_completed
            consumer.close()
@@ -235,6 +279,9 @@ def _read_gdf(kafka_configs,
               batch_timeout=10000,
               delimiter="\n",
               message_format="json"):
+    """
+    Replicates the `custreamz.Consumer.read_gdf` function, which does not work for some reason
+    """
 
     if topic is None:
         raise ValueError("ERROR: You must specify the topic "
                          "that you want to consume from")
@@ -265,7 +312,8 @@ def _read_gdf(kafka_configs,
 
         result = cudf_readers[message_format](kafka_datasource, engine="cudf", lines=lines)
 
         return cudf.DataFrame(data=result._data, index=result._index)
-
+    except Exception:
+        logger.exception("Error occurred converting KafkaDatasource to Dataframe.")
     finally:
         if (kafka_datasource is not None):
             # Close up the cudf datasource instance
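One notable detail above: offset commits are deferred via `weakref.finalize`, so a batch is only committed back to Kafka once the `MessageMeta` built from it has been garbage-collected, i.e. the pipeline is finished with it. The trick in isolation, stdlib only (`Meta` and the printed commit are stand-ins for the real objects):

    import weakref

    class Meta:
        """Stand-in for the MessageMeta produced per Kafka batch."""

    def commit(offset):
        print(f"commit offset {offset}")  # the patch calls consumer.commit() here

    meta = Meta()
    weakref.finalize(meta, commit, 42)  # runs when `meta` is garbage-collected

    del meta  # prints "commit offset 42" once the object is collected
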
diff --git a/morpheus/pipeline/messages.py b/morpheus/pipeline/messages.py
index 5fd962b672..0849072ea0 100644
--- a/morpheus/pipeline/messages.py
+++ b/morpheus/pipeline/messages.py
@@ -149,7 +149,7 @@ def get_meta(self, columns: typing.Union[None, str, typing.List[str]] = None):
         idx = self.meta.df.index[self.mess_offset:self.mess_offset + self.mess_count]
 
         if (isinstance(idx, cudf.RangeIndex)):
-            idx = slice(idx.start, idx.stop, idx.step)
+            idx = slice(idx.start, idx.stop - 1, idx.step)
 
         if (columns is None):
             return self.meta.df.loc[idx, :]
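The `idx.stop - 1` fix works because label-based `.loc` slicing includes both endpoints, while a RangeIndex's `stop` is exclusive; without the adjustment `get_meta` would return one extra row. Demonstrated with pandas (cuDF follows the same slicing semantics):

    import pandas as pd

    df = pd.DataFrame({"a": range(5)})

    print(df.iloc[0:2])  # positional: stop EXCLUDED -> rows 0, 1
    print(df.loc[0:2])   # label-based: stop INCLUDED -> rows 0, 1, 2
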
diff --git a/morpheus/pipeline/output/serialize.py b/morpheus/pipeline/output/serialize.py
index e43c838240..1cbd7f94b9 100644
--- a/morpheus/pipeline/output/serialize.py
+++ b/morpheus/pipeline/output/serialize.py
@@ -17,6 +17,7 @@
 import re
 import typing
 from functools import partial
+from io import StringIO
 
 import neo
 import pandas as pd
@@ -121,11 +122,16 @@ def convert_to_json(x: MultiMessage, include_columns: typing.Pattern, exclude_co
 
         df = SerializeStage.convert_to_df(x, include_columns=include_columns, exclude_columns=exclude_columns)
 
+        str_buf = StringIO()
+
         # Convert to list of json string objects
-        output_strs = [json.dumps(y) for y in df.to_dict(orient="records")]
+        df.to_json(str_buf, orient="records", lines=True)
+
+        # Start from beginning
+        str_buf.seek(0)
 
         # Return list of strs to write out
-        return output_strs
+        return str_buf.readlines()
 
     @staticmethod
     def convert_to_csv(x: MultiMessage, include_columns: typing.Pattern, exclude_columns: typing.List[typing.Pattern]):
diff --git a/morpheus/pipeline/output/to_kafka.py b/morpheus/pipeline/output/to_kafka.py
index c79abffef4..0e5b5d54e3 100644
--- a/morpheus/pipeline/output/to_kafka.py
+++ b/morpheus/pipeline/output/to_kafka.py
@@ -12,16 +12,20 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
 import time
 import typing
 
 import confluent_kafka as ck
 import neo
+from neo.core import operators as ops
 
 from morpheus.config import Config
 from morpheus.pipeline.pipeline import SinglePortStage
 from morpheus.pipeline.pipeline import StreamPair
 
+logger = logging.getLogger(__name__)
+
 
 class WriteToKafkaStage(SinglePortStage):
     """
@@ -66,7 +70,6 @@ def _build_single(self, seg: neo.Segment, input_stream: StreamPair) -> StreamPai
 
         # Convert the messages to rows of strings
         stream = input_stream[0]
-        input_type = input_stream[1]
 
         def node_fn(input: neo.Observable, output: neo.Subscriber):
 
@@ -74,17 +77,20 @@ def node_fn(input: neo.Observable, output: neo.Subscriber):
 
             outstanding_requests = 0
 
-            def obs_on_next(x: typing.List[str]):
+            def on_next(x: typing.List[str]):
                 nonlocal outstanding_requests
-                futures = []
 
-                def cb(err, msg):
+                def cb(_, msg):
                     if msg is not None and msg.value() is not None:
                         # fut.set_result(None)
                         pass
                     else:
                         # fut.set_exception(err or msg.error())
+                        logger.error(("Error occurred in `to-kafka` stage with broker '%s' "
+                                      "while committing message:\n%s\nError:\n%s"),
+                                     self._kafka_conf["bootstrap.servers"],
+                                     msg.value(),
+                                     msg.error())
                         output.on_error(msg.error())
 
                 for m in x:
@@ -97,9 +103,12 @@ def cb(err, msg):
                             break
                         except BufferError:
                             time.sleep(self._poll_time)
-                        except Exception as e:
-                            output.on_error(e)
-                            return
+                        except Exception:
+                            logger.exception(("Error occurred in `to-kafka` stage with broker '%s' "
+                                              "while committing message:\n%s"),
+                                             self._kafka_conf["bootstrap.servers"],
+                                             m)
+                            break
                         finally:
                             # Try and process some
                             producer.poll(0)
 
                 while len(producer) > 0:
                     producer.poll(0)
 
-                output.on_next(x)
+                return x
 
-            def obs_on_error(x):
-                output.on_error(x)
-
-            def obs_on_completed():
+            def on_completed():
                 producer.flush(-1)
-                output.on_completed()
-
-            obs = neo.Observer.make_observer(obs_on_next, obs_on_error, obs_on_completed)
-
-            input.subscribe(obs)
+            input.pipe(ops.map(on_next), ops.on_completed(on_completed)).subscribe(output)
 
             assert outstanding_requests == 0, "Not all inference requests were completed"
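The `serialize.py` change above swaps a per-row `json.dumps` loop for pandas' newline-delimited JSON writer, producing the same list of one-object-per-line strings in a single call. A minimal demonstration of the buffer round-trip (standard pandas API):

    from io import StringIO

    import pandas as pd

    df = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})

    buf = StringIO()
    # One JSON object per line ("records" orientation, newline-delimited)
    df.to_json(buf, orient="records", lines=True)

    buf.seek(0)  # rewind before reading the serialized lines back
    lines = buf.readlines()  # ['{"a":1,"b":"x"}\n', '{"a":2,"b":"y"}\n']
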
diff --git a/morpheus/pipeline/output/validation.py b/morpheus/pipeline/output/validation.py
index 35781dce3e..cf3014ebb8 100644
--- a/morpheus/pipeline/output/validation.py
+++ b/morpheus/pipeline/output/validation.py
@@ -22,6 +22,7 @@
 
 import neo
 import pandas as pd
+from neo.core import operators as ops
 from pandas.core.frame import DataFrame
 
 import cudf
@@ -227,29 +228,13 @@ def _build_single(self, seg: neo.Segment, input_stream: StreamPair) -> StreamPai
 
         # Store all messages until on_complete is called and then build the dataframe and compare
         def node_fn(input: neo.Observable, output: neo.Subscriber):
-
-            delayed_messages = []
-
-            def obs_on_next(x):
-
-                delayed_messages.append(x)
-
-            def obs_on_error(x):
-                output.on_error(x)
-
-            def obs_on_completed():
+            def do_compare(delayed_messages):
 
                 self._do_comparison(delayed_messages)
 
-                # Now push all the messages
-                for x in delayed_messages:
-                    output.on_next(x)
-
-                output.on_completed()
-
-            obs = neo.Observer.make_observer(obs_on_next, obs_on_error, obs_on_completed)
+                return delayed_messages
 
-            input.subscribe(obs)
+            input.pipe(ops.to_list(), ops.map(do_compare), ops.flatten()).subscribe(output)
 
         node = seg.make_node_full(self.unique_name, node_fn)
         seg.make_edge(input_stream[0], node)
diff --git a/morpheus/pipeline/postprocess/timeseries.py b/morpheus/pipeline/postprocess/timeseries.py
index 3eaa2405d6..85cf45e03a 100644
--- a/morpheus/pipeline/postprocess/timeseries.py
+++ b/morpheus/pipeline/postprocess/timeseries.py
@@ -8,6 +8,7 @@
 import cupy as cp
 import neo
 import pandas as pd
+from neo.core import operators as ops
 from streamz.core import Stream
 from streamz.core import sync
 
@@ -552,31 +553,27 @@ def _build_single(self, seg: neo.Segment, input_stream: StreamPair) -> StreamPai
         out_type = input_stream[1]
 
         def node_fn(input: neo.Observable, output: neo.Subscriber):
-            def obs_on_next(x: MultiResponseAEMessage):
+            def on_next(x: MultiResponseAEMessage):
 
                 message_list: typing.List[MultiResponseMessage] = self._call_timeseries_user(x)
 
-                for y in message_list:
+                return message_list
 
-                    output.on_next(y)
+            def on_completed():
 
-            def obs_on_error(x):
-                output.on_error(x)
+                to_send = []
 
-            def obs_on_completed():
                 for ts in self._timeseries_per_user.values():
                     message_list: typing.List[MultiResponseMessage] = ts._calc_timeseries(None, True)
 
-                    for y in message_list:
-
-                        output.on_next(y)
-
-                output.on_completed()
+                    to_send = to_send + message_list
 
-            obs = neo.Observer.make_observer(obs_on_next, obs_on_error, obs_on_completed)
+                return to_send if len(to_send) > 0 else None
 
-            input.subscribe(obs)
+            input.pipe(ops.map(on_next),
+                       ops.filter(lambda x: len(x) > 0),
+                       ops.on_completed(on_completed),
+                       ops.flatten()).subscribe(output)
 
         stream = seg.make_node_full(self.unique_name, node_fn)
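In the timeseries rewrite above, `ops.on_completed` is given a callback whose return value, when not `None`, appears to be emitted as one final item before the stream closes (here a list that `ops.flatten()` unpacks). A sketch assuming that behavior, with `buffered` standing in for the per-user timeseries state that still holds results at shutdown:

    from neo.core import operators as ops

    buffered = []  # stand-in for state flushed at end-of-stream

    def node_fn(input, output):
        def on_next(x):
            return [x]  # each input yields a list of results

        def on_completed():
            # Emit anything still buffered; returning None emits nothing extra
            return list(buffered) or None

        input.pipe(ops.map(on_next),
                   ops.filter(lambda x: len(x) > 0),  # drop empty lists early
                   ops.on_completed(on_completed),    # final flush on completion
                   ops.flatten()).subscribe(output)
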
diff --git a/morpheus/pipeline/preprocess/autoencoder.py b/morpheus/pipeline/preprocess/autoencoder.py
index 45ee327fcd..dc8e2db8f3 100644
--- a/morpheus/pipeline/preprocess/autoencoder.py
+++ b/morpheus/pipeline/preprocess/autoencoder.py
@@ -26,6 +26,7 @@
 import pandas as pd
 import torch
 from dfencoder import AutoEncoder
+from neo.core import operators as ops
 
 from morpheus.config import Config
 from morpheus.pipeline.file_types import FileTypes
@@ -254,26 +255,22 @@ def _build_single(self, seg: neo.Segment, input_stream: StreamPair) -> StreamPai
             get_model_fn = self._train_model
 
         def node_fn(input: neo.Observable, output: neo.Subscriber):
-            def obs_on_next(x: UserMessageMeta):
+            def on_next(x: UserMessageMeta):
 
                 model = get_model_fn(x)
 
                 full_message = MultiAEMessage(meta=x, mess_offset=0, mess_count=x.count, model=model)
 
+                to_send = []
+
                 # Now split into batches
                 for i in range(0, full_message.mess_count, self._batch_size):
-                    output.on_next(full_message.get_slice(i, min(i + self._batch_size, full_message.mess_count)))
-
-            def obs_on_error(x):
-                output.on_error(x)
-
-            def obs_on_completed():
-                output.on_completed()
+                    to_send.append(full_message.get_slice(i, min(i + self._batch_size, full_message.mess_count)))
 
-            obs = neo.Observer.make_observer(obs_on_next, obs_on_error, obs_on_completed)
+                return to_send
 
-            input.subscribe(obs)
+            input.pipe(ops.map(on_next), ops.flatten()).subscribe(output)
 
         node = seg.make_node_full(self.unique_name, node_fn)
         seg.make_edge(stream, node)
diff --git a/morpheus/pipeline/preprocessing.py b/morpheus/pipeline/preprocessing.py
index 3233f81188..c521d4ff7a 100644
--- a/morpheus/pipeline/preprocessing.py
+++ b/morpheus/pipeline/preprocessing.py
@@ -25,6 +25,7 @@
 import numpy as np
 import pandas as pd
 import typing_utils
+from neo.core import operators as ops
 
 import cudf
 from cudf.core.subword_tokenizer import SubwordTokenizer
@@ -99,60 +100,25 @@ def process_dataframe(x: MessageMeta, batch_size: int) -> typing.List[MultiMessa
 
         return output
 
-    @staticmethod
-    def add_start_time(x: MultiMessage):
-
-        curr_time = get_time_ms()
-
-        x.set_meta("ts_start", curr_time)
-
-        return x
-
     def _build_single(self, seg: neo.Segment, input_stream: StreamPair) -> StreamPair:
 
         stream = input_stream[0]
         out_type = MultiMessage
 
-        def deserialize_fn(input: neo.Observable, output: neo.Subscriber):
-            def obs_on_next(x: MessageMeta):
-
-                message_list: typing.List[MultiMessage] = DeserializeStage.process_dataframe(x, self._batch_size)
-
-                for y in message_list:
-
-                    output.on_next(y)
-
-            def obs_on_error(x):
-                output.on_error(x)
-
-            def obs_on_completed():
-                output.on_completed()
-
-            obs = neo.Observer.make_observer(obs_on_next, obs_on_error, obs_on_completed)
+        def node_fn(input: neo.Observable, output: neo.Subscriber):
 
-            input.subscribe(obs)
+            input.pipe(ops.map(partial(DeserializeStage.process_dataframe, batch_size=self._batch_size)),
+                       ops.flatten()).subscribe(output)
 
         if (Config.get().use_cpp):
             stream = neos.DeserializeStage(seg, self.unique_name, self._batch_size)
         else:
-            stream = seg.make_node_full(self.unique_name + "-flatten", deserialize_fn)
+            stream = seg.make_node_full(self.unique_name, node_fn)
 
         seg.make_edge(input_stream[0], stream)
 
         return stream, out_type
 
-    def _post_build_single(self, seg: neo.Segment, out_pair: StreamPair) -> StreamPair:
-
-        if (self._should_log_timestamps):
-
-            stream = seg.make_node(self.unique_name + "-ts", DeserializeStage.add_start_time)
-            seg.make_edge(out_pair[0], stream)
-
-            # Only have one port
-            out_pair = (stream, out_pair[1])
-
-        return super()._post_build_single(seg, out_pair)
-
 
 class DropNullStage(SinglePortStage):
     """
@@ -195,22 +161,13 @@ def _build_single(self, seg: neo.Segment, input_stream: StreamPair) -> StreamPai
 
         # Finally, flatten to a single stream
        def node_fn(input: neo.Observable, output: neo.Subscriber):
-            def obs_on_next(x: MessageMeta):
-
-                x.df = x.df[~x.df[self._column].isna()]
-
-                if (not x.empty):
-                    output.on_next(x)
-
-            def obs_on_error(x):
-                output.on_error(x)
+            def on_next(x: MessageMeta):
 
-            def obs_on_completed():
-                output.on_completed()
+                y = MessageMeta(x.df[~x.df[self._column].isna()])
 
-            obs = neo.Observer.make_observer(obs_on_next, obs_on_error, obs_on_completed)
+                return y
 
-            input.subscribe(obs)
+            input.pipe(ops.map(on_next), ops.filter(lambda x: not x.df.empty)).subscribe(output)
 
         node = seg.make_node_full(self.unique_name, node_fn)
         seg.make_edge(stream, node)
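Finally, the dropna fix referenced in the commit title: `DropNullStage` now builds a new `MessageMeta` around the filtered frame instead of mutating `x.df` in place, and frames left empty by the filter are dropped from the stream. The underlying idiom, shown with pandas (cuDF behaves the same):

    import pandas as pd

    df = pd.DataFrame({"data": ["a", None, "b"]})

    filtered = df[~df["data"].isna()]  # keep rows with a non-null 'data' column

    if not filtered.empty:             # empty frames are filtered out downstream
        print(filtered)
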