Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an initial implementation of realtime subscriptions. #45

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions btrdb/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,3 +386,25 @@ def sql_query(self, stmt, params: typing.List):
for page in self.stub.SQLQuery(request):
check_proto_stat(page.stat)
yield page.SQLQueryRow

@error_handler
def subscribe(self, update_queue):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add type hinting for the update_queue parameter?

Copy link
Author

@andrewchambers andrewchambers Aug 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think I will do a pass of type hinting + docstrings + tests. The queue is slightly awkward because of how grpc handles bidirectional streams.

def updates():
while True:
update = update_queue.get()
if update is None:
return
(to_add, to_remove) = update
if len(to_add) != 0:
yield btrdb_pb2.SubscriptionUpdate(
op=0, uuid=[uu.bytes for uu in to_add]
)
if len(to_remove) != 0:
yield btrdb_pb2.SubscriptionUpdate(
op=1, uuid=[uu.bytes for uu in to_remove]
)

for response in self.stub.Subscribe(updates()):
check_proto_stat(response.stat)
with pa.ipc.open_stream(response.arrowBytes) as reader:
yield uuid.UUID(bytes=response.uuid), reader.read_all()
Empty file added btrdb/experimental/__init__.py
Empty file.
168 changes: 168 additions & 0 deletions btrdb/experimental/bytewax_connectors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
import time
import queue
import threading
import weakref
import pyarrow as pa

from bytewax.inputs import DynamicInput, StatelessSource

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not important yet, but will need to update the pyproject.toml optional dependencies to include bytewax


import btrdb

_empty_values = pa.Table.from_arrays(
[pa.array([]), pa.array([])],
schema=pa.schema(
[
pa.field("time", pa.timestamp("ns", tz="UTC"), nullable=False),
pa.field("value", pa.float64(), nullable=False),
]
),
)


class InsertSubscription(DynamicInput):
def __init__(
self,
selector_fn,
selector_refresh_interval=60 * 60 * 6,
heartbeat_interval=None,
profile=None,
conn_str=None,
apikey=None,
):
self._selector_fn = selector_fn
self._conn_str = conn_str
self._apikey = apikey
self._profile = profile
self._selector_refresh_interval = selector_refresh_interval
self._heartbeat_interval = heartbeat_interval

class Source(StatelessSource):
def __init__(
self,
db,
selector_fn,
selector_refresh_interval,
heartbeat_interval,
worker_index,
worker_count,
):
self._db = db
self._selector_fn = selector_fn
self._worker_index = worker_index
self._worker_count = worker_count
self._selector_refresh_interval = selector_refresh_interval
self._heartbeat_interval = heartbeat_interval
self._read_worker_exception = None
self._background_worker_exception = None
self._del_event = threading.Event()
self._update_queue = queue.Queue(1)
self._data_queue = queue.Queue(512)

# self is wrapped in a weakref with the worker threads so
# that the worker threads keep self alive.
def read_worker(self, data):
try:
# Avoid exessive weakref lookups
# by doing the lookup upfront initially.
del_event = self._del_event
data_queue = self._data_queue
for dat in data:
if del_event.is_set():
return
data_queue.put(dat)
except Exception as e:
self._read_worker_exception = e

# Self is a weakref, same as above.
def background_worker(self):
try:
# Avoid exessive weakref lookups
# by doing the lookup upfront initially.
db = self._db
del_event = self._del_event
selector_fn = self._selector_fn
worker_index = self._worker_index
worker_count = self._worker_count
data_queue = self._data_queue
heartbeat_interval = self._heartbeat_interval
last_heartbeat = time.monotonic()
selector_refresh_interval = self._selector_refresh_interval
last_selector_refresh = time.monotonic() - selector_refresh_interval
current_uuids = set()
while True:
now = time.monotonic()
if (now - last_selector_refresh) >= selector_refresh_interval:
last_selector_refresh = now
next_uuids = {
uuid
for uuid in selector_fn(db)
if uuid.int % worker_count == worker_index
}
added_uuids = next_uuids - current_uuids
removed_uuids = current_uuids - next_uuids
if len(added_uuids) != 0 or len(removed_uuids) != 0:
self._update_queue.put([added_uuids, removed_uuids])
current_uuids = next_uuids
if (
heartbeat_interval is not None
and (now - last_heartbeat) >= heartbeat_interval
):
last_heartbeat = now
for uuid in current_uuids:
while not del_event.is_set():
try:
data_queue.put((uuid, _empty_values), 0.1)
break
except queue.Full:
pass
if del_event.wait(1.0):
return
except Exception as e:
self._background_worker_exception = e

weakself = weakref.proxy(self)
data = db.ep.subscribe(self._update_queue)
self._background_worker = threading.Thread(
target=background_worker, args=[weakself], daemon=True
)
self._read_worker = threading.Thread(
target=read_worker, args=[weakself, data], daemon=True
)
self._background_worker.start()
self._read_worker.start()

def next(self):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I attempted to use the bytewax flow example, and it seems in bytewax=='0.17.1' there is no next it's either next_batch or next_awake

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this was written against v0.16, needs to be reworked slightly for 0.17.

# Check if the selector thread has died.
background_worker_exception = self._background_worker_exception
if background_worker_exception is not None:
raise background_worker_exception
try:
return self._data_queue.get_nowait()
except queue.Empty:
# Check if the reason no data arrived is because
# the reader thead has died.
read_worker_exception = self._read_worker_exception
if read_worker_exception is not None:
raise read_worker_exception
return None

def __del__(self):
# Signal workers to exit.
self._del_event.set()
# Signal the end of the subscription.
self._update_queue.put(None)

def build(self, worker_index, worker_count):
db = btrdb.connect(
profile=self._profile,
conn_str=self._conn_str,
apikey=self._apikey,
)
return InsertSubscription.Source(
db,
self._selector_fn,
self._selector_refresh_interval,
self._heartbeat_interval,
worker_index,
worker_count,
)
17 changes: 17 additions & 0 deletions btrdb/grpcinterface/btrdb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ service BTrDB {
rpc GetMetadataUsage(MetadataUsageParams) returns (MetadataUsageResponse);
rpc GenerateCSV(GenerateCSVParams) returns (stream GenerateCSVResponse);
rpc SQLQuery(SQLQueryParams) returns (stream SQLQueryResponse);
rpc Subscribe(stream SubscriptionUpdate) returns (stream SubscriptionResp);
//rpc SetCompactionConfig(SetCompactionConfigParams) returns (SetCompactionConfigResponse);
//rpc GetCompactionConfig(GetCompactionConfigParams) returns (GetCompactionConfigResponse);
}
Expand Down Expand Up @@ -426,3 +427,19 @@ message ReducedResolutionRange {
int64 End = 2;
uint32 Resolution = 3;
}

enum SubscriptionUpdateOp {
ADD_UUIDS = 0;
REMOVE_UUIDS = 1;
}

message SubscriptionUpdate {
SubscriptionUpdateOp op = 1;
repeated bytes uuid = 2;
}

message SubscriptionResp {
Status stat = 1;
bytes uuid = 2;
bytes arrowBytes = 3;
}
Loading
Loading