fix pipe compose and rethink the log error model #378
Conversation
@haf Hi, I am a little bit confused about this:

```fsharp
/// Special case: e.g. Fatal messages.
let logAck (logger: Logger) level messageFactory: Promise<unit> =
  let ack = IVar ()
  let inner =
    logger.logWithAck (true, level) messageFactory ^=> function
      | Ok promise ->
        Job.start (promise ^=> IVar.fill ack)
      | Result.Error Rejected ->
        IVar.fill ack ()
      | Result.Error (BufferFull target) ->
        //let e = exn (sprintf "logWithAck (true, _) should have waited for the RingBuffer(s) to accept the Message. Target(%s)" target)
        //IVar.fillFailure ack e
        IVar.fill ack ()
  start inner
  ack :> Promise<_>
```

or how about this?

```fsharp
/// log starts when the user reads the promise returned by this method
let logAckCold (logger: Logger) level messageFactory: Promise<unit> =
  logger.logWithAck (true, level) messageFactory
  >>=* function
    | Ok promise -> promise
    | Result.Error e -> Promise.unit
```
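For context, a hedged sketch of how a call site might consume either variant; `logger`, `level` and `factory` are placeholders here, not Logary API:

```fsharp
// Both variants hand back a Promise<unit>. With logAck the message is already on
// its way by the time we get the promise; with logAckCold (per its doc comment)
// nothing starts until the promise is read.
let logAndWait logger level factory = job {
  let ack = logAck logger level factory  // Promise<unit>, logging already started
  do! ack                                // block this job until the target(s) have ACKed
}
```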
```fsharp
  Alt.always (Result.Error e))
)
|> Seq.Con.mapJob (fun msg -> next msg |> PipeResult.orDefault LogResult.success)
>>= IVar.fill putAllPromises
```
Why not just bind `reduce` as the callback of `Seq.Con.mapJob`'s return value? Why start in parallel and join the result with a promise?
Because if we bind `reduce` as the callback of `Con.mapJob`, this chains those jobs until we return the alt. And that alt is used for `Alt.prepareJob`, which means it blocks until all the `Alt`s have finished (`Result.sequence`) and their results have been reduced to commit an always `Alt` here. With `Job.start` plus an `IVar`, the prepare job returns immediately and the `IVar` commits later.
#r "Hopac"
#r "Hopac.Core"
open Hopac
open Hopac.Infixes
let slow = Alt.prepareJob (fun _ ->
timeOutMillis 5000 >>-. Alt.always ("slow")
)
let slowWithIVar = Alt.prepareJob (fun _ ->
let v = IVar ()
timeOutMillis 5000
>>= fun _ -> IVar.fill v ("slow")
|> Job.start
>>-. v
)
let quick = timeOutMillis 500 ^->. "quick"
let test = (slow <|> quick) ^-> printfn "%s"
let testWithIVar = (slowWithIVar <|> quick) ^-> printfn "%s"
run test
run testWithIVar
I wrote a test and it gives this:
```
val slow : Hopac.Alt<string>
val slowWithIVar : Hopac.Alt<string>
val quick : Hopac.Alt<string>
val test : Hopac.Alt<unit>
val testWithIVar : Hopac.Alt<unit>

> run test;;
Binding session to '/Users/lust/.nuget/packages/hopac/0.4.1/lib/netstandard2.0/Hopac.Platform.dll'...
slow
val it : unit = ()

> run testWithIVar;;
quick
val it : unit = ()
```
@@ -275,25 +275,8 @@ and SpanInfo =

```fsharp
  static member formatId (id: Guid) =
    id.ToString("n")

/// Why was the message/metric/event not logged?
[<Struct>]
type LogError =
```
Thing is, if we change this, there's a whole heap of work to do in the Facade as well. I'm OK with having this very simple, like now, because we only have a few cases that happen:
- target fails terminally due to the target's state (e.g. out of disk)
- target fails grey, e.g. due to too-slow processing
- connectivity broken / DNS lookup failure, etc.; all inside the Logary target, and those may be cached, so a service restart is needed
- rate of logging > rate of sending, dropping in part
- misconfiguration

We don't really want to have timeouts in the pipeline if we can avoid it, because every time we add another timeout, it's a work item that Hopac's scheduler has to track. If the Hopac scheduler's queue can't keep up, the buffers will fill up, which means we'll run out of memory and crash (since we still input messages into the buffer pipeline); that was the reason for the refactor this summer, away from 'log simple' having a timeout alternative, towards this.
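For reference, a minimal sketch of the simple error model under discussion, assuming just the two cases that appear earlier in this thread (`Rejected` and `BufferFull`); the exact shipped definition may differ:

```fsharp
/// Why was the message/metric/event not logged?
[<Struct>]
type LogError =
  /// The message was rejected outright, e.g. because the registry is shutting down.
  | Rejected
  /// The named target's RingBuffer was full, so the message could not be enqueued in time.
  | BufferFull of target: string
```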
There is a little difference here. In the old one, the message would not be dropped when the timeout alternative committed, since we have more than one target. But right now, if it times out, the message will not be appended to the buffer, and a timeout error message is returned to the client. If we configure a global logging error handler on the registry, we can be notified of this situation and then adjust the target buffer length, or speed up the processing.

Since we have a timeout protocol, the user can do backpressure with some balance: there is no need to wait forever (logging is not as important as the business code), but we still want to give the buffers some time if they can't catch up quickly.
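A self-contained Hopac sketch of that bounded-wait idea, using Hopac's `BoundedMb` in place of Logary's `RingBuffer` and a plain string as the error value (both are assumptions for illustration only):

```fsharp
open Hopac
open Hopac.Infixes

/// Race the buffer put against a timeout and commit on whichever happens first.
let tryPut (buf: BoundedMb<'msg>) (timeout: System.TimeSpan) (msg: 'msg) : Alt<Result<unit, string>> =
  BoundedMb.put buf msg ^->. Ok ()
  <|>
  timeOut timeout ^->. Error "timed out waiting for the buffer to accept the message"
```

The caller then decides whether to drop the message, retry, or surface the error through the registry's error handler.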
```fsharp
  false

let private printDotOnOverflow success =
  if not success then System.Console.Error.Write '.' else ()
```
This is/was also a good signal of when another Hopac job crashed the scheduling of tasks, by crashing one of the worker threads. If we don't print something here, we'll never be able to see this error case in the logs, since Logary is by then indisposed.
```fsharp
msg
|> Target.logAllReduce putBufferTimeOut targets
|> Alt.afterFun (fun result ->
  start (Job.thunk (fun _ ->
    try
      conf.logResultHandler result
    with
    | e ->
      eprintfn "%A" e)))
```

This has been implemented by the registry logging error handler here: https://github.com/logary/logary/blob/fix-pipe-compose/src/Logary/Configuration/Config.fs#L49

```fsharp
logResultHandler = function
  | Result.Error error -> System.Console.Error.Write (MessageWriter.singleLineNoContext.format error)
  | _ -> ()
```
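As a hedged illustration of this extension point (the handler shape is inferred from the default above; the names here are hypothetical, not part of Logary):

```fsharp
// Count failed log results instead of printing them, e.g. so the number can be
// exposed as a health metric.
let mutable droppedLogs = 0L

let countingLogResultHandler result =
  match result with
  | Result.Error _ -> System.Threading.Interlocked.Increment(&droppedLogs) |> ignore
  | _ -> ()
```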
src/Logary/TargetModule.fs (outdated)

```fsharp
else
  RingBuffer.put x.api.requests targetMsg ^->. Result.Ok (upcast ack)
  <|>
  timeOut (putBufferTimeOut.ToTimeSpan()) ^-> fun _ -> LogError.timeOutToPutBuffer x.name putBufferTimeOut.TotalSeconds
```
This is what caused our cascading failures when logging at high throughput at tradera.com. We can't go back to it.
Do we have some perf test to check this situation, like this one? #322
Yes, let's benchmark it.
I'm not sure without reading its call-sites. I'll have to go through them. But generally, you're right; it's a hot function and should probably not be unless a call site specifically requires it to be.
This PR, given that it changes the public API, needs to update the facade and the facade adapter to match. I think you've done a good job, but I would prefer to have the bool in place and not change the public API; we can make the wait duration configurable with an environment variable instead.
Yes, you are right. I have noticed that it will affect the facade and its adapter. Maybe we can add an extension method to put this duration in the message context rather than change the original public API; if waitForBuffer is true, we can check whether we have this timeout duration. A sketch of the idea follows below.
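A hypothetical sketch of that idea; the context key name, the use of NodaTime's `Duration`, and the `Message.setContext` helper are assumptions about Logary's API, not the final design:

```fsharp
open NodaTime
open Logary

/// Illustrative context key name; the real key would be decided in the PR.
[<Literal>]
let WaitForBufferTimeoutKey = "logary.waitForBufferTimeout"

/// Attach a per-message buffer-wait duration without changing the public API.
/// When waitForBuffer is true, the target could look this value up and race
/// RingBuffer.put against a timeout of this length.
let withBufferTimeout (duration: Duration) (msg: Message) : Message =
  msg |> Message.setContext WaitForBufferTimeoutKey duration
```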
@haf Fixed the adapter and the facade. In the facade I changed the …; in Logary, at this refactor, use …
This error on AppVeyor looks a bit like a race condition... Will you check it out, or should I?
I think we can keep it v4, based on it being in beta. For my own use case it's easy enough to switch.
Sure
I really have no idea what is going on here; I don't know if this has anything to do with the Hopac scheduling timeout, since we do …
@haf Can you help release a beta version, so that I can fill the gap between https://github.com/logary/AspNetCore and the newest Logary?
@haf

> Yes, let's benchmark it.

Fixed the …
Let's make the CI pass, so I can merge it. I'll look at this ASAP; sorry for the delay.
Seems that: …

And if we expose …, I think …. What do you think?
Sounds well reasoned to me.
But often when users don't have access to a synchronisation context they don't want to care, so not exposing this forces something on the user and causes questions they may not need to understand. We could have configuration available that changes the default when the ring buffer is full, though.
Releasing this as beta-29 now.
fix pipe compose

- `Events.build` here means dispatch each event/message to all pipes, not chain them.
- use message as the error model for logging errors: this can express more situations and carry more info. If some error occurs, we can print the message to see whether the registry is closed, whether it timed out, or whether it partially succeeded (and show which targets/messages failed).
- support timeout the right way: each put of a message into a buffer can have its own waiting time; on timeout, we give up putting the message and return a timeout error message to the client. But this doesn't give putting a message to all targets a transactional meaning.

TODO:
- LoggerModule: expose a friendly API to help clients handle logging errors

fixed: #294 #303