diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java index 676a170c9..153a4e11c 100644 --- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java +++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java @@ -59,6 +59,7 @@ import com.linkedin.datastream.server.DatastreamProducerRecordBuilder; import com.linkedin.datastream.server.DatastreamTask; import com.linkedin.datastream.server.FlushlessEventProducerHandler; +import com.linkedin.datastream.server.NoOpTransportProviderAdminFactory; import com.linkedin.datastream.server.api.transport.SendCallback; @@ -86,7 +87,6 @@ public class KafkaMirrorMakerConnectorTask extends AbstractKafkaBasedConnectorTa private static final String KAFKA_ORIGIN_OFFSET = "kafka-origin-offset"; private static final Duration LOCK_ACQUIRE_TIMEOUT = Duration.ofMinutes(3); private static final String TASK_LOCK_ACQUIRE_ERROR_RATE = "taskLockAcquireErrorRate"; - private static final String DATASTREAM_NAME_BASED_CLIENT_ID_FORMAT = "%s-%s"; // constants for flushless mode and flow control @@ -261,10 +261,14 @@ protected DatastreamProducerRecord translate(ConsumerRecord fromKafka, Ins builder.addEvent(envelope); builder.setEventsSourceTimestamp(eventsSourceTimestamp); builder.setSourceCheckpoint(new KafkaMirrorMakerCheckpoint(topic, partition, offset).toString()); - builder.setDestination(_datastreamTask.getDatastreamDestination() - .getConnectionString() - .replace(KafkaMirrorMakerConnector.MM_TOPIC_PLACEHOLDER, - StringUtils.isBlank(_destinationTopicPrefix) ? topic : _destinationTopicPrefix + topic)); + + if (!_datastreamTask.getTransportProviderName(). + equalsIgnoreCase(NoOpTransportProviderAdminFactory.NoOpTransportProvider.NAME)) { + builder.setDestination(_datastreamTask.getDatastreamDestination() + .getConnectionString() + .replace(KafkaMirrorMakerConnector.MM_TOPIC_PLACEHOLDER, + StringUtils.isBlank(_destinationTopicPrefix) ? topic : _destinationTopicPrefix + topic)); + } if (_isIdentityMirroringEnabled) { builder.setPartition(partition); } diff --git a/datastream-server-api/src/main/java/com/linkedin/datastream/server/NoOpTransportProviderAdminFactory.java b/datastream-server-api/src/main/java/com/linkedin/datastream/server/NoOpTransportProviderAdminFactory.java index d1086b6a1..32f768a22 100644 --- a/datastream-server-api/src/main/java/com/linkedin/datastream/server/NoOpTransportProviderAdminFactory.java +++ b/datastream-server-api/src/main/java/com/linkedin/datastream/server/NoOpTransportProviderAdminFactory.java @@ -34,6 +34,7 @@ public TransportProviderAdmin createTransportProviderAdmin(String transportProvi * A {@link TransportProvider} implementation that does nothing */ public static class NoOpTransportProvider implements TransportProvider { + public static final String NAME = "NoOp"; @Override public void send(String destination, DatastreamProducerRecord record, SendCallback onComplete) { diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/EventProducer.java b/datastream-server/src/main/java/com/linkedin/datastream/server/EventProducer.java index c1aeeaae5..1eee644f5 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/EventProducer.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/EventProducer.java @@ -19,6 +19,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Validate; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; @@ -271,8 +272,12 @@ private DatastreamRecordMetadata helperSendOrBroadcast(DatastreamProducerRecord try { // Send the event to the transport - String destination = - record.getDestination().orElse(_datastreamTask.getDatastreamDestination().getConnectionString()); + String destination = StringUtils.EMPTY; + if (!_datastreamTask.getTransportProviderName(). + equalsIgnoreCase(NoOpTransportProviderAdminFactory.NoOpTransportProvider.NAME)) { + destination = + record.getDestination().orElse(_datastreamTask.getDatastreamDestination().getConnectionString()); + } record.setEventsSendTimestamp(System.currentTimeMillis()); long recordEventsSourceTimestamp = record.getEventsSourceTimestamp(); long recordEventsSendTimestamp = record.getEventsSendTimestamp().orElse(0L); diff --git a/gradle/maven.gradle b/gradle/maven.gradle index e6f8f7eed..85edc5890 100644 --- a/gradle/maven.gradle +++ b/gradle/maven.gradle @@ -1,5 +1,5 @@ allprojects { - version = "5.3.5-SNAPSHOT" + version = "5.3.6-SNAPSHOT" } subprojects {