Skip to content

Commit

Permalink
Rename to XComObjectStorageBackend (#38607)
Browse files Browse the repository at this point in the history
The config names all use "object storage", but the class is still using
"object store". Since we're not renaming the configs, let's rename the
class for consistency.

(cherry picked from commit 0371ea8)
  • Loading branch information
uranusjr authored and ephraimbuddy committed Mar 31, 2024
1 parent dcac0e1 commit 4cba4ef
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 13 deletions.
6 changes: 3 additions & 3 deletions airflow/providers/common/io/xcom/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def _get_compression_suffix(compression: str) -> str:
raise ValueError(f"Compression {compression} is not supported. Make sure it is installed.")


class XComObjectStoreBackend(BaseXCom):
class XComObjectStorageBackend(BaseXCom):
"""XCom backend that stores data in an object store or database depending on the size of the data.
If the value is larger than the configured threshold, it will be stored in an object store.
Expand Down Expand Up @@ -155,7 +155,7 @@ def deserialize_value(
path = conf.get(SECTION, "xcom_objectstorage_path", fallback="")

try:
p = ObjectStoragePath(path) / XComObjectStoreBackend._get_key(data)
p = ObjectStoragePath(path) / XComObjectStorageBackend._get_key(data)
return json.load(p.open(mode="rb", compression="infer"), cls=XComDecoder)
except TypeError:
return data
Expand All @@ -167,7 +167,7 @@ def purge(xcom: XCom, session: Session) -> None:
path = conf.get(SECTION, "xcom_objectstorage_path", fallback="")
if isinstance(xcom.value, str):
try:
p = ObjectStoragePath(path) / XComObjectStoreBackend._get_key(xcom.value)
p = ObjectStoragePath(path) / XComObjectStorageBackend._get_key(xcom.value)
p.unlink(missing_ok=True)
except TypeError:
pass
Expand Down
4 changes: 2 additions & 2 deletions docs/apache-airflow-providers-common-io/xcom_backend.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Object Storage XCom Backend

The default XCom backend is the :class:`~airflow.models.xcom.BaseXCom` class, which stores XComs in the Airflow database. This is fine for small values, but can be problematic for large values, or for large numbers of XComs.

To enable storing XComs in an object store, you can set the ``xcom_backend`` configuration option to ``airflow.providers.common.io.xcom.backend.XComObjectStoreBackend``. You will also need to set ``xcom_objectstorage_path`` to the desired location. The connection
To enable storing XComs in an object store, you can set the ``xcom_backend`` configuration option to ``airflow.providers.common.io.xcom.backend.XComObjectStorageBackend``. You will also need to set ``xcom_objectstorage_path`` to the desired location. The connection
id is obtained from the user part of the url the you will provide, e.g. ``xcom_objectstorage_path = s3://conn_id@mybucket/key``. Furthermore, ``xcom_objectstorage_threshold`` is required
to be something larger than -1. Any object smaller than the threshold in bytes will be stored in the database and anything larger will be be
put in object storage. This will allow a hybrid setup. If an xcom is stored on object storage a reference will be
Expand All @@ -30,7 +30,7 @@ compress the data before storing it in object storage.
So for example the following configuration will store anything above 1MB in S3 and will compress it using gzip::

[core]
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStoreBackend
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend

[common.io]
xcom_objectstorage_path = s3://conn_id@mybucket/key
Expand Down
4 changes: 2 additions & 2 deletions docs/apache-airflow/core-concepts/xcoms.rst
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ Object Storage XCom Backend

The default XCom backend is the :class:`~airflow.models.xcom.BaseXCom` class, which stores XComs in the Airflow database. This is fine for small values, but can be problematic for large values, or for large numbers of XComs.

To enable storing XComs in an object store, you can set the ``xcom_backend`` configuration option to ``airflow.providers.common.io.xcom.backend.XComObjectStoreBackend``. You will also need to set ``xcom_objectstorage_path`` to the desired location. The connection
To enable storing XComs in an object store, you can set the ``xcom_backend`` configuration option to ``airflow.providers.common.io.xcom.backend.XComObjectStorageBackend``. You will also need to set ``xcom_objectstorage_path`` to the desired location. The connection
id is obtained from the user part of the url the you will provide, e.g. ``xcom_objectstorage_path = s3://conn_id@mybucket/key``. Furthermore, ``xcom_objectstorage_threshold`` is required
to be something larger than -1. Any object smaller than the threshold in bytes will be stored in the database and anything larger will be be
put in object storage. This will allow a hybrid setup. If an xcom is stored on object storage a reference will be
Expand All @@ -74,7 +74,7 @@ compress the data before storing it in object storage.
So for example the following configuration will store anything above 1MB in S3 and will compress it using gzip::

[core]
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStoreBackend
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend

[common.io]
xcom_objectstorage_path = s3://conn_id@mybucket/key
Expand Down
12 changes: 6 additions & 6 deletions tests/providers/common/io/xcom/test_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from airflow.models.taskinstance import TaskInstance
from airflow.models.xcom import BaseXCom, resolve_xcom_backend
from airflow.operators.empty import EmptyOperator
from airflow.providers.common.io.xcom.backend import XComObjectStoreBackend
from airflow.providers.common.io.xcom.backend import XComObjectStorageBackend
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.types import DagRunType
Expand Down Expand Up @@ -85,15 +85,15 @@ def task_instance(task_instance_factory):
)


class TestXcomObjectStoreBackend:
class TestXComObjectStorageBackend:
path = "file:/tmp/xcom"

def setup_method(self):
try:
conf.add_section("common.io")
except DuplicateSectionError:
pass
conf.set("core", "xcom_backend", "airflow.providers.common.io.xcom.backend.XComObjectStoreBackend")
conf.set("core", "xcom_backend", "airflow.providers.common.io.xcom.backend.XComObjectStorageBackend")
conf.set("common.io", "xcom_objectstorage_path", self.path)
conf.set("common.io", "xcom_objectstorage_threshold", "50")
settings.configure_vars()
Expand Down Expand Up @@ -164,7 +164,7 @@ def test_value_storage(self, task_instance, session):
)

data = BaseXCom.deserialize_value(res)
p = ObjectStoragePath(self.path) / XComObjectStoreBackend._get_key(data)
p = ObjectStoragePath(self.path) / XComObjectStorageBackend._get_key(data)
assert p.exists() is True

value = XCom.get_value(
Expand Down Expand Up @@ -210,7 +210,7 @@ def test_clear(self, task_instance, session):
)

data = BaseXCom.deserialize_value(res)
p = ObjectStoragePath(self.path) / XComObjectStoreBackend._get_key(data)
p = ObjectStoragePath(self.path) / XComObjectStorageBackend._get_key(data)
assert p.exists() is True

XCom.clear(
Expand Down Expand Up @@ -250,7 +250,7 @@ def test_compression(self, task_instance, session):
)

data = BaseXCom.deserialize_value(res)
p = ObjectStoragePath(self.path) / XComObjectStoreBackend._get_key(data)
p = ObjectStoragePath(self.path) / XComObjectStorageBackend._get_key(data)
assert p.exists() is True
assert p.suffix == ".gz"

Expand Down

0 comments on commit 4cba4ef

Please sign in to comment.