
Dagster PoC for Metadata service #23989

Merged
merged 35 commits into master from bnchrch/poc-dagster-gcs-sensor on Mar 20, 2023

Conversation

bnchrch
Contributor

@bnchrch bnchrch commented Mar 13, 2023

Notes for reviewers

  1. The large line-change count corresponds to having the catalogs checked in as test resources

Video walk through

https://www.loom.com/share/60d78c1e6b74491186bfe2d49d4f459d

What

Closes #24008

This adds the beginning of the metadata service and the Dagster orchestrator.

How

Add a Dagster job that, on catalog change, outputs a list of unique dockerImage versions and the catalog(s) in which each is available.

Example here
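The job described above boils down to a set computation over the two catalogs. A minimal stdlib sketch of that computation (the connector entries here are hypothetical, not taken from the real catalogs):

```python
# Each catalog is modeled as a set of (dockerRepository, dockerImageTag) pairs.
# These entries are made up for illustration.
cloud_catalog = {("airbyte/source-faker", "0.1.0")}
oss_catalog = {
    ("airbyte/source-faker", "0.1.0"),
    ("airbyte/source-pokeapi", "0.1.5"),
}

# Unique image/version pairs, tagged with the catalogs they appear in.
availability = {
    image: {
        "is_cloud": image in cloud_catalog,
        "is_oss": image in oss_catalog,
    }
    for image in cloud_catalog | oss_catalog
}
```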

Recommended reading order

  1. airbyte-ci/connectors_ci/metadata_service/orchestrator/README.md
  2. airbyte-ci/connectors_ci/metadata_service/orchestrator/orchestrator/__init__.py
  3. airbyte-ci/connectors_ci/metadata_service/orchestrator/orchestrator/assets/catalog_assets.py
  4. ...te-ci/connectors_ci/metadata_service/orchestrator/orchestrator/resources/gcp_resources.py
  5. ...te-ci/connectors_ci/metadata_service/orchestrator/orchestrator/sensors/catalog_sensors.py

@bnchrch bnchrch marked this pull request as draft March 13, 2023 16:21
Comment on lines 38 to 39
@asset(required_resource_keys={"catalog_report_directory_manager"})
def connector_catalog_location_markdown(context, all_destinations_dataframe, all_sources_dataframe):
Contributor

👋 Bye, #23367

Contributor Author

Haha not quite! But its days are numbered 🗡️

@bnchrch bnchrch force-pushed the bnchrch/poc-dagster-gcs-sensor branch from 9ed8c7e to d705241 Compare March 13, 2023 22:14
@bnchrch bnchrch changed the title Draft: Dagster PoC for Metadata service Dagster PoC for Metadata service Mar 13, 2023
@bnchrch bnchrch requested a review from evantahler March 13, 2023 22:56
@bnchrch bnchrch marked this pull request as ready for review March 13, 2023 22:56
Contributor

@alafanechere alafanechere left a comment

It's very cool to discover Dagster with your PR.
I left a couple of comments on the README because of my unfamiliarity with Dagster. Feel free to return these suggestions about Dagger 😄

airbyte-ci/connectors_ci/README.md (outdated; resolved)
@@ -0,0 +1,2 @@
METADATA_BUCKET="ben-ab-test-bucket"
GCP_GSM_CREDENTIALS=
Contributor

Missing newline at the end of the file.
Question: how are the variables defined in .env loaded as environment variables?

Contributor Author

```bash
cp .env.template .env
```

### Create a GCP Service Account and Dev Bucket
Contributor

We've been using Minio for local development on buckets. Do you think we could use it in this context too, so development doesn't require interacting with gcloud (console)?

Contributor Author

I like the idea of using Minio! Where are we currently using it?

Dagster has the concept of file and IO managers, so we should be able to swap between local and remote implementations really easily.

Also, I've logged this as a separate issue in our backlog for now.

https://github.com/airbytehq/airbyte/issues/24090

Contributor

I think platform uses (or used) Minio to store the logs on local deployment:
https://github.com/airbytehq/airbyte-cloud/blob/49530001e0732d40e187c5b4fd68e544ae5e980a/.env#L68

Comment on lines 35 to 37
4. Add the following environment variables to your `.env` file:
- `METADATA_BUCKET`
- `GCP_GSM_CREDENTIALS`
Contributor

Could you explain why we need to have the GCP_GSM_CREDENTIALS env var in the orchestrator context?

Contributor Author

Sure, it's because we both upload files to GCS and watch them for changes.

But I imagine you're asking why we're doing this via an env var?

If that's the question, it's simply because Dagster Cloud is currently best set up to handle config values through environment variables, not local files.

return ":".join(etags)


def catalog_updated_sensor(job, resources_def) -> SensorDefinition:
Contributor

Suggested change
def catalog_updated_sensor(job, resources_def) -> SensorDefinition:
def catalog_updated_sensor(job) -> SensorDefinition:

Don't you think it should be the sensor's role to define which resources it needs, and maybe create them?
If I'm following your implementation correctly, we're defining global resources in `__init__.py` and passing all of them to this function. What if the resources dict grows to include resources this sensor does not need? It will still `build_resources` on resources it does not need, right?

Contributor Author

That's a fair critique! If we add more resources we will end up building more than we need to.

The only reason I did it this way is I didn't want new developers to add new resources to one of our reports and accidentally (and silently) break the sensor.

Hmm, let me investigate whether there's a way we can solve both issues.
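The composite-etag cursor under discussion can be sketched in plain Python (helper names here are hypothetical; the real sensor reads etags from GCS blobs): the sensor joins the watched files' etags into one cursor string and only triggers a run when that string differs from the last stored cursor.

```python
from typing import Iterable, Optional


def composite_etag_cursor(etags: Iterable[str]) -> str:
    """Combine per-file etags into a single cursor string."""
    return ":".join(etags)


def should_trigger(current_etags: Iterable[str], last_cursor: Optional[str]) -> bool:
    """Trigger a run only when any watched file's etag has changed.

    The first poll (no stored cursor) always triggers.
    """
    return composite_etag_cursor(current_etags) != last_cursor
```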

Contributor

@evantahler evantahler left a comment

Approving because this is a great start to the project and things work!

Comments below are nits, and I also defer to the Pythonic reviews of others.

@@ -0,0 +1,3 @@
# Airbyte Connectors CI

This folder is a collection of systems, tools and scripts that are used to run CI/CD systems specific to our connectors.
Contributor Author

Yup! All part of phase 3 of the tech spec

All commands below assume you are in the `metadata_service/orchestrator` directory.
### Installation
```bash
poetry install
```
Contributor

How do I get poetry? (https://python-poetry.org/docs/)

Contributor Author

Good callout! I'll add this.

Comment on lines 69 to 117
def all_destinations_dataframe(cloud_destinations_dataframe, oss_destinations_dataframe) -> pd.DataFrame:
    """
    Merge the cloud and oss destinations catalogs into a single dataframe.
    """

    # Add a column 'is_cloud' to indicate if an image/version pair is in the cloud catalog
    cloud_destinations_dataframe["is_cloud"] = True

    # Add a column 'is_oss' to indicate if an image/version pair is in the oss catalog
    oss_destinations_dataframe["is_oss"] = True

    composite_key = ["destinationDefinitionId", "dockerRepository", "dockerImageTag"]

    # Merge the two catalogs on the composite key, keeping only the unique pairs
    merged_catalog = pd.merge(
        cloud_destinations_dataframe, oss_destinations_dataframe, how="outer", on=composite_key
    ).drop_duplicates(subset=composite_key)

    # Replace NaN values in the 'is_cloud' and 'is_oss' columns with False
    merged_catalog[["is_cloud", "is_oss"]] = merged_catalog[["is_cloud", "is_oss"]].fillna(False)

    # Return the merged catalog with the desired columns
    return merged_catalog


@asset
def all_sources_dataframe(cloud_sources_dataframe, oss_sources_dataframe) -> pd.DataFrame:
    """
    Merge the cloud and oss source catalogs into a single dataframe.
    """

    # Add a column 'is_cloud' to indicate if an image/version pair is in the cloud catalog
    cloud_sources_dataframe["is_cloud"] = True

    # Add a column 'is_oss' to indicate if an image/version pair is in the oss catalog
    oss_sources_dataframe["is_oss"] = True

    composite_key = ["sourceDefinitionId", "dockerRepository", "dockerImageTag"]

    # Merge the two catalogs on the composite key, keeping only the unique pairs
    merged_catalog = pd.merge(
        cloud_sources_dataframe, oss_sources_dataframe, how="outer", on=composite_key
    ).drop_duplicates(subset=composite_key)

    # Replace NaN values in the 'is_cloud' and 'is_oss' columns with False
    merged_catalog[["is_cloud", "is_oss"]] = merged_catalog[["is_cloud", "is_oss"]].fillna(False)

    # Return the merged catalog with the desired columns
    return merged_catalog
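A quick sanity check of the merge pattern above on miniature dataframes (the connector entries are hypothetical) shows how the availability flags come out:

```python
import pandas as pd

# Made-up miniature catalogs; real ones come from the GCS catalog files.
cloud = pd.DataFrame([
    {"sourceDefinitionId": "1", "dockerRepository": "airbyte/source-faker", "dockerImageTag": "0.1.0"},
])
oss = pd.DataFrame([
    {"sourceDefinitionId": "1", "dockerRepository": "airbyte/source-faker", "dockerImageTag": "0.1.0"},
    {"sourceDefinitionId": "2", "dockerRepository": "airbyte/source-pokeapi", "dockerImageTag": "0.1.5"},
])

cloud["is_cloud"] = True
oss["is_oss"] = True

composite_key = ["sourceDefinitionId", "dockerRepository", "dockerImageTag"]

# Outer merge keeps rows present in either catalog; missing flags become NaN.
merged = pd.merge(cloud, oss, how="outer", on=composite_key).drop_duplicates(subset=composite_key)
merged[["is_cloud", "is_oss"]] = merged[["is_cloud", "is_oss"]].fillna(False)
# source-faker ends up flagged for both catalogs; source-pokeapi is OSS-only.
```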
Contributor

+1 on merging these projects. See comment above

@sensor(
    name=f"{job.name}_on_catalog_updated",
    job=job,
    minimum_interval_seconds=30,
Contributor

Looks like the minimum is where things start when initialized

    name=f"{job.name}_on_catalog_updated",
    job=job,
    minimum_interval_seconds=30,
    default_status=DefaultSensorStatus.STOPPED,
Contributor

should the defaults be to be enabled?

Contributor Author

Currently, I'm for having them off by default.

The reason is that if this code goes to production, we only need to flip the switch the first time; the sensor then stays on indefinitely (or at least until we purge the database backing Dagster, which doesn't really happen).

Also, by leaving it off we protect devs from unintentionally running the full DAG when they start the system.

@bnchrch bnchrch force-pushed the bnchrch/poc-dagster-gcs-sensor branch from f6167f4 to 8af5a08 Compare March 17, 2023 03:26
@bnchrch bnchrch force-pushed the bnchrch/poc-dagster-gcs-sensor branch from 22a4be1 to 465c00d Compare March 17, 2023 18:35
Contributor

@pedroslopez pedroslopez left a comment

Nice! Interesting to see how the pipelines/dags are built up

General Q that doesn't necessarily have to be addressed in this PR: should we start referencing the catalog with the new Registry name instead? I know there are quite a few references to "catalog" in this particular PR 😛

Contributor

Nit: feels like we should be able to get away with specifying a minimal catalog with 2-3 connectors for testing purposes. This is ok though.


metadata = {
    "preview": MetadataValue.md(markdown),
    "gcs_path": MetadataValue.url(file_handle.gcs_path),
Contributor

@pedroslopez pedroslopez Mar 17, 2023

How does this know to upload to GCS? Is this just something Dagster has support for internally, via an output that has this gcs_path attribute?

Contributor Author

All part of the resource we pull in, `catalog_report_directory_manager`.

You can see the GCS pieces get wired up in the resources folder.

Contributor

Just checking: do we have CI set up to run these tests, or a ticket created to do so if not?

Contributor Author

I'll add it to the cut-over phase!

@bnchrch
Contributor Author

bnchrch commented Mar 17, 2023

@pedroslopez I like the callout for removing the use of catalog in favour of registry.

After the types repo is all set next week, I'm planning to come back to the lib/ folder to refactor the jsonschema definitions out.

I'm thinking that would be a good time for a rename.

Thoughts?

@bnchrch bnchrch merged commit 2d6f5ee into master Mar 20, 2023
@bnchrch bnchrch deleted the bnchrch/poc-dagster-gcs-sensor branch March 20, 2023 19:28
erohmensing pushed a commit that referenced this pull request Mar 22, 2023
* Add airbyte-ci folders

* Add poetry

* Add first dagster job

* Get sensors properly registering

* Trigger job on new files

* Add etag cursor

* Wire up resources and ops

* Parse destinations dataframe

* Add multiple dataframes

* Compute markdown

* Write html and markdown

* Move columns to variable

* move to a folder centered file structure

* Move to sensor factory

* Add resource def to sensor

* Use appropriate credentials

* Use GCSFileManager

* use catalog_directory_manager

* Generalize the gcp catalog resources

* Move bucket to env var

* Clean up and add comments

* Update readme

* Move dependencies into orchestrator

* Add gcs section to readme

* Clean up debug

* Add merge catalog tests

* Run code formatter

* Apply flake8 fixes

* Remove suffix

* Move tests up one level

* Folder rename

* Update readme and rename env

* Add jinja templates

* Rename connectors_ci to connectors for lib
erohmensing pushed a commit that referenced this pull request Mar 22, 2023

Successfully merging this pull request may close these issues.

Create a Dagster PoC
4 participants