Skip to content

Commit

Permalink
Connector CI: Upload complete.json file on pipeline complete (#27051)
Browse files Browse the repository at this point in the history
* Write complete.json file at end of run

* Move state transform

* Move file_path_key logic into report

* Use property decorator

---------

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
bnchrch and Octavia Squidington III authored Jun 6, 2023
1 parent ee1be35 commit 8bc8414
Show file tree
Hide file tree
Showing 10 changed files with 178 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,60 +2,3 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
"""The actions package is made to declare reusable pipeline components."""

from __future__ import annotations

from typing import TYPE_CHECKING, List, Tuple, Union

import asyncer
from ci_connector_ops.pipelines.bases import Step, StepStatus

if TYPE_CHECKING:
from ci_connector_ops.pipelines.bases import StepResult


async def run_steps(
    steps_and_run_args: List[Union[Step, Tuple[Step, Tuple]] | List[Union[Step, Tuple[Step, Tuple]]]],
    results: List[StepResult] | None = None,
) -> List[StepResult]:
    """Run multiple steps sequentially, or in parallel if steps are wrapped into a sublist.

    As soon as one of the collected results has a FAILURE status, every step that has
    not run yet (including each member of a parallel sublist) is skipped instead of run.

    Args:
        steps_and_run_args: List of steps to run. Steps wrapped in a sublist are executed
            in parallel. Arguments for a step's run function can be passed as a tuple
            along the Step instance: ``(step, (arg1, arg2))``.
        results: Results of steps that already ran. Defaults to no previous results.

    Returns:
        List[StepResult]: The previous results followed by one result per executed or skipped step.
    """
    # A None sentinel replaces the former `[]` default: with no steps to run the function
    # hands `results` back as-is, which used to expose one shared list to every caller.
    results = [] if results is None else results

    for position, entry in enumerate(steps_and_run_args):
        # If any of the previous steps failed, skip all the remaining steps.
        if any(result.status is StepStatus.FAILURE for result in results):
            skipped_results = []
            for pending in steps_and_run_args[position:]:
                # A sublist is a group of parallel steps: skip each of its members
                # (calling .skip() on the list itself would raise AttributeError).
                members = pending if isinstance(pending, list) else [pending]
                for member in members:
                    step = member[0] if isinstance(member, tuple) else member
                    skipped_results.append(step.skip())
            return results + skipped_results

        # Wrap a lone step in a list so single and parallel entries share one code path.
        steps_to_run = entry if isinstance(entry, list) else [entry]

        async with asyncer.create_task_group() as task_group:
            tasks = []
            for step in steps_to_run:
                if isinstance(step, Step):
                    tasks.append(task_group.soonify(step.run)())
                elif isinstance(step, tuple) and isinstance(step[0], Step) and isinstance(step[1], tuple):
                    step, run_args = step
                    tasks.append(task_group.soonify(step.run)(*run_args))
                # Entries of any other shape are ignored, as they were before.

        # Task values are read once the task group block has exited.
        results = results + [task.value for task in tasks]

    return results
46 changes: 43 additions & 3 deletions tools/ci_connector_ops/ci_connector_ops/pipelines/bases.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
import anyio
import asyncer
from anyio import Path
from ci_connector_ops.pipelines.consts import PYPROJECT_TOML_FILE_PATH
from ci_connector_ops.pipelines.consts import PYPROJECT_TOML_FILE_PATH, LOCAL_REPORTS_PATH_ROOT
from ci_connector_ops.pipelines.actions import remote_storage
from ci_connector_ops.pipelines.utils import check_path_in_workdir, slugify, with_exit_code, with_stderr, with_stdout

from ci_connector_ops.utils import console
from dagger import Container, QueryError
from rich.console import Group
Expand Down Expand Up @@ -257,6 +259,15 @@ class Report:
steps_results: List[StepResult]
created_at: datetime = field(default_factory=datetime.utcnow)
name: str = "REPORT"
_file_path_key: str = "report.json"

@property
def file_path_key(self) -> str:
    """Return the report's path key.

    ``save`` appends this key to the local reports root to build the local file
    path and passes it as the ``key`` of the GCS upload.
    """
    current_key = self._file_path_key
    return current_key

@file_path_key.setter
def file_path_key(self, v: str) -> None:
    """Replace the report's path key with ``v``."""
    setattr(self, "_file_path_key", v)

@property
def failed_steps(self) -> List[StepResult]: # noqa D102
Expand All @@ -278,6 +289,30 @@ def success(self) -> bool: # noqa D102
def run_duration(self) -> float:
    """Return the number of seconds between the pipeline context creation and the report creation.

    The value comes from ``timedelta.total_seconds()``, which returns a float
    (the previous ``int`` annotation was inaccurate).
    """
    elapsed = self.created_at - self.pipeline_context.created_at
    return elapsed.total_seconds()

@property
def remote_storage_enabled(self) -> bool:
    """Tell whether ``save`` should also upload the report: true when the pipeline context is flagged as CI."""
    context = self.pipeline_context
    return context.is_ci

async def save(self) -> None:
    """Write the report as a JSON file locally and upload it to GCS when remote storage is enabled.

    The local file lives under LOCAL_REPORTS_PATH_ROOT at the report's
    ``file_path_key``; missing parent directories are created. A failed upload
    is logged as an error and does not raise.
    """
    local_report_path = anyio.Path(LOCAL_REPORTS_PATH_ROOT + self.file_path_key)
    report_directory = local_report_path.parents[0]
    await report_directory.mkdir(parents=True, exist_ok=True)
    await local_report_path.write_text(self.to_json())

    # Nothing more to do when the report only has to be kept locally.
    if not self.remote_storage_enabled:
        return

    context = self.pipeline_context
    path_on_host = str(local_report_path)
    # Expose only the freshly written report file from the host to dagger.
    host_directory = context.dagger_client.host().directory(".", include=[path_on_host])
    local_report_dagger_file = host_directory.file(path_on_host)

    upload_outcome = await remote_storage.upload_to_gcs(
        dagger_client=context.dagger_client,
        file_to_upload=local_report_dagger_file,
        key=self.file_path_key,
        bucket=context.ci_report_bucket,
        gcs_credentials=context.ci_gcs_credentials_secret,
    )
    report_upload_exit_code, _stdout, _stderr = upload_outcome
    if report_upload_exit_code != 0:
        context.logger.error(f"Uploading the report to GCS Bucket: {self.pipeline_context.ci_report_bucket} failed.")

def to_json(self) -> str:
"""Create a JSON representation of the report.
Expand Down Expand Up @@ -348,8 +383,13 @@ class ConnectorReport(Report):
"""A dataclass to build connector test reports to share pipelines executions results with the user."""

@property
def should_be_saved(self) -> bool:
    """Return the CI flag of the pipeline context: the report is saved only in CI."""
    context = self.pipeline_context
    return context.is_ci
def file_path_key(self) -> str:
    """Build the report key ``<report_output_prefix>/<connector technical name>/<connector version>/output.json``."""
    name = self.pipeline_context.connector.technical_name
    version = self.pipeline_context.connector.version
    prefix = self.pipeline_context.report_output_prefix
    return f"{prefix}/{name}/{version}/output.json"

@property
def should_be_commented_on_pr(self) -> bool: # noqa D102
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ def test(
git_branch=ctx.obj["git_branch"],
git_revision=ctx.obj["git_revision"],
modified_files=modified_files,
test_report_bucket=ctx.obj["ci_report_bucket_name"],
ci_report_bucket=ctx.obj["ci_report_bucket_name"],
report_output_prefix=ctx.obj["report_output_prefix"],
use_remote_secrets=ctx.obj["use_remote_secrets"],
gha_workflow_run_url=ctx.obj.get("gha_workflow_run_url"),
Expand Down Expand Up @@ -251,7 +251,7 @@ def build(ctx: click.Context) -> bool:
git_branch=ctx.obj["git_branch"],
git_revision=ctx.obj["git_revision"],
modified_files=modified_files,
test_report_bucket=ctx.obj["ci_report_bucket_name"],
ci_report_bucket=ctx.obj["ci_report_bucket_name"],
report_output_prefix=ctx.obj["report_output_prefix"],
use_remote_secrets=ctx.obj["use_remote_secrets"],
gha_workflow_run_url=ctx.obj.get("gha_workflow_run_url"),
Expand Down Expand Up @@ -377,7 +377,7 @@ def publish(
docker_hub_password=docker_hub_password,
slack_webhook=slack_webhook,
reporting_slack_channel=slack_channel,
test_report_bucket=ctx.obj["ci_report_bucket_name"],
ci_report_bucket=ctx.obj["ci_report_bucket_name"],
report_output_prefix=ctx.obj["report_output_prefix"],
is_local=ctx.obj["is_local"],
git_branch=ctx.obj["git_branch"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@
GRADLE_CACHE_PATH = "/root/.gradle/caches"
GRADLE_BUILD_CACHE_PATH = f"{GRADLE_CACHE_PATH}/build-cache-1"
GRADLE_READ_ONLY_DEPENDENCY_CACHE_PATH = "/root/gradle_dependency_cache"
# Root directory of the report files written locally. The trailing slash matters:
# Report.save builds the local path by plain string concatenation of this root
# and the report's file_path_key.
LOCAL_REPORTS_PATH_ROOT = "tools/ci_connector_ops/pipeline_reports/"
52 changes: 16 additions & 36 deletions tools/ci_connector_ops/ci_connector_ops/pipelines/contexts.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from anyio import Path
from asyncer import asyncify
from ci_connector_ops.pipelines.actions import remote_storage, secrets
from ci_connector_ops.pipelines.bases import CIContext, ConnectorReport, Report
from ci_connector_ops.pipelines.bases import CIContext, ConnectorReport, Report, StepStatus
from ci_connector_ops.pipelines.github import update_commit_status_check
from ci_connector_ops.pipelines.slack import send_message_to_webhook
from ci_connector_ops.pipelines.utils import AIRBYTE_REPO_URL, METADATA_FILE_NAME, sanitize_gcs_credentials
Expand Down Expand Up @@ -68,6 +68,8 @@ def __init__(
slack_webhook: Optional[str] = None,
reporting_slack_channel: Optional[str] = None,
pull_request: PullRequest = None,
ci_report_bucket: Optional[str] = None,
ci_gcs_credentials: Optional[str] = None,
):
"""Initialize a pipeline context.
Expand Down Expand Up @@ -101,6 +103,9 @@ def __init__(
self.dagger_client = None
self._report = None
self.dockerd_service = None
self.ci_gcs_credentials = sanitize_gcs_credentials(ci_gcs_credentials) if ci_gcs_credentials else None
self.ci_report_bucket = ci_report_bucket

update_commit_status_check(**self.github_commit_status)

@property
Expand Down Expand Up @@ -131,6 +136,10 @@ def report(self) -> Report: # noqa D102
def report(self, report: Report):
    """Store ``report`` as the current report of this context."""
    # Kept in the private attribute initialised to None in __init__.
    self._report = report

@property
def ci_gcs_credentials_secret(self) -> Secret:
    """Return the result of ``dagger_client.set_secret`` for the stored CI GCS credentials.

    The secret is set under the name ``ci_gcs_credentials``; ``set_secret`` is
    called again on every access of this property.
    """
    client = self.dagger_client
    return client.set_secret("ci_gcs_credentials", self.ci_gcs_credentials)

@property
def github_commit_status(self) -> dict:
"""Build a dictionary used as kwargs to the update_commit_status_check function."""
Expand Down Expand Up @@ -268,9 +277,9 @@ def __init__(
git_branch: bool,
git_revision: bool,
modified_files: List[str],
test_report_bucket: str,
report_output_prefix: str,
use_remote_secrets: bool = True,
ci_report_bucket: Optional[str] = None,
ci_gcs_credentials: Optional[str] = None,
connector_acceptance_test_image: Optional[str] = DEFAULT_CONNECTOR_ACCEPTANCE_TEST_IMAGE,
gha_workflow_run_url: Optional[str] = None,
Expand Down Expand Up @@ -304,12 +313,10 @@ def __init__(
self.use_remote_secrets = use_remote_secrets
self.connector_acceptance_test_image = connector_acceptance_test_image
self.modified_files = modified_files
self.test_report_bucket = test_report_bucket
self.report_output_prefix = report_output_prefix
self._secrets_dir = None
self._updated_secrets_dir = None
self.cdk_version = None
self.ci_gcs_credentials = sanitize_gcs_credentials(ci_gcs_credentials) if ci_gcs_credentials else None

super().__init__(
pipeline_name=pipeline_name,
Expand All @@ -322,6 +329,8 @@ def __init__(
slack_webhook=slack_webhook,
reporting_slack_channel=reporting_slack_channel,
pull_request=pull_request,
ci_report_bucket=ci_report_bucket,
ci_gcs_credentials=ci_gcs_credentials,
)

@property
Expand Down Expand Up @@ -364,11 +373,6 @@ def metadata(self) -> dict:
def docker_image_from_metadata(self) -> str:
    """Return the image name ``<dockerRepository>:<dockerImageTag>`` read from the connector metadata."""
    repository = self.metadata["dockerRepository"]
    tag = self.metadata["dockerImageTag"]
    return f"{repository}:{tag}"

@property
def ci_gcs_credentials_secret(self) -> Secret:
    """Return the result of ``dagger_client.set_secret`` for the stored CI GCS credentials.

    The secret is set under the name ``ci_gcs_credentials``.
    """
    # TODO (ben): Update this to be in use ANYWHERE we use a service account.
    dagger_client = self.dagger_client
    credentials = self.ci_gcs_credentials
    return dagger_client.set_secret("ci_gcs_credentials", credentials)

def get_connector_dir(self, exclude=None, include=None) -> Directory:
"""Get the connector under test source code directory.
Expand Down Expand Up @@ -411,31 +415,7 @@ async def __aexit__(
self.report.print()
self.logger.info(self.report.to_json())

local_reports_path_root = "tools/ci_connector_ops/pipeline_reports/"
connector_name = self.report.pipeline_context.connector.technical_name
connector_version = self.report.pipeline_context.connector.version

suffix = f"{connector_name}/{connector_version}/output.json"
file_path_key = f"{self.report_output_prefix}/{suffix}"

local_report_path = Path(local_reports_path_root + file_path_key)

await local_report_path.parents[0].mkdir(parents=True, exist_ok=True)
await local_report_path.write_text(self.report.to_json())

if self.report.should_be_saved:
local_report_dagger_file = (
self.dagger_client.host().directory(".", include=[str(local_report_path)]).file(str(local_report_path))
)
report_upload_exit_code, _stdout, _stderr = await remote_storage.upload_to_gcs(
dagger_client=self.dagger_client,
file_to_upload=local_report_dagger_file,
key=file_path_key,
bucket=self.test_report_bucket,
gcs_credentials=self.ci_gcs_credentials_secret,
)
if report_upload_exit_code != 0:
self.logger.error(f"Uploading the report to GCS Bucket: {self.test_report_bucket} failed.")
await self.report.save()

if self.report.should_be_commented_on_pr:
self.report.post_comment_on_pr()
Expand Down Expand Up @@ -466,7 +446,7 @@ def __init__(
docker_hub_password: str,
slack_webhook: str,
reporting_slack_channel: str,
test_report_bucket: str,
ci_report_bucket: str,
report_output_prefix: str,
is_local: bool,
git_branch: bool,
Expand All @@ -493,7 +473,7 @@ def __init__(
connector=connector,
modified_files=modified_files,
report_output_prefix=report_output_prefix,
test_report_bucket=test_report_bucket,
ci_report_bucket=ci_report_bucket,
is_local=is_local,
git_branch=git_branch,
git_revision=git_revision,
Expand Down
61 changes: 61 additions & 0 deletions tools/ci_connector_ops/ci_connector_ops/pipelines/helpers/steps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
"""The actions package is made to declare reusable pipeline components."""

from __future__ import annotations

from typing import TYPE_CHECKING, List, Tuple, Union

import asyncer
from ci_connector_ops.pipelines.bases import Step, StepStatus

if TYPE_CHECKING:
from ci_connector_ops.pipelines.bases import StepResult


async def run_steps(
    steps_and_run_args: List[Union[Step, Tuple[Step, Tuple]] | List[Union[Step, Tuple[Step, Tuple]]]],
    results: List[StepResult] | None = None,
) -> List[StepResult]:
    """Run multiple steps sequentially, or in parallel if steps are wrapped into a sublist.

    As soon as one of the collected results has a FAILURE status, every step that has
    not run yet (including each member of a parallel sublist) is skipped instead of run.

    Args:
        steps_and_run_args: List of steps to run. Steps wrapped in a sublist are executed
            in parallel. Arguments for a step's run function can be passed as a tuple
            along the Step instance: ``(step, (arg1, arg2))``.
        results: Results of steps that already ran. Defaults to no previous results.

    Returns:
        List[StepResult]: The previous results followed by one result per executed or skipped step.
    """
    # A None sentinel replaces the former `[]` default: with no steps to run the function
    # hands `results` back as-is, which used to expose one shared list to every caller.
    results = [] if results is None else results

    for position, entry in enumerate(steps_and_run_args):
        # If any of the previous steps failed, skip all the remaining steps.
        if any(result.status is StepStatus.FAILURE for result in results):
            skipped_results = []
            for pending in steps_and_run_args[position:]:
                # A sublist is a group of parallel steps: skip each of its members
                # (calling .skip() on the list itself would raise AttributeError).
                members = pending if isinstance(pending, list) else [pending]
                for member in members:
                    step = member[0] if isinstance(member, tuple) else member
                    skipped_results.append(step.skip())
            return results + skipped_results

        # Wrap a lone step in a list so single and parallel entries share one code path.
        steps_to_run = entry if isinstance(entry, list) else [entry]

        async with asyncer.create_task_group() as task_group:
            tasks = []
            for step in steps_to_run:
                if isinstance(step, Step):
                    tasks.append(task_group.soonify(step.run)())
                elif isinstance(step, tuple) and isinstance(step[0], Step) and isinstance(step[1], tuple):
                    step, run_args = step
                    tasks.append(task_group.soonify(step.run)(*run_args))
                # Entries of any other shape are ignored, as they were before.

        # Task values are read once the task group block has exited.
        results = results + [task.value for task in tasks]

    return results
Loading

0 comments on commit 8bc8414

Please sign in to comment.