Skip to content

Commit

Permalink
update, ref tikv#4399
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <admin@liudos.us>
  • Loading branch information
lhy1024 committed Feb 16, 2022
1 parent 1b8f823 commit e9d55b6
Show file tree
Hide file tree
Showing 5 changed files with 228 additions and 122 deletions.
136 changes: 136 additions & 0 deletions pkg/heap/indexed_heap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package heap

import "container/heap"

// TopNItem represents a single object in TopN.
type TopNItem interface {
// ID is used to check identity.
ID() uint64
// Less tests whether the current item is less than the given argument in the `k`th dimension.
Less(k int, than TopNItem) bool
}

// IndexedHeap is a heap with index.
type IndexedHeap struct {
k int
rev bool
items []TopNItem
index map[uint64]int
}

// NewMinHeap returns a min heap with the given hint.
func NewMinHeap(k, hint int) *IndexedHeap {
return &IndexedHeap{
k: k,
rev: false,
items: make([]TopNItem, 0, hint),
index: map[uint64]int{},
}
}

// NewMaxHeap returns a max heap with the given hint.
func NewMaxHeap(k, hint int) *IndexedHeap {
return &IndexedHeap{
k: k,
rev: true,
items: make([]TopNItem, 0, hint),
index: map[uint64]int{},
}
}

// Implementing heap.Interface.
func (hp *IndexedHeap) Len() int {
return len(hp.items)
}

// Implementing heap.Interface.
func (hp *IndexedHeap) Less(i, j int) bool {
if !hp.rev {
return hp.items[i].Less(hp.k, hp.items[j])
}
return hp.items[j].Less(hp.k, hp.items[i])
}

// Implementing heap.Interface.
func (hp *IndexedHeap) Swap(i, j int) {
lid := hp.items[i].ID()
rid := hp.items[j].ID()
hp.items[i], hp.items[j] = hp.items[j], hp.items[i]
hp.index[lid] = j
hp.index[rid] = i
}

// Implementing heap.Interface.
func (hp *IndexedHeap) Push(x interface{}) {
item := x.(TopNItem)
hp.index[item.ID()] = hp.Len()
hp.items = append(hp.items, item)
}

// Implementing heap.Interface.
func (hp *IndexedHeap) Pop() interface{} {
l := hp.Len()
item := hp.items[l-1]
hp.items = hp.items[:l-1]
delete(hp.index, item.ID())
return item
}

// Top returns the top item.
func (hp *IndexedHeap) Top() TopNItem {
if hp.Len() <= 0 {
return nil
}
return hp.items[0]
}

// Get returns item with the given ID.
func (hp *IndexedHeap) Get(id uint64) TopNItem {
idx, ok := hp.index[id]
if !ok {
return nil
}
item := hp.items[idx]
return item
}

// GetAll returns all the items.
func (hp *IndexedHeap) GetAll() []TopNItem {
all := make([]TopNItem, len(hp.items))
copy(all, hp.items)
return all
}

// Put inserts item or updates the old item if it exists.
func (hp *IndexedHeap) Put(item TopNItem) (isUpdate bool) {
if idx, ok := hp.index[item.ID()]; ok {
hp.items[idx] = item
heap.Fix(hp, idx)
return true
}
heap.Push(hp, item)
return false
}

// Remove deletes item by ID and returns it.
func (hp *IndexedHeap) Remove(id uint64) TopNItem {
if idx, ok := hp.index[id]; ok {
item := heap.Remove(hp, idx)
return item.(TopNItem)
}
return nil
}
83 changes: 83 additions & 0 deletions pkg/heap/indexed_heap_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package heap

import (
"testing"

. "github.com/pingcap/check"
)

func Test(t *testing.T) {
TestingT(t)
}

var _ = Suite(&testIndexedHeapSuite{})

type testIndexedHeapSuite struct {
}

type testItem struct {
id uint64
value float64
}

func (it *testItem) ID() uint64 {
return it.id
}

func (it *testItem) Less(k int, than TopNItem) bool {
return it.value < than.(*testItem).value
}

func NewTestItem(id uint64, value float64) *testItem {
return &testItem{
id: id,
value: value,
}
}

func (s *testIndexedHeapSuite) Test(c *C) {
h := NewMinHeap(0, 10)
v := h.Top()
c.Assert(v, Equals, nil)

h.Put(NewTestItem(1, 1.0))
h.Put(NewTestItem(2, 2.0))
v = h.Top()
c.Assert(v.(*testItem).value, Equals, 1.0)

h.Put(NewTestItem(2, 0.0))
v = h.Top()
c.Assert(v.(*testItem).value, Equals, 0.0)

h.Put(NewTestItem(1, -1.0))
v = h.Top()
c.Assert(v.(*testItem).value, Equals, -1.0)

h = NewMaxHeap(0, 10)
h.Put(NewTestItem(1, 1.0))
h.Put(NewTestItem(2, 2.0))
v = h.Top()
c.Assert(v.(*testItem).value, Equals, 2.0)

h.Put(NewTestItem(2, 3.0))
v = h.Top()
c.Assert(v.(*testItem).value, Equals, 3.0)

h.Put(NewTestItem(1, 4.0))
v = h.Top()
c.Assert(v.(*testItem).value, Equals, 4.0)
}
3 changes: 2 additions & 1 deletion server/statistics/hot_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/heap"
"github.com/tikv/pd/pkg/movingaverage"
"github.com/tikv/pd/pkg/slice"
"go.uber.org/zap"
Expand Down Expand Up @@ -120,7 +121,7 @@ func (stat *HotPeerStat) ID() uint64 {
}

// Less compares two HotPeerStat.Implementing TopNItem.
func (stat *HotPeerStat) Less(k int, than TopNItem) bool {
func (stat *HotPeerStat) Less(k int, than heap.TopNItem) bool {
return stat.GetLoad(RegionStatKind(k)) < than.(*HotPeerStat).GetLoad(RegionStatKind(k))
}

Expand Down
127 changes: 6 additions & 121 deletions server/statistics/topn.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,9 @@ import (
"fmt"
"sync"
"time"
)

// TopNItem represents a single object in TopN.
type TopNItem interface {
// ID is used to check identity.
ID() uint64
// Less tests whether the current item is less than the given argument in the `k`th dimension.
Less(k int, than TopNItem) bool
}
. "github.com/tikv/pd/pkg/heap"
)

// TopN maintains the N largest items of multiple dimensions.
type TopN struct {
Expand Down Expand Up @@ -130,16 +124,16 @@ func (tn *TopN) maintain() {
type singleTopN struct {
k int
n int
topn *indexedHeap
rest *indexedHeap
topn *IndexedHeap
rest *IndexedHeap
}

func newSingleTopN(k, n int) *singleTopN {
return &singleTopN{
k: k,
n: n,
topn: newTopNHeap(k, n),
rest: newRevTopNHeap(k, n),
topn: NewMinHeap(k, n),
rest: NewMaxHeap(k, n),
}
}

Expand Down Expand Up @@ -211,115 +205,6 @@ func (stn *singleTopN) maintain() {
}
}

// indexedHeap is a heap with index.
type indexedHeap struct {
k int
rev bool
items []TopNItem
index map[uint64]int
}

func newTopNHeap(k, hint int) *indexedHeap {
return &indexedHeap{
k: k,
rev: false,
items: make([]TopNItem, 0, hint),
index: map[uint64]int{},
}
}

func newRevTopNHeap(k, hint int) *indexedHeap {
return &indexedHeap{
k: k,
rev: true,
items: make([]TopNItem, 0, hint),
index: map[uint64]int{},
}
}

// Implementing heap.Interface.
func (hp *indexedHeap) Len() int {
return len(hp.items)
}

// Implementing heap.Interface.
func (hp *indexedHeap) Less(i, j int) bool {
if !hp.rev {
return hp.items[i].Less(hp.k, hp.items[j])
}
return hp.items[j].Less(hp.k, hp.items[i])
}

// Implementing heap.Interface.
func (hp *indexedHeap) Swap(i, j int) {
lid := hp.items[i].ID()
rid := hp.items[j].ID()
hp.items[i], hp.items[j] = hp.items[j], hp.items[i]
hp.index[lid] = j
hp.index[rid] = i
}

// Implementing heap.Interface.
func (hp *indexedHeap) Push(x interface{}) {
item := x.(TopNItem)
hp.index[item.ID()] = hp.Len()
hp.items = append(hp.items, item)
}

// Implementing heap.Interface.
func (hp *indexedHeap) Pop() interface{} {
l := hp.Len()
item := hp.items[l-1]
hp.items = hp.items[:l-1]
delete(hp.index, item.ID())
return item
}

// Top returns the top item.
func (hp *indexedHeap) Top() TopNItem {
if hp.Len() <= 0 {
return nil
}
return hp.items[0]
}

// Get returns item with the given ID.
func (hp *indexedHeap) Get(id uint64) TopNItem {
idx, ok := hp.index[id]
if !ok {
return nil
}
item := hp.items[idx]
return item
}

// GetAll returns all the items.
func (hp *indexedHeap) GetAll() []TopNItem {
all := make([]TopNItem, len(hp.items))
copy(all, hp.items)
return all
}

// Put inserts item or updates the old item if it exists.
func (hp *indexedHeap) Put(item TopNItem) (isUpdate bool) {
if idx, ok := hp.index[item.ID()]; ok {
hp.items[idx] = item
heap.Fix(hp, idx)
return true
}
heap.Push(hp, item)
return false
}

// Remove deletes item by ID and returns it.
func (hp *indexedHeap) Remove(id uint64) TopNItem {
if idx, ok := hp.index[id]; ok {
item := heap.Remove(hp, idx)
return item.(TopNItem)
}
return nil
}

type ttlItem struct {
id uint64
expire time.Time
Expand Down
Loading

0 comments on commit e9d55b6

Please sign in to comment.