diff --git a/conn/node.go b/conn/node.go index 518aa41db4f..d84c94554c0 100644 --- a/conn/node.go +++ b/conn/node.go @@ -38,6 +38,7 @@ import ( "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/raftwal" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" ) var ( @@ -647,7 +648,7 @@ func (n *Node) WaitLinearizableRead(ctx context.Context) error { } // RunReadIndexLoop runs the RAFT index in a loop. -func (n *Node) RunReadIndexLoop(closer *y.Closer, readStateCh <-chan raft.ReadState) { +func (n *Node) RunReadIndexLoop(closer *z.Closer, readStateCh <-chan raft.ReadState) { defer closer.Done() readIndex := func(activeRctx []byte) (uint64, error) { // Read Request can get rejected then we would wait indefinitely on the channel diff --git a/conn/pool.go b/conn/pool.go index 8252db47499..dbc7a79ccf2 100644 --- a/conn/pool.go +++ b/conn/pool.go @@ -21,10 +21,10 @@ import ( "sync" "time" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgo/v2/protos/api" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" "github.com/golang/glog" "github.com/pkg/errors" "go.opencensus.io/plugin/ocgrpc" @@ -50,7 +50,7 @@ type Pool struct { lastEcho time.Time Addr string - closer *y.Closer + closer *z.Closer healthInfo pb.HealthInfo } @@ -175,7 +175,7 @@ func newPool(addr string) (*Pool, error) { if err != nil { return nil, err } - pl := &Pool{conn: conn, Addr: addr, lastEcho: time.Now(), closer: y.NewCloser(1)} + pl := &Pool{conn: conn, Addr: addr, lastEcho: time.Now(), closer: z.NewCloser(1)} go pl.MonitorHealth() return pl, nil } diff --git a/dgraph/cmd/alpha/run.go b/dgraph/cmd/alpha/run.go index 5732e331386..48fe3f802e3 100644 --- a/dgraph/cmd/alpha/run.go +++ b/dgraph/cmd/alpha/run.go @@ -33,7 +33,6 @@ import ( "syscall" "time" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgo/v2/protos/api" "github.com/dgraph-io/dgraph/edgraph" "github.com/dgraph-io/dgraph/ee/enc" @@ -43,7 +42,7 @@ import ( "github.com/dgraph-io/dgraph/tok" "github.com/dgraph-io/dgraph/worker" "github.com/dgraph-io/dgraph/x" - + "github.com/dgraph-io/ristretto/z" "github.com/golang/glog" "github.com/pkg/errors" "github.com/spf13/cast" @@ -429,7 +428,7 @@ func serveHTTP(l net.Listener, tlsCfg *tls.Config, wg *sync.WaitGroup) { } } -func setupServer(closer *y.Closer) { +func setupServer(closer *z.Closer) { go worker.RunServer(bindall) // For pb.communication. laddr := "localhost" @@ -713,7 +712,7 @@ func run() { }() // Setup external communication. - aclCloser := y.NewCloser(1) + aclCloser := z.NewCloser(1) go func() { worker.StartRaftNodes(worker.State.WALstore, bindall) // initialization of the admin account can only be done after raft nodes are running @@ -724,7 +723,7 @@ func run() { // Graphql subscribes to alpha to get schema updates. We need to close that before we // close alpha. This closer is for closing and waiting that subscription. 
- adminCloser := y.NewCloser(1) + adminCloser := z.NewCloser(1) setupServer(adminCloser) glog.Infoln("GRPC and HTTP stopped.") diff --git a/dgraph/cmd/bulk/reduce.go b/dgraph/cmd/bulk/reduce.go index 2afd9f922d1..0ed9566c490 100644 --- a/dgraph/cmd/bulk/reduce.go +++ b/dgraph/cmd/bulk/reduce.go @@ -44,6 +44,7 @@ import ( "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/worker" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" ) type reducer struct { @@ -334,7 +335,7 @@ func (r *reducer) streamIdFor(pred string) uint32 { return streamId } -func (r *reducer) encode(entryCh chan *encodeRequest, closer *y.Closer) { +func (r *reducer) encode(entryCh chan *encodeRequest, closer *z.Closer) { defer closer.Done() for req := range entryCh { @@ -384,7 +385,7 @@ func (r *reducer) writeTmpSplits(ci *countIndexer, kvsCh chan *bpb.KVList, wg *s x.Check(ci.splitWriter.Flush()) } -func (r *reducer) startWriting(ci *countIndexer, writerCh chan *encodeRequest, closer *y.Closer) { +func (r *reducer) startWriting(ci *countIndexer, writerCh chan *encodeRequest, closer *z.Closer) { defer closer.Done() // Concurrently write split lists to a temporary badger. @@ -457,14 +458,14 @@ func (r *reducer) reduce(partitionKeys [][]byte, mapItrs []*mapIterator, ci *cou fmt.Printf("Num CPUs: %d\n", cpu) encoderCh := make(chan *encodeRequest, 2*cpu) writerCh := make(chan *encodeRequest, 2*cpu) - encoderCloser := y.NewCloser(cpu) + encoderCloser := z.NewCloser(cpu) for i := 0; i < cpu; i++ { // Start listening to encode entries // For time being let's lease 100 stream id for each encoder. go r.encode(encoderCh, encoderCloser) } // Start listening to write the badger list. - writerCloser := y.NewCloser(1) + writerCloser := z.NewCloser(1) go r.startWriting(ci, writerCh, writerCloser) for i := 0; i < len(partitionKeys); i++ { diff --git a/dgraph/cmd/zero/license.go b/dgraph/cmd/zero/license.go index 84f0c75e4b6..6de7366fe6b 100644 --- a/dgraph/cmd/zero/license.go +++ b/dgraph/cmd/zero/license.go @@ -21,8 +21,8 @@ package zero import ( "net/http" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgraph/protos/pb" + "github.com/dgraph-io/ristretto/z" ) // dummy function as enterprise features are not available in oss binary. @@ -31,7 +31,7 @@ func (n *node) proposeTrialLicense() error { } // periodically checks the validity of the enterprise license and updates the membership state. -func (n *node) updateEnterpriseState(closer *y.Closer) { +func (n *node) updateEnterpriseState(closer *z.Closer) { closer.Done() } diff --git a/dgraph/cmd/zero/license_ee.go b/dgraph/cmd/zero/license_ee.go index 29ff9f64c70..95f8ef3c3d9 100644 --- a/dgraph/cmd/zero/license_ee.go +++ b/dgraph/cmd/zero/license_ee.go @@ -20,9 +20,9 @@ import ( "net/http" "time" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" humanize "github.com/dustin/go-humanize" "github.com/gogo/protobuf/proto" "github.com/golang/glog" @@ -61,7 +61,7 @@ func (s *Server) expireLicense() { // periodically checks the validity of the enterprise license and // 1. Sets license.Enabled to false in membership state if license has expired. // 2. Prints out warning once every day a week before the license is set to expire. 
-func (n *node) updateEnterpriseState(closer *y.Closer) { +func (n *node) updateEnterpriseState(closer *z.Closer) { defer closer.Done() interval := 5 * time.Second diff --git a/dgraph/cmd/zero/raft.go b/dgraph/cmd/zero/raft.go index 120cb70bab4..bf2f39e58fa 100644 --- a/dgraph/cmd/zero/raft.go +++ b/dgraph/cmd/zero/raft.go @@ -28,10 +28,10 @@ import ( otrace "go.opencensus.io/trace" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgraph/conn" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" farm "github.com/dgryski/go-farm" "github.com/golang/glog" "github.com/google/uuid" @@ -44,7 +44,7 @@ type node struct { *conn.Node server *Server ctx context.Context - closer *y.Closer // to stop Run. + closer *z.Closer // to stop Run. // The last timestamp when this Zero was able to reach quorum. mu sync.RWMutex @@ -533,7 +533,7 @@ func (n *node) initAndStartNode() error { return nil } -func (n *node) updateZeroMembershipPeriodically(closer *y.Closer) { +func (n *node) updateZeroMembershipPeriodically(closer *z.Closer) { defer closer.Done() ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() @@ -550,7 +550,7 @@ func (n *node) updateZeroMembershipPeriodically(closer *y.Closer) { var startOption = otrace.WithSampler(otrace.ProbabilitySampler(0.01)) -func (n *node) checkQuorum(closer *y.Closer) { +func (n *node) checkQuorum(closer *z.Closer) { defer closer.Done() ticker := time.NewTicker(time.Second) defer ticker.Stop() @@ -588,7 +588,7 @@ func (n *node) checkQuorum(closer *y.Closer) { } } -func (n *node) snapshotPeriodically(closer *y.Closer) { +func (n *node) snapshotPeriodically(closer *z.Closer) { defer closer.Done() ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() @@ -630,7 +630,7 @@ func (n *node) Run() { // snapshot can cause select loop to block while deleting entries, so run // it in goroutine readStateCh := make(chan raft.ReadState, 100) - closer := y.NewCloser(5) + closer := z.NewCloser(5) defer func() { closer.SignalAndWait() n.closer.Done() diff --git a/dgraph/cmd/zero/run.go b/dgraph/cmd/zero/run.go index 0f33e393c0e..52b81c3df5f 100644 --- a/dgraph/cmd/zero/run.go +++ b/dgraph/cmd/zero/run.go @@ -35,12 +35,12 @@ import ( "github.com/dgraph-io/badger/v2" bopt "github.com/dgraph-io/badger/v2/options" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgraph/conn" "github.com/dgraph-io/dgraph/ee/enc" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/raftwal" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" "github.com/golang/glog" "github.com/spf13/cobra" ) @@ -151,7 +151,7 @@ func (st *state) serveGRPC(l net.Listener, store *raftwal.DiskStorage) { m.Cfg.DisableProposalForwarding = true st.rs = conn.NewRaftServer(m) - st.node = &node{Node: m, ctx: context.Background(), closer: y.NewCloser(1)} + st.node = &node{Node: m, ctx: context.Background(), closer: z.NewCloser(1)} st.zero = &Server{NumReplicas: opts.numReplicas, Node: st.node} st.zero.Init() st.node.server = st.zero @@ -301,7 +301,7 @@ func run() { x.Checkf(err, "Error while opening WAL store") defer kv.Close() - gcCloser := y.NewCloser(1) // closer for vLogGC + gcCloser := z.NewCloser(1) // closer for vLogGC go x.RunVlogGC(kv, gcCloser) defer gcCloser.SignalAndWait() diff --git a/dgraph/cmd/zero/zero.go b/dgraph/cmd/zero/zero.go index 47fabb5fa71..5ceaecd8c8e 100644 --- a/dgraph/cmd/zero/zero.go +++ b/dgraph/cmd/zero/zero.go @@ -26,12 +26,12 @@ import ( otrace "go.opencensus.io/trace" - 
"github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgo/v2/protos/api" "github.com/dgraph-io/dgraph/conn" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/telemetry" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" "github.com/gogo/protobuf/proto" "github.com/golang/glog" "github.com/pkg/errors" @@ -66,7 +66,7 @@ type Server struct { // groupMap map[uint32]*Group nextGroup uint32 leaderChangeCh chan struct{} - closer *y.Closer // Used to tell stream to close. + closer *z.Closer // Used to tell stream to close. connectLock sync.Mutex // Used to serialize connect requests from servers. moveOngoing chan struct{} @@ -89,7 +89,7 @@ func (s *Server) Init() { s.nextTxnTs = 1 s.nextGroup = 1 s.leaderChangeCh = make(chan struct{}, 1) - s.closer = y.NewCloser(2) // grpc and http + s.closer = z.NewCloser(2) // grpc and http s.blockCommitsOn = new(sync.Map) s.moveOngoing = make(chan struct{}, 1) diff --git a/edgraph/access.go b/edgraph/access.go index 744275006e3..724e8819892 100644 --- a/edgraph/access.go +++ b/edgraph/access.go @@ -21,10 +21,10 @@ package edgraph import ( "context" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgo/v2/protos/api" "github.com/dgraph-io/dgraph/gql" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" "github.com/golang/glog" ) @@ -46,7 +46,7 @@ func ResetAcl() { } // ResetAcls is an empty method since ACL is only supported in the enterprise version. -func RefreshAcls(closer *y.Closer) { +func RefreshAcls(closer *z.Closer) { // do nothing <-closer.HasBeenClosed() closer.Done() diff --git a/edgraph/access_ee.go b/edgraph/access_ee.go index 89be006eac7..5f54621ba24 100644 --- a/edgraph/access_ee.go +++ b/edgraph/access_ee.go @@ -19,18 +19,17 @@ import ( "strings" "time" - "github.com/pkg/errors" - - "github.com/dgraph-io/badger/v2/y" - "github.com/dgraph-io/dgo/v2/protos/api" "github.com/dgraph-io/dgraph/ee/acl" "github.com/dgraph-io/dgraph/gql" "github.com/dgraph-io/dgraph/schema" "github.com/dgraph-io/dgraph/worker" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" + jwt "github.com/dgrijalva/jwt-go" "github.com/golang/glog" + "github.com/pkg/errors" otrace "go.opencensus.io/trace" "google.golang.org/grpc/codes" "google.golang.org/grpc/peer" @@ -295,7 +294,7 @@ func authorizeUser(ctx context.Context, userid string, password string) ( } // RefreshAcls queries for the ACL triples and refreshes the ACLs accordingly. 
-func RefreshAcls(closer *y.Closer) { +func RefreshAcls(closer *z.Closer) { defer closer.Done() if len(worker.Config.HmacSecret) == 0 { // the acl feature is not turned on diff --git a/genre.rdf b/genre.rdf new file mode 100644 index 00000000000..e69de29bb2d diff --git a/go.mod b/go.mod index 6219455c8ba..1f07401b27c 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd github.com/dgraph-io/badger/v2 v2.2007.2-0.20200827131741-d5a25b83fbf4 github.com/dgraph-io/dgo/v2 v2.2.1-0.20200319183917-53c7d5bc32a7 - github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de + github.com/dgraph-io/ristretto v0.0.4-0.20200904131139-4dec2770af66 github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 github.com/dgryski/go-groupvarint v0.0.0-20190318181831-5ce5df8ca4e1 diff --git a/go.sum b/go.sum index 4e9f56e27e8..761e068f338 100644 --- a/go.sum +++ b/go.sum @@ -80,8 +80,9 @@ github.com/dgraph-io/badger/v2 v2.2007.2-0.20200827131741-d5a25b83fbf4 h1:DUDFTV github.com/dgraph-io/badger/v2 v2.2007.2-0.20200827131741-d5a25b83fbf4/go.mod h1:26P/7fbL4kUZVEVKLAKXkBXKOydDmM2p1e+NhhnBCAE= github.com/dgraph-io/dgo/v2 v2.2.1-0.20200319183917-53c7d5bc32a7 h1:9oFXHEReyRIB291rbdGwRk1PYegGO2XBtZ8muXPKqPA= github.com/dgraph-io/dgo/v2 v2.2.1-0.20200319183917-53c7d5bc32a7/go.mod h1:LJCkLxm5fUMcU+yb8gHFjHt7ChgNuz3YnQQ6MQkmscI= -github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de h1:t0UHb5vdojIDUqktM6+xJAfScFBsVpXZmqC9dsgJmeA= github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= +github.com/dgraph-io/ristretto v0.0.4-0.20200904131139-4dec2770af66 h1:ectpJv2tGhTudyk0JhqE/53o/ObH30u5yt/yThsAn3I= +github.com/dgraph-io/ristretto v0.0.4-0.20200904131139-4dec2770af66/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= diff --git a/graphql/admin/admin.go b/graphql/admin/admin.go index 3ac0029816c..9cc9e70a608 100644 --- a/graphql/admin/admin.go +++ b/graphql/admin/admin.go @@ -29,13 +29,13 @@ import ( "github.com/pkg/errors" badgerpb "github.com/dgraph-io/badger/v2/pb" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgraph/graphql/resolve" "github.com/dgraph-io/dgraph/graphql/schema" "github.com/dgraph-io/dgraph/graphql/web" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/worker" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" ) const ( @@ -298,7 +298,7 @@ type adminServer struct { // NewServers initializes the GraphQL servers. It sets up an empty server for the // main /graphql endpoint and an admin server. The result is mainServer, adminServer. 
-func NewServers(withIntrospection bool, closer *y.Closer) (web.IServeGraphQL, web.IServeGraphQL) { +func NewServers(withIntrospection bool, closer *z.Closer) (web.IServeGraphQL, web.IServeGraphQL) { gqlSchema, err := schema.FromString("") if err != nil { x.Panic(err) @@ -325,7 +325,7 @@ func newAdminResolver( gqlServer web.IServeGraphQL, fns *resolve.ResolverFns, withIntrospection bool, - closer *y.Closer) *resolve.RequestResolver { + closer *z.Closer) *resolve.RequestResolver { adminSchema, err := schema.FromString(graphqlAdminSchema) if err != nil { diff --git a/posting/lists.go b/posting/lists.go index b11d96a0b53..94610d89492 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -32,8 +32,8 @@ import ( ostats "go.opencensus.io/stats" "github.com/dgraph-io/badger/v2" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgo/v2/protos/api" + "github.com/dgraph-io/ristretto/z" "github.com/golang/glog" "github.com/dgraph-io/dgraph/protos/pb" @@ -90,7 +90,7 @@ func getMemUsage() int { return rss * os.Getpagesize() } -func updateMemoryMetrics(lc *y.Closer) { +func updateMemoryMetrics(lc *z.Closer) { defer lc.Done() ticker := time.NewTicker(time.Minute) defer ticker.Stop() @@ -130,13 +130,13 @@ func updateMemoryMetrics(lc *y.Closer) { var ( pstore *badger.DB - closer *y.Closer + closer *z.Closer ) // Init initializes the posting lists package, the in memory and dirty list hash. func Init(ps *badger.DB) { pstore = ps - closer = y.NewCloser(1) + closer = z.NewCloser(1) go updateMemoryMetrics(closer) } diff --git a/posting/mvcc.go b/posting/mvcc.go index 6ca016b44ab..6891ede3a22 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -27,7 +27,6 @@ import ( "github.com/dgraph-io/badger/v2" bpb "github.com/dgraph-io/badger/v2/pb" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgo/v2/protos/api" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" @@ -101,7 +100,7 @@ func (ir *incrRollupi) addKeyToBatch(key []byte) { } // Process will rollup batches of 64 keys in a go routine. 
-func (ir *incrRollupi) Process(closer *y.Closer) { +func (ir *incrRollupi) Process(closer *z.Closer) { defer closer.Done() writer := NewTxnWriter(pstore) diff --git a/raftwal/storage.go b/raftwal/storage.go index a3ad8bc4b1b..be3e399869e 100644 --- a/raftwal/storage.go +++ b/raftwal/storage.go @@ -23,9 +23,9 @@ import ( "sync" "github.com/dgraph-io/badger/v2" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" "github.com/gogo/protobuf/proto" "github.com/golang/glog" "github.com/pkg/errors" @@ -42,7 +42,7 @@ type DiskStorage struct { elog trace.EventLog cache *sync.Map - Closer *y.Closer + Closer *z.Closer indexRangeChan chan indexRange } @@ -57,7 +57,7 @@ func Init(db *badger.DB, id uint64, gid uint32) *DiskStorage { id: id, gid: gid, cache: new(sync.Map), - Closer: y.NewCloser(1), + Closer: z.NewCloser(1), indexRangeChan: make(chan indexRange, 16), } if prev, err := RaftId(db); err != nil || prev != id { diff --git a/worker/draft.go b/worker/draft.go index f3fe4d88e41..05785f521da 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -38,7 +38,6 @@ import ( otrace "go.opencensus.io/trace" bpb "github.com/dgraph-io/badger/v2/pb" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgraph/conn" "github.com/dgraph-io/dgraph/dgraph/cmd/zero" "github.com/dgraph-io/dgraph/posting" @@ -47,6 +46,7 @@ import ( "github.com/dgraph-io/dgraph/schema" "github.com/dgraph-io/dgraph/types" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" ) type node struct { @@ -60,12 +60,12 @@ type node struct { applyCh chan []*pb.Proposal ctx context.Context gid uint32 - closer *y.Closer + closer *z.Closer streaming int32 // Used to avoid calculating snapshot // Used to track the ops going on in the system. - ops map[op]*y.Closer + ops map[op]*z.Closer opsLock sync.Mutex canCampaign bool @@ -104,7 +104,7 @@ const ( // You should only call Done() on the returned closer. Calling other // functions (such as SignalAndWait) for closer could result in panics. // For more details, see GitHub issue #5034. -func (n *node) startTask(id op) (*y.Closer, error) { +func (n *node) startTask(id op) (*z.Closer, error) { n.opsLock.Lock() defer n.opsLock.Unlock() @@ -123,7 +123,7 @@ func (n *node) startTask(id op) (*y.Closer, error) { } } - closer := y.NewCloser(1) + closer := z.NewCloser(1) switch id { case opRollup: if len(n.ops) > 0 { @@ -158,7 +158,7 @@ func (n *node) startTask(id op) (*y.Closer, error) { n.ops[id] = closer glog.Infof("Operation started with id: %s", id) - go func(id op, closer *y.Closer) { + go func(id op, closer *z.Closer) { closer.Wait() stopTask(id) }(id, closer) @@ -225,8 +225,8 @@ func newNode(store *raftwal.DiskStorage, gid uint32, id uint64, myAddr string) * // to maintain quorum health. 
applyCh: make(chan []*pb.Proposal, 1000), elog: trace.NewEventLog("Dgraph", "ApplyCh"), - closer: y.NewCloser(4), // Matches CLOSER:1 - ops: make(map[op]*y.Closer), + closer: z.NewCloser(4), // Matches CLOSER:1 + ops: make(map[op]*z.Closer), } if x.WorkerConfig.LudicrousMode { n.ex = newExecutor(&m.Applied) @@ -970,7 +970,7 @@ func (n *node) Run() { go n.ReportRaftComms() if x.WorkerConfig.LudicrousMode { - closer := y.NewCloser(2) + closer := z.NewCloser(2) defer closer.SignalAndWait() go x.StoreSync(n.Store, closer) go x.StoreSync(pstore, closer) diff --git a/worker/executor.go b/worker/executor.go index 8e087878ab8..50faf975d92 100644 --- a/worker/executor.go +++ b/worker/executor.go @@ -26,6 +26,7 @@ import ( "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/protos/pb" + "github.com/dgraph-io/ristretto/z" "github.com/golang/glog" ) @@ -42,14 +43,14 @@ type executor struct { sync.RWMutex predChan map[string]chan *subMutation - closer *y.Closer + closer *z.Closer applied *y.WaterMark } func newExecutor(applied *y.WaterMark) *executor { ex := &executor{ predChan: make(map[string]chan *subMutation), - closer: y.NewCloser(0), + closer: z.NewCloser(0), applied: applied, } go ex.shutdown() diff --git a/worker/groups.go b/worker/groups.go index c51cb03052e..9924ee20124 100644 --- a/worker/groups.go +++ b/worker/groups.go @@ -27,7 +27,6 @@ import ( "github.com/dgraph-io/badger/v2" badgerpb "github.com/dgraph-io/badger/v2/pb" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgo/v2/protos/api" "github.com/dgraph-io/dgraph/conn" "github.com/dgraph-io/dgraph/ee/enc" @@ -35,6 +34,7 @@ import ( "github.com/dgraph-io/dgraph/raftwal" "github.com/dgraph-io/dgraph/schema" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" "github.com/golang/glog" "github.com/golang/protobuf/proto" "github.com/pkg/errors" @@ -51,7 +51,7 @@ type groupi struct { tablets map[string]*pb.Tablet triggerCh chan struct{} // Used to trigger membership sync blockDeletes *sync.Mutex // Ensure that deletion won't happen when move is going on. - closer *y.Closer + closer *z.Closer // Group checksum is used to determine if the tablets served by the groups have changed from // the membership information that the Alpha has. If so, Alpha cannot service a read. @@ -154,7 +154,7 @@ func StartRaftNodes(walStore *badger.DB, bindall bool) { x.UpdateHealthStatus(true) glog.Infof("Server is ready") - gr.closer = y.NewCloser(3) // Match CLOSER:1 in this file. + gr.closer = z.NewCloser(3) // Match CLOSER:1 in this file. go gr.sendMembershipUpdates() go gr.receiveMembershipUpdates() go gr.processOracleDeltaStream() @@ -1074,7 +1074,7 @@ func askZeroForEE() bool { // SubscribeForUpdates will listen for updates for the given group. 
func SubscribeForUpdates(prefixes [][]byte, cb func(kvs *badgerpb.KVList), group uint32, - closer *y.Closer) { + closer *z.Closer) { defer closer.Done() for { diff --git a/worker/mutation.go b/worker/mutation.go index 0258f177b27..1dc1a5e7a33 100644 --- a/worker/mutation.go +++ b/worker/mutation.go @@ -29,7 +29,6 @@ import ( otrace "go.opencensus.io/trace" "github.com/dgraph-io/badger/v2" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgo/v2" "github.com/dgraph-io/dgo/v2/protos/api" "github.com/dgraph-io/dgraph/conn" @@ -38,6 +37,7 @@ import ( "github.com/dgraph-io/dgraph/schema" "github.com/dgraph-io/dgraph/types" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" ) var ( @@ -155,7 +155,7 @@ func runSchemaMutation(ctx context.Context, updates []*pb.SchemaUpdate, startTs // done is used to ensure that we only stop the indexing task once. var done uint32 start := time.Now() - stopIndexing := func(closer *y.Closer) { + stopIndexing := func(closer *z.Closer) { // runSchemaMutation can return. stopIndexing could be called by goroutines. if !schema.State().IndexingInProgress() { if atomic.CompareAndSwapUint32(&done, 0, 1) { diff --git a/worker/server_state.go b/worker/server_state.go index 4609a96c454..663bccc688d 100644 --- a/worker/server_state.go +++ b/worker/server_state.go @@ -24,10 +24,10 @@ import ( "github.com/dgraph-io/badger/v2" "github.com/dgraph-io/badger/v2/options" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgraph/ee/enc" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" "github.com/golang/glog" ) @@ -38,7 +38,7 @@ type ServerState struct { Pstore *badger.DB WALstore *badger.DB - gcCloser *y.Closer // closer for valueLogGC + gcCloser *z.Closer // closer for valueLogGC needTs chan tsReq } @@ -184,7 +184,7 @@ func (s *ServerState) initStorage() { opt.EncryptionKey = nil } - s.gcCloser = y.NewCloser(2) + s.gcCloser = z.NewCloser(2) go x.RunVlogGC(s.Pstore, s.gcCloser) go x.RunVlogGC(s.WALstore, s.gcCloser) } diff --git a/x/x.go b/x/x.go index 086c494b995..3f7429bd482 100644 --- a/x/x.go +++ b/x/x.go @@ -38,9 +38,9 @@ import ( "time" "github.com/dgraph-io/badger/v2" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgo/v2" "github.com/dgraph-io/dgo/v2/protos/api" + "github.com/dgraph-io/ristretto/z" "github.com/golang/glog" "github.com/pkg/errors" @@ -880,7 +880,7 @@ func IsGuardian(groups []string) bool { // RunVlogGC runs value log gc on store. It runs GC unconditionally after every 10 minutes. // Additionally it also runs GC if vLogSize has grown more than 1 GB in last minute. -func RunVlogGC(store *badger.DB, closer *y.Closer) { +func RunVlogGC(store *badger.DB, closer *z.Closer) { defer closer.Done() // Get initial size on start. _, lastVlogSize := store.Size() @@ -921,7 +921,7 @@ type DB interface { Sync() error } -func StoreSync(db DB, closer *y.Closer) { +func StoreSync(db DB, closer *z.Closer) { defer closer.Done() ticker := time.NewTicker(1 * time.Second) for {
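Note on the pattern being migrated: this diff swaps `github.com/dgraph-io/badger/v2/y`'s Closer for `github.com/dgraph-io/ristretto/z`'s Closer across the codebase and bumps ristretto to the 2020-09-04 pseudo-version that ships `z.Closer`. Below is a minimal, hedged sketch of the goroutine-lifecycle pattern the changed call sites rely on. The worker function, ticker interval, and worker count here are illustrative only; the `z.Closer` calls themselves (`NewCloser`, `HasBeenClosed`, `Done`, `SignalAndWait`) are the ones that appear in the hunks above.

```go
// Sketch of the Closer lifecycle pattern this diff migrates to z.Closer.
// Only the z.Closer API usage mirrors the diff; runLoop is a stand-in for
// goroutines like updateMemoryMetrics or RunVlogGC, not code from the PR.
package main

import (
	"fmt"
	"time"

	"github.com/dgraph-io/ristretto/z"
)

// runLoop does periodic work until the closer is signalled.
func runLoop(closer *z.Closer) {
	defer closer.Done() // mark this worker as finished on exit

	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			fmt.Println("periodic work")
		case <-closer.HasBeenClosed():
			// SignalAndWait (or Signal) was called; shut down cleanly.
			return
		}
	}
}

func main() {
	// One closer can track several goroutines; the argument is the number
	// of workers that will call Done(), e.g. z.NewCloser(3) for the three
	// membership loops in worker/groups.go.
	closer := z.NewCloser(2)
	go runLoop(closer)
	go runLoop(closer)

	time.Sleep(300 * time.Millisecond)

	// Ask every worker to stop and block until each has called Done().
	closer.SignalAndWait()
	fmt.Println("all goroutines stopped")
}
```

Because `z.Closer` exposes the same methods the call sites already use (`Done`, `Wait`, `HasBeenClosed`, `SignalAndWait`), the hunks above are mechanical: only the import path and the `y.`/`z.` prefix change, never the surrounding logic. The one file that keeps both imports is worker/executor.go, which still needs badger's `y` package for `*y.WaterMark` while taking its Closer from `z`.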