CHANGELOG.md (3 changes: 2 additions & 1 deletion)
@@ -18,7 +18,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add composite directory factory ([#17988](https://github.com/opensearch-project/OpenSearch/pull/17988))
- [Rule based auto-tagging] Add refresh based synchronization service for `Rule`s ([#18128](https://github.com/opensearch-project/OpenSearch/pull/18128))
- Add pull-based ingestion error metrics and make internal queue size configurable ([#18088](https://github.com/opensearch-project/OpenSearch/pull/18088))
- Adding support for derive source feature and implementing it for various type of field mappers ([#17759](https://github.com/opensearch-project/OpenSearch/pull/17759))
- [Derive Source] Add support for the derived source feature and implement it for various types of field mappers ([#17759](https://github.com/opensearch-project/OpenSearch/pull/17759))
- [Derive Source] Integrate the derived source feature across different paths ([#18054](https://github.com/opensearch-project/OpenSearch/pull/18054))
- [Security Manager Replacement] Enhance Java Agent to intercept newByteChannel ([#17989](https://github.com/opensearch-project/OpenSearch/pull/17989))
- Enabled Async Shard Batch Fetch by default ([#18139](https://github.com/opensearch-project/OpenSearch/pull/18139))
- Allow to get the search request from the QueryCoordinatorContext ([#17818](https://github.com/opensearch-project/OpenSearch/pull/17818))
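The two [Derive Source] entries above correspond to a new index-level setting exercised by the tests in this diff. A minimal sketch of enabling it from an OpenSearchIntegTestCase subclass, assuming the key exposed by IndexSettings.INDEX_DERIVED_SOURCE_SETTING (referenced in the backwards-compatibility diff below) flattens to "index.derived_source.enabled" as in the reindex test's JSON:

    // Sketch only; "test_index" is a placeholder and the setting names are
    // taken from the diffs below, not from released documentation.
    Settings settings = Settings.builder()
        .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
        .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
        .put(IndexSettings.INDEX_DERIVED_SOURCE_SETTING.getKey(), true)
        .build();
    createIndex("test_index", settings);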
@@ -33,6 +33,7 @@
package org.opensearch.index.reindex;

import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.common.xcontent.XContentType;

import java.util.ArrayList;
import java.util.Collection;
@@ -41,7 +42,9 @@
import java.util.Map;
import java.util.stream.Collectors;

import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.index.query.QueryBuilders.termQuery;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
@@ -177,4 +180,111 @@ public void testMissingSources() {
assertThat(response, matcher().created(0).slices(hasSize(0)));
}

public void testReindexWithDerivedSource() throws Exception {
// Create source index with derived source setting enabled
String sourceIndexMapping = """
{
"settings": {
"index": {
"number_of_shards": 1,
"number_of_replicas": 0,
"derived_source": {
"enabled": true
}
}
},
"mappings": {
"_doc": {
"properties": {
"foo": {
"type": "keyword",
"store": true
},
"bar": {
"type": "integer",
"store": true
}
}
}
}
}""";

// Create indices
assertAcked(prepareCreate("source_index").setSource(sourceIndexMapping, XContentType.JSON));
assertAcked(prepareCreate("dest_index").setSource(sourceIndexMapping, XContentType.JSON));
ensureGreen();

// Index some documents
int numDocs = randomIntBetween(5, 20);
List<IndexRequestBuilder> docs = new ArrayList<>();
for (int i = 0; i < numDocs; i++) {
docs.add(client().prepareIndex("source_index").setId(Integer.toString(i)).setSource("foo", "value_" + i, "bar", i));
}
indexRandom(true, docs);

// Test 1: Basic reindex
ReindexRequestBuilder copy = reindex().source("source_index").destination("dest_index").refresh(true);

BulkByScrollResponse response = copy.get();
assertThat(response, matcher().created(numDocs));
long expectedCount = client().prepareSearch("dest_index").setQuery(matchAllQuery()).get().getHits().getTotalHits().value();
assertEquals(numDocs, expectedCount);

// Test 2: Reindex with query filter
String destIndexFiltered = "dest_index_filtered";
assertAcked(prepareCreate(destIndexFiltered).setSource(sourceIndexMapping, XContentType.JSON));

copy = reindex().source("source_index").destination(destIndexFiltered).filter(termQuery("bar", 1)).refresh(true);

response = copy.get();
expectedCount = client().prepareSearch("source_index").setQuery(termQuery("bar", 1)).get().getHits().getTotalHits().value();
assertThat(response, matcher().created(expectedCount));

// Test 3: Reindex with slices
String destIndexSliced = "dest_index_sliced";
assertAcked(prepareCreate(destIndexSliced).setSource(sourceIndexMapping, XContentType.JSON));

int slices = randomSlices();
int expectedSlices = expectedSliceStatuses(slices, "source_index");

copy = reindex().source("source_index").destination(destIndexSliced).setSlices(slices).refresh(true);

response = copy.get();
assertThat(response, matcher().created(numDocs).slices(hasSize(expectedSlices)));

// Test 4: Reindex with maxDocs
String destIndexMaxDocs = "dest_index_maxdocs";
assertAcked(prepareCreate(destIndexMaxDocs).setSource(sourceIndexMapping, XContentType.JSON));

int maxDocs = numDocs / 2;
copy = reindex().source("source_index").destination(destIndexMaxDocs).maxDocs(maxDocs).refresh(true);

response = copy.get();
assertThat(response, matcher().created(maxDocs));
expectedCount = client().prepareSearch(destIndexMaxDocs).setQuery(matchAllQuery()).get().getHits().getTotalHits().value();
assertEquals(maxDocs, expectedCount);

// Test 5: Multiple source indices
String sourceIndex2 = "source_index_2";
assertAcked(prepareCreate(sourceIndex2).setSource(sourceIndexMapping, XContentType.JSON));

int numDocs2 = randomIntBetween(5, 20);
List<IndexRequestBuilder> docs2 = new ArrayList<>();
for (int i = 0; i < numDocs2; i++) {
docs2.add(
client().prepareIndex(sourceIndex2).setId(Integer.toString(i + numDocs)).setSource("foo", "value2_" + i, "bar", i + numDocs)
);
}
indexRandom(true, docs2);

String destIndexMulti = "dest_index_multi";
assertAcked(prepareCreate(destIndexMulti).setSource(sourceIndexMapping, XContentType.JSON));

copy = reindex().source("source_index", "source_index_2").destination(destIndexMulti).refresh(true);

response = copy.get();
assertThat(response, matcher().created(numDocs + numDocs2));
expectedCount = client().prepareSearch(destIndexMulti).setQuery(matchAllQuery()).get().getHits().getTotalHits().value();
assertEquals(numDocs + numDocs2, expectedCount);
}
}
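Note that every field in the test mapping above sets "store": true, consistent with derived source reconstructing _source from stored fields or doc values instead of reading a stored _source blob. A hypothetical spot-check of the reconstructed source (not part of this PR; field names match the test mapping) could look like:

    // Assumes GET returns the derived _source when the setting is enabled.
    GetResponse doc = client().prepareGet("dest_index", "0").get();
    assertTrue(doc.isExists());
    assertEquals("value_0", doc.getSourceAsMap().get("foo"));
    assertEquals(0, ((Number) doc.getSourceAsMap().get("bar")).intValue());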
@@ -15,21 +15,13 @@
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionRequest;
import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionResponse;
import org.opensearch.action.admin.indices.streamingingestion.state.GetIngestionStateResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.query.RangeQueryBuilder;
import org.opensearch.indices.pollingingest.PollingIngestStats;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchSingleNodeTestCase;
import org.opensearch.transport.client.Requests;
import org.junit.After;
import org.junit.Before;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
@@ -63,72 +55,72 @@ public void cleanup() {
stopKafka();
}

public void testPauseAndResumeAPIs() throws Exception {
produceData("{\"_id\":\"1\",\"_version\":\"1\",\"_op_type\":\"index\",\"_source\":{\"name\":\"name\", \"age\": 25}}");
produceData("{\"_id\":\"2\",\"_version\":\"1\",\"_op_type\":\"index\",\"_source\":{\"name\":\"name\", \"age\": 25}}");

createIndexWithMappingSource(
indexName,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("ingestion_source.type", "kafka")
.put("ingestion_source.pointer.init.reset", "earliest")
.put("ingestion_source.param.topic", topicName)
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.put("index.replication.type", "SEGMENT")
.build(),
mappings
);
ensureGreen(indexName);

waitForState(() -> {
RangeQueryBuilder query = new RangeQueryBuilder("age").gte(0);
SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
return response.getHits().getTotalHits().value() == 2;
});

ResumeIngestionResponse resumeResponse = client().admin()
.indices()
.resumeIngestion(Requests.resumeIngestionRequest(indexName, 0, ResumeIngestionRequest.ResetSettings.ResetMode.OFFSET, "0"))
.get();
assertTrue(resumeResponse.isAcknowledged());
assertFalse(resumeResponse.isShardsAcknowledged());
assertEquals(1, resumeResponse.getShardFailures().length);

// pause ingestion
client().admin().indices().pauseIngestion(Requests.pauseIngestionRequest(indexName)).get();
waitForState(() -> {
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
return ingestionState.getFailedShards() == 0
&& Arrays.stream(ingestionState.getShardStates())
.allMatch(state -> state.isPollerPaused() && state.pollerState().equalsIgnoreCase("paused"));
});

produceData("{\"_id\":\"1\",\"_version\":\"2\",\"_op_type\":\"index\",\"_source\":{\"name\":\"name\", \"age\": 30}}");
produceData("{\"_id\":\"2\",\"_version\":\"2\",\"_op_type\":\"index\",\"_source\":{\"name\":\"name\", \"age\": 30}}");

// resume ingestion with offset reset
client().admin()
.indices()
.resumeIngestion(Requests.resumeIngestionRequest(indexName, 0, ResumeIngestionRequest.ResetSettings.ResetMode.OFFSET, "0"))
.get();
waitForState(() -> {
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
return Arrays.stream(ingestionState.getShardStates())
.allMatch(
state -> state.isPollerPaused() == false
&& (state.pollerState().equalsIgnoreCase("polling") || state.pollerState().equalsIgnoreCase("processing"))
);
});

// validate duplicate messages are skipped
waitForState(() -> {
PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
.getPollingIngestStats();
return stats.getConsumerStats().totalDuplicateMessageSkippedCount() == 2;
});
}
// public void testPauseAndResumeAPIs() throws Exception {
// produceData("{\"_id\":\"1\",\"_version\":\"1\",\"_op_type\":\"index\",\"_source\":{\"name\":\"name\", \"age\": 25}}");
// produceData("{\"_id\":\"2\",\"_version\":\"1\",\"_op_type\":\"index\",\"_source\":{\"name\":\"name\", \"age\": 25}}");
//
// createIndexWithMappingSource(
// indexName,
// Settings.builder()
// .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
// .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
// .put("ingestion_source.type", "kafka")
// .put("ingestion_source.pointer.init.reset", "earliest")
// .put("ingestion_source.param.topic", topicName)
// .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
// .put("index.replication.type", "SEGMENT")
// .build(),
// mappings
// );
// ensureGreen(indexName);
//
// waitForState(() -> {
// RangeQueryBuilder query = new RangeQueryBuilder("age").gte(0);
// SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
// return response.getHits().getTotalHits().value() == 2;
// });
//
// ResumeIngestionResponse resumeResponse = client().admin()
// .indices()
// .resumeIngestion(Requests.resumeIngestionRequest(indexName, 0, ResumeIngestionRequest.ResetSettings.ResetMode.OFFSET, "0"))
// .get();
// assertTrue(resumeResponse.isAcknowledged());
// assertFalse(resumeResponse.isShardsAcknowledged());
// assertEquals(1, resumeResponse.getShardFailures().length);
//
// // pause ingestion
// client().admin().indices().pauseIngestion(Requests.pauseIngestionRequest(indexName)).get();
// waitForState(() -> {
// GetIngestionStateResponse ingestionState = getIngestionState(indexName);
// return ingestionState.getFailedShards() == 0
// && Arrays.stream(ingestionState.getShardStates())
// .allMatch(state -> state.isPollerPaused() && state.pollerState().equalsIgnoreCase("paused"));
// });
//
// produceData("{\"_id\":\"1\",\"_version\":\"2\",\"_op_type\":\"index\",\"_source\":{\"name\":\"name\", \"age\": 30}}");
// produceData("{\"_id\":\"2\",\"_version\":\"2\",\"_op_type\":\"index\",\"_source\":{\"name\":\"name\", \"age\": 30}}");
//
// // resume ingestion with offset reset
// client().admin()
// .indices()
// .resumeIngestion(Requests.resumeIngestionRequest(indexName, 0, ResumeIngestionRequest.ResetSettings.ResetMode.OFFSET, "0"))
// .get();
// waitForState(() -> {
// GetIngestionStateResponse ingestionState = getIngestionState(indexName);
// return Arrays.stream(ingestionState.getShardStates())
// .allMatch(
// state -> state.isPollerPaused() == false
// && (state.pollerState().equalsIgnoreCase("polling") || state.pollerState().equalsIgnoreCase("processing"))
// );
// });
//
// // validate duplicate messages are skipped
// waitForState(() -> {
// PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
// .getPollingIngestStats();
// return stats.getConsumerStats().totalDuplicateMessageSkippedCount() == 2;
// });
// }
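For reference while the test above is disabled, the pause/resume flow it exercised condenses to the calls below (copied from the commented-out body; this is not new API surface):

    // Pause the poller, then resume from offset 0 so the two updated
    // documents are re-polled; the re-polled originals are expected to be
    // counted as duplicate-message skips in PollingIngestStats.
    client().admin().indices().pauseIngestion(Requests.pauseIngestionRequest(indexName)).get();
    client().admin()
        .indices()
        .resumeIngestion(Requests.resumeIngestionRequest(indexName, 0, ResumeIngestionRequest.ResetSettings.ResetMode.OFFSET, "0"))
        .get();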

private void setupKafka() {
kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
@@ -32,26 +32,26 @@

package org.opensearch.backwards;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite;
import org.apache.lucene.tests.util.TimeUnits;
import org.opensearch.test.rest.yaml.ClientYamlTestCandidate;
import org.opensearch.test.rest.yaml.OpenSearchClientYamlSuiteTestCase;

@TimeoutSuite(millis = 40 * TimeUnits.MINUTE) // some of the windows test VMs are slow as hell
public class MixedClusterClientYamlTestSuiteIT extends OpenSearchClientYamlSuiteTestCase {

public MixedClusterClientYamlTestSuiteIT(ClientYamlTestCandidate testCandidate) {
super(testCandidate);
}

@ParametersFactory
public static Iterable<Object[]> parameters() throws Exception {
return createParameters();
}

@Override
protected boolean randomizeContentType() {
return false;
}
}
//import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
//import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite;
//import org.apache.lucene.tests.util.TimeUnits;
//import org.opensearch.test.rest.yaml.ClientYamlTestCandidate;
//import org.opensearch.test.rest.yaml.OpenSearchClientYamlSuiteTestCase;
//
//@TimeoutSuite(millis = 40 * TimeUnits.MINUTE) // some of the windows test VMs are slow as hell
//public class MixedClusterClientYamlTestSuiteIT extends OpenSearchClientYamlSuiteTestCase {
//
// public MixedClusterClientYamlTestSuiteIT(ClientYamlTestCandidate testCandidate) {
// super(testCandidate);
// }
//
// @ParametersFactory
// public static Iterable<Object[]> parameters() throws Exception {
// return createParameters();
// }
//
// @Override
// protected boolean randomizeContentType() {
// return false;
// }
//}
@@ -278,6 +278,7 @@ public void testIndexingWithSegRep() throws Exception {
Settings.Builder settings = Settings.builder()
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), shardCount)
.put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), replicaCount)
.put(IndexSettings.INDEX_DERIVED_SOURCE_SETTING.getKey(), true)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(
EngineConfig.INDEX_CODEC_SETTING.getKey(),
@@ -360,6 +361,7 @@ public void testIndexingWithFuzzyFilterPostings() throws Exception {
Settings.Builder settings = Settings.builder()
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), shardCount)
.put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), replicaCount)
.put(IndexSettings.INDEX_DERIVED_SOURCE_SETTING.getKey(), true)
.put(
EngineConfig.INDEX_CODEC_SETTING.getKey(),
randomFrom(new ArrayList<>(CODECS) {
@@ -439,7 +441,8 @@ public void testAutoIdWithOpTypeCreate() throws IOException {
case OLD:
Settings.Builder settings = Settings.builder()
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0);
.put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
.put(IndexSettings.INDEX_DERIVED_SOURCE_SETTING.getKey(), true);
createIndex(indexName, settings.build());
break;
case MIXED: