Skip to content

Commit

Permalink
Merge pull request #10041 from wenjiaswe/automated-cherry-pick-of-#99…
Browse files Browse the repository at this point in the history
…97-upstream-release-3.3

Automated cherry pick of #9997
  • Loading branch information
wenjiaswe authored Oct 3, 2018
2 parents dc02dc2 + d838e24 commit cb57901
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 4 deletions.
19 changes: 17 additions & 2 deletions rafthttp/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net/http"
"path"
"strings"
"time"

pioutil "github.com/coreos/etcd/pkg/ioutil"
"github.com/coreos/etcd/pkg/types"
Expand Down Expand Up @@ -149,6 +150,8 @@ func newSnapshotHandler(tr Transporter, r Raft, snapshotter *snap.Snapshotter, c
}
}

const unknownSnapshotSender = "UNKNOWN_SNAPSHOT_SENDER"

// ServeHTTP serves HTTP request to receive and process snapshot message.
//
// If request sender dies without closing underlying TCP connection,
Expand All @@ -159,16 +162,20 @@ func newSnapshotHandler(tr Transporter, r Raft, snapshotter *snap.Snapshotter, c
// received and processed.
// 2. this case should happen rarely, so no further optimization is done.
func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
start := time.Now()

if r.Method != "POST" {
w.Header().Set("Allow", "POST")
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
return
}

w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())

if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil {
http.Error(w, err.Error(), http.StatusPreconditionFailed)
snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
return
}

Expand All @@ -177,19 +184,22 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
dec := &messageDecoder{r: r.Body}
// let snapshots be very large since they can exceed 512MB for large installations
m, err := dec.decodeLimit(uint64(1 << 63))
from := types.ID(m.From).String()
if err != nil {
msg := fmt.Sprintf("failed to decode raft message (%v)", err)
plog.Errorf(msg)
http.Error(w, msg, http.StatusBadRequest)
recvFailures.WithLabelValues(r.RemoteAddr).Inc()
snapshotReceiveFailures.WithLabelValues(from).Inc()
return
}

receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))
receivedBytes.WithLabelValues(from).Add(float64(m.Size()))

if m.Type != raftpb.MsgSnap {
plog.Errorf("unexpected raft message type %s on snapshot path", m.Type)
http.Error(w, "wrong raft message type", http.StatusBadRequest)
snapshotReceiveFailures.WithLabelValues(from).Inc()
return
}

Expand All @@ -200,9 +210,10 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
msg := fmt.Sprintf("failed to save KV snapshot (%v)", err)
plog.Error(msg)
http.Error(w, msg, http.StatusInternalServerError)
snapshotReceiveFailures.WithLabelValues(from).Inc()
return
}
receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(n))
receivedBytes.WithLabelValues(from).Add(float64(n))
plog.Infof("received and saved database snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From))

if err := h.r.Process(context.TODO(), m); err != nil {
Expand All @@ -215,12 +226,16 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
msg := fmt.Sprintf("failed to process raft message (%v)", err)
plog.Warningf(msg)
http.Error(w, msg, http.StatusInternalServerError)
snapshotReceiveFailures.WithLabelValues(from).Inc()
}
return
}
// Write StatusNoContent header after the message has been processed by
// raft, which facilitates the client to report MsgSnap status.
w.WriteHeader(http.StatusNoContent)

snapshotReceive.WithLabelValues(from).Inc()
snapshotReceiveSeconds.WithLabelValues(from).Observe(time.Since(start).Seconds())
}

type streamHandler struct {
Expand Down
70 changes: 70 additions & 0 deletions rafthttp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,68 @@ var (
[]string{"From"},
)

snapshotSend = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_send_success",
Help: "Total number of successful snapshot sends",
},
[]string{"To"},
)

snapshotSendFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_send_failures",
Help: "Total number of snapshot send failures",
},
[]string{"To"},
)

snapshotSendSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_send_total_duration_seconds",
Help: "Total latency distributions of v3 snapshot sends",

// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
// highest bucket start of 0.1 sec * 2^9 == 51.2 sec
Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
},
[]string{"To"},
)

snapshotReceive = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_receive_success",
Help: "Total number of successful snapshot receives",
},
[]string{"From"},
)

snapshotReceiveFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_receive_failures",
Help: "Total number of snapshot receive failures",
},
[]string{"From"},
)

snapshotReceiveSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_receive_total_duration_seconds",
Help: "Total latency distributions of v3 snapshot receives",

// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
// highest bucket start of 0.1 sec * 2^9 == 51.2 sec
Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
},
[]string{"From"},
)

rtts = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "network",
Expand All @@ -69,5 +131,13 @@ func init() {
prometheus.MustRegister(receivedBytes)
prometheus.MustRegister(sentFailures)
prometheus.MustRegister(recvFailures)

prometheus.MustRegister(snapshotSend)
prometheus.MustRegister(snapshotSendFailures)
prometheus.MustRegister(snapshotSendSeconds)
prometheus.MustRegister(snapshotReceive)
prometheus.MustRegister(snapshotReceiveFailures)
prometheus.MustRegister(snapshotReceiveSeconds)

prometheus.MustRegister(rtts)
}
11 changes: 9 additions & 2 deletions rafthttp/snapshot_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ func newSnapshotSender(tr *Transport, picker *urlPicker, to types.ID, status *pe
func (s *snapshotSender) stop() { close(s.stopc) }

func (s *snapshotSender) send(merged snap.Message) {
start := time.Now()

m := merged.Message
to := types.ID(m.To).String()

body := createSnapBody(merged)
defer body.Close()
Expand Down Expand Up @@ -92,14 +95,18 @@ func (s *snapshotSender) send(merged snap.Message) {
// machine knows about it, it would pause a while and retry sending
// new snapshot message.
s.r.ReportSnapshot(m.To, raft.SnapshotFailure)
sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
sentFailures.WithLabelValues(to).Inc()
snapshotSendFailures.WithLabelValues(to).Inc()
return
}
s.status.activate()
s.r.ReportSnapshot(m.To, raft.SnapshotFinish)
plog.Infof("database snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To))

sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(merged.TotalSize))
sentBytes.WithLabelValues(to).Add(float64(merged.TotalSize))

snapshotSend.WithLabelValues(to).Inc()
snapshotSendSeconds.WithLabelValues(to).Observe(time.Since(start).Seconds())
}

// post posts the given request.
Expand Down
6 changes: 6 additions & 0 deletions snap/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"time"

"github.com/coreos/etcd/pkg/fileutil"
)
Expand All @@ -30,14 +31,18 @@ var ErrNoDBSnapshot = errors.New("snap: snapshot file doesn't exist")
// SaveDBFrom saves snapshot of the database from the given reader. It
// guarantees the save operation is atomic.
func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
start := time.Now()

f, err := ioutil.TempFile(s.dir, "tmp")
if err != nil {
return 0, err
}
var n int64
n, err = io.Copy(f, r)
if err == nil {
fsyncStart := time.Now()
err = fileutil.Fsync(f)
snapDBFsyncSec.Observe(time.Since(fsyncStart).Seconds())
}
f.Close()
if err != nil {
Expand All @@ -57,6 +62,7 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {

plog.Infof("saved database snapshot to disk [total bytes: %d]", n)

snapDBSaveSec.Observe(time.Since(start).Seconds())
return n, nil
}

Expand Down
24 changes: 24 additions & 0 deletions snap/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,33 @@ var (
Help: "The marshalling cost distributions of save called by snapshot.",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
})

snapDBSaveSec = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "snap_db",
Name: "save_total_duration_seconds",
Help: "The total latency distributions of v3 snapshot save",

// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
// highest bucket start of 0.1 sec * 2^9 == 51.2 sec
Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
})

snapDBFsyncSec = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "snap_db",
Name: "fsync_duration_seconds",
Help: "The latency distributions of fsyncing .snap.db file",

// lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
// highest bucket start of 0.001 sec * 2^13 == 8.192 sec
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
})
)

func init() {
prometheus.MustRegister(saveDurations)
prometheus.MustRegister(marshallingDurations)
prometheus.MustRegister(snapDBSaveSec)
prometheus.MustRegister(snapDBFsyncSec)
}

0 comments on commit cb57901

Please sign in to comment.