From 39d9c5976050ae25c370644bfddc8691f62475ea Mon Sep 17 00:00:00 2001 From: "Dr. Christian Geuer-Pollmann" Date: Thu, 14 Sep 2023 22:02:40 +0200 Subject: [PATCH] Bugfix and cleanup --- .../RehydratedFromCaptureEventData.fs | 6 +- ...processLocalEventHubCaptureFilesProgram.fs | 113 ++++++++++++------ 2 files changed, 78 insertions(+), 41 deletions(-) diff --git a/src/Metering.Runtime/RehydratedFromCaptureEventData.fs b/src/Metering.Runtime/RehydratedFromCaptureEventData.fs index 80a1242..80b06cd 100644 --- a/src/Metering.Runtime/RehydratedFromCaptureEventData.fs +++ b/src/Metering.Runtime/RehydratedFromCaptureEventData.fs @@ -17,7 +17,7 @@ type RehydratedFromCaptureEventData( member this.BlobName = blobName module RehydratedFromCaptureEventData = - let getMessagePosition (e: EventData) : MessagePosition = - { PartitionID = PartitionID.create e.PartitionKey + let getMessagePosition (partitionId: PartitionID) (e: EventData) : MessagePosition = + { PartitionID = partitionId SequenceNumber = e.SequenceNumber - PartitionTimestamp = MeteringDateTime.fromDateTimeOffset e.EnqueuedTime} \ No newline at end of file + PartitionTimestamp = MeteringDateTime.fromDateTimeOffset e.EnqueuedTime} diff --git a/src/Tools/ReprocessLocalEventHubCaptureFiles/ReprocessLocalEventHubCaptureFilesProgram.fs b/src/Tools/ReprocessLocalEventHubCaptureFiles/ReprocessLocalEventHubCaptureFilesProgram.fs index d948336..5f4900d 100644 --- a/src/Tools/ReprocessLocalEventHubCaptureFiles/ReprocessLocalEventHubCaptureFilesProgram.fs +++ b/src/Tools/ReprocessLocalEventHubCaptureFiles/ReprocessLocalEventHubCaptureFilesProgram.fs @@ -1,4 +1,7 @@ -open System +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +open System open System.Collections.Generic open System.IO open System.Threading @@ -58,33 +61,38 @@ let readAllEvents<'TEvent> (captureDirectory: string) (blobPrefix: string) (conv yield! 
stream |> CaptureProcessor.ReadEventDataFromAvroStream blobName - |> Seq.map (fun e -> EventHubEvent.createFromCapture (convert e) (RehydratedFromCaptureEventData.getMessagePosition e) None blobName ) + |> Seq.map (fun e -> EventHubEvent.createFromCapture (convert e) (RehydratedFromCaptureEventData.getMessagePosition partitionId e) None blobName ) } let recreateLatestState captureDirectory partitionId stateFile = - File.WriteAllText( - path = stateFile, - contents = - (readAllEvents + let lastState = + readAllEvents captureDirectory "whatever" CaptureProcessor.toMeteringUpdateEvent partitionId CancellationToken.None - |> Seq.scan - MeterCollectionLogic.handleMeteringEvent - MeterCollection.Empty - |> Seq.map (fun state -> - match state.LastUpdate with - | None -> () - | Some mu -> - if (int mu.SequenceNumber) % 1000 = 0 - then printf "%d " (mu.SequenceNumber) - else () - state - ) - |> Seq.last - |> Json.toStr 0)) + |> Seq.scan + MeterCollectionLogic.handleMeteringEvent + MeterCollection.Empty + |> Seq.map (fun state -> + match state.LastUpdate with + | None -> () + | Some mu -> + if (int mu.SequenceNumber) % 1000 = 0 + then printf "%d " (mu.SequenceNumber) + else () + state + ) + |> Seq.last + + File.WriteAllText( + path = stateFile, + contents = (lastState |> Json.toStr 0)) + + printfn "Created state file %s" (new FileInfo(stateFile)).FullName + + lastState let expandCaptureToEventsInIndividualJsonFiles captureDirectory partitionId eventDirectory = if Directory.Exists(eventDirectory) @@ -98,6 +106,7 @@ let expandCaptureToEventsInIndividualJsonFiles captureDirectory partitionId even partitionId CancellationToken.None |> Seq.filter (fun e -> + // Skip UsageReported and Ping events match e.EventData with | UsageReported _ -> false | Ping _ -> false @@ -111,6 +120,7 @@ let expandCaptureToEventsInIndividualJsonFiles captureDirectory partitionId even contents = content) ) +/// Print the sequence numbers of all events that do not have a partition key let 
printEventsWithoutPartitionKey captureDirectory partitionId = readAllEvents captureDirectory @@ -123,7 +133,8 @@ let printEventsWithoutPartitionKey captureDirectory partitionId = printf "%d " e.MessagePosition.SequenceNumber ) -let oneOrMoreEventsHaveAPartitionID captureDirectory partitionId : bool = +/// Check if there are any events that do have a partition key +let oneOrMoreEventsHaveAPartitionID (captureDirectory: string) (partitionId: PartitionID) : bool = readAllEvents captureDirectory "whatever" @@ -132,7 +143,7 @@ let oneOrMoreEventsHaveAPartitionID captureDirectory partitionId : bool = CancellationToken.None |> Seq.exists (fun e -> e.MessagePosition.PartitionID.value <> null) -let numberOfMessages captureDirectory partitionId = +let numberOfMessages (captureDirectory: string) (partitionId: PartitionID) : int = readAllEvents captureDirectory "whatever" @@ -141,7 +152,8 @@ let numberOfMessages captureDirectory partitionId = CancellationToken.None |> Seq.length -let marketplaceResourceIDs captureDirectory partitionId = +/// Retrieve all unique marketplace resource IDs from the capture +let marketplaceResourceIDs (captureDirectory: string) (partitionId: PartitionID) = readAllEvents captureDirectory "whatever" @@ -152,25 +164,44 @@ let marketplaceResourceIDs captureDirectory partitionId = |> Seq.distinct |> Seq.sort -let numberOfUnprocessedMessagesInStateFile stateFile = - let state = File.ReadAllText(stateFile) |> Json.fromStr - state.UnprocessableMessages |> List.length +let numberOfUnprocessedMessagesInStateFile (stateFile: string) = + File.ReadAllText(stateFile) + |> Json.fromStr + |> (fun state -> state.UnprocessableMessages) + |> List.length + + + let numberOfPartitions = 3 -let captureDirectory = @"..\..\..\..\..\..\testcaptures" +let captureDirectory = @"..\..\..\..\..\..\testcaptures" // This folder isn't checked in, so you need to create it yourself [ 0 .. 
numberOfPartitions - 1 ] |> Seq.map(sprintf "%d") |> Seq.map(PartitionID.create) |> Seq.iter(fun partitionId -> + + // + // The code snippets below are independent of each other, so you can comment out the ones you don't need + // Also, they iterate over all Avro files in the capture directory (again and again), so it's slightly inefficient + // + printfn $"Processing partition {partitionId.value}" + // Processing partition 0 consumed 1.0 GB, partition 1 consumed 3.5 GB, and partition 2 consumed 1.7 GB of memory - //// Start with an empty state and apply all events, to get the latest state + // Start with an empty state and apply all events, to get the latest state let stateFile = $"{captureDirectory}\state\state-p{ partitionId.value }.json" - // recreateLatestState captureDirectory partitionId stateFile + let lastState = + recreateLatestState captureDirectory partitionId stateFile - // printfn "Statefile %s has %d unprocessed messages" stateFile (numberOfUnprocessedMessagesInStateFile stateFile) + // Print the number of unprocessed messages in the state file + lastState + |> (fun state -> state.UnprocessableMessages) + |> List.length + |> printfn "Statefile %s has %d unprocessed messages" stateFile + + //printfn "Statefile %s has %d unprocessed messages" stateFile (numberOfUnprocessedMessagesInStateFile stateFile) //// Expand the capture to individual JSON files, one for each event //expandCaptureToEventsInIndividualJsonFiles @@ -179,19 +210,25 @@ let captureDirectory = @"..\..\..\..\..\..\testcaptures" // $"{captureDirectory}\events\{partitionId.value}" //// Print the sequence numbers of all events that do not have a partition key - //printEventsWithoutPartitionKey - // captureDirectory - // partitionId + printEventsWithoutPartitionKey + captureDirectory + partitionId //// Check if there are any events that do not have a partition key - //if oneOrMoreEventsHaveAPartitionID captureDirectory partitionId - //then printfn $"Partition {partitionId.value} has some events
that *DO* have a partition key" - //else printfn $"No event in partition {partitionId.value} has a partition key" - + if oneOrMoreEventsHaveAPartitionID captureDirectory partitionId + then printfn $"Partition {partitionId.value} has some events that *DO* have a partition key" + else printfn $"No event in partition {partitionId.value} has a partition key" // printfn $"Partition {partitionId.value} has {numberOfMessages captureDirectory partitionId} messages" // Print all marketplace resource IDs by partition marketplaceResourceIDs captureDirectory partitionId - |> Seq.iter (fun id -> printfn $"Partition {partitionId.value} has marketplace resource ID {id.ToString()}" ) + |> Seq.map (fun id -> + printfn $"Partition {partitionId.value} has marketplace resource ID {id.ToString()}" + id + ) + |> Seq.length + |> (fun num -> + printfn $"Partition {partitionId.value} has {num} different " + ) ) \ No newline at end of file