Skip to content

Commit

Permalink
server (ticdc): add timeout for statusServer. (#5332) (#5346)
Browse files Browse the repository at this point in the history
close #5303
  • Loading branch information
ti-chi-bot authored Jun 27, 2022
1 parent 6b77bff commit 4d7dc3d
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 1 deletion.
3 changes: 3 additions & 0 deletions cdc/capture/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ func (h *HTTPHandler) CreateChangefeed(c *gin.Context) {
return
}

// c does not have a cancel() func and its Done() method always return nil,
// so we should not use c as a context.
// Ref:https://github.com/gin-gonic/gin/blob/92eeaa4ebbadec2376e2ca5f5749888da1a42e24/context.go#L1157
ctx := c.Request.Context()
var changefeedConfig model.ChangefeedConfig
if err := c.BindJSON(&changefeedConfig); err != nil {
Expand Down
23 changes: 22 additions & 1 deletion cdc/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net"
"net/http"
"os"
"time"

"github.com/gin-gonic/gin"
"github.com/pingcap/failpoint"
Expand All @@ -35,12 +36,27 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
"golang.org/x/net/netutil"
)

const (
// maxHTTPConnection is used to limits the max concurrent connections of http server.
maxHTTPConnection = 1000
// httpConnectionTimeout is used to limits a connection max alive time of http server.
httpConnectionTimeout = 10 * time.Minute
)

// startStatusHTTP starts the HTTP server.
// `lis` is a listener that gives us plain-text HTTP requests.
// TODO can we decouple the HTTP server from the capture server?
func (s *Server) startStatusHTTP(lis net.Listener) error {
// LimitListener returns a Listener that accepts at most n simultaneous
// connections from the provided Listener. Connections that exceed the
// limit will wait in a queue and no new goroutines will be created until
// a connection is processed.
// We use it here to limit the max concurrent connections of statusServer.
lis = netutil.LimitListener(lis, maxHTTPConnection)

conf := config.GetGlobalServerConfig()

// OpenAPI handling logic is injected here.
Expand All @@ -65,7 +81,12 @@ func (s *Server) startStatusHTTP(lis net.Listener) error {
router.Any("/metrics", gin.WrapH(promhttp.Handler()))

// No need to configure TLS because it is already handled by `s.tcpServer`.
s.statusServer = &http.Server{Handler: router}
// Add ReadTimeout and WriteTimeout to avoid some abnormal connections never close.
s.statusServer = &http.Server{
Handler: router,
ReadTimeout: httpConnectionTimeout,
WriteTimeout: httpConnectionTimeout,
}

go func() {
log.Info("http server is running", zap.String("addr", conf.Addr))
Expand Down
7 changes: 7 additions & 0 deletions cdc/sink/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,13 @@ func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config,
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
// we must close adminClient when this func return cause by an error
// otherwise the adminClient will never be closed and lead to an goroutine leak
defer func() {
if err != nil && admin != nil {
admin.Close()
}
}()

if err := validateAndCreateTopic(admin, topic, config, cfg, opts); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
Expand Down

0 comments on commit 4d7dc3d

Please sign in to comment.