Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,20 @@ class OSSTaskHandler(FileTaskHandler, LoggingMixin):
Extends airflow FileTaskHandler and uploads to and reads from OSS remote storage.
"""

def __init__(self, base_log_folder, oss_log_folder, **kwargs):
def __init__(
self,
base_log_folder: str,
oss_log_folder: str,
max_bytes: int = 0,
backup_count: int = 0,
delay: bool = False,
**kwargs,
) -> None:
self.log.info("Using oss_task_handler for remote logging...")
super().__init__(base_log_folder)
# support log file size handling of FileTaskHandler
super().__init__(
base_log_folder=base_log_folder, max_bytes=max_bytes, backup_count=backup_count, delay=delay
)
self.log_relative_path = ""
self._hook = None
self.closed = False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,19 @@ class CloudwatchTaskHandler(FileTaskHandler, LoggingMixin):

trigger_should_wrap = True

def __init__(self, base_log_folder: str, log_group_arn: str, **kwargs):
super().__init__(base_log_folder)
def __init__(
self,
base_log_folder: str,
log_group_arn: str,
max_bytes: int = 0,
backup_count: int = 0,
delay: bool = False,
**kwargs,
) -> None:
# support log file size handling of FileTaskHandler
super().__init__(
base_log_folder=base_log_folder, max_bytes=max_bytes, backup_count=backup_count, delay=delay
)
split_arn = log_group_arn.split(":")

self.handler = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,19 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
It extends airflow FileTaskHandler and uploads to and reads from S3 remote storage.
"""

def __init__(self, base_log_folder: str, s3_log_folder: str, **kwargs):
super().__init__(base_log_folder)
def __init__(
self,
base_log_folder: str,
s3_log_folder: str,
max_bytes: int = 0,
backup_count: int = 0,
delay: bool = False,
**kwargs,
) -> None:
# support log file size handling of FileTaskHandler
super().__init__(
base_log_folder=base_log_folder, max_bytes=max_bytes, backup_count=backup_count, delay=delay
)
self.handler: logging.FileHandler | None = None
self.remote_base = s3_log_folder
self.log_relative_path = ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,19 @@ class HdfsTaskHandler(FileTaskHandler, LoggingMixin):
It extends airflow FileTaskHandler and uploads to and reads from HDFS.
"""

def __init__(self, base_log_folder: str, hdfs_log_folder: str, **kwargs):
super().__init__(base_log_folder)
def __init__(
self,
base_log_folder: str,
hdfs_log_folder: str,
max_bytes: int = 0,
backup_count: int = 0,
delay: bool = False,
**kwargs,
) -> None:
# support log file size handling of FileTaskHandler
super().__init__(
base_log_folder=base_log_folder, max_bytes=max_bytes, backup_count=backup_count, delay=delay
)
self.handler: logging.FileHandler | None = None
self.remote_base = urlsplit(hdfs_log_folder).path
self.log_relative_path = ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,19 @@ def __init__(
index_patterns: str = conf.get("elasticsearch", "index_patterns"),
index_patterns_callable: str = conf.get("elasticsearch", "index_patterns_callable", fallback=""),
es_kwargs: dict | None | Literal["default_es_kwargs"] = "default_es_kwargs",
max_bytes: int = 0,
backup_count: int = 0,
delay: bool = False,
**kwargs,
):
) -> None:
es_kwargs = es_kwargs or {}
if es_kwargs == "default_es_kwargs":
es_kwargs = get_es_kwargs_from_config()
self.host = self.format_url(host)
super().__init__(base_log_folder)
# support log file size handling of FileTaskHandler
super().__init__(
base_log_folder=base_log_folder, max_bytes=max_bytes, backup_count=backup_count, delay=delay
)
self.closed = False

self.client = elasticsearch.Elasticsearch(self.host, **es_kwargs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,15 @@ def __init__(
gcp_keyfile_dict: dict | None = None,
gcp_scopes: Collection[str] | None = _DEFAULT_SCOPESS,
project_id: str = PROVIDE_PROJECT_ID,
max_bytes: int = 0,
backup_count: int = 0,
delay: bool = False,
**kwargs,
):
super().__init__(base_log_folder)
) -> None:
# support log file size handling of FileTaskHandler
super().__init__(
base_log_folder=base_log_folder, max_bytes=max_bytes, backup_count=backup_count, delay=delay
)
self.handler: logging.FileHandler | None = None
self.log_relative_path = ""
self.closed = False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,15 @@ def __init__(
base_log_folder: str,
wasb_log_folder: str,
wasb_container: str,
max_bytes: int = 0,
backup_count: int = 0,
delay: bool = False,
**kwargs,
) -> None:
super().__init__(base_log_folder)
# support log file size handling of FileTaskHandler
super().__init__(
base_log_folder=base_log_folder, max_bytes=max_bytes, backup_count=backup_count, delay=delay
)
self.handler: logging.FileHandler | None = None
self.log_relative_path = ""
self.closed = False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,17 @@ def __init__(
index_patterns: str = conf.get("opensearch", "index_patterns", fallback="_all"),
index_patterns_callable: str = conf.get("opensearch", "index_patterns_callable", fallback=""),
os_kwargs: dict | None | Literal["default_os_kwargs"] = "default_os_kwargs",
):
max_bytes: int = 0,
backup_count: int = 0,
delay: bool = False,
) -> None:
os_kwargs = os_kwargs or {}
if os_kwargs == "default_os_kwargs":
os_kwargs = get_os_kwargs_from_config()
super().__init__(base_log_folder)
# support log file size handling of FileTaskHandler
super().__init__(
base_log_folder=base_log_folder, max_bytes=max_bytes, backup_count=backup_count, delay=delay
)
self.closed = False
self.mark_end_on_close = True
self.end_of_log_mark = end_of_log_mark.strip()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,14 @@ def __init__(
max_lines: int = 10000,
ttl_seconds: int = 60 * 60 * 24 * 28,
conn_id: str | None = None,
):
super().__init__(base_log_folder)
max_bytes: int = 0,
backup_count: int = 0,
delay: bool = False,
) -> None:
# support log file size handling of FileTaskHandler
super().__init__(
base_log_folder=base_log_folder, max_bytes=max_bytes, backup_count=backup_count, delay=delay
)
self.handler: _RedisHandler | None = None
self.max_lines = max_lines
self.ttl_seconds = ttl_seconds
Expand Down