Skip to content

Commit

Permalink
Compression Levels Settings for zstd and zstd_no_dict (opensearch-pro…
Browse files Browse the repository at this point in the history
…ject#8312)

* Enabling compression levels as index settings

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>

* addressing review comments

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>

* refactoring codec service

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>

* index settings assertion

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>

* default compression level change in the settings

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>

---------

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
  • Loading branch information
sarthakaggarwal97 authored and baba-devv committed Jul 29, 2023
1 parent bed1627 commit 43f5092
Show file tree
Hide file tree
Showing 21 changed files with 148 additions and 41 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Move ZSTD compression codecs out of the sandbox ([#7908](https://github.com/opensearch-project/OpenSearch/pull/7908))
- Update ZSTD default compression level ([#8471](https://github.com/opensearch-project/OpenSearch/pull/8471))
- [Search Pipelines] Pass pipeline creation context to processor factories ([#8164](https://github.com/opensearch-project/OpenSearch/pull/8164))
- Enabling compression levels for zstd and zstd_no_dict ([#8312](https://github.com/opensearch-project/OpenSearch/pull/8312))


### Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class CorrelationCodecService extends CodecService {
* @param codecServiceConfig Generic codec service config
*/
public CorrelationCodecService(CodecServiceConfig codecServiceConfig) {
super(codecServiceConfig.getMapperService(), codecServiceConfig.getLogger());
super(codecServiceConfig.getMapperService(), codecServiceConfig.getIndexSettings(), codecServiceConfig.getLogger());
mapperService = codecServiceConfig.getMapperService();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) {
.mergePolicy(config.getMergePolicy())
.analyzer(config.getAnalyzer())
.similarity(config.getSimilarity())
.codecService(new CodecService(null, LogManager.getLogger(IndexingMemoryControllerIT.class)))
.codecService(new CodecService(null, indexSettings, LogManager.getLogger(IndexingMemoryControllerIT.class)))
.eventListener(config.getEventListener())
.queryCache(config.getQueryCache())
.queryCachingPolicy(config.getQueryCachingPolicy())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
FsDirectoryFactory.INDEX_LOCK_FACTOR_SETTING,
Store.FORCE_RAM_TERM_DICT,
EngineConfig.INDEX_CODEC_SETTING,
EngineConfig.INDEX_CODEC_COMPRESSION_LEVEL_SETTING,
EngineConfig.INDEX_OPTIMIZE_AUTO_GENERATED_IDS,
IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS,
IndexSettings.DEFAULT_PIPELINE,
Expand Down
19 changes: 13 additions & 6 deletions server/src/main/java/org/opensearch/index/codec/CodecService.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,15 @@
import org.apache.lucene.codecs.lucene95.Lucene95Codec.Mode;
import org.opensearch.common.Nullable;
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.codec.customcodecs.ZstdCodec;
import org.opensearch.index.codec.customcodecs.ZstdNoDictCodec;
import org.opensearch.index.mapper.MapperService;

import java.util.Map;

import static org.opensearch.index.engine.EngineConfig.INDEX_CODEC_COMPRESSION_LEVEL_SETTING;

/**
* Since Lucene 4.0 low level index segments are read and written through a
* codec layer that allows to use use-case specific file formats &amp;
Expand All @@ -58,23 +61,27 @@ public class CodecService {

public static final String DEFAULT_CODEC = "default";
public static final String BEST_COMPRESSION_CODEC = "best_compression";
/** the raw unfiltered lucene default. useful for testing */
/**
* the raw unfiltered lucene default. useful for testing
*/
public static final String LUCENE_DEFAULT_CODEC = "lucene_default";
public static final String ZSTD_CODEC = "zstd";
public static final String ZSTD_NO_DICT_CODEC = "zstd_no_dict";

public CodecService(@Nullable MapperService mapperService, Logger logger) {
public CodecService(@Nullable MapperService mapperService, IndexSettings indexSettings, Logger logger) {
final MapBuilder<String, Codec> codecs = MapBuilder.<String, Codec>newMapBuilder();
assert null != indexSettings;
int compressionLevel = indexSettings.getValue(INDEX_CODEC_COMPRESSION_LEVEL_SETTING);
if (mapperService == null) {
codecs.put(DEFAULT_CODEC, new Lucene95Codec());
codecs.put(BEST_COMPRESSION_CODEC, new Lucene95Codec(Mode.BEST_COMPRESSION));
codecs.put(ZSTD_CODEC, new ZstdCodec());
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec());
codecs.put(ZSTD_CODEC, new ZstdCodec(compressionLevel));
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec(compressionLevel));
} else {
codecs.put(DEFAULT_CODEC, new PerFieldMappingPostingFormatCodec(Mode.BEST_SPEED, mapperService, logger));
codecs.put(BEST_COMPRESSION_CODEC, new PerFieldMappingPostingFormatCodec(Mode.BEST_COMPRESSION, mapperService, logger));
codecs.put(ZSTD_CODEC, new ZstdCodec(mapperService, logger));
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec(mapperService, logger));
codecs.put(ZSTD_CODEC, new ZstdCodec(mapperService, logger, compressionLevel));
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec(mapperService, logger, compressionLevel));
}
codecs.put(LUCENE_DEFAULT_CODEC, Codec.getDefault());
for (String codec : Codec.availableCodecs()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class Lucene95CustomStoredFieldsFormat extends StoredFieldsFormat {
private final CompressionMode zstdNoDictCompressionMode;

private final Lucene95CustomCodec.Mode mode;
private final int compressionLevel;

/** default constructor */
public Lucene95CustomStoredFieldsFormat() {
Expand All @@ -58,6 +59,7 @@ public Lucene95CustomStoredFieldsFormat(Lucene95CustomCodec.Mode mode) {
*/
public Lucene95CustomStoredFieldsFormat(Lucene95CustomCodec.Mode mode, int compressionLevel) {
this.mode = Objects.requireNonNull(mode);
this.compressionLevel = compressionLevel;
zstdCompressionMode = new ZstdCompressionMode(compressionLevel);
zstdNoDictCompressionMode = new ZstdNoDictCompressionMode(compressionLevel);
}
Expand Down Expand Up @@ -122,4 +124,8 @@ StoredFieldsFormat impl(Lucene95CustomCodec.Mode mode) {
Lucene95CustomCodec.Mode getMode() {
return mode;
}

public int getCompressionLevel() {
return compressionLevel;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ public ZstdCodec(int compressionLevel) {
super(Mode.ZSTD, compressionLevel);
}

public ZstdCodec(MapperService mapperService, Logger logger) {
super(Mode.ZSTD, DEFAULT_COMPRESSION_LEVEL, mapperService, logger);
public ZstdCodec(MapperService mapperService, Logger logger, int compressionLevel) {
super(Mode.ZSTD, compressionLevel, mapperService, logger);
}

/** The name for this codec. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ public ZstdNoDictCodec(int compressionLevel) {
super(Mode.ZSTD_NO_DICT, compressionLevel);
}

public ZstdNoDictCodec(MapperService mapperService, Logger logger) {
super(Mode.ZSTD_NO_DICT, DEFAULT_COMPRESSION_LEVEL, mapperService, logger);
public ZstdNoDictCodec(MapperService mapperService, Logger logger, int compressionLevel) {
super(Mode.ZSTD_NO_DICT, compressionLevel, mapperService, logger);
}

/** The name for this codec. */
Expand Down
13 changes: 13 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/EngineConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,19 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {
}
}, Property.IndexScope, Property.NodeScope);

/**
* Index setting to change the compression level of zstd and zstd_no_dict lucene codecs.
* Compression Level gives a trade-off between compression ratio and speed. The higher compression level results in higher compression ratio but slower compression and decompression speeds.
* This setting is <b>not</b> realtime updateable.
*/
public static final Setting<Integer> INDEX_CODEC_COMPRESSION_LEVEL_SETTING = Setting.intSetting(
"index.codec.compression_level",
3,
1,
6,
Property.IndexScope
);

/**
* Configures an index to optimize documents with auto generated ids for append only. If this setting is updated from <code>false</code>
* to <code>true</code> might not take effect immediately. In other words, disabling the optimization will be immediately applied while
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ public IndexShard(
assert shardRouting.initializing();
this.shardRouting = shardRouting;
final Settings settings = indexSettings.getSettings();
this.codecService = new CodecService(mapperService, logger);
this.codecService = new CodecService(mapperService, indexSettings, logger);
this.warmer = warmer;
this.similarityService = similarityService;
Objects.requireNonNull(store, "Store must be provided to the index shard");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.set.Sets;
import org.opensearch.env.Environment;
import org.opensearch.env.ShardLockObtainFailedException;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
Expand All @@ -69,6 +71,7 @@
import org.opensearch.snapshots.SnapshotId;
import org.opensearch.snapshots.SnapshotShardSizeInfo;
import org.junit.Before;
import org.opensearch.test.IndexSettingsModule;

import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -803,21 +806,25 @@ public TestAllocator addData(
}

public TestAllocator addData(DiscoveryNode node, String allocationId, boolean primary) {
Settings nodeSettings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build();
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", nodeSettings);
return addData(
node,
allocationId,
primary,
ReplicationCheckpoint.empty(shardId, new CodecService(null, null).codec("default").getName()),
ReplicationCheckpoint.empty(shardId, new CodecService(null, indexSettings, null).codec("default").getName()),
null
);
}

public TestAllocator addData(DiscoveryNode node, String allocationId, boolean primary, @Nullable Exception storeException) {
Settings nodeSettings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build();
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", nodeSettings);
return addData(
node,
allocationId,
primary,
ReplicationCheckpoint.empty(shardId, new CodecService(null, null).codec("default").getName()),
ReplicationCheckpoint.empty(shardId, new CodecService(null, indexSettings, null).codec("default").getName()),
storeException
);
}
Expand Down
57 changes: 49 additions & 8 deletions server/src/test/java/org/opensearch/index/codec/CodecTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,31 @@ public void testBestCompression() throws Exception {
public void testZstd() throws Exception {
Codec codec = createCodecService(false).codec("zstd");
assertStoredFieldsCompressionEquals(Lucene95CustomCodec.Mode.ZSTD, codec);
Lucene95CustomStoredFieldsFormat storedFieldsFormat = (Lucene95CustomStoredFieldsFormat) codec.storedFieldsFormat();
assertEquals(Lucene95CustomCodec.DEFAULT_COMPRESSION_LEVEL, storedFieldsFormat.getCompressionLevel());
}

public void testZstdNoDict() throws Exception {
Codec codec = createCodecService(false).codec("zstd_no_dict");
assertStoredFieldsCompressionEquals(Lucene95CustomCodec.Mode.ZSTD_NO_DICT, codec);
Lucene95CustomStoredFieldsFormat storedFieldsFormat = (Lucene95CustomStoredFieldsFormat) codec.storedFieldsFormat();
assertEquals(Lucene95CustomCodec.DEFAULT_COMPRESSION_LEVEL, storedFieldsFormat.getCompressionLevel());
}

public void testZstdWithCompressionLevel() throws Exception {
int randomCompressionLevel = randomIntBetween(1, 6);
Codec codec = createCodecService(randomCompressionLevel).codec("zstd");
assertStoredFieldsCompressionEquals(Lucene95CustomCodec.Mode.ZSTD, codec);
Lucene95CustomStoredFieldsFormat storedFieldsFormat = (Lucene95CustomStoredFieldsFormat) codec.storedFieldsFormat();
assertEquals(randomCompressionLevel, storedFieldsFormat.getCompressionLevel());
}

public void testZstdNoDictWithCompressionLevel() throws Exception {
int randomCompressionLevel = randomIntBetween(1, 6);
Codec codec = createCodecService(randomCompressionLevel).codec("zstd_no_dict");
assertStoredFieldsCompressionEquals(Lucene95CustomCodec.Mode.ZSTD_NO_DICT, codec);
Lucene95CustomStoredFieldsFormat storedFieldsFormat = (Lucene95CustomStoredFieldsFormat) codec.storedFieldsFormat();
assertEquals(randomCompressionLevel, storedFieldsFormat.getCompressionLevel());
}

public void testDefaultMapperServiceNull() throws Exception {
Expand All @@ -103,17 +123,25 @@ public void testBestCompressionMapperServiceNull() throws Exception {
public void testZstdMapperServiceNull() throws Exception {
Codec codec = createCodecService(true).codec("zstd");
assertStoredFieldsCompressionEquals(Lucene95CustomCodec.Mode.ZSTD, codec);
Lucene95CustomStoredFieldsFormat storedFieldsFormat = (Lucene95CustomStoredFieldsFormat) codec.storedFieldsFormat();
assertEquals(Lucene95CustomCodec.DEFAULT_COMPRESSION_LEVEL, storedFieldsFormat.getCompressionLevel());
}

public void testZstdNoDictMapperServiceNull() throws Exception {
Codec codec = createCodecService(true).codec("zstd_no_dict");
assertStoredFieldsCompressionEquals(Lucene95CustomCodec.Mode.ZSTD_NO_DICT, codec);
Lucene95CustomStoredFieldsFormat storedFieldsFormat = (Lucene95CustomStoredFieldsFormat) codec.storedFieldsFormat();
assertEquals(Lucene95CustomCodec.DEFAULT_COMPRESSION_LEVEL, storedFieldsFormat.getCompressionLevel());
}

public void testExceptionCodecNull() {
assertThrows(IllegalArgumentException.class, () -> createCodecService(true).codec(null));
}

public void testExceptionIndexSettingsNull() {
assertThrows(AssertionError.class, () -> new CodecService(null, null, LogManager.getLogger("test")));
}

// write some docs with it, inspect .si to see this was the used compression
private void assertStoredFieldsCompressionEquals(Lucene95Codec.Mode expected, Codec actual) throws Exception {
SegmentReader sr = getSegmentReader(actual);
Expand All @@ -130,17 +158,29 @@ private void assertStoredFieldsCompressionEquals(Lucene95CustomCodec.Mode expect
}

private CodecService createCodecService(boolean isMapperServiceNull) throws IOException {

Settings nodeSettings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build();
if (isMapperServiceNull) {
return new CodecService(null, LogManager.getLogger("test"));
return new CodecService(null, IndexSettingsModule.newIndexSettings("_na", nodeSettings), LogManager.getLogger("test"));
}
Settings nodeSettings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build();
IndexSettings settings = IndexSettingsModule.newIndexSettings("_na", nodeSettings);
SimilarityService similarityService = new SimilarityService(settings, null, Collections.emptyMap());
IndexAnalyzers indexAnalyzers = createTestAnalysis(settings, nodeSettings).indexAnalyzers;
return buildCodecService(nodeSettings);
}

private CodecService createCodecService(int randomCompressionLevel) throws IOException {
Settings nodeSettings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
.put("index.codec.compression_level", randomCompressionLevel)
.build();
return buildCodecService(nodeSettings);
}

private CodecService buildCodecService(Settings nodeSettings) throws IOException {

IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("_na", nodeSettings);
SimilarityService similarityService = new SimilarityService(indexSettings, null, Collections.emptyMap());
IndexAnalyzers indexAnalyzers = createTestAnalysis(indexSettings, nodeSettings).indexAnalyzers;
MapperRegistry mapperRegistry = new MapperRegistry(Collections.emptyMap(), Collections.emptyMap(), MapperPlugin.NOOP_FIELD_FILTER);
MapperService service = new MapperService(
settings,
indexSettings,
indexAnalyzers,
xContentRegistry(),
similarityService,
Expand All @@ -149,7 +189,7 @@ private CodecService createCodecService(boolean isMapperServiceNull) throws IOEx
() -> false,
null
);
return new CodecService(service, LogManager.getLogger("test"));
return new CodecService(service, indexSettings, LogManager.getLogger("test"));
}

private SegmentReader getSegmentReader(Codec codec) throws IOException {
Expand All @@ -166,4 +206,5 @@ private SegmentReader getSegmentReader(Codec codec) throws IOException {
dir.close();
return sr;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,24 @@ public void testZstdNoDictLucene95CustomCodecMode() {
assertEquals(Lucene95CustomCodec.Mode.ZSTD_NO_DICT, lucene95CustomStoredFieldsFormat.getMode());
}

public void testZstdModeWithCompressionLevel() {
int randomCompressionLevel = randomIntBetween(1, 6);
Lucene95CustomStoredFieldsFormat lucene95CustomStoredFieldsFormat = new Lucene95CustomStoredFieldsFormat(
Lucene95CustomCodec.Mode.ZSTD,
randomCompressionLevel
);
assertEquals(Lucene95CustomCodec.Mode.ZSTD, lucene95CustomStoredFieldsFormat.getMode());
assertEquals(randomCompressionLevel, lucene95CustomStoredFieldsFormat.getCompressionLevel());
}

public void testZstdNoDictLucene95CustomCodecModeWithCompressionLevel() {
int randomCompressionLevel = randomIntBetween(1, 6);
Lucene95CustomStoredFieldsFormat lucene95CustomStoredFieldsFormat = new Lucene95CustomStoredFieldsFormat(
Lucene95CustomCodec.Mode.ZSTD_NO_DICT,
randomCompressionLevel
);
assertEquals(Lucene95CustomCodec.Mode.ZSTD_NO_DICT, lucene95CustomStoredFieldsFormat.getMode());
assertEquals(randomCompressionLevel, lucene95CustomStoredFieldsFormat.getCompressionLevel());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public Optional<EngineFactory> getEngineFactory(final IndexSettings indexSetting

@Override
public Optional<CodecService> getCustomCodecService(IndexSettings indexSettings) {
return Optional.of(new CodecService(null, LogManager.getLogger(getClass())));
return Optional.of(new CodecService(null, indexSettings, LogManager.getLogger(getClass())));
}

@Override
Expand All @@ -195,7 +195,7 @@ public Optional<EngineFactory> getEngineFactory(final IndexSettings indexSetting

@Override
public Optional<CodecService> getCustomCodecService(IndexSettings indexSettings) {
return Optional.of(new CodecService(null, LogManager.getLogger(getClass())));
return Optional.of(new CodecService(null, indexSettings, LogManager.getLogger(getClass())));
}
}

Expand All @@ -207,7 +207,9 @@ public Optional<EngineFactory> getEngineFactory(final IndexSettings indexSetting

@Override
public Optional<CodecServiceFactory> getCustomCodecServiceFactory(IndexSettings indexSettings) {
return Optional.of(config -> new CodecService(config.getMapperService(), LogManager.getLogger(getClass())));
return Optional.of(
config -> new CodecService(config.getMapperService(), config.getIndexSettings(), LogManager.getLogger(getClass()))
);
}
}

Expand Down
Loading

0 comments on commit 43f5092

Please sign in to comment.