Skip to content
This repository has been archived by the owner on Jan 4, 2023. It is now read-only.

IS-1082: close db gracefully when node terminate abnormally #46

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/dbtest/cli_calculate.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ func (cli *CLI) calculate(dbName string, blockHeight uint64, batchCount uint64)
req.BlockHeight = blockHeight
req.Path = "noiissdata"

core.DoCalculate(ctx.Rollback.GetChannel(), ctx, &req, nil, 0)
}
core.DoCalculate(ctx.CancelCalculation.GetChannel(), ctx, &req, nil, 0)
}
16 changes: 14 additions & 2 deletions cmd/icon_rc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"log"
"os"
"os/signal"
"syscall"

"github.com/icon-project/rewardcalculator/common"
"github.com/icon-project/rewardcalculator/core"
Expand Down Expand Up @@ -82,14 +84,24 @@ func main() {
}

rcm, err := core.InitManager(&cfg)
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)

signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

if err != nil {
log.Panicf("Failed to start RewardCalculator manager. %+v", err)
}

forever := make(chan bool)
go func() {
sig := <-sigs
log.Println("Catch ", sig, "signal")
rcm.Close()
done <- true
}()

go rcm.Loop()

fmt.Printf("[*] To exit press CTRL+C\n")
<-forever
<-done
}
8 changes: 4 additions & 4 deletions core/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,8 @@ type Context struct {
PRepCandidates map[common.Address]*PRepCandidate
GV []*GovernanceVariable

stats *Statistics
Rollback *Rollback
stats *Statistics
CancelCalculation *CancelCalculation

calcDebug *CalcDebug
}
Expand Down Expand Up @@ -581,8 +581,8 @@ func NewContext(dbPath string, dbType string, dbName string, dbCount int, debugC
// Open account DB
isDB.OpenAccountDB()

// make new Rollback stuff
ctx.Rollback = NewRollback()
// make new CancelCalculation stuff
ctx.CancelCalculation = NewCancel()

return ctx, nil
}
Expand Down
11 changes: 9 additions & 2 deletions core/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"log"
"math"
"path/filepath"
"sync"
)

const (
Expand Down Expand Up @@ -50,7 +51,8 @@ type manager struct {
server ipc.Server
conn ipc.Connection

ctx *Context
ctx *Context
waitGroup *sync.WaitGroup
}

func (m *manager) Loop() error {
Expand All @@ -69,6 +71,8 @@ func (m *manager) Loop() error {
}

func (m *manager) Close() error {
m.ctx.CancelCalculation.notifyExit()
m.waitGroup.Wait()
if m.clientMode {
m.conn.Close()
} else {
Expand Down Expand Up @@ -98,8 +102,10 @@ func (m *manager) OnClose(c ipc.Connection) error {
func InitManager(cfg *RcConfig) (*manager, error) {

var err error
waitGroup := new(sync.WaitGroup)
m := new(manager)
m.clientMode = cfg.ClientMode
m.waitGroup = waitGroup

// Initialize DB and load context values
m.ctx, err = NewContext(cfg.DBDir, string(db.GoLevelDBBackend), "IScore", cfg.DBCount, cfg.CalcDebugConf)
Expand Down Expand Up @@ -137,6 +143,7 @@ func InitManager(cfg *RcConfig) (*manager, error) {
monitor := new(manager)
monitor.ctx = m.ctx
monitor.monitorMode = true
monitor.waitGroup = waitGroup

srv := ipc.NewServer()
err = srv.Listen("unix", DebugAddress)
Expand All @@ -162,7 +169,7 @@ func reloadIISSData(ctx *Context, dir string) {
req.BlockHeight = reloadBlockHeight

log.Printf("Reload IISS Data. %s", req.Path)
err, _, _, _ := DoCalculate(ctx.Rollback.GetChannel(), ctx, &req, nil, reloadMsgID)
err, _, _, _ := DoCalculate(ctx.CancelCalculation.GetChannel(), ctx, &req, nil, reloadMsgID)

if err != nil {
log.Printf("Failed to reload IISS Data. %s. %v", req.Path, err)
Expand Down
6 changes: 6 additions & 0 deletions core/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ func (rv *ResponseVersion) String() string {
}

func (mh *msgHandler) version(c ipc.Connection, id uint32) error {
mh.mgr.waitGroup.Add(1)
cBI := mh.mgr.ctx.DB.getCurrentBlockInfo()
mh.mgr.waitGroup.Done()
inwonkim marked this conversation as resolved.
Show resolved Hide resolved
return sendVersion(c, MsgVersion, id, cBI.BlockHeight, cBI.BlockHash)
}

Expand Down Expand Up @@ -188,13 +190,15 @@ func (rq *ResponseQuery) String() string {

func (mh *msgHandler) query(c ipc.Connection, id uint32, data []byte) error {
var addr common.Address
mh.mgr.waitGroup.Add(1)
if _, err := codec.MP.UnmarshalFromBytes(data, &addr); err != nil {
return err
}
log.Printf("\t QUERY request: address: %s", addr.String())

resp := DoQuery(mh.mgr.ctx, addr)

mh.mgr.waitGroup.Done()
log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgQuery), id, resp.String())
return c.Send(MsgQuery, id, &resp)
}
Expand Down Expand Up @@ -250,6 +254,7 @@ func (resp *ResponseInit) String() string {

func (mh *msgHandler) init(c ipc.Connection, id uint32, data []byte) error {
var blockHeight uint64
mh.mgr.waitGroup.Add(1)
if _, err := codec.MP.UnmarshalFromBytes(data, &blockHeight); err != nil {
return err
}
Expand All @@ -262,6 +267,7 @@ func (mh *msgHandler) init(c ipc.Connection, id uint32, data []byte) error {
resp.Success = false
}

mh.mgr.waitGroup.Done()
log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgINIT), id, resp.String())
return c.Send(MsgINIT, id, &resp)
}
Expand Down
28 changes: 25 additions & 3 deletions core/msg_calculate.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,13 +294,14 @@ func sendCalculateACK(c ipc.Connection, id uint32, status uint16, blockHeight ui
func (mh *msgHandler) calculate(c ipc.Connection, id uint32, data []byte) error {
success := true
var req CalculateRequest
mh.mgr.waitGroup.Add(1)
if _, err := codec.MP.UnmarshalFromBytes(data, &req); err != nil {
return err
}
log.Printf("\t CALCULATE request: %s", req.String())

ctx := mh.mgr.ctx
rollback := ctx.Rollback.GetChannel()
rollback := ctx.CancelCalculation.GetChannel()

// do calculation
err, blockHeight, stats, stateHash := DoCalculate(rollback, ctx, &req, c, id)
Expand All @@ -324,6 +325,7 @@ func (mh *msgHandler) calculate(c ipc.Connection, id uint32, data []byte) error
}
resp.StateHash = stateHash

mh.mgr.waitGroup.Done()
log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgCalculateDone), 0, resp.String())
return c.Send(MsgCalculateDone, 0, &resp)
}
Expand Down Expand Up @@ -441,8 +443,16 @@ func DoCalculate(quit <-chan struct{}, ctx *Context, req *CalculateRequest, c ip
}
wait.Wait()

if quit != ctx.Rollback.GetChannel() {
return &CalcCancelByRollbackError{blockHeight}, blockHeight, nil, nil
if quit != ctx.CancelCalculation.GetChannel() {
var err error
switch ctx.CancelCalculation.cancelCode {
case CancelExit:
return &CalcCancelByExit{blockHeight}, blockHeight, nil, nil
case CancelRollback:
err = &CalcCancelByRollbackError{blockHeight}
inwonkim marked this conversation as resolved.
Show resolved Hide resolved
}
ctx.CancelCalculation.cancelCode = CancelNone
return err, blockHeight, nil, nil
}

// update Statistics
Expand Down Expand Up @@ -894,12 +904,14 @@ func (cs *QueryCalculateStatusResponse) String() string {

func (mh *msgHandler) queryCalculateStatus(c ipc.Connection, id uint32, data []byte) error {
ctx := mh.mgr.ctx
mh.mgr.waitGroup.Add(1)

// send QUERY_CALCULATE_STATUS response
var resp QueryCalculateStatusResponse

DoQueryCalculateStatus(ctx, &resp)

mh.mgr.waitGroup.Done()
log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgQueryCalculateStatus), id, resp.String())
return c.Send(MsgQueryCalculateStatus, id, &resp)
}
Expand Down Expand Up @@ -953,6 +965,7 @@ func (cr *QueryCalculateResultResponse) String() string {

func (mh *msgHandler) queryCalculateResult(c ipc.Connection, id uint32, data []byte) error {
var blockHeight uint64
mh.mgr.waitGroup.Add(1)
if _, err := codec.MP.UnmarshalFromBytes(data, &blockHeight); err != nil {
log.Printf("Failed to unmarshal data. err=%+v", err)
return err
Expand All @@ -966,6 +979,7 @@ func (mh *msgHandler) queryCalculateResult(c ipc.Connection, id uint32, data []b

DoQueryCalculateResult(ctx, blockHeight, &resp)

mh.mgr.waitGroup.Done()
log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgQueryCalculateResult), id, resp.String())
return c.Send(MsgQueryCalculateResult, id, &resp)
}
Expand Down Expand Up @@ -1014,3 +1028,11 @@ func isCalcCancelByRollback(err error) bool {
_, ok := err.(*CalcCancelByRollbackError)
return ok
}

type CalcCancelByExit struct {
BlockHeight uint64
}

func (e *CalcCancelByExit) Error() string {
return fmt.Sprintf("CALCULATE(%d) was canceled due to RC process shutting down", e.BlockHeight)
}
Loading