-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix Service Bus messages send from HTTP failing to renew message locks #169
Conversation
…no delivery headers present
This is hopefully now ready for review, end2end smoketest is passing :) |
4265bd8
to
c4c9926
Compare
internal/app/dispatcher/run.go
Outdated
if err != nil { | ||
// Todo: Additional could be put in here to cleanup operations. See: #171 | ||
// https://github.com/lawrencegripper/ion/issues/171 | ||
log.WithError(err).Panic("Failed to renew locks therefor cannot continue to operation as message maybe reassigned to another dispatcher.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check grammar: would guess: "Failed to renew locks therefor therefore cannot continue to operation as message maybe could be reassigned to another dispatcher."
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ta
@@ -150,7 +150,8 @@ func (c *Committer) commitBlob(blobsDir string) (map[string]string, error) { | |||
|
|||
logger.Info(c.context, "committed blob data") | |||
logger.DebugWithFields(c.context, "blob file names", map[string]interface{}{ | |||
"files": files, | |||
"files": files, | |||
"blobURIs": blobURIs, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this a potential security leak i.e. storing authenticated URLs in logs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, removed. Left over from debugging :(
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good spot
@@ -192,22 +192,45 @@ func NewAmqpConnection(ctx context.Context, config *types.Configuration) *AmqpCo | |||
return &listener | |||
} | |||
|
|||
func swapIndex(indexOne, indexTwo int, array *[16]byte) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can perform the swap with only 1 temp variable if needed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well now your just getting fancy, had gone down the explicit route but agree will fix this up.
go func() { | ||
defer wg.Done() | ||
for { | ||
// Locks are held for 1 mins, renew every 20 sec to keep locks | ||
time.Sleep(20 * time.Second) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we provide cancellation contexts to our Sleeps to keep the dispatcher responsive to Kubernetes ops?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a good point - probably something worthy of a separate issue as quite a chunk of changes unrelated to this PR. It's already tracked #110 as help wanted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Related: #157
So I'm very nearly finished with this fix. It's been a long journey to get to the bottom of the issue and I've done quite a bit of work with the go amqp 1 library to get things in place.
Currently I'm seeing slightly non-deterministic behavior with the renewals. The lock token is in the deliveryTag of each message but is stored in the .NET GUID byte format not the AMQP UUID format so a conversion is needed my current code only works roughly 80% of the time.
closes #157, closes #170