Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track max and mean time between state message emitted and committed #15702

Merged
merged 23 commits into from
Aug 24, 2022

Conversation

alovew
Copy link
Contributor

@alovew alovew commented Aug 16, 2022

Segment tracking for time between state message emitted from source and committed by destination

  • New class StateMetricsTracker for tracking state message data and calculating max/means for new metrics
  • This includes a memory limit so we don't fail syncs due to StateMetricsTracker hashmap hogging memory & Datadog tracking so we know whether this is happening
  • We are now tracking metrics for all types of syncs: per-stream, global, and legacy. For per-stream we are tracking state message commits per stream, but for global and legacy we are hashing all stream data together

@github-actions github-actions bot added area/platform issues related to the platform area/scheduler area/worker Related to worker labels Aug 16, 2022
@alovew alovew temporarily deployed to more-secrets August 16, 2022 20:05 Inactive
Copy link
Contributor

@gosusnp gosusnp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there may be one issue with how we track timestamps here.

Also, can we have a test with more than one state to track? For example, I don't think I saw test where we'd only expire some states.

streamDescriptorsToUpdate.forEach(streamDescriptor -> {
final HashMap<Integer, DateTime> stateHashToTimestamp = new HashMap<>();
stateHashToTimestamp.put(stateHash, timeEmitted);
streamDescriptorToStateMessageTimestamps.put(streamDescriptor, stateHashToTimestamp);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't we erasing the previous data of the streamDescriptor in this loop? It should probably be a create if not exist instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes! thank you


// update minTime to earliest timestamp that exists for a state message for this particular stream
// and delete state message entries that are equal to or earlier than the destination state message
for (final Map.Entry<Integer, DateTime> stateMessageTime : stateMessagesForStream.entrySet()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OPT: Looking at this loop, it feels like we may as well track the <hash, datetime> in a queue.
In the current implementation, even though we have direct access through the hash, we have to scans through all the items to expire older messages.
Feels like having a list ordered by time more effective overall.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we'd still have to iterate through the whole list though and check the timestamp of each to see when we've reached the correct timestamp -- or are you thinking of a different kind of implementation that's not occurring to me right now?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the list is ordered, we can stop as soon as we find a match of the hash. All the items we saw should be expired/removed. Depending on the number of states we end up having in flight, it may matter.

If I got it right, current expectation is that for a given stream, state messages order is preserved, so if we get a state from the destination, it means that everything until that state has been processed. With that in mind, it would be more robust to just keep them in order rather than rely on timestamp for that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah i see, so still O(n) but in practice it could be faster. so it would look like this:

stream_1: [[state_hash_1, timestamp_1], [state_hash_2, timestamp_2], [state_hash_3, timestamp_3]],
stream_2: [[etc etc]]

then when we get a state message from the destination, we can safely start at the beginning of the state hash list and use the first timestamp as the min timestamp. then we'll remove them one by one as we iterate through until we get to the state hash that matches.

@alovew alovew temporarily deployed to more-secrets August 16, 2022 22:02 Inactive
@alovew
Copy link
Contributor Author

alovew commented Aug 16, 2022

can we have a test with more than one state to track?

@gosusnp you're right about tests, will add some now.

sourceOutputState.set(new State().withState(stateMessage.getData()));
totalSourceEmittedStateMessages.incrementAndGet();
final int stateHash = getStateHashCode(stateMessage);

if (AirbyteStateType.LEGACY != stateMessage.getType()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we avoid the tracking if the state is legacy?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to track the emit -> commit timing data per stream, and with legacy we can't track per stream state messages so it requires a separate code path, and since it's deprecated anyway we decided against supporting it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now I'm actually getting confused about how state is stored for global vs per stream vs legacy though. in airbyte_protocol.yaml it looks like connections with GLOBAL state should look at the data field (the same as legacy) but I thought this was updated?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Global state should eventually be reading the AirbyteStateMessage::global attribute not the data. I thought reading data was part of our backward compatible implementation.

@alovew alovew temporarily deployed to more-secrets August 17, 2022 20:17 Inactive
} else if (AirbyteStateType.STREAM == stateMessage.getType()) {
stateMessageData = stateMessage.getStream().getStreamState();
} else if (AirbyteStateType.GLOBAL == stateMessage.getType()) {
stateMessageData = stateMessage.getGlobal().getSharedState();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gosusnp is shared state what I should be looking at here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@benmoriceau was saying that this may not uniquely identify a state

@alovew alovew temporarily deployed to more-secrets August 18, 2022 22:30 Inactive
@alovew alovew temporarily deployed to more-secrets August 18, 2022 23:42 Inactive
@alovew alovew temporarily deployed to more-secrets August 19, 2022 19:16 Inactive
@alovew alovew temporarily deployed to more-secrets August 19, 2022 19:54 Inactive
@alovew alovew force-pushed the anne/max-and-mean-time-state-message-to-commit branch from 23f6af9 to b4a34aa Compare August 19, 2022 20:41
@alovew alovew temporarily deployed to more-secrets August 19, 2022 20:42 Inactive
@alovew alovew requested review from gosusnp and benmoriceau August 19, 2022 20:43
@alovew alovew temporarily deployed to more-secrets August 19, 2022 20:51 Inactive
@alovew alovew temporarily deployed to more-secrets August 20, 2022 00:03 Inactive
Copy link
Contributor

@gosusnp gosusnp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There might be a few issues around error tracking and namespace that may cause unexpected errors.
Happy to go over my comments if that helps.

} catch (final StateDeltaTracker.StateDeltaTrackerException e) {
log.warn("The message tracker encountered an issue that prevents committed record counts from being reliably computed.");
log.warn("This only impacts metadata and does not indicate a problem with actual sync data.");
log.warn(e.getMessage(), e);
unreliableCommittedCounts = true;
} catch (final StateMetricsTracker.StateMetricsTrackerException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is right. If a StateDeltaTrackerException is thrown, we would not update the stateMetricsTracker which would also make the metrics inaccurate.

I Feel like trying to split the errors here could still lead to inaccurate metrics, if accuracy is our concern, I'd probably go for tracking exceptions as a whole to decide whether we should emit the metric. Keeping track of which exception was thrown would be good for our understanding of where it failed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is probably confusing since I bundled it with the StateDeltaTracker, but it actually is the only error that can occur for the StateMetricsTracker. I can reword the error message too since I think it's not clear.


@Slf4j
public class AirbyteMessageTracker implements MessageTracker {

private static final long STATE_DELTA_TRACKER_MEMORY_LIMIT_BYTES = 20L * 1024L * 1024L; // 20 MiB, ~10% of default cloud worker memory
private static final long STATE_DELTA_TRACKER_MEMORY_LIMIT_BYTES = 10L * 1024L * 1024L; // 10 MiB, ~5% of default cloud worker memory
private static final long STATE_METRICS_TRACKER_MEMORY_LIMIT_BYTES = 10L * 1024L * 1024L; // 10 MiB, ~5% of default cloud worker memory
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks fairly suspicious overall, do we know if those numbers are still accurate?

* If the StateMetricsTracker throws an exception, this flag is set to true and the metrics around
* max and mean time between state message emitted and committed are unreliable
*/
private boolean unreliableStateTimingMetrics;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment below, I think we should probably just keep track of errors instead of having a split here.

try {
if (!unreliableCommittedCounts) {
stateDeltaTracker.commitStateHash(getStateHashCode(stateMessage));
stateMetricsTracker.updateStates(stateMessage, stateHash, timeCommitted);
stateDeltaTracker.commitStateHash(stateHash);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this ever fail? I notice that we only track StateDelta exceptions and not StateMetrics.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh good call - yes I refactored to use separate booleans here but didn't fully split the updating part.

return hashFunction.hashBytes(Jsons.serialize(stateMessage.getStream().getStreamState()).getBytes(Charsets.UTF_8)).hashCode();
} else {
// state type is GLOBAL
return Objects.hashCode(stateMessage.getGlobal());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this one using hashCode and the others hashBytes(Json.serialize(?

// do not track state message timestamps per stream for GLOBAL or LEGACY state
final byte[] stateTimestampByteArray = populateStateTimestampByteArray(stateHash, epochTime);
stateHashesAndTimestamps.add(stateTimestampByteArray);
remainingCapacity -= stateTimestampByteArray.length;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remainingCapacity should also be updated in the STREAM case.


public StateMetricsTracker(final Long memoryLimitBytes) {
this.stateHashesAndTimestamps = new ArrayList<>();
this.streamStateHashesAndTimestamps = new HashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it simplify things to track LEGACY and GLOBAL with a hardcoded namespace? I suspect this would remove the need for two different code path most of the time.

final Long epochTimeEmitted) {

final StreamDescriptor streamDescriptor = stateMessage.getStream().getStreamDescriptor();
final String streamNameAndNamespace = streamDescriptor.getName() + streamDescriptor.getNamespace();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above, namespace could be null.

while (iterator.hasNext()) {
final byte[] stateMessageTime = iterator.next();
final ByteBuffer current = ByteBuffer.wrap(stateMessageTime);
remainingCapacity += current.capacity();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we increase the capacity by the capacity of the buffer? I suspect it should be BYTE_ARRAY_SIZE.

@alovew alovew temporarily deployed to more-secrets August 22, 2022 18:24 Inactive
@alovew alovew temporarily deployed to more-secrets August 22, 2022 18:30 Inactive
@alovew alovew temporarily deployed to more-secrets August 22, 2022 18:46 Inactive
@alovew alovew temporarily deployed to more-secrets August 22, 2022 19:42 Inactive
@alovew alovew temporarily deployed to more-secrets August 22, 2022 20:36 Inactive
@alovew alovew force-pushed the anne/max-and-mean-time-state-message-to-commit branch from d81804d to 9b70d23 Compare August 22, 2022 22:43
@alovew alovew temporarily deployed to more-secrets August 22, 2022 22:46 Inactive
@evantahler evantahler removed their request for review August 22, 2022 23:00
@alovew alovew temporarily deployed to more-secrets August 22, 2022 23:09 Inactive
@alovew alovew temporarily deployed to more-secrets August 23, 2022 17:15 Inactive
Copy link
Contributor

@gosusnp gosusnp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a small note, probably not a big thing, but worth thinking about.

@@ -334,56 +354,45 @@ public Optional<Long> getTotalRecordsCommitted() {

@Override
public Long getTotalSourceStateMessagesEmitted() {
return totalSourceEmittedStateMessages.get();
return stateMetricsTracker.getTotalSourceStateMessageEmitted();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is more about how we track than the actual counter itself. If we run into an error, we throw an exception which may prevent us from updating some counters. For example, looking at StateMetricsTracker:84, if we fail to find a state, we'd throw an exception and we would not have updated some of those counts.
Some of it feels order dependent and a bit brittle, I wonder if it makes sense to just ignore metrics to be safe or if a partial view is better than nothing here.

@alovew
Copy link
Contributor Author

alovew commented Aug 24, 2022

@gosusnp I don't think state message emitted counts would be affected by the errors thrown in the state message tracker - max & mean metrics would, but we're setting those to null to avoid issue. the counts are incremented right when a source message is emitted and this should continue to increment even if there's some other error

@alovew alovew merged commit 6991452 into master Aug 24, 2022
@alovew alovew deleted the anne/max-and-mean-time-state-message-to-commit branch August 24, 2022 01:18
sophia-wiley pushed a commit that referenced this pull request Aug 25, 2022
…15702)

* Add logic in AirbyteMessageTracker for calculating max and mean time between state message emit and commit
rodireich pushed a commit that referenced this pull request Aug 25, 2022
…15702)

* Add logic in AirbyteMessageTracker for calculating max and mean time between state message emit and commit
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/platform issues related to the platform area/scheduler area/worker Related to worker
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants