Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

connectors-ci: setup nightly builds #24335

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions .github/workflows/connector_nightly_builds_dagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,22 @@ on:
# 11AM UTC is 12AM CET, 4AM PST.
- cron: "0 11 * * *"
workflow_dispatch:
inputs:
runs-on:
type: string
default: large-runner
required: true
test-connectors-options:
default: --concurrency=5 --release-stage=generally_available --release-stage=beta
required: true

run-name: Test connectors ${{ inputs.test-connectors-options || 'Nightly builds GA and Beta connectors' }}

jobs:
connectors_ci:
name: Connectors CI
timeout-minutes: 240 # 4 hours
runs-on: large-runner
test_connectors:
name: Test connectors ${{ inputs.test-connectors-options || 'Nightly builds GA and Beta connectors' }}
timeout-minutes: 360 # 6 hours
runs-on: ${{ inputs.runs-on || 'large-runner' }}
steps:
- name: Get start timestamp
id: get-start-timestamp
Expand All @@ -31,7 +41,7 @@ jobs:
python-version: "3.10"
- name: Install ci-connector-ops package
run: pip install ./tools/ci_connector_ops\[pipelines]\
- name: Run nightly builds
- name: Test connectors
run: |
export _EXPERIMENTAL_DAGGER_RUNNER_HOST="unix:///var/run/buildkit/buildkitd.sock"
DAGGER_CLI_COMMIT="67c7e7635cf4ea0e446e2fed522a3e314c960f6a"
Expand All @@ -41,7 +51,7 @@ jobs:
mkdir -p "$DAGGER_TMP_BINDIR"
curl "https://dl.dagger.io/dagger/main/${DAGGER_CLI_COMMIT}/dagger_${DAGGER_CLI_COMMIT}_$(uname -s | tr A-Z a-z)_$(uname -m | sed s/x86_64/amd64/).tar.gz" | tar xvz -C "$DAGGER_TMP_BINDIR"
fi
connectors-ci --is-ci --gha-workflow-run-id=${{ github.run_id }} test-connectors --release-stage=beta --release-stage=generally_available
connectors-ci --is-ci --gha-workflow-run-id=${{ github.run_id }} test-connectors ${{ inputs.test-connectors-options || '--concurrency=5 --release-stage=generally_available --release-stage=beta' }}
env:
GCP_GSM_CREDENTIALS: ${{ secrets.GCP_GSM_CREDENTIALS }}
AWS_ACCESS_KEY_ID: ${{ secrets.STATUS_API_AWS_ACCESS_KEY_ID }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def with_python_base(context: ConnectorTestContext, python_image_name: str = "py
return (
context.dagger_client.container()
.from_(python_image_name)
.with_mounted_cache("/root/.cache/pip", pip_cache, sharing=CacheSharingMode.LOCKED)
.with_mounted_cache("/root/.cache/pip", pip_cache, sharing=CacheSharingMode.SHARED)
.with_mounted_directory("/tools", context.get_repo_dir("tools", include=["ci_credentials", "ci_common_utils"], exclude=[".venv"]))
.with_exec(["pip", "install", "--upgrade", "pip"])
)
Expand Down
42 changes: 24 additions & 18 deletions tools/ci_connector_ops/ci_connector_ops/pipelines/connectors_ci.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
logger = logging.getLogger(__name__)


async def run(context: ConnectorTestContext) -> ConnectorTestReport:
async def run(context: ConnectorTestContext, semaphore: anyio.Semaphore) -> ConnectorTestReport:
"""Runs a CI pipeline for a single connector.
A visual DAG can be found on the README.md file of the pipelines modules.

Expand All @@ -46,31 +46,34 @@ async def run(context: ConnectorTestContext) -> ConnectorTestReport:
Returns:
ConnectorTestReport: The test reports holding tests results.
"""
async with context:
async with asyncer.create_task_group() as task_group:
tasks = [
task_group.soonify(checks.QaChecks(context).run)(),
task_group.soonify(checks.CodeFormatChecks(context).run)(),
task_group.soonify(tests.run_all_tests)(context),
]
results = list(itertools.chain(*(task.value for task in tasks)))
async with semaphore:
async with context:
async with asyncer.create_task_group() as task_group:
tasks = [
task_group.soonify(checks.QaChecks(context).run)(),
task_group.soonify(checks.CodeFormatChecks(context).run)(),
task_group.soonify(tests.run_all_tests)(context),
]
results = list(itertools.chain(*(task.value for task in tasks)))

context.test_report = ConnectorTestReport(context, steps_results=results)
context.test_report = ConnectorTestReport(context, steps_results=results)

return context.test_report
return context.test_report


async def run_connectors_test_pipelines(contexts: List[ConnectorTestContext]):
async def run_connectors_test_pipelines(contexts: List[ConnectorTestContext], concurrency: int = 5):
"""Runs a CI pipeline for all the connectors passed.

Args:
contexts (List[ConnectorTestContext]): List of connector test contexts for which a CI pipeline needs to be run.
concurrency (int): Number of test pipeline that can run in parallel. Defaults to 5
"""
semaphore = anyio.Semaphore(concurrency)
async with dagger.Connection(DAGGER_CONFIG) as dagger_client:
async with anyio.create_task_group() as tg:
for context in contexts:
context.dagger_client = dagger_client.pipeline(f"{context.connector.technical_name} - Test Pipeline")
tg.start_soon(run, context)
tg.start_soon(run, context, semaphore)


@click.group()
Expand Down Expand Up @@ -120,7 +123,7 @@ def connectors_ci(
ctx.obj["gha_workflow_run_url"],
GITHUB_GLOBAL_DESCRIPTION,
GITHUB_GLOBAL_CONTEXT,
should_send=not ctx.obj["is_local"],
should_send=ctx.obj["ci_context"] == "pull_request",
logger=logger,
)

Expand All @@ -140,8 +143,11 @@ def connectors_ci(
type=click.Choice(["alpha", "beta", "generally_available"]),
)
@click.option("--modified/--not-modified", help="Only test modified connectors in the current branch.", default=False, type=bool)
@click.option("--concurrency", help="Number of connector tests pipeline to run in parallel.", default=5, type=int)
@click.pass_context
def test_connectors(ctx: click.Context, names: Tuple[str], languages: Tuple[ConnectorLanguage], release_stages: Tuple[str], modified: bool):
def test_connectors(
ctx: click.Context, names: Tuple[str], languages: Tuple[ConnectorLanguage], release_stages: Tuple[str], modified: bool, concurrency: int
):
"""Runs a CI pipeline the connector passed as CLI argument.

Args:
Expand Down Expand Up @@ -186,14 +192,14 @@ def test_connectors(ctx: click.Context, names: Tuple[str], languages: Tuple[Conn
in [ConnectorLanguage.PYTHON, ConnectorLanguage.LOW_CODE] # TODO: remove this once we implement pipelines for Java connector
]
try:
anyio.run(run_connectors_test_pipelines, connectors_tests_contexts)
anyio.run(run_connectors_test_pipelines, connectors_tests_contexts, concurrency)
update_commit_status_check(
ctx.obj["git_revision"],
"success",
ctx.obj["gha_workflow_run_url"],
GITHUB_GLOBAL_DESCRIPTION,
GITHUB_GLOBAL_CONTEXT,
should_send=not ctx.obj["is_local"],
should_send=ctx.obj.get("ci_context") == "pull_request",
logger=logger,
)
except dagger.DaggerError as e:
Expand All @@ -204,7 +210,7 @@ def test_connectors(ctx: click.Context, names: Tuple[str], languages: Tuple[Conn
ctx.obj["gha_workflow_run_url"],
GITHUB_GLOBAL_DESCRIPTION,
GITHUB_GLOBAL_CONTEXT,
should_send=not ctx.obj["is_local"],
should_send=ctx.obj.get("ci_context") == "pull_request",
logger=logger,
)

Expand Down
7 changes: 5 additions & 2 deletions tools/ci_connector_ops/ci_connector_ops/pipelines/contexts.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ def dagger_client(self, dagger_client: Client):
def is_ci(self):
return self.is_local is False

@property
def is_pr(self):
return self.ci_context == "pull_request"

@property
def repo(self):
return self.dagger_client.git(AIRBYTE_REPO_URL, keep_git_dir=True)
Expand Down Expand Up @@ -122,7 +126,7 @@ def github_commit_status(self) -> dict:
"target_url": self.gha_workflow_run_url,
"description": self.state.value["description"],
"context": f"[POC please ignore] Connector tests: {self.connector.technical_name}",
"should_send": self.is_ci,
"should_send": self.is_pr,
"logger": self.logger,
}

Expand Down Expand Up @@ -173,6 +177,5 @@ async def __aexit__(self, exception_type, exception_value, traceback) -> bool:
)
if report_upload_exit_code != 0:
self.logger.error("Uploading the report to S3 failed.")

await asyncify(update_commit_status_check)(**self.github_commit_status)
return True
2 changes: 1 addition & 1 deletion tools/ci_connector_ops/ci_connector_ops/pipelines/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ async def run(self) -> StepResult:
StepResult: Failure or success of the acceptances tests with stdout and stdout.
"""
if not self.context.connector.acceptance_test_config:
return StepResult(Step.ACCEPTANCE_TESTS, StepStatus.SKIPPED), None
return StepResult(self, StepStatus.SKIPPED), None

dagger_client = self.get_dagger_pipeline(self.context.dagger_client)

Expand Down