-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add an initial implementation of realtime subscriptions. #45
base: master
Are you sure you want to change the base?
Conversation
03c0ac3
to
d448eb5
Compare
@@ -386,3 +386,25 @@ def sql_query(self, stmt, params: typing.List): | |||
for page in self.stub.SQLQuery(request): | |||
check_proto_stat(page.stat) | |||
yield page.SQLQueryRow | |||
|
|||
@error_handler | |||
def subscribe(self, update_queue): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add type hinting for the update_queue
parameter?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I think I will do a pass of type hinting + docstrings + tests. The queue is slightly awkward because of how grpc handles bidirectional streams.
import threading | ||
import weakref | ||
|
||
from bytewax.inputs import DynamicInput, StatelessSource |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not important yet, but will need to update the pyproject.toml
optional dependencies to include bytewax
This implementation also includes an example bytewax adapter which can be used to process inserts with bytewax. An example bytewax flow is shown below: ``` import uuid import btrdb from btrdb.experimental.bytewax_connectors import InsertSubscription from bytewax.dataflow import Dataflow from bytewax.connectors.stdio import StdOutput def selector(db): # Selector can be anything that returns a list of uuids. rows = db.query('select uuid from streams') uuids = [uuid.UUID(row['uuid']) for row in rows] return uuids flow = Dataflow() flow.input("realtime_sub", InsertSubscription(selector, selector_refresh_interval=30)) flow.output("print_output", StdOutput()) ```
d448eb5
to
7993e26
Compare
self._background_worker.start() | ||
self._read_worker.start() | ||
|
||
def next(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I attempted to use the bytewax flow example, and it seems in bytewax=='0.17.1' there is no next
it's either next_batch
or next_awake
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, this was written against v0.16, needs to be reworked slightly for 0.17.
…57) * update setuptools parameters for strict standards starting 2023-Oct-30 * remove invalid url for homepage of the project * remove invalid url for homepage of the project * update the docs url in pyproject.toml
This implementation also includes an example bytewax adapter which can be used to process realtime inserts with bytewax.
An example bytewax flow is shown below: