Skip to content

Commit

Permalink
Remove COMPRESSOR variable from CompressorFactory
Browse files Browse the repository at this point in the history
Removed a deprecated `COMPRESSOR` variable from
`CompressorFactory` and use `DEFLATE_COMPRESSOR` instead

Signed-off-by: Andrey Pleskach <ples@aiven.io>
  • Loading branch information
willyborankin committed Jun 4, 2023
1 parent aa21585 commit a5560dd
Show file tree
Hide file tree
Showing 10 changed files with 27 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public PublicationContext newPublicationContext(ClusterChangedEvent clusterChang

private static BytesReference serializeFullClusterState(ClusterState clusterState, Version nodeVersion) throws IOException {
final BytesStreamOutput bStream = new BytesStreamOutput();
try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream))) {
try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.DEFLATE_COMPRESSOR.threadLocalOutputStream(bStream))) {
stream.setVersion(nodeVersion);
stream.writeBoolean(true);
clusterState.writeTo(stream);
Expand All @@ -272,7 +272,7 @@ private static BytesReference serializeFullClusterState(ClusterState clusterStat

private static BytesReference serializeDiffClusterState(Diff<ClusterState> diff, Version nodeVersion) throws IOException {
final BytesStreamOutput bStream = new BytesStreamOutput();
try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream))) {
try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.DEFLATE_COMPRESSOR.threadLocalOutputStream(bStream))) {
stream.setVersion(nodeVersion);
stream.writeBoolean(false);
diff.writeTo(stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private CompressedXContent(byte[] compressed, int crc32) {
*/
public CompressedXContent(ToXContent xcontent, ToXContent.Params params) throws IOException {
BytesStreamOutput bStream = new BytesStreamOutput();
OutputStream compressedStream = CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream);
OutputStream compressedStream = CompressorFactory.DEFLATE_COMPRESSOR.threadLocalOutputStream(bStream);
CRC32 crc32 = new CRC32();
OutputStream checkedStream = new CheckedOutputStream(compressedStream, crc32);
try (XContentBuilder builder = XContentFactory.jsonBuilder(checkedStream)) {
Expand Down Expand Up @@ -113,7 +113,7 @@ public CompressedXContent(BytesReference data) throws IOException {
this.bytes = BytesReference.toBytes(data);
this.crc32 = crc32(uncompressed());
} else {
this.bytes = BytesReference.toBytes(CompressorFactory.COMPRESSOR.compress(data));
this.bytes = BytesReference.toBytes(CompressorFactory.DEFLATE_COMPRESSOR.compress(data));
this.crc32 = crc32(data);
}
assertConsistent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ public class CompressorFactory {

public static final Compressor DEFLATE_COMPRESSOR = new DeflateCompressor();

@Deprecated
public static final Compressor COMPRESSOR = DEFLATE_COMPRESSOR;

public static final Compressor ZSTD_COMPRESSOR = new ZstdCompressor();

public static final Compressor NONE_COMPRESSOR = new NoneCompressor();
Expand All @@ -62,12 +59,12 @@ public static boolean isCompressed(BytesReference bytes) {

@Nullable
public static Compressor compressor(BytesReference bytes) {
if (COMPRESSOR.isCompressed(bytes)) {
if (DEFLATE_COMPRESSOR.isCompressed(bytes)) {
// bytes should be either detected as compressed or as xcontent,
// if we have bytes that can be either detected as compressed or
// as a xcontent, we have a problem
assert XContentHelper.xContentType(bytes) == null;
return COMPRESSOR;
return DEFLATE_COMPRESSOR;
} else if (ZSTD_COMPRESSOR.isCompressed(bytes)) {
assert XContentHelper.xContentType(bytes) == null;
return ZSTD_COMPRESSOR;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1710,7 +1710,7 @@ private void cacheRepositoryData(BytesReference updated, long generation) {
if (cacheRepositoryData && bestEffortConsistency == false) {
final BytesReference serialized;
try {
serialized = CompressorFactory.COMPRESSOR.compress(updated);
serialized = CompressorFactory.DEFLATE_COMPRESSOR.compress(updated);
final int len = serialized.length();
if (len > ByteSizeUnit.KB.toBytes(500)) {
logger.debug(
Expand Down Expand Up @@ -1746,7 +1746,7 @@ private void cacheRepositoryData(BytesReference updated, long generation) {
}

private RepositoryData repositoryDataFromCachedEntry(Tuple<Long, BytesReference> cacheEntry) throws IOException {
try (InputStream input = CompressorFactory.COMPRESSOR.threadLocalInputStream(cacheEntry.v2().streamInput())) {
try (InputStream input = CompressorFactory.DEFLATE_COMPRESSOR.threadLocalInputStream(cacheEntry.v2().streamInput())) {
return RepositoryData.snapshotsFromXContent(
XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, input),
cacheEntry.v1()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ final class CompressibleBytesOutputStream extends StreamOutput {
this.bytesStreamOutput = bytesStreamOutput;
this.shouldCompress = shouldCompress;
if (shouldCompress) {
this.stream = CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(bytesStreamOutput));
this.stream = CompressorFactory.DEFLATE_COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(bytesStreamOutput));
} else {
this.stream = bytesStreamOutput;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public TransportDecompressor(PageCacheRecycler recycler) {
public int decompress(BytesReference bytesReference) throws IOException {
int bytesConsumed = 0;
if (hasReadHeader == false) {
if (CompressorFactory.COMPRESSOR.isCompressed(bytesReference) == false) {
if (CompressorFactory.DEFLATE_COMPRESSOR.isCompressed(bytesReference) == false) {
int maxToRead = Math.min(bytesReference.length(), 10);
StringBuilder sb = new StringBuilder("stream marked as compressed, but no compressor found, first [").append(maxToRead)
.append("] content bytes out of [")
Expand All @@ -85,7 +85,7 @@ public int decompress(BytesReference bytesReference) throws IOException {
throw new IllegalStateException(sb.toString());
}
hasReadHeader = true;
int headerLength = CompressorFactory.COMPRESSOR.headerLength();
int headerLength = CompressorFactory.DEFLATE_COMPRESSOR.headerLength();
bytesReference = bytesReference.slice(headerLength, bytesReference.length() - headerLength);
bytesConsumed += headerLength;
}
Expand Down Expand Up @@ -135,7 +135,7 @@ public int decompress(BytesReference bytesReference) throws IOException {
}

public boolean canDecompress(int bytesAvailable) {
return hasReadHeader || bytesAvailable >= CompressorFactory.COMPRESSOR.headerLength();
return hasReadHeader || bytesAvailable >= CompressorFactory.DEFLATE_COMPRESSOR.headerLength();
}

public boolean isEOS() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ private static String format(TcpChannel channel, InboundMessage message, String
private static StreamInput decompressingStream(byte status, StreamInput streamInput) throws IOException {
if (TransportStatus.isCompress(status) && streamInput.available() > 0) {
try {
return new InputStreamStreamInput(CompressorFactory.COMPRESSOR.threadLocalInputStream(streamInput));
return new InputStreamStreamInput(CompressorFactory.DEFLATE_COMPRESSOR.threadLocalInputStream(streamInput));
} catch (IllegalArgumentException e) {
throw new IllegalStateException("stream marked as compressed, but is missing deflate header");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public void testStoredValue() throws IOException {

// case 2: a value that looks compressed: this used to fail in 1.x
BytesStreamOutput out = new BytesStreamOutput();
try (OutputStream compressed = CompressorFactory.COMPRESSOR.threadLocalOutputStream(out)) {
try (OutputStream compressed = CompressorFactory.DEFLATE_COMPRESSOR.threadLocalOutputStream(out)) {
new BytesArray(binaryValue1).writeTo(compressed);
}
final byte[] binaryValue2 = BytesReference.toBytes(out.bytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void testStreamWithoutCompression() throws IOException {
// Closing compression stream does not close underlying stream
stream.close();

assertFalse(CompressorFactory.COMPRESSOR.isCompressed(bytesRef));
assertFalse(CompressorFactory.DEFLATE_COMPRESSOR.isCompressed(bytesRef));

StreamInput streamInput = bytesRef.streamInput();
byte[] actualBytes = new byte[expectedBytes.length];
Expand All @@ -83,9 +83,11 @@ public void testStreamWithCompression() throws IOException {
BytesReference bytesRef = stream.materializeBytes();
stream.close();

assertTrue(CompressorFactory.COMPRESSOR.isCompressed(bytesRef));
assertTrue(CompressorFactory.DEFLATE_COMPRESSOR.isCompressed(bytesRef));

StreamInput streamInput = new InputStreamStreamInput(CompressorFactory.COMPRESSOR.threadLocalInputStream(bytesRef.streamInput()));
StreamInput streamInput = new InputStreamStreamInput(
CompressorFactory.DEFLATE_COMPRESSOR.threadLocalInputStream(bytesRef.streamInput())
);
byte[] actualBytes = new byte[expectedBytes.length];
streamInput.readBytes(actualBytes, 0, expectedBytes.length);

Expand All @@ -108,7 +110,7 @@ public void testCompressionWithCallingMaterializeFails() throws IOException {
stream.write(expectedBytes);

StreamInput streamInput = new InputStreamStreamInput(
CompressorFactory.COMPRESSOR.threadLocalInputStream(bStream.bytes().streamInput())
CompressorFactory.DEFLATE_COMPRESSOR.threadLocalInputStream(bStream.bytes().streamInput())
);
byte[] actualBytes = new byte[expectedBytes.length];
EOFException e = expectThrows(EOFException.class, () -> streamInput.readBytes(actualBytes, 0, expectedBytes.length));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ public class TransportDecompressorTests extends OpenSearchTestCase {
public void testSimpleCompression() throws IOException {
try (BytesStreamOutput output = new BytesStreamOutput()) {
byte randomByte = randomByte();
try (OutputStream deflateStream = CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(output))) {
try (
OutputStream deflateStream = CompressorFactory.DEFLATE_COMPRESSOR.threadLocalOutputStream(
Streams.flushOnCloseStream(output)
)
) {
deflateStream.write(randomByte);
}

Expand All @@ -74,7 +78,7 @@ public void testMultiPageCompression() throws IOException {
try (BytesStreamOutput output = new BytesStreamOutput()) {
try (
StreamOutput deflateStream = new OutputStreamStreamOutput(
CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(output))
CompressorFactory.DEFLATE_COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(output))
)
) {
for (int i = 0; i < 10000; ++i) {
Expand Down Expand Up @@ -106,7 +110,7 @@ public void testIncrementalMultiPageCompression() throws IOException {
try (BytesStreamOutput output = new BytesStreamOutput()) {
try (
StreamOutput deflateStream = new OutputStreamStreamOutput(
CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(output))
CompressorFactory.DEFLATE_COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(output))
)
) {
for (int i = 0; i < 10000; ++i) {
Expand Down

0 comments on commit a5560dd

Please sign in to comment.