From 5ab81e6ced064704b9a720fe622f7b5f382d0794 Mon Sep 17 00:00:00 2001 From: Eugene Date: Mon, 24 Oct 2022 19:45:37 +0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=89Destination-elasticsearch:=20added?= =?UTF-8?q?=20custom=20sertificate=20support=20(#18177)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [11356] Destination-elasticsearch: added custom certificate support --- .../seed/destination_definitions.yaml | 2 +- .../resources/seed/destination_specs.yaml | 8 +- .../airbyte/db/util/SSLCertificateUtils.java | 20 +++ .../Dockerfile | 2 +- ...trictEncryptDestinationAcceptanceTest.java | 26 +++- .../src/test/resources/expected_spec.json | 7 + .../destination-elasticsearch/Dockerfile | 2 +- .../destination-elasticsearch/build.gradle | 1 + .../elasticsearch/ConnectorConfiguration.java | 123 +----------------- .../ElasticsearchConnection.java | 14 +- .../src/main/resources/spec.json | 7 + ...lasticsearchDestinationAcceptanceTest.java | 11 +- deps.toml | 3 +- .../destinations/elasticsearch.md | 8 +- 14 files changed, 102 insertions(+), 132 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 6daff601bdb5..780c0dc5312f 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -100,7 +100,7 @@ - destinationDefinitionId: 68f351a7-2745-4bef-ad7f-996b8e51bb8c name: ElasticSearch dockerRepository: airbyte/destination-elasticsearch - dockerImageTag: 0.1.4 + dockerImageTag: 0.1.5 documentationUrl: https://docs.airbyte.com/integrations/destinations/elasticsearch icon: elasticsearch.svg releaseStage: alpha diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 81d2e83655eb..1f71e6f53c4c 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -1700,7 +1700,7 @@ supported_destination_sync_modes: - "overwrite" - "append" -- dockerImage: "airbyte/destination-elasticsearch:0.1.4" +- dockerImage: "airbyte/destination-elasticsearch:0.1.5" spec: documentationUrl: "https://docs.airbyte.com/integrations/destinations/elasticsearch" connectionSpecification: @@ -1722,6 +1722,12 @@ \ will be performed using the primary key value as the elasticsearch doc\ \ id. Does not support composite primary keys." default: true + ca_certificate: + type: "string" + title: "CA certificate" + description: "CA certificate" + airbyte_secret: true + multiline: true authenticationMethod: title: "Authentication Method" type: "object" diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/util/SSLCertificateUtils.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/util/SSLCertificateUtils.java index 29a87ccc52c5..32062cabfe46 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/util/SSLCertificateUtils.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/util/SSLCertificateUtils.java @@ -28,6 +28,9 @@ import java.util.Objects; import java.util.Random; import java.util.concurrent.TimeUnit; +import javax.net.ssl.SSLContext; +import org.apache.http.ssl.SSLContextBuilder; +import org.apache.http.ssl.SSLContexts; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -160,4 +163,21 @@ public static URI keyStoreFromClientCertificate( return keyStoreFromClientCertificate(certString, keyString, keyStorePassword, FileSystems.getDefault(), directory); } + public static SSLContext createContextFromCaCert(String caCertificate) { + try { + CertificateFactory factory = CertificateFactory.getInstance(X509); + Certificate trustedCa = factory.generateCertificate( + new ByteArrayInputStream(caCertificate.getBytes(StandardCharsets.UTF_8)) + ); + KeyStore trustStore = KeyStore.getInstance(PKCS_12); + trustStore.load(null, null); + trustStore.setCertificateEntry("ca", trustedCa); + SSLContextBuilder sslContextBuilder = + SSLContexts.custom().loadTrustMaterial(trustStore, null); + return sslContextBuilder.build(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } diff --git a/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/Dockerfile index 830bab62e8f2..97bf1f52f693 100644 --- a/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-elasticsearch-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.1.4 +LABEL io.airbyte.version=0.1.5 LABEL io.airbyte.name=airbyte/destination-elasticsearch-strict-encrypt diff --git a/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchStrictEncryptDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchStrictEncryptDestinationAcceptanceTest.java index 31232025fcbe..68859c436a47 100644 --- a/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchStrictEncryptDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchStrictEncryptDestinationAcceptanceTest.java @@ -4,30 +4,43 @@ package io.airbyte.integrations.destination.elasticsearch; +import static org.junit.jupiter.api.Assertions.assertEquals; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; +import io.airbyte.config.StandardCheckConnectionOutput.Status; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator; import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator; import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.List; import java.util.Map; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import org.testcontainers.elasticsearch.ElasticsearchContainer; public class ElasticsearchStrictEncryptDestinationAcceptanceTest extends DestinationAcceptanceTest { - private final ObjectMapper mapper = new ObjectMapper(); private static ElasticsearchContainer container; + private static final String IMAGE_NAME = "docker.elastic.co/elasticsearch/elasticsearch:8.3.3"; + private final ObjectMapper mapper = new ObjectMapper(); @BeforeAll public static void beforeAll() { - container = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.15.1") - .withPassword("MagicWord"); + container = new ElasticsearchContainer(IMAGE_NAME) + .withEnv("discovery.type", "single-node") + .withEnv("network.host", "0.0.0.0") + .withEnv("logger.org.elasticsearch", "INFO") + .withEnv("ingest.geoip.downloader.enabled", "false") + .withExposedPorts(9200) + .withPassword("s3cret"); container.start(); } @@ -84,11 +97,14 @@ protected JsonNode getConfig() { final JsonNode authConfig = Jsons.jsonNode(Map.of( "method", "basic", "username", "elastic", - "password", "MagicWord")); + "password", "s3cret")); return Jsons.jsonNode(ImmutableMap.builder() - .put("endpoint", String.format("http://%s:%s", container.getHost(), container.getMappedPort(9200))) + .put("endpoint", String.format("https://%s:%s", container.getHost(), container.getMappedPort(9200))) .put("authenticationMethod", authConfig) + .put("ca_certificate", new String(container.copyFileFromContainer( + "/usr/share/elasticsearch/config/certs/http_ca.crt", + InputStream::readAllBytes), StandardCharsets.UTF_8)) .build()); } diff --git a/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/src/test/resources/expected_spec.json b/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/src/test/resources/expected_spec.json index a951230fe95a..7fd2ba80689d 100644 --- a/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/src/test/resources/expected_spec.json +++ b/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/src/test/resources/expected_spec.json @@ -23,6 +23,13 @@ "description": "If a primary key identifier is defined in the source, an upsert will be performed using the primary key value as the elasticsearch doc id. Does not support composite primary keys.", "default": true }, + "ca_certificate": { + "type": "string", + "title": "CA certificate", + "description": "CA certificate", + "airbyte_secret": true, + "multiline": true + }, "authenticationMethod": { "title": "Authentication Method", "type": "object", diff --git a/airbyte-integrations/connectors/destination-elasticsearch/Dockerfile b/airbyte-integrations/connectors/destination-elasticsearch/Dockerfile index 362a4df7beaf..f78e88860504 100644 --- a/airbyte-integrations/connectors/destination-elasticsearch/Dockerfile +++ b/airbyte-integrations/connectors/destination-elasticsearch/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-elasticsearch COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.1.4 +LABEL io.airbyte.version=0.1.5 LABEL io.airbyte.name=airbyte/destination-elasticsearch diff --git a/airbyte-integrations/connectors/destination-elasticsearch/build.gradle b/airbyte-integrations/connectors/destination-elasticsearch/build.gradle index dc5b8e7c8788..469ad4991a16 100644 --- a/airbyte-integrations/connectors/destination-elasticsearch/build.gradle +++ b/airbyte-integrations/connectors/destination-elasticsearch/build.gradle @@ -10,6 +10,7 @@ application { } dependencies { + implementation project(':airbyte-db:db-lib') implementation project(':airbyte-config:config-models') implementation project(':airbyte-protocol:protocol-models') implementation project(':airbyte-integrations:bases:base-java') diff --git a/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ConnectorConfiguration.java b/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ConnectorConfiguration.java index 27c2c91eb9fe..a156e1011d80 100644 --- a/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ConnectorConfiguration.java +++ b/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ConnectorConfiguration.java @@ -5,71 +5,28 @@ package io.airbyte.integrations.destination.elasticsearch; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import java.util.Objects; +import lombok.Data; @JsonIgnoreProperties(ignoreUnknown = true) +@Data public class ConnectorConfiguration { private String endpoint; private boolean upsert; + @JsonProperty("ca_certificate") + private String caCertificate; private AuthenticationMethod authenticationMethod = new AuthenticationMethod(); - public ConnectorConfiguration() {} - public static ConnectorConfiguration fromJsonNode(JsonNode config) { return new ObjectMapper().convertValue(config, ConnectorConfiguration.class); } - public String getEndpoint() { - return this.endpoint; - } - - public boolean isUpsert() { - return this.upsert; - } - - public AuthenticationMethod getAuthenticationMethod() { - return this.authenticationMethod; - } - - public void setEndpoint(String endpoint) { - this.endpoint = endpoint; - } - - public void setUpsert(boolean upsert) { - this.upsert = upsert; - } - - public void setAuthenticationMethod(AuthenticationMethod authenticationMethod) { - this.authenticationMethod = authenticationMethod; - } - - @Override - public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - ConnectorConfiguration that = (ConnectorConfiguration) o; - return upsert == that.upsert && Objects.equals(endpoint, that.endpoint) && Objects.equals(authenticationMethod, that.authenticationMethod); - } - - @Override - public int hashCode() { - return Objects.hash(endpoint, upsert, authenticationMethod); - } - - @Override - public String toString() { - return "ConnectorConfiguration{" + - "endpoint='" + endpoint + '\'' + - ", upsert=" + upsert + - ", authenticationMethod=" + authenticationMethod + - '}'; - } + @Data static class AuthenticationMethod { private ElasticsearchAuthenticationMethod method = ElasticsearchAuthenticationMethod.none; @@ -78,46 +35,6 @@ static class AuthenticationMethod { private String apiKeyId; private String apiKeySecret; - public ElasticsearchAuthenticationMethod getMethod() { - return this.method; - } - - public String getUsername() { - return this.username; - } - - public String getPassword() { - return this.password; - } - - public String getApiKeyId() { - return this.apiKeyId; - } - - public String getApiKeySecret() { - return this.apiKeySecret; - } - - public void setMethod(ElasticsearchAuthenticationMethod method) { - this.method = method; - } - - public void setUsername(String username) { - this.username = username; - } - - public void setPassword(String password) { - this.password = password; - } - - public void setApiKeyId(String apiKeyId) { - this.apiKeyId = apiKeyId; - } - - public void setApiKeySecret(String apiKeySecret) { - this.apiKeySecret = apiKeySecret; - } - public boolean isValid() { return switch (this.method) { case none -> true; @@ -126,34 +43,6 @@ public boolean isValid() { }; } - @Override - public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - AuthenticationMethod that = (AuthenticationMethod) o; - return method == that.method && - Objects.equals(username, that.username) && - Objects.equals(password, that.password) && - Objects.equals(apiKeyId, that.apiKeyId) && - Objects.equals(apiKeySecret, that.apiKeySecret); - } - - @Override - public int hashCode() { - return Objects.hash(method, username, password, apiKeyId, apiKeySecret); - } - - @Override - public String toString() { - return "AuthenticationMethod{" + - "method=" + method + - ", username='" + username + '\'' + - ", apiKeyId='" + apiKeyId + '\'' + - '}'; - } - } } diff --git a/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchConnection.java b/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchConnection.java index 100fb60a03c0..ecb346207706 100644 --- a/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchConnection.java +++ b/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchConnection.java @@ -17,6 +17,7 @@ import com.fasterxml.jackson.core.JsonPointer; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import io.airbyte.db.util.SSLCertificateUtils; import io.airbyte.protocol.models.AirbyteRecordMessage; import jakarta.json.JsonValue; import java.io.IOException; @@ -28,6 +29,7 @@ import org.apache.http.message.BasicHeader; import org.elasticsearch.client.Node; import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,7 +58,17 @@ public ElasticsearchConnection(ConnectorConfiguration config) { // Create the low-level client httpHost = HttpHost.create(config.getEndpoint()); - restClient = RestClient.builder(httpHost) + final RestClientBuilder builder = RestClient.builder(httpHost); + + // Set custom user's certificate if provided + if (config.getCaCertificate() != null && !config.getCaCertificate().isEmpty()){ + builder.setHttpClientConfigCallback(clientBuilder -> { + clientBuilder.setSSLContext(SSLCertificateUtils.createContextFromCaCert(config.getCaCertificate())); + return clientBuilder; + }); + } + + restClient = builder .setDefaultHeaders(configureHeaders(config)) .setFailureListener(new FailureListener()) .build(); diff --git a/airbyte-integrations/connectors/destination-elasticsearch/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-elasticsearch/src/main/resources/spec.json index 546f962d51b2..9375ab8ed5a2 100644 --- a/airbyte-integrations/connectors/destination-elasticsearch/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-elasticsearch/src/main/resources/spec.json @@ -23,6 +23,13 @@ "description": "If a primary key identifier is defined in the source, an upsert will be performed using the primary key value as the elasticsearch doc id. Does not support composite primary keys.", "default": true }, + "ca_certificate": { + "type": "string", + "title": "CA certificate", + "description": "CA certificate", + "airbyte_secret": true, + "multiline": true + }, "authenticationMethod": { "title": "Authentication Method", "type": "object", diff --git a/airbyte-integrations/connectors/destination-elasticsearch/src/test-integration/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-elasticsearch/src/test-integration/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchDestinationAcceptanceTest.java index ce37a6fee19a..790eaa8724ec 100644 --- a/airbyte-integrations/connectors/destination-elasticsearch/src/test-integration/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-elasticsearch/src/test-integration/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchDestinationAcceptanceTest.java @@ -13,23 +13,28 @@ import java.util.List; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testcontainers.elasticsearch.ElasticsearchContainer; public class ElasticsearchDestinationAcceptanceTest extends DestinationAcceptanceTest { + private static final String IMAGE_NAME = "docker.elastic.co/elasticsearch/elasticsearch:8.3.3"; + private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchDestinationAcceptanceTest.class); + private ObjectMapper mapper = new ObjectMapper(); private static ElasticsearchContainer container; @BeforeAll public static void beforeAll() { - container = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.15.1") - .withEnv("ES_JAVA_OPTS", "-Xms512m -Xms512m") + container = new ElasticsearchContainer(IMAGE_NAME) .withEnv("discovery.type", "single-node") .withEnv("network.host", "0.0.0.0") .withEnv("logger.org.elasticsearch", "INFO") .withEnv("ingest.geoip.downloader.enabled", "false") - .withEnv("xpack.security.enabled", "false") + .withPassword("s3cret") .withExposedPorts(9200) + .withEnv("xpack.security.enabled", "false") .withStartupTimeout(Duration.ofSeconds(60)); container.start(); } diff --git a/deps.toml b/deps.toml index 5d2079050562..ce0c16b79912 100644 --- a/deps.toml +++ b/deps.toml @@ -21,6 +21,7 @@ connectors-testcontainers-scylla = "1.16.2" connectors-testcontainers-tidb = "1.16.3" connectors-destination-testcontainers-clickhouse = "1.17.3" connectors-destination-testcontainers-oracle-xe = "1.17.3" +connectors-destination-testcontainers-elasticsearch = "1.17.3" connectors-source-testcontainers-clickhouse = "1.17.3" platform-testcontainers = "1.17.3" @@ -54,7 +55,7 @@ connectors-testcontainers = { module = "org.testcontainers:testcontainers", vers connectors-testcontainers-cassandra = { module = "org.testcontainers:cassandra", version.ref = "connectors-testcontainers-cassandra" } connectors-testcontainers-cockroachdb = { module = "org.testcontainers:cockroachdb", version.ref = "connectors-testcontainers" } connectors-testcontainers-db2 = { module = "org.testcontainers:db2", version.ref = "connectors-testcontainers" } -connectors-testcontainers-elasticsearch = { module = "org.testcontainers:elasticsearch", version.ref = "connectors-testcontainers" } +connectors-testcontainers-elasticsearch = { module = "org.testcontainers:elasticsearch", version.ref = "connectors-destination-testcontainers-elasticsearch" } connectors-testcontainers-jdbc = { module = "org.testcontainers:jdbc", version.ref = "connectors-testcontainers" } connectors-testcontainers-kafka = { module = "org.testcontainers:kafka", version.ref = "connectors-testcontainers" } connectors-testcontainers-mariadb = { module = "org.testcontainers:mariadb", version.ref = "connectors-testcontainers-mariadb" } diff --git a/docs/integrations/destinations/elasticsearch.md b/docs/integrations/destinations/elasticsearch.md index ccae1d4a4d57..720753d7576c 100644 --- a/docs/integrations/destinations/elasticsearch.md +++ b/docs/integrations/destinations/elasticsearch.md @@ -57,11 +57,16 @@ The connector should be enhanced to support variable batch sizes. * Endpoint URL [ex. https://elasticsearch.savantly.net:9423] * Username [optional] (basic auth) * Password [optional] (basic auth) + * CA certificate [optional] * Api key ID [optional] * Api key secret [optional] * If authentication is used, the user should have permission to create an index if it doesn't exist, and/or be able to `create` documents - +### CA certificate +Ca certificate may be fetched from the Elasticsearch server from /usr/share/elasticsearch/config/certs/http_ca.crt +Fetching example from dockerized Elasticsearch: +`docker cp es01:/usr/share/elasticsearch/config/certs/http_ca.crt .` where es01 is a container's name. For more details please visit https://www.elastic.co/guide/en/elasticsearch/reference/current/docker.html + ### Setup guide Enter the endpoint URL, select authentication method, and whether to use 'upsert' method when indexing new documents. @@ -89,6 +94,7 @@ Using this feature requires additional configuration, when creating the source. | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.5 | 2022-10-24 | [18177](https://github.com/airbytehq/airbyte/pull/18177) | add custom CA certificate processing | | 0.1.4 | 2022-10-14 | [17805](https://github.com/airbytehq/airbyte/pull/17805) | add SSH Tunneling | | 0.1.3 | 2022-05-30 | [14640](https://github.com/airbytehq/airbyte/pull/14640) | Include lifecycle management | | 0.1.2 | 2022-04-19 | [11752](https://github.com/airbytehq/airbyte/pull/11752) | Reduce batch size to 32Mb |