Refactor uploader to ease adding new data types #3215
Conversation
OK, I realized that I will have to plumb through a streaming solution in a future PR for blobs anyway. Thus I could switch the present PR back to […]
wchargin
left a comment
Thanks for sending this! In my experience, “pre-work” PRs like this can
make understanding subsequent changes much easier, so I appreciate it.
if self._byte_budget < 0:
    raise RuntimeError("Byte budget too small for experiment ID")

def add_event(self, run_name, event, value, metadata):
I’ve been thinking about this a bit…
I note that the top-level _RequestBuilder is “pull-based” in the sense
that it takes an iterator and produces an iterator via a coroutine. This
fits in nicely with the surrounding code on both sides: the client just
iterates over the requests from the request builder, and the request
builder itself (inside the coroutine) iterates over the events from the
logdir loader.
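A minimal sketch of that pull-based shape, with a toy fixed-size batch standing in for the real byte-budget logic and a plain dict standing in for the request proto:

```python
def _pull_based_builder(events, batch_size=100):
    """Sketch of the 'pull-based' shape described above: a generator that
    consumes an iterator of events and yields batched requests. The dict
    stands in for a real request proto, and the fixed batch size stands in
    for the uploader's byte-budget logic."""
    batch = []
    for event in events:              # the loop lives inside the builder
        batch.append(event)
        if len(batch) >= batch_size:
            yield {"events": batch}   # placeholder request
            batch = []
    if batch:
        yield {"events": batch}
```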
The new _ScalarRequestBuilder, on the other hand, is “push-based”: it
receives a single event at a time and must immediately decide what
requests to emit and whether to put back the event. From a different
perspective, both request builders conceptually have a loop, but the
loop in _RequestBuilder is internal while _ScalarRequestBuilder's loop
is external.
I wonder if it could be cleaner and more uniform if everything were
pull-based. At a conceptual level it makes sense to me to think of the
_RequestBuilder as splitting its input stream into three—one stream
each for just the scalar, tensor, and blob sequence Values—and
delegating to _[DataClass]RequestBuilders, each of which takes a
single stream of events and returns a single stream of requests, just as
before. In particular, the “peeking” behavior would be pushed down into
the _[DataClass]RequestBuilders, so _ScalarRequestBuilder would
actually be exactly the same as _RequestBuilder was before this PR.
Then, the top-level _RequestBuilder would multiplex over these three
streams and return a single stream of their mixed responses.
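A minimal sketch of that splitting-and-multiplexing shape; the value_kind predicate and the three builder callables are illustrative stand-ins, not proposed names:

```python
import itertools

def _build_requests_pull_based(events, value_kind,
                               scalar_builder, tensor_builder, blob_builder):
    """Split the event stream by value kind, let each per-data-class builder
    map its sub-stream of events to a stream of requests, and return one
    combined stream of requests."""
    # Three lazy views of the same stream; tee() must buffer any item that
    # one view has consumed and another has not yet reached.
    a, b, c = itertools.tee(events, 3)

    scalar_requests = scalar_builder(e for e in a if value_kind(e) == "scalar")
    tensor_requests = tensor_builder(e for e in b if value_kind(e) == "tensor")
    blob_requests = blob_builder(e for e in c if value_kind(e) == "blob_sequence")

    # Naive multiplexer: drain each request stream in turn.
    return itertools.chain(scalar_requests, tensor_requests, blob_requests)
```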
In fact, LogdirLoader itself does something similar, multiplexing over
directory loaders, and directory loaders themselves multiplex over
timestamped event file loaders, which in turn abstract over a couple
more layers.
Here’s a summary picture to better convey what I mean:
https://gist.github.com/wchargin/c123db42927b93d7ebf4d9793a555b9a#gistcomment-3167958
So I’m curious—did you consider an approach like this? and what do you
think?
(For context, I went through a surprisingly large number of iterations
of this code and similar batching code in the backend, and it seemed to
all fall out more nicely with a pull-based approach. That doesn’t mean
that this will continue to be the case, though.)
I like the principle of going fully pull-based, but I'm missing something about the implementation. In particular I'm not sure what you had in mind about splitting the stream into three streams.
There are various online discussions about this (e.g. http://paddy3118.blogspot.com/2013/06/filtering-iterator-into-n-parts-lazily.html is particularly relevant), but they all depend on itertools.tee() or similar. The issue is that if the multiplexer tries to pull from stream A to exhaustion first, then the splitting mechanism has no choice but to memorize all of stream B along the way. Of course the multiplexer could try to alternate among the streams, but that could amount to the same thing, depending on the distribution of the events among the three types and the order in which they arrive at the splitter, which the multiplexer can't know about.
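A tiny illustration of that buffering behavior, using integers in place of events:

```python
import itertools

events = iter(range(6))
evens, odds = itertools.tee(events, 2)

# Draining one branch to exhaustion forces tee() to hold every item the
# other branch has not yet consumed in an internal buffer, so in the worst
# case the whole input stream ends up in memory.
even_values = [x for x in evens if x % 2 == 0]  # consumes all six items
odd_values = [x for x in odds if x % 2 == 1]    # served from tee's buffer
```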
I wondered whether our multiplexer could be smart based on the event timestamps (i.e., not letting one of the three iterators get too far ahead of the others in time). However, the batching within each _*RequestBuilder obviates that possibility, because a single pull at the multiplexer level would produce many pulls at the splitter level, which could cause one stream to dramatically outrun the others despite the multiplexer's best efforts to keep them synchronized.
The upshot is that I think the solution in this PR actually does exactly what we want, which is to emit requests from the multiplexer in the optimal order; i.e., it requires no memorization of the input stream apart from what is needed for batching anyway, precisely because of the push nature of the component request builders.
Of course I'm very happy to discuss further, especially if you have another solution to these concerns!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After grappling with this some more, I'm now all-in on a push model, as the current revision shows. I think we need to push to the component streams in any case, as described above. And furthermore, each component stream may require a different writing mechanism (most dramatically, single RPCs vs. streaming RPCs). So there is nothing to be gained from trying to multiplex the split streams; instead, each one can flush itself to storage independently.
Note this also eliminates the need for peekable_iterator, and is generally (imho) easier to reason about.
Happy to discuss (ideally asap). To try to restate the argument compactly: any non-buffering stream must be driven by exactly one loop, the "pump". Anything upstream of the pump is pulled, and anything downstream is pushed. We know that we have to pull from the logdirs, and that we have to push to the write RPCs. So the question is at what point in between those to place the pump. The pump must be downstream of the logdir loader (because that is all done with a pull-style multiplexer which works fine and we don't want to touch). Also, the pump must be upstream of the split. So I think that leaves the present updated design as the only option.
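A minimal sketch of the resulting pump; the iteration protocol, the kind_of() dispatch, and the builder/flush interface are all simplified stand-ins rather than the uploader's actual API:

```python
def _upload_once(logdir_loader, builders, kind_of):
    """One pass of the 'pump': pull (run, event, value) tuples from the
    logdir loader and push each value into the builder for its data class;
    afterwards, each builder flushes itself independently, using whatever
    write mechanism suits that data class."""
    for run_name, event, value in logdir_loader:                    # pull side
        builders[kind_of(value)].add_event(run_name, event, value)  # push side
    for builder in builders.values():
        builder.flush()  # e.g. one batched RPC for scalars, a stream for blobs
```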
Thank you for the careful consideration! This is both quite different
from my original design and quite clean and readable. The “pump”
argument was particularly helpful for convincing me that this makes
sense.
davidsoergel
left a comment
Excellent; thanks for thinking this through again in light of the new requirements.
This refactors uploader.py with an eye towards adding tensor and blob uploads, in addition to scalars.
The approach is to accumulate batched requests of the three data types in three separate accumulators, each of which maintains its own byte budget.
The main point of this PR is to factor apart the top-level _RequestBuilder, which is responsible for iterating over the entire event stream, from the subsidiary _ScalarRequestBuilder (foreshadowing the other two), which is responsible for accumulating a batched WriteScalarsRequest. The subsidiary builders are stateful, so their interface becomes primarily add_event(...) and emit_requests().
Note that emit_requests() returns an iterable, even though in the scalars case it may have at most one item. This is for consistency with the impending blob case, where emit_requests() may return a generator for multiple chunked requests.
The excellent preëxisting tests are unchanged, and thoroughly exercise this code. (I just added one test for an edge case that I think was not handled correctly before, but is now.)
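For concreteness, a simplified sketch of the add_event(...)/emit_requests() shape described above; the class name, the crude byte accounting, and the dict placeholder for the request proto are illustrative, not the code in this PR:

```python
class _ScalarRequestBuilderSketch:
    """Simplified sketch of a stateful per-data-class builder exposing the
    add_event(...) / emit_requests() interface described above."""

    def __init__(self, max_request_bytes=128 * 1024):
        self._max_request_bytes = max_request_bytes
        self._byte_budget = max_request_bytes
        self._points = []

    def add_event(self, run_name, event, value, metadata):
        cost = len(run_name) + 32  # stand-in for the point's serialized size
        if self._points and cost > self._byte_budget:
            # Budget exhausted: the caller flushes via emit_requests() and
            # then re-adds this event to a fresh request.
            raise RuntimeError("request full; flush and retry this event")
        self._points.append((run_name, event.step, event.wall_time, value))
        self._byte_budget -= cost

    def emit_requests(self):
        # Returns an iterable even though scalars yield at most one request,
        # matching the impending chunked-blob case.
        if not self._points:
            return []
        request = {"points": self._points}  # placeholder for the request proto
        self._points = []
        self._byte_budget = self._max_request_bytes
        return [request]
```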