diff --git a/kernel/job/cron.go b/kernel/job/cron.go index 7026d92be99..ea377c15857 100644 --- a/kernel/job/cron.go +++ b/kernel/job/cron.go @@ -35,7 +35,6 @@ func StartCron() { go every(2*time.Hour, model.StatJob) go every(2*time.Hour, model.RefreshCheckJob) go every(3*time.Second, model.FlushUpdateRefTextRenameDocJob) - go every(50*time.Millisecond, model.FlushTxJob) go every(util.SQLFlushInterval, sql.FlushTxJob) go every(util.SQLFlushInterval, sql.FlushHistoryTxJob) go every(util.SQLFlushInterval, sql.FlushAssetContentTxJob) diff --git a/kernel/model/transaction.go b/kernel/model/transaction.go index 4e2713d7756..51917baaf94 100644 --- a/kernel/model/transaction.go +++ b/kernel/model/transaction.go @@ -21,7 +21,6 @@ import ( "errors" "fmt" "path/filepath" - "sort" "strings" "sync" "time" @@ -65,11 +64,6 @@ func IsUnfoldHeading(transactions *[]*Transaction) bool { return false } -var ( - txQueue []*Transaction - txQueueLock = sync.Mutex{} -) - func WaitForWritingFiles() { var printLog bool var lastPrintLog bool @@ -86,25 +80,34 @@ func WaitForWritingFiles() { } } +var ( + txQueue = make(chan *Transaction, 7) + flushLock = sync.Mutex{} +) + func isWritingFiles() bool { time.Sleep(time.Duration(20) * time.Millisecond) - return 0 < len(txQueue) || util.IsMutexLocked(&txQueueLock) || util.IsMutexLocked(&flushLock) + return 0 < len(txQueue) || util.IsMutexLocked(&flushLock) } -func FlushTxJob() { - flushTx() +func init() { + go func() { + for { + select { + case tx := <-txQueue: + flushTx(tx) + } + } + }() } -var flushLock = sync.Mutex{} - -func flushTx() { +func flushTx(tx *Transaction) { defer logging.Recover() flushLock.Lock() defer flushLock.Unlock() - currentTx := mergeTx() start := time.Now() - if txErr := performTx(currentTx); nil != txErr { + if txErr := performTx(tx); nil != txErr { switch txErr.code { case TxErrCodeBlockNotFound: util.PushTxErr("Transaction failed", txErr.code, nil) @@ -116,48 +119,17 @@ func flushTx() { } } elapsed := time.Now().Sub(start).Milliseconds() - if 0 < len(currentTx.DoOperations) { + if 0 < len(tx.DoOperations) { if 2000 < elapsed { logging.LogWarnf("op tx [%dms]", elapsed) } } } -func mergeTx() (ret *Transaction) { - txQueueLock.Lock() - defer txQueueLock.Unlock() - - ret = &Transaction{} - var doOps []*Operation - for _, tx := range txQueue { - for _, op := range tx.DoOperations { - if l := len(doOps); 0 < l { - lastOp := doOps[l-1] - if "update" == lastOp.Action && "update" == op.Action && lastOp.ID == op.ID { // 连续相同的更新操作 - lastOp.discard = true - } - } - doOps = append(doOps, op) - } - } - - for _, op := range doOps { - if !op.discard { - ret.DoOperations = append(ret.DoOperations, op) - } - } - - txQueue = nil - return -} - func PerformTransactions(transactions *[]*Transaction) { - txQueueLock.Lock() - txQueue = append(txQueue, *transactions...) - sort.Slice(txQueue, func(i, j int) bool { - return txQueue[i].Timestamp < txQueue[j].Timestamp - }) - txQueueLock.Unlock() + for _, tx := range *transactions { + txQueue <- tx + } return }