Commit f71f55d

Merge branch 'main' into derivedaggs

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
mch2 authored Jul 24, 2024
2 parents e4304eb + fcc231d
Showing 36 changed files with 1,540 additions and 69 deletions.
CHANGELOG.md (5 additions, 0 deletions)
@@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Create listener to refresh search thread resource usage ([#14832](https://github.com/opensearch-project/OpenSearch/pull/14832))
- Add rest, transport layer changes for hot to warm tiering - dedicated setup ([#13980](https://github.com/opensearch-project/OpenSearch/pull/13980))
- Optimize Cluster Stats Indices to precompute node level stats ([#14426](https://github.com/opensearch-project/OpenSearch/pull/14426))
- Add logic to create index templates (v2) using context field ([#14811](https://github.com/opensearch-project/OpenSearch/pull/14811))
- Add basic aggregation support for derived fields ([#14618](https://github.com/opensearch-project/OpenSearch/pull/14618))

### Dependencies
@@ -54,6 +55,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `jackson` from 2.17.1 to 2.17.2 ([#14687](https://github.com/opensearch-project/OpenSearch/pull/14687))
- Bump `net.minidev:json-smart` from 2.5.0 to 2.5.1 ([#14748](https://github.com/opensearch-project/OpenSearch/pull/14748))
- Bump `actions/checkout` from 2 to 4 ([#14858](https://github.com/opensearch-project/OpenSearch/pull/14858))
- Bump `org.apache.commons:commons-lang3` from 3.14.0 to 3.15.0 ([#14861](https://github.com/opensearch-project/OpenSearch/pull/14861))

### Changed
- [Tiered Caching] Move query recomputation logic outside write lock ([#14187](https://github.com/opensearch-project/OpenSearch/pull/14187))
@@ -63,6 +65,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow @InternalApi annotation on classes not meant to be constructed outside of the OpenSearch core ([#14575](https://github.com/opensearch-project/OpenSearch/pull/14575))
- Add @InternalApi annotation to japicmp exclusions ([#14597](https://github.com/opensearch-project/OpenSearch/pull/14597))
- Allow system index warning in OpenSearchRestTestCase.refreshAllIndices ([#14635](https://github.com/opensearch-project/OpenSearch/pull/14635))
- Make reroute iteration time-bound for large shard allocations ([#14848](https://github.com/opensearch-project/OpenSearch/pull/14848))

### Deprecated
- Deprecate batch_size parameter on bulk API ([#14725](https://github.com/opensearch-project/OpenSearch/pull/14725))
@@ -94,7 +97,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix NPE in ReplicaShardAllocator ([#14385](https://github.com/opensearch-project/OpenSearch/pull/14385))
- Fix constant_keyword field type used when creating index ([#14807](https://github.com/opensearch-project/OpenSearch/pull/14807))
- Use circuit breaker in InternalHistogram when adding empty buckets ([#14754](https://github.com/opensearch-project/OpenSearch/pull/14754))
- Create new IndexInput for multi part upload ([#14888](https://github.com/opensearch-project/OpenSearch/pull/14888))
- Fix searchable snapshot failure with scripted fields ([#14411](https://github.com/opensearch-project/OpenSearch/pull/14411))
- Fix the visit of inner query for NestedQueryBuilder ([#14739](https://github.com/opensearch-project/OpenSearch/pull/14739))

### Security

plugins/repository-hdfs/build.gradle (1 addition, 1 deletion)

@@ -76,7 +76,7 @@ dependencies {
api "org.apache.commons:commons-compress:${versions.commonscompress}"
api 'org.apache.commons:commons-configuration2:2.11.0'
api "commons-io:commons-io:${versions.commonsio}"
-api 'org.apache.commons:commons-lang3:3.14.0'
+api 'org.apache.commons:commons-lang3:3.15.0'
implementation 'com.google.re2j:re2j:1.7'
api 'javax.servlet:servlet-api:2.5'
api "org.slf4j:slf4j-api:${versions.slf4j}"

This file was deleted.

@@ -0,0 +1 @@
+21581109b4be710ea4b195d5760392ec284f9f11
@@ -1,7 +1,7 @@
{
"indices.put_alias":{
"documentation":{
"url":"https://opensearch.org/docs/latest/api-reference/index-apis/alias/",
"url":"https://opensearch.org/docs/latest/api-reference/index-apis/update-alias/",
"description":"Creates or updates an alias."
},
"stability":"stable",
@@ -769,7 +769,7 @@ public void testMessyElectionsStillMakeClusterGoGreen() throws Exception {
ensureGreen("test");
}

-public void testBatchModeEnabled() throws Exception {
+public void testBatchModeEnabledWithoutTimeout() throws Exception {
internalCluster().startClusterManagerOnlyNodes(
1,
Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true).build()
@@ -810,6 +810,132 @@ public void testBatchModeEnabled() throws Exception {
assertEquals(0, gatewayAllocator.getNumberOfInFlightFetches());
}

public void testBatchModeEnabledWithSufficientTimeoutAndClusterGreen() throws Exception {
internalCluster().startClusterManagerOnlyNodes(
1,
Settings.builder()
.put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true)
.put(ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "20s")
.put(ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "20s")
.build()
);
List<String> dataOnlyNodes = internalCluster().startDataOnlyNodes(2);
createIndex(
"test",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()
);
ensureGreen("test");
Settings node0DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(0));
Settings node1DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(1));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(0)));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(1)));
ensureRed("test");
ensureStableCluster(1);

logger.info("--> Now do a protective reroute");
ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get();
assertTrue(clusterRerouteResponse.isAcknowledged());

ShardsBatchGatewayAllocator gatewayAllocator = internalCluster().getInstance(
ShardsBatchGatewayAllocator.class,
internalCluster().getClusterManagerName()
);
assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings()));
assertEquals(1, gatewayAllocator.getNumberOfStartedShardBatches());
assertEquals(1, gatewayAllocator.getNumberOfStoreShardBatches());

// Now start both data nodes and ensure batch mode is working
logger.info("--> restarting the stopped nodes");
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(0)).put(node0DataPathSettings).build());
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(1)).put(node1DataPathSettings).build());
ensureStableCluster(3);
ensureGreen("test");
assertEquals(0, gatewayAllocator.getNumberOfStartedShardBatches());
assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());
assertEquals(0, gatewayAllocator.getNumberOfInFlightFetches());
}

public void testBatchModeEnabledWithInSufficientTimeoutButClusterGreen() throws Exception {

internalCluster().startClusterManagerOnlyNodes(
1,
Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true).build()
);
List<String> dataOnlyNodes = internalCluster().startDataOnlyNodes(2);
createNIndices(50, "test"); // this will create 50p, 50r shards
ensureStableCluster(3);
IndicesStatsResponse indicesStats = dataNodeClient().admin().indices().prepareStats().get();
assertThat(indicesStats.getSuccessfulShards(), equalTo(100));
ClusterHealthResponse health = client().admin()
.cluster()
.health(Requests.clusterHealthRequest().waitForGreenStatus().timeout("1m"))
.actionGet();
assertFalse(health.isTimedOut());
assertEquals(GREEN, health.getStatus());

String clusterManagerName = internalCluster().getClusterManagerName();
Settings clusterManagerDataPathSettings = internalCluster().dataPathSettings(clusterManagerName);
Settings node0DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(0));
Settings node1DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(1));

internalCluster().stopCurrentClusterManagerNode();
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(0)));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataOnlyNodes.get(1)));

// Now start the cluster manager node, then verify that batches are created
internalCluster().startClusterManagerOnlyNodes(
1,
Settings.builder()
.put("node.name", clusterManagerName)
.put(clusterManagerDataPathSettings)
.put(ShardsBatchGatewayAllocator.GATEWAY_ALLOCATOR_BATCH_SIZE.getKey(), 5)
.put(ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "10ms")
.put(ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "10ms")
.put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true)
.build()
);
ensureStableCluster(1);

logger.info("--> Now do a protective reroute"); // to avoid any race condition in test
ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get();
assertTrue(clusterRerouteResponse.isAcknowledged());

ShardsBatchGatewayAllocator gatewayAllocator = internalCluster().getInstance(
ShardsBatchGatewayAllocator.class,
internalCluster().getClusterManagerName()
);

assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings()));
assertEquals(10, gatewayAllocator.getNumberOfStartedShardBatches());
assertEquals(10, gatewayAllocator.getNumberOfStoreShardBatches());
health = client(internalCluster().getClusterManagerName()).admin().cluster().health(Requests.clusterHealthRequest()).actionGet();
assertFalse(health.isTimedOut());
assertEquals(RED, health.getStatus());
assertEquals(100, health.getUnassignedShards());
assertEquals(0, health.getInitializingShards());
assertEquals(0, health.getActiveShards());
assertEquals(0, health.getRelocatingShards());
assertEquals(0, health.getNumberOfDataNodes());

// Now start both data nodes and ensure batch mode is working
logger.info("--> restarting the stopped nodes");
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(0)).put(node0DataPathSettings).build());
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(1)).put(node1DataPathSettings).build());
ensureStableCluster(3);

// wait for cluster to turn green
health = client().admin().cluster().health(Requests.clusterHealthRequest().waitForGreenStatus().timeout("5m")).actionGet();
assertFalse(health.isTimedOut());
assertEquals(GREEN, health.getStatus());
assertEquals(0, health.getUnassignedShards());
assertEquals(0, health.getInitializingShards());
assertEquals(100, health.getActiveShards());
assertEquals(0, health.getRelocatingShards());
assertEquals(2, health.getNumberOfDataNodes());
assertEquals(0, gatewayAllocator.getNumberOfStartedShardBatches());
assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());
}

public void testBatchModeDisabled() throws Exception {
internalCluster().startClusterManagerOnlyNodes(
1,
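Note on the two new tests above: they exercise the primary and replica batch-allocator timeouts that pair with the CHANGELOG entry "Make reroute iteration time-bound for large shard allocations" ([#14848](https://github.com/opensearch-project/OpenSearch/pull/14848)). A minimal sketch of how a cluster-manager node opts in, using the same setting constants the tests use (a fragment, and the timeout values are illustrative, not defaults):

// Sketch only: a very small timeout (the second test uses "10ms") makes
// allocator iterations give up early, so shard batches stay pending across
// reroute passes; a generous one ("20s" in the first test) lets a single
// pass process every batch.
Settings batchModeSettings = Settings.builder()
    .put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true)
    .put(ShardsBatchGatewayAllocator.GATEWAY_ALLOCATOR_BATCH_SIZE.getKey(), 5) // shards per batch
    .put(ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "20s")
    .put(ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "20s")
    .build();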
@@ -140,7 +140,8 @@ protected void clusterManagerOperation(
MetadataIndexTemplateService.validateV2TemplateRequest(
state.metadata(),
simulateTemplateToAdd,
-request.getIndexTemplateRequest().indexTemplate()
+request.getIndexTemplateRequest().indexTemplate(),
+clusterService.getClusterSettings()
);
stateWithTemplate = indexTemplateService.addIndexTemplateV2(
state,
@@ -134,7 +134,8 @@ protected void clusterManagerOperation(
MetadataIndexTemplateService.validateV2TemplateRequest(
state.metadata(),
simulateTemplateToAdd,
-request.getIndexTemplateRequest().indexTemplate()
+request.getIndexTemplateRequest().indexTemplate(),
+clusterService.getClusterSettings()
);
stateWithTemplate = indexTemplateService.addIndexTemplateV2(
state,
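Both simulate transport actions receive the same change: validateV2TemplateRequest now also takes the cluster settings. The assumed shape of the updated call, with argument roles inferred from context (the full method signature is not shown in this diff):

MetadataIndexTemplateService.validateV2TemplateRequest(
    state.metadata(),                                  // cluster metadata to validate against
    simulateTemplateToAdd,                             // the template being simulated
    request.getIndexTemplateRequest().indexTemplate(), // the v2 (composable) template body
    clusterService.getClusterSettings()                // newly threaded through, plausibly so that
                                                       // context-field validation ([#14811] in the
                                                       // CHANGELOG) can consult dynamic settings
);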
@@ -10,6 +10,8 @@

import org.opensearch.common.annotation.ExperimentalApi;

import java.util.Objects;

/**
* Metadata information about a template available in a template repository.
*/
@@ -48,13 +50,14 @@ public long version() {
* @return Metadata object based on name
*/
public static SystemTemplateMetadata fromComponentTemplate(String fullyQualifiedName) {
-assert fullyQualifiedName.length() > 1 : "System template name must have at least one component";
-assert fullyQualifiedName.substring(1, fullyQualifiedName.indexOf(DELIMITER, 1)).equals(COMPONENT_TEMPLATE_TYPE);
+assert fullyQualifiedName.length() > DELIMITER.length() * 3 + 2 + COMPONENT_TEMPLATE_TYPE.length()
+    : "System template name must have all defined components";
+assert (DELIMITER + fullyQualifiedName.substring(1, fullyQualifiedName.indexOf(DELIMITER, 1))).equals(COMPONENT_TEMPLATE_TYPE);

return new SystemTemplateMetadata(
-Long.parseLong(fullyQualifiedName.substring(fullyQualifiedName.lastIndexOf(DELIMITER))),
+Long.parseLong(fullyQualifiedName.substring(fullyQualifiedName.lastIndexOf(DELIMITER) + 1)),
COMPONENT_TEMPLATE_TYPE,
-fullyQualifiedName.substring(0, fullyQualifiedName.lastIndexOf(DELIMITER))
+fullyQualifiedName.substring(fullyQualifiedName.indexOf(DELIMITER, 2) + 1, fullyQualifiedName.lastIndexOf(DELIMITER))
);
}

@@ -65,4 +68,22 @@ public static SystemTemplateMetadata fromComponentTemplateInfo(String name, long
public final String fullyQualifiedName() {
return type + DELIMITER + name + DELIMITER + version;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SystemTemplateMetadata that = (SystemTemplateMetadata) o;
return version == that.version && Objects.equals(type, that.type) && Objects.equals(name, that.name);
}

@Override
public int hashCode() {
return Objects.hash(version, type, name);
}

@Override
public String toString() {
return "SystemTemplateMetadata{" + "version=" + version + ", type='" + type + '\'' + ", name='" + name + '\'' + '}';
}
}
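The reworked parsing above is easiest to verify on a concrete name. A hedged walk-through, assuming DELIMITER is "@" and COMPONENT_TEMPLATE_TYPE is "@abc_template" (neither constant's value appears in this diff):

// fullyQualifiedName() builds: type + DELIMITER + name + DELIMITER + version
String fqn = "@abc_template@logs@1";

// New version parse: start one past the last delimiter -> "1"
long version = Long.parseLong(fqn.substring(fqn.lastIndexOf("@") + 1));

// New name parse: between the delimiter found from index 2 and the last one -> "logs"
String name = fqn.substring(fqn.indexOf("@", 2) + 1, fqn.lastIndexOf("@"));

// The replaced lines took substring(lastIndexOf(DELIMITER)) for the version, so
// Long.parseLong received "@1" and threw NumberFormatException, and they returned
// "@abc_template@logs" (type prefix included) as the name. The new indices fix both.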
@@ -85,7 +85,7 @@ void refreshTemplates(boolean verification) {
int failedLoadingRepositories = 0;
List<Exception> exceptions = new ArrayList<>();

-if (loaded.compareAndSet(false, true) && enabledTemplates) {
+if ((verification || loaded.compareAndSet(false, true)) && enabledTemplates) {
for (SystemTemplatesPlugin plugin : systemTemplatesPluginList) {
try (SystemTemplateRepository repository = plugin.loadRepository()) {

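The one-line guard change is the whole fix here: previously only the first caller to win the compareAndSet ever refreshed, so later verification passes were silently skipped. A minimal standalone sketch of the pattern (class and method names invented for illustration, not OpenSearch API):

import java.util.concurrent.atomic.AtomicBoolean;

class TemplateLoader {
    private final AtomicBoolean loaded = new AtomicBoolean(false);

    void refresh(boolean verification) {
        // verification == true short-circuits the CAS, so an explicit
        // verification run re-executes even after the initial lazy load;
        // plain callers still load at most once.
        if (verification || loaded.compareAndSet(false, true)) {
            // ... iterate plugin repositories and load templates ...
        }
    }
}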
@@ -10,6 +10,8 @@

import org.opensearch.common.annotation.ExperimentalApi;

import java.util.Objects;

/**
* The information to uniquely identify a template repository.
*/
@@ -31,4 +33,22 @@ public String id() {
public long version() {
return version;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TemplateRepositoryMetadata that = (TemplateRepositoryMetadata) o;
return version == that.version && Objects.equals(id, that.id);
}

@Override
public int hashCode() {
return Objects.hash(id, version);
}

@Override
public String toString() {
return "TemplateRepositoryMetadata{" + "id='" + id + '\'' + ", version=" + version + '}';
}
}
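A quick sanity check of the new value semantics; the two-argument constructor is assumed from the id() and version() accessors above, not confirmed by this diff:

TemplateRepositoryMetadata a = new TemplateRepositoryMetadata("core", 1L);
TemplateRepositoryMetadata b = new TemplateRepositoryMetadata("core", 1L);
assert a.equals(b) && a.hashCode() == b.hashCode(); // same id and version -> equal
assert !a.equals(new TemplateRepositoryMetadata("core", 2L)); // version participates in equality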
@@ -47,6 +47,7 @@
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.metadata.ComponentTemplateMetadata;
import org.opensearch.cluster.metadata.DataStreamMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.common.collect.Tuple;
@@ -94,9 +95,10 @@ public abstract class OpenSearchNodeCommand extends EnvironmentAwareCommand {
public <T, C> T parseNamedObject(Class<T> categoryClass, String name, XContentParser parser, C context) throws IOException {
// Currently, two unknown top-level objects are present
if (Metadata.Custom.class.isAssignableFrom(categoryClass)) {
-if (DataStreamMetadata.TYPE.equals(name)) {
+if (DataStreamMetadata.TYPE.equals(name) || ComponentTemplateMetadata.TYPE.equals(name)) {
// DataStreamMetadata is used inside Metadata class for validation purposes and building the indicesLookup,
// therefore even es node commands need to be able to parse it.
+// ComponentTemplateMetadata is used inside Metadata class for building the systemTemplatesLookup,
+// therefore even OpenSearch node commands need to be able to parse it.
return super.parseNamedObject(categoryClass, name, parser, context);
// TODO: Try to parse other named objects (e.g. stored scripts, ingest pipelines) that are part of core es as well?
// Note that supporting PersistentTasksCustomMetadata is trickier, because PersistentTaskParams is a named object too.