Event time processing #127
Good work working on this. I know you have a bit of stuff that is blocking this, but wanted to give you some comments on what I was thinking for event time.
@davidselassie I updated the PR following yesterday's conversation, using SystemTime to determine events' lateness. edit Added an
    System(SystemClock),
}

impl InternalClock {
What's the thinking behind having a wrapping class here, rather than using Box<dyn Clock<V>> and the methods on that directly?
The reason is that I only wanted to explicitly allow a subset of the clocks we have here. It doesn't make sense to allow an EventTimeClock as the internal one, for example.
We could still control this when building the clock from the config, but the generic type would not represent what I wanted.
Using Box<dyn Clock<V>> also makes things a bit more complicated with the type system, but I think that can be solved.
There is another problem though: with the fix needed in the EventTimeClock::watermark function, we ask for the TestingClock's time even if an event didn't arrive. This makes the internal counter of the TestingClock advance more than once per event.
I'm tempted to go back to just having a TestingEventTimeClock, duplicating some code, but having full control of what happens there, and not polluting the EventTimeClock with things that (right now at least) are only needed for tests. But if you think it's worth it for other possible use cases I'll look for a way to make this work.
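To make the counter problem concrete, here is a minimal Python sketch. It does not use the real TestingClock; `CountingTestClock` is a hypothetical stand-in that I assume advances by a fixed step on every `now()` call, which is the behavior described above.

```python
from datetime import datetime, timedelta, timezone


class CountingTestClock:
    """Hypothetical stand-in for TestingClock: each now() call advances time."""

    def __init__(self, start, step):
        self._current = start
        self._step = step
        self.reads = 0

    def now(self):
        value = self._current
        self._current += self._step
        self.reads += 1
        return value


start = datetime(2022, 1, 1, tzinfo=timezone.utc)
clock = CountingTestClock(start, timedelta(seconds=1))

events_processed = 0

# An event arrives and the clock is read once.
arrival_time = clock.now()
events_processed += 1

# The watermark is then computed with no new event, reading the clock again.
watermark_time = clock.now()
```

After this runs, one event has been processed but the clock has been read twice, so test time has moved further than "one step per event" would suggest.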
Thanks for the thorough review @davidselassie, this was needed. Right now the tests are broken due to the use of the As I said there, I'm tempted to go back to having a Do we have a use case outside of testing to have different kinds of clocks for the
pytests/test_event_time.py
Outdated
def input_builder(worker_index, worker_count, state):
    # Each yield advances the clock by 1 second
    # This should be processed in the first window
    yield temp(1, 1)
Appreciate the comments! At the risk of having a less legit, real-life flow, you could modify the events to be strings instead of temp values, so their values could be more revealing, i.e. "window 1", "drop since late", "window 2".
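A rough sketch of what that could look like, in plain Python. The event shape (a tuple of datetime and label), the `start` value, and the offsets are assumptions for illustration, not what the test currently uses.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical base time for the test events.
start = datetime(2022, 1, 1, tzinfo=timezone.utc)


def input_builder(worker_index, worker_count, state):
    # Assumed event shape: (event datetime, label describing the expected outcome)
    yield start, "window 1"
    yield start + timedelta(seconds=20), "window 2"
    # Timestamp far behind the previous event, so it is expected to be dropped.
    yield start + timedelta(seconds=1), "drop since late"


labels = [label for _dt, label in input_builder(0, 1, None)]
```

With labels like these, a failing assertion on the output would show directly which event ended up in the wrong place.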
src/window/event_time_clock.rs
Outdated
#[pyo3(get)]
pub(crate) late_after_system_duration: chrono::Duration,
#[pyo3(get)]
pub(crate) system_clock_config: Option<TestingClockConfig>,
We have an optional TestingClockConfig here to allow for testing windowing with event time?
It's now slightly different, but yes, we still accept a parameter to use a testing clock rather than the default system clock in the event time clock. As I said in another comment, initially I implemented two separate clocks, EventTimeClock and TestingEventTimeClock, with the only difference between the two being that the testing clock used an auto-incremented counter rather than the system's now.
This meant keeping the two implementations in sync by hand, and could lead to problems if they drifted apart (a test passing when it shouldn't because test and production use two different implementations of the event time clock), so I decided to find a way to reuse the same EventClock, changing only the meaning of now.
I'm still not convinced it's the best approach, since there is a parameter needed only for testing inside a "production" object, but I felt this was better than the way I did it previously.
Open to suggestions though.
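The "same clock, different now" idea can be sketched in Python as below. This is not the code in the PR: `EventTimeClockSketch`, its `now` parameter, and `system_delay` are hypothetical names used only to show a single implementation with an injected time source.

```python
from datetime import datetime, timedelta, timezone


class EventTimeClockSketch:
    """Hypothetical clock: one implementation, with the source of `now` injected."""

    def __init__(self, dt_getter, now=None):
        self._dt_getter = dt_getter
        # Default to wall-clock time; tests can pass a deterministic callable.
        self._now = now or (lambda: datetime.now(timezone.utc))

    def system_delay(self, event):
        """How far the injected `now` is ahead of the event's own timestamp."""
        return self._now() - self._dt_getter(event)


# In a test, `now` is pinned to a fixed instant instead of the real clock.
fixed_now = datetime(2022, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
clock = EventTimeClockSketch(lambda event: event["ts"], now=lambda: fixed_now)
delay = clock.system_delay({"ts": fixed_now - timedelta(seconds=5)})
```

Production and test code share every line of the clock logic here, and the only test-specific piece is the callable passed in, which is the trade-off described above.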
0cf8971 to bb629e2
3055da3 to 7d522e9
Thank you for working so hard on this. I try not to nitpick people's algorithms, but I really wanted to double check that this produced the correct behavior, and happened to think of some maybe clearer phrasing. Feel free to discard this advice at your discretion.
src/window/system_clock.rs
Outdated
@@ -18,8 +18,15 @@ use super::{Clock, ClockConfig};
/// Config object. Pass this as the `clock_config` parameter to
/// your windowing operator.
#[pyclass(module="bytewax.window", extends=ClockConfig)]
#[derive(Clone, Copy)]
Why are these suddenly needed on our config types?
So, this is due to the new ClockBuilder trait, which consumes self.
Previously, in the build_clock_builder function, we were manually cloning all the fields in the config and passing them to the builder. With the new trait we clone the whole config instead (required by the FromPyObject trait, used in extract). So instead of implicitly requiring all the fields in the config to be Clone, we require the config itself to be cloneable.
Since we end up cloning all the fields anyway, I thought having this requirement here wouldn't make much of a difference.
This change is not needed for this PR. I can easily revert it, remove the ClockBuilder trait, and implement the builder function on the struct like it was previously. It's a refactoring I did along the way to make the pattern we use more explicit, and I feel it makes the code more readable, but it works the other way too.
bb629e2 to 3434e3e
023a8af to 423ba33
/// Config object. Pass this as the `clock_config` parameter to your
/// windowing operator.
#[pyclass(module="bytewax.window", extends=ClockConfig)]
#[pyo3(text_signature = "(dt_getter, wait_for_system_duration, system_clock)")]
I think wait_for_system_duration is explicit but perhaps leaking implementation. What about something like lateness_buffer or late_after_duration or something?
Yes, it's "internal implementation" but it's also the correct interpretation of this duration, as there's sort of multiple types of time going on.
de7ffb3 to 684ee4a
f8139d7 to 1bcfc7d
1bcfc7d to 3450c6c
This PR introduces a new Clock Config: EventClockConfig.
EventClockConfig is a clock based on datetimes retrieved from events.
Whenever an event is received, its datetime is extracted through a user-specified function.
Events are awaited until a user-specified duration has passed; if an event comes late, it's dropped.
This configuration requires the user to specify 3 parameters:
dt_getter: a function that receives an event as input and should return a datetime extracted from the event
late: a duration after which (in event-time, not realtime) the window for waiting late events is closed
system_clock: by default the system clock is used internally, but a TestingClock can be passed here
This PR is ready for review, but is blocked by:
TODO
time.sleep in tests
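As a rough illustration of the dt_getter and late parameters described above, here is a plain-Python sketch of dropping late events. It is not the implementation in this PR: `filter_late` is a hypothetical helper, it assumes the watermark simply trails the largest event datetime seen so far by `late`, and it leaves the system_clock parameter out entirely.

```python
from datetime import datetime, timedelta, timezone


def filter_late(events, dt_getter, late):
    """Keep events whose datetime is not behind the watermark (assumed rule)."""
    kept = []
    max_event_time = None
    for event in events:
        event_time = dt_getter(event)
        # Assumption: watermark = newest event datetime seen so far minus `late`.
        if max_event_time is not None and event_time < max_event_time - late:
            continue  # The event is late, so it is dropped.
        if max_event_time is None or event_time > max_event_time:
            max_event_time = event_time
        kept.append(event)
    return kept


base = datetime(2022, 1, 1, tzinfo=timezone.utc)
events = [
    {"ts": base, "label": "on time"},
    {"ts": base + timedelta(seconds=10), "label": "on time"},
    {"ts": base + timedelta(seconds=2), "label": "late"},
    {"ts": base + timedelta(seconds=7), "label": "within the wait"},
]
kept = filter_late(events, lambda event: event["ts"], timedelta(seconds=5))
kept_labels = [event["label"] for event in kept]
```

Here the third event is 8 seconds behind the newest datetime seen, which exceeds the 5 second wait, so it is the only one dropped.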