
[QUEUES] Allow jobs created on Component creation to be passed onto downstream components #2329

Closed
Tracked by #2227
blythed opened this issue Jul 18, 2024 · 8 comments


blythed commented Jul 18, 2024

  1. Compute features
  2. Train PCA on finished computed features
  3. Compute dimension reduced features
class DimReduceModel(Model):
    trainer: ...
    upstream: ...

listener1 = Listener(features_model, ...)
dim_reduce_model = DimReduceModel(trainer=PCATrainer(), upstream=[listener1])
listener2 = Listener(dim_reduce_model, ...)

Explanation

  • listener1 creates the features
  • dim_reduce_model trains on those features
  • listener2 computes outputs with dim_reduce_model after training is complete
(screenshot attached: Screenshot 2024-07-18 at 9:21:18 PM)
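The ordering implied by the explanation above can be sketched as a topological sort over each component's `upstream` list. This is an illustrative sketch only, not the superduper API; the dict-based components and `apply_order` helper are hypothetical.

```python
# Hypothetical sketch: order components so each runs only after its upstream.
# Components are plain dicts with a 'name' and an optional 'upstream' list.
def apply_order(components):
    """Topologically order components by their 'upstream' lists."""
    ordered, seen = [], set()

    def visit(component):
        if component['name'] in seen:
            return
        for up in component.get('upstream', []):
            visit(up)  # ensure upstream components come first
        seen.add(component['name'])
        ordered.append(component['name'])

    for component in components:
        visit(component)
    return ordered

listener1 = {'name': 'listener1'}
dim_reduce_model = {'name': 'dim_reduce_model', 'upstream': [listener1]}
listener2 = {'name': 'listener2', 'upstream': [dim_reduce_model]}
order = apply_order([listener2])
```

Applying only `listener2` still schedules the whole chain, because each visit recurses into its upstream components first.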

blythed commented Jul 18, 2024

After discussion with @kartik4949 we may have 2 use-cases:

  1. Computations on existing data (1-time jobs)
  2. Computations on future data (triggered jobs)

One idea might be to add 2 parallel components:

  • Listener (triggered)
  • Map (1-time)


blythed commented Jul 19, 2024

Solution:

  1. Component initialization
  2. Component activation
  3. Real time triggers


jieguangzhou commented Jul 22, 2024

To extend the current superduper.base.event.Event class:

Add an event_type attribute with possible values: model_update, insert_data, model_apply, schedule_jobs.

For different event types, include necessary information in the data field, such as:

  • insert_data: data should include table and ids.
  • model_update: should include type_id, identifier, table, and ids.

Before the event is sent to the queue, it should first pass through the EventManager. The EventManager will identify the event type, check if the relevant components trigger jobs, and dispatch the event to the corresponding queue.
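The routing described above can be sketched as follows. This is a hedged, minimal sketch: the `Event` fields match this comment's description, but the `EventManager` class, its `dispatch` method, and the queue-naming scheme are all hypothetical, not the actual superduper implementation.

```python
# Hypothetical sketch of EventManager dispatch: identify the event type and
# route the event to the corresponding queue.
from dataclasses import dataclass, field

@dataclass
class Event:
    event_type: str  # 'model_update', 'insert_data', 'model_apply', 'schedule_jobs'
    data: dict = field(default_factory=dict)

class EventManager:
    def __init__(self):
        # queue name -> list of events (in-memory stand-in for a real broker)
        self.queues = {}

    def dispatch(self, event):
        if event.event_type == 'insert_data':
            # insert_data events carry 'table' and 'ids'
            queue = f"table/{event.data['table']}"
        elif event.event_type == 'model_update':
            # model_update events carry 'type_id', 'identifier', 'table', 'ids'
            queue = f"{event.data['type_id']}/{event.data['identifier']}"
        else:
            queue = event.event_type
        self.queues.setdefault(queue, []).append(event)
        return queue

manager = EventManager()
q = manager.dispatch(Event('insert_data', {'table': 'docs', 'ids': [1, 2]}))
```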

@kartik4949 kartik4949 self-assigned this Aug 2, 2024
@kartik4949 kartik4949 moved this to In Progress in superduper-open-source Aug 2, 2024
@kartik4949

Discussion with @jieguangzhou
(image attached)

@blythed blythed changed the title [QUEUES] Allow jobs created on Listener creation to be passed onto downstream components [QUEUES] Allow jobs created on Component creation to be passed onto downstream components Aug 2, 2024
@kartik4949

  • No extra method will be added for this issue
  • We will change the Event class and add source, dest, and from_type attributes
class Event:
    source: (type_id, identifier)
    dest: (type_id, identifier)
    from_type: 'DB' or 'COMPONENT'
    event_type: 'INSERT', 'DELETE', etc.
    ids: []

Example: Startup event of a listener component

source: ('listener', 'my_listener')
dest: ('listener', 'my_listener')
from_type: 'COMPONENT'
event_type: 'INSERT'
ids: [1, 2, 3, 4, 5, ...]

Example: Database event, i.e. an insert

source: ('table', 'my_table')
dest: ('listener', 'my_listener')
from_type: 'DB'
event_type: 'INSERT'
ids: [6]
  • The job id will be created in schedule_jobs within the event for startup events; we can then pass this job_id as a dependency to subsequent component startups so that they wait for it to complete.

Refer to the VectorIndex or Listener schedule_jobs function.
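The shape described above can be sketched as a dataclass. This is a hedged sketch: the field names follow this comment, but the real `superduper.base.event.Event` may differ, and the `job_id` value shown is hypothetical.

```python
# Sketch of the proposed Event shape (illustrative, not the superduper API).
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class Event:
    source: tuple             # (type_id, identifier)
    dest: tuple               # (type_id, identifier)
    from_type: str            # 'DB' or 'COMPONENT'
    event_type: str           # 'INSERT', 'DELETE', ...
    ids: list = field(default_factory=list)
    job_id: Optional[str] = None  # set in schedule_jobs for startup events

# Startup event of a listener component: downstream startups can depend on job_id.
startup = Event(
    source=('listener', 'my_listener'),
    dest=('listener', 'my_listener'),
    from_type='COMPONENT',
    event_type='INSERT',
    ids=[1, 2, 3, 4, 5],
    job_id='job-0001',        # hypothetical id created in schedule_jobs
)

# Database event, e.g. a single insert into 'my_table'.
db_event = Event(
    source=('table', 'my_table'),
    dest=('listener', 'my_listener'),
    from_type='DB',
    event_type='INSERT',
    ids=[6],
)
```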


kartik4949 commented Aug 9, 2024

Algorithm Explanation:

  1. A Component is added to the database and schedule_jobs is invoked, e.g. Listener's schedule_jobs.

  2. schedule_jobs creates a component Event, i.e. `Event(dest=('listener', 'my_listener'), from_type='COMPONENT', ids=[1, 2, 3, ...])`.

  3. This event is broadcast to the 'my_listener' queue.

  4. As the consumer receives the event, we unpack all ids within this Event.
    Note: there are two types of events:
    a. Startup component event
    This event is created by a component when it is applied to the database.
    It carries multiple ids, i.e. all ids currently present in the database, e.g. as defined by the component's select.
    b. Database event
    This event is created by insert, update, etc. database activities.

  5. The startup event in the consumer unpacks all ids and runs the job on these ids.
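Steps 4 and 5 can be sketched as below. This is an illustrative stand-in for the real consumer, assuming events are plain dicts and `run_job` is whatever callable executes the component's job.

```python
# Illustrative sketch of the consumer: unpack all ids from the received
# events and run one job over them. Startup events carry many ids
# (everything the select matched); database events carry only the new ids.
def consume(events, run_job):
    ids = []
    for event in events:
        ids.extend(event['ids'])
    return run_job(ids)

processed = consume(
    [{'from_type': 'COMPONENT', 'ids': [1, 2, 3]},   # startup event
     {'from_type': 'DB', 'ids': [6]}],               # database event
    run_job=lambda ids: list(ids),
)
```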

Example scenario

A feature listener and a trainable model listener with validation and trainer

feature_listener = Listener(model=feature_model, select=table, key='x')

feature_dependencies = db.apply(feature_listener)

trainable_model = Model(
    trainer=Trainer(select=feature_listener.outputs_select, key=feature_listener.outputs),
    validation=Validation(datasets=[Dataset(select=feature_listener.outputs_select)]),
)

trainable_listener = Listener(model=trainable_model, select=feature_listener.outputs_select)

db.apply(trainable_listener, dependencies=feature_dependencies)
  1. Add feature_listener; this creates the feature-listener job in the background, and schedule_jobs returns a job dependency for it.

  2. We then create trainable_model with a trainer and validation and wrap it in a listener.

  3. Apply this listener, passing the feature_listener dependency so that it waits for that job to finish first.

  4. In the Listener's schedule_jobs, we create:
    a. a Training Event
    b. a Validation Event with a dependency on the Training Event (so that we wait for it to complete)

  5. The Training Event is scheduled along with the Validation Event via schedule_jobs of _Fittable.
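Steps 4 and 5 above can be sketched as follows. The function name, queue representation, and event dicts are hypothetical; the point is only that the validation event records the training job's id in its dependency list before anything runs.

```python
# Hypothetical sketch: schedule_jobs emitting a Training event plus a
# Validation event that depends on the training job.
import uuid

def schedule_fit_jobs(queue):
    training_job = str(uuid.uuid4())
    queue.append({'kind': 'training', 'job_id': training_job, 'dependencies': []})
    # Validation waits for the training job via its dependency list.
    queue.append({
        'kind': 'validation',
        'job_id': str(uuid.uuid4()),
        'dependencies': [training_job],
    })
    return queue

events = schedule_fit_jobs([])
```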

Update: added a design diagram.

(diagram attached)

Explanation

  • We have a problem with upstream in the case of db inserts.
    Suppose we have 2 components, where Component2 has Component1 as upstream.

If we apply Component2, we create an initialisation Event, and this works fine.

But when the select of Component1 gets new data via a db insert or update:

  • We should broadcast an Event on the Component1 queue, and the consumer on this queue should create a job (the Component1 job). We should then create an Event on the downstream component Component2, and its consumer should create a job with a dependency on the upstream job created above.

In the case of db insert or update Events, we cannot define a dependency between the Component1 and Component2 jobs, because we put events one by one into a component queue and the consumer batches them into one big event (e.g. a 100-size chunk). Hence we cannot predefine the job id before putting the event into the queue, as we do for startup (initialization) events.
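The timing issue can be made concrete with a toy batching consumer. This is an illustration only (hypothetical names, in-memory queue): the job id is assigned when the consumer forms the chunk, which is after the events were enqueued, so no upstream job id exists at enqueue time.

```python
# Minimal illustration: DB events are enqueued one by one, then batched by
# the consumer into chunked jobs. The job id only comes into existence here,
# at batching time, never at enqueue time.
def batch_into_jobs(queue, chunk_size=100):
    jobs, batch = [], []
    for event in queue:
        batch.append(event)
        if len(batch) == chunk_size:
            jobs.append({'job_id': f'job-{len(jobs)}',
                         'ids': [i for e in batch for i in e['ids']]})
            batch = []
    if batch:  # flush the final partial chunk
        jobs.append({'job_id': f'job-{len(jobs)}',
                     'ids': [i for e in batch for i in e['ids']]})
    return jobs

jobs = batch_into_jobs([{'ids': [i]} for i in range(5)], chunk_size=2)
```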


blythed commented Aug 10, 2024

This explanation is nice. But I need an explanation that isn't specific to Listener. How do the dependencies between different types of Component work?


blythed commented Aug 10, 2024

I think we should handle the jobs in a similar way to before:

We need 3 types of events:

  1. Event to trigger a Component to act on a query and some ids
  2. Event to run a Component initialization job
  3. Event to trigger a new Component or function job to be queued
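One possible encoding of these three event types is sketched below. The enum and its member names are hypothetical, not the superduper API; it only restates the proposed taxonomy.

```python
# Hypothetical encoding of the three proposed event types.
from enum import Enum

class EventKind(Enum):
    TRIGGER_ON_IDS = 1  # trigger a Component to act on a query and some ids
    INIT_JOB = 2        # run a Component initialization job
    QUEUE_JOB = 3       # queue a new Component or function job
```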

@kartik4949 kartik4949 moved this from In Progress to Review in superduper-open-source Aug 16, 2024
@kartik4949 kartik4949 moved this from In Progress to Review in superduper-open-source Aug 21, 2024
@github-project-automation github-project-automation bot moved this from Review to Done in superduper-open-source Aug 21, 2024