Skip to content

Commit

Permalink
Use z.Closer instead of y.Closer (#6394)
Browse files Browse the repository at this point in the history
The closer type was moved from badger/y to ristretto/z in dgraph-io/ristretto#191
  • Loading branch information
Ibrahim Jarif authored Sep 4, 2020
1 parent 902ea49 commit 0287838
Show file tree
Hide file tree
Showing 25 changed files with 83 additions and 80 deletions.
3 changes: 2 additions & 1 deletion conn/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/raftwal"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
)

var (
Expand Down Expand Up @@ -647,7 +648,7 @@ func (n *Node) WaitLinearizableRead(ctx context.Context) error {
}

// RunReadIndexLoop runs the RAFT index in a loop.
func (n *Node) RunReadIndexLoop(closer *y.Closer, readStateCh <-chan raft.ReadState) {
func (n *Node) RunReadIndexLoop(closer *z.Closer, readStateCh <-chan raft.ReadState) {
defer closer.Done()
readIndex := func(activeRctx []byte) (uint64, error) {
// Read Request can get rejected then we would wait indefinitely on the channel
Expand Down
6 changes: 3 additions & 3 deletions conn/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
"sync"
"time"

"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgo/v200/protos/api"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
"github.com/golang/glog"
"github.com/pkg/errors"
"go.opencensus.io/plugin/ocgrpc"
Expand All @@ -50,7 +50,7 @@ type Pool struct {

lastEcho time.Time
Addr string
closer *y.Closer
closer *z.Closer
healthInfo pb.HealthInfo
}

Expand Down Expand Up @@ -175,7 +175,7 @@ func newPool(addr string) (*Pool, error) {
if err != nil {
return nil, err
}
pl := &Pool{conn: conn, Addr: addr, lastEcho: time.Now(), closer: y.NewCloser(1)}
pl := &Pool{conn: conn, Addr: addr, lastEcho: time.Now(), closer: z.NewCloser(1)}
go pl.MonitorHealth()
return pl, nil
}
Expand Down
16 changes: 8 additions & 8 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"time"

badgerpb "github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgo/v200/protos/api"
"github.com/dgraph-io/dgraph/edgraph"
"github.com/dgraph-io/dgraph/ee/enc"
Expand All @@ -47,6 +46,7 @@ import (
"github.com/dgraph-io/dgraph/tok"
"github.com/dgraph-io/dgraph/worker"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
"github.com/golang/glog"
"github.com/pkg/errors"
"github.com/spf13/cast"
Expand Down Expand Up @@ -389,7 +389,7 @@ func setupListener(addr string, port int) (net.Listener, error) {
return net.Listen("tcp", fmt.Sprintf("%s:%d", addr, port))
}

func serveGRPC(l net.Listener, tlsCfg *tls.Config, closer *y.Closer) {
func serveGRPC(l net.Listener, tlsCfg *tls.Config, closer *z.Closer) {
defer closer.Done()

x.RegisterExporters(Alpha.Conf, "dgraph.alpha")
Expand All @@ -412,7 +412,7 @@ func serveGRPC(l net.Listener, tlsCfg *tls.Config, closer *y.Closer) {
s.Stop()
}

func serveHTTP(l net.Listener, tlsCfg *tls.Config, closer *y.Closer) {
func serveHTTP(l net.Listener, tlsCfg *tls.Config, closer *z.Closer) {
defer closer.Done()
srv := &http.Server{
ReadTimeout: 10 * time.Second,
Expand All @@ -435,7 +435,7 @@ func serveHTTP(l net.Listener, tlsCfg *tls.Config, closer *y.Closer) {
}
}

func setupServer(closer *y.Closer) {
func setupServer(closer *z.Closer) {
go worker.RunServer(bindall) // For pb.communication.

laddr := "localhost"
Expand Down Expand Up @@ -564,7 +564,7 @@ func setupServer(closer *y.Closer) {
http.HandleFunc("/ui/keywords", keywordHandler)

// Initialize the servers.
admin.ServerCloser = y.NewCloser(3)
admin.ServerCloser = z.NewCloser(3)
go serveGRPC(grpcListener, tlsCfg, admin.ServerCloser)
go serveHTTP(httpListener, tlsCfg, admin.ServerCloser)

Expand Down Expand Up @@ -783,7 +783,7 @@ func run() {
}
}()

updaters := y.NewCloser(4)
updaters := z.NewCloser(4)
go func() {
worker.StartRaftNodes(worker.State.WALstore, bindall)
atomic.AddUint32(&initDone, 1)
Expand All @@ -809,7 +809,7 @@ func run() {

// Graphql subscribes to alpha to get schema updates. We need to close that before we
// close alpha. This closer is for closing and waiting that subscription.
adminCloser := y.NewCloser(1)
adminCloser := z.NewCloser(1)

setupServer(adminCloser)
glog.Infoln("GRPC and HTTP stopped.")
Expand All @@ -835,7 +835,7 @@ func run() {
}

// listenForCorsUpdate listen for any cors change and update the accepeted cors.
func listenForCorsUpdate(closer *y.Closer) {
func listenForCorsUpdate(closer *z.Closer) {
prefix := x.DataKey("dgraph.cors", 0)
// Remove uid from the key, to get the correct prefix
prefix = prefix[:len(prefix)-8]
Expand Down
9 changes: 5 additions & 4 deletions dgraph/cmd/bulk/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/worker"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
)

type reducer struct {
Expand Down Expand Up @@ -333,7 +334,7 @@ func (r *reducer) streamIdFor(pred string) uint32 {
return streamId
}

func (r *reducer) encode(entryCh chan *encodeRequest, closer *y.Closer) {
func (r *reducer) encode(entryCh chan *encodeRequest, closer *z.Closer) {
defer closer.Done()
for req := range entryCh {

Expand Down Expand Up @@ -383,7 +384,7 @@ func (r *reducer) writeTmpSplits(ci *countIndexer, kvsCh chan *bpb.KVList, wg *s
x.Check(ci.splitWriter.Flush())
}

func (r *reducer) startWriting(ci *countIndexer, writerCh chan *encodeRequest, closer *y.Closer) {
func (r *reducer) startWriting(ci *countIndexer, writerCh chan *encodeRequest, closer *z.Closer) {
defer closer.Done()

// Concurrently write split lists to a temporary badger.
Expand Down Expand Up @@ -456,14 +457,14 @@ func (r *reducer) reduce(partitionKeys [][]byte, mapItrs []*mapIterator, ci *cou
fmt.Printf("Num CPUs: %d\n", cpu)
encoderCh := make(chan *encodeRequest, 2*cpu)
writerCh := make(chan *encodeRequest, 2*cpu)
encoderCloser := y.NewCloser(cpu)
encoderCloser := z.NewCloser(cpu)
for i := 0; i < cpu; i++ {
// Start listening to encode entries
// For time being let's lease 100 stream id for each encoder.
go r.encode(encoderCh, encoderCloser)
}
// Start listening to write the badger list.
writerCloser := y.NewCloser(1)
writerCloser := z.NewCloser(1)
go r.startWriting(ci, writerCh, writerCloser)

for i := 0; i < len(partitionKeys); i++ {
Expand Down
4 changes: 2 additions & 2 deletions dgraph/cmd/zero/license.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ package zero
import (
"net/http"

"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/ristretto/z"
)

// dummy function as enterprise features are not available in oss binary.
Expand All @@ -31,7 +31,7 @@ func (n *node) proposeTrialLicense() error {
}

// periodically checks the validity of the enterprise license and updates the membership state.
func (n *node) updateEnterpriseState(closer *y.Closer) {
func (n *node) updateEnterpriseState(closer *z.Closer) {
closer.Done()
}

Expand Down
4 changes: 2 additions & 2 deletions dgraph/cmd/zero/license_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"net/http"
"time"

"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
humanize "github.com/dustin/go-humanize"
"github.com/gogo/protobuf/proto"
"github.com/golang/glog"
Expand Down Expand Up @@ -61,7 +61,7 @@ func (s *Server) expireLicense() {
// periodically checks the validity of the enterprise license and
// 1. Sets license.Enabled to false in membership state if license has expired.
// 2. Prints out warning once every day a week before the license is set to expire.
func (n *node) updateEnterpriseState(closer *y.Closer) {
func (n *node) updateEnterpriseState(closer *z.Closer) {
defer closer.Done()

interval := 5 * time.Second
Expand Down
12 changes: 6 additions & 6 deletions dgraph/cmd/zero/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ import (

otrace "go.opencensus.io/trace"

"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
farm "github.com/dgryski/go-farm"
"github.com/golang/glog"
"github.com/google/uuid"
Expand All @@ -44,7 +44,7 @@ type node struct {
*conn.Node
server *Server
ctx context.Context
closer *y.Closer // to stop Run.
closer *z.Closer // to stop Run.

// The last timestamp when this Zero was able to reach quorum.
mu sync.RWMutex
Expand Down Expand Up @@ -582,7 +582,7 @@ func (n *node) initAndStartNode() error {
return nil
}

func (n *node) updateZeroMembershipPeriodically(closer *y.Closer) {
func (n *node) updateZeroMembershipPeriodically(closer *z.Closer) {
defer closer.Done()
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
Expand All @@ -599,7 +599,7 @@ func (n *node) updateZeroMembershipPeriodically(closer *y.Closer) {

var startOption = otrace.WithSampler(otrace.ProbabilitySampler(0.01))

func (n *node) checkQuorum(closer *y.Closer) {
func (n *node) checkQuorum(closer *z.Closer) {
defer closer.Done()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
Expand Down Expand Up @@ -637,7 +637,7 @@ func (n *node) checkQuorum(closer *y.Closer) {
}
}

func (n *node) snapshotPeriodically(closer *y.Closer) {
func (n *node) snapshotPeriodically(closer *z.Closer) {
defer closer.Done()
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
Expand Down Expand Up @@ -679,7 +679,7 @@ func (n *node) Run() {
// snapshot can cause select loop to block while deleting entries, so run
// it in goroutine
readStateCh := make(chan raft.ReadState, 100)
closer := y.NewCloser(5)
closer := z.NewCloser(5)
defer func() {
closer.SignalAndWait()
n.closer.Done()
Expand Down
6 changes: 3 additions & 3 deletions dgraph/cmd/zero/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ import (

"github.com/dgraph-io/badger/v2"
bopt "github.com/dgraph-io/badger/v2/options"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/ee/enc"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/raftwal"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
"github.com/golang/glog"
"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -151,7 +151,7 @@ func (st *state) serveGRPC(l net.Listener, store *raftwal.DiskStorage) {
m.Cfg.DisableProposalForwarding = true
st.rs = conn.NewRaftServer(m)

st.node = &node{Node: m, ctx: context.Background(), closer: y.NewCloser(1)}
st.node = &node{Node: m, ctx: context.Background(), closer: z.NewCloser(1)}
st.zero = &Server{NumReplicas: opts.numReplicas, Node: st.node}
st.zero.Init()
st.node.server = st.zero
Expand Down Expand Up @@ -302,7 +302,7 @@ func run() {
x.Checkf(err, "Error while opening WAL store")
defer kv.Close()

gcCloser := y.NewCloser(1) // closer for vLogGC
gcCloser := z.NewCloser(1) // closer for vLogGC
go x.RunVlogGC(kv, gcCloser)
defer gcCloser.SignalAndWait()

Expand Down
6 changes: 3 additions & 3 deletions dgraph/cmd/zero/zero.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ import (

otrace "go.opencensus.io/trace"

"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgo/v200/protos/api"
"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/telemetry"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
"github.com/gogo/protobuf/proto"
"github.com/golang/glog"
"github.com/pkg/errors"
Expand Down Expand Up @@ -66,7 +66,7 @@ type Server struct {
// groupMap map[uint32]*Group
nextGroup uint32
leaderChangeCh chan struct{}
closer *y.Closer // Used to tell stream to close.
closer *z.Closer // Used to tell stream to close.
connectLock sync.Mutex // Used to serialize connect requests from servers.

moveOngoing chan struct{}
Expand All @@ -89,7 +89,7 @@ func (s *Server) Init() {
s.nextTxnTs = 1
s.nextGroup = 1
s.leaderChangeCh = make(chan struct{}, 1)
s.closer = y.NewCloser(2) // grpc and http
s.closer = z.NewCloser(2) // grpc and http
s.blockCommitsOn = new(sync.Map)
s.moveOngoing = make(chan struct{}, 1)

Expand Down
6 changes: 3 additions & 3 deletions edgraph/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ package edgraph
import (
"context"

"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgo/v200/protos/api"
"github.com/dgraph-io/dgraph/gql"
"github.com/dgraph-io/dgraph/query"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
"github.com/golang/glog"
)

Expand All @@ -42,12 +42,12 @@ func (s *Server) Login(ctx context.Context,
}

// ResetAcl is an empty method since ACL is only supported in the enterprise version.
func ResetAcl(closer *y.Closer) {
func ResetAcl(closer *z.Closer) {
// do nothing
}

// ResetAcls is an empty method since ACL is only supported in the enterprise version.
func RefreshAcls(closer *y.Closer) {
func RefreshAcls(closer *z.Closer) {
// do nothing
<-closer.HasBeenClosed()
closer.Done()
Expand Down
7 changes: 3 additions & 4 deletions edgraph/access_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@ import (
"time"

"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/ristretto/z"

"github.com/dgraph-io/dgraph/query"

"github.com/pkg/errors"

"github.com/dgraph-io/badger/v2/y"

"github.com/dgraph-io/dgo/v200/protos/api"
"github.com/dgraph-io/dgraph/ee/acl"
"github.com/dgraph-io/dgraph/gql"
Expand Down Expand Up @@ -305,7 +304,7 @@ func authorizeUser(ctx context.Context, userid string, password string) (
}

// RefreshAcls queries for the ACL triples and refreshes the ACLs accordingly.
func RefreshAcls(closer *y.Closer) {
func RefreshAcls(closer *z.Closer) {
defer func() {
glog.Infoln("RefreshAcls closed")
closer.Done()
Expand Down Expand Up @@ -368,7 +367,7 @@ const queryAcls = `
`

// ResetAcl clears the aclCachePtr and upserts the Groot account.
func ResetAcl(closer *y.Closer) {
func ResetAcl(closer *z.Closer) {
defer func() {
glog.Infof("ResetAcl closed")
closer.Done()
Expand Down
Loading

0 comments on commit 0287838

Please sign in to comment.