Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,10 @@ class GlueJobOperator(AwsBaseOperator[GlueJobHook]):
of limiting concurrency, Glue needs 5-10 seconds to clean up resources.
Thus, if the status is returned immediately, a following run may start too early and result in more than 1 concurrent run.
It is recommended to set this parameter to 10 when you are using concurrency=1.
For more information see: https://repost.aws/questions/QUaKgpLBMPSGWO0iq2Fob_bw/glue-run-concurrent-jobs#ANFpCL2fRnQRqgDFuIU_rpvA

For more information see:
https://repost.aws/questions/QUaKgpLBMPSGWO0iq2Fob_bw/glue-run-concurrent-jobs#ANFpCL2fRnQRqgDFuIU_rpvA
:param waiter_delay: Time in seconds to wait between status checks. (default: 60)
:param waiter_max_attempts: Maximum number of attempts to check for job completion. (default: 75)
:param aws_conn_id: The Airflow connection used for AWS credentials.
If this is ``None`` or empty then the default boto3 behaviour is used. If
running Airflow in a distributed manner and aws_conn_id is None or
Expand Down Expand Up @@ -135,6 +137,8 @@ def __init__(
stop_job_run_on_kill: bool = False,
sleep_before_return: int = 0,
job_poll_interval: int | float = 6,
waiter_delay: int = 60,
waiter_max_attempts: int = 75,
**kwargs,
):
super().__init__(**kwargs)
Expand Down Expand Up @@ -162,6 +166,8 @@ def __init__(
self._job_run_id: str | None = None
self.sleep_before_return: int = sleep_before_return
self.s3_script_location: str | None = None
self.waiter_delay = waiter_delay
self.waiter_max_attempts = waiter_max_attempts

@property
def _hook_parameters(self):
Expand Down Expand Up @@ -241,8 +247,8 @@ def execute(self, context: Context):
run_id=self._job_run_id,
verbose=self.verbose,
aws_conn_id=self.aws_conn_id,
waiter_delay=int(self.job_poll_interval),
waiter_max_attempts=self.retry_limit,
waiter_delay=self.waiter_delay,
waiter_max_attempts=self.waiter_max_attempts,
),
method_name="execute_complete",
)
Expand Down Expand Up @@ -293,7 +299,6 @@ class GlueDataQualityOperator(AwsBaseOperator[GlueDataQualityHook]):
:param description: A description of the data quality ruleset.
:param update_rule_set: To update existing ruleset, Set this flag to True. (default: False)
:param data_quality_ruleset_kwargs: Extra arguments for RuleSet.

:param aws_conn_id: The Airflow connection used for AWS credentials.
If this is ``None`` or empty then the default boto3 behaviour is used. If
running Airflow in a distributed manner and aws_conn_id is None or
Expand Down Expand Up @@ -389,7 +394,6 @@ class GlueDataQualityRuleSetEvaluationRunOperator(AwsBaseOperator[GlueDataQualit
:param deferrable: If True, the operator will wait asynchronously for the job to stop.
This implies waiting for completion. This mode requires aiobotocore module to be installed.
(default: False)

:param aws_conn_id: The Airflow connection used for AWS credentials.
If this is ``None`` or empty then the default boto3 behaviour is used. If
running Airflow in a distributed manner and aws_conn_id is None or
Expand Down Expand Up @@ -554,7 +558,6 @@ class GlueDataQualityRuleRecommendationRunOperator(AwsBaseOperator[GlueDataQuali
:param deferrable: If True, the operator will wait asynchronously for the job to stop.
This implies waiting for completion. This mode requires aiobotocore module to be installed.
(default: False)

:param aws_conn_id: The Airflow connection used for AWS credentials.
If this is ``None`` or empty then the default boto3 behaviour is used. If
running Airflow in a distributed manner and aws_conn_id is None or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ def __init__(
super().__init__()
# parameters that should be hardcoded in the child's implementation
self.serialized_fields = serialized_fields

self.waiter_name = waiter_name
self.waiter_args = waiter_args
self.failure_message = failure_message
Expand Down