Skip to content

Commit

Permalink
Merge branch 'mdd_kafka-retry' into 'branch-0.2.1-EA'
Browse files Browse the repository at this point in the history
Adding retry to from-kafka and fixing dropna

Closes nv-morpheus#26

See merge request morpheus/morpheus!73
  • Loading branch information
mdemoret-nv committed Dec 14, 2021
2 parents c64120a + 7c2920f commit facd27c
Show file tree
Hide file tree
Showing 13 changed files with 209 additions and 285 deletions.
3 changes: 1 addition & 2 deletions .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# A comma-separated list of package or module names from where C extensions may
# be loaded. Extensions are loading into the active Python interpreter and may
# run arbitrary code.
extension-pkg-whitelist=
extension-pkg-allow-list=neo,morpheus._lib

# Specify a score threshold to be exceeded before program exits with error.
fail-under=10
Expand Down Expand Up @@ -603,4 +603,3 @@ valid-metaclass-classmethod-first-arg=cls
# "BaseException, Exception".
overgeneral-exceptions=BaseException,
Exception

45 changes: 5 additions & 40 deletions morpheus/pipeline/general_stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import cupy as cp
import neo
from neo.core import operators as ops
from tqdm import tqdm

import cudf
Expand Down Expand Up @@ -148,26 +149,7 @@ def _build_single(self, seg: neo.Segment, input_stream: StreamPair) -> StreamPai
# Store all messages until on_complete is called and then push them
def node_fn(input: neo.Observable, output: neo.Subscriber):

delayed_messages = []

def obs_on_next(x):

delayed_messages.append(x)

def obs_on_error(x):
output.on_error(x)

def obs_on_completed():

# Now push all the messages
for x in delayed_messages:
output.on_next(x)

output.on_completed()

obs = neo.Observer.make_observer(obs_on_next, obs_on_error, obs_on_completed)

input.subscribe(obs)
input.pipe(ops.to_list(), ops.flatten()).subscribe(output)

node = seg.make_node_full(self.unique_name, node_fn)
seg.make_edge(input_stream[0])
Expand Down Expand Up @@ -468,30 +450,13 @@ def filter(self, x: MultiResponseProbsMessage) -> typing.List[MultiResponseProbs

def _build_single(self, seg: neo.Segment, input_stream: StreamPair) -> StreamPair:

# Reduce messages to only have detections
stream = seg.make_node(self.unique_name, self.filter)

seg.make_edge(input_stream[0], stream)

# Convert list back to single MultiResponseProbsMessage
def flatten_fn(input: neo.Observable, output: neo.Subscriber):
def obs_on_next(x: typing.List):

for y in x:
output.on_next(y)

def obs_on_error(x):
output.on_error(x)

def obs_on_completed():
output.on_completed()

obs = neo.Observer.make_observer(obs_on_next, obs_on_error, obs_on_completed)

input.subscribe(obs)
input.pipe(ops.map(self.filter), ops.flatten()).subscribe(output)

flattened = seg.make_node_full(self.unique_name + "-flatten", flatten_fn)
seg.make_edge(stream, flattened)
flattened = seg.make_node_full(self.unique_name, flatten_fn)
seg.make_edge(input_stream[0], flattened)
stream = flattened

return stream, MultiResponseProbsMessage
Expand Down
15 changes: 4 additions & 11 deletions morpheus/pipeline/inference/inference_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import cupy as cp
import neo
from neo.core import operators as ops
from tornado.ioloop import IOLoop

from morpheus.config import Config
Expand Down Expand Up @@ -252,7 +253,7 @@ def py_inference_fn(input: neo.Observable, output: neo.Subscriber):

outstanding_requests = 0

def obs_on_next(x: MultiInferenceMessage):
def on_next(x: MultiInferenceMessage):
nonlocal outstanding_requests

batches = self._split_batches(x, self._max_batch_size)
Expand Down Expand Up @@ -283,17 +284,9 @@ def set_output_fut(resp: ResponseMemoryProbs, b, f: neo.Future):
for f in fut_list:
f.result()

output.on_next(output_message)
return output_message

def obs_on_error(x):
output.on_error(x)

def obs_on_completed():
output.on_completed()

obs = neo.Observer.make_observer(obs_on_next, obs_on_error, obs_on_completed)

input.subscribe(obs)
input.pipe(ops.map(on_next)).subscribe(output)

assert outstanding_requests == 0, "Not all inference requests were completed"

Expand Down
62 changes: 24 additions & 38 deletions morpheus/pipeline/input/from_cloudtrail.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import neo
import numpy as np
import pandas as pd
from neo.core import operators as ops

from morpheus._lib.common import FiberQueue
from morpheus.config import Config
Expand Down Expand Up @@ -418,44 +419,29 @@ def _post_build_single(self, seg: neo.Segment, out_pair: StreamPair) -> StreamPa
out_stream = out_pair[0]
out_type = out_pair[1]

# At this point, we have batches of filenames to process. Make a node for processing batches of filenames into
# batches of dataframes
filenames_to_df = seg.make_node(
self.unique_name + "-filetodf",
partial(
self.files_to_dfs_per_user,
file_type=self._file_type,
userid_column_name=self._user_column_name,
feature_columns=None, # Use None here to leave all columns in
userid_filter=self._userid_filter,
repeat_count=self._repeat_count))
seg.make_edge(out_stream, filenames_to_df)

# Now group the batch of dataframes into a single df, split by user, and send a single UserMessageMeta per user
df_to_meta = seg.make_node(self.unique_name + "-usersplit", self._build_user_metadata)
seg.make_edge(filenames_to_df, df_to_meta)

# Finally, flatten to a single stream
def flatten_fn(input: neo.Observable, output: neo.Subscriber):
def obs_on_next(x: typing.List):

for y in x:
output.on_next(y)

def obs_on_error(x):
output.on_error(x)

def obs_on_completed():
output.on_completed()

obs = neo.Observer.make_observer(obs_on_next, obs_on_error, obs_on_completed)

input.subscribe(obs)

flattened = seg.make_node_full(self.unique_name + "-post", flatten_fn)
seg.make_edge(df_to_meta, flattened)

out_stream = flattened
def node_fn(input: neo.Observable, output: neo.Subscriber):

input.pipe(
# At this point, we have batches of filenames to process. Make a node for processing batches of
# filenames into batches of dataframes
ops.map(
partial(
self.files_to_dfs_per_user,
file_type=self._file_type,
userid_column_name=self._user_column_name,
feature_columns=None, # Use None here to leave all columns in
userid_filter=self._userid_filter,
repeat_count=self._repeat_count)),
# Now group the batch of dataframes into a single df, split by user, and send a single UserMessageMeta
# per user
ops.map(self._build_user_metadata),
# Finally flatten to single meta
ops.flatten()).subscribe(output)

post_node = seg.make_node_full(self.unique_name + "-post", node_fn)
seg.make_edge(out_stream, post_node)

out_stream = post_node
out_type = UserMessageMeta

return super()._post_build_single(seg, (out_stream, out_type))
21 changes: 5 additions & 16 deletions morpheus/pipeline/input/from_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import neo
import typing_utils
from neo.core import operators as ops

import morpheus._lib.stages as neos
from morpheus.config import Config
Expand Down Expand Up @@ -110,23 +111,11 @@ def _post_build_single(self, seg: neo.Segment, out_pair: StreamPair) -> StreamPa
# Convert our list of dataframes into the desired type. Flatten if necessary
if (typing_utils.issubtype(out_type, typing.List)):

def flatten_fn(input: neo.Observable, output: neo.Subscriber):
def obs_on_next(x: typing.List):
def node_fn(input: neo.Observable, output: neo.Subscriber):

for y in x:
output.on_next(y)
input.pipe(ops.flatten()).subscribe(output)

def obs_on_error(x):
output.on_error(x)

def obs_on_completed():
output.on_completed()

obs = neo.Observer.make_observer(obs_on_next, obs_on_error, obs_on_completed)

input.subscribe(obs)

flattened = seg.make_node_full(self.unique_name + "-post", flatten_fn)
flattened = seg.make_node_full(self.unique_name + "-post", node_fn)
seg.make_edge(out_stream, flattened)
out_stream = flattened
out_type = typing.get_args(out_type)[0]
Expand All @@ -139,7 +128,7 @@ def _generate_frames(self):
self._filename,
self._file_type,
filter_nulls=True,
df_type="pandas",
df_type="cudf",
)

count = 0
Expand Down
Loading

0 comments on commit facd27c

Please sign in to comment.