track per-stream record counts and records committed #3247
Labels: area/platform, team/compose, team/platform-move, type/enhancement
Goals
We currently track the total number of records emitted by the Source. We want to track two additional measures of volume: the number of records emitted per stream, and the number of records committed to the Destination.
Ideally we would have the combination of the two: the number of records per stream committed to the Destination.
In this iteration of this feature, we do not want to add a new message type to the Airbyte Protocol to track this stat. Instead we want to see how much we can track simply within the ReplicationWorker.

Proposed Solution
One solution we have discussed is the following:
Modify the MessageTracker to track the record count at the stream level. That means instead of tracking a single count for the total records, we would have it track records by stream.

In order to support tracking the number of records committed to a destination, we also need to track counts by state. That is because the Destination will emit state messages when it commits the associated records. There is no guarantee that every state message gets emitted back by the Destination. The only guarantee is that for whatever state message it does emit, the records associated with that state (and all preceding states) have been committed. Leveraging this knowledge means that if the MessageTracker tracks record counts by state, we can take whatever State message was last emitted by the Destination and know that at least the records emitted up to that State message have now been committed.

Thus the MessageTracker needs to track counts by state objects. Here is an example of what this might look like. If we saw the following messages emitted from the source:
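(The stream names and counts in this sketch are illustrative assumptions, not taken from a real sync.)

```text
RECORD (stream1)
RECORD (stream1)
RECORD (stream2)
STATE  (state1)
RECORD (stream2)
STATE  (state2)
```

then the per-state, per-stream counts would be:

```text
state1 -> { stream1: 2, stream2: 1 }
state2 -> { stream2: 1 }
```

(If stream1 is assigned index 1 and stream2 index 2, this is the same case encoded in the byte arrays under Option 2 below.)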
We would expect the MessageTracker to be able to tell us exactly those per-state, per-stream counts.

Storing the Metadata
Because the number of State messages is technically unbounded, we need to be careful to store this metadata in a way that does not cause memory problems. This section describes two potential approaches.
Option 1: Use a fixed-length array to store counts for each State message
We can store a Pair of the state object and the total record counts for each stream. Instead of storing the whole state object, we can hash the state objects to 4 bytes with murmur hash (Guava's Hashing.murmur3_32()). The counts can be stored in an array where the index in the array corresponds to the stream. For example, if we had stream1, stream2, and stream3, then given the array [10, 100, 32] we would know that at this State message, stream1 had 10 records, stream2 had 100, and stream3 had 32.

Pros
- Simple, fixed-size layout with constant-time lookup of a stream's count by its index.

Cons
- Stores a count for every stream at every state message, even when most streams saw no new records, which wastes memory when counts are sparse across streams.
Thus the final data structure could just be a long array, where the first value is the hashed representation of the state and each subsequent element corresponds to a stream. We could store each of these arrays in a List.
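A minimal sketch of that structure, assuming each stream has already been assigned a fixed index (the class and method names here are illustrative, not the actual MessageTracker API):

```java
import java.util.ArrayList;
import java.util.List;

// Option 1 sketch: one fixed-length long[] per state message.
// Index 0 holds the hashed state; index i holds the count for the stream assigned index i - 1.
final class FixedWidthStateCounts {

  private final int streamCount;
  private final List<long[]> countsByState = new ArrayList<>();

  FixedWidthStateCounts(final int streamCount) {
    this.streamCount = streamCount;
  }

  void trackState(final long stateHash, final long[] recordCountsPerStream) {
    final long[] row = new long[streamCount + 1];
    row[0] = stateHash;
    System.arraycopy(recordCountsPerStream, 0, row, 1, streamCount);
    countsByState.add(row);
  }
}
```

For the example above, trackState(hashOfState1, new long[] {2, 1}) would record state1's counts for stream1 and stream2.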
Option 2: For each state, only store counts for streams that actually had a record
Instead of storing a count for every stream for each state, we could store deltas only for the streams that actually saw at least one record. These deltas could be stored in a byte array with the following schema:
<8 bytes: state hash><4 bytes: stream index><8 bytes: records added in this state message>... the last two fields repeat once for each stream present in that state message
For example, reusing the case above, we would have 2 byte arrays:
<hash of state 1 (long)> 1 (int) 2 (long) 2 (int) 1 (long)
<hash of state 2 (long)> 2 (int) 1 (long)
We could then store each byte array in a Java List.
Pros
- Only stores entries for streams that actually saw records at a given state message, so it stays small when counts are sparse across streams.

Cons
- Variable-length entries make encoding and decoding slightly more complex, and looking up a stream's count requires scanning the array rather than indexing into it.

Note: we could actually use a short instead of an int for the index of the stream names; we do not need to support more than 32K stream names.
Based on our best guess about how dense the per-stream counts will be, we should go with the second option; a sketch of the encoding follows.
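A minimal sketch of that encoding, assuming each stream has been assigned an int index (names here are illustrative):

```java
import java.nio.ByteBuffer;
import java.util.Map;

final class StateCountEncoder {

  // Layout: <8 bytes: state hash><4 bytes: stream index><8 bytes: records added>...,
  // with the index/count pair repeated once per stream that saw records for this state.
  static byte[] encode(final long stateHash, final Map<Integer, Long> recordsAddedByStreamIndex) {
    final ByteBuffer buffer = ByteBuffer.allocate(
        Long.BYTES + recordsAddedByStreamIndex.size() * (Integer.BYTES + Long.BYTES));
    buffer.putLong(stateHash);
    recordsAddedByStreamIndex.forEach((streamIndex, recordsAdded) -> {
      buffer.putInt(streamIndex);
      buffer.putLong(recordsAdded);
    });
    return buffer.array();
  }
}
```

Reusing the example, encode(hashOfState1, Map.of(1, 2L, 2, 1L)) would produce the first byte array shown above (the order of the per-stream entries within a state is not significant).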
Edge Cases
Collisions
If 2 state objects have hashes that collide, we will not be able to tell which state object the metadata belongs to. If we see two non-consecutive state messages that hash to the same value, then we should not track the metadata for the states with collisions. We would do this by:
1. not tracking the metadata for the new state that collided,
2. removing the metadata for the state that was already tracked (the one that was collided with),
3. adding this hash to a hash set of bad states so that if we see this hash again we do not store it.
See the next section for how the system should handle the case where a Destination outputs a state message that is not tracked.
There is an alternative more complicated approach where we track multiple hashes for state objects when this case arises, but because we think the case will be rare, it's not worth adding the extra complexity.
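A minimal sketch of the bookkeeping described above, keyed by the 8-byte state hash (class and method names are illustrative):

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// For simplicity this treats any repeated hash as a collision; a real implementation would
// first check whether the repeat is just the immediately preceding state being emitted again.
final class CollisionAwareStateCounts {

  private final Map<Long, byte[]> countsByStateHash = new LinkedHashMap<>();
  private final Set<Long> badHashes = new HashSet<>();

  void track(final long stateHash, final byte[] encodedCounts) {
    if (badHashes.contains(stateHash)) {
      // 1. this hash collided before: do not track metadata for the new state
      return;
    }
    if (countsByStateHash.containsKey(stateHash)) {
      // 2. remove the metadata for the state that was already tracked
      countsByStateHash.remove(stateHash);
      // 3. remember the bad hash so we never store it again
      badHashes.add(stateHash);
      return;
    }
    countsByStateHash.put(stateHash, encodedCounts);
  }
}
```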
Memory Management
In pathological cases, it is still possible for the amount of memory used by this metadata tracking to get large. We do not want to risk it causing an OOM for a sync, so we should make sure it is capped (a sketch of one way to do this follows). When that cap is reached, we should start removing state metadata, oldest first. If, when the job completes and the Destination emits a state, that state is still in the tracked metadata, then we can still report the committed record count. If it is not, then we should note that we do not know how many records were committed.
We think running into memory issues will be rare; if this turns out not to be true, we can consider spilling to disk or some other non-memory persistence layer.
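A minimal sketch of the cap, evicting the oldest state first (the cap value and names are placeholders, not real configuration):

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

final class BoundedStateCounts {

  private static final int MAX_TRACKED_STATES = 10_000; // illustrative cap only

  // Insertion-ordered map: the first entry is always the oldest tracked state.
  private final Map<Long, byte[]> countsByStateHash = new LinkedHashMap<>();

  void track(final long stateHash, final byte[] encodedCounts) {
    if (countsByStateHash.size() >= MAX_TRACKED_STATES) {
      // evict the oldest state's metadata first
      final Iterator<Map.Entry<Long, byte[]>> it = countsByStateHash.entrySet().iterator();
      it.next();
      it.remove();
    }
    countsByStateHash.put(stateHash, encodedCounts);
  }

  // Returns null if the state was never tracked or has been evicted; in that case we report
  // that the number of committed records is unknown.
  byte[] countsFor(final long stateHash) {
    return countsByStateHash.get(stateHash);
  }
}
```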
Persistence
In the StandardSyncSummary we already have a recordsSynced field. That name is a little vague, because it implies it is the number of records committed, but in practice it is really the number of records emitted. Thus we will focus on adding new, more clearly named fields to the struct. After they are added, we will populate both the new fields and the old field. Then we can run a database migration to remove recordsSynced and, for old jobs, move the value of recordsSynced into the new schema.

New fields:
- a field for records emitted (the existing recordsSynced values will go here)
- a field for records committed

Open Questions