Skip to content

Commit

Permalink
Simplified async blob handling.
Browse files Browse the repository at this point in the history
  • Loading branch information
cdmdotnet committed Oct 17, 2024
1 parent e679fb5 commit 1d052c4
Showing 1 changed file with 15 additions and 11 deletions.
26 changes: 15 additions & 11 deletions Framework/Azure/Cqrs.Azure.Storage/BlobStorageStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using System.IO;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using Azure;
using Azure.Core;
Expand Down Expand Up @@ -428,7 +429,7 @@ async Task<IEnumerable<Stream>> OpenStreamsForReadingAsync
(Func<BlobItem, bool> predicate = null, string blobPrefix = null, string folderName = null)
{
IList<Stream> results = null;
for(int i = 0; i < 3; i++)
for(int i = 0; i < 10; i++)
{
AsyncPageable<BlobItem> blobs;
if (!string.IsNullOrWhiteSpace(folderName))
Expand All @@ -438,13 +439,12 @@ async Task<IEnumerable<Stream>> OpenStreamsForReadingAsync
var query = new Dictionary<string, BlobItem>();
#if NET472
Task.Run(async () =>
#endif
{
#endif
await foreach (BlobItem blob in blobs)
query.Add(blob.Name, blob);
}
#if NET472
).Wait();
}).Wait();
#endif

IEnumerable<BlobItem> sourceQuery;
Expand All @@ -458,37 +458,41 @@ async Task<IEnumerable<Stream>> OpenStreamsForReadingAsync
IList<Task> downloadTasks = new List<Task>();
foreach (BlobItem x in source)
{
#if NET472
downloadTasks.Add
(
Task.Run(async () =>
{
#endif
BlobClient blobClient = ReadableSource.GetBlobClient(x.Name);
BlobDownloadResult downloadResult = await blobClient.DownloadContentAsync();
BinaryData as1 = downloadResult.Content;
results.Add(as1.ToStream());
BlobDownloadResult downloadResult = await blobClient.DownloadContentAsync();
BinaryData as1 = downloadResult.Content;
results.Add(as1.ToStream());
#if NET472
})
);
#endif
}

bool hasFinished = false;
#if NET472
bool hasFinished = false;
Task.Run(async () =>
#endif
{
await Task.WhenAll(downloadTasks).ContinueWith(state => { hasFinished = !state.IsFaulted; });
}
#if NET472
).Wait();
#endif

if (!hasFinished)
{
Logger.LogError("Loading streams faulted.");
throw new Exception("Did not read all blobs.");
}
#endif

// We discovered that sometimes getting blobs can return null streams... not helpful. Seems to be a race condition
if (results.Count == source.Count && !results.Any(x => x == null))
break;
Thread.Sleep(150);
results = null;
}
if (results == null)
Expand Down

0 comments on commit 1d052c4

Please sign in to comment.