File: AbstractWorkerSourceTask.java
@@ -196,6 +196,7 @@ protected abstract void producerSendFailed(
List<SourceRecord> toSend;
protected Map<String, String> taskConfig;
protected boolean started = false;
private volatile boolean producerClosed = false;

protected AbstractWorkerSourceTask(ConnectorTaskId id,
SourceTask task,
@@ -315,6 +316,7 @@ protected void close() {

private void closeProducer(Duration duration) {
if (producer != null) {
producerClosed = true;
Utils.closeQuietly(() -> producer.close(duration), "source task producer");
}
}
@@ -397,7 +399,11 @@ boolean sendRecords() {
producerRecord,
(recordMetadata, e) -> {
if (e != null) {
log.debug("{} failed to send record to {}: ", AbstractWorkerSourceTask.this, topic, e);
if (producerClosed) {
@C0urante (Contributor Author) commented on Jun 21, 2022:

Added this check to clean up the flood of ERROR-level log messages that occurs when the task's producer is closed while there are still in-flight messages.

This issue was not specific to these integration tests or to KIP-618, but it clogged up the logs for these tests badly enough that a small tweak in the code base to address it seemed warranted.

log.trace("{} failed to send record to {}; this is expected as the producer has already been closed", AbstractWorkerSourceTask.this, topic, e);
} else {
log.error("{} failed to send record to {}: ", AbstractWorkerSourceTask.this, topic, e);
}
log.trace("{} Failed record: {}", AbstractWorkerSourceTask.this, preTransformRecord);
producerSendFailed(false, producerRecord, preTransformRecord, e);
} else {
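To make the pattern above concrete: a volatile flag is flipped before the producer is closed, and the asynchronous send callback consults it so that failures for records still in flight at close time are logged at TRACE rather than ERROR. The following is a minimal, self-contained sketch of that pattern, not the actual Connect runtime code; the class name, logger, and producer configuration are illustrative.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Properties;

public class CloseAwareSender {
    private static final Logger log = LoggerFactory.getLogger(CloseAwareSender.class);

    private final KafkaProducer<byte[], byte[]> producer;
    // volatile: written by the thread that closes the producer, read by the
    // producer's I/O thread, which invokes the send callbacks
    private volatile boolean producerClosed = false;

    public CloseAwareSender(Properties producerProps) {
        this.producer = new KafkaProducer<>(producerProps);
    }

    public void send(ProducerRecord<byte[], byte[]> record) {
        producer.send(record, (metadata, e) -> {
            if (e == null) {
                return;
            }
            if (producerClosed) {
                // Expected: the producer was closed while this record was still in flight,
                // so don't flood the logs with ERROR-level messages
                log.trace("Failed to send record to {}; producer was already closed", record.topic(), e);
            } else {
                log.error("Failed to send record to {}", record.topic(), e);
            }
        });
    }

    public void close() {
        // Flip the flag before closing so callbacks for in-flight records see it
        producerClosed = true;
        producer.close(Duration.ofSeconds(30));
    }
}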
File: WorkerSourceTask.java
@@ -174,8 +174,6 @@ protected void producerSendFailed(
);
commitTaskRecord(preTransformRecord, null);
} else {
log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
producerSendException.compareAndSet(null, e);
}
}
File: ConnectorsResource.java
@@ -340,6 +340,7 @@ public void putTaskConfigs(final @PathParam("connector") String connector,

@PUT
@Path("/{connector}/fence")
@Operation(hidden = true, summary = "This operation is only for inter-worker communications")
@C0urante (Contributor Author) commented:

Added this line to hide the newly-introduced zombie fencing API from the OpenAPI spec that we started generating with #12067.

public void fenceZombies(final @PathParam("connector") String connector,
final @Context HttpHeaders headers,
final @QueryParam("forward") Boolean forward,
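For context on the annotation used above: swagger-annotations' @Operation with hidden = true excludes an endpoint from the generated OpenAPI document while leaving the JAX-RS routing untouched. Below is a minimal sketch of that usage; the resource class, paths, and method bodies are illustrative, not the actual ConnectorsResource.

import io.swagger.v3.oas.annotations.Operation;

import javax.ws.rs.GET;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("/connectors")
@Produces(MediaType.APPLICATION_JSON)
public class ExampleConnectorsResource {

    // Documented: this operation appears in the generated OpenAPI spec
    @GET
    @Path("/{connector}/status")
    @Operation(summary = "Get the status of the specified connector")
    public String status(@PathParam("connector") String connector) {
        return "RUNNING";
    }

    // Hidden: @Operation(hidden = true) keeps this operation out of the generated
    // OpenAPI spec, as done above for the internal zombie-fencing endpoint
    @PUT
    @Path("/{connector}/fence")
    @Operation(hidden = true, summary = "This operation is only for inter-worker communications")
    public void fenceZombies(@PathParam("connector") String connector) {
        // inter-worker use only; intentionally undocumented externally
    }
}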
File: ConnectorHandle.java
@@ -112,6 +112,14 @@ public void deleteTask(String taskId) {
taskHandles.remove(taskId);
}

/**
* Delete all task handles for this connector.
*/
public void clearTasks() {
log.info("Clearing {} existing task handles for connector {}", taskHandles.size(), connectorName);
taskHandles.clear();
}

/**
* Set the number of expected records for this connector.
*

Large diffs are not rendered by default.

File: MonitorableSourceConnector.java
@@ -20,8 +20,11 @@
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.runtime.SampleSourceConnector;
import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.tools.ThroughputThrottler;
@@ -32,6 +35,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -47,6 +51,20 @@ public class MonitorableSourceConnector extends SampleSourceConnector {
private static final Logger log = LoggerFactory.getLogger(MonitorableSourceConnector.class);

public static final String TOPIC_CONFIG = "topic";
public static final String MESSAGES_PER_POLL_CONFIG = "messages.per.poll";

public static final String CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG = "custom.exactly.once.support";
public static final String EXACTLY_ONCE_SUPPORTED = "supported";
public static final String EXACTLY_ONCE_UNSUPPORTED = "unsupported";
public static final String EXACTLY_ONCE_NULL = "null";
public static final String EXACTLY_ONCE_FAIL = "fail";

public static final String CUSTOM_TRANSACTION_BOUNDARIES_CONFIG = "custom.transaction.boundaries";
public static final String TRANSACTION_BOUNDARIES_SUPPORTED = "supported";
public static final String TRANSACTION_BOUNDARIES_UNSUPPORTED = "unsupported";
public static final String TRANSACTION_BOUNDARIES_NULL = "null";
public static final String TRANSACTION_BOUNDARIES_FAIL = "fail";

private String connectorName;
private ConnectorHandle connectorHandle;
private Map<String, String> commonConfigs;
@@ -74,7 +92,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
for (int i = 0; i < maxTasks; i++) {
Map<String, String> config = new HashMap<>(commonConfigs);
config.put("connector.name", connectorName);
config.put("task.id", connectorName + "-" + i);
config.put("task.id", taskId(connectorName, i));
configs.add(config);
}
return configs;
@@ -92,18 +110,55 @@ public ConfigDef config() {
return new ConfigDef();
}

@Override
public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
String supportLevel = connectorConfig.getOrDefault(CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG, "null").toLowerCase(Locale.ROOT);
switch (supportLevel) {
case EXACTLY_ONCE_SUPPORTED:
return ExactlyOnceSupport.SUPPORTED;
case EXACTLY_ONCE_UNSUPPORTED:
return ExactlyOnceSupport.UNSUPPORTED;
case EXACTLY_ONCE_FAIL:
throw new ConnectException("oops");
default:
case EXACTLY_ONCE_NULL:
return null;
}
}

@Override
public ConnectorTransactionBoundaries canDefineTransactionBoundaries(Map<String, String> connectorConfig) {
String supportLevel = connectorConfig.getOrDefault(CUSTOM_TRANSACTION_BOUNDARIES_CONFIG, TRANSACTION_BOUNDARIES_UNSUPPORTED).toLowerCase(Locale.ROOT);
switch (supportLevel) {
case TRANSACTION_BOUNDARIES_SUPPORTED:
return ConnectorTransactionBoundaries.SUPPORTED;
case TRANSACTION_BOUNDARIES_FAIL:
throw new ConnectException("oh no :(");
case TRANSACTION_BOUNDARIES_NULL:
return null;
default:
case TRANSACTION_BOUNDARIES_UNSUPPORTED:
return ConnectorTransactionBoundaries.UNSUPPORTED;
}
}

public static String taskId(String connectorName, int taskId) {
return connectorName + "-" + taskId;
}

public static class MonitorableSourceTask extends SourceTask {
private String connectorName;
private String taskId;
private String topicName;
private TaskHandle taskHandle;
private volatile boolean stopped;
private long startingSeqno;
private long seqno;
private long throughput;
private int batchSize;
private ThroughputThrottler throttler;

private long priorTransactionBoundary;
private long nextTransactionBoundary;

@Override
public String version() {
return "unknown";
@@ -112,21 +167,24 @@ public String version() {
@Override
public void start(Map<String, String> props) {
taskId = props.get("task.id");
connectorName = props.get("connector.name");
String connectorName = props.get("connector.name");
topicName = props.getOrDefault(TOPIC_CONFIG, "sequential-topic");
throughput = Long.parseLong(props.getOrDefault("throughput", "-1"));
batchSize = Integer.parseInt(props.getOrDefault("messages.per.poll", "1"));
batchSize = Integer.parseInt(props.getOrDefault(MESSAGES_PER_POLL_CONFIG, "1"));
taskHandle = RuntimeHandles.get().connectorHandle(connectorName).taskHandle(taskId);
Map<String, Object> offset = Optional.ofNullable(
context.offsetStorageReader().offset(Collections.singletonMap("task.id", taskId)))
.orElse(Collections.emptyMap());
startingSeqno = Optional.ofNullable((Long) offset.get("saved")).orElse(0L);
seqno = startingSeqno;
log.info("Started {} task {} with properties {}", this.getClass().getSimpleName(), taskId, props);
throttler = new ThroughputThrottler(throughput, System.currentTimeMillis());
throttler = new ThroughputThrottler(Long.parseLong(props.getOrDefault("throughput", "-1")), System.currentTimeMillis());
taskHandle.recordTaskStart();
priorTransactionBoundary = 0;
nextTransactionBoundary = 1;
if (Boolean.parseBoolean(props.getOrDefault("task-" + taskId + ".start.inject.error", "false"))) {
throw new RuntimeException("Injecting errors during task start");
}
calculateNextBoundary();
}

@Override
@@ -136,19 +194,24 @@ public List<SourceRecord> poll() {
throttler.throttle();
}
taskHandle.record(batchSize);
log.info("Returning batch of {} records", batchSize);
log.trace("Returning batch of {} records", batchSize);
@C0urante (Contributor Author) commented:

This was way too noisy at INFO-level.

return LongStream.range(0, batchSize)
.mapToObj(i -> new SourceRecord(
Collections.singletonMap("task.id", taskId),
Collections.singletonMap("saved", ++seqno),
topicName,
null,
Schema.STRING_SCHEMA,
"key-" + taskId + "-" + seqno,
Schema.STRING_SCHEMA,
"value-" + taskId + "-" + seqno,
null,
new ConnectHeaders().addLong("header-" + seqno, seqno)))
.mapToObj(i -> {
seqno++;
SourceRecord record = new SourceRecord(
sourcePartition(taskId),
sourceOffset(seqno),
topicName,
null,
Schema.STRING_SCHEMA,
"key-" + taskId + "-" + seqno,
Schema.STRING_SCHEMA,
"value-" + taskId + "-" + seqno,
null,
new ConnectHeaders().addLong("header-" + seqno, seqno));
maybeDefineTransactionBoundary(record);
return record;
})
.collect(Collectors.toList());
}
return null;
@@ -172,5 +235,43 @@ public void stop() {
stopped = true;
taskHandle.recordTaskStop();
}

/**
* Calculate the next transaction boundary, i.e., the seqno whose corresponding source record should be used to
* either {@link org.apache.kafka.connect.source.TransactionContext#commitTransaction(SourceRecord) commit}
* or {@link org.apache.kafka.connect.source.TransactionContext#abortTransaction(SourceRecord) abort} the next transaction.
* <p>
* This connector defines transactions whose sizes correspond to successive elements of the Fibonacci sequence,
* where transactions with an even number of records are aborted, and those with an odd number of records are committed.
*/
private void calculateNextBoundary() {
while (nextTransactionBoundary <= seqno) {
nextTransactionBoundary += priorTransactionBoundary;
priorTransactionBoundary = nextTransactionBoundary - priorTransactionBoundary;
}
Comment on lines +247 to +251, from a Member reviewer:

Should we mention something about Fibonacci numbers in comment here? I think it's not obvious what we're trying to achieve here.

@C0urante (Contributor Author) replied:

Fair enough, done 👍

}

private void maybeDefineTransactionBoundary(SourceRecord record) {
if (context.transactionContext() == null || seqno != nextTransactionBoundary) {
return;
}
// If the transaction boundary ends on an even-numbered offset, abort it
// Otherwise, commit
boolean abort = nextTransactionBoundary % 2 == 0;
calculateNextBoundary();
if (abort) {
context.transactionContext().abortTransaction(record);
} else {
context.transactionContext().commitTransaction(record);
}
}
}

public static Map<String, Object> sourcePartition(String taskId) {
return Collections.singletonMap("task.id", taskId);
}

public static Map<String, Object> sourceOffset(long seqno) {
return Collections.singletonMap("saved", seqno);
}
}
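To illustrate the transaction-boundary scheme defined by MonitorableSourceTask above: boundaries land on Fibonacci-numbered seqnos, the boundary record's parity decides commit versus abort, and calculateNextBoundary() advances a (prior, next) pair along the sequence. The following standalone sketch, which is not part of the PR, replays the same arithmetic and prints the resulting boundaries.

public class FibonacciBoundaryTrace {
    public static void main(String[] args) {
        long priorTransactionBoundary = 0;
        long nextTransactionBoundary = 1;
        for (long seqno = 1; seqno <= 40; seqno++) {
            if (seqno == nextTransactionBoundary) {
                // An even-numbered boundary record aborts the transaction; an odd-numbered one commits it
                String action = nextTransactionBoundary % 2 == 0 ? "abort" : "commit";
                System.out.printf("seqno %d: %s%n", seqno, action);
                // Same update as calculateNextBoundary(): walk (prior, next) along the Fibonacci sequence
                while (nextTransactionBoundary <= seqno) {
                    nextTransactionBoundary += priorTransactionBoundary;
                    priorTransactionBoundary = nextTransactionBoundary - priorTransactionBoundary;
                }
            }
        }
        // Output: boundaries at seqnos 1, 2, 3, 5, 8, 13, 21, 34
        // (commit, abort, commit, commit, abort, commit, commit, abort),
        // i.e. transactions of 1, 1, 1, 2, 3, 5, 8, 13 records
    }
}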
File: EmbeddedConnectClusterAssertions.java
@@ -47,6 +47,9 @@ public class EmbeddedConnectClusterAssertions {
public static final long WORKER_SETUP_DURATION_MS = TimeUnit.MINUTES.toMillis(5);
public static final long VALIDATION_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
public static final long CONNECTOR_SETUP_DURATION_MS = TimeUnit.MINUTES.toMillis(2);
// Creating a connector requires two rounds of rebalance; destroying one only requires one
// Assume it'll take ~half the time to destroy a connector as it does to create one
public static final long CONNECTOR_SHUTDOWN_DURATION_MS = TimeUnit.MINUTES.toMillis(1);
private static final long CONNECT_INTERNAL_TOPIC_UPDATES_DURATION_MS = TimeUnit.SECONDS.toMillis(60);

private final EmbeddedConnectCluster connect;