This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Disk cache the manifests, fixes #392 (#403)

Merged 7 commits on Apr 19, 2018
1 change: 1 addition & 0 deletions .dockerignore
@@ -8,3 +8,4 @@ buildhub-lambda*.zip

jobs/.cache
ui/node_modules/
csv-download-directory/
1 change: 1 addition & 0 deletions .gitignore
@@ -13,3 +13,4 @@ buildhub-lambda-*.zip
.docker-build
.metadata-*.json
.records-kinto-build-hub-releases.json
csv-download-directory/
1 change: 1 addition & 0 deletions docker-compose.yml
@@ -54,6 +54,7 @@ services:
- "kinto"
environment:
- SERVER_URL=http://kinto:8888/v1
- CSV_DOWNLOAD_DIRECTORY=./csv-download-directory
volumes:
- $PWD:/app
# All things within the 'buildhub' directive are meant to be executed
3 changes: 1 addition & 2 deletions jobs/buildhub/inventory_to_records.py
@@ -321,7 +321,7 @@ async def fetch_release_metadata(session, record):

async def process_batch(session, batch, skip_incomplete):
# Parallel fetch of metadata for each item of the batch.
logger.info('Fetch metadata for {} releases...'.format(len(batch)))
logger.debug('Fetch metadata for {} releases...'.format(len(batch)))
futures = [fetch_metadata(session, record) for record in batch]
metadatas = await asyncio.gather(*futures)
results = [merge_metadata(record, metadata)
@@ -439,7 +439,6 @@ def deduplicate_entries(entries):
entry['LastModifiedDate'],
'%Y-%m-%dT%H:%M:%S.%fZ'
)

if min_last_modified and lastmodified < min_last_modified:
continue

93 changes: 81 additions & 12 deletions jobs/buildhub/s3_inventory_to_kinto.py
@@ -5,15 +5,21 @@
import re
import asyncio
import datetime
import glob
import json
import logging
import os
import pkgutil
import tempfile
import time
import zlib
from concurrent.futures import ThreadPoolExecutor

import aiofiles
import aiobotocore
import botocore
from decouple import config
from aiohttp.client_exceptions import ClientPayloadError
import kinto_http
import raven
from raven.handlers.logging import SentryHandler
@@ -33,13 +39,19 @@
'delivery-{inventory}/'
)
CHUNK_SIZE = 1024 * 256 # 256 KB
MAX_CSV_DOWNLOAD_AGE = 60 * 60 * 24 * 2 # two days

INITIALIZE_SERVER = os.getenv('INITIALIZE_SERVER', 'true').lower() == 'true'

# Minimum number of hours old an entry in the CSV files needs to be
# to NOT be skipped.
MIN_AGE_LAST_MODIFIED_HOURS = int(os.getenv('MIN_AGE_LAST_MODIFIED_HOURS', 0))

CSV_DOWNLOAD_DIRECTORY = config(
'CSV_DOWNLOAD_DIRECTORY',
tempfile.gettempdir()
)

# Optional Sentry with synchronous client.
SENTRY_DSN = os.getenv('SENTRY_DSN')
sentry = raven.Client(SENTRY_DSN, transport=raven.transport.http.HTTPTransport)
@@ -129,25 +141,82 @@ async def list_manifest_entries(loop, s3_client, inventory):
async with manifest['Body'] as stream:
body = await stream.read()
manifest_content = json.loads(body.decode('utf-8'))
# Return keys of csv.gz files
for f in manifest_content['files']:
yield f['key']


async def download_csv(loop, s3_client, keys_stream, chunk_size=CHUNK_SIZE):
# Here, each 'f' is a dictionary that looks something like this:
#
# {
# "key" : "inventories/net-mozaw...f-b1a0-5fb25bb83752.csv.gz",
# "size" : 7945521,
# "MD5checksum" : "7454b0d773000f790f15b867ee152049"
# }
#
# We yield the whole thing. The key is used to download from S3.
# The MD5checksum is used to know how to store the file on
# disk for caching.
yield f


async def download_csv(
loop,
s3_client,
files_stream,
chunk_size=CHUNK_SIZE,
download_directory=CSV_DOWNLOAD_DIRECTORY,
):
"""
Download the S3 object of each key and return deflated data chunks (CSV).
:param loop: asyncio event loop.
:param s3_client: Initialized S3 client.
:param keys_stream async generator: List of object keys for
the csv.gz manifests.
"""
async for key in keys_stream:
key = 'public/' + key
logger.info('Fetching inventory piece {}'.format(key))
file_csv_gz = await s3_client.get_object(Bucket=BUCKET, Key=key)

# Make sure the directory exists if it wasn't already created.
if not os.path.isdir(download_directory):
os.makedirs(download_directory, exist_ok=True)

# Look for old download junk in the download directory.
too_old = MAX_CSV_DOWNLOAD_AGE
for file_path in glob.glob(os.path.join(download_directory, '*.csv.gz')):
age = time.time() - os.stat(file_path).st_mtime
if age > too_old:
logger.info(
f'Delete old download file {file_path} '
f'({age} seconds old)'
)
os.remove(file_path)

async for files in files_stream:
# If it doesn't exist on disk, download to disk.
file_path = os.path.join(
download_directory,
files['MD5checksum'] + '.csv.gz'
)
# Skip the download if the file already exists and has data.
if os.path.isfile(file_path) and os.stat(file_path).st_size:
[Inline review comment on the check above]
Collaborator: nit: add explicit st_size > 0
Author: Out of curiosity; why? Isn't it explicit already that the test is for the st_size to be anything greater than 0?
Collaborator: Matter of taste maybe :)

logger.debug(f'{file_path} was already downloaded locally')
else:
key = 'public/' + files['key']
logger.info('Fetching inventory piece {}'.format(key))
file_csv_gz = await s3_client.get_object(Bucket=BUCKET, Key=key)
try:
async with aiofiles.open(file_path, 'wb') as destination:
async with file_csv_gz['Body'] as source:
while 'there are chunks to read':
gzip_chunk = await source.read(chunk_size)
if not gzip_chunk:
break # End of response.
await destination.write(gzip_chunk)
size = os.stat(file_path).st_size
logger.info(f'Downloaded {key} to {file_path} ({size} bytes)')
except ClientPayloadError:
if os.path.exists(file_path):
os.remove(file_path)
raise
[Inline review comment on the exception handling above]
Collaborator: r+ :)


# Now we expect the file to exist locally. Let's read it.
gzip = zlib.decompressobj(zlib.MAX_WBITS | 16)
async with file_csv_gz['Body'] as stream:
async with aiofiles.open(file_path, 'rb') as stream:
while 'there are chunks to read':
gzip_chunk = await stream.read(chunk_size)
if not gzip_chunk:
@@ -195,8 +264,8 @@ async def main(loop, inventory):
async with session.create_client(
's3', region_name=REGION_NAME, config=boto_config
) as client:
keys_stream = list_manifest_entries(loop, client, inventory)
csv_stream = download_csv(loop, client, keys_stream)
files_stream = list_manifest_entries(loop, client, inventory)
csv_stream = download_csv(loop, client, files_stream)
records_stream = csv_to_records(
loop,
csv_stream,
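
Taken together, the new download_csv caches each inventory file on disk under its MD5 checksum, prunes cached files older than MAX_CSV_DOWNLOAD_AGE, and then stream-decompresses the local copy. A minimal synchronous sketch of that flow, not part of the diff; fetch_bytes is a made-up stand-in for the S3 GetObject call:

import glob
import os
import time
import zlib

MAX_CSV_DOWNLOAD_AGE = 60 * 60 * 24 * 2  # two days, as in the PR
CHUNK_SIZE = 1024 * 256  # 256 KB


def cached_csv_path(download_directory, entry, fetch_bytes):
    """Return a local path for one manifest entry, downloading it only if
    the MD5-named file is missing or empty. `entry` is a dict like
    {'key': ..., 'size': ..., 'MD5checksum': ...}; `fetch_bytes(key)` is a
    hypothetical callable standing in for the S3 download."""
    os.makedirs(download_directory, exist_ok=True)

    # Prune stale downloads so the cache directory does not grow forever.
    for old_path in glob.glob(os.path.join(download_directory, '*.csv.gz')):
        if time.time() - os.stat(old_path).st_mtime > MAX_CSV_DOWNLOAD_AGE:
            os.remove(old_path)

    path = os.path.join(download_directory, entry['MD5checksum'] + '.csv.gz')
    if not (os.path.isfile(path) and os.stat(path).st_size > 0):
        with open(path, 'wb') as destination:
            destination.write(fetch_bytes('public/' + entry['key']))
    return path


def iter_csv_chunks(path, chunk_size=CHUNK_SIZE):
    """Stream-decompress a cached .csv.gz file chunk by chunk."""
    gzip = zlib.decompressobj(zlib.MAX_WBITS | 16)  # 16 makes zlib accept gzip headers
    with open(path, 'rb') as stream:
        while True:
            gzip_chunk = stream.read(chunk_size)
            if not gzip_chunk:
                break
            csv_chunk = gzip.decompress(gzip_chunk)
            if csv_chunk:
                yield csv_chunk

Keying the cache on MD5checksum rather than the object key means a re-published manifest that points at identical content reuses the local copy, while changed content gets a fresh file.
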
3 changes: 3 additions & 0 deletions jobs/requirements/default.txt
@@ -33,3 +33,6 @@ datadog==0.20.0 \
--hash=sha256:7bb4af836d7422a6138f983b81c16acd56c2d608913982602856cc273ae74768
python-decouple==3.1 \
--hash=sha256:1317df14b43efee4337a4aa02914bf004f010cd56d6c4bd894e6474ec8c4fe2d
aiofiles==0.3.2 \
--hash=sha256:25c66ea3872d05d53292a6b3f7fa0f86691512076446d83a505d227b5e76f668 \
--hash=sha256:852a493a877b73e11823bfd4e8e5ef2610d70d12c9eaed961bcd9124d8de8c10
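
aiofiles is the new dependency behind the non-blocking file writes and reads above. A minimal usage sketch (the file names here are made up):

import asyncio

import aiofiles


async def copy_in_chunks(source_path, destination_path, chunk_size=1024 * 256):
    # aiofiles hands the blocking file I/O to a thread pool so the event
    # loop stays responsive while large .csv.gz files are written.
    async with aiofiles.open(source_path, 'rb') as source:
        async with aiofiles.open(destination_path, 'wb') as destination:
            while True:
                chunk = await source.read(chunk_size)
                if not chunk:
                    break
                await destination.write(chunk)


# e.g. asyncio.get_event_loop().run_until_complete(
#     copy_in_chunks('example.csv.gz', 'copy.csv.gz'))
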
3 changes: 3 additions & 0 deletions jobs/tests/test_inventory_to_records.py
@@ -799,6 +799,8 @@ async def setUp(self):
mocked.get(utils.ARCHIVE_URL + url, payload=payload)
self.addCleanup(mocked.stop)

# inventory_to_records._candidates_build_folder.clear()

async def async_gen():
_csv_input = (
'net-mozaws-delivery-firefox,pub/firefox/releases/51.0/win64/'
@@ -827,6 +829,7 @@ async def test_csv_to_records(self):
async for r in output:
records.append(r)

assert len(records) == 1
assert records == [{
'data': {
'id': 'firefox_51-0_win64_fy-nl',
25 changes: 19 additions & 6 deletions jobs/tests/test_s3_inventory_to_kinto.py
@@ -3,6 +3,7 @@
# file, you can obtain one at http://mozilla.org/MPL/2.0/.

import base64
import json

import asynctest

@@ -33,7 +34,12 @@ async def __aexit__(self, *args):
return self

async def read(self):
return b'{"files": [{"key": "a/b"}, {"key": "c/d"}]}'
return json.dumps({
'files': [
{'key': 'a/b', 'size': 1234, 'MD5checksum': 'eaf123'},
{'key': 'c/d', 'size': 2222, 'MD5checksum': 'ef001'},
]
}).encode('utf-8')

class FakeClient:
def get_paginator(self, *args):
@@ -53,7 +59,10 @@ async def test_return_keys_of_latest_manifest(self):
'firefox'
):
results.append(r)
assert results == ['a/b', 'c/d']
assert results == [
{'key': 'a/b', 'size': 1234, 'MD5checksum': 'eaf123'},
{'key': 'c/d', 'size': 2222, 'MD5checksum': 'ef001'},
]


class DownloadCSV(asynctest.TestCase):
@@ -84,11 +93,15 @@ async def get_object(self, Bucket, Key):

async def test_unzip_chunks(self):

async def keys():
yield 'key-1'
async def files_iterator():
yield {
'key': 'key-1',
'size': 123456,
'MD5checksum': 'deadbeef0123',
}

keys = keys()
files = files_iterator()
results = []
async for r in download_csv(self.loop, self.client, keys):
async for r in download_csv(self.loop, self.client, files):
results.append(r)
assert results == [b"1;2;3;4\n5;6"]
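
For reference, the async generators exercised above can also be driven with a plain asyncio event loop; a small sketch with a made-up collect() helper, not part of the test suite:

import asyncio


async def collect(async_generator):
    # Exhaust an async generator and return its items as a list, the way
    # the tests above accumulate `results` with `async for`.
    return [item async for item in async_generator]


async def files_iterator():
    yield {'key': 'key-1', 'size': 123456, 'MD5checksum': 'deadbeef0123'}


loop = asyncio.get_event_loop()
print(loop.run_until_complete(collect(files_iterator())))
# [{'key': 'key-1', 'size': 123456, 'MD5checksum': 'deadbeef0123'}]
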