Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable FSM batching through optionally implemented interface #364

Merged
merged 13 commits into from
Oct 2, 2019
8 changes: 7 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,27 @@ TEST_RESULTS_DIR?=/tmp/test-results

test:
go test $(TESTARGS) -timeout=60s -race .
go test $(TESTARGS) -timeout=60s -tags batchtest -race .

integ: test
INTEG_TESTS=yes go test $(TESTARGS) -timeout=25s -run=Integ .
INTEG_TESTS=yes go test $(TESTARGS) -timeout=25s -tags batchtest -run=Integ .

ci.test-norace:
gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=60s
gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=60s -tags batchtest

ci.test:
gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=60s -race .
gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=60s -race -tags batchtest .

ci.integ: ci.test
INTEG_TESTS=yes gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-integ.xml -- -timeout=25s -run=Integ .
INTEG_TESTS=yes gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-integ.xml -- -timeout=25s -run=Integ -tags batchtest .

fuzz:
go test $(TESTARGS) -timeout=20m ./fuzzy
go test $(TESTARGS) -timeout=500s ./fuzzy
go test $(TESTARGS) -timeout=500s -tags batchtest ./fuzzy

deps:
go get -t -d -v ./...
Expand Down
2 changes: 1 addition & 1 deletion api.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func BootstrapCluster(conf *Config, logs LogStore, stable StableStore,
entry.Data = encodePeers(configuration, trans)
} else {
entry.Type = LogConfiguration
entry.Data = encodeConfiguration(configuration)
entry.Data = EncodeConfiguration(configuration)
}
if err := logs.StoreLog(entry); err != nil {
return fmt.Errorf("failed to append configuration entry to log: %v", err)
Expand Down
8 changes: 4 additions & 4 deletions configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,19 +342,19 @@ func decodePeers(buf []byte, trans Transport) Configuration {
}
}

// encodeConfiguration serializes a Configuration using MsgPack, or panics on
// EncodeConfiguration serializes a Configuration using MsgPack, or panics on
// errors.
func encodeConfiguration(configuration Configuration) []byte {
func EncodeConfiguration(configuration Configuration) []byte {
buf, err := encodeMsgPack(configuration)
if err != nil {
panic(fmt.Errorf("failed to encode configuration: %v", err))
}
return buf.Bytes()
}

// decodeConfiguration deserializes a Configuration using MsgPack, or panics on
// DecodeConfiguration deserializes a Configuration using MsgPack, or panics on
// errors.
func decodeConfiguration(buf []byte) Configuration {
func DecodeConfiguration(buf []byte) Configuration {
var configuration Configuration
if err := decodeMsgPack(buf, &configuration); err != nil {
panic(fmt.Errorf("failed to decode configuration: %v", err))
Expand Down
2 changes: 1 addition & 1 deletion configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func TestConfiguration_encodeDecodePeers(t *testing.T) {
}

func TestConfiguration_encodeDecodeConfiguration(t *testing.T) {
decoded := decodeConfiguration(encodeConfiguration(sampleConfiguration))
decoded := DecodeConfiguration(EncodeConfiguration(sampleConfiguration))
if !reflect.DeepEqual(sampleConfiguration, decoded) {
t.Fatalf("mismatch %v %v", sampleConfiguration, decoded)
}
Expand Down
95 changes: 89 additions & 6 deletions fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,26 @@ type FSM interface {
Restore(io.ReadCloser) error
}

// BatchingFSM extends the FSM interface to add an ApplyBatch function. This can
// optionally be implemented by clients to enable multiple logs to be applied to
// the FSM in batches. Up to MaxAppendEntries could be sent in a batch.
type BatchingFSM interface {
// ApplyBatch is invoked once a batch of log entries has been committed and
// are ready to be applied to the FSM. ApplyBatch will take in an array of
// log entries. These log entries will be in the order they were committed,
// will not have gaps, and could be of a few log types. Clients should check
// the log type prior to attempting to decode the data attached. Presently
// the LogCommand and LogConfiguration types will be sent.
//
// The returned slice must be the same length as the input and each response
// should correlate to the log at the same index of the input. The returned
// values will be made available in the ApplyFuture returned by Raft.Apply
// method if that method was called on the same Raft node as the FSM.
briankassouf marked this conversation as resolved.
Show resolved Hide resolved
ApplyBatch([]*Log) []interface{}

FSM
}

// FSMSnapshot is returned by an FSM in response to a Snapshot
// It must be safe to invoke FSMSnapshot methods with concurrent
// calls to Apply.
Expand All @@ -49,7 +69,10 @@ type FSMSnapshot interface {
func (r *Raft) runFSM() {
var lastIndex, lastTerm uint64

commit := func(req *commitTuple) {
batchingFSM, batchingEnabled := r.fsm.(BatchingFSM)
configStore, configStoreEnabled := r.fsm.(ConfigurationStore)

commitSingle := func(req *commitTuple) {
// Apply the log if a command or config change
var resp interface{}
// Make sure we send a response
Expand All @@ -68,15 +91,14 @@ func (r *Raft) runFSM() {
metrics.MeasureSince([]string{"raft", "fsm", "apply"}, start)

case LogConfiguration:
configStore, ok := r.fsm.(ConfigurationStore)
if !ok {
if !configStoreEnabled {
// Return early to avoid incrementing the index and term for
// an unimplemented operation.
return
}

start := time.Now()
configStore.StoreConfiguration(req.log.Index, decodeConfiguration(req.log.Data))
configStore.StoreConfiguration(req.log.Index, DecodeConfiguration(req.log.Data))
metrics.MeasureSince([]string{"raft", "fsm", "store_config"}, start)
}

Expand All @@ -85,6 +107,67 @@ func (r *Raft) runFSM() {
lastTerm = req.log.Term
}

commitBatch := func(reqs []*commitTuple) {
if !batchingEnabled {
for _, ct := range reqs {
commitSingle(ct)
}
return
}

// Only send LogCommand and LogConfiguration log types. LogBarrier types
// will not be sent to the FSM.
shouldSend := func(l *Log) bool {
switch l.Type {
case LogCommand, LogConfiguration:
return true
}
return false
}

var lastBatchIndex, lastBatchTerm uint64
sendLogs := make([]*Log, 0, len(reqs))
for _, req := range reqs {
if shouldSend(req.log) {
sendLogs = append(sendLogs, req.log)
}
lastBatchIndex = req.log.Index
lastBatchTerm = req.log.Term
}

var responses []interface{}
if len(sendLogs) > 0 {
start := time.Now()
responses = batchingFSM.ApplyBatch(sendLogs)
metrics.MeasureSince([]string{"raft", "fsm", "applyBatch"}, start)
metrics.AddSample([]string{"raft", "fsm", "applyBatchNum"}, float32(len(reqs)))

// Ensure we get the expected responses
if len(sendLogs) != len(responses) {
panic("invalid number of responses")
}
}

// Update the indexes
lastIndex = lastBatchIndex
lastTerm = lastBatchTerm

var i int
for _, req := range reqs {
var resp interface{}
// If the log was sent to the FSM, retrieve the response.
if shouldSend(req.log) {
resp = responses[i]
i++
}

if req.future != nil {
req.future.response = resp
req.future.respond(nil)
}
}
}

restore := func(req *restoreFuture) {
// Open the snapshot
meta, source, err := r.snapshots.Open(req.ID)
Expand Down Expand Up @@ -132,8 +215,8 @@ func (r *Raft) runFSM() {
select {
case ptr := <-r.fsmMutateCh:
switch req := ptr.(type) {
case *commitTuple:
commit(req)
case []*commitTuple:
commitBatch(req)

case *restoreFuture:
restore(req)
Expand Down
17 changes: 17 additions & 0 deletions fuzzy/fsm_batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// +build batchtest

package fuzzy

import "github.com/hashicorp/raft"

// ApplyBatch enables fuzzyFSM to satisfy the BatchingFSM interface. This
// function is gated by the batchtest build flag.
func (f *fuzzyFSM) ApplyBatch(logs []*raft.Log) []interface{} {
ret := make([]interface{}, len(logs))

for _, l := range logs {
f.Apply(l)
}

return ret
}
Loading