Skip to content

Commit

Permalink
Fix shutdown semantics
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Dec 13, 2023
1 parent 631b930 commit 72102ba
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 15 deletions.
10 changes: 7 additions & 3 deletions src/Propulsion.CosmosStore/ChangeFeedProcessor.fs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ type [<AbstractClass; Sealed>] ChangeFeedProcessor private () =
return! observers.Ingest(ctx, changes, checkpointAsync, ct)
with Exception.Log log () -> () }
let notifyError =
let log = match notifyError with Some f -> f | None -> Action<_, _>(fun i ex -> observers.LogReaderExn(i, ex))
let isNoise: exn -> bool = function // TODO tell MS to stop misreporting it, and remove the filter when that's released
| :? Microsoft.Azure.Cosmos.ChangeFeedProcessorUserException as e when (e.InnerException :? OperationCanceledException) -> true
| _ -> false
let log = match notifyError with Some f -> f | None -> Action<_, _>(fun i ex -> observers.LogReaderExn(i, ex, isNoise ex))
fun (TokenRangeId rangeId) ex -> log.Invoke(rangeId, ex); Task.CompletedTask
let logStateChange acquired (TokenRangeId rangeId) = observers.RecordStateChange(rangeId, acquired); Task.CompletedTask
monitored
Expand Down Expand Up @@ -72,6 +75,7 @@ type [<AbstractClass; Sealed>] ChangeFeedProcessor private () =
return! processor.StartAsync() }
let shutdown () = task {
try do! processor.StopAsync() with _ -> ()
(observers : IDisposable).Dispose() // Stop the ingesters
do! estimateAndLog CancellationToken.None }
Propulsion.PipelineFactory.PrepareSource2(log, startup, shutdown)
// On shutdown, Readers waiting for Ingestion capacity need to be released
let stopIngesters = (observers : IDisposable).Dispose
Propulsion.PipelineFactory.PrepareSource2(log, startup, shutdown, stopIngesters)
4 changes: 2 additions & 2 deletions src/Propulsion.CosmosStore/FeedObserver.fs
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ type internal Observers<'Items>(log: Serilog.ILogger, processorName, buildObserv
member _.LogStart(leaseAcquireInterval: TimeSpan, leaseTtl: TimeSpan, leaseRenewInterval: TimeSpan, feedPollInterval: TimeSpan, startFromTail: bool, ?maxItems) =
log.Information("ChangeFeed {processorName} Lease acquire {leaseAcquireIntervalS:n0}s ttl {ttlS:n0}s renew {renewS:n0}s feedPollInterval {feedPollIntervalS:n0}s Items limit {maxItems} fromTail {fromTail}",
processorName, leaseAcquireInterval.TotalSeconds, leaseTtl.TotalSeconds, leaseRenewInterval.TotalSeconds, feedPollInterval.TotalSeconds, Option.toNullable maxItems, startFromTail)
member _.LogReaderExn(rangeId: int, ex: exn) =
log.Error(ex, "ChangeFeed {processorName}/{partition} error", processorName, rangeId)
member _.LogReaderExn(rangeId: int, ex: exn, isNoise: bool) =
log.Write((if isNoise then LogEventLevel.Debug else LogEventLevel.Error), ex, "ChangeFeed {processorName}/{partition} error", processorName, rangeId)
member _.LogHandlerExn(rangeId: int, ex: exn) =
log.Error(ex, "ChangeFeed {processorName}/{partition} Handler Threw", processorName, rangeId)
member _.Ingest(context, docs, checkpoint, ct) =
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.Feed/FeedReader.fs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ type FeedReader
do! submitPage (readLatency, batch)
currentPos <- batch.checkpoint
lastWasTail <- batch.isTail
if stopAtTail then
if not ct.IsCancellationRequested && stopAtTail then
stats.EnteringShutdown()
let! struct (cur, max) = ingester.AwaitCompleted()
stats.ShutdownCompleted(cur, max) }
5 changes: 3 additions & 2 deletions src/Propulsion.Feed/FeedSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type FeedSourceBase internal
try let! pos = checkpoints.Start(sourceId, trancheId, establishOrigin = (establishOrigin |> Option.map establishTrancheOrigin), ct = ct)
reader.LogPartitionStarting(pos)
return! reader.Pump(pos, ct)
with//:? System.Threading.Tasks.TaskCanceledException when ct.IsCancellationRequested -> ()
with:? System.Threading.Tasks.TaskCanceledException | :? OperationCanceledException -> ()
| Exception.Log reader.LogPartitionExn () -> ()
finally ingester.Stop() }

Expand Down Expand Up @@ -66,7 +66,8 @@ type FeedSourceBase internal

/// Would be protected if that existed - derived types are expected to use this in implementing a parameterless `Start()`
member x.Start(pump): SourcePipeline =
let machine, triggerStop, outcomeTask = PipelineFactory.PrepareSource(log, pump)
let stopIngesters () = for i, _ in partitions do i.Stop()
let machine, triggerStop, outcomeTask = PipelineFactory.PrepareSource(log, pump, stopIngesters)
let monitor = lazy FeedMonitor(log, positions.Current, sink, fun () -> outcomeTask.IsCompleted)
new SourcePipeline<_, _>(Task.run machine, x.Checkpoint, triggerStop, monitor)

Expand Down
18 changes: 11 additions & 7 deletions src/Propulsion/Pipeline.fs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type SinkPipeline<'Ingester> internal (task: Task<unit>, triggerStop, startInges

type [<AbstractClass; Sealed>] PipelineFactory private () =

static member PrepareSource(log: Serilog.ILogger, pump: CancellationToken -> Task<unit>) =
static member PrepareSource(log: Serilog.ILogger, pump: CancellationToken -> Task<unit>, markCompleted: unit -> unit) =
let ct, stop =
let cts = new System.Threading.CancellationTokenSource()
cts.Token, fun disposing ->
Expand All @@ -58,14 +58,16 @@ type [<AbstractClass; Sealed>] PipelineFactory private () =

let inner, outcomeTask, markCompleted =
let tcs = System.Threading.Tasks.TaskCompletionSource<unit>()
let markCompleted () = tcs.TrySetResult () |> ignore
let markCompleted () =
markCompleted ()
tcs.TrySetResult () |> ignore
let recordExn (e: exn) = tcs.TrySetException e |> ignore
let inner () = task {
try do! pump ct
// If the source completes all reading cleanly, declare completion
log.Information "Source drained..."
// If the source completes all reading cleanly, convey that fact ()
if not ct.IsCancellationRequested then log.Information "Source drained..."
markCompleted ()
with e ->
with e when not ct.IsCancellationRequested ->
// first exception from a supervised task becomes the outcome if that happens
log.Warning(e, "Exception encountered while running source, exiting loop")
recordExn e
Expand All @@ -81,7 +83,7 @@ type [<AbstractClass; Sealed>] PipelineFactory private () =
finally log.Information "Source stopped" }
machine, stop, outcomeTask

static member PrepareSource2(log: Serilog.ILogger, startup: CancellationToken -> Task<unit>, shutdown: unit -> Task<unit>) =
static member PrepareSource2(log: Serilog.ILogger, startup: CancellationToken -> Task<unit>, shutdown: unit -> Task<unit>, markCompleted) =
let ct, stop =
let cts = new System.Threading.CancellationTokenSource()
cts.Token, fun disposing ->
Expand All @@ -91,7 +93,9 @@ type [<AbstractClass; Sealed>] PipelineFactory private () =

let outcomeTask, markCompleted =
let tcs = System.Threading.Tasks.TaskCompletionSource<unit>()
let markCompleted () = tcs.TrySetResult () |> ignore
let markCompleted () =
markCompleted ()
tcs.TrySetResult () |> ignore
tcs.Task, markCompleted

let machine () = task {
Expand Down

0 comments on commit 72102ba

Please sign in to comment.