From 028783889501fe5c514df9270242e9264e476ca8 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Fri, 4 Sep 2020 20:53:33 +0530 Subject: [PATCH] Use z.Closer instead of y.Closer (#6394) The closer type was moved from badger/y to ristretto/z in https://github.com/dgraph-io/ristretto/pull/191 --- conn/node.go | 3 ++- conn/pool.go | 6 +++--- dgraph/cmd/alpha/run.go | 16 ++++++++-------- dgraph/cmd/bulk/reduce.go | 9 +++++---- dgraph/cmd/zero/license.go | 4 ++-- dgraph/cmd/zero/license_ee.go | 4 ++-- dgraph/cmd/zero/raft.go | 12 ++++++------ dgraph/cmd/zero/run.go | 6 +++--- dgraph/cmd/zero/zero.go | 6 +++--- edgraph/access.go | 6 +++--- edgraph/access_ee.go | 7 +++---- edgraph/server.go | 4 ++-- go.mod | 2 +- go.sum | 2 ++ graphql/admin/admin.go | 6 +++--- graphql/admin/shutdown.go | 4 ++-- posting/lists.go | 8 ++++---- posting/mvcc.go | 3 +-- raftwal/storage.go | 6 +++--- worker/draft.go | 20 ++++++++++---------- worker/executor.go | 5 +++-- worker/groups.go | 8 ++++---- worker/mutation.go | 4 ++-- worker/server_state.go | 6 +++--- x/x.go | 6 +++--- 25 files changed, 83 insertions(+), 80 deletions(-) diff --git a/conn/node.go b/conn/node.go index a27fecab972..01416f0400c 100644 --- a/conn/node.go +++ b/conn/node.go @@ -38,6 +38,7 @@ import ( "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/raftwal" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" ) var ( @@ -647,7 +648,7 @@ func (n *Node) WaitLinearizableRead(ctx context.Context) error { } // RunReadIndexLoop runs the RAFT index in a loop. -func (n *Node) RunReadIndexLoop(closer *y.Closer, readStateCh <-chan raft.ReadState) { +func (n *Node) RunReadIndexLoop(closer *z.Closer, readStateCh <-chan raft.ReadState) { defer closer.Done() readIndex := func(activeRctx []byte) (uint64, error) { // Read Request can get rejected then we would wait indefinitely on the channel diff --git a/conn/pool.go b/conn/pool.go index d81d3d36812..a822b2a4474 100644 --- a/conn/pool.go +++ b/conn/pool.go @@ -21,10 +21,10 @@ import ( "sync" "time" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgo/v200/protos/api" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" "github.com/golang/glog" "github.com/pkg/errors" "go.opencensus.io/plugin/ocgrpc" @@ -50,7 +50,7 @@ type Pool struct { lastEcho time.Time Addr string - closer *y.Closer + closer *z.Closer healthInfo pb.HealthInfo } @@ -175,7 +175,7 @@ func newPool(addr string) (*Pool, error) { if err != nil { return nil, err } - pl := &Pool{conn: conn, Addr: addr, lastEcho: time.Now(), closer: y.NewCloser(1)} + pl := &Pool{conn: conn, Addr: addr, lastEcho: time.Now(), closer: z.NewCloser(1)} go pl.MonitorHealth() return pl, nil } diff --git a/dgraph/cmd/alpha/run.go b/dgraph/cmd/alpha/run.go index a9a53c123df..a91a30f5712 100644 --- a/dgraph/cmd/alpha/run.go +++ b/dgraph/cmd/alpha/run.go @@ -35,7 +35,6 @@ import ( "time" badgerpb "github.com/dgraph-io/badger/v2/pb" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgo/v200/protos/api" "github.com/dgraph-io/dgraph/edgraph" "github.com/dgraph-io/dgraph/ee/enc" @@ -47,6 +46,7 @@ import ( "github.com/dgraph-io/dgraph/tok" "github.com/dgraph-io/dgraph/worker" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" "github.com/golang/glog" "github.com/pkg/errors" "github.com/spf13/cast" @@ -389,7 +389,7 @@ func setupListener(addr string, port int) (net.Listener, error) { return net.Listen("tcp", fmt.Sprintf("%s:%d", addr, port)) } -func serveGRPC(l net.Listener, tlsCfg *tls.Config, closer *y.Closer) { +func serveGRPC(l net.Listener, tlsCfg *tls.Config, closer *z.Closer) { defer closer.Done() x.RegisterExporters(Alpha.Conf, "dgraph.alpha") @@ -412,7 +412,7 @@ func serveGRPC(l net.Listener, tlsCfg *tls.Config, closer *y.Closer) { s.Stop() } -func serveHTTP(l net.Listener, tlsCfg *tls.Config, closer *y.Closer) { +func serveHTTP(l net.Listener, tlsCfg *tls.Config, closer *z.Closer) { defer closer.Done() srv := &http.Server{ ReadTimeout: 10 * time.Second, @@ -435,7 +435,7 @@ func serveHTTP(l net.Listener, tlsCfg *tls.Config, closer *y.Closer) { } } -func setupServer(closer *y.Closer) { +func setupServer(closer *z.Closer) { go worker.RunServer(bindall) // For pb.communication. laddr := "localhost" @@ -564,7 +564,7 @@ func setupServer(closer *y.Closer) { http.HandleFunc("/ui/keywords", keywordHandler) // Initialize the servers. - admin.ServerCloser = y.NewCloser(3) + admin.ServerCloser = z.NewCloser(3) go serveGRPC(grpcListener, tlsCfg, admin.ServerCloser) go serveHTTP(httpListener, tlsCfg, admin.ServerCloser) @@ -783,7 +783,7 @@ func run() { } }() - updaters := y.NewCloser(4) + updaters := z.NewCloser(4) go func() { worker.StartRaftNodes(worker.State.WALstore, bindall) atomic.AddUint32(&initDone, 1) @@ -809,7 +809,7 @@ func run() { // Graphql subscribes to alpha to get schema updates. We need to close that before we // close alpha. This closer is for closing and waiting that subscription. - adminCloser := y.NewCloser(1) + adminCloser := z.NewCloser(1) setupServer(adminCloser) glog.Infoln("GRPC and HTTP stopped.") @@ -835,7 +835,7 @@ func run() { } // listenForCorsUpdate listen for any cors change and update the accepeted cors. -func listenForCorsUpdate(closer *y.Closer) { +func listenForCorsUpdate(closer *z.Closer) { prefix := x.DataKey("dgraph.cors", 0) // Remove uid from the key, to get the correct prefix prefix = prefix[:len(prefix)-8] diff --git a/dgraph/cmd/bulk/reduce.go b/dgraph/cmd/bulk/reduce.go index 02e4f8c51fa..aeb8451e721 100644 --- a/dgraph/cmd/bulk/reduce.go +++ b/dgraph/cmd/bulk/reduce.go @@ -43,6 +43,7 @@ import ( "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/worker" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" ) type reducer struct { @@ -333,7 +334,7 @@ func (r *reducer) streamIdFor(pred string) uint32 { return streamId } -func (r *reducer) encode(entryCh chan *encodeRequest, closer *y.Closer) { +func (r *reducer) encode(entryCh chan *encodeRequest, closer *z.Closer) { defer closer.Done() for req := range entryCh { @@ -383,7 +384,7 @@ func (r *reducer) writeTmpSplits(ci *countIndexer, kvsCh chan *bpb.KVList, wg *s x.Check(ci.splitWriter.Flush()) } -func (r *reducer) startWriting(ci *countIndexer, writerCh chan *encodeRequest, closer *y.Closer) { +func (r *reducer) startWriting(ci *countIndexer, writerCh chan *encodeRequest, closer *z.Closer) { defer closer.Done() // Concurrently write split lists to a temporary badger. @@ -456,14 +457,14 @@ func (r *reducer) reduce(partitionKeys [][]byte, mapItrs []*mapIterator, ci *cou fmt.Printf("Num CPUs: %d\n", cpu) encoderCh := make(chan *encodeRequest, 2*cpu) writerCh := make(chan *encodeRequest, 2*cpu) - encoderCloser := y.NewCloser(cpu) + encoderCloser := z.NewCloser(cpu) for i := 0; i < cpu; i++ { // Start listening to encode entries // For time being let's lease 100 stream id for each encoder. go r.encode(encoderCh, encoderCloser) } // Start listening to write the badger list. - writerCloser := y.NewCloser(1) + writerCloser := z.NewCloser(1) go r.startWriting(ci, writerCh, writerCloser) for i := 0; i < len(partitionKeys); i++ { diff --git a/dgraph/cmd/zero/license.go b/dgraph/cmd/zero/license.go index 84f0c75e4b6..6de7366fe6b 100644 --- a/dgraph/cmd/zero/license.go +++ b/dgraph/cmd/zero/license.go @@ -21,8 +21,8 @@ package zero import ( "net/http" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgraph/protos/pb" + "github.com/dgraph-io/ristretto/z" ) // dummy function as enterprise features are not available in oss binary. @@ -31,7 +31,7 @@ func (n *node) proposeTrialLicense() error { } // periodically checks the validity of the enterprise license and updates the membership state. -func (n *node) updateEnterpriseState(closer *y.Closer) { +func (n *node) updateEnterpriseState(closer *z.Closer) { closer.Done() } diff --git a/dgraph/cmd/zero/license_ee.go b/dgraph/cmd/zero/license_ee.go index 29ff9f64c70..95f8ef3c3d9 100644 --- a/dgraph/cmd/zero/license_ee.go +++ b/dgraph/cmd/zero/license_ee.go @@ -20,9 +20,9 @@ import ( "net/http" "time" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" humanize "github.com/dustin/go-humanize" "github.com/gogo/protobuf/proto" "github.com/golang/glog" @@ -61,7 +61,7 @@ func (s *Server) expireLicense() { // periodically checks the validity of the enterprise license and // 1. Sets license.Enabled to false in membership state if license has expired. // 2. Prints out warning once every day a week before the license is set to expire. -func (n *node) updateEnterpriseState(closer *y.Closer) { +func (n *node) updateEnterpriseState(closer *z.Closer) { defer closer.Done() interval := 5 * time.Second diff --git a/dgraph/cmd/zero/raft.go b/dgraph/cmd/zero/raft.go index cd82893353d..1a549447362 100644 --- a/dgraph/cmd/zero/raft.go +++ b/dgraph/cmd/zero/raft.go @@ -28,10 +28,10 @@ import ( otrace "go.opencensus.io/trace" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgraph/conn" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" farm "github.com/dgryski/go-farm" "github.com/golang/glog" "github.com/google/uuid" @@ -44,7 +44,7 @@ type node struct { *conn.Node server *Server ctx context.Context - closer *y.Closer // to stop Run. + closer *z.Closer // to stop Run. // The last timestamp when this Zero was able to reach quorum. mu sync.RWMutex @@ -582,7 +582,7 @@ func (n *node) initAndStartNode() error { return nil } -func (n *node) updateZeroMembershipPeriodically(closer *y.Closer) { +func (n *node) updateZeroMembershipPeriodically(closer *z.Closer) { defer closer.Done() ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() @@ -599,7 +599,7 @@ func (n *node) updateZeroMembershipPeriodically(closer *y.Closer) { var startOption = otrace.WithSampler(otrace.ProbabilitySampler(0.01)) -func (n *node) checkQuorum(closer *y.Closer) { +func (n *node) checkQuorum(closer *z.Closer) { defer closer.Done() ticker := time.NewTicker(time.Second) defer ticker.Stop() @@ -637,7 +637,7 @@ func (n *node) checkQuorum(closer *y.Closer) { } } -func (n *node) snapshotPeriodically(closer *y.Closer) { +func (n *node) snapshotPeriodically(closer *z.Closer) { defer closer.Done() ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() @@ -679,7 +679,7 @@ func (n *node) Run() { // snapshot can cause select loop to block while deleting entries, so run // it in goroutine readStateCh := make(chan raft.ReadState, 100) - closer := y.NewCloser(5) + closer := z.NewCloser(5) defer func() { closer.SignalAndWait() n.closer.Done() diff --git a/dgraph/cmd/zero/run.go b/dgraph/cmd/zero/run.go index bdb2578d2bc..12dd2003758 100644 --- a/dgraph/cmd/zero/run.go +++ b/dgraph/cmd/zero/run.go @@ -35,12 +35,12 @@ import ( "github.com/dgraph-io/badger/v2" bopt "github.com/dgraph-io/badger/v2/options" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgraph/conn" "github.com/dgraph-io/dgraph/ee/enc" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/raftwal" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" "github.com/golang/glog" "github.com/spf13/cobra" ) @@ -151,7 +151,7 @@ func (st *state) serveGRPC(l net.Listener, store *raftwal.DiskStorage) { m.Cfg.DisableProposalForwarding = true st.rs = conn.NewRaftServer(m) - st.node = &node{Node: m, ctx: context.Background(), closer: y.NewCloser(1)} + st.node = &node{Node: m, ctx: context.Background(), closer: z.NewCloser(1)} st.zero = &Server{NumReplicas: opts.numReplicas, Node: st.node} st.zero.Init() st.node.server = st.zero @@ -302,7 +302,7 @@ func run() { x.Checkf(err, "Error while opening WAL store") defer kv.Close() - gcCloser := y.NewCloser(1) // closer for vLogGC + gcCloser := z.NewCloser(1) // closer for vLogGC go x.RunVlogGC(kv, gcCloser) defer gcCloser.SignalAndWait() diff --git a/dgraph/cmd/zero/zero.go b/dgraph/cmd/zero/zero.go index ed6b2bb184e..9f326fca396 100644 --- a/dgraph/cmd/zero/zero.go +++ b/dgraph/cmd/zero/zero.go @@ -26,12 +26,12 @@ import ( otrace "go.opencensus.io/trace" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgo/v200/protos/api" "github.com/dgraph-io/dgraph/conn" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/telemetry" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" "github.com/gogo/protobuf/proto" "github.com/golang/glog" "github.com/pkg/errors" @@ -66,7 +66,7 @@ type Server struct { // groupMap map[uint32]*Group nextGroup uint32 leaderChangeCh chan struct{} - closer *y.Closer // Used to tell stream to close. + closer *z.Closer // Used to tell stream to close. connectLock sync.Mutex // Used to serialize connect requests from servers. moveOngoing chan struct{} @@ -89,7 +89,7 @@ func (s *Server) Init() { s.nextTxnTs = 1 s.nextGroup = 1 s.leaderChangeCh = make(chan struct{}, 1) - s.closer = y.NewCloser(2) // grpc and http + s.closer = z.NewCloser(2) // grpc and http s.blockCommitsOn = new(sync.Map) s.moveOngoing = make(chan struct{}, 1) diff --git a/edgraph/access.go b/edgraph/access.go index ee3929d91de..879c7d6bbe4 100644 --- a/edgraph/access.go +++ b/edgraph/access.go @@ -21,11 +21,11 @@ package edgraph import ( "context" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgo/v200/protos/api" "github.com/dgraph-io/dgraph/gql" "github.com/dgraph-io/dgraph/query" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" "github.com/golang/glog" ) @@ -42,12 +42,12 @@ func (s *Server) Login(ctx context.Context, } // ResetAcl is an empty method since ACL is only supported in the enterprise version. -func ResetAcl(closer *y.Closer) { +func ResetAcl(closer *z.Closer) { // do nothing } // ResetAcls is an empty method since ACL is only supported in the enterprise version. -func RefreshAcls(closer *y.Closer) { +func RefreshAcls(closer *z.Closer) { // do nothing <-closer.HasBeenClosed() closer.Done() diff --git a/edgraph/access_ee.go b/edgraph/access_ee.go index 85d88f6d1cf..284a6d5b723 100644 --- a/edgraph/access_ee.go +++ b/edgraph/access_ee.go @@ -20,13 +20,12 @@ import ( "time" "github.com/dgraph-io/dgraph/protos/pb" + "github.com/dgraph-io/ristretto/z" "github.com/dgraph-io/dgraph/query" "github.com/pkg/errors" - "github.com/dgraph-io/badger/v2/y" - "github.com/dgraph-io/dgo/v200/protos/api" "github.com/dgraph-io/dgraph/ee/acl" "github.com/dgraph-io/dgraph/gql" @@ -305,7 +304,7 @@ func authorizeUser(ctx context.Context, userid string, password string) ( } // RefreshAcls queries for the ACL triples and refreshes the ACLs accordingly. -func RefreshAcls(closer *y.Closer) { +func RefreshAcls(closer *z.Closer) { defer func() { glog.Infoln("RefreshAcls closed") closer.Done() @@ -368,7 +367,7 @@ const queryAcls = ` ` // ResetAcl clears the aclCachePtr and upserts the Groot account. -func ResetAcl(closer *y.Closer) { +func ResetAcl(closer *z.Closer) { defer func() { glog.Infof("ResetAcl closed") closer.Done() diff --git a/edgraph/server.go b/edgraph/server.go index 68207344c5c..0b97223d1d1 100644 --- a/edgraph/server.go +++ b/edgraph/server.go @@ -42,7 +42,6 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgo/v200" "github.com/dgraph-io/dgo/v200/protos/api" "github.com/dgraph-io/dgraph/chunker" @@ -59,6 +58,7 @@ import ( "github.com/dgraph-io/dgraph/types/facets" "github.com/dgraph-io/dgraph/worker" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" ) const ( @@ -1551,7 +1551,7 @@ func isDropAll(op *api.Operation) bool { // ResetCors make the dgraph to accept all the origins if no origins were given // by the users. -func ResetCors(closer *y.Closer) { +func ResetCors(closer *z.Closer) { defer func() { glog.Infof("ResetCors closed") closer.Done() diff --git a/go.mod b/go.mod index 55663fdef75..ec7b5d90614 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200828220306-806a325a0462 github.com/dgraph-io/dgo/v200 v200.0.0-20200805103119-a3544c464dd6 github.com/dgraph-io/graphql-transport-ws v0.0.0-20200715131837-c0460019ead2 - github.com/dgraph-io/ristretto v0.0.4-0.20200820164438-623d8ef1614b + github.com/dgraph-io/ristretto v0.0.4-0.20200904131139-4dec2770af66 github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1 github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 diff --git a/go.sum b/go.sum index aacf84bf675..f5bc91623ce 100644 --- a/go.sum +++ b/go.sum @@ -92,6 +92,8 @@ github.com/dgraph-io/graphql-transport-ws v0.0.0-20200715131837-c0460019ead2 h1: github.com/dgraph-io/graphql-transport-ws v0.0.0-20200715131837-c0460019ead2/go.mod h1:7z3c/5w0sMYYZF5bHsrh8IH4fKwG5O5Y70cPH1ZLLRQ= github.com/dgraph-io/ristretto v0.0.4-0.20200820164438-623d8ef1614b h1:/g8jOqvD1UzHTOwENtkqcLmMLzTcN18P3ut8aSUZ45g= github.com/dgraph-io/ristretto v0.0.4-0.20200820164438-623d8ef1614b/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= +github.com/dgraph-io/ristretto v0.0.4-0.20200904131139-4dec2770af66 h1:ectpJv2tGhTudyk0JhqE/53o/ObH30u5yt/yThsAn3I= +github.com/dgraph-io/ristretto v0.0.4-0.20200904131139-4dec2770af66/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1 h1:CaO/zOnF8VvUfEbhRatPcwKVWamvbYd8tQGRWacE9kU= diff --git a/graphql/admin/admin.go b/graphql/admin/admin.go index f0eda1c718c..0206133aba1 100644 --- a/graphql/admin/admin.go +++ b/graphql/admin/admin.go @@ -26,7 +26,6 @@ import ( "github.com/pkg/errors" badgerpb "github.com/dgraph-io/badger/v2/pb" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgraph/edgraph" "github.com/dgraph-io/dgraph/graphql/resolve" "github.com/dgraph-io/dgraph/graphql/schema" @@ -35,6 +34,7 @@ import ( "github.com/dgraph-io/dgraph/query" "github.com/dgraph-io/dgraph/worker" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" ) const ( @@ -425,7 +425,7 @@ type adminServer struct { // NewServers initializes the GraphQL servers. It sets up an empty server for the // main /graphql endpoint and an admin server. The result is mainServer, adminServer. -func NewServers(withIntrospection bool, globalEpoch *uint64, closer *y.Closer) (web.IServeGraphQL, +func NewServers(withIntrospection bool, globalEpoch *uint64, closer *z.Closer) (web.IServeGraphQL, web.IServeGraphQL, *GraphQLHealthStore) { gqlSchema, err := schema.FromString("") if err != nil { @@ -454,7 +454,7 @@ func newAdminResolver( fns *resolve.ResolverFns, withIntrospection bool, epoch *uint64, - closer *y.Closer) *resolve.RequestResolver { + closer *z.Closer) *resolve.RequestResolver { adminSchema, err := schema.FromString(graphqlAdminSchema) if err != nil { diff --git a/graphql/admin/shutdown.go b/graphql/admin/shutdown.go index 5fb6bf63344..e3d3469c16d 100644 --- a/graphql/admin/shutdown.go +++ b/graphql/admin/shutdown.go @@ -19,16 +19,16 @@ package admin import ( "context" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgraph/graphql/resolve" "github.com/dgraph-io/dgraph/graphql/schema" + "github.com/dgraph-io/ristretto/z" "github.com/golang/glog" ) var ( // ServerCloser is used to signal and wait for other goroutines to return gracefully after user // requests shutdown. - ServerCloser *y.Closer + ServerCloser *z.Closer ) func resolveShutdown(ctx context.Context, m schema.Mutation) (*resolve.Resolved, bool) { diff --git a/posting/lists.go b/posting/lists.go index 1c1e463f168..a9e2cd330c5 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -31,9 +31,9 @@ import ( ostats "go.opencensus.io/stats" "github.com/dgraph-io/badger/v2" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgo/v200/protos/api" "github.com/dgraph-io/ristretto" + "github.com/dgraph-io/ristretto/z" "github.com/golang/glog" "github.com/dgraph-io/dgraph/protos/pb" @@ -90,7 +90,7 @@ func getMemUsage() int { return rss * os.Getpagesize() } -func updateMemoryMetrics(lc *y.Closer) { +func updateMemoryMetrics(lc *z.Closer) { defer lc.Done() ticker := time.NewTicker(time.Minute) defer ticker.Stop() @@ -130,14 +130,14 @@ func updateMemoryMetrics(lc *y.Closer) { var ( pstore *badger.DB - closer *y.Closer + closer *z.Closer lCache *ristretto.Cache ) // Init initializes the posting lists package, the in memory and dirty list hash. func Init(ps *badger.DB, cacheSize int64) { pstore = ps - closer = y.NewCloser(1) + closer = z.NewCloser(1) go updateMemoryMetrics(closer) // Initialize cache. diff --git a/posting/mvcc.go b/posting/mvcc.go index a0bfc16f7dc..50e67cc3f72 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -27,7 +27,6 @@ import ( "github.com/dgraph-io/badger/v2" bpb "github.com/dgraph-io/badger/v2/pb" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgo/v200/protos/api" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" @@ -105,7 +104,7 @@ func (ir *incrRollupi) addKeyToBatch(key []byte) { } // Process will rollup batches of 64 keys in a go routine. -func (ir *incrRollupi) Process(closer *y.Closer) { +func (ir *incrRollupi) Process(closer *z.Closer) { defer closer.Done() writer := NewTxnWriter(pstore) diff --git a/raftwal/storage.go b/raftwal/storage.go index a3ad8bc4b1b..be3e399869e 100644 --- a/raftwal/storage.go +++ b/raftwal/storage.go @@ -23,9 +23,9 @@ import ( "sync" "github.com/dgraph-io/badger/v2" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" "github.com/gogo/protobuf/proto" "github.com/golang/glog" "github.com/pkg/errors" @@ -42,7 +42,7 @@ type DiskStorage struct { elog trace.EventLog cache *sync.Map - Closer *y.Closer + Closer *z.Closer indexRangeChan chan indexRange } @@ -57,7 +57,7 @@ func Init(db *badger.DB, id uint64, gid uint32) *DiskStorage { id: id, gid: gid, cache: new(sync.Map), - Closer: y.NewCloser(1), + Closer: z.NewCloser(1), indexRangeChan: make(chan indexRange, 16), } if prev, err := RaftId(db); err != nil || prev != id { diff --git a/worker/draft.go b/worker/draft.go index e3899fadc83..e8fd2293ef0 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -38,7 +38,6 @@ import ( otrace "go.opencensus.io/trace" bpb "github.com/dgraph-io/badger/v2/pb" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgraph/conn" "github.com/dgraph-io/dgraph/dgraph/cmd/zero" "github.com/dgraph-io/dgraph/posting" @@ -47,6 +46,7 @@ import ( "github.com/dgraph-io/dgraph/schema" "github.com/dgraph-io/dgraph/types" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" ) type node struct { @@ -60,12 +60,12 @@ type node struct { applyCh chan []*pb.Proposal ctx context.Context gid uint32 - closer *y.Closer + closer *z.Closer streaming int32 // Used to avoid calculating snapshot // Used to track the ops going on in the system. - ops map[op]*y.Closer + ops map[op]*z.Closer opsLock sync.Mutex canCampaign bool @@ -107,7 +107,7 @@ const ( // Restore operations have preference and cancel all other operations, not just rollups. // You should only call Done() on the returned closer. Calling other functions (such as // SignalAndWait) for closer could result in panics. For more details, see GitHub issue #5034. -func (n *node) startTask(id op) (*y.Closer, error) { +func (n *node) startTask(id op) (*z.Closer, error) { n.opsLock.Lock() defer n.opsLock.Unlock() @@ -126,7 +126,7 @@ func (n *node) startTask(id op) (*y.Closer, error) { } } - closer := y.NewCloser(1) + closer := z.NewCloser(1) switch id { case opRollup: if len(n.ops) > 0 { @@ -172,7 +172,7 @@ func (n *node) startTask(id op) (*y.Closer, error) { n.ops[id] = closer glog.Infof("Operation started with id: %s", id) - go func(id op, closer *y.Closer) { + go func(id op, closer *z.Closer) { closer.Wait() stopTask(id) }(id, closer) @@ -239,8 +239,8 @@ func newNode(store *raftwal.DiskStorage, gid uint32, id uint64, myAddr string) * // to maintain quorum health. applyCh: make(chan []*pb.Proposal, 1000), elog: trace.NewEventLog("Dgraph", "ApplyCh"), - closer: y.NewCloser(4), // Matches CLOSER:1 - ops: make(map[op]*y.Closer), + closer: z.NewCloser(4), // Matches CLOSER:1 + ops: make(map[op]*z.Closer), } if x.WorkerConfig.LudicrousMode { n.ex = newExecutor(&m.Applied, x.WorkerConfig.LudicrousConcurrency) @@ -619,7 +619,7 @@ func (n *node) applyCommitted(proposal *pb.Proposal) error { defer x.UpdateDrainingMode(false) var err error - var closer *y.Closer + var closer *z.Closer closer, err = n.startTask(opRestore) if err != nil { return errors.Wrapf(err, "cannot start restore task") @@ -1032,7 +1032,7 @@ func (n *node) Run() { go n.ReportRaftComms() if x.WorkerConfig.LudicrousMode { - closer := y.NewCloser(2) + closer := z.NewCloser(2) defer closer.SignalAndWait() go x.StoreSync(n.Store, closer) go x.StoreSync(pstore, closer) diff --git a/worker/executor.go b/worker/executor.go index c9285f680fe..108454a1931 100644 --- a/worker/executor.go +++ b/worker/executor.go @@ -32,6 +32,7 @@ import ( "github.com/dgraph-io/dgraph/tok" "github.com/dgraph-io/dgraph/types" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" "github.com/dgryski/go-farm" "github.com/golang/glog" ) @@ -49,7 +50,7 @@ type executor struct { sync.RWMutex predChan map[string]chan *subMutation - closer *y.Closer + closer *z.Closer applied *y.WaterMark throttle *y.Throttle } @@ -57,7 +58,7 @@ type executor struct { func newExecutor(applied *y.WaterMark, conc int) *executor { ex := &executor{ predChan: make(map[string]chan *subMutation), - closer: y.NewCloser(0), + closer: z.NewCloser(0), applied: applied, throttle: y.NewThrottle(conc), // Run conc threads mutations at a time. } diff --git a/worker/groups.go b/worker/groups.go index 41ae827763b..4f93f7fbfb2 100644 --- a/worker/groups.go +++ b/worker/groups.go @@ -27,7 +27,6 @@ import ( "github.com/dgraph-io/badger/v2" badgerpb "github.com/dgraph-io/badger/v2/pb" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgo/v200/protos/api" "github.com/dgraph-io/dgraph/conn" "github.com/dgraph-io/dgraph/ee/enc" @@ -35,6 +34,7 @@ import ( "github.com/dgraph-io/dgraph/raftwal" "github.com/dgraph-io/dgraph/schema" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" "github.com/golang/glog" "github.com/golang/protobuf/proto" "github.com/pkg/errors" @@ -48,7 +48,7 @@ type groupi struct { tablets map[string]*pb.Tablet triggerCh chan struct{} // Used to trigger membership sync blockDeletes *sync.Mutex // Ensure that deletion won't happen when move is going on. - closer *y.Closer + closer *z.Closer // Group checksum is used to determine if the tablets served by the groups have changed from // the membership information that the Alpha has. If so, Alpha cannot service a read. @@ -148,7 +148,7 @@ func StartRaftNodes(walStore *badger.DB, bindall bool) { x.UpdateHealthStatus(true) glog.Infof("Server is ready") - gr.closer = y.NewCloser(3) // Match CLOSER:1 in this file. + gr.closer = z.NewCloser(3) // Match CLOSER:1 in this file. go gr.sendMembershipUpdates() go gr.receiveMembershipUpdates() go gr.processOracleDeltaStream() @@ -1095,7 +1095,7 @@ func (g *groupi) askZeroForEE() bool { // SubscribeForUpdates will listen for updates for the given group. func SubscribeForUpdates(prefixes [][]byte, cb func(kvs *badgerpb.KVList), - group uint32, closer *y.Closer) { + group uint32, closer *z.Closer) { var prefix []byte if len(prefixes) > 0 { diff --git a/worker/mutation.go b/worker/mutation.go index aa91fede442..30da23a6164 100644 --- a/worker/mutation.go +++ b/worker/mutation.go @@ -31,7 +31,6 @@ import ( otrace "go.opencensus.io/trace" "github.com/dgraph-io/badger/v2" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgo/v200" "github.com/dgraph-io/dgo/v200/protos/api" "github.com/dgraph-io/dgraph/conn" @@ -40,6 +39,7 @@ import ( "github.com/dgraph-io/dgraph/schema" "github.com/dgraph-io/dgraph/types" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" ) var ( @@ -153,7 +153,7 @@ func runSchemaMutation(ctx context.Context, updates []*pb.SchemaUpdate, startTs // done is used to ensure that we only stop the indexing task once. var done uint32 start := time.Now() - stopIndexing := func(closer *y.Closer) { + stopIndexing := func(closer *z.Closer) { // runSchemaMutation can return. stopIndexing could be called by goroutines. if !schema.State().IndexingInProgress() { if atomic.CompareAndSwapUint32(&done, 0, 1) { diff --git a/worker/server_state.go b/worker/server_state.go index 23d78a8bc02..af8c140fe3d 100644 --- a/worker/server_state.go +++ b/worker/server_state.go @@ -24,9 +24,9 @@ import ( "github.com/dgraph-io/badger/v2" "github.com/dgraph-io/badger/v2/options" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" "github.com/golang/glog" ) @@ -36,7 +36,7 @@ type ServerState struct { Pstore *badger.DB WALstore *badger.DB - gcCloser *y.Closer // closer for valueLogGC + gcCloser *z.Closer // closer for valueLogGC needTs chan tsReq } @@ -183,7 +183,7 @@ func (s *ServerState) initStorage() { opt.EncryptionKey = nil } - s.gcCloser = y.NewCloser(2) + s.gcCloser = z.NewCloser(2) go x.RunVlogGC(s.Pstore, s.gcCloser) go x.RunVlogGC(s.WALstore, s.gcCloser) } diff --git a/x/x.go b/x/x.go index ac755d0c55b..274185f61b7 100644 --- a/x/x.go +++ b/x/x.go @@ -41,9 +41,9 @@ import ( "google.golang.org/grpc/peer" "github.com/dgraph-io/badger/v2" - "github.com/dgraph-io/badger/v2/y" "github.com/dgraph-io/dgo/v200" "github.com/dgraph-io/dgo/v200/protos/api" + "github.com/dgraph-io/ristretto/z" "github.com/golang/glog" "github.com/pkg/errors" @@ -1016,7 +1016,7 @@ func IsGuardian(groups []string) bool { // RunVlogGC runs value log gc on store. It runs GC unconditionally after every 10 minutes. // Additionally it also runs GC if vLogSize has grown more than 1 GB in last minute. -func RunVlogGC(store *badger.DB, closer *y.Closer) { +func RunVlogGC(store *badger.DB, closer *z.Closer) { defer closer.Done() // Get initial size on start. _, lastVlogSize := store.Size() @@ -1057,7 +1057,7 @@ type DB interface { Sync() error } -func StoreSync(db DB, closer *y.Closer) { +func StoreSync(db DB, closer *z.Closer) { defer closer.Done() ticker := time.NewTicker(1 * time.Second) for {