This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

[FEATURE] Add fetch down converted when entryFormat=kafka #660

Merged 20 commits on Aug 19, 2021.

@@ -28,6 +28,7 @@
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.record.ConvertedRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.utils.Time;
@@ -78,25 +79,23 @@ public DecodeResult decode(List<Entry> entries, byte magic) {
                // batch magic greater than the magic corresponding to the version requested by the client
                // need down converted
                if (batchMagic > magic) {
-                    ConvertedRecords<MemoryRecords> convertedRecords = null;
                    MemoryRecords memoryRecords = MemoryRecords.readableRecords(ByteBufUtils.getNioBuffer(byteBuf));
                    //down converted, batch magic will be set to client magic
-                    convertedRecords = memoryRecords.downConvert(magic, startOffset, time);
-                    log.trace("[{}:{}] downConvert record, start offset {}, entry magic: {}, client magic: {}"
-                            , entry.getLedgerId(), entry.getEntryId(), startOffset, batchMagic, magic);
-                    if (convertedRecords != null) {
-                        final ByteBuf kafkaBuffer = Unpooled.wrappedBuffer(convertedRecords.records().buffer());
-                        orderedByteBuf.add(kafkaBuffer);
-                        if (!optionalByteBufs.isPresent()) {
-                            optionalByteBufs = Optional.of(new ArrayList<>());
-                        }
-                        optionalByteBufs.ifPresent(byteBufs -> byteBufs.add(byteBuf));
-                        optionalByteBufs.ifPresent(byteBufs -> byteBufs.add(kafkaBuffer));
-                        log.trace("[{}:{}] down convertedRecords not null {}, {}, {}"
-                    } else {
-                        throw new IOException("DownConvert failed, convertedRecords is null");
-                    }
+                    ConvertedRecords<MemoryRecords> convertedRecords = memoryRecords.downConvert(magic, startOffset, time);
+
+                    final ByteBuf kafkaBuffer = Unpooled.wrappedBuffer(convertedRecords.records().buffer());
+                    orderedByteBuf.add(kafkaBuffer);
+                    if (!optionalByteBufs.isPresent()) {
+                        optionalByteBufs = Optional.of(new ArrayList<>());
+                    }
+                    optionalByteBufs.ifPresent(byteBufs -> byteBufs.add(byteBuf));
+                    optionalByteBufs.ifPresent(byteBufs -> byteBufs.add(kafkaBuffer));
+
+                    if (log.isTraceEnabled()) {
+                        log.trace("[{}:{}] downConvert record, start offset {}, entry magic: {}, client magic: {}"
+                                , entry.getLedgerId(), entry.getEntryId(), startOffset, batchMagic, magic);
+                    }
+
                } else {
                    //not need down converted, batch magic retains the magic value written in production
                    orderedByteBuf.add(byteBuf.slice(byteBuf.readerIndex(), byteBuf.readableBytes()));
@@ -111,7 +110,11 @@ public DecodeResult decode(List<Entry> entries, byte magic) {
}
optionalByteBufs.ifPresent(byteBufs -> byteBufs.add(kafkaBuffer));
}
-        } catch (KoPMessageMetadataNotFoundException | IOException e) { // skip failed decode entry
+        // Almost all exceptions in Kafka inherit from KafkaException and are captured and processed in KafkaApis.
+        // Here, both down-conversion failures and the IOException thrown by builder.appendWithOffset in
+        // decodePulsarEntryToKafkaRecords are caught inside Kafka and rethrown as a KafkaException,
+        // so we need to catch KafkaException here.
+        } catch (KoPMessageMetadataNotFoundException | IOException | KafkaException e) { // skip failed decode entry
log.error("[{}:{}] Failed to decode entry. ", entry.getLedgerId(), entry.getEntryId(), e);
entry.release();
}
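For readers unfamiliar with the Kafka records API used in this diff, here is a minimal standalone sketch of down-converting a magic v2 batch to v1 and wrapping the result in a Netty ByteBuf, mirroring the decode path above. The class name and sample record are illustrative, and it assumes a kafka-clients 2.x and netty-buffer dependency; it is not part of the PR.

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ConvertedRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.utils.Time;

public class DownConvertExample {
    public static void main(String[] args) {
        // Build a magic v2 batch, standing in for what the broker read from storage.
        MemoryRecords stored = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2,
                CompressionType.NONE, new SimpleRecord("hello".getBytes()));

        // Down-convert to magic v1 for an older client; the first-offset argument
        // plays the role of startOffset in the PR's decode path.
        ConvertedRecords<MemoryRecords> converted =
                stored.downConvert(RecordBatch.MAGIC_VALUE_V1, 0L, Time.SYSTEM);

        // The converted batches now carry the client's (lower) magic value.
        byte newMagic = converted.records().batches().iterator().next().magic();

        // Wrap the converted buffer into a Netty ByteBuf, as the decode path above does.
        ByteBuf kafkaBuffer = Unpooled.wrappedBuffer(converted.records().buffer());

        System.out.println("converted magic = " + newMagic
                + ", readable bytes = " + kafkaBuffer.readableBytes());
    }
}
```

Note that downConvert may throw unchecked exceptions from the Kafka client library, which is why the PR widens the catch clause to include KafkaException.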