Operation handling for reliable receipts #1637

Draft
wants to merge 7 commits into
base: main

EnriqueL8 commented Feb 10, 2025

Work in progress: this has been tested end to end, but unit tests still need to be written.

Proposed changes

Add a new function to the Operation Handler called BulkOperationUpdates. It takes a series of OperationUpdate entries and notifies the submitter once the database transaction (TX) containing them has been committed.

This is important for blockchain plugins that want to preserve the ordering of receipts and to update the operation before acknowledging to the blockchain connector that FireFly has accepted those receipts.

Reasons why I went this way:

  • The idempotency key is the Operation ID, so correlating the receipt with the operation in the blockchain plugin is easy.
  • The blockchain plugin framework already has the concept of callbacks for updating operations with transaction information.
  • These callbacks go through the operation manager and finally end up in the DB. Today this is fire and forget, as the data gets offloaded to an arbitrary number of workers that write to the DB.
  • The blockchain plugin needs a way to learn when the operation updates have been committed, so it can acknowledge a batch in the durable event stream and carry on processing. An error channel is used for this.

Another approach is simply to use the OnComplete handler on the OperationUpdate, which is executed when the update is committed, but it is granular to each update.
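To illustrate the per-update alternative, here is a minimal sketch of how individual OnComplete hooks could be folded into one batch-level signal. The `update` type and the `notifyWhenAllComplete` helper are hypothetical stand-ins written for this example; they are not part of this PR or of FireFly's `core` package.

```go
package main

import "sync"

// update is a hypothetical stand-in for core.OperationUpdate; only the
// OnComplete hook mentioned in the description is modelled here.
type update struct {
	ID         string
	OnComplete func()
}

// notifyWhenAllComplete attaches an OnComplete hook to every update and
// returns a channel that is closed once all of the hooks have fired.
func notifyWhenAllComplete(updates []*update) <-chan struct{} {
	done := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(len(updates))
	for _, u := range updates {
		var once sync.Once // guards against a hook being invoked more than once
		u.OnComplete = func() { once.Do(wg.Done) }
	}
	go func() {
		wg.Wait()
		close(done)
	}()
	return done
}
```

The caller would wait on the returned channel before acknowledging a batch. The trade-off is that the submitter has to do the aggregation itself, which is the granularity concern raised above.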

Key parts of the code to look at

  • doBatchUpdateWithRetry runs inside a retry.Do and a DB RunAsGroup, and it retries forever; see:

    func (ou *operationUpdater) doBatchUpdateWithRetry(ctx context.Context, updates []*core.OperationUpdate) error {
        return ou.retry.Do(ctx, "operation update", func(attempt int) (retry bool, err error) {
            err = ou.database.RunAsGroup(ctx, func(ctx context.Context) error {
                return ou.doBatchUpdate(ctx, updates)
            })
            if err != nil {
                return true, err
            }
            for _, update := range updates {
                if update.OnComplete != nil {
                    update.OnComplete()
                }
            }
            return false, nil
        })
    }

  • The update worker is picked based on the operation ID here:

    func (ou *operationUpdater) pickWorker(ctx context.Context, id *fftypes.UUID, update *core.OperationUpdate) chan *core.OperationUpdate {

  • Workers call doBatchUpdateWithRetry after a timeout, or once the batch has reached the maximum number of updates to write:

    if batch != nil && (timedOut || len(batch.updates) >= ou.conf.maxInserts) {
        batch.timeoutCancel()
        err := ou.doBatchUpdateWithRetry(ctx, batch.updates)
        if err != nil {
            log.L(ctx).Debugf("Operation update worker exiting: %s", err)
            return
        }
        batch = nil
    }

    Note: this batching happens in memory and causes ordering problems!

Contributes to #1622

Example blockchain plugin code that uses durable websocket communication:

func (bc *blockchainConnector) eventLoop() {
	// Get some events from the blockchain connector through a websocket for transactions that have completed: receipts

	onCommit := make(chan bool)
	bc.callbacks.BulkOperationUpdates(ctx, namespace, updates, onCommit)
	// Wait for the operations to be committed to the DB before acknowledging the receipts
	select {
	case <-ctx.Done():
		return
	case <-onCommit:
	}

	// Acknowledge the receipts once they have been committed to the DB
	bc.ws.ack()
}

Types of changes

  • Bug fix
  • New feature added
  • Documentation Update

Please make sure to follow these points

  • I have read the contributing guidelines.
  • I have performed a self-review of my own code or work.
  • I have commented my code, particularly in hard-to-understand areas.
  • My changes generate no new warnings.
  • My Pull Request title is in format < issue name > eg Added links in the documentation.
  • I have added tests that prove my fix is effective or that my feature works.
  • My changes have sufficient code coverage (unit, integration, e2e tests).

Signed-off-by: Enrique Lacal <enrique.lacal@kaleido.io>
New callback that can be called to insert a number of operation updates reliably

@@ -77,6 +78,18 @@ type operationsManager struct {
cache cache.CInterface
}

// SubmitBulkOperationUpdate implements Manager.
func (om *operationsManager) SubmitBulkOperationUpdates(ctx context.Context, updates []*core.OperationUpdate, onCommit chan<- bool) {

Reworking this to be able to handle errors
