Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Set expiryTime field on detached operation #1568

Merged
merged 1 commit into from
May 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/orb-server/startcmd/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ const (
defaultTracingServiceName = "orb"

opQueueDefaultTaskMonitorInterval = 10 * time.Second
- opQueueDefaultTaskExpiration = 30 * time.Second
+ opQueueDefaultTaskExpiration = time.Minute
opQueueDefaultMaxOperationsToRepost = 10000
opQueueDefaultOperationLifespan = 24 * time.Hour

Expand Down
20 changes: 11 additions & 9 deletions pkg/context/opqueue/opqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,14 +480,15 @@ func (q *Queue) newNackFunc(items []*queuedOperation) func() {

if _, err := q.publish(ctx, op.OperationMessage); err != nil {
q.logger.Errorc(ctx, "Error re-posting operation after NACK. Operation will be detached from this server instance",
- logfields.WithOperationID(op.ID), log.WithError(err))
+ logfields.WithOperationID(op.ID), logfields.WithSuffix(op.Operation.UniqueSuffix), log.WithError(err))

if e := q.detachOperation(op); e != nil {
q.logger.Errorc(ctx, "Failed to detach operation from this server instance",
- logfields.WithOperationID(op.ID), log.WithError(e))
+ logfields.WithOperationID(op.ID), logfields.WithSuffix(op.Operation.UniqueSuffix), log.WithError(e))
} else {
q.logger.Infoc(ctx, "Operation was detached from this server instance since it could not be "+
- "published to the queue. It will be retried at a later time.", logfields.WithOperationID(op.ID))
+ "published to the queue. It will be retried at a later time.",
+ logfields.WithOperationID(op.ID), logfields.WithSuffix(op.Operation.UniqueSuffix))
}
} else {
operationsToDelete = append(operationsToDelete, op)
Expand All @@ -499,6 +500,8 @@ func (q *Queue) newNackFunc(items []*queuedOperation) func() {
q.logger.Errorc(ctx, "Error deleting operations after NACK. Some (or all) of the operations "+
"will be left in the database and potentially reprocessed (which should be harmless). "+
"The operations should be deleted (at some point) by the data expiry service.", log.WithError(err))
+ } else {
+ q.logger.Debugc(ctx, "Deleted operations after NACK", logfields.WithTotal(len(operationsToDelete)))
}
}

Expand Down Expand Up @@ -665,7 +668,7 @@ func (q *Queue) repostOperations(serverID string) error { //nolint:cyclop

if _, e = q.publish(span.Start("re-post operations"), op); e != nil {
q.logger.Error("Error re-posting operation", logfields.WithOperationID(op.ID), log.WithError(e),
- logfields.WithSuffix(op.Operation.UniqueSuffix))
+ logfields.WithSuffix(op.Operation.UniqueSuffix), logfields.WithPermitHolder(serverID))

deleteQueueTask = false

Expand All @@ -677,7 +680,7 @@ func (q *Queue) repostOperations(serverID string) error { //nolint:cyclop
if len(operationsToDelete) >= q.maxOperationsToRepost {
q.logger.Info("Reached max number of operations to re-post in this task run", log.WithError(e),
logfields.WithOperationID(op.ID), logfields.WithSuffix(op.Operation.UniqueSuffix),
- logfields.WithMaxOperationsToRepost(q.maxOperationsToRepost))
+ logfields.WithMaxOperationsToRepost(q.maxOperationsToRepost), logfields.WithPermitHolder(serverID))

deleteQueueTask = false

Expand Down Expand Up @@ -766,6 +769,7 @@ func (q *Queue) detachOperation(op *queuedOperation) error {
pop := &persistedOperation{
OperationMessage: op.OperationMessage,
ServerID: detachedServerID,
+ ExpiryTime: time.Now().Add(q.operationLifeSpan).Unix(),
}

opBytes, err := q.marshal(pop)
Expand All @@ -774,10 +778,8 @@ func (q *Queue) detachOperation(op *queuedOperation) error {
}

return q.store.Put(op.key, opBytes,
- storage.Tag{
- Name: tagServerID,
- Value: detachedServerID,
- },
+ storage.Tag{Name: tagServerID, Value: detachedServerID},
+ storage.Tag{Name: tagExpiryTime, Value: fmt.Sprintf("%d", pop.ExpiryTime)},
)
}

Expand Down