Skip to content

Commit

Permalink
Use z.Closer instead of y.Closer (#6394) (#6399)
Browse files Browse the repository at this point in the history
The closer type was moved from badger/y to ristretto/z in dgraph-io/ristretto#191

(cherry picked from commit 0287838)
  • Loading branch information
Ibrahim Jarif authored Sep 4, 2020
1 parent f5ea8e3 commit 710b4c9
Show file tree
Hide file tree
Showing 24 changed files with 81 additions and 78 deletions.
3 changes: 2 additions & 1 deletion conn/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/raftwal"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
)

var (
Expand Down Expand Up @@ -647,7 +648,7 @@ func (n *Node) WaitLinearizableRead(ctx context.Context) error {
}

// RunReadIndexLoop runs the RAFT index in a loop.
func (n *Node) RunReadIndexLoop(closer *y.Closer, readStateCh <-chan raft.ReadState) {
func (n *Node) RunReadIndexLoop(closer *z.Closer, readStateCh <-chan raft.ReadState) {
defer closer.Done()
readIndex := func(activeRctx []byte) (uint64, error) {
// Read Request can get rejected then we would wait indefinitely on the channel
Expand Down
6 changes: 3 additions & 3 deletions conn/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
"sync"
"time"

"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgo/v200/protos/api"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
"github.com/golang/glog"
"github.com/pkg/errors"
"go.opencensus.io/plugin/ocgrpc"
Expand All @@ -50,7 +50,7 @@ type Pool struct {

lastEcho time.Time
Addr string
closer *y.Closer
closer *z.Closer
healthInfo pb.HealthInfo
}

Expand Down Expand Up @@ -175,7 +175,7 @@ func newPool(addr string) (*Pool, error) {
if err != nil {
return nil, err
}
pl := &Pool{conn: conn, Addr: addr, lastEcho: time.Now(), closer: y.NewCloser(1)}
pl := &Pool{conn: conn, Addr: addr, lastEcho: time.Now(), closer: z.NewCloser(1)}
go pl.MonitorHealth()
return pl, nil
}
Expand Down
14 changes: 7 additions & 7 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (

"github.com/dgraph-io/dgraph/graphql/web"

"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgo/v200/protos/api"
"github.com/dgraph-io/dgraph/edgraph"
"github.com/dgraph-io/dgraph/ee/enc"
Expand All @@ -46,6 +45,7 @@ import (
"github.com/dgraph-io/dgraph/tok"
"github.com/dgraph-io/dgraph/worker"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
"github.com/golang/glog"
"github.com/pkg/errors"
"github.com/spf13/cast"
Expand Down Expand Up @@ -388,7 +388,7 @@ func setupListener(addr string, port int) (net.Listener, error) {
return net.Listen("tcp", fmt.Sprintf("%s:%d", addr, port))
}

func serveGRPC(l net.Listener, tlsCfg *tls.Config, closer *y.Closer) {
func serveGRPC(l net.Listener, tlsCfg *tls.Config, closer *z.Closer) {
defer closer.Done()

x.RegisterExporters(Alpha.Conf, "dgraph.alpha")
Expand All @@ -411,7 +411,7 @@ func serveGRPC(l net.Listener, tlsCfg *tls.Config, closer *y.Closer) {
s.Stop()
}

func serveHTTP(l net.Listener, tlsCfg *tls.Config, closer *y.Closer) {
func serveHTTP(l net.Listener, tlsCfg *tls.Config, closer *z.Closer) {
defer closer.Done()
srv := &http.Server{
ReadTimeout: 10 * time.Second,
Expand All @@ -434,7 +434,7 @@ func serveHTTP(l net.Listener, tlsCfg *tls.Config, closer *y.Closer) {
}
}

func setupServer(closer *y.Closer) {
func setupServer(closer *z.Closer) {
go worker.RunServer(bindall) // For pb.communication.

laddr := "localhost"
Expand Down Expand Up @@ -546,7 +546,7 @@ func setupServer(closer *y.Closer) {
http.HandleFunc("/ui/keywords", keywordHandler)

// Initialize the servers.
admin.ServerCloser = y.NewCloser(3)
admin.ServerCloser = z.NewCloser(3)
go serveGRPC(grpcListener, tlsCfg, admin.ServerCloser)
go serveHTTP(httpListener, tlsCfg, admin.ServerCloser)

Expand Down Expand Up @@ -759,7 +759,7 @@ func run() {
}()

// Setup external communication.
aclCloser := y.NewCloser(1)
aclCloser := z.NewCloser(1)
go func() {
worker.StartRaftNodes(worker.State.WALstore, bindall)
// initialization of the admin account can only be done after raft nodes are running
Expand All @@ -770,7 +770,7 @@ func run() {

// Graphql subscribes to alpha to get schema updates. We need to close that before we
// close alpha. This closer is for closing and waiting that subscription.
adminCloser := y.NewCloser(1)
adminCloser := z.NewCloser(1)

setupServer(adminCloser)
glog.Infoln("GRPC and HTTP stopped.")
Expand Down
9 changes: 5 additions & 4 deletions dgraph/cmd/bulk/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/worker"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
)

type reducer struct {
Expand Down Expand Up @@ -333,7 +334,7 @@ func (r *reducer) streamIdFor(pred string) uint32 {
return streamId
}

func (r *reducer) encode(entryCh chan *encodeRequest, closer *y.Closer) {
func (r *reducer) encode(entryCh chan *encodeRequest, closer *z.Closer) {
defer closer.Done()
for req := range entryCh {

Expand Down Expand Up @@ -383,7 +384,7 @@ func (r *reducer) writeTmpSplits(ci *countIndexer, kvsCh chan *bpb.KVList, wg *s
x.Check(ci.splitWriter.Flush())
}

func (r *reducer) startWriting(ci *countIndexer, writerCh chan *encodeRequest, closer *y.Closer) {
func (r *reducer) startWriting(ci *countIndexer, writerCh chan *encodeRequest, closer *z.Closer) {
defer closer.Done()

// Concurrently write split lists to a temporary badger.
Expand Down Expand Up @@ -456,14 +457,14 @@ func (r *reducer) reduce(partitionKeys [][]byte, mapItrs []*mapIterator, ci *cou
fmt.Printf("Num CPUs: %d\n", cpu)
encoderCh := make(chan *encodeRequest, 2*cpu)
writerCh := make(chan *encodeRequest, 2*cpu)
encoderCloser := y.NewCloser(cpu)
encoderCloser := z.NewCloser(cpu)
for i := 0; i < cpu; i++ {
// Start listening to encode entries
// For time being let's lease 100 stream id for each encoder.
go r.encode(encoderCh, encoderCloser)
}
// Start listening to write the badger list.
writerCloser := y.NewCloser(1)
writerCloser := z.NewCloser(1)
go r.startWriting(ci, writerCh, writerCloser)

for i := 0; i < len(partitionKeys); i++ {
Expand Down
4 changes: 2 additions & 2 deletions dgraph/cmd/zero/license.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ package zero
import (
"net/http"

"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/ristretto/z"
)

// dummy function as enterprise features are not available in oss binary.
Expand All @@ -31,7 +31,7 @@ func (n *node) proposeTrialLicense() error {
}

// periodically checks the validity of the enterprise license and updates the membership state.
func (n *node) updateEnterpriseState(closer *y.Closer) {
func (n *node) updateEnterpriseState(closer *z.Closer) {
closer.Done()
}

Expand Down
4 changes: 2 additions & 2 deletions dgraph/cmd/zero/license_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"net/http"
"time"

"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
humanize "github.com/dustin/go-humanize"
"github.com/gogo/protobuf/proto"
"github.com/golang/glog"
Expand Down Expand Up @@ -61,7 +61,7 @@ func (s *Server) expireLicense() {
// periodically checks the validity of the enterprise license and
// 1. Sets license.Enabled to false in membership state if license has expired.
// 2. Prints out warning once every day a week before the license is set to expire.
func (n *node) updateEnterpriseState(closer *y.Closer) {
func (n *node) updateEnterpriseState(closer *z.Closer) {
defer closer.Done()

interval := 5 * time.Second
Expand Down
12 changes: 6 additions & 6 deletions dgraph/cmd/zero/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ import (

otrace "go.opencensus.io/trace"

"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
farm "github.com/dgryski/go-farm"
"github.com/golang/glog"
"github.com/google/uuid"
Expand All @@ -44,7 +44,7 @@ type node struct {
*conn.Node
server *Server
ctx context.Context
closer *y.Closer // to stop Run.
closer *z.Closer // to stop Run.

// The last timestamp when this Zero was able to reach quorum.
mu sync.RWMutex
Expand Down Expand Up @@ -533,7 +533,7 @@ func (n *node) initAndStartNode() error {
return nil
}

func (n *node) updateZeroMembershipPeriodically(closer *y.Closer) {
func (n *node) updateZeroMembershipPeriodically(closer *z.Closer) {
defer closer.Done()
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
Expand All @@ -550,7 +550,7 @@ func (n *node) updateZeroMembershipPeriodically(closer *y.Closer) {

var startOption = otrace.WithSampler(otrace.ProbabilitySampler(0.01))

func (n *node) checkQuorum(closer *y.Closer) {
func (n *node) checkQuorum(closer *z.Closer) {
defer closer.Done()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
Expand Down Expand Up @@ -588,7 +588,7 @@ func (n *node) checkQuorum(closer *y.Closer) {
}
}

func (n *node) snapshotPeriodically(closer *y.Closer) {
func (n *node) snapshotPeriodically(closer *z.Closer) {
defer closer.Done()
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
Expand Down Expand Up @@ -630,7 +630,7 @@ func (n *node) Run() {
// snapshot can cause select loop to block while deleting entries, so run
// it in goroutine
readStateCh := make(chan raft.ReadState, 100)
closer := y.NewCloser(5)
closer := z.NewCloser(5)
defer func() {
closer.SignalAndWait()
n.closer.Done()
Expand Down
6 changes: 3 additions & 3 deletions dgraph/cmd/zero/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ import (

"github.com/dgraph-io/badger/v2"
bopt "github.com/dgraph-io/badger/v2/options"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/ee/enc"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/raftwal"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
"github.com/golang/glog"
"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -151,7 +151,7 @@ func (st *state) serveGRPC(l net.Listener, store *raftwal.DiskStorage) {
m.Cfg.DisableProposalForwarding = true
st.rs = conn.NewRaftServer(m)

st.node = &node{Node: m, ctx: context.Background(), closer: y.NewCloser(1)}
st.node = &node{Node: m, ctx: context.Background(), closer: z.NewCloser(1)}
st.zero = &Server{NumReplicas: opts.numReplicas, Node: st.node}
st.zero.Init()
st.node.server = st.zero
Expand Down Expand Up @@ -301,7 +301,7 @@ func run() {
x.Checkf(err, "Error while opening WAL store")
defer kv.Close()

gcCloser := y.NewCloser(1) // closer for vLogGC
gcCloser := z.NewCloser(1) // closer for vLogGC
go x.RunVlogGC(kv, gcCloser)
defer gcCloser.SignalAndWait()

Expand Down
6 changes: 3 additions & 3 deletions dgraph/cmd/zero/zero.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ import (

otrace "go.opencensus.io/trace"

"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgo/v200/protos/api"
"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/telemetry"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
"github.com/gogo/protobuf/proto"
"github.com/golang/glog"
"github.com/pkg/errors"
Expand Down Expand Up @@ -66,7 +66,7 @@ type Server struct {
// groupMap map[uint32]*Group
nextGroup uint32
leaderChangeCh chan struct{}
closer *y.Closer // Used to tell stream to close.
closer *z.Closer // Used to tell stream to close.
connectLock sync.Mutex // Used to serialize connect requests from servers.

moveOngoing chan struct{}
Expand All @@ -89,7 +89,7 @@ func (s *Server) Init() {
s.nextTxnTs = 1
s.nextGroup = 1
s.leaderChangeCh = make(chan struct{}, 1)
s.closer = y.NewCloser(2) // grpc and http
s.closer = z.NewCloser(2) // grpc and http
s.blockCommitsOn = new(sync.Map)
s.moveOngoing = make(chan struct{}, 1)

Expand Down
4 changes: 2 additions & 2 deletions edgraph/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ package edgraph
import (
"context"

"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgo/v200/protos/api"
"github.com/dgraph-io/dgraph/gql"
"github.com/dgraph-io/dgraph/query"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
"github.com/golang/glog"
)

Expand All @@ -47,7 +47,7 @@ func ResetAcl() {
}

// ResetAcls is an empty method since ACL is only supported in the enterprise version.
func RefreshAcls(closer *y.Closer) {
func RefreshAcls(closer *z.Closer) {
// do nothing
<-closer.HasBeenClosed()
closer.Done()
Expand Down
5 changes: 2 additions & 3 deletions edgraph/access_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@ import (
"time"

"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/ristretto/z"

"github.com/dgraph-io/dgraph/query"

"github.com/pkg/errors"

"github.com/dgraph-io/badger/v2/y"

"github.com/dgraph-io/dgo/v200/protos/api"
"github.com/dgraph-io/dgraph/ee/acl"
"github.com/dgraph-io/dgraph/gql"
Expand Down Expand Up @@ -305,7 +304,7 @@ func authorizeUser(ctx context.Context, userid string, password string) (
}

// RefreshAcls queries for the ACL triples and refreshes the ACLs accordingly.
func RefreshAcls(closer *y.Closer) {
func RefreshAcls(closer *z.Closer) {
defer closer.Done()
if len(worker.Config.HmacSecret) == 0 {
// the acl feature is not turned on
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/dgraph-io/badger/v2 v2.2007.2-0.20200827131741-d5a25b83fbf4
github.com/dgraph-io/dgo/v200 v200.0.0-20200401175452-e463f9234453
github.com/dgraph-io/graphql-transport-ws v0.0.0-20200715131837-c0460019ead2
github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de
github.com/dgraph-io/ristretto v0.0.4-0.20200904131139-4dec2770af66
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,9 @@ github.com/dgraph-io/dgo/v200 v200.0.0-20200401175452-e463f9234453 h1:DTgOrw91nM
github.com/dgraph-io/dgo/v200 v200.0.0-20200401175452-e463f9234453/go.mod h1:Co+FwJrnndSrPORO8Gdn20dR7FPTfmXr0W/su0Ve/Ig=
github.com/dgraph-io/graphql-transport-ws v0.0.0-20200715131837-c0460019ead2 h1:NSl3XXyON9bgmBJSAvr5FPrgILAovtoTs7FwdtaZZq0=
github.com/dgraph-io/graphql-transport-ws v0.0.0-20200715131837-c0460019ead2/go.mod h1:7z3c/5w0sMYYZF5bHsrh8IH4fKwG5O5Y70cPH1ZLLRQ=
github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de h1:t0UHb5vdojIDUqktM6+xJAfScFBsVpXZmqC9dsgJmeA=
github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
github.com/dgraph-io/ristretto v0.0.4-0.20200904131139-4dec2770af66 h1:ectpJv2tGhTudyk0JhqE/53o/ObH30u5yt/yThsAn3I=
github.com/dgraph-io/ristretto v0.0.4-0.20200904131139-4dec2770af66/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1 h1:CaO/zOnF8VvUfEbhRatPcwKVWamvbYd8tQGRWacE9kU=
Expand Down
Loading

0 comments on commit 710b4c9

Please sign in to comment.