/-/open-csv-from-url endpoint
simonw committed Sep 12, 2021
1 parent 059aed7 commit d8eb4c3
Showing 10 changed files with 200 additions and 36 deletions.
10 changes: 10 additions & 0 deletions README.md
@@ -69,6 +69,16 @@ Permanently imports a CSV or TSV file into the specified database. Used by the "

Returns HTTP 200 status with `{"ok": true, "path": "/database_name/table"}` if it works, or a 400 or 500 status with an `"error"` JSON string message if it fails.

### /-/open-csv-from-url

```
POST /-/open-csv-from-url
{"url": "https://example.com/file.csv"}
```
Imports a CSV file fetched from that URL into the `/temporary` in-memory database by default; an optional `"database"` key can target another attached database. Used by the "Open CSV from URL..." menu option.

Returns HTTP 200 status with `{"ok": true, "path": "/temporary/table"}` (plus a `"rows"` count) if it works, or a 400 or 500 status with an `"error"` JSON string message if it fails.
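
For illustration only, here is one way to call the new endpoint from Python with `httpx`. The port is hypothetical, and `DATASETTE_API_TOKEN` is assumed to hold the bearer token that the plugin's auth check expects:

```
import os

import httpx

# Hypothetical local port; every endpoint in this plugin requires the
# bearer token that the app exposes as DATASETTE_API_TOKEN.
response = httpx.post(
    "http://localhost:8001/-/open-csv-from-url",
    json={"url": "https://example.com/file.csv"},
    headers={"Authorization": "Bearer " + os.environ["DATASETTE_API_TOKEN"]},
)
print(response.json())
# e.g. {"ok": True, "path": "/temporary/file", "rows": ...}
```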

### /-/dump-temporary-to-file

```
72 changes: 46 additions & 26 deletions datasette_app_support/__init__.py
@@ -11,6 +11,7 @@
import pathlib
import secrets
import sqlite_utils
from .utils import derive_table_name, import_csv_url_to_database


@hookimpl
@@ -68,14 +69,18 @@ def permission_allowed(actor, action, resource):
    return False


-unauthorized = Response.json({"ok": False, "error": "Not authorized"}, status=401)


@hookimpl
def extra_css_urls(datasette):
    return [datasette.urls.static_plugins("datasette_app_support", "sticky-footer.css")]


+def error(message, status=400):
+    return Response.json({"ok": False, "error": message}, status=status)


+unauthorized = error("Not authorized", status=401)


def check_auth(request):
    env_token = os.environ.get("DATASETTE_API_TOKEN")
    request_token = request.headers.get("authorization", "").replace("Bearer ", "")
@@ -93,25 +98,21 @@ async def open_database_file(request, datasette):
    try:
        filepath = await _filepath_from_json_body(request)
    except PathError as e:
-        return Response.json({"ok": False, "error": e.message}, status=400)
+        return error(e.message)
    # Confirm it's a valid SQLite database
    conn = sqlite3.connect(filepath)
    try:
        conn.execute("select * from sqlite_master")
    except sqlite3.DatabaseError:
-        return Response.json(
-            {"ok": False, "error": "Not a valid SQLite database"}, status=400
-        )
+        return error("Not a valid SQLite database")
    # Is that file already open?
    existing_paths = {
        pathlib.Path(db.path).resolve()
        for db in datasette.databases.values()
        if db.path
    }
    if pathlib.Path(filepath).resolve() in existing_paths:
-        return Response.json(
-            {"ok": False, "error": "That file is already open"}, status=400
-        )
+        return error("That file is already open")
    added_db = datasette.add_database(
        Database(datasette, path=filepath, is_mutable=True)
    )
@@ -124,12 +125,10 @@ async def new_empty_database_file(request, datasette):
    try:
        filepath = await _filepath_from_json_body(request, must_exist=False)
    except PathError as e:
-        return Response.json({"ok": False, "error": e.message}, status=400)
+        return error(e.message)
    # File should not exist yet
    if os.path.exists(filepath):
-        return Response.json(
-            {"ok": False, "error": "That file already exists"}, status=400
-        )
+        return error("That file already exists")

    conn = sqlite3.connect(filepath)
    conn.execute("vacuum")
@@ -166,6 +165,32 @@ async def open_csv_file(request, datasette):
    return await _import_csv_file(request, datasette, database="temporary")


async def open_csv_from_url(request, datasette):
    body = await request.post_body()
    try:
        data = json.loads(body)
    except ValueError:
        return error("Invalid request body, should be JSON")
    url = data.get("url")
    if not url or not (url.startswith("http://") or url.startswith("https://")):
        return error("URL must start with http:// or https://")
    # Default to the in-memory "temporary" database unless one was specified
    database = data.get("database") or "temporary"
    if database and (database not in datasette.databases):
        return error("Invalid database")
    db = datasette.get_database(database)
    try:
        table_name, num_rows = await import_csv_url_to_database(url, db)
    except Exception as e:
        return error(str(e), status=500)
    return Response.json(
        {
            "ok": True,
            "path": datasette.urls.table(db.name, table_name),
            "rows": num_rows,
        }
    )


async def import_csv_file(request, datasette):
    return await _import_csv_file(request, datasette)

@@ -176,22 +201,16 @@ async def _import_csv_file(request, datasette, database=None):
    try:
        filepath, body = await _filepath_from_json_body(request, return_data=True)
    except PathError as e:
-        return Response.json({"ok": False, "error": e.message}, status=400)
+        return error(e.message)

    if database is None:
        database = body.get("database")
    if not database or database not in datasette.databases:
-        return Response.json({"ok": False, "error": "Invalid database"}, status=400)
+        return error("Invalid database")

    db = datasette.get_database(database)

-    # Derive a table name
-    table_name = pathlib.Path(filepath).stem
-    root_table_name = table_name
-    i = 1
-    while await db.table_exists(table_name):
-        table_name = "{}_{}".format(root_table_name, i)
-        i += 1
+    table_name = await derive_table_name(db, pathlib.Path(filepath).stem)

    try:
        rows = rows_from_file(open(filepath, "rb"))[0]
@@ -210,7 +229,7 @@ def write_rows(conn):
            }
        )
    except Exception as e:
-        return Response.json({"ok": False, "error": str(e)}, status=500)
+        return error(str(e), status=500)


async def auth_app_user(request, datasette):
@@ -233,7 +252,7 @@ async def dump_temporary_to_file(request, datasette):
    try:
        filepath = await _filepath_from_json_body(request, must_exist=False)
    except PathError as e:
-        return Response.json({"ok": False, "error": e.message}, status=400)
+        return error(e.message)
    db = datasette.get_database("temporary")

    def backup(conn):
@@ -251,7 +270,7 @@ async def restore_temporary_from_file(request, datasette):
    try:
        filepath = await _filepath_from_json_body(request)
    except PathError as e:
-        return Response.json({"ok": False, "error": e.message}, status=400)
+        return error(e.message)
    temporary = datasette.get_database("temporary")
    backup_db = sqlite3.connect(filepath, uri=True)
    temporary_conn = temporary.connect(write=True)
@@ -280,6 +299,7 @@ def register_routes():
(r"^/-/open-database-file$", open_database_file),
(r"^/-/new-empty-database-file$", new_empty_database_file),
(r"^/-/open-csv-file$", open_csv_file),
(r"^/-/open-csv-from-url$", open_csv_from_url),
(r"^/-/import-csv-file$", import_csv_file),
(r"^/-/auth-app-user$", auth_app_user),
(r"^/-/dump-temporary-to-file$", dump_temporary_to_file),
88 changes: 88 additions & 0 deletions datasette_app_support/utils.py
@@ -0,0 +1,88 @@
import asyncio
import io
import urllib.parse
from csv import DictReader

import httpx
import sqlite_utils


async def derive_table_name(db, table_name):
    # Append _1, _2, ... until the name no longer clashes with an existing table
    root_table_name = table_name
    i = 1
    while await db.table_exists(table_name):
        table_name = "{}_{}".format(root_table_name, i)
        i += 1
    return table_name


class AsyncDictReader:
    """Wrap an async iterator of CSV lines, yielding one row dict at a
    time by feeding the lines through a shared StringIO buffer into
    csv.DictReader."""

    def __init__(self, async_line_iterator):
        self.async_line_iterator = async_line_iterator
        self.buffer = io.StringIO()
        self.reader = DictReader(self.buffer)
        self.line_num = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.line_num == 0:
            header = await self.async_line_iterator.__anext__()
            self.buffer.write(header)

        line = await self.async_line_iterator.__anext__()

        if not line:
            raise StopAsyncIteration

        # Write the line into the shared buffer, rewind, and let DictReader parse it
        self.buffer.write(line)
        self.buffer.seek(0)

        try:
            result = next(self.reader)
        except StopIteration as e:
            raise StopAsyncIteration from e

        # Reset the buffer ready for the next line
        self.buffer.seek(0)
        self.buffer.truncate(0)
        self.line_num = self.reader.line_num

        return result


async def import_csv_url_to_database(url, db):
    "Stream the CSV at url into a new table in db, returning (table_name, num_rows)"
    last_path_bit = urllib.parse.urlparse(url).path.split("/")[-1]
    last_path_bit_minus_extension = last_path_bit.rsplit(".", 1)[0]
    table_name = await derive_table_name(
        db, last_path_bit_minus_extension or "data_from_csv"
    )

    async def write_batch(rows):
        def _write(conn):
            db_conn = sqlite_utils.Database(conn)
            with db_conn.conn:
                db_conn[table_name].insert_all(rows)

        return await db.execute_write_fn(_write, block=True)

    async with httpx.AsyncClient() as client:
        async with client.stream("GET", url) as response:
            reader = AsyncDictReader(response.aiter_lines())
            batch = []
            num_rows = 0
            async for row in reader:
                num_rows += 1
                batch.append(row)
                if len(batch) >= 100:
                    # Write this batch to disk
                    await write_batch(batch)
                    batch = []
            if batch:
                await write_batch(batch)

    # Brief pause before returning the result
    await asyncio.sleep(1)

    return table_name, num_rows
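
As a sanity check on how these helpers behave, here is a small standalone sketch. The `FakeDB` stand-in and the inline line iterator are illustrative only (not part of the commit), and it assumes lines arrive with trailing newlines, which is how `httpx`'s `aiter_lines()` yielded them at the time:

```
import asyncio

from datasette_app_support.utils import AsyncDictReader, derive_table_name


class FakeDB:
    # Minimal stand-in exposing the one method derive_table_name uses
    def __init__(self, existing):
        self.existing = set(existing)

    async def table_exists(self, name):
        return name in self.existing


async def demo():
    # "test" and "test_1" are taken, so the derived name is "test_2"
    print(await derive_table_name(FakeDB({"test", "test_1"}), "test"))

    async def lines():
        for line in ["id,name\n", "1,Banyan\n", "2,Crystal\n"]:
            yield line

    async for row in AsyncDictReader(lines()):
        print(row)  # {'id': '1', 'name': 'Banyan'}, then {'id': '2', 'name': 'Crystal'}


asyncio.run(demo())
```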
2 changes: 1 addition & 1 deletion setup.py
@@ -36,7 +36,7 @@ def get_long_description():
    },
    entry_points={"datasette": ["app_support = datasette_app_support"]},
    install_requires=["datasette>=0.59a2", "sqlite-utils", "packaging"],
-    extras_require={"test": ["pytest", "pytest-asyncio", "black"]},
+    extras_require={"test": ["pytest", "pytest-asyncio", "black", "pytest-httpx"]},
    tests_require=["datasette-app-support[test]"],
    python_requires=">=3.6",
)
29 changes: 29 additions & 0 deletions tests/conftest.py
@@ -7,3 +7,32 @@
def mock_settings_env_vars():
    with mock.patch.dict(os.environ, {"DATASETTE_API_TOKEN": "fake-token"}):
        yield


@pytest.fixture(autouse=True)
def mock_datasette_plugin_api(httpx_mock):
    httpx_mock.add_response(
        url="https://datasette.io/content/plugins.json?_shape=array",
        json=[
            {
                "name": "datasette-write",
                "full_name": "simonw/datasette-write",
                "owner": "simonw",
                "description": "Datasette plugin providing a UI for executing SQL writes against the database",
                "stargazers_count": 3,
                "tag_name": "0.2",
                "latest_release_at": "2021-09-11T05:59:43Z",
                "created_at": "2020-06-29T02:27:31Z",
                "openGraphImageUrl": "https://avatars.githubusercontent.com/u/9599?s=400&v=4",
                "usesCustomOpenGraphImage": 0,
                "downloads_this_week": 163,
                "is_plugin": 1,
                "is_tool": 0,
            }
        ],
    )


@pytest.fixture
def non_mocked_hosts():
    # datasette.client makes in-process requests against "localhost",
    # so pytest-httpx must leave that host un-intercepted
    return ["localhost"]
10 changes: 1 addition & 9 deletions tests/test_app_support.py
@@ -2,18 +2,10 @@
import pytest


-@pytest.mark.asyncio
-async def test_plugin_is_installed():
-    datasette = Datasette([], memory=True)
-    response = await datasette.client.get("/-/plugins.json")
-    assert response.status_code == 200
-    installed_plugins = {p["name"] for p in response.json()}
-    assert "datasette-app-support" in installed_plugins


@pytest.mark.asyncio
async def test_static_asset_sticky_footer():
    datasette = Datasette([], memory=True)
+    await datasette.invoke_startup()
    response = await datasette.client.get(
        "/-/static-plugins/datasette_app_support/sticky-footer.css"
    )
1 change: 1 addition & 0 deletions tests/test_auth_app_user.py
@@ -5,6 +5,7 @@
@pytest.mark.asyncio
async def test_auth_app_user():
    datasette = Datasette([], memory=True)
+    await datasette.invoke_startup()
    response = await datasette.client.post(
        "/-/auth-app-user",
        json={"redirect": "/-/metadata"},
1 change: 1 addition & 0 deletions tests/test_new_empty_database_file.py
@@ -5,6 +5,7 @@
@pytest.mark.asyncio
async def test_new_empty_database_file(tmpdir):
    datasette = Datasette([], memory=True)
+    await datasette.invoke_startup()
    path = str(tmpdir / "new.db")
    response = await datasette.client.post(
        "/-/new-empty-database-file",
21 changes: 21 additions & 0 deletions tests/test_open_csv_file.py
@@ -57,3 +57,24 @@ async def test_import_csv_files(tmpdir):
    assert response.json() == {"ok": True, "path": "/data/demo", "rows": 1}
    response2 = await datasette.client.get("/data/demo.json?_shape=array")
    assert response2.json() == [{"rowid": 1, "id": "123", "name": "Hello"}]


@pytest.mark.asyncio
async def test_import_csv_url(httpx_mock):
    httpx_mock.add_response(
        url="http://example.com/test.csv", data="id,name\n1,Banyan\n2,Crystal"
    )
    datasette = Datasette([], memory=True)
    await datasette.invoke_startup()
    response = await datasette.client.post(
        "/-/open-csv-from-url",
        json={"url": "http://example.com/test.csv"},
        headers={"Authorization": "Bearer fake-token"},
    )
    assert response.status_code == 200
    assert response.json() == {"ok": True, "path": "/temporary/test", "rows": 2}
    response2 = await datasette.client.get("/temporary/test.json?_shape=array")
    assert response2.json() == [
        {"rowid": 1, "id": "1", "name": "Banyan"},
        {"rowid": 2, "id": "2", "name": "Crystal"},
    ]
2 changes: 2 additions & 0 deletions tests/test_open_database_file.py
@@ -6,6 +6,7 @@
@pytest.mark.asyncio
async def test_open_database_files(tmpdir):
    datasette = Datasette([], memory=True)
+    await datasette.invoke_startup()
    path = str(tmpdir / "test.db")
    db = sqlite3.connect(path)
    db.execute("create table foo (id integer primary key)")
@@ -35,6 +36,7 @@ async def test_open_database_files(tmpdir):
@pytest.mark.parametrize("file", ("does-not-exist.txt", "invalid.txt"))
async def test_open_database_files_invalid(file, tmpdir):
    datasette = Datasette([], memory=True)
+    await datasette.invoke_startup()
    if file == "does-not-exist.txt":
        path = str(tmpdir / "does-not-exists.txt")
    elif file == "invalid.txt":
