Skip to content

Commit

Permalink
Use z.Closer instead of y.Closer (#6394) (#6398)
Browse files Browse the repository at this point in the history
The closer type was moved from badger/y to ristretto/z in dgraph-io/ristretto#191

(cherry picked from commit 0287838)
  • Loading branch information
Ibrahim Jarif authored Sep 4, 2020
1 parent 13ad2af commit 69f8cd7
Show file tree
Hide file tree
Showing 24 changed files with 74 additions and 73 deletions.
3 changes: 2 additions & 1 deletion conn/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/raftwal"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
)

var (
Expand Down Expand Up @@ -647,7 +648,7 @@ func (n *Node) WaitLinearizableRead(ctx context.Context) error {
}

// RunReadIndexLoop runs the RAFT index in a loop.
func (n *Node) RunReadIndexLoop(closer *y.Closer, readStateCh <-chan raft.ReadState) {
func (n *Node) RunReadIndexLoop(closer *z.Closer, readStateCh <-chan raft.ReadState) {
defer closer.Done()
readIndex := func(activeRctx []byte) (uint64, error) {
// Read Request can get rejected then we would wait indefinitely on the channel
Expand Down
6 changes: 3 additions & 3 deletions conn/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
"sync"
"time"

"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgo/v2/protos/api"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
"github.com/golang/glog"
"github.com/pkg/errors"
"go.opencensus.io/plugin/ocgrpc"
Expand All @@ -50,7 +50,7 @@ type Pool struct {

lastEcho time.Time
Addr string
closer *y.Closer
closer *z.Closer
healthInfo pb.HealthInfo
}

Expand Down Expand Up @@ -175,7 +175,7 @@ func newPool(addr string) (*Pool, error) {
if err != nil {
return nil, err
}
pl := &Pool{conn: conn, Addr: addr, lastEcho: time.Now(), closer: y.NewCloser(1)}
pl := &Pool{conn: conn, Addr: addr, lastEcho: time.Now(), closer: z.NewCloser(1)}
go pl.MonitorHealth()
return pl, nil
}
Expand Down
9 changes: 4 additions & 5 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"syscall"
"time"

"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgo/v2/protos/api"
"github.com/dgraph-io/dgraph/edgraph"
"github.com/dgraph-io/dgraph/ee/enc"
Expand All @@ -43,7 +42,7 @@ import (
"github.com/dgraph-io/dgraph/tok"
"github.com/dgraph-io/dgraph/worker"
"github.com/dgraph-io/dgraph/x"

"github.com/dgraph-io/ristretto/z"
"github.com/golang/glog"
"github.com/pkg/errors"
"github.com/spf13/cast"
Expand Down Expand Up @@ -429,7 +428,7 @@ func serveHTTP(l net.Listener, tlsCfg *tls.Config, wg *sync.WaitGroup) {
}
}

func setupServer(closer *y.Closer) {
func setupServer(closer *z.Closer) {
go worker.RunServer(bindall) // For pb.communication.

laddr := "localhost"
Expand Down Expand Up @@ -713,7 +712,7 @@ func run() {
}()

// Setup external communication.
aclCloser := y.NewCloser(1)
aclCloser := z.NewCloser(1)
go func() {
worker.StartRaftNodes(worker.State.WALstore, bindall)
// initialization of the admin account can only be done after raft nodes are running
Expand All @@ -724,7 +723,7 @@ func run() {

// Graphql subscribes to alpha to get schema updates. We need to close that before we
// close alpha. This closer is for closing and waiting that subscription.
adminCloser := y.NewCloser(1)
adminCloser := z.NewCloser(1)

setupServer(adminCloser)
glog.Infoln("GRPC and HTTP stopped.")
Expand Down
9 changes: 5 additions & 4 deletions dgraph/cmd/bulk/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/worker"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
)

type reducer struct {
Expand Down Expand Up @@ -334,7 +335,7 @@ func (r *reducer) streamIdFor(pred string) uint32 {
return streamId
}

func (r *reducer) encode(entryCh chan *encodeRequest, closer *y.Closer) {
func (r *reducer) encode(entryCh chan *encodeRequest, closer *z.Closer) {
defer closer.Done()
for req := range entryCh {

Expand Down Expand Up @@ -384,7 +385,7 @@ func (r *reducer) writeTmpSplits(ci *countIndexer, kvsCh chan *bpb.KVList, wg *s
x.Check(ci.splitWriter.Flush())
}

func (r *reducer) startWriting(ci *countIndexer, writerCh chan *encodeRequest, closer *y.Closer) {
func (r *reducer) startWriting(ci *countIndexer, writerCh chan *encodeRequest, closer *z.Closer) {
defer closer.Done()

// Concurrently write split lists to a temporary badger.
Expand Down Expand Up @@ -457,14 +458,14 @@ func (r *reducer) reduce(partitionKeys [][]byte, mapItrs []*mapIterator, ci *cou
fmt.Printf("Num CPUs: %d\n", cpu)
encoderCh := make(chan *encodeRequest, 2*cpu)
writerCh := make(chan *encodeRequest, 2*cpu)
encoderCloser := y.NewCloser(cpu)
encoderCloser := z.NewCloser(cpu)
for i := 0; i < cpu; i++ {
// Start listening to encode entries
// For time being let's lease 100 stream id for each encoder.
go r.encode(encoderCh, encoderCloser)
}
// Start listening to write the badger list.
writerCloser := y.NewCloser(1)
writerCloser := z.NewCloser(1)
go r.startWriting(ci, writerCh, writerCloser)

for i := 0; i < len(partitionKeys); i++ {
Expand Down
4 changes: 2 additions & 2 deletions dgraph/cmd/zero/license.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ package zero
import (
"net/http"

"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/ristretto/z"
)

// dummy function as enterprise features are not available in oss binary.
Expand All @@ -31,7 +31,7 @@ func (n *node) proposeTrialLicense() error {
}

// periodically checks the validity of the enterprise license and updates the membership state.
func (n *node) updateEnterpriseState(closer *y.Closer) {
func (n *node) updateEnterpriseState(closer *z.Closer) {
closer.Done()
}

Expand Down
4 changes: 2 additions & 2 deletions dgraph/cmd/zero/license_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"net/http"
"time"

"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
humanize "github.com/dustin/go-humanize"
"github.com/gogo/protobuf/proto"
"github.com/golang/glog"
Expand Down Expand Up @@ -61,7 +61,7 @@ func (s *Server) expireLicense() {
// periodically checks the validity of the enterprise license and
// 1. Sets license.Enabled to false in membership state if license has expired.
// 2. Prints out warning once every day a week before the license is set to expire.
func (n *node) updateEnterpriseState(closer *y.Closer) {
func (n *node) updateEnterpriseState(closer *z.Closer) {
defer closer.Done()

interval := 5 * time.Second
Expand Down
12 changes: 6 additions & 6 deletions dgraph/cmd/zero/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ import (

otrace "go.opencensus.io/trace"

"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
farm "github.com/dgryski/go-farm"
"github.com/golang/glog"
"github.com/google/uuid"
Expand All @@ -44,7 +44,7 @@ type node struct {
*conn.Node
server *Server
ctx context.Context
closer *y.Closer // to stop Run.
closer *z.Closer // to stop Run.

// The last timestamp when this Zero was able to reach quorum.
mu sync.RWMutex
Expand Down Expand Up @@ -533,7 +533,7 @@ func (n *node) initAndStartNode() error {
return nil
}

func (n *node) updateZeroMembershipPeriodically(closer *y.Closer) {
func (n *node) updateZeroMembershipPeriodically(closer *z.Closer) {
defer closer.Done()
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
Expand All @@ -550,7 +550,7 @@ func (n *node) updateZeroMembershipPeriodically(closer *y.Closer) {

var startOption = otrace.WithSampler(otrace.ProbabilitySampler(0.01))

func (n *node) checkQuorum(closer *y.Closer) {
func (n *node) checkQuorum(closer *z.Closer) {
defer closer.Done()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
Expand Down Expand Up @@ -588,7 +588,7 @@ func (n *node) checkQuorum(closer *y.Closer) {
}
}

func (n *node) snapshotPeriodically(closer *y.Closer) {
func (n *node) snapshotPeriodically(closer *z.Closer) {
defer closer.Done()
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
Expand Down Expand Up @@ -630,7 +630,7 @@ func (n *node) Run() {
// snapshot can cause select loop to block while deleting entries, so run
// it in goroutine
readStateCh := make(chan raft.ReadState, 100)
closer := y.NewCloser(5)
closer := z.NewCloser(5)
defer func() {
closer.SignalAndWait()
n.closer.Done()
Expand Down
6 changes: 3 additions & 3 deletions dgraph/cmd/zero/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ import (

"github.com/dgraph-io/badger/v2"
bopt "github.com/dgraph-io/badger/v2/options"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/ee/enc"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/raftwal"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
"github.com/golang/glog"
"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -151,7 +151,7 @@ func (st *state) serveGRPC(l net.Listener, store *raftwal.DiskStorage) {
m.Cfg.DisableProposalForwarding = true
st.rs = conn.NewRaftServer(m)

st.node = &node{Node: m, ctx: context.Background(), closer: y.NewCloser(1)}
st.node = &node{Node: m, ctx: context.Background(), closer: z.NewCloser(1)}
st.zero = &Server{NumReplicas: opts.numReplicas, Node: st.node}
st.zero.Init()
st.node.server = st.zero
Expand Down Expand Up @@ -301,7 +301,7 @@ func run() {
x.Checkf(err, "Error while opening WAL store")
defer kv.Close()

gcCloser := y.NewCloser(1) // closer for vLogGC
gcCloser := z.NewCloser(1) // closer for vLogGC
go x.RunVlogGC(kv, gcCloser)
defer gcCloser.SignalAndWait()

Expand Down
6 changes: 3 additions & 3 deletions dgraph/cmd/zero/zero.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ import (

otrace "go.opencensus.io/trace"

"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgo/v2/protos/api"
"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/telemetry"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
"github.com/gogo/protobuf/proto"
"github.com/golang/glog"
"github.com/pkg/errors"
Expand Down Expand Up @@ -66,7 +66,7 @@ type Server struct {
// groupMap map[uint32]*Group
nextGroup uint32
leaderChangeCh chan struct{}
closer *y.Closer // Used to tell stream to close.
closer *z.Closer // Used to tell stream to close.
connectLock sync.Mutex // Used to serialize connect requests from servers.

moveOngoing chan struct{}
Expand All @@ -89,7 +89,7 @@ func (s *Server) Init() {
s.nextTxnTs = 1
s.nextGroup = 1
s.leaderChangeCh = make(chan struct{}, 1)
s.closer = y.NewCloser(2) // grpc and http
s.closer = z.NewCloser(2) // grpc and http
s.blockCommitsOn = new(sync.Map)
s.moveOngoing = make(chan struct{}, 1)

Expand Down
4 changes: 2 additions & 2 deletions edgraph/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ package edgraph
import (
"context"

"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgo/v2/protos/api"
"github.com/dgraph-io/dgraph/gql"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
"github.com/golang/glog"
)

Expand All @@ -46,7 +46,7 @@ func ResetAcl() {
}

// ResetAcls is an empty method since ACL is only supported in the enterprise version.
func RefreshAcls(closer *y.Closer) {
func RefreshAcls(closer *z.Closer) {
// do nothing
<-closer.HasBeenClosed()
closer.Done()
Expand Down
9 changes: 4 additions & 5 deletions edgraph/access_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,17 @@ import (
"strings"
"time"

"github.com/pkg/errors"

"github.com/dgraph-io/badger/v2/y"

"github.com/dgraph-io/dgo/v2/protos/api"
"github.com/dgraph-io/dgraph/ee/acl"
"github.com/dgraph-io/dgraph/gql"
"github.com/dgraph-io/dgraph/schema"
"github.com/dgraph-io/dgraph/worker"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"

jwt "github.com/dgrijalva/jwt-go"
"github.com/golang/glog"
"github.com/pkg/errors"
otrace "go.opencensus.io/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
Expand Down Expand Up @@ -295,7 +294,7 @@ func authorizeUser(ctx context.Context, userid string, password string) (
}

// RefreshAcls queries for the ACL triples and refreshes the ACLs accordingly.
func RefreshAcls(closer *y.Closer) {
func RefreshAcls(closer *z.Closer) {
defer closer.Done()
if len(worker.Config.HmacSecret) == 0 {
// the acl feature is not turned on
Expand Down
Empty file added genre.rdf
Empty file.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd
github.com/dgraph-io/badger/v2 v2.2007.2-0.20200827131741-d5a25b83fbf4
github.com/dgraph-io/dgo/v2 v2.2.1-0.20200319183917-53c7d5bc32a7
github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de
github.com/dgraph-io/ristretto v0.0.4-0.20200904131139-4dec2770af66
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13
github.com/dgryski/go-groupvarint v0.0.0-20190318181831-5ce5df8ca4e1
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ github.com/dgraph-io/badger/v2 v2.2007.2-0.20200827131741-d5a25b83fbf4 h1:DUDFTV
github.com/dgraph-io/badger/v2 v2.2007.2-0.20200827131741-d5a25b83fbf4/go.mod h1:26P/7fbL4kUZVEVKLAKXkBXKOydDmM2p1e+NhhnBCAE=
github.com/dgraph-io/dgo/v2 v2.2.1-0.20200319183917-53c7d5bc32a7 h1:9oFXHEReyRIB291rbdGwRk1PYegGO2XBtZ8muXPKqPA=
github.com/dgraph-io/dgo/v2 v2.2.1-0.20200319183917-53c7d5bc32a7/go.mod h1:LJCkLxm5fUMcU+yb8gHFjHt7ChgNuz3YnQQ6MQkmscI=
github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de h1:t0UHb5vdojIDUqktM6+xJAfScFBsVpXZmqC9dsgJmeA=
github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
github.com/dgraph-io/ristretto v0.0.4-0.20200904131139-4dec2770af66 h1:ectpJv2tGhTudyk0JhqE/53o/ObH30u5yt/yThsAn3I=
github.com/dgraph-io/ristretto v0.0.4-0.20200904131139-4dec2770af66/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
Expand Down
6 changes: 3 additions & 3 deletions graphql/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ import (
"github.com/pkg/errors"

badgerpb "github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgraph/graphql/resolve"
"github.com/dgraph-io/dgraph/graphql/schema"
"github.com/dgraph-io/dgraph/graphql/web"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/worker"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
)

const (
Expand Down Expand Up @@ -298,7 +298,7 @@ type adminServer struct {

// NewServers initializes the GraphQL servers. It sets up an empty server for the
// main /graphql endpoint and an admin server. The result is mainServer, adminServer.
func NewServers(withIntrospection bool, closer *y.Closer) (web.IServeGraphQL, web.IServeGraphQL) {
func NewServers(withIntrospection bool, closer *z.Closer) (web.IServeGraphQL, web.IServeGraphQL) {
gqlSchema, err := schema.FromString("")
if err != nil {
x.Panic(err)
Expand All @@ -325,7 +325,7 @@ func newAdminResolver(
gqlServer web.IServeGraphQL,
fns *resolve.ResolverFns,
withIntrospection bool,
closer *y.Closer) *resolve.RequestResolver {
closer *z.Closer) *resolve.RequestResolver {

adminSchema, err := schema.FromString(graphqlAdminSchema)
if err != nil {
Expand Down
Loading

0 comments on commit 69f8cd7

Please sign in to comment.