From 9a69d8b5798fc87b1778299e8690ea05203da825 Mon Sep 17 00:00:00 2001 From: zhangliang03 Date: Wed, 2 Jun 2021 10:27:52 +0800 Subject: [PATCH 1/7] add rwmutex, fix thread safe issue --- graph/memstore/quadstore.go | 92 +++++++++++++++++++++++++++++++------ 1 file changed, 78 insertions(+), 14 deletions(-) diff --git a/graph/memstore/quadstore.go b/graph/memstore/quadstore.go index 21594501d..136fdcf85 100644 --- a/graph/memstore/quadstore.go +++ b/graph/memstore/quadstore.go @@ -19,6 +19,7 @@ import ( "fmt" "strconv" "strings" + "sync" "github.com/cayleygraph/cayley/graph" "github.com/cayleygraph/cayley/graph/iterator" @@ -44,7 +45,7 @@ type bnode int64 func (n bnode) Key() interface{} { return n } type qprim struct { - p *Primitive + p *primitive } func (n qprim) Key() interface{} { return n.p.ID } @@ -60,7 +61,7 @@ type QuadDirectionIndex struct { } func NewQuadDirectionIndex() QuadDirectionIndex { - return QuadDirectionIndex{[...]map[int64]*Tree{ + return QuadDirectionIndex{index: [...]map[int64]*Tree{ quad.Subject - 1: make(map[int64]*Tree), quad.Predicate - 1: make(map[int64]*Tree), quad.Object - 1: make(map[int64]*Tree), @@ -72,6 +73,7 @@ func (qdi QuadDirectionIndex) Tree(d quad.Direction, id int64) *Tree { if d < quad.Subject || d > quad.Label { panic("illegal direction") } + tree, ok := qdi.index[d-1][id] if !ok { tree = TreeNew(cmp) @@ -88,7 +90,7 @@ func (qdi QuadDirectionIndex) Get(d quad.Direction, id int64) (*Tree, bool) { return tree, ok } -type Primitive struct { +type primitive struct { ID int64 Quad internalQuad Value quad.Value @@ -137,12 +139,17 @@ type QuadStore struct { // TODO: string -> quad.Value once Raw -> typed resolution is unnecessary vals map[string]int64 quads map[internalQuad]int64 - prim map[int64]*Primitive - all []*Primitive // might not be sorted by id + prim map[int64]*primitive + all []*primitive // might not be sorted by id reading bool // someone else might be reading "all" slice - next insert/delete should clone it index QuadDirectionIndex horizon int64 // used only to assign ids to tx // vip_index map[string]map[int64]map[string]map[int64]*b.Tree + + vrw sync.RWMutex + qrw sync.RWMutex + prw sync.RWMutex + irw sync.RWMutex } // New creates a new in-memory quad store and loads provided quads. @@ -158,17 +165,17 @@ func newQuadStore() *QuadStore { return &QuadStore{ vals: make(map[string]int64), quads: make(map[internalQuad]int64), - prim: make(map[int64]*Primitive), + prim: make(map[int64]*primitive), index: NewQuadDirectionIndex(), } } -func (qs *QuadStore) cloneAll() []*Primitive { +func (qs *QuadStore) cloneAll() []*primitive { qs.reading = true return qs.all } -func (qs *QuadStore) addPrimitive(p *Primitive) int64 { +func (qs *QuadStore) addPrimitive(p *primitive) int64 { qs.last++ id := qs.last p.ID = id @@ -177,8 +184,11 @@ func (qs *QuadStore) addPrimitive(p *Primitive) int64 { return id } -func (qs *QuadStore) appendPrimitive(p *Primitive) { +func (qs *QuadStore) appendPrimitive(p *primitive) { + qs.prw.Lock() qs.prim[p.ID] = p + qs.prw.Unlock() + if !qs.reading { qs.all = append(qs.all, p) } else { @@ -198,24 +208,36 @@ func (qs *QuadStore) resolveVal(v quad.Value, add bool) (int64, bool) { n = n[len(internalBNodePrefix):] id, err := strconv.ParseInt(string(n), 10, 64) if err == nil && id != 0 { + qs.prw.RLock() if p, ok := qs.prim[id]; ok || !add { + qs.prw.RUnlock() if add { p.refs++ } return id, ok } - qs.appendPrimitive(&Primitive{ID: id, refs: 1}) + qs.prw.RUnlock() + qs.appendPrimitive(&primitive{ID: id, refs: 1}) return id, true } } vs := v.String() + qs.vrw.RLock() if id, exists := qs.vals[vs]; exists || !add { + qs.vrw.RUnlock() if exists && add { + qs.prw.Lock() qs.prim[id].refs++ + qs.prw.Unlock() } return id, exists } - id := qs.addPrimitive(&Primitive{Value: v}) + qs.vrw.RUnlock() + + id := qs.addPrimitive(&primitive{Value: v}) + + qs.vrw.Lock() + defer qs.vrw.Unlock() qs.vals[vs] = id return id, true } @@ -237,7 +259,9 @@ func (qs *QuadStore) resolveQuad(q quad.Quad, add bool) (internalQuad, bool) { } func (qs *QuadStore) lookupVal(id int64) quad.Value { + qs.prw.RLock() pv := qs.prim[id] + qs.prw.RUnlock() if pv == nil || pv.Value == nil { return quad.BNode(internalBNodePrefix + strconv.FormatInt(id, 10)) } @@ -259,7 +283,7 @@ func (qs *QuadStore) lookupQuadDirs(p internalQuad) quad.Quad { // AddNode adds a blank node (with no value) to quad store. It returns an id of the node. func (qs *QuadStore) AddBNode() int64 { - return qs.addPrimitive(&Primitive{}) + return qs.addPrimitive(&primitive{}) } // AddNode adds a value to quad store. It returns an id of the value. @@ -270,6 +294,9 @@ func (qs *QuadStore) AddValue(v quad.Value) (int64, bool) { } func (qs *QuadStore) indexesForQuad(q internalQuad) []*Tree { + qs.irw.Lock() + defer qs.irw.Unlock() + trees := make([]*Tree, 0, 4) for dir := quad.Subject; dir <= quad.Label; dir++ { v := q.Dir(dir) @@ -285,13 +312,20 @@ func (qs *QuadStore) indexesForQuad(q internalQuad) []*Tree { // False is returned as a second parameter if quad exists already. func (qs *QuadStore) AddQuad(q quad.Quad) (int64, bool) { p, _ := qs.resolveQuad(q, false) + qs.qrw.RLock() if id := qs.quads[p]; id != 0 { + qs.qrw.RUnlock() return id, false } + qs.qrw.RUnlock() p, _ = qs.resolveQuad(q, true) - pr := &Primitive{Quad: p} + pr := &primitive{Quad: p} id := qs.addPrimitive(pr) + + qs.qrw.Lock() qs.quads[p] = id + qs.qrw.Unlock() + for _, t := range qs.indexesForQuad(p) { t.Set(id, pr) } @@ -345,32 +379,44 @@ func (qs *QuadStore) deleteQuadNodes(q internalQuad) { if id == 0 { continue } + qs.prw.RLock() if p := qs.prim[id]; p != nil { + qs.prw.RUnlock() p.refs-- if p.refs < 0 { panic("remove of deleted node") } else if p.refs == 0 { qs.Delete(id) } + } else { + qs.prw.RUnlock() } } } func (qs *QuadStore) Delete(id int64) bool { + qs.prw.RLock() p := qs.prim[id] + qs.prw.RUnlock() if p == nil { return false } // remove from value index if p.Value != nil { + qs.vrw.Lock() delete(qs.vals, p.Value.String()) + qs.vrw.Unlock() } // remove from quad indexes for _, t := range qs.indexesForQuad(p.Quad) { t.Delete(id) } + qs.qrw.Lock() delete(qs.quads, p.Quad) + qs.qrw.Unlock() // remove primitive + qs.prw.Lock() delete(qs.prim, id) + qs.prw.Unlock() di := -1 for i, p2 := range qs.all { if p == p2 { @@ -382,7 +428,7 @@ func (qs *QuadStore) Delete(id int64) bool { if !qs.reading { qs.all = append(qs.all[:di], qs.all[di+1:]...) } else { - all := make([]*Primitive, 0, len(qs.all)-1) + all := make([]*primitive, 0, len(qs.all)-1) all = append(all, qs.all[:di]...) all = append(all, qs.all[di+1:]...) qs.all = all @@ -398,6 +444,8 @@ func (qs *QuadStore) findQuad(q quad.Quad) (int64, internalQuad, bool) { if !ok { return 0, p, false } + qs.qrw.Lock() + defer qs.qrw.Unlock() id := qs.quads[p] return id, p, id != 0 } @@ -456,7 +504,9 @@ func asID(v graph.Ref) (int64, bool) { func (qs *QuadStore) quad(v graph.Ref) (q internalQuad, ok bool) { switch v := v.(type) { case bnode: + qs.prw.RLock() p := qs.prim[int64(v)] + qs.prw.RUnlock() if p == nil { return } @@ -482,7 +532,9 @@ func (qs *QuadStore) QuadIterator(d quad.Direction, value graph.Ref) iterator.Sh if !ok { return iterator.NewNull() } + qs.irw.RLock() index, ok := qs.index.Get(d, id) + qs.irw.RUnlock() if ok && index.Len() != 0 { return qs.newIterator(index, d, id) } @@ -494,7 +546,9 @@ func (qs *QuadStore) QuadIteratorSize(ctx context.Context, d quad.Direction, v g if !ok { return refs.Size{Value: 0, Exact: true}, nil } + qs.irw.RLock() index, ok := qs.index.Get(d, id) + qs.irw.RUnlock() if !ok { return refs.Size{Value: 0, Exact: true}, nil } @@ -502,6 +556,10 @@ func (qs *QuadStore) QuadIteratorSize(ctx context.Context, d quad.Direction, v g } func (qs *QuadStore) Stats(ctx context.Context, exact bool) (graph.Stats, error) { + qs.vrw.RLock() + defer qs.vrw.RUnlock() + qs.qrw.RLock() + defer qs.qrw.RUnlock() return graph.Stats{ Nodes: refs.Size{ Value: int64(len(qs.vals)), @@ -518,7 +576,10 @@ func (qs *QuadStore) ValueOf(name quad.Value) graph.Ref { if name == nil { return nil } + + qs.vrw.Lock() id := qs.vals[name.String()] + qs.vrw.Unlock() if id == 0 { return nil } @@ -535,9 +596,12 @@ func (qs *QuadStore) NameOf(v graph.Ref) quad.Value { if !ok { return nil } + qs.prw.RLock() if _, ok = qs.prim[n]; !ok { + qs.prw.RUnlock() return nil } + qs.prw.RUnlock() return qs.lookupVal(n) } From 49cdd063ffc2b990fbfefe86c792a93dbbed0827 Mon Sep 17 00:00:00 2001 From: zhangliang03 Date: Wed, 2 Jun 2021 10:32:46 +0800 Subject: [PATCH 2/7] format code --- graph/memstore/quadstore.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/graph/memstore/quadstore.go b/graph/memstore/quadstore.go index 136fdcf85..006fce5f5 100644 --- a/graph/memstore/quadstore.go +++ b/graph/memstore/quadstore.go @@ -45,7 +45,7 @@ type bnode int64 func (n bnode) Key() interface{} { return n } type qprim struct { - p *primitive + p *Primitive } func (n qprim) Key() interface{} { return n.p.ID } @@ -90,7 +90,7 @@ func (qdi QuadDirectionIndex) Get(d quad.Direction, id int64) (*Tree, bool) { return tree, ok } -type primitive struct { +type Primitive struct { ID int64 Quad internalQuad Value quad.Value @@ -139,8 +139,8 @@ type QuadStore struct { // TODO: string -> quad.Value once Raw -> typed resolution is unnecessary vals map[string]int64 quads map[internalQuad]int64 - prim map[int64]*primitive - all []*primitive // might not be sorted by id + prim map[int64]*Primitive + all []*Primitive // might not be sorted by id reading bool // someone else might be reading "all" slice - next insert/delete should clone it index QuadDirectionIndex horizon int64 // used only to assign ids to tx @@ -165,17 +165,17 @@ func newQuadStore() *QuadStore { return &QuadStore{ vals: make(map[string]int64), quads: make(map[internalQuad]int64), - prim: make(map[int64]*primitive), + prim: make(map[int64]*Primitive), index: NewQuadDirectionIndex(), } } -func (qs *QuadStore) cloneAll() []*primitive { +func (qs *QuadStore) cloneAll() []*Primitive { qs.reading = true return qs.all } -func (qs *QuadStore) addPrimitive(p *primitive) int64 { +func (qs *QuadStore) addPrimitive(p *Primitive) int64 { qs.last++ id := qs.last p.ID = id @@ -184,7 +184,7 @@ func (qs *QuadStore) addPrimitive(p *primitive) int64 { return id } -func (qs *QuadStore) appendPrimitive(p *primitive) { +func (qs *QuadStore) appendPrimitive(p *Primitive) { qs.prw.Lock() qs.prim[p.ID] = p qs.prw.Unlock() @@ -217,7 +217,7 @@ func (qs *QuadStore) resolveVal(v quad.Value, add bool) (int64, bool) { return id, ok } qs.prw.RUnlock() - qs.appendPrimitive(&primitive{ID: id, refs: 1}) + qs.appendPrimitive(&Primitive{ID: id, refs: 1}) return id, true } } @@ -234,7 +234,7 @@ func (qs *QuadStore) resolveVal(v quad.Value, add bool) (int64, bool) { } qs.vrw.RUnlock() - id := qs.addPrimitive(&primitive{Value: v}) + id := qs.addPrimitive(&Primitive{Value: v}) qs.vrw.Lock() defer qs.vrw.Unlock() @@ -283,7 +283,7 @@ func (qs *QuadStore) lookupQuadDirs(p internalQuad) quad.Quad { // AddNode adds a blank node (with no value) to quad store. It returns an id of the node. func (qs *QuadStore) AddBNode() int64 { - return qs.addPrimitive(&primitive{}) + return qs.addPrimitive(&Primitive{}) } // AddNode adds a value to quad store. It returns an id of the value. @@ -319,7 +319,7 @@ func (qs *QuadStore) AddQuad(q quad.Quad) (int64, bool) { } qs.qrw.RUnlock() p, _ = qs.resolveQuad(q, true) - pr := &primitive{Quad: p} + pr := &Primitive{Quad: p} id := qs.addPrimitive(pr) qs.qrw.Lock() @@ -413,7 +413,7 @@ func (qs *QuadStore) Delete(id int64) bool { qs.qrw.Lock() delete(qs.quads, p.Quad) qs.qrw.Unlock() - // remove primitive + // remove Primitive qs.prw.Lock() delete(qs.prim, id) qs.prw.Unlock() @@ -428,7 +428,7 @@ func (qs *QuadStore) Delete(id int64) bool { if !qs.reading { qs.all = append(qs.all[:di], qs.all[di+1:]...) } else { - all := make([]*primitive, 0, len(qs.all)-1) + all := make([]*Primitive, 0, len(qs.all)-1) all = append(all, qs.all[:di]...) all = append(all, qs.all[di+1:]...) qs.all = all From c3bbf124446d02532f9e54b2d992d4a0235266f8 Mon Sep 17 00:00:00 2001 From: zhangliang03 Date: Thu, 3 Jun 2021 15:50:43 +0800 Subject: [PATCH 3/7] add test case for multi-thread rw --- go.mod | 1 + go.sum | 2 + graph/memstore/quadstore.go | 106 +++++++++++++++---------------- graph/memstore/quadstore_test.go | 45 +++++++++++++ graph/nosql/elastic/elastic.go | 1 - graph/nosql/mongo/mongo.go | 1 - 6 files changed, 101 insertions(+), 55 deletions(-) diff --git a/go.mod b/go.mod index 28bdd2ff4..7f1df8747 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/cayleygraph/cayley go 1.12 require ( + github.com/RyouZhang/async-go v0.2.2 // indirect github.com/badgerodon/peg v0.0.0-20130729175151-9e5f7f4d07ca github.com/cayleygraph/quad v1.2.4 github.com/cockroachdb/apd v1.1.0 // indirect diff --git a/go.sum b/go.sum index 59b61846d..8f7216275 100644 --- a/go.sum +++ b/go.sum @@ -12,6 +12,8 @@ github.com/Microsoft/go-winio v0.4.12/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcy github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/RyouZhang/async-go v0.2.2 h1:/cRhxuJkRLygF/a+c9Teaa96QIY22mOx1YrOK+WSgAA= +github.com/RyouZhang/async-go v0.2.2/go.mod h1:ogL6baAxf0sZWPi/i9i4XdTt8D1Vn/AxNeG0eUWrY18= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= diff --git a/graph/memstore/quadstore.go b/graph/memstore/quadstore.go index 006fce5f5..948779590 100644 --- a/graph/memstore/quadstore.go +++ b/graph/memstore/quadstore.go @@ -146,10 +146,10 @@ type QuadStore struct { horizon int64 // used only to assign ids to tx // vip_index map[string]map[int64]map[string]map[int64]*b.Tree - vrw sync.RWMutex - qrw sync.RWMutex - prw sync.RWMutex - irw sync.RWMutex + valsMu sync.RWMutex + quadsMu sync.RWMutex + primMu sync.RWMutex + indexMu sync.RWMutex } // New creates a new in-memory quad store and loads provided quads. @@ -185,9 +185,9 @@ func (qs *QuadStore) addPrimitive(p *Primitive) int64 { } func (qs *QuadStore) appendPrimitive(p *Primitive) { - qs.prw.Lock() + qs.primMu.Lock() qs.prim[p.ID] = p - qs.prw.Unlock() + qs.primMu.Unlock() if !qs.reading { qs.all = append(qs.all, p) @@ -208,36 +208,36 @@ func (qs *QuadStore) resolveVal(v quad.Value, add bool) (int64, bool) { n = n[len(internalBNodePrefix):] id, err := strconv.ParseInt(string(n), 10, 64) if err == nil && id != 0 { - qs.prw.RLock() + qs.primMu.RLock() if p, ok := qs.prim[id]; ok || !add { - qs.prw.RUnlock() + qs.primMu.RUnlock() if add { p.refs++ } return id, ok } - qs.prw.RUnlock() + qs.primMu.RUnlock() qs.appendPrimitive(&Primitive{ID: id, refs: 1}) return id, true } } vs := v.String() - qs.vrw.RLock() + qs.valsMu.RLock() if id, exists := qs.vals[vs]; exists || !add { - qs.vrw.RUnlock() + qs.valsMu.RUnlock() if exists && add { - qs.prw.Lock() + qs.primMu.Lock() qs.prim[id].refs++ - qs.prw.Unlock() + qs.primMu.Unlock() } return id, exists } - qs.vrw.RUnlock() + qs.valsMu.RUnlock() id := qs.addPrimitive(&Primitive{Value: v}) - qs.vrw.Lock() - defer qs.vrw.Unlock() + qs.valsMu.Lock() + defer qs.valsMu.Unlock() qs.vals[vs] = id return id, true } @@ -259,9 +259,9 @@ func (qs *QuadStore) resolveQuad(q quad.Quad, add bool) (internalQuad, bool) { } func (qs *QuadStore) lookupVal(id int64) quad.Value { - qs.prw.RLock() + qs.primMu.RLock() pv := qs.prim[id] - qs.prw.RUnlock() + qs.primMu.RUnlock() if pv == nil || pv.Value == nil { return quad.BNode(internalBNodePrefix + strconv.FormatInt(id, 10)) } @@ -294,8 +294,8 @@ func (qs *QuadStore) AddValue(v quad.Value) (int64, bool) { } func (qs *QuadStore) indexesForQuad(q internalQuad) []*Tree { - qs.irw.Lock() - defer qs.irw.Unlock() + qs.indexMu.Lock() + defer qs.indexMu.Unlock() trees := make([]*Tree, 0, 4) for dir := quad.Subject; dir <= quad.Label; dir++ { @@ -312,19 +312,19 @@ func (qs *QuadStore) indexesForQuad(q internalQuad) []*Tree { // False is returned as a second parameter if quad exists already. func (qs *QuadStore) AddQuad(q quad.Quad) (int64, bool) { p, _ := qs.resolveQuad(q, false) - qs.qrw.RLock() + qs.quadsMu.RLock() if id := qs.quads[p]; id != 0 { - qs.qrw.RUnlock() + qs.quadsMu.RUnlock() return id, false } - qs.qrw.RUnlock() + qs.quadsMu.RUnlock() p, _ = qs.resolveQuad(q, true) pr := &Primitive{Quad: p} id := qs.addPrimitive(pr) - qs.qrw.Lock() + qs.quadsMu.Lock() qs.quads[p] = id - qs.qrw.Unlock() + qs.quadsMu.Unlock() for _, t := range qs.indexesForQuad(p) { t.Set(id, pr) @@ -379,9 +379,9 @@ func (qs *QuadStore) deleteQuadNodes(q internalQuad) { if id == 0 { continue } - qs.prw.RLock() + qs.primMu.RLock() if p := qs.prim[id]; p != nil { - qs.prw.RUnlock() + qs.primMu.RUnlock() p.refs-- if p.refs < 0 { panic("remove of deleted node") @@ -389,34 +389,34 @@ func (qs *QuadStore) deleteQuadNodes(q internalQuad) { qs.Delete(id) } } else { - qs.prw.RUnlock() + qs.primMu.RUnlock() } } } func (qs *QuadStore) Delete(id int64) bool { - qs.prw.RLock() + qs.primMu.RLock() p := qs.prim[id] - qs.prw.RUnlock() + qs.primMu.RUnlock() if p == nil { return false } // remove from value index if p.Value != nil { - qs.vrw.Lock() + qs.valsMu.Lock() delete(qs.vals, p.Value.String()) - qs.vrw.Unlock() + qs.valsMu.Unlock() } // remove from quad indexes for _, t := range qs.indexesForQuad(p.Quad) { t.Delete(id) } - qs.qrw.Lock() + qs.quadsMu.Lock() delete(qs.quads, p.Quad) - qs.qrw.Unlock() + qs.quadsMu.Unlock() // remove Primitive - qs.prw.Lock() + qs.primMu.Lock() delete(qs.prim, id) - qs.prw.Unlock() + qs.primMu.Unlock() di := -1 for i, p2 := range qs.all { if p == p2 { @@ -444,8 +444,8 @@ func (qs *QuadStore) findQuad(q quad.Quad) (int64, internalQuad, bool) { if !ok { return 0, p, false } - qs.qrw.Lock() - defer qs.qrw.Unlock() + qs.quadsMu.Lock() + defer qs.quadsMu.Unlock() id := qs.quads[p] return id, p, id != 0 } @@ -504,9 +504,9 @@ func asID(v graph.Ref) (int64, bool) { func (qs *QuadStore) quad(v graph.Ref) (q internalQuad, ok bool) { switch v := v.(type) { case bnode: - qs.prw.RLock() + qs.primMu.RLock() p := qs.prim[int64(v)] - qs.prw.RUnlock() + qs.primMu.RUnlock() if p == nil { return } @@ -532,9 +532,9 @@ func (qs *QuadStore) QuadIterator(d quad.Direction, value graph.Ref) iterator.Sh if !ok { return iterator.NewNull() } - qs.irw.RLock() + qs.indexMu.RLock() index, ok := qs.index.Get(d, id) - qs.irw.RUnlock() + qs.indexMu.RUnlock() if ok && index.Len() != 0 { return qs.newIterator(index, d, id) } @@ -546,9 +546,9 @@ func (qs *QuadStore) QuadIteratorSize(ctx context.Context, d quad.Direction, v g if !ok { return refs.Size{Value: 0, Exact: true}, nil } - qs.irw.RLock() + qs.indexMu.RLock() index, ok := qs.index.Get(d, id) - qs.irw.RUnlock() + qs.indexMu.RUnlock() if !ok { return refs.Size{Value: 0, Exact: true}, nil } @@ -556,10 +556,10 @@ func (qs *QuadStore) QuadIteratorSize(ctx context.Context, d quad.Direction, v g } func (qs *QuadStore) Stats(ctx context.Context, exact bool) (graph.Stats, error) { - qs.vrw.RLock() - defer qs.vrw.RUnlock() - qs.qrw.RLock() - defer qs.qrw.RUnlock() + qs.valsMu.RLock() + defer qs.valsMu.RUnlock() + qs.quadsMu.RLock() + defer qs.quadsMu.RUnlock() return graph.Stats{ Nodes: refs.Size{ Value: int64(len(qs.vals)), @@ -577,9 +577,9 @@ func (qs *QuadStore) ValueOf(name quad.Value) graph.Ref { return nil } - qs.vrw.Lock() + qs.valsMu.Lock() id := qs.vals[name.String()] - qs.vrw.Unlock() + qs.valsMu.Unlock() if id == 0 { return nil } @@ -596,12 +596,12 @@ func (qs *QuadStore) NameOf(v graph.Ref) quad.Value { if !ok { return nil } - qs.prw.RLock() + qs.primMu.RLock() if _, ok = qs.prim[n]; !ok { - qs.prw.RUnlock() + qs.primMu.RUnlock() return nil } - qs.prw.RUnlock() + qs.primMu.RUnlock() return qs.lookupVal(n) } diff --git a/graph/memstore/quadstore_test.go b/graph/memstore/quadstore_test.go index 42b77f41a..c469005e9 100644 --- a/graph/memstore/quadstore_test.go +++ b/graph/memstore/quadstore_test.go @@ -19,6 +19,8 @@ import ( "reflect" "sort" "testing" + "fmt" + "time" "github.com/stretchr/testify/require" @@ -28,7 +30,10 @@ import ( "github.com/cayleygraph/cayley/graph/refs" "github.com/cayleygraph/cayley/query/shape" "github.com/cayleygraph/cayley/writer" + "github.com/cayleygraph/cayley/query/path" "github.com/cayleygraph/quad" + + "github.com/RyouZhang/async-go" ) // This is a simple test graph. @@ -251,3 +256,43 @@ func TestTransaction(t *testing.T) { require.NoError(t, err) require.Equal(t, st, st2, "Appended a new quad in a failed transaction") } + +// test multi thread insert and query +func TestMultiThreadQuery(t *testing.T) { + qs, _, _ := makeTestStore(simpleGraph) + + // we make 50 insert, 50 query + funcs := make([]async.LambdaMethod, 100) + for i:=0; i<100; i++ { + if i % 2 == 0 { + index := i + funcs[i] = func() (interface{}, error) { + id, flag := qs.AddQuad(quad.Make( + fmt.Sprintf("E_%d",index), "follows", "G", nil), + ) + if !flag { + return nil, fmt.Errorf("quard exist:%d", id) + } else { + return id, nil + } + } + } else { + funcs[i] = func() (interface{}, error) { + ctx := context.Background() + followers, err := path.StartPath(qs, quad.Raw("G")).In("follows").Iterate(ctx).AllValues(qs) + if err != nil { + return nil, err + } + return followers, nil + } + } + } + + results := async.All(funcs, 1* time.Second) + for _, result := range results { + switch result.(type) { + case error: + require.NoError(t, result.(error)) + } + } +} diff --git a/graph/nosql/elastic/elastic.go b/graph/nosql/elastic/elastic.go index 840d5f0b3..2cb7883c1 100644 --- a/graph/nosql/elastic/elastic.go +++ b/graph/nosql/elastic/elastic.go @@ -6,7 +6,6 @@ import ( "github.com/hidal-go/hidalgo/legacy/nosql/elastic" //import hidal-go first so the registration of the no sql stores occurs before quadstore iterates for registration gnosql "github.com/cayleygraph/cayley/graph/nosql" - ) const Type = elastic.Name diff --git a/graph/nosql/mongo/mongo.go b/graph/nosql/mongo/mongo.go index 0b2649b0c..2dcdc1388 100644 --- a/graph/nosql/mongo/mongo.go +++ b/graph/nosql/mongo/mongo.go @@ -6,7 +6,6 @@ import ( "github.com/hidal-go/hidalgo/legacy/nosql/mongo" //import hidal-go first so the registration of the no sql stores occurs before quadstore iterates for registration gnosql "github.com/cayleygraph/cayley/graph/nosql" - ) const Type = mongo.Name From ce9cd4cd92174d4ba64b089042a512b56dad3791 Mon Sep 17 00:00:00 2001 From: zhangliang03 Date: Thu, 3 Jun 2021 15:51:16 +0800 Subject: [PATCH 4/7] format code --- graph/memstore/quadstore_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/graph/memstore/quadstore_test.go b/graph/memstore/quadstore_test.go index c469005e9..7b97e88d0 100644 --- a/graph/memstore/quadstore_test.go +++ b/graph/memstore/quadstore_test.go @@ -16,10 +16,10 @@ package memstore import ( "context" + "fmt" "reflect" "sort" "testing" - "fmt" "time" "github.com/stretchr/testify/require" @@ -28,9 +28,9 @@ import ( "github.com/cayleygraph/cayley/graph/graphtest" "github.com/cayleygraph/cayley/graph/iterator" "github.com/cayleygraph/cayley/graph/refs" + "github.com/cayleygraph/cayley/query/path" "github.com/cayleygraph/cayley/query/shape" "github.com/cayleygraph/cayley/writer" - "github.com/cayleygraph/cayley/query/path" "github.com/cayleygraph/quad" "github.com/RyouZhang/async-go" @@ -263,12 +263,12 @@ func TestMultiThreadQuery(t *testing.T) { // we make 50 insert, 50 query funcs := make([]async.LambdaMethod, 100) - for i:=0; i<100; i++ { - if i % 2 == 0 { + for i := 0; i < 100; i++ { + if i%2 == 0 { index := i funcs[i] = func() (interface{}, error) { id, flag := qs.AddQuad(quad.Make( - fmt.Sprintf("E_%d",index), "follows", "G", nil), + fmt.Sprintf("E_%d", index), "follows", "G", nil), ) if !flag { return nil, fmt.Errorf("quard exist:%d", id) @@ -283,12 +283,12 @@ func TestMultiThreadQuery(t *testing.T) { if err != nil { return nil, err } - return followers, nil + return followers, nil } } } - results := async.All(funcs, 1* time.Second) + results := async.All(funcs, 1*time.Second) for _, result := range results { switch result.(type) { case error: From 28ea5d7cbf33d49ddaf8ce590d3a2ab65c1da07a Mon Sep 17 00:00:00 2001 From: zhangliang03 Date: Thu, 3 Jun 2021 16:03:09 +0800 Subject: [PATCH 5/7] fix lint bug --- graph/memstore/quadstore_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/graph/memstore/quadstore_test.go b/graph/memstore/quadstore_test.go index 7b97e88d0..246ba8d85 100644 --- a/graph/memstore/quadstore_test.go +++ b/graph/memstore/quadstore_test.go @@ -272,9 +272,8 @@ func TestMultiThreadQuery(t *testing.T) { ) if !flag { return nil, fmt.Errorf("quard exist:%d", id) - } else { - return id, nil } + return id, nil } } else { funcs[i] = func() (interface{}, error) { From 8c05dcab2bf40d3281b0608ac5a836cf616badcb Mon Sep 17 00:00:00 2001 From: zhangliang03 Date: Mon, 21 Jun 2021 10:44:32 +0800 Subject: [PATCH 6/7] fix map rw bug --- graph/memstore/all_iterator.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/graph/memstore/all_iterator.go b/graph/memstore/all_iterator.go index 8a26424f8..3981ac4f7 100644 --- a/graph/memstore/all_iterator.go +++ b/graph/memstore/all_iterator.go @@ -178,7 +178,9 @@ func (it *allIteratorContains) Contains(ctx context.Context, v graph.Ref) bool { if !ok { return false } + it.qs.primMu.RLock() p := it.qs.prim[id] + it.qs.primMu.RUnlock() if p.ID > it.maxid { return false } From fb132a019e4a0b02e87490957015da75a67038c4 Mon Sep 17 00:00:00 2001 From: zhangliang03 Date: Mon, 21 Jun 2021 16:19:42 +0800 Subject: [PATCH 7/7] fix bug, add rw --- graph/memstore/quadstore_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/graph/memstore/quadstore_test.go b/graph/memstore/quadstore_test.go index 246ba8d85..e2fc8ae58 100644 --- a/graph/memstore/quadstore_test.go +++ b/graph/memstore/quadstore_test.go @@ -273,7 +273,7 @@ func TestMultiThreadQuery(t *testing.T) { if !flag { return nil, fmt.Errorf("quard exist:%d", id) } - return id, nil + return id, nil } } else { funcs[i] = func() (interface{}, error) {