/-/open-csv-from-url endpoint #14

Closed
Tracked by #107
simonw opened this issue Sep 12, 2021 · 6 comments
Labels
enhancement New feature or request


@simonw
Owner

simonw commented Sep 12, 2021

Need a /-/open-csv-from-url endpoint.

Originally posted by @simonw in simonw/datasette-app#107 (comment)

@simonw simonw changed the title /-/open-csv-from-url endpoint. /-/open-csv-from-url endpoint Sep 12, 2021
@simonw
Owner Author

simonw commented Sep 12, 2021

Design:

```
POST /-/open-csv-from-url
{"url": "https://..."}
```

Returns 200 {"ok": true, "path": "/temporary/newtable"} if it works, 400 or 500 {"ok": false, "error": "Error message"} if it fails.
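A minimal, framework-independent sketch of the request/response contract described above. The helper names `parse_open_csv_request` and `handle_open_csv_request` are hypothetical, as is the `import_csv` callable that stands in for the actual fetch-and-insert work; it is kept synchronous here for brevity, whereas a real Datasette handler would await it. This is not Datasette's request API, just the validation and status-code mapping:

```python
import json
from urllib.parse import urlparse


def parse_open_csv_request(body):
    """Extract the URL from a body like {"url": "https://..."}.

    Raises ValueError (mapped to a 400 response) for malformed input."""
    try:
        data = json.loads(body)
    except ValueError:  # includes json.JSONDecodeError and bad encodings
        raise ValueError("Request body must be JSON") from None
    if not isinstance(data, dict) or not isinstance(data.get("url"), str):
        raise ValueError('Expected a JSON object with a "url" string')
    url = data["url"]
    if urlparse(url).scheme not in ("http", "https"):
        raise ValueError("URL must use http:// or https://")
    return url


def handle_open_csv_request(body, import_csv):
    """Return (status, json_body) following the design above.

    import_csv is a hypothetical callable that fetches the CSV, writes it
    to a table and returns that table's path, e.g. "/temporary/newtable"."""
    try:
        url = parse_open_csv_request(body)
    except ValueError as e:
        return 400, {"ok": False, "error": str(e)}
    try:
        path = import_csv(url)
    except Exception as e:  # network or database failure
        return 500, {"ok": False, "error": str(e)}
    return 200, {"ok": True, "path": path}
```

Splitting it this way keeps the 400 (bad input) and 500 (failure while importing) cases distinct, which matches the two error statuses in the design.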

@simonw simonw added the enhancement New feature or request label Sep 12, 2021
@simonw
Owner Author

simonw commented Sep 12, 2021

Efficiently sucking down a large CSV file over HTTP and writing rows to the database without blocking the server (or blocking the Datasette write queue) is a bit tricky. I'm going to base this implementation on https://github.com/simonw/datasette-import-table/blob/9825cab7c3e1cdd7bcfd0b21dda4a22bda601f88/datasette_import_table/__init__.py#L84-L127
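One way to avoid holding the write side for long is to group incoming rows into fixed-size batches and write each batch as its own short transaction. The sketch below is an illustration of that idea, not the datasette-import-table code: `write_in_batches` is a hypothetical helper, and the demo uses a plain in-memory `sqlite3` connection on the event loop. Inside Datasette, `write_batch` would more plausibly hand each batch to the database's write thread (for example via `Database.execute_write_fn()`), which is an assumption here:

```python
import asyncio
import sqlite3


async def write_in_batches(rows, write_batch, batch_size=100):
    """Group rows from an async iterator into lists of up to batch_size and
    await write_batch(list) for each group. Returns the number of rows."""
    batch = []
    total = 0
    async for row in rows:
        batch.append(row)
        if len(batch) >= batch_size:
            await write_batch(batch)
            total += len(batch)
            batch = []
    if batch:  # flush the final, partially filled batch
        await write_batch(batch)
        total += len(batch)
    return total


async def demo():
    conn = sqlite3.connect(":memory:")
    conn.execute("create table items (id integer, name text)")
    batch_sizes = []

    async def fake_rows():
        # Hypothetical stand-in for rows parsed from a streamed CSV
        for i in range(250):
            yield {"id": i, "name": f"row {i}"}

    async def write_batch(batch):
        batch_sizes.append(len(batch))
        with conn:  # one short transaction per batch
            conn.executemany("insert into items (id, name) values (:id, :name)", batch)
        await asyncio.sleep(0)  # give other tasks a turn between batches

    total = await write_in_batches(fake_rows(), write_batch, batch_size=100)
    count = conn.execute("select count(*) from items").fetchone()[0]
    return total, batch_sizes, count


total, batch_sizes, count = asyncio.run(demo())
```

With 250 rows and `batch_size=100` this performs three writes of 100, 100 and 50 rows; a larger batch size means fewer, longer transactions.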

@simonw
Owner Author

simonw commented Sep 12, 2021

This example looks useful too: https://github.com/mosquito/aiofile#async-csv-dict-reader

@simonw
Owner Author

simonw commented Sep 12, 2021

And https://www.python-httpx.org/async/#streaming-responses for Response.aiter_lines()

@simonw
Owner Author

simonw commented Sep 12, 2021

First attempt at that:

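```python
import asyncio
import io
from csv import DictReader

import httpx


class AsyncDictReader:
    def __init__(self, async_line_iterator, **kwargs):
        self.async_line_iterator = async_line_iterator
        self.encoding = kwargs.pop("encoding", "utf-8")
        errors = kwargs.pop("errors", "replace")
        # DictReader reads text from a small byte buffer holding one line
        self.buffer = io.BytesIO()
        self.reader = DictReader(
            io.TextIOWrapper(self.buffer, encoding=self.encoding, errors=errors),
            **kwargs,  # remaining options (e.g. delimiter) go to DictReader
        )
        self.line_num = 0

    def __aiter__(self):
        return self

    async def _next_line(self):
        # aiter_lines() may or may not keep the line ending, depending on
        # the httpx version; make sure csv always sees one
        line = await self.async_line_iterator.__anext__()
        return line if line.endswith("\n") else line + "\n"

    async def __anext__(self):
        if self.line_num == 0:
            # First call: feed the header so DictReader can read fieldnames
            header = await self._next_line()
            self.buffer.write(header.encode(self.encoding))

        line = await self._next_line()
        # Skip blank lines: DictReader would skip them itself and then
        # run off the end of the buffer, ending iteration early
        while not line.rstrip("\r\n"):
            line = await self._next_line()

        # aiter_lines() yields str, so re-encode with the codec the
        # TextIOWrapper decodes with (latin-1 here would raise on characters
        # outside Latin-1 and garble other non-ASCII text)
        self.buffer.write(line.encode(self.encoding))
        self.buffer.seek(0)

        try:
            result = next(self.reader)
        except StopIteration as e:
            raise StopAsyncIteration from e

        # Empty the buffer so it only ever holds one line
        self.buffer.seek(0)
        self.buffer.truncate(0)
        self.line_num = self.reader.line_num

        return result


async def main():
    url = "https://covid-19.datasettes.com/covid/us_census_county_populations_2019.csv?_stream=on&_size=max"
    async with httpx.AsyncClient() as client:
        async with client.stream("GET", url) as response:
            async for row in AsyncDictReader(response.aiter_lines()):
                print(row)


asyncio.run(main())
```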

@simonw
Owner Author

simonw commented Sep 12, 2021

Simpler version that takes advantage of aiter_lines() having decoded to unicode already:

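```python
import io
from csv import DictReader


class AsyncDictReader:
    def __init__(self, async_line_iterator):
        self.async_line_iterator = async_line_iterator
        # aiter_lines() already yields str, so a StringIO buffer is enough
        self.buffer = io.StringIO()
        self.reader = DictReader(self.buffer)
        self.line_num = 0

    def __aiter__(self):
        return self

    async def _next_line(self):
        # aiter_lines() may or may not keep the line ending, depending on
        # the httpx version; make sure csv always sees one
        line = await self.async_line_iterator.__anext__()
        return line if line.endswith("\n") else line + "\n"

    async def __anext__(self):
        if self.line_num == 0:
            # First call: feed the header so DictReader can read fieldnames
            self.buffer.write(await self._next_line())

        line = await self._next_line()
        # Skip blank lines: DictReader would skip them itself and then
        # run off the end of the buffer, ending iteration early
        while not line.rstrip("\r\n"):
            line = await self._next_line()

        # Note: csv sees one physical line at a time, so quoted fields
        # containing newlines are not supported
        self.buffer.write(line)
        self.buffer.seek(0)

        try:
            result = next(self.reader)
        except StopIteration as e:
            raise StopAsyncIteration from e

        # Empty the buffer so it only ever holds one line
        self.buffer.seek(0)
        self.buffer.truncate(0)
        self.line_num = self.reader.line_num

        return result
```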
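The reader above hands csv one physical line at a time, so a quoted value containing a newline gets split across two rows. A possible alternative, sketched here as an async generator rather than a class with `__anext__`, buffers lines until the record contains an even number of double quotes and only then parses it. `aiter_csv_dicts` and `fake_lines` are hypothetical names, and the quote-counting rule assumes standard CSV quoting (`"` as the quote character, embedded quotes escaped by doubling):

```python
import asyncio
import csv
import io


async def aiter_csv_dicts(lines):
    """Yield one dict per CSV record from an async iterator of text lines.

    Lines are buffered until the record holds an even number of double
    quotes, so quoted fields that contain newlines come out whole."""
    fieldnames = None
    pending = []
    quotes = 0
    async for line in lines:
        # Normalize line endings: some httpx versions strip them, others keep them
        line = line.rstrip("\r\n") + "\n"
        pending.append(line)
        quotes += line.count('"')
        if quotes % 2:
            continue  # still inside a quoted field, wait for the next line
        record = "".join(pending)
        pending, quotes = [], 0
        row = next(csv.reader(io.StringIO(record)), [])
        if not row:
            continue  # blank line
        if fieldnames is None:
            fieldnames = row  # first record is the header
        else:
            yield dict(zip(fieldnames, row))
    # Note: an unterminated quoted field at end of input is silently dropped


async def fake_lines():
    # Stand-in for response.aiter_lines(), with line endings already stripped
    for line in ["id,name", "1,alice", "", '2,"multi', 'line"', '3,"say ""hi"""']:
        yield line


async def collect():
    return [row async for row in aiter_csv_dicts(fake_lines())]


rows = asyncio.run(collect())
```

Unlike `csv.DictReader`, this `zip`-based version silently drops extra fields and omits missing ones instead of using `restkey`/`restval`; the async generator form also avoids the manual `__aiter__`/`__anext__` bookkeeping.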
