This repository was archived by the owner on Aug 13, 2019 and is now read-only.

Commit 71f34fd
optimize fetch_existing, fixes #447 #445 #446 (#448)
* optimize fetch_existing, fixes #447 #445 #446

* remove fetch_existing migration test

* test fixes

* refactoring endpoint variable
Peter Bengtsson authored May 3, 2018
1 parent c5e25f1 commit 71f34fd
Showing 2 changed files with 55 additions and 118 deletions.
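
In short, the commit replaces the old list-of-records dump file with a hash-based one and streams records out of Kinto one page at a time. For orientation, here is a minimal sketch (not code from the repository) of the cache layout the new code maintains: a JSON object mapping record id to [last_modified, md5_hash]. The exact fields covered by the hash live in hash_record_mutate(), whose body is truncated in this diff, so the serialization below is an assumption.

import hashlib
import json

def sketch_hash(record):
    # Assumption: a 32-character MD5 hexdigest over a stable JSON
    # serialization of the record, mirroring what hash_record_mutate()
    # does after stripping whatever fields it strips.
    return hashlib.md5(
        json.dumps(record, sort_keys=True).encode('utf-8')
    ).hexdigest()

record = {'id': 'a', 'title': 'a', 'last_modified': 1}
cache = {record['id']: [record['last_modified'], sketch_hash(record)]}
# cache now maps 'a' -> [1, <32-char md5 hex string>]
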
114 changes: 47 additions & 67 deletions jobs/buildhub/to_kinto.py
@@ -46,7 +46,6 @@
 NB_RETRY_REQUEST = 3
 WAIT_TIMEOUT = 5
 BATCH_MAX_REQUESTS = config('BATCH_MAX_REQUESTS', default=9999, cast=int)
-OLD_PREVIOUS_DUMP_FILENAME = '.records-{server}-{bucket}-{collection}.json'
 PREVIOUS_DUMP_FILENAME = '.records-hashes-{server}-{bucket}-{collection}.json'
 CACHE_FOLDER = config('CACHE_FOLDER', default='.')
 
@@ -56,40 +55,16 @@
 done = object()
 
 
-def _migrate_old_dump_file(old_file, new_file):
-    """The old file is a .json file that, when opened, is a massive list of
-    dictionaries. Open it and save it to the new JSON file.
-    The format of the new JSON file is:
-        ID --> [last_modified, md5_hash_string]
-    """
-    with open(old_file) as f:
-        data = json.load(f)
-
-    new_data = {}
-    for record in data:
-        new_data[record['id']] = [
-            record['last_modified'],
-            _hash_record_mutate(record)
-        ]
-
-    with open(new_file, 'w') as f:
-        json.dump(new_data, f, sort_keys=True, indent=2)
-
-
 def hash_record(record):
     """Return a hash string (based on MD5) that is 32 characters long.
     This function does *not mutate* the record but needs to make a copy of
     the record (and mutate that), so it's less performant.
     """
-    return _hash_record_mutate(copy.deepcopy(record))
+    return hash_record_mutate(copy.deepcopy(record))
 
 
-def _hash_record_mutate(record):
+def hash_record_mutate(record):
     """Return a hash string (based on MD5) that is 32 characters long.
     NOTE! For performance, this function *will mutate* the record object.
@@ -103,10 +78,10 @@ def _hash_record_mutate(record):
     ).hexdigest()
 
 
+@metrics.timer_decorator('to_kinto_fetch_existing')
 def fetch_existing(
     client,
-    cache_file=PREVIOUS_DUMP_FILENAME,
-    old_cache_file=OLD_PREVIOUS_DUMP_FILENAME,
+    cache_file=PREVIOUS_DUMP_FILENAME
 ):
     """Fetch all records since the last run. A JSON file on disk is used
     to store records from the previous run.
@@ -116,34 +91,6 @@ def fetch_existing(
         bucket=client._bucket_name,
         collection=client._collection_name))
 
-    # Note! Some time in late 2018 we can delete these lines. By then,
-    # Stage and Prod (and pretty much every active developer) will have
-    # switched to the new hash-based dump file. Till then, let these lines
-    # sit around. If the migration happens and is successful, the old
-    # cache file gets deleted.
-    if not os.path.exists(cache_file):
-
-        # Perhaps the old file exists!
-        # Prior to April 2018 we used to dump ALL Kinto records into a
-        # .json file as a massive list of dicts.
-        # The problem with that was that it bloated RAM badly.
-        # The .json file was around 700+MB and, when loaded into Python
-        # as a list object, it would take up about 2.4GB of RAM.
-        # (The .json file of hashes, when read into Python, becomes
-        # about 200MB.)
-        # Also, by always hashing the records consistently we could just
-        # compare two hashes (as strings) instead of having to compare
-        # dictionaries.
-        old_cache_file = os.path.join(CACHE_FOLDER, old_cache_file.format(
-            server=urlparse(client.session.server_url).hostname,
-            bucket=client._bucket_name,
-            collection=client._collection_name))
-        if os.path.exists(old_cache_file):
-            _migrate_old_dump_file(old_cache_file, cache_file)
-            logger.info(f'Migrated dump file {old_cache_file} to {cache_file}')
-            os.remove(old_cache_file)
-    # End dump file migration.
-
     records = {}
     previous_run_etag = None
 
@@ -155,16 +102,49 @@ def fetch_existing(
     )
     previous_run_etag = '"%s"' % highest_timestamp
 
-    new_records = client.get_records(
-        _since=previous_run_etag,
-        pages=float('inf')
-    )
-
-    for record in new_records:
-        records[record['id']] = [
-            record['last_modified'],
-            hash_record(record)
-        ]
+    # We can't use client.get_records() for the full fetch because it is
+    # not an iterator: if the Kinto database has 1M objects we'd end up
+    # with a big fat Python list of 1M objects, which has repeatedly
+    # caused OOM errors on our stage and prod admin nodes.
+    if previous_run_etag:
+        # However, we can use it when there is a previous_run_etag, which
+        # means we only need to extract a limited number of records, not
+        # the whole Kinto database.
+        new_records_batches = [client.get_records(
+            _since=previous_run_etag,
+            pages=float('inf')
+        )]
+    else:
+        def new_records_iterator():
+            params = {'_since': None}
+            endpoint = client.get_endpoint('records')
+            while True:
+                record_resp, headers = client.session.request(
+                    'get',
+                    endpoint,
+                    params=params
+                )
+                yield record_resp['data']
+                try:
+                    endpoint = headers['Next-Page']
+                    if not endpoint:
+                        raise KeyError('exists but empty value')
+                except KeyError:
+                    break
+                params.pop('_since', None)
+
+        new_records_batches = new_records_iterator()
+
+    count_new_records = 0
+    for new_records in new_records_batches:
+        for record in new_records:
+            count_new_records += 1
+            records[record['id']] = [
+                record['last_modified'],
+                hash_record_mutate(record)
+            ]
+
+    metrics.gauge('to_kinto_fetched_new_records', count_new_records)
 
     # Atomic write.
     if records:
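
A note on the shape of the new code: both branches produce an iterable of record batches. The _since branch wraps its single eager get_records() result in a one-element list, while the full-fetch branch yields one page at a time by following the Next-Page response header, so peak memory is bounded by a single page rather than the whole collection. A condensed, standalone restatement of that control flow, assuming a kinto-http-style client like the one above:

def record_batches(client, previous_run_etag):
    # Sketch only: mirrors the control flow of fetch_existing() above.
    if previous_run_etag:
        # Small delta since the last run: one eager batch is fine.
        return [client.get_records(_since=previous_run_etag,
                                   pages=float('inf'))]

    def pages():
        params = {'_since': None}
        endpoint = client.get_endpoint('records')
        while True:
            body, headers = client.session.request(
                'get', endpoint, params=params)
            yield body['data']  # hold one page of records at a time
            endpoint = headers.get('Next-Page')
            if not endpoint:
                return  # missing or empty Next-Page header: done
            params.pop('_since', None)  # Next-Page URL carries its own query

    return pages()
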
59 changes: 8 additions & 51 deletions jobs/tests/test_to_kinto.py
@@ -2,8 +2,6 @@
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, you can obtain one at http://mozilla.org/MPL/2.0/.
 
-import json
-import os
 import unittest
 # Because you can't just import unittest and access 'unittest.mock.MagicMock'
 from unittest.mock import MagicMock
@@ -20,22 +18,26 @@ def init_cache_files(self, tmpdir):
         # Use str() on these LocalPath instances to turn them into plain
         # strings since to_kinto.fetch_existing() expects it to be a string.
         self.cache_file = str(tmpdir.join('cache.json'))
-        self.old_cache_file = str(tmpdir.join('old.json'))
 
     def test_records_are_not_duplicated(self):
         mocked = MagicMock()
 
         mocked.session.server_url = 'http://localhost:8888/v1'
         # First, populate the cache.
-        mocked.get_records.return_value = [
-            {'id': 'a', 'title': 'a', 'last_modified': 1}
-        ]
+        mocked.session.request.return_value = (
+            {
+                'data': [{'id': 'a', 'title': 'a', 'last_modified': 1}]
+            },
+            {}  # headers
+        )
         first = fetch_existing(mocked, cache_file=self.cache_file)
         assert isinstance(first, dict)
         assert len(first) == 1
         assert first['a'][0] == 1  # [0] is the last_modified
         first_hash = first['a'][1]  # [1] is the hash string
 
+        # Now that the cache file exists, it will use the regular
+        # client.get_records call.
         mocked.get_records.return_value = [
            {'id': 'a', 'title': 'b', 'last_modified': 2}
        ]
@@ -45,48 +47,3 @@ def test_records_are_not_duplicated(self):
         assert second['a'][0] == 2
         second_hash = second['a'][1]
         assert first_hash != second_hash
-
-    def test_dump_file_cache_migration(self):
-        # Make sure the new cache file doesn't exist.
-        if os.path.exists(self.cache_file):
-            os.remove(self.cache_file)
-        # The old dump used to be a list of records.
-        with open(self.old_cache_file, 'w') as f:
-            records = [
-                {'id': 'a', 'last_modified': 1, 'title': 'a'}
-            ]
-            json.dump(records, f)
-        mocked = MagicMock()
-        mocked.session.server_url = 'http://localhost:8888/v1'
-        # First, populate the cache.
-        mocked.get_records.return_value = [
-            {'id': 'a', 'title': 'b', 'last_modified': 2},
-        ]
-        fetch_existing(
-            mocked,
-            cache_file=self.cache_file,
-            old_cache_file=self.old_cache_file
-        )
-        assert not os.path.exists(self.old_cache_file)
-        with open(self.cache_file) as f:
-            records = json.load(f)
-        assert len(records) == 1
-        assert records['a'][0] == 2  # [0] is last_modified
-
-        # The migration is done, but let's make sure fetch_existing
-        # continues to work as expected.
-        # Pretend that get_records() is, more realistically, called with a
-        # ?_since=etag so that this second time it won't include
-        # the previous `{'id': 'a', 'title': 'b', 'last_modified': 2}` record.
-        mocked.get_records.return_value = [
-            {'id': 'b', 'title': 'bee', 'last_modified': 3},
-        ]
-        fetch_existing(
-            mocked,
-            cache_file=self.cache_file,
-            old_cache_file=self.old_cache_file
-        )
-        assert not os.path.exists(self.old_cache_file)  # still
-        with open(self.cache_file) as f:
-            records = json.load(f)
-        assert len(records) == 2

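Since a cold-cache run now goes through client.session.request, a test can simulate multiple pages by giving the mock a side_effect. A hedged sketch in the style of the test above (the Next-Page URL is invented for illustration):

    def test_paginated_fetch_sketch(self):
        mocked = MagicMock()
        mocked.session.server_url = 'http://localhost:8888/v1'
        mocked.session.request.side_effect = [
            # First page: a Next-Page header points at a (hypothetical) next URL.
            ({'data': [{'id': 'a', 'title': 'a', 'last_modified': 1}]},
             {'Next-Page': 'http://localhost:8888/v1/records?_token=2'}),
            # Second page: no Next-Page header, so pagination stops.
            ({'data': [{'id': 'b', 'title': 'b', 'last_modified': 2}]},
             {}),
        ]
        result = fetch_existing(mocked, cache_file=self.cache_file)
        assert len(result) == 2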