diff --git a/cdc/sorter/leveldb/system/system.go b/cdc/sorter/leveldb/system/system.go index 1a269ccc8f5..108ecd1dd1e 100644 --- a/cdc/sorter/leveldb/system/system.go +++ b/cdc/sorter/leveldb/system/system.go @@ -22,13 +22,13 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" - "github.com/pingcap/tidb/util/memory" lsorter "github.com/pingcap/tiflow/cdc/sorter/leveldb" "github.com/pingcap/tiflow/cdc/sorter/leveldb/message" "github.com/pingcap/tiflow/pkg/actor" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/db" cerrors "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" ) @@ -131,7 +131,7 @@ func (s *System) Start(ctx context.Context) error { s.dbSystem.Start(ctx) s.WriterSystem.Start(ctx) s.ReaderSystem.Start(ctx) - totalMemory, err := memory.MemTotal() + totalMemory, err := util.MemTotal() if err != nil { return errors.Trace(err) } diff --git a/go.mod b/go.mod index 63dbc964c3d..0e2174ccaf5 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/chaos-mesh/go-sqlsmith v0.0.0-20220512075501-53f2916ae240 github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e github.com/cockroachdb/pebble v0.0.0-20211124172904-3ca75111760c + github.com/containerd/cgroups/v3 v3.0.1 github.com/coreos/go-semver v0.3.0 github.com/davecgh/go-spew v1.1.1 github.com/deepmap/oapi-codegen v1.9.0 @@ -41,6 +42,7 @@ require ( github.com/jarcoal/httpmock v1.0.8 github.com/jmoiron/sqlx v1.3.3 github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d + github.com/labstack/gommon v0.3.0 github.com/linkedin/goavro/v2 v2.11.1 github.com/mailru/easyjson v0.7.7 github.com/mattn/go-shellwords v1.0.12 @@ -96,8 +98,6 @@ require ( upper.io/db.v3 v3.7.1+incompatible ) -require github.com/labstack/gommon v0.3.0 - require ( cloud.google.com/go v0.100.2 // indirect cloud.google.com/go/compute v1.2.0 // indirect @@ -122,6 +122,7 @@ require ( github.com/carlmjohnson/flagext v0.21.0 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/cheggaaa/pb/v3 v3.0.8 // indirect + github.com/cilium/ebpf v0.9.1 // indirect github.com/cockroachdb/errors v1.8.1 // indirect github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f // indirect github.com/cockroachdb/redact v1.0.8 // indirect @@ -155,6 +156,7 @@ require ( github.com/go-playground/universal-translator v0.18.0 // indirect github.com/go-playground/validator/v10 v10.9.0 // indirect github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect + github.com/godbus/dbus/v5 v5.0.4 // indirect github.com/golang-jwt/jwt v3.2.2+incompatible // indirect github.com/golang/glog v1.0.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect @@ -201,6 +203,7 @@ require ( github.com/ngaut/log v0.0.0-20210830112240-0124ec040aeb // indirect github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7 // indirect github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef // indirect + github.com/opencontainers/runtime-spec v1.0.2 // indirect github.com/opentracing/basictracer-go v1.1.0 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/philhofer/fwd v1.1.1 // indirect diff --git a/go.sum b/go.sum index cc4eaef9219..e504565592e 100644 --- a/go.sum +++ b/go.sum @@ -183,6 +183,8 @@ github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e h1:fY5BOSpyZCqRo5O github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1 h1:q763qf9huN11kDQavWsoZXJNW3xEE4JJyHa5Q25/sd8= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= +github.com/cilium/ebpf v0.9.1 h1:64sn2K3UKw8NbP/blsixRpF3nXuyhz/VjRlRzvlBRu4= +github.com/cilium/ebpf v0.9.1/go.mod h1:+OhNOIXx/Fnu1IE8bJz2dzOA+VSfyTfdNUVdlQnxUFY= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= @@ -214,6 +216,8 @@ github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2/go.mod h1:8BT+cPK6xvFOcRlk github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0/go.mod h1:4Zcjuz89kmFXt9morQgcfYZAYZ5n8WHjt81YYWIwtTM= github.com/colinmarc/hdfs/v2 v2.1.1/go.mod h1:M3x+k8UKKmxtFu++uAZ0OtDU8jR3jnaZIAc6yK4Ue0c= +github.com/containerd/cgroups/v3 v3.0.1 h1:4hfGvu8rfGIwVIDd+nLzn/B9ZXx4BcCjzt5ToenJRaE= +github.com/containerd/cgroups/v3 v3.0.1/go.mod h1:/vtwk1VXrtoa5AaZLkypuOJgA/6DyPMZHJPGQNtlHnw= github.com/coocood/bbloom v0.0.0-20190830030839-58deb6228d64 h1:W1SHiII3e0jVwvaQFglwu3kS9NLxOeTpvik7MbKCyuQ= github.com/coocood/bbloom v0.0.0-20190830030839-58deb6228d64/go.mod h1:F86k/6c7aDUdwSUevnLpHS/3Q9hzYCE99jGk2xsHnt0= github.com/coocood/freecache v1.2.1 h1:/v1CqMq45NFH9mp/Pt142reundeBM0dVUD3osQBeu/U= @@ -330,8 +334,8 @@ github.com/form3tech-oss/jwt-go v3.2.5+incompatible h1:/l4kBbb4/vGSsdtB5nUe8L7B9 github.com/form3tech-oss/jwt-go v3.2.5+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= -github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY= github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= +github.com/frankban/quicktest v1.14.0 h1:+cqqvzZV87b4adx/5ayVOaYZ2CrvM4ejQvUdBzPPUss= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU= @@ -429,6 +433,7 @@ github.com/goccy/go-graphviz v0.0.9/go.mod h1:wXVsXxmyMQU6TN3zGRttjNn3h+iCAS7xQF github.com/goccy/go-json v0.7.8/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0= github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4= +github.com/godbus/dbus/v5 v5.0.4 h1:9349emZab16e7zQvpmsbtjc18ykshndd8y2PG3sgJbA= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gogo/gateway v1.1.0 h1:u0SuhL9+Il+UbjM9VIE3ntfRujKbvVpFvNB4HbjeVQ0= @@ -912,6 +917,8 @@ github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1y github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= +github.com/opencontainers/runtime-spec v1.0.2 h1:UfAcuLBJB9Coz72x1hgl8O5RVzTdNiaglX6v2DM6FI0= +github.com/opencontainers/runtime-spec v1.0.2/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74= github.com/opentracing/basictracer-go v1.1.0 h1:Oa1fTSBvAl8pa3U+IJYqrKm0NALwH9OsgwOqDv4xJW0= github.com/opentracing/basictracer-go v1.1.0/go.mod h1:V2HZueSJEp879yv285Aap1BS69fQMD+MNP1mRs6mBQc= diff --git a/pkg/util/cgroup/cgroups.go b/pkg/util/cgroup/cgroups.go new file mode 100644 index 00000000000..182e9b3dd1d --- /dev/null +++ b/pkg/util/cgroup/cgroups.go @@ -0,0 +1,80 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// +// ============================================================ +// Forked from https://github.com/KimMachineGun/automemlimit +// +// Written by Geon Kim +// limitations under the MIT License. + +//go:build linux +// +build linux + +package cgroup + +import ( + "github.com/containerd/cgroups/v3" + "github.com/containerd/cgroups/v3/cgroup1" + "github.com/containerd/cgroups/v3/cgroup2" +) + +// FromCgroup returns the memory limit based on the cgroups version on this system. +func FromCgroup() (uint64, error) { + switch cgroups.Mode() { + case cgroups.Legacy: + return FromCgroupV1() + case cgroups.Hybrid, cgroups.Unified: + return FromCgroupV2() + } + return 0, ErrNoCgroup +} + +// FromCgroupV1 returns the memory limit from the cgroup v1. +func FromCgroupV1() (uint64, error) { + cg, err := cgroup1.Load(cgroup1.RootPath, cgroup1.WithHiearchy( + cgroup1.SingleSubsystem(cgroup1.Default, cgroup1.Memory), + )) + if err != nil { + return 0, err + } + + metrics, err := cg.Stat(cgroup1.IgnoreNotExist) + if err != nil { + return 0, err + } else if metrics.Memory == nil { + return 0, ErrNoLimit + } + + return metrics.Memory.HierarchicalMemoryLimit, nil +} + +// FromCgroupV2 returns the memory limit from the cgroup v2. +func FromCgroupV2() (uint64, error) { + path, err := cgroup2.NestedGroupPath("") + if err != nil { + return 0, err + } + + m, err := cgroup2.Load(path, cgroup2.WithMountpoint(cgroupMountPoint)) + if err != nil { + return 0, err + } + + stats, err := m.Stat() + if err != nil { + return 0, err + } else if stats.Memory == nil { + return 0, ErrNoLimit + } + + return stats.Memory.UsageLimit, nil +} diff --git a/pkg/util/cgroup/cgroups_test.go b/pkg/util/cgroup/cgroups_test.go new file mode 100644 index 00000000000..f840dbfef91 --- /dev/null +++ b/pkg/util/cgroup/cgroups_test.go @@ -0,0 +1,77 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// +// ============================================================ +// Forked from https://github.com/KimMachineGun/automemlimit +// +// Written by Geon Kim +// limitations under the MIT License. + +//go:build linux +// +build linux + +package cgroup + +import ( + "flag" + "log" + "os" + "testing" + + "github.com/containerd/cgroups/v3" +) + +var ( + cgVersion cgroups.CGMode + expected uint64 +) + +func TestMain(m *testing.M) { + flag.Uint64Var(&expected, "expected", 0, "Expected cgroup's memory limit") + flag.Parse() + + cgVersion = cgroups.Mode() + log.Println("Cgroups version:", cgVersion) + + os.Exit(m.Run()) +} + +func TestFromCgroup(t *testing.T) { + _, err := FromCgroup() + if cgVersion == cgroups.Unavailable && err != ErrNoCgroup { + t.Fatalf("FromCgroup() error = %v, wantErr %v", err, ErrNoCgroup) + } + + if err != nil { + t.Fatalf("FromCgroup() error = %v, wantErr %v", err, nil) + } +} + +func TestFromCgroupV1(t *testing.T) { + if cgVersion != cgroups.Legacy { + t.Skip("cgroups v1 is not supported") + } + _, err := FromCgroupV1() + if err != nil { + t.Fatalf("FromCgroupV1() error = %v, wantErr %v", err, nil) + } +} + +func TestFromCgroupV2(t *testing.T) { + if cgVersion != cgroups.Hybrid && cgVersion != cgroups.Unified { + t.Skip("cgroups v2 is not supported") + } + _, err := FromCgroupV2() + if err != nil { + t.Fatalf("FromCgroupV2() error = %v, wantErr %v", err, nil) + } +} diff --git a/pkg/util/cgroup/cgroups_unsupported.go b/pkg/util/cgroup/cgroups_unsupported.go new file mode 100644 index 00000000000..fcd4452f15a --- /dev/null +++ b/pkg/util/cgroup/cgroups_unsupported.go @@ -0,0 +1,34 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// +// ============================================================ +// Forked from https://github.com/KimMachineGun/automemlimit +// +// Written by Geon Kim +// limitations under the MIT License. + +//go:build !linux +// +build !linux + +package cgroup + +func FromCgroup() (uint64, error) { + return 0, ErrCgroupsNotSupported +} + +func FromCgroupV1() (uint64, error) { + return 0, ErrCgroupsNotSupported +} + +func FromCgroupV2() (uint64, error) { + return 0, ErrCgroupsNotSupported +} diff --git a/pkg/util/cgroup/cgroups_unsupported_test.go b/pkg/util/cgroup/cgroups_unsupported_test.go new file mode 100644 index 00000000000..d017b0e80fd --- /dev/null +++ b/pkg/util/cgroup/cgroups_unsupported_test.go @@ -0,0 +1,56 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// +// ============================================================ +// Forked from https://github.com/KimMachineGun/automemlimit +// +// Written by Geon Kim +// limitations under the MIT License. + +//go:build !linux +// +build !linux + +package cgroup + +import ( + "testing" +) + +func TestFromCgroup(t *testing.T) { + limit, err := FromCgroup() + if err != ErrCgroupsNotSupported { + t.Fatalf("FromCgroup() error = %v, wantErr %v", err, ErrCgroupsNotSupported) + } + if limit != 0 { + t.Fatalf("FromCgroup() got = %v, want %v", limit, 0) + } +} + +func TestFromCgroupV1(t *testing.T) { + limit, err := FromCgroupV1() + if err != ErrCgroupsNotSupported { + t.Fatalf("FromCgroupV1() error = %v, wantErr %v", err, ErrCgroupsNotSupported) + } + if limit != 0 { + t.Fatalf("FromCgroupV1() got = %v, want %v", limit, 0) + } +} + +func TestFromCgroupV2(t *testing.T) { + limit, err := FromCgroupV2() + if err != ErrCgroupsNotSupported { + t.Fatalf("FromCgroupV2() error = %v, wantErr %v", err, ErrCgroupsNotSupported) + } + if limit != 0 { + t.Fatalf("FromCgroupV2() got = %v, want %v", limit, 0) + } +} diff --git a/pkg/util/cgroup/constant.go b/pkg/util/cgroup/constant.go new file mode 100644 index 00000000000..fbfbde08dc7 --- /dev/null +++ b/pkg/util/cgroup/constant.go @@ -0,0 +1,34 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// +// ============================================================ +// Forked from https://github.com/KimMachineGun/automemlimit +// +// Written by Geon Kim +// limitations under the MIT License. + +package cgroup + +import "errors" + +const ( + cgroupMountPoint = "/sys/fs/cgroup" +) + +var ( + // ErrNoLimit is returned when the memory limit is not set. + ErrNoLimit = errors.New("memory is not limited") + // ErrNoCgroup is returned when the process is not in cgroup. + ErrNoCgroup = errors.New("process is not in cgroup") + // ErrCgroupsNotSupported is returned when the system does not support cgroups. + ErrCgroupsNotSupported = errors.New("cgroups is not supported on this system") +) diff --git a/pkg/util/memory.go b/pkg/util/memory.go new file mode 100644 index 00000000000..4bcb610f855 --- /dev/null +++ b/pkg/util/memory.go @@ -0,0 +1,44 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "math" + + "github.com/pingcap/log" + "github.com/pingcap/tidb/util/memory" + "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/util/cgroup" + "go.uber.org/zap" +) + +const memoryMax uint64 = math.MaxUint64 + +// MemTotal returns the memory limit of current process based on cgroup. +var MemTotal func() (uint64, error) = GetMemoryLimit + +// GetMemoryLimit gets the memory limit of current process based on cgroup. +// If the cgourp is not set or memory.max is set to max, returns the available +// memory of host. +func GetMemoryLimit() (uint64, error) { + totalMemory, err := cgroup.FromCgroup() + if err != nil || totalMemory == memoryMax { + log.Info("no cgroup memory limit", zap.Error(err)) + totalMemory, err = memory.MemTotal() + if err != nil { + return 0, errors.Trace(err) + } + } + return totalMemory, nil +} diff --git a/pkg/util/memory_test.go b/pkg/util/memory_test.go new file mode 100644 index 00000000000..310048a1ebd --- /dev/null +++ b/pkg/util/memory_test.go @@ -0,0 +1,27 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestGetMemoryLimit(t *testing.T) { + t.Parallel() + limit, err := MemTotal() + require.NoError(t, err) + require.Less(t, limit, memoryMax) +}