Skip to content

Commit

Permalink
ElasticSearch: Fix ES connection (#13919)
Browse files Browse the repository at this point in the history
  • Loading branch information
ulixius9 authored Nov 13, 2023
1 parent 759bd84 commit b34111e
Show file tree
Hide file tree
Showing 17 changed files with 537 additions and 50 deletions.
19 changes: 18 additions & 1 deletion bootstrap/sql/migrations/native/1.2.1/mysql/schemaChanges.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,21 @@ SET dqdts.json = JSON_INSERT(
)
WHERE dqdts.extension = 'testCase.testCaseResult'
AND JSON_EXTRACT(dqdts.json, '$.timestamp') REGEXP '^[0-9]{10}$'
;
;

-- update elasticsearch connection
UPDATE search_service_entity
SET json = JSON_INSERT(
JSON_REMOVE(json, '$.connection.config.caCert'),
'$.connection.config.sslConfig',
JSON_OBJECT(
'certificates',
JSON_OBJECT(
'caCertPath',
JSON_EXTRACT(json, '$.connection.config.caCert')
)
)
)
WHERE
serviceType = 'ElasticSearch'
AND JSON_EXTRACT(json, '$.connection.config.caCert') IS NOT NULL;
20 changes: 20 additions & 0 deletions bootstrap/sql/migrations/native/1.2.1/postgres/schemaChanges.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,23 @@ SET json = jsonb_set(
)
WHERE dqdts.extension = 'testCase.testCaseResult'
AND (json->>'timestamp') ~ '^[0-9]{10}$';



UPDATE search_service_entity
SET json = JSONB_SET(
json::jsonb,
'{connection,config}',
json::jsonb #> '{connection,config}' #- '{caCert}' ||
jsonb_build_object(
'sslConfig',
jsonb_build_object(
'certificates',
jsonb_build_object('caCertPath', json #> '{connection,config,caCert}')
)
),
true
)
WHERE
serviceType = 'ElasticSearch'
AND json #> '{connection,config,caCert}' IS NOT NULL;
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,130 @@
"""
Source connection handler
"""
import ssl
from pathlib import Path
from typing import Optional

from elasticsearch8 import Elasticsearch
from httpx import create_ssl_context

from metadata.generated.schema.entity.automations.workflow import (
Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.connections.search.elasticSearchConnection import (
ApiAuthentication,
from metadata.generated.schema.entity.services.connections.common.sslCertPaths import (
SslCertificatesByPath,
)
from metadata.generated.schema.entity.services.connections.common.sslCertValues import (
SslCertificatesByValues,
)
from metadata.generated.schema.entity.services.connections.common.sslConfig import (
SslConfig,
)
from metadata.generated.schema.entity.services.connections.search.elasticSearch.apiAuth import (
ApiKeyAuthentication,
)
from metadata.generated.schema.entity.services.connections.search.elasticSearch.basicAuth import (
BasicAuthentication,
)
from metadata.generated.schema.entity.services.connections.search.elasticSearchConnection import (
ElasticsearchConnection,
)
from metadata.ingestion.connections.builders import init_empty_connection_arguments
from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.constants import UTF_8
from metadata.utils.helpers import init_staging_dir

CA_CERT_FILE_NAME = "root.pem"
CLIENT_CERT_FILE_NAME = "client.pem"
KEY_CERT_FILE_NAME = "client_key.pem"


def _clean_cert_value(cert_data: str) -> str:
return cert_data.replace("\\n", "\n")


def write_data_to_file(file_path: Path, cert_data: str) -> None:
with open(
file_path,
"w+",
encoding=UTF_8,
) as file:
data = _clean_cert_value(cert_data)

file.write(data)


def _handle_ssl_context_by_value(ssl_config: SslConfig):
ca_cert = False
client_cert = None
private_key = None
init_staging_dir(ssl_config.certificates.stagingDir)
if ssl_config.certificates.caCertValue:
ca_cert = Path(ssl_config.certificates.stagingDir, CA_CERT_FILE_NAME)
write_data_to_file(
ca_cert, ssl_config.certificates.caCertValue.get_secret_value()
)
if ssl_config.certificates.clientCertValue:
client_cert = Path(ssl_config.certificates.stagingDir, CLIENT_CERT_FILE_NAME)
write_data_to_file(
client_cert,
ssl_config.certificates.clientCertValue.get_secret_value(),
)
if ssl_config.certificates.privateKeyValue:
private_key = Path(ssl_config.certificates.stagingDir, KEY_CERT_FILE_NAME)
write_data_to_file(
private_key,
ssl_config.certificates.privateKeyValue.get_secret_value(),
)
return ca_cert, client_cert, private_key


def _handle_ssl_context_by_path(ssl_config: SslConfig):
ca_cert = False
if ssl_config.certificates.caCertPath:
ca_cert = ssl_config.certificates.caCertPath
client_cert = ssl_config.certificates.clientCertPath
private_key = ssl_config.certificates.privateKeyPath
return ca_cert, client_cert, private_key


def get_ssl_context(ssl_config: SslConfig) -> ssl.SSLContext:
"""
Method to get SSL Context
"""
ca_cert = False
client_cert = None
private_key = None
cert_chain = None

if not ssl_config.certificates:
return None

if isinstance(ssl_config.certificates, SslCertificatesByValues):
ca_cert, client_cert, private_key = _handle_ssl_context_by_value(
ssl_config=ssl_config
)
elif isinstance(ssl_config.certificates, SslCertificatesByPath):
ca_cert, client_cert, private_key = _handle_ssl_context_by_path(
ssl_config=ssl_config
)

if client_cert and private_key:
cert_chain = (client_cert, private_key)
elif client_cert:
cert_chain = client_cert
else:
cert_chain = None

if ca_cert or cert_chain:
ssl_context = create_ssl_context(
cert=cert_chain,
verify=ca_cert,
)
return ssl_context

return ssl._create_unverified_context() # pylint: disable=protected-access


def get_connection(connection: ElasticsearchConnection) -> Elasticsearch:
Expand All @@ -35,6 +144,7 @@ def get_connection(connection: ElasticsearchConnection) -> Elasticsearch:
"""
basic_auth = None
api_key = None
ssl_context = None
if (
isinstance(connection.authType, BasicAuthentication)
and connection.authType.username
Expand All @@ -46,7 +156,7 @@ def get_connection(connection: ElasticsearchConnection) -> Elasticsearch:
else None,
)

if isinstance(connection.authType, ApiAuthentication):
if isinstance(connection.authType, ApiKeyAuthentication):
if connection.authType.apiKeyId and connection.authType.apiKey:
api_key = (
connection.authType.apiKeyId,
Expand All @@ -58,12 +168,15 @@ def get_connection(connection: ElasticsearchConnection) -> Elasticsearch:
if not connection.connectionArguments:
connection.connectionArguments = init_empty_connection_arguments()

if connection.sslConfig:
ssl_context = get_ssl_context(connection.sslConfig)

return Elasticsearch(
connection.hostPort,
http_auth=basic_auth,
api_key=api_key,
ca_certs=connection.caCert,
**connection.connectionArguments.__root__
ssl_context=ssl_context,
**connection.connectionArguments.__root__,
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
"""
Elasticsearch source to extract metadata
"""
import shutil
from pathlib import Path
from typing import Any, Iterable, Optional

from elasticsearch8 import Elasticsearch
Expand Down Expand Up @@ -124,3 +126,11 @@ def yield_search_index_sample_data(
),
)
)

def close(self):
try:
if Path(self.service_connection.sslConfig.certificates.stagingDir).exists():
shutil.rmtree(self.service_connection.sslConfig.certificates.stagingDir)
except AttributeError:
pass
return super().close()
2 changes: 2 additions & 0 deletions ingestion/src/metadata/workflow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ def stop(self) -> None:
except Exception as exc:
logger.warning(f"Error trying to close the step {step} due to [{exc}]")

self.source.close()

@property
def timer(self) -> RepeatedTimer:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,34 @@ We extract ElasticSearch's metadata by using its [API](https://www.elastic.co/gu
2. API Key Authentication
- API Key: API Key to connect to ElasticSearch required when API Key Authentication is enabled on ElasticSearch.
- API Key Id: Enter API Key ID In case of API Key Authentication if there is any API Key ID associated with the API Key, otherwise this field can be left blank.
- **Client Certificate Path**: In case the SSL is enabled on your ElasticSearch instance and CA certificate is required for authentication, then specify the path of certificate in this field. NOTE: In case of docker deployment you need to store this certificate accessible to OpenMetadata Ingestion docker container, you can do it via copying the certificate to the docker container or store it in the volume associate with the OpenMetadata Ingestion container.
- **SSL Certificates**:
1. SSL Certificates By Path
- CA Certificate Path: This field specifies the path of CA certificate required for authentication.
- Client Certificate Path: This field specifies the path of Clint certificate required for authentication.
- Private Key Path: This field specifies the path of Clint Key/Private Key required for authentication.

2. SSL Certificates By Value
- CA Certificate Value: This field specifies the value of CA certificate required for authentication.
- Client Certificate Value: This field specifies the value of Clint certificate required for authentication.
- Private Key Value: This field specifies the value of Clint Key/Private Key required for authentication.
- Staging Directory Path: This field specifies the path to temporary staging directory, where the certificates will be stored temporarily during the ingestion process, which will de deleted once the ingestion job is over.
- when you are using this approach make sure you are passing the key in a correct format. If your certificate looks like this:
```
-----BEGIN CERTIFICATE-----
MII..
MBQ...
CgU..
8Lt..
...
h+4=
-----END CERTIFICATE-----
```
You will have to replace new lines with `\n` and the final value that you need to pass should look like this:
```
-----BEGIN CERTIFICATE-----\nMII..\nMBQ...\nCgU..\n8Lt..\n...\nh+4=\n-----END CERTIFICATE-----\n
- **Connection Timeout in Seconds**: Connection timeout configuration for communicating with ElasticSearch APIs.
{% /extraContent %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,34 @@ This is a sample config for ElasticSearch:
{% /codeInfo %}

{% codeInfo srNumber=4 %}
**caCert**: In case the SSL is enabled on your ElasticSearch instance and CA certificate is required for authentication, then specify the path of certificate in this field. NOTE: In case of docker deployment you need to store this certificate accessible to OpenMetadata Ingestion docker container, you can do it via copying the certificate to the docker container or store it in the volume associate with the OpenMetadata Ingestion container.
- **sslConfig**:
1. SSL Certificates By Path
- caCertPath: This field specifies the path of CA certificate required for authentication.
- clientCertPath: This field specifies the path of Clint certificate required for authentication.
- privateKeyPath: This field specifies the path of Clint Key/Private Key required for authentication.

2. SSL Certificates By Value
- caCertValue: This field specifies the value of CA certificate required for authentication.
- clientCertValue: This field specifies the value of Clint certificate required for authentication.
- privateKeyValue: This field specifies the value of Clint Key/Private Key required for authentication.
- stagingDir: This field specifies the path to temporary staging directory, where the certificates will be stored temporarily during the ingestion process, which will de deleted once the ingestion job is over.
- when you are using this approach make sure you are passing the key in a correct format. If your certificate looks like this:
```
-----BEGIN CERTIFICATE-----
MII..
MBQ...
CgU..
8Lt..
...
h+4=
-----END CERTIFICATE-----
```
You will have to replace new lines with `\n` and the final value that you need to pass should look like this:
```
-----BEGIN CERTIFICATE-----\nMII..\nMBQ...\nCgU..\n8Lt..\n...\nh+4=\n-----END CERTIFICATE-----\n
{% /codeInfo %}
Expand Down Expand Up @@ -117,7 +144,17 @@ source:
# apiKey: <api key>
```
```yaml {% srNumber=4 %}
caCert: /path/to/http_ca.crt
sslConfig:
certificates:
caCertPath: /path/to/http_ca.crt
clientCertPath: /path/to/http_ca.crt
privateKeyPath: /path/to/http_ca.crt

# pass certificate values
# caCertValue: -----BEGIN CERTIFICATE-----\n....\n.....\n-----END CERTIFICATE-----\n
# clientCertValue: -----BEGIN CERTIFICATE-----\n....\n...-----END CERTIFICATE-----\n
# privateKeyValue: -----BEGIN RSA PRIVATE KEY-----\n....\n....\n-----END RSA PRIVATE KEY-----\n
# stagingDir: /tmp/stage
```
```yaml {% srNumber=5 %}
connectionTimeoutSecs: 30
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.openmetadata.schema.services.connections.database.datalake.GCSConfig;
import org.openmetadata.schema.services.connections.metadata.OpenMetadataConnection;
import org.openmetadata.schema.services.connections.pipeline.AirflowConnection;
import org.openmetadata.schema.services.connections.search.ElasticSearchConnection;
import org.openmetadata.schema.services.connections.storage.GcsConnection;

/** Factory class to get a `ClassConverter` based on the service class. */
Expand All @@ -56,6 +57,7 @@ private ClassConverterFactory() {
Map.entry(GCSConfig.class, new GCPConfigClassConverter()),
Map.entry(GCPCredentials.class, new GcpCredentialsClassConverter()),
Map.entry(GcsConnection.class, new GcpConnectionClassConverter()),
Map.entry(ElasticSearchConnection.class, new ElasticSearchConnectionClassConverter()),
Map.entry(LookerConnection.class, new LookerConnectionClassConverter()),
Map.entry(OpenMetadataConnection.class, new OpenMetadataConnectionClassConverter()),
Map.entry(SSOAuthMechanism.class, new SSOAuthMechanismClassConverter()),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2021 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.openmetadata.service.secrets.converter;

import java.util.List;
import org.openmetadata.schema.services.connections.search.ElasticSearchConnection;
import org.openmetadata.schema.services.connections.search.elasticSearch.ESAPIAuth;
import org.openmetadata.schema.services.connections.search.elasticSearch.ESBasicAuth;
import org.openmetadata.service.util.JsonUtils;

/** Converter class to get an `ElasticSearchConnection` object. */
public class ElasticSearchConnectionClassConverter extends ClassConverter {

private static final List<Class<?>> CONFIG_SOURCE_CLASSES = List.of(ESBasicAuth.class, ESAPIAuth.class);
//
public ElasticSearchConnectionClassConverter() {
super(ElasticSearchConnection.class);
}

@Override
public Object convert(Object object) {
ElasticSearchConnection elasticSearchConnection =
(ElasticSearchConnection) JsonUtils.convertValue(object, this.clazz);

tryToConvert(elasticSearchConnection.getAuthType(), CONFIG_SOURCE_CLASSES)
.ifPresent(elasticSearchConnection::setAuthType);

return elasticSearchConnection;
}
}
Loading

0 comments on commit b34111e

Please sign in to comment.