Skip to content

Commit

Permalink
Add an initial implementation of insert subscriptions.
Browse files Browse the repository at this point in the history
This implementation also includes an example bytewax adapter
which can be used to process inserts with bytewax.

An example bytewax flow is shown below:

```
import uuid
import btrdb
from btrdb.experimental.bytewax_connectors import InsertSubscription
from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutput

def selector(db):
	# Selector can be anything that returns a list of uuids.
	rows = db.query('select uuid from streams')
	uuids = [uuid.UUID(row['uuid']) for row in rows]
	return uuids

flow = Dataflow()
flow.input("realtime_sub", InsertSubscription(selector, refresh_delay=30))
flow.output("print_output", StdOutput())
```
  • Loading branch information
andrewchambers committed Aug 16, 2023
1 parent 3235404 commit d448eb5
Show file tree
Hide file tree
Showing 6 changed files with 347 additions and 145 deletions.
22 changes: 22 additions & 0 deletions btrdb/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,3 +386,25 @@ def sql_query(self, stmt, params: typing.List):
for page in self.stub.SQLQuery(request):
check_proto_stat(page.stat)
yield page.SQLQueryRow

@error_handler
def subscribe(self, update_queue):
def updates():
while True:
update = update_queue.get()
if update is None:
return
(to_add, to_remove) = update
if len(to_add) != 0:
yield btrdb_pb2.SubscriptionUpdate(
op=0, uuid=[uu.bytes for uu in to_add]
)
if len(to_remove) != 0:
yield btrdb_pb2.SubscriptionUpdate(
op=1, uuid=[uu.bytes for uu in to_remove]
)

for response in self.stub.Subscribe(updates()):
check_proto_stat(response.stat)
with pa.ipc.open_stream(response.arrowBytes) as reader:
yield uuid.UUID(bytes=response.uuid), reader.read_all()
Empty file added btrdb/experimental/__init__.py
Empty file.
124 changes: 124 additions & 0 deletions btrdb/experimental/bytewax_connectors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import queue
import threading
import weakref

from bytewax.inputs import DynamicInput, StatelessSource

import btrdb


class InsertSubscription(DynamicInput):
def __init__(
self,
selector_fn,
refresh_delay=60 * 60 * 6,
profile=None,
conn_str=None,
apikey=None,
):
self._selector_fn = selector_fn
self._conn_str = conn_str
self._apikey = apikey
self._profile = profile
self._refresh_delay = refresh_delay

class Source(StatelessSource):
def __init__(self, db, selector_fn, refresh_delay, worker_index, worker_count):
self._db = db
self._selector_fn = selector_fn
self._worker_index = worker_index
self._worker_count = worker_count
self._refresh_delay = refresh_delay
self._reader_exception = None
self._selector_exception = None
self._del_event = threading.Event()
self._update_queue = queue.Queue(1)
self._data_queue = queue.Queue(128)

# self is wrapped in a weakref with the worker threads so
# that the worker threads keep self alive.
def reader(self, data):
try:
# Avoid exessive weakref lookups
# by doing the lookup upfront initially.
del_event = self._del_event
data_queue = self._data_queue
for dat in data:
if del_event.is_set():
return
data_queue.put(dat)
except Exception as e:
self._reader_exception = e

# Self is a weakref, same as above.
def selector(self):
try:
# Avoid exessive weakref lookups
# by doing the lookup upfront initially.
db = self._db
del_event = self._del_event
selector_fn = self._selector_fn
worker_index = self._worker_index
worker_count = self._worker_count
refresh_delay = self._refresh_delay
current_uuids = set()
while True:
next_uuids = {
uuid
for uuid in selector_fn(db)
if uuid.int % worker_count == worker_index
}
added_uuids = next_uuids - current_uuids
removed_uuids = current_uuids - next_uuids
if len(added_uuids) != 0 or len(removed_uuids) != 0:
self._update_queue.put([added_uuids, removed_uuids])
current_uuids = next_uuids
if del_event.wait(refresh_delay):
return
except Exception as e:
self._selector_exception = e

weakself = weakref.proxy(self)
data = db.ep.subscribe(self._update_queue)
self._selector = threading.Thread(
target=selector, args=[weakself], daemon=True
)
self._reader = threading.Thread(
target=reader, args=[weakself, data], daemon=True
)
self._selector.start()
self._reader.start()

def next(self):
# Check if the selector thread has died.
selector_exception = self._selector_exception
if selector_exception is not None:
raise selector_exception
try:
return self._data_queue.get_nowait()
except queue.Empty:
# Check if the reason no data arrived is because
# the reader thead has died.
reader_exception = self._reader_exception
if reader_exception is not None:
raise reader_exception
return None

def __del__(self):
# Signal workers to exit.
self._del_event.set()
# Signal the end of the subscription.
self._update_queue.put(None)
# XXX do we want to join these threads?
# self._reader.join()
# self._selector.join()

def build(self, worker_index, worker_count):
db = btrdb.connect(
profile=self._profile,
conn_str=self._conn_str,
apikey=self._apikey,
)
return InsertSubscription.Source(
db, self._selector_fn, self._refresh_delay, worker_index, worker_count
)
17 changes: 17 additions & 0 deletions btrdb/grpcinterface/btrdb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ service BTrDB {
rpc GetMetadataUsage(MetadataUsageParams) returns (MetadataUsageResponse);
rpc GenerateCSV(GenerateCSVParams) returns (stream GenerateCSVResponse);
rpc SQLQuery(SQLQueryParams) returns (stream SQLQueryResponse);
rpc Subscribe(stream SubscriptionUpdate) returns (stream SubscriptionResp);
//rpc SetCompactionConfig(SetCompactionConfigParams) returns (SetCompactionConfigResponse);
//rpc GetCompactionConfig(GetCompactionConfigParams) returns (GetCompactionConfigResponse);
}
Expand Down Expand Up @@ -426,3 +427,19 @@ message ReducedResolutionRange {
int64 End = 2;
uint32 Resolution = 3;
}

enum SubscriptionUpdateOp {
ADD_UUIDS = 0;
REMOVE_UUIDS = 1;
}

message SubscriptionUpdate {
SubscriptionUpdateOp op = 1;
repeated bytes uuid = 2;
}

message SubscriptionResp {
Status stat = 1;
bytes uuid = 2;
bytes arrowBytes = 3;
}
Loading

0 comments on commit d448eb5

Please sign in to comment.