Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More diagnostics tooling #182

Merged
merged 1 commit into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/Metering.BaseTypes/MeteringUpdateEvent.fs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,25 @@ type MeteringUpdateEvent =
| RemoveUnprocessedMessages _ -> ""
| Ping x -> x.PartitionID.value

/// The marketplace resource ID this event refers to, if it carries one.
/// Returns None for UnprocessableMessage, RemoveUnprocessedMessages and Ping events.
member this.MarketplaceResourceID
    with get() : MarketplaceResourceId option =
        match this with
        // Events that are not tied to a single marketplace resource.
        | UnprocessableMessage _
        | RemoveUnprocessedMessages _
        | Ping _ -> None
        | SubscriptionPurchased purchase -> Some purchase.Subscription.MarketplaceResourceId
        | SubscriptionDeletion resourceId -> Some resourceId
        | UsageReported usage -> Some usage.MarketplaceResourceId
        | UsageSubmittedToAPI submission ->
            // Both the success payload and every error payload expose the resource ID
            // of the request that was submitted.
            match submission.Result with
            | Ok success -> Some success.RequestData.MarketplaceResourceId
            | Error (DuplicateSubmission duplicate) -> Some duplicate.FailedRequest.MarketplaceResourceId
            | Error (ResourceNotFound notFound) -> Some notFound.RequestData.MarketplaceResourceId
            | Error (Expired expired) -> Some expired.RequestData.MarketplaceResourceId
            | Error (Generic generic) -> Some generic.RequestData.MarketplaceResourceId

override this.ToString() =
match this with
| SubscriptionPurchased x -> x.ToString()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ open System.Text.RegularExpressions
// and stores the events and the state after applying a certain event, as JSON files in the file system.
//

let readAllEvents<'TEvent> (directory: string) (blobPrefix: string) (convert: EventData -> 'TEvent) (partitionId: PartitionID) (cancellationToken: CancellationToken) : IEnumerable<EventHubEvent<'TEvent>> =
let readAllEvents<'TEvent> (captureDirectory: string) (blobPrefix: string) (convert: EventData -> 'TEvent) (partitionId: PartitionID) (cancellationToken: CancellationToken) : IEnumerable<EventHubEvent<'TEvent>> =
seq {
// p0--2023-08-22--19-57-36.avro
let extractTime (captureFileNameFormat: string) (partitionId: PartitionID) (blobName: string) : MeteringDateTime option =
Expand Down Expand Up @@ -45,7 +45,7 @@ let readAllEvents<'TEvent> (directory: string) (blobPrefix: string) (convert: Ev
let time = extractTime "p{PartitionId}--{Year}-{Month}-{Day}--{Hour}-{Minute}-{Second}" partitionId

let files =
Directory.GetFiles(path = directory, searchPattern = "*.avro") // all avro files
Directory.GetFiles(path = captureDirectory, searchPattern = "*.avro") // all avro files
|> Array.map (fun filename -> (filename, time filename)) // extract time from filename
|> Array.choose (fun (filename, time) -> time |> Option.map (fun t -> (filename, t))) // filter out files without time
|> Seq.sortBy (fun (_, time) -> time.LocalDateTime) // sort by time
Expand All @@ -61,34 +61,137 @@ let readAllEvents<'TEvent> (directory: string) (blobPrefix: string) (convert: Ev
|> Seq.map (fun e -> EventHubEvent.createFromCapture (convert e) (RehydratedFromCaptureEventData.getMessagePosition e) None blobName )
}

/// Replays all captured events of one partition, starting from an empty
/// MeterCollection, and writes the final state as JSON to `stateFile`.
/// While replaying, the sequence number of the last update is printed
/// whenever it is a multiple of 1000, as a coarse progress indicator.
let recreateLatestState captureDirectory partitionId stateFile =
    // Pass-through step: emits progress output and returns the state unchanged.
    let reportProgress (state: MeterCollection) =
        match state.LastUpdate with
        | Some mu when (int mu.SequenceNumber) % 1000 = 0 -> printf "%d " (mu.SequenceNumber)
        | _ -> ()
        state

    let events =
        readAllEvents<MeteringUpdateEvent>
            captureDirectory
            "whatever"
            CaptureProcessor.toMeteringUpdateEvent
            partitionId
            CancellationToken.None

    // Seq.scan always yields the initial state first, so Seq.last is safe
    // even when the capture contains no events.
    let latestState =
        events
        |> Seq.scan MeterCollectionLogic.handleMeteringEvent MeterCollection.Empty
        |> Seq.map reportProgress
        |> Seq.last

    File.WriteAllText(path = stateFile, contents = Json.toStr 0 latestState)

// Processing partition 0 consumed 1.0 GB, partition 1 consumed 3.5 GB, and partition 2 consumed 1.7 GB of memory
let directory = @"..\..\..\..\..\..\testcaptures"
let partitionId = PartitionID.create "0"
let stateFile = $"..\..\..\..\..\..\state{ partitionId.value }.json"
/// Reads all captured events of one partition and writes each of them, except
/// UsageReported and Ping events, as an individual JSON file into `eventDirectory`.
/// File names are built from the partition ID, sequence number, partition
/// timestamp and message type of the event.
let expandCaptureToEventsInIndividualJsonFiles captureDirectory partitionId eventDirectory =
    // Directory.CreateDirectory does nothing when the directory already exists,
    // so it is called unconditionally. The earlier `if Directory.Exists ...` guard
    // was inverted and only "created" the directory when it was already there,
    // which made File.WriteAllText fail on a fresh output directory.
    Directory.CreateDirectory(path = eventDirectory) |> ignore

    readAllEvents<MeteringUpdateEvent>
        captureDirectory
        "whatever"
        CaptureProcessor.toMeteringUpdateEvent
        partitionId
        CancellationToken.None
    |> Seq.filter (fun e ->
        // Skip usage reports and pings; every other event type is written out.
        match e.EventData with
        | UsageReported _ -> false
        | Ping _ -> false
        | _ -> true)
    |> Seq.iter (fun e ->
        let filename = $"{eventDirectory}\\p{e.MessagePosition.PartitionID.value}--{e.MessagePosition.SequenceNumber}--{e.MessagePosition.PartitionTimestamp |> MeteringDateTime.blobName}--{e.EventData.MessageType}.json"
        let content = e.EventData |> Json.toStr 2

        File.WriteAllText(
            path = filename,
            contents = content))

/// Prints, separated by spaces, the sequence number of every captured event
/// in the partition whose message position has a null PartitionID value.
let printEventsWithoutPartitionKey captureDirectory partitionId =
    let events =
        readAllEvents<MeteringUpdateEvent>
            captureDirectory
            "whatever"
            CaptureProcessor.toMeteringUpdateEvent
            partitionId
            CancellationToken.None

    for evt in events do
        if evt.MessagePosition.PartitionID.value = null then
            printf "%d " evt.MessagePosition.SequenceNumber

/// Returns true as soon as one captured event in the partition has a
/// non-null PartitionID value in its message position; false if none does.
let oneOrMoreEventsHaveAPartitionID captureDirectory partitionId : bool =
    let hasPartitionId (evt: EventHubEvent<MeteringUpdateEvent>) =
        evt.MessagePosition.PartitionID.value <> null

    let events =
        readAllEvents<MeteringUpdateEvent>
            captureDirectory
            "whatever"
            CaptureProcessor.toMeteringUpdateEvent
            partitionId
            CancellationToken.None

    Seq.exists hasPartitionId events

/// Counts the captured events of the given partition by enumerating all of them.
let numberOfMessages captureDirectory partitionId =
    let events =
        readAllEvents<MeteringUpdateEvent>
            captureDirectory
            "whatever"
            CaptureProcessor.toMeteringUpdateEvent
            partitionId
            CancellationToken.None

    Seq.length events

/// Returns the distinct marketplace resource IDs found in the captured events
/// of the given partition, in sorted order. Events without a marketplace
/// resource ID are ignored.
let marketplaceResourceIDs captureDirectory partitionId =
    let events =
        readAllEvents<MeteringUpdateEvent>
            captureDirectory
            "whatever"
            CaptureProcessor.toMeteringUpdateEvent
            partitionId
            CancellationToken.None

    seq {
        for evt in events do
            match evt.EventData.MarketplaceResourceID with
            | Some resourceId -> yield resourceId
            | None -> ()
    }
    |> Seq.distinct
    |> Seq.sort

/// Loads a MeterCollection from the JSON state file and returns how many
/// entries its UnprocessableMessages list contains.
let numberOfUnprocessedMessagesInStateFile stateFile =
    let json = File.ReadAllText(stateFile)
    let state = Json.fromStr<MeterCollection> json
    List.length state.UnprocessableMessages

let numberOfPartitions = 3
let captureDirectory = @"..\..\..\..\..\..\testcaptures"

// Diagnostics driver: runs the selected diagnostic for every partition in turn.
// The alternative diagnostics are kept commented out so they can be switched on by hand.
for partitionNumber in 0 .. numberOfPartitions - 1 do
    let partitionId = partitionNumber |> sprintf "%d" |> PartitionID.create

    // Processing partition 0 consumed 1.0 GB, partition 1 consumed 3.5 GB, and partition 2 consumed 1.7 GB of memory

    //// Start with an empty state and apply all events, to get the latest state
    // Only referenced by the commented-out diagnostics below.
    let stateFile = $"{captureDirectory}\state\state-p{ partitionId.value }.json"

    // recreateLatestState captureDirectory partitionId stateFile

    // printfn "Statefile %s has %d unprocessed messages" stateFile (numberOfUnprocessedMessagesInStateFile stateFile)

    //// Expand the capture to individual JSON files, one for each event
    //expandCaptureToEventsInIndividualJsonFiles
    //    captureDirectory
    //    partitionId
    //    $"{captureDirectory}\events\{partitionId.value}"

    //// Print the sequence numbers of all events that do not have a partition key
    //printEventsWithoutPartitionKey
    //    captureDirectory
    //    partitionId

    //// Check if there are any events that do not have a partition key
    //if oneOrMoreEventsHaveAPartitionID captureDirectory partitionId
    //then printfn $"Partition {partitionId.value} has some events that *DO* have a partition key"
    //else printfn $"No event in partition {partitionId.value} has a partition key"

    // printfn $"Partition {partitionId.value} has {numberOfMessages captureDirectory partitionId} messages"

    // Print all marketplace resource IDs by partition
    for id in marketplaceResourceIDs captureDirectory partitionId do
        printfn $"Partition {partitionId.value} has marketplace resource ID {id.ToString()}"