diff --git a/README.md b/README.md index e439db3..d431422 100644 --- a/README.md +++ b/README.md @@ -42,7 +42,8 @@ dotnet run --project src/PathlingS3Import/ -- import \ --s3-object-name-prefix=staging/ \ --enable-metrics=true \ --pushgateway-endpoint=http://localhost:9091/ \ - --dry-run=false + --dry-run=true \ + --enable-merging=true ``` Or to test importing from a checkpoint: @@ -59,19 +60,6 @@ dotnet run --project src/PathlingS3Import/ -- import \ --dry-run=false ``` -### merge - -```sh -dotnet run --project src/PathlingS3Import/ -- merge \ - --s3-endpoint=http://localhost:9000 \ - --s3-access-key=admin \ - --s3-secret-key=miniopass \ - --s3-bucket-name=fhir \ - --s3-object-name-prefix=staging/ \ - --max-merged-bundle-size=10 \ - --dry-run=true -``` - ### Run E2E Tests ```sh diff --git a/src/PathlingS3Import/Commands/ImportCommand.cs b/src/PathlingS3Import/Commands/ImportCommand.cs index f83212e..0977b32 100644 --- a/src/PathlingS3Import/Commands/ImportCommand.cs +++ b/src/PathlingS3Import/Commands/ImportCommand.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Concurrent; using System.Diagnostics; using System.Reactive.Linq; using System.Text; @@ -10,8 +11,9 @@ using Hl7.Fhir.Serialization; using Microsoft.Extensions.Logging; using Minio; +using Minio.DataModel; using Minio.DataModel.Args; -using Polly; +using Minio.Exceptions; using Prometheus.Client; using Prometheus.Client.Collectors; using Prometheus.Client.MetricPusher; @@ -65,14 +67,29 @@ public ImportCommand() [CliOption(Description = "The type of FHIR resource to import")] public ResourceType ImportResourceType { get; set; } = ResourceType.Patient; - [CliOption(Description = "Delay to wait after importing a bundle")] + [CliOption(Description = "Delay to wait after importing an NDJSON file")] public TimeSpan SleepAfterImport { get; set; } = TimeSpan.FromSeconds(10); + [CliOption(Description = "Name of the checkpoint file", Name = "--checkpoint-file-name")] + public string CheckpointFileName { get; set; } = "_last-import-checkpoint.json"; + [CliOption( - Description = "Name of the import checkpoint file", - Name = "--import-checkpoint-file-name" + Description = "The maximum number of resources a merged NDJSON may contain." + + " Files that are already larger than this value might be merged into files exceeding this value." )] - public string CheckpointFileName { get; set; } = "_last-import-checkpoint.json"; + public int MaxMergedBundleSize { get; set; } = 100; + + [CliOption(Description = "The maximum size of the merged NDJSON file in bytes. Default: 1 GiB")] + public int MaxMergedBundleSizeInBytes { get; set; } = 1 * 1024 * 1024 * 1024; + + [CliOption( + Description = "Enable merging of multiple NDJSON files before importing.", + Name = "--enable-merging" + )] + public bool IsMergingEnabled { get; set; } = false; + + private JsonSerializerOptions FhirJsonOptions { get; } = + new JsonSerializerOptions().ForFhir(ModelInfo.ModelInspector); public async Task RunAsync() { @@ -122,21 +139,17 @@ public async Task RunAsync() var pushServer = new MetricPushServer(metricPusher, TimeSpan.FromSeconds(10)); pushServer.Start(); - await DoAsync(minio, fhirClient, RetryPipeline); + await DoAsync(minio, fhirClient); metricPusher.PushAsync().Wait(); pushServer.Stop(); } else { - await DoAsync(minio, fhirClient, RetryPipeline); + await DoAsync(minio, fhirClient); } } - private async Task DoAsync( - IMinioClient minio, - FhirClient fhirClient, - ResiliencePipeline retryPipeline - ) + private async Task DoAsync(IMinioClient minio, FhirClient fhirClient) { log.LogInformation( "Checking if bucket {S3BucketName} exists in {S3BaseUrl}.", @@ -193,11 +206,11 @@ await allObjects.CountAsync() }) .ToListAsync(); - var index = 0; - foreach (var o in objectsToProcess) + log.LogInformation("Listing all matching items in bucket"); + + foreach (var (index, item) in objectsToProcess.Select((value, index) => (index, value))) { - log.LogInformation("{Index}. {Key}", index, o.Key); - index++; + log.LogInformation("{Index}. {Key}", index, item.Key); } var checkpointObjectName = $"{prefix}{CheckpointFileName}"; @@ -209,68 +222,17 @@ await allObjects.CountAsync() if (IsContinueFromLastCheckpointEnabled) { - log.LogInformation( - "Reading last checkpoint file from {CheckpointObjectName}", + objectsToProcess = await GetItemsToProcessAfterCheckpointAsync( + minio, + objectsToProcess, checkpointObjectName ); - var lastProcessedFile = string.Empty; - // read the contents of the last checkpoint file - var getArgs = new GetObjectArgs() - .WithBucket(S3BucketName) - .WithObject(checkpointObjectName) - .WithCallbackStream( - async (stream, ct) => - { - using var reader = new StreamReader(stream, Encoding.UTF8); - - var checkpointJson = await reader.ReadToEndAsync(ct); - var checkpoint = JsonSerializer.Deserialize( - checkpointJson - ); - - log.LogInformation("Last checkpoint: {CheckpointJson}", checkpointJson); - - if (checkpoint is not null) - { - lastProcessedFile = checkpoint.LastProcessedObjectUrl; - } - else - { - log.LogError( - "Failed to read checkpoint file: deserialized object is null" - ); - } - - await stream.DisposeAsync(); - } - ); - await minio.GetObjectAsync(getArgs); - - if (string.IsNullOrEmpty(lastProcessedFile)) - { - throw new InvalidDataException( - "Failed to read last processed file. Contents are null or empty." - ); - } - - log.LogInformation("Continuing after {LastProcessedFile}", lastProcessedFile); + log.LogInformation("Listing actual objects to process after checkpoint"); - objectsToProcess = objectsToProcess - .SkipWhile(item => $"s3://{S3BucketName}/{item.Key}" != lastProcessedFile) - // SkipWhile stops if we reach the lastProcessedFile, but includes the entry itself in the - // result, so we need to skip that as well. - // Ideally, we'd say `SkipWhile(item.key-timestamp <= lastProcessedFile-timestamp)`. - .Skip(1) - .ToList(); - - log.LogInformation("Listing actual objects to process"); - - index = 0; - foreach (var o in objectsToProcess) + foreach (var (index, item) in objectsToProcess.Select((value, index) => (index, value))) { - log.LogInformation("{Index}. {Key}", index, o.Key); - index++; + log.LogInformation("{Index}. {Key}", index, item.Key); } } @@ -280,13 +242,31 @@ await allObjects.CountAsync() var stopwatch = new Stopwatch(); var importedCount = 0; + var currentMergedResources = new ConcurrentDictionary(); + var estimatedSizeInBytes = 0; + var mergedItemsCount = 0; + var lastMergedObjectUrl = string.Empty; + var lastProcessedObjectUrl = string.Empty; + foreach (var item in objectsToProcess) { var objectUrl = $"s3://{S3BucketName}/{item.Key}"; - using (log.BeginScope("[Importing ndjson file {NdjsonObjectUrl}]", objectUrl)) + lastProcessedObjectUrl = objectUrl; + + using var _ = log.BeginScope("[Processing ndjson file {ObjectUrl}]", objectUrl); + + int resourceCountInFile = await CountResourcesInNDJsonAsync(minio, item.Key); + + log.LogInformation( + "{ObjectUrl} contains {ResourceCount} resources.", + objectUrl, + resourceCountInFile + ); + + if (IsMergingEnabled) { - var resourceCountInFile = 0; + // TODO: this could be consolidated in CountResourcesInNDJsonAsync var getArgs = new GetObjectArgs() .WithBucket(S3BucketName) .WithObject(item.Key) @@ -294,146 +274,385 @@ await allObjects.CountAsync() async (stream, ct) => { using var reader = new StreamReader(stream, Encoding.UTF8); - while (await reader.ReadLineAsync(ct) is not null) + while (await reader.ReadLineAsync(ct) is { } line) { - resourceCountInFile++; + var resource = JsonSerializer.Deserialize( + line, + FhirJsonOptions + ); + + if (resource is null) + { + log.LogWarning("Read a resource that is null"); + continue; + } + // adds or updates the resource by its id in the dictionary. + // store the plaintext resource to avoid serializing again later + currentMergedResources[resource.Id] = line; + estimatedSizeInBytes += Encoding.UTF8.GetByteCount(line); } } ); - stopwatch.Restart(); await minio.GetObjectAsync(getArgs); - stopwatch.Stop(); log.LogInformation( - "{ObjectUrl} contains {ResourceCount} resources. Counting took {ResourceCountDuration}", + "{ObjectUrl} contains {ResourceCount} resources. " + + "Current bundle total so far: {CurrentMergedResourcesCount} of {MaxMergedBundleSize}", objectUrl, resourceCountInFile, - stopwatch + currentMergedResources.Count, + MaxMergedBundleSize ); - var parameter = new Parameters.ParameterComponent() + mergedItemsCount++; + + if ( + currentMergedResources.Count >= MaxMergedBundleSize + || estimatedSizeInBytes >= MaxMergedBundleSizeInBytes + ) { - Name = "source", - Part = - [ - new Parameters.ParameterComponent() - { - Name = "resourceType", - Value = new Code(ImportResourceType.ToString()), - }, - new Parameters.ParameterComponent() - { - Name = "mode", - Value = new Code("merge"), - }, - new Parameters.ParameterComponent() - { - Name = "url", - Value = new FhirUrl(objectUrl), - }, - ], - }; + log.LogInformation( + "Created merged bundle of {Count} resources. " + + "Estimated size: {EstimatedSizeInBytes} B ({EstimatedSizeInMebiBytes} MiB). " + + "Limit: {MaxMergedBundleSizeInBytes} B ({MaxMergedBundleSizeInMebiBytes} MiB)", + currentMergedResources.Count, + estimatedSizeInBytes, + estimatedSizeInBytes / 1024 / 1024, + MaxMergedBundleSizeInBytes, + MaxMergedBundleSizeInBytes / 1024 / 1024 + ); - var importParameters = new Parameters(); - // for now, create one Import request per file. In the future, - // we might want to add multiple ndjson files at once in batches. - importParameters.Parameter.Add(parameter); + var mergedObjectName = await PutMergedBundleAsync( + minio, + currentMergedResources + ); - log.LogInformation("{ImportParameters}", await importParameters.ToJsonAsync()); + lastMergedObjectUrl = $"s3://{S3BucketName}/{mergedObjectName}"; - log.LogInformation( - "Starting {PathlingServerBaseUrl}/$import for {ObjectUrl}", - PathlingServerBaseUrl, - objectUrl + // overriding this feels a bit hacky... + resourceCountInFile = currentMergedResources.Count; + + currentMergedResources.Clear(); + estimatedSizeInBytes = 0; + } + else + { + // continue in the foreach loop. + // exiting this block will execute the import + continue; + } + } + + stopwatch.Restart(); + if (!IsDryRun) + { + var objectUrlToImport = IsMergingEnabled ? lastMergedObjectUrl : objectUrl; + await ImportNdjsonFileAsync(fhirClient, objectUrlToImport, resourceCountInFile); + // always checkpoint progress against the object URL, i.e. not the merged one + await CheckpointProgressAsync(minio, checkpointObjectName, objectUrl); + } + else + { + log.LogInformation("In dry-run mode. Sleeping for 5s."); + await Task.Delay(TimeSpan.FromSeconds(5)); + } + + stopwatch.Stop(); + + importedCount += IsMergingEnabled ? mergedItemsCount : 1; + log.LogInformation( + "Imported {ImportedCount} / {ObjectsToProcessCount}", + importedCount, + objectsToProcessCount + ); + + log.LogInformation( + "Sleeping after import for {SleepAfterImportSeconds} s", + SleepAfterImport.TotalSeconds + ); + + await Task.Delay(SleepAfterImport); + } + + // will only ever be not-empty if merging is actually enabled + if (!currentMergedResources.IsEmpty) + { + log.LogInformation( + "Resources remaining: {Count}. Uploading as smaller bundle.", + currentMergedResources.Count + ); + + var mergedObjectName = await PutMergedBundleAsync(minio, currentMergedResources); + var mergedObjectUrl = $"s3://{S3BucketName}/{mergedObjectName}"; + + if (!IsDryRun) + { + await ImportNdjsonFileAsync( + fhirClient, + mergedObjectUrl, + currentMergedResources.Count ); - if (!IsDryRun) + // checkpoint against the last processed object, not the merged object url + await CheckpointProgressAsync(minio, checkpointObjectName, lastProcessedObjectUrl); + } + else + { + log.LogInformation("In dry-run mode. Sleeping for 5s."); + await Task.Delay(TimeSpan.FromSeconds(5)); + } + + currentMergedResources.Clear(); + estimatedSizeInBytes = 0; + } + + log.LogInformation("Done importing."); + } + + private async Task ImportNdjsonFileAsync( + FhirClient fhirClient, + string objectUrl, + int resourceCountInFile + ) + { + // if merging is enabled, the objectUrl points to the merged NDJSON, + // otherwise it points to the current item in objectsToProcess. + var importParameters = CreateImportParameters(objectUrl); + + log.LogInformation("{ImportParameters}", await importParameters.ToJsonAsync()); + + log.LogInformation( + "Starting {PathlingServerBaseUrl}/$import for {ObjectUrl}", + PathlingServerBaseUrl, + objectUrl + ); + + var stopwatch = new Stopwatch(); + + stopwatch.Restart(); + var response = await RetryPipeline.ExecuteAsync(async token => + { + return await fhirClient.WholeSystemOperationAsync("import", importParameters); + }); + stopwatch.Stop(); + + importDurationHistogram + .WithLabels(ImportResourceType.ToString()) + .Observe(stopwatch.Elapsed.TotalSeconds); + + // resource import throughput + var resourcesPerSecond = resourceCountInFile / stopwatch.Elapsed.TotalSeconds; + + log.LogInformation("{ImportResponse}", response.ToJson()); + log.LogInformation( + "Import took {ImportDuration} for a bundle of {ResourceCountInFile}. {ResourcesPerSecond} resources/s", + stopwatch.Elapsed, + resourceCountInFile, + resourcesPerSecond + ); + + bundlesImportedCounter.WithLabels(ImportResourceType.ToString()).Inc(); + resourcesImportedCounter.WithLabels(ImportResourceType.ToString()).Inc(resourceCountInFile); + } + + private async Task CheckpointProgressAsync( + IMinioClient minio, + string checkpointObjectName, + string objectUrl + ) + { + log.LogInformation( + "Checkpointing progress '{ObjectUrl}' as '{S3BucketName}/{CheckpointObjectName}'", + objectUrl, + S3BucketName, + checkpointObjectName + ); + + var checkpoint = new ProgressCheckpoint() { LastProcessedObjectUrl = objectUrl }; + + var jsonString = JsonSerializer.Serialize(checkpoint); + var bytes = Encoding.UTF8.GetBytes(jsonString); + using var memoryStream = new MemoryStream(bytes); + + // persist progress + var putArgs = new PutObjectArgs() + .WithBucket(S3BucketName) + .WithObject(checkpointObjectName) + .WithContentType("application/json") + .WithStreamData(memoryStream) + .WithObjectSize(bytes.LongLength); + + await RetryPipeline.ExecuteAsync(async token => + { + await minio.PutObjectAsync(putArgs, token); + }); + } + + private async Task CountResourcesInNDJsonAsync(IMinioClient minio, string objectName) + { + var resourceCountInFile = 0; + var getArgs = new GetObjectArgs() + .WithBucket(S3BucketName) + .WithObject(objectName) + .WithCallbackStream( + async (stream, ct) => { - stopwatch.Restart(); - var response = await retryPipeline.ExecuteAsync(async token => + using var reader = new StreamReader(stream, Encoding.UTF8); + while (await reader.ReadLineAsync(ct) is not null) { - return await fhirClient.WholeSystemOperationAsync( - "import", - importParameters - ); - }); - stopwatch.Stop(); + resourceCountInFile++; + } + } + ); + await minio.GetObjectAsync(getArgs); + return resourceCountInFile; + } + + private Parameters CreateImportParameters(string objectUrl) + { + var parameter = new Parameters.ParameterComponent() + { + Name = "source", + Part = + [ + new Parameters.ParameterComponent() + { + Name = "resourceType", + Value = new Code(ImportResourceType.ToString()), + }, + new Parameters.ParameterComponent() { Name = "mode", Value = new Code("merge") }, + new Parameters.ParameterComponent() + { + Name = "url", + Value = new FhirUrl(objectUrl), + }, + ], + }; + + var importParameters = new Parameters(); + // for now, create one Import request per file. In the future, + // we might want to add multiple ndjson files at once in batches. + importParameters.Parameter.Add(parameter); + return importParameters; + } + + private async Task> GetItemsToProcessAfterCheckpointAsync( + IMinioClient minio, + List allItems, + string checkpointObjectName + ) + { + log.LogInformation( + "Reading last checkpoint file from {CheckpointObjectName}", + checkpointObjectName + ); - importDurationHistogram - .WithLabels(ImportResourceType.ToString()) - .Observe(stopwatch.Elapsed.TotalSeconds); + var lastProcessedFile = string.Empty; + // read the contents of the last checkpoint file + var getArgs = new GetObjectArgs() + .WithBucket(S3BucketName) + .WithObject(checkpointObjectName) + .WithCallbackStream( + async (stream, ct) => + { + using var reader = new StreamReader(stream, Encoding.UTF8); - // resource import throughput - var resourcesPerSecond = resourceCountInFile / stopwatch.Elapsed.TotalSeconds; + var checkpointJson = await reader.ReadToEndAsync(ct); + var checkpoint = JsonSerializer.Deserialize(checkpointJson); - log.LogInformation("{ImportResponse}", response.ToJson()); - log.LogInformation( - "Import took {ImportDuration} for a bundle of {ResourceCountInFile}. {ResourcesPerSecond} resources/s", - stopwatch.Elapsed, - resourceCountInFile, - resourcesPerSecond - ); - log.LogInformation( - "Checkpointing progress '{ObjectUrl}' as '{S3BucketName}/{CheckpointObjectName}'", - objectUrl, - S3BucketName, - checkpointObjectName - ); + log.LogInformation("Last checkpoint: {CheckpointJson}", checkpointJson); - var checkpoint = new ProgressCheckpoint() + if (checkpoint is not null) { - LastProcessedObjectUrl = objectUrl, - }; - var jsonString = JsonSerializer.Serialize(checkpoint); - var bytes = Encoding.UTF8.GetBytes(jsonString); - using var memoryStream = new MemoryStream(bytes); - - // persist progress - var putArgs = new PutObjectArgs() - .WithBucket(S3BucketName) - .WithObject(checkpointObjectName) - .WithContentType("application/json") - .WithStreamData(memoryStream) - .WithObjectSize(bytes.LongLength); - - stopwatch.Restart(); - await retryPipeline.ExecuteAsync(async token => + lastProcessedFile = checkpoint.LastProcessedObjectUrl; + } + else { - await minio.PutObjectAsync(putArgs, token); - }); - stopwatch.Stop(); - log.LogInformation( - "Persisting progress took {PutProgressDuration}", - stopwatch.Elapsed - ); - } - else - { - log.LogInformation("Running import in dry run mode. Waiting a few seconds."); - await Task.Delay(TimeSpan.FromSeconds(5)); + log.LogError("Failed to read checkpoint file: deserialized object is null"); + } + + await stream.DisposeAsync(); } + ); + try + { + await minio.GetObjectAsync(getArgs); + } + catch (ObjectNotFoundException e) + { + log.LogWarning(e, "Checkpoint object not found. Returning all objects to process."); + return allItems; + } - bundlesImportedCounter.WithLabels(ImportResourceType.ToString()).Inc(); - resourcesImportedCounter - .WithLabels(ImportResourceType.ToString()) - .Inc(resourceCountInFile); + if (string.IsNullOrEmpty(lastProcessedFile)) + { + throw new InvalidDataException( + "Failed to read last processed file. Contents are null or empty." + ); + } - importedCount++; - log.LogInformation( - "Imported {ImportedCount} / {ObjectsToProcessCount}", - importedCount, - objectsToProcessCount - ); + log.LogInformation("Continuing after {LastProcessedFile}", lastProcessedFile); - log.LogInformation( - "Sleeping after import for {SleepAfterImportSeconds} s", - SleepAfterImport.TotalSeconds - ); + var itemsAfterCheckpoint = allItems + .SkipWhile(item => $"s3://{S3BucketName}/{item.Key}" != lastProcessedFile) + // SkipWhile stops if we reach the lastProcessedFile, but includes the entry itself in the + // result, so we need to skip that as well. + // Ideally, we'd say `SkipWhile(item.key-timestamp <= lastProcessedFile-timestamp)`. + .Skip(1) + .ToList(); - await Task.Delay(SleepAfterImport); - } + return itemsAfterCheckpoint; + } + + private async Task PutMergedBundleAsync( + IMinioClient minio, + IDictionary mergedBundle + ) + { + var objectName = + $"{S3ObjectNamePrefix}{ImportResourceType}/_merged/bundle-{DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()}.ndjson"; + + // a fairly naive in-memory implementation + using var memoryStream = new MemoryStream(); + using var writer = new StreamWriter(memoryStream, Encoding.UTF8); + + foreach (var kvp in mergedBundle) + { + await writer.WriteLineAsync(kvp.Value); } - log.LogInformation("Done importing."); + await writer.FlushAsync(); + + log.LogInformation( + "Uploading merged bundle with size {SizeInBytes} B as {ObjectName} to {S3BucketName}", + memoryStream.Length, + objectName, + S3BucketName + ); + + memoryStream.Position = 0; + + var putArgs = new PutObjectArgs() + .WithBucket(S3BucketName) + .WithObject(objectName) + .WithContentType("application/x-ndjson") + .WithStreamData(memoryStream) + .WithObjectSize(memoryStream.Length); + + if (!IsDryRun) + { + await RetryPipeline.ExecuteAsync(async token => + { + await minio.PutObjectAsync(putArgs, token); + }); + } + else + { + log.LogInformation( + "Running in dry-run mode. Not putting the merged bundle back in storage." + ); + } + + return objectName; } } diff --git a/src/PathlingS3Import/Commands/MergeCommand.cs b/src/PathlingS3Import/Commands/MergeCommand.cs deleted file mode 100644 index d3712cf..0000000 --- a/src/PathlingS3Import/Commands/MergeCommand.cs +++ /dev/null @@ -1,377 +0,0 @@ -using System.Collections.Concurrent; -using System.Reactive.Linq; -using System.Text; -using System.Text.Json; -using DotMake.CommandLine; -using Hl7.Fhir.Model; -using Hl7.Fhir.Serialization; -using Microsoft.Extensions.Logging; -using Minio; -using Minio.DataModel.Args; -using Prometheus.Client; -using Prometheus.Client.Collectors; - -namespace PathlingS3Import; - -[CliCommand( - Description = "Merges all FHIR resources in NDJSON files into larger ones up to the given size", - Parent = typeof(RootCommand) -)] -public partial class MergeCommand : CommandBase -{ - private readonly IMetricFamily> bundlesMergedCounter; - - private readonly ILogger log; - - private JsonSerializerOptions FhirJsonOptions { get; } = - new JsonSerializerOptions().ForFhir(ModelInfo.ModelInspector); - - [CliOption( - Description = "The maximum number of resources a merged bundle may contain. Bundles that are already larger than this value might be merged to files exceeding this value.", - Required = true - )] - public int MaxMergedBundleSize { get; set; } = 1; - - [CliOption( - Description = "The type of FHIR resources to merge. Sets the correct prefix for the resources folder." - )] - public ResourceType ResourceType { get; set; } = ResourceType.Patient; - - [CliOption(Description = "The maximum size of the merged bundle in bytes. Default: 1 GiB")] - public int MaxMergedBundleSizeInBytes { get; set; } = 1 * 1024 * 1024 * 1024; - - [CliOption( - Description = "Name of the merge checkpoint file", - Name = "--merge-checkpoint-file-name" - )] - public string CheckpointFileName { get; set; } = "_last-merge-checkpoint.json"; - - public MergeCommand() - { - log = LogFactory.CreateLogger(); - - var collectorRegistry = new CollectorRegistry(); - var metricFactory = new MetricFactory(collectorRegistry); - - bundlesMergedCounter = metricFactory.CreateCounter( - "pathlings3import_bundles_merged_total", - "Total number of bundles merged to larger ones by resource type.", - "resourceType" - ); - } - - public async System.Threading.Tasks.Task RunAsync() - { - log.LogInformation("Minio endpoint set to {S3Endpoint}", S3Endpoint); - var minio = new MinioClient() - .WithEndpoint(S3Endpoint) - .WithCredentials(S3AccessKey, S3SecretKey) - .Build(); - - var bucketExistsArgs = new BucketExistsArgs().WithBucket(S3BucketName); - bool found = await minio.BucketExistsAsync(bucketExistsArgs); - if (!found) - { - throw new ArgumentException($"Bucket {S3BucketName} doesn't exist."); - } - - var prefix = $"{S3ObjectNamePrefix}{ResourceType}/"; - - log.LogInformation( - "Listing objects in {S3BaseUrl}/{S3BucketName}/{Prefix}.", - minio.Config.BaseUrl, - S3BucketName, - prefix - ); - - var listArgs = new ListObjectsArgs() - .WithBucket(S3BucketName) - .WithPrefix(prefix) - .WithRecursive(false); - - var allObjects = - minio.ListObjectsEnumAsync(listArgs) - ?? throw new InvalidOperationException("observable for listing buckets is null"); - - log.LogInformation( - "Found a total of {ObjectCount} matching objects. Ordering by timestamp ascending", - await allObjects.CountAsync() - ); - - var objectsToProcess = await allObjects - // skip over the checkpoint file (or anything that isn't ndjson) - .Where(o => o.Key.EndsWith(".ndjson")) - .OrderBy(o => - { - var match = BundleObjectNameRegex().Match(o.Key); - if (match.Success) - { - return Convert.ToDouble(match.Groups["timestamp"].Value); - } - - throw new InvalidOperationException( - $"allObjects contains an item whose key doesn't match the regex: {o.Key}" - ); - }) - .ToListAsync(); - - var checkpointObjectName = $"{prefix}{CheckpointFileName}"; - - log.LogInformation( - "Name of the current progress checkpoint object set to {CheckpointObjectName}.", - checkpointObjectName - ); - - if (IsContinueFromLastCheckpointEnabled) - { - log.LogInformation( - "Reading last checkpoint file from {CheckpointObjectName}", - checkpointObjectName - ); - - var lastProcessedFile = string.Empty; - // read the contents of the last checkpoint file - var getArgs = new GetObjectArgs() - .WithBucket(S3BucketName) - .WithObject(checkpointObjectName) - .WithCallbackStream( - async (stream, ct) => - { - using var reader = new StreamReader(stream, Encoding.UTF8); - - var checkpointJson = await reader.ReadToEndAsync(ct); - var checkpoint = JsonSerializer.Deserialize( - checkpointJson - ); - - log.LogInformation("Last checkpoint: {CheckpointJson}", checkpointJson); - - if (checkpoint is not null) - { - lastProcessedFile = checkpoint.LastProcessedObjectUrl; - } - else - { - log.LogError( - "Failed to read checkpoint file: deserialized object is null" - ); - } - - await stream.DisposeAsync(); - } - ); - await minio.GetObjectAsync(getArgs); - - if (string.IsNullOrEmpty(lastProcessedFile)) - { - throw new InvalidDataException( - "Failed to read last processed file. Contents are null or empty." - ); - } - - log.LogInformation("Continuing after {LastProcessedFile}", lastProcessedFile); - - objectsToProcess = objectsToProcess - .SkipWhile(item => $"s3://{S3BucketName}/{item.Key}" != lastProcessedFile) - // SkipWhile stops if we reach the lastProcessedFile, but includes the entry itself in the - // result, so we need to skip that as well. - // Ideally, we'd say `SkipWhile(item.key-timestamp <= lastProcessedFile-timestamp)`. - .Skip(1) - .ToList(); - - log.LogInformation("Listing actual objects to process"); - } - - var objectsToProcessCount = objectsToProcess.Count; - log.LogInformation("Actually processing {ObjectsToProcessCount}", objectsToProcessCount); - - var currentMergedResources = new ConcurrentDictionary(); - var estimatedSizeInBytes = 0; - var processedCount = 0; - string lastProcessedObjectUrl = string.Empty; - - foreach (var item in objectsToProcess) - { - var objectUrl = $"s3://{S3BucketName}/{item.Key}"; - - lastProcessedObjectUrl = objectUrl; - - using (log.BeginScope("[Merging ndjson file {NdjsonObjectUrl}]", objectUrl)) - { - var resourceCountInFile = 0; - var getArgs = new GetObjectArgs() - .WithBucket(S3BucketName) - .WithObject(item.Key) - .WithCallbackStream( - async (stream, ct) => - { - using var reader = new StreamReader(stream, Encoding.UTF8); - while (await reader.ReadLineAsync(ct) is { } line) - { - var resource = JsonSerializer.Deserialize( - line, - FhirJsonOptions - ); - - if (resource is null) - { - log.LogWarning("Read a resource that is null"); - continue; - } - // adds or updates the resource by its id in the dictionary. - // store the plaintext resource to avoid serializing again later - currentMergedResources[resource.Id] = line; - estimatedSizeInBytes += Encoding.UTF8.GetByteCount(line); - resourceCountInFile++; - } - } - ); - await minio.GetObjectAsync(getArgs); - - log.LogInformation( - "{ObjectUrl} contains {ResourceCount} resources. " - + "Current bundle total so far: {CurrentMergedResourcesCount} of {MaxMergedBundleSize}", - objectUrl, - resourceCountInFile, - currentMergedResources.Count, - MaxMergedBundleSize - ); - - if ( - currentMergedResources.Count >= MaxMergedBundleSize - || estimatedSizeInBytes >= MaxMergedBundleSizeInBytes - ) - { - log.LogInformation( - "Created merged bundle of {Count} resources. " - + "Estimated size: {EstimatedSizeInBytes} B ({EstimatedSizeInMebiBytes} MiB). " - + "Limit: {MaxMergedBundleSizeInBytes} B ({MaxMergedBundleSizeInMebiBytes} MiB)", - currentMergedResources.Count, - estimatedSizeInBytes, - estimatedSizeInBytes / 1024 / 1024, - MaxMergedBundleSizeInBytes, - MaxMergedBundleSizeInBytes / 1024 / 1024 - ); - - await PutMergedBundleAsync(minio, currentMergedResources); - currentMergedResources.Clear(); - estimatedSizeInBytes = 0; - await CheckpointProgressAsync(minio, checkpointObjectName, objectUrl); - } - - processedCount++; - bundlesMergedCounter.WithLabels(ResourceType.ToString()).Inc(); - log.LogInformation( - "Merged {ProcessedCount} of {ObjectsToProcess} ", - processedCount, - objectsToProcess.Count - ); - } - } - - if (!currentMergedResources.IsEmpty) - { - log.LogInformation( - "Resources remaining: {Count}. Uploading as smaller bundle.", - currentMergedResources.Count - ); - - await PutMergedBundleAsync(minio, currentMergedResources); - currentMergedResources.Clear(); - estimatedSizeInBytes = 0; - await CheckpointProgressAsync(minio, checkpointObjectName, lastProcessedObjectUrl); - } - } - - private async System.Threading.Tasks.Task PutMergedBundleAsync( - IMinioClient minio, - IDictionary mergedBundle - ) - { - var objectName = - $"{S3ObjectNamePrefix}merged/{ResourceType}/bundle-{DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()}.ndjson"; - - // a fairly naive in-memory implementation - using var memoryStream = new MemoryStream(); - using var writer = new StreamWriter(memoryStream, Encoding.UTF8); - - foreach (var kvp in mergedBundle) - { - await writer.WriteLineAsync(kvp.Value); - } - - await writer.FlushAsync(); - - log.LogInformation( - "Uploading merged bundle with size {SizeInBytes} as {ObjectName} to {S3BucketName}", - memoryStream.Length, - objectName, - S3BucketName - ); - - memoryStream.Position = 0; - - var putArgs = new PutObjectArgs() - .WithBucket(S3BucketName) - .WithObject(objectName) - .WithContentType("application/x-ndjson") - .WithStreamData(memoryStream) - .WithObjectSize(memoryStream.Length); - - if (!IsDryRun) - { - await RetryPipeline.ExecuteAsync(async token => - { - await minio.PutObjectAsync(putArgs, token); - }); - } - else - { - log.LogInformation( - "Running in dry-run mode. Not putting the merged bundle back in storage." - ); - } - } - - private async System.Threading.Tasks.Task CheckpointProgressAsync( - IMinioClient minio, - string checkpointObjectName, - string lastProcessedObjectUrl - ) - { - log.LogInformation( - "Checkpointing progress '{ObjectUrl}' as '{S3BucketName}/{CheckpointObjectName}'", - lastProcessedObjectUrl, - S3BucketName, - checkpointObjectName - ); - - var checkpoint = new ProgressCheckpoint() - { - LastProcessedObjectUrl = lastProcessedObjectUrl, - }; - var jsonString = JsonSerializer.Serialize(checkpoint); - var bytes = Encoding.UTF8.GetBytes(jsonString); - using var memoryStream = new MemoryStream(bytes); - - // persist progress - var putArgs = new PutObjectArgs() - .WithBucket(S3BucketName) - .WithObject(checkpointObjectName) - .WithContentType("application/json") - .WithStreamData(memoryStream) - .WithObjectSize(bytes.LongLength); - - if (!IsDryRun) - { - await RetryPipeline.ExecuteAsync(async token => - { - await minio.PutObjectAsync(putArgs, token); - }); - } - else - { - log.LogInformation("Running in dry-run mode. Not updating the checkpoint file."); - } - } -}