diff --git a/src/Insurello.RabbitMqClient/MqClient.fs b/src/Insurello.RabbitMqClient/MqClient.fs index ef179b3..fac86fb 100644 --- a/src/Insurello.RabbitMqClient/MqClient.fs +++ b/src/Insurello.RabbitMqClient/MqClient.fs @@ -13,7 +13,8 @@ module MqClient = type private ModelData = { channelConsumer: AsyncEventingBasicConsumer rpcConsumer: AsyncEventingBasicConsumer - pendingRequests: System.Collections.Concurrent.ConcurrentDictionary System.Threading.Tasks.TaskCompletionSource> } + pendingRequests: System.Collections.Concurrent.ConcurrentDictionary System.Threading.Tasks.TaskCompletionSource> + connection: IConnection } and Message<'event> = private | Message of 'event * ModelData @@ -21,7 +22,7 @@ module MqClient = type RawBody = string - type Model = private | Model of ModelData + type Model = private Model of ModelData /// /// The maximum number of MQ messages to be fetched from queues and get processed at a time by the RabbitMQ client. @@ -195,6 +196,18 @@ module MqClient = else () + let private closeRpcConsumer: Model -> unit = + fun (Model model) -> if model.rpcConsumer.Model.IsOpen then model.rpcConsumer.Model.Close() else () + + let private closeChannelConsumer: Model -> unit = + fun (Model model) -> if model.channelConsumer.Model.IsOpen then model.channelConsumer.Model.Close() else () + + let close: Model -> unit = + fun (Model model) -> + closeChannelConsumer (Model model) + closeRpcConsumer (Model model) + closeConnection 0 model.connection + let private createChannel: ChannelConfig -> ExceptionCallback -> IConnection -> Result = fun config exCallback connection -> try @@ -310,7 +323,10 @@ module MqClient = (PublishResult.ReturnError (sprintf "Failed to publish to queue: ReplyCode: %i, ReplyText: %s, Exchange: %s, RoutingKey: %s" - args.ReplyCode args.ReplyText args.Exchange args.RoutingKey)) + args.ReplyCode + args.ReplyText + args.Exchange + args.RoutingKey)) |> ignore else ()) @@ -395,11 +411,15 @@ module MqClient = createChannel { withConfirmSelect = true - prefetchCount = prefetchCount } exCallback connection + prefetchCount = prefetchCount } + exCallback + connection |> Result.bind (fun channel -> createChannel { withConfirmSelect = false - prefetchCount = prefetchCount } exCallback connection + prefetchCount = prefetchCount } + exCallback + connection |> Result.map (fun rpcChannel -> (connection, Model @@ -409,7 +429,8 @@ module MqClient = pendingRequests = System.Collections.Concurrent.ConcurrentDictionary System.Threading.Tasks.TaskCompletionSource> - () })))) + () + connection = connection })))) |> Result.bind (fun (connection, model) -> let declareAQueue = declareQueue model let bindAQueue = bindQueueToExchange model @@ -442,7 +463,9 @@ module MqClient = (callback = (fun () -> tcs.SetResult - ((sprintf "Publish to queue '%s' timedout after %ss" routingKey + ((sprintf + "Publish to queue '%s' timedout after %ss" + routingKey (timeout.TotalSeconds.ToString())) |> PublishResult.Timeout) |> ignore), @@ -494,7 +517,9 @@ module MqClient = let! publishResult = tcs.Task |> (Async.AwaitTask >> Async.Catch) model.channelConsumer.Model.BasicAcks.RemoveHandler basicAckEventHandler + model.channelConsumer.Model.BasicNacks.RemoveHandler basicNackEventHandler + model.channelConsumer.Model.BasicReturn.RemoveHandler basicReturnEventHandler return match publishResult with @@ -566,7 +591,9 @@ module MqClient = tcs.TrySetResult (Error - (sprintf "Publish to queue '%s' timedout after %ss" routingKey + (sprintf + "Publish to queue '%s' timedout after %ss" + routingKey (timeout.TotalSeconds.ToString()))) |> ignore), useSynchronizationContext = false)