Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/minor bugs #2348

Merged
merged 9 commits into from
Jul 28, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Use declare_component from base class.
- Use different colors to distinguish logs


#### New Features & Functionality

- Modify the field name output to _outputs.predict_id in the model results of Ibis.
Expand All @@ -26,6 +27,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Make "create a table" compulsory
- All datatypes should be wrapped with a Schema
- Support eager mode
- Add CSN env var

#### Bug Fixes

Expand All @@ -34,6 +36,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Remove --user from make install_devkit as it is supposed to run in a virtualenv.
- component info support list
- Trigger downstream vector indices.
- Fix vector_index function job.

## [0.3.0](https://github.com/superduper-io/superduper/compare/0.3.0...0.2.0) (2024-Jun-21)

Expand Down
3 changes: 1 addition & 2 deletions superduper/backends/mongodb/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,11 +438,10 @@ def select_using_ids(self, ids: t.Sequence[str]):
)

@property
@applies_to('find', 'update_many', 'delete_many', 'delete_one')
def select_ids(self):
"""Select the ids of the documents."""
filter_ = {}
if self.parts[0][1]:
if self.parts and self.parts[0][1]:
filter_ = self.parts[0][1][0]
projection = {'_id': 1}
coll = MongoQuery(table=self.table, db=self.db)
Expand Down
72 changes: 2 additions & 70 deletions superduper/base/datalayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import click
import networkx
import tqdm

import superduper as s
from superduper import logging
Expand All @@ -24,17 +23,16 @@
from superduper.base.document import Document
from superduper.base.event import Event
from superduper.components.component import Component
from superduper.components.datatype import DataType, _BaseEncodable
from superduper.components.datatype import DataType
from superduper.components.schema import Schema
from superduper.components.table import Table
from superduper.jobs.job import Job
from superduper.misc.annotations import deprecated
from superduper.misc.colors import Colors
from superduper.misc.data import ibatch
from superduper.misc.download import download_from_one
from superduper.misc.retry import db_retry
from superduper.misc.special_dicts import recursive_update
from superduper.vector_search.base import BaseVectorSearcher, VectorItem
from superduper.vector_search.base import BaseVectorSearcher
from superduper.vector_search.interface import FastVectorSearcher

DBResult = t.Any
Expand Down Expand Up @@ -126,7 +124,6 @@ def __init__(

self.compute = compute
self.compute.queue.db = self
self._server_mode = False
self._cfg = s.CFG

def __getitem__(self, item):
Expand All @@ -142,21 +139,6 @@ def cdc(self, cdc):
"""CDC property setter."""
self._cdc = cdc

@property
def server_mode(self):
"""Property for server mode."""
return self._server_mode

@server_mode.setter
def server_mode(self, is_server: bool):
"""
Set server mode property.

:param is_server: New boolean property.
"""
assert isinstance(is_server, bool)
self._server_mode = is_server

def initialize_vector_searcher(
self, identifier, searcher_type: t.Optional[str] = None
) -> t.Optional[BaseVectorSearcher]:
Expand All @@ -180,58 +162,8 @@ def initialize_vector_searcher(
vector_comparison = vector_search_cls.from_component(vi)

assert isinstance(clt.identifier, str), 'clt.identifier must be a string'

self.backfill_vector_search(vi, vector_comparison)

return FastVectorSearcher(self, vector_comparison, vi.identifier)

def backfill_vector_search(self, vi, searcher):
blythed marked this conversation as resolved.
Show resolved Hide resolved
"""
Backfill vector search from model outputs of a given vector index.

:param vi: Identifier of vector index.
:param searcher: FastVectorSearch instance to load model outputs as vectors.
"""
if s.CFG.cluster.vector_search.type == 'native':
return

if s.CFG.cluster.vector_search.uri and not self.server_mode:
return

logging.info(f"Loading vectors of vector-index: '{vi.identifier}'")

if vi.indexing_listener.select is None:
raise ValueError('.select must be set')

if vi.indexing_listener.select.db is None:
vi.indexing_listener.select.db = self

query = vi.indexing_listener.outputs_select

logging.info(str(query))

id_field = query.table_or_collection.primary_id

progress = tqdm.tqdm(desc='Loading vectors into vector-table...')
all_items = []
for record_batch in ibatch(
self.execute(query),
s.CFG.cluster.vector_search.backfill_batch_size,
):
items = []
for record in record_batch:
id = record[id_field]
assert not isinstance(vi.indexing_listener.model, str)
h = record[vi.indexing_listener.outputs_key]
if isinstance(h, _BaseEncodable):
h = h.unpack()
items.append(VectorItem.create(id=str(id), vector=h))
searcher.add(items)
all_items.extend(items)
progress.update(len(items))

searcher.post_create()

# TODO - needed?
def set_compute(self, new: ComputeBackend):
"""
Expand Down
5 changes: 5 additions & 0 deletions superduper/components/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,11 @@ def info(self, verbosity: int = 1):

_display_component(self, verbosity=verbosity)

@property
def cdc_table(self):
"""Get table for cdc."""
return False


def ensure_initialized(func):
"""Decorator to ensure that the model is initialized before calling the function.
Expand Down
35 changes: 21 additions & 14 deletions superduper/components/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from superduper.backends.base.query import Query
from superduper.base.document import _OUTPUTS_KEY
from superduper.components.model import Mapping
from superduper.misc.server import request_server
from superduper.misc.server import is_csn, request_server

from ..jobs.job import Job
from .component import Component
Expand Down Expand Up @@ -72,32 +72,34 @@ def outputs_select(self):
"""Get select statement for outputs."""
return self.db[self.select.table].select().outputs(self.predict_id)

@property
def cdc_table(self):
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The `cdc_table` property returns the component's table, which the CDC service listens to once the component is added.

"""Get table for cdc."""
return self.select.table_or_collection.identifier

@override
def post_create(self, db: "Datalayer") -> None:
"""Post-create hook.

:param db: Data layer instance.
"""
self.create_output_dest(db, self.uuid, self.model)
if self.select is not None: # and not db.server_mode:
if self.select is not None:
logging.info('Requesting listener setup on CDC service')
if CFG.cluster.cdc.uri:
if CFG.cluster.cdc.uri and not is_csn('cdc'):
logging.info('Sending request to add listener')
request_server(
service='cdc',
endpoint='listener/add',
args={'name': self.identifier},
endpoint='component/add',
args={'name': self.identifier, 'type_id': self.type_id},
type='get',
)
else:
logging.info(
'Skipping listener setup on CDC service since no URI is set'
)
else:
logging.info(
'Skipping listener setup on CDC service'
f' since select is {self.select} or server mode is {db.server_mode}'
)
logging.info('Skipping listener setup on CDC service')
db.compute.queue.declare_component(self)

@classmethod
Expand Down Expand Up @@ -172,7 +174,9 @@ def trigger_ids(self, query: Query, primary_ids: t.Sequence):
keys = [self.key]
elif isinstance(self.key, dict):
keys = list(self.key.keys())
return self._ready_ids(data, keys)

def _ready_ids(self, data, keys):
ready_ids = []
for select in data:
notfound = 0
Expand All @@ -191,24 +195,27 @@ def schedule_jobs(
db: "Datalayer",
dependencies: t.Sequence[Job] = (),
overwrite: bool = False,
ids: t.Optional[t.List[t.Any]] = None,
) -> t.Sequence[t.Any]:
"""Schedule jobs for the listener.

:param db: Data layer instance to process.
:param dependencies: A list of dependencies.
:param overwrite: Overwrite the existing data.
:param ids: Optional ids to schedule.
"""
if self.select is None:
return []
from superduper.base.datalayer import DBEvent
from superduper.base.event import Event

events = []
if ids is None:
ids = db.execute(self.select.select_ids)
ids = [id[self.select.primary_id] for id in ids]
data = db.execute(self.select)
keys = self.key

if isinstance(self.key, str):
keys = [self.key]
elif isinstance(self.key, dict):
keys = list(self.key.keys())
ids = self._ready_ids(data, keys)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do this

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not required, will remove the TODO

events = [
Event(
Expand Down
2 changes: 1 addition & 1 deletion superduper/components/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -1113,7 +1113,7 @@ def post_create(self, db):

:param db: Datalayer instance.
"""
db.compute.component_hook(self.identifier, compute_kwargs=self.compute_kwargs)
db.compute.queue.declare_component(self)
blythed marked this conversation as resolved.
Show resolved Hide resolved
super().post_create(db)


Expand Down
36 changes: 29 additions & 7 deletions superduper/components/vector_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import numpy as np
from overrides import override

from superduper import CFG, logging
from superduper.backends.base.query import Query
from superduper.base.datalayer import Datalayer, DBEvent
from superduper.base.document import Document
Expand All @@ -14,6 +15,7 @@
from superduper.ext.utils import str_shape
from superduper.jobs.job import FunctionJob
from superduper.misc.annotations import component
from superduper.misc.server import is_csn, request_server
from superduper.misc.special_dicts import MongoStyleDict
from superduper.vector_search.base import VectorIndexMeasureType
from superduper.vector_search.update_tasks import copy_vectors, delete_vectors
Expand Down Expand Up @@ -171,12 +173,30 @@ def cleanup(self, db: Datalayer):
db.fast_vector_searchers[self.identifier].drop()
del db.fast_vector_searchers[self.identifier]

@property
def cdc_table(self):
"""Get table for cdc."""
return self.indexing_listener.outputs

@override
def post_create(self, db: "Datalayer") -> None:
"""Post-create hook.

:param db: Data layer instance.
"""
logging.info('Requesting vector index setup on CDC service')
if CFG.cluster.cdc.uri and not is_csn('cdc'):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`csn` means "current service name".

This checks whether the current process is the CDC service;
if it is, do not create a CDC trigger.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How will this work if the services are in a single server?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the `is_csn` function.

When a single server is used (as in testing),
`is_csn` checks for that case.

logging.info('Sending request to add vector index')
request_server(
service='cdc',
endpoint='component/add',
blythed marked this conversation as resolved.
Show resolved Hide resolved
args={'name': self.identifier, 'type_id': self.type_id},
type='get',
)
else:
logging.info(
'Skipping vector index setup on CDC service since no URI is set'
)
db.compute.queue.declare_component(self)

@property
Expand Down Expand Up @@ -220,8 +240,11 @@ def trigger_ids(self, query: Query, primary_ids: t.Sequence):
if self.indexing_listener.outputs != query.table:
return []

return self._ready_ids(primary_ids)

def _ready_ids(self, ids: t.Sequence):
select = self.indexing_listener.outputs_select
data = self.db.execute(select.select_using_ids(primary_ids))
data = self.db.execute(select.select_using_ids(ids))
key = self.indexing_listener.outputs_key

ready_ids = []
Expand Down Expand Up @@ -266,7 +289,7 @@ def run_jobs(
kwargs=dict(
vector_index=self.identifier,
ids=ids,
query=self.indexing_listener.outputs_select.dict().encode(),
query=db[self.indexing_listener.outputs].dict().encode(),
),
)
job(db=db, dependencies=dependencies)
Expand All @@ -277,21 +300,20 @@ def schedule_jobs(
self,
db: Datalayer,
dependencies: t.Sequence['Job'] = (),
ids: t.Optional[t.List[t.Any]] = None,
) -> t.Sequence[t.Any]:
"""Schedule jobs for the vector index.

:param db: The DB instance to process
:param dependencies: A list of dependencies
:param ids: Optional ids to schedule.
"""
from superduper.base.event import Event

assert self.indexing_listener.select is not None

if ids is None:
ids = db.execute(self.indexing_listener.select.select_ids)
ids = [id[self.indexing_listener.select.primary_id] for id in ids]
outputs = db[self.indexing_listener.outputs]
ids = db.execute(outputs.select_ids)
ids = [id[outputs.primary_id] for id in ids]

events = [
Event(
type_id=self.type_id,
Expand Down
4 changes: 1 addition & 3 deletions superduper/jobs/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,9 @@ def publish(self, events: t.List[Event]):
:param to: Component name for events to be published.
"""

@abstractmethod
def declare_component(self, component):
"""Declare component and add it to queue."""
logging.info(f'Declaring component {component.type_id}/{component.identifier}')
self.db.compute.component_hook(component.identifier, type_id=component.type_id)


class LocalQueuePublisher(BaseQueuePublisher):
Expand All @@ -125,7 +124,6 @@ def build_consumer(self):

def declare_component(self, component):
"""Declare component and add it to queue."""
super().declare_component(component)
self.components[component.type_id, component.identifier] = component

def publish(self, events: t.List[Event]):
Expand Down
Loading
Loading