Skip to content

Commit

Permalink
♻️ Refactor kernel model transaction #9338
Browse files Browse the repository at this point in the history
  • Loading branch information
88250 committed Oct 4, 2023
1 parent 828eeee commit 56ace2e
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 50 deletions.
1 change: 0 additions & 1 deletion kernel/job/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ func StartCron() {
go every(2*time.Hour, model.StatJob)
go every(2*time.Hour, model.RefreshCheckJob)
go every(3*time.Second, model.FlushUpdateRefTextRenameDocJob)
go every(50*time.Millisecond, model.FlushTxJob)
go every(util.SQLFlushInterval, sql.FlushTxJob)
go every(util.SQLFlushInterval, sql.FlushHistoryTxJob)
go every(util.SQLFlushInterval, sql.FlushAssetContentTxJob)
Expand Down
70 changes: 21 additions & 49 deletions kernel/model/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"errors"
"fmt"
"path/filepath"
"sort"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -65,11 +64,6 @@ func IsUnfoldHeading(transactions *[]*Transaction) bool {
return false
}

var (
txQueue []*Transaction
txQueueLock = sync.Mutex{}
)

func WaitForWritingFiles() {
var printLog bool
var lastPrintLog bool
Expand All @@ -86,25 +80,34 @@ func WaitForWritingFiles() {
}
}

var (
txQueue = make(chan *Transaction, 7)
flushLock = sync.Mutex{}
)

func isWritingFiles() bool {
time.Sleep(time.Duration(20) * time.Millisecond)
return 0 < len(txQueue) || util.IsMutexLocked(&txQueueLock) || util.IsMutexLocked(&flushLock)
return 0 < len(txQueue) || util.IsMutexLocked(&flushLock)
}

func FlushTxJob() {
flushTx()
func init() {
go func() {
for {
select {
case tx := <-txQueue:
flushTx(tx)
}
}
}()
}

var flushLock = sync.Mutex{}

func flushTx() {
func flushTx(tx *Transaction) {
defer logging.Recover()
flushLock.Lock()
defer flushLock.Unlock()

currentTx := mergeTx()
start := time.Now()
if txErr := performTx(currentTx); nil != txErr {
if txErr := performTx(tx); nil != txErr {
switch txErr.code {
case TxErrCodeBlockNotFound:
util.PushTxErr("Transaction failed", txErr.code, nil)
Expand All @@ -116,48 +119,17 @@ func flushTx() {
}
}
elapsed := time.Now().Sub(start).Milliseconds()
if 0 < len(currentTx.DoOperations) {
if 0 < len(tx.DoOperations) {
if 2000 < elapsed {
logging.LogWarnf("op tx [%dms]", elapsed)
}
}
}

func mergeTx() (ret *Transaction) {
txQueueLock.Lock()
defer txQueueLock.Unlock()

ret = &Transaction{}
var doOps []*Operation
for _, tx := range txQueue {
for _, op := range tx.DoOperations {
if l := len(doOps); 0 < l {
lastOp := doOps[l-1]
if "update" == lastOp.Action && "update" == op.Action && lastOp.ID == op.ID { // 连续相同的更新操作
lastOp.discard = true
}
}
doOps = append(doOps, op)
}
}

for _, op := range doOps {
if !op.discard {
ret.DoOperations = append(ret.DoOperations, op)
}
}

txQueue = nil
return
}

func PerformTransactions(transactions *[]*Transaction) {
txQueueLock.Lock()
txQueue = append(txQueue, *transactions...)
sort.Slice(txQueue, func(i, j int) bool {
return txQueue[i].Timestamp < txQueue[j].Timestamp
})
txQueueLock.Unlock()
for _, tx := range *transactions {
txQueue <- tx
}
return
}

Expand Down

0 comments on commit 56ace2e

Please sign in to comment.