Incremental processing or streaming in micro-batches #5917
Replies: 47 comments 10 replies
-
Hi @kskyten
-
Let's say you have a folder of images and you define a computationally intensive pipeline for them. After having defined this pipeline, you acquire new images that you want to process. This corresponds to the inter-datum incrementality in Pachyderm, if I understood correctly.
-
We are going to implement Dependency to directory #154, which is somewhat related to this incremental feature, but it is not enough. This is a good scenario. Let's keep this feature request - it will be implemented; we just need to decide on a priority.
-
@dmpetrov, are we there yet, or should we keep this open?
-
@MrOutis Yep, this should work now. Closing, please feel free to reopen.
-
Guys, this is actually a request for streaming/micro-batch processing. Scenario: I have a dir with images and I'd like to process each image. It is mostly a data engineering scenario, not a data science one, so we should think carefully about whether DVC should handle this or not. But it is still a valid case, and it will be especially useful when we implement multi-processing #755. So, I'm renaming and reopening.
-
Thanks, @dmpetrov
-
Hi, has this been implemented yet?
-
Hi @Pierre-Bartet, it has not been implemented yet. Would you find this feature useful? Could you describe your use case?
-
@pared Use cases equivalent to the one already described. In fact, this is exactly the point at which data version control tools could become really useful, because the situation is messier than in the 'vanilla' use case. AFAIK Pachyderm is the only tool with this kind of feature, but it is super heavy. I think what is needed is a way to declare the dependency so that a `dvc repro` would only run on the needed files. The hard part is that a directory dependency could mean two different things (apply the command to each file independently, or to all files together), so I guess the `-d` flag would need to be replaced by something else to distinguish between those two cases.
-
I don't think that distinguishing whether to apply given code to each image or to all images should be something DVC decides implicitly. Having said that, I think we could implement this feature (applying a command only to "new" data) for directory dependencies, but it would need to be the user's conscious decision to use it. Some notes: after writing this, it seems to me that my proposition for this solution is wrong, since it can easily be misused. Leaving it here for further discussion; maybe we could implement this as an explicit, opt-in option.
-
@Pierre-Bartet @pared An interesting discussion. I also don't see how DVC, being completely agnostic to the way code and files are organized, can solve this right now. Thinking about the right abstraction for the job, I first have a question back. Let's imagine we define a stage whose command processes a directory of images as a dependency: how is that script organized internally? How does it read the data from that directory? I'm asking because, right now, as I mentioned, we don't dictate any of that. But when we start doing things incrementally, how do we pass that delta information to the script? How will it know that it needs to read only certain files from the directory? @Pierre-Bartet, do you have some ideas that come to mind?
-
I've been thinking about it, but I see no solution that would not totally change DVC's behaviour, mostly because of what you just said. The behaviour I had in mind was to have the dependencies given as arguments to the script (here computation.py), so that dvc run could decide which of them to pass, for example all of the files or only the new ones. But that is indeed a huge deviation from the current dvc behaviour.
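For illustration, here is a minimal sketch of what that script-side contract might look like, assuming a hypothetical mode in which DVC (or a thin wrapper around it) passes only the new or changed files to computation.py as command-line arguments; the `processed/` output directory and the per-file work are placeholders, not anything DVC does today:

```python
# computation.py -- hypothetical sketch: an outer tool (DVC or a wrapper)
# passes only the new/changed input files as command-line arguments, so
# unchanged files are never touched.
import sys
from pathlib import Path

OUT_DIR = Path("processed")  # placeholder output location


def process(image_path: Path) -> None:
    # Stand-in for the expensive per-image computation.
    result = OUT_DIR / (image_path.stem + ".processed")
    result.write_text(f"processed {image_path.name}\n")


def main() -> None:
    OUT_DIR.mkdir(exist_ok=True)
    for arg in sys.argv[1:]:  # each argument is one file to (re)process
        process(Path(arg))


if __name__ == "__main__":
    main()
```

A wrapper would then invoke it as, e.g., `python computation.py data/new_image_1.png data/new_image_2.png`, leaving open the question of how DVC would record the per-file outputs.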
-
This is also a feature which I would like to see handled better with DVC, if possible. Currently, the only way to avoid expensive re-computation on partial data sets when adding new data is to bypass DVC altogether, execute the processing on only the new files, and then run dvc commit on the results -- which seems rather to detract from the whole point of using DVC in the first place to track changes to datasets. And yet, it must be possible to avoid re-processing tens of thousands of files just because a single new file was added to an input directory. @Pierre-Bartet, my thoughts for a possible behaviour were also in the same direction as yours, passing [delta] dependencies to the script being run. A gentler approach along the same lines might be to have the command itself keep an index of which inputs it has already processed, and handle only what is missing from it. This is for sure "advanced" behaviour and requires users to be aware of the existence of the index, but at least it doesn't mean DVC having to change its behaviour with regard to how it calls commands with dvc run. I realise this suggestion isn't necessarily entirely thought through, for which I apologise. Please feel free to tear it down where necessary :)
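To make the index idea a bit more concrete, here is a rough sketch under the assumptions above; the index path, its one-relative-path-per-line format, and the script name are invented for illustration:

```python
# process_new.py -- hypothetical sketch of the "index file" idea: the command
# reads an index of already-processed inputs, handles only what is missing
# from it, and then records the newly processed names.
from pathlib import Path

DATA_DIR = Path("data")                    # tracked input directory
INDEX_FILE = Path("processed_index.txt")   # one file name per line


def load_index() -> set[str]:
    if INDEX_FILE.exists():
        return set(INDEX_FILE.read_text().splitlines())
    return set()


def main() -> None:
    done = load_index()
    todo = [p for p in sorted(DATA_DIR.glob("*")) if p.name not in done]
    for path in todo:
        # ... expensive per-file work goes here ...
        done.add(path.name)
    INDEX_FILE.write_text("\n".join(sorted(done)) + "\n")


if __name__ == "__main__":
    main()
```

The stage command would simply be this script; whether the index file is tracked by DVC or kept outside its view is exactly the kind of conscious, "advanced" decision described above.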
-
I find this feature really useful 😅 The simplest scenario that I can imagine is giving the user a way to tell which files are new. For example:

```bash
mkdir data
for i in {1..3}; do echo $i > data/$i; done
dvc add data
```

When you add more files:

```bash
for i in {4..10}; do echo $i > data/$i; done
```

Doing something like `dvc ls --new data.dvc` would list the new files; that way the user could iterate over all of those files, run the respective command on each, and then commit the results. The implementation of such a listing could look something like this:

```python
from dvc.repo import Repo
from dvc.stage import Stage

repo = Repo('.')
stage = Stage.load(repo, 'data.dvc')
out = stage.outs[0]
path = out.path_info

# Files recorded in the .dvc file vs. files currently in the workspace.
cached = set(path / info['relpath'] for info in out.dir_cache)
current = set(out.remote.walk_files(path))

# Files present now but not yet recorded are the "new" ones.
new = current - cached

for path in new:
    print(path)
```

We could start with a simple implementation and add more features depending on user requests, what do you think? It is a lot of manual work, but if you want to process everything with a single call, like @Pierre-Bartet suggested in #331 (comment), you can link those files to a new directory and use it as an argument for your script:

```bash
new_stuff=$(mktemp -d)
for file in $(dvc ls --new data.dvc); do
    ln -s $(realpath $file) $new_stuff
done
python script.py $new_stuff && rm -rf $new_stuff
```
-
@shcheklein: Indeed, what we've been discussing with @Suor is a different kind of 'incremental'.
-
This issue describes our use case as well. We have raw data which is produced continuously. I intended to bin this into e.g. daily chunks, then run multiple stages of processing on each chunk, possibly using the inputs of other stages from the same time bin. My problem is that I don't want to reprocess years' worth of data every day when a new file is added to the raw data, but I also don't want to manually create thousands of processing stages for every raw data file and all their children.
-
I think this depends on what DVC considers itself to be. For me it's a data tracking tool for the training stage; data tracking and metrics comparison help me a lot when I run experiments on a model, and after that I use other tools to deploy the model. So I use DVC as a data tracking tool, like git, not as a computation engine. I don't know what kind of tool you want it to be.
-
One more case and request - https://discordapp.com/channels/485586884165107732/563406153334128681/718108510671601824
-
So we might have map-style incremental processing and a reduce-style one. The map case seems a lot simpler; should we separate it?
-
@Suor it seems to me that both map and reduce would require some general mechanism first: how to apply certain logic only to new files. The way that logic is applied and the actual output is produced seems to be a simpler question to solve (and we even have some hacks for that).
-
I am the author of the comment above on Discord. I think it's also interesting to look at that use case in relation to the issues linked above. A potential different workaround would be to allow pipelines to create pipelines, so a "master" pipeline could create N different pipelines (one per file to process), and those would be executed one after the other. This would also allow very flexible workflows, with dynamic pipeline reconfiguration based on scripts. "ML experiments and hyperparameters tuning" #2799 could be related. I can't find any issue related to that one; is it hidden, or should I create it?
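A hedged sketch of the "pipeline that creates pipelines" idea: a small generator that writes one stage per input file into dvc.yaml, so that a subsequent `dvc repro` only re-runs the stages whose inputs changed. The `data/` layout and the `process_one.py` helper are invented for illustration:

```python
# generate_stages.py -- sketch of a "master" pipeline step: it emits one DVC
# stage per input file, and a wrapper then runs `dvc repro`, which only
# re-executes stages whose dependencies changed.
from pathlib import Path

DATA_DIR = Path("data")            # directory of files to process (assumed)
PIPELINE_FILE = Path("dvc.yaml")   # overwritten with the generated stages


def main() -> None:
    lines = ["stages:"]
    for src in sorted(DATA_DIR.glob("*.csv")):
        out = Path("processed") / src.name
        lines += [
            f"  process_{src.stem}:",
            f"    cmd: python process_one.py {src} {out}",
            "    deps:",
            f"    - {src}",
            "    - process_one.py",
            "    outs:",
            f"    - {out}",
        ]
    PIPELINE_FILE.write_text("\n".join(lines) + "\n")


if __name__ == "__main__":
    main()
```

The generator itself could even be run as a wrapper step before `dvc repro`, which is close to the wrapper approach described further down in this thread.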
-
@MatthieuBizien Please feel free to create an issue for that :)
-
I just want to use it for what its name stands for: "Data Version Control". The difficulty is that since we are talking about data, it can be so big that we don't want to actually commit it or have all of it on our computers (otherwise we would just use git and commit / push / pull / clone everything). Going from code (git) to data (dvc) means you want to be able to deal with only a subset of the full repository. I don't want it to be a computation engine, but that is a direct consequence of this.
-
I hope this suggestion adds something to the above thread. As we all know, if I define a pipeline whose dependency is a whole directory, then changing any file in it re-runs the stage on everything. The suggestion: if we were able to define a rule that connects pairs of input and output files directly in the DAG, only the affected pairs would need to be recomputed. It's like we are enhancing the DAG to include more local dependencies (and independencies) between input files and output files.
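One way to picture such a rule, purely as an illustration (this is not an existing DVC mechanism): a per-file mapping from input to expected output plus a staleness check, which an incremental driver could use to rebuild only the affected pairs. The path and suffix conventions are made up:

```python
# pairwise_rule.py -- illustrative only: a per-file input -> output rule,
# plus a staleness check an incremental driver could rely on.
from pathlib import Path


def output_for(input_path: Path) -> Path:
    # Hypothetical convention: data/raw/x.csv -> data/processed/x.parquet
    return Path("data/processed") / (input_path.stem + ".parquet")


def stale_inputs(input_dir: Path) -> list[Path]:
    """Inputs whose paired output is missing or older than the input."""
    stale = []
    for src in sorted(input_dir.glob("*.csv")):
        dst = output_for(src)
        if not dst.exists() or dst.stat().st_mtime < src.stat().st_mtime:
            stale.append(src)
    return stale


if __name__ == "__main__":
    for src in stale_inputs(Path("data/raw")):
        print(f"would reprocess {src} -> {output_for(src)}")
```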
-
@jonilaserson the discussion here circles around that in a way. Your use case, however, is a subset of what we are discussing. It also has a workaround employing an undocumented feature.
-
I'm using DVC for a project which processes continually produced time-series data in a series of steps. I described my use case above a little, but maybe I'll mention how I'm using it now.

My current system wraps DVC in code which produces DVC jobs based on the input data: it scans the input directories, creates a DVC file describing a job for each file that it finds (unless a DVC file is already present), and then runs DVC repro to compute any steps which aren't yet present. The benefit of this structure is that, once the wrapper step has run, the repository is a fully valid DVC repository which doesn't need any special tool beyond DVC itself to interact with. In order to make this work, I needed to bin my data into preset chunks and have a way of mapping from inputs to outputs (since those outputs are then used in subsequent steps). In my time-series application, I did this by mandating filenames like "20200720_stepA_1.csv". To bring this functionality into DVC itself, I'd imagine a couple of changes.

**Meta jobs**

I found the generation of DVC jobs to work quite well. You could imagine a "meta-job" which defines how input data should be processed into output data, and can then be used in later steps. Meta-jobs can form their own DAG, even if there's no input data yet. Running a meta-job creates normal DVC jobs for each valid combination of input chunks present (where "present" would need a precise definition). Meta-jobs can also depend on normal DVC jobs/files, in which case the dependency is passed to all generated jobs. All meta-jobs are run before normal jobs, to generate a set of normal jobs for all available input data. The normal jobs can then be run in the same way as they currently are.

**Step-to-step mapping - inputs**

The step-to-step mapping would need to be configurable, and in such a way that subsequent steps can use the outputs of previous ones. You'd need to decide "do I always map a single step output to a single step input?" I.e. if my input source (call it "stepA") has, say, six files spread over three days, and "stepB" depends on "stepA", should "stepB" be called three times (for the three days, with all inputs for a given day passed as inputs to stepB), six times (for each file, passed individually), or once (most easily implemented by passing the whole folder, as in the current version of DVC)? You might achieve this by e.g. defining a regex for the inputs of stepB. For case 2 (stepB should be called six times), a per-file regex would be enough; for the day-grouped case you'd additionally need something that tells DVC to group by the contents of capture group 1 in the regex (i.e. the timestamp).

**Step-to-step mapping - outputs**

The outputs would also need defining; "stepB" might only have a single output per invocation, following a similar naming pattern.

**Input passing**

Since your steps are being called with different inputs each time, you'll also need some way of passing these inputs to the command that's being called. That might involve a substitution token in the command. That's a bit ugly right now; I'm sure some thought could make it more elegant.

**Summary**

That's a bit of a brain-dump and I'm sorry if it's not totally coherent. I thought I'd document my experience with adding this kind of functionality on, in case it informs the requirements for adding this into DVC itself. I'd summarise my suggestion as: meta-jobs that generate per-chunk DVC jobs, plus configurable input grouping and input passing.
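To make the regex-grouping part of the above concrete, here is a small sketch of what a meta-job might do internally: bucket the stepA files by the timestamp capture group so that one stepB job is generated per day. The directory name and regex are assumptions based on the "20200720_stepA_1.csv" convention described above:

```python
# group_inputs.py -- sketch of the "group by capture group 1" idea for a
# meta-job: bucket stepA files by date so one stepB job is generated per
# bucket. File names follow the 20200720_stepA_1.csv convention above.
import re
from collections import defaultdict
from pathlib import Path

STEP_A_PATTERN = re.compile(r"^(\d{8})_stepA_\d+\.csv$")  # group 1 = date


def group_by_date(step_a_dir: Path) -> dict[str, list[Path]]:
    groups: dict[str, list[Path]] = defaultdict(list)
    for path in sorted(step_a_dir.glob("*.csv")):
        match = STEP_A_PATTERN.match(path.name)
        if match:
            groups[match.group(1)].append(path)
    return dict(groups)


if __name__ == "__main__":
    for date, files in group_by_date(Path("stepA")).items():
        # One generated stepB job per date, with that date's files as deps.
        print(f"stepB_{date}: {[f.name for f in files]}")
```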
-
I am late to this party, but I think that a better way to deal with this would be to start with a file containing the names of the files to be processed. A processing step would look at this inventory file and compare it to the inventory file recording the files already processed (letting that file survive between runs would be the change). All files mentioned in the input inventory and not mentioned in the processed inventory would require incremental processing. The output of the processing step would be a new output inventory to be used in the next incremental step. Version controlling the input, processed, and output files would be done outside of the context of the processing step, rather than marking them as dependencies or outputs of it; only the input inventory would be a dependency and only the output inventory would be an output of the step. Back-fill could be triggered by a (version controlled) change to the processed inventory.

This approach also allows the relationship between input files and output files to not be one-to-one. This is useful if a sketch of all files processed in a single batch is put out as a single file. Since sketches are typically much smaller than the overall data, large numbers of sketches can typically be combined non-incrementally for global aggregates. The extensions required here would be relatively small.

It would also be desirable if memoization were possible. Thus, if some branch somewhere has processed the same input batch with the same code, we should be allowed to assume that the outputs will be the same and simply check them out. The effect of this would be that back-fill with a trivial new version of a processing step that doesn't actually change semantics (think formatting or comment changes) could proceed nearly instantaneously. Likewise, if you have a new version of a DAG that you want to stage, those aspects of the work that are identical to the production version could be short-circuited by checking out the output files from the production runs. Does this make sense?
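A minimal sketch of that inventory-driven step, with all file names as placeholders: the step reads the input inventory, processes only what the processed inventory does not yet list, and then writes the updated processed and output inventories:

```python
# inventory_step.py -- sketch of the inventory-driven incremental step:
# diff the input inventory against the processed inventory, handle only the
# difference, then write the updated inventories. All paths are placeholders.
from pathlib import Path

INPUT_INVENTORY = Path("input_inventory.txt")          # all known inputs
PROCESSED_INVENTORY = Path("processed_inventory.txt")  # inputs handled so far
OUTPUT_INVENTORY = Path("output_inventory.txt")        # outputs for next step


def read_lines(path: Path) -> set[str]:
    return set(path.read_text().splitlines()) if path.exists() else set()


def main() -> None:
    pending = read_lines(INPUT_INVENTORY) - read_lines(PROCESSED_INVENTORY)
    produced = []
    for name in sorted(pending):
        # ... incremental processing of one input file/batch goes here ...
        produced.append(name + ".sketch")
    # Record what has now been processed and what the next step should see.
    done = read_lines(PROCESSED_INVENTORY) | pending
    PROCESSED_INVENTORY.write_text("\n".join(sorted(done)) + "\n")
    OUTPUT_INVENTORY.write_text("\n".join(produced) + "\n")


if __name__ == "__main__":
    main()
```

In line with the suggestion above, only the inventory files would be declared to the pipeline tool; the data files themselves stay outside the step's dependency and output lists.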
-
Additional discussion here: https://discuss.dvc.org/t/need-to-build-non-ml-data-pipeline-is-dvc-good-fit/849/8. Net-net, I think that this can be done by allowing small state files for processing steps and a helper that updates a "new files" output. These do not form a circular dependency, so reproducibility and idempotency are not compromised. Further, there is no bright line between data preparation or engineering and machine learning. Speaking from a LOT of experience, getting both right and both integrated is really crucial, and DVC could really shine as an integrated path for both tasks. Speaking from recent conversations at $dayjob, Pachyderm will eat DVC's lunch without something like this.
-
I almost always work on ML projects that involve a growing dataset. Preprocessing steps are often long-running and the preprocessing of each file is typically independent.

Here is one workaround: create a script that only preprocesses the files whose outputs don't exist yet, and have your stage depend on the raw directory while outputting the preprocessed directory (a sketch of this follows below). If anyone happens to have a more elegant workaround for this scenario, please share it, as I face this situation all the time. The downside of this approach is the extra bookkeeping it requires every time you want to run it.

Whichever path you take, it's quite a lot of boilerplate for a common and simple scenario. It also doesn't seem to reach far beyond the existing capabilities of DVC, since you can almost get there with the existing DVC YAML; without knowing the DVC implementation, I imagine there are a number of ways the YAML could directly support this. Or perhaps I'm wrong and there's some showstopper that makes this not feasible? There are probably better and terser ways than this too, but that's just the natural thing that came to mind.
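For what it's worth, a bare-bones sketch of that workaround (the directory names and the `.out` suffix are invented): process only the raw files whose preprocessed counterpart is missing.

```python
# preprocess_new.py -- sketch of the common workaround: process only the raw
# files whose preprocessed counterpart does not exist yet, and skip the rest.
# All paths and the .out suffix are illustrative placeholders.
from pathlib import Path

RAW_DIR = Path("data/raw")
OUT_DIR = Path("data/preprocessed")


def main() -> None:
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    for src in sorted(RAW_DIR.glob("*")):
        dst = OUT_DIR / (src.stem + ".out")
        if dst.exists():
            continue  # already preprocessed; skip the expensive work
        # ... long-running, per-file preprocessing goes here ...
        dst.write_text(f"preprocessed {src.name}\n")


if __name__ == "__main__":
    main()
```

It could be wired in either as the stage command itself or run manually and followed by `dvc commit`, as mentioned earlier in the thread.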
-
It seems like it is only possible to replace a dataset entirely and then re-run the analysis. Incremental processing would enable more efficient processing by avoiding recomputation. Here's how Pachyderm does it.