-
Notifications
You must be signed in to change notification settings - Fork 138
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Flushless producer supporting both comparable and non comparable offsets #873
Flushless producer supporting both comparable and non comparable offsets #873
Conversation
...m-server-api/src/main/java/com/linkedin/datastream/server/FlushlessEventProducerHandler.java
Outdated
Show resolved
Hide resolved
...nector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaBasedConnectorConfig.java
Outdated
Show resolved
Hide resolved
...nector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaBasedConnectorConfig.java
Outdated
Show resolved
Hide resolved
...nector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaBasedConnectorConfig.java
Outdated
Show resolved
Hide resolved
...nector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaBasedConnectorConfig.java
Outdated
Show resolved
Hide resolved
...nector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaBasedConnectorConfig.java
Outdated
Show resolved
Hide resolved
...m-server-api/src/main/java/com/linkedin/datastream/server/FlushlessEventProducerHandler.java
Outdated
Show resolved
Hide resolved
@@ -32,10 +32,38 @@ | |||
private static final String TOPIC = "MyTopic"; | |||
private static final Random RANDOM = new Random(); | |||
|
|||
private static final String OFFSET_CHECKPOINT_TRACKING_STRATEGY_WITH_COMPARABLE_OFFSETS = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of redefining, you can reference these configs.
...api/src/main/java/com/linkedin/datastream/server/CallbackStatusWithNonComparableOffsets.java
Outdated
Show resolved
Hide resolved
...rver-api/src/test/java/com/linkedin/datastream/server/TestFlushlessEventProducerHandler.java
Outdated
Show resolved
Hide resolved
...rver-api/src/test/java/com/linkedin/datastream/server/TestFlushlessEventProducerHandler.java
Outdated
Show resolved
Hide resolved
@@ -110,6 +114,9 @@ public KafkaBasedConnectorConfig(Properties properties) { | |||
INCLUDE_DATASTREAM_NAME_IN_CONSUMER_CLIENT_ID, DEFAULT_INCLUDE_DATASTREAM_NAME_IN_CONSUMER_CLIENT_ID); | |||
_enablePartitionAssignment = verifiableProperties.getBoolean(ENABLE_PARTITION_ASSIGNMENT, Boolean.FALSE); | |||
|
|||
_callbackStatusStrategy = verifiableProperties.getString(CONFIG_CALLBACK_STATUS_STRATEGY_FACTORY_CLASS, | |||
CallbackStatusWithComparableOffsetsFactory.class.getName()); | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can add verification as well here and save the object rather then saving the name and putting the onus on the caller to verify and throw exception.
...nector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaBasedConnectorConfig.java
Outdated
Show resolved
Hide resolved
...java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java
Outdated
Show resolved
Hide resolved
...m-server-api/src/main/java/com/linkedin/datastream/server/FlushlessEventProducerHandler.java
Outdated
Show resolved
Hide resolved
...va/com/linkedin/datastream/server/callbackstatus/CallbackStatusWithNonComparableOffsets.java
Outdated
Show resolved
Hide resolved
.../java/com/linkedin/datastream/server/callbackstatus/CallbackStatusWithComparableOffsets.java
Show resolved
Hide resolved
// Hashset storing all the records which are yet to be acked | ||
private final Set<T> _inFlight = Collections.synchronizedSet(new LinkedHashSet<>()); | ||
|
||
// Deque to store all the messages which are inflight until the last consumer checkpoint is made | ||
private final Deque<T> _inFlightUntilLastConsumerCheckpoint = new ArrayDeque<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not clear how are both of these different. Can you please explain the logic?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This list maintains the records which are inflight until the latest checkpoint is made at the consumer side,
Example :
If we have 3 records for a topic partition that are consumed as L1, L2, L3, and we receive ack for L2, the _inFlights set would have 2 records, since we already got produce ack for L2, but the _inFlightUntilLastConsumerCheckpoint would have all 3 records as we still don't have a new consumer checkpoint to make until everything before L2 is acked from producer side.
...va/com/linkedin/datastream/server/callbackstatus/CallbackStatusWithNonComparableOffsets.java
Outdated
Show resolved
Hide resolved
...va/com/linkedin/datastream/server/callbackstatus/CallbackStatusWithNonComparableOffsets.java
Outdated
Show resolved
Hide resolved
_currentCheckpoint = _inFlightUntilLastConsumerCheckpoint.pollFirst(); | ||
|
||
if (!_acked.remove(_currentCheckpoint)) { | ||
LOG.error("Internal state error; could not remove checkpoint {}", _currentCheckpoint); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just curious: How will currentCheckpoints will look in case of non-comparable offsets? How will we debug the issues, i.e. stuck partitions? Offset numbers were easy to compare.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The checkpoints in non-comparable scenarios could be of byte type.
...rver-api/src/test/java/com/linkedin/datastream/server/TestFlushlessEventProducerHandler.java
Show resolved
Hide resolved
...rver-api/src/test/java/com/linkedin/datastream/server/TestFlushlessEventProducerHandler.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
@@ -6,11 +6,16 @@ | |||
package com.linkedin.datastream.server.callbackstatus; | |||
|
|||
/** | |||
* Interface for CallbackStatus Factories | |||
* Factory implementation for Callback Status With Non Comparable Offsets |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: lower case for Callback Status With Non Comparable Offsets
…ets (linkedin#873) * Flushless producer without comparing offsets * Flushless Producer Supporting Both Comparable and Non Comparable Offsets for Ack-ing * Fix comments * Making methods abstract * Created config support to use both strategies and added tests * Address comments * Address comments Co-authored-by: Shrinand Thakkar <sthakkar@sthakkar-mn1.linkedin.biz>
In Mirrormaking, the connector job polls a list of records from the source and directs those records to be sent out to the destination via the transport provider using the flushless event producer. The flushless event producer keeps track of the in-flight messages and the acknowledged checkpoints for each source partition. These checkpoints are then used by the connector to commit the safe offsets to the source via the consumer API. And this is how the whole mirror maker process works in brief.
Before this change, this flushless event producer supported any pub-sub framework that has comparable type offsets. The offsets were compared by using a priority queue and based on that the checkpoints were moved in the flushless event producer's handler.
This new change provides support for both comparable offsets by using a priority queue and comparing offsets to move the checkpoint, and noncomparable offsets by using a simple deque-based approach that removes the dependency on comparable offsets for any underlying pub-sub framework.
Example Scenario:
Let's suppose that the connector reads messages from a source topic T, partition P from offsets L1, L2, L3 (in order) and the flushless event producer sends those messages out to the destination.
We receive a callback for the events in the order of L2, L1, L3 confirming that it is produced successfully in that order.
1. Flushless event producer with Comparable offsets
Callback for L2
{ L1,
L2, L3 }Inflights: L1, L3
Acked: L2
Since there is no acked checkpoint that is smaller than the oldest inflight checkpoint, there is no safe checkpoint to send back to the connector here.
Safe checkpoint:
<none>
Callback for L1
{
L1,L2, L3}Inflights: L3
Acked: L1, L2
Following the algorithm the largest acked checkpoint that is still smaller than the oldest inflight checkpoint is L2. The FlushlessEventProducerHandler removes all checkpoints in acked list that are smaller than the oldest inflight checkpoint, and then returns the largest (L2) to the connector as the safe checkpoint for topic T partition P.
Safe checkpoint:
L2
Callback for L3
{
L3}Inflights: {}
Acked: L3
Safe checkpoint:
L3
2. Flushless event producer with Non Comparable offsets
Callback for L2
{ L1,
L2, L3 }Safe checkpoint:
<none>
Callback for L1
{
L1,L2, L3}Since there is a contiguous prefix of acknowledged checkpoints, update the safe checkpoint to be the last removed checkpoint from the deque.
Safe checkpoint:
L2
Callback for L3
{
L3}Since there is a contiguous prefix of acknowledged checkpoints, update the safe checkpoint to be the last removed checkpoint from the deque.
Safe checkpoint:
L3