Skip to content
This repository has been archived by the owner on Aug 25, 2024. It is now read-only.

Commit

Permalink
in progress
Browse files Browse the repository at this point in the history
Signed-off-by: John Andersen <johnandersenpdx@gmail.com>
  • Loading branch information
pdxjohnny committed Mar 26, 2022
1 parent 1d56926 commit f7a433a
Showing 1 changed file with 69 additions and 13 deletions.
82 changes: 69 additions & 13 deletions docs/arch/0004-DataFlow-as-Class.rst
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,21 @@ which takes
return new_fn
def config_to_new_fn(
new_fn: Callable,
) -> Callable:
# The function which will be in charge of running the dataflow
# We will fixup it's method signature using inspect.
def func_run_dataflow(self, *args, **kwargs):
return self._func_run_dataflow(*args, **kwargs)
func_run_dataflow = gen_fn(args)
# {'a': int}
return func_run_dataflow
records = [
dffml.Record("a", data={"features": {"one": 42}}),
dffml.Record("b", data={"features": {"one": 420}}),
Expand All @@ -137,15 +152,56 @@ which takes
class DataFlowClass:
pass
@classmethod
def _add_dataflow_method(cls, methods):
pass
dffml.DataFlow._add_dataflow_method = _add_dataflow_method
@classmethod
def _add_dataflow_method(cls, name, config: RunCMDConfig):
"""
@config
class RunCMDConfig:
dataflow: str = field(
"File containing exported DataFlow", required=True,
)
configloader: BaseConfigLoader = field(
"ConfigLoader to use for importing DataFlow", default=None,
)
sources: Sources = FIELD_SOURCES
caching: List[str] = field(
"Skip running DataFlow if a record already contains these features",
default_factory=lambda: [],
)
no_update: bool = field(
"Update record with sources", default=False,
)
no_echo: bool = field(
"Do not echo back records", default=False,
)
no_strict: bool = field(
"Do not exit on operation exceptions, just log errors", default=False,
)
orchestrator: BaseOrchestrator = field(
"Orchestrator", default=MemoryOrchestrator,
)
inputs: List[str] = field(
"Other inputs to add under each ctx (record's key will "
+ "be used as the context)",
action=ParseInputsAction,
default_factory=lambda: [],
)
record_def: str = field(
"Definition to be used for record.key."
+ "If set, record.key will be added to the set of inputs "
+ "under each context (which is also the record's key)",
default=False,
)
"""
setattr(
cls,
name,
config_to_new_fn(
name,
config,
dataflow,
)
)
# TODO Implement calling lambda's defined within config field metadata
Expand Down Expand Up @@ -206,7 +262,7 @@ which takes
)
# Check if we need to set config object for __init__ as CONFIG to
# support DFFML plugin classes
plugin_cls = dffml.DataFlow.as_class(
plugin_cls = cls.as_class(
name,
dataflow,
config_cls,
Expand Down Expand Up @@ -320,8 +376,8 @@ which takes
dbm_source_dataflow,
DBMSourceConfig,
{
"record": (
dffml.DataFlow(
"record": {
"dataflow": dffml.DataFlow(
dffml.GetSingle,
seed=[
dffml.Input(
Expand All @@ -330,7 +386,7 @@ which takes
),
],
),
),
},
}
)
Expand Down

0 comments on commit f7a433a

Please sign in to comment.