Commit

Feature/python executor (early sight version) (#26)
* Ideas for an actor-based implementation of the executor

* Notes on execution graph builder

* Start sketching out ideas for V2 engine using actors

* Add dependency on PyYAML for YAML config files

* Utility methods for setting up loggers

* Config parser for converting raw JSON/YAML into config classes

* Add sys config classes (until these are generated from proto files)

* Add node type for graph processing in engine V2

* Start sketching out a top level class for the runtime

* Stubs for launch package

* Test cases to run the Python examples from doc/

* Fixes for the HelloPandas example

* Add run configs for Python runtime

* V2 engine rough work

* Flesh out ideas for the graph processor

* Work on actors and V2 engine

* Set minimum Python version to 3.7 in CI build

* Minor fixes

* Actors scaffolding to let a job begin

* Fill in one basic test case for actors

* Actor tidy ups, stop test from waiting until shutdown is built

* ActorSystem shutdown sequence

* Timeouts for basic build jobs (kill jobs if there are deadlocks exposed by the tests)

* Consider stubs for actor system tests

* Stubs for actor system tests

* Remove old engine implementation notes

* Rename V2 engine implementation module

* Engine tweaks

* Add some comments in engine.py

* Sketch out more of the engine scaffolding

* Skeleton from launch_model to node processor

* Use a single storage key in hello_pandas sample config

* Explicitly load module and launch model in trac_example test

* Make all fields optional in generated domain classes
(this is temporary, domain class generator to be updated to use dataclasses)

* API naming changes in TracContext

* Stub implementation of TracContext

* Python runtime - engine updates

* Add hello world example - model with just parameters, no inputs/outputs - for working through the skeleton framework

* Handle optional types in config_parser

* Engine framework loads model and enters run_model

* Write model logs to the console

* Pass job status back to the main engine actor

* Shut down the engine after a batch run

* Actor system - Failure processing for a single (root) actor

* Actor system - message signature checking

* Support new-style deferred type annotations in message signature checking

* Make config parser work with deferred type annotations

* Add unit tests for child actors and signals

* Work on the actor system

* Log colour highlighting

* Actor updates

* Actors fix: Stop sibling check

* Actors: Fix duplicate child failure signals

* Clean up engine shutdown sequence

* Actors: Fix partial application of self

* Actors: Implement error propagation

* Make hello world model pick up parameters from job config

* Fix and enable test for hello_pandas example

* Initial work on the graph and graph builder

* Execution graph updates

* Update to NodeProcessor and NodeIds

* Improve error propagation in the graph processor

* Error propagation

* Runtime data storage work

* Data / storage definition updates

* Python runtime code generator updates

* Work on the storage implementation

* Latest python runtime work

* Generate dev mode input definitions

* Fire the LoadDataFunc node in the engine

* Hello Pandas model fixes

* Pass input data through to main model API

* Config parsing for dicts of mixed primitive values

* Stricter config parsing for primitive values

* Pandas outputs into local model context

* Dev mode translation for job outputs

* Start work on data save

* Work on execution graph for outputs, use granular items and mappings

* Set up local context for model execution by filtering the job's graph context to the model namespace

* Use metadata to determine locations of outputs, prevent output overwriting

* Create unique snaps for dev mode outputs on each run of the model (do not overwrite data)

* Change example sys_config to work when run from the project root directory

* Assign a job ID in dev mode

* Tidy up ordering and logging for the execution graph

* Rename runtime config -> system config for Python runtime

* Update metadata service and tests to handle new storage definition object type in metadata

* Run python example models as part of the basic build in CI

* Config parser: Support Python 3.7 in handling of generic type info

* Add explicit job dependency for models with no inputs/outputs

* Use model class name in model node ID

* Update readme for the Python model runtime
Martin Traverse authored Mar 3, 2021
1 parent 54b0f66 commit 2ef33e5
Showing 46 changed files with 4,943 additions and 97 deletions.
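Several commits in the list above cover message signature checking in the actor system, including support for new-style deferred type annotations. A rough sketch of how such a check can work in Python is shown below; the function and parameter names are illustrative only, not the actual trac.rt actor API:

# Illustrative sketch of message signature checking with deferred annotations;
# not the actual trac.rt actor implementation
import inspect
import typing as tp

def check_message_signature(handler, *message_args):

    sig = inspect.signature(handler)

    # get_type_hints resolves deferred (string) annotations, e.g. those
    # introduced by "from __future__ import annotations"
    hints = tp.get_type_hints(handler)

    # "self" is excluded when the handler is an unbound method
    params = [p for p in sig.parameters.values() if p.name != "self"]

    if len(message_args) != len(params):
        raise TypeError(
            f"{handler.__name__} expects {len(params)} argument(s), "
            f"got {len(message_args)}")

    for param, arg in zip(params, message_args):
        expected = hints.get(param.name)
        # Only check plain classes; typing generics are skipped in this sketch
        if isinstance(expected, type) and not isinstance(arg, expected):
            raise TypeError(
                f"Argument '{param.name}' should be {expected.__name__}")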
13 changes: 11 additions & 2 deletions .github/workflows/basic-build.yml
@@ -7,6 +7,7 @@ jobs:
platform_build:

runs-on: ubuntu-latest
timeout-minutes: 20

steps:

@@ -30,6 +31,7 @@ jobs:
python_runtime:

runs-on: ubuntu-latest
timeout-minutes: 20

# Testing targets for the Python model runtime
# Include the latest stable release (3.9)
@@ -50,8 +52,8 @@
PANDAS_VERSION: ">= 1.1.0, < 2.0.0",
PYPANDOC: false }

- { ENV_NAME: "Oldest: Python 3.6, PySpark 2.4.0, Pandas 1.0.0",
PYTHON_VERSION: "3.6",
- { ENV_NAME: "Oldest: Python 3.7, PySpark 2.4.0, Pandas 1.0.0",
PYTHON_VERSION: "3.7",
PYSPARK_VERSION: "== 2.4.0",
PANDAS_VERSION: "== 1.0.0",
PYPANDOC: true }
@@ -99,6 +101,13 @@ jobs:
export PYTHONPATH=trac-runtime/python/test:$PYTHONPATH
python -m unittest discover -s trac-runtime/python/test/trac_test -t trac-runtime/python/test
- name: Python example models
run: |
export PYTHONPATH=trac-runtime/python/generated
export PYTHONPATH=trac-runtime/python/src:$PYTHONPATH
export PYTHONPATH=trac-runtime/python/test:$PYTHONPATH
python -m unittest discover -s trac-runtime/python/test/trac_examples -t trac-runtime/python/test
- name: Build packages
run: |
cd trac-runtime/python
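The new "Python example models" step above runs the example-model tests with unittest discovery, after putting the generated code, source and test directories on PYTHONPATH. The same run can be reproduced locally from Python — a minimal sketch, assuming the trac-runtime/python layout shown in the workflow:

# Local equivalent of the "Python example models" CI step (sketch only,
# assuming the repository layout used in the workflow above)
import sys
import unittest

# The CI step puts generated code, sources and tests on PYTHONPATH
sys.path[:0] = [
    "trac-runtime/python/generated",
    "trac-runtime/python/src",
    "trac-runtime/python/test",
]

suite = unittest.TestLoader().discover(
    start_dir="trac-runtime/python/test/trac_examples",
    top_level_dir="trac-runtime/python/test")

result = unittest.TextTestRunner(verbosity=2).run(suite)
sys.exit(0 if result.wasSuccessful() else 1)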
23 changes: 23 additions & 0 deletions dev/ide/idea/runConfigurations/Python_Runtime__Codegen.xml
@@ -0,0 +1,23 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Python Runtime: Codegen" type="PythonConfigurationType" factoryName="Python">
<module name="python-model-runtime" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/trac-runtime/python/codegen" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="false" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/trac-runtime/python/codegen/protoc-ctrl.py" />
<option name="PARAMETERS" value="--domain" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>
17 changes: 17 additions & 0 deletions dev/ide/idea/runConfigurations/Python_Runtime__Examples.xml
@@ -0,0 +1,17 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Python Runtime: Examples" type="tests" factoryName="Unittests">
<module name="python-model-runtime" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="false" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="_new_pattern" value="&quot;&quot;" />
<option name="_new_additionalArguments" value="&quot;discover -s trac-runtime/python/test/trac_examples -t trac-runtime/python/test&quot;" />
<option name="_new_target" value="&quot;&quot;" />
<option name="_new_targetType" value="&quot;CUSTOM&quot;" />
<method v="2" />
</configuration>
</component>
17 changes: 17 additions & 0 deletions dev/ide/idea/runConfigurations/Python_Runtime__Unit_Tests.xml
@@ -0,0 +1,17 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Python Runtime: Unit Tests" type="tests" factoryName="Unittests">
<module name="python-model-runtime" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="false" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="_new_pattern" value="&quot;&quot;" />
<option name="_new_additionalArguments" value="&quot;discover -s trac-runtime/python/test/trac_test -t trac-runtime/python/test&quot;" />
<option name="_new_target" value="&quot;&quot;" />
<option name="_new_targetType" value="&quot;CUSTOM&quot;" />
<method v="2" />
</configuration>
</component>
16 changes: 8 additions & 8 deletions doc/examples/models/python/hello_pandas/hello_pandas.py
@@ -57,31 +57,31 @@ def run_model(self, ctx: trac.TracContext):
default_weighting = ctx.get_parameter("default_weighting")
filter_defaults = ctx.get_parameter("filter_defaults")

customer_loans = ctx.get_pandas_dataset("customer_loans")
customer_loans = ctx.get_pandas_table("customer_loans")

if filter_defaults:
customer_loans = customer_loans[customer_loans["loan_condition_cat"] == 0]
customer_loans.loc[:, :] = customer_loans[customer_loans["loan_condition_cat"] == 0]

customer_loans["gross_profit_unweighted"] = \
customer_loans.loc[:, "gross_profit_unweighted"] = \
customer_loans["total_pymnt"] - \
customer_loans["loan_amount"]

customer_loans["gross_profit_weighted"] = \
customer_loans.loc[:, "gross_profit_weighted"] = \
customer_loans["gross_profit_unweighted"] * \
customer_loans["loan_condition_cat"] \
.apply(lambda c: default_weighting if c > 0 else 1.0)

customer_loans["gross_profit"] = \
customer_loans["gross_profit_eur"] \
customer_loans.loc[:, "gross_profit"] = \
customer_loans["gross_profit_weighted"] \
.apply(lambda x: x * eur_usd_rate)

profit_by_region = customer_loans \
.groupby("region", as_index=False) \
.aggregate({"gross_profit": "sum"})

ctx.put_pandas_dataset("profit_by_region", profit_by_region)
ctx.put_pandas_table("profit_by_region", profit_by_region)


if __name__ == "__main__":
import trac.launch as launch
import trac.rt.launch as launch
launch.launch_model(HelloPandas, "hello_pandas.yaml", "examples/sys_config.yaml")
6 changes: 2 additions & 4 deletions doc/examples/models/python/hello_pandas/hello_pandas.yaml
@@ -5,9 +5,7 @@ parameters:
filter_defaults: false

inputs:
customer_loans:
storage_key: "example_inputs"
path: "loan_final313_100.csv"
customer_loans: "inputs/loan_final313_100.csv"

outputs:
profit_by_region: "hello_pandas/profit_by_region.csv"
profit_by_region: "outputs/hello_pandas/profit_by_region.csv"
6 changes: 3 additions & 3 deletions doc/examples/models/python/hello_pyspark/hello_pyspark.py
@@ -58,7 +58,7 @@ def run_model(self, ctx: trac.TracContext):
default_weighting = ctx.get_parameter("default_weighting")
filter_defaults = ctx.get_parameter("filter_defaults")

customer_loans = ctx.get_spark_sql_dataset("customer_loans")
customer_loans = ctx.get_spark_table("customer_loans")

if filter_defaults:
customer_loans = customer_loans.filter(f.col("loan_condition_cat") == 0)
@@ -80,9 +80,9 @@ def run_model(self, ctx: trac.TracContext):
.groupBy("region") \
.agg(f.col("region"), f.sum(f.col("gross_profit")).alias("gross_profit"))

ctx.put_spark_sql_dataset("profit_by_region", profit_by_region)
ctx.put_spark_table("profit_by_region", profit_by_region)


if __name__ == "__main__":
import trac.launch as launch
import trac.rt.launch as launch
launch.launch_model(HelloPyspark, "hello_pyspark.yaml", "examples/sys_config.yaml")
41 changes: 41 additions & 0 deletions doc/examples/models/python/hello_world/hello_world.py
@@ -0,0 +1,41 @@
# Copyright 2021 Accenture Global Solutions Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import typing as tp
import trac.rt.api as trac


class HelloWorldModel(trac.TracModel):

    def define_parameters(self) -> tp.Dict[str, trac.ModelParameter]:

        return trac.define_parameters(

            trac.P("meaning_of_life", trac.BasicType.INTEGER,
                   label="The answer to the ultimate question of life, the universe and everything"))

    def define_inputs(self) -> tp.Dict[str, trac.TableDefinition]:

        return {}

    def define_outputs(self) -> tp.Dict[str, trac.TableDefinition]:

        return {}

    def run_model(self, ctx: trac.TracContext):

        ctx.log().info("Hello world model is running")

        meaning_of_life = ctx.get_parameter("meaning_of_life")
        ctx.log().info(f"The meaning of life is {meaning_of_life}")
3 changes: 3 additions & 0 deletions doc/examples/models/python/hello_world/hello_world.yaml
@@ -0,0 +1,3 @@

parameters:
  meaning_of_life: 42
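The hello_world example exercises only the parameter-passing part of the skeleton framework: one parameter, no inputs or outputs. Following the launch pattern used by the hello_pandas and hello_pyspark examples above, a block like the sketch below could be appended to hello_world.py (the file paths here are illustrative assumptions, not part of this commit):

# Sketch only: launching HelloWorldModel the same way as the other examples
if __name__ == "__main__":
    import trac.rt.launch as launch
    launch.launch_model(HelloWorldModel, "hello_world.yaml", "examples/sys_config.yaml")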
23 changes: 8 additions & 15 deletions doc/examples/models/python/sys_config.yaml
@@ -1,28 +1,21 @@

storageSettings:
defaultStorge: example_outputs
defaultStorage: example_data
defaultFormat: CSV

storage:

example_inputs:
example_data:

storageType: LOCAL_STORAGE
storageConfig:
rootPath: ./data/inputs

example_outputs:

storageType: LOCAL_STORAGE
storageConfig:
rootPath: ./data/outputs

rootPath: doc/examples/models/python/data

sparkSettings:

sparkConfig:
[spark.cores.max]: 4
[spark.driver.memory]: 2G
[spark.executor.memory]: 2G
[spark.default.parallelism]: 8
[spark.sql.shuffle.partitions]: 8
spark.cores.max: 4
spark.driver.memory: 2G
spark.executor.memory: 2G
spark.default.parallelism: 8
spark.sql.shuffle.partitions: 8
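Several commits in the list above also cover config parsing: converting raw JSON/YAML into config classes, handling optional fields, dicts of mixed primitive values and deferred type annotations. A minimal sketch of that idea is shown below; the class and field names are hypothetical stand-ins, not the actual trac.rt config classes, and only the simple string-typed fields of storageSettings are modelled:

# Illustrative only: mapping raw YAML (like the sys config above) onto a typed
# config class. StorageSettings here is a hypothetical stand-in.
from __future__ import annotations   # new-style deferred annotations

import dataclasses as dc
import typing as tp
import yaml   # PyYAML, added as a dependency in this commit

@dc.dataclass
class StorageSettings:
    defaultStorage: str
    defaultFormat: str

def parse_section(raw: dict, config_class: type):
    # get_type_hints resolves deferred (string) annotations back to real types
    hints = tp.get_type_hints(config_class)
    kwargs = {}
    for name, hint in hints.items():
        value = raw.get(name)
        if value is not None and not isinstance(value, hint):
            raise ValueError(f"Config field '{name}' should be {hint.__name__}")
        kwargs[name] = value
    return config_class(**kwargs)

with open("doc/examples/models/python/sys_config.yaml") as f:
    raw_config = yaml.safe_load(f)

settings = parse_section(raw_config["storageSettings"], StorageSettings)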
43 changes: 38 additions & 5 deletions trac-api/trac-metadata/src/main/proto/trac/metadata/data.proto
@@ -52,13 +52,46 @@ message TableDefinition {
repeated FieldDefinition field = 1;
}



message DataDefinition {

TableDefinition schema = 1;
enum PartType {
PART_ROOT = 0;
PART_BY_RANGE = 1;
PART_BY_VALUE = 2;
}

message PartKey {

string opaqueKey = 1;

PartType partType = 2;
repeated Value partValues = 3;
Value partRangeMin = 4;
Value partRangeMax = 5;
}

message Delta {

repeated string storage = 2;
map<string, Value> storageInfo = 3;
uint32 deltaIndex = 1;
string dataItemId = 2;
}

message Snap {

uint32 snapIndex = 1;
repeated Delta deltas = 2;
}

message Part {

PartKey partKey = 1;
Snap snap = 2;
}

TableDefinition schema = 1;
map<string, Part> parts = 2;

string path = 4;
DataFormat format = 5;
string storageId = 3;
}
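The reworked DataDefinition splits a dataset into parts, each holding a snap made up of deltas; this is what allows dev mode to create a new snap on each model run instead of overwriting earlier outputs. A rough Python illustration of the shape of that structure is below, using plain dataclasses that mirror the proto messages rather than the generated metadata classes:

# Plain-dataclass mirror of the new proto messages above, for illustration only
import dataclasses as dc
import typing as tp

@dc.dataclass
class Delta:
    deltaIndex: int
    dataItemId: str

@dc.dataclass
class Snap:
    snapIndex: int
    deltas: tp.List[Delta]

@dc.dataclass
class Part:
    partKey: str          # simplified: the proto uses a PartKey message here
    snap: Snap

# Each new run gets a fresh snap index, so earlier outputs are never overwritten
def next_snap(existing: Part, data_item_id: str) -> Part:
    new_index = existing.snap.snapIndex + 1
    new_snap = Snap(snapIndex=new_index, deltas=[Delta(0, data_item_id)])
    return Part(partKey=existing.partKey, snap=new_snap)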
@@ -28,6 +28,7 @@ import "trac/metadata/flow.proto";
import "trac/metadata/job.proto";
import "trac/metadata/file.proto";
import "trac/metadata/custom.proto";
import "trac/metadata/stoarge.proto";


/**
@@ -72,5 +73,6 @@ message ObjectDefinition {
JobDefinition job = 5;
FileDefinition file = 6;
CustomDefinition custom = 7;
StorageDefinition storage = 8;
}
}
@@ -36,6 +36,7 @@ enum ObjectType {
JOB = 4;
FILE = 5;
CUSTOM = 6;
STORAGE = 7;
}

