Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Treat warehouse schedules as in the opscenter-configured timezone. #397

Merged
merged 2 commits into from
Oct 2, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 48 additions & 5 deletions app/crud/test_wh_task_scheduling.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import datetime
import pytest
from pytz import timezone
from .wh_sched import WarehouseSchedules, update_task_state, task_offsets
from .test_fixtures import MockSession

_pacific = timezone("America/Los_Angeles")
_eastern = timezone("America/New_York")
_london = timezone("Europe/London")
_kolkata = timezone("Asia/Kolkata")


def _make_schedule(
name: str,
Expand All @@ -28,7 +35,7 @@ def _make_schedule(


def test_no_schedules_disables_task(session: MockSession):
assert not update_task_state(session, [])
assert not update_task_state(session, [], tz=_pacific)
assert len(session._sql) == 1
for offset in task_offsets:
assert (
Expand All @@ -44,7 +51,7 @@ def test_disabled_schedules_disables_task(session: MockSession):
),
]

assert not update_task_state(session, schedules)
assert not update_task_state(session, schedules, tz=_pacific)
assert len(session._sql) == 1
for offset in task_offsets:
assert (
Expand All @@ -60,7 +67,7 @@ def test_weekday_schedules(session: MockSession):
_make_schedule("COMPUTE_WH", datetime.time(17, 30), datetime.time(23, 59)),
]

assert update_task_state(session, schedules) is True
assert update_task_state(session, schedules, tz=_pacific) is True
assert len(session._sql) == 1
script = session._sql[0].lower()

Expand Down Expand Up @@ -112,7 +119,7 @@ def test_weekdays_and_weekends_schedule(session: MockSession):
),
]

assert update_task_state(session, schedules) is True
assert update_task_state(session, schedules, tz=_pacific) is True
assert len(session._sql) == 1
script = session._sql[0].lower()

Expand Down Expand Up @@ -161,7 +168,7 @@ def test_multiple_warehouses(session: MockSession):
_make_schedule("BATCH_WH", datetime.time(12, 45), datetime.time(23, 59)),
]

assert update_task_state(session, schedules) is True
assert update_task_state(session, schedules, tz=_pacific) is True
assert len(session._sql) == 1
script = session._sql[0].lower()

Expand Down Expand Up @@ -195,3 +202,39 @@ def test_multiple_warehouses(session: MockSession):
f"alter task if exists tasks.warehouse_scheduling_45 set schedule = 'using cron {expected_cron_45}';"
in script
)


@pytest.mark.parametrize("tz", [_pacific, _eastern, _london, _kolkata])
def test_timezones(session: MockSession, tz: timezone):
schedules = [
_make_schedule("COMPUTE_WH", datetime.time(0, 0), datetime.time(9, 0)),
_make_schedule("COMPUTE_WH", datetime.time(9, 0), datetime.time(17, 30)),
_make_schedule("COMPUTE_WH", datetime.time(17, 30), datetime.time(23, 59)),
]

assert update_task_state(session, schedules, tz=tz) is True
assert len(session._sql) == 1
script = session._sql[0].lower()

# _0 and _30 should be resumed
assert "alter task if exists tasks.warehouse_scheduling_0 suspend;" in script

expected_cron_0 = f"0 0,9 * * 1-5 {tz.zone.lower()}"
assert (
f"alter task if exists tasks.warehouse_scheduling_0 set schedule = 'using cron {expected_cron_0}';"
in script
)
assert "alter task if exists tasks.warehouse_scheduling_0 resume;" in script

assert "alter task if exists tasks.warehouse_scheduling_30 suspend;" in script
assert "alter task if exists tasks.warehouse_scheduling_30 resume;" in script

# _15 and _45 should be suspended
assert "alter task if exists tasks.warehouse_scheduling_15 suspend;" in script
assert "alter task if exists tasks.warehouse_scheduling_45 suspend;" in script

expected_cron_30 = f"30 17 * * 1-5 {tz.zone.lower()}"
assert (
f"alter task if exists tasks.warehouse_scheduling_30 set schedule = 'using cron {expected_cron_30}';"
in script
)
37 changes: 30 additions & 7 deletions app/crud/wh_sched.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import datetime
from .base import BaseOpsCenterModel
from pydantic import validator, root_validator, Field
from pytz import timezone
from pytz.exceptions import UnknownTimeZoneError
from snowflake.snowpark.functions import col, max as sp_max
from snowflake.snowpark import Row, Session
import pandas as pd
Expand Down Expand Up @@ -319,21 +321,39 @@ def regenerate_alter_statements(session: Session, schedules: List[WarehouseSched
WarehouseAlterStatements.batch_write(session, alter_stmts, overwrite=True)


def update_task_state(session: Session, schedules: List[WarehouseSchedules]) -> bool:
def get_schedule_timezone(session: Session) -> timezone:
"""
Fetch the 'default_timezone' from the internal.config table. If there is no timezone set or the
timezone which is set fails to parse, this function will return the timezone for 'America/Los_Angeles'.
"""
str_tz = session.call("get_config", "default_timezone") or "America/Los_Angeles"
try:
return timezone(str_tz)
except UnknownTimeZoneError:
return timezone("America/Los_Angeles")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can't know the account timezone?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, here we can probably figured it out from Snowflake. I hadn't thought about checking the snowflake default.



def update_task_state(
session: Session, schedules: List[WarehouseSchedules], tz=None
) -> bool:
# Make sure we have at least one enabled schedule.
enabled_schedules = [sch for sch in schedules if sch.enabled]
if len(enabled_schedules) == 0:
disable_all_tasks(session)
return False

# Indirection for unit tests. Caller is not expected to provide a timezone.
if not tz:
tz = get_schedule_timezone(session)

# Build the cron list for the enabled schedules
alter_statements = []
for offset in task_offsets:
# For each "offset", generate multiple statements
# 1. ALTER TASK ... SUSPEND
# 2. ALTER TASK ... SET SCHEDULE = 'USING CRON ...'
# 3. ALTER TASK ... RESUME
alter_statements.extend(_make_alter_task_statements(schedules, offset))
alter_statements.extend(_make_alter_task_statements(schedules, offset, tz))

# Collect the statements together
alter_body = "\n".join(alter_statements)
Expand All @@ -349,13 +369,15 @@ def update_task_state(session: Session, schedules: List[WarehouseSchedules]) ->


def _make_alter_task_statements(
schedules: List[WarehouseSchedules], offset: int
schedules: List[WarehouseSchedules],
offset: int,
tz: timezone,
) -> List[str]:
"""
Generates a list of ALTER TASK statements given the list of WarehouseSchedules and the task offset (the quarterly
minute offset from the hour).
"""
cron_schedule, should_run = _make_cron_schedule(schedules, offset)
cron_schedule, should_run = _make_cron_schedule(schedules, offset, tz)
if should_run:
return [
f"alter task if exists {task_name}_{offset} suspend;",
Expand All @@ -367,7 +389,9 @@ def _make_alter_task_statements(


def _make_cron_schedule(
schedules: List[WarehouseSchedules], offset: int
schedules: List[WarehouseSchedules],
offset: int,
tz: timezone,
) -> (str, bool):
"""
Takes a list of schedules and returns the cron schedule string which cover all schedule boundaries.
Expand Down Expand Up @@ -398,8 +422,7 @@ def _make_cron_schedule(
# Execute only weekends
days_of_week = "0,6"

# TODO use the configured timezone
return f"{offset} {cron_hours} * * {days_of_week} America/Los_Angeles", True
return f"{offset} {cron_hours} * * {days_of_week} {tz.zone}", True


def get_last_run(session: Session) -> datetime.datetime:
Expand Down
9 changes: 8 additions & 1 deletion app/ui/pages/06_WarehouseSchedule.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import streamlit as st
import sthelp
import warehouse_schedule
from modules import add_custom_modules


sthelp.chrome("Warehouse Schedule")

# Load custom OpsCenter python modules
if not add_custom_modules():
st.warning("Unable to load OpsCenter modules.")

import warehouse_schedule # noqa E402

warehouse_schedule.display()
30 changes: 26 additions & 4 deletions bootstrap/011_warehouse_schedules.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,36 @@ CREATE OR REPLACE PROCEDURE INTERNAL.UPDATE_WAREHOUSE_SCHEDULES(last_run timesta
DECLARE
task_outcome variant default (select object_construct());
BEGIN
-- Get the configured timezone or default to 'America/Los_Angeles'
let tz text;
call internal.get_config('default_timezone') into :tz;
if (tz is null) then
tz := 'America/Los_Angeles';
end if;
task_outcome := (select object_insert(:task_outcome, 'opscenter timezone', :tz));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure why we have two lines here. as in they are basically the same info right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, one is recording what the configured timezone for OpsCenter is and the other is recording what the default timezone is for the user's Snowflake account. I thought knowing this might be helpful in debugging issues in the future.

task_outcome := (select object_insert(:task_outcome, 'account timezone', internal.get_current_timezone()));

-- The task calls this procedure with NULL and lets the procedure figure out the details.
-- The ability to specify timestamps is only to enable testing.
if (this_run is NULL) then
this_run := (select current_timestamp());
this_run := (select CONVERT_TIMEZONE(internal.get_current_timezone(), :tz, current_timestamp()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whats the behaviour of this when we pass in an LTZ timezone instead of an NTZ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the 3-arg CONVERT_TIMEZONE function, I don't believe there is any difference if you pass an LTZ or an NTZ (because the timestamp is assumed to be in the timezone specified by the first argument).

I think changing this procedure to accept an NTZ instead of an LTZ is the way we solve the goofiness of the timestamps shifting when given to a procedure. I had been messing around with this over the weekend which works as I expect:

create or replace procedure internal.test_tz(now timestamp_ntz)
returns table(tz_ts TIMESTAMP_NTZ, ts timestamp_ltz, tz_ts2 TIMESTAMP_NTZ, ts2 TIMESTAMP_NTZ)
language sql
as
begin
    let res resultset := (
        select convert_timezone(internal.get_Current_timezone(), 'America/New_York', current_timestamp()),
                current_timestamp(),
                convert_timezone(internal.get_current_timezone(), 'America/New_York', :now),
                :now);
    return table(res);
end;

else
this_run := (select CONVERT_TIMEZONE(internal.get_current_timezone(), :tz, :this_run));
end if;
task_outcome := (select object_insert(:task_outcome, 'this_run', :this_run));

if (last_run is NULL) then
last_run := (select run from internal.task_warehouse_schedule order by run desc limit 1);
last_run := (select CONVERT_TIMEZONE(internal.get_current_timezone(), :tz, run) from internal.task_warehouse_schedule order by run desc limit 1);
-- If we don't have any rows in internal.task_warehouse_schedule, rewind far enough that we will just pick
-- the current WH schedule and not think it has already been run.
if (last_run is NULL) then
last_run := (select timestampadd('days', -1, current_timestamp));
last_run := (select CONVERT_TIMEZONE(internal.get_current_timezone(), :tz, timestampadd('days', -1, current_timestamp)));
end if;
else
last_run := (select CONVERT_TIMEZONE(internal.get_current_timezone(), :tz, :last_run));
end if;

-- TODO handle looking back over a weekend boundary (from python)
-- TODO handle the timestamp from config (from python)
-- TODO the WEEK_START session parameter can alter what DAYOFWEEK returns.
let is_weekday boolean := (select DAYOFWEEK(:this_run) not in (0, 6));

Expand Down Expand Up @@ -94,3 +107,12 @@ EXCEPTION
INSERT INTO internal.task_warehouse_schedule SELECT :this_run, FALSE, :task_outcome;
RAISE;
END;

-- owners rights procedures can't get the timezone from the session parameters.
CREATE OR REPLACE FUNCTION INTERNAL.GET_CURRENT_TIMEZONE()
RETURNS STRING
LANGUAGE JAVASCRIPT
AS
$$
return Intl.DateTimeFormat().resolvedOptions().timeZone;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does this return?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This returns a timezone identifier like "America/New_York" (or similar).

$$;
15 changes: 15 additions & 0 deletions test/common_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import uuid
from typing import List, Dict


def generate_unique_name(prefix, timestamp_string) -> str:
Expand Down Expand Up @@ -51,3 +52,17 @@ def delete_list_of_probes(conn, sql):
for name in cur.execute(sql).fetchall():
delete_probe_statement = f"call ADMIN.DELETE_PROBE('{name[0]}');"
assert run_proc(conn, delete_probe_statement) is None


def fetch_all_warehouse_schedules(conn) -> List[Dict]:
with conn() as cnx, cnx.cursor() as cur:
return cur.execute(
"select * from internal.wh_schedules order by name, weekday, start_at"
).fetchall()


def reset_timezone(conn):
with conn.cursor() as cur:
_ = cur.execute(
"call internal.set_config('default_timezone', 'America/Los_Angeles')"
).fetchone()
17 changes: 16 additions & 1 deletion test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@
import datetime
import pytest
from contextlib import contextmanager
from common_utils import delete_list_of_labels, delete_list_of_probes
from common_utils import (
delete_list_of_labels,
delete_list_of_probes,
fetch_all_warehouse_schedules,
reset_timezone,
)

sys.path.append("../deploy")
import helpers # noqa E402
Expand Down Expand Up @@ -68,3 +73,13 @@ def timestamp_string(conn):

# call a function that deletes all the labels that were created in the session
delete_list_of_probes(conn, sql)

scheds = fetch_all_warehouse_schedules(conn)
scheds.insert(0, "Warehouse Schedules:")
print("\n".join(scheds))


@pytest.fixture(autouse=True)
def reset_timezone_before_test(conn):
with conn() as cnx:
reset_timezone(cnx)
Loading