diff --git a/.github/workflows/connector_nightly_builds_dagger.yml b/.github/workflows/connector_nightly_builds_dagger.yml index bf8af4ed8fd9..db8e9c55dd25 100644 --- a/.github/workflows/connector_nightly_builds_dagger.yml +++ b/.github/workflows/connector_nightly_builds_dagger.yml @@ -5,12 +5,22 @@ on: # 11AM UTC is 12AM CET, 4AM PST. - cron: "0 11 * * *" workflow_dispatch: + inputs: + runs-on: + type: string + default: large-runner + required: true + test-connectors-options: + default: --concurrency=5 --release-stage=generally_available --release-stage=beta + required: true + +run-name: Test connectors ${{ inputs.test-connectors-options || 'Nightly builds GA and Beta connectors' }} jobs: - connectors_ci: - name: Connectors CI - timeout-minutes: 240 # 4 hours - runs-on: large-runner + test_connectors: + name: Test connectors ${{ inputs.test-connectors-options || 'Nightly builds GA and Beta connectors' }} + timeout-minutes: 360 # 6 hours + runs-on: ${{ inputs.runs-on || 'large-runner' }} steps: - name: Get start timestamp id: get-start-timestamp @@ -31,7 +41,7 @@ jobs: python-version: "3.10" - name: Install ci-connector-ops package run: pip install ./tools/ci_connector_ops\[pipelines]\ - - name: Run nightly builds + - name: Test connectors run: | export _EXPERIMENTAL_DAGGER_RUNNER_HOST="unix:///var/run/buildkit/buildkitd.sock" DAGGER_CLI_COMMIT="67c7e7635cf4ea0e446e2fed522a3e314c960f6a" @@ -41,7 +51,7 @@ jobs: mkdir -p "$DAGGER_TMP_BINDIR" curl "https://dl.dagger.io/dagger/main/${DAGGER_CLI_COMMIT}/dagger_${DAGGER_CLI_COMMIT}_$(uname -s | tr A-Z a-z)_$(uname -m | sed s/x86_64/amd64/).tar.gz" | tar xvz -C "$DAGGER_TMP_BINDIR" fi - connectors-ci --is-ci --gha-workflow-run-id=${{ github.run_id }} test-connectors --release-stage=beta --release-stage=generally_available + connectors-ci --is-ci --gha-workflow-run-id=${{ github.run_id }} test-connectors ${{ inputs.test-connectors-options || '--concurrency=5 --release-stage=generally_available --release-stage=beta' }} env: GCP_GSM_CREDENTIALS: ${{ secrets.GCP_GSM_CREDENTIALS }} AWS_ACCESS_KEY_ID: ${{ secrets.STATUS_API_AWS_ACCESS_KEY_ID }} diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/actions/environments.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/actions/environments.py index eac3f373ceb7..353f3ab8e777 100644 --- a/tools/ci_connector_ops/ci_connector_ops/pipelines/actions/environments.py +++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/actions/environments.py @@ -50,7 +50,7 @@ def with_python_base(context: ConnectorTestContext, python_image_name: str = "py return ( context.dagger_client.container() .from_(python_image_name) - .with_mounted_cache("/root/.cache/pip", pip_cache, sharing=CacheSharingMode.LOCKED) + .with_mounted_cache("/root/.cache/pip", pip_cache, sharing=CacheSharingMode.SHARED) .with_mounted_directory("/tools", context.get_repo_dir("tools", include=["ci_credentials", "ci_common_utils"], exclude=[".venv"])) .with_exec(["pip", "install", "--upgrade", "pip"]) ) diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/connectors_ci.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/connectors_ci.py index eb6231c6e497..7c6efc059f4d 100644 --- a/tools/ci_connector_ops/ci_connector_ops/pipelines/connectors_ci.py +++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/connectors_ci.py @@ -36,7 +36,7 @@ logger = logging.getLogger(__name__) -async def run(context: ConnectorTestContext) -> ConnectorTestReport: +async def run(context: ConnectorTestContext, semaphore: anyio.Semaphore) -> ConnectorTestReport: """Runs a CI pipeline for a single connector. A visual DAG can be found on the README.md file of the pipelines modules. @@ -46,31 +46,34 @@ async def run(context: ConnectorTestContext) -> ConnectorTestReport: Returns: ConnectorTestReport: The test reports holding tests results. """ - async with context: - async with asyncer.create_task_group() as task_group: - tasks = [ - task_group.soonify(checks.QaChecks(context).run)(), - task_group.soonify(checks.CodeFormatChecks(context).run)(), - task_group.soonify(tests.run_all_tests)(context), - ] - results = list(itertools.chain(*(task.value for task in tasks))) + async with semaphore: + async with context: + async with asyncer.create_task_group() as task_group: + tasks = [ + task_group.soonify(checks.QaChecks(context).run)(), + task_group.soonify(checks.CodeFormatChecks(context).run)(), + task_group.soonify(tests.run_all_tests)(context), + ] + results = list(itertools.chain(*(task.value for task in tasks))) - context.test_report = ConnectorTestReport(context, steps_results=results) + context.test_report = ConnectorTestReport(context, steps_results=results) - return context.test_report + return context.test_report -async def run_connectors_test_pipelines(contexts: List[ConnectorTestContext]): +async def run_connectors_test_pipelines(contexts: List[ConnectorTestContext], concurrency: int = 5): """Runs a CI pipeline for all the connectors passed. Args: contexts (List[ConnectorTestContext]): List of connector test contexts for which a CI pipeline needs to be run. + concurrency (int): Number of test pipeline that can run in parallel. Defaults to 5 """ + semaphore = anyio.Semaphore(concurrency) async with dagger.Connection(DAGGER_CONFIG) as dagger_client: async with anyio.create_task_group() as tg: for context in contexts: context.dagger_client = dagger_client.pipeline(f"{context.connector.technical_name} - Test Pipeline") - tg.start_soon(run, context) + tg.start_soon(run, context, semaphore) @click.group() @@ -120,7 +123,7 @@ def connectors_ci( ctx.obj["gha_workflow_run_url"], GITHUB_GLOBAL_DESCRIPTION, GITHUB_GLOBAL_CONTEXT, - should_send=not ctx.obj["is_local"], + should_send=ctx.obj["ci_context"] == "pull_request", logger=logger, ) @@ -140,8 +143,11 @@ def connectors_ci( type=click.Choice(["alpha", "beta", "generally_available"]), ) @click.option("--modified/--not-modified", help="Only test modified connectors in the current branch.", default=False, type=bool) +@click.option("--concurrency", help="Number of connector tests pipeline to run in parallel.", default=5, type=int) @click.pass_context -def test_connectors(ctx: click.Context, names: Tuple[str], languages: Tuple[ConnectorLanguage], release_stages: Tuple[str], modified: bool): +def test_connectors( + ctx: click.Context, names: Tuple[str], languages: Tuple[ConnectorLanguage], release_stages: Tuple[str], modified: bool, concurrency: int +): """Runs a CI pipeline the connector passed as CLI argument. Args: @@ -186,14 +192,14 @@ def test_connectors(ctx: click.Context, names: Tuple[str], languages: Tuple[Conn in [ConnectorLanguage.PYTHON, ConnectorLanguage.LOW_CODE] # TODO: remove this once we implement pipelines for Java connector ] try: - anyio.run(run_connectors_test_pipelines, connectors_tests_contexts) + anyio.run(run_connectors_test_pipelines, connectors_tests_contexts, concurrency) update_commit_status_check( ctx.obj["git_revision"], "success", ctx.obj["gha_workflow_run_url"], GITHUB_GLOBAL_DESCRIPTION, GITHUB_GLOBAL_CONTEXT, - should_send=not ctx.obj["is_local"], + should_send=ctx.obj.get("ci_context") == "pull_request", logger=logger, ) except dagger.DaggerError as e: @@ -204,7 +210,7 @@ def test_connectors(ctx: click.Context, names: Tuple[str], languages: Tuple[Conn ctx.obj["gha_workflow_run_url"], GITHUB_GLOBAL_DESCRIPTION, GITHUB_GLOBAL_CONTEXT, - should_send=not ctx.obj["is_local"], + should_send=ctx.obj.get("ci_context") == "pull_request", logger=logger, ) diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/contexts.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/contexts.py index 7c3e8182da7b..b7aec1416795 100644 --- a/tools/ci_connector_ops/ci_connector_ops/pipelines/contexts.py +++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/contexts.py @@ -89,6 +89,10 @@ def dagger_client(self, dagger_client: Client): def is_ci(self): return self.is_local is False + @property + def is_pr(self): + return self.ci_context == "pull_request" + @property def repo(self): return self.dagger_client.git(AIRBYTE_REPO_URL, keep_git_dir=True) @@ -122,7 +126,7 @@ def github_commit_status(self) -> dict: "target_url": self.gha_workflow_run_url, "description": self.state.value["description"], "context": f"[POC please ignore] Connector tests: {self.connector.technical_name}", - "should_send": self.is_ci, + "should_send": self.is_pr, "logger": self.logger, } @@ -173,6 +177,5 @@ async def __aexit__(self, exception_type, exception_value, traceback) -> bool: ) if report_upload_exit_code != 0: self.logger.error("Uploading the report to S3 failed.") - await asyncify(update_commit_status_check)(**self.github_commit_status) return True diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/tests.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/tests.py index 3161996c861f..73b268787280 100644 --- a/tools/ci_connector_ops/ci_connector_ops/pipelines/tests.py +++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/tests.py @@ -85,7 +85,7 @@ async def run(self) -> StepResult: StepResult: Failure or success of the acceptances tests with stdout and stdout. """ if not self.context.connector.acceptance_test_config: - return StepResult(Step.ACCEPTANCE_TESTS, StepStatus.SKIPPED), None + return StepResult(self, StepStatus.SKIPPED), None dagger_client = self.get_dagger_pipeline(self.context.dagger_client)