Skip to content

Commit

Permalink
airbyte-ci: upload test artifacts along with reports
Browse files Browse the repository at this point in the history
  • Loading branch information
alafanechere authored and stephane-airbyte committed Feb 22, 2024
1 parent 81d706f commit 823b5c4
Show file tree
Hide file tree
Showing 8 changed files with 223 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
<Property name="container-log-pattern">%d{yyyy-MM-dd'T'HH:mm:ss,SSS}{GMT+0}`%replace{%X{log_source}}{^ -}{} > %replace{%m}{$${env:LOG_SCRUB_PATTERN:-\*\*\*\*\*}}{*****}%n</Property>
<!-- Always log INFO by default. -->
<Property name="log-level">${sys:LOG_LEVEL:-${env:LOG_LEVEL:-INFO}}</Property>
<Property name="logSubDir">${env:AIRBYTE_LOG_SUBDIR:-${date:yyyy-MM-dd'T'HH:mm:ss}}</Property>
<Property name="logDir">build/test-logs/${logSubDir}</Property>
<Property name="logDir">build/test-logs/${date:yyyy-MM-dd'T'HH:mm:ss}</Property>
</Properties>

<Appenders>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@
from __future__ import annotations

import json
import os
import webbrowser
from dataclasses import dataclass
from pathlib import Path
from types import MappingProxyType
from typing import TYPE_CHECKING, Optional
from zipfile import ZIP_DEFLATED, ZipFile
from typing import TYPE_CHECKING, Dict

from connector_ops.utils import console # type: ignore
from jinja2 import Environment, PackageLoader, select_autoescape
from pipelines.consts import GCS_PUBLIC_DOMAIN
from pipelines.helpers.utils import format_duration
from pipelines.models.artifacts import Artifact
from pipelines.models.reports import Report
from pipelines.models.steps import StepStatus
from rich.console import Group
Expand Down Expand Up @@ -90,7 +89,7 @@ def to_json(self) -> str:
}
)

async def to_html(self) -> str:
def to_html(self) -> str:
env = Environment(
loader=PackageLoader("pipelines.airbyte_ci.connectors.test.steps"),
autoescape=select_autoescape(),
Expand All @@ -101,10 +100,17 @@ async def to_html(self) -> str:
template.globals["StepStatus"] = StepStatus
template.globals["format_duration"] = format_duration
local_icon_path = Path(f"{self.pipeline_context.connector.code_directory}/icon.svg").resolve()
step_result_to_artifact_link = {}
step_result_to_artifact_links: Dict[str, List[Dict]] = {}
for step_result in self.steps_results:
if test_artifacts_link := await self.upload_path(step_result.test_artifacts_path):
step_result_to_artifact_link[step_result.step.title] = test_artifacts_link
for artifact in step_result.artifacts:
if artifact.gcs_url:
url = artifact.gcs_url
elif artifact.local_path:
url = artifact.local_path.resolve().as_uri()
else:
continue
step_result_to_artifact_links.setdefault(step_result.step.title, []).append({"name": artifact.name, "url": url})

template_context = {
"connector_name": self.pipeline_context.connector.technical_name,
"step_results": self.steps_results,
Expand All @@ -118,7 +124,7 @@ async def to_html(self) -> str:
"commit_url": None,
"icon_url": local_icon_path.as_uri(),
"report": self,
"step_result_to_artifact_link": MappingProxyType(step_result_to_artifact_link),
"step_result_to_artifact_links": MappingProxyType(step_result_to_artifact_links),
}

if self.pipeline_context.is_ci:
Expand All @@ -131,17 +137,32 @@ async def to_html(self) -> str:
] = f"https://raw.githubusercontent.com/airbytehq/airbyte/{self.pipeline_context.git_revision}/{self.pipeline_context.connector.code_directory}/icon.svg"
return template.render(template_context)

async def save_html_report(self) -> None:
"""Save the report as HTML, upload it to GCS if the pipeline is running in CI"""

html_report_path = self.report_dir_path / self.html_report_file_name
report_dir = self.pipeline_context.dagger_client.host().directory(str(self.report_dir_path))
local_html_report_file = report_dir.with_new_file(self.html_report_file_name, self.to_html()).file(self.html_report_file_name)
html_report_artifact = Artifact(name="HTML Report", content_type="text/html", content=local_html_report_file)
await html_report_artifact.save_to_local_path(html_report_path)
absolute_path = html_report_path.absolute()
self.pipeline_context.logger.info(f"Report saved locally at {absolute_path}")
if self.remote_storage_enabled and self.pipeline_context.ci_gcs_credentials_secret and self.pipeline_context.ci_report_bucket:
gcs_url = await html_report_artifact.upload_to_gcs(
dagger_client=self.pipeline_context.dagger_client,
bucket=self.pipeline_context.ci_report_bucket,
key=self.html_report_remote_storage_key,
gcs_credentials=self.pipeline_context.ci_gcs_credentials_secret,
)
self.pipeline_context.logger.info(f"HTML report uploaded to {gcs_url}")

elif self.pipeline_context.enable_report_auto_open:
self.pipeline_context.logger.info("Opening HTML report in browser.")
webbrowser.open(absolute_path.as_uri())

async def save(self) -> None:
local_html_path = await self.save_local(self.html_report_file_name, await self.to_html())
absolute_path = local_html_path.resolve()
if self.pipeline_context.enable_report_auto_open:
self.pipeline_context.logger.info(f"HTML report saved locally: {absolute_path}")
if self.pipeline_context.enable_report_auto_open:
self.pipeline_context.logger.info("Opening HTML report in browser.")
webbrowser.open(absolute_path.as_uri())
if self.remote_storage_enabled:
await self.save_remote(local_html_path, self.html_report_remote_storage_key, "text/html")
await super().save()
await self.save_html_report()

def print(self) -> None:
"""Print the test report to the console in a nice way."""
Expand Down Expand Up @@ -169,31 +190,3 @@ def print(self) -> None:

main_panel = Panel(Group(*to_render), title=main_panel_title, subtitle=duration_subtitle)
console.print(main_panel)

async def upload_path(self, path: Optional[Path]) -> Optional[str]:
if not path or not path.exists():
return None
if self.pipeline_context.is_local:
return str(path.resolve())

zip_file_path = Path(str(path) + ".zip")
with ZipFile(zip_file_path, mode="w") as zip_file:
# lifted from https://github.com/python/cpython/blob/3.12/Lib/zipfile/__init__.py#L2277C9-L2295C44
def add_to_zip(zf: ZipFile, path_to_zip: str, zippath: str) -> None:
if os.path.isfile(path_to_zip):
zf.write(path_to_zip, zippath, ZIP_DEFLATED)
elif os.path.isdir(path_to_zip):
if zippath:
zf.write(path_to_zip, zippath)
for nm in sorted(os.listdir(path_to_zip)):
add_to_zip(zf, os.path.join(path_to_zip, nm), os.path.join(zippath, nm))

add_to_zip(zip_file, str(path), "")

if not self.remote_storage_enabled:
self.pipeline_context.logger.info(f"remote storage is disable. zip file is at {zip_file_path.resolve()}")
return str(zip_file_path.resolve())
else:
await self.save_remote(zip_file_path, self.file_remote_storage_key(zip_file_path.name), "application/zip")
self.pipeline_context.logger.info(f"zip file uploaded to {self.file_url(str(zip_file_path))}")
return self.file_url(zip_file_path.name)
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,13 @@ function copyToClipBoard(htmlElement) {
<label for="{{ step_result.step.title }}" class="lbl-toggle">{{ step_result.step.title }} | {{ format_duration(step_result.step.run_duration) }}</label>
{% endif %}
<div class="collapsible-content">
{% if step_result_to_artifact_link[step_result.step.title] %}
<div><a href="{{ step_result_to_artifact_link[step_result.step.title] }}">Test Artifacts</a></div>
{% if step_result_to_artifact_links[step_result.step.title] %}
<h3>Artifacts</h3>
<ul>
{% for artifact in step_result_to_artifact_links[step_result.step.title] %}
<li><a href="{{ artifact.url }}">{{ artifact.name }}</a></li>
{% endfor %}
</ul>
{% endif %}
<div class="content-inner">
{% if step_result.stdout %}
Expand Down
101 changes: 55 additions & 46 deletions airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/steps/gradle.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from abc import ABC
from datetime import datetime
from pathlib import Path
from typing import Any, ClassVar, List, Optional, Tuple, cast
from typing import Any, ClassVar, List, Optional, Tuple

import pipelines.dagger.actions.system.docker
from dagger import CacheSharingMode, CacheVolume, Container, QueryError
from dagger import CacheSharingMode, CacheVolume, Container, ExecError
from pipelines.airbyte_ci.connectors.context import ConnectorContext
from pipelines.consts import AMAZONCORRETTO_IMAGE
from pipelines.dagger.actions import secrets
from pipelines.hacks import never_fail_exec
from pipelines.helpers.utils import sh_dash_c
from pipelines.helpers.utils import dagger_directory_as_zip_file, sh_dash_c
from pipelines.models.artifacts import Artifact
from pipelines.models.steps import Step, StepResult


Expand All @@ -38,15 +37,6 @@ class GradleTask(Step, ABC):
with_test_artifacts: ClassVar[bool] = False
accept_extra_params = True

@property
def airbyte_logs_subdir(self) -> str:
return datetime.fromtimestamp(cast(float, self.context.pipeline_start_timestamp)).isoformat() + "-" + self.gradle_task_name

@property
def test_artifacts_path(self) -> Path:
test_artifacts_path = f"{self.context.connector.code_directory}/build/test-artifacts/{self.airbyte_logs_subdir}"
return Path(test_artifacts_path)

@property
def gradle_task_options(self) -> Tuple[str, ...]:
return self.STATIC_GRADLE_OPTIONS + (f"-Ds3BuildCachePrefix={self.context.connector.technical_name}",)
Expand Down Expand Up @@ -111,8 +101,6 @@ async def _run(self, *args: Any, **kwargs: Any) -> StepResult:
.with_env_variable("GRADLE_HOME", self.GRADLE_HOME_PATH)
# Same for GRADLE_USER_HOME.
.with_env_variable("GRADLE_USER_HOME", self.GRADLE_HOME_PATH)
# Set the AIRBYTE_LOG_SUBDIR for log4j
.with_env_variable("AIRBYTE_LOG_SUBDIR", self.airbyte_logs_subdir)
# Install a bunch of packages as early as possible.
.with_exec(
sh_dash_c(
Expand Down Expand Up @@ -203,11 +191,18 @@ async def _run(self, *args: Any, **kwargs: Any) -> StepResult:
connector_gradle_task = f":airbyte-integrations:connectors:{self.context.connector.technical_name}:{self.gradle_task_name}"
gradle_command = self._get_gradle_command(connector_gradle_task, task_options=self.params_as_cli_options)
gradle_container = gradle_container.with_(never_fail_exec([gradle_command]))
await self._collect_logs(gradle_container)
await self._collect_test_report(gradle_container)
return await self.get_step_result(gradle_container)

async def get_step_result(self, container: Container) -> StepResult:
# Collect the test artifacts, if applicable.
artifacts = []
if self.with_test_artifacts:
if test_logs := await self._collect_test_logs(gradle_container):
artifacts.append(test_logs)
if test_results := await self._collect_test_results(gradle_container):
artifacts.append(test_results)

return await self.get_step_result(gradle_container, artifacts)

async def get_step_result(self, container: Container, output_artifacts: List[Artifact]) -> StepResult:
step_result = await super().get_step_result(container)
# Decorate with test report, if applicable.
return StepResult(
Expand All @@ -216,48 +211,62 @@ async def get_step_result(self, container: Container) -> StepResult:
stdout=step_result.stdout,
stderr=step_result.stderr,
output_artifact=step_result.output_artifact,
test_artifacts_path=self.test_artifacts_path,
artifacts=output_artifacts,
)

async def _collect_logs(self, gradle_container: Container) -> None:
async def _collect_test_logs(self, gradle_container: Container) -> Optional[Artifact]:
"""
Exports the java docs from the container into the host filesystem.
The docs in the container are expected to be in build/test-logs, and will end up test-artifact directory by default
One can change the destination directory by setting the test_artifacts_path
One can change the destination directory by setting the output_artifacts
"""
if not self.with_test_artifacts:
test_logs_dir_name = "test-logs"
if test_logs_dir_name not in await gradle_container.directory(f"{self.context.connector.code_directory}/build").entries():
self.context.logger.warn(f"No {test_logs_dir_name} found directory in the build folder")
return None
logs_dir_path = f"{self.context.connector.code_directory}/build/test-logs/{self.airbyte_logs_subdir}"
try:
container_logs_dir = await gradle_container.directory(logs_dir_path)
# the gradle task didn't create any logs.
if not container_logs_dir:
return None

self.test_artifacts_path.mkdir(parents=True, exist_ok=True)
if not await container_logs_dir.export(str(self.test_artifacts_path)):
self.context.logger.error("Error when trying to export log files from container")
except QueryError as e:
zip_file = await (
dagger_directory_as_zip_file(
self.dagger_client,
await gradle_container.directory(f"{self.context.connector.code_directory}/build/{test_logs_dir_name}"),
test_logs_dir_name,
)
)
return Artifact(
name="test-logs.zip",
content=zip_file,
content_type="application/zip",
to_upload=True,
)
except ExecError as e:
self.context.logger.error(str(e))
self.context.logger.warn(f"Failed to retrieve junit test results from {logs_dir_path} gradle container.")
return None
return None

async def _collect_test_report(self, gradle_container: Container) -> None:
async def _collect_test_results(self, gradle_container: Container) -> Optional[Artifact]:
"""
Exports the junit test reports from the container into the host filesystem.
The docs in the container are expected to be in build/test-results, and will end up test-artifact directory by default
Only the XML files generated by junit are downloaded into the host filesystem
One can change the destination directory by setting the test_artifacts_path
One can change the destination directory by setting the output_artifacts
"""
if not self.with_test_artifacts:
test_results_dir_name = "test-results"
if test_results_dir_name not in await gradle_container.directory(f"{self.context.connector.code_directory}/build").entries():
self.context.logger.warn(f"No {test_results_dir_name} found directory in the build folder")
return None

junit_xml_path = f"{self.context.connector.code_directory}/build/test-results/{self.gradle_task_name}"
try:
junit_xml_dir = await gradle_container.directory(junit_xml_path)
for file_name in await junit_xml_dir.entries():
if file_name.endswith(".xml"):
await junit_xml_dir.file(file_name).export(str(self.test_artifacts_path), allow_parent_dir_path=True)
except QueryError as e:
zip_file = await (
dagger_directory_as_zip_file(
self.dagger_client,
await gradle_container.directory(f"{self.context.connector.code_directory}/build/{test_results_dir_name}"),
test_results_dir_name,
)
)
return Artifact(
name="test-results.zip",
content=zip_file,
content_type="application/zip",
to_upload=True,
)
except ExecError as e:
self.context.logger.error(str(e))
return None
23 changes: 22 additions & 1 deletion airbyte-ci/connectors/pipelines/pipelines/helpers/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import anyio
import asyncclick as click
import asyncer
from dagger import Client, Config, Container, ExecError, File, ImageLayerCompression, Platform, Secret
from dagger import Client, Config, Container, Directory, ExecError, File, ImageLayerCompression, Platform, Secret
from more_itertools import chunked

if TYPE_CHECKING:
Expand Down Expand Up @@ -353,3 +353,24 @@ def java_log_scrub_pattern(secrets_to_mask: List[str]) -> str:
":": "&#58;",
},
)


def dagger_directory_as_zip_file(dagger_client: Client, directory: Directory, directory_name: str) -> File:
"""Compress a directory and return a File object representing the zip file.
Args:
dagger_client (Client): The dagger client.
directory (Path): The directory to compress.
directory_name (str): The name of the directory.
Returns:
File: The File object representing the zip file.
"""
return (
dagger_client.container()
.from_("alpine:3.19.1")
.with_exec(sh_dash_c(["apk update", "apk add zip"]))
.with_mounted_directory(f"/{directory_name}", directory)
.with_exec(["zip", "-r", "/zipped.zip", f"/{directory_name}"])
.file("/zipped.zip")
)
45 changes: 45 additions & 0 deletions airbyte-ci/connectors/pipelines/pipelines/models/artifacts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

from dataclasses import dataclass
from pathlib import Path
from typing import Optional

import dagger
from pipelines.consts import GCS_PUBLIC_DOMAIN
from pipelines.dagger.actions import remote_storage


@dataclass(kw_only=True)
class Artifact:
"""A dataclass to represent an artifact produced by a pipeline execution."""

name: str
content_type: str
content: dagger.File
to_upload: bool = True
local_path: Optional[Path] = None
gcs_url: Optional[str] = None

async def save_to_local_path(self, path: Path) -> Path:
exported = await self.content.export(str(path))
if exported:
self.local_path = path
return path
else:
raise Exception(f"Failed to save artifact {self.name} to local path {path}")

async def upload_to_gcs(self, dagger_client: dagger.Client, bucket: str, key: str, gcs_credentials: dagger.Secret) -> str:
gcs_cp_flags = None if self.content_type is None else [f"--content-type={self.content_type}"]

report_upload_exit_code, _, _ = await remote_storage.upload_to_gcs(
dagger_client=dagger_client,
file_to_upload=self.content,
key=key,
bucket=bucket,
gcs_credentials=gcs_credentials,
flags=gcs_cp_flags,
)
if report_upload_exit_code != 0:
raise Exception(f"Failed to upload artifact {self.name} to GCS. Exit code: {report_upload_exit_code}.")
self.gcs_url = f"{GCS_PUBLIC_DOMAIN}/{bucket}/{key}"
return f"{GCS_PUBLIC_DOMAIN}/{bucket}/{key}"
Loading

0 comments on commit 823b5c4

Please sign in to comment.