diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-9/build.gradle b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-9/build.gradle new file mode 100644 index 000000000000..675b22678d07 --- /dev/null +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-9/build.gradle @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { id 'org.apache.beam.module' } +applyJavaNature( + publish: false, + archivesBaseName: 'beam-sdks-java-io-elasticsearch-tests-9' +) +provideIntegrationTestingDependencies() +enableJavaPerformanceTesting() + +description = "Apache Beam :: SDKs :: Java :: IO :: Elasticsearch-Tests :: 9.x" +ext.summary = "Tests of ElasticsearchIO on Elasticsearch 9.x" + +def elastic_search_version = "9.0.0" + +test { + maxParallelForks = 1 +} + +configurations.testImplementation { + resolutionStrategy { + force "org.elasticsearch.client:elasticsearch-rest-client:$elastic_search_version" + } +} + +dependencies { + testImplementation project(path: ":sdks:java:io:elasticsearch-tests:elasticsearch-tests-common") + testImplementation library.java.testcontainers_elasticsearch + + testImplementation project(path: ":sdks:java:core", configuration: "shadow") + testImplementation project(":sdks:java:io:elasticsearch") + testImplementation library.java.slf4j_api + testImplementation library.java.hamcrest + testImplementation library.java.junit + testImplementation "org.elasticsearch.client:elasticsearch-rest-client:$elastic_search_version" + testRuntimeOnly library.java.log4j2_api + testRuntimeOnly library.java.log4j2_core + testRuntimeOnly library.java.slf4j_jdk14 + testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") +} \ No newline at end of file diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-9/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-9/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java new file mode 100644 index 000000000000..2a6419e1665b --- /dev/null +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-9/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.elasticsearch; + +import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration; + +import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOITCommon.ElasticsearchPipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.TestPipeline; +import org.elasticsearch.client.RestClient; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * A test of {@link ElasticsearchIO} on an independent Elasticsearch v9.x instance. + * + *
This test requires a running instance of Elasticsearch, and the test dataset must exist in the + * database. See {@link ElasticsearchIOITCommon} for instructions to achieve this. + * + *
You can run this test by doing the following from the beam parent module directory with the + * correct server IP: + * + *
+ * ./gradlew integrationTest -p sdks/java/io/elasticsearch-tests/elasticsearch-tests-9 + * -DintegrationTestPipelineOptions='[ + * "--elasticsearchServer=1.2.3.4", + * "--elasticsearchHttpPort=9200"]' + * --tests org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOIT + * -DintegrationTestRunner=direct + *+ * + *
It is likely that you will need to configure thread_pool.write.queue_size: 250
+ * (or higher) in the backend Elasticsearch server for this test to run.
+ */
+@RunWith(JUnit4.class)
+public class ElasticsearchIOIT {
+ private static RestClient restClient;
+ private static ElasticsearchPipelineOptions options;
+ private static ConnectionConfiguration readConnectionConfiguration;
+ private static ConnectionConfiguration writeConnectionConfiguration;
+ private static ConnectionConfiguration updateConnectionConfiguration;
+ private static ElasticsearchIOTestCommon elasticsearchIOTestCommon;
+
+ @Rule public TestPipeline pipeline = TestPipeline.create();
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ PipelineOptionsFactory.register(ElasticsearchPipelineOptions.class);
+ options = TestPipeline.testingPipelineOptions().as(ElasticsearchPipelineOptions.class);
+ readConnectionConfiguration =
+ ElasticsearchIOITCommon.getConnectionConfiguration(
+ options, ElasticsearchIOITCommon.IndexMode.READ);
+ writeConnectionConfiguration =
+ ElasticsearchIOITCommon.getConnectionConfiguration(
+ options, ElasticsearchIOITCommon.IndexMode.WRITE);
+ updateConnectionConfiguration =
+ ElasticsearchIOITCommon.getConnectionConfiguration(
+ options, ElasticsearchIOITCommon.IndexMode.WRITE_PARTIAL);
+ restClient = readConnectionConfiguration.createClient();
+ elasticsearchIOTestCommon =
+ new ElasticsearchIOTestCommon(readConnectionConfiguration, restClient, true);
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ ElasticsearchIOTestUtils.deleteIndex(writeConnectionConfiguration, restClient);
+ ElasticsearchIOTestUtils.deleteIndex(updateConnectionConfiguration, restClient);
+ restClient.close();
+ }
+
+ @Test
+ public void testSplitsVolume() throws Exception {
+ elasticsearchIOTestCommon.testSplit(10_000);
+ }
+
+ @Test
+ public void testReadVolume() throws Exception {
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testRead();
+ }
+
+ @Test
+ public void testReadPITVolume() throws Exception {
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testReadPIT();
+ }
+
+ @Test
+ public void testWriteVolume() throws Exception {
+ // cannot share elasticsearchIOTestCommon because tests run in parallel.
+ ElasticsearchIOTestCommon elasticsearchIOTestCommonWrite =
+ new ElasticsearchIOTestCommon(writeConnectionConfiguration, restClient, true);
+ elasticsearchIOTestCommonWrite.setPipeline(pipeline);
+ elasticsearchIOTestCommonWrite.testWrite();
+ }
+
+ @Test
+ public void testWriteVolumeStateful() throws Exception {
+ // cannot share elasticsearchIOTestCommon because tests run in parallel.
+ ElasticsearchIOTestCommon elasticsearchIOTestCommonWrite =
+ new ElasticsearchIOTestCommon(writeConnectionConfiguration, restClient, true);
+ elasticsearchIOTestCommonWrite.setPipeline(pipeline);
+ elasticsearchIOTestCommonWrite.testWriteStateful();
+ }
+
+ @Test
+ public void testSizesVolume() throws Exception {
+ elasticsearchIOTestCommon.testSizes();
+ }
+
+ /**
+ * This test verifies volume loading of Elasticsearch using explicit document IDs and routed to an
+ * index named the same as the scientist, and type which is based on the modulo 2 of the scientist
+ * name. The goal of this IT is to help observe and verify that the overhead of adding the
+ * functions to parse the document and extract the ID is acceptable.
+ */
+ @Test
+ public void testWriteWithFullAddressingVolume() throws Exception {
+ // cannot share elasticsearchIOTestCommon because tests run in parallel.
+ ElasticsearchIOTestCommon elasticsearchIOTestCommonWrite =
+ new ElasticsearchIOTestCommon(writeConnectionConfiguration, restClient, true);
+ elasticsearchIOTestCommonWrite.setPipeline(pipeline);
+ elasticsearchIOTestCommonWrite.testWriteWithFullAddressing();
+ }
+
+ @Test
+ public void testWriteWithAllowableErrors() throws Exception {
+ elasticsearchIOTestCommon.testWriteWithAllowedErrors();
+ }
+
+ @Test
+ public void testWriteWithRouting() throws Exception {
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testWriteWithRouting();
+ }
+
+ @Test
+ public void testWriteScriptedUpsert() throws Exception {
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testWriteScriptedUpsert();
+ }
+
+ @Test
+ public void testWriteWithDocVersion() throws Exception {
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testWriteWithDocVersion();
+ }
+
+ /**
+ * This test verifies volume partial updates of Elasticsearch. The test dataset index is cloned
+ * and then a new field is added to each document using a partial update. The test then asserts
+ * the updates were applied.
+ */
+ @Test
+ public void testWritePartialUpdate() throws Exception {
+ ElasticsearchIOTestUtils.copyIndex(
+ restClient,
+ readConnectionConfiguration.getIndex(),
+ updateConnectionConfiguration.getIndex());
+ // cannot share elasticsearchIOTestCommon because tests run in parallel.
+ ElasticsearchIOTestCommon elasticsearchIOTestCommonUpdate =
+ new ElasticsearchIOTestCommon(updateConnectionConfiguration, restClient, true);
+ elasticsearchIOTestCommonUpdate.setPipeline(pipeline);
+ elasticsearchIOTestCommonUpdate.testWritePartialUpdate();
+ }
+
+ /**
+ * This test verifies volume deletes of Elasticsearch. The test dataset index is cloned and then
+ * around half of the documents are deleted and the other half is partially updated using bulk
+ * delete request. The test then asserts the documents were deleted successfully.
+ */
+ @Test
+ public void testWriteWithIsDeletedFnWithPartialUpdates() throws Exception {
+ ElasticsearchIOTestUtils.copyIndex(
+ restClient,
+ readConnectionConfiguration.getIndex(),
+ updateConnectionConfiguration.getIndex());
+ ElasticsearchIOTestCommon elasticsearchIOTestCommonDeleteFn =
+ new ElasticsearchIOTestCommon(updateConnectionConfiguration, restClient, true);
+ elasticsearchIOTestCommonDeleteFn.setPipeline(pipeline);
+ elasticsearchIOTestCommonDeleteFn.testWriteWithIsDeletedFnWithPartialUpdates();
+ }
+
+ /**
+ * This test verifies volume deletes of Elasticsearch. The test dataset index is cloned and then
+ * around half of the documents are deleted using bulk delete request. The test then asserts the
+ * documents were deleted successfully.
+ */
+ @Test
+ public void testWriteWithIsDeletedFnWithoutPartialUpdate() throws Exception {
+ ElasticsearchIOTestUtils.copyIndex(
+ restClient,
+ readConnectionConfiguration.getIndex(),
+ updateConnectionConfiguration.getIndex());
+ ElasticsearchIOTestCommon elasticsearchIOTestCommonDeleteFn =
+ new ElasticsearchIOTestCommon(updateConnectionConfiguration, restClient, true);
+ elasticsearchIOTestCommonDeleteFn.setPipeline(pipeline);
+ elasticsearchIOTestCommonDeleteFn.testWriteWithIsDeletedFnWithoutPartialUpdate();
+ }
+}
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-9/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-9/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
new file mode 100644
index 000000000000..5f3861e69a99
--- /dev/null
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-9/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.elasticsearch;
+
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.getEsIndex;
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.createConnectionConfig;
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.createIndex;
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.createTestContainer;
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.deleteIndex;
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.setDefaultTemplate;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.elasticsearch.client.RestClient;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+/** Tests for {@link ElasticsearchIO} version 9. */
+public class ElasticsearchIOTest implements Serializable {
+
+ private ElasticsearchIOTestCommon elasticsearchIOTestCommon;
+ private ConnectionConfiguration connectionConfiguration;
+ private static ElasticsearchContainer container;
+ private static RestClient client;
+ static final String IMAGE_TAG = "9.2.0";
+
+ @BeforeClass
+ public static void beforeClass() throws IOException {
+ // Create the elasticsearch container.
+ container = createTestContainer(IMAGE_TAG);
+
+ // Start the container. This step might take some time...
+ container.start();
+ client = ElasticsearchIOTestUtils.clientFromContainer(container, true);
+ setDefaultTemplate(client);
+ }
+
+ @AfterClass
+ public static void afterClass() throws IOException {
+ client.close();
+ container.stop();
+ }
+
+ @Before
+ public void setup() throws IOException {
+ if (connectionConfiguration == null) {
+ connectionConfiguration = createConnectionConfig(client).builder().setType(null).build();
+ elasticsearchIOTestCommon =
+ new ElasticsearchIOTestCommon(connectionConfiguration, client, false);
+
+ deleteIndex(client, getEsIndex());
+ }
+ }
+
+ @Rule public TestPipeline pipeline = TestPipeline.create();
+
+ @Test
+ public void testSizes() throws Exception {
+ // need to create the index using the helper method (not create it at first insertion)
+ // for the indexSettings() to be run
+ createIndex(elasticsearchIOTestCommon.restClient, getEsIndex());
+ elasticsearchIOTestCommon.testSizes();
+ }
+
+ @Test
+ public void testSizesWithAlias() throws Exception {
+ // need to create the index using the helper method (not create it at first insertion)
+ // for the indexSettings() to be run
+ createIndex(elasticsearchIOTestCommon.restClient, getEsIndex(), true);
+ elasticsearchIOTestCommon.testSizes();
+ }
+
+ @Test
+ public void testRead() throws Exception {
+ // need to create the index using the helper method (not create it at first insertion)
+ // for the indexSettings() to be run
+ createIndex(elasticsearchIOTestCommon.restClient, getEsIndex());
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testRead();
+ }
+
+ @Test
+ public void testReadPIT() throws Exception {
+ // need to create the index using the helper method (not create it at first insertion)
+ // for the indexSettings() to be run
+ createIndex(elasticsearchIOTestCommon.restClient, getEsIndex());
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testReadPIT();
+ }
+
+ @Test
+ public void testReadWithQueryString() throws Exception {
+ // need to create the index using the helper method (not create it at first insertion)
+ // for the indexSettings() to be run
+ createIndex(elasticsearchIOTestCommon.restClient, getEsIndex());
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testReadWithQueryString();
+ }
+
+ @Test
+ public void testReadWithQueryStringAndPIT() throws Exception {
+ // need to create the index using the helper method (not create it at first insertion)
+ // for the indexSettings() to be run
+ createIndex(elasticsearchIOTestCommon.restClient, getEsIndex());
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testReadWithQueryAndPIT();
+ }
+
+ @Test
+ public void testReadWithQueryValueProvider() throws Exception {
+ // need to create the index using the helper method (not create it at first insertion)
+ // for the indexSettings() to be run
+ createIndex(elasticsearchIOTestCommon.restClient, getEsIndex());
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testReadWithQueryValueProvider();
+ }
+
+ @Test
+ public void testWrite() throws Exception {
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testWrite();
+ }
+
+ @Rule public ExpectedException expectedException = ExpectedException.none();
+
+ @Test
+ public void testWriteWithErrors() throws Exception {
+ elasticsearchIOTestCommon.setExpectedException(expectedException);
+ elasticsearchIOTestCommon.testWriteWithErrors();
+ }
+
+ @Test
+ public void testWriteWithErrorsReturned() throws Exception {
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testWriteWithErrorsReturned();
+ }
+
+ @Test
+ public void testWriteWithErrorsReturnedAllowedErrors() throws Exception {
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testWriteWithErrorsReturnedAllowedErrors();
+ }
+
+ @Test
+ public void testWriteWithMaxBatchSize() throws Exception {
+ elasticsearchIOTestCommon.testWriteWithMaxBatchSize();
+ }
+
+ @Test
+ public void testWriteWithMaxBatchSizeBytes() throws Exception {
+ elasticsearchIOTestCommon.testWriteWithMaxBatchSizeBytes();
+ }
+
+ @Test
+ public void testSplit() throws Exception {
+ // need to create the index using the helper method (not create it at first insertion)
+ // for the indexSettings() to be run
+ createIndex(elasticsearchIOTestCommon.restClient, getEsIndex());
+ elasticsearchIOTestCommon.testSplit(2_000);
+ }
+
+ @Test
+ public void testWriteWithIdFn() throws Exception {
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testWriteWithIdFn();
+ }
+
+ @Test
+ public void testWriteWithIndexFn() throws Exception {
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testWriteWithIndexFn();
+ }
+
+ @Test
+ public void testWriteFullAddressing() throws Exception {
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testWriteWithFullAddressing();
+ }
+
+ @Test
+ public void testWritePartialUpdate() throws Exception {
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testWritePartialUpdate();
+ }
+
+ @Test
+ public void testWriteAppendOnly() throws Exception {
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testWriteAppendOnly();
+ }
+
+ @Test(expected = Exception.class)
+ public void testWriteAppendOnlyDeleteNotAllowed() throws Exception {
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testWriteAppendOnlyDeleteNotAllowed();
+ }
+
+ @Test
+ public void testWriteWithAllowableErrors() throws Exception {
+ elasticsearchIOTestCommon.testWriteWithAllowedErrors();
+ }
+
+ @Test
+ public void testWriteWithRouting() throws Exception {
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testWriteWithRouting();
+ }
+
+ @Test
+ public void testWriteScriptedUpsert() throws Exception {
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testWriteScriptedUpsert();
+ }
+
+ @Test
+ public void testWriteWithDocVersion() throws Exception {
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testWriteWithDocVersion();
+ }
+
+ @Test
+ public void testMaxParallelRequestsPerWindow() throws Exception {
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testMaxParallelRequestsPerWindow();
+ }
+
+ @Test
+ public void testReadWithMetadata() throws Exception {
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testReadWithMetadata();
+ }
+
+ @Test
+ public void testDefaultRetryPredicate() throws IOException {
+ elasticsearchIOTestCommon.testDefaultRetryPredicate(client);
+ }
+
+ @Test
+ public void testWriteRetry() throws Throwable {
+ elasticsearchIOTestCommon.setExpectedException(expectedException);
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testWriteRetry();
+ }
+
+ @Test
+ public void testWriteRetryValidRequest() throws Throwable {
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testWriteRetryValidRequest();
+ }
+
+ @Test
+ public void testWriteWithIsDeleteFn() throws Exception {
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testWriteWithIsDeletedFnWithPartialUpdates();
+ elasticsearchIOTestCommon.testWriteWithIsDeletedFnWithoutPartialUpdate();
+ }
+
+ @Test
+ public void testDocToBulkAndBulkIO() throws Exception {
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testDocToBulkAndBulkIO();
+ }
+
+ @Test
+ public void testDocumentCoder() throws Exception {
+ elasticsearchIOTestCommon.testDocumentCoder();
+ }
+
+ @Test
+ public void testPDone() throws Exception {
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testPipelineDone();
+ }
+
+ @Test
+ public void testValidSSLAndUsernameConfiguration() throws Exception {
+ URL fileUrl = getClass().getClassLoader().getResource("clientkeystore");
+ Path filePath = Paths.get(fileUrl.toURI());
+ elasticsearchIOTestCommon.testValidSSLAndUsernameConfiguration(
+ filePath.toAbsolutePath().toString());
+ }
+
+ @Test
+ public void testWriteWindowPreservation() throws Exception {
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testWriteWindowPreservation();
+ }
+
+ @Test
+ public void testWriteWithClientResponseException() throws Exception {
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testWriteWithElasticClientResponseException();
+ }
+}
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index 0d044a732cbc..c634bb99e02e 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -205,7 +205,7 @@
})
public class ElasticsearchIO {
- private static final List