add rejectionPolicy to KafkaIndexTask to optionally ignore events outside of a window period #3029

Closed
himanshug wants to merge 1 commit

Conversation

himanshug
Contributor

This will allow running batch ingestion tasks outside of the window period interval without ever conflicting with the Kafka realtime task over lock acquisition.
This is a workaround until #1679 is merged.

By default no events are rejected. However, if the user is doing batch resets for the previous day's data, they can set the windowPeriod to 1 day; that will ensure that the Kafka task and the batch task do not interfere with each other, and the Kafka task will simply drop events older than a day.
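For illustration, a minimal sketch of the rejection check described above, assuming a serverTime-style policy where events older than the trailing windowPeriod are dropped (class and method names here are hypothetical, not the actual patch):

```java
import org.joda.time.DateTime;
import org.joda.time.Period;

// Sketch only: drop events whose timestamp falls outside a trailing window
// measured from the current server time.
public class WindowPeriodRejectionSketch
{
  private final Period windowPeriod; // e.g. Period.days(1) for the reset use case above

  public WindowPeriodRejectionSketch(Period windowPeriod)
  {
    this.windowPeriod = windowPeriod;
  }

  public boolean accept(DateTime eventTimestamp)
  {
    // Accept only events newer than (now - windowPeriod); anything older would
    // otherwise require segment allocation (and locks) far in the past.
    return !eventTimestamp.isBefore(DateTime.now().minus(windowPeriod));
  }
}
```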

himanshug added this to the 0.9.1 milestone May 26, 2016
@gianm
Contributor

gianm commented May 26, 2016

@himanshug A risk with this approach is that replicas can get out of sync due to different tasks interpreting rejection policies in different ways (clock drift for serverTime, message ordering differences across partitions for messageTime). If replicas are out of sync, that could cause three issues:

  • Query result inconsistency in real-time, although this one is probably not too bad, as it would happen even without rejection. (Just due to different tasks being at different points in the stream)
  • Query result inconsistency on historicals, due to segments in the same interval being out of sync. If an interval has more than maxRowsPerSegment rows, it will make more than one segment. It's possible that historicals will load a mix of segments from different kafka tasks. If they do this then due to the desync, things won't be lined up right.
  • Potential task failure due to segment allocation sequences being out of sync. This can happen if one task allocates a segment for an old event but another task drops the event. In this case whichever task loses the race will fail its next allocation, if any, and then exit and be replaced.

These issues might not happen in a real deployment, and might be okay for the short term, but are something to be aware of…

@himanshug
Contributor Author

@gianm Agreed with the above. I had thought about 1 and 2, but I think the 3rd is the most problematic.
I'm OK with dropping this feature and noting the "Kafka task and batch ingestion task conflict" as a known issue for 0.9.1.

@dclim
Contributor

dclim commented May 26, 2016

Hm, if just supporting messageTime is sufficient, it shouldn't be too difficult to do a per-partition messageTime rejection strategy that would produce identical segments across replicas without any complex coordination.

@himanshug
Contributor Author

@gianm @dclim One option could be that the supervisor passes a "start_time" (the same value to all replicas within a partition group, similar to sequenceId) and the task uses |event_time - start_time| to decide whether to accept/reject a message. That would be deterministic.
Should that work?
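For illustration, a rough sketch of that deterministic check, assuming the supervisor hands every replica the same start time and window period, and that only events older than the window are meant to be dropped (all names here are hypothetical):

```java
import org.joda.time.DateTime;
import org.joda.time.Period;

// Sketch only: because every replica gets the same startTime from the
// supervisor, the accept/reject decision is identical across replicas and
// does not depend on each peon's wall clock.
public class StartTimeRejectionSketch
{
  private final DateTime startTime;   // supplied by the supervisor
  private final Period windowPeriod;  // supplied by the supervisor

  public StartTimeRejectionSketch(DateTime startTime, Period windowPeriod)
  {
    this.startTime = startTime;
    this.windowPeriod = windowPeriod;
  }

  public boolean accept(DateTime eventTimestamp)
  {
    // Reject events more than windowPeriod older than the fixed startTime.
    return !eventTimestamp.isBefore(startTime.minus(windowPeriod));
  }
}
```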

@dclim
Contributor

dclim commented May 27, 2016

I think that should work as long as you're okay with it operating differently from how windowPeriod works now - for example, if you specify a windowPeriod of 10 minutes for a task that runs for 60 minutes, at the beginning of the task it would accept messages as long as they are less than 10 minutes old but by the end it would accept messages up to 70 minutes old. So it's not really serverTime, or messageTime, but something like a taskStartTime rejection policy.

@himanshug
Contributor Author

Yes, that is true, but that is OK. The use case here is not really that of the window period as in the current realtime task, but that of ensuring that events older than a certain window are ignored, so that the Kafka task would never try to allocate segments beyond a known time window in the past.

@dclim
Contributor

dclim commented May 27, 2016

@himanshug Yes, for that use case that seems like a reasonable solution to implement, and as far as I can tell it would produce correct results across replicas. I'm not sure if you've started implementing the proposed solution or not, but I can take a look at it if you haven't started. Instead of passing both a "start time" and a "window period" to the task, I'm thinking of just having the supervisor pass a single minimumMessageTime or something to that effect, which would be calculated by the supervisor based on its now time + a period specified in the spec. @gianm thoughts?
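For illustration, a sketch of that split of responsibilities (field and method names are hypothetical, and whether the period is added or subtracted is a convention choice; this sketch subtracts it so that events older than the cutoff are rejected):

```java
import org.joda.time.DateTime;
import org.joda.time.Period;

// Sketch only: the supervisor computes one cutoff and passes the same value to
// every replica; the tasks then only compare timestamps against that cutoff.
public class MinimumMessageTimeSketch
{
  // Supervisor side: derive the cutoff from the supervisor's clock and a
  // period configured in the supervisor spec.
  public static DateTime computeMinimumMessageTime(Period lateMessageRejectionPeriod)
  {
    return DateTime.now().minus(lateMessageRejectionPeriod);
  }

  // Task side: every replica applies the identical cutoff, so accept/reject
  // decisions stay deterministic across replicas.
  public static boolean accept(DateTime eventTimestamp, DateTime minimumMessageTime)
  {
    return !eventTimestamp.isBefore(minimumMessageTime);
  }
}
```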

@gianm
Contributor

gianm commented May 27, 2016

@dclim that sounds good to me. The only potential issue I can think of is replicas having different minimumMessageTimes. Is there going to be any issue with replica-replacement causing failed tasks to be replaced with ones that have a different minimumMessageTime? Even if the replicas are created by a different supervisor than the one that made the original tasks?

@himanshug
Contributor Author

@dclim I haven't started on that; feel free to send the PR if you want and close this one.

@dclim
Contributor

dclim commented May 27, 2016

@gianm Yeah, I was thinking about that too as the main complication. The supervisor would select a minimumMessageTime when it creates a taskGroup (the set of tasks processing the same partitions / starting from the same offsets) and would apply that same value to all future tasks created in that taskGroup (such as when some fail and have to be replaced). New supervisors would have to read this value from any currently running tasks when the supervisor starts up and initialize the taskGroup with it. If it finds a task with one minimumMessageTime and then discovers another one with the same partitions/starting offsets but a different minimumMessageTime, that should be an illegal state and it should kill one of them.
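For illustration, a condensed sketch of that bookkeeping (the TaskGroup structure and method names here are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;
import org.joda.time.DateTime;

// Sketch only: one shared minimumMessageTime per task group, reused for any
// replacement replicas created in that group, and checked when a restarted
// supervisor adopts already-running tasks.
public class TaskGroupSketch
{
  private final Map<Integer, Long> partitionStartingOffsets = new HashMap<>();
  private DateTime minimumMessageTime; // chosen once, when the group is created

  public void initialize(Map<Integer, Long> startingOffsets, DateTime minimumMessageTime)
  {
    this.partitionStartingOffsets.putAll(startingOffsets);
    this.minimumMessageTime = minimumMessageTime;
  }

  // Returns false if the running task's cutoff conflicts with the group's,
  // which the supervisor would treat as an illegal state (kill one of them).
  public boolean adoptRunningTask(DateTime taskMinimumMessageTime)
  {
    if (minimumMessageTime == null) {
      minimumMessageTime = taskMinimumMessageTime;
      return true;
    }
    return minimumMessageTime.equals(taskMinimumMessageTime);
  }

  // Replacement replicas get exactly the same cutoff as the originals.
  public DateTime getMinimumMessageTimeForReplacementTask()
  {
    return minimumMessageTime;
  }
}
```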

@dclim
Contributor

dclim commented May 27, 2016

@himanshug Okay, I can take a look at it in a bit.

@gianm
Contributor

gianm commented May 27, 2016

@dclim does that mean the minimumMessageTime would be incorporated into the sequence name?

@dclim
Contributor

dclim commented May 27, 2016

@gianm Hm, it could be, or it could be omitted and handled separately if that's easier. I believe the supervisor currently uses part of the sequence name to determine task 'equality' - i.e. rather than comparing all the properties in the Kafka index task spec, it just uses the hash of the spec that's calculated for the sequence name. So having this hash include minimumMessageTime might complicate things when a new supervisor starts up and checks to see if there are any tasks it should care about. I'll take a look and see what's easier, unless there are other considerations of why it should or should not be incorporated into the sequence name.
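For illustration, one way the minimumMessageTime could be folded into the hash used for the sequence name, assuming task 'equality' is judged by that hash (all names and the name format below are hypothetical, not the actual supervisor code):

```java
import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import org.joda.time.DateTime;

// Sketch only: include minimumMessageTime in the spec hash so that two tasks
// with different cutoffs never look interchangeable to the supervisor.
public class SequenceNameSketch
{
  public static String sequenceName(String dataSource, int groupId, String specJson, DateTime minimumMessageTime)
  {
    String hashInput = specJson + (minimumMessageTime == null ? "" : minimumMessageTime.toString());
    String specHash = Hashing.sha1().hashString(hashInput, StandardCharsets.UTF_8).toString();
    return String.format("index_kafka_%s_%s_%d", dataSource, specHash, groupId);
  }
}
```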

@gianm
Contributor

gianm commented May 27, 2016

@dclim I was thinking that it being part of the 'equality' is actually good, because two tasks with different minimumMessageTimes are not going to create the same segments and are not equal.

@gianm
Contributor

gianm commented May 27, 2016

If there is some other way of making sure things work out, though, then that's okay too.

@gianm
Contributor

gianm commented May 31, 2016

It sounds like we can close this in favor of #3035, @himanshug please reopen if desired.
