From 1fcd0795632751f95299fdeda52fc416d178992b Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 9 Dec 2015 17:42:36 +0000 Subject: [PATCH 1/5] Use Readdirnames to reduce number of stats we're doing. --- common/fs/fs.go | 15 +++++++++++++++ probe/endpoint/procspy/proc.go | 14 +++++++------- probe/process/walker_linux.go | 5 ++--- test/fs/fs.go | 23 +++++++++++++++++++++++ 4 files changed, 47 insertions(+), 10 deletions(-) diff --git a/common/fs/fs.go b/common/fs/fs.go index da4d903faf..6b9eb41629 100644 --- a/common/fs/fs.go +++ b/common/fs/fs.go @@ -10,6 +10,7 @@ import ( // Interface is the filesystem interface type. type Interface interface { ReadDir(string) ([]os.FileInfo, error) + ReadDirNames(string) ([]string, error) ReadFile(string) ([]byte, error) Lstat(string, *syscall.Stat_t) error Stat(string, *syscall.Stat_t) error @@ -25,6 +26,15 @@ func (realFS) ReadDir(path string) ([]os.FileInfo, error) { return ioutil.ReadDir(path) } +func (realFS) ReadDirNames(path string) ([]string, error) { + fh, err := os.Open(path) + if err != nil { + return nil, err + } + defer fh.Close() + return fh.Readdirnames(-1) +} + func (realFS) ReadFile(path string) ([]byte, error) { return ioutil.ReadFile(path) } @@ -48,6 +58,11 @@ func ReadDir(path string) ([]os.FileInfo, error) { return fs.ReadDir(path) } +// ReadDirNames see os.File.ReadDirNames +func ReadDirNames(path string) ([]string, error) { + return fs.ReadDirNames(path) +} + // ReadFile see ioutil.ReadFile func ReadFile(path string) ([]byte, error) { return fs.ReadFile(path) diff --git a/probe/endpoint/procspy/proc.go b/probe/endpoint/procspy/proc.go index 312c0177ee..6823a3ede8 100644 --- a/probe/endpoint/procspy/proc.go +++ b/probe/endpoint/procspy/proc.go @@ -34,15 +34,10 @@ func walkProcPid(buf *bytes.Buffer, walker process.Walker) (map[uint64]Proc, err walker.Walk(func(p process.Process) { dirName := strconv.Itoa(p.PID) fdBase := filepath.Join(procRoot, dirName, "fd") - fds, err := fs.ReadDir(fdBase) - if err != nil { - // Process is be gone by now, or we don't have access. - return - } // Read network namespace, and if we haven't seen it before, // read /proc//net/tcp - err = fs.Lstat(filepath.Join(procRoot, dirName, "/ns/net"), &statT) + err := fs.Lstat(filepath.Join(procRoot, dirName, "/ns/net"), &statT) if err != nil { return } @@ -53,9 +48,14 @@ func walkProcPid(buf *bytes.Buffer, walker process.Walker) (map[uint64]Proc, err readFile(filepath.Join(procRoot, dirName, "/net/tcp6"), buf) } + fds, err := fs.ReadDirNames(fdBase) + if err != nil { + // Process is be gone by now, or we don't have access. + return + } for _, fd := range fds { // Direct use of syscall.Stat() to save garbage. - err = fs.Stat(filepath.Join(fdBase, fd.Name()), &statT) + err = fs.Stat(filepath.Join(fdBase, fd), &statT) if err != nil { continue } diff --git a/probe/process/walker_linux.go b/probe/process/walker_linux.go index 7b04311749..5eecc5d766 100644 --- a/probe/process/walker_linux.go +++ b/probe/process/walker_linux.go @@ -23,13 +23,12 @@ func NewWalker(procRoot string) Walker { // passes one-by-one to the supplied function. Walk is only made public // so that is can be tested. func (w *walker) Walk(f func(Process)) error { - dirEntries, err := fs.ReadDir(w.procRoot) + dirEntries, err := fs.ReadDirNames(w.procRoot) if err != nil { return err } - for _, dirEntry := range dirEntries { - filename := dirEntry.Name() + for _, filename := range dirEntries { pid, err := strconv.Atoi(filename) if err != nil { continue diff --git a/test/fs/fs.go b/test/fs/fs.go index 56b6a87d56..b59f37dc83 100644 --- a/test/fs/fs.go +++ b/test/fs/fs.go @@ -89,6 +89,24 @@ func (p dir) ReadDir(path string) ([]os.FileInfo, error) { return fs.ReadDir(tail) } +func (p dir) ReadDirNames(path string) ([]string, error) { + if path == "/" { + result := []string{} + for _, v := range p.entries { + result = append(result, v.Name()) + } + return result, nil + } + + head, tail := split(path) + fs, ok := p.entries[head] + if !ok { + return nil, fmt.Errorf("Not found: %s", path) + } + + return fs.ReadDirNames(tail) +} + func (p dir) ReadFile(path string) ([]byte, error) { if path == "/" { return nil, fmt.Errorf("I'm a directory!") @@ -156,6 +174,11 @@ func (p File) ReadDir(path string) ([]os.FileInfo, error) { return nil, fmt.Errorf("I'm a file!") } +// ReadDirNames implements FS +func (p File) ReadDirNames(path string) ([]string, error) { + return nil, fmt.Errorf("I'm a file!") +} + // ReadFile implements FS func (p File) ReadFile(path string) ([]byte, error) { if path != "/" { From cde5920f9add19acc9f66c1208aabcf1cdb72c7d Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 9 Dec 2015 17:43:19 +0000 Subject: [PATCH 2/5] Cache the contents of some files in /proc Also cache contents of /proc/foo/stat, but only for 10s. --- probe/process/cache.go | 72 ++++ probe/process/walker_linux.go | 16 +- vendor/github.com/coocood/freecache/LICENSE | 21 ++ vendor/github.com/coocood/freecache/README.md | 114 ++++++ vendor/github.com/coocood/freecache/cache.go | 135 ++++++++ .../coocood/freecache/cache_test.go | 249 +++++++++++++ .../github.com/coocood/freecache/ringbuf.go | 243 +++++++++++++ .../coocood/freecache/ringbuf_test.go | 35 ++ .../github.com/coocood/freecache/segment.go | 326 ++++++++++++++++++ .../coocood/freecache/server/main.go | 318 +++++++++++++++++ vendor/github.com/spaolacci/murmur3/LICENSE | 24 ++ vendor/github.com/spaolacci/murmur3/README.md | 84 +++++ vendor/github.com/spaolacci/murmur3/murmur.go | 65 ++++ .../github.com/spaolacci/murmur3/murmur128.go | 189 ++++++++++ .../github.com/spaolacci/murmur3/murmur32.go | 154 +++++++++ .../github.com/spaolacci/murmur3/murmur64.go | 45 +++ .../spaolacci/murmur3/murmur_test.go | 251 ++++++++++++++ vendor/manifest | 14 +- 18 files changed, 2341 insertions(+), 14 deletions(-) create mode 100644 probe/process/cache.go create mode 100644 vendor/github.com/coocood/freecache/LICENSE create mode 100644 vendor/github.com/coocood/freecache/README.md create mode 100644 vendor/github.com/coocood/freecache/cache.go create mode 100644 vendor/github.com/coocood/freecache/cache_test.go create mode 100644 vendor/github.com/coocood/freecache/ringbuf.go create mode 100644 vendor/github.com/coocood/freecache/ringbuf_test.go create mode 100644 vendor/github.com/coocood/freecache/segment.go create mode 100644 vendor/github.com/coocood/freecache/server/main.go create mode 100644 vendor/github.com/spaolacci/murmur3/LICENSE create mode 100644 vendor/github.com/spaolacci/murmur3/README.md create mode 100644 vendor/github.com/spaolacci/murmur3/murmur.go create mode 100644 vendor/github.com/spaolacci/murmur3/murmur128.go create mode 100644 vendor/github.com/spaolacci/murmur3/murmur32.go create mode 100644 vendor/github.com/spaolacci/murmur3/murmur64.go create mode 100644 vendor/github.com/spaolacci/murmur3/murmur_test.go diff --git a/probe/process/cache.go b/probe/process/cache.go new file mode 100644 index 0000000000..e17713e64e --- /dev/null +++ b/probe/process/cache.go @@ -0,0 +1,72 @@ +package process + +import ( + "strconv" + "strings" + "time" + + "github.com/armon/go-metrics" + "github.com/coocood/freecache" + + "github.com/weaveworks/scope/common/fs" +) + +const ( + generalTimeout = 30 // seconds + statsTimeout = 10 //seconds +) + +var ( + hitMetricsKey = []string{"process", "cache", "hit"} + missMetricsKey = []string{"process", "cache", "miss"} +) + +var fileCache = freecache.NewCache(1024 * 16) + +type entry struct { + buf []byte + err error + ts time.Time +} + +func cachedReadFile(path string) ([]byte, error) { + key := []byte(path) + if v, err := fileCache.Get(key); err == nil { + metrics.IncrCounter(hitMetricsKey, 1.0) + return v, nil + } + + buf, err := fs.ReadFile(path) + fileCache.Set(key, buf, generalTimeout) + metrics.IncrCounter(missMetricsKey, 1.0) + return buf, err +} + +// we cache the stats, but for a shorter period +func readStats(path string) (int, int, error) { + var ( + key = []byte(path) + buf []byte + err error + ) + if buf, err = fileCache.Get(key); err == nil { + metrics.IncrCounter(hitMetricsKey, 1.0) + } else { + buf, err = fs.ReadFile(path) + if err != nil { + return -1, -1, err + } + fileCache.Set(key, buf, statsTimeout) + metrics.IncrCounter(missMetricsKey, 1.0) + } + splits := strings.Fields(string(buf)) + ppid, err := strconv.Atoi(splits[3]) + if err != nil { + return -1, -1, err + } + threads, err := strconv.Atoi(splits[19]) + if err != nil { + return -1, -1, err + } + return ppid, threads, nil +} diff --git a/probe/process/walker_linux.go b/probe/process/walker_linux.go index 5eecc5d766..725545444a 100644 --- a/probe/process/walker_linux.go +++ b/probe/process/walker_linux.go @@ -34,29 +34,19 @@ func (w *walker) Walk(f func(Process)) error { continue } - stat, err := fs.ReadFile(path.Join(w.procRoot, filename, "stat")) + ppid, threads, err := readStats(path.Join(w.procRoot, filename, "stat")) if err != nil { continue } - splits := strings.Fields(string(stat)) - ppid, err := strconv.Atoi(splits[3]) - if err != nil { - return err - } - - threads, err := strconv.Atoi(splits[19]) - if err != nil { - return err - } cmdline := "" - if cmdlineBuf, err := fs.ReadFile(path.Join(w.procRoot, filename, "cmdline")); err == nil { + if cmdlineBuf, err := cachedReadFile(path.Join(w.procRoot, filename, "cmdline")); err == nil { cmdlineBuf = bytes.Replace(cmdlineBuf, []byte{'\000'}, []byte{' '}, -1) cmdline = string(cmdlineBuf) } comm := "(unknown)" - if commBuf, err := fs.ReadFile(path.Join(w.procRoot, filename, "comm")); err == nil { + if commBuf, err := cachedReadFile(path.Join(w.procRoot, filename, "comm")); err == nil { comm = strings.TrimSpace(string(commBuf)) } diff --git a/vendor/github.com/coocood/freecache/LICENSE b/vendor/github.com/coocood/freecache/LICENSE new file mode 100644 index 0000000000..fb5a06b4b9 --- /dev/null +++ b/vendor/github.com/coocood/freecache/LICENSE @@ -0,0 +1,21 @@ +The MIT License + +Copyright (c) 2015 Ewan Chou. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. \ No newline at end of file diff --git a/vendor/github.com/coocood/freecache/README.md b/vendor/github.com/coocood/freecache/README.md new file mode 100644 index 0000000000..8b73c81e70 --- /dev/null +++ b/vendor/github.com/coocood/freecache/README.md @@ -0,0 +1,114 @@ +#FreeCache - A cache library for Go with zero GC overhead. + +Long lived objects in memory introduce expensive GC overhead, the GC latency can go up to hundreds of milliseconds with just a few millions of live objects. +With FreeCache, you can cache unlimited number of objects in memory without increased GC latency. + +[![Build Status](https://travis-ci.org/coocood/freecache.png?branch=master)](https://travis-ci.org/coocood/freecache) +[![GoCover](http://gocover.io/_badge/github.com/coocood/freecache)](http://gocover.io/github.com/coocood/freecache) +[![GoDoc](https://godoc.org/github.com/coocood/freecache?status.svg)](https://godoc.org/github.com/coocood/freecache) + +##About GC Pause Issue + +Here is the demo code for the GC pause issue, you can run it yourself. +On my laptop, GC pause with FreeCache is under 200us, but with map, it is more than 300ms. + +```go +package main + +import ( + "fmt" + "github.com/coocood/freecache" + "runtime" + "runtime/debug" + "time" +) + +var mapCache map[string][]byte + +func GCPause() time.Duration { + runtime.GC() + var stats debug.GCStats + debug.ReadGCStats(&stats) + return stats.Pause[0] +} + +func main() { + n := 3000 * 1000 + freeCache := freecache.NewCache(512 * 1024 * 1024) + debug.SetGCPercent(10) + for i := 0; i < n; i++ { + key := fmt.Sprintf("key%v", i) + val := make([]byte, 10) + freeCache.Set([]byte(key), val, 0) + } + fmt.Println("GC pause with free cache:", GCPause()) + freeCache = nil + mapCache = make(map[string][]byte) + for i := 0; i < n; i++ { + key := fmt.Sprintf("key%v", i) + val := make([]byte, 10) + mapCache[key] = val + } + fmt.Println("GC pause with map cache:", GCPause()) +} +``` + +##Features +* Store hundreds of millions of entries +* Zero GC overhead +* High concurrent thread-safe access +* Pure Go implementation +* Expiration support +* Nearly LRU algorithm +* Strictly limited memory usage +* Come with a toy server that supports a few basic Redis commands with pipeline + +##Performance +Here is the benchmark result compares to built-in map, `Set` performance is about 2x faster than built-in map, `Get` performance is about 1/2x slower than built-in map. Since it is single threaded benchmark, in multi-threaded environment, +FreeCache should be many times faster than single lock protected built-in map. + + BenchmarkCacheSet 3000000 446 ns/op + BenchmarkMapSet 2000000 861 ns/op + BenchmarkCacheGet 3000000 517 ns/op + BenchmarkMapGet 10000000 212 ns/op + +##Example Usage +```go +cacheSize := 100 * 1024 * 1024 +cache := freecache.NewCache(cacheSize) +debug.SetGCPercent(20) +key := []byte("abc") +val := []byte("def") +expire := 60 // expire in 60 seconds +cache.Set(key, val, expire) +got, err := cache.Get(key) +if err != nil { + fmt.Println(err) +} else { + fmt.Println(string(got)) +} +affected := cache.Del(key) +fmt.Println("deleted key ", affected) +fmt.Println("entry count ", cache.EntryCount()) +``` + +##Notice +* Recommended Go version is 1.4. +* Memory is preallocated. +* If you allocate large amount of memory, you may need to set `debug.SetGCPercent()` +to a much lower percentage to get a normal GC frequency. + +##How it is done +FreeCache avoids GC overhead by reducing the number of pointers. +No matter how many entries stored in it, there are only 512 pointers. +The data set is sharded into 256 segments by the hash value of the key. +Each segment has only two pointers, one is the ring buffer that stores keys and values, +the other one is the index slice which used to lookup for an entry. +Each segment has its own lock, so it supports high concurrent access. + +##TODO +* Support dump to file and load from file. +* Support resize cache size at runtime. + +##License +The MIT License diff --git a/vendor/github.com/coocood/freecache/cache.go b/vendor/github.com/coocood/freecache/cache.go new file mode 100644 index 0000000000..086ddea514 --- /dev/null +++ b/vendor/github.com/coocood/freecache/cache.go @@ -0,0 +1,135 @@ +package freecache + +import ( + "sync" + "sync/atomic" + + "github.com/spaolacci/murmur3" +) + +type Cache struct { + locks [256]sync.Mutex + segments [256]segment + hitCount int64 + missCount int64 +} + +func hashFunc(data []byte) uint64 { + return murmur3.Sum64(data) +} + +// The cache size will be set to 512KB at minimum. +// If the size is set relatively large, you should call +// `debug.SetGCPercent()`, set it to a much smaller value +// to limit the memory consumption and GC pause time. +func NewCache(size int) (cache *Cache) { + if size < 512*1024 { + size = 512 * 1024 + } + cache = new(Cache) + for i := 0; i < 256; i++ { + cache.segments[i] = newSegment(size/256, i) + } + return +} + +// If the key is larger than 65535 or value is larger than 1/1024 of the cache size, +// the entry will not be written to the cache. expireSeconds <= 0 means no expire, +// but it can be evicted when cache is full. +func (cache *Cache) Set(key, value []byte, expireSeconds int) (err error) { + hashVal := hashFunc(key) + segId := hashVal & 255 + cache.locks[segId].Lock() + err = cache.segments[segId].set(key, value, hashVal, expireSeconds) + cache.locks[segId].Unlock() + return +} + +// Get the value or not found error. +func (cache *Cache) Get(key []byte) (value []byte, err error) { + hashVal := hashFunc(key) + segId := hashVal & 255 + cache.locks[segId].Lock() + value, err = cache.segments[segId].get(key, hashVal) + cache.locks[segId].Unlock() + if err == nil { + atomic.AddInt64(&cache.hitCount, 1) + } else { + atomic.AddInt64(&cache.missCount, 1) + } + return +} + +func (cache *Cache) Del(key []byte) (affected bool) { + hashVal := hashFunc(key) + segId := hashVal & 255 + cache.locks[segId].Lock() + affected = cache.segments[segId].del(key, hashVal) + cache.locks[segId].Unlock() + return +} + +func (cache *Cache) EvacuateCount() (count int64) { + for i := 0; i < 256; i++ { + count += atomic.LoadInt64(&cache.segments[i].totalEvacuate) + } + return +} + +func (cache *Cache) EntryCount() (entryCount int64) { + for i := 0; i < 256; i++ { + entryCount += atomic.LoadInt64(&cache.segments[i].entryCount) + } + return +} + +// The average unix timestamp when a entry being accessed. +// Entries have greater access time will be evacuated when it +// is about to be overwritten by new value. +func (cache *Cache) AverageAccessTime() int64 { + var entryCount, totalTime int64 + for i := 0; i < 256; i++ { + totalTime += atomic.LoadInt64(&cache.segments[i].totalTime) + entryCount += atomic.LoadInt64(&cache.segments[i].totalCount) + } + if entryCount == 0 { + return 0 + } else { + return totalTime / entryCount + } +} + +func (cache *Cache) HitCount() int64 { + return atomic.LoadInt64(&cache.hitCount) +} + +func (cache *Cache) LookupCount() int64 { + return atomic.LoadInt64(&cache.hitCount) + atomic.LoadInt64(&cache.missCount) +} + +func (cache *Cache) HitRate() float64 { + lookupCount := cache.LookupCount() + if lookupCount == 0 { + return 0 + } else { + return float64(cache.HitCount()) / float64(lookupCount) + } +} + +func (cache *Cache) OverwriteCount() (overwriteCount int64) { + for i := 0; i < 256; i++ { + overwriteCount += atomic.LoadInt64(&cache.segments[i].overwrites) + } + return +} + +func (cache *Cache) Clear() { + for i := 0; i < 256; i++ { + cache.locks[i].Lock() + newSeg := newSegment(len(cache.segments[i].rb.data), i) + cache.segments[i] = newSeg + cache.locks[i].Unlock() + } + atomic.StoreInt64(&cache.hitCount, 0) + atomic.StoreInt64(&cache.missCount, 0) +} diff --git a/vendor/github.com/coocood/freecache/cache_test.go b/vendor/github.com/coocood/freecache/cache_test.go new file mode 100644 index 0000000000..90e771b7c1 --- /dev/null +++ b/vendor/github.com/coocood/freecache/cache_test.go @@ -0,0 +1,249 @@ +package freecache + +import ( + "bytes" + "crypto/rand" + "encoding/binary" + "fmt" + "strings" + "testing" + "time" +) + +func TestFreeCache(t *testing.T) { + cache := NewCache(1024) + if cache.HitRate() != 0 { + t.Error("initial hit rate should be zero") + } + if cache.AverageAccessTime() != 0 { + t.Error("initial average access time should be zero") + } + key := []byte("abcd") + val := []byte("efghijkl") + err := cache.Set(key, val, 0) + if err != nil { + t.Error("err should be nil") + } + value, err := cache.Get(key) + if err != nil || !bytes.Equal(value, val) { + t.Error("value not equal") + } + affected := cache.Del(key) + if !affected { + t.Error("del should return affected true") + } + value, err = cache.Get(key) + if err != ErrNotFound { + t.Error("error should be ErrNotFound after being deleted") + } + affected = cache.Del(key) + if affected { + t.Error("del should not return affected true") + } + + cache.Clear() + n := 5000 + for i := 0; i < n; i++ { + keyStr := fmt.Sprintf("key%v", i) + valStr := strings.Repeat(keyStr, 10) + err = cache.Set([]byte(keyStr), []byte(valStr), 0) + if err != nil { + t.Error(err) + } + } + time.Sleep(time.Second) + for i := 1; i < n; i += 2 { + keyStr := fmt.Sprintf("key%v", i) + cache.Get([]byte(keyStr)) + } + + for i := 1; i < n; i += 8 { + keyStr := fmt.Sprintf("key%v", i) + cache.Del([]byte(keyStr)) + } + + for i := 0; i < n; i += 2 { + keyStr := fmt.Sprintf("key%v", i) + valStr := strings.Repeat(keyStr, 10) + err = cache.Set([]byte(keyStr), []byte(valStr), 0) + if err != nil { + t.Error(err) + } + } + for i := 1; i < n; i += 2 { + keyStr := fmt.Sprintf("key%v", i) + expectedValStr := strings.Repeat(keyStr, 10) + value, err = cache.Get([]byte(keyStr)) + if err == nil { + if string(value) != expectedValStr { + t.Errorf("value is %v, expected %v", string(value), expectedValStr) + } + } + } + + t.Logf("hit rate is %v, evacuates %v, entries %v, average time %v\n", + cache.HitRate(), cache.EvacuateCount(), cache.EntryCount(), cache.AverageAccessTime()) +} + +func TestOverwrite(t *testing.T) { + cache := NewCache(1024) + key := []byte("abcd") + var val []byte + cache.Set(key, val, 0) + val = []byte("efgh") + cache.Set(key, val, 0) + val = append(val, 'i') + cache.Set(key, val, 0) + if count := cache.OverwriteCount(); count != 0 { + t.Error("overwrite count is", count, "expected ", 0) + } + res, _ := cache.Get(key) + if string(res) != string(val) { + t.Error(string(res)) + } + val = append(val, 'j') + cache.Set(key, val, 0) + res, _ = cache.Get(key) + if string(res) != string(val) { + t.Error(string(res), "aaa") + } + val = append(val, 'k') + cache.Set(key, val, 0) + res, _ = cache.Get(key) + if string(res) != "efghijk" { + t.Error(string(res)) + } + val = append(val, 'l') + cache.Set(key, val, 0) + res, _ = cache.Get(key) + if string(res) != "efghijkl" { + t.Error(string(res)) + } + val = append(val, 'm') + cache.Set(key, val, 0) + if count := cache.OverwriteCount(); count != 3 { + t.Error("overwrite count is", count, "expected ", 3) + } + +} + +func TestExpire(t *testing.T) { + cache := NewCache(1024) + key := []byte("abcd") + val := []byte("efgh") + err := cache.Set(key, val, 1) + if err != nil { + t.Error("err should be nil") + } + time.Sleep(time.Second) + val, err = cache.Get(key) + if err == nil { + t.Fatal("key should be expired", string(val)) + } +} + +func TestLargeEntry(t *testing.T) { + cacheSize := 512 * 1024 + cache := NewCache(cacheSize) + key := make([]byte, 65536) + val := []byte("efgh") + err := cache.Set(key, val, 0) + if err != ErrLargeKey { + t.Error("large key should return ErrLargeKey") + } + val, err = cache.Get(key) + if val != nil { + t.Error("value should be nil when get a big key") + } + key = []byte("abcd") + maxValLen := cacheSize/1024 - ENTRY_HDR_SIZE - len(key) + val = make([]byte, maxValLen+1) + err = cache.Set(key, val, 0) + if err != ErrLargeEntry { + t.Error("err should be ErrLargeEntry", err) + } + val = make([]byte, maxValLen-2) + err = cache.Set(key, val, 0) + if err != nil { + t.Error(err) + } + val = append(val, 0) + err = cache.Set(key, val, 0) + if err != nil { + t.Error(err) + } + val = append(val, 0) + err = cache.Set(key, val, 0) + if err != nil { + t.Error(err) + } + if cache.OverwriteCount() != 1 { + t.Error("over write count should be one.") + } + val = append(val, 0) + err = cache.Set(key, val, 0) + if err != ErrLargeEntry { + t.Error("err should be ErrLargeEntry", err) + } +} + +func BenchmarkCacheSet(b *testing.B) { + cache := NewCache(256 * 1024 * 1024) + var key [8]byte + for i := 0; i < b.N; i++ { + binary.LittleEndian.PutUint64(key[:], uint64(i)) + cache.Set(key[:], make([]byte, 8), 0) + } +} + +func BenchmarkMapSet(b *testing.B) { + m := make(map[string][]byte) + var key [8]byte + for i := 0; i < b.N; i++ { + binary.LittleEndian.PutUint64(key[:], uint64(i)) + m[string(key[:])] = make([]byte, 8) + } +} + +func BenchmarkCacheGet(b *testing.B) { + b.StopTimer() + cache := NewCache(256 * 1024 * 1024) + var key [8]byte + for i := 0; i < b.N; i++ { + binary.LittleEndian.PutUint64(key[:], uint64(i)) + cache.Set(key[:], make([]byte, 8), 0) + } + b.StartTimer() + for i := 0; i < b.N; i++ { + binary.LittleEndian.PutUint64(key[:], uint64(i)) + cache.Get(key[:]) + } +} + +func BenchmarkMapGet(b *testing.B) { + b.StopTimer() + m := make(map[string][]byte) + var key [8]byte + for i := 0; i < b.N; i++ { + binary.LittleEndian.PutUint64(key[:], uint64(i)) + m[string(key[:])] = make([]byte, 8) + } + b.StartTimer() + var hitCount int64 + for i := 0; i < b.N; i++ { + binary.LittleEndian.PutUint64(key[:], uint64(i)) + if m[string(key[:])] != nil { + hitCount++ + } + } +} + +func BenchmarkHashFunc(b *testing.B) { + key := make([]byte, 8) + rand.Read(key) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + hashFunc(key) + } +} diff --git a/vendor/github.com/coocood/freecache/ringbuf.go b/vendor/github.com/coocood/freecache/ringbuf.go new file mode 100644 index 0000000000..b4e2632ce4 --- /dev/null +++ b/vendor/github.com/coocood/freecache/ringbuf.go @@ -0,0 +1,243 @@ +package freecache + +import ( + "bytes" + "errors" + "fmt" + "io" +) + +var ErrOutOfRange = errors.New("out of range") + +// Ring buffer has a fixed size, when data exceeds the +// size, old data will be overwritten by new data. +// It only contains the data in the stream from begin to end +type RingBuf struct { + begin int64 // beginning offset of the data stream. + end int64 // ending offset of the data stream. + data []byte + index int //range from '0' to 'len(rb.data)-1' +} + +func NewRingBuf(size int, begin int64) (rb RingBuf) { + rb.data = make([]byte, size) + rb.begin = begin + rb.end = begin + rb.index = 0 + return +} + +// Create a copy of the buffer. +func (rb *RingBuf) Dump() []byte { + dump := make([]byte, len(rb.data)) + copy(dump, rb.data) + return dump +} + +func (rb *RingBuf) String() string { + return fmt.Sprintf("[size:%v, start:%v, end:%v, index:%v]", len(rb.data), rb.begin, rb.end, rb.index) +} + +func (rb *RingBuf) Size() int64 { + return int64(len(rb.data)) +} + +func (rb *RingBuf) Begin() int64 { + return rb.begin +} + +func (rb *RingBuf) End() int64 { + return rb.end +} + +// read up to len(p), at off of the data stream. +func (rb *RingBuf) ReadAt(p []byte, off int64) (n int, err error) { + if off > rb.end || off < rb.begin { + err = ErrOutOfRange + return + } + var readOff int + if rb.end-rb.begin < int64(len(rb.data)) { + readOff = int(off - rb.begin) + } else { + readOff = rb.index + int(off-rb.begin) + } + if readOff >= len(rb.data) { + readOff -= len(rb.data) + } + readEnd := readOff + int(rb.end-off) + if readEnd <= len(rb.data) { + n = copy(p, rb.data[readOff:readEnd]) + } else { + n = copy(p, rb.data[readOff:]) + if n < len(p) { + n += copy(p[n:], rb.data[:readEnd-len(rb.data)]) + } + } + if n < len(p) { + err = io.EOF + } + return +} + +func (rb *RingBuf) Write(p []byte) (n int, err error) { + if len(p) > len(rb.data) { + err = ErrOutOfRange + return + } + for n < len(p) { + written := copy(rb.data[rb.index:], p[n:]) + rb.end += int64(written) + n += written + rb.index += written + if rb.index >= len(rb.data) { + rb.index -= len(rb.data) + } + } + if int(rb.end-rb.begin) > len(rb.data) { + rb.begin = rb.end - int64(len(rb.data)) + } + return +} + +func (rb *RingBuf) WriteAt(p []byte, off int64) (n int, err error) { + if off+int64(len(p)) > rb.end || off < rb.begin { + err = ErrOutOfRange + return + } + var writeOff int + if rb.end-rb.begin < int64(len(rb.data)) { + writeOff = int(off - rb.begin) + } else { + writeOff = rb.index + int(off-rb.begin) + } + if writeOff > len(rb.data) { + writeOff -= len(rb.data) + } + writeEnd := writeOff + int(rb.end-off) + if writeEnd <= len(rb.data) { + n = copy(rb.data[writeOff:writeEnd], p) + } else { + n = copy(rb.data[writeOff:], p) + if n < len(p) { + n += copy(rb.data[:writeEnd-len(rb.data)], p[n:]) + } + } + return +} + +func (rb *RingBuf) EqualAt(p []byte, off int64) bool { + if off+int64(len(p)) > rb.end || off < rb.begin { + return false + } + var readOff int + if rb.end-rb.begin < int64(len(rb.data)) { + readOff = int(off - rb.begin) + } else { + readOff = rb.index + int(off-rb.begin) + } + if readOff >= len(rb.data) { + readOff -= len(rb.data) + } + readEnd := readOff + len(p) + if readEnd <= len(rb.data) { + return bytes.Equal(p, rb.data[readOff:readEnd]) + } else { + firstLen := len(rb.data) - readOff + equal := bytes.Equal(p[:firstLen], rb.data[readOff:]) + if equal { + secondLen := len(p) - firstLen + equal = bytes.Equal(p[firstLen:], rb.data[:secondLen]) + } + return equal + } +} + +// Evacuate read the data at off, then write it to the the data stream, +// Keep it from being overwritten by new data. +func (rb *RingBuf) Evacuate(off int64, length int) (newOff int64) { + if off+int64(length) > rb.end || off < rb.begin { + return -1 + } + var readOff int + if rb.end-rb.begin < int64(len(rb.data)) { + readOff = int(off - rb.begin) + } else { + readOff = rb.index + int(off-rb.begin) + } + if readOff >= len(rb.data) { + readOff -= len(rb.data) + } + + if readOff == rb.index { + // no copy evacuate + rb.index += length + if rb.index >= len(rb.data) { + rb.index -= len(rb.data) + } + } else if readOff < rb.index { + var n = copy(rb.data[rb.index:], rb.data[readOff:readOff+length]) + rb.index += n + if rb.index == len(rb.data) { + rb.index = copy(rb.data, rb.data[readOff+n:readOff+length]) + } + } else { + var readEnd = readOff + length + var n int + if readEnd <= len(rb.data) { + n = copy(rb.data[rb.index:], rb.data[readOff:readEnd]) + rb.index += n + if rb.index == len(rb.data) { + rb.index = copy(rb.data, rb.data[readOff+n:readEnd]) + } + } else { + n = copy(rb.data[rb.index:], rb.data[readOff:]) + rb.index += n + var tail = length - n + n = copy(rb.data[rb.index:], rb.data[:tail]) + rb.index += n + if rb.index == len(rb.data) { + rb.index = copy(rb.data, rb.data[n:tail]) + } + } + } + newOff = rb.end + rb.end += int64(length) + if rb.begin < rb.end-int64(len(rb.data)) { + rb.begin = rb.end - int64(len(rb.data)) + } + return +} + +func (rb *RingBuf) Resize(newSize int) { + if len(rb.data) == newSize { + return + } + newData := make([]byte, newSize) + var offset int + if rb.end-rb.begin == int64(len(rb.data)) { + offset = rb.index + } + if int(rb.end-rb.begin) > newSize { + discard := int(rb.end-rb.begin) - newSize + offset = (offset + discard) % len(rb.data) + rb.begin = rb.end - int64(newSize) + } + n := copy(newData, rb.data[offset:]) + if n < newSize { + copy(newData[n:], rb.data[:offset]) + } + rb.data = newData + rb.index = 0 +} + +func (rb *RingBuf) Skip(length int64) { + rb.end += length + rb.index += int(length) + for rb.index >= len(rb.data) { + rb.index -= len(rb.data) + } + if int(rb.end-rb.begin) > len(rb.data) { + rb.begin = rb.end - int64(len(rb.data)) + } +} diff --git a/vendor/github.com/coocood/freecache/ringbuf_test.go b/vendor/github.com/coocood/freecache/ringbuf_test.go new file mode 100644 index 0000000000..0e7f7e81ee --- /dev/null +++ b/vendor/github.com/coocood/freecache/ringbuf_test.go @@ -0,0 +1,35 @@ +package freecache + +import ( + "testing" +) + +func TestRingBuf(t *testing.T) { + rb := NewRingBuf(16, 0) + rb.Write([]byte("fghibbbbccccddde")) + rb.Write([]byte("fghibbbbc")) + rb.Resize(16) + off := rb.Evacuate(9, 3) + t.Log(string(rb.Dump())) + if off != rb.End()-3 { + t.Log(string(rb.Dump()), rb.End()) + t.Fatalf("off got %v", off) + } + off = rb.Evacuate(15, 5) + t.Log(string(rb.Dump())) + if off != rb.End()-5 { + t.Fatalf("off got %v", off) + } + rb.Resize(64) + rb.Resize(32) + data := make([]byte, 5) + rb.ReadAt(data, off) + if string(data) != "efghi" { + t.Fatalf("read at should be efghi, got %v", string(data)) + } + + off = rb.Evacuate(0, 10) + if off != -1 { + t.Fatal("evacutate out of range offset should return error") + } +} diff --git a/vendor/github.com/coocood/freecache/segment.go b/vendor/github.com/coocood/freecache/segment.go new file mode 100644 index 0000000000..6ea89b69f7 --- /dev/null +++ b/vendor/github.com/coocood/freecache/segment.go @@ -0,0 +1,326 @@ +package freecache + +import ( + "errors" + "time" + "unsafe" +) + +const HASH_ENTRY_SIZE = 16 +const ENTRY_HDR_SIZE = 24 + +var ErrLargeKey = errors.New("The key is larger than 65535") +var ErrLargeEntry = errors.New("The entry size is larger than 1/1024 of cache size") +var ErrNotFound = errors.New("Entry not found") + +// entry pointer struct points to an entry in ring buffer +type entryPtr struct { + offset int64 // entry offset in ring buffer + hash16 uint16 // entries are ordered by hash16 in a slot. + keyLen uint16 // used to compare a key + reserved uint32 +} + +// entry header struct in ring buffer, followed by key and value. +type entryHdr struct { + accessTime uint32 + expireAt uint32 + keyLen uint16 + hash16 uint16 + valLen uint32 + valCap uint32 + deleted bool + slotId uint8 + reserved uint16 +} + +// a segment contains 256 slots, a slot is an array of entry pointers ordered by hash16 value +// the entry can be looked up by hash value of the key. +type segment struct { + rb RingBuf // ring buffer that stores data + segId int + entryCount int64 + totalCount int64 // number of entries in ring buffer, including deleted entries. + totalTime int64 // used to calculate least recent used entry. + totalEvacuate int64 // used for debug + overwrites int64 // used for debug + vacuumLen int64 // up to vacuumLen, new data can be written without overwriting old data. + slotLens [256]int32 // The actual length for every slot. + slotCap int32 // max number of entry pointers a slot can hold. + slotsData []entryPtr // shared by all 256 slots +} + +func newSegment(bufSize int, segId int) (seg segment) { + seg.rb = NewRingBuf(bufSize, 0) + seg.segId = segId + seg.vacuumLen = int64(bufSize) + seg.slotCap = 1 + seg.slotsData = make([]entryPtr, 256*seg.slotCap) + return +} + +func (seg *segment) set(key, value []byte, hashVal uint64, expireSeconds int) (err error) { + if len(key) > 65535 { + return ErrLargeKey + } + maxKeyValLen := len(seg.rb.data)/4 - ENTRY_HDR_SIZE + if len(key)+len(value) > maxKeyValLen { + // Do not accept large entry. + return ErrLargeEntry + } + now := uint32(time.Now().Unix()) + expireAt := uint32(0) + if expireSeconds > 0 { + expireAt = now + uint32(expireSeconds) + } + + slotId := uint8(hashVal >> 8) + hash16 := uint16(hashVal >> 16) + + var hdrBuf [ENTRY_HDR_SIZE]byte + hdr := (*entryHdr)(unsafe.Pointer(&hdrBuf[0])) + + slotOff := int32(slotId) * seg.slotCap + slot := seg.slotsData[slotOff : slotOff+seg.slotLens[slotId] : slotOff+seg.slotCap] + idx, match := seg.lookup(slot, hash16, key) + if match { + matchedPtr := &slot[idx] + seg.rb.ReadAt(hdrBuf[:], matchedPtr.offset) + hdr.slotId = slotId + hdr.hash16 = hash16 + hdr.keyLen = uint16(len(key)) + hdr.accessTime = now + hdr.expireAt = expireAt + hdr.valLen = uint32(len(value)) + if hdr.valCap >= hdr.valLen { + //in place overwrite + seg.totalTime += int64(hdr.accessTime) - int64(now) + seg.rb.WriteAt(hdrBuf[:], matchedPtr.offset) + seg.rb.WriteAt(value, matchedPtr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen)) + seg.overwrites++ + return + } + // increase capacity and limit entry len. + for hdr.valCap < hdr.valLen { + hdr.valCap *= 2 + } + if hdr.valCap > uint32(maxKeyValLen-len(key)) { + hdr.valCap = uint32(maxKeyValLen - len(key)) + } + } else { + hdr.slotId = slotId + hdr.hash16 = hash16 + hdr.keyLen = uint16(len(key)) + hdr.accessTime = now + hdr.expireAt = expireAt + hdr.valLen = uint32(len(value)) + hdr.valCap = uint32(len(value)) + if hdr.valCap == 0 { // avoid infinite loop when increasing capacity. + hdr.valCap = 1 + } + } + + entryLen := ENTRY_HDR_SIZE + int64(len(key)) + int64(hdr.valCap) + slotModified := seg.evacuate(entryLen, slotId, now) + if slotModified { + // the slot has been modified during evacuation, we need to looked up for the 'idx' again. + // otherwise there would be index out of bound error. + slot = seg.slotsData[slotOff : slotOff+seg.slotLens[slotId] : slotOff+seg.slotCap] + idx, match = seg.lookup(slot, hash16, key) + } + newOff := seg.rb.End() + if match { + seg.updateEntryPtr(slotId, hash16, slot[idx].offset, newOff) + } else { + seg.insertEntryPtr(slotId, hash16, newOff, idx, hdr.keyLen) + } + seg.rb.Write(hdrBuf[:]) + seg.rb.Write(key) + seg.rb.Write(value) + seg.rb.Skip(int64(hdr.valCap - hdr.valLen)) + seg.totalTime += int64(now) + seg.totalCount++ + seg.vacuumLen -= entryLen + return +} + +func (seg *segment) evacuate(entryLen int64, slotId uint8, now uint32) (slotModified bool) { + var oldHdrBuf [ENTRY_HDR_SIZE]byte + consecutiveEvacuate := 0 + for seg.vacuumLen < entryLen { + oldOff := seg.rb.End() + seg.vacuumLen - seg.rb.Size() + seg.rb.ReadAt(oldHdrBuf[:], oldOff) + oldHdr := (*entryHdr)(unsafe.Pointer(&oldHdrBuf[0])) + oldEntryLen := ENTRY_HDR_SIZE + int64(oldHdr.keyLen) + int64(oldHdr.valCap) + if oldHdr.deleted { + consecutiveEvacuate = 0 + seg.totalTime -= int64(oldHdr.accessTime) + seg.totalCount-- + seg.vacuumLen += oldEntryLen + continue + } + expired := oldHdr.expireAt != 0 && oldHdr.expireAt < now + leastRecentUsed := int64(oldHdr.accessTime)*seg.totalCount <= seg.totalTime + if expired || leastRecentUsed || consecutiveEvacuate > 5 { + seg.delEntryPtr(oldHdr.slotId, oldHdr.hash16, oldOff) + if oldHdr.slotId == slotId { + slotModified = true + } + consecutiveEvacuate = 0 + seg.totalTime -= int64(oldHdr.accessTime) + seg.totalCount-- + seg.vacuumLen += oldEntryLen + } else { + // evacuate an old entry that has been accessed recently for better cache hit rate. + newOff := seg.rb.Evacuate(oldOff, int(oldEntryLen)) + seg.updateEntryPtr(oldHdr.slotId, oldHdr.hash16, oldOff, newOff) + consecutiveEvacuate++ + seg.totalEvacuate++ + } + } + return +} + +func (seg *segment) get(key []byte, hashVal uint64) (value []byte, err error) { + slotId := uint8(hashVal >> 8) + hash16 := uint16(hashVal >> 16) + slotOff := int32(slotId) * seg.slotCap + var slot = seg.slotsData[slotOff : slotOff+seg.slotLens[slotId] : slotOff+seg.slotCap] + idx, match := seg.lookup(slot, hash16, key) + if !match { + err = ErrNotFound + return + } + ptr := &slot[idx] + now := uint32(time.Now().Unix()) + + var hdrBuf [ENTRY_HDR_SIZE]byte + seg.rb.ReadAt(hdrBuf[:], ptr.offset) + hdr := (*entryHdr)(unsafe.Pointer(&hdrBuf[0])) + + if hdr.expireAt != 0 && hdr.expireAt <= now { + seg.delEntryPtr(slotId, hash16, ptr.offset) + err = ErrNotFound + return + } + seg.totalTime += int64(now - hdr.accessTime) + hdr.accessTime = now + seg.rb.WriteAt(hdrBuf[:], ptr.offset) + value = make([]byte, hdr.valLen) + + seg.rb.ReadAt(value, ptr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen)) + return +} + +func (seg *segment) del(key []byte, hashVal uint64) (affected bool) { + slotId := uint8(hashVal >> 8) + hash16 := uint16(hashVal >> 16) + slotOff := int32(slotId) * seg.slotCap + slot := seg.slotsData[slotOff : slotOff+seg.slotLens[slotId] : slotOff+seg.slotCap] + idx, match := seg.lookup(slot, hash16, key) + if !match { + return false + } + ptr := &slot[idx] + seg.delEntryPtr(slotId, hash16, ptr.offset) + return true +} + +func (seg *segment) expand() { + newSlotData := make([]entryPtr, seg.slotCap*2*256) + for i := 0; i < 256; i++ { + off := int32(i) * seg.slotCap + copy(newSlotData[off*2:], seg.slotsData[off:off+seg.slotLens[i]]) + } + seg.slotCap *= 2 + seg.slotsData = newSlotData +} + +func (seg *segment) updateEntryPtr(slotId uint8, hash16 uint16, oldOff, newOff int64) { + slotOff := int32(slotId) * seg.slotCap + slot := seg.slotsData[slotOff : slotOff+seg.slotLens[slotId] : slotOff+seg.slotCap] + idx, match := seg.lookupByOff(slot, hash16, oldOff) + if !match { + return + } + ptr := &slot[idx] + ptr.offset = newOff +} + +func (seg *segment) insertEntryPtr(slotId uint8, hash16 uint16, offset int64, idx int, keyLen uint16) { + slotOff := int32(slotId) * seg.slotCap + if seg.slotLens[slotId] == seg.slotCap { + seg.expand() + slotOff *= 2 + } + seg.slotLens[slotId]++ + seg.entryCount++ + slot := seg.slotsData[slotOff : slotOff+seg.slotLens[slotId] : slotOff+seg.slotCap] + copy(slot[idx+1:], slot[idx:]) + slot[idx].offset = offset + slot[idx].hash16 = hash16 + slot[idx].keyLen = keyLen +} + +func (seg *segment) delEntryPtr(slotId uint8, hash16 uint16, offset int64) { + slotOff := int32(slotId) * seg.slotCap + slot := seg.slotsData[slotOff : slotOff+seg.slotLens[slotId] : slotOff+seg.slotCap] + idx, match := seg.lookupByOff(slot, hash16, offset) + if !match { + return + } + var entryHdrBuf [ENTRY_HDR_SIZE]byte + seg.rb.ReadAt(entryHdrBuf[:], offset) + entryHdr := (*entryHdr)(unsafe.Pointer(&entryHdrBuf[0])) + entryHdr.deleted = true + seg.rb.WriteAt(entryHdrBuf[:], offset) + copy(slot[idx:], slot[idx+1:]) + seg.slotLens[slotId]-- + seg.entryCount-- +} + +func entryPtrIdx(slot []entryPtr, hash16 uint16) (idx int) { + high := len(slot) + for idx < high { + mid := (idx + high) >> 1 + oldEntry := &slot[mid] + if oldEntry.hash16 < hash16 { + idx = mid + 1 + } else { + high = mid + } + } + return +} + +func (seg *segment) lookup(slot []entryPtr, hash16 uint16, key []byte) (idx int, match bool) { + idx = entryPtrIdx(slot, hash16) + for idx < len(slot) { + ptr := &slot[idx] + if ptr.hash16 != hash16 { + break + } + match = int(ptr.keyLen) == len(key) && seg.rb.EqualAt(key, ptr.offset+ENTRY_HDR_SIZE) + if match { + return + } + idx++ + } + return +} + +func (seg *segment) lookupByOff(slot []entryPtr, hash16 uint16, offset int64) (idx int, match bool) { + idx = entryPtrIdx(slot, hash16) + for idx < len(slot) { + ptr := &slot[idx] + if ptr.hash16 != hash16 { + break + } + match = ptr.offset == offset + if match { + return + } + idx++ + } + return +} diff --git a/vendor/github.com/coocood/freecache/server/main.go b/vendor/github.com/coocood/freecache/server/main.go new file mode 100644 index 0000000000..43abb61ef5 --- /dev/null +++ b/vendor/github.com/coocood/freecache/server/main.go @@ -0,0 +1,318 @@ +//A basic freecache server supports redis protocol +package main + +import ( + "bufio" + "bytes" + "errors" + "github.com/coocood/freecache" + "io" + "log" + "net" + "runtime" + "strconv" + "time" + "runtime/debug" + _ "net/http/pprof" + "net/http" +) + +var ( + protocolErr = errors.New("protocol error") + CRLF = []byte("\r\n") + PING = []byte("ping") + DBSIZE = []byte("dbsize") + ERROR_UNSUPPORTED = []byte("-ERR unsupported command\r\n") + OK = []byte("+OK\r\n") + PONG = []byte("+PONG\r\n") + GET = []byte("get") + SET = []byte("set") + SETEX = []byte("setex") + DEL = []byte("del") + NIL = []byte("$-1\r\n") + CZERO = []byte(":0\r\n") + CONE = []byte(":1\r\n") + BulkSign = []byte("$") +) + +type Request struct { + args [][]byte + buf *bytes.Buffer +} + +func (req *Request) Reset() { + req.args = req.args[:0] + req.buf.Reset() +} + +type operation struct { + req Request + replyChan chan *bytes.Buffer +} + +type Session struct { + server *Server + conn net.Conn + addr string + reader *bufio.Reader + replyChan chan *bytes.Buffer +} + +type Server struct { + cache *freecache.Cache +} + +func NewServer(cacheSize int) (server *Server) { + server = new(Server) + server.cache = freecache.NewCache(cacheSize) + return +} + +func (server *Server) Start(addr string) error { + l, err := net.Listen("tcp", addr) + if err != nil { + log.Println(err) + return err + } + defer l.Close() + log.Println("Listening on port", addr) + for { + tcpListener := l.(*net.TCPListener) + tcpListener.SetDeadline(time.Now().Add(time.Second)) + conn, err := l.Accept() + if err != nil { + if ne, ok := err.(net.Error); ok && ne.Temporary() { + continue + } + return err + } + + session := new(Session) + session.conn = conn + session.replyChan = make(chan *bytes.Buffer, 100) + session.addr = conn.RemoteAddr().String() + session.server = server + session.reader = bufio.NewReader(conn) + go session.readLoop() + go session.writeLoop() + } +} + +func copyN(buffer *bytes.Buffer, r *bufio.Reader, n int64) (err error) { + if n <= 512 { + var buf [512]byte + _, err = r.Read(buf[:n]) + if err != nil { + return + } + buffer.Write(buf[:n]) + } else { + _, err = io.CopyN(buffer, r, n) + } + return +} + +func (server *Server) ReadClient(r *bufio.Reader, req *Request) (err error) { + line, err := readLine(r) + if err != nil { + return + } + if len(line) == 0 || line[0] != '*' { + err = protocolErr + return + } + argc, err := btoi(line[1:]) + if err != nil { + return + } + if argc <= 0 || argc > 4 { + err = protocolErr + return + } + var argStarts [4]int + var argEnds [4]int + req.buf.Write(line) + req.buf.Write(CRLF) + cursor := len(line) + 2 + for i := 0; i < argc; i++ { + line, err = readLine(r) + if err != nil { + return + } + if len(line) == 0 || line[0] != '$' { + err = protocolErr + return + } + var argLen int + argLen, err = btoi(line[1:]) + if err != nil { + return + } + if argLen < 0 || argLen > 512*1024*1024 { + err = protocolErr + return + } + req.buf.Write(line) + req.buf.Write(CRLF) + cursor += len(line) + 2 + err = copyN(req.buf, r, int64(argLen)+2) + if err != nil { + return + } + argStarts[i] = cursor + argEnds[i] = cursor + argLen + cursor += argLen + 2 + } + data := req.buf.Bytes() + for i := 0; i < argc; i++ { + req.args = append(req.args, data[argStarts[i]:argEnds[i]]) + } + lower(req.args[0]) + return +} + +func (down *Session) readLoop() { + var req = new(Request) + req.buf = new(bytes.Buffer) + for { + req.Reset() + err := down.server.ReadClient(down.reader, req) + if err != nil { + close(down.replyChan) + return + } + reply := new(bytes.Buffer) + if len(req.args) == 4 && bytes.Equal(req.args[0], SETEX) { + expire, err := btoi(req.args[2]) + if err != nil { + reply.Write(ERROR_UNSUPPORTED) + } else { + down.server.cache.Set(req.args[1], req.args[3], expire) + reply.Write(OK) + } + } else if len(req.args) == 3 && bytes.Equal(req.args[0], SET) { + down.server.cache.Set(req.args[1], req.args[2], 0) + reply.Write(OK) + } else if len(req.args) == 2 { + if bytes.Equal(req.args[0], GET) { + value, err := down.server.cache.Get(req.args[1]) + if err != nil { + reply.Write(NIL) + } else { + bukLen := strconv.Itoa(len(value)) + reply.Write(BulkSign) + reply.WriteString(bukLen) + reply.Write(CRLF) + reply.Write(value) + reply.Write(CRLF) + } + } else if bytes.Equal(req.args[0], DEL) { + if down.server.cache.Del(req.args[1]) { + reply.Write(CONE) + } else { + reply.Write(CZERO) + } + } + } else if len(req.args) == 1 { + if bytes.Equal(req.args[0], PING) { + reply.Write(PONG) + } else if bytes.Equal(req.args[0], DBSIZE) { + entryCount := down.server.cache.EntryCount() + reply.WriteString(":") + reply.WriteString(strconv.Itoa(int(entryCount))) + reply.Write(CRLF) + } else { + reply.Write(ERROR_UNSUPPORTED) + } + } + down.replyChan <- reply + } +} + +func (down *Session) writeLoop() { + var buffer = bytes.NewBuffer(nil) + var replies = make([]*bytes.Buffer, 1) + for { + buffer.Reset() + select { + case reply, ok := <-down.replyChan: + if !ok { + down.conn.Close() + return + } + replies = replies[:1] + replies[0] = reply + queueLen := len(down.replyChan) + for i := 0; i < queueLen; i++ { + reply = <-down.replyChan + replies = append(replies, reply) + } + for _, reply := range replies { + if reply == nil { + buffer.Write(NIL) + continue + } + buffer.Write(reply.Bytes()) + } + _, err := down.conn.Write(buffer.Bytes()) + if err != nil { + down.conn.Close() + return + } + } + } +} + +func readLine(r *bufio.Reader) ([]byte, error) { + p, err := r.ReadSlice('\n') + if err != nil { + return nil, err + } + i := len(p) - 2 + if i < 0 || p[i] != '\r' { + return nil, protocolErr + } + return p[:i], nil +} + +func btoi(data []byte) (int, error) { + if len(data) == 0 { + return 0, nil + } + i := 0 + sign := 1 + if data[0] == '-' { + i++ + sign *= -1 + } + if i >= len(data) { + return 0, protocolErr + } + var l int + for ; i < len(data); i++ { + c := data[i] + if c < '0' || c > '9' { + return 0, protocolErr + } + l = l*10 + int(c-'0') + } + return sign * l, nil +} + +func lower(data []byte) { + for i := 0; i < len(data); i++ { + if data[i] >= 'A' && data[i] <= 'Z' { + data[i] += 'a' - 'A' + } + } +} + +func main() { + runtime.GOMAXPROCS(runtime.NumCPU()-1) + server := NewServer(256 * 1024 * 1024) + debug.SetGCPercent(10) + go func() { + log.Println(http.ListenAndServe("localhost:6060", nil)) + }() + server.Start(":7788") +} diff --git a/vendor/github.com/spaolacci/murmur3/LICENSE b/vendor/github.com/spaolacci/murmur3/LICENSE new file mode 100644 index 0000000000..2a46fd7500 --- /dev/null +++ b/vendor/github.com/spaolacci/murmur3/LICENSE @@ -0,0 +1,24 @@ +Copyright 2013, Sébastien Paolacci. +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + * Neither the name of the library nor the + names of its contributors may be used to endorse or promote products + derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY +DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/github.com/spaolacci/murmur3/README.md b/vendor/github.com/spaolacci/murmur3/README.md new file mode 100644 index 0000000000..1edf62300d --- /dev/null +++ b/vendor/github.com/spaolacci/murmur3/README.md @@ -0,0 +1,84 @@ +murmur3 +======= + +Native Go implementation of Austin Appleby's third MurmurHash revision (aka +MurmurHash3). + +Reference algorithm has been slightly hacked as to support the streaming mode +required by Go's standard [Hash interface](http://golang.org/pkg/hash/#Hash). + + +Benchmarks +---------- + +Go tip as of 2014-06-12 (i.e almost go1.3), core i7 @ 3.4 Ghz. All runs +include hasher instantiation and sequence finalization. + +
+
+Benchmark32_1        500000000     7.69 ns/op      130.00 MB/s
+Benchmark32_2        200000000     8.83 ns/op      226.42 MB/s
+Benchmark32_4        500000000     7.99 ns/op      500.39 MB/s
+Benchmark32_8        200000000     9.47 ns/op      844.69 MB/s
+Benchmark32_16       100000000     12.1 ns/op     1321.61 MB/s
+Benchmark32_32       100000000     18.3 ns/op     1743.93 MB/s
+Benchmark32_64        50000000     30.9 ns/op     2071.64 MB/s
+Benchmark32_128       50000000     57.6 ns/op     2222.96 MB/s
+Benchmark32_256       20000000      116 ns/op     2188.60 MB/s
+Benchmark32_512       10000000      226 ns/op     2260.59 MB/s
+Benchmark32_1024       5000000      452 ns/op     2263.73 MB/s
+Benchmark32_2048       2000000      891 ns/op     2296.02 MB/s
+Benchmark32_4096       1000000     1787 ns/op     2290.92 MB/s
+Benchmark32_8192        500000     3593 ns/op     2279.68 MB/s
+Benchmark128_1       100000000     26.1 ns/op       38.33 MB/s
+Benchmark128_2       100000000     29.0 ns/op       69.07 MB/s
+Benchmark128_4        50000000     29.8 ns/op      134.17 MB/s
+Benchmark128_8        50000000     31.6 ns/op      252.86 MB/s
+Benchmark128_16      100000000     26.5 ns/op      603.42 MB/s
+Benchmark128_32      100000000     28.6 ns/op     1117.15 MB/s
+Benchmark128_64       50000000     35.5 ns/op     1800.97 MB/s
+Benchmark128_128      50000000     50.9 ns/op     2515.50 MB/s
+Benchmark128_256      20000000     76.9 ns/op     3330.11 MB/s
+Benchmark128_512      20000000      135 ns/op     3769.09 MB/s
+Benchmark128_1024     10000000      250 ns/op     4094.38 MB/s
+Benchmark128_2048      5000000      477 ns/op     4290.75 MB/s
+Benchmark128_4096      2000000      940 ns/op     4353.29 MB/s
+Benchmark128_8192      1000000     1838 ns/op     4455.47 MB/s
+
+
+ + +
+
+benchmark              Go1.0 MB/s    Go1.1 MB/s  speedup    Go1.2 MB/s  speedup    Go1.3 MB/s  speedup
+Benchmark32_1               98.90        118.59    1.20x        114.79    0.97x        130.00    1.13x
+Benchmark32_2              168.04        213.31    1.27x        210.65    0.99x        226.42    1.07x
+Benchmark32_4              414.01        494.19    1.19x        490.29    0.99x        500.39    1.02x
+Benchmark32_8              662.19        836.09    1.26x        836.46    1.00x        844.69    1.01x
+Benchmark32_16             917.46       1304.62    1.42x       1297.63    0.99x       1321.61    1.02x
+Benchmark32_32            1141.93       1737.54    1.52x       1728.24    0.99x       1743.93    1.01x
+Benchmark32_64            1289.47       2039.51    1.58x       2038.20    1.00x       2071.64    1.02x
+Benchmark32_128           1299.23       2097.63    1.61x       2177.13    1.04x       2222.96    1.02x
+Benchmark32_256           1369.90       2202.34    1.61x       2213.15    1.00x       2188.60    0.99x
+Benchmark32_512           1399.56       2255.72    1.61x       2264.49    1.00x       2260.59    1.00x
+Benchmark32_1024          1410.90       2285.82    1.62x       2270.99    0.99x       2263.73    1.00x
+Benchmark32_2048          1422.14       2297.62    1.62x       2269.59    0.99x       2296.02    1.01x
+Benchmark32_4096          1420.53       2307.81    1.62x       2273.43    0.99x       2290.92    1.01x
+Benchmark32_8192          1424.79       2312.87    1.62x       2286.07    0.99x       2279.68    1.00x
+Benchmark128_1               8.32         30.15    3.62x         30.84    1.02x         38.33    1.24x
+Benchmark128_2              16.38         59.72    3.65x         59.37    0.99x         69.07    1.16x
+Benchmark128_4              32.26        112.96    3.50x        114.24    1.01x        134.17    1.17x
+Benchmark128_8              62.68        217.88    3.48x        218.18    1.00x        252.86    1.16x
+Benchmark128_16            128.47        451.57    3.51x        474.65    1.05x        603.42    1.27x
+Benchmark128_32            246.18        910.42    3.70x        871.06    0.96x       1117.15    1.28x
+Benchmark128_64            449.05       1477.64    3.29x       1449.24    0.98x       1800.97    1.24x
+Benchmark128_128           762.61       2222.42    2.91x       2217.30    1.00x       2515.50    1.13x
+Benchmark128_256          1179.92       3005.46    2.55x       2931.55    0.98x       3330.11    1.14x
+Benchmark128_512          1616.51       3590.75    2.22x       3592.08    1.00x       3769.09    1.05x
+Benchmark128_1024         1964.36       3979.67    2.03x       4034.01    1.01x       4094.38    1.01x
+Benchmark128_2048         2225.07       4156.93    1.87x       4244.17    1.02x       4290.75    1.01x
+Benchmark128_4096         2360.15       4299.09    1.82x       4392.35    1.02x       4353.29    0.99x
+Benchmark128_8192         2411.50       4356.84    1.81x       4480.68    1.03x       4455.47    0.99x
+
+
+ diff --git a/vendor/github.com/spaolacci/murmur3/murmur.go b/vendor/github.com/spaolacci/murmur3/murmur.go new file mode 100644 index 0000000000..f99557cc3e --- /dev/null +++ b/vendor/github.com/spaolacci/murmur3/murmur.go @@ -0,0 +1,65 @@ +// Copyright 2013, Sébastien Paolacci. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +/* +Native (and fast) implementation of Austin Appleby's MurmurHash3. + +Package murmur3 implements Austin Appleby's non-cryptographic MurmurHash3. + + Reference implementation: + http://code.google.com/p/smhasher/wiki/MurmurHash3 + + History, characteristics and (legacy) perfs: + https://sites.google.com/site/murmurhash/ + https://sites.google.com/site/murmurhash/statistics +*/ +package murmur3 + +type bmixer interface { + bmix(p []byte) (tail []byte) + Size() (n int) + reset() +} + +type digest struct { + clen int // Digested input cumulative length. + tail []byte // 0 to Size()-1 bytes view of `buf'. + buf [16]byte // Expected (but not required) to be Size() large. + bmixer +} + +func (d *digest) BlockSize() int { return 1 } + +func (d *digest) Write(p []byte) (n int, err error) { + n = len(p) + d.clen += n + + if len(d.tail) > 0 { + // Stick back pending bytes. + nfree := d.Size() - len(d.tail) // nfree ∈ [1, d.Size()-1]. + if nfree < len(p) { + // One full block can be formed. + block := append(d.tail, p[:nfree]...) + p = p[nfree:] + _ = d.bmix(block) // No tail. + } else { + // Tail's buf is large enough to prevent reallocs. + p = append(d.tail, p...) + } + } + + d.tail = d.bmix(p) + + // Keep own copy of the 0 to Size()-1 pending bytes. + nn := copy(d.buf[:], d.tail) + d.tail = d.buf[:nn] + + return n, nil +} + +func (d *digest) Reset() { + d.clen = 0 + d.tail = nil + d.bmixer.reset() +} diff --git a/vendor/github.com/spaolacci/murmur3/murmur128.go b/vendor/github.com/spaolacci/murmur3/murmur128.go new file mode 100644 index 0000000000..d54d136541 --- /dev/null +++ b/vendor/github.com/spaolacci/murmur3/murmur128.go @@ -0,0 +1,189 @@ +package murmur3 + +import ( + //"encoding/binary" + "hash" + "unsafe" +) + +const ( + c1_128 = 0x87c37b91114253d5 + c2_128 = 0x4cf5ad432745937f +) + +// Make sure interfaces are correctly implemented. +var ( + _ hash.Hash = new(digest128) + _ Hash128 = new(digest128) + _ bmixer = new(digest128) +) + +// Hack: the standard api doesn't define any Hash128 interface. +type Hash128 interface { + hash.Hash + Sum128() (uint64, uint64) +} + +// digest128 represents a partial evaluation of a 128 bites hash. +type digest128 struct { + digest + h1 uint64 // Unfinalized running hash part 1. + h2 uint64 // Unfinalized running hash part 2. +} + +func New128() Hash128 { + d := new(digest128) + d.bmixer = d + d.Reset() + return d +} + +func (d *digest128) Size() int { return 16 } + +func (d *digest128) reset() { d.h1, d.h2 = 0, 0 } + +func (d *digest128) Sum(b []byte) []byte { + h1, h2 := d.Sum128() + return append(b, + byte(h1>>56), byte(h1>>48), byte(h1>>40), byte(h1>>32), + byte(h1>>24), byte(h1>>16), byte(h1>>8), byte(h1), + + byte(h2>>56), byte(h2>>48), byte(h2>>40), byte(h2>>32), + byte(h2>>24), byte(h2>>16), byte(h2>>8), byte(h2), + ) +} + +func (d *digest128) bmix(p []byte) (tail []byte) { + h1, h2 := d.h1, d.h2 + + nblocks := len(p) / 16 + for i := 0; i < nblocks; i++ { + t := (*[2]uint64)(unsafe.Pointer(&p[i*16])) + k1, k2 := t[0], t[1] + + k1 *= c1_128 + k1 = (k1 << 31) | (k1 >> 33) // rotl64(k1, 31) + k1 *= c2_128 + h1 ^= k1 + + h1 = (h1 << 27) | (h1 >> 37) // rotl64(h1, 27) + h1 += h2 + h1 = h1*5 + 0x52dce729 + + k2 *= c2_128 + k2 = (k2 << 33) | (k2 >> 31) // rotl64(k2, 33) + k2 *= c1_128 + h2 ^= k2 + + h2 = (h2 << 31) | (h2 >> 33) // rotl64(h2, 31) + h2 += h1 + h2 = h2*5 + 0x38495ab5 + } + d.h1, d.h2 = h1, h2 + return p[nblocks*d.Size():] +} + +func (d *digest128) Sum128() (h1, h2 uint64) { + + h1, h2 = d.h1, d.h2 + + var k1, k2 uint64 + switch len(d.tail) & 15 { + case 15: + k2 ^= uint64(d.tail[14]) << 48 + fallthrough + case 14: + k2 ^= uint64(d.tail[13]) << 40 + fallthrough + case 13: + k2 ^= uint64(d.tail[12]) << 32 + fallthrough + case 12: + k2 ^= uint64(d.tail[11]) << 24 + fallthrough + case 11: + k2 ^= uint64(d.tail[10]) << 16 + fallthrough + case 10: + k2 ^= uint64(d.tail[9]) << 8 + fallthrough + case 9: + k2 ^= uint64(d.tail[8]) << 0 + + k2 *= c2_128 + k2 = (k2 << 33) | (k2 >> 31) // rotl64(k2, 33) + k2 *= c1_128 + h2 ^= k2 + + fallthrough + + case 8: + k1 ^= uint64(d.tail[7]) << 56 + fallthrough + case 7: + k1 ^= uint64(d.tail[6]) << 48 + fallthrough + case 6: + k1 ^= uint64(d.tail[5]) << 40 + fallthrough + case 5: + k1 ^= uint64(d.tail[4]) << 32 + fallthrough + case 4: + k1 ^= uint64(d.tail[3]) << 24 + fallthrough + case 3: + k1 ^= uint64(d.tail[2]) << 16 + fallthrough + case 2: + k1 ^= uint64(d.tail[1]) << 8 + fallthrough + case 1: + k1 ^= uint64(d.tail[0]) << 0 + k1 *= c1_128 + k1 = (k1 << 31) | (k1 >> 33) // rotl64(k1, 31) + k1 *= c2_128 + h1 ^= k1 + } + + h1 ^= uint64(d.clen) + h2 ^= uint64(d.clen) + + h1 += h2 + h2 += h1 + + h1 = fmix64(h1) + h2 = fmix64(h2) + + h1 += h2 + h2 += h1 + + return h1, h2 +} + +func fmix64(k uint64) uint64 { + k ^= k >> 33 + k *= 0xff51afd7ed558ccd + k ^= k >> 33 + k *= 0xc4ceb9fe1a85ec53 + k ^= k >> 33 + return k +} + +/* +func rotl64(x uint64, r byte) uint64 { + return (x << r) | (x >> (64 - r)) +} +*/ + +// Sum128 returns the MurmurHash3 sum of data. It is equivalent to the +// following sequence (without the extra burden and the extra allocation): +// hasher := New128() +// hasher.Write(data) +// return hasher.Sum128() +func Sum128(data []byte) (h1 uint64, h2 uint64) { + d := &digest128{h1: 0, h2: 0} + d.tail = d.bmix(data) + d.clen = len(data) + return d.Sum128() +} diff --git a/vendor/github.com/spaolacci/murmur3/murmur32.go b/vendor/github.com/spaolacci/murmur3/murmur32.go new file mode 100644 index 0000000000..8733f7cd76 --- /dev/null +++ b/vendor/github.com/spaolacci/murmur3/murmur32.go @@ -0,0 +1,154 @@ +package murmur3 + +// http://code.google.com/p/guava-libraries/source/browse/guava/src/com/google/common/hash/Murmur3_32HashFunction.java + +import ( + "hash" + "unsafe" +) + +// Make sure interfaces are correctly implemented. +var ( + _ hash.Hash = new(digest32) + _ hash.Hash32 = new(digest32) +) + +const ( + c1_32 uint32 = 0xcc9e2d51 + c2_32 uint32 = 0x1b873593 +) + +// digest32 represents a partial evaluation of a 32 bites hash. +type digest32 struct { + digest + h1 uint32 // Unfinalized running hash. +} + +func New32() hash.Hash32 { + d := new(digest32) + d.bmixer = d + d.Reset() + return d +} + +func (d *digest32) Size() int { return 4 } + +func (d *digest32) reset() { d.h1 = 0 } + +func (d *digest32) Sum(b []byte) []byte { + h := d.Sum32() + return append(b, byte(h>>24), byte(h>>16), byte(h>>8), byte(h)) +} + +// Digest as many blocks as possible. +func (d *digest32) bmix(p []byte) (tail []byte) { + h1 := d.h1 + + nblocks := len(p) / 4 + for i := 0; i < nblocks; i++ { + k1 := *(*uint32)(unsafe.Pointer(&p[i*4])) + + k1 *= c1_32 + k1 = (k1 << 15) | (k1 >> 17) // rotl32(k1, 15) + k1 *= c2_32 + + h1 ^= k1 + h1 = (h1 << 13) | (h1 >> 19) // rotl32(h1, 13) + h1 = h1*5 + 0xe6546b64 + } + d.h1 = h1 + return p[nblocks*d.Size():] +} + +func (d *digest32) Sum32() (h1 uint32) { + + h1 = d.h1 + + var k1 uint32 + switch len(d.tail) & 3 { + case 3: + k1 ^= uint32(d.tail[2]) << 16 + fallthrough + case 2: + k1 ^= uint32(d.tail[1]) << 8 + fallthrough + case 1: + k1 ^= uint32(d.tail[0]) + k1 *= c1_32 + k1 = (k1 << 15) | (k1 >> 17) // rotl32(k1, 15) + k1 *= c2_32 + h1 ^= k1 + } + + h1 ^= uint32(d.clen) + + h1 ^= h1 >> 16 + h1 *= 0x85ebca6b + h1 ^= h1 >> 13 + h1 *= 0xc2b2ae35 + h1 ^= h1 >> 16 + + return h1 +} + +/* +func rotl32(x uint32, r byte) uint32 { + return (x << r) | (x >> (32 - r)) +} +*/ + +// Sum32 returns the MurmurHash3 sum of data. It is equivalent to the +// following sequence (without the extra burden and the extra allocation): +// hasher := New32() +// hasher.Write(data) +// return hasher.Sum32() +func Sum32(data []byte) uint32 { + + var h1 uint32 = 0 + + nblocks := len(data) / 4 + var p uintptr + if len(data) > 0 { + p = uintptr(unsafe.Pointer(&data[0])) + } + p1 := p + uintptr(4*nblocks) + for ; p < p1; p += 4 { + k1 := *(*uint32)(unsafe.Pointer(p)) + + k1 *= c1_32 + k1 = (k1 << 15) | (k1 >> 17) // rotl32(k1, 15) + k1 *= c2_32 + + h1 ^= k1 + h1 = (h1 << 13) | (h1 >> 19) // rotl32(h1, 13) + h1 = h1*5 + 0xe6546b64 + } + + tail := data[nblocks*4:] + + var k1 uint32 + switch len(tail) & 3 { + case 3: + k1 ^= uint32(tail[2]) << 16 + fallthrough + case 2: + k1 ^= uint32(tail[1]) << 8 + fallthrough + case 1: + k1 ^= uint32(tail[0]) + k1 *= c1_32 + k1 = (k1 << 15) | (k1 >> 17) // rotl32(k1, 15) + k1 *= c2_32 + h1 ^= k1 + } + + h1 ^= uint32(len(data)) + + h1 ^= h1 >> 16 + h1 *= 0x85ebca6b + h1 ^= h1 >> 13 + h1 *= 0xc2b2ae35 + h1 ^= h1 >> 16 + + return h1 +} diff --git a/vendor/github.com/spaolacci/murmur3/murmur64.go b/vendor/github.com/spaolacci/murmur3/murmur64.go new file mode 100644 index 0000000000..6e43065fb8 --- /dev/null +++ b/vendor/github.com/spaolacci/murmur3/murmur64.go @@ -0,0 +1,45 @@ +package murmur3 + +import ( + "hash" +) + +// Make sure interfaces are correctly implemented. +var ( + _ hash.Hash = new(digest64) + _ hash.Hash64 = new(digest64) + _ bmixer = new(digest64) +) + +// digest64 is half a digest128. +type digest64 digest128 + +func New64() hash.Hash64 { + d := (*digest64)(New128().(*digest128)) + return d +} + +func (d *digest64) Sum(b []byte) []byte { + h1 := d.Sum64() + return append(b, + byte(h1>>56), byte(h1>>48), byte(h1>>40), byte(h1>>32), + byte(h1>>24), byte(h1>>16), byte(h1>>8), byte(h1)) +} + +func (d *digest64) Sum64() uint64 { + h1, _ := (*digest128)(d).Sum128() + return h1 +} + +// Sum64 returns the MurmurHash3 sum of data. It is equivalent to the +// following sequence (without the extra burden and the extra allocation): +// hasher := New64() +// hasher.Write(data) +// return hasher.Sum64() +func Sum64(data []byte) uint64 { + d := &digest128{h1: 0, h2: 0} + d.tail = d.bmix(data) + d.clen = len(data) + h1, _ := d.Sum128() + return h1 +} diff --git a/vendor/github.com/spaolacci/murmur3/murmur_test.go b/vendor/github.com/spaolacci/murmur3/murmur_test.go new file mode 100644 index 0000000000..bf1652acd3 --- /dev/null +++ b/vendor/github.com/spaolacci/murmur3/murmur_test.go @@ -0,0 +1,251 @@ +package murmur3 + +import ( + "fmt" + "hash" + "testing" +) + +var data = []struct { + h32 uint32 + h64_1 uint64 + h64_2 uint64 + s string +}{ + {0x00000000, 0x0000000000000000, 0x0000000000000000, ""}, + {0x248bfa47, 0xcbd8a7b341bd9b02, 0x5b1e906a48ae1d19, "hello"}, + {0x149bbb7f, 0x342fac623a5ebc8e, 0x4cdcbc079642414d, "hello, world"}, + {0xe31e8a70, 0xb89e5988b737affc, 0x664fc2950231b2cb, "19 Jan 2038 at 3:14:07 AM"}, + {0xd5c48bfc, 0xcd99481f9ee902c9, 0x695da1a38987b6e7, "The quick brown fox jumps over the lazy dog."}, +} + +func TestRef(t *testing.T) { + for _, elem := range data { + + var h32 hash.Hash32 = New32() + h32.Write([]byte(elem.s)) + if v := h32.Sum32(); v != elem.h32 { + t.Errorf("'%s': 0x%x (want 0x%x)", elem.s, v, elem.h32) + } + + var h32_byte hash.Hash32 = New32() + h32_byte.Write([]byte(elem.s)) + target := fmt.Sprintf("%08x", elem.h32) + if p := fmt.Sprintf("%x", h32_byte.Sum(nil)); p != target { + t.Errorf("'%s': %s (want %s)", elem.s, p, target) + } + + if v := Sum32([]byte(elem.s)); v != elem.h32 { + t.Errorf("'%s': 0x%x (want 0x%x)", elem.s, v, elem.h32) + } + + var h64 hash.Hash64 = New64() + h64.Write([]byte(elem.s)) + if v := h64.Sum64(); v != elem.h64_1 { + t.Errorf("'%s': 0x%x (want 0x%x)", elem.s, v, elem.h64_1) + } + + var h64_byte hash.Hash64 = New64() + h64_byte.Write([]byte(elem.s)) + target = fmt.Sprintf("%016x", elem.h64_1) + if p := fmt.Sprintf("%x", h64_byte.Sum(nil)); p != target { + t.Errorf("'%s': %s (want %s)", elem.s, p, target) + } + + if v := Sum64([]byte(elem.s)); v != elem.h64_1 { + t.Errorf("'%s': 0x%x (want 0x%x)", elem.s, v, elem.h64_1) + } + + var h128 Hash128 = New128() + h128.Write([]byte(elem.s)) + if v1, v2 := h128.Sum128(); v1 != elem.h64_1 || v2 != elem.h64_2 { + t.Errorf("'%s': 0x%x-0x%x (want 0x%x-0x%x)", elem.s, v1, v2, elem.h64_1, elem.h64_2) + } + + var h128_byte Hash128 = New128() + h128_byte.Write([]byte(elem.s)) + target = fmt.Sprintf("%016x%016x", elem.h64_1, elem.h64_2) + if p := fmt.Sprintf("%x", h128_byte.Sum(nil)); p != target { + t.Errorf("'%s': %s (want %s)", elem.s, p, target) + } + + if v1, v2 := Sum128([]byte(elem.s)); v1 != elem.h64_1 || v2 != elem.h64_2 { + t.Errorf("'%s': 0x%x-0x%x (want 0x%x-0x%x)", elem.s, v1, v2, elem.h64_1, elem.h64_2) + } + } +} + +func TestIncremental(t *testing.T) { + for _, elem := range data { + h32 := New32() + h128 := New128() + for i, j, k := 0, 0, len(elem.s); i < k; i = j { + j = 2*i + 3 + if j > k { + j = k + } + s := elem.s[i:j] + print(s + "|") + h32.Write([]byte(s)) + h128.Write([]byte(s)) + } + println() + if v := h32.Sum32(); v != elem.h32 { + t.Errorf("'%s': 0x%x (want 0x%x)", elem.s, v, elem.h32) + } + if v1, v2 := h128.Sum128(); v1 != elem.h64_1 || v2 != elem.h64_2 { + t.Errorf("'%s': 0x%x-0x%x (want 0x%x-0x%x)", elem.s, v1, v2, elem.h64_1, elem.h64_2) + } + } +} + +//--- + +func bench32(b *testing.B, length int) { + buf := make([]byte, length) + b.SetBytes(int64(length)) + b.ResetTimer() + for i := 0; i < b.N; i++ { + Sum32(buf) + } +} + +func Benchmark32_1(b *testing.B) { + bench32(b, 1) +} +func Benchmark32_2(b *testing.B) { + bench32(b, 2) +} +func Benchmark32_4(b *testing.B) { + bench32(b, 4) +} +func Benchmark32_8(b *testing.B) { + bench32(b, 8) +} +func Benchmark32_16(b *testing.B) { + bench32(b, 16) +} +func Benchmark32_32(b *testing.B) { + bench32(b, 32) +} +func Benchmark32_64(b *testing.B) { + bench32(b, 64) +} +func Benchmark32_128(b *testing.B) { + bench32(b, 128) +} +func Benchmark32_256(b *testing.B) { + bench32(b, 256) +} +func Benchmark32_512(b *testing.B) { + bench32(b, 512) +} +func Benchmark32_1024(b *testing.B) { + bench32(b, 1024) +} +func Benchmark32_2048(b *testing.B) { + bench32(b, 2048) +} +func Benchmark32_4096(b *testing.B) { + bench32(b, 4096) +} +func Benchmark32_8192(b *testing.B) { + bench32(b, 8192) +} + +//--- + +func benchPartial32(b *testing.B, length int) { + buf := make([]byte, length) + b.SetBytes(int64(length)) + + start := (32 / 8) / 2 + chunks := 7 + k := length / chunks + tail := (length - start) % k + + b.ResetTimer() + for i := 0; i < b.N; i++ { + hasher := New32() + hasher.Write(buf[0:start]) + + for j := start; j+k <= length; j += k { + hasher.Write(buf[j : j+k]) + } + + hasher.Write(buf[length-tail:]) + hasher.Sum32() + } +} + +func BenchmarkPartial32_8(b *testing.B) { + benchPartial32(b, 8) +} +func BenchmarkPartial32_16(b *testing.B) { + benchPartial32(b, 16) +} +func BenchmarkPartial32_32(b *testing.B) { + benchPartial32(b, 32) +} +func BenchmarkPartial32_64(b *testing.B) { + benchPartial32(b, 64) +} +func BenchmarkPartial32_128(b *testing.B) { + benchPartial32(b, 128) +} + +//--- + +func bench128(b *testing.B, length int) { + buf := make([]byte, length) + b.SetBytes(int64(length)) + b.ResetTimer() + for i := 0; i < b.N; i++ { + Sum128(buf) + } +} + +func Benchmark128_1(b *testing.B) { + bench128(b, 1) +} +func Benchmark128_2(b *testing.B) { + bench128(b, 2) +} +func Benchmark128_4(b *testing.B) { + bench128(b, 4) +} +func Benchmark128_8(b *testing.B) { + bench128(b, 8) +} +func Benchmark128_16(b *testing.B) { + bench128(b, 16) +} +func Benchmark128_32(b *testing.B) { + bench128(b, 32) +} +func Benchmark128_64(b *testing.B) { + bench128(b, 64) +} +func Benchmark128_128(b *testing.B) { + bench128(b, 128) +} +func Benchmark128_256(b *testing.B) { + bench128(b, 256) +} +func Benchmark128_512(b *testing.B) { + bench128(b, 512) +} +func Benchmark128_1024(b *testing.B) { + bench128(b, 1024) +} +func Benchmark128_2048(b *testing.B) { + bench128(b, 2048) +} +func Benchmark128_4096(b *testing.B) { + bench128(b, 4096) +} +func Benchmark128_8192(b *testing.B) { + bench128(b, 8192) +} + +//--- diff --git a/vendor/manifest b/vendor/manifest index 21351d08b8..0f07864761 100644 --- a/vendor/manifest +++ b/vendor/manifest @@ -65,6 +65,12 @@ "revision": "84c0a38a18fcd2d657f3eabc958433c1f37ecdc0", "branch": "master" }, + { + "importpath": "github.com/coocood/freecache", + "repository": "https://github.com/coocood/freecache", + "revision": "a27035d5537f1fa5518225e9373c9ec7450f2ea2", + "branch": "master" + }, { "importpath": "github.com/coreos/etcd/Godeps/_workspace/src/bitbucket.org/ww/goautoneg", "repository": "https://github.com/coreos/etcd", @@ -697,6 +703,12 @@ "revision": "244f5ac324cb97e1987ef901a0081a77bfd8e845", "branch": "master" }, + { + "importpath": "github.com/spaolacci/murmur3", + "repository": "https://github.com/spaolacci/murmur3", + "revision": "0d12bf811670bf6a1a63828dfbd003eded177fce", + "branch": "master" + }, { "importpath": "github.com/spf13/cobra", "repository": "https://github.com/spf13/cobra", @@ -925,4 +937,4 @@ "branch": "master" } ] -} +} \ No newline at end of file From 9cff5699cb6aa6a8058ade47df26664092a510e3 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 9 Dec 2015 17:44:47 +0000 Subject: [PATCH 3/5] Don't decode the certificates everytime we create an AppClient. --- xfer/probe_config.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/xfer/probe_config.go b/xfer/probe_config.go index 071e832c25..58c1093d4f 100644 --- a/xfer/probe_config.go +++ b/xfer/probe_config.go @@ -2,6 +2,7 @@ package xfer import ( "crypto/tls" + "crypto/x509" "fmt" "io" "net" @@ -14,6 +15,16 @@ import ( // ID is currently set to the a random string on probe startup. const ScopeProbeIDHeader = "X-Scope-Probe-ID" +var certPool *x509.CertPool + +func init() { + var err error + certPool, err = gocertifi.CACerts() + if err != nil { + panic(err) + } +} + // ProbeConfig contains all the info needed for a probe to do HTTP requests type ProbeConfig struct { Token string @@ -46,10 +57,6 @@ func (pc ProbeConfig) getHTTPTransport(hostname string) (*http.Transport, error) return nil, err } - certPool, err := gocertifi.CACerts() - if err != nil { - return nil, err - } return &http.Transport{ TLSClientConfig: &tls.Config{ RootCAs: certPool, From cc90c6c95d323c05b681457b78da0361ade0bb77 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 9 Dec 2015 17:45:33 +0000 Subject: [PATCH 4/5] Don't read tcp6 file (we don't deal with ipv6 anywhere else). --- probe/endpoint/procspy/proc.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/probe/endpoint/procspy/proc.go b/probe/endpoint/procspy/proc.go index 6823a3ede8..84c5a00865 100644 --- a/probe/endpoint/procspy/proc.go +++ b/probe/endpoint/procspy/proc.go @@ -37,15 +37,12 @@ func walkProcPid(buf *bytes.Buffer, walker process.Walker) (map[uint64]Proc, err // Read network namespace, and if we haven't seen it before, // read /proc//net/tcp - err := fs.Lstat(filepath.Join(procRoot, dirName, "/ns/net"), &statT) - if err != nil { + if err := fs.Lstat(filepath.Join(procRoot, dirName, "/ns/net"), &statT); err != nil { return } - if _, ok := namespaces[statT.Ino]; !ok { namespaces[statT.Ino] = struct{}{} readFile(filepath.Join(procRoot, dirName, "/net/tcp"), buf) - readFile(filepath.Join(procRoot, dirName, "/net/tcp6"), buf) } fds, err := fs.ReadDirNames(fdBase) From a2862baf33a44ce4f46d96351c4458f7acc4f90d Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 10 Dec 2015 17:44:53 +0000 Subject: [PATCH 5/5] Don't list fds if there are not connections in the net namespace. --- .../procspy/benchmark_internal_test.go | 7 +++- probe/endpoint/procspy/proc.go | 37 +++++++++++-------- probe/endpoint/procspy/proc_internal_test.go | 8 +++- probe/endpoint/procspy/spy_linux.go | 6 +-- 4 files changed, 37 insertions(+), 21 deletions(-) diff --git a/probe/endpoint/procspy/benchmark_internal_test.go b/probe/endpoint/procspy/benchmark_internal_test.go index ec1dea59c2..1e48aab06e 100644 --- a/probe/endpoint/procspy/benchmark_internal_test.go +++ b/probe/endpoint/procspy/benchmark_internal_test.go @@ -6,13 +6,16 @@ import ( ) func BenchmarkParseConnectionsBaseline(b *testing.B) { - readFile = func(string, *bytes.Buffer) error { return nil } + readFile = func(string, *bytes.Buffer) (int64, error) { return 0, nil } benchmarkConnections(b) // 333 ns/op, 0 allocs/op } func BenchmarkParseConnectionsFixture(b *testing.B) { - readFile = func(_ string, buf *bytes.Buffer) error { _, err := buf.Write(fixture); return err } + readFile = func(_ string, buf *bytes.Buffer) (int64, error) { + n, err := buf.Write(fixture) + return int64(n), err + } benchmarkConnections(b) // 15553 ns/op, 12 allocs/op } diff --git a/probe/endpoint/procspy/proc.go b/probe/endpoint/procspy/proc.go index 84c5a00865..dd8c47ae14 100644 --- a/probe/endpoint/procspy/proc.go +++ b/probe/endpoint/procspy/proc.go @@ -24,10 +24,10 @@ func SetProcRoot(root string) { // walkProcPid walks over all numerical (PID) /proc entries, and sees if their // ./fd/* files are symlink to sockets. Returns a map from socket ID (inode) // to PID. Will return an error if /proc isn't there. -func walkProcPid(buf *bytes.Buffer, walker process.Walker) (map[uint64]Proc, error) { +func walkProcPid(buf *bytes.Buffer, walker process.Walker) (map[uint64]*Proc, error) { var ( - res = map[uint64]Proc{} - namespaces = map[uint64]struct{}{} + res = map[uint64]*Proc{} + namespaces = map[uint64]bool{} // map namespace id -> has connections statT syscall.Stat_t ) @@ -40,9 +40,14 @@ func walkProcPid(buf *bytes.Buffer, walker process.Walker) (map[uint64]Proc, err if err := fs.Lstat(filepath.Join(procRoot, dirName, "/ns/net"), &statT); err != nil { return } - if _, ok := namespaces[statT.Ino]; !ok { - namespaces[statT.Ino] = struct{}{} - readFile(filepath.Join(procRoot, dirName, "/net/tcp"), buf) + hasConns, ok := namespaces[statT.Ino] + if !ok { + read, err := readFile(filepath.Join(procRoot, dirName, "/net/tcp"), buf) + hasConns = err == nil && read > 0 + namespaces[statT.Ino] = hasConns + } + if !hasConns { + return } fds, err := fs.ReadDirNames(fdBase) @@ -50,6 +55,7 @@ func walkProcPid(buf *bytes.Buffer, walker process.Walker) (map[uint64]Proc, err // Process is be gone by now, or we don't have access. return } + var proc *Proc for _, fd := range fds { // Direct use of syscall.Stat() to save garbage. err = fs.Stat(filepath.Join(fdBase, fd), &statT) @@ -61,11 +67,13 @@ func walkProcPid(buf *bytes.Buffer, walker process.Walker) (map[uint64]Proc, err if statT.Mode&syscall.S_IFMT != syscall.S_IFSOCK { continue } - - res[statT.Ino] = Proc{ - PID: uint(p.PID), - Name: p.Comm, + if proc == nil { + proc = &Proc{ + PID: uint(p.PID), + Name: p.Comm, + } } + res[statT.Ino] = proc } }) @@ -75,12 +83,11 @@ func walkProcPid(buf *bytes.Buffer, walker process.Walker) (map[uint64]Proc, err // readFile reads an arbitrary file into a buffer. It's a variable so it can // be overwritten for benchmarks. That's bad practice and we should change it // to be a dependency. -var readFile = func(filename string, buf *bytes.Buffer) error { +var readFile = func(filename string, buf *bytes.Buffer) (int64, error) { f, err := fs.Open(filename) if err != nil { - return err + return -1, err } - _, err = buf.ReadFrom(f) - f.Close() - return err + defer f.Close() + return buf.ReadFrom(f) } diff --git a/probe/endpoint/procspy/proc_internal_test.go b/probe/endpoint/procspy/proc_internal_test.go index 4ebd005524..19c9aa3811 100644 --- a/probe/endpoint/procspy/proc_internal_test.go +++ b/probe/endpoint/procspy/proc_internal_test.go @@ -33,6 +33,12 @@ var mockFS = fs.Dir("", FStat: syscall.Stat_t{}, }, ), + fs.Dir("net", + fs.File{ + FName: "tcp", + FContents: "I'm a little teapot", + }, + ), fs.File{ FName: "stat", FContents: "1 na R 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1", @@ -50,7 +56,7 @@ func TestWalkProcPid(t *testing.T) { if err != nil { t.Fatal(err) } - want := map[uint64]Proc{ + want := map[uint64]*Proc{ 45: { PID: 1, Name: "foo", diff --git a/probe/endpoint/procspy/spy_linux.go b/probe/endpoint/procspy/spy_linux.go index 29634edb0b..68b0d7c0bc 100644 --- a/probe/endpoint/procspy/spy_linux.go +++ b/probe/endpoint/procspy/spy_linux.go @@ -16,7 +16,7 @@ var bufPool = sync.Pool{ type pnConnIter struct { pn *ProcNet buf *bytes.Buffer - procs map[uint64]Proc + procs map[uint64]*Proc } func (c *pnConnIter) Next() *Connection { @@ -27,7 +27,7 @@ func (c *pnConnIter) Next() *Connection { return nil } if proc, ok := c.procs[n.inode]; ok { - n.Proc = proc + n.Proc = *proc } return n } @@ -38,7 +38,7 @@ var cbConnections = func(processes bool, walker process.Walker) (ConnIter, error buf := bufPool.Get().(*bytes.Buffer) buf.Reset() - var procs map[uint64]Proc + var procs map[uint64]*Proc if processes { var err error if procs, err = walkProcPid(buf, walker); err != nil {