POST /processes endpoint (#22)
* Adding IOClient and jobs results and payload endpoints

* Adding write object methods for IOClient

* Adding delete bucket methods for IOClient

* POST /processes endpoint

* use discriminated union for workflow configs

* addressing PR comments

* update status code

* fix up the __root__ handling

* clean up process execute insert queries

* use empty string, not None, for create/drop db name

* add check that workflow name matches process ID

* addressing PR comments

* remove id in payload/update tests

* don't use relative path in bad workflow config

---------

Co-authored-by: Hector Machin <hector.machin@ursaspace.com>
Co-authored-by: jkeifer <jkeifer@element84.com>
3 people authored May 26, 2023
1 parent a0504af commit 14c48ee
Showing 16 changed files with 385 additions and 109 deletions.
CONTRIBUTING.md: 2 changes (1 addition, 1 deletion)
@@ -34,7 +34,7 @@ git commit -m "message" --no-verify
 Start the swoop api by running:
 
 ```commandline
-uvicorn swoop.api.main:app --host 0.0.0.0 --port 8000
+uvicorn swoop.api.main:app --host 0.0.0.0 --port 8000 --reload
 ```
 
 ## Testing
db/migrations/20230501205418_cache.sql: 6 changes (3 additions, 3 deletions)
@@ -8,9 +8,9 @@ CREATE TABLE IF NOT EXISTS swoop.input_item (
 
 CREATE TABLE IF NOT EXISTS swoop.payload_cache (
   payload_uuid uuid DEFAULT gen_random_uuid() PRIMARY KEY,
-  payload_hash bytea,
-  workflow_version smallint,
-  workflow_name text,
+  payload_hash bytea UNIQUE,
+  workflow_version smallint NOT NULL,
+  workflow_name text NOT NULL,
   created_at timestamptz NOT NULL DEFAULT now(),
   invalid_after timestamptz
 );
db/schema.sql: 6 changes (3 additions, 3 deletions)
@@ -39,9 +39,9 @@ INSERT INTO swoop.event_state (name, description) VALUES
 
 CREATE TABLE IF NOT EXISTS swoop.payload_cache (
   payload_uuid uuid DEFAULT gen_random_uuid() PRIMARY KEY,
-  payload_hash bytea,
-  workflow_version smallint,
-  workflow_name text,
+  payload_hash bytea UNIQUE,
+  workflow_version smallint NOT NULL,
+  workflow_name text NOT NULL,
   created_at timestamptz NOT NULL DEFAULT now(),
   invalid_after timestamptz
 );
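The new UNIQUE constraint on `payload_hash` (plus NOT NULL on `workflow_name` and `workflow_version`) lets a cached payload be addressed by its hash alone. A hypothetical sketch of that kind of lookup-or-insert, not swoop's actual caching code; the hashing scheme and the use of asyncpg here are assumptions:

```python
import hashlib
import json

import asyncpg


async def upsert_payload_cache(
    conn: asyncpg.Connection, payload: dict, workflow_name: str, workflow_version: int
) -> str:
    # Illustrative hash only; the real hashing scheme is not part of this diff.
    payload_hash = hashlib.sha256(
        json.dumps(payload, sort_keys=True).encode()
    ).digest()

    # ON CONFLICT (payload_hash) is only valid because payload_hash is now UNIQUE.
    row = await conn.fetchrow(
        """
        INSERT INTO swoop.payload_cache (payload_hash, workflow_name, workflow_version)
        VALUES ($1, $2, $3)
        ON CONFLICT (payload_hash)
            DO UPDATE SET workflow_version = EXCLUDED.workflow_version
        RETURNING payload_uuid
        """,
        payload_hash,
        workflow_name,
        workflow_version,
    )
    return str(row["payload_uuid"])
```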
pyproject.toml: 3 changes (3 additions, 0 deletions)
@@ -62,3 +62,6 @@ readme = {file = "README.md"}
 
 [tool.isort]
 profile = "black"
+
+[tool.pytest.ini_options]
+minversion = "6.0"
src/swoop/api/config.py: 4 changes (3 additions, 1 deletion)
@@ -1,3 +1,5 @@
+from pathlib import Path
+
 from pydantic import BaseSettings, Field
 
 
@@ -37,7 +39,7 @@ def __init__(self, *args, _env_file=None, **kwargs):
     bucket_name: str
     execution_dir: str
     s3_endpoint: str
-    workflow_config_file: str
+    workflow_config_file: Path
 
     class Config:
         env_prefix = "swoop_"
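Changing `workflow_config_file` from `str` to `Path` means pydantic's `BaseSettings` coerces the environment value into a `pathlib.Path` that `Workflows.from_yaml` (below) can consume directly. A minimal sketch assuming pydantic v1 (which provides `BaseSettings`); the `Settings` class here is trimmed to this one field and the env value is made up:

```python
import os
from pathlib import Path

from pydantic import BaseSettings


class Settings(BaseSettings):
    # Only the field relevant to this change; the real Settings has more fields.
    workflow_config_file: Path

    class Config:
        env_prefix = "swoop_"


# BaseSettings matches env vars case-insensitively, so SWOOP_WORKFLOW_CONFIG_FILE
# populates workflow_config_file and is coerced from str to Path.
os.environ["SWOOP_WORKFLOW_CONFIG_FILE"] = "/opt/swoop/workflow-config.yml"

settings = Settings()
assert isinstance(settings.workflow_config_file, Path)
```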
src/swoop/api/exceptions.py: 6 changes (6 additions, 0 deletions)
@@ -0,0 +1,6 @@
+class SwoopApiException(Exception):
+    pass
+
+
+class WorkflowConfigError(SwoopApiException, ValueError):
+    pass
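Because `WorkflowConfigError` subclasses both `SwoopApiException` and `ValueError`, handlers can catch it either as the API-wide base exception or as an ordinary `ValueError`. A small illustration:

```python
from swoop.api.exceptions import SwoopApiException, WorkflowConfigError

assert issubclass(WorkflowConfigError, SwoopApiException)
assert issubclass(WorkflowConfigError, ValueError)

try:
    raise WorkflowConfigError("Could not load workflow configuration")
except ValueError as exc:
    # Generic ValueError handlers still see config errors...
    assert isinstance(exc, SwoopApiException)  # ...and so do API-wide handlers.
```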
src/swoop/api/models.py → src/swoop/api/models/__init__.py: 17 changes (0 additions, 17 deletions)
@@ -30,11 +30,6 @@ class ConfClasses(BaseModel):
     conformsTo: list[str]
 
 
-class Response(Enum):
-    raw = "raw"
-    document = "document"
-
-
 class Type(Enum):
     process = "process"
 
@@ -191,8 +186,6 @@ class ProcessSummary(DescriptionType):
     outputTransmission: list[TransmissionMode] | None = None
     description: str | None = None
     handler: str | None = None
-    argoTemplate: str | None = None
-    cacheEnabled: bool | None = None
     cacheKeyHashIncludes: list[str] | None = None
     cacheKeyHashExcludes: list[str] | None = None
     links: list[Link] | None = None
@@ -211,16 +204,6 @@ class InlineOrRefData(BaseModel):
     __root__: InputValueNoObject | QualifiedInputValue | Link
 
 
-class Execute(BaseModel):
-    # TODO: I believe this is where we need to specify the input payload schema
-    inputs: dict[str, InlineOrRefData | list[InlineOrRefData]] | None = None
-    # TODO: We should likely omit the ability to specify outputs
-    outputs: dict[str, Output] | None = None
-    # TODO: Response isn't really to be supported, all results are json
-    response: Response | None = "raw"
-    subscriber: Subscriber | None = None
-
-
 class Results(BaseModel):
     # TODO: This becomes the schema of the workflow output
     __root__: dict[str, InlineOrRefData] | None = None
src/swoop/api/models/workflows.py: 100 changes (100 additions, 0 deletions)
@@ -0,0 +1,100 @@
+from __future__ import annotations
+
+from abc import ABC
+from enum import Enum
+from pathlib import Path
+from typing import Annotated, Literal, Union
+
+import yaml
+from pydantic import BaseModel, Field, StrictBool, StrictInt, StrictStr
+
+from swoop.api.exceptions import WorkflowConfigError
+
+
+class Response(Enum):
+    # raw = "raw"
+    document = "document"
+
+
+class BaseWorkflow(BaseModel, ABC):
+    name: StrictStr
+    description: StrictStr
+    version: StrictInt
+    cache_key_hash_includes: list[StrictStr] = []
+    cache_key_hash_excludes: list[StrictStr] = []
+
+
+class ArgoWorkflow(BaseWorkflow):
+    handler: Literal["argo-workflow"]
+    argo_template: StrictStr
+
+
+class CirrusWorkflow(BaseWorkflow):
+    handler: Literal["cirrus-workflow"]
+    sfn_arn: StrictStr
+
+
+Workflow = Annotated[
+    Union[
+        ArgoWorkflow,
+        CirrusWorkflow,
+    ],
+    Field(discriminator="handler"),
+]
+
+
+class Feature(BaseModel):
+    id: StrictStr
+    collection: StrictStr
+
+
+class UploadOptions(BaseModel):
+    path_template: StrictStr
+    collections: dict
+    public_assets: list[StrictStr] = []
+    headers: dict
+    s3_urls: StrictBool
+
+
+class Process(BaseModel):
+    description: StrictStr | None = None
+    tasks: dict
+    # input_collections: Optional[list[StrictStr]] = None
+    upload_options: UploadOptions
+    workflow: StrictStr
+
+
+class Payload(BaseModel):
+    type: StrictStr = "FeatureCollection"
+    features: list[Feature]
+    process: list[Process]
+
+
+class InputPayload(BaseModel):
+    payload: Payload
+
+
+class Execute(BaseModel):
+    # TODO: I believe this is where we need to specify the input payload schema
+    # inputs: dict[str, InlineOrRefData | list[InlineOrRefData]] | None = None
+    inputs: InputPayload
+    # TODO: We should likely omit the ability to specify outputs
+    # outputs: dict[str, Output] | None = None
+    # TODO: Response isn't really to be supported, all results are json
+    response: Literal["document"] = "document"
+    # subscriber: Subscriber | None = None
+
+
+class Workflows(dict[str, Workflow]):
+    class _type(BaseModel):
+        __root__: dict[str, Workflow]
+
+    @classmethod
+    def from_yaml(cls, path: Path) -> Workflows:
+        try:
+            workflows = yaml.safe_load(path.read_text())["workflows"]
+            for name, workflow in workflows.items():
+                workflow["name"] = name
+            return cls(cls._type.parse_obj(workflows).__root__)
+        except Exception as e:
+            raise WorkflowConfigError("Could not load workflow configuration") from e
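Tying the new pieces together: `Workflows.from_yaml` reads a `workflows:` mapping, injects each key as the workflow `name`, and lets the `handler` field discriminate between `ArgoWorkflow` and `CirrusWorkflow`. A sketch of how a config file might be parsed; the YAML content, file path, and field values are invented for illustration:

```python
from pathlib import Path

from swoop.api.models.workflows import ArgoWorkflow, CirrusWorkflow, Workflows

# Illustrative config; the real workflow-config file is not shown in this diff.
example = Path("/tmp/workflow-config.yml")
example.write_text(
    """
workflows:
  mirror:
    description: A workflow to copy STAC items into a local mirror
    version: 2
    handler: argo-workflow
    argo_template: workflowtemplate/mirror-workflow
  cirrus-example:
    description: An example cirrus workflow
    version: 1
    handler: cirrus-workflow
    sfn_arn: arn:aws:states:us-east-1:123456789012:stateMachine:example
"""
)

workflows = Workflows.from_yaml(example)

# The "handler" discriminator picks the concrete model per entry,
# and from_yaml injects each mapping key as the workflow name.
assert isinstance(workflows["mirror"], ArgoWorkflow)
assert isinstance(workflows["cirrus-example"], CirrusWorkflow)
assert workflows["mirror"].name == "mirror"
```

A malformed file (missing `workflows:` key, unknown `handler`, or a bad field type) is surfaced as `WorkflowConfigError`, chained from the underlying YAML or validation error.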