Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions bin/kafka-transactions.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.TransactionsCommand "$@"
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -1160,7 +1160,8 @@ project(':tools') {
compile libs.jettyServlets

testCompile project(':clients')
testCompile libs.junitJupiter
testCompile libs.junitJupiterApi
testCompile libs.junitVintageEngine
testCompile project(':clients').sourceSets.test.output
testCompile libs.mockitoInline // supports mocking static methods, final classes, etc.

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.annotation.InterfaceStability;

@InterfaceStability.Evolving
public class AbortTransactionOptions extends AbstractOptions<AbortTransactionOptions> {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.internals.KafkaFutureImpl;

import java.util.Map;
import java.util.concurrent.ExecutionException;

@InterfaceStability.Evolving
public class AbortTransactionResult {
private final Map<TopicPartition, KafkaFutureImpl<Void>> futures;

AbortTransactionResult(Map<TopicPartition, KafkaFutureImpl<Void>> futures) {
this.futures = futures;
}

public KafkaFuture<Void> all() {
return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]))
.thenApply(nil -> {
for (Map.Entry<TopicPartition, KafkaFutureImpl<Void>> entry : futures.entrySet()) {
try {
KafkaFutureImpl<Void> future = entry.getValue();
future.get();
} catch (InterruptedException | ExecutionException e) {
// This should be unreachable, because allOf ensured that all the futures completed successfully.
throw new KafkaException(e);
}
}
return null;
});
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Objects;

@InterfaceStability.Evolving
public class AbortTransactionSpec {
private final TopicPartition topicPartition;
private final long producerId;
private final int producerEpoch;
private final int coordinatorEpoch;

public AbortTransactionSpec(
TopicPartition topicPartition,
long producerId,
int producerEpoch,
int coordinatorEpoch
) {
this.topicPartition = topicPartition;
this.producerId = producerId;
this.producerEpoch = producerEpoch;
this.coordinatorEpoch = coordinatorEpoch;
}

public TopicPartition topicPartition() {
return topicPartition;
}

public long producerId() {
return producerId;
}

public int producerEpoch() {
return producerEpoch;
}

public int coordinatorEpoch() {
return coordinatorEpoch;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AbortTransactionSpec that = (AbortTransactionSpec) o;
return producerId == that.producerId &&
producerEpoch == that.producerEpoch &&
coordinatorEpoch == that.coordinatorEpoch &&
Objects.equals(topicPartition, that.topicPartition);
}

@Override
public int hashCode() {
return Objects.hash(topicPartition, producerId, producerEpoch, coordinatorEpoch);
}

@Override
public String toString() {
return "AbortTransactionSpec{" +
"topicPartition=" + topicPartition +
", producerId=" + producerId +
", producerEpoch=" + producerEpoch +
", coordinatorEpoch=" + coordinatorEpoch +
'}';
}

}
87 changes: 87 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -1306,6 +1306,93 @@ default AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScram
AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScramCredentialAlteration> alterations,
AlterUserScramCredentialsOptions options);

/**
* Describe producer state on a set of topic partitions. See
* {@link #describeProducers(Collection, DescribeProducersOptions)} for more details.
*
* @param partitions The set of partitions to query
* @return The result
*/
default DescribeProducersResult describeProducers(Collection<TopicPartition> partitions) {
return describeProducers(partitions, new DescribeProducersOptions());
}

/**
* Describe active producer state on a set of topic partitions. Unless a specific broker
* is requested through {@link DescribeProducersOptions#setBrokerId(int)}, this will
* query the partition leader to find the producer state.
*
* @param partitions The set of partitions to query
* @param options Options to control the method behavior
* @return The result
*/
DescribeProducersResult describeProducers(Collection<TopicPartition> partitions, DescribeProducersOptions options);

/**
* Describe the state of a set of transactionalIds. See
* {@link #describeTransactions(Collection, DescribeTransactionsOptions)} for more details.
*
* @param transactionalIds The set of transactionalIds to query
* @return The result
*/
default DescribeTransactionsResult describeTransactions(Collection<String> transactionalIds) {
return describeTransactions(transactionalIds, new DescribeTransactionsOptions());
}

/**
* Describe the state of a set of transactionalIds from the respective transaction coordinators,
* which are dynamically discovered.
*
* @param transactionalIds The set of transactionalIds to query
* @param options Options to control the method behavior
* @return The result
*/
DescribeTransactionsResult describeTransactions(Collection<String> transactionalIds, DescribeTransactionsOptions options);

/**
* List active transactions in the cluster. See
* {@link #listTransactions(ListTransactionsOptions)} for more details.
*
* @return The result
*/
default ListTransactionsResult listTransactions() {
return listTransactions(new ListTransactionsOptions());
}

/**
* List active transactions in the cluster. This will query all potential transaction
* coordinators in the cluster and collect the state of all transactionalIds. Users
* should typically attempt to reduce the size of the result set using
* {@link ListTransactionsOptions#filterProducerIds(Set)} or
* {@link ListTransactionsOptions#filterStates(Set)}
*
* @param options Options to control the method behavior (including filters)
* @return The result
*/
ListTransactionsResult listTransactions(ListTransactionsOptions options);

/**
* Forcefully abort a transaction which is open on a topic partition. See
* {@link #abortTransaction(AbortTransactionSpec, AbortTransactionOptions)} for more details.
*
* @param spec The transaction specification including topic partition and producer details
* @return The result
*/
default AbortTransactionResult abortTransaction(AbortTransactionSpec spec) {
return abortTransaction(spec, new AbortTransactionOptions());
}

/**
* Forcefully abort a transaction which is open on a topic partition. This will
* send a `WriteTxnMarkers` request to the partition leader in order to abort the
* transaction. This requires administrative privileges.
*
* @param spec The transaction specification including topic partition and producer details
* @param options Options to control the method behavior (including filters)
* @return The result
*/
AbortTransactionResult abortTransaction(AbortTransactionSpec spec, AbortTransactionOptions options);

/**
* Get the metrics kept by the adminClient
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Collection;
import java.util.Objects;
import java.util.OptionalInt;

/**
* Options for {@link Admin#describeProducers(Collection)}.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class DescribeProducersOptions extends AbstractOptions<DescribeProducersOptions> {
private OptionalInt brokerId = OptionalInt.empty();

public DescribeProducersOptions setBrokerId(int brokerId) {
this.brokerId = OptionalInt.of(brokerId);
return this;
}

public OptionalInt brokerId() {
return brokerId;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DescribeProducersOptions that = (DescribeProducersOptions) o;
return Objects.equals(brokerId, that.brokerId);
}

@Override
public int hashCode() {
return Objects.hash(brokerId);
}

@Override
public String toString() {
return "DescribeProducersOptions(" +
"brokerId=" + brokerId +
", timeoutMs=" + timeoutMs +
')';
}
}
Loading