-
Notifications
You must be signed in to change notification settings - Fork 5
/
stac_fastapi.py
175 lines (139 loc) · 5.48 KB
/
stac_fastapi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# encoding: utf-8
"""
STAC FastAPI
------------
An output backend which outputs the content generated to a STAC FastAPI
using the Transaction endpoint extension
**Plugin name:** ``stac_fastapi``
.. list-table::
:header-rows: 1
* - Option
- Value Type
- Description
* - ``api_url``
- ``str``
- ``REQUIRED`` root url of STAC API
* - ``verify``
- ``bool``
- Whether to verify the server's TLS certificate (passed to ``httpx.Client``). Defaults to ``True``.
Example Configuration:
.. code-block:: yaml
outputs:
- name: stac_fastapi
api_url: https://localhost
"""
__author__ = "Richard Smith"
__date__ = "01 Jun 2021"
__copyright__ = "Copyright 2018 United Kingdom Research and Innovation"
__license__ = "BSD - see LICENSE file in top-level package directory"
__contact__ = "richard.d.smith@stfc.ac.uk"
import logging
from urllib.parse import urljoin
import httpx
from httpx_auth import OAuth2ClientCredentials
from stac_generator.core.output import BaseOutput
# Module-level logger, named after this module; used for the export warnings below.
LOGGER = logging.getLogger(__name__)
class STACFastAPIOutput(BaseOutput):
    """
    Export generated items and collections to a STAC FastAPI instance.

    Documents are sent over HTTP to the API's ``collections`` endpoints.
    A ``POST`` is attempted first; when the API answers ``409`` with an
    "already exists" description, the document is re-sent with ``PUT``.
    Failures are logged as warnings rather than raised.

    Instance attributes read by this class (presumably populated by
    ``BaseOutput`` from the keyword arguments -- confirm against the base
    class):

    * ``api_url`` -- root URL of the STAC API.
    * ``verify`` -- forwarded to ``httpx.Client(verify=...)``; set to
      ``True`` when absent.
    * ``authentication`` -- optional mapping with ``token_url``,
      ``client_id`` and ``client_secret``; when present, requests are
      authenticated with OAuth2 client credentials.
    """

    def __init__(self, **kwargs):
        """Set up the HTTP client used for every request to the API."""
        super().__init__(**kwargs)

        if not hasattr(self, "verify"):
            self.verify = True

        if hasattr(self, "authentication"):
            auth = OAuth2ClientCredentials(
                self.authentication.get("token_url"),
                client_id=self.authentication.get("client_id"),
                client_secret=self.authentication.get("client_secret"),
            )
        else:
            auth = None

        self.client = httpx.Client(
            auth=auth,
            verify=self.verify,
            timeout=180,
        )

    def _url(self, path: str) -> str:
        """Return ``path`` resolved against ``api_url``."""
        # urljoin() replaces the last path segment of a base that lacks a
        # trailing slash ("https://host/api" + "collections" would become
        # "https://host/collections"), so normalise the base first.
        base = self.api_url if self.api_url.endswith("/") else f"{self.api_url}/"
        return urljoin(base, path)

    @staticmethod
    def _error_description(response):
        """
        Return the ``description`` field of a JSON error response.

        Returns ``None`` when the body is not valid JSON, is not a JSON
        object, or has no ``description`` key, so that an unexpected error
        page (e.g. from a proxy) is logged by the caller instead of raising.
        """
        try:
            body = response.json()
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError both derive
            # from ValueError.
            return None

        if isinstance(body, dict):
            return body.get("description")
        return None

    def item(self, data: dict) -> None:
        """
        Create, or update if it already exists, an item in each collection.

        ``data["collection"]`` may be a single collection id or a list of
        ids. For each id, ``data["collection"]`` is overwritten in place
        with the lower-cased id before the item is sent. If the API reports
        that the collection does not exist, a minimal collection is created
        and the item is posted again.

        :param data: the item document; must contain ``collection`` and ``id``.
        """
        collections = data["collection"]
        if isinstance(collections, str):
            collections = [collections]

        for collection_id in collections:
            collection_id = data["collection"] = collection_id.lower()
            items_url = self._url(f"collections/{collection_id}/items")

            response = self.client.post(items_url, json=data)

            if (
                response.status_code == 404
                and self._error_description(response)
                == f"Collection {collection_id} does not exist"
            ):
                # The payload is kept in its own name so that collection_id
                # is still the string id when the item URL is reused below.
                collection_body = {
                    "type": "Collection",
                    "id": collection_id,
                    "stac_version": "0.1.0",
                    "stac_extensions": [],
                    "license": "",
                }
                # The outcome of this request is not inspected; if it fails,
                # the retried item POST below fails and is logged.
                self.client.post(self._url("collections"), json=collection_body)

                response = self.client.post(items_url, json=data)

            if (
                response.status_code == 409
                and self._error_description(response)
                == f"Item {data['id']} in collection {collection_id} already exists"
            ):
                response = self.client.put(
                    self._url(f"collections/{collection_id}/items/{data['id']}"),
                    json=data,
                )

                if response.status_code != 200:
                    LOGGER.warning(
                        "FastAPI Output Item update failed with status code: %s and response text: %s",
                        response.status_code,
                        response.text,
                    )

            elif response.status_code != 200:
                # Also reached for a 409 whose description is not the
                # expected "already exists" message.
                LOGGER.warning(
                    "FastAPI Output failed to post to STAC Fastapi items endpoint returned status code: %s and response text: %s request data: %s",
                    response.status_code,
                    response.text,
                    data,
                )

    def collection(self, data: dict) -> None:
        """
        Create a collection, or update it if it already exists.

        :param data: the collection document; must contain ``id``.
        """
        collections_url = self._url("collections")

        response = self.client.post(collections_url, json=data)

        if (
            response.status_code == 409
            and self._error_description(response)
            == f"Collection {data['id']} already exists"
        ):
            # NOTE(review): the update is sent to the collections root, not
            # to collections/{id}. Which one the server accepts presumably
            # depends on the deployed STAC API version -- confirm.
            response = self.client.put(collections_url, json=data)

            if response.status_code != 200:
                LOGGER.warning(
                    "FastAPI Output Collection update failed with status code: %s and response text: %s",
                    response.status_code,
                    response.text,
                )

        elif response.status_code != 200:
            # Also reached for a 409 whose description is not the expected
            # "already exists" message.
            LOGGER.warning(
                "FastAPI Output failed to post to STAC Fastapi collections endpoint returned status code: %s and response text: %s request data: %s",
                response.status_code,
                response.text,
                data,
            )

    def export(self, data: dict, **kwargs) -> None:
        """
        Send ``data`` to the API as an item or a collection.

        :param data: the document to export.
        :param kwargs: must contain ``TYPE``, an object whose ``value`` is
            ``"item"`` or ``"collection"``; any other value is ignored.
        """
        export_type = kwargs["TYPE"].value

        if export_type == "item":
            self.item(data)
        elif export_type == "collection":
            self.collection(data)