Skip to content

Commit

Permalink
Merge branch 'release-5.2' into cherry-pick-4304-to-release-5.2
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Nov 30, 2021
2 parents e4a556f + c135477 commit 546365a
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 18 deletions.
51 changes: 33 additions & 18 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1334,27 +1334,42 @@ func (s *Server) ReplicateFileToAllMembers(ctx context.Context, name string, dat
if err != nil {
return err
}
var errs []error
for _, member := range resp.Members {
clientUrls := member.GetClientUrls()
if len(clientUrls) == 0 {
log.Warn("failed to replicate file", zap.String("name", name), zap.String("member", member.GetName()), errs.ZapError(err))
return errs.ErrClientURLEmpty.FastGenByArgs()
}
url := clientUrls[0] + filepath.Join("/pd/api/v1/admin/persist-file", name)
req, _ := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(data))
req.Header.Set("PD-Allow-follower-handle", "true")
res, err := s.httpClient.Do(req)
if err != nil {
log.Warn("failed to replicate file", zap.String("name", name), zap.String("member", member.GetName()), errs.ZapError(err))
return errs.ErrSendRequest.Wrap(err).GenWithStackByCause()
}
// Since we don't read the body, we can close it immediately.
res.Body.Close()
if res.StatusCode != http.StatusOK {
log.Warn("failed to replicate file", zap.String("name", name), zap.String("member", member.GetName()), zap.Int("status-code", res.StatusCode))
return errs.ErrSendRequest.FastGenByArgs()
if err := s.replicateFileToMember(ctx, member, name, data); err != nil {
errs = append(errs, err)
}
}
if len(errs) == 0 {
return nil
}
// join all error messages
for _, e := range errs[1:] {
errs[0] = errors.Wrap(errs[0], e.Error())
}
return errs[0]
}

func (s *Server) replicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error {
clientUrls := member.GetClientUrls()
if len(clientUrls) == 0 {
log.Warn("failed to replicate file", zap.String("name", name), zap.String("member", member.GetName()))
return errs.ErrClientURLEmpty.FastGenByArgs()
}
url := clientUrls[0] + filepath.Join("/pd/api/v1/admin/persist-file", name)
req, _ := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(data))
req.Header.Set("PD-Allow-follower-handle", "true")
res, err := s.httpClient.Do(req)
if err != nil {
log.Warn("failed to replicate file", zap.String("name", name), zap.String("member", member.GetName()), errs.ZapError(err))
return errs.ErrSendRequest.Wrap(err).GenWithStackByCause()
}
// Since we don't read the body, we can close it immediately.
res.Body.Close()
if res.StatusCode != http.StatusOK {
log.Warn("failed to replicate file", zap.String("name", name), zap.String("member", member.GetName()), zap.Int("status-code", res.StatusCode))
return errs.ErrSendRequest.FastGenByArgs()
}
return nil
}

Expand Down
33 changes: 33 additions & 0 deletions tests/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ package server_test

import (
"context"
"io/ioutil"
"path/filepath"
"testing"

. "github.com/pingcap/check"
Expand Down Expand Up @@ -142,3 +144,34 @@ func (s *serverTestSuite) TestLeader(c *C) {
return leader != leader1
})
}

func (s *serverTestSuite) TestPersistFile(c *C) {
cluster, err := tests.NewTestCluster(s.ctx, 3)
defer cluster.Destroy()
c.Assert(err, IsNil)

err = cluster.RunInitialServers()
c.Assert(err, IsNil)

leader := cluster.WaitLeader()
c.Assert(leader, Not(Equals), "")

follower := cluster.GetFollower()
err = cluster.GetServer(follower).Stop()
c.Assert(err, IsNil)

content := `{"bar": 42}`
err = cluster.GetServer(leader).GetServer().ReplicateFileToAllMembers(context.Background(), "foo", []byte(content))
c.Assert(err, NotNil)

// check files are persisted except for the killed one.
for _, conf := range cluster.GetConfig().InitialServers {
data, err := ioutil.ReadFile(filepath.Join(conf.DataDir, "foo"))
if conf.Name == follower {
c.Assert(err, NotNil)
} else {
c.Assert(err, IsNil)
c.Assert(data, BytesEquals, []byte(content))
}
}
}

0 comments on commit 546365a

Please sign in to comment.