diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/ProducerIdManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/ProducerIdManager.java index bc3375ea0a..e24b67b83b 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/ProducerIdManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/ProducerIdManager.java @@ -178,21 +178,21 @@ private CompletableFuture conditionalUpdateData(byte[] data, long expectVe } updateFuture.complete(stat.getVersion()); }).exceptionally(ex -> { - if (ex instanceof MetadataStoreException.BadVersionException) { + if (ex.getCause() instanceof MetadataStoreException.BadVersionException) { checkProducerIdBlockMetadata(data) .thenAccept(updateFuture::complete).exceptionally(e -> { updateFuture.completeExceptionally(e); return null; }); - } else if (ex instanceof MetadataStoreException.NotFoundException) { + } else if (ex.getCause() instanceof MetadataStoreException.NotFoundException) { log.error("Update of path {} with data {} and expected version {} failed due to {}", KOP_PID_BLOCK_ZNODE, getProducerIdBlockStr(data), expectVersion, "NoNode for path " + KOP_PID_BLOCK_ZNODE); updateFuture.completeExceptionally(new Exception("NoNode for path " + KOP_PID_BLOCK_ZNODE)); } else { log.error("Update of path {} with data {} and expected version {} exception: {}", - KOP_PID_BLOCK_ZNODE, getProducerIdBlockStr(data), expectVersion, ex); - updateFuture.completeExceptionally(new Exception("Error to update data.", ex)); + KOP_PID_BLOCK_ZNODE, getProducerIdBlockStr(data), expectVersion, ex.getCause()); + updateFuture.completeExceptionally(new Exception("Error to update data.", ex.getCause())); } return null; });