Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[utils] Implement ShardedMap #248

Merged
merged 8 commits into from
Nov 23, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/lucasjones/reggen v0.0.0-20180717132126-cdb49ff09d77
github.com/mitchellh/mapstructure v1.3.3
github.com/pkg/errors v0.9.1 // indirect
github.com/segmentio/fasthash v1.0.3
github.com/stretchr/objx v0.1.1 // indirect
github.com/stretchr/testify v1.6.1
github.com/tidwall/gjson v1.6.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ github.com/rs/cors v0.0.0-20160617231935-a62a804a8a00/go.mod h1:gFx+x8UowdsKA9Ac
github.com/rs/xhandler v0.0.0-20160618193221-ed27b6fd6521/go.mod h1:RvLn4FgxWubrpZHtQLnOf6EwhN2hEMusxZOhcW9H3UQ=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/segmentio/fasthash v1.0.3 h1:EI9+KE1EwvMLBWwjpRDc+fEM+prwxDYbslddQGtrmhM=
github.com/segmentio/fasthash v1.0.3/go.mod h1:waKX8l2N8yckOgmSsXJi7x1ZfdKZ4x7KRMzBtS3oedY=
github.com/shirou/gopsutil v2.20.5+incompatible h1:tYH07UPoQt0OCQdgWWMgYHy3/a9bcxNpBIysykNIP7I=
github.com/shirou/gopsutil v2.20.5+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
Expand Down
File renamed without changes.
File renamed without changes.
85 changes: 85 additions & 0 deletions utils/sharded_map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright 2020 Coinbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package utils

import (
"github.com/segmentio/fasthash/fnv1a"
)

const (
// DefaultShards is the default number of shards
// to use in ShardedMap.
DefaultShards = 256
)

// shardMapEntry governs access to the shard of
// the map contained at a particular index.
type shardMapEntry struct {
mutex *PriorityMutex
entries map[string]interface{}
}

// ShardedMap allows concurrent writes
// to a map by sharding the map into some
// number of independently locked subsections.
type ShardedMap struct {
shards []*shardMapEntry
}

// NewShardedMap creates a new *ShardedMap
// with some number of shards. The larger the
// number provided for shards, the less lock
// contention there will be.
//
// As a rule of thumb, shards should usually
// be set to the concurrency of the caller.
func NewShardedMap(shards int) *ShardedMap {
m := &ShardedMap{
shards: make([]*shardMapEntry, shards),
}

for i := 0; i < shards; i++ {
m.shards[i] = &shardMapEntry{
entries: map[string]interface{}{},
mutex: new(PriorityMutex),
}
}

return m
}

// shardIndex returns the index of the shard
// that could contain the key.
func (m *ShardedMap) shardIndex(key string) int {
return int(fnv1a.HashString32(key) % uint32(len(m.shards)))
}

// Lock acquires the lock for a shard that could contain
// the key. This syntax allows the caller to perform multiple
// operations while holding the lock for a single shard.
func (m *ShardedMap) Lock(key string, priority bool) map[string]interface{} {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I observe that nothing about this package uses the resulting map[string]interface{} as a map. This could conceivably be a more general ShardedContainer API, where Lock returns an interface{} and it's up to the caller to type-assert it to whatever kind of container they're sharding.

This means you'd also have to parameterize the constructor to construct caller-specified container shards rather than assuming they're maps.

One nice benefit of this approach is that (apart from the need to type-assert the result of Lock) it's more typesafe, in that the caller can say it's a map[string]uint64 instead of having to settle for a map[string]interface{}.

Copy link
Contributor Author

@patrick-ogrady patrick-ogrady Nov 23, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I observe that nothing about this package uses the resulting map[string]interface{} as a map. This could conceivably be a more general ShardedContainer API, where Lock returns an interface{} and it's up to the caller to type-assert it to whatever kind of container they're sharding.

With the current API, the only thing that we would probably want to be asserted is that interface{} is a pointer (like json.Unmarshal). Otherwise, any modifications to the returned object would be ineffectual (or we return a pointer to what is stored, which may get hairy).

One nice benefit of this approach is that (apart from the need to type-assert the result of Lock) it's more typesafe, in that the caller can say it's a map[string]uint64 instead of having to settle for a map[string]interface{}.

Because the caller (me in MutexMap) doesn't need to parse the value in the map if it doesn't exist, this API allows the caller to avoid unnecessary reflection. However, I see your point about why a ShardedContainer would be cool.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a change we could make in a future PR once we have a clear use case in mind (likely just wrapping a ShardedContainer implementation). For now, I'd like to keep this optimized for the use case I have in mind (optimization of the reconciler and storage packages).

shardIndex := m.shardIndex(key)
shard := m.shards[shardIndex]
shard.mutex.Lock(priority)
return shard.entries
}

// Unlock releases the lock for a shard that could contain
// the key.
func (m *ShardedMap) Unlock(key string) {
shardIndex := m.shardIndex(key)
shard := m.shards[shardIndex]
shard.mutex.Unlock()
}
69 changes: 69 additions & 0 deletions utils/sharded_map_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2020 Coinbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package utils

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"golang.org/x/sync/errgroup"
)

func TestShardedMap(t *testing.T) {
m := NewShardedMap(2)
g, _ := errgroup.WithContext(context.Background())

// To test locking, we use channels
// that will cause deadlock if not executed
// concurrently.
a := make(chan struct{})
b := make(chan struct{})

g.Go(func() error {
s := m.Lock("a", false)
assert.Len(t, s, 0)
s["test"] = "a"
<-a
close(b)
m.Unlock("a")
return nil
})

g.Go(func() error {
s := m.Lock("b", false)
assert.Len(t, s, 0)
s["test"] = "b"
close(a)
<-b
m.Unlock("b")
return nil
})

time.Sleep(1 * time.Second)
assert.NoError(t, g.Wait())
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like none of the goroutines in g actually need to return an error, which means you could simplify a bit by using sync.WaitGroup instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think adding sync.WaitGroup may take more lines/be slightly more complex than just returning nil. I think we would need a wg.Add before spawning each goroutine and would need to call wg.Done at the end of each goroutine. Going to just leave as is for now!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Appreciate the careful review!


// Ensure keys set correctly
s := m.Lock("a", false)
assert.Len(t, s, 1)
assert.Equal(t, s["test"], "a")
m.Unlock("a")

s = m.Lock("b", false)
assert.Len(t, s, 1)
assert.Equal(t, s["test"], "b")
m.Unlock("b")
}