From 92f4962fc212c748bec8df785f7f31555dceadfd Mon Sep 17 00:00:00 2001 From: inwonkim Date: Wed, 13 May 2020 16:54:18 +0900 Subject: [PATCH 1/7] Add signal Handler so that close DBs and connection gracefully when get SIGINT or SIGTERM signal --- cmd/icon_rc/main.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/cmd/icon_rc/main.go b/cmd/icon_rc/main.go index 630a13f..d19b99e 100644 --- a/cmd/icon_rc/main.go +++ b/cmd/icon_rc/main.go @@ -6,6 +6,8 @@ import ( "fmt" "log" "os" + "os/signal" + "syscall" "github.com/icon-project/rewardcalculator/common" "github.com/icon-project/rewardcalculator/core" @@ -82,14 +84,24 @@ func main() { } rcm, err := core.InitManager(&cfg) + sigs := make(chan os.Signal, 1) + done := make(chan bool, 1) + + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + if err != nil { log.Panicf("Failed to start RewardCalculator manager. %+v", err) } - forever := make(chan bool) + go func() { + sig := <-sigs + log.Println("Catch ", sig, "signal") + rcm.Close() + done <- true + }() go rcm.Loop() fmt.Printf("[*] To exit press CTRL+C\n") - <-forever + <-done } From a57c9fa75441b09cea2d290a728c55b26c0ba224 Mon Sep 17 00:00:00 2001 From: inwonkim Date: Fri, 15 May 2020 15:54:57 +0900 Subject: [PATCH 2/7] Exit process after handling message when RC got SIGTERM or SIGINT signal. * Rename Rollback struct to CancelCalculation. * use CancelCalculation in case rollback, exiting process --- cmd/dbtest/cli_calculate.go | 4 +-- core/context.go | 8 +++--- core/manager.go | 11 ++++++-- core/msg.go | 6 ++++ core/msg_calculate.go | 28 +++++++++++++++++-- core/msg_calculate_test.go | 55 ++++++++++++++++++------------------- core/msg_claim.go | 7 +++++ core/msg_debug.go | 30 ++++++++++++-------- core/msg_rollback.go | 55 ++++++++++++++++++++++++++----------- core/msg_rollback_test.go | 49 ++++++++++++++++----------------- 10 files changed, 161 insertions(+), 92 deletions(-) diff --git a/cmd/dbtest/cli_calculate.go b/cmd/dbtest/cli_calculate.go index 0800cb0..c8ff5a1 100644 --- a/cmd/dbtest/cli_calculate.go +++ b/cmd/dbtest/cli_calculate.go @@ -21,5 +21,5 @@ func (cli *CLI) calculate(dbName string, blockHeight uint64, batchCount uint64) req.BlockHeight = blockHeight req.Path = "noiissdata" - core.DoCalculate(ctx.Rollback.GetChannel(), ctx, &req, nil, 0) -} \ No newline at end of file + core.DoCalculate(ctx.CancelCalculation.GetChannel(), ctx, &req, nil, 0) +} diff --git a/core/context.go b/core/context.go index 7696245..93d276c 100644 --- a/core/context.go +++ b/core/context.go @@ -324,8 +324,8 @@ type Context struct { PRepCandidates map[common.Address]*PRepCandidate GV []*GovernanceVariable - stats *Statistics - Rollback *Rollback + stats *Statistics + CancelCalculation *CancelCalculation calcDebug *CalcDebug } @@ -581,8 +581,8 @@ func NewContext(dbPath string, dbType string, dbName string, dbCount int, debugC // Open account DB isDB.OpenAccountDB() - // make new Rollback stuff - ctx.Rollback = NewRollback() + // make new CancelCalculation stuff + ctx.CancelCalculation = NewCancel() return ctx, nil } diff --git a/core/manager.go b/core/manager.go index 7b059ef..9293bdc 100644 --- a/core/manager.go +++ b/core/manager.go @@ -8,6 +8,7 @@ import ( "log" "math" "path/filepath" + "sync" ) const ( @@ -50,7 +51,8 @@ type manager struct { server ipc.Server conn ipc.Connection - ctx *Context + ctx *Context + waitGroup *sync.WaitGroup } func (m *manager) Loop() error { @@ -69,6 +71,8 @@ func (m *manager) Loop() error { } func (m *manager) Close() error { + m.ctx.CancelCalculation.notifyExit() + m.waitGroup.Wait() if m.clientMode { m.conn.Close() } else { @@ -98,8 +102,10 @@ func (m *manager) OnClose(c ipc.Connection) error { func InitManager(cfg *RcConfig) (*manager, error) { var err error + waitGroup := new(sync.WaitGroup) m := new(manager) m.clientMode = cfg.ClientMode + m.waitGroup = waitGroup // Initialize DB and load context values m.ctx, err = NewContext(cfg.DBDir, string(db.GoLevelDBBackend), "IScore", cfg.DBCount, cfg.CalcDebugConf) @@ -137,6 +143,7 @@ func InitManager(cfg *RcConfig) (*manager, error) { monitor := new(manager) monitor.ctx = m.ctx monitor.monitorMode = true + monitor.waitGroup = waitGroup srv := ipc.NewServer() err = srv.Listen("unix", DebugAddress) @@ -162,7 +169,7 @@ func reloadIISSData(ctx *Context, dir string) { req.BlockHeight = reloadBlockHeight log.Printf("Reload IISS Data. %s", req.Path) - err, _, _, _ := DoCalculate(ctx.Rollback.GetChannel(), ctx, &req, nil, reloadMsgID) + err, _, _, _ := DoCalculate(ctx.CancelCalculation.GetChannel(), ctx, &req, nil, reloadMsgID) if err != nil { log.Printf("Failed to reload IISS Data. %s. %v", req.Path, err) diff --git a/core/msg.go b/core/msg.go index 728ac1b..470a95d 100644 --- a/core/msg.go +++ b/core/msg.go @@ -158,7 +158,9 @@ func (rv *ResponseVersion) String() string { } func (mh *msgHandler) version(c ipc.Connection, id uint32) error { + mh.mgr.waitGroup.Add(1) cBI := mh.mgr.ctx.DB.getCurrentBlockInfo() + mh.mgr.waitGroup.Done() return sendVersion(c, MsgVersion, id, cBI.BlockHeight, cBI.BlockHash) } @@ -188,6 +190,7 @@ func (rq *ResponseQuery) String() string { func (mh *msgHandler) query(c ipc.Connection, id uint32, data []byte) error { var addr common.Address + mh.mgr.waitGroup.Add(1) if _, err := codec.MP.UnmarshalFromBytes(data, &addr); err != nil { return err } @@ -195,6 +198,7 @@ func (mh *msgHandler) query(c ipc.Connection, id uint32, data []byte) error { resp := DoQuery(mh.mgr.ctx, addr) + mh.mgr.waitGroup.Done() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgQuery), id, resp.String()) return c.Send(MsgQuery, id, &resp) } @@ -250,6 +254,7 @@ func (resp *ResponseInit) String() string { func (mh *msgHandler) init(c ipc.Connection, id uint32, data []byte) error { var blockHeight uint64 + mh.mgr.waitGroup.Add(1) if _, err := codec.MP.UnmarshalFromBytes(data, &blockHeight); err != nil { return err } @@ -262,6 +267,7 @@ func (mh *msgHandler) init(c ipc.Connection, id uint32, data []byte) error { resp.Success = false } + mh.mgr.waitGroup.Done() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgINIT), id, resp.String()) return c.Send(MsgINIT, id, &resp) } diff --git a/core/msg_calculate.go b/core/msg_calculate.go index 69af038..eaf83c8 100644 --- a/core/msg_calculate.go +++ b/core/msg_calculate.go @@ -294,13 +294,14 @@ func sendCalculateACK(c ipc.Connection, id uint32, status uint16, blockHeight ui func (mh *msgHandler) calculate(c ipc.Connection, id uint32, data []byte) error { success := true var req CalculateRequest + mh.mgr.waitGroup.Add(1) if _, err := codec.MP.UnmarshalFromBytes(data, &req); err != nil { return err } log.Printf("\t CALCULATE request: %s", req.String()) ctx := mh.mgr.ctx - rollback := ctx.Rollback.GetChannel() + rollback := ctx.CancelCalculation.GetChannel() // do calculation err, blockHeight, stats, stateHash := DoCalculate(rollback, ctx, &req, c, id) @@ -324,6 +325,7 @@ func (mh *msgHandler) calculate(c ipc.Connection, id uint32, data []byte) error } resp.StateHash = stateHash + mh.mgr.waitGroup.Done() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgCalculateDone), 0, resp.String()) return c.Send(MsgCalculateDone, 0, &resp) } @@ -441,8 +443,16 @@ func DoCalculate(quit <-chan struct{}, ctx *Context, req *CalculateRequest, c ip } wait.Wait() - if quit != ctx.Rollback.GetChannel() { - return &CalcCancelByRollbackError{blockHeight}, blockHeight, nil, nil + if quit != ctx.CancelCalculation.GetChannel() { + var err error + switch ctx.CancelCalculation.cancelCode { + case CancelExit: + return &CalcCancelByExit{blockHeight}, blockHeight, nil, nil + case CancelRollback: + err = &CalcCancelByRollbackError{blockHeight} + } + ctx.CancelCalculation.cancelCode = CancelNone + return err, blockHeight, nil, nil } // update Statistics @@ -894,12 +904,14 @@ func (cs *QueryCalculateStatusResponse) String() string { func (mh *msgHandler) queryCalculateStatus(c ipc.Connection, id uint32, data []byte) error { ctx := mh.mgr.ctx + mh.mgr.waitGroup.Add(1) // send QUERY_CALCULATE_STATUS response var resp QueryCalculateStatusResponse DoQueryCalculateStatus(ctx, &resp) + mh.mgr.waitGroup.Done() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgQueryCalculateStatus), id, resp.String()) return c.Send(MsgQueryCalculateStatus, id, &resp) } @@ -953,6 +965,7 @@ func (cr *QueryCalculateResultResponse) String() string { func (mh *msgHandler) queryCalculateResult(c ipc.Connection, id uint32, data []byte) error { var blockHeight uint64 + mh.mgr.waitGroup.Add(1) if _, err := codec.MP.UnmarshalFromBytes(data, &blockHeight); err != nil { log.Printf("Failed to unmarshal data. err=%+v", err) return err @@ -966,6 +979,7 @@ func (mh *msgHandler) queryCalculateResult(c ipc.Connection, id uint32, data []b DoQueryCalculateResult(ctx, blockHeight, &resp) + mh.mgr.waitGroup.Done() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgQueryCalculateResult), id, resp.String()) return c.Send(MsgQueryCalculateResult, id, &resp) } @@ -1014,3 +1028,11 @@ func isCalcCancelByRollback(err error) bool { _, ok := err.(*CalcCancelByRollbackError) return ok } + +type CalcCancelByExit struct { + BlockHeight uint64 +} + +func (e *CalcCancelByExit) Error() string { + return fmt.Sprintf("CALCULATE(%d) was canceled due to RC process shutting down", e.BlockHeight) +} diff --git a/core/msg_calculate_test.go b/core/msg_calculate_test.go index ffbf321..5aec0b8 100644 --- a/core/msg_calculate_test.go +++ b/core/msg_calculate_test.go @@ -50,7 +50,7 @@ func TestMsgCalc_CalculateIISSTX(t *testing.T) { // TX 0: Add new delegation at block height 10 // iconist delegates MinDelegation to prepA and delegates 2 * MinDelegation to prepB - dgDataSlice := []DelegateData { + dgDataSlice := []DelegateData{ {prepA.Address, *common.NewHexIntFromUint64(MinDelegation)}, {prepB.Address, *common.NewHexIntFromUint64(MinDelegation * 2)}, } @@ -61,7 +61,7 @@ func TestMsgCalc_CalculateIISSTX(t *testing.T) { // TX 1: Modify delegation at block height 20 // iconist delegates MinDelegation to prepA and delegates MinDelegation to iconist - dgDataSlice = []DelegateData { + dgDataSlice = []DelegateData{ {prepA.Address, *common.NewHexIntFromUint64(MinDelegation)}, {iconist, *common.NewHexIntFromUint64(MinDelegation)}, } @@ -97,16 +97,16 @@ func TestMsgCalc_CalculateIISSTX(t *testing.T) { h := sha3.NewShake256() iaHash.IScore.SetUint64(3 * MinDelegation * (100 - 10) * minRewardRep / rewardDivider) h.Write(iaHash.BytesForHash()) - iaHash.IScore.SetUint64(3 * MinDelegation * (20 - 10) * minRewardRep / rewardDivider + - MinDelegation * (100 - 20) * minRewardRep / rewardDivider) + iaHash.IScore.SetUint64(3*MinDelegation*(20-10)*minRewardRep/rewardDivider + + MinDelegation*(100-20)*minRewardRep/rewardDivider) h.Write(iaHash.BytesForHash()) - iaHash.IScore.SetUint64(3 * MinDelegation * (20 - 10) * minRewardRep / rewardDivider + - MinDelegation * (30 - 20) * minRewardRep / rewardDivider) + iaHash.IScore.SetUint64(3*MinDelegation*(20-10)*minRewardRep/rewardDivider + + MinDelegation*(30-20)*minRewardRep/rewardDivider) h.Write(iaHash.BytesForHash()) h.Read(stateHash) - reward := 3 * MinDelegation * (20 - 10) * minRewardRep / rewardDivider + - MinDelegation * (30 - 20) * minRewardRep / rewardDivider + reward := 3*MinDelegation*(20-10)*minRewardRep/rewardDivider + + MinDelegation*(30-20)*minRewardRep/rewardDivider assert.Equal(t, uint64(reward), ia.IScore.Uint64()) assert.Equal(t, uint64(reward), stats.Uint64()) @@ -148,7 +148,7 @@ func TestMsgCalc_CalculateIISSTX_small_delegation(t *testing.T) { // TX 0: Add new delegation at block height 10 // iconist delegates MinDelegation - 1 to prepA - dgDataSlice := []DelegateData { + dgDataSlice := []DelegateData{ {prepA.Address, *common.NewHexIntFromUint64(MinDelegation - 1)}, } tx := makeIISSTX(TXDataTypeDelegate, iconist.String(), dgDataSlice) @@ -523,7 +523,6 @@ func testCalculatePRepReward(t *testing.T, revision uint64) { assert.Equal(t, BlockHeight2, ia.BlockHeight) } - totalReward.Add(&totalReward.Int, &reward0.Int) // check stats @@ -559,14 +558,14 @@ func TestMsgCalc_CalculateDB(t *testing.T) { calculateBlockHeight uint64 = 100 - addr1BlockHeight uint64 = 1 - addr1InitIScore = 100 - addr1DelegationToPRepA = 10 + MinDelegation + addr1BlockHeight uint64 = 1 + addr1InitIScore = 100 + addr1DelegationToPRepA = 10 + MinDelegation - addr2BlockHeight uint64 = 10 - addr2InitIScore = 0 - addr2DelegationToPRepA = 20 + MinDelegation - addr2DelegationToPRepB = 30 + MinDelegation + addr2BlockHeight uint64 = 10 + addr2InitIScore = 0 + addr2DelegationToPRepA = 20 + MinDelegation + addr2DelegationToPRepB = 30 + MinDelegation ) ctx := initTest(1) defer finalizeTest(ctx) @@ -632,7 +631,7 @@ func TestMsgCalc_CalculateDB(t *testing.T) { bucket.Set(ia.ID(), ia.Bytes()) // calculate - count, stats, hash := calculateDB(ctx.Rollback.GetChannel(), 0, queryDB, calcDB, ctx, + count, stats, hash := calculateDB(ctx.CancelCalculation.GetChannel(), 0, queryDB, calcDB, ctx, calculateBlockHeight, writeBatchCount) var reward, totalReward uint64 @@ -647,7 +646,7 @@ func TestMsgCalc_CalculateDB(t *testing.T) { return } // calculate delegation reward for P-Rep only - reward = gv.RewardRep.Uint64() * period * addr1DelegationToPRepA / rewardDivider + addr1InitIScore + reward = gv.RewardRep.Uint64()*period*addr1DelegationToPRepA/rewardDivider + addr1InitIScore bucket, _ = calcDB.GetBucket(db.PrefixIScore) bs, _ := bucket.Get(addr1.Bytes()) @@ -668,7 +667,7 @@ func TestMsgCalc_CalculateDB(t *testing.T) { assert.True(t, false) return } - reward = gv.RewardRep.Uint64() * period * (addr2DelegationToPRepA + addr2DelegationToPRepB) / rewardDivider + addr2InitIScore + reward = gv.RewardRep.Uint64()*period*(addr2DelegationToPRepA+addr2DelegationToPRepB)/rewardDivider + addr2InitIScore bs, _ = bucket.Get(addr2.Bytes()) ia, _ = NewIScoreAccountFromBytes(bs) @@ -695,18 +694,18 @@ func TestMsgCalc_DoCalculate_Error(t *testing.T) { defer finalizeTest(ctx) iissDBDir := testDBDir + "/iiss" - req := CalculateRequest{Path: iissDBDir, BlockHeight:100, BlockHash: testHash} + req := CalculateRequest{Path: iissDBDir, BlockHeight: 100, BlockHash: testHash} // get CALCULATE message while processing CALCULATE message ctx.DB.setCalculatingBH(uint64(50)) - err, blockHeight, _, _ := DoCalculate(ctx.Rollback.GetChannel(), ctx, &req, nil, 0) + err, blockHeight, _, _ := DoCalculate(ctx.CancelCalculation.GetChannel(), ctx, &req, nil, 0) assert.Error(t, err) assert.True(t, strings.HasPrefix(err.Error(), "calculating now. drop calculate message"), err) assert.Equal(t, req.BlockHeight, blockHeight) ctx.DB.resetCalculatingBH() // get CALCULATE message with no IISS data - err, blockHeight, _, _ = DoCalculate(ctx.Rollback.GetChannel(), ctx, &req, nil, 0) + err, blockHeight, _, _ = DoCalculate(ctx.CancelCalculation.GetChannel(), ctx, &req, nil, 0) assert.Error(t, err) assert.True(t, strings.HasPrefix(err.Error(), "Failed to load IISS data")) assert.Equal(t, req.BlockHeight, blockHeight) @@ -718,7 +717,7 @@ func TestMsgCalc_DoCalculate_Error(t *testing.T) { // get CALCULATE message with invalid block height ctx.DB.setCalcDoneBH(uint64(200)) - err, blockHeight, _, _ = DoCalculate(ctx.Rollback.GetChannel(), ctx, &req, nil, 0) + err, blockHeight, _, _ = DoCalculate(ctx.CancelCalculation.GetChannel(), ctx, &req, nil, 0) assert.Error(t, err) assert.True(t, strings.HasPrefix(err.Error(), "too low blockHeight")) assert.Equal(t, req.BlockHeight, blockHeight) @@ -726,17 +725,17 @@ func TestMsgCalc_DoCalculate_Error(t *testing.T) { // get CALCULATE message with duplicated block height ctx.DB.setCalcDoneBH(uint64(100)) ctx.DB.setCalculatingBH(uint64(100)) - err, blockHeight, _, _ = DoCalculate(ctx.Rollback.GetChannel(), ctx, &req, nil, 0) + err, blockHeight, _, _ = DoCalculate(ctx.CancelCalculation.GetChannel(), ctx, &req, nil, 0) assert.Error(t, err) assert.True(t, strings.HasPrefix(err.Error(), "duplicated block")) assert.Equal(t, req.BlockHeight, blockHeight) - // Cancel with ROLLBACK + // Rollback with ROLLBACK ctx.DB.setCalcDoneBH(uint64(50)) ctx.DB.setCalculatingBH(uint64(50)) - quitChannel := ctx.Rollback.GetChannel() - ctx.Rollback.notifyRollback() + quitChannel := ctx.CancelCalculation.GetChannel() + ctx.CancelCalculation.notifyRollback() err, blockHeight, _, _ = DoCalculate(quitChannel, ctx, &req, nil, 0) assert.Error(t, err) assert.True(t, strings.HasSuffix(err.Error(), "was canceled by ROLLBACK")) diff --git a/core/msg_claim.go b/core/msg_claim.go index 1d95e02..100715f 100644 --- a/core/msg_claim.go +++ b/core/msg_claim.go @@ -14,6 +14,7 @@ import ( ) const claimMinIScore = 1000 + var BigIntClaimMinIScore = big.NewInt(claimMinIScore) type ClaimMessage struct { @@ -44,6 +45,7 @@ func (rc *ResponseClaim) String() string { func (mh *msgHandler) claim(c ipc.Connection, id uint32, data []byte) error { var req ClaimMessage + mh.mgr.waitGroup.Add(1) if _, err := codec.MP.UnmarshalFromBytes(data, &req); err != nil { log.Printf("Failed to deserialize CLAIM message. err=%+v", err) return err @@ -59,6 +61,7 @@ func (mh *msgHandler) claim(c ipc.Connection, id uint32, data []byte) error { resp.IScore.Set(&IScore.Int) } + mh.mgr.waitGroup.Done() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgClaim), id, resp.String()) return c.Send(MsgClaim, id, &resp) } @@ -166,6 +169,7 @@ func (cc *CommitClaim) String() string { func (mh *msgHandler) commitClaim(c ipc.Connection, id uint32, data []byte) error { var req CommitClaim var err error + mh.mgr.waitGroup.Add(1) if _, err = codec.MP.UnmarshalFromBytes(data, &req); nil != err { return err @@ -178,6 +182,7 @@ func (mh *msgHandler) commitClaim(c ipc.Connection, id uint32, data []byte) erro return nil } + mh.mgr.waitGroup.Done() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgCommitClaim), id, "ack") return c.Send(MsgCommitClaim, id, nil) } @@ -217,6 +222,7 @@ func (cb *CommitBlock) String() string { func (mh *msgHandler) commitBlock(c ipc.Connection, id uint32, data []byte) error { var req CommitBlock var err error + mh.mgr.waitGroup.Add(1) if _, err = codec.MP.UnmarshalFromBytes(data, &req); nil != err { return err } @@ -243,6 +249,7 @@ func (mh *msgHandler) commitBlock(c ipc.Connection, id uint32, data []byte) erro resp = req resp.Success = ret + mh.mgr.waitGroup.Done() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgCommitBlock), id, resp.String()) return c.Send(MsgCommitBlock, id, &resp) } diff --git a/core/msg_debug.go b/core/msg_debug.go index f5cc471..9f72dfb 100644 --- a/core/msg_debug.go +++ b/core/msg_debug.go @@ -44,6 +44,8 @@ type MessageData struct { func (mh *msgHandler) debug(c ipc.Connection, id uint32, data []byte) error { var req DebugMessage + var result error + mh.mgr.waitGroup.Add(1) if _, err := codec.MP.UnmarshalFromBytes(data, &req); err != nil { log.Printf("Failed to deserialize DEBUG message. err=%+v", err) return err @@ -54,32 +56,36 @@ func (mh *msgHandler) debug(c ipc.Connection, id uint32, data []byte) error { switch req.Cmd { case DebugStatistics: - return handleStats(c, id, ctx) + result = handleStats(c, id, ctx) case DebugDBInfo: - return handleDBInfo(c, id, ctx) + result = handleDBInfo(c, id, ctx) case DebugPRep: - return handlePRep(c, id, ctx) + result = handlePRep(c, id, ctx) case DebugPRepCandidate: - return handlePRepCandidate(c, id, ctx) + result = handlePRepCandidate(c, id, ctx) case DebugGV: - return handleGV(c, id, ctx) + result = handleGV(c, id, ctx) case DebugLogCTX: ctx.Print() + result = nil case DebugCalcFlagOn: - return handleCalcDebugFlagOn(c, id, ctx) + result = handleCalcDebugFlagOn(c, id, ctx) case DebugCalcFlagOff: - return handleCalcDebugFlagOff(c, id, ctx) + result = handleCalcDebugFlagOff(c, id, ctx) case DebugCalcAddAddress: - return handleCalcDebugAddAddress(c, id, ctx, req.Address) + result = handleCalcDebugAddAddress(c, id, ctx, req.Address) case DebugCalcDelAddress: - return handleCalcDebugDeleteAddress(c, id, ctx, req.Address) + result = handleCalcDebugDeleteAddress(c, id, ctx, req.Address) case DebugCalcListAddresses: - return handleCalcDebugAddresses(c, id, ctx) + result = handleCalcDebugAddresses(c, id, ctx) case DebugCalcDebugResult: - return handleQueryCalcDebugResult(c, id, ctx, req.Address, req.BlockHeight) + result = handleQueryCalcDebugResult(c, id, ctx, req.Address, req.BlockHeight) + default: + result = fmt.Errorf("unknown debug message %d", req.Cmd) } - return fmt.Errorf("unknown debug message %d", req.Cmd) + mh.mgr.waitGroup.Done() + return result } type ResponseDebugStats struct { diff --git a/core/msg_rollback.go b/core/msg_rollback.go index 120cff3..44d984b 100644 --- a/core/msg_rollback.go +++ b/core/msg_rollback.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "strconv" + "sync" "github.com/icon-project/rewardcalculator/common/codec" "github.com/icon-project/rewardcalculator/common/ipc" @@ -12,7 +13,7 @@ import ( type RollBackRequest struct { BlockHeight uint64 - BlockHash []byte + BlockHash []byte } func (rb *RollBackRequest) String() string { @@ -32,6 +33,7 @@ func (mh *msgHandler) rollback(c ipc.Connection, id uint32, data []byte) error { success := true var req RollBackRequest var err error + mh.mgr.waitGroup.Add(1) if _, err = codec.MP.UnmarshalFromBytes(data, &req); err != nil { return err } @@ -53,6 +55,7 @@ func (mh *msgHandler) rollback(c ipc.Connection, id uint32, data []byte) error { resp.BlockHash = make([]byte, BlockHashSize) copy(resp.BlockHash, req.BlockHash) + mh.mgr.waitGroup.Done() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgRollBack), id, resp.String()) return c.Send(MsgRollBack, id, &resp) } @@ -70,7 +73,7 @@ func DoRollBack(ctx *Context, req *RollBackRequest) error { } // notify rollback to other goroutines - ctx.Rollback.notifyRollback() + ctx.CancelCalculation.notifyRollback() // must Rollback claim DB first err = rollbackClaimDB(ctx, blockHeight, req.BlockHash) @@ -111,30 +114,50 @@ func checkAccountDBRollback(ctx *Context, rollback uint64) bool { return true } -type Rollback struct { - channel chan struct{} // Do not close channel in normal case +const ( + CancelNone uint64 = 0 + CancelExit = 1 + CancelRollback = 2 +) + +type CancelCalculation struct { + channel chan struct{} // Do not close channel in normal case. close when caught SIGTERM/SIGINT or got rollback message. + mutex sync.Mutex + cancelCode uint64 } -func (rb *Rollback) newChannel() { - rb.channel = make(chan struct{}) +func (c *CancelCalculation) newChannel() { + c.channel = make(chan struct{}) } -func (rb *Rollback) GetChannel() chan struct{} { - return rb.channel +func (c *CancelCalculation) GetChannel() chan struct{} { + return c.channel } -func (rb *Rollback) notifyRollback() { - // close channel to notify Rollback to all listening goroutines - close(rb.channel) +func (c *CancelCalculation) notifyCancelCalculation(cancelPurpose uint64) { + c.mutex.Lock() + defer c.mutex.Unlock() + close(c.channel) // make new channel for notification - rb.newChannel() + c.cancelCode = cancelPurpose + c.newChannel() +} + +func (c *CancelCalculation) notifyRollback() { + // close channel to notify Rollback to all listening goroutines + c.notifyCancelCalculation(CancelRollback) +} + +func (c *CancelCalculation) notifyExit() { + // close channel to notify Exiting RC process to all listening goroutines + c.notifyCancelCalculation(CancelExit) } -func NewRollback() *Rollback { - rb := new(Rollback) - rb.newChannel() - return rb +func NewCancel() *CancelCalculation { + c := new(CancelCalculation) + c.newChannel() + return c } type RollbackLowBlockHeightError struct { diff --git a/core/msg_rollback_test.go b/core/msg_rollback_test.go index 8143048..3cb6c48 100644 --- a/core/msg_rollback_test.go +++ b/core/msg_rollback_test.go @@ -6,7 +6,6 @@ import ( "github.com/stretchr/testify/assert" ) - func TestMsgRollback_checkRollback(t *testing.T) { ctx := initTest(2) defer finalizeTest(ctx) @@ -17,24 +16,24 @@ func TestMsgRollback_checkRollback(t *testing.T) { ctx.DB.setCalcDoneBH(calcBlockHeight2) tests := []struct { - name string + name string rollback uint64 - error bool - } { + error bool + }{ { - name: "too low1", + name: "too low1", rollback: calcBlockHeight1 - 1, - error: true, + error: true, }, { - name: "too low2", + name: "too low2", rollback: calcBlockHeight1, - error: true, + error: true, }, { - name: "good", + name: "good", rollback: calcBlockHeight1 + 1, - error: false, + error: false, }, } for _, tt := range tests { @@ -60,33 +59,33 @@ func TestMsgRollback_checkAccountDBRollback(t *testing.T) { const calcBlockHeight uint64 = 100 ctx.DB.setCalcDoneBH(calcBlockHeight) - assert.True(t, checkAccountDBRollback(ctx, calcBlockHeight - 1)) + assert.True(t, checkAccountDBRollback(ctx, calcBlockHeight-1)) assert.True(t, checkAccountDBRollback(ctx, calcBlockHeight)) - assert.False(t, checkAccountDBRollback(ctx, calcBlockHeight + 1)) + assert.False(t, checkAccountDBRollback(ctx, calcBlockHeight+1)) } func TestRollback_newChannel(t *testing.T) { - var rb Rollback + var c CancelCalculation - assert.Nil(t, rb.channel) - rb.newChannel() - assert.NotNil(t, rb.channel) + assert.Nil(t, c.channel) + c.newChannel() + assert.NotNil(t, c.channel) } func TestRollback_getChannel(t *testing.T) { - var rb Rollback - rb.newChannel() + var c CancelCalculation + c.newChannel() - assert.Equal(t, rb.channel, rb.GetChannel()) + assert.Equal(t, c.channel, c.GetChannel()) } func TestRollback_notifyRollback(t *testing.T) { - var rb Rollback - rb.newChannel() + var c CancelCalculation + c.newChannel() - oldChannel := rb.GetChannel() + oldChannel := c.GetChannel() - rb.notifyRollback() - assert.NotEqual(t, oldChannel, rb.GetChannel()) - assert.NotNil(t, rb.GetChannel()) + c.notifyRollback() + assert.NotEqual(t, oldChannel, c.GetChannel()) + assert.NotNil(t, c.GetChannel()) } From 7c510c19b268dedb18dda8e847666bd4c8419297 Mon Sep 17 00:00:00 2001 From: inwonkim Date: Fri, 15 May 2020 17:38:09 +0900 Subject: [PATCH 3/7] Do not distinguish CancelCalculation, Write increasing/decreasing of waitGroup counter method in `manager` structure --- core/manager.go | 10 +++++++++- core/msg.go | 12 ++++++------ core/msg_calculate.go | 38 +++++++++++--------------------------- core/msg_calculate_test.go | 6 +++--- core/msg_claim.go | 12 ++++++------ core/msg_debug.go | 4 ++-- core/msg_rollback.go | 30 ++++++------------------------ core/msg_rollback_test.go | 2 +- 8 files changed, 44 insertions(+), 70 deletions(-) diff --git a/core/manager.go b/core/manager.go index 9293bdc..a7a7a06 100644 --- a/core/manager.go +++ b/core/manager.go @@ -71,7 +71,7 @@ func (m *manager) Loop() error { } func (m *manager) Close() error { - m.ctx.CancelCalculation.notifyExit() + m.ctx.CancelCalculation.notifyCancelCalculation() m.waitGroup.Wait() if m.clientMode { m.conn.Close() @@ -99,6 +99,14 @@ func (m *manager) OnClose(c ipc.Connection) error { return nil } +func (m *manager) IncreaseMessageTask() { + m.waitGroup.Add(1) +} + +func (m *manager) DecreaseMessageTask() { + m.waitGroup.Done() +} + func InitManager(cfg *RcConfig) (*manager, error) { var err error diff --git a/core/msg.go b/core/msg.go index 470a95d..d41c2d2 100644 --- a/core/msg.go +++ b/core/msg.go @@ -158,9 +158,9 @@ func (rv *ResponseVersion) String() string { } func (mh *msgHandler) version(c ipc.Connection, id uint32) error { - mh.mgr.waitGroup.Add(1) + mh.mgr.IncreaseMessageTask() cBI := mh.mgr.ctx.DB.getCurrentBlockInfo() - mh.mgr.waitGroup.Done() + mh.mgr.DecreaseMessageTask() return sendVersion(c, MsgVersion, id, cBI.BlockHeight, cBI.BlockHash) } @@ -190,7 +190,7 @@ func (rq *ResponseQuery) String() string { func (mh *msgHandler) query(c ipc.Connection, id uint32, data []byte) error { var addr common.Address - mh.mgr.waitGroup.Add(1) + mh.mgr.IncreaseMessageTask() if _, err := codec.MP.UnmarshalFromBytes(data, &addr); err != nil { return err } @@ -198,7 +198,7 @@ func (mh *msgHandler) query(c ipc.Connection, id uint32, data []byte) error { resp := DoQuery(mh.mgr.ctx, addr) - mh.mgr.waitGroup.Done() + mh.mgr.DecreaseMessageTask() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgQuery), id, resp.String()) return c.Send(MsgQuery, id, &resp) } @@ -254,7 +254,7 @@ func (resp *ResponseInit) String() string { func (mh *msgHandler) init(c ipc.Connection, id uint32, data []byte) error { var blockHeight uint64 - mh.mgr.waitGroup.Add(1) + mh.mgr.IncreaseMessageTask() if _, err := codec.MP.UnmarshalFromBytes(data, &blockHeight); err != nil { return err } @@ -267,7 +267,7 @@ func (mh *msgHandler) init(c ipc.Connection, id uint32, data []byte) error { resp.Success = false } - mh.mgr.waitGroup.Done() + mh.mgr.DecreaseMessageTask() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgINIT), id, resp.String()) return c.Send(MsgINIT, id, &resp) } diff --git a/core/msg_calculate.go b/core/msg_calculate.go index eaf83c8..baa9613 100644 --- a/core/msg_calculate.go +++ b/core/msg_calculate.go @@ -294,7 +294,7 @@ func sendCalculateACK(c ipc.Connection, id uint32, status uint16, blockHeight ui func (mh *msgHandler) calculate(c ipc.Connection, id uint32, data []byte) error { success := true var req CalculateRequest - mh.mgr.waitGroup.Add(1) + mh.mgr.IncreaseMessageTask() if _, err := codec.MP.UnmarshalFromBytes(data, &req); err != nil { return err } @@ -325,7 +325,7 @@ func (mh *msgHandler) calculate(c ipc.Connection, id uint32, data []byte) error } resp.StateHash = stateHash - mh.mgr.waitGroup.Done() + mh.mgr.DecreaseMessageTask() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgCalculateDone), 0, resp.String()) return c.Send(MsgCalculateDone, 0, &resp) } @@ -444,15 +444,7 @@ func DoCalculate(quit <-chan struct{}, ctx *Context, req *CalculateRequest, c ip wait.Wait() if quit != ctx.CancelCalculation.GetChannel() { - var err error - switch ctx.CancelCalculation.cancelCode { - case CancelExit: - return &CalcCancelByExit{blockHeight}, blockHeight, nil, nil - case CancelRollback: - err = &CalcCancelByRollbackError{blockHeight} - } - ctx.CancelCalculation.cancelCode = CancelNone - return err, blockHeight, nil, nil + return &CalcCancelError{blockHeight}, blockHeight, nil, nil } // update Statistics @@ -904,14 +896,14 @@ func (cs *QueryCalculateStatusResponse) String() string { func (mh *msgHandler) queryCalculateStatus(c ipc.Connection, id uint32, data []byte) error { ctx := mh.mgr.ctx - mh.mgr.waitGroup.Add(1) + mh.mgr.IncreaseMessageTask() // send QUERY_CALCULATE_STATUS response var resp QueryCalculateStatusResponse DoQueryCalculateStatus(ctx, &resp) - mh.mgr.waitGroup.Done() + mh.mgr.DecreaseMessageTask() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgQueryCalculateStatus), id, resp.String()) return c.Send(MsgQueryCalculateStatus, id, &resp) } @@ -965,7 +957,7 @@ func (cr *QueryCalculateResultResponse) String() string { func (mh *msgHandler) queryCalculateResult(c ipc.Connection, id uint32, data []byte) error { var blockHeight uint64 - mh.mgr.waitGroup.Add(1) + mh.mgr.IncreaseMessageTask() if _, err := codec.MP.UnmarshalFromBytes(data, &blockHeight); err != nil { log.Printf("Failed to unmarshal data. err=%+v", err) return err @@ -979,7 +971,7 @@ func (mh *msgHandler) queryCalculateResult(c ipc.Connection, id uint32, data []b DoQueryCalculateResult(ctx, blockHeight, &resp) - mh.mgr.waitGroup.Done() + mh.mgr.DecreaseMessageTask() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgQueryCalculateResult), id, resp.String()) return c.Send(MsgQueryCalculateResult, id, &resp) } @@ -1016,23 +1008,15 @@ func DoQueryCalculateResult(ctx *Context, blockHeight uint64, resp *QueryCalcula } } -type CalcCancelByRollbackError struct { +type CalcCancelError struct { BlockHeight uint64 } -func (e *CalcCancelByRollbackError) Error() string { - return fmt.Sprintf("CALCULATE(%d) was canceled by ROLLBACK", e.BlockHeight) +func (e *CalcCancelError) Error() string { + return fmt.Sprintf("CALCULATE(%d) was canceled", e.BlockHeight) } func isCalcCancelByRollback(err error) bool { - _, ok := err.(*CalcCancelByRollbackError) + _, ok := err.(*CalcCancelError) return ok } - -type CalcCancelByExit struct { - BlockHeight uint64 -} - -func (e *CalcCancelByExit) Error() string { - return fmt.Sprintf("CALCULATE(%d) was canceled due to RC process shutting down", e.BlockHeight) -} diff --git a/core/msg_calculate_test.go b/core/msg_calculate_test.go index 5aec0b8..a044bde 100644 --- a/core/msg_calculate_test.go +++ b/core/msg_calculate_test.go @@ -735,10 +735,10 @@ func TestMsgCalc_DoCalculate_Error(t *testing.T) { ctx.DB.setCalculatingBH(uint64(50)) quitChannel := ctx.CancelCalculation.GetChannel() - ctx.CancelCalculation.notifyRollback() + ctx.CancelCalculation.notifyCancelCalculation() err, blockHeight, _, _ = DoCalculate(quitChannel, ctx, &req, nil, 0) assert.Error(t, err) - assert.True(t, strings.HasSuffix(err.Error(), "was canceled by ROLLBACK")) + assert.True(t, strings.HasSuffix(err.Error(), "was canceled")) } func newIScoreAccount(addr common.Address, blockHeight uint64, reward common.HexInt) *IScoreAccount { @@ -812,6 +812,6 @@ func TestMsgQueryCalc_DoQueryCalculateResult(t *testing.T) { } func Test_isCalcCancelByRollback(t *testing.T) { - assert.True(t, isCalcCancelByRollback(&CalcCancelByRollbackError{})) + assert.True(t, isCalcCancelByRollback(&CalcCancelError{})) assert.False(t, isCalcCancelByRollback(&os.PathError{})) } diff --git a/core/msg_claim.go b/core/msg_claim.go index 100715f..968cd8d 100644 --- a/core/msg_claim.go +++ b/core/msg_claim.go @@ -45,7 +45,7 @@ func (rc *ResponseClaim) String() string { func (mh *msgHandler) claim(c ipc.Connection, id uint32, data []byte) error { var req ClaimMessage - mh.mgr.waitGroup.Add(1) + mh.mgr.IncreaseMessageTask() if _, err := codec.MP.UnmarshalFromBytes(data, &req); err != nil { log.Printf("Failed to deserialize CLAIM message. err=%+v", err) return err @@ -61,7 +61,7 @@ func (mh *msgHandler) claim(c ipc.Connection, id uint32, data []byte) error { resp.IScore.Set(&IScore.Int) } - mh.mgr.waitGroup.Done() + mh.mgr.DecreaseMessageTask() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgClaim), id, resp.String()) return c.Send(MsgClaim, id, &resp) } @@ -169,7 +169,7 @@ func (cc *CommitClaim) String() string { func (mh *msgHandler) commitClaim(c ipc.Connection, id uint32, data []byte) error { var req CommitClaim var err error - mh.mgr.waitGroup.Add(1) + mh.mgr.IncreaseMessageTask() if _, err = codec.MP.UnmarshalFromBytes(data, &req); nil != err { return err @@ -182,7 +182,7 @@ func (mh *msgHandler) commitClaim(c ipc.Connection, id uint32, data []byte) erro return nil } - mh.mgr.waitGroup.Done() + mh.mgr.DecreaseMessageTask() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgCommitClaim), id, "ack") return c.Send(MsgCommitClaim, id, nil) } @@ -222,7 +222,7 @@ func (cb *CommitBlock) String() string { func (mh *msgHandler) commitBlock(c ipc.Connection, id uint32, data []byte) error { var req CommitBlock var err error - mh.mgr.waitGroup.Add(1) + mh.mgr.IncreaseMessageTask() if _, err = codec.MP.UnmarshalFromBytes(data, &req); nil != err { return err } @@ -249,7 +249,7 @@ func (mh *msgHandler) commitBlock(c ipc.Connection, id uint32, data []byte) erro resp = req resp.Success = ret - mh.mgr.waitGroup.Done() + mh.mgr.DecreaseMessageTask() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgCommitBlock), id, resp.String()) return c.Send(MsgCommitBlock, id, &resp) } diff --git a/core/msg_debug.go b/core/msg_debug.go index 9f72dfb..6a28280 100644 --- a/core/msg_debug.go +++ b/core/msg_debug.go @@ -45,7 +45,7 @@ type MessageData struct { func (mh *msgHandler) debug(c ipc.Connection, id uint32, data []byte) error { var req DebugMessage var result error - mh.mgr.waitGroup.Add(1) + mh.mgr.IncreaseMessageTask() if _, err := codec.MP.UnmarshalFromBytes(data, &req); err != nil { log.Printf("Failed to deserialize DEBUG message. err=%+v", err) return err @@ -84,7 +84,7 @@ func (mh *msgHandler) debug(c ipc.Connection, id uint32, data []byte) error { result = fmt.Errorf("unknown debug message %d", req.Cmd) } - mh.mgr.waitGroup.Done() + mh.mgr.DecreaseMessageTask() return result } diff --git a/core/msg_rollback.go b/core/msg_rollback.go index 44d984b..8cae9d6 100644 --- a/core/msg_rollback.go +++ b/core/msg_rollback.go @@ -33,7 +33,7 @@ func (mh *msgHandler) rollback(c ipc.Connection, id uint32, data []byte) error { success := true var req RollBackRequest var err error - mh.mgr.waitGroup.Add(1) + mh.mgr.IncreaseMessageTask() if _, err = codec.MP.UnmarshalFromBytes(data, &req); err != nil { return err } @@ -55,7 +55,7 @@ func (mh *msgHandler) rollback(c ipc.Connection, id uint32, data []byte) error { resp.BlockHash = make([]byte, BlockHashSize) copy(resp.BlockHash, req.BlockHash) - mh.mgr.waitGroup.Done() + mh.mgr.DecreaseMessageTask() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgRollBack), id, resp.String()) return c.Send(MsgRollBack, id, &resp) } @@ -73,7 +73,7 @@ func DoRollBack(ctx *Context, req *RollBackRequest) error { } // notify rollback to other goroutines - ctx.CancelCalculation.notifyRollback() + ctx.CancelCalculation.notifyCancelCalculation() // must Rollback claim DB first err = rollbackClaimDB(ctx, blockHeight, req.BlockHash) @@ -114,16 +114,9 @@ func checkAccountDBRollback(ctx *Context, rollback uint64) bool { return true } -const ( - CancelNone uint64 = 0 - CancelExit = 1 - CancelRollback = 2 -) - type CancelCalculation struct { - channel chan struct{} // Do not close channel in normal case. close when caught SIGTERM/SIGINT or got rollback message. - mutex sync.Mutex - cancelCode uint64 + channel chan struct{} // Do not close channel in normal case. close when caught SIGTERM/SIGINT or got rollback message. + mutex sync.Mutex } func (c *CancelCalculation) newChannel() { @@ -134,26 +127,15 @@ func (c *CancelCalculation) GetChannel() chan struct{} { return c.channel } -func (c *CancelCalculation) notifyCancelCalculation(cancelPurpose uint64) { +func (c *CancelCalculation) notifyCancelCalculation() { c.mutex.Lock() defer c.mutex.Unlock() close(c.channel) // make new channel for notification - c.cancelCode = cancelPurpose c.newChannel() } -func (c *CancelCalculation) notifyRollback() { - // close channel to notify Rollback to all listening goroutines - c.notifyCancelCalculation(CancelRollback) -} - -func (c *CancelCalculation) notifyExit() { - // close channel to notify Exiting RC process to all listening goroutines - c.notifyCancelCalculation(CancelExit) -} - func NewCancel() *CancelCalculation { c := new(CancelCalculation) c.newChannel() diff --git a/core/msg_rollback_test.go b/core/msg_rollback_test.go index 3cb6c48..28a1c6f 100644 --- a/core/msg_rollback_test.go +++ b/core/msg_rollback_test.go @@ -85,7 +85,7 @@ func TestRollback_notifyRollback(t *testing.T) { oldChannel := c.GetChannel() - c.notifyRollback() + c.notifyCancelCalculation() assert.NotEqual(t, oldChannel, c.GetChannel()) assert.NotNil(t, c.GetChannel()) } From 2a41ce761ad46e0c5cd31dfde4a77acbc6148142 Mon Sep 17 00:00:00 2001 From: inwonkim Date: Fri, 15 May 2020 17:48:56 +0900 Subject: [PATCH 4/7] Distinguish CancelCalculation, Rename increasing/decreasing of waitGroup counter method in `manager` structure" --- core/manager.go | 11 +++++++---- core/msg.go | 12 ++++++------ core/msg_calculate.go | 38 +++++++++++++++++++++++++++----------- core/msg_calculate_test.go | 6 +++--- core/msg_claim.go | 12 ++++++------ core/msg_debug.go | 4 ++-- core/msg_rollback.go | 30 ++++++++++++++++++++++++------ core/msg_rollback_test.go | 2 +- 8 files changed, 76 insertions(+), 39 deletions(-) diff --git a/core/manager.go b/core/manager.go index a7a7a06..b50a66b 100644 --- a/core/manager.go +++ b/core/manager.go @@ -71,8 +71,8 @@ func (m *manager) Loop() error { } func (m *manager) Close() error { - m.ctx.CancelCalculation.notifyCancelCalculation() - m.waitGroup.Wait() + m.ctx.CancelCalculation.notifyExit() + m.WaitMsgTasksDone() if m.clientMode { m.conn.Close() } else { @@ -99,14 +99,17 @@ func (m *manager) OnClose(c ipc.Connection) error { return nil } -func (m *manager) IncreaseMessageTask() { +func (m *manager) IncreaseMsgTask() { m.waitGroup.Add(1) } -func (m *manager) DecreaseMessageTask() { +func (m *manager) DecreaseMsgTask() { m.waitGroup.Done() } +func (m *manager) WaitMsgTasksDone() { + m.waitGroup.Wait() +} func InitManager(cfg *RcConfig) (*manager, error) { var err error diff --git a/core/msg.go b/core/msg.go index d41c2d2..61e4cdf 100644 --- a/core/msg.go +++ b/core/msg.go @@ -158,9 +158,9 @@ func (rv *ResponseVersion) String() string { } func (mh *msgHandler) version(c ipc.Connection, id uint32) error { - mh.mgr.IncreaseMessageTask() + mh.mgr.IncreaseMsgTask() cBI := mh.mgr.ctx.DB.getCurrentBlockInfo() - mh.mgr.DecreaseMessageTask() + mh.mgr.DecreaseMsgTask() return sendVersion(c, MsgVersion, id, cBI.BlockHeight, cBI.BlockHash) } @@ -190,7 +190,7 @@ func (rq *ResponseQuery) String() string { func (mh *msgHandler) query(c ipc.Connection, id uint32, data []byte) error { var addr common.Address - mh.mgr.IncreaseMessageTask() + mh.mgr.IncreaseMsgTask() if _, err := codec.MP.UnmarshalFromBytes(data, &addr); err != nil { return err } @@ -198,7 +198,7 @@ func (mh *msgHandler) query(c ipc.Connection, id uint32, data []byte) error { resp := DoQuery(mh.mgr.ctx, addr) - mh.mgr.DecreaseMessageTask() + mh.mgr.DecreaseMsgTask() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgQuery), id, resp.String()) return c.Send(MsgQuery, id, &resp) } @@ -254,7 +254,7 @@ func (resp *ResponseInit) String() string { func (mh *msgHandler) init(c ipc.Connection, id uint32, data []byte) error { var blockHeight uint64 - mh.mgr.IncreaseMessageTask() + mh.mgr.IncreaseMsgTask() if _, err := codec.MP.UnmarshalFromBytes(data, &blockHeight); err != nil { return err } @@ -267,7 +267,7 @@ func (mh *msgHandler) init(c ipc.Connection, id uint32, data []byte) error { resp.Success = false } - mh.mgr.DecreaseMessageTask() + mh.mgr.DecreaseMsgTask() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgINIT), id, resp.String()) return c.Send(MsgINIT, id, &resp) } diff --git a/core/msg_calculate.go b/core/msg_calculate.go index baa9613..08bafea 100644 --- a/core/msg_calculate.go +++ b/core/msg_calculate.go @@ -294,7 +294,7 @@ func sendCalculateACK(c ipc.Connection, id uint32, status uint16, blockHeight ui func (mh *msgHandler) calculate(c ipc.Connection, id uint32, data []byte) error { success := true var req CalculateRequest - mh.mgr.IncreaseMessageTask() + mh.mgr.IncreaseMsgTask() if _, err := codec.MP.UnmarshalFromBytes(data, &req); err != nil { return err } @@ -325,7 +325,7 @@ func (mh *msgHandler) calculate(c ipc.Connection, id uint32, data []byte) error } resp.StateHash = stateHash - mh.mgr.DecreaseMessageTask() + mh.mgr.DecreaseMsgTask() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgCalculateDone), 0, resp.String()) return c.Send(MsgCalculateDone, 0, &resp) } @@ -444,7 +444,15 @@ func DoCalculate(quit <-chan struct{}, ctx *Context, req *CalculateRequest, c ip wait.Wait() if quit != ctx.CancelCalculation.GetChannel() { - return &CalcCancelError{blockHeight}, blockHeight, nil, nil + var err error + switch ctx.CancelCalculation.cancelCode { + case CancelExit: + err = &CalcCancelByExit{blockHeight} + case CancelRollback: + err = &CalcCancelByRollbackError{blockHeight} + } + ctx.CancelCalculation.cancelCode = CancelNone + return err, blockHeight, nil, nil } // update Statistics @@ -896,14 +904,14 @@ func (cs *QueryCalculateStatusResponse) String() string { func (mh *msgHandler) queryCalculateStatus(c ipc.Connection, id uint32, data []byte) error { ctx := mh.mgr.ctx - mh.mgr.IncreaseMessageTask() + mh.mgr.IncreaseMsgTask() // send QUERY_CALCULATE_STATUS response var resp QueryCalculateStatusResponse DoQueryCalculateStatus(ctx, &resp) - mh.mgr.DecreaseMessageTask() + mh.mgr.DecreaseMsgTask() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgQueryCalculateStatus), id, resp.String()) return c.Send(MsgQueryCalculateStatus, id, &resp) } @@ -957,7 +965,7 @@ func (cr *QueryCalculateResultResponse) String() string { func (mh *msgHandler) queryCalculateResult(c ipc.Connection, id uint32, data []byte) error { var blockHeight uint64 - mh.mgr.IncreaseMessageTask() + mh.mgr.IncreaseMsgTask() if _, err := codec.MP.UnmarshalFromBytes(data, &blockHeight); err != nil { log.Printf("Failed to unmarshal data. err=%+v", err) return err @@ -971,7 +979,7 @@ func (mh *msgHandler) queryCalculateResult(c ipc.Connection, id uint32, data []b DoQueryCalculateResult(ctx, blockHeight, &resp) - mh.mgr.DecreaseMessageTask() + mh.mgr.DecreaseMsgTask() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgQueryCalculateResult), id, resp.String()) return c.Send(MsgQueryCalculateResult, id, &resp) } @@ -1008,15 +1016,23 @@ func DoQueryCalculateResult(ctx *Context, blockHeight uint64, resp *QueryCalcula } } -type CalcCancelError struct { +type CalcCancelByRollbackError struct { BlockHeight uint64 } -func (e *CalcCancelError) Error() string { - return fmt.Sprintf("CALCULATE(%d) was canceled", e.BlockHeight) +func (e *CalcCancelByRollbackError) Error() string { + return fmt.Sprintf("CALCULATE(%d) was canceled by ROLLBACK", e.BlockHeight) } func isCalcCancelByRollback(err error) bool { - _, ok := err.(*CalcCancelError) + _, ok := err.(*CalcCancelByRollbackError) return ok } + +type CalcCancelByExit struct { + BlockHeight uint64 +} + +func (e *CalcCancelByExit) Error() string { + return fmt.Sprintf("CALCULATE(%d) was canceled due to RC process shutting down", e.BlockHeight) +} diff --git a/core/msg_calculate_test.go b/core/msg_calculate_test.go index a044bde..5aec0b8 100644 --- a/core/msg_calculate_test.go +++ b/core/msg_calculate_test.go @@ -735,10 +735,10 @@ func TestMsgCalc_DoCalculate_Error(t *testing.T) { ctx.DB.setCalculatingBH(uint64(50)) quitChannel := ctx.CancelCalculation.GetChannel() - ctx.CancelCalculation.notifyCancelCalculation() + ctx.CancelCalculation.notifyRollback() err, blockHeight, _, _ = DoCalculate(quitChannel, ctx, &req, nil, 0) assert.Error(t, err) - assert.True(t, strings.HasSuffix(err.Error(), "was canceled")) + assert.True(t, strings.HasSuffix(err.Error(), "was canceled by ROLLBACK")) } func newIScoreAccount(addr common.Address, blockHeight uint64, reward common.HexInt) *IScoreAccount { @@ -812,6 +812,6 @@ func TestMsgQueryCalc_DoQueryCalculateResult(t *testing.T) { } func Test_isCalcCancelByRollback(t *testing.T) { - assert.True(t, isCalcCancelByRollback(&CalcCancelError{})) + assert.True(t, isCalcCancelByRollback(&CalcCancelByRollbackError{})) assert.False(t, isCalcCancelByRollback(&os.PathError{})) } diff --git a/core/msg_claim.go b/core/msg_claim.go index 968cd8d..b4e472e 100644 --- a/core/msg_claim.go +++ b/core/msg_claim.go @@ -45,7 +45,7 @@ func (rc *ResponseClaim) String() string { func (mh *msgHandler) claim(c ipc.Connection, id uint32, data []byte) error { var req ClaimMessage - mh.mgr.IncreaseMessageTask() + mh.mgr.IncreaseMsgTask() if _, err := codec.MP.UnmarshalFromBytes(data, &req); err != nil { log.Printf("Failed to deserialize CLAIM message. err=%+v", err) return err @@ -61,7 +61,7 @@ func (mh *msgHandler) claim(c ipc.Connection, id uint32, data []byte) error { resp.IScore.Set(&IScore.Int) } - mh.mgr.DecreaseMessageTask() + mh.mgr.DecreaseMsgTask() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgClaim), id, resp.String()) return c.Send(MsgClaim, id, &resp) } @@ -169,7 +169,7 @@ func (cc *CommitClaim) String() string { func (mh *msgHandler) commitClaim(c ipc.Connection, id uint32, data []byte) error { var req CommitClaim var err error - mh.mgr.IncreaseMessageTask() + mh.mgr.IncreaseMsgTask() if _, err = codec.MP.UnmarshalFromBytes(data, &req); nil != err { return err @@ -182,7 +182,7 @@ func (mh *msgHandler) commitClaim(c ipc.Connection, id uint32, data []byte) erro return nil } - mh.mgr.DecreaseMessageTask() + mh.mgr.DecreaseMsgTask() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgCommitClaim), id, "ack") return c.Send(MsgCommitClaim, id, nil) } @@ -222,7 +222,7 @@ func (cb *CommitBlock) String() string { func (mh *msgHandler) commitBlock(c ipc.Connection, id uint32, data []byte) error { var req CommitBlock var err error - mh.mgr.IncreaseMessageTask() + mh.mgr.IncreaseMsgTask() if _, err = codec.MP.UnmarshalFromBytes(data, &req); nil != err { return err } @@ -249,7 +249,7 @@ func (mh *msgHandler) commitBlock(c ipc.Connection, id uint32, data []byte) erro resp = req resp.Success = ret - mh.mgr.DecreaseMessageTask() + mh.mgr.DecreaseMsgTask() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgCommitBlock), id, resp.String()) return c.Send(MsgCommitBlock, id, &resp) } diff --git a/core/msg_debug.go b/core/msg_debug.go index 6a28280..d369c82 100644 --- a/core/msg_debug.go +++ b/core/msg_debug.go @@ -45,7 +45,7 @@ type MessageData struct { func (mh *msgHandler) debug(c ipc.Connection, id uint32, data []byte) error { var req DebugMessage var result error - mh.mgr.IncreaseMessageTask() + mh.mgr.IncreaseMsgTask() if _, err := codec.MP.UnmarshalFromBytes(data, &req); err != nil { log.Printf("Failed to deserialize DEBUG message. err=%+v", err) return err @@ -84,7 +84,7 @@ func (mh *msgHandler) debug(c ipc.Connection, id uint32, data []byte) error { result = fmt.Errorf("unknown debug message %d", req.Cmd) } - mh.mgr.DecreaseMessageTask() + mh.mgr.DecreaseMsgTask() return result } diff --git a/core/msg_rollback.go b/core/msg_rollback.go index 8cae9d6..9452f5e 100644 --- a/core/msg_rollback.go +++ b/core/msg_rollback.go @@ -33,7 +33,7 @@ func (mh *msgHandler) rollback(c ipc.Connection, id uint32, data []byte) error { success := true var req RollBackRequest var err error - mh.mgr.IncreaseMessageTask() + mh.mgr.IncreaseMsgTask() if _, err = codec.MP.UnmarshalFromBytes(data, &req); err != nil { return err } @@ -55,7 +55,7 @@ func (mh *msgHandler) rollback(c ipc.Connection, id uint32, data []byte) error { resp.BlockHash = make([]byte, BlockHashSize) copy(resp.BlockHash, req.BlockHash) - mh.mgr.DecreaseMessageTask() + mh.mgr.DecreaseMsgTask() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgRollBack), id, resp.String()) return c.Send(MsgRollBack, id, &resp) } @@ -73,7 +73,7 @@ func DoRollBack(ctx *Context, req *RollBackRequest) error { } // notify rollback to other goroutines - ctx.CancelCalculation.notifyCancelCalculation() + ctx.CancelCalculation.notifyRollback() // must Rollback claim DB first err = rollbackClaimDB(ctx, blockHeight, req.BlockHash) @@ -114,9 +114,16 @@ func checkAccountDBRollback(ctx *Context, rollback uint64) bool { return true } +const ( + CancelNone uint64 = 0 + CancelExit = 1 + CancelRollback = 2 +) + type CancelCalculation struct { - channel chan struct{} // Do not close channel in normal case. close when caught SIGTERM/SIGINT or got rollback message. - mutex sync.Mutex + channel chan struct{} // Do not close channel in normal case. close when caught SIGTERM/SIGINT or got rollback message. + mutex sync.Mutex + cancelCode uint64 } func (c *CancelCalculation) newChannel() { @@ -127,15 +134,26 @@ func (c *CancelCalculation) GetChannel() chan struct{} { return c.channel } -func (c *CancelCalculation) notifyCancelCalculation() { +func (c *CancelCalculation) notifyCancelCalculation(cancelPurpose uint64) { c.mutex.Lock() defer c.mutex.Unlock() close(c.channel) // make new channel for notification + c.cancelCode = cancelPurpose c.newChannel() } +func (c *CancelCalculation) notifyRollback() { + // close channel to notify Rollback to all listening goroutines + c.notifyCancelCalculation(CancelRollback) +} + +func (c *CancelCalculation) notifyExit() { + // close channel to notify Exiting RC process to all listening goroutines + c.notifyCancelCalculation(CancelExit) +} + func NewCancel() *CancelCalculation { c := new(CancelCalculation) c.newChannel() diff --git a/core/msg_rollback_test.go b/core/msg_rollback_test.go index 28a1c6f..3cb6c48 100644 --- a/core/msg_rollback_test.go +++ b/core/msg_rollback_test.go @@ -85,7 +85,7 @@ func TestRollback_notifyRollback(t *testing.T) { oldChannel := c.GetChannel() - c.notifyCancelCalculation() + c.notifyRollback() assert.NotEqual(t, oldChannel, c.GetChannel()) assert.NotNil(t, c.GetChannel()) } From 42abfa2a70160821c059ca7ae72961240e3f7335 Mon Sep 17 00:00:00 2001 From: inwonkim Date: Mon, 18 May 2020 10:21:27 +0900 Subject: [PATCH 5/7] Change methods name --- core/manager.go | 4 ++-- core/msg.go | 12 ++++++------ core/msg_calculate.go | 12 ++++++------ core/msg_claim.go | 12 ++++++------ core/msg_debug.go | 4 ++-- core/msg_rollback.go | 4 ++-- 6 files changed, 24 insertions(+), 24 deletions(-) diff --git a/core/manager.go b/core/manager.go index b50a66b..b67e80d 100644 --- a/core/manager.go +++ b/core/manager.go @@ -99,11 +99,11 @@ func (m *manager) OnClose(c ipc.Connection) error { return nil } -func (m *manager) IncreaseMsgTask() { +func (m *manager) AddMsgTask() { m.waitGroup.Add(1) } -func (m *manager) DecreaseMsgTask() { +func (m *manager) DoneMsgTask() { m.waitGroup.Done() } diff --git a/core/msg.go b/core/msg.go index 61e4cdf..7af0d01 100644 --- a/core/msg.go +++ b/core/msg.go @@ -158,9 +158,9 @@ func (rv *ResponseVersion) String() string { } func (mh *msgHandler) version(c ipc.Connection, id uint32) error { - mh.mgr.IncreaseMsgTask() + mh.mgr.AddMsgTask() cBI := mh.mgr.ctx.DB.getCurrentBlockInfo() - mh.mgr.DecreaseMsgTask() + mh.mgr.DoneMsgTask() return sendVersion(c, MsgVersion, id, cBI.BlockHeight, cBI.BlockHash) } @@ -190,7 +190,7 @@ func (rq *ResponseQuery) String() string { func (mh *msgHandler) query(c ipc.Connection, id uint32, data []byte) error { var addr common.Address - mh.mgr.IncreaseMsgTask() + mh.mgr.AddMsgTask() if _, err := codec.MP.UnmarshalFromBytes(data, &addr); err != nil { return err } @@ -198,7 +198,7 @@ func (mh *msgHandler) query(c ipc.Connection, id uint32, data []byte) error { resp := DoQuery(mh.mgr.ctx, addr) - mh.mgr.DecreaseMsgTask() + mh.mgr.DoneMsgTask() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgQuery), id, resp.String()) return c.Send(MsgQuery, id, &resp) } @@ -254,7 +254,7 @@ func (resp *ResponseInit) String() string { func (mh *msgHandler) init(c ipc.Connection, id uint32, data []byte) error { var blockHeight uint64 - mh.mgr.IncreaseMsgTask() + mh.mgr.AddMsgTask() if _, err := codec.MP.UnmarshalFromBytes(data, &blockHeight); err != nil { return err } @@ -267,7 +267,7 @@ func (mh *msgHandler) init(c ipc.Connection, id uint32, data []byte) error { resp.Success = false } - mh.mgr.DecreaseMsgTask() + mh.mgr.DoneMsgTask() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgINIT), id, resp.String()) return c.Send(MsgINIT, id, &resp) } diff --git a/core/msg_calculate.go b/core/msg_calculate.go index 08bafea..2495a3b 100644 --- a/core/msg_calculate.go +++ b/core/msg_calculate.go @@ -294,7 +294,7 @@ func sendCalculateACK(c ipc.Connection, id uint32, status uint16, blockHeight ui func (mh *msgHandler) calculate(c ipc.Connection, id uint32, data []byte) error { success := true var req CalculateRequest - mh.mgr.IncreaseMsgTask() + mh.mgr.AddMsgTask() if _, err := codec.MP.UnmarshalFromBytes(data, &req); err != nil { return err } @@ -325,7 +325,7 @@ func (mh *msgHandler) calculate(c ipc.Connection, id uint32, data []byte) error } resp.StateHash = stateHash - mh.mgr.DecreaseMsgTask() + mh.mgr.DoneMsgTask() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgCalculateDone), 0, resp.String()) return c.Send(MsgCalculateDone, 0, &resp) } @@ -904,14 +904,14 @@ func (cs *QueryCalculateStatusResponse) String() string { func (mh *msgHandler) queryCalculateStatus(c ipc.Connection, id uint32, data []byte) error { ctx := mh.mgr.ctx - mh.mgr.IncreaseMsgTask() + mh.mgr.AddMsgTask() // send QUERY_CALCULATE_STATUS response var resp QueryCalculateStatusResponse DoQueryCalculateStatus(ctx, &resp) - mh.mgr.DecreaseMsgTask() + mh.mgr.DoneMsgTask() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgQueryCalculateStatus), id, resp.String()) return c.Send(MsgQueryCalculateStatus, id, &resp) } @@ -965,7 +965,7 @@ func (cr *QueryCalculateResultResponse) String() string { func (mh *msgHandler) queryCalculateResult(c ipc.Connection, id uint32, data []byte) error { var blockHeight uint64 - mh.mgr.IncreaseMsgTask() + mh.mgr.AddMsgTask() if _, err := codec.MP.UnmarshalFromBytes(data, &blockHeight); err != nil { log.Printf("Failed to unmarshal data. err=%+v", err) return err @@ -979,7 +979,7 @@ func (mh *msgHandler) queryCalculateResult(c ipc.Connection, id uint32, data []b DoQueryCalculateResult(ctx, blockHeight, &resp) - mh.mgr.DecreaseMsgTask() + mh.mgr.DoneMsgTask() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgQueryCalculateResult), id, resp.String()) return c.Send(MsgQueryCalculateResult, id, &resp) } diff --git a/core/msg_claim.go b/core/msg_claim.go index b4e472e..e4d409b 100644 --- a/core/msg_claim.go +++ b/core/msg_claim.go @@ -45,7 +45,7 @@ func (rc *ResponseClaim) String() string { func (mh *msgHandler) claim(c ipc.Connection, id uint32, data []byte) error { var req ClaimMessage - mh.mgr.IncreaseMsgTask() + mh.mgr.AddMsgTask() if _, err := codec.MP.UnmarshalFromBytes(data, &req); err != nil { log.Printf("Failed to deserialize CLAIM message. err=%+v", err) return err @@ -61,7 +61,7 @@ func (mh *msgHandler) claim(c ipc.Connection, id uint32, data []byte) error { resp.IScore.Set(&IScore.Int) } - mh.mgr.DecreaseMsgTask() + mh.mgr.DoneMsgTask() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgClaim), id, resp.String()) return c.Send(MsgClaim, id, &resp) } @@ -169,7 +169,7 @@ func (cc *CommitClaim) String() string { func (mh *msgHandler) commitClaim(c ipc.Connection, id uint32, data []byte) error { var req CommitClaim var err error - mh.mgr.IncreaseMsgTask() + mh.mgr.AddMsgTask() if _, err = codec.MP.UnmarshalFromBytes(data, &req); nil != err { return err @@ -182,7 +182,7 @@ func (mh *msgHandler) commitClaim(c ipc.Connection, id uint32, data []byte) erro return nil } - mh.mgr.DecreaseMsgTask() + mh.mgr.DoneMsgTask() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgCommitClaim), id, "ack") return c.Send(MsgCommitClaim, id, nil) } @@ -222,7 +222,7 @@ func (cb *CommitBlock) String() string { func (mh *msgHandler) commitBlock(c ipc.Connection, id uint32, data []byte) error { var req CommitBlock var err error - mh.mgr.IncreaseMsgTask() + mh.mgr.AddMsgTask() if _, err = codec.MP.UnmarshalFromBytes(data, &req); nil != err { return err } @@ -249,7 +249,7 @@ func (mh *msgHandler) commitBlock(c ipc.Connection, id uint32, data []byte) erro resp = req resp.Success = ret - mh.mgr.DecreaseMsgTask() + mh.mgr.DoneMsgTask() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgCommitBlock), id, resp.String()) return c.Send(MsgCommitBlock, id, &resp) } diff --git a/core/msg_debug.go b/core/msg_debug.go index d369c82..d13c638 100644 --- a/core/msg_debug.go +++ b/core/msg_debug.go @@ -45,7 +45,7 @@ type MessageData struct { func (mh *msgHandler) debug(c ipc.Connection, id uint32, data []byte) error { var req DebugMessage var result error - mh.mgr.IncreaseMsgTask() + mh.mgr.AddMsgTask() if _, err := codec.MP.UnmarshalFromBytes(data, &req); err != nil { log.Printf("Failed to deserialize DEBUG message. err=%+v", err) return err @@ -84,7 +84,7 @@ func (mh *msgHandler) debug(c ipc.Connection, id uint32, data []byte) error { result = fmt.Errorf("unknown debug message %d", req.Cmd) } - mh.mgr.DecreaseMsgTask() + mh.mgr.DoneMsgTask() return result } diff --git a/core/msg_rollback.go b/core/msg_rollback.go index 9452f5e..d22b150 100644 --- a/core/msg_rollback.go +++ b/core/msg_rollback.go @@ -33,7 +33,7 @@ func (mh *msgHandler) rollback(c ipc.Connection, id uint32, data []byte) error { success := true var req RollBackRequest var err error - mh.mgr.IncreaseMsgTask() + mh.mgr.AddMsgTask() if _, err = codec.MP.UnmarshalFromBytes(data, &req); err != nil { return err } @@ -55,7 +55,7 @@ func (mh *msgHandler) rollback(c ipc.Connection, id uint32, data []byte) error { resp.BlockHash = make([]byte, BlockHashSize) copy(resp.BlockHash, req.BlockHash) - mh.mgr.DecreaseMsgTask() + mh.mgr.DoneMsgTask() log.Printf("Send message. (msg:%s, id:%d, data:%s)", MsgToString(MsgRollBack), id, resp.String()) return c.Send(MsgRollBack, id, &resp) } From d49d1d9816965cf7ff967d408516dd4bcf105876 Mon Sep 17 00:00:00 2001 From: inwonkim Date: Mon, 18 May 2020 11:38:19 +0900 Subject: [PATCH 6/7] Add log messages --- core/manager.go | 1 + core/msg_rollback.go | 1 + 2 files changed, 2 insertions(+) diff --git a/core/manager.go b/core/manager.go index b67e80d..d1abee9 100644 --- a/core/manager.go +++ b/core/manager.go @@ -108,6 +108,7 @@ func (m *manager) DoneMsgTask() { } func (m *manager) WaitMsgTasksDone() { + log.Printf("Wait until all goroutines accessing DB done") m.waitGroup.Wait() } func InitManager(cfg *RcConfig) (*manager, error) { diff --git a/core/msg_rollback.go b/core/msg_rollback.go index d22b150..ee5fdd1 100644 --- a/core/msg_rollback.go +++ b/core/msg_rollback.go @@ -151,6 +151,7 @@ func (c *CancelCalculation) notifyRollback() { func (c *CancelCalculation) notifyExit() { // close channel to notify Exiting RC process to all listening goroutines + log.Printf("Notify goroutines that RC process will exit") c.notifyCancelCalculation(CancelExit) } From ad64e6c1e2c41405145a7d4d7a568e7060275369 Mon Sep 17 00:00:00 2001 From: inwonkim Date: Mon, 18 May 2020 11:43:07 +0900 Subject: [PATCH 7/7] Add log messages --- core/manager.go | 1 + 1 file changed, 1 insertion(+) diff --git a/core/manager.go b/core/manager.go index d1abee9..5ecfa57 100644 --- a/core/manager.go +++ b/core/manager.go @@ -82,6 +82,7 @@ func (m *manager) Close() error { } CloseIScoreDB(m.ctx.DB) + log.Printf("Exit Reward Calculator") return nil }