Skip to content

Commit

Permalink
refactored RealtimeStats (improved performance for cluster with many …
Browse files Browse the repository at this point in the history
…agents)
  • Loading branch information
AntyaDev committed Jul 4, 2022
1 parent b077265 commit e8a18c7
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 98 deletions.
12 changes: 0 additions & 12 deletions src/NBomber/Contracts.fs
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,6 @@ type StepResponse = {
[<DataMember(Order = 1)>] ClientResponse: Response
[<DataMember(Order = 2)>] EndTimeMs: float
[<DataMember(Order = 3)>] LatencyMs: float
} with

static member clean (response: StepResponse) =
let clientResponse = { response.ClientResponse with Message = ""; Payload = null }
{ response with ClientResponse = clientResponse }

[<CLIMutable>]
[<DataContract>]
type ScenarioRawStats = {
[<DataMember(Order = 0)>] ScenarioName: string
[<DataMember(Order = 1)>] StepResponses: StepResponse list
[<DataMember(Order = 2)>] Duration: TimeSpan
}

// we keep ClientFactorySettings settings here instead of take them from ScenariosSettings
Expand Down
22 changes: 2 additions & 20 deletions src/NBomber/Domain/Concurrency/Scheduler/ScenarioScheduler.fs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ open FSharp.Control.Reactive

open NBomber
open NBomber.Contracts
open NBomber.Contracts.Internal
open NBomber.Contracts.Stats
open NBomber.Domain
open NBomber.Domain.DomainTypes
Expand Down Expand Up @@ -97,15 +96,12 @@ type ScenarioScheduler(dep: ActorDep, scenarioClusterCount: int) =
let _tcs = TaskCompletionSource()
let _randomGen = Random()

// we use scenarioClusterCount only to display on console the correct numbers (awareness of cluster execution)
let getConstantActorCount () = _constantScheduler.ScheduledActorCount * scenarioClusterCount
let getOneTimeActorCount () = _oneTimeScheduler.ScheduledActorCount * scenarioClusterCount

let getCurrentSimulationStats () =
LoadTimeLine.createSimulationStats(
_currentSimulation,
getConstantActorCount(),
getOneTimeActorCount()
)
LoadTimeLine.createSimulationStats(_currentSimulation, _constantScheduler.ScheduledActorCount, _oneTimeScheduler.ScheduledActorCount)

let prepareForRealtimeStats () =
_cachedSimulationStats <- getCurrentSimulationStats()
Expand All @@ -123,14 +119,6 @@ type ScenarioScheduler(dep: ActorDep, scenarioClusterCount: int) =
_scnDep.ScenarioStatsActor.Publish FlushTempBuffer
reply.Task

let getRawStats (duration: TimeSpan) =
_scnDep.ScenarioStatsActor.AllRawStats.TryFind duration

let getRemainedRawStats () =
let reply = TaskCompletionSource<ScenarioRawStats>()
_scnDep.ScenarioStatsActor.Publish(GetRemainedRawStats reply)
reply.Task

let getFinalStats () =
let simulationStats = getCurrentSimulationStats()
let duration = Scenario.getExecutedDuration _scenario
Expand Down Expand Up @@ -207,7 +195,6 @@ type ScenarioScheduler(dep: ActorDep, scenarioClusterCount: int) =
member _.EventStream = _eventStream :> IObservable<_>
member _.Scenario = _scenario
member _.AllRealtimeStats = _scnDep.ScenarioStatsActor.AllRealtimeStats
member _.AllRawStats = _scnDep.ScenarioStatsActor.AllRawStats

member _.Start() = start()
member _.Stop() = stop()
Expand All @@ -216,11 +203,6 @@ type ScenarioScheduler(dep: ActorDep, scenarioClusterCount: int) =
member _.PrepareForRealtimeStats() = prepareForRealtimeStats()
member _.CommitRealtimeStats(duration) = commitRealtimeStats duration
member _.BuildRealtimeStats(duration) = buildRealtimeStats duration

member _.GetRawStats(duration) = getRawStats duration
member _.DelRawStats(duration) = _scnDep.ScenarioStatsActor.Publish(DelRawStats duration)
member _.GetRemainedRawStats() = getRemainedRawStats()

member _.GetFinalStats() = getFinalStats()

interface IDisposable with
Expand Down
66 changes: 32 additions & 34 deletions src/NBomber/Domain/Stats/ScenarioStatsActor.fs
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,23 @@ open NBomber.Domain.DomainTypes
open NBomber.Domain.Stats.Statistics

type ActorMessage =
| AddResponse of StepResponse
| AddFromAgent of StepResponse list
| AddResponse of StepResponse
| AddFromAgent of realtimeStats:ScenarioStats
| StartUseTempBuffer
| FlushTempBuffer
| BuildRealtimeStats of reply:TaskCompletionSource<ScenarioStats> * LoadSimulationStats * duration:TimeSpan
| GetRemainedRawStats of reply:TaskCompletionSource<ScenarioRawStats>
| DelRawStats of duration:TimeSpan
| GetFinalStats of reply:TaskCompletionSource<ScenarioStats> * LoadSimulationStats * duration:TimeSpan
| BuildRealtimeStats of reply:TaskCompletionSource<ScenarioStats> * LoadSimulationStats * duration:TimeSpan
| GetFinalStats of reply:TaskCompletionSource<ScenarioStats> * LoadSimulationStats * duration:TimeSpan

type ScenarioStatsActor(logger: ILogger, scenario: Scenario, reportingInterval: TimeSpan, keepRawStats: bool) =
type ScenarioStatsActor(logger: ILogger,
scenario: Scenario,
reportingInterval: TimeSpan,
?mergeStatsFn: ScenarioStats list -> ScenarioStats) =

let _log = logger.ForContext<ScenarioStatsActor>()
let _allStepsData = Array.init scenario.Steps.Length (fun _ -> StepStatsRawData.createEmpty())
let mutable _intervalStepsData = Array.init scenario.Steps.Length (fun _ -> StepStatsRawData.createEmpty())
let mutable _intervalRawStats = List.empty<StepResponse>
let mutable _allRealtimeStats = Map.empty<TimeSpan,ScenarioStats>
let mutable _allRawStats = Map.empty<TimeSpan,ScenarioRawStats>
let mutable _agentsRealtimeStats = List.empty<ScenarioStats>
let mutable _tempBuffer = List.empty<StepResponse>
let mutable _useTempBuffer = false

Expand All @@ -40,26 +40,17 @@ type ScenarioStatsActor(logger: ILogger, scenario: Scenario, reportingInterval:
_allStepsData.[resp.StepIndex] <- StepStatsRawData.addResponse allStData resp
_intervalStepsData.[resp.StepIndex] <- StepStatsRawData.addResponse intervalStData resp

if keepRawStats then
let cleanedResp = StepResponse.clean resp
_intervalRawStats <- cleanedResp :: _intervalRawStats

let createRealtimeStats (simulationStats) (duration) (stepsData) =
ScenarioStats.create scenario stepsData simulationStats OperationType.Bombing %duration reportingInterval

let createFinalStats (simulationStats) (duration) (stepsData) =
ScenarioStats.create scenario stepsData simulationStats OperationType.Complete %duration duration

let addToCache (realtimeStats: ScenarioStats) =
let addToCacheAndReset (realtimeStats: ScenarioStats) =
_allRealtimeStats <- _allRealtimeStats.Add(realtimeStats.Duration, realtimeStats)
// reset interval steps data
_intervalStepsData <- Array.init scenario.Steps.Length (fun _ -> StepStatsRawData.createEmpty())

if keepRawStats then
let rawStats = { ScenarioName = scenario.ScenarioName; StepResponses = _intervalRawStats; Duration = realtimeStats.Duration }
_allRawStats <- _allRawStats.Add(realtimeStats.Duration, rawStats)
// reset interval raw steps data
_intervalRawStats <- List.empty
_agentsRealtimeStats <- List.empty

let _actor = ActionBlock(fun msg ->
try
Expand All @@ -68,8 +59,8 @@ type ScenarioStatsActor(logger: ILogger, scenario: Scenario, reportingInterval:
if _useTempBuffer then _tempBuffer <- response :: _tempBuffer
else addResponse response

| AddFromAgent responses ->
responses |> List.iter addResponse
| AddFromAgent realtimeStats ->
_agentsRealtimeStats <- realtimeStats :: _agentsRealtimeStats

| StartUseTempBuffer ->
_useTempBuffer <- true
Expand All @@ -80,16 +71,23 @@ type ScenarioStatsActor(logger: ILogger, scenario: Scenario, reportingInterval:
_tempBuffer <- List.empty

| BuildRealtimeStats (reply, simulationStats, duration) ->
let realtimeStats = _intervalStepsData |> createRealtimeStats simulationStats duration
addToCache realtimeStats
reply.TrySetResult(realtimeStats) |> ignore
let cordStats = _intervalStepsData |> createRealtimeStats simulationStats duration

let allRealtimeStats =
if scenario.IsEnabled then cordStats :: _agentsRealtimeStats
else _agentsRealtimeStats

| GetRemainedRawStats reply ->
let rawStats = { ScenarioName = scenario.ScenarioName; StepResponses = _intervalRawStats; Duration = TimeSpan.MaxValue }
reply.TrySetResult(rawStats) |> ignore
if allRealtimeStats.Length > 0 then
let merged =
mergeStatsFn
|> Option.map(fun merge -> merge allRealtimeStats)
|> Option.defaultValue cordStats

| DelRawStats duration ->
_allRawStats <- _allRawStats.Remove duration
addToCacheAndReset merged
reply.TrySetResult(merged) |> ignore
else
addToCacheAndReset cordStats
reply.TrySetResult(cordStats) |> ignore

| GetFinalStats (reply, simulationStats, duration) ->
let finalStats = _allStepsData |> createFinalStats simulationStats duration
Expand All @@ -99,13 +97,13 @@ type ScenarioStatsActor(logger: ILogger, scenario: Scenario, reportingInterval:
)

member _.AllRealtimeStats = _allRealtimeStats
member _.AllRawStats = _allRawStats

[<MethodImpl(MethodImplOptions.AggressiveInlining)>]
member _.Publish(msg) = _actor.Post(msg) |> ignore

let createDefault (logger: ILogger) (scenario: Scenario) (reportingInterval: TimeSpan) =
ScenarioStatsActor(logger, scenario, reportingInterval, keepRawStats = false)
ScenarioStatsActor(logger, scenario, reportingInterval)

let createWithRawStats (logger: ILogger) (scenario: Scenario) (reportingInterval: TimeSpan) =
ScenarioStatsActor(logger, scenario, reportingInterval, keepRawStats = true)
let createForCoordinator (mergeStats: ScenarioStats list -> ScenarioStats)
(logger: ILogger) (scenario: Scenario) (reportingInterval: TimeSpan) =
ScenarioStatsActor(logger, scenario, reportingInterval, mergeStats)
1 change: 1 addition & 0 deletions src/NBomber/DomainServices/TestHost/ReportingManager.fs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ let getFinalStats (dep: IGlobalDependency)

let! scenarioStats =
schedulers
|> List.filter(fun x -> x.Scenario.IsEnabled)
|> List.map(fun x -> x.GetFinalStats())
|> Task.WhenAll

Expand Down
7 changes: 2 additions & 5 deletions src/NBomber/DomainServices/TestHost/TestHost.fs
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,8 @@ type internal TestHost(dep: IGlobalDependency,
let createStatsActor = createStatsActor |> Option.defaultValue ScenarioStatsActor.createDefault
createScenarioSchedulers scenarios operation getScenarioClusterCount createStatsActor getStepOrder execSteps

member _.GetRawStats(duration) =
_currentSchedulers |> List.map(fun x -> x.GetRawStats duration) |> Option.sequence

member _.DelRawStats(duration) =
_currentSchedulers |> List.iter(fun x -> x.DelRawStats duration)
member _.GetRealtimeStats(duration) =
_currentSchedulers |> List.map(fun x -> x.AllRealtimeStats.TryFind duration) |> Option.sequence

member _.RunSession(sessionArgs: SessionArgs) = taskResult {
let! initializedScenarios = this.StartInit sessionArgs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ let internal baseActorDep = {
ScenarioCancellationToken = new CancellationTokenSource()
ScenarioTimer = Stopwatch()
ScenarioOperation = ScenarioOperation.Bombing
ScenarioStatsActor = ScenarioStatsActor(logger, baseScenario, Constants.DefaultReportingInterval, keepRawStats = false)
ScenarioStatsActor = ScenarioStatsActor(logger, baseScenario, Constants.DefaultReportingInterval)
ExecStopCommand = fun _ -> ()
}
GetStepOrder = Scenario.getStepOrder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ let internal baseActorDep = {
ScenarioCancellationToken = new CancellationTokenSource()
ScenarioTimer = Stopwatch()
ScenarioOperation = ScenarioOperation.Bombing
ScenarioStatsActor = ScenarioStatsActor(logger, baseScenario, Constants.DefaultReportingInterval, keepRawStats = false)
ScenarioStatsActor = ScenarioStatsActor(logger, baseScenario, Constants.DefaultReportingInterval)
ExecStopCommand = fun _ -> ()
}
GetStepOrder = Scenario.getStepOrder
Expand Down
28 changes: 3 additions & 25 deletions tests/NBomber.IntegrationTests/ScenarioStatsActorTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ let internal baseScenario =
|> Result.getOk

[<Fact>]
let ``with keepRawStats = true should cache raw stats`` () =
let ``AllRealtimeStats should contain cached realtime stats`` () =

let env = Dependency.createWithInMemoryLogger NodeType.SingleNode
let statsActor = ScenarioStatsActor(env.Dep.Logger, baseScenario, reportingInterval = seconds 5, keepRawStats = true)
let statsActor = ScenarioStatsActor(env.Dep.Logger, baseScenario, reportingInterval = seconds 5)

for i in [1..10] do
let res = { StepIndex = 0; ClientResponse = Response.ok(); EndTimeMs = float(100 + i); LatencyMs = i }
Expand All @@ -40,35 +40,14 @@ let ``with keepRawStats = true should cache raw stats`` () =
statsActor.Publish(BuildRealtimeStats(tcs, loadStats, duration))
let realtimeStats = tcs.Task.Result

test <@ statsActor.AllRawStats[duration].StepResponses.Length = 10 @>
test <@ statsActor.AllRealtimeStats[duration].RequestCount = 10 @>
test <@ realtimeStats.RequestCount = 10 @>

[<Fact>]
let ``with keepRawStats = false should not cache raw stats`` () =

let env = Dependency.createWithInMemoryLogger NodeType.SingleNode
let statsActor = ScenarioStatsActor(env.Dep.Logger, baseScenario, reportingInterval = seconds 5, keepRawStats = false)

for i in [1..10] do
let res = { StepIndex = 0; ClientResponse = Response.ok(); EndTimeMs = float(100 + i); LatencyMs = i }
statsActor.Publish(AddResponse res)

let tcs = TaskCompletionSource<ScenarioStats>()
let loadStats = { SimulationName = ""; Value = 10 }
let duration = seconds 10
statsActor.Publish(BuildRealtimeStats(tcs, loadStats, duration))
let realtimeStats = tcs.Task.Result

test <@ statsActor.AllRawStats.Count = 0 @>
test <@ statsActor.AllRealtimeStats[duration].RequestCount = 10 @>
test <@ realtimeStats.RequestCount = 10 @>

[<Fact>]
let ``TempBuffer should work correctly`` () =

let env = Dependency.createWithInMemoryLogger NodeType.SingleNode
let statsActor = ScenarioStatsActor(env.Dep.Logger, baseScenario, reportingInterval = seconds 5, keepRawStats = false)
let statsActor = ScenarioStatsActor(env.Dep.Logger, baseScenario, reportingInterval = seconds 5)

statsActor.Publish StartUseTempBuffer

Expand All @@ -92,6 +71,5 @@ let ``TempBuffer should work correctly`` () =

test <@ statsBufferEnabled.RequestCount = 0 @>
test <@ statsBufferFlushed.RequestCount = 10 @>
test <@ statsActor.AllRawStats.Count = 0 @>
test <@ statsActor.AllRealtimeStats[duration].RequestCount = 10 @>

0 comments on commit e8a18c7

Please sign in to comment.