Adding 4MB limit for Azure Storage Table batch insert #191
Conversation
…t payload will go over the limit of 4 MB.
Hello @cgillum, after thinking a bit I added support for two more cases, in addition to the checks we already do for each case in the switch statement.

I think that with these additional cases we have better coverage. I am interested to hear your opinion on these suggestions. I like the simplicity of your initial design, but I am worried that there are edge cases we can potentially miss. Let me know if these are valid considerations. Thank you!
Thanks @gled4er, I will review this. I don't understand the case where we throw an exception if the payload is above 4 MB. With your fix, I would expect this to never happen, because that's exactly what we're intending to prevent. I'll look at the code changes to see if I can understand better what you're suggesting.
```diff
@@ -21,6 +21,10 @@
    <PackageReference Include="WindowsAzure.Storage" version="8.5.0" />
  </ItemGroup>

+  <ItemGroup>
+    <PackageReference Include="System.ValueTuple" Version="4.5.0" />
```
Is this change necessary? If not, I suggest we remove it. I'm not super-comfortable adding these newer semi-framework packages since I don't fully understand the implications.
Yes, you are right. I will remove it. The value provided is not worth extending the list of dependencies. Thank you!
```csharp
int byteCount = Encoding.Unicode.GetByteCount(data);
if (byteCount > 4 * 1024 * 1024)
{
    throw new StorageException($"Data with payload of {byteCount} bytes can not be added to a storage table.");
}
```
This will cause an infinite failure loop, and will result in the exact same problem that this PR is intended to fix. We need to ensure that the payload never exceeds 4 MB. Ling previously did the Large Message Support work to ensure this is not possible for any individual row. The purpose of this PR is simply to measure the aggregate of multiple rows.
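The batching approach described above — keep a running size estimate and flush each batch *before* it can reach the 4 MB table-batch limit — can be sketched roughly as follows. This is a minimal Python illustration, not code from the PR; the name `split_into_batches` and the exact thresholds are assumptions based on the thread (100-entity batches, early flush at 3 MB).

```python
MAX_BATCH_ROWS = 100                 # Azure Table batches allow at most 100 entities
FLUSH_THRESHOLD = 3 * 1024 * 1024    # flush early, keeping a ~1 MB buffer under the 4 MB limit

def split_into_batches(rows):
    """Group string payloads so no batch exceeds the row or estimated-byte limits."""
    batches, current, estimated_bytes = [], [], 0
    for payload in rows:
        size = len(payload.encode("utf-16-le"))  # UTF-16 size, like Encoding.Unicode
        if current and (len(current) >= MAX_BATCH_ROWS
                        or estimated_bytes + size > FLUSH_THRESHOLD):
            batches.append(current)              # flush before exceeding a limit
            current, estimated_bytes = [], 0
        current.append(payload)
        estimated_bytes += size
    if current:
        batches.append(current)
    return batches
```

The key property is that the size check happens before appending, so a batch is never committed past the threshold.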
Thank you for clarifying that for me. I had the same feeling. I will remove this logic. Thank you!
```csharp
@@ -502,6 +520,43 @@ public override Task StartAsync()
        orchestrationInstanceUpdateStopwatch.ElapsedMilliseconds);
}

private async Task<(int, string)> GetDataByteCountAndETag(string data, int estimatedBytes, string instanceId, string executionId, TableBatchOperation historyEventBatch, StringBuilder newEventListBuffer, IList<HistoryEvent> newEvents, string eTagValue)
```
FWIW, I think a `ref` parameter is a more appropriate way to update `estimatedBytes` from within this method.
Yes, this was my first idea as well, but async methods can't have `out` or `ref` parameters.
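Since an async method can't take a `ref` or `out` parameter, returning the updated value alongside the other result (as a tuple) is the usual workaround. A minimal Python sketch of the same pattern — all names here are illustrative, not taken from the PR:

```python
import asyncio

# Coroutines (like C# async methods) can't update a caller's local variable
# by reference, so the new running byte count is returned in a tuple.

async def add_payload_bytes(estimated_bytes: int, payload: str) -> tuple[int, str]:
    """Return the new running byte count plus a placeholder ETag."""
    size = len(payload.encode("utf-16-le"))  # UTF-16, like Encoding.Unicode
    etag = "etag-placeholder"  # a real implementation would return the storage ETag
    return estimated_bytes + size, etag

async def main() -> int:
    estimated_bytes = 0
    # The caller rebinds its local from the returned tuple.
    estimated_bytes, _etag = await add_payload_bytes(estimated_bytes, "hello")
    return estimated_bytes

print(asyncio.run(main()))  # "hello" is 5 chars -> 10 bytes in UTF-16
```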
```csharp
if (byteCount + estimatedBytes > 4 * 1024 * 1024)
{
    updatedETagValue = await this.UploadHistoryBatch(instanceId, executionId, historyEventBatch, newEventListBuffer, newEvents.Count, eTagValue);
}
```
Why would we upload the history batch here instead of doing it outside the `switch` statement in the calling code?
You are right, Chris. Looking at the code with fresh eyes, I think these changes complicate it and do not provide value. I will revert them. Thank you for providing very valuable feedback on that!
Hello @cgillum, thank you for the great feedback. I will fix the issues. Thank you!
Thanks. Generally speaking, I think the first iteration you had was sufficient, but let me know if you are confident there are other gaps. I want to include this fix in the 1.5 release, so I will wait for this to get merged before doing the release.
Hello @cgillum, I reverted the changes from the last commit. Thank you for taking the time and pointing out the issues it introduced. Thank you! Kanio
Hello @cgillum, I wanted to ask you for clarification. When we check the value of the event payload, it can be null in the case of a large message that was already uploaded to blob storage. If this idea is useful, we can change the compress method a bit to return the byte count for the blob name (always a GUID, so its size is fixed):

```csharp
async Task<int> CompressLargeMessageAsync(DynamicTableEntity entity)
{
    int byteSize = 0;
    string propertyKey = this.GetLargeTableEntity(entity);
    if (propertyKey != null)
    {
        string blobName = Guid.NewGuid().ToString();
        byteSize = Encoding.Unicode.GetByteCount(blobName);

        // e.g. InputBlobName, OutputBlobName, ResultBlobName
        string blobNameKey = $"{propertyKey}{BlobNamePropertySuffix}";
        byte[] messageBytes = this.GetPropertyMessageAsBytes(entity);
        await this.messageManager.CompressAndUploadAsBytesAsync(messageBytes, blobName);
        entity.Properties.Add(blobNameKey, new EntityProperty(blobName));
        this.SetPropertyMessageToEmptyString(entity);
    }

    return byteSize;
}
```

And then in the calling code:

```csharp
for (int i = 0; i < newEvents.Count; i++)
{
    HistoryEvent historyEvent = newEvents[i];
    DynamicTableEntity entity = this.tableEntityConverter.ConvertToTableEntity(historyEvent);
    int compressedLargeMessageNameByteSize = await this.CompressLargeMessageAsync(entity);

    // Monitor for orchestration instance events
    switch (historyEvent.EventType)
    {
        case EventType.ExecutionStarted:
            estimatedBytes += compressedLargeMessageNameByteSize > 0 ? compressedLargeMessageNameByteSize : Encoding.Unicode.GetByteCount(executionStartedEvent.Input);
            break;
        case EventType.ExecutionCompleted:
            estimatedBytes += compressedLargeMessageNameByteSize > 0 ? compressedLargeMessageNameByteSize : Encoding.Unicode.GetByteCount(executionCompletedEvent.Result);
            break;
        case EventType.ExecutionTerminated:
            estimatedBytes += compressedLargeMessageNameByteSize > 0 ? compressedLargeMessageNameByteSize : Encoding.Unicode.GetByteCount(executionTerminatedEvent.Input);
            break;
        case EventType.ContinueAsNew:
            estimatedBytes += compressedLargeMessageNameByteSize > 0 ? compressedLargeMessageNameByteSize : Encoding.Unicode.GetByteCount(executionCompletedEvent.Result);
            break;
    }

    // Table storage only supports inserts of up to 100 entities at a time or 4 MB at a time.
    if (historyEventBatch.Count == 99 || estimatedBytes > 3 * 1024 * 1024 /* 3 MB */)
    {
    }
}
```

So for example, if I have a large input that was uploaded to blob storage, we count only the bytes of the blob name instead of the full payload, and we avoid flushing the batch to storage earlier than necessary. I am interested to hear your opinion on this suggestion. Thank you!
It's true that in some cases these properties might be null. That could be the case for large message payloads like you mentioned, or it could be true because a function was given null as an input or output. The code we write needs to work either way (i.e. it shouldn't throw any exceptions). I didn't understand the part about the optimization. How is it we can reduce calls to storage? I'm not sure how valuable it is to return the number of bytes for the blob name. Won't it always be a small fixed size? If the name is 72 bytes and the maximum batch size is 100, isn't that at most ~7 KB? Do you think it's important for us to account for this when we've already given ourselves a 1 MB buffer (we upload early only if the total size reaches 3 MB)?
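For reference, the arithmetic in that question checks out: a GUID rendered as a string is 36 characters, which is 72 bytes in UTF-16 (what .NET's `Encoding.Unicode` measures), and even a full 100-entity batch of blob names stays far below the 1 MB buffer. A quick Python check:

```python
import uuid

# A GUID rendered as a string is always 36 characters
# ("xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx").
name = str(uuid.uuid4())
assert len(name) == 36

# .NET's Encoding.Unicode is UTF-16: 2 bytes per character here.
guid_bytes = len(name.encode("utf-16-le"))
print(guid_bytes)  # 72

# Worst case: a full 100-entity batch where every row contributes a blob name.
print(100 * guid_bytes)  # 7200 bytes (~7 KB), far below the ~1 MB safety buffer
```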
Hello @cgillum, I am sorry for not explaining it in a more understandable way. The use case I was thinking of is as follows:

```csharp
switch (historyEvent.EventType)
{
    case EventType.ExecutionStarted:
        estimatedBytes += Encoding.Unicode.GetByteCount(executionStartedEvent.Input) > MaxStorageQueuePayloadSizeInBytes ? 72 : Encoding.Unicode.GetByteCount(executionStartedEvent.Input);
        break;
    case EventType.ExecutionCompleted:
        estimatedBytes += Encoding.Unicode.GetByteCount(executionCompletedEvent.Result) > MaxStorageQueuePayloadSizeInBytes ? 72 : Encoding.Unicode.GetByteCount(executionCompletedEvent.Result);
        break;
    case EventType.ExecutionTerminated:
        estimatedBytes += Encoding.Unicode.GetByteCount(executionTerminatedEvent.Input) > MaxStorageQueuePayloadSizeInBytes ? 72 : Encoding.Unicode.GetByteCount(executionTerminatedEvent.Input);
        break;
    case EventType.ContinueAsNew:
        estimatedBytes += Encoding.Unicode.GetByteCount(executionCompletedEvent.Result) > MaxStorageQueuePayloadSizeInBytes ? 72 : Encoding.Unicode.GetByteCount(executionCompletedEvent.Result);
        break;
}
```

You are right that we don't need to change the compress method. Let me know if you find value in this approach, or we can leave the code in the current state. Thank you!
Ah, I understand now. You're saying we will overcount in the case of large messages. Yes, in that case I think it makes sense to check for this condition and return an appropriate byte size, like you suggested, so that we don't upload to the history table unnecessarily.
Hello @cgillum, thank you for the quick reply! I will add the changes. In terms of integration tests, do you think we will need additional ones? Lin added great tests for both large text and binary messages, so I think we don't need new tests for this PR. Moreover, I think it will be difficult to detect the batch size from the integration tests. Thank you!
Hello @cgillum, I added the optimization for the byte calculation. When you have time, may I ask you to review once more? It would be great if you could enable CI for this pull request so I can validate that all tests pass on the server as well. Thank you!
I'll take a look. Unfortunately there is no CI on this branch, so we have to validate manually.
I really like the simplicity of the new iteration - thanks!
I have one piece of performance-related feedback that I would like you to consider. However, for now I'm going to accept this change so we can move forward with the release - so maybe we can have a followup PR to make it more efficient.
```csharp
void UpdateEstimatedBytes(ref int estimatedBytes, string payload)
{
    int payloadBytes = Encoding.Unicode.GetByteCount(payload);
    estimatedBytes += payloadBytes > MaxStorageQueuePayloadSizeInBytes ? GuidByteSize : payloadBytes;
}
```
I think it would be more efficient to examine the entity to know if there is a large message payload first before counting bytes. These bytes were already counted earlier when deciding if we need to upload to blob storage, so we're basically double-counting.
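The suggested optimization — check whether the entity already carries a blob-name marker first, and only count payload bytes when it doesn't — could look roughly like this. A minimal Python sketch with hypothetical names (`estimate_entity_bytes`, `has_blob_name`); the real code would inspect the `DynamicTableEntity` properties instead:

```python
GUID_BYTE_SIZE = 72  # a 36-character GUID string encoded as UTF-16

def estimate_entity_bytes(payload: str, has_blob_name: bool) -> int:
    """Charge the fixed blob-name size for payloads already offloaded to blobs.

    Checking the large-message marker first means we never re-count bytes that
    were already measured when deciding whether to upload to blob storage.
    """
    if has_blob_name:
        return GUID_BYTE_SIZE
    return len(payload.encode("utf-16-le"))

# Small payload: counted directly (5 chars -> 10 bytes in UTF-16).
print(estimate_entity_bytes("hello", has_blob_name=False))  # 10

# Offloaded payload: only the blob-name size is charged, no byte scan needed.
print(estimate_entity_bytes("", has_blob_name=True))        # 72
```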
Unfortunately I found another issue, which is that this PR does not count activity function return values.
* DurableTask.AzureStorage API to enumerate instances (#187)
* DurableTask.AzureStorage ETW trace improvements (#192)
* Adding 4MB limit check for Azure Storage Table batch insert (#191)
* DurableTask.AzureStorage: Alternate fix for the 4 MB max entity size which covers more scenarios. (#194)
* Updated Newtonsoft.Json to v11.0.2, WindowsAzure.Storage to v8.6.0. (#193)
* Fixed issues with the ETW event source and added test reliability improvements.
Hello @cgillum,
I am opening this PR to give you early visibility on the first changes for #339.
Let me know if you spot something that needs to be fixed immediately.
I plan to double check the code and add tests for the change next.
Thank you!