Update tsdb binary doc value format to use compression from LUCENE-9211. #105419

Draft · wants to merge 2 commits into base: main

@@ -0,0 +1,101 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.index.codec.tsdb;

import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.LongValues;
import org.apache.lucene.util.compress.LZ4;

import java.io.IOException;

/**
* Decompresses binary doc values for {@link org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesProducer}
*/
final class BinaryDecoder {

private final LongValues addresses;
private final IndexInput compressedData;
// Cache of last uncompressed block
private long lastBlockId = -1;
private final int[] uncompressedDocStarts;
private int uncompressedBlockLength = 0;
private final byte[] uncompressedBlock;
private final BytesRef uncompressedBytesRef;
private final int docsPerChunk;
private final int docsPerChunkShift;

BinaryDecoder(LongValues addresses, IndexInput compressedData, int biggestUncompressedBlockSize, int docsPerChunkShift) {
this.addresses = addresses;
this.compressedData = compressedData;
// pre-allocate a byte array large enough for the biggest uncompressed block needed.
this.uncompressedBlock = new byte[biggestUncompressedBlockSize];
this.uncompressedBytesRef = new BytesRef(uncompressedBlock);
this.docsPerChunk = 1 << docsPerChunkShift;
this.docsPerChunkShift = docsPerChunkShift;
this.uncompressedDocStarts = new int[docsPerChunk + 1];
}

BytesRef decode(int docId) throws IOException {
int blockId = docId >> docsPerChunkShift;
int docInBlockId = docId % docsPerChunk;
assert docInBlockId < docsPerChunk;
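// e.g. if docsPerChunkShift were 5 (32 docs per block), docId 70 would map to blockId 2 and docInBlockId 6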

// already read and uncompressed?
if (blockId != lastBlockId) {
lastBlockId = blockId;
long blockStartOffset = addresses.get(blockId);
compressedData.seek(blockStartOffset);

uncompressedBlockLength = 0;

int numDocsInBlock = compressedData.readVInt(); // may be fewer than docsPerChunk for the last block

int onlyLength = -1;
for (int i = 0; i < numDocsInBlock; i++) {
if (i == 0) {
// The first length value is special. It is shifted and has a bit to denote if
// all other values are the same length
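// e.g. a header of (7 << 1) | 1 = 15 means every doc in the block has length 7,
// while (3 << 1) = 6 means the first doc has length 3 and the remaining lengths follow as vInts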
int lengthPlusSameInd = compressedData.readVInt();
int sameIndicator = lengthPlusSameInd & 1;
int firstValLength = lengthPlusSameInd >>> 1;
if (sameIndicator == 1) {
onlyLength = firstValLength;
}
uncompressedBlockLength += firstValLength;
} else {
if (onlyLength == -1) {
// Various lengths are stored - read each from disk
uncompressedBlockLength += compressedData.readVInt();
} else {
// Only one length
uncompressedBlockLength += onlyLength;
}
}
uncompressedDocStarts[i + 1] = uncompressedBlockLength;
}
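// uncompressedDocStarts is now a prefix sum of the doc lengths: doc i's bytes span
// [uncompressedDocStarts[i], uncompressedDocStarts[i + 1]) within the uncompressed block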

if (uncompressedBlockLength == 0) {
uncompressedBytesRef.offset = 0;
uncompressedBytesRef.length = 0;
return uncompressedBytesRef;
}

assert uncompressedBlockLength <= uncompressedBlock.length;
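// decompress the whole block into the reusable buffer; the requested doc is sliced out of it below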
LZ4.decompress(compressedData, uncompressedBlockLength, uncompressedBlock, 0);
}

uncompressedBytesRef.offset = uncompressedDocStarts[docInBlockId];
uncompressedBytesRef.length = uncompressedDocStarts[docInBlockId + 1] - uncompressedBytesRef.offset;
return uncompressedBytesRef;
}

}
@@ -0,0 +1,184 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.index.codec.tsdb;

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.SegmentWriteState;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.compress.LZ4;
import org.apache.lucene.util.packed.DirectMonotonicWriter;
import org.elasticsearch.core.IOUtils;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;

import static org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT;

/**
* Compresses binary doc values for {@link ES87TSDBDocValuesConsumer}
*/
final class CompressedBinaryBlockWriter implements Closeable {
private final SegmentWriteState state;
private final IndexOutput meta, data;
private final long blockAddressesStart;
private final IndexOutput tempBinaryOffsets;
private final LZ4.FastCompressionHashTable ht = new LZ4.FastCompressionHashTable();

int uncompressedBlockLength = 0;
int maxUncompressedBlockLength = 0;
int numDocsInCurrentBlock = 0;
final int[] docLengths = new int[ES87TSDBDocValuesFormat.BINARY_DOCS_PER_COMPRESSED_BLOCK];
byte[] block = BytesRef.EMPTY_BYTES;
int totalChunks = 0;
long maxPointer = 0;

CompressedBinaryBlockWriter(SegmentWriteState state, IndexOutput meta, IndexOutput data) throws IOException {
this.state = state;
this.meta = meta;
this.data = data;

tempBinaryOffsets = state.directory.createTempOutput(state.segmentInfo.name, "binary_pointers", state.context);
boolean success = false;
try {
CodecUtil.writeHeader(
tempBinaryOffsets,
ES87TSDBDocValuesFormat.META_CODEC + "FilePointers",
ES87TSDBDocValuesFormat.VERSION_CURRENT
);
blockAddressesStart = data.getFilePointer();
success = true;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(this); // self-close because constructor caller can't
}
}
}

void addDoc(BytesRef v) throws IOException {
docLengths[numDocsInCurrentBlock] = v.length;
block = ArrayUtil.grow(block, uncompressedBlockLength + v.length);
System.arraycopy(v.bytes, v.offset, block, uncompressedBlockLength, v.length);
uncompressedBlockLength += v.length;
numDocsInCurrentBlock++;
long blockSize = RamUsageEstimator.sizeOf(block);
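// flush once the buffer's estimated size reaches the block size cap or the block holds its maximum doc count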
if (blockSize >= ES87TSDBDocValuesFormat.MAX_COMPRESSED_BLOCK_SIZE
|| numDocsInCurrentBlock == ES87TSDBDocValuesFormat.BINARY_DOCS_PER_COMPRESSED_BLOCK) {
flushData();
}
}

void flushData() throws IOException {
if (numDocsInCurrentBlock > 0) {
// Write offset to this block to temporary offsets file
totalChunks++;
long thisBlockStartPointer = data.getFilePointer();
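// per-block layout: vInt doc count, the doc lengths (a single vInt when all are equal), then the LZ4-compressed bytes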
data.writeVInt(numDocsInCurrentBlock);
// Optimisation - check if all lengths are same
boolean allLengthsSame = true;
for (int i = 1; i < numDocsInCurrentBlock; i++) {
if (docLengths[i] != docLengths[i - 1]) {
allLengthsSame = false;
break;
}
}
if (allLengthsSame) {
// Only write one value shifted. Steal a bit to indicate all other lengths are the same
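// e.g. four docs of length 7 become the single vInt (7 << 1) | 1 = 15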
int onlyOneLength = (docLengths[0] << 1) | 1;
data.writeVInt(onlyOneLength);
} else {
for (int i = 0; i < numDocsInCurrentBlock; i++) {
if (i == 0) {
// Write first value shifted and steal a bit to indicate other lengths are to follow
int multipleLengths = (docLengths[0] << 1);
data.writeVInt(multipleLengths);
} else {
data.writeVInt(docLengths[i]);
}
}
}
maxUncompressedBlockLength = Math.max(maxUncompressedBlockLength, uncompressedBlockLength);
LZ4.compress(block, 0, uncompressedBlockLength, data, ht);
numDocsInCurrentBlock = 0;
// Ensure initialized with zeroes because full array is always written
Arrays.fill(docLengths, 0);
uncompressedBlockLength = 0;
maxPointer = data.getFilePointer();
tempBinaryOffsets.writeVLong(maxPointer - thisBlockStartPointer);
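// the temp offsets file records each block's compressed size; writeMetaData later folds these deltas into absolute addresses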
}
}

void writeMetaData() throws IOException {
if (totalChunks == 0) {
return;
}

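// meta layout: data file pointer of the block-address section, chunk count, binary block shift,
// max uncompressed block length, direct monotonic block shift, the block addresses, and finally the address section's length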
long startDMW = data.getFilePointer();
meta.writeLong(startDMW);

meta.writeVInt(totalChunks);
meta.writeVInt(ES87TSDBDocValuesFormat.BINARY_BLOCK_SHIFT);
meta.writeVInt(maxUncompressedBlockLength);
meta.writeVInt(DIRECT_MONOTONIC_BLOCK_SHIFT);

CodecUtil.writeFooter(tempBinaryOffsets);
IOUtils.close(tempBinaryOffsets);
// write the compressed block offsets info to the meta file by reading from temp file
try (ChecksumIndexInput filePointersIn = state.directory.openChecksumInput(tempBinaryOffsets.getName(), IOContext.READONCE)) {
CodecUtil.checkHeader(
filePointersIn,
ES87TSDBDocValuesFormat.META_CODEC + "FilePointers",
ES87TSDBDocValuesFormat.VERSION_CURRENT,
ES87TSDBDocValuesFormat.VERSION_CURRENT
);
Throwable priorE = null;
try {
final DirectMonotonicWriter filePointers = DirectMonotonicWriter.getInstance(
meta,
data,
totalChunks,
DIRECT_MONOTONIC_BLOCK_SHIFT
);
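// rebuild the absolute start pointer of each block by accumulating the per-block sizes recorded in the temp file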
long fp = blockAddressesStart;
for (int i = 0; i < totalChunks; ++i) {
filePointers.add(fp);
fp += filePointersIn.readVLong();
}
if (maxPointer < fp) {
throw new CorruptIndexException(
"File pointers don't add up (" + fp + " vs expected " + maxPointer + ")",
filePointersIn
);
}
filePointers.finish();
} catch (Throwable e) {
priorE = e;
} finally {
CodecUtil.checkFooter(filePointersIn, priorE);
}
}
// Write the length of the DMW block in the data
meta.writeLong(data.getFilePointer() - startDMW);
}

@Override
public void close() throws IOException {
if (tempBinaryOffsets != null) {
IOUtils.close(tempBinaryOffsets);
state.directory.deleteFile(tempBinaryOffsets.getName());
}
}

}