Skip to content

Commit

Permalink
Dagster PoC for Metadata service (#23989)
Browse files Browse the repository at this point in the history
* Add airbyte-ci folders

* Add poetry

* Add first dagster job

* Get sensors properly registering

* Trigger job on new files

* Add etag cursor

* Wire up resources and ops

* Parse destinations dataframe

* Add multiple dataframes

* Compute markdown

* Write html and markdown

* Move columns to variable

* move to a folder centered file structure

* Move to sensor factory

* Add resource def to sensor

* Use appropriate credentials

* Use GCSFileManager

* use catalog_directory_manager

* Generalize the gcp catalog resources

* Move bucket to env var

* Clean up and add comments

* Update readme

* Move dependencies into orchestrator

* Add gcs section to readme

* Clean up debug

* Add merge catalog tests

* Run code formatter

* Apply flake8 fixes

* Remove suffix

* Move tests up one level

* Folder rename

* Update readme and rename env

* Add jinja templates

* Rename connectors_ci to connectors for lib
  • Loading branch information
bnchrch authored and erohmensing committed Mar 22, 2023
1 parent 69d5028 commit 6b7604b
Show file tree
Hide file tree
Showing 52 changed files with 50,335 additions and 0 deletions.
172 changes: 172 additions & 0 deletions airbyte-ci/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock

# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

### Python Patch ###
# Poetry local configuration file - https://python-poetry.org/docs/configuration/#local-configuration
poetry.toml

# ruff
.ruff_cache/

# LSP config files
pyrightconfig.json

tmp*
3 changes: 3 additions & 0 deletions airbyte-ci/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Airbyte CI

This folder is a collection of systems, tools and scripts that are used to run Airbyte's CI/CD
3 changes: 3 additions & 0 deletions airbyte-ci/connectors/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Airbyte Connectors CI

This folder is a collection of systems, tools and scripts that are used to run CI/CD systems specific to our connectors.
12 changes: 12 additions & 0 deletions airbyte-ci/connectors/metadata_service/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Metadata Service
This is the begining of metadata service for airbyte.

This system is responsible for the following:
- Validating Connector metadata
- Storing Connector metadata in GCS
- Serving Connector metadata to various consumers
- Aggregating Connector metadata to provide a unified view of all connectors
- Triggering actions based on changes to Connector metadata

## Subsystems
- [Metadata Orchestrator](./orchestrator/README.md)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
METADATA_BUCKET="ben-ab-test-bucket"
112 changes: 112 additions & 0 deletions airbyte-ci/connectors/metadata_service/orchestrator/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Connector Orchestrator (WIP)
This is the Orchestrator for Airbyte metadata built on Dagster.


# Setup

## Prerequisites

#### Poetry

Before you can start working on this project, you will need to have Poetry installed on your system. Please follow the instructions below to install Poetry:

1. Open your terminal or command prompt.
2. Install Poetry using the recommended installation method:

```bash
curl -sSL https://install.python-poetry.org | python3 -
```

Alternatively, you can use `pip` to install Poetry:

```bash
pip install --user poetry
```

3. After the installation is complete, close and reopen your terminal to ensure the newly installed `poetry` command is available in your system's PATH.

For more detailed instructions and alternative installation methods, please refer to the official Poetry documentation: https://python-poetry.org/docs/#installation

### Using Poetry in the Project

Once Poetry is installed, you can use it to manage the project's dependencies and virtual environment. To get started, navigate to the project's root directory in your terminal and follow these steps:


## Installation
```bash
poetry install
cp .env.template .env
```

## Create a GCP Service Account and Dev Bucket
Developing against the orchestrator requires a development bucket in GCP.

The orchestrator will use this bucket to:
- store important output files. (e.g. Reports)
- watch for changes to the `catalog` directory in the bucket.

However all tmp files will be stored in a local directory.

To create a development bucket:
1. Create a GCP Service Account with the following permissions:
- Storage Admin
- Storage Object Admin
- Storage Object Creator
- Storage Object Viewer
2. Create a GCS bucket
3. Add the service account as a member of the bucket with the following permissions:
- Storage Admin
- Storage Object Admin
- Storage Object Creator
- Storage Object Viewer
4. Add the following environment variables to your `.env` file:
- `METADATA_BUCKET`
- `GCS_CREDENTIALS`

Note that the `GCS_CREDENTIALS` should be the raw json string of the service account credentials.

Here is an example of how to import the service account credentials into your environment:
```bash
export GCS_CREDENTIALS=`cat /path/to/credentials.json`
```

## The Orchestrator

The orchestrator (built using Dagster) is responsible for orchestrating various the metadata processes.

Dagster has a number of concepts that are important to understand before working on the orchestrator.
1. Assets
2. Resources
3. Schedules
4. Sensors
5. Ops

Refer to the [Dagster documentation](https://docs.dagster.io/concepts) for more information on these concepts.

### Starting the Dagster Daemons
Start the orchestrator with the following command:
```bash
poetry run dagster dev -m orchestrator
```

Then you can access the Dagster UI at http://localhost:3000

Note its important to use `dagster dev` instead of `dagit` because `dagster dev` start additional services that are required for the orchestrator to run. Namely the sensor service.

### Materializing Assets with the UI
When you navigate to the orchestrator in the UI, you will see a list of assets that are available to be materialized.

From here you have the following options
1. Materialize all assets
2. Select a subset of assets to materialize
3. Enable a sensor to automatically materialize assets

### Materializing Assets without the UI

In some cases you may want to run the orchestrator without the UI. To learn more about Dagster's CLI commands, see the [Dagster CLI documentation](https://docs.dagster.io/_apidocs/cli).

## Running Tests
```bash
poetry run pytest
```

Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from dagster import Definitions

from .resources.gcp_resources import gcp_gcs_client, gcs_bucket_manager, gcs_file_manager, gcs_file_blob
from .assets.catalog_assets import (
oss_destinations_dataframe,
cloud_destinations_dataframe,
oss_sources_dataframe,
cloud_sources_dataframe,
latest_oss_catalog_dict,
latest_cloud_catalog_dict,
all_sources_dataframe,
all_destinations_dataframe,
connector_catalog_location_markdown,
connector_catalog_location_html,
)
from .jobs.catalog_jobs import generate_catalog_markdown
from .sensors.catalog_sensors import catalog_updated_sensor

from .config import REPORT_FOLDER, CATALOG_FOLDER


ASSETS = [
oss_destinations_dataframe,
cloud_destinations_dataframe,
oss_sources_dataframe,
cloud_sources_dataframe,
latest_oss_catalog_dict,
latest_cloud_catalog_dict,
all_sources_dataframe,
all_destinations_dataframe,
connector_catalog_location_markdown,
connector_catalog_location_html,
]

RESOURCES = {
"gcp_gcs_client": gcp_gcs_client.configured(
{
"gcp_gcs_cred_string": {"env": "GCS_CREDENTIALS"},
}
),
"gcs_bucket_manager": gcs_bucket_manager.configured({"gcs_bucket": {"env": "METADATA_BUCKET"}}),
"catalog_report_directory_manager": gcs_file_manager.configured(
{"gcs_bucket": {"env": "METADATA_BUCKET"}, "gcs_prefix": REPORT_FOLDER}
),
"latest_oss_catalog_gcs_file": gcs_file_blob.configured({"gcs_prefix": CATALOG_FOLDER, "gcs_filename": "oss_catalog.json"}),
"latest_cloud_catalog_gcs_file": gcs_file_blob.configured({"gcs_prefix": CATALOG_FOLDER, "gcs_filename": "cloud_catalog.json"}),
}

SENSORS = [catalog_updated_sensor(job=generate_catalog_markdown, resources_def=RESOURCES)]

SCHEDULES = []

JOBS = [generate_catalog_markdown]

"""
START HERE
This is the entry point for the orchestrator.
It is a list of all the jobs, assets, resources, schedules, and sensors that are available to the orchestrator.
"""
defn = Definitions(
jobs=JOBS,
assets=ASSETS,
resources=RESOURCES,
schedules=SCHEDULES,
sensors=SENSORS,
)
Loading

0 comments on commit 6b7604b

Please sign in to comment.