Skip to content

Commit ea27b3e

Browse files
committed
Add BulkResponse wrapper for improved decoding of HTTP bulk responses
CrateDB HTTP bulk responses include `rowcount=` items, either signalling if a bulk operation succeeded or failed. - success means `rowcount=1` - failure means `rowcount=-2` https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#error-handling
1 parent 7cb2c68 commit ea27b3e

File tree

5 files changed

+403
-284
lines changed

5 files changed

+403
-284
lines changed

CHANGES.txt

+2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ Unreleased
1212
"Threads may share the module, but not connections."
1313
- Added ``error_trace`` to string representation of an Error to relay
1414
server stacktraces into exception messages.
15+
- Added ``BulkResponse`` wrapper for improved decoding of CrateDB HTTP bulk
16+
responses including ``rowcount=`` items.
1517

1618
.. _Migrate from crate.client to sqlalchemy-cratedb: https://cratedb.com/docs/sqlalchemy-cratedb/migrate-from-crate-client.html
1719
.. _sqlalchemy-cratedb: https://pypi.org/project/sqlalchemy-cratedb/

src/crate/client/result.py

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import typing as t
2+
from functools import cached_property
3+
4+
5+
class BulkResultItem(t.TypedDict):
6+
"""
7+
Define the shape of a CrateDB bulk request response item.
8+
"""
9+
10+
rowcount: int
11+
12+
13+
class BulkResponse:
14+
"""
15+
Manage CrateDB bulk request responses.
16+
Accepts a list of bulk arguments (parameter list) and a list of bulk response items.
17+
18+
https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
19+
"""
20+
21+
def __init__(
22+
self,
23+
records: t.Union[t.Iterable[t.Dict[str, t.Any]], None],
24+
results: t.Union[t.Iterable[BulkResultItem], None]):
25+
self.records = records
26+
self.results = results
27+
28+
@cached_property
29+
def failed_records(self) -> t.List[t.Dict[str, t.Any]]:
30+
"""
31+
Compute list of failed records.
32+
33+
CrateDB signals failed inserts using `rowcount=-2`.
34+
35+
https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#error-handling
36+
"""
37+
if self.records is None or self.results is None:
38+
return []
39+
errors: t.List[t.Dict[str, t.Any]] = []
40+
for record, status in zip(self.records, self.results):
41+
if status["rowcount"] == -2:
42+
errors.append(record)
43+
return errors
44+
45+
@cached_property
46+
def record_count(self) -> int:
47+
"""
48+
Compute bulk size / length of parameter list.
49+
"""
50+
if not self.records:
51+
return 0
52+
return len(self.records)
53+
54+
@cached_property
55+
def success_count(self) -> int:
56+
"""
57+
Compute number of succeeding records within a batch.
58+
"""
59+
return self.record_count - self.failed_count
60+
61+
@cached_property
62+
def failed_count(self) -> int:
63+
"""
64+
Compute number of failed records within a batch.
65+
"""
66+
return len(self.failed_records)

src/crate/client/test_result.py

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import sys
2+
import unittest
3+
4+
from crate import client
5+
from crate.client.test_support import setUpCrateLayerBaseline, tearDownDropEntitiesBaseline
6+
from crate.testing.settings import crate_host
7+
8+
9+
class BulkOperationTest(unittest.TestCase):
10+
11+
def setUp(self):
12+
setUpCrateLayerBaseline(self)
13+
14+
def tearDown(self):
15+
tearDownDropEntitiesBaseline(self)
16+
17+
@unittest.skipIf(sys.version_info < (3, 8), "BulkResponse needs Python 3.8 or higher")
18+
def test_executemany_with_bulk_response(self):
19+
20+
# Import at runtime is on purpose, to permit skipping.
21+
from crate.client.result import BulkResponse
22+
23+
connection = client.connect(crate_host)
24+
cursor = connection.cursor()
25+
26+
# Run SQL DDL.
27+
cursor.execute("CREATE TABLE foobar (id INTEGER PRIMARY KEY, name STRING);")
28+
29+
# Run a batch insert that only partially succeeds.
30+
invalid_records = [(1, "Hotzenplotz 1"), (1, "Hotzenplotz 2")]
31+
result = cursor.executemany("INSERT INTO foobar (id, name) VALUES (?, ?)", invalid_records)
32+
33+
# Verify CrateDB response.
34+
self.assertEqual(result, [{"rowcount": 1}, {"rowcount": -2}])
35+
36+
# Verify decoded response.
37+
bulk_response = BulkResponse(invalid_records, result)
38+
self.assertEqual(bulk_response.failed_records, [(1, "Hotzenplotz 2")])
39+
40+
cursor.execute("REFRESH TABLE foobar;")
41+
cursor.execute("SELECT * FROM foobar;")
42+
result = cursor.fetchall()
43+
self.assertEqual(result, [[1, "Hotzenplotz 1"]])
44+
45+
cursor.close()
46+
connection.close()

src/crate/client/test_support.py

+273
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,273 @@
1+
# -*- coding: utf-8; -*-
2+
#
3+
# Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
4+
# license agreements. See the NOTICE file distributed with this work for
5+
# additional information regarding copyright ownership. Crate licenses
6+
# this file to you under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License. You may
8+
# obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15+
# License for the specific language governing permissions and limitations
16+
# under the License.
17+
#
18+
# However, if you have executed another commercial license agreement
19+
# with Crate these terms will supersede the license and you may use the
20+
# software solely pursuant to the terms of the relevant commercial agreement.
21+
22+
from __future__ import absolute_import
23+
24+
import json
25+
import os
26+
import socket
27+
import unittest
28+
from pprint import pprint
29+
from http.server import HTTPServer, BaseHTTPRequestHandler
30+
import ssl
31+
import time
32+
import threading
33+
import logging
34+
35+
import stopit
36+
37+
from crate.testing.layer import CrateLayer
38+
from crate.testing.settings import \
39+
crate_host, crate_path, crate_port, \
40+
crate_transport_port, docs_path, localhost
41+
from crate.client import connect
42+
43+
44+
makeSuite = unittest.TestLoader().loadTestsFromTestCase
45+
46+
log = logging.getLogger('crate.testing.layer')
47+
ch = logging.StreamHandler()
48+
ch.setLevel(logging.ERROR)
49+
log.addHandler(ch)
50+
51+
52+
def cprint(s):
53+
if isinstance(s, bytes):
54+
s = s.decode('utf-8')
55+
print(s)
56+
57+
58+
settings = {
59+
'udc.enabled': 'false',
60+
'lang.js.enabled': 'true',
61+
'auth.host_based.enabled': 'true',
62+
'auth.host_based.config.0.user': 'crate',
63+
'auth.host_based.config.0.method': 'trust',
64+
'auth.host_based.config.98.user': 'trusted_me',
65+
'auth.host_based.config.98.method': 'trust',
66+
'auth.host_based.config.99.user': 'me',
67+
'auth.host_based.config.99.method': 'password',
68+
}
69+
crate_layer = None
70+
71+
72+
def ensure_cratedb_layer():
73+
"""
74+
In order to skip individual tests by manually disabling them within
75+
`def test_suite()`, it is crucial make the test layer not run on each
76+
and every occasion. So, things like this will be possible::
77+
78+
./bin/test -vvvv --ignore_dir=testing
79+
80+
TODO: Through a subsequent patch, the possibility to individually
81+
unselect specific tests might be added to `def test_suite()`
82+
on behalf of environment variables.
83+
A blueprint for this kind of logic can be found at
84+
https://github.com/crate/crate/commit/414cd833.
85+
"""
86+
global crate_layer
87+
88+
if crate_layer is None:
89+
crate_layer = CrateLayer('crate',
90+
crate_home=crate_path(),
91+
port=crate_port,
92+
host=localhost,
93+
transport_port=crate_transport_port,
94+
settings=settings)
95+
return crate_layer
96+
97+
98+
def setUpCrateLayerBaseline(test):
99+
if hasattr(test, "globs"):
100+
test.globs['crate_host'] = crate_host
101+
test.globs['pprint'] = pprint
102+
test.globs['print'] = cprint
103+
104+
with connect(crate_host) as conn:
105+
cursor = conn.cursor()
106+
107+
with open(docs_path('testing/testdata/mappings/locations.sql')) as s:
108+
stmt = s.read()
109+
cursor.execute(stmt)
110+
stmt = ("select count(*) from information_schema.tables "
111+
"where table_name = 'locations'")
112+
cursor.execute(stmt)
113+
assert cursor.fetchall()[0][0] == 1
114+
115+
data_path = docs_path('testing/testdata/data/test_a.json')
116+
# load testing data into crate
117+
cursor.execute("copy locations from ?", (data_path,))
118+
# refresh location table so imported data is visible immediately
119+
cursor.execute("refresh table locations")
120+
# create blob table
121+
cursor.execute("create blob table myfiles clustered into 1 shards " +
122+
"with (number_of_replicas=0)")
123+
124+
# create users
125+
cursor.execute("CREATE USER me WITH (password = 'my_secret_pw')")
126+
cursor.execute("CREATE USER trusted_me")
127+
128+
cursor.close()
129+
130+
131+
def tearDownDropEntitiesBaseline(test):
132+
"""
133+
Drop all tables, views, and users created by `setUpWithCrateLayer*`.
134+
"""
135+
ddl_statements = [
136+
"DROP TABLE foobar",
137+
"DROP TABLE locations",
138+
"DROP BLOB TABLE myfiles",
139+
"DROP USER me",
140+
"DROP USER trusted_me",
141+
]
142+
_execute_statements(ddl_statements)
143+
144+
145+
class HttpsTestServerLayer:
146+
PORT = 65534
147+
HOST = "localhost"
148+
CERT_FILE = os.path.abspath(os.path.join(os.path.dirname(__file__),
149+
"pki/server_valid.pem"))
150+
CACERT_FILE = os.path.abspath(os.path.join(os.path.dirname(__file__),
151+
"pki/cacert_valid.pem"))
152+
153+
__name__ = "httpsserver"
154+
__bases__ = tuple()
155+
156+
class HttpsServer(HTTPServer):
157+
def get_request(self):
158+
159+
# Prepare SSL context.
160+
context = ssl._create_unverified_context(
161+
protocol=ssl.PROTOCOL_TLS_SERVER,
162+
cert_reqs=ssl.CERT_OPTIONAL,
163+
check_hostname=False,
164+
purpose=ssl.Purpose.CLIENT_AUTH,
165+
certfile=HttpsTestServerLayer.CERT_FILE,
166+
keyfile=HttpsTestServerLayer.CERT_FILE,
167+
cafile=HttpsTestServerLayer.CACERT_FILE)
168+
169+
# Set minimum protocol version, TLSv1 and TLSv1.1 are unsafe.
170+
context.minimum_version = ssl.TLSVersion.TLSv1_2
171+
172+
# Wrap TLS encryption around socket.
173+
socket, client_address = HTTPServer.get_request(self)
174+
socket = context.wrap_socket(socket, server_side=True)
175+
176+
return socket, client_address
177+
178+
class HttpsHandler(BaseHTTPRequestHandler):
179+
180+
payload = json.dumps({"name": "test", "status": 200, })
181+
182+
def do_GET(self):
183+
self.send_response(200)
184+
payload = self.payload.encode('UTF-8')
185+
self.send_header("Content-Length", len(payload))
186+
self.send_header("Content-Type", "application/json; charset=UTF-8")
187+
self.end_headers()
188+
self.wfile.write(payload)
189+
190+
def setUp(self):
191+
self.server = self.HttpsServer(
192+
(self.HOST, self.PORT),
193+
self.HttpsHandler
194+
)
195+
thread = threading.Thread(target=self.serve_forever)
196+
thread.daemon = True # quit interpreter when only thread exists
197+
thread.start()
198+
self.waitForServer()
199+
200+
def serve_forever(self):
201+
print("listening on", self.HOST, self.PORT)
202+
self.server.serve_forever()
203+
print("server stopped.")
204+
205+
def tearDown(self):
206+
self.server.shutdown()
207+
self.server.server_close()
208+
209+
def isUp(self):
210+
"""
211+
Test if a host is up.
212+
"""
213+
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
214+
ex = s.connect_ex((self.HOST, self.PORT))
215+
s.close()
216+
return ex == 0
217+
218+
def waitForServer(self, timeout=5):
219+
"""
220+
Wait for the host to be available.
221+
"""
222+
with stopit.ThreadingTimeout(timeout) as to_ctx_mgr:
223+
while True:
224+
if self.isUp():
225+
break
226+
time.sleep(0.001)
227+
228+
if not to_ctx_mgr:
229+
raise TimeoutError("Could not properly start embedded webserver "
230+
"within {} seconds".format(timeout))
231+
232+
233+
def setUpWithHttps(test):
234+
test.globs['crate_host'] = "https://{0}:{1}".format(
235+
HttpsTestServerLayer.HOST, HttpsTestServerLayer.PORT
236+
)
237+
test.globs['pprint'] = pprint
238+
test.globs['print'] = cprint
239+
240+
test.globs['cacert_valid'] = os.path.abspath(
241+
os.path.join(os.path.dirname(__file__), "pki/cacert_valid.pem")
242+
)
243+
test.globs['cacert_invalid'] = os.path.abspath(
244+
os.path.join(os.path.dirname(__file__), "pki/cacert_invalid.pem")
245+
)
246+
test.globs['clientcert_valid'] = os.path.abspath(
247+
os.path.join(os.path.dirname(__file__), "pki/client_valid.pem")
248+
)
249+
test.globs['clientcert_invalid'] = os.path.abspath(
250+
os.path.join(os.path.dirname(__file__), "pki/client_invalid.pem")
251+
)
252+
253+
254+
def _execute_statements(statements, on_error="ignore"):
255+
with connect(crate_host) as conn:
256+
cursor = conn.cursor()
257+
for stmt in statements:
258+
_execute_statement(cursor, stmt, on_error=on_error)
259+
cursor.close()
260+
261+
262+
def _execute_statement(cursor, stmt, on_error="ignore"):
263+
try:
264+
cursor.execute(stmt)
265+
except Exception: # pragma: no cover
266+
# FIXME: Why does this croak on statements like ``DROP TABLE cities``?
267+
# Note: When needing to debug the test environment, you may want to
268+
# enable this logger statement.
269+
# log.exception("Executing SQL statement failed")
270+
if on_error == "ignore":
271+
pass
272+
elif on_error == "raise":
273+
raise

0 commit comments

Comments
 (0)