Skip to content

Commit 03c0ac3

Browse files
Add an initial implementation of insert subscriptions.
This implementation also includes an example bytewax adapter which can be used to process inserts with bytewax. An example bytewax flow is shown below:

```
import uuid

import btrdb
from btrdb.experimental.bytewax_connectors import InsertSubscription
from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutput


def selector(db):
    # Selector can be anything that returns a list of uuids.
    rows = db.query('select uuid from streams')
    uuids = [uuid.UUID(row['uuid']) for row in rows]
    return uuids


flow = Dataflow()
flow.input("realtime_sub", InsertSubscription(selector, refresh_delay=30))
flow.output("print_output", StdOutput())
```
1 parent 3235404 commit 03c0ac3

File tree

6 files changed

+346
-145
lines changed

6 files changed

+346
-145
lines changed

btrdb/endpoint.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,3 +386,25 @@ def sql_query(self, stmt, params: typing.List):
386386
for page in self.stub.SQLQuery(request):
387387
check_proto_stat(page.stat)
388388
yield page.SQLQueryRow
389+
390+
@error_handler
def subscribe(self, update_queue):
    """Open a streaming insert subscription and yield the data it delivers.

    Subscription changes are read from ``update_queue`` and sent to the
    server as ``SubscriptionUpdate`` messages; every response received on
    the ``Subscribe`` RPC is decoded and yielded to the caller.

    Args:
        update_queue: Object with a blocking ``get()`` method (for example
            a ``queue.Queue``). Each item is either ``None``, which ends
            the request stream, or a ``(to_add, to_remove)`` pair of sized
            collections of ``uuid.UUID`` objects.

    Yields:
        ``(uuid.UUID, table)`` tuples, one per response, where ``table`` is
        the result of reading the response's Arrow IPC stream in full.
    """

    def updates():
        # Request-side generator consumed by the gRPC stub: it blocks on
        # the queue and translates each queued change into at most one
        # "add" message followed by at most one "remove" message.
        while True:
            update = update_queue.get()
            if update is None:
                # Sentinel: stop sending updates.
                return
            to_add, to_remove = update
            # Empty collections are skipped so no no-op message is sent.
            if len(to_add) != 0:
                yield btrdb_pb2.SubscriptionUpdate(
                    # Named enum value (== 0) instead of a magic number.
                    op=btrdb_pb2.ADD_UUIDS,
                    uuid=[uu.bytes for uu in to_add],
                )
            if len(to_remove) != 0:
                yield btrdb_pb2.SubscriptionUpdate(
                    # Named enum value (== 1) instead of a magic number.
                    op=btrdb_pb2.REMOVE_UUIDS,
                    uuid=[uu.bytes for uu in to_remove],
                )

    for response in self.stub.Subscribe(updates()):
        check_proto_stat(response.stat)
        # The payload is an Arrow IPC stream; read it fully before yielding.
        with pa.ipc.open_stream(response.arrowBytes) as reader:
            yield uuid.UUID(bytes=response.uuid), reader.read_all()

btrdb/experimental/__init__.py

Whitespace-only changes.
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
import queue
2+
import threading
3+
import weakref
4+
5+
from bytewax.inputs import DynamicInput, StatelessSource
6+
7+
import btrdb
8+
9+
10+
class InsertSubscription(DynamicInput):
    """Bytewax dynamic input fed by a BTrDB insert subscription.

    Args:
        selector_fn: Callable invoked as ``selector_fn(db)`` that returns an
            iterable of ``uuid.UUID`` objects to subscribe to. It is called
            again every ``refresh_delay`` seconds and the subscription is
            updated with the difference.
        refresh_delay: Seconds to wait between calls to ``selector_fn``.
            Defaults to six hours.
        profile: Forwarded unchanged to ``btrdb.connect``.
        conn_str: Forwarded unchanged to ``btrdb.connect``.
        apikey: Forwarded unchanged to ``btrdb.connect``.
    """

    def __init__(
        self,
        selector_fn,
        refresh_delay=60 * 60 * 6,
        profile=None,
        conn_str=None,
        apikey=None,
    ):
        self._selector_fn = selector_fn
        self._conn_str = conn_str
        self._apikey = apikey
        self._profile = profile
        self._refresh_delay = refresh_delay

    class Source(StatelessSource):
        """Per-worker source backed by two daemon threads.

        The selector thread periodically evaluates ``selector_fn`` and
        queues subscription changes; the reader thread drains the
        subscription and queues received items for ``next``. A worker only
        subscribes to uuids where ``uuid.int % worker_count == worker_index``.
        """

        def __init__(self, db, selector_fn, refresh_delay, worker_index, worker_count):
            self._db = db
            self._selector_fn = selector_fn
            self._worker_index = worker_index
            self._worker_count = worker_count
            self._refresh_delay = refresh_delay
            self._reader_exception = None
            self._selector_exception = None
            self._del_event = threading.Event()
            # Capacity 1: at most one pending subscription change.
            self._update_queue = queue.Queue(1)
            # Bounded so a slow consumer applies back-pressure to the reader.
            self._data_queue = queue.Queue(128)

            def offer(target_queue, item, del_event):
                # Put ``item`` on ``target_queue`` but give up once
                # ``del_event`` is set, so a worker thread never stays
                # blocked on a full queue after the source is deleted.
                # Returns True if the item was queued.
                while not del_event.is_set():
                    try:
                        target_queue.put(item, timeout=0.5)
                    except queue.Full:
                        continue
                    return True
                return False

            # The worker threads receive self as a weakref proxy so that
            # they do NOT keep self alive; otherwise __del__ would never
            # run and the threads could never be told to stop.
            def reader(self, data):
                try:
                    # Avoid excessive weakref lookups
                    # by doing the lookup upfront initially.
                    del_event = self._del_event
                    data_queue = self._data_queue
                    for dat in data:
                        if not offer(data_queue, dat, del_event):
                            return
                except Exception as e:
                    try:
                        self._reader_exception = e
                    except ReferenceError:
                        # The source was collected in the meantime; there
                        # is nobody left to report the failure to.
                        pass

            # Self is a weakref proxy, same as above.
            def selector(self):
                try:
                    # Avoid excessive weakref lookups
                    # by doing the lookup upfront initially.
                    db = self._db
                    del_event = self._del_event
                    update_queue = self._update_queue
                    selector_fn = self._selector_fn
                    worker_index = self._worker_index
                    worker_count = self._worker_count
                    refresh_delay = self._refresh_delay
                    current_uuids = set()
                    while True:
                        # Keep only the uuids assigned to this worker.
                        next_uuids = {
                            uu
                            for uu in selector_fn(db)
                            if uu.int % worker_count == worker_index
                        }
                        added_uuids = next_uuids - current_uuids
                        removed_uuids = current_uuids - next_uuids
                        if added_uuids or removed_uuids:
                            update = [added_uuids, removed_uuids]
                            if not offer(update_queue, update, del_event):
                                return
                        current_uuids = next_uuids
                        # Sleep until the next refresh, waking early on delete.
                        if del_event.wait(refresh_delay):
                            return
                except Exception as e:
                    try:
                        self._selector_exception = e
                    except ReferenceError:
                        # The source was collected in the meantime; there
                        # is nobody left to report the failure to.
                        pass

            weakself = weakref.proxy(self)
            data = db.ep.subscribe(self._update_queue)
            self._selector = threading.Thread(
                target=selector, args=[weakself], daemon=True
            )
            self._reader = threading.Thread(
                target=reader, args=[weakself, data], daemon=True
            )
            self._selector.start()
            self._reader.start()

        def next(self):
            """Return the next received item, or ``None`` if none is ready.

            Raises:
                Exception: Whatever exception terminated the selector
                    thread, or, when no data is queued, the reader thread.
            """
            # Check if the selector thread has died.
            selector_exception = self._selector_exception
            if selector_exception is not None:
                raise selector_exception
            try:
                return self._data_queue.get_nowait()
            except queue.Empty:
                # Check if the reason no data arrived is because
                # the reader thread has died.
                reader_exception = self._reader_exception
                if reader_exception is not None:
                    raise reader_exception
                return None

        def __del__(self):
            # Signal workers to exit.
            self._del_event.set()
            # Signal the end of the subscription. The queue holds a single
            # item, so a blocking put could hang the finalizer forever if
            # an update is still pending; discard stale updates instead
            # until the sentinel fits.
            update_queue = self._update_queue
            while True:
                try:
                    update_queue.put_nowait(None)
                    return
                except queue.Full:
                    try:
                        update_queue.get_nowait()
                    except queue.Empty:
                        pass
            # The worker threads are daemons and are deliberately not
            # joined here.

    def build(self, worker_index, worker_count):
        """Connect to BTrDB and create the source for one bytewax worker."""
        db = btrdb.connect(
            profile=self._profile,
            conn_str=self._conn_str,
            apikey=self._apikey,
        )
        return InsertSubscription.Source(
            db, self._selector_fn, self._refresh_delay, worker_index, worker_count
        )

btrdb/grpcinterface/btrdb.proto

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ service BTrDB {
2828
rpc GetMetadataUsage(MetadataUsageParams) returns (MetadataUsageResponse);
2929
rpc GenerateCSV(GenerateCSVParams) returns (stream GenerateCSVResponse);
3030
rpc SQLQuery(SQLQueryParams) returns (stream SQLQueryResponse);
31+
rpc Subscribe(stream SubscriptionUpdate) returns (stream SubscriptionResp);
3132
//rpc SetCompactionConfig(SetCompactionConfigParams) returns (SetCompactionConfigResponse);
3233
//rpc GetCompactionConfig(GetCompactionConfigParams) returns (GetCompactionConfigResponse);
3334
}
@@ -426,3 +427,19 @@ message ReducedResolutionRange {
426427
int64 End = 2;
427428
uint32 Resolution = 3;
428429
}
430+
431+
// Operation carried by a SubscriptionUpdate: whether the listed uuids are
// added to or removed from the subscription.
enum SubscriptionUpdateOp {
432+
// Add the listed uuids to the subscription.
ADD_UUIDS = 0;
433+
// Remove the listed uuids from the subscription.
REMOVE_UUIDS = 1;
434+
}
435+
436+
// Client-to-server message on the Subscribe stream that changes the set of
// subscribed uuids.
message SubscriptionUpdate {
437+
// Whether the uuids below are being added or removed.
SubscriptionUpdateOp op = 1;
438+
// Raw uuid bytes, one entry per stream (the Python client sends UUID.bytes).
repeated bytes uuid = 2;
439+
}
440+
441+
// Server-to-client message on the Subscribe stream.
message SubscriptionResp {
442+
// Status of this response; the Python client checks it before using the payload.
Status stat = 1;
443+
// Raw uuid bytes; presumably the stream the payload belongs to (confirm server-side).
bytes uuid = 2;
444+
// Payload that the Python client reads as an Arrow IPC stream.
bytes arrowBytes = 3;
445+
}

0 commit comments

Comments
 (0)