diff --git a/CHANGELOG.md b/CHANGELOG.md index 7bcf63378..f34d542c1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Use declare_component from base class. - Use different colors to distinguish logs - Change all the `_outputs.` to `_outputs__` +- Disable cdc on output tables. #### New Features & Functionality diff --git a/superduper/backends/ibis/data_backend.py b/superduper/backends/ibis/data_backend.py index ff66c9337..a2375cfdc 100644 --- a/superduper/backends/ibis/data_backend.py +++ b/superduper/backends/ibis/data_backend.py @@ -203,6 +203,7 @@ def create_table_and_schema(self, identifier: str, schema: Schema): t = self.conn.table(identifier) else: raise e + return t def drop(self, force: bool = False): diff --git a/superduper/base/datalayer.py b/superduper/base/datalayer.py index f1667ed50..199e76dbd 100644 --- a/superduper/base/datalayer.py +++ b/superduper/base/datalayer.py @@ -338,9 +338,10 @@ def _insert( inserted_ids = insert.do_execute(self) cdc_status = s.CFG.cluster.cdc.uri is not None + is_output_table = insert.table.startswith('_outputs__') if refresh: - if cdc_status: + if cdc_status and not is_output_table: logging.warn('CDC service is active, skipping model/listener refresh') else: return inserted_ids, self.on_event( @@ -391,7 +392,9 @@ def on_event(self, query: Query, ids: t.List[str], event_type: str = 'insert'): identifier = event_data['identifier'] type_id = event_data['type_id'] ids = event_data['ids'] - events.extend([Event(type_id, identifier, id, event_type) for id in ids]) + events.extend( + [Event(type_id, identifier, str(id), event_type) for id in ids] + ) return self.compute.broadcast(events) @@ -437,8 +440,9 @@ def _update(self, update: Query, refresh: bool = True) -> UpdateResult: updated_ids = update.do_execute(self) cdc_status = s.CFG.cluster.cdc.uri is not None + is_output_table = update.table.startswith('_outputs__') if refresh and updated_ids: - if cdc_status: + if cdc_status and not is_output_table: logging.warn('CDC service is active, skipping model/listener refresh') else: # Overwrite should be true since updates could be done on collections diff --git a/superduper/components/vector_index.py b/superduper/components/vector_index.py index f388d1b8a..21102ce5a 100644 --- a/superduper/components/vector_index.py +++ b/superduper/components/vector_index.py @@ -4,7 +4,6 @@ import numpy as np from overrides import override -from superduper import CFG, logging from superduper.backends.base.query import Query from superduper.base.datalayer import Datalayer, DBEvent from superduper.base.document import Document @@ -15,7 +14,6 @@ from superduper.ext.utils import str_shape from superduper.jobs.job import FunctionJob from superduper.misc.annotations import component -from superduper.misc.server import request_server from superduper.misc.special_dicts import MongoStyleDict from superduper.vector_search.base import VectorIndexMeasureType from superduper.vector_search.update_tasks import copy_vectors, delete_vectors @@ -171,30 +169,12 @@ def cleanup(self, db: Datalayer): db.fast_vector_searchers[self.identifier].drop() del db.fast_vector_searchers[self.identifier] - @property - def cdc_table(self): - """Get table for cdc.""" - return self.indexing_listener.outputs - @override def post_create(self, db: "Datalayer") -> None: """Post-create hook. :param db: Data layer instance. """ - logging.info('Requesting vector index setup on CDC service') - if CFG.cluster.cdc.uri: - logging.info('Sending request to add vector index') - request_server( - service='cdc', - endpoint='component/add', - args={'name': self.identifier, 'type_id': self.type_id}, - type='get', - ) - else: - logging.info( - 'Skipping vector index setup on CDC service since no URI is set' - ) db.compute.queue.declare_component(self) @property diff --git a/superduper/misc/server.py b/superduper/misc/server.py index eb4510b19..62a6d9687 100644 --- a/superduper/misc/server.py +++ b/superduper/misc/server.py @@ -74,7 +74,7 @@ def _request_server( result = json.loads(response.content) else: response = requests.get(url, params=args) - result = None + result = json.loads(response.content) if response.status_code != 200: error = json.loads(response.content) msg = f'Server error at {service} with {response.status_code} :: {error}'