Merge branch 'main' into agg_nan_fix
OnkarVO7 authored Oct 25, 2023
2 parents 2e2370f + a72fcec commit e2227fd
Showing 874 changed files with 662 additions and 520 deletions.
12 changes: 6 additions & 6 deletions docker/docker-compose-quickstart/docker-compose-postgres.yml
@@ -99,7 +99,7 @@ services:
#Database configuration for postgresql
DB_DRIVER_CLASS: ${DB_DRIVER_CLASS:-org.postgresql.Driver}
DB_SCHEME: ${DB_SCHEME:-postgresql}
- DB_USE_SSL: ${DB_USE_SSL:-false}
+ DB_PARAMS: ${DB_PARAMS:-"allowPublicKeyRetrieval=true&useSSL=false&serverTimezone=UTC"}
DB_USER: ${DB_USER:-openmetadata_user}
DB_USER_PASSWORD: ${DB_USER_PASSWORD:-openmetadata_password}
DB_HOST: ${DB_HOST:-postgresql}
@@ -127,6 +127,7 @@ services:
EVENT_MONITOR_LATENCY: ${EVENT_MONITOR_LATENCY:-[]}

#pipelineServiceClientConfiguration
+ PIPELINE_SERVICE_CLIENT_ENABLED: ${PIPELINE_SERVICE_CLIENT_ENABLED:-true}
PIPELINE_SERVICE_CLIENT_CLASS_NAME: ${PIPELINE_SERVICE_CLIENT_CLASS_NAME:-"org.openmetadata.service.clients.pipeline.airflow.AirflowRESTClient"}
PIPELINE_SERVICE_IP_INFO_ENABLED: ${PIPELINE_SERVICE_IP_INFO_ENABLED:-false}
PIPELINE_SERVICE_CLIENT_HOST_IP: ${PIPELINE_SERVICE_CLIENT_HOST_IP:-""}
@@ -162,8 +163,7 @@ services:
OM_URI: ${OM_URI:- "http://localhost:8585"}

#extensionConfiguration
- OM_RESOURCE_PACKAGES: ${OM_RESOURCE_PACKAGES:-[]}
- OM_EXTENSIONS: ${OM_EXTENSIONS:-[]}
+ MIGRATION_EXTENSION_PATH: ${MIGRATION_EXTENSION_PATH:-""}

# Heap OPTS Configurations
OPENMETADATA_HEAP_OPTS: ${OPENMETADATA_HEAP_OPTS:--Xmx1G -Xms1G}
@@ -253,7 +253,7 @@ services:
#Database configuration for postgresql
DB_DRIVER_CLASS: ${DB_DRIVER_CLASS:-org.postgresql.Driver}
DB_SCHEME: ${DB_SCHEME:-postgresql}
- DB_USE_SSL: ${DB_USE_SSL:-false}
+ DB_PARAMS: ${DB_PARAMS:-"allowPublicKeyRetrieval=true&useSSL=false&serverTimezone=UTC"}
DB_USER: ${DB_USER:-openmetadata_user}
DB_USER_PASSWORD: ${DB_USER_PASSWORD:-openmetadata_password}
DB_HOST: ${DB_HOST:-postgresql}
@@ -281,6 +281,7 @@ services:
EVENT_MONITOR_LATENCY: ${EVENT_MONITOR_LATENCY:-[]}

#pipelineServiceClientConfiguration
+ PIPELINE_SERVICE_CLIENT_ENABLED: ${PIPELINE_SERVICE_CLIENT_ENABLED:-true}
PIPELINE_SERVICE_CLIENT_CLASS_NAME: ${PIPELINE_SERVICE_CLIENT_CLASS_NAME:-"org.openmetadata.service.clients.pipeline.airflow.AirflowRESTClient"}
PIPELINE_SERVICE_IP_INFO_ENABLED: ${PIPELINE_SERVICE_IP_INFO_ENABLED:-false}
PIPELINE_SERVICE_CLIENT_HOST_IP: ${PIPELINE_SERVICE_CLIENT_HOST_IP:-""}
@@ -316,8 +317,7 @@ services:
OM_URI: ${OM_URI:- "http://localhost:8585"}

#extensionConfiguration
- OM_RESOURCE_PACKAGES: ${OM_RESOURCE_PACKAGES:-[]}
- OM_EXTENSIONS: ${OM_EXTENSIONS:-[]}
+ MIGRATION_EXTENSION_PATH: ${MIGRATION_EXTENSION_PATH:-""}

# Heap OPTS Configurations
OPENMETADATA_HEAP_OPTS: ${OPENMETADATA_HEAP_OPTS:--Xmx1G -Xms1G}
13 changes: 6 additions & 7 deletions docker/docker-compose-quickstart/docker-compose.yml
@@ -97,7 +97,7 @@ services:
# Database configuration for MySQL
DB_DRIVER_CLASS: ${DB_DRIVER_CLASS:-com.mysql.cj.jdbc.Driver}
DB_SCHEME: ${DB_SCHEME:-mysql}
- DB_USE_SSL: ${DB_USE_SSL:-false}
+ DB_PARAMS: ${DB_PARAMS:-"allowPublicKeyRetrieval=true&useSSL=false&serverTimezone=UTC"}
DB_USER: ${DB_USER:-openmetadata_user}
DB_USER_PASSWORD: ${DB_USER_PASSWORD:-openmetadata_password}
DB_HOST: ${DB_HOST:-mysql}
@@ -125,6 +125,7 @@ services:
EVENT_MONITOR_LATENCY: ${EVENT_MONITOR_LATENCY:-[]}

#pipelineServiceClientConfiguration
+ PIPELINE_SERVICE_CLIENT_ENABLED: ${PIPELINE_SERVICE_CLIENT_ENABLED:-true}
PIPELINE_SERVICE_CLIENT_CLASS_NAME: ${PIPELINE_SERVICE_CLIENT_CLASS_NAME:-"org.openmetadata.service.clients.pipeline.airflow.AirflowRESTClient"}
PIPELINE_SERVICE_IP_INFO_ENABLED: ${PIPELINE_SERVICE_IP_INFO_ENABLED:-false}
PIPELINE_SERVICE_CLIENT_HOST_IP: ${PIPELINE_SERVICE_CLIENT_HOST_IP:-""}
@@ -160,9 +161,7 @@ services:
OM_URI: ${OM_URI:- "http://localhost:8585"}

#extensionConfiguration
- OM_RESOURCE_PACKAGES: ${OM_RESOURCE_PACKAGES:-[]}
- OM_EXTENSIONS: ${OM_EXTENSIONS:-[]}
-
+ MIGRATION_EXTENSION_PATH: ${MIGRATION_EXTENSION_PATH:-""}

# Heap OPTS Configurations
OPENMETADATA_HEAP_OPTS: ${OPENMETADATA_HEAP_OPTS:--Xmx1G -Xms1G}
@@ -252,7 +251,7 @@ services:
# Database configuration for MySQL
DB_DRIVER_CLASS: ${DB_DRIVER_CLASS:-com.mysql.cj.jdbc.Driver}
DB_SCHEME: ${DB_SCHEME:-mysql}
- DB_USE_SSL: ${DB_USE_SSL:-false}
+ DB_PARAMS: ${DB_PARAMS:-"allowPublicKeyRetrieval=true&useSSL=false&serverTimezone=UTC"}
DB_USER: ${DB_USER:-openmetadata_user}
DB_USER_PASSWORD: ${DB_USER_PASSWORD:-openmetadata_password}
DB_HOST: ${DB_HOST:-mysql}
@@ -280,6 +279,7 @@ services:
EVENT_MONITOR_LATENCY: ${EVENT_MONITOR_LATENCY:-[]}

#pipelineServiceClientConfiguration
+ PIPELINE_SERVICE_CLIENT_ENABLED: ${PIPELINE_SERVICE_CLIENT_ENABLED:-true}
PIPELINE_SERVICE_CLIENT_CLASS_NAME: ${PIPELINE_SERVICE_CLIENT_CLASS_NAME:-"org.openmetadata.service.clients.pipeline.airflow.AirflowRESTClient"}
PIPELINE_SERVICE_IP_INFO_ENABLED: ${PIPELINE_SERVICE_IP_INFO_ENABLED:-false}
PIPELINE_SERVICE_CLIENT_HOST_IP: ${PIPELINE_SERVICE_CLIENT_HOST_IP:-""}
@@ -315,8 +315,7 @@ services:
OM_URI: ${OM_URI:- "http://localhost:8585"}

#extensionConfiguration
- OM_RESOURCE_PACKAGES: ${OM_RESOURCE_PACKAGES:-[]}
- OM_EXTENSIONS: ${OM_EXTENSIONS:-[]}
+ MIGRATION_EXTENSION_PATH: ${MIGRATION_EXTENSION_PATH:-""}


# Heap OPTS Configurations
@@ -128,6 +128,7 @@ def yield_topic(
sourceUrl=source_url,
)
yield Either(right=topic)
+ self.register_record(topic_request=topic)

except Exception as exc:
yield Either(
14 changes: 11 additions & 3 deletions ingestion/tests/unit/topology/database/test_bigquery.py
@@ -93,16 +93,24 @@ def test_source_url(self):
class BigqueryLineageSourceTest(TestCase):
@patch("metadata.ingestion.source.database.bigquery.connection.get_connection")
@patch("metadata.ingestion.source.database.bigquery.connection.test_connection")
@patch("metadata.ingestion.ometa.ometa_api.OpenMetadata")
@patch(
"metadata.ingestion.source.database.bigquery.query_parser.BigqueryQueryParserSource.set_project_id"
)
def __init__(
- self, methodName, get_connection, test_connection, OpenMetadata
+ self,
+ methodName,
+ set_project_id_lineage,
+ test_connection,
+ get_connection,
) -> None:
super().__init__(methodName)

self.config = OpenMetadataWorkflowConfig.parse_obj(
mock_credentials_path_bq_config
)
- self.bq_query_parser = BigqueryLineageSource(self.config.source, OpenMetadata())
+ self.bq_query_parser = BigqueryLineageSource(
+     self.config.source, self.config.workflowConfig.openMetadataServerConfig
+ )

def test_get_engine_without_project_id_specified(self):
for engine in self.bq_query_parser.get_engine():
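A note on the reordering above: `unittest.mock.patch` decorators apply bottom-up, so the innermost decorator supplies the first mock argument after `self`. That is why `set_project_id` (the innermost patch) now comes first in the `__init__` signature. A minimal, self-contained illustration — the patched `os` functions are stand-ins, not part of the OpenMetadata code:

```python
import os
from unittest import TestCase
from unittest.mock import patch


class PatchOrderExample(TestCase):
    @patch("os.getcwd")   # outermost decorator -> last mock parameter
    @patch("os.listdir")  # innermost decorator -> first mock parameter
    def test_patch_order(self, mock_listdir, mock_getcwd):
        # Mocks arrive bottom-up: the decorator closest to the function
        # binds to the first parameter after self.
        mock_listdir.return_value = []
        mock_getcwd.return_value = "/fake"
        self.assertEqual(os.listdir("."), [])
        self.assertEqual(os.getcwd(), "/fake")
```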
@@ -99,7 +99,7 @@ pip install openmetadata-managed-apis==x.y.z

## Deprecation Notice

- - OpenMetadata only supports Python version 3.8 to 3.10.
+ - OpenMetadata only supports Python versions 3.8 to 3.10. We will add support for 3.11 in release 1.3.

## Breaking Changes for 1.2 Stable Release

@@ -115,4 +115,57 @@ then there is no way to link a query to a service and the query will be removed.

- Domo Database, Dashboard and Pipeline renamed the `sandboxDomain` in favor of `instanceDomain`.

### Ingestion Framework Changes

We have reorganized the structure of the `Workflow` classes, which requires updated imports:

- **Metadata Workflow**
- From: `from metadata.ingestion.api.workflow import Workflow`
- To: `from metadata.workflow.metadata import MetadataWorkflow`

- **Lineage Workflow**
- From: `from metadata.ingestion.api.workflow import Workflow`
  - To: `from metadata.workflow.metadata import MetadataWorkflow` (same import as the Metadata Workflow)

- **Usage Workflow**
- From: `from metadata.ingestion.api.workflow import Workflow`
- To: `from metadata.workflow.usage import UsageWorkflow`

- **Profiler Workflow**
- From: `from metadata.profiler.api.workflow import ProfilerWorkflow`
- To: `from metadata.workflow.profiler import ProfilerWorkflow`

- **Data Quality Workflow**
- From: `from metadata.data_quality.api.workflow import TestSuiteWorkflow`
- To: `from metadata.workflow.data_quality import TestSuiteWorkflow`

- **Data Insights Workflow**
- From: `from metadata.data_insight.api.workflow import DataInsightWorkflow`
- To: `from metadata.workflow.data_insight import DataInsightWorkflow`

- **Elasticsearch Reindex Workflow**
- From: `from metadata.ingestion.api.workflow import Workflow`
  - To: `from metadata.workflow.metadata import MetadataWorkflow` (same import as the Metadata Workflow)

The `Workflow` class that you import can then be called as follows:

```python
from metadata.workflow.workflow_output_handler import print_status

workflow = workflow_class.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow) # This method has been updated. Before it was `workflow.print_status()`
workflow.stop()
```

If you run your workflows externally and start seeing `ImportError`s, review the import changes listed above.
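As a concrete instance of the list above, here is a minimal sketch of running the Usage Workflow under the new 1.2 imports; the YAML path is a placeholder for your own configuration file:

```python
import yaml

# New 1.2 imports (previously: from metadata.ingestion.api.workflow import Workflow)
from metadata.workflow.usage import UsageWorkflow
from metadata.workflow.workflow_output_handler import print_status

CONFIG_PATH = "usage-workflow.yaml"  # placeholder: your own workflow YAML


def run_usage_workflow() -> None:
    with open(CONFIG_PATH, encoding="utf-8") as f:
        workflow_config = yaml.safe_load(f)

    workflow = UsageWorkflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    print_status(workflow)  # replaces the pre-1.2 workflow.print_status()
    workflow.stop()


if __name__ == "__main__":
    run_usage_workflow()
```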

### Metadata CLI Changes

In 1.1.7 and below you could run the Usage Workflow with `metadata ingest -c <path to yaml>`. Now the Usage Workflow has its own command: `metadata usage -c <path to yaml>`.

### Other Changes

- Pipeline Status timestamps are now in milliseconds.
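If you produce Pipeline Status entries programmatically, the timestamp should now be an epoch value in milliseconds rather than seconds; a quick illustration of the conversion:

```python
from datetime import datetime, timezone

# Illustrative only: epoch milliseconds, the unit Pipeline Status now uses.
ts_millis = int(datetime.now(timezone.utc).timestamp() * 1000)
print(ts_millis)  # e.g. 1698192000000
```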
@@ -709,7 +709,8 @@ Here we are also importing all the basic requirements to parse YAMLs, handle dat
import yaml
from datetime import timedelta
from airflow import DAG
- from metadata.profiler.api.workflow import ProfilerWorkflow
+ from metadata.workflow.profiler import ProfilerWorkflow
+ from metadata.workflow.workflow_output_handler import print_status
try:
from airflow.operators.python import PythonOperator
@@ -746,7 +747,7 @@ def metadata_ingestion_workflow():
workflow = ProfilerWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
- workflow.print_status()
+ print_status(workflow)
workflow.stop()
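Since this diff only shows fragments, here is a consolidated sketch of what such a profiler DAG looks like after the 1.2 changes; the DAG id, schedule, and default arguments are illustrative placeholders, and the YAML configuration is elided:

```python
import yaml
from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago

from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.workflow_output_handler import print_status

try:
    from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
    from airflow.operators.python_operator import PythonOperator

config = """
<your YAML configuration>
"""


def metadata_ingestion_workflow():
    workflow_config = yaml.safe_load(config)
    workflow = ProfilerWorkflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    print_status(workflow)  # 1.2 replacement for workflow.print_status()
    workflow.stop()


with DAG(
    "profiler_example",  # illustrative DAG id
    default_args={"retries": 3, "retry_delay": timedelta(minutes=5)},
    start_date=days_ago(1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    PythonOperator(
        task_id="profile_and_test",
        python_callable=metadata_ingestion_workflow,
    )
```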
@@ -717,7 +717,8 @@ Here we are also importing all the basic requirements to parse YAMLs, handle dat
import yaml
from datetime import timedelta
from airflow import DAG
- from metadata.profiler.api.workflow import ProfilerWorkflow
+ from metadata.workflow.profiler import ProfilerWorkflow
+ from metadata.workflow.workflow_output_handler import print_status
try:
from airflow.operators.python import PythonOperator
@@ -754,7 +755,7 @@ def metadata_ingestion_workflow():
workflow = ProfilerWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
- workflow.print_status()
+ print_status(workflow)
workflow.stop()
@@ -665,7 +665,8 @@ Here we are also importing all the basic requirements to parse YAMLs, handle dat
import yaml
from datetime import timedelta
from airflow import DAG
- from metadata.profiler.api.workflow import ProfilerWorkflow
+ from metadata.workflow.profiler import ProfilerWorkflow
+ from metadata.workflow.workflow_output_handler import print_status
try:
from airflow.operators.python import PythonOperator
@@ -702,7 +703,7 @@ def metadata_ingestion_workflow():
workflow = ProfilerWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
- workflow.print_status()
+ print_status(workflow)
workflow.stop()
@@ -682,7 +682,8 @@ Here we are also importing all the basic requirements to parse YAMLs, handle dat
import yaml
from datetime import timedelta
from airflow import DAG
- from metadata.profiler.api.workflow import ProfilerWorkflow
+ from metadata.workflow.profiler import ProfilerWorkflow
+ from metadata.workflow.workflow_output_handler import print_status
try:
from airflow.operators.python import PythonOperator
@@ -719,7 +720,7 @@ def metadata_ingestion_workflow():
workflow = ProfilerWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
- workflow.print_status()
+ print_status(workflow)
workflow.stop()
@@ -741,7 +741,8 @@ Here we are also importing all the basic requirements to parse YAMLs, handle dat
import yaml
from datetime import timedelta
from airflow import DAG
- from metadata.profiler.api.workflow import ProfilerWorkflow
+ from metadata.workflow.profiler import ProfilerWorkflow
+ from metadata.workflow.workflow_output_handler import print_status
try:
from airflow.operators.python import PythonOperator
@@ -778,7 +779,7 @@ def metadata_ingestion_workflow():
workflow = ProfilerWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
- workflow.print_status()
+ print_status(workflow)
workflow.stop()
@@ -664,7 +664,8 @@ Here we are also importing all the basic requirements to parse YAMLs, handle dat
import yaml
from datetime import timedelta
from airflow import DAG
- from metadata.profiler.api.workflow import ProfilerWorkflow
+ from metadata.workflow.profiler import ProfilerWorkflow
+ from metadata.workflow.workflow_output_handler import print_status
try:
from airflow.operators.python import PythonOperator
@@ -701,7 +702,7 @@ def metadata_ingestion_workflow():
workflow = ProfilerWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
- workflow.print_status()
+ print_status(workflow)
workflow.stop()
@@ -750,7 +750,8 @@ Here we are also importing all the basic requirements to parse YAMLs, handle dat
import yaml
from datetime import timedelta
from airflow import DAG
- from metadata.profiler.api.workflow import ProfilerWorkflow
+ from metadata.workflow.profiler import ProfilerWorkflow
+ from metadata.workflow.workflow_output_handler import print_status
try:
from airflow.operators.python import PythonOperator
@@ -787,7 +788,7 @@ def metadata_ingestion_workflow():
workflow = ProfilerWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
- workflow.print_status()
+ print_status(workflow)
workflow.stop()
@@ -638,7 +638,8 @@ Here we are also importing all the basic requirements to parse YAMLs, handle dat
import yaml
from datetime import timedelta
from airflow import DAG
- from metadata.profiler.api.workflow import ProfilerWorkflow
+ from metadata.workflow.profiler import ProfilerWorkflow
+ from metadata.workflow.workflow_output_handler import print_status
try:
from airflow.operators.python import PythonOperator
@@ -675,7 +676,7 @@ def metadata_ingestion_workflow():
workflow = ProfilerWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
- workflow.print_status()
+ print_status(workflow)
workflow.stop()
@@ -321,20 +321,23 @@ and whatever Airflow requires to create a DAG.
We know that to create a new DAG in Airflow we need a Python file to be placed under the `AIRFLOW_HOME/dags` directory (by default).
Then, calling the `/deploy` endpoint will make the necessary steps to create such a file.

- What it is important here is to notice that in order to run a workflow we just need the following few lines of Python code:
+ What is important to notice here is that, in order to run a metadata ingestion workflow,
+ we just need the following few lines of Python code:

```python
- from metadata.ingestion.api.workflow import Workflow
+ from metadata.workflow.metadata import MetadataWorkflow

+ from metadata.workflow.workflow_output_handler import print_status

config = """
<your YAML configuration>
"""

workflow_config = yaml.safe_load(config)
- workflow = Workflow.create(workflow_config)
+ workflow = MetadataWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
- workflow.print_status()
+ print_status(workflow)
workflow.stop()
```
