From c6078c79736b11167523967daad9e606f35856e7 Mon Sep 17 00:00:00 2001 From: Michael Robinson <68482867+merobi-hub@users.noreply.github.com> Date: Fri, 19 Aug 2022 14:45:58 -0400 Subject: [PATCH 01/26] [DOCUMENTATION] add release cadence policy (#2079) * add missing language to release policy Signed-off-by: Michael Robinson * add release cadence policy to RELEASING.md Signed-off-by: Michael Robinson Signed-off-by: Michael Robinson --- RELEASING.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/RELEASING.md b/RELEASING.md index a3f595e39c..1d0e23cd64 100644 --- a/RELEASING.md +++ b/RELEASING.md @@ -15,6 +15,10 @@ ![](./docs/assets/images/new-release.png) +# Release Cadence + +A release will be initiated on the 15th day of each month unless this falls on a Friday, Saturday, Sunday or US holiday, in which case the release will commence on the following business day. + # Voting on Releases Anyone may request a new release of the project in the #general Slack channel. From 9e1cb7c89b3e94b62ac27bca4fa405919914bcfe Mon Sep 17 00:00:00 2001 From: wslulciuc Date: Mon, 22 Aug 2022 11:47:32 -0700 Subject: [PATCH 02/26] Apply formatting to changelog Signed-off-by: wslulciuc --- CHANGELOG.md | 50 +++++++++++++++++++++++++------------------------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8b652f15e4..2abf051647 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,65 +6,65 @@ ### Fixed -* Fix py module release [#2057](https://github.com/MarquezProject/marquez/pull/2057) [@wslulciuc](https://github.com/wslulciuc) -* Use /bin/sh in web/docker/entrypoint.sh [#2059](https://github.com/MarquezProject/marquez/pull/2059) [@wslulciuc](https://github.com/wslulciuc) +* Fix py module release [`#2057`](https://github.com/MarquezProject/marquez/pull/2057) [@wslulciuc](https://github.com/wslulciuc) +* Use `/bin/sh` in `web/docker/entrypoint.sh` [`#2059`](https://github.com/MarquezProject/marquez/pull/2059) 
[@wslulciuc](https://github.com/wslulciuc) ## [0.24.0](https://github.com/MarquezProject/marquez/compare/0.23.0...0.24.0) - 2022-08-02 ### Added -* Add copyright lines to all source files [#1996](https://github.com/MarquezProject/marquez/pull/1996) [@merobi-hub](https://github.com/MarquezProject/marquez/commits?author=merobi-hub) -* Add copyright and license guidelines in CONTRIBUTING.md [@wslulciuc](https://github.com/wslulciuc) -* Add @FlywayTarget annotation to migration tests to control flyway upgrades [#2035](https://github.com/MarquezProject/marquez/pull/2035) [@collado-mike](https://github.com/collado-mike) +* Add copyright lines to all source files [`#1996`](https://github.com/MarquezProject/marquez/pull/1996) [@merobi-hub](https://github.com/MarquezProject/marquez/commits?author=merobi-hub) +* Add copyright and license guidelines in `CONTRIBUTING.md` [@wslulciuc](https://github.com/wslulciuc) +* Add `@FlywayTarget` annotation to migration tests to control flyway upgrades [`#2035`](https://github.com/MarquezProject/marquez/pull/2035) [@collado-mike](https://github.com/collado-mike) ### Changed -* Updated `jobs_view` to stop computing FQN on reads and to compute on _writes_ instead [#2036](https://github.com/MarquezProject/marquez/pull/2036) [@collado-mike](https://github.com/collado-mike) -* Runs row reduction [#2041](https://github.com/MarquezProject/marquez/pull/2041) [@collado-mike](https://github.com/collado-mike) +* Updated `jobs_view` to stop computing FQN on reads and to compute on _writes_ instead [`#2036`](https://github.com/MarquezProject/marquez/pull/2036) [@collado-mike](https://github.com/collado-mike) +* Runs row reduction [`#2041`](https://github.com/MarquezProject/marquez/pull/2041) [@collado-mike](https://github.com/collado-mike) ### Fixed -* Update `Run` in the openapi spec to include a `context` field [#2020](https://github.com/MarquezProject/marquez/pull/2020) [@esaych](https://github.com/Esaych) -* Fix dataset openapi model 
[#2038](https://github.com/MarquezProject/marquez/pull/2038) [@esaych](https://github.com/Esaych) -* Fix casing on lastLifecycleState [#2039](https://github.com/MarquezProject/marquez/pull/2039) [@esaych](https://github.com/Esaych) -* Fix V45 migration to include initial population of jobs_fqn table [#2051](https://github.com/MarquezProject/marquez/pull/2051) [@collado-mike](https://github.com/collado-mike) -* Fix symlinked jobs in queries [#2053](https://github.com/MarquezProject/marquez/pull/2053) [@collado-mike](https://github.com/collado-mike) +* Update `Run` in the openapi spec to include a `context` field [`#2020`](https://github.com/MarquezProject/marquez/pull/2020) [@esaych](https://github.com/Esaych) +* Fix dataset openapi model [`#2038`](https://github.com/MarquezProject/marquez/pull/2038) [@esaych](https://github.com/Esaych) +* Fix casing on `lastLifecycleState` [`#2039`](https://github.com/MarquezProject/marquez/pull/2039) [@esaych](https://github.com/Esaych) +* Fix V45 migration to include initial population of jobs_fqn table [`#2051`](https://github.com/MarquezProject/marquez/pull/2051) [@collado-mike](https://github.com/collado-mike) +* Fix symlinked jobs in queries [`#2053`](https://github.com/MarquezProject/marquez/pull/2053) [@collado-mike](https://github.com/collado-mike) ## [0.23.0](https://github.com/MarquezProject/marquez/compare/0.22.0...0.23.0) - 2022-06-16 ### Added -* Update docker-compose.yml: Randomly map postgres db port [#2000](https://github.com/MarquezProject/marquez/pull/2000) [@RNHTTR](https://github.com/RNHTTR) -* Job parent hierarchy [#1935](https://github.com/MarquezProject/marquez/pull/1935) [#1980](https://github.com/MarquezProject/marquez/pull/1980) [#1992](https://github.com/MarquezProject/marquez/pull/1992) [@collado-mike](https://github.com/collado-mike) +* Update docker-compose.yml: Randomly map postgres db port [`#2000`](https://github.com/MarquezProject/marquez/pull/2000) [@RNHTTR](https://github.com/RNHTTR) +* Job 
parent hierarchy [`#1935`](https://github.com/MarquezProject/marquez/pull/1935) [`#1980`](https://github.com/MarquezProject/marquez/pull/1980) [`#1992`](https://github.com/MarquezProject/marquez/pull/1992) [@collado-mike](https://github.com/collado-mike) ### Changed -* Set default limit for listing datasets and jobs in UI from `2000` to `25` [#2018](https://github.com/MarquezProject/marquez/pull/2018) [@wslulciuc](https://github.com/wslulciuc) +* Set default limit for listing datasets and jobs in UI from `2000` to `25` [`#2018`](https://github.com/MarquezProject/marquez/pull/2018) [@wslulciuc](https://github.com/wslulciuc) * Update OpenLineage write API to be non-transactional and avoid unnecessary locks on records under heavy contention [@collado-mike](https://github.com/collado-mike) ### Fixed -* Return the tag for postgresql to 12.1.0 [#2015](https://github.com/MarquezProject/marquez/pull/2015) [@rossturk](https://github.com/rossturk) +* Return the tag for postgresql to 12.1.0 [`#2015`](https://github.com/MarquezProject/marquez/pull/2015) [@rossturk](https://github.com/rossturk) ## [0.22.0](https://github.com/MarquezProject/marquez/compare/0.21.0...0.22.0) - 2022-05-16 ### Added -* Add support for `LifecycleStateChangeFacet` with an ability to softly delete datasets [#1847](https://github.com/MarquezProject/marquez/pull/1847)[@pawel-big-lebowski](https://github.com/pawel-big-lebowski) -* Enable pod specific annotations in Marquez Helm Chart via `marquez.podAnnotations` [#1945](https://github.com/MarquezProject/marquez/pull/1945) [@wslulciuc](https://github.com/wslulciuc) -* Add support for job renaming/redirection via symlink [#1947](https://github.com/MarquezProject/marquez/pull/1947) [@collado-mike](https://github.com/collado-mike) -* Add `Created by` view for dataset versions along with SQL syntax highlighting in web UI [#1929](https://github.com/MarquezProject/marquez/pull/1929) [@phixMe](https://github.com/phixMe) -* Add `operationId` to openapi spec 
[#1978](https://github.com/MarquezProject/marquez/pull/1978) [@phixMe](https://github.com/phixMe) +* Add support for `LifecycleStateChangeFacet` with an ability to softly delete datasets [`#1847`](https://github.com/MarquezProject/marquez/pull/1847)[@pawel-big-lebowski](https://github.com/pawel-big-lebowski) +* Enable pod specific annotations in Marquez Helm Chart via `marquez.podAnnotations` [`#1945`](https://github.com/MarquezProject/marquez/pull/1945) [@wslulciuc](https://github.com/wslulciuc) +* Add support for job renaming/redirection via symlink [`#1947`](https://github.com/MarquezProject/marquez/pull/1947) [@collado-mike](https://github.com/collado-mike) +* Add `Created by` view for dataset versions along with SQL syntax highlighting in web UI [`#1929`](https://github.com/MarquezProject/marquez/pull/1929) [@phixMe](https://github.com/phixMe) +* Add `operationId` to openapi spec [`#1978`](https://github.com/MarquezProject/marquez/pull/1978) [@phixMe](https://github.com/phixMe) ### Changed -* Upgrade Flyway to v7.6.0 [#1974](https://github.com/MarquezProject/marquez/pull/1974) [@dakshin-k](https://github.com/dakshin-k) +* Upgrade Flyway to v7.6.0 [`#1974`](https://github.com/MarquezProject/marquez/pull/1974) [@dakshin-k](https://github.com/dakshin-k) ### Fixed -* Remove size limits on namespaces, dataset names, and and source connection urls [#1925](https://github.com/MarquezProject/marquez/pull/1925) [@collado-mike](https://github.com/collado-mike) -* Update namespace names to allow `=`, `@`, and `;` [#1936](https://github.com/MarquezProject/marquez/pull/1936) [@mobuchowski](https://github.com/mobuchowski) -* Time duration display in web UI [#1950](https://github.com/MarquezProject/marquez/pull/1950) [@phixMe](https://github.com/phixMe) +* Remove size limits on namespaces, dataset names, and and source connection urls [`#1925`](https://github.com/MarquezProject/marquez/pull/1925) [@collado-mike](https://github.com/collado-mike) +* Update namespace names to 
allow `=`, `@`, and `;` [`#1936`](https://github.com/MarquezProject/marquez/pull/1936) [@mobuchowski](https://github.com/mobuchowski) +* Time duration display in web UI [`#1950`](https://github.com/MarquezProject/marquez/pull/1950) [@phixMe](https://github.com/phixMe) * Enable web UI to access API via Helm Chart [@GZack2000](https://github.com/GZack2000) ## [0.21.0](https://github.com/MarquezProject/marquez/compare/0.20.0...0.21.0) - 2022-03-03 From 4d9475c2087794a6949c9757866427dc341c2c23 Mon Sep 17 00:00:00 2001 From: Michael Robinson <68482867+merobi-hub@users.noreply.github.com> Date: Mon, 22 Aug 2022 19:02:31 -0400 Subject: [PATCH 03/26] Add Boring Cyborg to project for automating Github tasks (#2081) * add Boring Cyborg config file Signed-off-by: Michael Robinson * fix typos in boring cyborg config Signed-off-by: Michael Robinson Signed-off-by: Michael Robinson --- .github/boring-cyborg.yml | 58 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 .github/boring-cyborg.yml diff --git a/.github/boring-cyborg.yml b/.github/boring-cyborg.yml new file mode 100644 index 0000000000..b3892f2b62 --- /dev/null +++ b/.github/boring-cyborg.yml @@ -0,0 +1,58 @@ +##### Labeler ########################################################################################################## +# Enable "labeler" for your PR that would add labels to PRs based on the paths that are modified in the PR. 
+labelPRBasedOnFilePath: + # Add 'api' to any changes within 'api' folder or any subfolders + api: + - api/**/* + + # Add 'chart' to any changes within 'chart' folder or any subfolders + chart: + - chart/**/* + + # Add 'docs' to any changes to `.md` files in root or within 'docs' folder, subfolders + docs: + - docs/**/* + - ../*.md + + # Add 'client/java' to any changes within 'clients/java' folder or any subfolders + client/java: + - clients/java/**/* + + # Add 'client/python' to any changes within 'clients/python' folder or any subfolders + client/python: + - clients/python/**/* + + # Add 'docker' to any changes within 'docker' folder + docker: + - docker/* + + # Add 'example' to any changes within 'examples' folder or any subfolders + example: + - examples/**/* + + # Add 'proposal' to any changes within 'proposals' folder or any subfolders + proposal: + - proposals/**/* + + # Add 'spec' to any changes within 'spec' folder + spec: + - spec/* + + # Add 'web' to any changes within 'web' folder or any subfolders + web: + - web/**/* + + + +##### Greetings ######################################################################################################## +# Comment to be posted to welcome users when they open their first PR +firstPRWelcomeComment: > + Thanks for opening your first pull request in the Marquez project! Please check out our contributing guidelines (https://github.com/MarquezProject/marquez/blob/main/CONTRIBUTING.md). + +# Comment to be posted to congratulate user on their first merged PR +firstPRMergeComment: > + Great job! Congrats on your first merged pull request in the Marquez project! + +# Comment to be posted on first-time issues +firstIssueWelcomeComment: > + Thanks for opening your first issue in the Marquez project! Please be sure to follow the issue template! 
\ No newline at end of file From 95d6de58112e7c2e8c38ade18c89170b69813bde Mon Sep 17 00:00:00 2001 From: Willy Lulciuc Date: Mon, 22 Aug 2022 17:42:28 -0700 Subject: [PATCH 04/26] Add `--metadata` option to seed backend with ol events (#2082) * Add --metadata option to seed backend with ol events Signed-off-by: wslulciuc * Fix javadocs Signed-off-by: wslulciuc Signed-off-by: wslulciuc --- .../main/java/marquez/cli/SeedCommand.java | 1511 ++--------------- build.gradle | 2 +- docker-compose.seed.yml | 4 +- docker/metadata.json | 1213 +++++++++++++ docker/seed.sh | 2 +- 5 files changed, 1331 insertions(+), 1401 deletions(-) create mode 100644 docker/metadata.json diff --git a/api/src/main/java/marquez/cli/SeedCommand.java b/api/src/main/java/marquez/cli/SeedCommand.java index 48f4597f8c..4e1b1d131d 100644 --- a/api/src/main/java/marquez/cli/SeedCommand.java +++ b/api/src/main/java/marquez/cli/SeedCommand.java @@ -5,1438 +5,155 @@ package marquez.cli; -import static com.google.common.base.Preconditions.checkArgument; -import static marquez.client.models.JobType.BATCH; -import static marquez.common.base.MorePreconditions.checkNotBlank; +import static marquez.common.Utils.newObjectMapper; +import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import io.dropwizard.cli.ConfiguredCommand; import io.dropwizard.setup.Bootstrap; -import java.net.URL; -import java.time.Duration; -import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Random; -import javax.annotation.Nullable; -import lombok.EqualsAndHashCode; -import lombok.Getter; +import 
io.openlineage.client.OpenLineage; +import io.openlineage.client.OpenLineageClient; +import io.openlineage.client.transports.HttpTransport; +import java.nio.file.Paths; import lombok.NonNull; -import lombok.ToString; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import marquez.MarquezConfig; -import marquez.client.MarquezClient; -import marquez.client.Utils; -import marquez.client.models.Dataset; -import marquez.client.models.DatasetId; -import marquez.client.models.DatasetMeta; -import marquez.client.models.DbTableMeta; -import marquez.client.models.Field; -import marquez.client.models.Job; -import marquez.client.models.JobMeta; -import marquez.client.models.Namespace; -import marquez.client.models.NamespaceMeta; -import marquez.client.models.Run; -import marquez.client.models.RunMeta; -import marquez.client.models.Source; -import marquez.client.models.SourceMeta; +import net.sourceforge.argparse4j.inf.Namespace; +import net.sourceforge.argparse4j.inf.Subparser; /** - * A command to seed the HTTP API with source, dataset, and job metadata. You can override the - * default {@code host} and {@code port} using the command-line arguments {@code --host} and {@code - * --port}. This command is meant to be used to explore the features of Marquez. For example, - * lineage graph, dataset schemas, job run history, etc. + * A command to seed the HTTP API with source, dataset, and job metadata using OpenLineage. The {@code seed} command is meant to be used to + * explore the features of Marquez. For example, lineage graph analysis, dataset lifecycle + * management, job run history, etc. + * + *

Note: You must specify {@code metadata} using the command-line argument {@code + * --metadata}. Metadata must be defined as a JSON file containing an array of {@code OpenLineage} + * events. * *

Usage

* - * For example, to override the {@code port}: + * For example, to override the {@code url}: + * + *
{@code
+ * java -jar marquez-api.jar seed --url http://localhost:5000 --metadata metadata.json marquez.yml
+ * }
+ * + *

where, {@code metadata.json} contains metadata for run {@code + * d46e465b-d358-4d32-83d4-df660ff614dd}: * *

{@code
- * java -jar marquez-api.jar seed --port 5001 marquez.yml
+ * [
+ *   {
+ *     "eventType": "START",
+ *     "eventTime": "2020-02-22T22:42:42.000Z",
+ *     "run": {
+ *       "runId": "d46e465b-d358-4d32-83d4-df660ff614dd"
+ *     },
+ *     "job": {
+ *       "namespace": "my-namespace",
+ *       "name": "my-job"
+ *     },
+ *     "inputs": [{
+ *       "namespace": "my-namespace",
+ *       "name": "my-input"
+ *     }],
+ *     "producer": "https://github.com/OpenLineage/OpenLineage/tree/0.11.0/client/java"
+ *   },
+ *   {
+ *     "eventType": "COMPLETE",
+ *     "eventTime": "2020-02-22T22:48:12.000Z",
+ *     "run": {
+ *       "runId": "d46e465b-d358-4d32-83d4-df660ff614dd"
+ *     },
+ *     "job": {
+ *       "namespace": "my-namespace",
+ *       "name": "my-job"
+ *     },
+ *     "outputs": [{
+ *       "namespace": "my-namespace",
+ *       "name": "my-output",
+ *       "facets": {
+ *         "schema": {
+ *           "_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.11.0/client/java",
+ *           "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json",
+ *           "fields": [
+ *             { "name": "a", "type": "VARCHAR"},
+ *             { "name": "b", "type": "VARCHAR"}
+ *           ]
+ *         }
+ *       }
+ *     }],
+ *     "producer": "https://github.com/OpenLineage/OpenLineage/tree/0.11.0/client/java"
+ *   }
+ * ]
  * }
* - * Note that all metadata is defined within this class and requires a running instance of Marquez. + *

Note: The {@code seed} command requires a running instance of Marquez. */ @Slf4j public final class SeedCommand extends ConfiguredCommand { - static final String DEFAULT_MARQUEZ_HOST = "localhost"; - static final int DEFAULT_MARQUEZ_PORT = 8080; - - public static final String NAMESPACE_NAME = "food_delivery"; - static final String SOURCE_NAME = "analytics_db"; + /* Default URL for HTTP backend. */ + private static final String DEFAULT_OL_URL = "http://localhost:8080"; - static final int LINEAGE_GRAPH_24_HOUR_WINDOW = 24; - static final int RUN_TIME_IN_SEC_MIN = 120; - static final int RUN_TIME_IN_SEC_MAX = 240; + /* Args for seed command. */ + private static final String CMD_ARG_OL_URL = "url"; + private static final String CMD_ARG_OL_METADATA = "metadata"; + /* Define seed command. */ public SeedCommand() { - super("seed", "seeds the HTTP API with metadata"); + super("seed", "seeds the HTTP API server with metadata"); } + /* Configure seed command. */ @Override - public void configure(@NonNull final net.sourceforge.argparse4j.inf.Subparser subparser) { + public void configure(@NonNull final Subparser subparser) { super.configure(subparser); subparser - .addArgument("--host") - .dest("host") + .addArgument("--url") + .dest("url") .type(String.class) .required(false) - .setDefault(DEFAULT_MARQUEZ_HOST) - .help("the HTTP API server host"); + .setDefault(DEFAULT_OL_URL) + .help("the HTTP API server url"); subparser - .addArgument("--port") - .dest("port") - .type(Integer.class) - .required(false) - .setDefault(DEFAULT_MARQUEZ_PORT) - .help("the HTTP API server port"); + .addArgument("--metadata") + .dest("metadata") + .type(String.class) + .required(true) + .help("the path to the metadata file (ex: path/to/metadata.json)"); } @Override protected void run( - @NonNull final Bootstrap bootstrap, - @NonNull final net.sourceforge.argparse4j.inf.Namespace namespace, - @NonNull final MarquezConfig config) { - final String host = namespace.getString("host"); - final int port 
= namespace.getInt("port"); - - final URL baseUrl = Utils.toUrl(String.format("http://%s:%d", host, port)); - final MarquezClient client = MarquezClient.builder().baseUrl(baseUrl).build(); - seedApiWithMeta(client, LINEAGE_GRAPH_24_HOUR_WINDOW); + @NonNull Bootstrap bootstrap, + @NonNull Namespace namespace, + @NonNull MarquezConfig config) { + final String olUrl = namespace.getString(CMD_ARG_OL_URL); + final String olMetadata = namespace.getString(CMD_ARG_OL_METADATA); + // Use HTTP transport. + final OpenLineageClient olClient = + OpenLineageClient.builder().transport(HttpTransport.builder().uri(olUrl).build()).build(); + log.info("Connected to '{}'... attempting to seed with metadata!", olUrl); + // Load, then emit events. + final ImmutableList olEvents = loadMetadata(olMetadata); + log.info("Emitting '{}' events to: '{}'", olEvents.size(), olUrl); + int olEventsEmitted = 0; // Keep count of events emitted. + for (final OpenLineage.RunEvent olEvent : olEvents) { + olClient.emit(olEvent); + olEventsEmitted++; + } + log.info("Successfully emitted '{}' events!", olEventsEmitted); } - public void seedApiWithMeta(@NonNull MarquezClient client, int additionalIterations) { - // (1) Create namespace - final NamespaceMeta namespaceMeta = - NamespaceMeta.builder() - .ownerName("owner@food.com") - .description("Food delivery example!") - .build(); - final Namespace newNamespace = client.createNamespace(NAMESPACE_NAME, namespaceMeta); - log.info("Created namespace: {}", newNamespace); - - // (2) Create source - final SourceMeta sourceMeta = - SourceMeta.builder() - .type("POSTGRESQL") - .connectionUrl("jdbc:postgres://localhost:3306/deliveries") - .description("Contains all food delivery orders.") - .build(); - final Source newSource = client.createSource(SOURCE_NAME, sourceMeta); - log.info("Created source: {}", newSource); - - // (3) Seed dataset meta - DATASET_META.forEach( - (datasetName, datasetMeta) -> { - final Dataset newDataset = 
client.createDataset(NAMESPACE_NAME, datasetName, datasetMeta); - log.info("Created dataset: {}", newDataset); - }); - - // (4) Seed job meta - JOB_META.forEach( - (jobName, jobMeta) -> { - final Job newJob = client.createJob(NAMESPACE_NAME, jobName, jobMeta); - log.info("Created job: {}", newJob); - }); - - // (5) Define run start times for each graph level - final Instant startTimesGraphLevel0 = Instant.now(); - final Instant startTimesGraphLevel1 = startTimesGraphLevel0.plusSeconds(RUN_TIME_IN_SEC_MAX); - final Instant startTimesGraphLevel2 = startTimesGraphLevel1.plusSeconds(RUN_TIME_IN_SEC_MAX); - final Instant startTimesGraphLevel3 = startTimesGraphLevel2.plusSeconds(RUN_TIME_IN_SEC_MAX); - final Instant startTimesGraphLevel4 = startTimesGraphLevel3.plusSeconds(RUN_TIME_IN_SEC_MAX); - - final Instant[] startTimesByGraphLevel = { - startTimesGraphLevel0, - startTimesGraphLevel1, - startTimesGraphLevel2, - startTimesGraphLevel3, - startTimesGraphLevel4 - }; - - // (6) Seed run meta for jobs using graph level start times - for (int hourOfDay = additionalIterations; hourOfDay >= 0; hourOfDay--) { - for (final Map.Entry entry : JOB_META.entrySet()) { - final String jobName = entry.getKey(); - final JobMeta jobMeta = entry.getValue(); - - if (hourOfDay >= ACTIVE_RUN_META.get(jobName).size()) { - continue; - } - - // On code change, create a new job version - final ActiveRunMeta activeRunMeta = ACTIVE_RUN_META.get(jobName).get(hourOfDay); - activeRunMeta - .getCodeChange() - .ifPresent( - codeChange -> { - client.createJob( - NAMESPACE_NAME, - jobName, - JobMeta.builder() - .type(jobMeta.getType()) - .inputs(jobMeta.getInputs()) - .outputs(jobMeta.getOutputs()) - .location(codeChange.getToUrl()) - .context(jobMeta.getContext()) - .description(jobMeta.getDescription().orElse(null)) - .build()); - }); - - // Set run start and end times - final Instant runStartedAt = - startTimesByGraphLevel[activeRunMeta.getLevelInGraph()].minus( - Duration.ofHours(hourOfDay)); - 
final Instant runEndedAt = runStartedAt.plusSeconds(secondsToAdd()); - - // Create run - final RunMeta runMeta = - RunMeta.builder() - .nominalStartTime(runStartedAt.truncatedTo(ChronoUnit.MINUTES)) - .nominalEndTime(runEndedAt.truncatedTo(ChronoUnit.MINUTES)) - .build(); - final Run run = client.createRun(NAMESPACE_NAME, jobName, runMeta); - log.info("Created run for job '{}': {}", jobName, run); - - // Start run - final Run running = client.markRunAsRunning(run.getId(), runStartedAt); - log.info("Marked run for job '{}' as 'RUNNING': {}", jobName, running); - - // Complete or fail run - if (activeRunMeta.isMarkFailed()) { - final Run failed = client.markRunAsFailed(run.getId(), runEndedAt); - log.info("Marked run for job '{}' as 'FAILED': {}", jobName, failed); - } else if (activeRunMeta.isMarkRunning()) { - for (final DatasetId output : jobMeta.getOutputs()) { - final DatasetMeta datasetMeta = DATASET_META.get(output.getName()); - final DbTableMeta.DbTableMetaBuilder dbTableMetaWithChange = - DbTableMeta.builder() - .physicalName(datasetMeta.getPhysicalName()) - .sourceName(datasetMeta.getSourceName()) - .description(datasetMeta.getDescription().orElse(null)) - .runId(run.getId()); - - if (activeRunMeta.hasSchemaChange()) { - activeRunMeta - .schemaChangeFor(output.getName()) - .ifPresent( - schemaChange -> - dbTableMetaWithChange.fields( - fieldsWithChange(datasetMeta.getFields(), schemaChange))); - } else { - dbTableMetaWithChange.fields(datasetMeta.getFields()); - } - final Dataset modifiedDataset = - client.createDataset( - NAMESPACE_NAME, output.getName(), dbTableMetaWithChange.build()); - log.info( - "Dataset '{}' modified by job '{}' on run '{}': {}", - output.getName(), - jobName, - run.getId(), - modifiedDataset); - } - - final Run completed = client.markRunAsCompleted(run.getId(), runEndedAt); - log.info("Marked run for job '{}' as 'COMPLETED': {}", jobName, completed); - } - } - } - } - - int secondsToAdd() { - return new 
Random().nextInt((RUN_TIME_IN_SEC_MAX - RUN_TIME_IN_SEC_MIN) + 1) - + RUN_TIME_IN_SEC_MIN; - } - - List fieldsWithChange( - @NonNull List fields, @NonNull ActiveRunMeta.SchemaChange change) { - final ImmutableList.Builder fieldsWithChange = ImmutableList.builder(); - for (final Field field : fields) { - fieldsWithChange.add( - (change.getFieldName().equals(field.getName())) - ? Field.builder() - .name(field.getName()) - .type(change.getToType()) - .tags(field.getTags()) - .description(field.getDescription().orElse(null)) - .build() - : field); - } - return fieldsWithChange.build(); - } - - static final ImmutableMap DATASET_META = - new ImmutableMap.Builder() - .put( - "public.orders", - DbTableMeta.builder() - .physicalName("public.orders") - .sourceName(SOURCE_NAME) - .fields( - Lists.newArrayList( - Field.builder() - .name("id") - .type("INTEGER") - .description("The unique ID of the order.") - .build(), - Field.builder() - .name("placed_on") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the order was placed.") - .build(), - Field.builder() - .name("menu_item_id") - .type("INTEGER") - .description("The ID of the menu item related to the order.") - .build(), - Field.builder() - .name("quantity") - .type("INTEGER") - .description("The number of the item in the order.") - .build(), - Field.builder() - .name("discount_id") - .type("INTEGER") - .description("The unique ID of the discount applied to the order.") - .build(), - Field.builder() - .name("comment") - .type("VARCHAR") - .description("The comment of the order.") - .build())) - .description("A table for orders.") - .build()) - .put( - "public.menus", - DbTableMeta.builder() - .physicalName("public.menus") - .sourceName(SOURCE_NAME) - .fields( - Lists.newArrayList( - Field.builder() - .name("id") - .type("INTEGER") - .description("The unique ID of the menu.") - .build(), - Field.builder() - .name("name") - .type("VARCHAR") - .description("The name of the menu.") - 
.build(), - Field.builder() - .name("restaurant_id") - .type("INTEGER") - .description("The ID of the restaurant related to the menu.") - .build(), - Field.builder() - .name("description") - .type("TEXT") - .description("The description of the menu.") - .build())) - .description("A table for menus.") - .build()) - .put( - "public.categories", - DbTableMeta.builder() - .physicalName("public.categories") - .sourceName(SOURCE_NAME) - .fields( - Lists.newArrayList( - Field.builder() - .name("id") - .type("INTEGER") - .description("The unique ID of the category.") - .build(), - Field.builder() - .name("name") - .type("VARCHAR") - .description("The name of the category.") - .build(), - Field.builder() - .name("menu_id") - .type("INTEGER") - .description("The ID of the menu related to the category.") - .build(), - Field.builder() - .name("description") - .type("TEXT") - .description("The description of the category.") - .build())) - .description("A table for categories.") - .build()) - .put( - "public.menu_items", - DbTableMeta.builder() - .physicalName("public.menu_items") - .sourceName(SOURCE_NAME) - .fields( - Lists.newArrayList( - Field.builder() - .name("id") - .type("INTEGER") - .description("The unique ID of the menu item.") - .build(), - Field.builder() - .name("name") - .type("VARCHAR") - .description("The name of the menu item.") - .build(), - Field.builder() - .name("price") - .type("INTEGER") - .description("The price of the menu item.") - .build(), - Field.builder() - .name("category_id") - .type("INTEGER") - .description("The ID of the category related to the item.") - .build(), - Field.builder() - .name("description") - .type("TEXT") - .description("The description of the menu item.") - .build())) - .description("A table for menu items.") - .build()) - .put( - "public.restaurants", - DbTableMeta.builder() - .physicalName("public.restaurants") - .sourceName(SOURCE_NAME) - .fields( - Lists.newArrayList( - Field.builder() - .name("id") - .type("INTEGER") - 
.description("The unique ID of the restaurant.") - .build(), - Field.builder() - .name("created_at") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the restaurant was created.") - .build(), - Field.builder() - .name("updated_at") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the restaurant was updated.") - .build(), - Field.builder() - .name("name") - .type("VARCHAR") - .description("The name of the restaurant.") - .build(), - Field.builder() - .name("email") - .type("VARCHAR") - .tags(Sets.newHashSet("PII")) - .description("The email address of the customer.") - .build(), - Field.builder() - .name("address") - .type("VARCHAR") - .description("The address of the restaurant.") - .build(), - Field.builder() - .name("phone") - .type("VARCHAR") - .description("The phone number of the restaurant.") - .build(), - Field.builder() - .name("city_id") - .type("INTEGER") - .description("The ID of the city related to the restaurant.") - .build(), - Field.builder() - .name("business_hours_id") - .type("INTEGER") - .description( - "The ID of the business hours related to the restaurant.") - .build(), - Field.builder() - .name("description") - .type("TEXT") - .description("The description of the restaurant.") - .build())) - .description("A table for customers.") - .build()) - .put( - "public.customers", - DbTableMeta.builder() - .physicalName("public.customers") - .sourceName(SOURCE_NAME) - .fields( - Lists.newArrayList( - Field.builder() - .name("id") - .type("INTEGER") - .description("The unique ID of the customer.") - .build(), - Field.builder() - .name("created_at") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the customer was created.") - .build(), - Field.builder() - .name("updated_at") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the customer was updated.") - .build(), - Field.builder() - .name("name") 
- .type("VARCHAR") - .description("The name of the customer.") - .build(), - Field.builder() - .name("email") - .type("VARCHAR") - .tags(Sets.newHashSet("PII")) - .description("The email address of the customer.") - .build(), - Field.builder() - .name("address") - .type("VARCHAR") - .description("The address of the customer.") - .build(), - Field.builder() - .name("phone") - .type("VARCHAR") - .description("The phone number of the customer.") - .build(), - Field.builder() - .name("city_id") - .type("INTEGER") - .description("The ID of the city related to the customer.") - .build())) - .description("A table for customers.") - .build()) - .put( - "public.order_status", - DbTableMeta.builder() - .physicalName("public.order_status") - .sourceName(SOURCE_NAME) - .fields( - Lists.newArrayList( - Field.builder() - .name("id") - .type("INTEGER") - .description("The unique ID of the order status.") - .build(), - Field.builder() - .name("transitioned_at") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the order status was transitioned.") - .build(), - Field.builder() - .name("status") - .type("VARCHAR") - .description("The status of the order status.") - .build(), - Field.builder() - .name("order_id") - .type("INTEGER") - .description("The ID of the order related to the order status.") - .build(), - Field.builder() - .name("customer_id") - .type("INTEGER") - .description("The ID of the customer related to the order status.") - .build(), - Field.builder() - .name("restaurant_id") - .type("INTEGER") - .description("The ID of the restaurant related to the order status.") - .build(), - Field.builder() - .name("driver_id") - .type("INTEGER") - .description("The ID of the driver related to the order status.") - .build())) - .description("A table for order status.") - .build()) - .put( - "public.orders_7_days", - DbTableMeta.builder() - .physicalName("public.orders_7_days") - .sourceName(SOURCE_NAME) - .fields( - Lists.newArrayList( - 
Field.builder() - .name("order_id") - .type("INTEGER") - .description("The ID of the order.") - .build(), - Field.builder() - .name("placed_on") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the order was placed.") - .build(), - Field.builder() - .name("menu_id") - .type("VARCHAR") - .description("The ID of the menu related to the order.") - .build(), - Field.builder() - .name("menu_item_id") - .type("INTEGER") - .description("The ID of the menu item related to the order.") - .build(), - Field.builder() - .name("category_id") - .type("INTEGER") - .description("The ID of category related to the order.") - .build(), - Field.builder() - .name("discount_id") - .type("INTEGER") - .description("The ID of the discount applied to the order.") - .build(), - Field.builder() - .name("city_id") - .type("INTEGER") - .description("The ID of the city related to the order.") - .build())) - .description("A table for weekly orders.") - .build()) - .put( - "public.drivers", - DbTableMeta.builder() - .physicalName("public.drivers") - .sourceName(SOURCE_NAME) - .fields( - Lists.newArrayList( - Field.builder() - .name("id") - .type("INTEGER") - .description("The unique ID of the driver.") - .build(), - Field.builder() - .name("created_at") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the driver was created.") - .build(), - Field.builder() - .name("updated_at") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the driver was updated.") - .build(), - Field.builder() - .name("name") - .type("VARCHAR") - .description("The name of the driver.") - .build(), - Field.builder() - .name("email") - .type("VARCHAR") - .description("The email of the driver.") - .build(), - Field.builder() - .name("phone") - .type("VARCHAR") - .description("The phone number of the driver.") - .build(), - Field.builder() - .name("car_make") - .type("VARCHAR") - .description("The make of the 
car.") - .build(), - Field.builder() - .name("car_model") - .type("VARCHAR") - .description("The model of the car.") - .build(), - Field.builder() - .name("car_year") - .type("VARCHAR") - .description("The year of the car.") - .build(), - Field.builder() - .name("car_color") - .type("VARCHAR") - .description("The color of the car.") - .build(), - Field.builder() - .name("car_license_plate") - .type("VARCHAR") - .description("The license plate number of the car.") - .build())) - .description("A table for drivers.") - .build()) - .put( - "public.delivery_7_days", - DbTableMeta.builder() - .physicalName("public.delivery_7_days") - .sourceName(SOURCE_NAME) - .fields( - Lists.newArrayList( - Field.builder() - .name("order_id") - .type("INTEGER") - .description("The ID of the order.") - .build(), - Field.builder() - .name("order_placed_on") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the order was placed.") - .build(), - Field.builder() - .name("order_dispatched_on") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the order was dispatched.") - .build(), - Field.builder() - .name("order_delivered_on") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the order was delivered.") - .build(), - Field.builder() - .name("customer_email") - .type("VARCHAR") - .description("The email of the customer.") - .build(), - Field.builder() - .name("menu_id") - .type("INTEGER") - .description("The ID of the menu related to the order.") - .build(), - Field.builder() - .name("menu_item_id") - .type("INTEGER") - .description("The ID of the menu item related to the order.") - .build(), - Field.builder() - .name("category_id") - .type("INTEGER") - .description("The ID of category related to the order.") - .build(), - Field.builder() - .name("discount_id") - .type("INTEGER") - .description("The ID of the discount applied to the order.") - .build(), - Field.builder() - 
.name("city_id") - .type("INTEGER") - .description("The ID of the city related to the order.") - .build(), - Field.builder() - .name("driver_id") - .type("INTEGER") - .description("The ID of the driver related to the order.") - .build())) - .description("A table for weekly deliveries.") - .build()) - .put( - "public.discounts", - DbTableMeta.builder() - .physicalName("public.discounts") - .sourceName(SOURCE_NAME) - .fields( - Lists.newArrayList( - Field.builder() - .name("id") - .type("INTEGER") - .description("The unique ID of the discount.") - .build(), - Field.builder() - .name("amount_off") - .type("INTEGER") - .description("The amount of the discount.") - .build(), - Field.builder() - .name("customer_email") - .type("VARCHAR") - .description("The email of the customer.") - .build(), - Field.builder() - .name("starts_on") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the discount starts.") - .build(), - Field.builder() - .name("ends_on") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the discount ends.") - .build())) - .description("A table for discounts.") - .build()) - .put( - "public.top_delivery_times", - DbTableMeta.builder() - .physicalName("public.top_delivery_times") - .sourceName(SOURCE_NAME) - .fields( - Lists.newArrayList( - Field.builder() - .name("order_id") - .type("INTEGER") - .description("The ID of the order.") - .build(), - Field.builder() - .name("order_placed_on") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the order was placed.") - .build(), - Field.builder() - .name("order_dispatched_on") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the order was dispatched.") - .build(), - Field.builder() - .name("order_delivered_on") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the order was delivered.") - .build(), - Field.builder() - 
.name("order_delivered_time") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the total time of delivery.") - .build(), - Field.builder() - .name("customer_email") - .type("VARCHAR") - .description("The email of the customer.") - .build(), - Field.builder() - .name("restaurant_id") - .type("INTEGER") - .description("The ID of the restaurant related to the order.") - .build(), - Field.builder() - .name("driver_id") - .type("INTEGER") - .description("The ID of the driver related to the order.") - .build())) - .description("A table for top deliveries.") - .build()) - .put( - "public.popular_orders_day_of_week", - DbTableMeta.builder() - .physicalName("public.popular_orders_day_of_week") - .sourceName(SOURCE_NAME) - .fields( - Lists.newArrayList( - Field.builder() - .name("order_day_of_week") - .type("VARCHAR") - .description("The day of week of the order.") - .build(), - Field.builder() - .name("order_placed_on") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the order was placed.") - .build(), - Field.builder() - .name("orders_placed") - .type("INTEGER") - .description("The number of orders placed on day of week.") - .build())) - .description("A table for popular orders by day of week.") - .build()) - .build(); - - static final ImmutableMap JOB_META = - new ImmutableMap.Builder() - .put( - "test.job_with_no_inputs_or_outputs", - JobMeta.builder() - .type(BATCH) - .description("An job with no inputs or outputs.") - .build()) - .put( - "test.job_with_no_runs", - JobMeta.builder().type(BATCH).description("An job with no runs.").build()) - .put( - "example.etl_orders", - JobMeta.builder() - .type(BATCH) - .outputs(NAMESPACE_NAME, "public.orders") - .location( - "https://github.com/example/jobs/blob/2294bc15eb49071f38425dc927e48655530a2f2e/etl_orders.py") - .description("Loads newly placed orders weekly.") - .context( - sql( - "INSERT INTO orders (id, placed_on, menu_item_id, quantity, discount_id, 
comment)\n " - + "SELECT id, placed_on, menu_item_id, quantity, discount_id, comment\n " - + "FROM tmp_orders;")) - .build()) - .put( - "example.etl_menus", - JobMeta.builder() - .type(BATCH) - .outputs(NAMESPACE_NAME, "public.menus") - .location( - "https://github.com/example/jobs/blob/2294bc15eb49071f38425dc927e48655530a2f2e/etl_menus.py") - .description("Loads newly added restaurant menus daily.") - .context( - sql( - "INSERT INTO menus (id, name, restaurant_id, description)\n " - + "SELECT id, name, restaurant_id, description\n " - + "FROM tmp_menus;")) - .build()) - .put( - "example.etl_categories", - JobMeta.builder() - .type(BATCH) - .outputs(NAMESPACE_NAME, "public.categories") - .location( - "https://github.com/example/jobs/blob/2294bc15eb49071f38425dc927e48655530a2f2e/etl_categories.py") - .description("Loads newly added menus categories daily.") - .context( - sql( - "INSERT INTO categories (id, name, menu_id, description)\n " - + "SELECT id, name, menu_id, description\n " - + "FROM tmp_categories;")) - .build()) - .put( - "example.etl_menu_items", - JobMeta.builder() - .type(BATCH) - .outputs(NAMESPACE_NAME, "public.menu_items") - .location( - "https://github.com/example/jobs/blob/2294bc15eb49071f38425dc927e48655530a2f2e/etl_menu_items.py") - .description("Loads newly added restaurant menu items daily.") - .context( - sql( - "INSERT INTO menu_items (id, name, price, category_id, description)\n " - + "SELECT id, name, price, category_id, description\n " - + "FROM tmp_menu_items;")) - .build()) - .put( - "example.etl_restaurants", - JobMeta.builder() - .type(BATCH) - .outputs(NAMESPACE_NAME, "public.restaurants") - .location( - "https://github.com/example/jobs/blob/2294bc15eb49071f38425dc927e48655530a2f2e/etl_restaurants.py") - .description("Loads newly registered restaurants daily.") - .context( - sql( - "INSERT INTO restaurants (id, created_at, updated_at, name, email, address, phone, city_id, business_hours_id, description)\n " - + "SELECT id, 
created_at, updated_at, name, email, address, phone, city_id, business_hours_id, description\n " - + "FROM tmp_restaurants;")) - .build()) - .put( - "example.etl_customers", - JobMeta.builder() - .type(BATCH) - .outputs(NAMESPACE_NAME, "public.customers") - .location( - "https://github.com/example/jobs/blob/2294bc15eb49071f38425dc927e48655530a2f2e/etl_customers.py") - .description("Loads newly registered customers daily.") - .context( - sql( - "INSERT INTO customers (id, created_at, updated_at, name, email, phone, city_id)\n " - + "SELECT id, created_at, updated_at, name, email, phone, city_id\n " - + "FROM tmp_customers;")) - .build()) - .put( - "example.etl_order_status", - JobMeta.builder() - .type(BATCH) - .outputs(NAMESPACE_NAME, "public.order_status") - .location( - "https://github.com/example/jobs/blob/2294bc15eb49071f38425dc927e48655530a2f2e/etl_order_status.py") - .description("Loads order statues updates daily.") - .context( - sql( - "INSERT INTO order_status (id, transitioned_at, status, order_id, customer_id, restaurant_id, driver_id)\n " - + "SELECT id, transitioned_at, status, order_id, customer_id, restaurant_id, driver_id\n " - + "FROM tmp_order_status;")) - .build()) - .put( - "example.etl_drivers", - JobMeta.builder() - .type(BATCH) - .outputs(NAMESPACE_NAME, "public.drivers") - .location( - "https://github.com/example/jobs/blob/2294bc15eb49071f38425dc927e48655530a2f2e/etl_drivers.py") - .description("Loads newly registered drivers daily.") - .context( - sql( - "INSERT INTO drivers (id, created_at, updated_at, name, email, phone, car_make, car_model, car_year, car_color, car_license_plate)\n " - + "SELECT id, created_at, updated_at, name, email, phone, car_make, car_model, car_year, car_color, car_license_plate\n " - + "FROM tmp_drivers;")) - .build()) - .put( - "example.etl_orders_7_days", - JobMeta.builder() - .type(BATCH) - .inputs( - NAMESPACE_NAME, - "public.menus", - "public.menu_items", - "public.orders", - "public.categories") - 
.outputs(NAMESPACE_NAME, "public.orders_7_days") - .location( - "https://github.com/example/jobs/blob/2294bc15eb49071f38425dc927e48655530a2f2e/etl_orders_7_days.py") - .description("Loads newly placed orders weekly.") - .context( - sql( - "INSERT INTO orders_7_days (order_id, placed_on, discount_id, menu_id, restaurant_id, menu_item_id, category_id)\n " - + "SELECT o.id AS order_id, o.placed_on, o.discount_id, m.id AS menu_id, m.restaurant_id, mi.id AS menu_item_id, c.id AS category_id\n" - + " FROM orders AS o\n" - + " INNER JOIN menu_items AS mi\n" - + " ON menu_items.id = o.menu_item_id\n" - + " INNER JOIN categories AS c\n" - + " ON c.id = mi.category_id\n" - + " INNER JOIN menu AS m\n" - + " ON m.id = c.menu_id\n" - + " WHERE o.placed_on >= NOW() - interval '7 days';")) - .build()) - .put( - "example.etl_delivery_7_days", - JobMeta.builder() - .type(BATCH) - .inputs( - NAMESPACE_NAME, - "public.orders_7_days", - "public.customers", - "public.order_status", - "public.drivers", - "public.restaurants") - .outputs(NAMESPACE_NAME, "public.delivery_7_days") - .location( - "https://github.com/example/jobs/blob/4d0b5d374261fdaf60a1fc588dd8f0d124b0e87f/etl_delivery_7_days.py") - .description("Loads new deliveries for the week.") - .context( - sql( - "INSERT INTO delivery (order_id, order_placed_on, order_dispatched_on, order_delivered_on, customer_email,\n" - + " customer_address, discount_id, menu_id, restaurant_id, restaurant_address, menu_item_id, category_id, driver_id)\n" - + " SELECT o.order_id, o.placed_on AS order_placed_on,\n" - + " (SELECT transitioned_at FROM order_status WHERE order_id == o.order_id AND status = 'DISPATCHED') AS order_dispatched_on,\n" - + " (SELECT transitioned_at FROM order_status WHERE order_id == o.order_id AND status = 'DELIVERED') AS order_delivered_on,\n" - + " c.email AS customer_email, c.address AS customer_address, o.discount_id, o.menu_id, o.restaurant_id,\n" - + " r.address, o.menu_item_id, o.category_id, d.id AS driver_id\n" - 
+ " FROM orders_7_days AS o\n" - + " INNER JOIN order_status AS os\n" - + " ON os.order_id = o.order_id\n" - + " INNER JOIN customers AS c\n" - + " ON c.id = os.customer_id\n" - + " INNER JOIN restaurants AS r\n" - + " ON r.id = os.restaurant_id\n" - + " INNER JOIN drivers AS d\n" - + " ON d.id = os.driver_id\n" - + " WHERE os.transitioned_at >= NOW() - interval '7 days';")) - .build()) - .put( - "example.delivery_times_7_days", - JobMeta.builder() - .type(BATCH) - .inputs(NAMESPACE_NAME, "public.delivery_7_days") - .outputs(NAMESPACE_NAME, "public.top_delivery_times", "public.discounts") - .location( - "https://github.com/example/jobs/blob/2294bc15eb49071f38425dc927e48655530a2f2e/delivery_times_7_days.py") - .description("Determine weekly top delivery times by restaurant.") - .context( - sql( - "INSERT INTO top_delivery_times (order_id, order_placed_on, order_dispatched_on, order_delivered_on, order_delivery_time,\n" - + " customer_email, restaurant_id, driver_id)\n" - + " SELECT order_id, order_placed_on, order_delivered_on, DATEDIFF(minute, order_placed_on, order_delivered_on) AS order_delivery_time,\n" - + " customer_email, restaurant_id, driver_id\n" - + " FROM delivery_7_days\n" - + "GROUP BY restaurant_id\n" - + "ORDER BY order_delivery_time DESC\n" - + " LIMIT 1;")) - .build()) - .put( - "example.email_discounts", - JobMeta.builder() - .type(BATCH) - .inputs(NAMESPACE_NAME, "public.customers", "public.discounts") - .location( - "https://github.com/example/jobs/blob/2294bc15eb49071f38425dc927e48655530a2f2e/email_discounts.py") - .description("Email discounts to customers that have experienced order delays.") - .context(sql("SELECT * FROM discounts;")) - .build()) - .put( - "example.orders_popular_day_of_week", - JobMeta.builder() - .type(BATCH) - .inputs(NAMESPACE_NAME, "public.customers", "public.top_delivery_times") - .outputs(NAMESPACE_NAME, "public.popular_orders_day_of_week") - .location( - 
"https://github.com/example/jobs/blob/2294bc15eb49071f38425dc927e48655530a2f2e/orders_popular_day_of_week.py") - .description("Determines the popular day of week orders are placed.") - .context( - sql( - "INSERT INTO popular_orders_day_of_week (order_day_of_week, order_placed_on, orders_placed)\n" - + " SELECT order_day_of_week, order_placed_on, COUNT(*)\n" - + " FROM top_delivery_times;")) - .build()) - .build(); - - private static Map sql(final String sql) { - return ImmutableMap.of("sql", sql); - } - - @EqualsAndHashCode - @ToString - static final class ActiveRunMeta { - @Getter private final int levelInGraph; - @Getter private final boolean markFailed; - @Getter private final boolean markRunning; - @Nullable private final CodeChange codeChange; - @Getter private final ImmutableSet schemaChanges; - - public ActiveRunMeta( - final int levelInGraph, - final boolean markFailed, - final boolean markRunning, - @Nullable final CodeChange codeChange, - @NonNull final ImmutableSet schemaChanges) { - this.levelInGraph = levelInGraph; - this.markFailed = markFailed; - this.markRunning = markRunning; - this.codeChange = codeChange; - this.schemaChanges = schemaChanges; - } - - public Optional getCodeChange() { - return Optional.ofNullable(codeChange); - } - - public boolean hasChanges() { - return !hasCodeChange() && !hasSchemaChange(); - } - - public boolean hasCodeChange() { - return (codeChange != null); - } - - public boolean hasSchemaChange() { - return !schemaChanges.isEmpty(); - } - - public Optional schemaChangeFor(String datasetName) { - checkArgument(hasSchemaChange()); - for (SchemaChange schemaChange : schemaChanges) { - if (schemaChange.getDatasetName().equals(checkNotBlank(datasetName))) { - return Optional.of(schemaChange); - } - } - return Optional.empty(); - } - - interface Change {} - - @EqualsAndHashCode - @ToString - static class CodeChange implements Change { - @Getter String jobName; - @Nullable URL fromUrl; - @Getter URL toUrl; - - public CodeChange( 
- @NonNull final String jobName, @Nullable final URL fromUrl, @NonNull final URL toUrl) { - this.jobName = jobName; - this.fromUrl = fromUrl; - this.toUrl = toUrl; - } - - public Optional getFromUrl() { - return Optional.ofNullable(fromUrl); - } - - public static Builder builder() { - return new Builder(); - } - - static final class Builder { - private String jobName; - private URL fromUrl; - private URL toUrl; - - public Builder jobName(@NonNull String jobName) { - this.jobName = jobName; - return this; - } - - public Builder fromUrl(@NonNull String fromUrlString) { - return fromUrl(Utils.toUrl(fromUrlString)); - } - - public Builder fromUrl(@NonNull URL fromUrl) { - this.fromUrl = fromUrl; - return this; - } - - public Builder toUrl(@NonNull String toUrl) { - return toUrl(Utils.toUrl(toUrl)); - } - - public Builder toUrl(@NonNull URL toUrl) { - this.toUrl = toUrl; - return this; - } - - public CodeChange build() { - return new CodeChange(jobName, fromUrl, toUrl); - } - } - } - - @EqualsAndHashCode - @ToString - static class SchemaChange implements Change { - @Getter private String datasetName; - @Getter private String fieldName; - @Getter private String fromType; - @Getter private String toType; - - public SchemaChange( - @NonNull final String datasetName, - @NonNull final String fieldName, - @NonNull final String fromType, - @NonNull final String toType) { - this.datasetName = datasetName; - this.fieldName = fieldName; - this.fromType = fromType; - this.toType = toType; - } - - public static Builder builder() { - return new Builder(); - } - - static final class Builder { - private String datasetName; - private String fieldName; - private String fromType; - private String toType; - - public Builder datasetName(@NonNull String datasetName) { - this.datasetName = datasetName; - return this; - } - - public Builder fieldName(@NonNull String fieldName) { - this.fieldName = fieldName; - return this; - } - - public Builder fromType(@NonNull String fromType) { - 
this.fromType = fromType; - return this; - } - - public Builder toType(@NonNull String toType) { - this.toType = toType; - return this; - } - - public SchemaChange build() { - return new SchemaChange(datasetName, fieldName, fromType, toType); - } - } - } - - public static ImmutableList successes( - final int levelInLineageGraph, final int numOfSuccesses) { - final ImmutableList.Builder activeRuns = ImmutableList.builder(); - for (int i = 0; i < numOfSuccesses; i++) { - activeRuns.add(ActiveRunMeta.builder().levelInLineageGraph(levelInLineageGraph).build()); - } - return activeRuns.build(); - } - - public static ImmutableList failures( - final int levelInLineageGraph, final int numOfFailures) { - final ImmutableList.Builder activeRuns = ImmutableList.builder(); - for (int i = 0; i < numOfFailures; i++) { - activeRuns.add( - ActiveRunMeta.builder().levelInLineageGraph(levelInLineageGraph).markFailed().build()); - } - return activeRuns.build(); - } - - public static ImmutableList running( - final int levelInLineageGraph, final int numOfRunning) { - final ImmutableList.Builder activeRuns = ImmutableList.builder(); - for (int i = 0; i < numOfRunning; i++) { - activeRuns.add( - ActiveRunMeta.builder().levelInLineageGraph(levelInLineageGraph).markRunning().build()); - } - return activeRuns.build(); - } - - public static ImmutableList randomize( - final int levelInLineageGraph, final int numOfRandom) { - final ImmutableList.Builder activeRuns = ImmutableList.builder(); - for (int i = 0; i < numOfRandom; i++) { - if (new Random().nextBoolean()) { - activeRuns.add(successes(levelInLineageGraph, 1).get(0)); - } else { - activeRuns.add(failures(levelInLineageGraph, 1).get(0)); - } - } - return activeRuns.build(); - } - - public static ActiveRunMeta successesWith( - int levelInLineageGraph, @Nullable final SchemaChange... 
schemaChanges) { - return successesWith(levelInLineageGraph, null, schemaChanges); - } - - public static ActiveRunMeta successesWith( - final int levelInLineageGraph, - @Nullable final CodeChange codeChange, - @Nullable final SchemaChange... schemaChanges) { - return ActiveRunMeta.builder() - .levelInLineageGraph(levelInLineageGraph) - .codeChange(codeChange) - .schemaChanges(ImmutableSet.copyOf(schemaChanges)) - .build(); - } - - public static Builder builder() { - return new Builder(); - } - - static final class Builder { - private int levelInLineageGraph; - private boolean markFailed; - private boolean markRunning; - private CodeChange codeChange; - private ImmutableSet schemaChanges; - - private Builder() { - this.markFailed = false; - this.markRunning = true; - this.schemaChanges = ImmutableSet.of(); - } - - public Builder levelInLineageGraph(int levelInLineageGraph) { - this.levelInLineageGraph = levelInLineageGraph; - return this; - } - - public Builder markFailed() { - this.markFailed = true; - return this; - } - - public Builder markRunning() { - this.markRunning = true; - return this; - } - - public Builder codeChange(@NonNull CodeChange codeChange) { - this.codeChange = codeChange; - return this; - } - - public Builder schemaChanges(@NonNull ImmutableSet schemaChanges) { - this.schemaChanges = schemaChanges; - return this; - } - - public ActiveRunMeta build() { - return new ActiveRunMeta( - levelInLineageGraph, markFailed, markRunning, codeChange, schemaChanges); - } - } + /* Returns {@link OpenLineage.RunEvent}s contained within the provided metadata file. 
*/ + @SneakyThrows + private ImmutableList<OpenLineage.RunEvent> loadMetadata(@NonNull String olMetadata) { + log.info("Loading metadata from: '{}'", olMetadata); + return newObjectMapper() + .readValue( + Paths.get(olMetadata).toFile(), + new TypeReference<ImmutableList<OpenLineage.RunEvent>>() {}); } - - static final LinkedHashMap> ACTIVE_RUN_META = - Maps.newLinkedHashMap( - new ImmutableMap.Builder>() - .put("test.job_with_no_inputs_or_outputs", ActiveRunMeta.failures(0, 2)) - .put("test.job_with_no_runs", ActiveRunMeta.successes(0, 0)) - .put( - "example.etl_categories", - ActiveRunMeta.successes(0, LINEAGE_GRAPH_24_HOUR_WINDOW)) - .put( - "example.etl_menu_items", - ActiveRunMeta.successes(0, LINEAGE_GRAPH_24_HOUR_WINDOW)) - .put("example.etl_menus", ActiveRunMeta.successes(0, LINEAGE_GRAPH_24_HOUR_WINDOW)) - .put( - "example.etl_orders", - ImmutableList.copyOf( - Iterables.concat( - ActiveRunMeta.running(0, 1), - ActiveRunMeta.randomize(0, LINEAGE_GRAPH_24_HOUR_WINDOW - 1)))) - .put( - "example.etl_customers", ActiveRunMeta.successes(1, LINEAGE_GRAPH_24_HOUR_WINDOW)) - .put("example.etl_drivers", ActiveRunMeta.successes(1, LINEAGE_GRAPH_24_HOUR_WINDOW)) - .put( - "example.etl_order_status", - ActiveRunMeta.successes(1, LINEAGE_GRAPH_24_HOUR_WINDOW)) - .put( - "example.etl_orders_7_days", - ImmutableList.copyOf( - Iterables.concat( - ActiveRunMeta.running(1, 1), - ActiveRunMeta.successes(1, 1), - ActiveRunMeta.randomize(1, LINEAGE_GRAPH_24_HOUR_WINDOW - 2)))) - .put( - "example.etl_restaurants", - ActiveRunMeta.successes(1, LINEAGE_GRAPH_24_HOUR_WINDOW)) - .put( - "example.etl_delivery_7_days", - ImmutableList.copyOf( - Iterables.concat( - ImmutableList.of( - ActiveRunMeta.successesWith( - 2, - ActiveRunMeta.CodeChange.builder() - .jobName("example.etl_delivery_7_days") - .fromUrl( - JOB_META - .get("example.etl_delivery_7_days") - .getLocation() - .orElse(null)) - .toUrl( - "https://github.com/example/jobs/blob/c87f2a40553cfa4ae7178083a068bf1d0c6ca3a8/etl_delivery_7_days.py") - .build(), -
ActiveRunMeta.SchemaChange.builder() - .datasetName("public.delivery_7_days") - .fieldName("discount_id") - .fromType("INTEGER") - .toType("VARCHAR") - .build())), - ActiveRunMeta.successes(2, LINEAGE_GRAPH_24_HOUR_WINDOW - 1)))) - .put( - "example.delivery_times_7_days", - ImmutableList.copyOf( - Iterables.concat( - ActiveRunMeta.failures(3, 1), - ActiveRunMeta.successes(3, LINEAGE_GRAPH_24_HOUR_WINDOW - 2), - ActiveRunMeta.failures(3, 1)))) - .put( - "example.email_discounts", - ActiveRunMeta.successes(4, LINEAGE_GRAPH_24_HOUR_WINDOW)) - .put( - "example.orders_popular_day_of_week", - ActiveRunMeta.successes(4, LINEAGE_GRAPH_24_HOUR_WINDOW)) - .build()); } diff --git a/build.gradle b/build.gradle index 275fbab702..996f91d8ba 100644 --- a/build.gradle +++ b/build.gradle @@ -59,7 +59,7 @@ subprojects { junit5Version = '5.8.2' lombokVersion = '1.18.24' mockitoVersion = '4.5.1' - openlineageVersion = '0.8.1' + openlineageVersion = '0.13.0' slf4jVersion = '1.7.36' postgresqlVersion = '42.3.5' isReleaseVersion = !version.endsWith('SNAPSHOT') diff --git a/docker-compose.seed.yml b/docker-compose.seed.yml index 725a3446f5..3aa9ea373a 100644 --- a/docker-compose.seed.yml +++ b/docker-compose.seed.yml @@ -4,11 +4,11 @@ services: image: "marquezproject/marquez:${TAG}" container_name: seed-marquez-with-metadata environment: - - MARQUEZ_HOST=api - - MARQUEZ_PORT=${API_PORT} + - MARQUEZ_URL=http://api:${API_PORT} volumes: - ./docker/wait-for-it.sh:/usr/src/app/wait-for-it.sh - ./docker/seed.sh:/usr/src/app/seed.sh + - ./docker/metadata.json:/usr/src/app/metadata.json links: - "db:postgres" depends_on: diff --git a/docker/metadata.json b/docker/metadata.json new file mode 100644 index 0000000000..538775030a --- /dev/null +++ b/docker/metadata.json @@ -0,0 +1,1213 @@ +[ + { + "eventType": "START", + "eventTime": "2020-02-22T22:42:42.000Z", + "run": { + "runId": "d46e465b-d358-4d32-83d4-df660ff614dd" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_menus", + 
"facets": { + "sql": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SQLJobFacet.json", + "query": "INSERT INTO menus (id, name, restaurant_id, description)\n SELECT id, name, restaurant_id, description\n FROM tmp_menus;" + }, + "documentation": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DocumentationJobFacet.json", + "description": "Loads newly added restaurant menus daily." + } + } + }, + "outputs": [ + { + "namespace": "food_delivery", + "name": "public.menus", + "facets": { + "schema": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json", + "fields": [ + { + "name": "id", + "type": "INTEGER", + "tags": [], + "description": "The unique ID of the menu." + }, + { + "name": "name", + "type": "VARCHAR", + "tags": [], + "description": "The name of the menu." + }, + { + "name": "restaurant_id", + "type": "INTEGER", + "tags": [], + "description": "The ID of the restaurant related to the menu." + }, + { + "name": "description", + "type": "TEXT", + "tags": [], + "description": "The description of the menu." 
+ } + ] + } + } + } + ], + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "COMPLETE", + "eventTime": "2020-02-22T22:45:52.000Z", + "run": { + "runId": "d46e465b-d358-4d32-83d4-df660ff614dd" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_menus" + }, + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "START", + "eventTime": "2020-02-22T22:42:42.000Z", + "run": { + "runId": "6f0c13a5-f29b-46a5-90c1-0ffbebbbd1aa" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_categories", + "facets": { + "sql": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SQLJobFacet.json", + "query": "INSERT INTO categories (id, name, menu_id, description)\n SELECT id, name, menu_id, description\n FROM tmp_categories;" + }, + "documentation": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DocumentationJobFacet.json", + "description": "Loads newly added menus categories daily." + } + } + }, + "outputs": [ + { + "namespace": "food_delivery", + "name": "public.categories", + "facets": { + "schema": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json", + "fields": [ + { + "name": "id", + "type": "INTEGER", + "description": "The unique ID of the category." + }, + { + "name": "name", + "type": "VARCHAR", + "description": "The name of the category." + }, + { + "name": "menu_id", + "type": "INTEGER", + "description": "The ID of the menu related to the category." + }, + { + "name": "description", + "type": "TEXT", + "description": "The description of the category." 
+ } + ] + } + } + } + ], + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "COMPLETE", + "eventTime": "2020-02-22T22:44:52.000Z", + "run": { + "runId": "6f0c13a5-f29b-46a5-90c1-0ffbebbbd1aa" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_categories" + }, + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "START", + "eventTime": "2020-02-22T22:42:42.000Z", + "run": { + "runId": "e05901b1-3a06-4b98-8d9c-aaf188c9a28c" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_menu_items", + "facets": { + "sql": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SQLJobFacet.json", + "query": "INSERT INTO menu_items (id, name, price, category_id, description)\n SELECT id, name, price, category_id, description\n FROM tmp_menu_items;" + }, + "documentation": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DocumentationJobFacet.json", + "description": "Loads newly added restaurant menu items daily." + } + } + }, + "outputs": [ + { + "namespace": "food_delivery", + "name": "public.menu_items", + "facets": { + "schema": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json", + "fields": [ + { + "name": "id", + "type": "INTEGER", + "description": "The ID of the menu item." + }, + { + "name": "name", + "type": "VARCHAR", + "description": "The name of the menu item." + }, + { + "name": "price", + "type": "VARCHAR", + "description": "The price of the menu item." + }, + { + "name": "category_id", + "type": "VARCHAR", + "description": "The ID of the category related to the item." 
+ }, + { + "name": "description", + "type": "TEXT", + "description": "The description of the menu item." + } + ] + } + } + } + ], + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "COMPLETE", + "eventTime": "2020-02-22T22:44:54.000Z", + "run": { + "runId": "e05901b1-3a06-4b98-8d9c-aaf188c9a28c" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_menu_items" + }, + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "START", + "eventTime": "2020-02-22T22:42:42.000Z", + "run": { + "runId": "a43a8523-349f-4296-807f-3354ac491990" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_orders", + "facets": { + "sql": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SQLJobFacet.json", + "query": "INSERT INTO orders (id, placed_on, menu_item_id, quantity, discount_id, comment)\n SELECT id, placed_on, menu_item_id, quantity, discount_id, comment\n FROM tmp_orders;" + }, + "documentation": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DocumentationJobFacet.json", + "description": "Loads newly placed orders daily." + } + } + }, + "outputs": [ + { + "namespace": "food_delivery", + "name": "public.orders", + "facets": { + "schema": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json", + "fields": [ + { + "name": "id", + "type": "INTEGER", + "description": "The unique ID of the order." + }, + { + "name": "placed_on", + "type": "TIMESTAMP", + "description": "An ISO-8601 timestamp representing the date/time the order was placed." 
+ }, + { + "name": "menu_item_id", + "type": "INTEGER", + "description": "The ID of the menu item related to the order." + }, + { + "name": "quantity", + "type": "INTEGER", + "description": "The number of the item in the order." + }, + { + "name": "discount_id", + "type": "INTEGER", + "description": "The unique ID of the discount applied to the order." + }, + { + "name": "comment", + "type": "TEXT", + "description": "The comment of the order." + } + ] + } + } + } + ], + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "COMPLETE", + "eventTime": "2020-02-22T22:44:52.000Z", + "run": { + "runId": "a43a8523-349f-4296-807f-3354ac491990" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_orders" + }, + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "START", + "eventTime": "2020-02-22T22:42:42.000Z", + "run": { + "runId": "ffba2c14-4170-48da-bec3-ab5fd4ec9a3f" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_orders_7_days", + "facets": { + "sql": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SQLJobFacet.json", + "query": "INSERT INTO orders_7_days (order_id, placed_on, discount_id, restaurant_id, menu_id, menu_item_id, category_id)\n SELECT o.id AS order_id, o.placed_on, o.discount_id, m.restaurant_id, m.id AS menu_id, mi.id AS menu_item_id, c.id AS category_id\n FROM orders AS o\n INNER JOIN menu_items AS mi\n ON menu_items.id = o.menu_item_id\n INNER JOIN categories AS c\n ON c.id = mi.category_id\n INNER JOIN menu AS m\n ON m.id = c.menu_id\n WHERE o.placed_on >= NOW() - interval '7 days';" + }, + "documentation": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DocumentationJobFacet.json", + "description": "Loads 
newly placed orders weekly." + } + } + }, + "inputs": [ + { + "namespace": "food_delivery", + "name": "public.menus" + }, + { + "namespace": "food_delivery", + "name": "public.menu_items" + }, + { + "namespace": "food_delivery", + "name": "public.orders" + }, + { + "namespace": "food_delivery", + "name": "public.categories" + } + ], + "outputs": [ + { + "namespace": "food_delivery", + "name": "public.orders_7_days", + "facets": { + "schema": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json", + "fields": [ + { + "name": "order_id", + "type": "INTEGER", + "description": "The ID of the order." + }, + { + "name": "placed_on", + "type": "TIMESTAMP", + "description": "An ISO-8601 timestamp representing the date/time the order was placed." + }, + { + "name": "discount_id", + "type": "INTEGER", + "description": "The ID of the discount applied to the order." + }, + { + "name": "restaurant_id", + "type": "INTEGER", + "description": "The ID of the restaurant related to the order." + }, + { + "name": "menu_id", + "type": "INTEGER", + "description": "The ID of the menu related to the order." + }, + { + "name": "menu_item_id", + "type": "INTEGER", + "description": "The ID of the menu item related to the order." + }, + { + "name": "category_id", + "type": "INTEGER", + "description": "The ID of category related to the order." 
+ } + ] + } + } + } + ], + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "COMPLETE", + "eventTime": "2020-02-22T22:44:02.000Z", + "run": { + "runId": "ffba2c14-4170-48da-bec3-ab5fd4ec9a3f" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_orders_7_days" + }, + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "START", + "eventTime": "2020-02-22T22:42:42.000Z", + "run": { + "runId": "182a9eaf-881a-4d49-860c-f7e260b8bf60" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_customers", + "facets": { + "sql": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SQLJobFacet.json", + "query": "INSERT INTO customers (id, created_at, updated_at, name, email, phone, city_id)\n SELECT id, created_at, updated_at, name, email, phone, city_id\n FROM tmp_customers;" + }, + "documentation": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DocumentationJobFacet.json", + "description": "Loads newly registered customers daily." + } + } + }, + "outputs": [ + { + "namespace": "food_delivery", + "name": "public.customers", + "facets": { + "schema": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json", + "fields": [ + { + "name": "id", + "type": "INTEGER", + "description": "The unique ID of the customer." + }, + { + "name": "created_at", + "type": "TIMESTAMP", + "description": "An ISO-8601 timestamp representing the date/time the customer was created." + }, + { + "name": "updated_at", + "type": "TIMESTAMP", + "description": "An ISO-8601 timestamp representing the date/time the customer was updated." 
+ }, + { + "name": "name", + "type": "VARCHAR", + "description": "The name of the customer." + }, + { + "name": "email", + "type": "VARCHAR", + "description": "The email address of the customer." + }, + { + "name": "address", + "type": "VARCHAR", + "description": "The address of the customer." + }, + { + "name": "phone", + "type": "VARCHAR", + "description": "The phone number of the customer." + }, + { + "name": "city_id", + "type": "INTEGER", + "description": "The ID of the city related to the customer." + } + ] + } + } + } + ], + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "COMPLETE", + "eventTime": "2020-02-22T22:44:55.000Z", + "run": { + "runId": "182a9eaf-881a-4d49-860c-f7e260b8bf60" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_customers" + }, + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "START", + "eventTime": "2020-02-22T22:42:42.000Z", + "run": { + "runId": "b7098939-87f0-4207-878f-dfd8e8804d8a" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_order_status", + "facets": { + "sql": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SQLJobFacet.json", + "query": "INSERT INTO order_status (id, transitioned_at, status, order_id, customer_id, restaurant_id, driver_id)\n SELECT id, transitioned_at, status, order_id, customer_id, restaurant_id, driver_id\n FROM tmp_order_status;" + }, + "documentation": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DocumentationJobFacet.json", + "description": "Loads order statues updates daily." 
+ } + } + }, + "outputs": [ + { + "namespace": "food_delivery", + "name": "public.order_status", + "facets": { + "schema": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json", + "fields": [ + { + "name": "id", + "type": "INTEGER", + "description": "The unique ID of the order status." + }, + { + "name": "transitioned_at", + "type": "TIMESTAMP", + "description": "An ISO-8601 timestamp representing the date/time the order status was transitioned." + }, + { + "name": "status", + "type": "VARCHAR", + "description": "The status of the order." + }, + { + "name": "order_id", + "type": "INTEGER", + "description": "The ID of the order related to the order status." + }, + { + "name": "customer_id", + "type": "INTEGER", + "description": "The ID of the customer related to the order status." + }, + { + "name": "restaurant_id", + "type": "INTEGER", + "description": "The ID of the restaurant related to the order status." + }, + { + "name": "driver_id", + "type": "INTEGER", + "description": "The ID of the driver related to the order status." 
+ } + ] + } + } + } + ], + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "COMPLETE", + "eventTime": "2020-02-22T22:44:52.000Z", + "run": { + "runId": "b7098939-87f0-4207-878f-dfd8e8804d8a" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_order_status" + }, + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "START", + "eventTime": "2020-02-22T22:42:42.000Z", + "run": { + "runId": "9f3db1c5-5e9a-4280-8184-18aca4592c77" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_drivers", + "facets": { + "sql": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SQLJobFacet.json", + "query": "INSERT INTO drivers (id, created_at, updated_at, name, email, phone, car_make, car_model, car_year, car_color, car_license_plate)\n SELECT id, created_at, updated_at, name, email, phone, car_make, car_model, car_year, car_color, car_license_plate\n FROM tmp_drivers;" + }, + "documentation": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DocumentationJobFacet.json", + "description": "Loads newly registered drivers daily." + } + } + }, + "outputs": [ + { + "namespace": "food_delivery", + "name": "public.drivers", + "facets": { + "schema": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json", + "fields": [ + { + "name": "id", + "type": "INTEGER", + "tags": [], + "description": "The unique ID of the driver." + }, + { + "name": "created_at", + "type": "TIMESTAMP", + "tags": [], + "description": "An ISO-8601 timestamp representing the date/time the driver was created." 
+ }, + { + "name": "updated_at", + "type": "TIMESTAMP", + "tags": [], + "description": "An ISO-8601 timestamp representing the date/time the driver was updated." + }, + { + "name": "name", + "type": "VARCHAR", + "tags": [], + "description": "The name of the driver." + }, + { + "name": "email", + "type": "VARCHAR", + "tags": [], + "description": "The email of the driver." + }, + { + "name": "phone", + "type": "VARCHAR", + "tags": [], + "description": "The phone number of the driver." + }, + { + "name": "car_make", + "type": "VARCHAR", + "tags": [], + "description": "The make of the car." + }, + { + "name": "car_model", + "type": "VARCHAR", + "tags": [], + "description": "The model of the car." + }, + { + "name": "car_year", + "type": "VARCHAR", + "tags": [], + "description": "The year of the car." + }, + { + "name": "car_color", + "type": "VARCHAR", + "tags": [], + "description": "The color of the car." + }, + { + "name": "car_license_plate", + "type": "VARCHAR", + "tags": [], + "description": "The license plate number of the car." 
+ } + ] + } + } + } + ], + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "COMPLETE", + "eventTime": "2020-02-22T22:44:52.000Z", + "run": { + "runId": "9f3db1c5-5e9a-4280-8184-18aca4592c77" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_drivers" + }, + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "START", + "eventTime": "2020-02-22T22:42:42.000Z", + "run": { + "runId": "8ddfb1d9-415f-4850-bcd6-01d02f011abe" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_restaurants", + "facets": { + "sql": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SQLJobFacet.json", + "query": "INSERT INTO restaurants (id, created_at, updated_at, name, email, address, phone, city_id, business_hours_id, description)\n SELECT id, created_at, updated_at, name, email, address, phone, city_id, business_hours_id, description\n FROM tmp_restaurants;" + }, + "documentation": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DocumentationJobFacet.json", + "description": "Loads newly registered restaurants daily." + } + } + }, + "outputs": [ + { + "namespace": "food_delivery", + "name": "public.restaurants", + "facets": { + "schema": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json", + "fields": [ + { + "name": "id", + "type": "INTEGER", + "tags": [], + "description": "The unique ID of the restaurant." + }, + { + "name": "created_at", + "type": "TIMESTAMP", + "tags": [], + "description": "An ISO-8601 timestamp representing the date/time the restaurant was created." 
+ }, + { + "name": "updated_at", + "type": "TIMESTAMP", + "tags": [], + "description": "An ISO-8601 timestamp representing the date/time the restaurant was updated." + }, + { + "name": "name", + "type": "VARCHAR", + "tags": [], + "description": "The name of the restaurant." + }, + { + "name": "email", + "type": "VARCHAR", + "tags": [], + "description": "The email address of the restaurant." + }, + { + "name": "address", + "type": "VARCHAR", + "tags": [], + "description": "The address of the restaurant." + }, + { + "name": "phone", + "type": "VARCHAR", + "tags": [], + "description": "The phone number of the restaurant." + }, + { + "name": "city_id", + "type": "INTEGER", + "tags": [], + "description": "The ID of the city related to the restaurant." + }, + { + "name": "business_hours_id", + "type": "INTEGER", + "tags": [], + "description": "The ID of the business hours related to the restaurant." + }, + { + "name": "description", + "type": "TEXT", + "tags": [], + "description": "The description of the restaurant." 
+ } + ] + } + } + } + ], + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "COMPLETE", + "eventTime": "2020-02-22T22:44:56.000Z", + "run": { + "runId": "8ddfb1d9-415f-4850-bcd6-01d02f011abe" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_restaurants" + }, + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "START", + "eventTime": "2020-02-22T22:42:42.000Z", + "run": { + "runId": "d5a2a4c4-fc78-428d-ae85-08c942ed8371" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_delivery_7_days", + "facets": { + "sql": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SQLJobFacet.json", + "query": "INSERT INTO delivery_7_days (order_id, order_placed_on, order_dispatched_on, order_delivered_on, customer_email,\n customer_address, discount_id, menu_id, restaurant_id, restaurant_address, menu_item_id, category_id, driver_id)\n SELECT o.order_id, o.placed_on AS order_placed_on,\n (SELECT transitioned_at FROM order_status WHERE order_id == o.order_id AND status = 'DISPATCHED') AS order_dispatched_on,\n (SELECT transitioned_at FROM order_status WHERE order_id == o.order_id AND status = 'DELIVERED') AS order_delivered_on,\n c.email AS customer_email, c.address AS customer_address, o.discount_id, o.menu_id, o.restaurant_id,\n r.address, o.menu_item_id, o.category_id, d.id AS driver_id\n FROM orders_7_days AS o\n INNER JOIN order_status AS os\n ON os.order_id = o.order_id\n INNER JOIN customers AS c\n ON c.id = os.customer_id\n INNER JOIN restaurants AS r\n ON r.id = os.restaurant_id\n INNER JOIN drivers AS d\n ON d.id = os.driver_id\n WHERE os.transitioned_at >= NOW() - interval '7 days';" + }, + "documentation": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": 
"https://openlineage.io/spec/facets/1-0-0/DocumentationJobFacet.json", + "description": "Loads new deliveries for the week." + } + } + }, + "inputs": [ + { + "namespace": "food_delivery", + "name": "public.orders_7_days" + }, + { + "namespace": "food_delivery", + "name": "public.customers" + }, + { + "namespace": "food_delivery", + "name": "public.order_status" + }, + { + "namespace": "food_delivery", + "name": "public.drivers" + }, + { + "namespace": "food_delivery", + "name": "public.restaurants" + } + ], + "outputs": [ + { + "namespace": "food_delivery", + "name": "public.delivery_7_days", + "facets": { + "schema": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json", + "fields": [ + { + "name": "order_id", + "type": "INTEGER", + "tags": [], + "description": "The ID of the order." + }, + { + "name": "order_placed_on", + "type": "TIMESTAMP", + "tags": [], + "description": "An ISO-8601 timestamp representing the date/time the order was placed." + }, + { + "name": "order_dispatched_on", + "type": "TIMESTAMP", + "tags": [], + "description": "An ISO-8601 timestamp representing the date/time the order was dispatched." + }, + { + "name": "order_delivered_on", + "type": "TIMESTAMP", + "tags": [], + "description": "An ISO-8601 timestamp representing the date/time the order was delivered." + }, + { + "name": "customer_email", + "type": "VARCHAR", + "tags": [], + "description": "The email address of the customer." + }, + { + "name": "menu_id", + "type": "INTEGER", + "tags": [], + "description": "The ID of the menu related to the order." + }, + { + "name": "menu_item_id", + "type": "INTEGER", + "tags": [], + "description": "The ID of the menu item related to the order." + }, + { + "name": "category_id", + "type": "INTEGER", + "tags": [], + "description": "The ID of category related to the order." 
+ }, + { + "name": "discount_id", + "type": "INTEGER", + "tags": [], + "description": "The ID of the discount applied to the order" + }, + { + "name": "city_id", + "type": "INTEGER", + "tags": [], + "description": "The ID of the city related to the order." + }, + { + "name": "driver_id", + "type": "INTEGER", + "tags": [], + "description": "The ID of the driver related to the order." + } + ] + } + } + } + ], + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "COMPLETE", + "eventTime": "2020-02-22T22:48:12.000Z", + "run": { + "runId": "d5a2a4c4-fc78-428d-ae85-08c942ed8371" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_delivery_7_days" + }, + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "START", + "eventTime": "2020-02-22T22:42:42.000Z", + "run": { + "runId": "bd41a42a-bf18-4b74-9bb7-cd62637823d8" + }, + "job": { + "namespace": "food_delivery", + "name": "delivery_times_7_days", + "facets": { + "sql": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SQLJobFacet.json", + "query": "INSERT INTO top_delivery_times (order_id, order_placed_on, order_dispatched_on, order_delivered_on, order_delivery_time,\n customer_email, restaurant_id, driver_id)\n SELECT order_id, order_placed_on, order_delivered_on, DATEDIFF(minute, order_placed_on, order_delivered_on) AS order_delivery_time,\n customer_email, restaurant_id, driver_id\n FROM delivery_7_days\n GROUP BY restaurant_id\n ORDER BY order_delivery_time DESC\n LIMIT 1;\nINSERT INTO discounts (amount_off, customer_email, starts_on, ends_on)\n SELECT customer_email, DATEDIFF(minute, order_placed_on, order_delivered_on) AS order_delivery_time,\n CASE WHEN order_delivery_time >= 60 THEN 15\n ELSE 5\n END AS amount_off,\n NOW() AS starts_on,\n NOW() + interval '7 days' AS ends_on\n FROM 
top_delivery_times\n WHERE order_delivery_time >= 45;" + }, + "documentation": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DocumentationJobFacet.json", + "description": "Determine weekly top delivery times by restaurant." + } + } + }, + "inputs": [ + { + "namespace": "food_delivery", + "name": "public.delivery_7_days" + } + ], + "outputs": [ + { + "namespace": "food_delivery", + "name": "public.top_delivery_times", + "facets": { + "schema": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json", + "fields": [ + { + "name": "order_id", + "type": "INTEGER", + "tags": [], + "description": "The ID of the order." + }, + { + "name": "order_placed_on", + "type": "TIMESTAMP", + "tags": [], + "description": "An ISO-8601 timestamp representing the date/time the order was placed." + }, + { + "name": "order_dispatched_on", + "type": "TIMESTAMP", + "tags": [], + "description": "An ISO-8601 timestamp representing the date/time the order was dispatched." + }, + { + "name": "order_delivered_on", + "type": "TIMESTAMP", + "tags": [], + "description": "An ISO-8601 timestamp representing the date/time the order was delivered." + }, + { + "name": "order_delivered_time", + "type": "TIMESTAMP", + "tags": [], + "description": "An ISO-8601 timestamp representing the total time of delivery." + }, + { + "name": "customer_email", + "type": "VARCHAR", + "tags": [], + "description": "The email address of the customer." + }, + { + "name": "restaurant_id", + "type": "INTEGER", + "tags": [], + "description": "The ID of the restaurant related to the order." + }, + { + "name": "driver_id", + "type": "INTEGER", + "tags": [], + "description": "The ID of the driver related to the order." 
+ } + ] + } + } + }, + { + "namespace": "food_delivery", + "name": "public.discounts", + "facets": { + "schema": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json", + "fields": [ + { + "name": "id", + "type": "INTEGER", + "tags": [], + "description": "The unique ID of the discount." + }, + { + "name": "amount_off", + "type": "INTEGER", + "tags": [], + "description": "The amount of the discount." + }, + { + "name": "customer_email", + "type": "VARCHAR", + "tags": [], + "description": "The email address of the customer." + }, + { + "name": "starts_on", + "type": "TIMESTAMP", + "tags": [], + "description": "An ISO-8601 timestamp representing the date/time the discount starts." + }, + { + "name": "ends_on", + "type": "TIMESTAMP", + "tags": [], + "description": "An ISO-8601 timestamp representing the date/time the discount ends." + } + ] + } + } + } + ], + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "COMPLETE", + "eventTime": "2020-02-22T22:58:02.000Z", + "run": { + "runId": "bd41a42a-bf18-4b74-9bb7-cd62637823d8" + }, + "job": { + "namespace": "food_delivery", + "name": "delivery_times_7_days" + }, + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "START", + "eventTime": "2020-02-22T22:42:42.000Z", + "run": { + "runId": "adc8507c-595e-4d76-9dac-be2bf0ffe1ee" + }, + "job": { + "namespace": "food_delivery", + "name": "orders_popular_day_of_week", + "facets": { + "sql": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SQLJobFacet.json", + "query": "INSERT INTO popular_orders_day_of_week (order_day_of_week, order_placed_on, orders_placed)\n SELECT EXTRACT(DOW FROM t.order_placed_on) AS order_day_of_week, t.order_placed_on, 
COUNT(*) AS orders_placed\n FROM top_delivery_times AS t\n INNER JOIN customers AS c\n ON d.customer_email = c.email\n GROUP BY t.order_day_of_week\n ORDER BY t.orders_placed DESC\n LIMIT 1;" + }, + "documentation": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DocumentationJobFacet.json", + "description": "Determines the popular day of week orders are placed." + } + } + }, + "inputs": [ + { + "namespace": "food_delivery", + "name": "public.top_delivery_times" + }, + { + "namespace": "food_delivery", + "name": "public.customers" + } + ], + "outputs": [ + { + "namespace": "food_delivery", + "name": "public.popular_orders_day_of_week", + "facets": { + "schema": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json", + "fields": [ + { + "name": "order_day_of_week", + "type": "INTEGER", + "tags": [], + "description": "The day of week of the order." + }, + { + "name": "order_placed_on", + "type": "TIMESTAMP", + "tags": [], + "description": "An ISO-8601 timestamp representing the date/time the order was placed." + }, + { + "name": "orders_placed", + "type": "INTEGER", + "tags": [], + "description": "The number of orders placed on day of week." 
+ } + ] + } + } + } + ], + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "COMPLETE", + "eventTime": "2020-02-22T22:46:12.000Z", + "run": { + "runId": "adc8507c-595e-4d76-9dac-be2bf0ffe1ee" + }, + "job": { + "namespace": "food_delivery", + "name": "orders_popular_day_of_week" + }, + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "START", + "eventTime": "2020-02-22T22:42:42.000Z", + "run": { + "runId": "3ab25429-cf9c-4d1d-9166-1e1946f9d3c3" + }, + "job": { + "namespace": "food_delivery", + "name": "email_discounts", + "facets": { + "sql": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SQLJobFacet.json", + "query": "SELECT d.id, d.amount_off, d.customer_email, d.starts_on, d.ends_on\n FROM discounts AS d\n INNER JOIN customers AS c\n ON d.customer_email = c.email;" + }, + "documentation": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DocumentationJobFacet.json", + "description": "Email discounts to customers that have experienced order delays." 
+        }
+      }
+    },
+    "inputs": [
+      {
+        "namespace": "food_delivery",
+        "name": "public.discounts"
+      },
+      {
+        "namespace": "food_delivery",
+        "name": "public.customers"
+      }
+    ],
+    "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json"
+  },
+  {
+    "eventType": "COMPLETE",
+    "eventTime": "2020-02-22T22:56:01.000Z",
+    "run": {
+      "runId": "3ab25429-cf9c-4d1d-9166-1e1946f9d3c3"
+    },
+    "job": {
+      "namespace": "food_delivery",
+      "name": "email_discounts"
+    },
+    "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json"
+  }
+]
diff --git a/docker/seed.sh b/docker/seed.sh
index 69c1c5f5fa..25134e4d92 100755
--- a/docker/seed.sh
+++ b/docker/seed.sh
@@ -12,4 +12,4 @@ if [[ -z "${MARQUEZ_CONFIG}" ]]; then
   echo "WARNING 'MARQUEZ_CONFIG' not set, using development configuration."
 fi
 
-java -jar marquez-api-*.jar seed --host "${MARQUEZ_HOST:-localhost}" --port "${MARQUEZ_PORT:-5000}" "${MARQUEZ_CONFIG}"
+java -jar marquez-api-*.jar seed --url "${MARQUEZ_URL:-http://localhost:5000}" --metadata metadata.json "${MARQUEZ_CONFIG}"

From 7da23daf1705f618d20d95a1b62c10ccca363d10 Mon Sep 17 00:00:00 2001
From: Howard Yoo <32691630+howardyoo@users.noreply.github.com>
Date: Tue, 23 Aug 2022 18:17:01 -0500
Subject: [PATCH 05/26] doc:openapi Improve documentation on spec's nodeId (#2084)

* This PR is for https://github.com/MarquezProject/marquez/issues/2083
 - adds more example and details on what nodeId.
 - updated description of the nodeId.

Signed-off-by: howardyoo

* fixed the example part of the nodId

Signed-off-by: howardyoo

Signed-off-by: howardyoo
---
 spec/openapi.yml | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/spec/openapi.yml b/spec/openapi.yml
index 7ccfe989d1..085296c058 100644
--- a/spec/openapi.yml
+++ b/spec/openapi.yml
@@ -697,7 +697,9 @@ components:
       schema:
         type: string
         example: dataset:food_delivery:public.delivery_7_days
-      description: The ID of the node.
+      description: The ID of the node. A node can either be a dataset node or a job node. The format of
+        nodeId for dataset is `dataset:<namespace_name>:<dataset_name>` and for job is
+        `job:<namespace_name>:<job_name>`.
       required: true
 
     depth:
@@ -1768,8 +1770,10 @@ components:
           example: food_delivery
         nodeId:
           type: string
-          description: The ID of the dataset or job node.
-          example: dataset:food_delivery:public.delivery_7_days
+          description: The ID of the node. A node can either be a dataset node or a job node. The format of
+            nodeId for dataset is `dataset:<namespace_name>:<dataset_name>` and for job is
+            `job:<namespace_name>:<job_name>`.
+          example: 'dataset:food_delivery:public.delivery_7_days'
 
     SearchResultType:
       type: enum

From e25a6c3afd8bc74381a1c9947d7ff0bb4251c57c Mon Sep 17 00:00:00 2001
From: Marc Robichaud
Date: Wed, 24 Aug 2022 14:46:08 -0700
Subject: [PATCH 06/26] Correct enum types in OpenAPI Spec. (#2086)

Signed-off-by: Marc Robichaud

Signed-off-by: Marc Robichaud
---
 docs/openapi.html | 1657 +++++++++++++++++++++++----------------------
 spec/openapi.yml  |    8 +-
 2 files changed, 837 insertions(+), 828 deletions(-)

diff --git a/docs/openapi.html b/docs/openapi.html
index 6958d1dc14..23976c22ea 100644
--- a/docs/openapi.html
+++ b/docs/openapi.html
@@ -13,21 +13,21 @@
   }
-

Marquez (0.25.0)

License: Apache 2.0

Marquez is an open source metadata service for the collection, aggregation, and visualization of a data ecosystem's metadata.

-

Namespaces

Create a namespace

Creates a new namespace object. A namespace enables the contextual grouping of related jobs and datasets. Namespaces must contain only letters (a-z, A-Z), numbers (0-9), underscores (_), dashes (-), colons (:), slashes (/), or dots (.). A namespace is case-insensitive with a maximum length of 1024 characters. Note jobs and datasets will be unique within a namespace, but not across namespaces.

-
path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

-
Request Body schema: application/json
ownerName
required
string

The owner of the namespace.

-
description
string

The description of the namespace.

-

Responses

Request samples

Content type
application/json
{
  • "ownerName": "me",
  • "description": "My first namespace!"
}

Response samples

Content type
application/json
{
  • "name": "my-namespace",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "ownerName": "me",
  • "description": "My first namespace!"
}

Retrieve a namespace

Returns a namespace.

-
path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

-

Responses

Response samples

Content type
application/json
{
  • "name": "my-namespace",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "ownerName": "me",
  • "description": "My first namespace!"
}

List all namespaces

Returns a list of namespaces.

-
query Parameters
limit
integer
Default: 100
Example: limit=25

The number of results to return from offset

-
offset
integer
Default: 0

The initial position from which to return results

-

Responses

Response samples

Content type
application/json
{
  • "namespaces": [
    ]
}

Sources

Create a source Deprecated

Creates a new source object. A source is the physical location of a dataset, such as a table in PostgreSQL or a topic in Kafka. A source enables physical datasets to be grouped by their physical source.

-
path Parameters
source
required
string <= 1024 characters
Example: my-source

The name of the source.

-
Request Body schema: application/json
type
required
string

The type of the source.

-
connectionUrl
required
string <URL>

The URL to the location of the source.

-
description
string

The description of the source.

-

Responses

Request samples

Content type
application/json
{
  • "type": "POSTGRESQL",
  • "connectionUrl": "jdbc:postgresql://db.example.com/mydb",
  • "description": "My first source!"
}

Response samples

Content type
application/json
{
  • "type": "POSTGRESQL",
  • "name": "my-source",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "connectionUrl": "jdbc:postgresql://db.example.com/mydb",
  • "description": "My first source!"
}

Retrieve a source

Returns a source.

-
path Parameters
source
required
string <= 1024 characters
Example: my-source

The name of the source.

-

Responses

Response samples

Content type
application/json
{
  • "type": "POSTGRESQL",
  • "name": "my-source",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "connectionUrl": "jdbc:postgresql://db.example.com/mydb",
  • "description": "My first source!"
}

List all sources

Returns a list of sources.

-
query Parameters
limit
integer
Default: 100
Example: limit=25

The number of results to return from offset

-
offset
integer
Default: 0

The initial position from which to return results

-

Responses

Response samples

Content type
application/json
{
  • "sources": [
    ]
}

Datasets

Create a dataset Deprecated

Creates a new dataset.

-
path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

-
dataset
required
string <= 1024 characters
Example: my-dataset

The name of the dataset.

-
Request Body schema: application/json
One of
type
required
string
Value: "DB_TABLE"

The type of the dataset.

-
physicalName
required
string

The physical name of the table.

-
sourceName
required
string

The name of the source associated with the table.

-
required
Array of objects[ items ]

The fields of the table.

-
tags
Array of strings

List of tags.

-
description
string

The description of the table.

-
runId
string

The ID associated with the run modifying the table.

-

Responses

Request samples

Content type
application/json
Example
{
  • "type": "DB_TABLE",
  • "physicalName": "public.mytable",
  • "sourceName": "my-source",
  • "fields": [
    ],
  • "description": "My first dataset!"
}

Response samples

Content type
application/json
{
  • "id": {
    },
  • "type": "DB_TABLE",
  • "name": "my-dataset",
  • "physicalName": "public.mytable",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "namespace": "my-namespace",
  • "sourceName": "my-source",
  • "fields": [
    ],
  • "tags": [ ],
  • "lastModifiedAt": null,
  • "description": "My first dataset!",
  • "facets": { },
  • "currentVersion": "b1d626a2-6d3a-475e-9ecf-943176d4a8c6"
}

Retrieve a dataset

Returns a dataset.

-
path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

-
dataset
required
string <= 1024 characters
Example: my-dataset

The name of the dataset.

-

Responses

Response samples

Content type
application/json
{
  • "id": {
    },
  • "type": "DB_TABLE",
  • "name": "my-dataset",
  • "physicalName": "public.mytable",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "namespace": "my-namespace",
  • "sourceName": "my-source",
  • "fields": [
    ],
  • "tags": [ ],
  • "lastModifiedAt": null,
  • "description": "My first dataset!",
  • "facets": { },
  • "currentVersion": "b1d626a2-6d3a-475e-9ecf-943176d4a8c6"
}

Retrieve a version for a dataset

Returns a version for a dataset.

-
path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

-
dataset
required
string <= 1024 characters
Example: my-dataset

The name of the dataset.

-
version
required
string <uuid>
Example: ea9badc5-7cb2-49af-9a9f-155771d3a797

The ID of the job or dataset version.

-

Responses

Response samples

Content type
application/json
{
  • "id": {
    },
  • "type": "DB_TABLE",
  • "name": "my-dataset",
  • "physicalName": "public.mytable",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "version": "d224dac0-35d7-4d9b-bbbe-6fff1a8485ad",
  • "namespace": "my-namespace",
  • "sourceName": "my-source",
  • "fields": [
    ],
  • "tags": [ ],
  • "description": "My first dataset!",
  • "createdByRun": {
    }
}

List all versions for a dataset

Returns a list of versions for a dataset.

-
path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

-
dataset
required
string <= 1024 characters
Example: my-dataset

The name of the dataset.

-

Responses

Response samples

Content type
application/json
{
  • "versions": [
    ]
}

List all datasets

Returns a list of datasets.

-
path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

-
dataset
required
string <= 1024 characters
Example: my-dataset

The name of the dataset.

-
query Parameters
limit
integer
Default: 100
Example: limit=25

The number of results to return from offset

-
offset
integer
Default: 0

The initial position from which to return results

-

Responses

Response samples

Content type
application/json
{
  • "datasets": [
    ],
  • "totalCount": 0
}

Tag a dataset

Tag an existing dataset.

-
path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

-
dataset
required
string <= 1024 characters
Example: my-dataset

The name of the dataset.

-
tag
required
string
Example: SENSITIVE

The name of the tag.

-

Responses

Response samples

Content type
application/json
{
  • "id": {
    },
  • "type": "DB_TABLE",
  • "name": "my-dataset",
  • "physicalName": "public.mytable",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "namespace": "my-namespace",
  • "sourceName": "my-source",
  • "fields": [
    ],
  • "tags": [ ],
  • "lastModifiedAt": null,
  • "description": "My first dataset!",
  • "facets": { },
  • "currentVersion": "b1d626a2-6d3a-475e-9ecf-943176d4a8c6"
}

Tag a field

Tag an existing field of a dataset.

-
path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

-
dataset
required
string <= 1024 characters
Example: my-dataset

The name of the dataset.

-
field
required
string
Example: my_field

The name of the field.

-
tag
required
string
Example: SENSITIVE

The name of the tag.

-

Responses

Response samples

Content type
application/json
{
  • "id": {
    },
  • "type": "DB_TABLE",
  • "name": "my-dataset",
  • "physicalName": "public.mytable",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "namespace": "my-namespace",
  • "sourceName": "my-source",
  • "fields": [
    ],
  • "tags": [ ],
  • "lastModifiedAt": null,
  • "description": "My first dataset!",
  • "facets": { },
  • "currentVersion": "b1d626a2-6d3a-475e-9ecf-943176d4a8c6"
}

Jobs

Create a job Deprecated

Creates a new job object. All job objects are immutable and are uniquely identified by a generated ID. Marquez will create a version of a job each time the contents of the object are modified. For example, the location of a job may change over time, resulting in new versions. The accumulated versions can be listed, used to rerun a specific job version, or used to help debug a failed job run.

-
path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

-
job
required
string <= 1024 characters
Example: my-job

The name of the job.

-
Request Body schema: application/json
object

The ID of the job.

-
type
required
enum (JobType)
Enum: "BATCH" "STREAM" "SERVICE"

The type of the job.

-
required
Array of objects (DatasetId) unique [ items ]

The set of input datasets.

-
required
Array of objects (DatasetId) unique [ items ]

The set of output datasets.

-
location
string <URL>

The URL of the job source code or artifact.

-
context
object
Deprecated

A key/value pair that must be of type string. A context can be used for getting additional details about the job.

-
description
string

The description of the job.

-
runId
string

An optional run ID used to associate a job version to an existing job run.

-

Responses

Request samples

Content type
application/json
{}

Response samples

Content type
application/json
{
  • "id": {
    },
  • "type": "BATCH",
  • "name": "my-job",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "namespace": "my-namespace",
  • "inputs": [
    ],
  • "outputs": [ ],
  • "context": {
    },
  • "description": "My first job!",
  • "latestRun": null,
  • "facets": { },
  • "currentVersion": "b1d626a2-6d3a-475e-9ecf-943176d4a8c6"
}

Retrieve a job

Retrieve a job.

-
path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

-
job
required
string <= 1024 characters
Example: my-job

The name of the job.

-

Responses

Response samples

Content type
application/json
{
  • "id": {
    },
  • "type": "BATCH",
  • "name": "my-job",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "namespace": "my-namespace",
  • "inputs": [
    ],
  • "outputs": [ ],
  • "context": {
    },
  • "description": "My first job!",
  • "latestRun": null,
  • "facets": { },
  • "currentVersion": "b1d626a2-6d3a-475e-9ecf-943176d4a8c6"
}

List all jobs

Returns a list of jobs.

-
path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

-
query Parameters
limit
integer
Default: 100
Example: limit=25

The number of results to return from offset

-
offset
integer
Default: 0

The initial position from which to return results

-

Responses

Response samples

Content type
application/json
{
  • "jobs": [
    ],
  • "totalCount": 0
}

Retrieve a version for a job

Returns a version for a job.

-
path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

-
job
required
string <= 1024 characters
Example: my-job

The name of the job.

-
version
required
string <uuid>
Example: ea9badc5-7cb2-49af-9a9f-155771d3a797

The ID of the job or dataset version.

-

Responses

Response samples

Content type
application/json
{
  • "id": {
    },
  • "type": "BATCH",
  • "name": "my-job",
  • "version": "56472c57-a2ef-4218-b7b7-d2af02a343fd",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "namespace": "my-namespace",
  • "inputs": [
    ],
  • "outputs": [ ],
  • "context": {
    },
  • "description": "My first job!",
  • "facets": { }
}

List all versions for a job

Returns a list of versions for a job.

-
path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

-
job
required
string <= 1024 characters
Example: my-job

The name of the job.

-

Responses

Response samples

Content type
application/json
{
  • "versions": [
    ]
}

Create a run Deprecated

Creates a new run object for a job.

-
path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

-
job
required
string <= 1024 characters
Example: my-job

The name of the job.

-
Request Body schema: application/json
id
string <uuid>

An optional user-provided unique ID of the run. A run ID must be a UUID. If an ID for the run is not provided, a random UUID will be generated for the given run.

-
nominalStartTime
string <date-time>

An ISO-8601 timestamp representing the nominal start time of the run.

-
nominalEndTime
string <date-time>

An ISO-8601 timestamp representing the nominal end time of the run.

-
args
object

The arguments of the run.

-

Responses

Request samples

Content type
application/json
{
  • "args": {
    }
}

Response samples

Content type
application/json
{
  • "id": "870492da-ecfb-4be0-91b9-9a89ddd3db90",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "nominalStartTime": null,
  • "nominalEndTime": null,
  • "state": "COMPLETED",
  • "startedAt": "2019-05-09T15:17:32.690346",
  • "endedAt": "2019-05-09T20:05:46.815920Z",
  • "durationMs": 4250894125,
  • "args": {
    },
  • "context": {
    },
  • "facets": { }
}
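
A minimal sketch of assembling the request body for this endpoint, assuming only the fields listed above (`id`, `nominalStartTime`, `nominalEndTime`, `args`). The helper name `run_request` is hypothetical and not part of Marquez; it validates a caller-supplied run ID as a UUID and leaves `id` out otherwise, so that a random UUID is generated for the run as described.

```python
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def run_request(
    args: Dict[str, str],
    run_id: Optional[str] = None,
    nominal_start: Optional[datetime] = None,
    nominal_end: Optional[datetime] = None,
) -> Dict[str, Any]:
    """Build a 'create a run' request body (hypothetical helper)."""
    body: Dict[str, Any] = {"args": dict(args)}
    if run_id is not None:
        # uuid.UUID raises ValueError when the string is not a valid UUID.
        body["id"] = str(uuid.UUID(run_id))
    for key, value in (("nominalStartTime", nominal_start),
                       ("nominalEndTime", nominal_end)):
        if value is None:
            continue
        if value.tzinfo is None:
            raise ValueError(f"{key} must be timezone-aware")
        # ISO-8601 timestamp normalized to UTC.
        body[key] = value.astimezone(timezone.utc).isoformat()
    return body
```

Omitting `run_id` keeps the body small and defers ID generation to the service; passing one is useful when the caller needs to know the ID before the request is sent.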

List all runs

Returns a list of runs for a job.

-
path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

-
job
required
string <= 1024 characters
Example: my-job

The name of the job.

-
query Parameters
limit
integer
Default: 100
Example: limit=25

The number of results to return from offset

-
offset
integer
Default: 0

The initial position from which to return results

-

Responses

Response samples

Content type
application/json
{
  • "runs": [
    ]
}

Retrieve a run

Retrieve a run.

-
path Parameters
id
required
string <uuid>
Example: ea9badc5-7cb2-49af-9a9f-155771d3a797

The ID of the run.

-

Responses

Response samples

Content type
application/json
{
  • "id": "870492da-ecfb-4be0-91b9-9a89ddd3db90",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "nominalStartTime": null,
  • "nominalEndTime": null,
  • "state": "RUNNING",
  • "startedAt": "2019-05-09T15:17:32.690346",
  • "endedAt": null,
  • "durationMs": null,
  • "args": {
    },
  • "facets": { }
}

Start a run Deprecated

Marks the run as RUNNING.

-
path Parameters
id
required
string <uuid>
Example: ea9badc5-7cb2-49af-9a9f-155771d3a797

The ID of the run.

-
query Parameters
at
string <date-time>

An ISO-8601 timestamp representing the time when the run transitioned.

-

Responses

Response samples

Content type
application/json
{
  • "id": "870492da-ecfb-4be0-91b9-9a89ddd3db90",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "nominalStartTime": null,
  • "nominalEndTime": null,
  • "state": "RUNNING",
  • "startedAt": "2019-05-09T15:17:32.690346",
  • "endedAt": null,
  • "durationMs": null,
  • "args": {
    },
  • "facets": { }
}

Complete a run Deprecated

Marks the run as COMPLETED.

-
path Parameters
id
required
string <uuid>
Example: ea9badc5-7cb2-49af-9a9f-155771d3a797

The ID of the run.

-
query Parameters
at
string <date-time>

An ISO-8601 timestamp representing the time when the run transitioned.

-

Responses

Response samples

Content type
application/json
{
  • "id": "870492da-ecfb-4be0-91b9-9a89ddd3db90",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "nominalStartTime": null,
  • "nominalEndTime": null,
  • "state": "COMPLETED",
  • "startedAt": "2019-05-09T15:17:32.690346",
  • "endedAt": "2019-05-09T20:05:46.815920Z",
  • "durationMs": 4250894125,
  • "args": {
    },
  • "context": {
    },
  • "facets": { }
}

Fail a run Deprecated

Marks the run as FAILED.

-
path Parameters
id
required
string <uuid>
Example: ea9badc5-7cb2-49af-9a9f-155771d3a797

The ID of the run.

-
query Parameters
at
string <date-time>

An ISO-8601 timestamp representing the time when the run transitioned.

-

Responses

Response samples

Content type
application/json
{
  • "id": "870492da-ecfb-4be0-91b9-9a89ddd3db90",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "nominalStartTime": null,
  • "nominalEndTime": null,
  • "state": "RUNNING",
  • "startedAt": "2019-05-09T15:17:32.690346",
  • "endedAt": null,
  • "durationMs": null,
  • "args": {
    },
  • "facets": { }
}

Abort a run Deprecated

Marks the run as ABORTED.

-
path Parameters
id
required
string <uuid>
Example: ea9badc5-7cb2-49af-9a9f-155771d3a797

The ID of the run.

-
query Parameters
at
string <date-time>

An ISO-8601 timestamp representing the time when the run transitioned.

-

Responses

Response samples

Content type
application/json
{
  • "id": "870492da-ecfb-4be0-91b9-9a89ddd3db90",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "nominalStartTime": null,
  • "nominalEndTime": null,
  • "state": "RUNNING",
  • "startedAt": "2019-05-09T15:17:32.690346",
  • "endedAt": null,
  • "durationMs": null,
  • "args": {
    },
  • "facets": { }
}
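
The four deprecated endpoints above each move a run into one named state. The sketch below models that mapping locally; it is an illustration only, not the server's implementation. The `mark_run` helper, the action names, and the choice to default `at` to the current time are all assumptions.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional

# Action name (hypothetical) -> run state named in the endpoint descriptions.
TRANSITIONS = {
    "start": "RUNNING",
    "complete": "COMPLETED",
    "fail": "FAILED",
    "abort": "ABORTED",
}


def mark_run(run: Dict[str, Any], action: str,
             at: Optional[datetime] = None) -> Dict[str, Any]:
    """Return a copy of `run` after applying one transition (local model)."""
    state = TRANSITIONS[action]  # KeyError for an unknown action
    when = (at or datetime.now(timezone.utc)).isoformat()
    updated = dict(run, state=state, updatedAt=when)
    if state == "RUNNING":
        updated["startedAt"] = when
    else:
        updated["endedAt"] = when
    return updated
```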

Lineage

Record a single lineage event

Receive, process, and store lineage metadata using the OpenLineage standard.

-
Request Body schema: application/json
any (LineageEvent)

Responses

Request samples

Content type
application/json
{}
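
As a rough illustration of the request body, the sketch below builds an event with the same top-level shape as the entries in `docker/metadata.json` earlier in this patch series (`eventType`, `eventTime`, `run.runId`, `job.namespace`, `job.name`, `producer`). The `lineage_event` helper and the default `PRODUCER` URL are hypothetical; optional parts such as `inputs`, `outputs`, and facets are left out.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict

PRODUCER = "https://example.com/my-producer"  # hypothetical producer URI


def lineage_event(event_type: str, run_id: str, namespace: str, job_name: str,
                  event_time: datetime, producer: str = PRODUCER) -> Dict[str, Any]:
    """Build a minimal lineage event dict (hypothetical helper)."""
    t = event_time.astimezone(timezone.utc)
    return {
        "eventType": event_type,
        # Millisecond precision with a trailing Z, as in metadata.json.
        "eventTime": f"{t:%Y-%m-%dT%H:%M:%S}.{t.microsecond // 1000:03d}Z",
        "run": {"runId": run_id},
        "job": {"namespace": namespace, "name": job_name},
        "producer": producer,
    }


if __name__ == "__main__":
    event = lineage_event(
        "COMPLETE", "adc8507c-595e-4d76-9dac-be2bf0ffe1ee",
        "food_delivery", "orders_popular_day_of_week",
        datetime(2020, 2, 22, 22, 46, 12, tzinfo=timezone.utc),
    )
    print(json.dumps(event, indent=2))
```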

Get a lineage graph

query Parameters
nodeId
required
string
Example: nodeId=dataset:food_delivery:public.delivery_7_days

The ID of the node.

-
depth
integer
Default: 20

Depth of lineage graph to create.

-

Responses

Response samples

Content type
application/json
{
  • "graph": [
    ]
}
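
The example value `dataset:food_delivery:public.delivery_7_days` suggests a `type:namespace:name` layout for `nodeId`. Below is a small sketch of building and splitting such IDs under that assumption. The helper names are hypothetical. Because namespaces may themselves contain colons, the split is ambiguous in general; this sketch simply treats the first two colons as separators.

```python
from typing import Tuple

NODE_TYPES = ("dataset", "job")


def make_node_id(node_type: str, namespace: str, name: str) -> str:
    """Join the three parts into a nodeId string."""
    if node_type not in NODE_TYPES:
        raise ValueError(f"unknown node type: {node_type!r}")
    return f"{node_type}:{namespace}:{name}"


def parse_node_id(node_id: str) -> Tuple[str, str, str]:
    """Split a nodeId on its first two colons into (type, namespace, name)."""
    parts = node_id.split(":", 2)
    if len(parts) != 3 or parts[0] not in NODE_TYPES:
        raise ValueError(f"malformed nodeId: {node_id!r}")
    return parts[0], parts[1], parts[2]
```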

Tags

Create a tag

Creates a new tag object.

-
path Parameters
tag
required
string
Example: SENSITIVE

The name of the tag.

-
Request Body schema: application/json
description
string

The description of the tag.

-

Responses

Request samples

Content type
application/json
{
  • "description": "My first tag!"
}

Response samples

Content type
application/json
{
  • "tags": [
    ]
}

List all tags

Returns a list of tags.

-
query Parameters
limit
integer
Default: 100
Example: limit=25

The number of results to return from offset

-
offset
integer
Default: 0

The initial position from which to return results

-

Responses

Response samples

Content type
application/json
{
  • "tags": [
    ]
}

Search

Query all datasets and jobs

Returns the datasets and jobs that match your query.

-
query Parameters
q
required
string
Example: q=my-dataset

Query containing pattern to match; datasets and jobs pattern matching is string based and case-insensitive. Use percent sign (%) to match any string of zero or more characters (my-job%), or an underscore (_) to match a single character (_job_).

-
filter
string
Example: filter=dataset

Filters the results of your query by dataset or job.

-
sort
string
Example: sort=name

Sorts the results of your query by name or updated_at.

-
limit
integer
Default: 100
Example: limit=25

The number of results to return from offset

-

Responses

Response samples

Content type
application/json
{
  • "totalCount": 1,
  • "results": [
    ]
}
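
To make the wildcard semantics of `q` concrete, here is a local sketch that translates a pattern into a regular expression: `%` matches any run of zero or more characters, `_` matches exactly one character, and matching is case-insensitive. It assumes whole-string matching in the style of SQL `LIKE`; how the service anchors the pattern is not stated here. The helper names are hypothetical.

```python
import re


def like_to_regex(pattern: str) -> re.Pattern:
    """Translate a %/_ wildcard pattern into a compiled regex."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")      # zero or more characters
        elif ch == "_":
            parts.append(".")       # exactly one character
        else:
            parts.append(re.escape(ch))
    return re.compile("".join(parts), re.IGNORECASE | re.DOTALL)


def matches(pattern: str, name: str) -> bool:
    """Return True when `name` matches `pattern` in full."""
    return like_to_regex(pattern).fullmatch(name) is not None
```
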
+

Marquez (0.26.0-SNAPSHOT)

License: Apache 2.0

Marquez is an open source metadata service for the collection, aggregation, and visualization of a data ecosystem's metadata.

+

Namespaces

Create a namespace

Creates a new namespace object. A namespace enables the contextual grouping of related jobs and datasets. Namespaces must contain only letters (a-z, A-Z), numbers (0-9), underscores (_), dashes (-), colons (:), slashes (/), or dots (.). A namespace is case-insensitive with a maximum length of 1024 characters. Note jobs and datasets will be unique within a namespace, but not across namespaces.

+
path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

+
Request Body schema: application/json
ownerName
required
string

The owner of the namespace.

+
description
string

The description of the namespace.

+

Responses

Request samples

Content type
application/json
{
  • "ownerName": "me",
  • "description": "My first namespace!"
}

Response samples

Content type
application/json
{
  • "name": "my-namespace",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "ownerName": "me",
  • "description": "My first namespace!"
}
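
The naming rule above can be checked on the client before a request is sent. This is a minimal sketch, assuming a namespace must be non-empty. The `is_valid_namespace` helper is hypothetical and not part of Marquez.

```python
import re

# Letters, digits, underscore, dash, colon, slash, dot; 1 to 1024 characters.
_NAMESPACE_RE = re.compile(r"[A-Za-z0-9_\-:/.]{1,1024}")


def is_valid_namespace(name: str) -> bool:
    """Return True when `name` satisfies the character and length rule."""
    return _NAMESPACE_RE.fullmatch(name) is not None
```

Since namespaces are described as case-insensitive, callers that compare names locally may also want to normalize case first.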

Retrieve a namespace

Returns a namespace.

+
path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

+

Responses

Response samples

Content type
application/json
{
  • "name": "my-namespace",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "ownerName": "me",
  • "description": "My first namespace!"
}

List all namespaces

Returns a list of namespaces.

+
query Parameters
limit
integer
Default: 100
Example: limit=25

The number of results to return from offset

+
offset
integer
Default: 0

The initial position from which to return results

+

Responses

Response samples

Content type
application/json
{
  • "namespaces": [
    ]
}
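
The `limit` and `offset` parameters recur on every list endpoint in this document. The sketch below walks all pages with a caller-supplied fetch function, so it makes no assumption about URLs or HTTP clients. `iter_pages` and the `fetch_page(limit, offset)` callable are hypothetical names.

```python
from typing import Callable, Dict, Iterator, List


def iter_pages(fetch_page: Callable[[int, int], List[Dict]],
               limit: int = 100) -> Iterator[Dict]:
    """Yield every item by advancing `offset` in steps of `limit`."""
    offset = 0
    while True:
        page = fetch_page(limit, offset)
        yield from page
        if len(page) < limit:
            # A short (or empty) page means there is nothing left to fetch.
            break
        offset += limit
```

In real use, `fetch_page` would issue the list request with the given `limit` and `offset` and return the array from the response, for example the `namespaces` array.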

Sources

Create a source Deprecated

Creates a new source object. A source is the physical location of a dataset, such as a table in PostgreSQL or a topic in Kafka. A source enables physical datasets to be grouped by their physical source.

+
path Parameters
source
required
string <= 1024 characters
Example: my-source

The name of the source.

+
Request Body schema: application/json
type
required
string

The type of the source.

+
connectionUrl
required
string <URL>

The URL to the location of the source.

+
description
string

The description of the source.

+

Responses

Request samples

Content type
application/json
{
  • "type": "POSTGRESQL",
  • "connectionUrl": "jdbc:postgresql://db.example.com/mydb",
  • "description": "My first source!"
}

Response samples

Content type
application/json
{
  • "type": "POSTGRESQL",
  • "name": "my-source",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "connectionUrl": "jdbc:postgresql://db.example.com/mydb",
  • "description": "My first source!"
}

Retrieve a source

Returns a source.

+
path Parameters
source
required
string <= 1024 characters
Example: my-source

The name of the source.

+

Responses

Response samples

Content type
application/json
{
  • "type": "POSTGRESQL",
  • "name": "my-source",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "connectionUrl": "jdbc:postgresql://db.example.com/mydb",
  • "description": "My first source!"
}

List all sources

Returns a list of sources.

+
query Parameters
limit
integer
Default: 100
Example: limit=25

The number of results to return from offset

+
offset
integer
Default: 0

The initial position from which to return results

+

Responses

Response samples

Content type
application/json
{
  • "sources": [
    ]
}

Datasets

Create a dataset Deprecated

Creates a new dataset.

+
path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

+
dataset
required
string <= 1024 characters
Example: my-dataset

The name of the dataset.

+
Request Body schema: application/json
One of
type
required
string
Value: "DB_TABLE"

The type of the dataset.

+
physicalName
required
string

The physical name of the table.

+
sourceName
required
string

The name of the source associated with the table.

+
required
Array of objects[ items ]

The fields of the table.

+
tags
Array of strings

List of tags.

+
description
string

The description of the table.

+
runId
string

The ID associated with the run modifying the table.

+

Responses

Request samples

Content type
application/json
Example
{
  • "type": "DB_TABLE",
  • "physicalName": "public.mytable",
  • "sourceName": "my-source",
  • "fields": [
    ],
  • "description": "My first dataset!"
}

Response samples

Content type
application/json
{
  • "id": {
    },
  • "type": "DB_TABLE",
  • "name": "my-dataset",
  • "physicalName": "public.mytable",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "namespace": "my-namespace",
  • "sourceName": "my-source",
  • "fields": [
    ],
  • "tags": [ ],
  • "lastModifiedAt": null,
  • "description": "My first dataset!",
  • "facets": { },
  • "currentVersion": "b1d626a2-6d3a-475e-9ecf-943176d4a8c6"
}
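
A minimal sketch of assembling the `DB_TABLE` request body with its required members (`type`, `physicalName`, `sourceName`, `fields`) checked up front. The `db_table_request` helper is hypothetical. The shape of each field object is collapsed in the sample above, so the `name` and `type` keys used in the usage example below are an assumption.

```python
from typing import Any, Dict, List, Optional


def db_table_request(
    physical_name: str,
    source_name: str,
    fields: List[Dict[str, Any]],
    description: Optional[str] = None,
    tags: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Build a 'create a dataset' body for a DB_TABLE (hypothetical helper)."""
    if not physical_name:
        raise ValueError("physicalName is required")
    if not source_name:
        raise ValueError("sourceName is required")
    body: Dict[str, Any] = {
        "type": "DB_TABLE",
        "physicalName": physical_name,
        "sourceName": source_name,
        "fields": list(fields),
    }
    # Optional members are only included when the caller supplies them.
    if tags is not None:
        body["tags"] = list(tags)
    if description is not None:
        body["description"] = description
    return body
```

For example, `db_table_request("public.mytable", "my-source", [{"name": "a", "type": "INTEGER"}], description="My first dataset!")` yields a body with the same top-level keys as the request sample.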

Retrieve a dataset

Returns a dataset.

+
path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

+
dataset
required
string <= 1024 characters
Example: my-dataset

The name of the dataset.

+

Responses

Response samples

Content type
application/json
{
  • "id": {
    },
  • "type": "DB_TABLE",
  • "name": "my-dataset",
  • "physicalName": "public.mytable",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "namespace": "my-namespace",
  • "sourceName": "my-source",
  • "fields": [
    ],
  • "tags": [ ],
  • "lastModifiedAt": null,
  • "description": "My first dataset!",
  • "facets": { },
  • "currentVersion": "b1d626a2-6d3a-475e-9ecf-943176d4a8c6"
}

Retrieve a version for a dataset

Returns a version for a dataset.

+
path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

+
dataset
required
string <= 1024 characters
Example: my-dataset

The name of the dataset.

+
version
required
string <uuid>
Example: ea9badc5-7cb2-49af-9a9f-155771d3a797

The ID of the job or dataset version.

+

Responses

Response samples

Content type
application/json
{
  • "id": {
    },
  • "type": "DB_TABLE",
  • "name": "my-dataset",
  • "physicalName": "public.mytable",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "version": "d224dac0-35d7-4d9b-bbbe-6fff1a8485ad",
  • "namespace": "my-namespace",
  • "sourceName": "my-source",
  • "fields": [
    ],
  • "tags": [ ],
  • "description": "My first dataset!",
  • "createdByRun": {
    }
}

List all versions for a dataset

Returns a list of versions for a dataset.

+
path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

+
dataset
required
string <= 1024 characters
Example: my-dataset

The name of the dataset.

+

Responses

Response samples

Content type
application/json
{
  • "versions": [
    ]
}

List all datasets

Returns a list of datasets.

+
path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

+
dataset
required
string <= 1024 characters
Example: my-dataset

The name of the dataset.

+
query Parameters
limit
integer
Default: 100
Example: limit=25

The number of results to return from offset

+
offset
integer
Default: 0

The initial position from which to return results

+

Responses

Response samples

Content type
application/json
{
  • "datasets": [
    ],
  • "totalCount": 0
}

Tag a dataset

Tag an existing dataset.

+
path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

+
dataset
required
string <= 1024 characters
Example: my-dataset

The name of the dataset.

+
tag
required
string
Example: SENSITIVE

The name of the tag.

+

Responses

Response samples

Content type
application/json
{
  • "id": {
    },
  • "type": "DB_TABLE",
  • "name": "my-dataset",
  • "physicalName": "public.mytable",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "namespace": "my-namespace",
  • "sourceName": "my-source",
  • "fields": [
    ],
  • "tags": [ ],
  • "lastModifiedAt": null,
  • "description": "My first dataset!",
  • "facets": { },
  • "currentVersion": "b1d626a2-6d3a-475e-9ecf-943176d4a8c6"
}

Tag a field

Tag an existing field of a dataset.

+
path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

+
dataset
required
string <= 1024 characters
Example: my-dataset

The name of the dataset.

+
field
required
string
Example: my_field

The name of the field.

+
tag
required
string
Example: SENSITIVE

The name of the tag.

+

Responses

Response samples

Content type
application/json
{
  • "id": {
    },
  • "type": "DB_TABLE",
  • "name": "my-dataset",
  • "physicalName": "public.mytable",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "namespace": "my-namespace",
  • "sourceName": "my-source",
  • "fields": [
    ],
  • "tags": [ ],
  • "lastModifiedAt": null,
  • "description": "My first dataset!",
  • "facets": { },
  • "currentVersion": "b1d626a2-6d3a-475e-9ecf-943176d4a8c6"
}

Jobs

Create a job Deprecated

Creates a new job object. All job objects are immutable and are uniquely identified by a generated ID. Marquez will create a version of a job each time the contents of the object are modified. For example, the location of a job may change over time, resulting in new versions. The accumulated versions can be listed, used to rerun a specific job version, or used to help debug a failed job run.

+
path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

+
job
required
string <= 1024 characters
Example: my-job

The name of the job.

+
Request Body schema: application/json
object

The ID of the job.

+
type
required
string (JobType)
Enum: "BATCH" "STREAM" "SERVICE"

The type of the job.

+
inputs
required
Array of objects (DatasetId) unique [ items ]

The set of input datasets.

+
outputs
required
Array of objects (DatasetId) unique [ items ]

The set of output datasets.

+
location
string <URL>

The URL of the job source code or artifact.

+
context
object
Deprecated

A set of key/value pairs whose values must be of type string. A context can be used to provide additional details about the job.

+
description
string

The description of the job.

+
runId
string

An optional run ID used to associate a job version to an existing job run.

+

Responses

Request samples

Content type
application/json
{}

Response samples

Content type
application/json
{
  • "id": {
    },
  • "type": "BATCH",
  • "name": "my-job",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "namespace": "my-namespace",
  • "inputs": [
    ],
  • "outputs": [ ],
  • "context": {
    },
  • "description": "My first job!",
  • "latestRun": null,
  • "facets": { },
  • "currentVersion": "b1d626a2-6d3a-475e-9ecf-943176d4a8c6"
}
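
As a rough illustration of the request body schema above, the sketch below assembles a job payload and checks the documented `type` enum. The shape of a `DatasetId` (a `namespace` plus a `name`) is an assumption, since the reference collapses that object, and `job_request_body` is a hypothetical helper. The deprecated `context` field is left out on purpose.

```python
import json

JOB_TYPES = {"BATCH", "STREAM", "SERVICE"}  # enum values listed in the schema


def job_request_body(job_type, inputs, outputs, location=None,
                     description=None, run_id=None):
    """Build a job payload; inputs/outputs are (namespace, name) pairs."""
    if job_type not in JOB_TYPES:
        raise ValueError(f"type must be one of {sorted(JOB_TYPES)}")

    def dataset_ids(pairs):
        # The schema marks both arrays as unique, so drop repeated entries.
        unique = []
        for namespace, name in pairs:
            item = {"namespace": namespace, "name": name}  # assumed DatasetId shape
            if item not in unique:
                unique.append(item)
        return unique

    body = {
        "type": job_type,
        "inputs": dataset_ids(inputs),
        "outputs": dataset_ids(outputs),
    }
    # Optional fields are only sent when the caller supplies them.
    if location is not None:
        body["location"] = location
    if description is not None:
        body["description"] = description
    if run_id is not None:
        body["runId"] = run_id
    return body


body = job_request_body(
    "BATCH",
    inputs=[("my-namespace", "my-dataset")],
    outputs=[],
    description="My first job!",
)
print(json.dumps(body, indent=2))
```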

Retrieve a job

Retrieve a job.

+
path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

+
job
required
string <= 1024 characters
Example: my-job

The name of the job.

+

Responses

Response samples

Content type
application/json
{
  • "id": {
    },
  • "type": "BATCH",
  • "name": "my-job",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "namespace": "my-namespace",
  • "inputs": [
    ],
  • "outputs": [ ],
  • "context": {
    },
  • "description": "My first job!",
  • "latestRun": null,
  • "facets": { },
  • "currentVersion": "b1d626a2-6d3a-475e-9ecf-943176d4a8c6"
}

List all jobs

Returns a list of jobs.

+
path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

+
query Parameters
limit
integer
Default: 100
Example: limit=25

The number of results to return, starting from offset.

+
offset
integer
Default: 0

The initial position from which to return results.

+

Responses

Response samples

Content type
application/json
{
  • "jobs": [
    ],
  • "totalCount": 0
}
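
The `limit` and `offset` parameters, together with the `totalCount` field in the response, are enough to page through all jobs. The sketch below only computes the query strings and makes no request; `page_queries` is a hypothetical helper, and it assumes `totalCount` reports the total number of jobs in the namespace.

```python
from urllib.parse import urlencode


def page_queries(total_count: int, limit: int = 100) -> list:
    """Return one query string per page needed to cover total_count results."""
    if limit <= 0:
        raise ValueError("limit must be positive")
    # Each page starts where the previous one ended.
    return [
        urlencode({"limit": limit, "offset": offset})
        for offset in range(0, total_count, limit)
    ]


for query in page_queries(total_count=60, limit=25):
    print(query)
```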

Retrieve a version for a job

Returns a version for a job.

+
path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

+
job
required
string <= 1024 characters
Example: my-job

The name of the job.

+
version
required
string <uuid>
Example: ea9badc5-7cb2-49af-9a9f-155771d3a797

The ID of the job or dataset version.

+

Responses

Response samples

Content type
application/json
{
  • "id": {
    },
  • "type": "BATCH",
  • "name": "my-job",
  • "version": "56472c57-a2ef-4218-b7b7-d2af02a343fd",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "namespace": "my-namespace",
  • "inputs": [
    ],
  • "outputs": [ ],
  • "context": {
    },
  • "description": "My first job!",
  • "facets": { }
}

List all versions for a job

Returns a list of versions for a job.

+
path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

+
job
required
string <= 1024 characters
Example: my-job

The name of the job.

+

Responses

Response samples

Content type
application/json
{
  • "versions": [
    ]
}

Create a run Deprecated

Creates a new run object for a job.

+
path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

+
job
required
string <= 1024 characters
Example: my-job

The name of the job.

+
Request Body schema: application/json
id
string <uuid>

An optional user-provided unique ID of the run. A run ID must be a UUID. If an ID for the run is not provided, a random UUID will be generated for the given run.

+
nominalStartTime
string <date-time>

An ISO-8601 timestamp representing the nominal start time of the run.

+
nominalEndTime
string <date-time>

An ISO-8601 timestamp representing the nominal end time of the run.

+
args
object

The arguments of the run.

+

Responses

Request samples

Content type
application/json
{
  • "args": {
    }
}

Response samples

Content type
application/json
{
  • "id": "870492da-ecfb-4be0-91b9-9a89ddd3db90",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "nominalStartTime": null,
  • "nominalEndTime": null,
  • "state": "COMPLETED",
  • "startedAt": "2019-05-09T15:17:32.690346",
  • "endedAt": "2019-05-09T20:05:46.815920Z",
  • "durationMs": 4250894125,
  • "args": {
    },
  • "context": {
    },
  • "facets": { }
}
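
A minimal sketch of building the run request body described above, using only the standard library. `run_request_body` is a hypothetical helper. It assumes the server accepts UTC timestamps with a `Z` suffix, as in the response samples, and it omits `id` when none is given so that a random UUID can be generated for the run.

```python
import uuid
from datetime import datetime, timezone


def _iso8601(value: datetime) -> str:
    """Format an aware datetime as an ISO-8601 UTC timestamp ending in 'Z'."""
    if value.tzinfo is None:
        raise ValueError("timestamps must be timezone-aware")
    return value.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")


def run_request_body(nominal_start=None, nominal_end=None, args=None, run_id=None):
    body = {}
    if run_id is not None:
        # uuid.UUID raises ValueError for anything that is not a valid UUID.
        body["id"] = str(uuid.UUID(str(run_id)))
    if nominal_start is not None:
        body["nominalStartTime"] = _iso8601(nominal_start)
    if nominal_end is not None:
        body["nominalEndTime"] = _iso8601(nominal_end)
    if args is not None:
        body["args"] = dict(args)
    return body


print(run_request_body(
    nominal_start=datetime(2019, 5, 9, 19, 49, 24, 201361, tzinfo=timezone.utc),
    run_id="870492da-ecfb-4be0-91b9-9a89ddd3db90",
))
```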

List all runs

Returns a list of runs for a job.

+
path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

+
job
required
string <= 1024 characters
Example: my-job

The name of the job.

+
query Parameters
limit
integer
Default: 100
Example: limit=25

The number of results to return, starting from offset.

+
offset
integer
Default: 0

The initial position from which to return results.

+

Responses

Response samples

Content type
application/json
{
  • "runs": [
    ]
}

Retrieve a run

Retrieve a run.

+
path Parameters
id
required
string <uuid>
Example: ea9badc5-7cb2-49af-9a9f-155771d3a797

The ID of the run.

+

Responses

Response samples

Content type
application/json
{
  • "id": "870492da-ecfb-4be0-91b9-9a89ddd3db90",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "nominalStartTime": null,
  • "nominalEndTime": null,
  • "state": "RUNNING",
  • "startedAt": "2019-05-09T15:17:32.690346",
  • "endedAt": null,
  • "durationMs": null,
  • "args": {
    },
  • "facets": { }
}

Start a run Deprecated

Marks the run as RUNNING.

+
path Parameters
id
required
string <uuid>
Example: ea9badc5-7cb2-49af-9a9f-155771d3a797

The ID of the run.

+
query Parameters
at
string <date-time>

An ISO-8601 timestamp representing the time when the run transitioned.

+

Responses

Response samples

Content type
application/json
{
  • "id": "870492da-ecfb-4be0-91b9-9a89ddd3db90",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "nominalStartTime": null,
  • "nominalEndTime": null,
  • "state": "RUNNING",
  • "startedAt": "2019-05-09T15:17:32.690346",
  • "endedAt": null,
  • "durationMs": null,
  • "args": {
    },
  • "facets": { }
}

Complete a run Deprecated

Marks the run as COMPLETED.

+
path Parameters
id
required
string <uuid>
Example: ea9badc5-7cb2-49af-9a9f-155771d3a797

The ID of the run.

+
query Parameters
at
string <date-time>

An ISO-8601 timestamp representing the time when the run transitioned.

+

Responses

Response samples

Content type
application/json
{
  • "id": "870492da-ecfb-4be0-91b9-9a89ddd3db90",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "nominalStartTime": null,
  • "nominalEndTime": null,
  • "state": "COMPLETED",
  • "startedAt": "2019-05-09T15:17:32.690346",
  • "endedAt": "2019-05-09T20:05:46.815920Z",
  • "durationMs": 4250894125,
  • "args": {
    },
  • "context": {
    },
  • "facets": { }
}

Fail a run Deprecated

Marks the run as FAILED.

+
path Parameters
id
required
string <uuid>
Example: ea9badc5-7cb2-49af-9a9f-155771d3a797

The ID of the run.

+
query Parameters
at
string <date-time>

An ISO-8601 timestamp representing the time when the run transitioned.

+

Responses

Response samples

Content type
application/json
{
  • "id": "870492da-ecfb-4be0-91b9-9a89ddd3db90",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "nominalStartTime": null,
  • "nominalEndTime": null,
  • "state": "FAILED",
  • "startedAt": "2019-05-09T15:17:32.690346",
  • "endedAt": null,
  • "durationMs": null,
  • "args": {
    },
  • "facets": { }
}

Abort a run Deprecated

Marks the run as ABORTED.

+
path Parameters
id
required
string <uuid>
Example: ea9badc5-7cb2-49af-9a9f-155771d3a797

The ID of the run.

+
query Parameters
at
string <date-time>

An ISO-8601 timestamp representing the time when the run transitioned.

+

Responses

Response samples

Content type
application/json
{
  • "id": "870492da-ecfb-4be0-91b9-9a89ddd3db90",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "nominalStartTime": null,
  • "nominalEndTime": null,
  • "state": "ABORTED",
  • "startedAt": "2019-05-09T15:17:32.690346",
  • "endedAt": null,
  • "durationMs": null,
  • "args": {
    },
  • "facets": { }
}
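
The four run transitions above (start, complete, fail, abort) share the same `id` path parameter and optional `at` query parameter, so they can be modeled with one small table. The state names come from the descriptions above; the route layout (`/api/v1/jobs/runs/{id}/{action}`) is an assumption, as the rendered reference does not show it, and both helpers are hypothetical.

```python
from urllib.parse import quote, urlencode

# Action name -> state the run is marked with, per the descriptions above.
TRANSITIONS = {
    "start": "RUNNING",
    "complete": "COMPLETED",
    "fail": "FAILED",
    "abort": "ABORTED",
}


def expected_state(action: str) -> str:
    """Return the state a run should report after the given transition."""
    if action not in TRANSITIONS:
        raise ValueError(f"unknown action: {action}")
    return TRANSITIONS[action]


def transition_path(run_id: str, action: str, at: str = None) -> str:
    """Build a (hypothetical) path for a run transition request."""
    expected_state(action)  # validates the action name
    path = f"/api/v1/jobs/runs/{quote(run_id, safe='')}/{action}"
    if at is not None:
        # 'at' is an ISO-8601 timestamp; urlencode escapes the colons.
        path += "?" + urlencode({"at": at})
    return path


print(transition_path("ea9badc5-7cb2-49af-9a9f-155771d3a797", "fail"))
print(expected_state("fail"))
```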

Lineage

Record a single lineage event

Receive, process, and store lineage metadata using the OpenLineage standard.

+
Request Body schema: application/json
any (LineageEvent)

Responses

Request samples

Content type
application/json
{}

Get a lineage graph

query Parameters
nodeId
required
string
Example: nodeId=dataset:food_delivery:public.delivery_7_days

The ID of the node. A node can be either a dataset node or a job node. For a dataset, the nodeId format is dataset:<namespace_of_dataset>:<name_of_the_dataset>; for a job, it is job:<namespace_of_the_job>:<name_of_the_job>.

+
depth
integer
Default: 20

Depth of lineage graph to create.

+

Responses

Response samples

Content type
application/json
{
  • "graph": [
    ]
}

Tags

Create a tag

Creates a new tag object.

+
path Parameters
tag
required
string
Example: SENSITIVE

The name of the tag.

+
Request Body schema: application/json
description
string

The description of the tag.

+

Responses

Request samples

Content type
application/json
{
  • "description": "My first tag!"
}

Response samples

Content type
application/json
{
  • "tags": [
    ]
}

List all tags

Returns a list of tags.

+
query Parameters
limit
integer
Default: 100
Example: limit=25

The number of results to return, starting from offset.

+
offset
integer
Default: 0

The initial position from which to return results.

+

Responses

Response samples

Content type
application/json
{
  • "tags": [
    ]
}

Search

Query all datasets and jobs

Returns the datasets and jobs that match your query.

+
query Parameters
q
required
string
Example: q=my-dataset

Query containing the pattern to match; pattern matching for datasets and jobs is string-based and case-insensitive. Use a percent sign (%) to match any string of zero or more characters (my-job%), or an underscore (_) to match a single character (_job_).

+
filter
string
Example: filter=dataset

Filters the results of your query by dataset or job.

+
sort
string
Example: sort=name

Sorts the results of your query by name or updated_at.

+
limit
integer
Default: 100
Example: limit=25

The number of results to return.

+

Responses

Response samples

Content type
application/json
{
  • "totalCount": 1,
  • "results": [
    ]
}
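
Because the percent sign is also the URL escape character, a pattern such as `my-job%` has to be percent-encoded before it is sent as the `q` parameter. The sketch below builds the query string with the standard library. `build_search_query` is a hypothetical helper, and the accepted `filter` and `sort` values are taken from the parameter descriptions above rather than from a confirmed enum.

```python
from urllib.parse import urlencode

FILTERS = {"dataset", "job"}      # values named in the 'filter' description
SORTS = {"name", "updated_at"}    # values named in the 'sort' description


def build_search_query(q: str, filter_by: str = None, sort: str = None,
                       limit: int = 100) -> str:
    """Build the query string for a search request."""
    if not q:
        raise ValueError("q is required")
    params = {"q": q}
    if filter_by is not None:
        if filter_by.lower() not in FILTERS:
            raise ValueError(f"filter must be one of {sorted(FILTERS)}")
        params["filter"] = filter_by
    if sort is not None:
        if sort.lower() not in SORTS:
            raise ValueError(f"sort must be one of {sorted(SORTS)}")
        params["sort"] = sort
    params["limit"] = limit
    # urlencode escapes '%' as '%25', so the wildcard reaches the server intact.
    return urlencode(params)


print(build_search_query("my-job%", filter_by="job", sort="name", limit=25))
```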