Skip to content

Commit

Permalink
Expose close connection (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
AxelJunker authored Oct 22, 2020
1 parent f3bb98e commit 5fb327c
Showing 1 changed file with 35 additions and 8 deletions.
43 changes: 35 additions & 8 deletions src/Insurello.RabbitMqClient/MqClient.fs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,16 @@ module MqClient =
type private ModelData =
{ channelConsumer: AsyncEventingBasicConsumer
rpcConsumer: AsyncEventingBasicConsumer
pendingRequests: System.Collections.Concurrent.ConcurrentDictionary<string, Result<ReceivedMessage, string> System.Threading.Tasks.TaskCompletionSource> }
pendingRequests: System.Collections.Concurrent.ConcurrentDictionary<string, Result<ReceivedMessage, string> System.Threading.Tasks.TaskCompletionSource>
connection: IConnection }

and Message<'event> = private | Message of 'event * ModelData

and ReceivedMessage = Message<BasicDeliverEventArgs>

type RawBody = string

type Model = private | Model of ModelData
type Model = private Model of ModelData

/// <summary>
/// The maximum number of MQ messages to be fetched from queues and get processed at a time by the RabbitMQ client.
Expand Down Expand Up @@ -195,6 +196,18 @@ module MqClient =
else
()

let private closeRpcConsumer: Model -> unit =
fun (Model model) -> if model.rpcConsumer.Model.IsOpen then model.rpcConsumer.Model.Close() else ()

let private closeChannelConsumer: Model -> unit =
fun (Model model) -> if model.channelConsumer.Model.IsOpen then model.channelConsumer.Model.Close() else ()

let close: Model -> unit =
fun (Model model) ->
closeChannelConsumer (Model model)
closeRpcConsumer (Model model)
closeConnection 0 model.connection

let private createChannel: ChannelConfig -> ExceptionCallback -> IConnection -> Result<IModel, string> =
fun config exCallback connection ->
try
Expand Down Expand Up @@ -310,7 +323,10 @@ module MqClient =
(PublishResult.ReturnError
(sprintf
"Failed to publish to queue: ReplyCode: %i, ReplyText: %s, Exchange: %s, RoutingKey: %s"
args.ReplyCode args.ReplyText args.Exchange args.RoutingKey))
args.ReplyCode
args.ReplyText
args.Exchange
args.RoutingKey))
|> ignore
else
())
Expand Down Expand Up @@ -395,11 +411,15 @@ module MqClient =

createChannel
{ withConfirmSelect = true
prefetchCount = prefetchCount } exCallback connection
prefetchCount = prefetchCount }
exCallback
connection
|> Result.bind (fun channel ->
createChannel
{ withConfirmSelect = false
prefetchCount = prefetchCount } exCallback connection
prefetchCount = prefetchCount }
exCallback
connection
|> Result.map (fun rpcChannel ->
(connection,
Model
Expand All @@ -409,7 +429,8 @@ module MqClient =

pendingRequests =
System.Collections.Concurrent.ConcurrentDictionary<string, Result<ReceivedMessage, string> System.Threading.Tasks.TaskCompletionSource>
() }))))
()
connection = connection }))))
|> Result.bind (fun (connection, model) ->
let declareAQueue = declareQueue model
let bindAQueue = bindQueueToExchange model
Expand Down Expand Up @@ -442,7 +463,9 @@ module MqClient =
(callback =
(fun () ->
tcs.SetResult
((sprintf "Publish to queue '%s' timedout after %ss" routingKey
((sprintf
"Publish to queue '%s' timedout after %ss"
routingKey
(timeout.TotalSeconds.ToString()))
|> PublishResult.Timeout)
|> ignore),
Expand Down Expand Up @@ -494,7 +517,9 @@ module MqClient =
let! publishResult = tcs.Task |> (Async.AwaitTask >> Async.Catch)

model.channelConsumer.Model.BasicAcks.RemoveHandler basicAckEventHandler

model.channelConsumer.Model.BasicNacks.RemoveHandler basicNackEventHandler

model.channelConsumer.Model.BasicReturn.RemoveHandler basicReturnEventHandler

return match publishResult with
Expand Down Expand Up @@ -566,7 +591,9 @@ module MqClient =

tcs.TrySetResult
(Error
(sprintf "Publish to queue '%s' timedout after %ss" routingKey
(sprintf
"Publish to queue '%s' timedout after %ss"
routingKey
(timeout.TotalSeconds.ToString())))
|> ignore),
useSynchronizationContext = false)
Expand Down

0 comments on commit 5fb327c

Please sign in to comment.