Skip to content

Commit

Permalink
Move state tranform
Browse files Browse the repository at this point in the history
  • Loading branch information
Octavia Squidington III committed Jun 6, 2023
1 parent 2c1af7f commit 48895e4
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 17 deletions.
12 changes: 0 additions & 12 deletions tools/ci_connector_ops/ci_connector_ops/pipelines/contexts.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,6 @@ class ContextState(Enum):
SUCCESSFUL = {"github_state": "success", "description": "All Pipelines ran successfully."}
FAILURE = {"github_state": "failure", "description": "Pipeline failed."}

def to_step_status(self) -> StepStatus:
if self == ContextState.SUCCESS:
return StepStatus.SUCCESS

if self == ContextState.FAILURE:
return StepStatus.FAILURE

if self == ContextState.ERROR:
return StepStatus.FAILURE

raise ValueError(f"Could not convert context state: {self} to step status")


class PipelineContext:
"""The pipeline context is used to store configuration for a specific pipeline run."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,41 @@
import dagger
from dagger import Config

from ci_connector_ops.pipelines.bases import Report, StepResult
from ci_connector_ops.pipelines.contexts import ConnectorContext
from ci_connector_ops.pipelines.bases import Report, StepResult, StepStatus
from ci_connector_ops.pipelines.contexts import ConnectorContext, ContextState

GITHUB_GLOBAL_CONTEXT = "[POC please ignore] Connectors CI"
GITHUB_GLOBAL_DESCRIPTION = "Running connectors tests"

def context_state_to_step_result(state: ContextState) -> StepResult:
if state == ContextState.SUCCESSFUL:
return StepResult(step=None, status=StepStatus.SUCCESS)

if state == ContextState.FAILURE:
return StepResult(step=None, status=StepStatus.FAILURE)

if state == ContextState.ERROR:
return StepResult(step=None, status=StepStatus.FAILURE)


raise ValueError(f"Could not convert context state: {state} to step status")


# HACK: This is to avoid wrapping the whole pipeline in a dagger pipeline to avoid instability just prior to launch
# TODO (ben): Refactor run_connectors_pipelines to wrap the whole pipeline in a dagger pipeline
async def run_report_upload_pipeline(dagger_client: dagger.Client, contexts: List[ConnectorContext]) -> List[ConnectorContext]:
# TODO (ben): Refactor run_connectors_pipelines to wrap the whole pipeline in a dagger pipeline once Steps are refactored
async def run_report_complete_pipeline(dagger_client: dagger.Client, contexts: List[ConnectorContext]) -> List[ConnectorContext]:
"""Create and Save a report representing the run of the encompassing pipeline.
This is to denote when the pipeline is complete, useful for long running pipelines like nightlies.
"""

# Repurpose the first context to be the pipeline upload context to preserve timestamps
first_connector_context = contexts[0]
pipeline_name = f"Report upload {first_connector_context.report_output_prefix}"
first_connector_context.pipeline_name = pipeline_name

# Transform contexts into a list of steps
steps_results = [StepResult(step=None, status=context.state.to_step_status) for context in contexts]
steps_results = [context_state_to_step_result(context.state) for context in contexts]

report = Report(
name=pipeline_name,
Expand All @@ -52,4 +71,7 @@ async def run_connectors_pipelines(
for context in contexts:
context.dagger_client = dagger_client.pipeline(f"{pipeline_name} - {context.connector.technical_name}")
tg.start_soon(connector_pipeline, context, semaphore, *args)

await run_report_complete_pipeline(dagger_client, contexts)

return contexts

0 comments on commit 48895e4

Please sign in to comment.