Skip to content

Commit

Permalink
adding quest-test changes
Browse files Browse the repository at this point in the history
  • Loading branch information
harshits committed Jan 16, 2020
1 parent 649f970 commit bdaa835
Showing 1 changed file with 17 additions and 25 deletions.
42 changes: 17 additions & 25 deletions qds_sdk/quest.py
Original file line number Diff line number Diff line change
Expand Up @@ -635,15 +635,7 @@ def create_pipeline(pipeline_name, schema, source_data_format, sink_data_format,
other_gs_configurations=None,
table_name=None,
hive_database="default",
other_hive_configurations=None,
condition=None,
value=None,
filter_column_name=None,
select_column_names=None,
watermark_frequency=None,
sliding_window_value_frequency=None,
window_interval_frequency=None,
other_columns=None):
other_hive_configurations=None):
"""
:param watermark_frequency:
Expand Down Expand Up @@ -739,26 +731,26 @@ def create_pipeline(pipeline_name, schema, source_data_format, sink_data_format,
log.info(property_response)
final_response = property_response
if operators:
for operator, value in operators.items():
if operator is "filter":
for operator in operators:
if operator.item()[0][0] is "filter":
operator_response = QuestAssisted.add_operator(pipeline_id, operator,
condition=value["condition"],
value=value["value"],
filter_column_name=value["column_name"])
elif operator is "select":
condition=operator["filter"]["condition"],
value=operator["filter"]["value"],
filter_column_name=operator["filter"]["column_name"])
elif operator.item()[0][0] is "select":
operator_response = QuestAssisted.add_operator(pipeline_id, operator,
select_column_names=value["column_names"])
elif operator is "watermark":
select_column_names=operator["select"]["column_names"])
elif operator.item()[0][0] is "watermark":
operator_response = QuestAssisted.add_operator(pipeline_id, operator,
watermark_column_name=value["column_names"],
watermark_frequency=value["frequency"])
watermark_column_name=operator["watermark"]["column_names"],
watermark_frequency=operator["watermark"]["frequency"])

elif operator is "windowed_group":
elif operator.item()[0][0] is "windowed_group":
operator_response = QuestAssisted.add_operator(pipeline_id, operator,
groupby_column_name=value["column_name"],
sliding_window_value=value["sliding_window_value"],
window_interval_frequency=value["window_interval_frequency"],
other_columns=value["other_columns"])
groupby_column_name=operator["windowed_group"]["column_name"],
sliding_window_value=operator["windowed_group"]["sliding_window_value"],
window_interval_frequency=operator["windowed_group"]["window_interval_frequency"],
other_columns=operator["windowed_group"]["other_columns"])
else:
raise ParseError("Please enter valid operator value. Valid values are [filter, select, watermark, windowed_group]")

Expand All @@ -770,7 +762,7 @@ def create_pipeline(pipeline_name, schema, source_data_format, sink_data_format,
log.info(response)
final_response = response
QuestAssisted.pipeline_code = QuestAssisted.get_code(pipeline_id)
return final_response
return QuestAssisted

@staticmethod
def add_operator(pipeline_id, operator,
Expand Down

0 comments on commit bdaa835

Please sign in to comment.