Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2404,13 +2404,16 @@ private boolean requestNotSignedProperly(InternalRequestSignature requestSignatu
requestSignature.keyAlgorithm(),
keySignatureVerificationAlgorithms
));
} else {
if (!requestSignature.isValid(sessionKey)) {
requestValidationError = new ConnectRestException(
Response.Status.FORBIDDEN,
"Internal request contained invalid signature."
);
}
} else if (sessionKey == null) {
requestValidationError = new ConnectRestException(
Response.Status.SERVICE_UNAVAILABLE,
"This worker is still starting up and has not been able to read a session key from the config topic yet"
);
} else if (!requestSignature.isValid(sessionKey)) {
requestValidationError = new ConnectRestException(
Response.Status.FORBIDDEN,
"Internal request contained invalid signature."
);
}
if (requestValidationError != null) {
callback.onCompletion(requestValidationError, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.tools.ThroughputThrottler;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.data.Schema;
Expand All @@ -31,6 +33,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

/**
* A connector primarily intended for system tests. The connector simply generates as many tasks as requested. The
Expand All @@ -48,6 +51,7 @@ public class VerifiableSourceTask extends SourceTask {
public static final String ID_CONFIG = "id";
public static final String TOPIC_CONFIG = "topic";
public static final String THROUGHPUT_CONFIG = "throughput";
public static final String COMPLETE_RECORD_DATA_CONFIG = "complete.record.data";

private static final String ID_FIELD = "id";
private static final String SEQNO_FIELD = "seqno";
Expand All @@ -61,6 +65,15 @@ public class VerifiableSourceTask extends SourceTask {
private long startingSeqno;
private long seqno;
private ThroughputThrottler throttler;
private boolean completeRecordData;

private static final Schema COMPLETE_VALUE_SCHEMA = SchemaBuilder.struct()
.field("name", Schema.STRING_SCHEMA)
.field("task", Schema.INT32_SCHEMA)
.field("topic", Schema.STRING_SCHEMA)
.field("time_ms", Schema.INT64_SCHEMA)
.field("seqno", Schema.INT64_SCHEMA)
.build();

@Override
public String version() {
Expand All @@ -87,6 +100,7 @@ public void start(Map<String, String> props) {
seqno = 0;
startingSeqno = seqno;
throttler = new ThroughputThrottler(throughput, System.currentTimeMillis());
completeRecordData = "true".equalsIgnoreCase(props.get(COMPLETE_RECORD_DATA_CONFIG));

log.info("Started VerifiableSourceTask {}-{} producing to topic {} resuming from seqno {}", name, id, topic, startingSeqno);
}
Expand Down Expand Up @@ -114,7 +128,9 @@ public List<SourceRecord> poll() {
System.out.println(dataJson);

Map<String, Long> ccOffset = Collections.singletonMap(SEQNO_FIELD, seqno);
SourceRecord srcRecord = new SourceRecord(partition, ccOffset, topic, Schema.INT32_SCHEMA, id, Schema.INT64_SCHEMA, seqno);
Schema valueSchema = completeRecordData ? COMPLETE_VALUE_SCHEMA : Schema.INT64_SCHEMA;
Object value = completeRecordData ? completeValue(data) : seqno;
SourceRecord srcRecord = new SourceRecord(partition, ccOffset, topic, Schema.INT32_SCHEMA, id, valueSchema, value);
List<SourceRecord> result = Collections.singletonList(srcRecord);
seqno++;
return result;
Expand All @@ -141,6 +157,15 @@ public void commitRecord(SourceRecord record, RecordMetadata metadata) {

@Override
public void stop() {
throttler.wakeup();
if (throttler != null)
throttler.wakeup();
}

private Object completeValue(Map<String, Object> data) {
Struct result = new Struct(COMPLETE_VALUE_SCHEMA);
Stream.of("name", "task", "topic", "time_ms", "seqno").forEach(
field -> result.put(field, data.get(field))
);
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ public KafkaBasedLog(String topic,
this.readLogEndOffsetCallbacks = new ArrayDeque<>();
this.time = time;
this.initializer = initializer != null ? initializer : admin -> { };
// Initialize the producer Optional here to prevent NPEs later on
this.producer = Optional.empty();

// If the consumer is configured with isolation.level = read_committed, then its end offsets method cannot be relied on
// as it will not take records from currently-open transactions into account. We want to err on the side of caution in that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@

import static java.util.Collections.singletonList;
import static javax.ws.rs.core.Response.Status.FORBIDDEN;
import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE;
import static org.apache.kafka.connect.runtime.SourceConnectorConfig.ExactlyOnceSupportLevel.REQUIRED;
import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG;
Expand Down Expand Up @@ -2773,7 +2774,15 @@ public void testPutTaskConfigsInvalidSignature() {
EasyMock.expect(signature.keyAlgorithm()).andReturn("HmacSHA256").anyTimes();
EasyMock.expect(signature.isValid(EasyMock.anyObject())).andReturn(false).anyTimes();

PowerMock.replayAll(taskConfigCb, signature);
SessionKey sessionKey = EasyMock.mock(SessionKey.class);
SecretKey secretKey = EasyMock.niceMock(SecretKey.class);
EasyMock.expect(sessionKey.key()).andReturn(secretKey);
EasyMock.expect(sessionKey.creationTimestamp()).andReturn(time.milliseconds());

PowerMock.replayAll(taskConfigCb, signature, sessionKey, secretKey);

// Read a new session key from the config topic
configUpdateListener.onSessionKeyUpdate(sessionKey);

herder.putTaskConfigs(CONN1, TASK_CONFIGS, taskConfigCb, signature);

Expand All @@ -2782,6 +2791,28 @@ public void testPutTaskConfigsInvalidSignature() {
assertEquals(FORBIDDEN.getStatusCode(), ((ConnectRestException) errorCapture.getValue()).statusCode());
}

@Test
public void putTaskConfigsWorkerStillStarting() {
Callback<Void> taskConfigCb = EasyMock.mock(Callback.class);
Capture<Throwable> errorCapture = Capture.newInstance();
taskConfigCb.onCompletion(capture(errorCapture), EasyMock.eq(null));
EasyMock.expectLastCall().once();

EasyMock.expect(member.currentProtocolVersion()).andReturn(CONNECT_PROTOCOL_V2).anyTimes();

InternalRequestSignature signature = EasyMock.mock(InternalRequestSignature.class);
EasyMock.expect(signature.keyAlgorithm()).andReturn("HmacSHA256").anyTimes();
EasyMock.expect(signature.isValid(EasyMock.anyObject())).andReturn(true).anyTimes();

PowerMock.replayAll(taskConfigCb, signature);

herder.putTaskConfigs(CONN1, TASK_CONFIGS, taskConfigCb, signature);

PowerMock.verifyAll();
assertTrue(errorCapture.getValue() instanceof ConnectRestException);
assertEquals(SERVICE_UNAVAILABLE.getStatusCode(), ((ConnectRestException) errorCapture.getValue()).statusCode());
}

@Test
public void testPutTaskConfigsValidRequiredSignature() {
Callback<Void> taskConfigCb = EasyMock.mock(Callback.class);
Expand All @@ -2794,7 +2825,15 @@ public void testPutTaskConfigsValidRequiredSignature() {
EasyMock.expect(signature.keyAlgorithm()).andReturn("HmacSHA256").anyTimes();
EasyMock.expect(signature.isValid(EasyMock.anyObject())).andReturn(true).anyTimes();

PowerMock.replayAll(taskConfigCb, signature);
SessionKey sessionKey = EasyMock.mock(SessionKey.class);
SecretKey secretKey = EasyMock.niceMock(SecretKey.class);
EasyMock.expect(sessionKey.key()).andReturn(secretKey);
EasyMock.expect(sessionKey.creationTimestamp()).andReturn(time.milliseconds());

PowerMock.replayAll(taskConfigCb, signature, sessionKey, secretKey);

// Read a new session key from the config topic
configUpdateListener.onSessionKeyUpdate(sessionKey);

herder.putTaskConfigs(CONN1, TASK_CONFIGS, taskConfigCb, signature);

Expand Down Expand Up @@ -2893,7 +2932,15 @@ public void testFenceZombiesInvalidSignature() {
EasyMock.expect(signature.keyAlgorithm()).andReturn("HmacSHA256").anyTimes();
EasyMock.expect(signature.isValid(EasyMock.anyObject())).andReturn(false).anyTimes();

PowerMock.replayAll(taskConfigCb, signature);
SessionKey sessionKey = EasyMock.mock(SessionKey.class);
SecretKey secretKey = EasyMock.niceMock(SecretKey.class);
EasyMock.expect(sessionKey.key()).andReturn(secretKey);
EasyMock.expect(sessionKey.creationTimestamp()).andReturn(time.milliseconds());

PowerMock.replayAll(taskConfigCb, signature, sessionKey, secretKey);

// Read a new session key from the config topic
configUpdateListener.onSessionKeyUpdate(sessionKey);

herder.fenceZombieSourceTasks(CONN1, taskConfigCb, signature);

Expand Down
15 changes: 10 additions & 5 deletions tests/kafkatest/services/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,17 @@ def start_and_wait_to_join_group(self, node, worker_type, remote_connector_confi
err_msg="Never saw message indicating Kafka Connect joined group on node: " +
"%s in condition mode: %s" % (str(node.account), self.startup_mode))

def stop_node(self, node, clean_shutdown=True):
self.logger.info((clean_shutdown and "Cleanly" or "Forcibly") + " stopping Kafka Connect on " + str(node.account))
def stop_node(self, node, clean_shutdown=True, await_shutdown=None):
if await_shutdown is None:
await_shutdown = clean_shutdown
self.logger.info((clean_shutdown and "Cleanly" or "Forcibly") + " stopping Kafka Connect on " + str(node.account) \
+ " and " + ("" if await_shutdown else "not ") + "awaiting shutdown")
pids = self.pids(node)
sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL

for pid in pids:
node.account.signal(pid, sig, allow_fail=True)
if clean_shutdown:
if await_shutdown:
for pid in pids:
wait_until(lambda: not node.account.alive(pid), timeout_sec=self.startup_timeout_sec, err_msg="Kafka Connect process on " + str(
node.account) + " took too long to exit")
Expand Down Expand Up @@ -464,13 +467,14 @@ class VerifiableSource(VerifiableConnector):
Helper class for running a verifiable source connector on a Kafka Connect cluster and analyzing the output.
"""

def __init__(self, cc, name="verifiable-source", tasks=1, topic="verifiable", throughput=1000):
def __init__(self, cc, name="verifiable-source", tasks=1, topic="verifiable", throughput=1000, complete_records=False):
self.cc = cc
self.logger = self.cc.logger
self.name = name
self.tasks = tasks
self.topic = topic
self.throughput = throughput
self.complete_records = complete_records

def committed_messages(self):
return list(filter(lambda m: 'committed' in m and m['committed'], self.messages()))
Expand All @@ -485,7 +489,8 @@ def start(self):
'connector.class': 'org.apache.kafka.connect.tools.VerifiableSourceConnector',
'tasks.max': self.tasks,
'topic': self.topic,
'throughput': self.throughput
'throughput': self.throughput,
'complete.record.data': self.complete_records
})


Expand Down
Loading