Incremental processing or streaming in micro-batches #331

It seems like it is only possible to replace a dataset entirely and then re-run the analysis. Incremental processing would enable more efficient processing by avoiding recomputation. Here's how Pachyderm does it.

Comments
Hi @kskyten

> Let's say you have a folder of images and you define a computationally intensive pipeline for them. After having defined this pipeline you acquire new images that you want to process. This corresponds to the inter-datum incrementality in Pachyderm, if I understood correctly.

We are going to implement "Dependency to directory" #154, which is a little bit related to this incremental feature, but it is not enough. This is a good scenario. Let's keep this feature request - it will be implemented. We just need to decide a priority.
@dmpetrov, are we there yet, or should we keep this open?
@MrOutis Yep, this should work now by using directory dependencies. Closing, please feel free to reopen.
Guys, this is actually a request for streaming/micro-batch processing. Scenario: I have a dir with images and I'd like to process each image separately. It is mostly a data engineering scenario, not a data science one. So, we should think carefully about whether DVC should handle this or not. But it is still a valid case, and it will be especially useful when we implement multi-processing #755. So, I'm renaming and reopening.
Thanks, @dmpetrov
Hi, has this been implemented yet?

Hi @Pierre-Bartet, it has not been implemented yet. Would you find this feature useful? Could you describe your use case?
@pared Use cases equivalent to the one already described: a folder of files that grows over time, where already-processed files should not be processed again. In fact, this is exactly the point where data version control tools could become really useful, because the situation is messier than in the 'vanilla' use case. AFAIK Pachyderm is the only tool with this kind of feature, but it is super heavy. I think what is needed is to be able to declare a dependency on the individual files inside a directory, so that a `dvc repro` would only run on the files that need it. The hard part is that a dependency flag pointing at a directory could mean two different things: apply the code to each file independently, or apply it to all the files at once. So I guess the `-d` should be replaced by something else to distinguish between those two cases.
I don't think that distinguishing whether to apply a given code to each image or to all images should be DVC's responsibility. Having said that, I think that we could implement this feature (applying only to "new" data) for directory dependencies, but it would need to be the user's conscious decision to use it.

After writing this, it seems to me that my proposition for this solution is wrong, since it can easily be misused. Leaving this here for further discussion. Maybe we could implement this as an explicit opt-in option.
@Pierre-Bartet @pared an interesting discussion. I also don't see how DVC, being completely agnostic to the way code and files are organized, can solve this now. Thinking about the right abstraction for the job, I first have a question back. Let's imagine we do something like `dvc run -d images -o processed python computation.py`.

How is this script organized internally? How does it read the data from this directory? I'm asking because, right now, as I mentioned, we don't dictate any of that. But when we start doing things incrementally, how do we pass that delta information to the script? How will it know that it needs to read only certain files from the directory? @Pierre-Bartet, do you have any ideas that come to mind?
I've been thinking about it, but I see no solution that would not totally change DVC's behaviour, mostly because of what you just said: DVC doesn't dictate how the script reads its data.

The behaviour I had in mind was to have the dependencies given as arguments to the script (here computation.py), so that `dvc run` could pass either all the dependency files or only the new ones. But that is indeed a huge deviation from the current DVC behaviour.
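To make that protocol concrete, here is a minimal sketch (mine, not actual DVC behaviour) of a script written against the convention just described: whoever invokes it - DVC or the user - passes the files to process as command-line arguments, so "all files" versus "only new files" is purely the caller's choice. The `process_one` logic and the `out/` directory are placeholders:

```python
import sys
from pathlib import Path

def process_one(src: Path, dst_dir: Path) -> None:
    # Stand-in for the expensive per-file computation.
    (dst_dir / (src.stem + ".out")).write_text(src.read_text().upper())

if __name__ == "__main__":
    out_dir = Path("out")
    out_dir.mkdir(exist_ok=True)
    # The caller decides what lands in argv: every file in the
    # directory, or only the new/changed ones.
    for arg in sys.argv[1:]:
        process_one(Path(arg), out_dir)
```

Under this convention, `python computation.py img1.png img2.png` processes just those two files, whatever the rest of the directory contains.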
This is also a feature which I would like to see handled better with DVC if possible. Currently, the only way to avoid expensive re-computation on partial data sets when adding new data is to bypass DVC altogether, execute the processing on only the new files, and then run `dvc commit` on the results - which rather detracts from the whole point of using DVC in the first place to track changes to datasets. And yet, it must be possible to avoid re-processing tens of thousands of files just because a single new file was added to an input directory. @Pierre-Bartet, my thoughts for a possible behaviour were also in the same direction as yours, passing [delta] dependencies to the script being run. A gentler approach along the same lines may be for DVC to maintain an index of input files it has already seen, which the command being run can consult to determine which inputs still need processing.

This is for sure "advanced" behaviour and requires users to be aware of the existence of the index. But at least it doesn't require DVC to change its behaviour in regard to how it calls commands with `dvc run`. I realise this suggestion isn't necessarily entirely thought through, for which I apologise. Please feel free to tear it down where necessary :)
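For illustration only, a minimal sketch of that index idea - a plain-text file (here called `.processed_index`, a made-up name) records the inputs already handled, and the script skips them; `data/` and `out/` are hypothetical directories:

```python
from pathlib import Path

INDEX = Path(".processed_index")  # hypothetical index kept between runs
DATA = Path("data")               # the directory dependency
OUT = Path("out")

if __name__ == "__main__":
    OUT.mkdir(exist_ok=True)
    seen = set(INDEX.read_text().splitlines()) if INDEX.exists() else set()
    for src in sorted(DATA.iterdir()):
        if src.name in seen:
            continue  # processed on a previous run, skip the expensive work
        (OUT / (src.stem + ".out")).write_text(src.read_text().upper())
        seen.add(src.name)
    INDEX.write_text("\n".join(sorted(seen)) + "\n")
```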
I find this feature really useful 😅 The simplest scenario that I can imagine is giving the user a way to tell which files are new:

```sh
mkdir data
for i in {1..3}; do echo $i > data/$i; done
dvc add data
```

When you add more files:

```sh
for i in {4..10}; do echo $i > data/$i; done
```

Doing `dvc ls --new data.dvc` would list the files added since the last `dvc add`; that way the user could iterate over all of those files, run the respective command on each, and then commit the results. The implementation for `--new` could look something like this:

```python
from dvc.repo import Repo
from dvc.stage import Stage

repo = Repo('.')
stage = Stage.load(repo, 'data.dvc')
out = stage.outs[0]
path = out.path_info

# Files recorded in the cache the last time the directory was added
cached = set(path / info['relpath'] for info in out.dir_cache)
# Files currently present in the workspace
current = set(out.remote.walk_files(path))

# New files are the ones present now but not yet in the cache
new = current - cached

for new_path in new:
    print(new_path)
```

We could start with a simple implementation and add more features depending on user requests, what do you think? A lot of manual work, but if you want to process everything with a single call, like @Pierre-Bartet suggested in #331 (comment), you can link those files to a new directory and use it as an argument for your script:

```sh
new_stuff=$(mktemp -d)
for file in $(dvc ls --new data.dvc); do
  ln -s $(realpath $file) $new_stuff
done
python script.py $new_stuff && rm -rf $new_stuff
```
This is especially frustrating now that I have tried to implement my own incremental-processing workaround: I simply added an argument to my computation.py which, if true, skips any input file that already has a corresponding output file. I realise that this breaks the ability for DVC to know that all outputs were created by the command being run (but I already can't use DVC properly in my case anyway, since the only way I have to avoid reprocessing all files every time a single file changes in the input is to manually run computation.py outside of DVC and `dvc commit` the results). So I executed my pseudo-incremental version of computation.py on my input, only to discover that DVC automatically removes anything in the output location every time the stage is reproduced.
Hi @markrowan! DVC deletes those outputs automatically only if it knows that it already has them in the cache, so you would be able to restore them with `dvc checkout`.
This is a great use case. I have even heard this requirement a couple of times outside of our issue tracker, like "can DVC process only new images?". It needs to be implemented as a part of DVC.

**Challenges**

There are some challenges with the API: how to align the feature with the current DVC interface, and which part of the logic goes into user code.

**Solutions**

All of the above questions are great. Also, @Pierre-Bartet, @pared and @markrowan pointed to possible solutions. Some of the solutions might align well with DVC's design. Passing files as an argument (appending to the args list) to the code is the best protocol between DVC and code that I see for now (I mean the first option, where the new files are appended to the command's arguments). We still need to work out how this would look from the command line, what is important to preserve in the current behaviour, and the remaining open design questions.

**Presenting the result to users**

When the use case is implemented, we need to show a nice-looking example in the documentation or a separate blog post. It's great to have this use case even before the release - it will help drive requirements and test the implementation. @Pierre-Bartet @markrowan @kskyten could you please help us define the use case and its major questions? If someone can participate in or even drive this doc/blog-post activity, it would be awesome.

**Related issues**

For a broader perspective, there are some issues that might be (or might not be) related to this one. It would be great if we find some similarities in the scenarios and can reuse the work.
Yes, both the input and the output will be some kind of new 'collection' objects.
This can be done using hash lists or hash trees.
Therefore, hash trees. Trees can also help if you want to support folders with a large number of files (I've faced several use cases involving millions of files).
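As a toy sketch of the hash-tree idea (not DVC's actual hashing scheme): hash each file, then hash each directory from its children's hashes, Merkle-style, so an unchanged subtree keeps its hash and comparing two snapshots localises the new files without walking everything:

```python
import hashlib
from pathlib import Path

def file_hash(path: Path) -> str:
    h = hashlib.md5()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            h.update(chunk)
    return h.hexdigest()

def tree_hash(root: Path) -> str:
    """Hash a directory from the hashes of its children (Merkle-style)."""
    h = hashlib.md5()
    for entry in sorted(root.iterdir()):
        child = tree_hash(entry) if entry.is_dir() else file_hash(entry)
        h.update(entry.name.encode())
        h.update(child.encode())
    return h.hexdigest()
```

With millions of files, equal subtree hashes let you prune entire branches from the comparison, which is the property being appealed to here.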
Isn't all this also related to parallel processing? (not a rhetorical question)
Right. We are already using some kind of hashing in DVC. It would be great if the current hashing mechanism could be reused. The existing one might not work for non-data files and dirs.

Absolutely! Updated.
There is still a sub-use-case of training on top. For this you need both an old output (a pretrained model) and a set of added files (new inputs), which is essential to some deep learning, esp. image/NLP scenarios. This, however, complicates things a lot: the stage's output becomes one of its own dependencies.
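A rough sketch of that train-on-top flow, with a toy word-count "model" standing in for a real network; `model.pkl` and the `batch2` directory are placeholder names:

```python
import pickle
from pathlib import Path

MODEL = Path("model.pkl")   # previous output, reused as a dependency
NEW_BATCH = Path("batch2")  # only the newly added input files

if __name__ == "__main__":
    # Warm start: load Model 1 if it exists, otherwise start from scratch.
    counts = pickle.loads(MODEL.read_bytes()) if MODEL.exists() else {}
    for src in sorted(NEW_BATCH.iterdir()):
        for word in src.read_text().split():
            counts[word] = counts.get(word, 0) + 1
    # Model 1 + 2 overwrites Model 1 under the same name: the output is
    # also a dependency, which is the cycle that complicates the DAG.
    MODEL.write_bytes(pickle.dumps(counts))
```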
We should also consider raising the priority. What do you think, guys?
OK, my bad, I misunderstood what you meant. So the flow will be like this: Batch 1 → Model 1, then Model 1 + Batch 2 → Model 1 + 2. As long as the dependency graph is a DAG there should be no checksum problem. What is maybe confusing here is that both 'Model 1' and 'Model 1 + 2' could share the same key, for example 'model.pkl' - but which one would you choose? If you choose 'Model 1 + 2', then 'Model 1' will be inaccessible. It's a problem of hiding an intermediate result; there is no checksum problem here.
By the nature of DVC, both models will share the same filename; they will go into different git revisions, though. Batches also combine into a single dep in the current design.
You can add a dir with files into DVC, and then add some files to that dir and `dvc add` it again.
You mean add a new stage to produce a new model? This is a dirty, non-scalable workaround): you will need to duplicate everything for that.

So you dup all outs, deps, and stages. We can say that in DVC logic the data dep might not be simply a dir of files, but a collection of batches (or a dir with some history), i.e. an incremental data artifact. Then an input of a stage is also not a dir but a collection of batches, and the output will still be reproducible - only to reproduce it you need to replay the batches through the stage. We will probably need an erase-history command too. We still have an out-as-a-dep situation, though.
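Purely as an illustration of the "collection of batches" idea - all names invented - an incremental artifact could be an append-only list of content-addressed batches that a stage replays from a checkpoint:

```python
import hashlib
import json
from dataclasses import dataclass, field
from pathlib import Path

@dataclass
class IncrementalArtifact:
    """A dir-with-history: an append-only sequence of content-addressed batches."""
    history: list = field(default_factory=list)

    def append_batch(self, files):
        h = hashlib.md5()
        for f in sorted(map(str, files)):
            h.update(Path(f).read_bytes())
        self.history.append({"files": sorted(map(str, files)),
                             "checksum": h.hexdigest()})

    def new_batches(self, checkpoint):
        # A stage that already processed `checkpoint` batches replays the rest.
        return self.history[checkpoint:]

    def erase_history(self):
        # The proposed erase-history command: squash everything into one batch.
        files = sorted({f for b in self.history for f in b["files"]})
        self.history = []
        self.append_batch(files)

    def save(self, path):
        Path(path).write_text(json.dumps(self.history, indent=2))
```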
I've never encountered the situation you describe: either the number of batches was small, and indeed I wanted to train Model 1 + 2 using Model 1 + Batch 2, with a tractable number of batches and a need to access Model 1 and Model 2 separately; or the number of batches was big enough that I wanted to retrain on all the batches (not incrementally). But indeed your use case exists and it would be a nice feature. I don't know about the exact DVC internal mechanism, but it seems to me that you (the front user) could write a single pipeline stage mapping the whole data directory to the model, while under the hood it would in reality be one stage per batch, each taking the previous model plus the new batch as inputs.
@Suor @Pierre-Bartet either I'm missing something, or the case of running the stage on all images + a pre-trained model can be solved by #331 (comment) (it's already in DVC, but we haven't merged the docs for it since we don't like introducing "virtual" cycles into DAGs for now - so consider it semi-official). The case from the ticket description and a bunch of comments below is about actual Pachyderm-like incremental processing - when you run your script only on new files. Just to clarify, which of those have you been discussing?
@shcheklein persist outs might help, but we still need a couple of other things on top of it.
@shcheklein: Indeed, what we've been discussing with @Suor is a different kind of 'incremental'.
This issue describes our use case as well. We have raw data which is produced continuously. I intended to bin this into e.g. daily chunks, then run multiple stages of processing on each chunk, possibly using the inputs of other stages from the same time bin. My problem is that I don't want to reprocess years' worth of data every day when a new file is added to the raw data, but I also don't want to manually create thousands of processing stages for every raw data file and all their children.
I think this depends on what DVC considers itself to be. For me it's a data tracking tool for the training stage: data tracking and metrics comparison help me a lot when I run experiments on the model, and after that I use other tools to deploy it. So I use DVC as a data tracking tool like git, not a computation engine. I don't know what kind of tool you guys want it to be.
One more case and request: https://discordapp.com/channels/485586884165107732/563406153334128681/718108510671601824
So we might have map-style incremental processing and a reduce-style one. The map case seems a lot simpler - should we separate it?
@Suor it seems to me that both map and reduce would require some general mechanism first - how to apply certain logic only to new files. The way that logic is applied and the actual output is produced seems to be a simpler question to solve (and we even have some hacks like persistent outputs).
I am the author of the comment above on Discord. I think it's also interesting to look at that use case in relation to other related issues.

A potential different workaround would be to allow pipelines to create pipelines, so a "master" pipeline could create N different pipelines (one per file to process), and those would be executed one after the other. This would also allow very flexible workflows, with dynamic pipeline reconfiguration based on scripts. "ML experiments and hyperparameters tuning" #2799 could be related. I can't find any issue related to that one - is it hidden, or should I create it?
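A sketch of how such a master pipeline could look today as a wrapper outside DVC, shelling out to `dvc run` once per input file to generate one single-file stage each; `data/`, `out/`, and `process.py` are hypothetical names:

```python
import subprocess
from pathlib import Path

DATA = Path("data")  # hypothetical input directory
OUT = Path("out")    # hypothetical output directory

if __name__ == "__main__":
    OUT.mkdir(exist_ok=True)
    for src in sorted(DATA.iterdir()):
        stage_file = f"{src.stem}.dvc"
        if Path(stage_file).exists():
            continue  # this file already has a generated stage
        dst = OUT / (src.stem + ".out")
        # One single-file stage per input; dvc repro then skips unchanged ones.
        subprocess.run(
            ["dvc", "run", "-f", stage_file,
             "-d", str(src), "-o", str(dst),
             "python", "process.py", str(src), str(dst)],
            check=True,
        )
```

Each generated `.dvc` stage is then an ordinary node in the DAG, so `dvc repro` only recomputes stages whose single input changed.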
@MatthieuBizien Please feel free to create an issue for that :)
I just want to use it for what its name stands for: "Data Version Control". The difficulty is that since we are talking about data, it can be so big that we don't want to actually commit it or have all of it on our computers (otherwise we would just use git and commit/push/pull/clone everything). Going from code (git) to data (DVC) means you want to be able to deal with only a subset of the full repository.

I don't want it to be a computation engine, but that is a direct consequence of this.
I hope this suggestion adds something to the above thread. As we all know, if I define a pipeline stage whose dependency is a directory of input files and whose output is a directory of result files, then a change to any single input file reruns the command on the entire directory.

My suggestion: if we were able to define a rule that connects, directly in the DAG, pairs of input files and output files, then `dvc repro` could rerun the command only for the pairs whose inputs changed (see the sketch after this comment). It's like we are enhancing the DAG to include more local dependencies (and independencies) between input files and output files.
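Here is that sketch - a make-style staleness check outside of DVC, where the pairing rule (one `.out` file per input, matched by stem) and `process.py` are invented for illustration:

```python
import subprocess
from pathlib import Path

IN_DIR = Path("input")
OUT_DIR = Path("output")

def stale_pairs():
    """Yield (input, output) pairs whose output is missing or older than its input."""
    for src in sorted(IN_DIR.iterdir()):
        dst = OUT_DIR / (src.stem + ".out")  # the per-file "rule": src -> dst
        if not dst.exists() or dst.stat().st_mtime < src.stat().st_mtime:
            yield src, dst

if __name__ == "__main__":
    OUT_DIR.mkdir(exist_ok=True)
    for src, dst in stale_pairs():
        # Rerun the command only for this stale pair.
        subprocess.run(["python", "process.py", str(src), str(dst)], check=True)
```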
@jonilaserson the discussion here circles around that in a way. Your use case, however, is a subset of what we are discussing. It also has a workaround employing an undocumented persistent-outputs option.
I'm using DVC for a project which processes continually produced time-series data in a series of steps. I described my use case above a little, but maybe I'll mention how I'm using it now. My current system wraps DVC in code which produces DVC jobs based on the input data: it scans the input directories, creates a DVC file describing a job for each file that it finds (unless a DVC file is already present), and then runs `dvc repro` to compute any steps which aren't yet present. The benefit of this structure is that, once the wrapper step has run, the repository is a fully valid DVC repository which doesn't need any special tool beyond DVC itself to interact with. In order to make this work, I needed to bin my data into preset chunks and have a way of mapping from inputs to outputs (since those outputs are then used in subsequent steps). In my time-series application, I did this by mandating filenames like "20200720_stepA_1.csv". To bring this functionality into DVC itself, I'd imagine a couple of changes.

**Meta jobs**

I found the generation of DVC jobs to work quite well. You could imagine a "meta-job" which defines how input data should be processed to output data, and can then be used in later steps. Meta-jobs can form their own DAG, even if there's no input data yet. Running a meta-job creates normal DVC jobs for each valid combination of input chunks present; a chunk is present if either its data already exists or a generated job will produce it.

Meta-jobs can also depend on normal DVC jobs/files, in which case the dependency is passed to all generated jobs. All meta-jobs are run before normal jobs, to generate a set of normal jobs for all available input data. The normal jobs can then be run in the same way as they currently are.

**Step-to-step mapping - inputs**

The step-to-step mapping would need to be configurable, such that subsequent steps can use the outputs of previous ones. You'd need to decide: "do I always map a single step output to a single step input?" I.e. if my input source (call it "stepA") has six files spread over three days - `20200720_stepA_1.csv`, `20200720_stepA_2.csv`, `20200721_stepA_1.csv`, and so on - and "stepB" depends on "stepA", should "stepB" be called three times (for the three days, with all inputs for a given day passed as inputs to stepB), six times (for each file, passed individually), or once (easier implemented by passing the whole folder, as in the current version of DVC)? You might achieve this by e.g. defining a regex for inputs. For stepB you might store a regex such as `(\d{8})_stepA_\d+\.csv`. For case 2 (stepB should be called six times), this would be enough. For case 1 (stepB called once per day), you would additionally specify a grouping option, where this tells DVC to group by the contents of capture group 1 in the regex (i.e. the timestamp).

**Step-to-step mapping - outputs**

The outputs would also need defining. "stepB" might only have a single output, with the form of a templated filename (something like `<timestamp>_stepB.csv`).

**Input passing**

Since your steps are being called with different inputs each time, you'll also need some way of passing these inputs to the command that's being called. That might involve a substitution token in the command, something like `{inputs}`. That's a bit ugly right now; I'm sure some thought could make it more elegant.

**Summary**

That's a bit of a brain-dump and I'm sorry if it's not totally coherent. I thought I'd document my experience with adding this kind of functionality on, in case it informs the requirements for adding this into DVC itself. I'd summarise my suggestion as: meta-jobs which generate ordinary DVC jobs from whatever input chunks are present, plus configurable input and output mappings between steps.
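To illustrate the grouping option, a small sketch using the example regex and filenames from above (this is not a DVC feature, just the proposed matching logic):

```python
import re
from collections import defaultdict

STEP_A_REGEX = re.compile(r"(\d{8})_stepA_\d+\.csv")

# The six example stepA files spread over three days.
files = [
    "20200720_stepA_1.csv", "20200720_stepA_2.csv",
    "20200721_stepA_1.csv", "20200721_stepA_2.csv",
    "20200722_stepA_1.csv", "20200722_stepA_2.csv",
]

# Group by capture group 1 (the timestamp): one stepB job per day.
# Without grouping, each file would become its own job (case 2).
jobs = defaultdict(list)
for name in files:
    m = STEP_A_REGEX.fullmatch(name)
    if m:
        jobs[m.group(1)].append(name)

for date, inputs in sorted(jobs.items()):
    print(f"stepB job for {date}: {inputs}")
```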
This issue was moved to a discussion. You can continue the conversation there.