Skip to content

Commit

Permalink
🎉Destination-elasticsearch: added custom sertificate support (airbyte…
Browse files Browse the repository at this point in the history
…hq#18177)

* [11356] Destination-elasticsearch: added custom certificate support
  • Loading branch information
etsybaev authored Oct 24, 2022
1 parent b23565e commit 5ab81e6
Show file tree
Hide file tree
Showing 14 changed files with 102 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@
- destinationDefinitionId: 68f351a7-2745-4bef-ad7f-996b8e51bb8c
name: ElasticSearch
dockerRepository: airbyte/destination-elasticsearch
dockerImageTag: 0.1.4
dockerImageTag: 0.1.5
documentationUrl: https://docs.airbyte.com/integrations/destinations/elasticsearch
icon: elasticsearch.svg
releaseStage: alpha
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1700,7 +1700,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-elasticsearch:0.1.4"
- dockerImage: "airbyte/destination-elasticsearch:0.1.5"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/elasticsearch"
connectionSpecification:
Expand All @@ -1722,6 +1722,12 @@
\ will be performed using the primary key value as the elasticsearch doc\
\ id. Does not support composite primary keys."
default: true
ca_certificate:
type: "string"
title: "CA certificate"
description: "CA certificate"
airbyte_secret: true
multiline: true
authenticationMethod:
title: "Authentication Method"
type: "object"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -160,4 +163,21 @@ public static URI keyStoreFromClientCertificate(
return keyStoreFromClientCertificate(certString, keyString, keyStorePassword, FileSystems.getDefault(), directory);
}

public static SSLContext createContextFromCaCert(String caCertificate) {
try {
CertificateFactory factory = CertificateFactory.getInstance(X509);
Certificate trustedCa = factory.generateCertificate(
new ByteArrayInputStream(caCertificate.getBytes(StandardCharsets.UTF_8))
);
KeyStore trustStore = KeyStore.getInstance(PKCS_12);
trustStore.load(null, null);
trustStore.setCertificateEntry("ca", trustedCa);
SSLContextBuilder sslContextBuilder =
SSLContexts.custom().loadTrustMaterial(trustStore, null);
return sslContextBuilder.build();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-elasticsearch-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.version=0.1.5
LABEL io.airbyte.name=airbyte/destination-elasticsearch-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,43 @@

package io.airbyte.integrations.destination.elasticsearch;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.StandardCheckConnectionOutput.Status;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.elasticsearch.ElasticsearchContainer;

public class ElasticsearchStrictEncryptDestinationAcceptanceTest extends DestinationAcceptanceTest {

private final ObjectMapper mapper = new ObjectMapper();
private static ElasticsearchContainer container;
private static final String IMAGE_NAME = "docker.elastic.co/elasticsearch/elasticsearch:8.3.3";
private final ObjectMapper mapper = new ObjectMapper();

@BeforeAll
public static void beforeAll() {

container = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.15.1")
.withPassword("MagicWord");
container = new ElasticsearchContainer(IMAGE_NAME)
.withEnv("discovery.type", "single-node")
.withEnv("network.host", "0.0.0.0")
.withEnv("logger.org.elasticsearch", "INFO")
.withEnv("ingest.geoip.downloader.enabled", "false")
.withExposedPorts(9200)
.withPassword("s3cret");

container.start();
}
Expand Down Expand Up @@ -84,11 +97,14 @@ protected JsonNode getConfig() {
final JsonNode authConfig = Jsons.jsonNode(Map.of(
"method", "basic",
"username", "elastic",
"password", "MagicWord"));
"password", "s3cret"));

return Jsons.jsonNode(ImmutableMap.builder()
.put("endpoint", String.format("http://%s:%s", container.getHost(), container.getMappedPort(9200)))
.put("endpoint", String.format("https://%s:%s", container.getHost(), container.getMappedPort(9200)))
.put("authenticationMethod", authConfig)
.put("ca_certificate", new String(container.copyFileFromContainer(
"/usr/share/elasticsearch/config/certs/http_ca.crt",
InputStream::readAllBytes), StandardCharsets.UTF_8))
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@
"description": "If a primary key identifier is defined in the source, an upsert will be performed using the primary key value as the elasticsearch doc id. Does not support composite primary keys.",
"default": true
},
"ca_certificate": {
"type": "string",
"title": "CA certificate",
"description": "CA certificate",
"airbyte_secret": true,
"multiline": true
},
"authenticationMethod": {
"title": "Authentication Method",
"type": "object",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-elasticsearch

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.version=0.1.5
LABEL io.airbyte.name=airbyte/destination-elasticsearch
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ application {
}

dependencies {
implementation project(':airbyte-db:db-lib')
implementation project(':airbyte-config:config-models')
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-integrations:bases:base-java')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,71 +5,28 @@
package io.airbyte.integrations.destination.elasticsearch;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Objects;
import lombok.Data;

@JsonIgnoreProperties(ignoreUnknown = true)
@Data
public class ConnectorConfiguration {

private String endpoint;
private boolean upsert;
@JsonProperty("ca_certificate")
private String caCertificate;
private AuthenticationMethod authenticationMethod = new AuthenticationMethod();

public ConnectorConfiguration() {}

public static ConnectorConfiguration fromJsonNode(JsonNode config) {
return new ObjectMapper().convertValue(config, ConnectorConfiguration.class);
}

public String getEndpoint() {
return this.endpoint;
}

public boolean isUpsert() {
return this.upsert;
}

public AuthenticationMethod getAuthenticationMethod() {
return this.authenticationMethod;
}

public void setEndpoint(String endpoint) {
this.endpoint = endpoint;
}

public void setUpsert(boolean upsert) {
this.upsert = upsert;
}

public void setAuthenticationMethod(AuthenticationMethod authenticationMethod) {
this.authenticationMethod = authenticationMethod;
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
ConnectorConfiguration that = (ConnectorConfiguration) o;
return upsert == that.upsert && Objects.equals(endpoint, that.endpoint) && Objects.equals(authenticationMethod, that.authenticationMethod);
}

@Override
public int hashCode() {
return Objects.hash(endpoint, upsert, authenticationMethod);
}

@Override
public String toString() {
return "ConnectorConfiguration{" +
"endpoint='" + endpoint + '\'' +
", upsert=" + upsert +
", authenticationMethod=" + authenticationMethod +
'}';
}

@Data
static class AuthenticationMethod {

private ElasticsearchAuthenticationMethod method = ElasticsearchAuthenticationMethod.none;
Expand All @@ -78,46 +35,6 @@ static class AuthenticationMethod {
private String apiKeyId;
private String apiKeySecret;

public ElasticsearchAuthenticationMethod getMethod() {
return this.method;
}

public String getUsername() {
return this.username;
}

public String getPassword() {
return this.password;
}

public String getApiKeyId() {
return this.apiKeyId;
}

public String getApiKeySecret() {
return this.apiKeySecret;
}

public void setMethod(ElasticsearchAuthenticationMethod method) {
this.method = method;
}

public void setUsername(String username) {
this.username = username;
}

public void setPassword(String password) {
this.password = password;
}

public void setApiKeyId(String apiKeyId) {
this.apiKeyId = apiKeyId;
}

public void setApiKeySecret(String apiKeySecret) {
this.apiKeySecret = apiKeySecret;
}

public boolean isValid() {
return switch (this.method) {
case none -> true;
Expand All @@ -126,34 +43,6 @@ public boolean isValid() {
};
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
AuthenticationMethod that = (AuthenticationMethod) o;
return method == that.method &&
Objects.equals(username, that.username) &&
Objects.equals(password, that.password) &&
Objects.equals(apiKeyId, that.apiKeyId) &&
Objects.equals(apiKeySecret, that.apiKeySecret);
}

@Override
public int hashCode() {
return Objects.hash(method, username, password, apiKeyId, apiKeySecret);
}

@Override
public String toString() {
return "AuthenticationMethod{" +
"method=" + method +
", username='" + username + '\'' +
", apiKeyId='" + apiKeyId + '\'' +
'}';
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.fasterxml.jackson.core.JsonPointer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.airbyte.db.util.SSLCertificateUtils;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import jakarta.json.JsonValue;
import java.io.IOException;
Expand All @@ -28,6 +29,7 @@
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.Node;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -56,7 +58,17 @@ public ElasticsearchConnection(ConnectorConfiguration config) {

// Create the low-level client
httpHost = HttpHost.create(config.getEndpoint());
restClient = RestClient.builder(httpHost)
final RestClientBuilder builder = RestClient.builder(httpHost);

// Set custom user's certificate if provided
if (config.getCaCertificate() != null && !config.getCaCertificate().isEmpty()){
builder.setHttpClientConfigCallback(clientBuilder -> {
clientBuilder.setSSLContext(SSLCertificateUtils.createContextFromCaCert(config.getCaCertificate()));
return clientBuilder;
});
}

restClient = builder
.setDefaultHeaders(configureHeaders(config))
.setFailureListener(new FailureListener())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@
"description": "If a primary key identifier is defined in the source, an upsert will be performed using the primary key value as the elasticsearch doc id. Does not support composite primary keys.",
"default": true
},
"ca_certificate": {
"type": "string",
"title": "CA certificate",
"description": "CA certificate",
"airbyte_secret": true,
"multiline": true
},
"authenticationMethod": {
"title": "Authentication Method",
"type": "object",
Expand Down
Loading

0 comments on commit 5ab81e6

Please sign in to comment.