Skip to content

Commit

Permalink
fix: when datanode is killed,then maxBaseExtentId so low
Browse files Browse the repository at this point in the history
Signed-off-by: awzhgw <guowl18702995996@gmail.com>
  • Loading branch information
awzhgw committed Jun 19, 2019
1 parent 1956e07 commit e8552fe
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 8 deletions.
17 changes: 12 additions & 5 deletions datanode/wrap_prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,21 +90,28 @@ func (s *DataNode) checkPartition(p *repl.Packet) (err error) {
func (s *DataNode) addExtentInfo(p *repl.Packet) error {
partition := p.Object.(*DataPartition)
store := p.Object.(*DataPartition).ExtentStore()
var (
extentID uint64
err error
)
if isLeaderPacket(p) && p.ExtentType == proto.TinyExtentType && isWriteOperation(p) {
extentID, err := store.GetAvailableTinyExtent()
extentID, err = store.GetAvailableTinyExtent()
if err != nil {
return err
return fmt.Errorf("addExtentInfo partition %v GetAvailableTinyExtent error %v", p.PartitionID,err.Error())
}
p.ExtentID = extentID
p.ExtentOffset, err = store.GetTinyExtentOffset(extentID)
if err != nil {
return err
return fmt.Errorf("addExtentInfo partition %v %v GetTinyExtentOffset error %v", p.PartitionID,extentID,err.Error())
}
} else if isLeaderPacket(p) && isCreateExtentOperation(p) {
if partition.GetExtentCount() >= storage.MaxExtentCount*3 {
return fmt.Errorf("partition %v has reached maxExtentId", p.PartitionID)
return fmt.Errorf("addExtentInfo partition %v has reached maxExtentId", p.PartitionID)
}
p.ExtentID,err = store.NextExtentID()
if err!=nil {
return fmt.Errorf("addExtentInfo partition %v alloc NextExtentId error %v", p.PartitionID,err)
}
p.ExtentID = store.NextExtentID()
} else if isLeaderPacket(p) && isMarkDeleteExtentOperation(p) && isTinyExtentType(p) {
record := new(proto.TinyExtentDeleteRecord)
if err := json.Unmarshal(p.Data[:p.Size], record); err != nil {
Expand Down
8 changes: 5 additions & 3 deletions storage/extent_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,8 +630,10 @@ func (s *ExtentStore) ReadTinyDeleteRecords(offset, size int64, data []byte) (cr
// NextExtentID returns the next extentID. When the client sends the request to create an extent,
// this function generates an unique extentID within the current partition.
// This function can only be called by the leader.
func (s *ExtentStore) NextExtentID() (extentID uint64) {
return atomic.AddUint64(&s.baseExtentID, 1)
func (s *ExtentStore) NextExtentID() (extentID uint64,err error) {
extentID=atomic.AddUint64(&s.baseExtentID, 1)
err=s.PersistenceBaseExtentID(extentID)
return
}

func (s *ExtentStore) NextTinyDeleteFileOffset() (offset int64) {
Expand All @@ -651,7 +653,7 @@ func (s *ExtentStore) UpdateBaseExtentID(id uint64) (err error) {
if IsTinyExtent(id) {
return
}
if id >= atomic.LoadUint64(&s.baseExtentID) {
if id > atomic.LoadUint64(&s.baseExtentID) {
atomic.StoreUint64(&s.baseExtentID, id)
err = s.PersistenceBaseExtentID(atomic.LoadUint64(&s.baseExtentID))
}
Expand Down

0 comments on commit e8552fe

Please sign in to comment.