This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Disk cache the manifests, fixes #392 #403

Merged: 7 commits merged into mozilla-services:master on Apr 19, 2018

Conversation

peterbe (Contributor) commented Apr 13, 2018

The gist of this is that the downloading of the .csv.gz files is now broken up into steps.

  1. First clear out any old downloaded .csv.gz files that are older than 48 hours.
  2. For each .csv.gz file in the manifest.json, instead of downloading it and yielding each CSV row, we download it to $CSV_DOWNLOAD_DIRECTORY (*)
  3. After the file has been downloaded, instead of reading it from a network URL, we read it from the local file.

(*) In docker-compose this becomes ./csv-download-directory but outside Docker it becomes $TMPDIR/csv-download-directory. This way it's kept between stopping and starting Docker.
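
A minimal synchronous sketch of that flow. The helper names (clear_old_downloads, download_if_missing, iter_csv_rows) and the use of urllib are illustrative assumptions; the actual PR code is async and downloads the files from S3:

import csv
import gzip
import os
import tempfile
import time
import urllib.request

CSV_DOWNLOAD_DIRECTORY = os.path.join(tempfile.gettempdir(), 'csv-download-directory')
DOWNLOAD_MAX_AGE = 60 * 60 * 24 * 2  # 48 hours, in seconds

def clear_old_downloads(directory, max_age=DOWNLOAD_MAX_AGE):
    # Step 1: delete previously downloaded .csv.gz files older than 48 hours.
    if not os.path.isdir(directory):
        return
    now = time.time()
    for name in os.listdir(directory):
        path = os.path.join(directory, name)
        if name.endswith('.csv.gz') and now - os.stat(path).st_mtime > max_age:
            os.remove(path)

def download_if_missing(url, md5checksum, directory=CSV_DOWNLOAD_DIRECTORY):
    # Step 2: download the .csv.gz file to disk, keyed by its MD5 checksum,
    # unless a non-empty copy is already cached there.
    os.makedirs(directory, exist_ok=True)
    file_path = os.path.join(directory, md5checksum + '.csv.gz')
    if not (os.path.isfile(file_path) and os.stat(file_path).st_size > 0):
        urllib.request.urlretrieve(url, file_path)
    return file_path

def iter_csv_rows(file_path):
    # Step 3: yield each CSV row from the local file instead of a network stream.
    with gzip.open(file_path, 'rt') as f:
        yield from csv.reader(f)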

peterbe requested a review from leplatrem on April 13, 2018 19:36
if not os.path.isdir(download_directory):
    os.mkdir(download_directory)

print("LOOKING AT", download_directory)

peterbe (Contributor Author):

Oops. I have to delete this.


# Make sure the directory exists if it wasn't already created.
if not os.path.isdir(download_directory):
    os.mkdir(download_directory)

Collaborator:

nit: use os.makedirs(..., exist_ok=True)?

os.mkdir(download_directory)
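
The suggested variant would look roughly like this (sketch only):

import os

# Create the directory, including missing parents, without erroring if it already exists.
os.makedirs(download_directory, exist_ok=True)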

# Look for old download junk in the download directory.
too_old = 60 * 60 * 24 * 2 # two days

Collaborator:

move to a DOWNLOAD_MAX_AGE constant?
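
i.e. something like this (sketch; the constant name comes from the suggestion above, its placement is an assumption):

# Module-level constant instead of an inline magic number.
DOWNLOAD_MAX_AGE = 60 * 60 * 24 * 2  # two days, in seconds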

files['MD5checksum'] + '.csv.gz'
)
# The file already exists and has data.
if os.path.isfile(file_path) and os.stat(file_path).st_size:

Collaborator:

nit: add explicit st_size > 0

peterbe (Contributor Author):

Out of curiosity; why?

Isn't it explicit already that the test is for the st_size to be anything greater than 0?

Collaborator:

Matter of taste maybe :)

gzip_chunk = await source.read(chunk_size)
if not gzip_chunk:
    break  # End of response.
await destination.write(gzip_chunk)

Collaborator:

Here maybe you should try/except and delete the partially downloaded file when an error happens. Or maybe you don't want to check the md5sum when resuming?

peterbe (Contributor Author):

I was totally not sure how to do this in the asyncio world, especially not how to test it. But I did write this, locally:

import time
import os
import asyncio
import aiohttp
import aiofiles

CHUNK_SIZE = 1024 * 256  # 256 KB


async def download_csv(loop, url):
    file_path = '/tmp/' + os.path.basename(url).split('?')[0]
    # Remove any leftover file from a previous run.
    os.path.isfile(file_path) and os.remove(file_path)

    async with aiohttp.ClientSession(loop=loop) as session:
        try:
            async with aiofiles.open(file_path, 'wb') as destination:
                async with session.get(url) as resp:
                    print(resp.status)
                    while True:
                        print(resp.status, end='', flush=True)
                        # Deliberately slow the loop down so the server
                        # can be killed mid-download while testing.
                        time.sleep(0.05)
                        chunk = await resp.content.read(CHUNK_SIZE)
                        if not chunk:
                            break
                        await destination.write(chunk)
        except aiohttp.client_exceptions.ClientPayloadError:
            print('\n')
            mb = os.stat(file_path).st_size / 1024 / 1024
            print(f'WROTE {mb:.1f}MB')
            # Clean up the partially downloaded file.
            os.remove(file_path)
            raise
    print('\nall Done!')


def run():
    loop = asyncio.get_event_loop()
    url = 'http://10.0.0.80:8080/file.csv'
    loop.run_until_complete(download_csv(loop, url))


if __name__ == '__main__':
    run()

That's using plain aiohttp.ClientSession(loop=loop) though, not await s3_client.get_object(...).

When I run that in one terminal, then quickly switch to the other terminal where I run the HTTP server on :8080 and kill it, the except block successfully cleans up the half-downloaded file.

r?

except ClientPayloadError:
    if os.path.exists(file_path):
        os.remove(file_path)
    raise

Collaborator:

r+ :)

peterbe merged commit 178f97a into mozilla-services:master on Apr 19, 2018
peterbe deleted the disk-cache-the-manifests-fixes-392 branch on April 19, 2018 13:18