Skip to content

Commit

Permalink
Update changelog
Browse files Browse the repository at this point in the history
  • Loading branch information
kartik4949 committed Aug 19, 2024
1 parent a2aa91e commit a1f61e4
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Move all plugins superduperdb/ext/* to /plugins
- Optimize the logic for file saving and retrieval in the artifact_store.
- Add backfill on load of vector index
- Add startup event to initialize db.apply jobs.

#### New Features & Functionality

Expand Down
11 changes: 5 additions & 6 deletions superduper/base/datalayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,6 @@ def _add_child_components(self, components, parent, parent_deps):
)
tmp = self._apply(component, parent=parent.uuid, dependencies=dependencies)
jobs[n] = tmp

return sum(list(jobs.values()), [])

def _update_component(
Expand Down Expand Up @@ -683,12 +682,10 @@ def _apply(
children = [
v for v in serialized[KEY_BUILDS].values() if isinstance(v, Component)
]

jobs.extend(
self._add_child_components(
children, parent=object, parent_deps=dependencies
)
child_jobs = self._add_child_components(
children, parent=object, parent_deps=dependencies
)
jobs.extend(child_jobs)

if children:
serialized = self._change_component_reference_prefix(serialized)
Expand All @@ -707,6 +704,8 @@ def _apply(
for job in jobs:
if isinstance(job, Job):
deps.append(job.job_id)
else:
deps.append(job)
dependencies = list(set([*deps, *dependencies])) # type: ignore[list-item]

object.post_create(self)
Expand Down
3 changes: 2 additions & 1 deletion superduper/base/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ class Event:
:param uuid: Unique identifier for the event.
This id will be used as job id in
startup events.
:param dependencies: List of dependencies on the event.
"""

dest: _Component
dest: t.Union[_Component, t.Dict]
id: t.Any

src: t.Optional[_Component] = None
Expand Down
2 changes: 1 addition & 1 deletion superduper/components/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ def run_jobs(
"""
# Note: DB events are not supported yet
# i.e if new data is added after model apply.
self.fit_in_db_job(
return self.fit_in_db_job(
db=db,
dependencies=list(set(dependencies)),
)
Expand Down
6 changes: 3 additions & 3 deletions superduper/components/vector_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ def trigger_ids(self, query: Query, primary_ids: t.Sequence):
)
return ids

def _create_predict_job(self, db, callable, deps, ids, job_id=None):
def _create_vector_sync_job(self, db, callable, deps, ids, job_id=None):
job = FunctionJob(
callable=callable,
args=[],
Expand Down Expand Up @@ -358,7 +358,7 @@ def run_jobs(
jobs = []
for event in component_events:
jobs += [
self._create_predict_job(
self._create_vector_sync_job(
db=db,
callable=callable,
ids=event.id,
Expand All @@ -372,7 +372,7 @@ def run_jobs(
return jobs

jobs += [
self._create_predict_job(
self._create_vector_sync_job(
db=db,
callable=callable,
ids=[event.id for event in db_events],
Expand Down
7 changes: 6 additions & 1 deletion superduper/jobs/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,16 @@ def start_consuming(self):
"""Start consuming."""

def _get_consumers(self, db, components):
components = list(set(components.keys()))
def _remove_duplicates(clist):
seen = set()
return [x for x in clist if not (x in seen or seen.add(x))]

components = list(_remove_duplicates(components.keys()))
components_to_use = []

for type_id, _ in components:
components_to_use += [(type_id, x) for x in db.show(type_id)]

return set(components_to_use + components)

def consume(self, db: 'Datalayer', queue: t.Dict, components: t.Dict):
Expand Down
9 changes: 7 additions & 2 deletions test/unittest/component/test_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class MyComponent1(Component):

def schedule_jobs(self, *args, **kwargs):
self.triggered_schedule_jobs = True
return []
return ('my_dependency_listener',)

c1 = MyComponent1(identifier='c1')

Expand All @@ -142,5 +142,10 @@ def schedule_jobs(self, *args, **kwargs):
select=db["docs"].find(),
)

def mock_schedule_jobs(self, *args, **kwargs):
assert kwargs == {'dependencies': ['my_dependency_listener']}
return []

m.schedule_jobs = mock_schedule_jobs
db.apply(m)
assert m.upstream.triggered_schedule_jobs == True
assert m.upstream.triggered_schedule_jobs == True # noqa
9 changes: 5 additions & 4 deletions test/unittest/component/test_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,18 +224,19 @@ def insert_random(start=0):
identifier="listener1",
uuid="listener1",
)
deps, features = db.apply(features_listener)

select = db[features_listener.outputs].select()
trainable_model.trainer = MyTrainer(
'test', select=features_listener.outputs_select, key=features_listener.outputs
'test', select=select, key=features_listener.outputs
)

listener2 = Listener(
upstream=features_listener,
model=trainable_model,
select=features_listener.outputs_select,
select=select,
key=features_listener.outputs,
identifier='listener2',
uuid='listener2',
)
db.apply(listener2, deps)
db.apply(listener2)
assert trainable_model.trainer.training_done is True

0 comments on commit a1f61e4

Please sign in to comment.