Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

475 added acceptable content types #498

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
8 changes: 7 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@ services:
- MONGO_INITDB_ROOT_PASSWORD=admin123
volumes:
- mongo:/data/db

adminer:
image: adminer
ports:
- "9000:8080"
environment:
ADMINER_PLUGINS: 'tables-filter'
ADMINER_DEFAULT_SERVER: 'postgres'
volumes:
postgres:
mongo:
2 changes: 1 addition & 1 deletion spinta/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ def getall(

@command()
def getall():
"""Find multiple records in the databse."""
"""Find multiple records in the database."""


@command()
Expand Down
52 changes: 33 additions & 19 deletions spinta/commands/write.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
import cgi
import os
import tempfile
import typing
from typing import Any
from typing import AsyncIterator, Union, Optional
from typing import overload

import itertools
import json
import pathlib

from typing import Any
from typing import AsyncIterator, Union, Optional
from typing import overload
from typing import Dict
from typing import Iterator

from authlib.oauth2.rfc6750.errors import InsufficientScopeError

from starlette.datastructures import URL, Headers
from starlette.requests import Request
from starlette.responses import Response

Expand All @@ -22,16 +22,15 @@
from spinta.accesslog import AccessLog
from spinta.accesslog import log_async_response
from spinta.auth import check_scope
from spinta.backends import check_type_value
from spinta.backends.helpers import get_select_prop_names
from spinta.backends.helpers import get_select_tree
from spinta.backends.components import Backend, BackendFeatures
from spinta.components import Context, Node, UrlParams, Action, DataItem, Namespace, Model, Property, DataStream, DataSubItem
from spinta.datasets.backends.helpers import detect_backend_from_content_type, get_stream_for_direct_upload
from spinta.renderer import render
from spinta.types.datatype import DataType, Object, Array, File, Ref, ExternalRef, Denorm, Inherit, BackRef
from spinta.urlparams import get_model_by_name
from spinta.utils.aiotools import agroupby
from spinta.utils.aiotools import aslice, alist, aiter
from spinta.utils.aiotools import agroupby, aslice, alist, aiter
from spinta.utils.errors import report_error
from spinta.utils.nestedstruct import flatten_value
from spinta.utils.streams import splitlines
Expand All @@ -40,7 +39,6 @@
from spinta.types.namespace import traverse_ns_models
from spinta.core.ufuncs import asttoexpr
from spinta.formats.components import Format
from spinta.types.text.components import Text

if typing.TYPE_CHECKING:
from spinta.backends.postgresql.components import WriteTransaction
Expand All @@ -49,6 +47,7 @@
# Line-delimited content types whose request bodies are consumed record by
# record as they stream in.  NOTE(review): 'application/json' was removed from
# this list — a plain JSON document is a single body, not a line-delimited
# stream, and listing it here made the JSON direct-upload backend path
# unreachable (is_streaming_request() matched first).
STREAMING_CONTENT_TYPES = [
    'application/x-jsonlines',
    'application/x-ndjson',
]


Expand All @@ -74,20 +73,28 @@ async def push(
scope = scope.prop

stop_on_error = not params.fault_tolerant
if is_streaming_request(request):
content_type = get_content_type_from_request(request)
temp_file = tempfile.NamedTemporaryFile(delete=False)
context.attach('uploaded_file', temp_file)
if is_streaming_request(content_type):
stream = _read_request_stream(
context, request, scope, action, stop_on_error,
)
else:
stream = _read_request_body(
context, request, scope, action, params, stop_on_error,
)
backend = detect_backend_from_content_type(context, content_type)
if backend:
async for line in request.stream():
temp_file.write(line)
rows = commands.getall(context, scope, backend, file_path=temp_file.name)
stream = get_stream_for_direct_upload(context, rows, request, params, content_type)
else:
stream = _read_request_body(
context, request, scope, action, params, stop_on_error,
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This `else` branch is unreachable: `detect_backend_from_content_type` raises `NoMatchingBackendDetected` instead of returning a falsy value, so `backend` is always truthy here.

dstream = push_stream(context, stream,
stop_on_error=stop_on_error,
params=params)

dstream = log_async_response(context, dstream)

batch = False
if params.summary:
status_code, response = await _summary_response(context, dstream)
Expand All @@ -105,6 +112,8 @@ async def push(
dstream,
)
headers = prepare_headers(context, scope, response, action, is_batch=batch)
if temp_file:
os.unlink(temp_file.name)
return render(context, request, scope, params, response,
action=action, status_code=status_code, headers=headers)

Expand Down Expand Up @@ -140,7 +149,7 @@ async def push_stream(
context: Context,
stream: AsyncIterator[DataItem],
stop_on_error: bool = True,
params: UrlParams = None
params: UrlParams = None,
) -> AsyncIterator[DataItem]:

cmds = {
Expand Down Expand Up @@ -211,15 +220,20 @@ def _stream_group_key(data: DataItem):
return data.model, data.prop, data.backend, data.action


def is_streaming_request(request: Request):
def get_content_type_from_request(request: "Request"):
    """Return the request body's media type, stripped of parameters.

    Reads the ``Content-Type`` header and drops any parameters such as
    ``charset`` (``'application/json; charset=utf-8'`` -> ``'application/json'``).
    Returns ``None`` when the header is absent.
    """
    content_type = request.headers.get('content-type')
    if content_type:
        # The media type is everything before the first ';'; parameters
        # follow it.  This replaces cgi.parse_header(), which was deprecated
        # in Python 3.11 and removed in 3.13 (PEP 594).
        content_type = content_type.split(';', 1)[0].strip()
    return content_type


def is_streaming_request(content_type):
    """Tell whether *content_type* is one of the known streaming media types."""
    for streaming_type in STREAMING_CONTENT_TYPES:
        if content_type == streaming_type:
            return True
    return False


async def is_batch(request: Request, node: Node):
if is_streaming_request(request):
content_type = get_content_type_from_request(request)
if is_streaming_request(content_type):
return True

ct = request.headers.get('content-type')
Expand Down
18 changes: 11 additions & 7 deletions spinta/datasets/backends/dataframe/commands/read.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import io
import json
import os
import pathlib

import numpy as np
Expand All @@ -9,11 +10,10 @@
import dask
import requests
from dask.dataframe import DataFrame
from dask.dataframe.utils import make_meta
from lxml import etree

from spinta import commands
from spinta.components import Context, Property, Model, UrlParams
from spinta.components import Context, Property, Model
from spinta.core.ufuncs import Expr
from spinta.datasets.backends.dataframe.components import DaskBackend, Csv, Xml, Json
from spinta.datasets.backends.dataframe.commands.query import DaskDataFrameQueryBuilder, Selected
Expand Down Expand Up @@ -408,6 +408,7 @@ def getall(
backend: Csv,
*,
query: Expr = None,
file_path: pathlib.Path = None,
**kwargs
) -> Iterator[ObjectData]:
base = model.external.resource.external
Expand All @@ -417,12 +418,15 @@ def getall(
builder = DaskDataFrameQueryBuilder(context)
builder.update(model=model)
for params in iterparams(context, model):
if base:
url = urllib.parse.urljoin(base, model.external.name)
if file_path and os.path.exists(file_path):
df = dask.dataframe.read_csv(file_path, sep=resource_builder.seperator)
else:
url = model.external.name
url = url.format(**params)
df = dask.dataframe.read_csv(url, sep=resource_builder.seperator)
if base:
url = urllib.parse.urljoin(base, model.external.name)
else:
url = model.external.name
url = url.format(**params)
df = dask.dataframe.read_csv(url, sep=resource_builder.seperator)
yield from _dask_get_all(context, query, df, backend, model, builder)


Expand Down
3 changes: 3 additions & 0 deletions spinta/datasets/backends/dataframe/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def begin(self):

class Xml(DaskBackend):
type: str = 'xml'
accept_types: set = {'text/xml'}

@contextlib.contextmanager
def begin(self):
Expand All @@ -21,6 +22,7 @@ def begin(self):

class Csv(DaskBackend):
type: str = 'csv'
accept_types: set = {'text/csv'}

@contextlib.contextmanager
def begin(self):
Expand All @@ -29,6 +31,7 @@ def begin(self):

class Json(DaskBackend):
type: str = 'json'
accept_types: set = {'application/json'}

@contextlib.contextmanager
def begin(self):
Expand Down
35 changes: 33 additions & 2 deletions spinta/datasets/backends/helpers.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
from typing import Any

from spinta.components import Property, Model
from spinta.components import Model
from spinta.core.ufuncs import Env
from spinta.datasets.enums import Level
from spinta.cli.helpers.store import prepare_manifest
from spinta.components import Property, Context
from spinta.datasets.backends.notimpl.components import BackendNotImplemented
from spinta.datasets.components import ExternalBackend
from spinta.datasets.keymaps.components import KeyMap
from spinta.exceptions import GivenValueCountMissmatch
from spinta.exceptions import GivenValueCountMissmatch, NoMatchingBackendDetected
from spinta.types.datatype import Ref, Array
from spinta.formats.helpers import get_response_type_as_format_class


def handle_ref_key_assignment(keymap: KeyMap, env: Env, value: Any, ref: Ref) -> dict:
Expand Down Expand Up @@ -80,3 +85,29 @@ def extract_values_from_row(row: Any, model: Model, keys: list):
if len(return_list) == 1:
return_list = return_list[0]
return return_list


def detect_backend_from_content_type(context, content_type):
    """Instantiate the external backend that accepts *content_type*.

    Scans the configured backend component classes and returns an instance of
    the first ``ExternalBackend`` subclass (excluding
    ``BackendNotImplemented``) whose ``accept_types`` contains the given
    content type.

    Raises:
        NoMatchingBackendDetected: when no backend accepts the content type.
    """
    config = context.get('config')
    backends = config.components['backends']
    for backend in backends.values():
        if issubclass(backend, ExternalBackend) and not issubclass(backend, BackendNotImplemented):
            if backend.accept_types and content_type in backend.accept_types:
                return backend()
    # Pass the content type so the UserError template
    # ("... content type {content_type!r}") can actually render it.
    raise NoMatchingBackendDetected(content_type=content_type)


def get_stream_for_direct_upload(
    context: Context,
    rows,
    request,
    params,
    content_type,
):
    """Build a write stream that pushes *rows* into the manifest directly.

    Resolves the response format from *request*/*content_type* and wires the
    rows into the generic ``write`` command against the root namespace.
    """
    # Imported locally to avoid a circular import with spinta.commands.write.
    from spinta.commands.write import write
    store = prepare_manifest(context)
    manifest = store.manifest
    # TODO(review): access rights are not checked here.  Writing through the
    # root namespace likely grants write access to every namespace/model —
    # the client should be limited to the specific namespace or model being
    # uploaded.  Confirm and restrict before release.
    root = manifest.objects['ns']['']
    fmt = get_response_type_as_format_class(context, request, params, content_type)
    stream = write(context, root, rows, changed=True, fmt=fmt)
    return stream
1 change: 1 addition & 0 deletions spinta/datasets/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def __init__(self):
class ExternalBackend(Backend):
    """Base class for backends that read data from external data sources."""
    engine: Engine = None        # connection engine, set up when the backend is prepared
    schema: sa.MetaData = None   # SQLAlchemy metadata describing the external schema
    # Content types this backend can ingest directly (e.g. {'text/csv'});
    # subclasses override this with their own set.
    accept_types: set = set()


class External(Node):
Expand Down
4 changes: 4 additions & 0 deletions spinta/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -735,3 +735,7 @@ class DuplicateRdfPrefixMissmatch(UserError):

class InvalidName(UserError):
    """Raised when a given code name does not satisfy naming rules."""
    template = 'Invalid {name!r} {type} code name.'


class NoMatchingBackendDetected(UserError):
    """Raised when no external backend accepts a request's content type."""
    template = "Can't find a matching external backend for the given content type {content_type!r}"
14 changes: 14 additions & 0 deletions spinta/formats/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@
from spinta.components import Model
from spinta.components import UrlParams
from spinta.types.datatype import Array, ArrayBackRef, BackRef
from spinta.formats.components import Format
from spinta.formats.json.components import Json
from spinta.types.datatype import Inherit
from spinta.types.datatype import ExternalRef
from spinta.types.datatype import DataType
from spinta.types.datatype import Object
from spinta.types.datatype import File
from spinta.types.datatype import Ref
from spinta.types.text.components import Text
from spinta.urlparams import get_response_type
from spinta.utils.data import take


Expand Down Expand Up @@ -174,3 +177,14 @@ def get_model_tabular_header(
)
header = list(_get_model_header(model, names, select, reserved))
return header


def get_response_type_as_format_class(context, request, params, content_type):
    """Resolve the request's response type into a Format class or instance.

    Delegates to ``get_response_type``: a ready ``Format`` instance is
    returned as-is, and the string ``'json'`` maps to the ``Json`` format
    class.  ``content_type`` is accepted for interface parity but not used.

    Raises:
        ValueError: for any other resolved response type.
    """
    resolved = get_response_type(context, request, params)
    if isinstance(resolved, Format):
        return resolved
    if resolved == 'json':
        return Json
    raise ValueError(
        f"Can't find a matching response type for the given format type {params.fmt!r}"
    )
Loading
Loading