[Backport] [2.x] Enabling Compression Level Index Settings for ZSTD and ZSTD Codecs (#8558)

* Compression Levels Settings for zstd and zstd_no_dict (#8312)

* Enabling compression levels as index settings

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>

* addressing review comments

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>

* refactoring codec service

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>

* index settings assertion

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>

* default compression level change in the settings

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>

---------

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
(cherry picked from commit ca7045e)

* refactoring CodecService for 2.x

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>

---------

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
sarthakaggarwal97 authored Jul 10, 2023
1 parent d3923e2 commit fc30d6c
Showing 20 changed files with 209 additions and 38 deletions.
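For context on how the new setting is meant to be used, here is an illustrative sketch that is not part of the commit; the setting key, codec names, default of 3, and range of 1 to 6 are taken from the diff below.

import org.opensearch.common.settings.Settings;
import org.opensearch.index.engine.EngineConfig;

public class CompressionLevelSettingExample {
    public static void main(String[] args) {
        // Pair the existing index.codec choice with the new compression level setting.
        Settings indexSettings = Settings.builder()
            .put("index.codec", "zstd") // or "zstd_no_dict"
            .put("index.codec.compression_level", 5) // valid range is 1-6; default is 3
            .build();

        // Read the level back through the setting added by this change.
        int level = EngineConfig.INDEX_CODEC_COMPRESSION_LEVEL_SETTING.get(indexSettings);
        System.out.println("compression level = " + level); // prints 5
    }
}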
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -95,6 +95,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Refactor] OpenSearchException and ExceptionsHelper foundation to base class ([#7508](https://github.com/opensearch-project/OpenSearch/pull/7508))
- Move ZSTD compression codecs out of the sandbox ([#7908](https://github.com/opensearch-project/OpenSearch/pull/7908))
- Update ZSTD default compression level ([#8471](https://github.com/opensearch-project/OpenSearch/pull/8471))
- Enabling compression levels for zstd and zstd_no_dict ([#8312](https://github.com/opensearch-project/OpenSearch/pull/8312))
- Improved performance of parsing floating point numbers ([#8467](https://github.com/opensearch-project/OpenSearch/pull/8467))
- Move span actions to Scope ([#8411](https://github.com/opensearch-project/OpenSearch/pull/8411))
- [Refactor] OpenSearchException streamables to a registry ([#7646](https://github.com/opensearch-project/OpenSearch/pull/7646))
@@ -89,7 +89,7 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) {
.mergePolicy(config.getMergePolicy())
.analyzer(config.getAnalyzer())
.similarity(config.getSimilarity())
.codecService(new CodecService(null, LogManager.getLogger(IndexingMemoryControllerIT.class)))
.codecService(new CodecService(null, indexSettings, LogManager.getLogger(IndexingMemoryControllerIT.class)))
.eventListener(config.getEventListener())
.queryCache(config.getQueryCache())
.queryCachingPolicy(config.getQueryCachingPolicy())
@@ -188,6 +188,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
FsDirectoryFactory.INDEX_LOCK_FACTOR_SETTING,
Store.FORCE_RAM_TERM_DICT,
EngineConfig.INDEX_CODEC_SETTING,
EngineConfig.INDEX_CODEC_COMPRESSION_LEVEL_SETTING,
EngineConfig.INDEX_OPTIMIZE_AUTO_GENERATED_IDS,
IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS,
IndexSettings.DEFAULT_PIPELINE,
39 changes: 36 additions & 3 deletions server/src/main/java/org/opensearch/index/codec/CodecService.java
@@ -38,12 +38,16 @@
import org.apache.lucene.codecs.lucene95.Lucene95Codec.Mode;
import org.opensearch.common.Nullable;
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.codec.customcodecs.Lucene95CustomCodec;
import org.opensearch.index.codec.customcodecs.ZstdCodec;
import org.opensearch.index.codec.customcodecs.ZstdNoDictCodec;
import org.opensearch.index.mapper.MapperService;

import java.util.Map;

import static org.opensearch.index.engine.EngineConfig.INDEX_CODEC_COMPRESSION_LEVEL_SETTING;

/**
* Since Lucene 4.0 low level index segments are read and written through a
* codec layer that allows to use use-case specific file formats &amp;
@@ -58,11 +62,36 @@ public class CodecService {

public static final String DEFAULT_CODEC = "default";
public static final String BEST_COMPRESSION_CODEC = "best_compression";
/** the raw unfiltered lucene default. useful for testing */
/**
* the raw unfiltered lucene default. useful for testing
*/
public static final String LUCENE_DEFAULT_CODEC = "lucene_default";
public static final String ZSTD_CODEC = "zstd";
public static final String ZSTD_NO_DICT_CODEC = "zstd_no_dict";

public CodecService(@Nullable MapperService mapperService, IndexSettings indexSettings, Logger logger) {
final MapBuilder<String, Codec> codecs = MapBuilder.<String, Codec>newMapBuilder();
assert null != indexSettings;
int compressionLevel = indexSettings.getValue(INDEX_CODEC_COMPRESSION_LEVEL_SETTING);
if (mapperService == null) {
codecs.put(DEFAULT_CODEC, new Lucene95Codec());
codecs.put(BEST_COMPRESSION_CODEC, new Lucene95Codec(Mode.BEST_COMPRESSION));
codecs.put(ZSTD_CODEC, new ZstdCodec(compressionLevel));
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec(compressionLevel));
} else {
codecs.put(DEFAULT_CODEC, new PerFieldMappingPostingFormatCodec(Mode.BEST_SPEED, mapperService, logger));
codecs.put(BEST_COMPRESSION_CODEC, new PerFieldMappingPostingFormatCodec(Mode.BEST_COMPRESSION, mapperService, logger));
codecs.put(ZSTD_CODEC, new ZstdCodec(mapperService, logger, compressionLevel));
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec(mapperService, logger, compressionLevel));
}
codecs.put(LUCENE_DEFAULT_CODEC, Codec.getDefault());
for (String codec : Codec.availableCodecs()) {
codecs.put(codec, Codec.forName(codec));
}
this.codecs = codecs.immutableMap();
}

@Deprecated(since = "2.9.0", forRemoval = true)
public CodecService(@Nullable MapperService mapperService, Logger logger) {
final MapBuilder<String, Codec> codecs = MapBuilder.<String, Codec>newMapBuilder();
if (mapperService == null) {
@@ -71,10 +100,14 @@ public CodecService(@Nullable MapperService mapperService, Logger logger) {
codecs.put(ZSTD_CODEC, new ZstdCodec());
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec());
} else {
IndexSettings indexSettings = mapperService.getIndexSettings();
int compressionLevel = indexSettings == null
? Lucene95CustomCodec.DEFAULT_COMPRESSION_LEVEL
: indexSettings.getValue(INDEX_CODEC_COMPRESSION_LEVEL_SETTING);
codecs.put(DEFAULT_CODEC, new PerFieldMappingPostingFormatCodec(Mode.BEST_SPEED, mapperService, logger));
codecs.put(BEST_COMPRESSION_CODEC, new PerFieldMappingPostingFormatCodec(Mode.BEST_COMPRESSION, mapperService, logger));
codecs.put(ZSTD_CODEC, new ZstdCodec(mapperService, logger));
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec(mapperService, logger));
codecs.put(ZSTD_CODEC, new ZstdCodec(mapperService, logger, compressionLevel));
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec(mapperService, logger, compressionLevel));
}
codecs.put(LUCENE_DEFAULT_CODEC, Codec.getDefault());
for (String codec : Codec.availableCodecs()) {
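As a minimal usage sketch of the new CodecService constructor above (illustrative only; it reuses the IndexSettingsModule test helper that also appears in the test changes further down, and the path.home value is hypothetical):

import org.apache.logging.log4j.LogManager;
import org.apache.lucene.codecs.Codec;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.codec.CodecService;
import org.opensearch.test.IndexSettingsModule;

public class CodecServiceCompressionLevelExample {
    public static void main(String[] args) {
        // Build IndexSettings carrying a custom compression level (path.home mirrors the test setup below).
        Settings settings = Settings.builder()
            .put("path.home", "/tmp/opensearch-example") // hypothetical path
            .put("index.codec.compression_level", 5)
            .build();
        IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test-index", settings);

        // The constructor reads index.codec.compression_level and passes it to the zstd codecs.
        CodecService codecService = new CodecService(null, indexSettings, LogManager.getLogger(CodecService.class));
        Codec zstd = codecService.codec("zstd"); // backed by ZstdCodec built with level 5
        System.out.println(zstd.getName());
    }
}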
@@ -35,6 +35,7 @@ public class Lucene95CustomStoredFieldsFormat extends StoredFieldsFormat {
private final CompressionMode zstdNoDictCompressionMode;

private final Lucene95CustomCodec.Mode mode;
private final int compressionLevel;

/** default constructor */
public Lucene95CustomStoredFieldsFormat() {
@@ -58,6 +59,7 @@ public Lucene95CustomStoredFieldsFormat(Lucene95CustomCodec.Mode mode) {
*/
public Lucene95CustomStoredFieldsFormat(Lucene95CustomCodec.Mode mode, int compressionLevel) {
this.mode = Objects.requireNonNull(mode);
this.compressionLevel = compressionLevel;
zstdCompressionMode = new ZstdCompressionMode(compressionLevel);
zstdNoDictCompressionMode = new ZstdNoDictCompressionMode(compressionLevel);
}
@@ -122,4 +124,8 @@ StoredFieldsFormat impl(Lucene95CustomCodec.Mode mode) {
Lucene95CustomCodec.Mode getMode() {
return mode;
}

public int getCompressionLevel() {
return compressionLevel;
}
}
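A small illustrative check of the accessor added above (assuming Lucene95CustomCodec.Mode is the publicly accessible mode enum used by these codecs):

import org.opensearch.index.codec.customcodecs.Lucene95CustomCodec;
import org.opensearch.index.codec.customcodecs.Lucene95CustomStoredFieldsFormat;

public class StoredFieldsFormatLevelExample {
    public static void main(String[] args) {
        // The (mode, compressionLevel) constructor and getCompressionLevel() both appear in the diff above.
        Lucene95CustomStoredFieldsFormat format =
            new Lucene95CustomStoredFieldsFormat(Lucene95CustomCodec.Mode.ZSTD, 4);
        System.out.println(format.getCompressionLevel()); // prints 4
    }
}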
@@ -32,8 +32,8 @@ public ZstdCodec(int compressionLevel) {
super(Mode.ZSTD, compressionLevel);
}

public ZstdCodec(MapperService mapperService, Logger logger) {
super(Mode.ZSTD, DEFAULT_COMPRESSION_LEVEL, mapperService, logger);
public ZstdCodec(MapperService mapperService, Logger logger, int compressionLevel) {
super(Mode.ZSTD, compressionLevel, mapperService, logger);
}

/** The name for this codec. */
@@ -32,8 +32,8 @@ public ZstdNoDictCodec(int compressionLevel) {
super(Mode.ZSTD_NO_DICT, compressionLevel);
}

public ZstdNoDictCodec(MapperService mapperService, Logger logger) {
super(Mode.ZSTD_NO_DICT, DEFAULT_COMPRESSION_LEVEL, mapperService, logger);
public ZstdNoDictCodec(MapperService mapperService, Logger logger, int compressionLevel) {
super(Mode.ZSTD_NO_DICT, compressionLevel, mapperService, logger);
}

/** The name for this codec. */
13 changes: 13 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/EngineConfig.java
@@ -143,6 +143,19 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {
}
}, Property.IndexScope, Property.NodeScope);

/**
* Index setting to change the compression level of zstd and zstd_no_dict lucene codecs.
* Compression Level gives a trade-off between compression ratio and speed. The higher compression level results in higher compression ratio but slower compression and decompression speeds.
* This setting is <b>not</b> realtime updateable.
*/
public static final Setting<Integer> INDEX_CODEC_COMPRESSION_LEVEL_SETTING = Setting.intSetting(
"index.codec.compression_level",
3,
1,
6,
Property.IndexScope
);

/**
* Configures an index to optimize documents with auto generated ids for append only. If this setting is updated from <code>false</code>
* to <code>true</code> might not take effect immediately. In other words, disabling the optimization will be immediately applied while
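A hedged sketch of how the bounds declared above behave (illustrative; the exact exception message is not quoted from the source): values outside 1-6 are rejected when the setting is parsed, and the default of 3 applies when the key is absent.

import org.opensearch.common.settings.Settings;
import org.opensearch.index.engine.EngineConfig;

public class CompressionLevelBoundsExample {
    public static void main(String[] args) {
        // The default applies when the key is absent.
        int defaultLevel = EngineConfig.INDEX_CODEC_COMPRESSION_LEVEL_SETTING.get(Settings.EMPTY);
        System.out.println("default = " + defaultLevel); // 3

        // Values outside the declared min/max are rejected when the setting is read.
        Settings tooHigh = Settings.builder().put("index.codec.compression_level", 7).build();
        try {
            EngineConfig.INDEX_CODEC_COMPRESSION_LEVEL_SETTING.get(tooHigh);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}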
@@ -370,7 +370,7 @@ public IndexShard(
assert shardRouting.initializing();
this.shardRouting = shardRouting;
final Settings settings = indexSettings.getSettings();
this.codecService = new CodecService(mapperService, logger);
this.codecService = new CodecService(mapperService, indexSettings, logger);
this.warmer = warmer;
this.similarityService = similarityService;
Objects.requireNonNull(store, "Store must be provided to the index shard");
@@ -60,7 +60,9 @@
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.set.Sets;
import org.opensearch.env.Environment;
import org.opensearch.env.ShardLockObtainFailedException;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
@@ -69,6 +71,7 @@
import org.opensearch.snapshots.SnapshotId;
import org.opensearch.snapshots.SnapshotShardSizeInfo;
import org.junit.Before;
import org.opensearch.test.IndexSettingsModule;

import java.util.Arrays;
import java.util.Collections;
@@ -803,21 +806,25 @@ public TestAllocator addData(
}

public TestAllocator addData(DiscoveryNode node, String allocationId, boolean primary) {
Settings nodeSettings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build();
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", nodeSettings);
return addData(
node,
allocationId,
primary,
ReplicationCheckpoint.empty(shardId, new CodecService(null, null).codec("default").getName()),
ReplicationCheckpoint.empty(shardId, new CodecService(null, indexSettings, null).codec("default").getName()),
null
);
}

public TestAllocator addData(DiscoveryNode node, String allocationId, boolean primary, @Nullable Exception storeException) {
Settings nodeSettings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build();
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", nodeSettings);
return addData(
node,
allocationId,
primary,
ReplicationCheckpoint.empty(shardId, new CodecService(null, null).codec("default").getName()),
ReplicationCheckpoint.empty(shardId, new CodecService(null, indexSettings, null).codec("default").getName()),
storeException
);
}