Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix and cleanup #183

Merged
merged 1 commit into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/Metering.Runtime/RehydratedFromCaptureEventData.fs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type RehydratedFromCaptureEventData(
member this.BlobName = blobName

module RehydratedFromCaptureEventData =
let getMessagePosition (e: EventData) : MessagePosition =
{ PartitionID = PartitionID.create e.PartitionKey
let getMessagePosition (partitionId: PartitionID) (e: EventData) : MessagePosition =
{ PartitionID = partitionId
SequenceNumber = e.SequenceNumber
PartitionTimestamp = MeteringDateTime.fromDateTimeOffset e.EnqueuedTime}
PartitionTimestamp = MeteringDateTime.fromDateTimeOffset e.EnqueuedTime}
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
open System
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

open System
open System.Collections.Generic
open System.IO
open System.Threading
Expand Down Expand Up @@ -58,33 +61,38 @@ let readAllEvents<'TEvent> (captureDirectory: string) (blobPrefix: string) (conv
yield!
stream
|> CaptureProcessor.ReadEventDataFromAvroStream blobName
|> Seq.map (fun e -> EventHubEvent.createFromCapture (convert e) (RehydratedFromCaptureEventData.getMessagePosition e) None blobName )
|> Seq.map (fun e -> EventHubEvent.createFromCapture (convert e) (RehydratedFromCaptureEventData.getMessagePosition partitionId e) None blobName )
}

let recreateLatestState captureDirectory partitionId stateFile =
File.WriteAllText(
path = stateFile,
contents =
(readAllEvents<MeteringUpdateEvent>
let lastState =
readAllEvents<MeteringUpdateEvent>
captureDirectory
"whatever"
CaptureProcessor.toMeteringUpdateEvent
partitionId
CancellationToken.None
|> Seq.scan
MeterCollectionLogic.handleMeteringEvent
MeterCollection.Empty
|> Seq.map (fun state ->
match state.LastUpdate with
| None -> ()
| Some mu ->
if (int mu.SequenceNumber) % 1000 = 0
then printf "%d " (mu.SequenceNumber)
else ()
state
)
|> Seq.last
|> Json.toStr 0))
|> Seq.scan
MeterCollectionLogic.handleMeteringEvent
MeterCollection.Empty
|> Seq.map (fun state ->
match state.LastUpdate with
| None -> ()
| Some mu ->
if (int mu.SequenceNumber) % 1000 = 0
then printf "%d " (mu.SequenceNumber)
else ()
state
)
|> Seq.last

File.WriteAllText(
path = stateFile,
contents = (lastState |> Json.toStr 0))

printfn "Created state file %s" (new FileInfo(stateFile)).FullName

lastState

let expandCaptureToEventsInIndividualJsonFiles captureDirectory partitionId eventDirectory =
if Directory.Exists(eventDirectory)
Expand All @@ -98,6 +106,7 @@ let expandCaptureToEventsInIndividualJsonFiles captureDirectory partitionId even
partitionId
CancellationToken.None
|> Seq.filter (fun e ->
// Skip UsageReported and Ping events
match e.EventData with
| UsageReported _ -> false
| Ping _ -> false
Expand All @@ -111,6 +120,7 @@ let expandCaptureToEventsInIndividualJsonFiles captureDirectory partitionId even
contents = content)
)

/// Print the sequence numbers of all events that do not have a partition key
let printEventsWithoutPartitionKey captureDirectory partitionId =
readAllEvents<MeteringUpdateEvent>
captureDirectory
Expand All @@ -123,7 +133,8 @@ let printEventsWithoutPartitionKey captureDirectory partitionId =
printf "%d " e.MessagePosition.SequenceNumber
)

let oneOrMoreEventsHaveAPartitionID captureDirectory partitionId : bool =
/// Check if there are any events that do have a partition key
let oneOrMoreEventsHaveAPartitionID (captureDirectory: string) (partitionId: PartitionID) : bool =
readAllEvents<MeteringUpdateEvent>
captureDirectory
"whatever"
Expand All @@ -132,7 +143,7 @@ let oneOrMoreEventsHaveAPartitionID captureDirectory partitionId : bool =
CancellationToken.None
|> Seq.exists (fun e -> e.MessagePosition.PartitionID.value <> null)

let numberOfMessages captureDirectory partitionId =
let numberOfMessages (captureDirectory: string) (partitionId: PartitionID) : int =
readAllEvents<MeteringUpdateEvent>
captureDirectory
"whatever"
Expand All @@ -141,7 +152,8 @@ let numberOfMessages captureDirectory partitionId =
CancellationToken.None
|> Seq.length

let marketplaceResourceIDs captureDirectory partitionId =
/// Retrieve all unique marketplace resource IDs from the capture
let marketplaceResourceIDs (captureDirectory: string) (partitionId: PartitionID) =
readAllEvents<MeteringUpdateEvent>
captureDirectory
"whatever"
Expand All @@ -152,25 +164,44 @@ let marketplaceResourceIDs captureDirectory partitionId =
|> Seq.distinct
|> Seq.sort

let numberOfUnprocessedMessagesInStateFile stateFile =
let state = File.ReadAllText(stateFile) |> Json.fromStr<MeterCollection>
state.UnprocessableMessages |> List.length
let numberOfUnprocessedMessagesInStateFile (stateFile: string) =
File.ReadAllText(stateFile)
|> Json.fromStr<MeterCollection>
|> (fun state -> state.UnprocessableMessages)
|> List.length




let numberOfPartitions = 3
let captureDirectory = @"..\..\..\..\..\..\testcaptures"
let captureDirectory = @"..\..\..\..\..\..\testcaptures" // This folder isn't checked in, so you need to create it yourself

[ 0 .. numberOfPartitions - 1 ]
|> Seq.map(sprintf "%d")
|> Seq.map(PartitionID.create)
|> Seq.iter(fun partitionId ->

//
// The code snippets below are independent of each other, so you can comment out the ones you don't need
// Also, they iterate over all Avro files in the capture directory (again and again), so it's slightly inefficient
//
printfn $"Processing partition {partitionId.value}"

// Processing partion 0 consumed 1.0 GB, partition 1 consumed 3.5 GB, and partition 2 consumed 1.7 GB of memory

//// Start with an empty state and apply all events, to get the latest state
// Start with an empty state and apply all events, to get the latest state
let stateFile = $"{captureDirectory}\state\state-p{ partitionId.value }.json"

// recreateLatestState captureDirectory partitionId stateFile
let lastState =
recreateLatestState captureDirectory partitionId stateFile

// printfn "Statefile %s has %d unprocessed messages" stateFile (numberOfUnprocessedMessagesInStateFile stateFile)
// Print the number of unprocessed messages in the state file
lastState
|> (fun state -> state.UnprocessableMessages)
|> List.length
|> printfn "Statefile %s has %d unprocessed messages" stateFile

//printfn "Statefile %s has %d unprocessed messages" stateFile (numberOfUnprocessedMessagesInStateFile stateFile)

//// Expand the capture to individual JSON files, one for each event
//expandCaptureToEventsInIndividualJsonFiles
Expand All @@ -179,19 +210,25 @@ let captureDirectory = @"..\..\..\..\..\..\testcaptures"
// $"{captureDirectory}\events\{partitionId.value}"

//// Print the sequence numbers of all events that do not have a partition key
//printEventsWithoutPartitionKey
// captureDirectory
// partitionId
printEventsWithoutPartitionKey
captureDirectory
partitionId

//// Check if there are any events that do not have a partition key
//if oneOrMoreEventsHaveAPartitionID captureDirectory partitionId
//then printfn $"Partition {partitionId.value} has some events that *DO* have a partition key"
//else printfn $"No event in partition {partitionId.value} has a partition key"

if oneOrMoreEventsHaveAPartitionID captureDirectory partitionId
then printfn $"Partition {partitionId.value} has some events that *DO* have a partition key"
else printfn $"No event in partition {partitionId.value} has a partition key"

// printfn $"Partition {partitionId.value} has {numberOfMessages captureDirectory partitionId} messages"

// Print all marketplace resource IDs by partition
marketplaceResourceIDs captureDirectory partitionId
|> Seq.iter (fun id -> printfn $"Partition {partitionId.value} has marketplace resource ID {id.ToString()}" )
|> Seq.map (fun id ->
printfn $"Partition {partitionId.value} has marketplace resource ID {id.ToString()}"
id
)
|> Seq.length
|> (fun num ->
printfn $"Partition {partitionId.value} has {num} different "
)
)