Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 25a50ca

Browse files
committedOct 24, 2021
fix(topicdata): display __transaction_state data
1 parent 6489780 commit 25a50ca

File tree

1 file changed

+26
-14
lines changed

1 file changed

+26
-14
lines changed
 

‎src/main/java/org/akhq/models/Record.java

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import io.confluent.kafka.schemaregistry.json.JsonSchema;
1010
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
1111
import kafka.coordinator.group.GroupMetadataManager;
12+
import kafka.coordinator.transaction.TransactionLog;
13+
import kafka.coordinator.transaction.TxnKey;
1214
import lombok.*;
1315
import org.akhq.configs.SchemaRegistryType;
1416
import org.akhq.utils.AvroToJsonDeserializer;
@@ -191,26 +193,36 @@ private String convertToString(byte[] payload, Integer schemaId, boolean isKey)
191193
return new String(payload);
192194
}
193195
} else if (topic.isInternalTopic() && topic.getName().equals("__consumer_offsets")) {
194-
if (isKey) {
195-
try {
196+
try {
197+
if (isKey) {
196198
return GroupMetadataManager.readMessageKey(ByteBuffer.wrap(payload)).key().toString();
197-
} catch (Exception exception) {
198-
this.exceptions.add(Optional.ofNullable(exception.getMessage())
199-
.filter(msg -> !msg.isBlank())
200-
.orElseGet(() -> exception.getClass().getCanonicalName()));
201-
202-
return new String(payload);
203-
}
204-
} else {
205-
try {
199+
} else {
206200
return GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(payload)).toString();
207-
} catch (Exception exception) {
208-
this.exceptions.add(Optional.ofNullable(exception.getMessage())
201+
}
202+
} catch (Exception exception) {
203+
this.exceptions.add(Optional.ofNullable(exception.getMessage())
209204
.filter(msg -> !msg.isBlank())
210205
.orElseGet(() -> exception.getClass().getCanonicalName()));
211206

212-
return new String(payload);
207+
return new String(payload);
208+
}
209+
} else if (topic.isInternalTopic() && topic.getName().equals("__transaction_state")) {
210+
try {
211+
if (isKey) {
212+
TxnKey txnKey = TransactionLog.readTxnRecordKey(ByteBuffer.wrap(payload));
213+
return avroToJsonSerializer.getMapper().writeValueAsString(
214+
Map.of("transactionalId", txnKey.transactionalId(), "version", txnKey.version())
215+
);
216+
} else {
217+
TxnKey txnKey = TransactionLog.readTxnRecordKey(ByteBuffer.wrap(this.bytesKey));
218+
return avroToJsonSerializer.getMapper().writeValueAsString(TransactionLog.readTxnRecordValue(txnKey.transactionalId(), ByteBuffer.wrap(payload)));
213219
}
220+
} catch (Exception exception) {
221+
this.exceptions.add(Optional.ofNullable(exception.getMessage())
222+
.filter(msg -> !msg.isBlank())
223+
.orElseGet(() -> exception.getClass().getCanonicalName()));
224+
225+
return new String(payload);
214226
}
215227
} else {
216228
if (protobufToJsonDeserializer != null) {

0 commit comments

Comments
 (0)
Please sign in to comment.