Skip to content

Commit

Permalink
Use ProcessPoolExecutor over ThreadPoolExecutor.
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>

Make `max_workers` configurable.

Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>
  • Loading branch information
JDarDagran committed May 14, 2024
1 parent 7db851f commit b9bb056
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 2 deletions.
7 changes: 7 additions & 0 deletions airflow/providers/openlineage/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,10 @@ def is_disabled() -> bool:
# Check if both 'transport' and 'config_path' are not present and also
# if the legacy 'OPENLINEAGE_URL' environment variable is not set
return transport() == {} and config_path(True) == "" and os.getenv("OPENLINEAGE_URL", "") == ""


@cache
def dag_state_change_process_pool_size() -> int:
    """Return the ``[openlineage] dag_state_change_process_pool_size`` option.

    The value is read with ``conf.getint`` and falls back to ``1`` when the
    option is absent. The function is wrapped in ``@cache``, so the lookup
    happens on the first call only; subsequent calls return the memoized
    result until ``cache_clear()`` is invoked on this function.
    """
    return conf.getint(
        _CONFIG_SECTION,
        "dag_state_change_process_pool_size",
        fallback=1,
    )
5 changes: 3 additions & 2 deletions airflow/providers/openlineage/plugins/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
from __future__ import annotations

import logging
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor
from datetime import datetime
from typing import TYPE_CHECKING

from openlineage.client.serde import Serde

from airflow import __version__ as airflow_version
from airflow.listeners import hookimpl
from airflow.providers.openlineage import conf
from airflow.providers.openlineage.extractors import ExtractorManager
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter, RunState
from airflow.providers.openlineage.utils.utils import (
Expand Down Expand Up @@ -281,7 +282,7 @@ def on_failure():
# Lazily builds the executor: the pool is created only when `self._executor` is falsy,
# and the stored instance is returned on every access after that.
# NOTE(review): this span is a rendered diff hunk whose +/- markers were stripped. Going by
# the commit title ("Use ProcessPoolExecutor over ThreadPoolExecutor"), the ThreadPoolExecutor
# assignment below looks like the removed line and the ProcessPoolExecutor assignment like its
# replacement — confirm against the real listener.py before treating both as live statements.
@property
def executor(self):
if not self._executor:
self._executor = ThreadPoolExecutor(max_workers=8, thread_name_prefix="openlineage_")
# Worker count comes from conf.dag_state_change_process_pool_size() instead of a literal.
self._executor = ProcessPoolExecutor(max_workers=conf.dag_state_change_process_pool_size())
return self._executor

@hookimpl
Expand Down
21 changes: 21 additions & 0 deletions tests/providers/openlineage/plugins/test_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,27 @@ def test_listener_on_task_instance_success_do_not_call_adapter_when_disabled_ope
listener.adapter.complete_task.assert_not_called()


@pytest.mark.parametrize(
    "max_workers,expected",
    [
        (None, 1),
        ("8", 8),
    ],
)
@mock.patch("airflow.providers.openlineage.plugins.listener.ProcessPoolExecutor", autospec=True)
def test_listener_on_dag_run_state_changes_configure_process_pool_size(mock_executor, max_workers, expected):
    """Check that ``conf.dag_state_change_process_pool_size`` is applied to ``max_workers``.

    ``ProcessPoolExecutor`` is patched in the listener module, so no real worker
    processes are started; the test only inspects how the mocked class was
    called after ``on_dag_run_running`` is triggered.

    :param mock_executor: the patched ``ProcessPoolExecutor`` class.
    :param max_workers: raw config value for the option under test.
    :param expected: the ``max_workers`` value the executor must be built with.
    """
    listener = OpenLineageListener()
    # The option getter is memoized with ``@cache``. Clear it up front so a value
    # cached by an earlier test cannot shadow the override applied below; without
    # this the outcome would depend on test execution order.
    conf.dag_state_change_process_pool_size.cache_clear()
    try:
        with conf_vars({("openlineage", "dag_state_change_process_pool_size"): max_workers}):
            listener.on_dag_run_running(mock.MagicMock(), None)
        mock_executor.assert_called_once_with(max_workers=expected)
        mock_executor.return_value.submit.assert_called_once()
    finally:
        # Do not leak the value cached under the overridden config into other tests.
        conf.dag_state_change_process_pool_size.cache_clear()


class TestOpenLineageSelectiveEnable:
def setup_method(self):
self.dag = DAG(
Expand Down

0 comments on commit b9bb056

Please sign in to comment.