
CDK: allow repeated cache file removals & cleanup cache files in test #19533

Closed · wants to merge 3 commits
2 changes: 0 additions & 2 deletions airbyte-cdk/python/.gitignore
@@ -1,4 +1,2 @@
.coverage

# TODO: these are tmp files generated by unit tests. They should go to the /tmp directory.
cache_http_stream*.yml
2 changes: 2 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
@@ -1,4 +1,6 @@
# Changelog
## 0.9.5
Contributor Author:
0.9.5 since there is another PR claiming 0.9.4 already

Allow repeated cache removals & clean up unit test cache files

## 0.9.3
Low-code: Avoid duplicate HTTP query in `simple_retriever`
8 changes: 3 additions & 5 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py
@@ -68,11 +68,9 @@ def clear_cache(self):
"""
remove cache file only once
"""
STREAM_CACHE_FILES = globals().setdefault("STREAM_CACHE_FILES", set())
Contributor Author:
@grubberr tagging you since you had the git blame: why did this block only allow a single removal of the cache file? the current change is passing unit tests. Is that an issue?

Contributor:
@sherifnada oops, I have answered below

if self.cache_filename not in STREAM_CACHE_FILES:
with suppress(FileNotFoundError):
os.remove(self.cache_filename)
STREAM_CACHE_FILES.add(self.cache_filename)
with suppress(FileNotFoundError):
os.remove(self.cache_filename)
print(f"Removed {self.cache_filename}")
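For context, the guard this PR removes used `globals().setdefault` as a process-wide memo so each cache file was deleted at most once per process. A minimal sketch of that idiom (the filename here is illustrative, not from the PR):

```python
import os
from contextlib import suppress


def clear_cache_once(cache_filename: str) -> None:
    # globals().setdefault creates the module-level set on the first call
    # and returns the same set on every later call -- a process-wide memo.
    seen = globals().setdefault("STREAM_CACHE_FILES", set())
    if cache_filename not in seen:
        with suppress(FileNotFoundError):  # fine if the file never existed
            os.remove(cache_filename)
        seen.add(cache_filename)


clear_cache_once("/tmp/illustrative.sqlite")  # first call: removal attempted
clear_cache_once("/tmp/illustrative.sqlite")  # later calls: skipped
print(globals()["STREAM_CACHE_FILES"])
```

The new code drops the memo and keeps only the `suppress(FileNotFoundError)` wrapper, which is what the thread below debates.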
Contributor @grubberr commented on Nov 17, 2022:
@sherifnada

This can be dangerous, for example:

  1. We create Teams stream, on start it removes cache file and create it - file-inode-1
  2. We create TeamMembers(parent=Teams), on start it again removes cache file and create it file-inode-2
  3. Teams assumes it has access to file-inode-1, but in reality it's already file-inode-2

I definitely remember there were some side effects if we removed the file more than once
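The hazard described in the steps above can be sketched outside requests_cache entirely (POSIX semantics; the path is illustrative): once the name is removed and recreated, the first handle keeps writing to an orphaned inode.

```python
import os
import tempfile

path = os.path.join(tempfile.mkdtemp(), "cache.sqlite")

first = open(path, "w")   # creates file-inode-1
os.remove(path)           # a second stream "clears the cache" again
second = open(path, "w")  # the same name now maps to file-inode-2
first.write("data meant for the shared cache")
first.close()             # flushed into orphaned inode-1, silently lost
second.close()

with open(path) as f:
    print(repr(f.read()))  # -> '' : the first writer's data is gone
```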

Contributor:

We start hitting sqlite3 runtime exceptions

Contributor @grubberr commented on Nov 17, 2022:
@sherifnada let me cover those runtime side effects with unit tests today

Contributor:

@sherifnada example of problem from source-github:

File "/home/user/airbyte/airbyte-integrations/connectors/source-github/.venv/lib/python3.9/site-packages/requests_cache/backends/base.py", line 100, in save_response
    self.responses[cache_key] = cached_response
  File "/home/user/airbyte/airbyte-integrations/connectors/source-github/.venv/lib/python3.9/site-packages/requests_cache/backends/sqlite.py", line 268, in __setitem__
    super().__setitem__(key, serialized_value)
  File "/home/user/airbyte/airbyte-integrations/connectors/source-github/.venv/lib/python3.9/site-packages/requests_cache/backends/sqlite.py", line 220, in __setitem__
    con.execute(
sqlite3.OperationalError: attempt to write a readonly database

Contributor:

@sherifnada

I have dug into the requests_cache internals, and it seems something like this happens under the hood:

#!/home/user/airbyte/airbyte-integrations/connectors/source-github/.venv/bin/python3

import os
import sqlite3

try:
    os.unlink("cache.sqlite")
except FileNotFoundError:
    pass

con = sqlite3.connect("cache.sqlite")
row = con.execute('CREATE TABLE t (col VARCHAR)')
con.commit()

os.unlink("cache.sqlite") # <- ATTENTION HERE

con.execute("INSERT INTO t (col) VALUES ('value')");

which fails with:

Traceback (most recent call last):
  File "/home/user/airbyte/airbyte-integrations/connectors/source-github/./t.py", line 28, in <module>
    con.execute("INSERT INTO t (col) VALUES ('value')");
sqlite3.OperationalError: attempt to write a readonly database


@property
@abstractmethod
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
@@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.9.3",
version="0.9.5",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
@@ -7,6 +7,7 @@
import airbyte_cdk.sources.declarative.requesters.error_handlers.response_status as response_status
import pytest
import requests
import tempfile
from airbyte_cdk.models import AirbyteLogMessage, Level, SyncMode
from airbyte_cdk.sources.declarative.exceptions import ReadException
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction
@@ -29,7 +30,7 @@

@patch.object(HttpStream, "_read_pages", return_value=[])
def test_simple_retriever_full(mock_http_stream):
requester = MagicMock()
requester = MagicMock(use_cache=False)
request_params = {"param": "value"}
requester.get_request_params.return_value = request_params

@@ -69,7 +70,7 @@ def test_simple_retriever_full(mock_http_stream):
requester.get_request_body_json.return_value = request_body_json
request_kwargs = {"kwarg": "value"}
requester.request_kwargs.return_value = request_kwargs
cache_filename = "cache"
cache_filename = tempfile.NamedTemporaryFile().name
requester.cache_filename = cache_filename
use_cache = True
requester.use_cache = use_cache
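The test now derives `cache_filename` from `tempfile.NamedTemporaryFile().name` instead of a hard-coded `"cache"` file in the working directory. A quick sketch of why that yields a throwaway path (closing the object deletes the file, leaving only the unique name):

```python
import os
import tempfile

tmp = tempfile.NamedTemporaryFile()  # creates a uniquely named real file
path = tmp.name
tmp.close()  # NamedTemporaryFile deletes its file on close by default
print(os.path.exists(path))  # -> False: only the unique path string remains
```

Note that reusing `.name` after close is slightly racy (another process could claim the path), which is acceptable for a unit test but not for production code.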
@@ -114,7 +115,7 @@ def test_simple_retriever_full(mock_http_stream):

@patch.object(HttpStream, "_read_pages", return_value=[*request_response_logs, *records])
def test_simple_retriever_with_request_response_logs(mock_http_stream):
requester = MagicMock()
requester = MagicMock(use_cache=False)
paginator = MagicMock()
record_selector = MagicMock()
iterator = DatetimeStreamSlicer(
@@ -143,7 +144,7 @@ def test_simple_retriever_with_request_response_logs(mock_http_stream):

@patch.object(HttpStream, "_read_pages", return_value=[])
def test_simple_retriever_with_request_response_log_last_records(mock_http_stream):
requester = MagicMock()
requester = MagicMock(use_cache=False)
paginator = MagicMock()
record_selector = MagicMock()
record_selector.select_records.return_value = request_response_logs
66 changes: 34 additions & 32 deletions airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py
@@ -5,6 +5,7 @@

import json
from http import HTTPStatus
import tempfile
from typing import Any, Iterable, Mapping, Optional
from unittest.mock import ANY, MagicMock, patch

@@ -352,7 +353,12 @@ def test_body_for_all_methods(self, mocker, requests_mock):

class CacheHttpStream(StubBasicReadHttpStream):
use_cache = True

def __enter__(self):
return self

def __exit__(self, *args):
self.clear_cache()
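The `__enter__`/`__exit__` pair added here lets each test use the stream as a context manager so its cache file is removed even when an assertion fails mid-test. The same pattern in miniature (the class and paths are hypothetical, not CDK code):

```python
import os
import tempfile


class CacheLikeResource:
    """Hypothetical stand-in for a stream that owns a cache file."""

    def __init__(self):
        self.cache_filename = os.path.join(tempfile.mkdtemp(), "stream.sqlite")
        open(self.cache_filename, "w").close()  # simulate cache creation

    def clear_cache(self):
        try:
            os.remove(self.cache_filename)
        except FileNotFoundError:
            pass  # repeated removals are fine

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.clear_cache()  # runs on success and on test failure alike


with CacheLikeResource() as res:
    leftover = res.cache_filename
    assert os.path.exists(leftover)
print(os.path.exists(leftover))  # -> False once the with-block exits
```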

class CacheHttpSubStream(HttpSubStream):
url_base = "https://example.com"
@@ -372,32 +378,28 @@ def path(self, **kwargs) -> str:


def test_caching_filename():
stream = CacheHttpStream()
assert stream.cache_filename == f"{stream.name}.sqlite"
with CacheHttpStream() as stream:
assert stream.cache_filename == f"{stream.name}.sqlite"


def test_caching_sessions_are_different():
stream_1 = CacheHttpStream()
stream_2 = CacheHttpStream()

assert stream_1._session != stream_2._session
assert stream_1.cache_filename == stream_2.cache_filename
with CacheHttpStream() as stream_1, CacheHttpStream() as stream_2:
assert stream_1._session != stream_2._session
assert stream_1.cache_filename == stream_2.cache_filename


def test_parent_attribute_exist():
parent_stream = CacheHttpStream()
child_stream = CacheHttpSubStream(parent=parent_stream)

assert child_stream.parent == parent_stream
with CacheHttpStream() as parent_stream:
child_stream = CacheHttpSubStream(parent=parent_stream)
assert child_stream.parent == parent_stream


def test_cache_response(mocker):
stream = CacheHttpStream()
mocker.patch.object(stream, "url_base", "https://google.com/")
list(stream.read_records(sync_mode=SyncMode.full_refresh))

with open(stream.cache_filename, "rb") as f:
assert f.read()
with CacheHttpStream() as stream:
mocker.patch.object(stream, "url_base", "https://google.com/")
list(stream.read_records(sync_mode=SyncMode.full_refresh))
with open(stream.cache_filename, "rb") as f:
assert f.read()


class CacheHttpStreamWithSlices(CacheHttpStream):
@@ -419,27 +421,27 @@ def test_using_cache(mocker, requests_mock):
requests_mock.register_uri("GET", "https://google.com/", text="text")
requests_mock.register_uri("GET", "https://google.com/search", text="text")

parent_stream = CacheHttpStreamWithSlices()
mocker.patch.object(parent_stream, "url_base", "https://google.com/")
with CacheHttpStreamWithSlices() as parent_stream:
mocker.patch.object(parent_stream, "url_base", "https://google.com/")

assert requests_mock.call_count == 0
assert parent_stream._session.cache.response_count() == 0
assert requests_mock.call_count == 0
assert parent_stream._session.cache.response_count() == 0

for _slice in parent_stream.stream_slices():
list(parent_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=_slice))
for _slice in parent_stream.stream_slices():
list(parent_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=_slice))

assert requests_mock.call_count == 2
assert parent_stream._session.cache.response_count() == 2
assert requests_mock.call_count == 2
assert parent_stream._session.cache.response_count() == 2

child_stream = CacheHttpSubStream(parent=parent_stream)
child_stream = CacheHttpSubStream(parent=parent_stream)

for _slice in child_stream.stream_slices(sync_mode=SyncMode.full_refresh):
pass
for _slice in child_stream.stream_slices(sync_mode=SyncMode.full_refresh):
pass

assert requests_mock.call_count == 2
assert parent_stream._session.cache.response_count() == 2
assert parent_stream._session.cache.has_url("https://google.com/")
assert parent_stream._session.cache.has_url("https://google.com/search")
assert requests_mock.call_count == 2
assert parent_stream._session.cache.response_count() == 2
assert parent_stream._session.cache.has_url("https://google.com/")
assert parent_stream._session.cache.has_url("https://google.com/search")


class AutoFailTrueHttpStream(StubBasicReadHttpStream):