Skip to content

Commit

Permalink
Merge pull request #1568 from bstasyszyn/1567
Browse files Browse the repository at this point in the history
fix: Set expiryTime field on detached operation
  • Loading branch information
fqutishat authored May 29, 2023
2 parents e01c15c + af348ed commit 23f5800
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 10 deletions.
2 changes: 1 addition & 1 deletion cmd/orb-server/startcmd/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ const (
defaultTracingServiceName = "orb"

opQueueDefaultTaskMonitorInterval = 10 * time.Second
opQueueDefaultTaskExpiration = 30 * time.Second
opQueueDefaultTaskExpiration = time.Minute
opQueueDefaultMaxOperationsToRepost = 10000
opQueueDefaultOperationLifespan = 24 * time.Hour

Expand Down
20 changes: 11 additions & 9 deletions pkg/context/opqueue/opqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,14 +480,15 @@ func (q *Queue) newNackFunc(items []*queuedOperation) func() {

if _, err := q.publish(ctx, op.OperationMessage); err != nil {
q.logger.Errorc(ctx, "Error re-posting operation after NACK. Operation will be detached from this server instance",
logfields.WithOperationID(op.ID), log.WithError(err))
logfields.WithOperationID(op.ID), logfields.WithSuffix(op.Operation.UniqueSuffix), log.WithError(err))

if e := q.detachOperation(op); e != nil {
q.logger.Errorc(ctx, "Failed to detach operation from this server instance",
logfields.WithOperationID(op.ID), log.WithError(e))
logfields.WithOperationID(op.ID), logfields.WithSuffix(op.Operation.UniqueSuffix), log.WithError(e))
} else {
q.logger.Infoc(ctx, "Operation was detached from this server instance since it could not be "+
"published to the queue. It will be retried at a later time.", logfields.WithOperationID(op.ID))
"published to the queue. It will be retried at a later time.",
logfields.WithOperationID(op.ID), logfields.WithSuffix(op.Operation.UniqueSuffix))
}
} else {
operationsToDelete = append(operationsToDelete, op)
Expand All @@ -499,6 +500,8 @@ func (q *Queue) newNackFunc(items []*queuedOperation) func() {
q.logger.Errorc(ctx, "Error deleting operations after NACK. Some (or all) of the operations "+
"will be left in the database and potentially reprocessed (which should be harmless). "+
"The operations should be deleted (at some point) by the data expiry service.", log.WithError(err))
} else {
q.logger.Debugc(ctx, "Deleted operations after NACK", logfields.WithTotal(len(operationsToDelete)))
}
}

Expand Down Expand Up @@ -665,7 +668,7 @@ func (q *Queue) repostOperations(serverID string) error { //nolint:cyclop

if _, e = q.publish(span.Start("re-post operations"), op); e != nil {
q.logger.Error("Error re-posting operation", logfields.WithOperationID(op.ID), log.WithError(e),
logfields.WithSuffix(op.Operation.UniqueSuffix))
logfields.WithSuffix(op.Operation.UniqueSuffix), logfields.WithPermitHolder(serverID))

deleteQueueTask = false

Expand All @@ -677,7 +680,7 @@ func (q *Queue) repostOperations(serverID string) error { //nolint:cyclop
if len(operationsToDelete) >= q.maxOperationsToRepost {
q.logger.Info("Reached max number of operations to re-post in this task run", log.WithError(e),
logfields.WithOperationID(op.ID), logfields.WithSuffix(op.Operation.UniqueSuffix),
logfields.WithMaxOperationsToRepost(q.maxOperationsToRepost))
logfields.WithMaxOperationsToRepost(q.maxOperationsToRepost), logfields.WithPermitHolder(serverID))

deleteQueueTask = false

Expand Down Expand Up @@ -766,6 +769,7 @@ func (q *Queue) detachOperation(op *queuedOperation) error {
pop := &persistedOperation{
OperationMessage: op.OperationMessage,
ServerID: detachedServerID,
ExpiryTime: time.Now().Add(q.operationLifeSpan).Unix(),
}

opBytes, err := q.marshal(pop)
Expand All @@ -774,10 +778,8 @@ func (q *Queue) detachOperation(op *queuedOperation) error {
}

return q.store.Put(op.key, opBytes,
storage.Tag{
Name: tagServerID,
Value: detachedServerID,
},
storage.Tag{Name: tagServerID, Value: detachedServerID},
storage.Tag{Name: tagExpiryTime, Value: fmt.Sprintf("%d", pop.ExpiryTime)},
)
}

Expand Down

0 comments on commit 23f5800

Please sign in to comment.