Delay creating handlers for interrupt signals until after client connects to Kafka #55
Comments
glad it is of use to you @echojc :) haven't gotten time to standardize the shutdown behavior. was thinking of doing it the same across all commands: so probably register a listener in main, and then pass a not sure if i'll have time for this before next week, but i'm hoping it's not too urgent for you?
No, not urgent. What do you think about registering the listener for interrupts only after the Kafka client has connected to the cluster?
ah i was assuming (without checking) that not every command needs a client (e.g. produce), and i would prefer a consistent approach. or do you see a way of handling that the same across commands?
If I understand you correctly, how about passing a function that creates the close channel through to each command instead of just the channel? Then the commands that need to do cleanup can register the listener after the initial setup, and the commands that don't need cleanup don't need to listen for signals at all. Something like:

    // Returns a function that installs the signal listener on its first
    // call and hands back the same channel on every later call.
    func createListenForInterrupt() func() chan struct{} {
    	var closer chan struct{}
    	return func() chan struct{} {
    		if closer == nil {
    			closer = make(chan struct{})
    			go func() {
    				signals := make(chan os.Signal, 1)
    				// os.Kill (SIGKILL) cannot be caught, so in practice
    				// only os.Interrupt is delivered here.
    				signal.Notify(signals, os.Kill, os.Interrupt)
    				<-signals
    				fmt.Fprintf(os.Stderr, "received interrupt - shutting down\n")
    				close(closer)
    			}()
    		}
    		return closer
    	}
    }

    func main() {
    	cmd := parseArgs()
    	closerFn := createListenForInterrupt()
    	cmd.run(os.Args[2:], closerFn)
    }
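A minimal sketch of how a command might consume such a function, assuming nothing about kt's actual internals. The names `newLazyCloser` and `runCommand` are hypothetical, and `sync.Once` is swapped in for the `nil` check so that the first call is safe even if it comes from more than one goroutine. Only `os.Interrupt` is registered here.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"sync"
)

// newLazyCloser (hypothetical name) returns a function that installs the
// interrupt listener on its first call and returns the same channel on
// every later call.
func newLazyCloser() func() <-chan struct{} {
	var once sync.Once
	closer := make(chan struct{})
	return func() <-chan struct{} {
		once.Do(func() {
			signals := make(chan os.Signal, 1)
			signal.Notify(signals, os.Interrupt)
			go func() {
				<-signals
				fmt.Fprintln(os.Stderr, "received interrupt - shutting down")
				close(closer)
			}()
		})
		return closer
	}
}

// runCommand stands in for a command's run method (hypothetical).
// setup is the slow step, e.g. connecting to the brokers. Until it
// returns, the default handler is still active and ^C ends the process.
func runCommand(setup func() error, work func(<-chan struct{}) error, closerFn func() <-chan struct{}) error {
	if err := setup(); err != nil {
		return err
	}
	// Only now is the default interrupt behaviour overridden.
	return work(closerFn())
}

func main() {
	err := runCommand(
		func() error { return nil }, // pretend the connection succeeded
		func(closer <-chan struct{}) error {
			// A real command would loop here and stop once closer is closed.
			_ = closer
			return nil
		},
		newLazyCloser(),
	)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
```

A command that needs no cleanup would simply never call the function it is handed, so the default handling of ^C stays in place for it.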
@echojc sorry, finally getting some headspace for this. if i understand you correctly, then the function is used to delay listening for the signals or, put differently, to delay overriding the default listeners. thus the command can choose to override the default listeners when applicable. i think i like it :) will give it a try, unless you're interested?
Yep that's what I mean. All yours 😅
@echojc let me know if the above doesn't work for you :)
Thanks for making and maintaining kt! 😄
At the moment kt starts capturing interrupt signals before the client connects, which means that ^C while attempting to connect to Kafka will stall kt until the server responds. If the server doesn't respond (e.g., -brokers 8.8.8.8), kt will hang until the connection attempt times out. You handle this in the topic command by spinning off a goroutine that waits for the interrupt channel to close, but the other commands don't have that at the moment.
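For illustration, one way a blocking connect could be raced against the interrupt channel is sketched below. This is not kt's code: `connectOrAbort` and `errInterrupted` are hypothetical names, and the one-hour sleep merely stands in for a broker that never answers.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errInterrupted is a hypothetical sentinel error for this sketch.
var errInterrupted = errors.New("interrupted while connecting")

// connectOrAbort runs connect in its own goroutine and returns as soon as
// either connect finishes or closer is closed, whichever happens first.
func connectOrAbort(connect func() error, closer <-chan struct{}) error {
	done := make(chan error, 1) // buffered so the goroutine never blocks on send
	go func() { done <- connect() }()
	select {
	case err := <-done:
		return err
	case <-closer:
		return errInterrupted
	}
}

func main() {
	closer := make(chan struct{})
	slowConnect := func() error {
		time.Sleep(time.Hour) // stands in for an unresponsive broker
		return nil
	}
	// Simulate ^C shortly after start-up.
	go func() {
		time.Sleep(50 * time.Millisecond)
		close(closer)
	}()
	fmt.Println(connectOrAbort(slowConnect, closer))
}
```

The abandoned connect goroutine keeps running until the attempt times out, which is acceptable only because the process is about to exit anyway.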
I'm also not sure the goroutine in the topic command is the right way to handle it? That particular goroutine calls failf, which calls os.Exit, which means deferred functions don't get executed. In that case, is that the same as using the default interrupt handler?
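On the deferred-functions point: `os.Exit` terminates the program immediately without running deferred calls, and Go's default handling of SIGINT also exits without running them. A common way to keep cleanup working is to have the real work return an exit code and call `os.Exit` only at the very end of `main`. The sketch below shows that pattern; `run`, its parameters, and `errUnreachable` are hypothetical and not taken from kt.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// errUnreachable is a hypothetical error used only in this sketch.
var errUnreachable = errors.New("could not reach brokers")

// run does the real work and returns an exit code instead of exiting, so
// its deferred cleanup always executes before the process ends.
func run(work func() error, cleanup func()) int {
	defer cleanup()
	if err := work(); err != nil {
		fmt.Fprintln(os.Stderr, err)
		return 1
	}
	return 0
}

func main() {
	code := run(
		func() error { return errUnreachable },
		func() { fmt.Fprintln(os.Stderr, "closing client") },
	)
	// os.Exit is called only here, after run's deferred calls have finished.
	os.Exit(code)
}
```

With this shape, a failure path that would otherwise go through something like failf can return an error up to `run` instead, and any deferred close still happens.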