From c9ab88902387701231d2536d7b1ae297b64ba89a Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 4 Jun 2013 19:15:09 -0700 Subject: [PATCH 1/2] close tcp connection after finish --- raftd.go | 45 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/raftd.go b/raftd.go index f00072e..510c312 100644 --- a/raftd.go +++ b/raftd.go @@ -109,15 +109,17 @@ func main() { server.Initialize() } else { join(server) + fmt.Println("success join") } } - + go server.Snapshot() // Create HTTP interface. r := mux.NewRouter() r.HandleFunc("/join", JoinHttpHandler).Methods("POST") r.HandleFunc("/vote", VoteHttpHandler).Methods("POST") r.HandleFunc("/log", GetLogHttpHandler).Methods("GET") r.HandleFunc("/log/append", AppendEntriesHttpHandler).Methods("POST") + r.HandleFunc("/snapshot", SnapshotHttpHandler).Methods("POST") r.HandleFunc("/files/{filename}", ReadFileHttpHandler).Methods("GET") r.HandleFunc("/files/{filename}", WriteFileHttpHandler).Methods("POST") http.Handle("/", r) @@ -208,10 +210,12 @@ func (t transHandler) SendAppendEntriesRequest(server *raft.Server, peer *raft.P debug("[send] POST http://%s/log/append [%d]", peer.Name(), len(req.Entries)) resp, err := http.Post(fmt.Sprintf("http://%s/log/append", peer.Name()), "application/json", &b) if resp != nil { + defer resp.Body.Close() aersp = &raft.AppendEntriesResponse{} if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF { return aersp, nil } + } return aersp, fmt.Errorf("raftd: Unable to append entries: %v", err) } @@ -224,14 +228,35 @@ func (t transHandler) SendVoteRequest(server *raft.Server, peer *raft.Peer, req debug("[send] POST http://%s/vote", peer.Name()) resp, err := http.Post(fmt.Sprintf("http://%s/vote", peer.Name()), "application/json", &b) if resp != nil { + defer resp.Body.Close() rvrsp := &raft.RequestVoteResponse{} if err = json.NewDecoder(resp.Body).Decode(&rvrsp); err == nil || err == io.EOF { return rvrsp, nil } + } return rvrsp, fmt.Errorf("raftd: Unable to request vote: %v", err) } +// Sends SnapshotRequest RPCs to a peer when the server is the candidate. +func (t transHandler) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) (*raft.SnapshotResponse, error) { + var aersp *raft.SnapshotResponse + var b bytes.Buffer + json.NewEncoder(&b).Encode(req) + debug("[send] POST http://%s/snapshot [%d %d]", peer.Name(), req.LastTerm, req.LastIndex) + resp, err := http.Post(fmt.Sprintf("http://%s/snapshot", peer.Name()), "application/json", &b) + if resp != nil { + defer resp.Body.Close() + aersp = &raft.SnapshotResponse{} + if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF { + + return aersp, nil + } + } + fmt.Println("error send snapshot") + return aersp, fmt.Errorf("raftd: Unable to send snapshot: %v", err) +} + //-------------------------------------- // HTTP Handlers //-------------------------------------- @@ -281,6 +306,9 @@ func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) { if resp, _ := server.AppendEntries(aereq); resp != nil { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(resp) + if !resp.Success { + fmt.Println("append error") + } return } } @@ -288,6 +316,21 @@ func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusInternalServerError) } +func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) { + aereq := &raft.SnapshotRequest{} + err := decodeJsonRequest(req, aereq) + if err == nil { + debug("[recv] POST http://%s/snapshot/ ", server.Name()) + if resp, _ := server.SnapshotRecovery(aereq.LastIndex, aereq.LastTerm, aereq.MachineState); resp != nil { + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(resp) + return + } + } + warn("[snapshot] ERROR: %v", err) + w.WriteHeader(http.StatusInternalServerError) +} + func WriteFileHttpHandler(w http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) debug("[recv] POST http://%v/files/%s", server.Name(), vars["filename"]) From a4cace754c180259186e6c41316612e8795b0e40 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 4 Jun 2013 22:54:40 -0700 Subject: [PATCH 2/2] merge and log compaction --- raftd.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/raftd.go b/raftd.go index c9aab59..c7a6777 100644 --- a/raftd.go +++ b/raftd.go @@ -112,7 +112,9 @@ func main() { fmt.Println("success join") } } - go server.Snapshot() + // open snapshot + //go server.Snapshot() + // Create HTTP interface. r := mux.NewRouter() r.HandleFunc("/join", JoinHttpHandler).Methods("POST")