Skip to content

Commit

Permalink
Support comma separated list of zero addresses in alpha (dgraph-io#5116)
Browse files Browse the repository at this point in the history
Currently, alpha supports specifying only a single zero instance
address via the `--zero` flag in `dgraph alpha` command. This commit
adds supports for a comma-separated list of `zero` addresses in
`dgraph alpha` so that any zero address can be used from the list
of zero addresses if one of them is unavailable.

We'll pick zero addresses starting from the first one in the list and 
try to establish a connection to it. If it fails, we'll pick
the next zero address.

The `dgraph bulk/live` command accepts only a single zero address.
The comma-separated list is only for `dgraph alpha` command.
  • Loading branch information
Ibrahim Jarif authored and dna2github committed Jul 18, 2020
1 parent 92baa63 commit fc4a3c0
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 10 deletions.
2 changes: 1 addition & 1 deletion contrib/integration/testtxn/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ func TestCountIndexConcurrentTxns(t *testing.T) {
require.Error(t, err,
"the txn2 should be aborted due to concurrent update on the count index of <0x01>")

// retry the mutatiton
// retry the mutation
txn3 := dg.NewTxn()
_, err = txn3.Mutate(ctxb, &mu)
x.Check(err)
Expand Down
4 changes: 2 additions & 2 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ they form a Raft group and provide synchronous replication.
flag.String("my", "",
"IP_ADDRESS:PORT of this Dgraph Alpha, so other Dgraph Alphas can talk to this.")
flag.StringP("zero", "z", fmt.Sprintf("localhost:%d", x.PortZeroGrpc),
"IP_ADDRESS:PORT of a Dgraph Zero.")
"Comma separated list of Dgraph zero addresses of the form IP_ADDRESS:PORT.")
flag.Uint64("idx", 0,
"Optional Raft ID that this Dgraph Alpha will use to join RAFT groups.")
flag.Int("max_retries", -1,
Expand Down Expand Up @@ -575,7 +575,7 @@ func run() {
NumPendingProposals: Alpha.Conf.GetInt("pending_proposals"),
Tracing: Alpha.Conf.GetFloat64("trace"),
MyAddr: Alpha.Conf.GetString("my"),
ZeroAddr: Alpha.Conf.GetString("zero"),
ZeroAddr: strings.Split(Alpha.Conf.GetString("zero"), ","),
RaftId: cast.ToUint64(Alpha.Conf.GetString("idx")),
WhiteListedIPRanges: ips,
MaxRetries: Alpha.Conf.GetInt("max_retries"),
Expand Down
2 changes: 1 addition & 1 deletion dgraph/cmd/bulk/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (r *reducer) run() error {
func (r *reducer) createBadger(i int) *badger.DB {
if r.opt.BadgerKeyFile != "" {
// Need to set zero addr in WorkerConfig before checking the license.
x.WorkerConfig.ZeroAddr = r.opt.ZeroAddr
x.WorkerConfig.ZeroAddr = []string{r.opt.ZeroAddr}

if !worker.EnterpriseEnabled() {
// Crash since the enterprise license is not enabled..
Expand Down
17 changes: 13 additions & 4 deletions worker/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,11 @@ func StartRaftNodes(walStore *badger.DB, bindall bool) {
}

x.AssertTruef(len(x.WorkerConfig.ZeroAddr) > 0, "Providing dgraphzero address is mandatory.")
x.AssertTruef(x.WorkerConfig.ZeroAddr != x.WorkerConfig.MyAddr,
"Dgraph Zero address and Dgraph address (IP:Port) can't be the same.")
for _, zeroAddr := range x.WorkerConfig.ZeroAddr {
x.AssertTruef(zeroAddr != x.WorkerConfig.MyAddr,
"Dgraph Zero address %s and Dgraph address (IP:Port) %s can't be the same.",
zeroAddr, x.WorkerConfig.MyAddr)
}

if x.WorkerConfig.RaftId == 0 {
id, err := raftwal.RaftId(walStore)
Expand All @@ -113,6 +116,7 @@ func StartRaftNodes(walStore *badger.DB, bindall bool) {
}
var connState *pb.ConnectionState
var err error

for { // Keep on retrying. See: https://github.com/dgraph-io/dgraph/issues/2289
pl := gr.connToZeroLeader()
if pl == nil {
Expand Down Expand Up @@ -646,14 +650,19 @@ func (g *groupi) connToZeroLeader() *conn.Pool {
// No leader found. Let's get the latest membership state from Zero.
delay := connBaseDelay
maxHalfDelay := time.Second
for { // Keep on retrying. See: https://github.com/dgraph-io/dgraph/issues/2289
for i := 0; ; i++ { // Keep on retrying. See: https://github.com/dgraph-io/dgraph/issues/2289
time.Sleep(delay)
if delay <= maxHalfDelay {
delay *= 2
}

zAddrList := x.WorkerConfig.ZeroAddr
// Pick addresses in round robin manner.
addr := zAddrList[i%len(zAddrList)]

pl := g.AnyServer(0)
if pl == nil {
pl = conn.GetPools().Connect(x.WorkerConfig.ZeroAddr)
pl = conn.GetPools().Connect(addr)
}
if pl == nil {
glog.V(1).Infof("No healthy Zero server found. Retrying...")
Expand Down
6 changes: 4 additions & 2 deletions x/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ type WorkerOptions struct {
Tracing float64
// MyAddr stores the address and port for this alpha.
MyAddr string
// ZeroAddr stores the address and port for the zero instance associated with this alpha.
ZeroAddr string
// ZeroAddr stores the list of address:port for the zero instances associated with this alpha.
// Alpha would communicate via only one zero address from the list. All
// the other addresses serve as fallback.
ZeroAddr []string
// RaftId represents the id of this alpha instance for participating in the RAFT
// consensus protocol.
RaftId uint64
Expand Down

0 comments on commit fc4a3c0

Please sign in to comment.