
feat(plugin-server): Preserve distinct ID locality on overflow rerouting #20945

Merged (6 commits, Apr 2, 2024)
@@ -255,7 +255,7 @@ async function emitToOverflow(queue: IngestionConsumer, kafkaMessages: Message[]
queue.pluginsServer.kafkaProducer.produce({
topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
value: message.value,
- key: null, // No locality guarantees in overflow
+ key: message.key,
@xvello (Contributor), Mar 19, 2024:
message.key can be empty if capture-side overflow detection triggers.

As we want to evaluate impact and probably don't want to invest a lot for now, I'm fine with:

  • checking how often capture-side detection triggers (on both new and old capture) vs plugin-server-side
  • disabling capture-side detection on both captures for now while we evaluate this

Last Monday's incident showed us that we can read & unmarshal really fast (1.6M/minute with 8 historical pods dropping on token), so capture-side detection might not really be necessary.

The alternative would be to re-compute a key if missing, but that would be a third copy of that code to maintain; I'd rather avoid it.
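For context, the rejected fallback would look roughly like the sketch below. This is hypothetical: `keyForOverflow` and the `token:distinct_id` key format are assumptions for illustration, not the actual capture logic.

```typescript
// Hypothetical sketch of "re-compute a key if missing" (the alternative
// argued against above). The token:distinct_id format is an assumption.
function keyForOverflow(message: {
    key: Buffer | null
    token: string
    distinctId: string
}): Buffer {
    if (message.key && message.key.length > 0) {
        return message.key // key present: preserve locality as-is
    }
    // This branch is the "third copy of that code" the thread wants to avoid.
    return Buffer.from(`${message.token}:${message.distinctId}`)
}
```

The maintenance objection is that this key derivation would then live in new capture, old capture, and the plugin server, and the three copies could drift apart.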

@tkaemming (Contributor Author):
Going to hold off on merging this for a bit: I had assumed there was already a method to turn off overflow routing wholesale on both captures, but it doesn't look like that was a valid assumption. Seems like it'd make sense to take care of that first.

@xvello (Contributor):
It can be turned off in python, but you are right that it cannot be turned off in rust yet. I'd put very high thresholds in the rust config while working on a PR to add a boolean config to completely disable it.
BTW, this capture-side detection is kind of a scalability time bomb as its memory usage is O(active distinct_id), so it needs to eventually be phased out anyway.
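To illustrate the O(active distinct_id) concern: a per-key rate limiter has to keep state for every key it has ever seen. A minimal token-bucket sketch (hypothetical; not the actual capture implementation) makes the growth visible:

```typescript
// Minimal per-key token bucket. The Map gains one entry per distinct key
// seen, so memory grows as O(active distinct_id) unless entries are evicted.
class PerKeyLimiter {
    private buckets = new Map<string, { tokens: number; lastRefillMs: number }>()

    constructor(private capacity: number, private refillPerSec: number) {}

    consume(key: string, cost: number, nowMs: number): boolean {
        let bucket = this.buckets.get(key)
        if (!bucket) {
            bucket = { tokens: this.capacity, lastRefillMs: nowMs }
            this.buckets.set(key, bucket) // unbounded growth without eviction
        }
        const elapsedSec = (nowMs - bucket.lastRefillMs) / 1000
        bucket.tokens = Math.min(this.capacity, bucket.tokens + elapsedSec * this.refillPerSec)
        bucket.lastRefillMs = nowMs
        if (bucket.tokens >= cost) {
            bucket.tokens -= cost
            return true
        }
        return false
    }

    size(): number {
        return this.buckets.size
    }
}
```

Without eviction (TTL or LRU bounds), every distinct_id that sends a single event permanently adds an entry to the map, which is the scalability concern being raised.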

@tkaemming (Contributor Author), Mar 26, 2024:

> It can be turned off in python, but you are right that it cannot be turned off in rust yet.

Ah, I was thinking we'd want to bypass this entire conditional, but maybe that's overkill.

> BTW, this capture-side detection is kind of a scalability time bomb as its memory usage is O(active distinct_id), so it needs to eventually be phased out anyway.

Good point, I hadn't really considered that — thanks for mentioning it.

@xvello (Contributor):
Good call, we'd need to skip the check against LIKELY_ANONYMOUS_IDS too

@tkaemming (Contributor Author):
Ability to turn off random partitioning completely in old capture is here: #21168 (My initial thought was to keep that PR separate, but in retrospect, I suppose this could have been part of this change too.)

tkaemming marked this conversation as resolved.
headers: message.headers,
waitForAck: true,
})
@@ -330,7 +330,6 @@ export function splitIngestionBatch(
!ConfiguredLimiter.consume(eventKey, 1, message.timestamp)
) {
// Local overflow detection triggering, reroute to overflow topic too
- message.key = null
ingestionPartitionKeyOverflowed.labels(`${pluginEvent.team_id ?? pluginEvent.token}`).inc()
if (LoggingLimiter.consume(eventKey, 1)) {
status.warn('🪣', `Local overflow detection triggered on key ${eventKey}`)
@@ -126,7 +126,7 @@ describe('eachBatchParallelIngestion with overflow reroute', () => {
value: JSON.stringify(captureEndpointEvent1),
timestamp: captureEndpointEvent1['timestamp'],
offset: captureEndpointEvent1['offset'],
- key: null,
+ key: batch[0].key,
waitForAck: true,
})
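To illustrate what preserving the key buys: Kafka's default partitioner assigns a keyed message to a fixed partition by hashing the key, so all events sharing a distinct_id key stay ordered on one overflow partition, while a null key spreads them across partitions. A simplified model follows; the sha256 hash is a stand-in (real clients use murmur2 or crc32), and `roundRobin` here models null-key distribution, so none of this is the actual producer code.

```typescript
import { createHash } from 'crypto'

// Simplified model of Kafka partition assignment for keyed vs null-key
// messages. Deterministic hash -> sticky partition; null key -> round-robin.
function partitionFor(
    key: string | null,
    numPartitions: number,
    roundRobin: () => number
): number {
    if (key === null) {
        // Null key: messages spread across partitions, no per-key ordering.
        return roundRobin() % numPartitions
    }
    const digest = createHash('sha256').update(key).digest()
    return digest.readUInt32BE(0) % numPartitions
}
```

Under this model, rerouting with `key: message.key` keeps a distinct_id's events on one overflow partition (locality preserved), which is what the PR title describes; the old `key: null` behavior gave up that ordering in exchange for even load distribution.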
