Skip to content

Commit

Permalink
Fix the condition of the CDC job.
Browse files Browse the repository at this point in the history
  • Loading branch information
jieguangzhou committed Aug 20, 2024
1 parent 1786fc1 commit b45817d
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Make component reload after caching in apply
- Fix a minor bug in schedule_jobs
- Fix vector index cleanup
- Fix the condition of the CDC job.

## [0.3.0](https://github.com/superduper-io/superduper/compare/0.3.0...0.2.0]) (2024-Jun-21)

Expand Down
7 changes: 7 additions & 0 deletions superduper/base/datalayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,9 @@ def _insert(
cdc_status = s.CFG.cluster.cdc.uri is not None
is_output_table = insert.table.startswith(CFG.output_prefix)

logging.info(f'Inserted {len(inserted_ids)} documents into {insert.table}')
logging.debug(f'Inserted IDs: {inserted_ids}')

if refresh:
if cdc_status and not is_output_table:
logging.warn('CDC service is active, skipping model/listener refresh')
Expand Down Expand Up @@ -398,6 +401,10 @@ def on_event(self, query: Query, ids: t.List[str], event_type: str = 'insert'):
[Event(dest=dest, id=str(id), event_type=event_type) for id in ids]
)

logging.info(
f'Created {len(events)} events for {event_type} on [{query.table}]'
)
logging.info(f'Broadcasting {len(events)} events')
return self.compute.broadcast(events)

def _write(self, write: Query, refresh: bool = True) -> UpdateResult:
Expand Down
2 changes: 1 addition & 1 deletion superduper/components/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def post_create(self, db: "Datalayer") -> None:
self.create_output_dest(db, self.predict_id, self.model)
if self.select is not None:
logging.info('Requesting listener setup on CDC service')
if CFG.cluster.cdc.uri:
if CFG.cluster.cdc.uri and not self.dependencies:
logging.info('Sending request to add listener')
request_server(
service='cdc',
Expand Down
4 changes: 4 additions & 0 deletions superduper/components/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,10 @@ def _prepare_inputs_from_select(
)

if len(X_data) > len(ids):
logging.error("Your select is returning more documents than unique ids.")
logging.error(f"X_data: {len(X_data)}; ids: {len(ids)}")
logging.error(f"ids: {ids}")
logging.error(f"select: {select}")
raise Exception(
'You\'ve specified more documents than unique ids;'
f' Is it possible that {select.table_or_collection.primary_id}'
Expand Down
2 changes: 2 additions & 0 deletions superduper/misc/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ def _request_server(
service_uri = CFG.cluster.vector_search.uri
elif service == 'scheduler':
service_uri = CFG.cluster.scheduler.uri
elif service == 'crontab':
service_uri = CFG.cluster.crontab.uri
else:
raise NotImplementedError(f'Unknown service {service}')

Expand Down

0 comments on commit b45817d

Please sign in to comment.