diff --git a/task/bq2bq/executor/bumblebee/transformation.py b/task/bq2bq/executor/bumblebee/transformation.py
index 7e062d8..ffb0696 100644
--- a/task/bq2bq/executor/bumblebee/transformation.py
+++ b/task/bq2bq/executor/bumblebee/transformation.py
@@ -27,6 +27,7 @@ def __init__(self,
                  dend: datetime,
                  execution_time: datetime,
                  dry_run: bool):
+        self.spillover_query = spillover_query
         self.bigquery_service = bigquery_service
         self.task_config = task_config
         self.sql_query = sql_query
@@ -78,15 +79,26 @@ def transform(self):
         elif bq_destination_table.partitioning_type == "DAY":
             partition_strategy = timedelta(days=1)
 
-            # queries where source data/partition directly map with destination partitions
-            transformation = MultiPartitionTransformation(self.bigquery_service,
-                                                          self.task_config,
-                                                          self.sql_query,
-                                                          self.dstart, self.dend,
-                                                          self.dry_run,
-                                                          localised_execution_time,
-                                                          partition_strategy,
-                                                          self.task_config.concurrency)
+            if self.spillover_query:
+                transformation = LegacySpilloverTransformation(self.bigquery_service,
+                                                               self.task_config,
+                                                               self.sql_query,
+                                                               self.spillover_query,
+                                                               self.dstart,
+                                                               self.dend,
+                                                               self.dry_run,
+                                                               localised_execution_time,
+                                                               partition_strategy)
+            else:
+                # queries where source data/partition directly map with destination partitions
+                transformation = MultiPartitionTransformation(self.bigquery_service,
+                                                              self.task_config,
+                                                              self.sql_query,
+                                                              self.dstart, self.dend,
+                                                              self.dry_run,
+                                                              localised_execution_time,
+                                                              partition_strategy,
+                                                              self.task_config.concurrency)
         else:
             raise Exception("unable to generate a transformation for request, unsupported partition strategy")
         transformation.transform()
@@ -376,7 +388,11 @@ def transform(self):
         # break query file
         task_queries = self.task_query.split(OPTIMUS_QUERY_BREAK_MARKER)
         if len(task_queries) < len(datetime_list):
-            raise Exception("query needs to be broken using {}, {} query found, needed {}\n{}".format(OPTIMUS_QUERY_BREAK_MARKER, len(task_queries), len(datetime_list), self.task_query))
+            raise Exception(
+                "query needs to be broken using {}, {} query found, needed {}\n{}".format(OPTIMUS_QUERY_BREAK_MARKER,
+                                                                                          len(task_queries),
+                                                                                          len(datetime_list),
+                                                                                          self.task_query))
 
         tasks = []
         query_index = 0
@@ -413,28 +429,33 @@ def __init__(self,
                  sql_query: str,
                  spillover_query: str,
                  start_time: datetime,
+                 end_time: datetime,
                  dry_run: bool,
-                 execution_time: datetime):
+                 execution_time: datetime,
+                 partition_delta: timedelta):
         self.bigquery_service = bigquery_service
         self.task_config = task_config
         self.sql_query = sql_query
         self.spillover_query = spillover_query
         self.dry_run = dry_run
         self.start_time = start_time
+        self.end_time = end_time
         self.execution_time = execution_time
+        self.partition_delta = partition_delta
         self.concurrency = self.task_config.concurrency
 
     def transform(self):
         datetime_list = []
 
-        default_datetime = [self.start_time]
-        datetime_list.extend(default_datetime)
+        # default_datetime = [self.start_time]
+        # datetime_list.extend(default_datetime)
 
         if self.task_config.use_spillover:
             spillover = SpilloverDatetimes(self.bigquery_service,
                                            self.spillover_query,
                                            self.task_config,
                                            self.start_time,
+                                           self.end_time,
                                            self.dry_run,
                                            self.execution_time)
             spillover_datetimes = spillover.collect_datetimes()
@@ -442,16 +463,25 @@ def transform(self):
 
         datetime_list = distinct_list(datetime_list)
 
+        execute_for = self.start_time
+
+        # tables are partitioned for day
+        # iterate from start to end for each partition
+        while execute_for < self.end_time:
+            execute_for += self.partition_delta
+
         tasks = []
         for partition_time in datetime_list:
             logger.info("create transformation for partition: {}".format(partition_time))
             loader = PartitionLoader(self.bigquery_service, self.task_config.destination_table,
                                      self.task_config.load_method, partition_time)
 
+            task_window = WindowFactory.create_window_with_time(partition_time, partition_time + self.partition_delta)
+
             task = PartitionTransformation(self.task_config,
                                            loader,
                                            self.sql_query,
-                                           self.window,
+                                           task_window,
                                            self.dry_run,
                                            self.execution_time)
             tasks.append(task)
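Note on the new code path above: when a spillover query is configured, LegacySpilloverTransformation turns every date collected by SpilloverDatetimes into its own PartitionTransformation, and each of those tasks is scoped to a window spanning exactly one partition_delta starting at that date (built through WindowFactory.create_window_with_time). The snippet below is a minimal, standalone sketch of that pairing, not the bumblebee implementation; it uses a plain (start, end) tuple in place of the real window object and a hypothetical partition_windows helper.

from datetime import datetime, timedelta
from typing import Iterable, Iterator, Tuple

def partition_windows(partition_dates: Iterable[datetime],
                      partition_delta: timedelta = timedelta(days=1)
                      ) -> Iterator[Tuple[datetime, Tuple[datetime, datetime]]]:
    # de-duplicate the collected dates (like distinct_list() in the diff);
    # sorting is only for predictable output
    for partition_time in sorted(set(partition_dates)):
        # each partition is transformed against a window covering exactly that partition
        yield partition_time, (partition_time, partition_time + partition_delta)

if __name__ == "__main__":
    spillover_dates = [datetime(2021, 1, d) for d in (3, 1, 1, 2)]  # duplicates collapse
    for partition_time, window in partition_windows(spillover_dates):
        print(partition_time.date(), "->", window)  # one load per partition date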
logger.info("create transformation for partition: {}".format(partition_time)) loader = PartitionLoader(self.bigquery_service, self.task_config.destination_table, self.task_config.load_method, partition_time) + task_window = WindowFactory.create_window_with_time(partition_time, partition_time + self.partition_delta) + task = PartitionTransformation(self.task_config, loader, self.sql_query, - self.window, + task_window, self.dry_run, self.execution_time) tasks.append(task) diff --git a/task/bq2bq/executor/samples/tasks/replace_by_spillover_date_query/properties.cfg b/task/bq2bq/executor/samples/tasks/replace_by_spillover_date_query/properties.cfg new file mode 100644 index 0000000..d2848cb --- /dev/null +++ b/task/bq2bq/executor/samples/tasks/replace_by_spillover_date_query/properties.cfg @@ -0,0 +1,13 @@ +[DESTINATION] +PROJECT="g-project" +DATASET="playground" +TABLE="test_booking_count" + +[TRANSFORMATION] +TASK_WINDOW="DAILY" +TIMEZONE="Asia/Jakarta" +USE_SPILLOVER="TRUE" +CONCURRENCY=5 + +[LOAD] +LOAD_METHOD="REPLACE" \ No newline at end of file diff --git a/task/bq2bq/executor/samples/tasks/replace_by_spillover_date_query/query.sql b/task/bq2bq/executor/samples/tasks/replace_by_spillover_date_query/query.sql new file mode 100644 index 0000000..de163c5 --- /dev/null +++ b/task/bq2bq/executor/samples/tasks/replace_by_spillover_date_query/query.sql @@ -0,0 +1,6 @@ +select + TIMESTAMP('__dstart__') as dstart, + TIMESTAMP('__dend__') as dend, + "beerus" as hakai, + "naruto" as rasengan, + CAST("__execution_time__" AS TIMESTAMP) as `load_timestamp` diff --git a/task/bq2bq/executor/samples/tasks/replace_by_spillover_date_query/spillover_date.sql b/task/bq2bq/executor/samples/tasks/replace_by_spillover_date_query/spillover_date.sql new file mode 100644 index 0000000..3e8d49e --- /dev/null +++ b/task/bq2bq/executor/samples/tasks/replace_by_spillover_date_query/spillover_date.sql @@ -0,0 +1,8 @@ +SELECT + date +FROM + `g-project.playground.calendar_date` +-- this query will generate 14 calendar dates which we will replace 14 partitions using the main query +WHERE date >= DATE_SUB('__dstart__', INTERVAL 14 DAY) + AND date < '__dend__' +ORDER BY date \ No newline at end of file