Skip to content

Commit

Permalink
This is an automated cherry-pick of #8589
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
amyangfei authored and ti-chi-bot committed Mar 22, 2023
1 parent 4b60531 commit 23f4447
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 0 deletions.
43 changes: 43 additions & 0 deletions cdc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,14 @@ import (
"path/filepath"
"time"

"github.com/dustin/go-humanize"
"github.com/gin-gonic/gin"
"github.com/pingcap/errors"
"github.com/pingcap/log"
<<<<<<< HEAD:cdc/server.go
=======
"github.com/pingcap/tiflow/cdc"
>>>>>>> 7dc2617115 (sorter(ticdc): use correct cgroup memory limit in some scenarios (#8589)):cdc/server/server.go
"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/sorter/unified"
Expand All @@ -36,6 +41,7 @@ import (
"github.com/pingcap/tiflow/pkg/httputil"
"github.com/pingcap/tiflow/pkg/p2p"
"github.com/pingcap/tiflow/pkg/tcpserver"
"github.com/pingcap/tiflow/pkg/util"
p2pProto "github.com/pingcap/tiflow/proto/p2p"
"github.com/prometheus/client_golang/prometheus"
"go.etcd.io/etcd/client/pkg/v3/logutil"
Expand Down Expand Up @@ -160,7 +166,44 @@ func (s *Server) Run(ctx context.Context) error {

s.capture = capture.NewCapture(s.pdEndpoints, s.etcdClient, s.grpcService)

<<<<<<< HEAD:cdc/server.go
err = s.startStatusHTTP(s.tcpServer.HTTP1Listener())
=======
return nil
}

func (s *server) createSortEngineFactory() error {
conf := config.GetGlobalServerConfig()
if s.sortEngineFactory != nil {
if err := s.sortEngineFactory.Close(); err != nil {
log.Error("fails to close sort engine manager", zap.Error(err))
}
s.sortEngineFactory = nil
}

// Sorter dir has been set and checked when server starts.
// See https://github.com/pingcap/tiflow/blob/9dad09/cdc/server.go#L275
sortDir := config.GetGlobalServerConfig().Sorter.SortDir
totalMemory, err := util.GetMemoryLimit()
if err != nil {
return errors.Trace(err)
}
memPercentage := float64(conf.Sorter.MaxMemoryPercentage) / 100
memInBytes := uint64(float64(totalMemory) * memPercentage)
s.sortEngineFactory = factory.NewForPebble(sortDir, memInBytes, conf.Debug.DB)
log.Info("sorter engine memory limit", zap.String("memory", humanize.Bytes(memInBytes)))

return nil
}

// Run runs the server.
func (s *server) Run(ctx context.Context) error {
if err := s.prepare(ctx); err != nil {
return err
}

err := s.startStatusHTTP(s.tcpServer.HTTP1Listener())
>>>>>>> 7dc2617115 (sorter(ticdc): use correct cgroup memory limit in some scenarios (#8589)):cdc/server/server.go
if err != nil {
return err
}
Expand Down
13 changes: 13 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.18
require (
github.com/BurntSushi/toml v1.1.0
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/KimMachineGun/automemlimit v0.2.4
github.com/Shopify/sarama v1.36.0
github.com/apache/pulsar-client-go v0.6.0
github.com/aws/aws-sdk-go v1.35.3
Expand Down Expand Up @@ -122,10 +123,16 @@ require (
github.com/carlmjohnson/flagext v0.21.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cheggaaa/pb/v3 v3.0.8 // indirect
<<<<<<< HEAD
=======
github.com/cilium/ebpf v0.4.0 // indirect
github.com/cloudfoundry/gosigar v1.3.6 // indirect
>>>>>>> 7dc2617115 (sorter(ticdc): use correct cgroup memory limit in some scenarios (#8589))
github.com/cockroachdb/errors v1.8.1 // indirect
github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f // indirect
github.com/cockroachdb/redact v1.0.8 // indirect
github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2 // indirect
github.com/containerd/cgroups v1.0.4 // indirect
github.com/coocood/bbloom v0.0.0-20190830030839-58deb6228d64 // indirect
github.com/coocood/freecache v1.2.1 // indirect
github.com/coocood/rtutil v0.0.0-20190304133409-c84515f646f2 // indirect
Expand Down Expand Up @@ -153,9 +160,14 @@ require (
github.com/go-openapi/swag v0.21.1 // indirect
github.com/go-playground/locales v0.14.0 // indirect
github.com/go-playground/universal-translator v0.18.0 // indirect
<<<<<<< HEAD
github.com/go-playground/validator/v10 v10.9.0 // indirect
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
=======
github.com/go-playground/validator/v10 v10.10.0 // indirect
github.com/godbus/dbus/v5 v5.0.4 // indirect
>>>>>>> 7dc2617115 (sorter(ticdc): use correct cgroup memory limit in some scenarios (#8589))
github.com/golang/glog v1.0.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/snappy v0.0.4 // indirect
Expand Down Expand Up @@ -201,6 +213,7 @@ require (
github.com/ngaut/log v0.0.0-20210830112240-0124ec040aeb // indirect
github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7 // indirect
github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef // indirect
github.com/opencontainers/runtime-spec v1.0.2 // indirect
github.com/opentracing/basictracer-go v1.1.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/philhofer/fwd v1.1.1 // indirect
Expand Down
18 changes: 18 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ github.com/Jeffail/gabs/v2 v2.5.1 h1:ANfZYjpMlfTTKebycu4X1AgkVWumFVDYQl7JwOr4mDk
github.com/Jeffail/gabs/v2 v2.5.1/go.mod h1:xCn81vdHKxFUuWWAaD5jCTQDNPBMh5pPs9IJ+NcziBI=
github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKzY=
github.com/Joker/jade v1.0.1-0.20190614124447-d475f43051e7/go.mod h1:6E6s8o2AE4KhCrqr6GRJjdC/gNfTdxkIXvuGZZda2VM=
github.com/KimMachineGun/automemlimit v0.2.4 h1:GBty8TK8k0aJer1Pq5/3Vdt2ef+YpLhcqNo+PSD5CoI=
github.com/KimMachineGun/automemlimit v0.2.4/go.mod h1:38QAnnnNhnFuAIW3+aPlaVUHqzE9buJYZK3m/jsra8E=
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE=
github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y=
Expand Down Expand Up @@ -183,6 +185,8 @@ github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e h1:fY5BOSpyZCqRo5O
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1 h1:q763qf9huN11kDQavWsoZXJNW3xEE4JJyHa5Q25/sd8=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/cilium/ebpf v0.4.0 h1:QlHdikaxALkqWasW8hAC1mfR0jdmvbfaBdBPFmRSglA=
github.com/cilium/ebpf v0.4.0/go.mod h1:4tRaxcgiL706VnOzHOdBlY8IEAIdxINsQBcU4xJJXRs=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
Expand Down Expand Up @@ -214,6 +218,8 @@ github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2/go.mod h1:8BT+cPK6xvFOcRlk
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0/go.mod h1:4Zcjuz89kmFXt9morQgcfYZAYZ5n8WHjt81YYWIwtTM=
github.com/colinmarc/hdfs/v2 v2.1.1/go.mod h1:M3x+k8UKKmxtFu++uAZ0OtDU8jR3jnaZIAc6yK4Ue0c=
github.com/containerd/cgroups v1.0.4 h1:jN/mbWBEaz+T1pi5OFtnkQ+8qnmEbAr1Oo1FRm5B0dA=
github.com/containerd/cgroups v1.0.4/go.mod h1:nLNQtsF7Sl2HxNebu77i1R0oDlhiTG+kO4JTrUzo6IA=
github.com/coocood/bbloom v0.0.0-20190830030839-58deb6228d64 h1:W1SHiII3e0jVwvaQFglwu3kS9NLxOeTpvik7MbKCyuQ=
github.com/coocood/bbloom v0.0.0-20190830030839-58deb6228d64/go.mod h1:F86k/6c7aDUdwSUevnLpHS/3Q9hzYCE99jGk2xsHnt0=
github.com/coocood/freecache v1.2.1 h1:/v1CqMq45NFH9mp/Pt142reundeBM0dVUD3osQBeu/U=
Expand Down Expand Up @@ -427,8 +433,14 @@ github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6Wezm
github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM=
github.com/goccy/go-graphviz v0.0.9/go.mod h1:wXVsXxmyMQU6TN3zGRttjNn3h+iCAS7xQFC6TlNvLhk=
github.com/goccy/go-json v0.7.8/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
<<<<<<< HEAD
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0=
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4=
=======
github.com/goccy/go-json v0.9.11 h1:/pAaQDLHEoCq/5FFmSKBswWmK6H0e8g4159Kc/X/nqk=
github.com/goccy/go-json v0.9.11/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/godbus/dbus/v5 v5.0.4 h1:9349emZab16e7zQvpmsbtjc18ykshndd8y2PG3sgJbA=
>>>>>>> 7dc2617115 (sorter(ticdc): use correct cgroup memory limit in some scenarios (#8589))
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gogo/gateway v1.1.0 h1:u0SuhL9+Il+UbjM9VIE3ntfRujKbvVpFvNB4HbjeVQ0=
Expand Down Expand Up @@ -912,6 +924,12 @@ github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1y
github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs=
<<<<<<< HEAD
=======
github.com/onsi/gomega v1.23.0 h1:/oxKu9c2HVap+F3PfKort2Hw5DEU+HGlW8n+tguWsys=
github.com/opencontainers/runtime-spec v1.0.2 h1:UfAcuLBJB9Coz72x1hgl8O5RVzTdNiaglX6v2DM6FI0=
github.com/opencontainers/runtime-spec v1.0.2/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0=
>>>>>>> 7dc2617115 (sorter(ticdc): use correct cgroup memory limit in some scenarios (#8589))
github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74=
github.com/opentracing/basictracer-go v1.1.0 h1:Oa1fTSBvAl8pa3U+IJYqrKm0NALwH9OsgwOqDv4xJW0=
github.com/opentracing/basictracer-go v1.1.0/go.mod h1:V2HZueSJEp879yv285Aap1BS69fQMD+MNP1mRs6mBQc=
Expand Down
69 changes: 69 additions & 0 deletions pkg/util/memory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package util

import (
"math"
"time"

"github.com/KimMachineGun/automemlimit/memlimit"
"github.com/pingcap/log"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/shirou/gopsutil/v3/mem"
"go.uber.org/zap"
)

const memoryMax uint64 = math.MaxUint64

// GetMemoryLimit gets the memory limit of current process based on cgroup.
// If the cgourp is not set or memory.max is set to max, returns the available
// memory of host.
func GetMemoryLimit() (uint64, error) {
totalMemory, err := memlimit.FromCgroup()
if err != nil || totalMemory == memoryMax {
log.Info("no cgroup memory limit", zap.Error(err))
totalMemory, err = memory.MemTotal()
if err != nil {
return 0, errors.Trace(err)
}
}
return totalMemory, nil
}

// CheckMemoryUsage checks if the memory usage is less than the limit.
func CheckMemoryUsage(limit float64) (bool, error) {
stat, err := mem.VirtualMemory()
if err != nil {
return false, err
}
return stat.UsedPercent < limit, nil
}

// WaitMemoryAvailable waits until the memory usage is less than the limit.
func WaitMemoryAvailable(limit float64, timeout time.Duration) error {
start := time.Now()
for {
hasFreeMemory, err := CheckMemoryUsage(limit)
if err != nil {
return err
}
if hasFreeMemory {
return nil
}
if time.Since(start) > timeout {
return errors.ErrWaitFreeMemoryTimeout.GenWithStackByArgs()
}
}
}
27 changes: 27 additions & 0 deletions pkg/util/memory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package util

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestGetMemoryLimit(t *testing.T) {
t.Parallel()
limit, err := GetMemoryLimit()
require.NoError(t, err)
require.Less(t, limit, memoryMax)
}

0 comments on commit 23f4447

Please sign in to comment.