Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve ketama hashring replication #5427

Merged
merged 1 commit into from
Jun 21, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
### Fixed
- [#5339](https://github.com/thanos-io/thanos/pull/5339) Receive: Fix deadlock on interrupt in routerOnly mode
- [#5357](https://github.com/thanos-io/thanos/pull/5357) Store: fix groupcache handling of slashes
- [#5427](https://github.com/thanos-io/thanos/pull/5427) Receive: Fix Ketama hashring replication consistency

### Added

9 changes: 2 additions & 7 deletions pkg/receive/hashring.go
Original file line number Diff line number Diff line change
@@ -106,11 +106,6 @@ type ketamaHashring struct {
}

func newKetamaHashring(endpoints []string, sectionsPerNode int) *ketamaHashring {
// Replication works by choosing subsequent nodes in the ring.
// In order to improve consistency, we avoid relying on the ordering of the endpoints
// and sort them lexicographically.
sort.Strings(endpoints)

numSections := len(endpoints) * sectionsPerNode
ring := ketamaHashring{
endpoints: endpoints,
@@ -156,8 +151,8 @@ func (c ketamaHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
i = 0
}

nodeIndex := (c.sections[i].endpointIndex + n) % c.numEndpoints

i = (i + n) % numSections
nodeIndex := c.sections[i].endpointIndex
return c.endpoints[nodeIndex], nil
}

61 changes: 44 additions & 17 deletions pkg/receive/hashring_test.go
Original file line number Diff line number Diff line change
@@ -154,7 +154,7 @@ func TestHashringGet(t *testing.T) {
}
}

func TestConsistentHashringGet(t *testing.T) {
func TestKetamaHashringGet(t *testing.T) {
baseTS := &prompb.TimeSeries{
Labels: []labelpb.ZLabel{
{
@@ -181,21 +181,21 @@ func TestConsistentHashringGet(t *testing.T) {
nodes: []string{"node-1", "node-2", "node-3"},
ts: baseTS,
n: 1,
expectedNode: "node-3",
expectedNode: "node-1",
},
{
name: "base case with replication",
nodes: []string{"node-1", "node-2", "node-3"},
ts: baseTS,
n: 2,
expectedNode: "node-1",
expectedNode: "node-2",
},
{
name: "base case with replication and reordered nodes",
nodes: []string{"node-1", "node-3", "node-2"},
ts: baseTS,
n: 2,
expectedNode: "node-1",
expectedNode: "node-2",
},
{
name: "base case with new node at beginning of ring",
@@ -234,8 +234,8 @@ func TestConsistentHashringGet(t *testing.T) {
}
}

func TestConsistentHashringConsistency(t *testing.T) {
series := makeSeries(10000)
func TestKetamaHashringConsistency(t *testing.T) {
series := makeSeries()

ringA := []string{"node-1", "node-2", "node-3"}
a1, err := assignSeries(series, ringA)
@@ -254,8 +254,8 @@ func TestConsistentHashringConsistency(t *testing.T) {
}
}

func TestConsistentHashringIncreaseAtEnd(t *testing.T) {
series := makeSeries(10000)
func TestKetamaHashringIncreaseAtEnd(t *testing.T) {
series := makeSeries()

initialRing := []string{"node-1", "node-2", "node-3"}
initialAssignments, err := assignSeries(series, initialRing)
@@ -274,8 +274,8 @@ func TestConsistentHashringIncreaseAtEnd(t *testing.T) {
}
}

func TestConsistentHashringIncreaseInMiddle(t *testing.T) {
series := makeSeries(10000)
func TestKetamaHashringIncreaseInMiddle(t *testing.T) {
series := makeSeries()

initialRing := []string{"node-1", "node-3"}
initialAssignments, err := assignSeries(series, initialRing)
@@ -294,7 +294,28 @@ func TestConsistentHashringIncreaseInMiddle(t *testing.T) {
}
}

func makeSeries(numSeries int) []*prompb.TimeSeries {
// TestKetamaHashringReplicationConsistency assigns the same series, with two
// replicas each, to a three-node ring and then to a five-node ring that
// contains those three nodes in a different order. It requires that every
// series held by one of the original nodes after the resize was already held
// by that node before it.
func TestKetamaHashringReplicationConsistency(t *testing.T) {
	const replicas = 2
	allSeries := makeSeries()

	ringBefore := []string{"node-1", "node-4", "node-5"}
	before, err := assignReplicatedSeries(allSeries, ringBefore, replicas)
	require.NoError(t, err)

	ringAfter := []string{"node-4", "node-3", "node-1", "node-2", "node-5"}
	after, err := assignReplicatedSeries(allSeries, ringAfter, replicas)
	require.NoError(t, err)

	// Only the nodes that existed before the resize are checked: each may
	// lose series to the newcomers but must not gain any.
	for _, node := range ringBefore {
		for _, ts := range after[node] {
			alreadyOwned := findSeries(before, node, ts)
			require.True(t, alreadyOwned, "node %s contains new series after resizing", node)
		}
	}
}

func makeSeries() []*prompb.TimeSeries {
numSeries := 10000
series := make([]*prompb.TimeSeries, numSeries)
for i := 0; i < numSeries; i++ {
series[i] = &prompb.TimeSeries{
@@ -322,15 +343,21 @@ func findSeries(initialAssignments map[string][]*prompb.TimeSeries, node string,
}

// assignSeries maps every series to the single node selected for it by a
// Ketama hashring built from nodes, and returns the series grouped by node.
//
// It delegates to assignReplicatedSeries with a replica count of 1.
// assignReplicatedSeries performs one assignment pass per replica, so a count
// of 0 would perform no passes and return an empty map, making every
// assertion over the returned assignments pass vacuously.
func assignSeries(series []*prompb.TimeSeries, nodes []string) (map[string][]*prompb.TimeSeries, error) {
	return assignReplicatedSeries(series, nodes, 1)
}

func assignReplicatedSeries(series []*prompb.TimeSeries, nodes []string, replicas uint64) (map[string][]*prompb.TimeSeries, error) {
hashRing := newKetamaHashring(nodes, SectionsPerNode)
assignments := make(map[string][]*prompb.TimeSeries)
for _, ts := range series {
result, err := hashRing.Get("tenant", ts)
if err != nil {
return nil, err
}
assignments[result] = append(assignments[result], ts)
for i := uint64(0); i < replicas; i++ {
for _, ts := range series {
result, err := hashRing.GetN("tenant", ts, i)
if err != nil {
return nil, err
}
assignments[result] = append(assignments[result], ts)

}
}

return assignments, nil