Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[occ] Add basic worker task and scheduler shell #328

Merged
merged 15 commits into from
Oct 17, 2023

Conversation

stevenlanders
Copy link
Contributor

@stevenlanders stevenlanders commented Oct 11, 2023

Describe your changes and provide context

  • Adds a basic scheduler shell (see TODOs)
  • Adds a basic task definition with request/response/index
  • Listens to abort channel after an execution to determine conflict

Testing performed to validate your change

  • Compiles (holding off until shape is validated)
  • Basic Unit Test for ProcessAll

@stevenlanders stevenlanders changed the title add basic worker task and scheduler shell Add basic worker task and scheduler shell Oct 11, 2023
@stevenlanders stevenlanders changed the title Add basic worker task and scheduler shell [occ] Add basic worker task and scheduler shell Oct 11, 2023
Incarnation int
Request types.RequestDeliverTx
Response *types.ResponseDeliverTx
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to store any data here about "dependent TX" for the ESTIMATE abort case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes once we do the estimate/validation pieces there are a few things we'll need to track

case <-gCtx.Done():
return gCtx.Err()
case task, ok := <-ch:
if !ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we refactor this logic under case to a function like receiveTask?

// TODO: error scenarios
func (s *scheduler) executeAll(ctx sdk.Context, tasks []*Task) error {
ch := make(chan *Task, len(tasks))
grp, gCtx := errgroup.WithContext(ctx.Context())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we create the context WithCancel so that we can kill off the workers from the scheduler? we check the context.Done case below, but we would still need to indicate Done for the context via Cancel right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With errgroups, one can just return an error like context timeout error, and all ctx.Done()s will cancel. If something outside a task needs to cancel, then yes we can wrap errgroup with a WithCancel context, but then it's still going to be the ctx.Done() that exits those routines.

return nil
}
//TODO: ensure version multi store is on context
//abortCh := make(chan Abort)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can wrap mvkv here prior to calling the delivertx

}
//TODO: ensure version multi store is on context
//abortCh := make(chan Abort)
resp := s.deliverTx(ctx, task.Request)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldnt it be better here to call deliverTx async, otherwise there would be a deadlock since we'd never get to the channel read below? and then below we have a select statement with abortChannel, deliverTx response, or context Canceled?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(I added a comment on this, but basically if the abort channel is buffered, writes won't block)


// validate returns any that should be re-executed
// note this processes ALL tasks, not just those recently executed
toExecute, err = s.validateAll(ctx, tasks)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we want validation to also be ongoing and not done after all executions are done, that way if we fail validation for tx X we can immediately re-execute instead of having waited for ALL the txs to have executed before realizing the validation failed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it clear that as soon as validation fails is when we know it should be immediately re-executed? (If we have to prioritize re-executions?) That wasn't yet clear to me because it seems like we have to do a full-validation at the end anyway.

TaskStatusValidated TaskStatus = "validated"
// statusPending tasks are ready for execution
// all executing tasks are in pending state
statusPending status = "pending"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldnt these be StatusPending so theyre public?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

at the moment tasks don't leak the scheduler so nothing else cares

func (s *scheduler) validateAll(ctx sdk.Context, tasks []*Task) ([]*Task, error) {
var res []*Task
for _, t := range tasks {
func (s *scheduler) validateAll(ctx sdk.Context, tasks []*deliverTxTask) ([]*deliverTxTask, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this isnt actually doing any validation, should we rename this to something like collectTasksForValidation (since thats what it appears to be doing?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's going to be doing the validation, it's collecting tasks for re-execution

)

// Abort contains the information for a transaction's conflict
type Abort struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm ok moving this to scheduler too if we want, otherwise if theres a concern of import cycle we can keep in types so it can be used by both store and scheduler

@stevenlanders stevenlanders merged commit ab38f6b into occ-main Oct 17, 2023
14 checks passed
@stevenlanders stevenlanders deleted the add-worker-task-and-shell-scheduler branch October 17, 2023 14:20
udpatil pushed a commit that referenced this pull request Oct 17, 2023
## Describe your changes and provide context
- Adds a basic scheduler shell (see TODOs)
- Adds a basic task definition with request/response/index
- Listens to abort channel after an execution to determine conflict

## Testing performed to validate your change
- Compiles (holding off until shape is validated)
- Basic Unit Test for ProcessAll
udpatil pushed a commit that referenced this pull request Oct 17, 2023
## Describe your changes and provide context
- Adds a basic scheduler shell (see TODOs)
- Adds a basic task definition with request/response/index
- Listens to abort channel after an execution to determine conflict

## Testing performed to validate your change
- Compiles (holding off until shape is validated)
- Basic Unit Test for ProcessAll
udpatil pushed a commit that referenced this pull request Oct 17, 2023
## Describe your changes and provide context
- Adds a basic scheduler shell (see TODOs)
- Adds a basic task definition with request/response/index
- Listens to abort channel after an execution to determine conflict

## Testing performed to validate your change
- Compiles (holding off until shape is validated)
- Basic Unit Test for ProcessAll
udpatil pushed a commit that referenced this pull request Jan 2, 2024
## Describe your changes and provide context
- Adds a basic scheduler shell (see TODOs)
- Adds a basic task definition with request/response/index
- Listens to abort channel after an execution to determine conflict

## Testing performed to validate your change
- Compiles (holding off until shape is validated)
- Basic Unit Test for ProcessAll
udpatil pushed a commit that referenced this pull request Jan 8, 2024
- Adds a basic scheduler shell (see TODOs)
- Adds a basic task definition with request/response/index
- Listens to abort channel after an execution to determine conflict

- Compiles (holding off until shape is validated)
- Basic Unit Test for ProcessAll
udpatil pushed a commit that referenced this pull request Jan 18, 2024
- Adds a basic scheduler shell (see TODOs)
- Adds a basic task definition with request/response/index
- Listens to abort channel after an execution to determine conflict

- Compiles (holding off until shape is validated)
- Basic Unit Test for ProcessAll
udpatil pushed a commit that referenced this pull request Jan 18, 2024
## Describe your changes and provide context
- Adds a basic scheduler shell (see TODOs)
- Adds a basic task definition with request/response/index
- Listens to abort channel after an execution to determine conflict

## Testing performed to validate your change
- Compiles (holding off until shape is validated)
- Basic Unit Test for ProcessAll
udpatil pushed a commit that referenced this pull request Jan 25, 2024
## Describe your changes and provide context
- Adds a basic scheduler shell (see TODOs)
- Adds a basic task definition with request/response/index
- Listens to abort channel after an execution to determine conflict

## Testing performed to validate your change
- Compiles (holding off until shape is validated)
- Basic Unit Test for ProcessAll
udpatil pushed a commit that referenced this pull request Jan 31, 2024
## Describe your changes and provide context
- Adds a basic scheduler shell (see TODOs)
- Adds a basic task definition with request/response/index
- Listens to abort channel after an execution to determine conflict

## Testing performed to validate your change
- Compiles (holding off until shape is validated)
- Basic Unit Test for ProcessAll
codchen pushed a commit that referenced this pull request Feb 6, 2024
- Adds a basic scheduler shell (see TODOs)
- Adds a basic task definition with request/response/index
- Listens to abort channel after an execution to determine conflict

- Compiles (holding off until shape is validated)
- Basic Unit Test for ProcessAll
udpatil pushed a commit that referenced this pull request Feb 27, 2024
## Describe your changes and provide context
- Adds a basic scheduler shell (see TODOs)
- Adds a basic task definition with request/response/index
- Listens to abort channel after an execution to determine conflict

## Testing performed to validate your change
- Compiles (holding off until shape is validated)
- Basic Unit Test for ProcessAll
udpatil pushed a commit that referenced this pull request Mar 1, 2024
## Describe your changes and provide context
- Adds a basic scheduler shell (see TODOs)
- Adds a basic task definition with request/response/index
- Listens to abort channel after an execution to determine conflict

## Testing performed to validate your change
- Compiles (holding off until shape is validated)
- Basic Unit Test for ProcessAll
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants