diff --git a/.dockerignore b/.dockerignore
index 63d7acee..5592ba1c 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -8,3 +8,4 @@
 buildhub-lambda*.zip
 jobs/.cache
 ui/node_modules/
+csv-download-directory/
diff --git a/.gitignore b/.gitignore
index 367ccd2f..0b75c38e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,3 +13,4 @@ buildhub-lambda-*.zip
 .docker-build
 .metadata-*.json
 .records-kinto-build-hub-releases.json
+csv-download-directory/
diff --git a/docker-compose.yml b/docker-compose.yml
index bb84e9c7..d7038314 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -54,6 +54,7 @@ services:
       - "kinto"
     environment:
       - SERVER_URL=http://kinto:8888/v1
+      - CSV_DOWNLOAD_DIRECTORY=./csv-download-directory
     volumes:
      - $PWD:/app
     # All things within the 'buildhub' directive are meant to be executed
diff --git a/jobs/buildhub/inventory_to_records.py b/jobs/buildhub/inventory_to_records.py
index fbf9f49c..8e7c4005 100644
--- a/jobs/buildhub/inventory_to_records.py
+++ b/jobs/buildhub/inventory_to_records.py
@@ -321,7 +321,7 @@ async def fetch_release_metadata(session, record):
 
 async def process_batch(session, batch, skip_incomplete):
     # Parallel fetch of metadata for each item of the batch.
-    logger.info('Fetch metadata for {} releases...'.format(len(batch)))
+    logger.debug('Fetch metadata for {} releases...'.format(len(batch)))
     futures = [fetch_metadata(session, record) for record in batch]
     metadatas = await asyncio.gather(*futures)
     results = [merge_metadata(record, metadata)
@@ -439,7 +439,6 @@ def deduplicate_entries(entries):
             entry['LastModifiedDate'],
             '%Y-%m-%dT%H:%M:%S.%fZ'
         )
-
         if min_last_modified and lastmodified < min_last_modified:
             continue
diff --git a/jobs/buildhub/s3_inventory_to_kinto.py b/jobs/buildhub/s3_inventory_to_kinto.py
index 2f441d52..63839967 100644
--- a/jobs/buildhub/s3_inventory_to_kinto.py
+++ b/jobs/buildhub/s3_inventory_to_kinto.py
@@ -5,15 +5,21 @@
 import re
 import asyncio
 import datetime
+import glob
 import json
 import logging
 import os
 import pkgutil
+import tempfile
+import time
 import zlib
 from concurrent.futures import ThreadPoolExecutor
 
+import aiofiles
 import aiobotocore
 import botocore
+from decouple import config
+from aiohttp.client_exceptions import ClientPayloadError
 import kinto_http
 import raven
 from raven.handlers.logging import SentryHandler
@@ -33,6 +39,7 @@
     'delivery-{inventory}/'
 )
 CHUNK_SIZE = 1024 * 256  # 256 KB
+MAX_CSV_DOWNLOAD_AGE = 60 * 60 * 24 * 2  # two days
 
 INITIALIZE_SERVER = os.getenv('INITIALIZE_SERVER', 'true').lower() == 'true'
@@ -40,6 +47,11 @@
 # to NOT be skipped.
 MIN_AGE_LAST_MODIFIED_HOURS = int(os.getenv('MIN_AGE_LAST_MODIFIED_HOURS', 0))
 
+CSV_DOWNLOAD_DIRECTORY = config(
+    'CSV_DOWNLOAD_DIRECTORY',
+    tempfile.gettempdir()
+)
+
 # Optional Sentry with synchronuous client.
 SENTRY_DSN = os.getenv('SENTRY_DSN')
 sentry = raven.Client(SENTRY_DSN, transport=raven.transport.http.HTTPTransport)
@@ -129,12 +141,28 @@ async def list_manifest_entries(loop, s3_client, inventory):
     async with manifest['Body'] as stream:
         body = await stream.read()
     manifest_content = json.loads(body.decode('utf-8'))
-    # Return keys of csv.gz files
     for f in manifest_content['files']:
-        yield f['key']
-
-
-async def download_csv(loop, s3_client, keys_stream, chunk_size=CHUNK_SIZE):
+        # Here, each 'f' is a dictionary that looks something like this:
+        #
+        # {
+        #   "key" : "inventories/net-mozaw...f-b1a0-5fb25bb83752.csv.gz",
+        #   "size" : 7945521,
+        #   "MD5checksum" : "7454b0d773000f790f15b867ee152049"
+        # }
+        #
+        # We yield the whole thing. The key is used to download from S3.
+        # The MD5checksum is used to know how to store the file on
+        # disk for caching.
+        yield f
+
+
+async def download_csv(
+    loop,
+    s3_client,
+    files_stream,
+    chunk_size=CHUNK_SIZE,
+    download_directory=CSV_DOWNLOAD_DIRECTORY,
+):
@@ -142,12 +170,53 @@ async def download_csv(loop, s3_client, keys_stream, chunk_size=CHUNK_SIZE):
     """
     Download the S3 object of each key and return deflated data chunks
     (CSV).
     :param loop: asyncio event loop.
     :param keys_stream async generator: List of object keys for the
         csv.gz manifests.
     """
-    async for key in keys_stream:
-        key = 'public/' + key
-        logger.info('Fetching inventory piece {}'.format(key))
-        file_csv_gz = await s3_client.get_object(Bucket=BUCKET, Key=key)
+
+    # Make sure the directory exists if it wasn't already created.
+    if not os.path.isdir(download_directory):
+        os.makedirs(download_directory, exist_ok=True)
+
+    # Look for old download junk in the download directory.
+    too_old = MAX_CSV_DOWNLOAD_AGE
+    for file_path in glob.glob(os.path.join(download_directory, '*.csv.gz')):
+        age = time.time() - os.stat(file_path).st_mtime
+        if age > too_old:
+            logger.info(
+                f'Delete old download file {file_path} '
+                f'({age} seconds old)'
+            )
+            os.remove(file_path)
+
+    async for files in files_stream:
+        # If it doesn't already exist on disk, download it to disk.
+        file_path = os.path.join(
+            download_directory,
+            files['MD5checksum'] + '.csv.gz'
+        )
+        # The file already exists locally and has content; skip the download.
+        if os.path.isfile(file_path) and os.stat(file_path).st_size:
+            logger.debug(f'{file_path} was already downloaded locally')
+        else:
+            key = 'public/' + files['key']
+            logger.info('Fetching inventory piece {}'.format(key))
+            file_csv_gz = await s3_client.get_object(Bucket=BUCKET, Key=key)
+            try:
+                async with aiofiles.open(file_path, 'wb') as destination:
+                    async with file_csv_gz['Body'] as source:
+                        while 'there are chunks to read':
+                            gzip_chunk = await source.read(chunk_size)
+                            if not gzip_chunk:
+                                break  # End of response.
+                            await destination.write(gzip_chunk)
+                size = os.stat(file_path).st_size
+                logger.info(f'Downloaded {key} to {file_path} ({size} bytes)')
+            except ClientPayloadError:
+                if os.path.exists(file_path):
+                    os.remove(file_path)
+                raise
+
+        # Now we expect the file to exist locally. Let's read it.
         gzip = zlib.decompressobj(zlib.MAX_WBITS | 16)
-        async with file_csv_gz['Body'] as stream:
+        async with aiofiles.open(file_path, 'rb') as stream:
             while 'there are chunks to read':
                 gzip_chunk = await stream.read(chunk_size)
                 if not gzip_chunk:
@@ -195,8 +264,8 @@ async def main(loop, inventory):
     async with session.create_client(
         's3', region_name=REGION_NAME, config=boto_config
     ) as client:
-        keys_stream = list_manifest_entries(loop, client, inventory)
-        csv_stream = download_csv(loop, client, keys_stream)
+        files_stream = list_manifest_entries(loop, client, inventory)
+        csv_stream = download_csv(loop, client, files_stream)
         records_stream = csv_to_records(
             loop,
             csv_stream,
diff --git a/jobs/requirements/default.txt b/jobs/requirements/default.txt
index a58554dc..129ef95c 100644
--- a/jobs/requirements/default.txt
+++ b/jobs/requirements/default.txt
@@ -33,3 +33,6 @@ datadog==0.20.0 \
     --hash=sha256:7bb4af836d7422a6138f983b81c16acd56c2d608913982602856cc273ae74768
 python-decouple==3.1 \
     --hash=sha256:1317df14b43efee4337a4aa02914bf004f010cd56d6c4bd894e6474ec8c4fe2d
+aiofiles==0.3.2 \
+    --hash=sha256:25c66ea3872d05d53292a6b3f7fa0f86691512076446d83a505d227b5e76f668 \
+    --hash=sha256:852a493a877b73e11823bfd4e8e5ef2610d70d12c9eaed961bcd9124d8de8c10
diff --git a/jobs/tests/test_inventory_to_records.py b/jobs/tests/test_inventory_to_records.py
index 1f4e81e9..9faa2d8e 100644
--- a/jobs/tests/test_inventory_to_records.py
+++ b/jobs/tests/test_inventory_to_records.py
@@ -799,6 +799,8 @@ async def setUp(self):
             mocked.get(utils.ARCHIVE_URL + url, payload=payload)
         self.addCleanup(mocked.stop)
 
+        # inventory_to_records._candidates_build_folder.clear()
+
         async def async_gen():
             _csv_input = (
                 'net-mozaws-delivery-firefox,pub/firefox/releases/51.0/win64/'
@@ -827,6 +829,7 @@ async def test_csv_to_records(self):
         async for r in output:
             records.append(r)
 
+        assert len(records) == 1
         assert records == [{
             'data': {
                 'id': 'firefox_51-0_win64_fy-nl',
diff --git a/jobs/tests/test_s3_inventory_to_kinto.py b/jobs/tests/test_s3_inventory_to_kinto.py
index a59b860a..708c4864 100644
--- a/jobs/tests/test_s3_inventory_to_kinto.py
+++ b/jobs/tests/test_s3_inventory_to_kinto.py
@@ -3,6 +3,7 @@
 # file, you can obtain one at http://mozilla.org/MPL/2.0/.
 
 import base64
+import json
 
 import asynctest
@@ -33,7 +34,12 @@ async def __aexit__(self, *args):
             return self
 
         async def read(self):
-            return b'{"files": [{"key": "a/b"}, {"key": "c/d"}]}'
+            return json.dumps({
+                'files': [
+                    {'key': 'a/b', 'size': 1234, 'MD5checksum': 'eaf123'},
+                    {'key': 'c/d', 'size': 2222, 'MD5checksum': 'ef001'},
+                ]
+            }).encode('utf-8')
 
     class FakeClient:
         def get_paginator(self, *args):
@@ -53,7 +59,10 @@ async def test_return_keys_of_latest_manifest(self):
             'firefox'
         ):
             results.append(r)
-        assert results == ['a/b', 'c/d']
+        assert results == [
+            {'key': 'a/b', 'size': 1234, 'MD5checksum': 'eaf123'},
+            {'key': 'c/d', 'size': 2222, 'MD5checksum': 'ef001'},
+        ]
 
 
 class DownloadCSV(asynctest.TestCase):
@@ -84,11 +93,15 @@ async def get_object(self, Bucket, Key):
 
     async def test_unzip_chunks(self):
 
-        async def keys():
-            yield 'key-1'
+        async def files_iterator():
+            yield {
+                'key': 'key-1',
+                'size': 123456,
+                'MD5checksum': 'deadbeef0123',
+            }
 
-        keys = keys()
+        files = files_iterator()
         results = []
-        async for r in download_csv(self.loop, self.client, keys):
+        async for r in download_csv(self.loop, self.client, files):
             results.append(r)
         assert results == [b"1;2;3;4\n5;6"]