Skip to content

Commit

Permalink
Refactoring: StartRaftLoggingSchedule not use goroutine
Browse files Browse the repository at this point in the history
Signed-off-by: awzhgw <guowl18702995996@gmail.com>
awzhgw committed Aug 16, 2019
1 parent 2a2fa5d commit e65d12d
Showing 25 changed files with 107 additions and 110 deletions.
2 changes: 1 addition & 1 deletion datanode/partition.go
Original file line number Diff line number Diff line change
@@ -122,7 +122,7 @@ func CreateDataPartition(dpCfg *dataPartitionCfg, disk *Disk) (dp *DataPartition

go dp.StartRaftLoggingSchedule()
go dp.StartRaftAfterRepair()
go dp.ForceLoadHeader()
dp.ForceLoadHeader()

// persist file metadata
err = dp.PersistMetadata()
64 changes: 31 additions & 33 deletions datanode/partition_raft.go
Original file line number Diff line number Diff line change
@@ -125,46 +125,44 @@ func (dp *DataPartition) StartRaftLoggingSchedule() {

log.LogDebugf("[startSchedule] hello DataPartition schedule")

go func(stopC chan bool) {
for {
select {
case <-stopC:
log.LogDebugf("[startSchedule] stop partition=%v", dp.partitionID)
getAppliedIDTimer.Stop()
truncateRaftLogTimer.Stop()
storeAppliedIDTimer.Stop()
return
for {
select {
case <-dp.stopC:
log.LogDebugf("[startSchedule] stop partition=%v", dp.partitionID)
getAppliedIDTimer.Stop()
truncateRaftLogTimer.Stop()
storeAppliedIDTimer.Stop()
return

case extentID := <-dp.stopRaftC:
dp.stopRaft()
log.LogErrorf("action[ExtentRepair] stop raft partition=%v_%v", dp.partitionID, extentID)
case extentID := <-dp.stopRaftC:
dp.stopRaft()
log.LogErrorf("action[ExtentRepair] stop raft partition=%v_%v", dp.partitionID, extentID)

case <-getAppliedIDTimer.C:
if dp.raftPartition != nil {
go dp.updateMaxMinAppliedID()
}
getAppliedIDTimer.Reset(time.Minute * 1)
case <-getAppliedIDTimer.C:
if dp.raftPartition != nil {
dp.updateMaxMinAppliedID()
}
getAppliedIDTimer.Reset(time.Minute * 1)

case <-truncateRaftLogTimer.C:
if dp.raftPartition == nil {
break
}
case <-truncateRaftLogTimer.C:
if dp.raftPartition == nil {
break
}

if dp.minAppliedID > dp.lastTruncateID { // Has changed
go dp.raftPartition.Truncate(dp.minAppliedID)
dp.lastTruncateID = dp.minAppliedID
}
truncateRaftLogTimer.Reset(time.Minute * 10)
if dp.minAppliedID > dp.lastTruncateID { // Has changed
go dp.raftPartition.Truncate(dp.minAppliedID)
dp.lastTruncateID = dp.minAppliedID
}
truncateRaftLogTimer.Reset(time.Minute * 10)

case <-storeAppliedIDTimer.C:
if err := dp.storeAppliedID(dp.appliedID); err != nil {
err = errors.NewErrorf("[startSchedule]: dump partition=%d: %v", dp.config.PartitionID, err.Error())
log.LogErrorf(err.Error())
}
storeAppliedIDTimer.Reset(time.Second * 10)
case <-storeAppliedIDTimer.C:
if err := dp.storeAppliedID(dp.appliedID); err != nil {
err = errors.NewErrorf("[startSchedule]: dump partition=%d: %v", dp.config.PartitionID, err.Error())
log.LogErrorf(err.Error())
}
storeAppliedIDTimer.Reset(time.Second * 10)
}
}(dp.stopC)
}
}

// StartRaftAfterRepair starts the raft after repairing a partition.
4 changes: 2 additions & 2 deletions datanode/server_handler.go
Original file line number Diff line number Diff line change
@@ -160,7 +160,7 @@ func (s *DataNode) getPartitionAPI(w http.ResponseWriter, r *http.Request) {
Replicas []string `json:"replicas"`
TinyDeleteRecordSize int64 `json:"tinyDeleteRecordSize"`
TinyExtentRealSize map[uint64]int64 `json:"tinyExtentRealSize"`
RaftStatus *raft.Status `json: "raftStatus"`
RaftStatus *raft.Status `json: "raftStatus"`
}{
VolName: partition.volumeID,
ID: partition.partitionID,
@@ -173,7 +173,7 @@ func (s *DataNode) getPartitionAPI(w http.ResponseWriter, r *http.Request) {
Replicas: partition.Replicas(),
TinyDeleteRecordSize: tinyDeleteRecordSize,
TinyExtentRealSize: partition.extentStore.GetAllTinyExtentsRealSize(),
RaftStatus: partition.raftPartition.Status(),
RaftStatus: partition.raftPartition.Status(),
}
s.buildSuccessResp(w, result)
}
6 changes: 3 additions & 3 deletions datanode/space_manager.go
Original file line number Diff line number Diff line change
@@ -282,15 +282,15 @@ func (manager *SpaceManager) CreatePartition(request *proto.CreateDataPartitionR
// DeletePartition deletes a partition based on the partition id.
func (manager *SpaceManager) DeletePartition(dpID uint64) {
manager.partitionMutex.Lock()
dp:=manager.partitions[dpID]
if dp==nil {
dp := manager.partitions[dpID]
if dp == nil {
return
}
delete(manager.partitions, dpID)
dp.Stop()
dp.Disk().DetachDataPartition(dp)
manager.partitionMutex.Unlock()

os.RemoveAll(dp.Path())
}

10 changes: 5 additions & 5 deletions datanode/wrap_operator.go
Original file line number Diff line number Diff line change
@@ -163,6 +163,7 @@ func (s *DataNode) handlePacketToCreateDataPartition(p *repl.Packet) {
err = fmt.Errorf("from master Task[%v] cannot unmash CreateDataPartitionRequest struct", task.ToString())
return
}
p.PartitionID=request.PartitionId
if dp, err = s.space.CreatePartition(request); err != nil {
err = fmt.Errorf("from master Task[%v] cannot create Partition err(%v)", task.ToString(), err)
return
@@ -218,7 +219,6 @@ func (s *DataNode) handleHeartbeatPacket(p *repl.Packet) {
}
}()


}

// Handle OpDeleteDataPartition packet.
@@ -740,7 +740,7 @@ func (s *DataNode) handlePacketToDecommissionDataPartition(p *repl.Packet) {
defer func() {
if err != nil {
p.PackErrorBody(ActionDecommissionPartition, err.Error())
}else {
} else {
p.PacketOkReply()
}
}()
@@ -761,14 +761,14 @@ func (s *DataNode) handlePacketToDecommissionDataPartition(p *repl.Packet) {
return
}
dp := s.space.Partition(req.PartitionId)
if dp==nil{
err=fmt.Errorf("partition %v not exsit",req.PartitionId)
if dp == nil {
err = fmt.Errorf("partition %v not exsit", req.PartitionId)
return
}

isRaftLeader, err = s.forwardToRaftLeader(dp, p)
if !isRaftLeader {
err=raft.ErrNotLeader
err = raft.ErrNotLeader
return
}

4 changes: 2 additions & 2 deletions master/admin_task_manager.go
Original file line number Diff line number Diff line change
@@ -41,8 +41,8 @@ type AdminTaskManager struct {
targetAddr string
TaskMap map[string]*proto.AdminTask
sync.RWMutex
exitCh chan struct{}
connPool *util.ConnectPool
exitCh chan struct{}
connPool *util.ConnectPool
}

func newAdminTaskManager(targetAddr, clusterID string) (sender *AdminTaskManager) {
18 changes: 9 additions & 9 deletions master/api_service_test.go
Original file line number Diff line number Diff line change
@@ -15,21 +15,21 @@
package master

import (
"crypto/md5"
"encoding/hex"
"encoding/json"
"fmt"
"net/http"
"testing"
_ "net/http/pprof"
"github.com/chubaofs/chubaofs/master/mocktest"
"github.com/chubaofs/chubaofs/proto"
"github.com/chubaofs/chubaofs/util/config"
"github.com/chubaofs/chubaofs/util/log"
"strings"
"time"
"io/ioutil"
"net/http"
_ "net/http/pprof"
"os"
"crypto/md5"
"encoding/hex"
"github.com/chubaofs/chubaofs/proto"
"encoding/json"
"strings"
"testing"
"time"
)

const (
4 changes: 2 additions & 2 deletions master/cluster.go
Original file line number Diff line number Diff line change
@@ -732,7 +732,7 @@ func (c *Cluster) delDataNodeFromCache(dataNode *DataNode) {
func (c *Cluster) decommissionDataPartition(offlineAddr string, dp *DataPartition, errMsg string) (err error) {
var (
targetHosts []string
newHosts [] string
newHosts []string
newAddr string
newPeers []proto.Peer
msg string
@@ -815,7 +815,7 @@ func (c *Cluster) decommissionDataPartition(offlineAddr string, dp *DataPartitio
c.Name, dp.PartitionID, offlineAddr, newAddr, dp.Hosts)
return
errHandler:
msg = fmt.Sprintf(errMsg + " clusterID[%v] partitionID:%v on Node:%v "+
msg = fmt.Sprintf(errMsg+" clusterID[%v] partitionID:%v on Node:%v "+
"Then Fix It on newHost:%v Err:%v , PersistenceHosts:%v ",
c.Name, dp.PartitionID, offlineAddr, newAddr, err, dp.Hosts)
if err != nil {
2 changes: 1 addition & 1 deletion master/cluster_stat.go
Original file line number Diff line number Diff line change
@@ -17,8 +17,8 @@ package master
import (
"fmt"
"github.com/chubaofs/chubaofs/util"
"strconv"
"github.com/chubaofs/chubaofs/util/log"
"strconv"
)

type nodeStatInfo struct {
2 changes: 1 addition & 1 deletion master/cluster_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package master

import (
"testing"
"fmt"
"testing"
"time"
)

16 changes: 8 additions & 8 deletions master/data_node.go
Original file line number Diff line number Diff line change
@@ -25,14 +25,14 @@ import (

// DataNode stores all the information about a data node
type DataNode struct {
Total uint64 `json:"TotalWeight"`
Used uint64 `json:"UsedWeight"`
AvailableSpace uint64
ID uint64
RackName string `json:"Rack"`
Addr string
ReportTime time.Time
isActive bool
Total uint64 `json:"TotalWeight"`
Used uint64 `json:"UsedWeight"`
AvailableSpace uint64
ID uint64
RackName string `json:"Rack"`
Addr string
ReportTime time.Time
isActive bool
sync.RWMutex
UsageRatio float64 // used / total space
SelectedTimes uint64 // number times that this datanode has been selected as the location for a data partition.
6 changes: 3 additions & 3 deletions master/data_node_test.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package master

import (
"time"
"testing"
"fmt"
"github.com/chubaofs/chubaofs/proto"
"testing"
"time"
)

func TestDataNode(t *testing.T) {
// /dataNode/add and /dataNode/response processed by mock data server
addr := "127.0.0.1:9096"
addDataServer(addr,DefaultRackName)
addDataServer(addr, DefaultRackName)
server.cluster.checkDataNodeHeartbeat()
time.Sleep(5 * time.Second)
getDataNodeInfo(addr, t)
16 changes: 8 additions & 8 deletions master/data_partition.go
Original file line number Diff line number Diff line change
@@ -28,14 +28,14 @@ import (

// DataPartition represents the structure of storing the file contents.
type DataPartition struct {
PartitionID uint64
LastLoadedTime int64
ReplicaNum uint8
Status int8
isRecover bool
Replicas []*DataReplica
Hosts []string // host addresses
Peers []proto.Peer
PartitionID uint64
LastLoadedTime int64
ReplicaNum uint8
Status int8
isRecover bool
Replicas []*DataReplica
Hosts []string // host addresses
Peers []proto.Peer
sync.RWMutex
total uint64
used uint64
6 changes: 3 additions & 3 deletions master/data_partition_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package master

import (
"time"
"testing"
"fmt"
"github.com/chubaofs/chubaofs/util"
"github.com/chubaofs/chubaofs/proto"
"github.com/chubaofs/chubaofs/util"
"testing"
"time"
)

func TestDataPartition(t *testing.T) {
4 changes: 2 additions & 2 deletions master/meta_node_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package master

import (
"testing"
"time"
"fmt"
"github.com/chubaofs/chubaofs/proto"
"testing"
"time"
)

func TestMetaNode(t *testing.T) {
2 changes: 1 addition & 1 deletion master/meta_partition_manager.go
Original file line number Diff line number Diff line change
@@ -16,9 +16,9 @@ package master

import (
"fmt"
"github.com/chubaofs/chubaofs/util/log"
"strconv"
"time"
"github.com/chubaofs/chubaofs/util/log"
)

func (c *Cluster) startCheckLoadMetaPartitions() {
4 changes: 2 additions & 2 deletions master/meta_partition_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package master

import (
"testing"
"time"
"fmt"
"github.com/chubaofs/chubaofs/proto"
"testing"
"time"
)

func TestMetaPartition(t *testing.T) {
10 changes: 5 additions & 5 deletions master/mocktest/data_server.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package mocktest

import (
"io/ioutil"
"net"
"net/http"
"bytes"
"encoding/json"
"fmt"
"github.com/chubaofs/chubaofs/proto"
"encoding/json"
"github.com/chubaofs/chubaofs/util"
"bytes"
"io/ioutil"
"net"
"net/http"
)

type MockDataServer struct {
Loading

0 comments on commit e65d12d

Please sign in to comment.