
Add toggle to schedule analysis
hellais committed Sep 11, 2024
1 parent 5639987 commit fcc8b19
Showing 2 changed files with 47 additions and 41 deletions.
12 changes: 8 additions & 4 deletions oonipipeline/src/oonipipeline/cli/commands.py
@@ -177,13 +177,16 @@ async def main():

run_async(main())


@cli.command()
@probe_cc_option
@test_name_option
def schedule(
probe_cc: List[str],
test_name: List[str],
):
@click.option(
"--analysis/--no-analysis",
default=True,
help="should we drop tables before creating them",
)
def schedule(probe_cc: List[str], test_name: List[str], analysis: bool):
"""
Create schedules for the specified parameters
"""
@@ -203,6 +206,7 @@ async def main():
test_name=test_name,
clickhouse_url=config.clickhouse_url,
data_dir=config.data_dir,
schedule_analysis=analysis,
)

run_async(main())
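
Note: the `--analysis/--no-analysis` pair added above is click's standard paired boolean flag. Below is a minimal, self-contained sketch of that pattern; the option name and default mirror the diff, but the command body is illustrative only, not the pipeline's real code.

# Sketch of click's paired boolean flag, as used in the diff above.
# Illustrative only: the echo body is not part of the real command.
import click


@click.command()
@click.option(
    "--analysis/--no-analysis",
    default=True,
    help="whether to also schedule the analysis workflow",
)
def schedule(analysis: bool):
    # `--analysis` (or no flag at all) yields True; `--no-analysis` yields False.
    click.echo(f"analysis scheduling enabled: {analysis}")


if __name__ == "__main__":
    schedule()

Running this sketch with `--no-analysis` prints `analysis scheduling enabled: False`; omitting the flag keeps the default of True, matching the commit's default behavior of scheduling analysis.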
76 changes: 39 additions & 37 deletions oonipipeline/src/oonipipeline/temporal/schedules.py
@@ -85,6 +85,7 @@ async def schedule_all(
test_name: List[str],
clickhouse_url: str,
data_dir: str,
schedule_analysis: bool = True,
) -> ScheduleIdMap:
schedule_id_map = ScheduleIdMap()
filter_id = gen_schedule_filter_id(probe_cc, test_name)
@@ -136,44 +137,45 @@
)
schedule_id_map.observations = sched_handle.id

analysis_params = AnalysisWorkflowParams(
probe_cc=probe_cc,
test_name=test_name,
clickhouse=clickhouse_url,
data_dir=data_dir,
fast_fail=False,
)
sched_handle = await client.create_schedule(
id=f"{ANALYSIS_SCHED_PREFIX}-{filter_id}-{ts}",
schedule=Schedule(
action=ScheduleActionStartWorkflow(
AnalysisWorkflow.run,
analysis_params,
id=f"{ANALYSIS_WF_PREFIX}-{filter_id}-{ts}",
task_queue=TASK_QUEUE_NAME,
execution_timeout=MAKE_ANALYSIS_START_TO_CLOSE_TIMEOUT,
task_timeout=MAKE_ANALYSIS_START_TO_CLOSE_TIMEOUT,
run_timeout=MAKE_ANALYSIS_START_TO_CLOSE_TIMEOUT,
if schedule_analysis:
analysis_params = AnalysisWorkflowParams(
probe_cc=probe_cc,
test_name=test_name,
clickhouse=clickhouse_url,
data_dir=data_dir,
fast_fail=False,
)
sched_handle = await client.create_schedule(
id=f"{ANALYSIS_SCHED_PREFIX}-{filter_id}-{ts}",
schedule=Schedule(
action=ScheduleActionStartWorkflow(
AnalysisWorkflow.run,
analysis_params,
id=f"{ANALYSIS_WF_PREFIX}-{filter_id}-{ts}",
task_queue=TASK_QUEUE_NAME,
execution_timeout=MAKE_ANALYSIS_START_TO_CLOSE_TIMEOUT,
task_timeout=MAKE_ANALYSIS_START_TO_CLOSE_TIMEOUT,
run_timeout=MAKE_ANALYSIS_START_TO_CLOSE_TIMEOUT,
),
spec=ScheduleSpec(
intervals=[
ScheduleIntervalSpec(
# We offset the Analysis workflow by 6 hours, assuming
# that the observation generation will take less than 6
# hours to complete.
# TODO(art): it's probably better to refactor this into some
# kind of DAG
every=timedelta(days=1),
offset=timedelta(hours=6),
)
],
),
policy=SchedulePolicy(overlap=ScheduleOverlapPolicy.ALLOW_ALL),
state=ScheduleState(
note="Run the analysis workflow every day with an offset of 6 hours to ensure the observation workflow has completed"
),
),
spec=ScheduleSpec(
intervals=[
ScheduleIntervalSpec(
# We offset the Analysis workflow by 6 hours, assuming
# that the observation generation will take less than 6
# hours to complete.
# TODO(art): it's probably better to refactor this into some
# kind of DAG
every=timedelta(days=1),
offset=timedelta(hours=6),
)
],
),
policy=SchedulePolicy(overlap=ScheduleOverlapPolicy.ALLOW_ALL),
state=ScheduleState(
note="Run the analysis workflow every day with an offset of 6 hours to ensure the observation workflow has completed"
),
),
)
)
schedule_id_map.analysis = sched_handle.id

return schedule_id_map
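
A hedged sketch of how a caller might exercise the new toggle follows. Only part of schedule_all's signature is visible in this hunk, so the leading client parameter name, the Temporal endpoint, the ClickHouse URL, and the data directory below are assumptions for illustration, not values from the commit.

# Sketch: invoking schedule_all with the new toggle disabled.
# Assumptions (not shown in the diff): schedule_all takes a Temporal
# client as its first argument, and a dev server runs on localhost:7233.
import asyncio

from temporalio.client import Client

from oonipipeline.temporal.schedules import schedule_all


async def main():
    client = await Client.connect("localhost:7233")  # assumed dev endpoint
    schedule_id_map = await schedule_all(
        client=client,  # assumed parameter name
        probe_cc=["IT"],
        test_name=["web_connectivity"],
        clickhouse_url="clickhouse://localhost/",  # placeholder URL
        data_dir="/tmp/oonidata",  # placeholder path
        schedule_analysis=False,  # new toggle: skip the analysis schedule
    )
    # With schedule_analysis=False only the observations schedule is
    # created, so schedule_id_map.analysis keeps its default value.
    print(schedule_id_map.observations, schedule_id_map.analysis)


asyncio.run(main())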
