Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into issue_15742
Browse files Browse the repository at this point in the history
  • Loading branch information
Akash Verma authored and Akash Verma committed Dec 16, 2024
2 parents 5fc5cac + a3cfd8a commit c70a2bb
Show file tree
Hide file tree
Showing 4,358 changed files with 73,872 additions and 63,417 deletions.
The diff you're trying to view is too large. We only load the first 3000 changed files.
4 changes: 2 additions & 2 deletions .github/workflows/sync-docs-v1.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ jobs:
- name: Prepare Collate content and partials
id: prepare_collate
continue-on-error: true
run: cp -R openmetadata-docs/content/partials/ openmetadata-docs/content/v1.5.x/partials
run: cp -R openmetadata-docs/content/partials/ openmetadata-docs/content/v1.6.x/partials

- name: Push content Collate
id: push_content_collate
Expand All @@ -68,7 +68,7 @@ jobs:
env:
SSH_DEPLOY_KEY: ${{ secrets.DOCS_COLLATE_SSH_DEPLOY_KEY }}
with:
source-directory: openmetadata-docs/content/v1.5.x
source-directory: openmetadata-docs/content/v1.6.x
target-directory: content/
destination-github-username: 'open-metadata'
destination-repository-name: 'docs-collate'
Expand Down
5 changes: 1 addition & 4 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,7 @@ mlruns/
/ingestion/tests/integration/source/mlflow/tests/db/

# Antlr
/openmetadata-ui/src/main/resources/ui/src/antlr/generated/Fqn.interp
/openmetadata-ui/src/main/resources/ui/src/antlr/generated/Fqn.tokens
/openmetadata-ui/src/main/resources/ui/src/antlr/generated/FqnLexer.interp
/openmetadata-ui/src/main/resources/ui/src/antlr/generated/FqnLexer.tokens
openmetadata-ui/src/main/resources/ui/src/generated/antlr/
.antlr

# SQLAlchemy tests
Expand Down
13 changes: 8 additions & 5 deletions bootstrap/sql/migrations/native/1.6.0/postgres/schemaChanges.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1290,17 +1290,20 @@ SET
)
)
)
)
)
WHERE
json -> 'chartDetails' -> 'metrics' is null;


-- Rename 'offset' to 'currentOffset' and add 'startingOffset'
-- Rename 'offset' to 'currentOffset' (removing the old 'offset' key) and add 'startingOffset'
UPDATE change_event_consumers
SET json = jsonb_set(
jsonb_set(json, '{currentOffset}', json -> 'offset'),
'{startingOffset}', json -> 'offset'
)
jsonb_set(
json - 'offset',
'{currentOffset}', json -> 'offset'
),
'{startingOffset}', json -> 'offset'
)
WHERE json -> 'offset' IS NOT NULL
AND jsonSchema = 'eventSubscriptionOffset';

Expand Down
3 changes: 3 additions & 0 deletions ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"giturlparse": "giturlparse",
"validators": "validators~=0.22.0",
"teradata": "teradatasqlalchemy>=20.0.0.0",
"cassandra": "cassandra-driver>=3.28.0",
}

COMMONS = {
Expand Down Expand Up @@ -267,6 +268,7 @@
},
"mlflow": {"mlflow-skinny>=2.3.0"},
"mongo": {VERSIONS["mongo"], VERSIONS["pandas"], VERSIONS["numpy"]},
"cassandra": {VERSIONS["cassandra"]},
"couchbase": {"couchbase~=4.1"},
"mssql": {
"sqlalchemy-pytds~=0.3",
Expand Down Expand Up @@ -370,6 +372,7 @@
VERSIONS["tableau"],
VERSIONS["pyhive"],
VERSIONS["mongo"],
VERSIONS["cassandra"],
VERSIONS["redshift"],
VERSIONS["snowflake"],
VERSIONS["elasticsearch8"],
Expand Down
30 changes: 30 additions & 0 deletions ingestion/src/metadata/examples/workflows/cassandra.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
source:
type: cassandra
serviceName: local_cassandra
serviceConnection:
config:
type: Cassandra
databaseName: custom_database_name
username: cassandra
authType:
password: cassandra
# cloudConfig:
# secureConnectBundle: <SCB File Path>
# token: <Token String>
# requestTimeout: <Timeout in seconds>
# connectTimeout: <Timeout in seconds>
hostPort: localhost:9042
sourceConfig:
config:
type: DatabaseMetadata
includeTables: true
sink:
type: metadata-rest
config: {}
workflowConfig:
# loggerLevel: INFO # DEBUG, INFO, WARN or ERROR
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: openmetadata
securityConfig:
jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@
from metadata.ingestion.source.database.common_nosql_source import (
SAMPLE_SIZE as GLOBAL_SAMPLE_SIZE,
)
from metadata.ingestion.source.database.common_nosql_source import CommonNoSQLSource
from metadata.ingestion.source.database.common_nosql_source import (
CommonNoSQLSource,
TableNameAndType,
)
from metadata.ingestion.source.database.multi_db_source import MultiDBSource
from metadata.utils.logger import ingestion_logger

Expand Down Expand Up @@ -114,7 +117,9 @@ def get_schema_name_list(self) -> List[str]:
)
raise

def get_table_name_list(self, schema_name: str) -> List[str]:
def query_table_names_and_types(
self, schema_name: str
) -> Iterable[TableNameAndType]:
project_id = self.context.get().database
try:
instance = self._get_instance(project_id, schema_name)
Expand All @@ -127,7 +132,10 @@ def get_table_name_list(self, schema_name: str) -> List[str]:
[project_id, instance.instance_id, table.table_id],
table,
)
return list(self.tables[project_id][schema_name].keys())
return [
TableNameAndType(name=table)
for table in self.tables[project_id][schema_name].keys()
]
except Exception as err:
logger.debug(traceback.format_exc())
# add context to the error message
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Source connection handler
"""
from functools import partial
from typing import Optional

from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import (
EXEC_PROFILE_DEFAULT,
Cluster,
ExecutionProfile,
ProtocolVersion,
)
from cassandra.cluster import Session as CassandraSession
from pydantic import BaseModel

from metadata.generated.schema.entity.automations.workflow import (
Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.connections.database.cassandraConnection import (
CassandraConnection,
)
from metadata.generated.schema.entity.services.connections.testConnectionResult import (
TestConnectionResult,
)
from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.cassandra.queries import (
CASSANDRA_GET_KEYSPACE_MATERIALIZED_VIEWS,
CASSANDRA_GET_KEYSPACE_TABLES,
CASSANDRA_GET_KEYSPACES,
CASSANDRA_GET_RELEASE_VERSION,
)
from metadata.utils.constants import THREE_MIN


def get_connection(connection: CassandraConnection):
    """
    Create a Cassandra session from the service connection.

    Supports two deployment modes:
      - cloud config (secure connect bundle + token auth, e.g. DataStax Astra)
        when the auth type carries a ``cloudConfig``;
      - plain ``host:port`` contact point, optionally with username/password.

    Returns a connected ``cassandra.cluster.Session``.
    """

    cluster_config = {}
    if hasattr(connection.authType, "cloudConfig"):
        cloud_config = connection.authType.cloudConfig
        cluster_cloud_config = {
            "connect_timeout": cloud_config.connectTimeout,
            "use_default_tempdir": True,
            "secure_connect_bundle": cloud_config.secureConnectBundle,
        }
        profile = ExecutionProfile(request_timeout=cloud_config.requestTimeout)
        # Cloud deployments authenticate with the literal username "token"
        # and the generated token string as the password.
        auth_provider = PlainTextAuthProvider("token", cloud_config.token)
        cluster_config.update(
            {
                "cloud": cluster_cloud_config,
                "auth_provider": auth_provider,
                "execution_profiles": {EXEC_PROFILE_DEFAULT: profile},
                "protocol_version": ProtocolVersion.V4,
            }
        )
    else:
        # Split on the LAST ':' so hosts containing colons don't break,
        # and convert the port to int — the driver expects a numeric port
        # (str.split(":") returned a string and failed on multi-colon hosts).
        host, _, port = connection.hostPort.rpartition(":")
        cluster_config.update({"contact_points": [host], "port": int(port)})
        if connection.username and getattr(connection.authType, "password", None):
            cluster_config["auth_provider"] = PlainTextAuthProvider(
                username=connection.username,
                password=connection.authType.password.get_secret_value(),
            )

    cluster = Cluster(**cluster_config)
    session = cluster.connect()

    return session


def test_connection(
    metadata: OpenMetadata,
    session: CassandraSession,
    service_connection: CassandraConnection,
    automation_workflow: Optional[AutomationWorkflow] = None,
    timeout_seconds: Optional[int] = THREE_MIN,
) -> TestConnectionResult:
    """
    Test connection. This can be executed either as part
    of a metadata workflow or during an Automation Workflow.

    Steps probed, in order: server reachability (release version),
    keyspace listing, then table and materialized-view listing against
    the first keyspace found.
    """

    class KeyspaceHolder(BaseModel):
        # Field is named `keyspace` (not `schema`) on purpose: a pydantic
        # field called `schema` shadows BaseModel.schema and triggers a
        # shadowing warning/error.
        keyspace: Optional[str] = None

    holder = KeyspaceHolder()

    # Annotation fixed: this receives the driver session, not the connection.
    def test_get_release_version(session: CassandraSession):
        session.execute(CASSANDRA_GET_RELEASE_VERSION)

    def test_get_schemas(session: CassandraSession, holder_: KeyspaceHolder):
        # Remember the first keyspace so the table/view checks below have
        # a concrete schema to query against.
        for keyspace in session.execute(CASSANDRA_GET_KEYSPACES):
            holder_.keyspace = keyspace.keyspace_name
            break

    def test_get_tables(session: CassandraSession, holder_: KeyspaceHolder):
        session.execute(CASSANDRA_GET_KEYSPACE_TABLES, [holder_.keyspace])

    def test_get_views(session: CassandraSession, holder_: KeyspaceHolder):
        session.execute(CASSANDRA_GET_KEYSPACE_MATERIALIZED_VIEWS, [holder_.keyspace])

    test_fn = {
        "CheckAccess": partial(test_get_release_version, session),
        "GetSchemas": partial(test_get_schemas, session, holder),
        "GetTables": partial(test_get_tables, session, holder),
        "GetViews": partial(test_get_views, session, holder),
    }

    return test_connection_steps(
        metadata=metadata,
        test_fn=test_fn,
        service_type=service_connection.type.value,
        automation_workflow=automation_workflow,
        timeout_seconds=timeout_seconds,
    )
101 changes: 101 additions & 0 deletions ingestion/src/metadata/ingestion/source/database/cassandra/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Cassandra source helpers.
"""
from __future__ import annotations

from metadata.generated.schema.entity.data.table import Column, DataType


class CassandraColumnParser:
    """
    Responsible for containing the logic to parse a column from Cassandra to OpenMetadata.

    The parser maps native CQL type names to OpenMetadata ``DataType`` values
    and scans composite type strings (e.g. ``list<int>``, ``frozen<list<text>>``)
    character by character to extract an element type for collections.
    """

    # Native CQL type name -> OpenMetadata DataType.
    # Anything missing from this table falls back to DataType.UNKNOWN.
    datatype_mapping = {
        "ascii": DataType.STRING,
        "bigint": DataType.BIGINT,
        "blob": DataType.BLOB,
        "boolean": DataType.BOOLEAN,
        "date": DataType.DATE,
        "decimal": DataType.DECIMAL,
        "double": DataType.DOUBLE,
        "duration": DataType.INTERVAL,
        "float": DataType.FLOAT,
        "uuid": DataType.UUID,
        "inet": DataType.INET,
        "int": DataType.INT,
        "list": DataType.ARRAY,
        "map": DataType.MAP,
        "set": DataType.SET,
        "smallint": DataType.SMALLINT,
        "text": DataType.TEXT,
        "time": DataType.TIME,
        "timestamp": DataType.TIMESTAMP,
        "timeuuid": DataType.UUID,
        "tinyint": DataType.TINYINT,
        "tuple": DataType.TUPLE,
        "varint": DataType.STRING,
        "struct": DataType.STRUCT,
    }

    @classmethod
    def parse(cls, field) -> Column:
        """
        Parses a Cassandra table column into an OpenMetadata column.

        ``field`` is expected to expose ``column_name`` and ``type`` (the CQL
        type string) — presumably a system_schema.columns row; TODO confirm
        against the caller.

        Returns a ``Column`` with ``dataType`` resolved from the mapping above
        and, for ARRAY types, ``arrayDataType`` set to the element type.
        """

        data_type = None
        array_data_type = None
        raw_data_type = ""
        # Scan the type string one character at a time, accumulating a type
        # name in raw_data_type until a delimiter ('<' or '>') is hit.
        for letter in field.type:
            if letter == "<":
                # "frozen" is only a storage qualifier — skip it and keep
                # scanning for the real inner type name.
                if raw_data_type in ("", "frozen"):
                    raw_data_type = ""
                    continue

                if not data_type:
                    # First name seen becomes the outer data type.
                    data_type = cls.datatype_mapping.get(
                        raw_data_type.lower(), DataType.UNKNOWN
                    )
                elif not array_data_type:
                    # Second name (inside the angle brackets) becomes the
                    # collection's element type.
                    array_data_type = cls.datatype_mapping.get(
                        raw_data_type.lower(), DataType.UNKNOWN
                    )
                raw_data_type = ""
                # Only ARRAY keeps scanning for an element type; every other
                # composite (map, set, tuple, …) stops at the outer type.
                if data_type != DataType.ARRAY:
                    break

            elif letter != ">":
                # Regular character: keep building the current type name.
                raw_data_type += letter

            elif letter == ">":
                # Closing bracket: resolve any pending element type and stop.
                if not array_data_type and data_type:
                    array_data_type = cls.datatype_mapping.get(
                        raw_data_type.lower(), DataType.UNKNOWN
                    )
                break
        else:
            # for/else: no break means the type string had no brackets at
            # all — it is a plain scalar type.
            if not data_type:
                data_type = cls.datatype_mapping.get(
                    field.type.lower(), DataType.UNKNOWN
                )

        column_def = {
            "name": field.column_name,
            "dataTypeDisplay": field.type,
            "dataType": data_type,
        }
        if array_data_type:
            column_def["arrayDataType"] = array_data_type

        return Column(**column_def)
Loading

0 comments on commit c70a2bb

Please sign in to comment.