
[KED-1667] Chronocoding: Solving the Problem of State Tracking with Temporally Sensitive DAGs #341

Closed
tamsanh opened this issue Apr 25, 2020 · 4 comments

Comments

@tamsanh
Contributor

tamsanh commented Apr 25, 2020

Introduction

A high-level, short overview of the problem(s) you are designing a solution for

In an ideal world, where limits in compute and I/O don't exist, I think kedro is the perfect framework for writing data pipeline systems. However, as a Data Engineer, I must face these problems constantly. In order to optimize this work, I require highly contextual data related to a node's operation. However, I feel that kedro does not offer sufficient built-in utility to address this, and actively inhibits the effort by enforcing directed acyclic graph structures on our pipelines, whereas the data I need is cyclic in nature. As a result, I spend extra engineering effort hand-crafting brittle compensation measures. I believe, however, that there is a straightforward solution that addresses this problem without sacrificing any of our values.

Background

Provide the reader with the context surrounding the problem(s) you are trying to solve.

The world is stateful and, whether we like it or not, kedro does, more often than not, eventually change world state. Unfortunately, kedro currently lacks the ability to directly access and react to the way it changes the state of the world around itself. Instead, it must recompute the changes it drives from its dataset sources. This is a difficult task, and it prompts developers to invest time in writing custom datasets that keep track of that world state while simultaneously modifying it. The problem is that, as far as I know, there is no accepted standard for how that tracking should be done. As a result, the logic and code for this world-state modification and tracking become cryptic and opaque, exacerbated by the fact that they are usually implemented behind the dataset _save and _load function calls, which are almost an afterthought when one thinks about a kedro pipeline. Given that the true power of kedro is the software engineering best practices applied by its framework, I believe this aspect of data pipelines should also be addressed: rather than letting it fester in the shadows, we can bring it into the open and care for it.

Problem

What follows are two categories of real-world problems that kedro engineers must face and that kedro does not currently solve.

Preservation of Past Work:
1.1. There is a node that takes a lot of computation to output.
1.2. There is an IO process that takes a lot of time to read.
1.3. There are similar, independent, long-running, high-failure-rate operations.

Limitation from Past Work:
2.1. There is an API that has time-based call limits.
2.2. There is an operation that must be called only once.

The theme across all of these is that there exists data required to decide what operations to do next.

For the Preservation group, the data is regarding “what work has been completed that I can take advantage of.”

For the Limitation group, the data is regarding “what work has been completed that I am not allowed to do."

The way that this is currently being solved is that these extra data bits are being loaded and stored as a side effect of the _save and _load functions. Obviously, we want to avoid this as much as possible.

Concrete Example

Let's say I have a kedro pipeline that reads data from a JDBC connection, partitions it by dates, encodes it as parquet, and uploads it to a data lake. The connection to JDBC is slow, the connection to the data lake is highly prone to failure, and we are pressed for time. I decide to write a pipeline that downloads, encodes, and uploads in parallel, without overwriting its previous work.

How will it know when previous work is done? We cannot rely on the existence of the partition folder in the data lake, because if an upload failed midway, the folder would exist, but partitions inside the folder would be incomplete. We must instead have separate audit data that keeps track of the true, completed work.

How do we implement this audit data? This is where it gets tricky. Due to the atemporal DAG constraints in kedro, I cannot create a dataset in the catalog that I can use to save and load the marking data without creating two separate datasets pointing to the same file location. This is a clear hack, at the mercy of anyone who modifies the catalog.yml in the future, as well as the mechanics of the Pipeline topology sort itself.
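
For illustration only, the two-entries-one-file workaround looks roughly like the sketch below, written against the Python catalog API. The entry names, paths, and dataset class are placeholders, and the exact import path depends on the kedro version.

from kedro.io import DataCatalog
from kedro.extras.datasets.pickle import PickleDataSet

catalog = DataCatalog(
    {
        # read at the start of the run to decide what work remains
        "upload_audit": PickleDataSet(filepath="data/08_reporting/upload_audit.pkl"),
        # written at the end of the run -- same file, different catalog entry,
        # purely so the DAG stays acyclic on paper
        "upload_audit_next": PickleDataSet(filepath="data/08_reporting/upload_audit.pkl"),
    }
)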

To keep this separate, I must instead create a custom dataset that internally loads and saves the audit data on its own. This not only breaks DAG rules, but also contradicts the spirit of a DataSet, which should be limited to serialization.
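
The custom-dataset workaround, sketched very roughly below with hypothetical names, buries that audit bookkeeping inside _save and _load, where the DAG cannot see it:

from pathlib import Path
import json

from kedro.io import AbstractDataSet


class AuditedPartitionsDataSet(AbstractDataSet):
    """Saves partitions (a dict of name -> bytes) and quietly tracks which ones completed."""

    def __init__(self, path: str, audit_path: str):
        self._path = Path(path)
        self._audit_path = Path(audit_path)

    def _load(self) -> set:
        # hidden state about past runs, not a "real" dataset load
        if self._audit_path.exists():
            return set(json.loads(self._audit_path.read_text()))
        return set()

    def _save(self, partitions: dict) -> None:
        self._path.mkdir(parents=True, exist_ok=True)
        completed = self._load()
        for name, data in partitions.items():
            if name in completed:
                continue  # skip work finished in a previous run
            (self._path / name).write_bytes(data)
            completed.add(name)
            # side effect: mutate the audit file after every partition
            self._audit_path.write_text(json.dumps(sorted(completed)))

    def _describe(self) -> dict:
        return dict(path=str(self._path), audit_path=str(self._audit_path))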

Thus, by some measure, we must break kedro's framework in order to implement the marking data. I am not shy about breaking framework rules for rare exceptions, but this use case has proven to be far from rare: I encounter it in almost every pipeline I build.

What's not in scope

  1. How people design their state tracking
  2. What is being tracked in state

Design

Explain your design to the solution here. Diagrams could help.

Philosophy

Kedro is a framework for dealing with data, so why not accept our audit data as proper data in its own right? Philosophically, this is something we currently seem to be against.

An implementation where data in the latter part of the pipeline affects data in the former breaks the idea of a proper Directed Acyclic Graph (DAG). I wholeheartedly support DAGs, but I believe we may be making an assumption that is putting undue constraints on their flexibility.

That is: We have always thought of DAGs as atemporal, without time. However, I think that if we bring time into the equation, we can solve this problem.

If you think about it, this is similar to why we already do dataset transcoding: it allows us to think about our data in different physical representations, so we can manipulate it as appropriate.
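
For reference, transcoding uses an @ suffix so that two catalog entries can share one file under different physical representations; the dataset and function names below are only illustrative:

from kedro.pipeline import node


def clean(raw_df):
    return raw_df.dropna()       # e.g. operating on a Spark DataFrame


def summarise(clean_df):
    return clean_df.describe()   # e.g. the same data read back as pandas


write_as_spark = node(clean, inputs="raw_data@spark", outputs="cleaned@spark")
read_as_pandas = node(summarise, inputs="cleaned@pandas", outputs="summary")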

What if we instead created a method that allows us to think about our data in different temporal representations? I'd like to call it: chronocoding.

Imagine the following example with three datasets, A, B, and C, where the output of C directly affects A.
Of course, this situation is untenable.

+---------------+
|               |
V               |
A +---> B +---> C

However, when we consider the time dimension, the graph changes.
Below, A represents A at T (current run) while A’ is A at T+1 (next run).

A +---> B +---> C +---> A'

Now we are no longer trapped by a cyclic dependency; the order of operations remains clear.

Practically

What this means is that we would introduce a new representation of datasets in our pipelines: one that indicates we are operating in a T+1 space.

I propose that this be represented by a ! at the end of a dataset's name, and only for output datasets. This way, a user knows that the _save function they are calling will be available for _load in the next run of the kedro pipeline.

The ! was chosen as it indicates the operation requires extra attention. The idea is similar to Ruby's "bang methods", where instance methods that end in ! modify the instance directly. In the kedro case, we would be modifying the future state of the dataset.
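
As an illustration only (nothing below exists in kedro today), the pipeline layer could strip the suffix when resolving dataset names, so that "compute_history!" written in this run is the same catalog entry loaded as "compute_history" in the next run:

def split_chronocoded(name: str):
    """Return (dataset_name, is_future) for a possibly chronocoded output name."""
    if name.endswith("!"):
        return name[:-1], True
    return name, False


assert split_chronocoded("compute_history!") == ("compute_history", True)
assert split_chronocoded("compute_history") == ("compute_history", False)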

Application

This then allows us to very cleanly solve all the problems from above, demonstrated here with pseudocode.

Preservation of Past Work:
1.1. There is a node that takes a lot of computation to output.
1.2. There is an IO process that takes a lot of time to read.
1.3. There are similar, independent, long-running, high-failure-rate operations.

node(big_compute, inputs=["data", "compute_history"], outputs="compute_history!")

def big_compute(data, hist):
    # filter data by hist
    # compute
    # calculate new history
    return new_hist
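
A slightly more concrete, though still hypothetical, version of this node, assuming pandas data with a partition_id column and a placeholder expensive_transform_and_upload helper:

import pandas as pd


def expensive_transform_and_upload(partition: pd.DataFrame) -> None:
    ...  # placeholder for the real long-running work


def big_compute(data: pd.DataFrame, hist: pd.DataFrame) -> pd.DataFrame:
    # 1. filter out partitions already handled in previous runs
    todo = data[~data["partition_id"].isin(hist["partition_id"])]

    # 2. do the expensive work only on what remains
    for _, partition in todo.groupby("partition_id"):
        expensive_transform_and_upload(partition)

    # 3. return the updated history; saved as "compute_history!",
    #    it is loaded back as "compute_history" on the next run
    return pd.concat([hist, todo[["partition_id"]].drop_duplicates()], ignore_index=True)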

Limitation from Past Work:
2.1. There is an API that has time-based call limits.
2.2. There is an operation that must be called only once.

node(limited_call, inputs=["payload", "call_history"], outputs="call_history!")

def limited_call(payload, hist):
    # compare history with threshold
    # operate or not, based on comparison
    return new_hist
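
Again purely as a sketch (the rate limit, window, column name, and API client are all assumptions), the same pattern for the limitation case:

from datetime import datetime, timedelta

import pandas as pd

RATE_LIMIT = 100             # assumed: max calls allowed per window
WINDOW = timedelta(hours=1)  # assumed: rate-limit window


def call_external_api(payload: dict) -> None:
    ...  # placeholder for the real rate-limited call


def limited_call(payload: dict, hist: pd.DataFrame) -> pd.DataFrame:
    now = datetime.utcnow()
    recent = hist[hist["called_at"] > now - WINDOW]

    if len(recent) < RATE_LIMIT:
        call_external_api(payload)
        hist = pd.concat(
            [hist, pd.DataFrame([{"called_at": now}])], ignore_index=True
        )
    # returned as "call_history!" so the next run loads the updated history
    return hist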

This very clearly brings our side-effecting code out into the open, pulling it into kedro itself. The historical metadata can then take advantage of all that kedro has to offer in terms of datasets, without requiring any additional syntax magic.

Thus, datasets can go back to focusing on what they do best, which is the serialization of data, and nodes can do more of what they do best, which is computation and operations on the data, all the while making it much easier to test exactly what effect the temporal state will have on our computations.

Thus, we fully take advantage of all the benefits kedro currently celebrates, while cleanly introducing functionality to support workflows that would otherwise be implemented sub-optimally.

Alternatives considered

Explain the trade off between different alternatives to your solution.

IncrementalDataSet

The obvious alternative here is the IncrementalDataSet, which I believe is an excellent solution for addressing incrementally updated data, but not all data is incremental in nature.

This also raises the broader question of whether the kedro team wants to address other natures of data that are not yet covered.

Confirms

The other alternative is to override the confirms method on custom datasets. Indeed, it is an indirect way of reaching back up the pipeline, to hand off information to previous nodes in order to inform future runs.

However, the issue is that the confirms function ends up being removed from the actual operations we wish to track, and as a result it loses critical pieces of the context puzzle we want to capture.

It also pushes our data filtering back behind "closed doors," hidden away in dataset operations, where they will eventually cause problems.

Manual Additions to catalog.yml

This works well, but meticulous care must be taken to ensure that:

  1. The filepaths for chronocoded datasets are the same.
  2. The topology is valid.

This doesn't seem like a lot, but even for a small pipeline it can get tedious.

Testing

Explain the testing strategies to verify your design correctness (if possible).

We just need to test that the new syntax is picked up by the Pipeline and runners.
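
A hedged sketch of the kind of unit test this implies, written against the hypothetical split_chronocoded() helper from the earlier sketch rather than any existing kedro API:

def split_chronocoded(name: str):
    if name.endswith("!"):
        return name[:-1], True
    return name, False


def test_bang_suffix_marks_future_dataset():
    assert split_chronocoded("compute_history!") == ("compute_history", True)


def test_plain_dataset_name_is_untouched():
    assert split_chronocoded("compute_history") == ("compute_history", False)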

Rollout strategy

Is the change backward compatible? If not, what is the migration strategy?

Yes, fully backward compatible, as long as nobody has an ! at the end of their dataset names.

We must also continue to enforce the uniqueness of an output, i.e. two nodes may not output to the same dataset, regardless of its temporality.

We must also take the new time dimension into account in the topological sort.

Future iterations

Will there be future iterations of this design?

It’s possible that chronocoded datasets will require configuration that is separate from that of the present-time dataset. We may need to implement transcoding on top of chronocoding, but I can't think of a true use case for it at this time.

@921kiyo
Contributor

921kiyo commented May 13, 2020

@tamsanh Thank you for the great suggestion and apologies for being late in response.

There is a lot of information and many suggestions about chronocoding in your description (which is really interesting!), but I would like to clarify the problem from your concrete example first, before thinking about the solution.

Regarding the concrete example you faced with the JDBC connection, am I correct that the problem was that there was no easy way for you to know which nodes (load data -> execute node func -> save data) the runner actually ran successfully and which failed? If you could tell that somehow (either with audit data or some other form), you could re-run the failed part of your pipeline, and the problem would be solved?

I decide to write a pipeline that downloads, encodes, and uploads in parallel, without overwriting its previous work.

Would your Kedro pipeline complete without any errors if upload failed midway, or something went wrong?

@921kiyo 921kiyo changed the title Chronocoding: Solving the Problem of State Tracking with Temporally Sensitive DAGs [KED-1667] Chronocoding: Solving the Problem of State Tracking with Temporally Sensitive DAGs May 13, 2020
@tamsanh
Contributor Author

tamsanh commented Jun 10, 2020

Oh @921kiyo, just realized I didn't see your comment!

Yes, the problem was that there was no easy way to know what data had been saved and what had failed. Additionally, the operation was very slow to execute serially and needed to be parallelized.

Chronocoded data here is effectively that auditing data, to tell what was saved and what was not.

In this case, the kedro pipeline would complete a run without failure even if some of the operations failed, so as to be lenient about instability in the system. We were dealing with hundreds of tables from dozens of different systems and a few data stores, so missing some data in favor of getting most of the data was preferable.

@tamsanh
Contributor Author

tamsanh commented Jun 18, 2020

Hey team, just a quick update, I was able to put this behavior into the newest version of my pipelines-made-easy plugin Kedro Wings.

Works perfectly fine thanks to our lovely hooks API.
Video on usage here: https://youtu.be/n--1fO-NxC0

@yetudada
Contributor

We suggest using @tamsanh's work on this one. We'll update you if this ever becomes a core part of Kedro. For now, I'll close this ticket. Thanks for Kedro Wings @tamsanh!
