Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement onclusive reingesting #1971

Merged
merged 1 commit into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/planning/events/events_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ def test_new_planning_is_published_when_adding_to_published_event(self):

def test_related_planning_item_validation_on_post(self):
"""
check planning item fields validation
Check planning item fields validation
if validation fails, planning item is not posted.
"""
events_service = get_resource_service("events")
Expand Down
38 changes: 36 additions & 2 deletions server/planning/feeding_services/onclusive_api_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from urllib.parse import urljoin
from superdesk.errors import ProviderError
from celery.exceptions import SoftTimeLimitExceeded
from superdesk.celery_task_utils import get_lock_id
from superdesk.lock import touch

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -58,6 +60,13 @@ class OnclusiveApiService(HTTPFeedingServiceBase):
"required": False,
"default": 365,
},
{
"id": "days_to_reingest",
"type": "text",
"label": lazy_gettext("Days in the past to Reingest"),
"placeholder": lazy_gettext("Days"),
"required": False,
},
]

HTTP_AUTH = False
Expand Down Expand Up @@ -86,6 +95,22 @@ def _update(self, provider, update):
update["last_updated"] = utcnow().replace(
second=0
) # next time start from here, onclusive api does not use seconds

# force reingest starting from now - days_to_reingest
if provider["config"].get("days_to_reingest"):
start_date = datetime.now() - timedelta(days=int(provider["config"]["days_to_reingest"]))
logger.info("Reingesting from %s", start_date.date().isoformat())
update["config"] = provider["config"].copy()
update["config"]["days_to_reingest"] = ""
# override to reset
update["tokens"]["start_date"] = start_date
update["tokens"]["next_start"] = start_date
update["tokens"]["reingesting"] = True
update["tokens"]["import_finished"] = None
update["tokens"]["date"] = ""

reingesting = update["tokens"].get("reingesting")

if update["tokens"].get("import_finished"):
# populate it for cases when import was done before we introduced the field
update["tokens"].setdefault("next_start", update["tokens"]["import_finished"] - timedelta(hours=5))
Expand Down Expand Up @@ -120,18 +145,27 @@ def _update(self, provider, update):
if date > processed_date # when continuing skip previously ingested days
)
logger.info("ingest from onclusive %s with params %s", url, params)
lock_name = get_lock_id("ingest", provider["name"], provider["_id"])
try:
for i in iterations:
if not touch(lock_name, expire=60 * 15):
break
params[iterations_param] = i
logger.info("Onclusive PARAMS %s", params)
content = self._fetch(url, params, provider, update["tokens"])
items = parser.parse(content, provider)
logger.info("Onclusive returned %d items", len(items))
for item in items:
item.setdefault("language", self.language)
yield items
if reingesting:
item["versioncreated"] += timedelta(seconds=1) # bump versioncreated to trigger an update
if items:
yield items
update["tokens"][iterations_param] = i
update["tokens"].setdefault("import_finished", utcnow())
else:
# there was no break so we are done
update["tokens"]["import_finished"] = utcnow()
update["tokens"]["reingesting"] = False
except SoftTimeLimitExceeded:
logger.warning("stopped due to time limit, tokens=%s", update["tokens"])

Expand Down
130 changes: 86 additions & 44 deletions server/planning/feeding_services/onclusive_api_service_tests.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from planning.feed_parsers.onclusive import OnclusiveFeedParser
from .onclusive_api_service import OnclusiveApiService
from unittest.mock import MagicMock
from datetime import datetime, timedelta

import flask
import unittest
import requests_mock
import responses

from unittest.mock import MagicMock, patch
from datetime import datetime, timedelta
from planning.feed_parsers.onclusive import OnclusiveFeedParser

from .onclusive_api_service import OnclusiveApiService


parser = MagicMock(OnclusiveFeedParser)
Expand All @@ -15,54 +16,95 @@ class OnclusiveApiServiceTestCase(unittest.TestCase):
def setUp(self) -> None:
super().setUp()
self.app = flask.Flask(__name__)
self.service = OnclusiveApiService()
self.service.get_feed_parser = MagicMock(return_value=parser)
event = {"versioncreated": datetime(2023, 3, 1, 8, 0, 0)}

def test_update(self):
event = {"versioncreated": datetime.fromisoformat("2023-03-01T08:00:00")}
with self.app.app_context():
now = datetime.utcnow()
service = OnclusiveApiService()
service.get_feed_parser = MagicMock(return_value=parser)
parser.parse.return_value = [event]

provider = {
"_id": "onclusive_api",
"name": "onclusive",
"feed_parser": "onclusive_api",
"config": {"url": "https://api.abc.com", "username": "user", "password": "pass", "days": "30"},
}
parser.parse.return_value = [
event.copy(),
]

# for requests.json we need to convert datetime to string
self.event = {"versioncreated": event["versioncreated"].isoformat()}

self.provider = {
"_id": "onclusive_api",
"name": "onclusive",
"feed_parser": "onclusive_api",
"config": {"url": "https://api.abc.com", "username": "user", "password": "pass", "days": "30"},
}

@responses.activate
@patch("planning.feeding_services.onclusive_api_service.touch")
def test_update(self, lock_touch):
responses.post(
url="https://api.abc.com/api/v2/auth",
json={
"token": "tok",
"refreshToken": "refresh",
"productId": 10,
},
)

now = datetime.now()
responses.get(
"https://api.abc.com/api/v2/events/date?date={}".format(now.strftime("%Y%m%d")),
json=[self.event],
) # first returns an item
responses.get("https://api.abc.com/api/v2/events/date", json=[]) # every other request returns no items

with self.app.app_context():
updates = {}
with requests_mock.Mocker() as m:
m.post(
"https://api.abc.com/api/v2/auth",
json={
"token": "tok",
"refreshToken": "refresh",
"productId": 10,
},
)
m.get(
"https://api.abc.com/api/v2/events/date?date={}".format(now.strftime("%Y%m%d")),
json=[{"versioncreated": event["versioncreated"].isoformat()}],
) # first returns an item
m.get("https://api.abc.com/api/v2/events/date", json=[]) # ones won't
items = list(service._update(provider, updates))
items = list(self.service._update(self.provider, updates))
self.assertIn("tokens", updates)
self.assertEqual("refresh", updates["tokens"]["refreshToken"])
self.assertIn("import_finished", updates["tokens"])
self.assertEqual(updates["last_updated"], updates["tokens"]["next_start"])
self.assertEqual("fr-CA", items[0][0]["language"])

provider.update(updates)
self.provider.update(updates)
updates = {}
with requests_mock.Mocker() as m:
m.post(
"https://api.abc.com/api/v2/auth/renew",
responses.post(
"https://api.abc.com/api/v2/auth/renew",
json={
"token": "tok2",
"refreshToken": "refresh2",
},
)
responses.get("https://api.abc.com/api/v2/events/latest", json=[])
list(self.service._update(self.provider, updates))
self.assertEqual("refresh2", updates["tokens"]["refreshToken"])

@patch("planning.feeding_services.onclusive_api_service.touch")
def test_reingest(self, lock_touch):
with self.app.app_context():
start = datetime.now() - timedelta(days=30)
self.provider["config"]["days_to_reingest"] = "30"
self.provider["config"]["days_to_ingest"] = "10"
updates = {}
with responses.RequestsMock() as rsps: # checks if all requests were fired
rsps.add(
responses.POST,
url="https://api.abc.com/api/v2/auth",
json={
"token": "tok2",
"refreshToken": "refresh2",
"token": "tok",
"refreshToken": "refresh",
"productId": 10,
},
)
m.get("https://api.abc.com/api/v2/events/latest", json=[])
list(service._update(provider, updates))
self.assertEqual("refresh2", updates["tokens"]["refreshToken"])

for i in range(0, 10):
rsps.add(
responses.GET,
"https://api.abc.com/api/v2/events/date?limit=2000&date={}".format(
(start + timedelta(days=i)).strftime("%Y%m%d")
),
json=[self.event],
)

items = list(self.service._update(self.provider, updates))
assert 10 == len(items)
assert 1 == len(items[0])
assert items[0][0]["versioncreated"].isoformat() > self.event["versioncreated"]
assert updates["tokens"]["import_finished"]
assert not updates["tokens"]["reingesting"]
Loading