Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flushless producer supporting both comparable and non comparable offsets #873

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.linkedin.datastream.common.VerifiableProperties;
import com.linkedin.datastream.kafka.factory.KafkaConsumerFactory;
import com.linkedin.datastream.kafka.factory.KafkaConsumerFactoryImpl;
import com.linkedin.datastream.server.callbackstatus.CallbackStatusFactory;
import com.linkedin.datastream.server.callbackstatus.CallbackStatusWithComparableOffsetsFactory;

/**
Expand Down Expand Up @@ -74,7 +75,8 @@ public class KafkaBasedConnectorConfig {
private final long _nonGoodStateThresholdMillis;
private final boolean _enablePartitionAssignment;

private final String _callbackStatusStrategy;
// Kafka based pub sub framework uses Long as their offset type, hence instantiating a Long parameterized factory
private final CallbackStatusFactory<Long> _callbackStatusStrategyFactory;

/**
* Constructor for KafkaBasedConnectorConfig.
Expand Down Expand Up @@ -114,8 +116,12 @@ public KafkaBasedConnectorConfig(Properties properties) {
INCLUDE_DATASTREAM_NAME_IN_CONSUMER_CLIENT_ID, DEFAULT_INCLUDE_DATASTREAM_NAME_IN_CONSUMER_CLIENT_ID);
_enablePartitionAssignment = verifiableProperties.getBoolean(ENABLE_PARTITION_ASSIGNMENT, Boolean.FALSE);

_callbackStatusStrategy = verifiableProperties.getString(CONFIG_CALLBACK_STATUS_STRATEGY_FACTORY_CLASS,
String callbackStatusStrategyFactoryClass = verifiableProperties.getString(CONFIG_CALLBACK_STATUS_STRATEGY_FACTORY_CLASS,
CallbackStatusWithComparableOffsetsFactory.class.getName());
_callbackStatusStrategyFactory = ReflectionUtils.createInstance(callbackStatusStrategyFactoryClass);
if (_callbackStatusStrategyFactory == null) {
throw new DatastreamRuntimeException("Unable to instantiate factory class: " + callbackStatusStrategyFactoryClass);
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can add verification as well here and save the object rather then saving the name and putting the onus on the caller to verify and throw exception.

String factory =
verifiableProperties.getString(CONFIG_CONSUMER_FACTORY_CLASS, KafkaConsumerFactoryImpl.class.getName());
Expand Down Expand Up @@ -205,7 +211,7 @@ public boolean getEnablePartitionAssignment() {
return _enablePartitionAssignment;
}

public String getCallbackStatusStrategy() {
return _callbackStatusStrategy;
public CallbackStatusFactory<Long> getCallbackStatusStrategyFactory() {
return _callbackStatusStrategyFactory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ public KafkaMirrorMakerConnectorTask(KafkaBasedConnectorConfig config, Datastrea
LOG.info("Destination topic prefix has been set to {}", _destinationTopicPrefix);

if (_isFlushlessModeEnabled) {
_flushlessProducer = new FlushlessEventProducerHandler<>(_producer,
ReflectionUtils.createInstance(config.getCallbackStatusStrategy()));
_flushlessProducer = new FlushlessEventProducerHandler<Long>(_producer,
config.getCallbackStatusStrategyFactory());
_flowControlEnabled = config.getConnectorProps().getBoolean(CONFIG_FLOW_CONTROL_ENABLED, false);
_maxInFlightMessagesThreshold =
config.getConnectorProps().getLong(CONFIG_MAX_IN_FLIGHT_MSGS_THRESHOLD, DEFAULT_MAX_IN_FLIGHT_MSGS_THRESHOLD);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public FlushlessEventProducerHandler(DatastreamEventProducer eventProducer, Call
/**
* Creating a new instance of the OffsetCheckpointTrackingStrategy to be used for checkpointing
*/
public CallbackStatus<T> createCallbackStatusInstance() {
private CallbackStatus<T> createCallbackStatusInstance() {
return _callbackStatusFactory.createCallbackStatusStrategy();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ abstract public class CallbackStatus<T> {

/**
* Registers the given checkpoint.
* @param checkpoint the checkpoint to register
* @param checkpoint is the latest record acked by the producer of the underlying pub sub framework
*/
abstract public void register(T checkpoint);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
*/
public interface CallbackStatusFactory<T> {
/**
* Create a callback status strategy
* */
* Creates a callback status strategy that checkpoints the consumer offset on successful produce of that record
* @return CallbackStatus strategy construct
*/
CallbackStatus<T> createCallbackStatusStrategy();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,30 +24,44 @@ public class CallbackStatusWithComparableOffsets<T extends Comparable<T>> extend

private static final Logger LOG = LoggerFactory.getLogger(CallbackStatusWithComparableOffsets.class);

private final Queue<T> _acked = new PriorityQueue<>();
private final Set<T> _inFlight = Collections.synchronizedSet(new LinkedHashSet<>());

private T _highWaterMark = null;
shrinandthakkar marked this conversation as resolved.
Show resolved Hide resolved

// the last checkpoint-ed record's offset
protected T _currentCheckpoint = null;

private final Queue<T> _acked = new PriorityQueue<>();
private final Set<T> _inFlight = Collections.synchronizedSet(new LinkedHashSet<>());

/**
* Get the latest checkpoint to be acked
* @return <T> Type of the comparable checkpoint object internally used by the connector.
*/
@Override
public T getAckCheckpoint() {
return _currentCheckpoint;
}

/**
* Get the count of the records which are in flight
*/
@Override
public long getInFlightCount() {
return _inFlight.size();
}

/**
* Get the count of the records which are all acked from the producer
*/
@Override
public long getAckMessagesPastCheckpointCount() {
return _acked.size();
}

/**
* Registers the given checkpoint by adding it to the set of in-flight checkpoints.
* @param checkpoint the checkpoint to register
* @param checkpoint is the latest record acked by the producer of the underlying pub sub framework
*/
@Override
public synchronized void register(T checkpoint) {
_inFlight.add(checkpoint);
}
Expand All @@ -57,6 +71,7 @@ public synchronized void register(T checkpoint) {
* of the high watermark, and only update the ackCheckpoint when we are sure all events before it has
* been received.
*/
shrinandthakkar marked this conversation as resolved.
Show resolved Hide resolved
@Override
public synchronized void ack(T checkpoint) {
if (!_inFlight.remove(checkpoint)) {
LOG.error("Internal state error; could not remove checkpoint {}", checkpoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,16 @@
package com.linkedin.datastream.server.callbackstatus;

/**
* Interface for CallbackStatus Factories
* Factory implementation for Callback Status With Comparable Offsets
* @param <T> The type of the offset position that the underlying pub-sub system uses.
*/
public class CallbackStatusWithComparableOffsetsFactory<T extends Comparable<T>> implements CallbackStatusFactory<T> {

/**
* Creates a callback status strategy that checkpoints the consumer offset on successful produce of that record
* with comparable offsets
* @return CallbackStatus strategy construct
*/
@Override
public CallbackStatus<T> createCallbackStatusStrategy() {
return new CallbackStatusWithComparableOffsets<T>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
*/
package com.linkedin.datastream.server.callbackstatus;

import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.Set;

import org.slf4j.Logger;
Expand All @@ -25,56 +25,71 @@ public class CallbackStatusWithNonComparableOffsets<T> extends CallbackStatus<T>

private static final Logger LOG = LoggerFactory.getLogger(CallbackStatusWithNonComparableOffsets.class);

// the last checkpoint-ed record's offset
protected T _currentCheckpoint = null;

// Hashset storing all the records which are yet to be acked
private final Set<T> _inFlight = Collections.synchronizedSet(new LinkedHashSet<>());

// Deque to store all the messages which are inflight until the last consumer checkpoint is made
private final Deque<T> _inFlightUntilLastConsumerCheckpoint = new ArrayDeque<>();
// Deque to store all the messages which are inflight after the last consumer checkpoint is made
private final Deque<T> _inFlightAfterLastConsumerCheckpoint = new LinkedList<>();

// Hashset storing all the records for which the ack is received
private final Set<T> _acked = Collections.synchronizedSet(new HashSet<>());

// the last checkpoint-ed record's offset
protected T _currentCheckpoint = null;

/**
* Get the latest checkpoint to be acked
* @return <T> Type of the comparable checkpoint object internally used by the connector.
*/
@Override
public T getAckCheckpoint() {
shrinandthakkar marked this conversation as resolved.
Show resolved Hide resolved
return _currentCheckpoint;
}

/**
* Get the count of the records which are in flight
*/
@Override
public long getInFlightCount() {
return _inFlight.size();
}

/**
* Get all the messages which are in flight until the last checkpoint at consumer
* Get the count of the records which are all acked from the producer
*/
public long getinFlightUntilLastConsumerCheckpointCount() {
return _inFlight.size();
}

@Override
public long getAckMessagesPastCheckpointCount() {
return _acked.size();
}

/**
* Registers the given checkpoint by adding it to the deque of in-flight checkpoints.
* @param checkpoint the checkpoint to register
* @param checkpoint is the latest record acked by the producer of the underlying pub sub framework
*/
@Override
public synchronized void register(T checkpoint) {
_inFlight.add(checkpoint);
_inFlightUntilLastConsumerCheckpoint.offerLast(checkpoint);
_inFlightAfterLastConsumerCheckpoint.offerLast(checkpoint);
}

/**
* The checkpoint acknowledgement can be received out of order. So here, we track the checkpoints by adding
* them in the _acked set and only update the _currentCheckpoint if a contiguous sequence of offsets are ack-ed
* from the front of the queue.
*/
@Override
public synchronized void ack(T checkpoint) {
// adding the checkpoint in the _acked set
_acked.add(checkpoint);

// removing the checkpoint from the _inFlight set as we got acknowledgement for this checkpoint from producer
_inFlight.remove(checkpoint);
while (!_inFlightUntilLastConsumerCheckpoint.isEmpty() && !_acked.isEmpty() && _acked.contains(_inFlightUntilLastConsumerCheckpoint.peekFirst())) {
_currentCheckpoint = _inFlightUntilLastConsumerCheckpoint.pollFirst();

// Until a contiguous sequence of offsets are not ack-ed from the producer for all the consumed records, we can't
// commit new checkpoint to consumer. This loops checks for that contiguous acked offsets.
while (!_inFlightAfterLastConsumerCheckpoint.isEmpty() && !_acked.isEmpty() && _acked.contains(
_inFlightAfterLastConsumerCheckpoint.peekFirst())) {
_currentCheckpoint = _inFlightAfterLastConsumerCheckpoint.pollFirst();

if (!_acked.remove(_currentCheckpoint)) {
LOG.error("Internal state error; could not remove checkpoint {}", _currentCheckpoint);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious: How will currentCheckpoints will look in case of non-comparable offsets? How will we debug the issues, i.e. stuck partitions? Offset numbers were easy to compare.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The checkpoints in non-comparable scenarios could be of byte type.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,16 @@
package com.linkedin.datastream.server.callbackstatus;

/**
* Interface for CallbackStatus Factories
* Factory implementation for Callback Status With Non Comparable Offsets
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: lower case for Callback Status With Non Comparable Offsets

* @param <T> The type of the offset position that the underlying pub-sub system uses.
*/
public class CallbackStatusWithNonComparableOffsetsFactory<T extends Comparable<T>> implements CallbackStatusFactory<T> {

/**
* Creates a callback status strategy that checkpoints the consumer offset on successful produce of that record
* with non comparable offsets
* @return CallbackStatus strategy construct
*/
@Override
public CallbackStatus<T> createCallbackStatusStrategy() {
return new CallbackStatusWithNonComparableOffsets<T>();
Expand Down
Loading