Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes for NoOpTransportProvider #940

Merged
merged 7 commits into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ public class KafkaMirrorMakerConnectorTask extends AbstractKafkaBasedConnectorTa
private static final String KAFKA_ORIGIN_OFFSET = "kafka-origin-offset";
private static final Duration LOCK_ACQUIRE_TIMEOUT = Duration.ofMinutes(3);
private static final String TASK_LOCK_ACQUIRE_ERROR_RATE = "taskLockAcquireErrorRate";

private static final String DATASTREAM_NAME_BASED_CLIENT_ID_FORMAT = "%s-%s";
private static final String NOOP_TRANSPORT_PROVIDER_NAME = "NoOp";

// constants for flushless mode and flow control
protected static final String CONFIG_MAX_IN_FLIGHT_MSGS_THRESHOLD = "maxInFlightMessagesThreshold";
Expand Down Expand Up @@ -261,10 +261,13 @@ protected DatastreamProducerRecord translate(ConsumerRecord<?, ?> fromKafka, Ins
builder.addEvent(envelope);
builder.setEventsSourceTimestamp(eventsSourceTimestamp);
builder.setSourceCheckpoint(new KafkaMirrorMakerCheckpoint(topic, partition, offset).toString());
builder.setDestination(_datastreamTask.getDatastreamDestination()
.getConnectionString()
.replace(KafkaMirrorMakerConnector.MM_TOPIC_PLACEHOLDER,
StringUtils.isBlank(_destinationTopicPrefix) ? topic : _destinationTopicPrefix + topic));

if (!_datastreamTask.getTransportProviderName().equalsIgnoreCase(NOOP_TRANSPORT_PROVIDER_NAME)) {
builder.setDestination(_datastreamTask.getDatastreamDestination()
.getConnectionString()
.replace(KafkaMirrorMakerConnector.MM_TOPIC_PLACEHOLDER,
StringUtils.isBlank(_destinationTopicPrefix) ? topic : _destinationTopicPrefix + topic));
}
if (_isIdentityMirroringEnabled) {
builder.setPartition(partition);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class EventProducer implements DatastreamEventProducer {
static final String EVENTS_SEND_LATENCY_MS_STRING = "eventsSendLatencyMs";
static final String THROUGHPUT_VIOLATING_EVENTS_LATENCY_MS_STRING = "throughputViolatingEventsLatencyMs";
static final String THROUGHPUT_VIOLATING_EVENTS_SEND_LATENCY_MS_STRING = "throughputViolatingEventsSendLatencyMs";
static final String NOOP_TRANSPORT_PROVIDER_NAME = "NoOp";
jzakaryan marked this conversation as resolved.
Show resolved Hide resolved

private static final String MODULE = EventProducer.class.getSimpleName();
private static final String METRICS_PREFIX = MODULE + MetricsAware.KEY_REGEX;
Expand Down Expand Up @@ -271,8 +272,11 @@ private DatastreamRecordMetadata helperSendOrBroadcast(DatastreamProducerRecord

try {
// Send the event to the transport
String destination =
record.getDestination().orElse(_datastreamTask.getDatastreamDestination().getConnectionString());
String destination = "dummy";
jzakaryan marked this conversation as resolved.
Show resolved Hide resolved
if (!_datastreamTask.getTransportProviderName().equalsIgnoreCase(NOOP_TRANSPORT_PROVIDER_NAME)) {
destination =
record.getDestination().orElse(_datastreamTask.getDatastreamDestination().getConnectionString());
}
record.setEventsSendTimestamp(System.currentTimeMillis());
long recordEventsSourceTimestamp = record.getEventsSourceTimestamp();
long recordEventsSendTimestamp = record.getEventsSendTimestamp().orElse(0L);
Expand Down
2 changes: 1 addition & 1 deletion gradle/maven.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
allprojects {
version = "5.3.5-SNAPSHOT"
version = "5.3.6-SNAPSHOT"
}

subprojects {
Expand Down