From ec9bf1695c9f09da8e6b639b4d18b0a11cb3b539 Mon Sep 17 00:00:00 2001
From: OxalisCu <2127298698@qq.com>
Date: Sat, 27 Jul 2024 13:38:23 +0800
Subject: [PATCH 1/4] feat: support parallerl writer in sync & scan mode

---
 cmd/redis-shake/main.go                    |  7 ++-
 internal/writer/interface.go               |  2 +
 internal/writer/redis_cluster_writer.go    | 61 ++++++++++++++++------
 internal/writer/redis_standalone_writer.go | 59 +++++++++++++--------
 4 files changed, 89 insertions(+), 40 deletions(-)

diff --git a/cmd/redis-shake/main.go b/cmd/redis-shake/main.go
index 1be6dd6c..2e35afc7 100644
--- a/cmd/redis-shake/main.go
+++ b/cmd/redis-shake/main.go
@@ -121,7 +121,10 @@ func main() {
 
 	log.Infof("start syncing...")
 
-	ch := theReader.StartRead(ctx)
+	chr := theReader.StartRead(ctx)
+
+	theWriter.StartWrite(ctx)
+
 	go waitShutdown(cancel)
 
 	ticker := time.NewTicker(1 * time.Second)
@@ -129,7 +132,7 @@ func main() {
 Loop:
 	for {
 		select {
-		case e, ok := <-ch:
+		case e, ok := <-chr:
 			if !ok {
 				// ch has been closed, exit the loop
 				break Loop
diff --git a/internal/writer/interface.go b/internal/writer/interface.go
index 523f14fb..21329d29 100644
--- a/internal/writer/interface.go
+++ b/internal/writer/interface.go
@@ -3,10 +3,12 @@ package writer
 import (
 	"RedisShake/internal/entry"
 	"RedisShake/internal/status"
+	"context"
 )
 
 type Writer interface {
 	status.Statusable
 	Write(entry *entry.Entry)
+	StartWrite(ctx context.Context) (ch chan *entry.Entry)
 	Close()
 }
diff --git a/internal/writer/redis_cluster_writer.go b/internal/writer/redis_cluster_writer.go
index 1fc22b54..14b2d46b 100644
--- a/internal/writer/redis_cluster_writer.go
+++ b/internal/writer/redis_cluster_writer.go
@@ -2,6 +2,7 @@ package writer
 
 import (
 	"context"
+	"sync"
 
 	"RedisShake/internal/entry"
 	"RedisShake/internal/log"
@@ -14,18 +15,22 @@ type RedisClusterWriter struct {
 	addresses []string
 	writers   []Writer
 	router    [KeySlots]Writer
-
-	stat []interface{}
+	ch        chan *entry.Entry
+	chWg      sync.WaitGroup
+	stat      []interface{}
 }
 
 func NewRedisClusterWriter(ctx context.Context, opts *RedisWriterOptions) Writer {
 	rw := new(RedisClusterWriter)
 	rw.loadClusterNodes(ctx, opts)
+	rw.ch = make(chan *entry.Entry, 1024)
 	log.Infof("redisClusterWriter connected to redis cluster successful. addresses=%v", rw.addresses)
 	return rw
 }
 
 func (r *RedisClusterWriter) Close() {
+	r.chWg.Wait()
+	close(r.ch)
 	for _, writer := range r.writers {
 		writer.Close()
 	}
@@ -54,24 +59,46 @@ func (r *RedisClusterWriter) loadClusterNodes(ctx context.Context, opts *RedisWr
 	}
 }
 
-func (r *RedisClusterWriter) Write(entry *entry.Entry) {
-	if len(entry.Slots) == 0 {
-		for _, writer := range r.writers {
-			writer.Write(entry)
-		}
-		return
+func (r *RedisClusterWriter) StartWrite(ctx context.Context) chan *entry.Entry {
+	chs := make(map[string]chan *entry.Entry, len(r.writers))
+	for _, w := range r.writers {
+		stat := w.Status().(struct {
+			Name              string `json:"name"`
+			UnansweredBytes   int64  `json:"unanswered_bytes"`
+			UnansweredEntries int64  `json:"unanswered_entries"`
+		})
+		chs[stat.Name] = w.StartWrite(ctx)
 	}
 
-	lastSlot := -1
-	for _, slot := range entry.Slots {
-		if lastSlot == -1 {
-			lastSlot = slot
-		}
-		if slot != lastSlot {
-			log.Panicf("CROSSSLOT Keys in request don't hash to the same slot. argv=%v", entry.Argv)
+	r.chWg = sync.WaitGroup{}
+	r.chWg.Add(1)
+	go func() {
+		for entry := range r.ch {
+			if len(entry.Slots) == 0 {
+				for _, writer := range r.writers {
+					writer.Write(entry)
+				}
+				continue
+			}
+			lastSlot := -1
+			for _, slot := range entry.Slots {
+				if lastSlot == -1 {
+					lastSlot = slot
+				}
+				if slot != lastSlot {
+					log.Panicf("CROSSSLOT Keys in request don't hash to the same slot. argv=%v", entry.Argv)
+				}
+			}
+			r.router[lastSlot].Write(entry)
 		}
-	}
-	r.router[lastSlot].Write(entry)
+		r.chWg.Done()
+	}()
+
+	return r.ch
+}
+
+func (r *RedisClusterWriter) Write(entry *entry.Entry) {
+	r.ch <- entry
 }
 
 func (r *RedisClusterWriter) Consistent() bool {
diff --git a/internal/writer/redis_standalone_writer.go b/internal/writer/redis_standalone_writer.go
index 13049adc..ee9b4470 100644
--- a/internal/writer/redis_standalone_writer.go
+++ b/internal/writer/redis_standalone_writer.go
@@ -34,8 +34,10 @@ type redisStandaloneWriter struct {
 	DbId    int
 
 	chWaitReply chan *entry.Entry
-	chWg        sync.WaitGroup
+	chWaitWg    sync.WaitGroup
 	offReply    bool
+	ch          chan *entry.Entry
+	chWg        sync.WaitGroup
 
 	stat struct {
 		Name              string `json:"name"`
@@ -49,13 +51,14 @@ func NewRedisStandaloneWriter(ctx context.Context, opts *RedisWriterOptions) Wri
 	rw.address = opts.Address
 	rw.stat.Name = "writer_" + strings.Replace(opts.Address, ":", "_", -1)
 	rw.client = client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls, false)
+	rw.ch = make(chan *entry.Entry, 1024)
 	if opts.OffReply {
 		log.Infof("turn off the reply of write")
 		rw.offReply = true
 		rw.client.Send("CLIENT", "REPLY", "OFF")
 	} else {
 		rw.chWaitReply = make(chan *entry.Entry, config.Opt.Advanced.PipelineCountLimit)
-		rw.chWg.Add(1)
+		rw.chWaitWg.Add(1)
 		go rw.processReply()
 	}
 	return rw
@@ -63,29 +66,43 @@ func NewRedisStandaloneWriter(ctx context.Context, opts *RedisWriterOptions) Wri
 
 func (w *redisStandaloneWriter) Close() {
 	if !w.offReply {
-		close(w.chWaitReply)
+		close(w.ch)
 		w.chWg.Wait()
+		close(w.chWaitReply)
+		w.chWaitWg.Wait()
 	}
 }
 
-func (w *redisStandaloneWriter) Write(e *entry.Entry) {
-	// switch db if we need
-	if w.DbId != e.DbId {
-		w.switchDbTo(e.DbId)
-	}
+func (w *redisStandaloneWriter) StartWrite(ctx context.Context) chan *entry.Entry {
+	w.chWg = sync.WaitGroup{}
+	w.chWg.Add(1)
+	go func() {
+		for e := range w.ch {
+			// switch db if we need
+			if w.DbId != e.DbId {
+				w.switchDbTo(e.DbId)
+			}
+			// send
+			bytes := e.Serialize()
+			for e.SerializedSize+atomic.LoadInt64(&w.stat.UnansweredBytes) > config.Opt.Advanced.TargetRedisClientMaxQuerybufLen {
+				time.Sleep(1 * time.Nanosecond)
+			}
+			log.Debugf("[%s] send cmd. cmd=[%s]", w.stat.Name, e.String())
+			if !w.offReply {
+				w.chWaitReply <- e
+				atomic.AddInt64(&w.stat.UnansweredBytes, e.SerializedSize)
+				atomic.AddInt64(&w.stat.UnansweredEntries, 1)
+			}
+			w.client.SendBytes(bytes)
+		}
+		w.chWg.Done()
+	}()
 
-	// send
-	bytes := e.Serialize()
-	for e.SerializedSize+atomic.LoadInt64(&w.stat.UnansweredBytes) > config.Opt.Advanced.TargetRedisClientMaxQuerybufLen {
-		time.Sleep(1 * time.Nanosecond)
-	}
-	log.Debugf("[%s] send cmd. cmd=[%s]", w.stat.Name, e.String())
-	if !w.offReply {
-		w.chWaitReply <- e
-		atomic.AddInt64(&w.stat.UnansweredBytes, e.SerializedSize)
-		atomic.AddInt64(&w.stat.UnansweredEntries, 1)
-	}
-	w.client.SendBytes(bytes)
+	return w.ch
+}
+
+func (w *redisStandaloneWriter) Write(e *entry.Entry) {
+	w.ch <- e
 }
 
 func (w *redisStandaloneWriter) switchDbTo(newDbId int) {
@@ -124,7 +141,7 @@ func (w *redisStandaloneWriter) processReply() {
 		atomic.AddInt64(&w.stat.UnansweredBytes, -e.SerializedSize)
 		atomic.AddInt64(&w.stat.UnansweredEntries, -1)
 	}
-	w.chWg.Done()
+	w.chWaitWg.Done()
 }
 
 func (w *redisStandaloneWriter) Status() interface{} {

From c9c5c24f292a6861b233315d1d6251c9451032db Mon Sep 17 00:00:00 2001
From: OxalisCu <2127298698@qq.com>
Date: Tue, 24 Sep 2024 12:49:30 +0800
Subject: [PATCH 2/4] feat: parallel parse, filter, function

Signed-off-by: OxalisCu <2127298698@qq.com>
---
 cmd/redis-shake/main.go                   | 62 ++++++++++++++---------
 internal/reader/aof_reader.go             |  4 +-
 internal/reader/interface.go              |  4 +-
 internal/reader/rdb_reader.go             |  5 +-
 internal/reader/scan_cluster_reader.go    | 21 ++------
 internal/reader/scan_standalone_reader.go |  4 +-
 internal/reader/sync_cluster_reader.go    | 21 ++------
 internal/reader/sync_standalone_reader.go |  4 +-
 8 files changed, 60 insertions(+), 65 deletions(-)

diff --git a/cmd/redis-shake/main.go b/cmd/redis-shake/main.go
index 2e35afc7..9d36eca0 100644
--- a/cmd/redis-shake/main.go
+++ b/cmd/redis-shake/main.go
@@ -121,39 +121,55 @@ func main() {
 
 	log.Infof("start syncing...")
 
-	chr := theReader.StartRead(ctx)
+	go waitShutdown(cancel)
+
+	chrs := theReader.StartRead(ctx)
 
 	theWriter.StartWrite(ctx)
 
-	go waitShutdown(cancel)
+	readerDone := make(chan bool)
+
+	for _, chr := range chrs {
+		go func(ch chan *entry.Entry) {
+			for e := range ch {
+				// calc arguments
+				e.Parse()
+				status.AddReadCount(e.CmdName)
+
+				// filter
+				if !filter.Filter(e) {
+					log.Debugf("skip command: %v", e)
+					continue
+				}
+
+				// run lua function
+				log.Debugf("function before: %v", e)
+				entries := luaRuntime.RunFunction(e)
+				log.Debugf("function after: %v", entries)
+
+				// write
+				for _, theEntry := range entries {
+					theEntry.Parse()
+					theWriter.Write(theEntry)
+					status.AddWriteCount(theEntry.CmdName)
+				}
+			}
+			readerDone <- true
+		}(chr)
+	}
 
 	ticker := time.NewTicker(1 * time.Second)
 	defer ticker.Stop()
+	readerCnt := len(chrs)
 Loop:
 	for {
 		select {
-		case e, ok := <-chr:
-			if !ok {
-				// ch has been closed, exit the loop
-				break Loop
+		case done := <-readerDone:
+			if done {
+				readerCnt--
 			}
-			// calc arguments
-			e.Parse()
-			status.AddReadCount(e.CmdName)
-
-			// filter
-			if !filter.Filter(e) {
-				log.Debugf("skip command: %v", e)
-				continue
-			}
-			log.Debugf("function before: %v", e)
-			entries := luaRuntime.RunFunction(e)
-			log.Debugf("function after: %v", entries)
-
-			for _, theEntry := range entries {
-				theEntry.Parse()
-				theWriter.Write(theEntry)
-				status.AddWriteCount(theEntry.CmdName)
+			if readerCnt == 0 {
+				break Loop
 			}
 		case <-ticker.C:
 			pingEntry := entry.NewEntry()
diff --git a/internal/reader/aof_reader.go b/internal/reader/aof_reader.go
index d21d1443..ea3a5fb6 100644
--- a/internal/reader/aof_reader.go
+++ b/internal/reader/aof_reader.go
@@ -66,7 +66,7 @@ func NewAOFReader(opts *AOFReaderOptions) Reader {
 	return r
 }
 
-func (r *aofReader) StartRead(ctx context.Context) chan *entry.Entry {
+func (r *aofReader) StartRead(ctx context.Context) []chan *entry.Entry {
 	//init entry
 	r.ch = make(chan *entry.Entry, 1024)
 
@@ -101,5 +101,5 @@ func (r *aofReader) StartRead(ctx context.Context) chan *entry.Entry {
 
 	}()
 
-	return r.ch
+	return []chan *entry.Entry{r.ch}
 }
diff --git a/internal/reader/interface.go b/internal/reader/interface.go
index 9a84104d..5502bf3e 100644
--- a/internal/reader/interface.go
+++ b/internal/reader/interface.go
@@ -8,5 +8,5 @@ import (
 
 type Reader interface {
 	status.Statusable
-	StartRead(ctx context.Context) chan *entry.Entry
-}
\ No newline at end of file
+	StartRead(ctx context.Context) []chan *entry.Entry
+}
diff --git a/internal/reader/rdb_reader.go b/internal/reader/rdb_reader.go
index f99b49ce..63569e3f 100644
--- a/internal/reader/rdb_reader.go
+++ b/internal/reader/rdb_reader.go
@@ -8,6 +8,7 @@ import (
 	"RedisShake/internal/log"
 	"RedisShake/internal/rdb"
 	"RedisShake/internal/utils"
+
 	"github.com/dustin/go-humanize"
 )
 
@@ -41,7 +42,7 @@ func NewRDBReader(opts *RdbReaderOptions) Reader {
 	return r
 }
 
-func (r *rdbReader) StartRead(ctx context.Context) chan *entry.Entry {
+func (r *rdbReader) StartRead(ctx context.Context) []chan *entry.Entry {
 	log.Infof("[%s] start read", r.stat.Name)
 	r.ch = make(chan *entry.Entry, 1024)
 	updateFunc := func(offset int64) {
@@ -58,7 +59,7 @@ func (r *rdbReader) StartRead(ctx context.Context) chan *entry.Entry {
 		close(r.ch)
 	}()
 
-	return r.ch
+	return []chan *entry.Entry{r.ch}
 }
 
 func (r *rdbReader) Status() interface{} {
diff --git a/internal/reader/scan_cluster_reader.go b/internal/reader/scan_cluster_reader.go
index 42f6f1e6..4b2b59af 100644
--- a/internal/reader/scan_cluster_reader.go
+++ b/internal/reader/scan_cluster_reader.go
@@ -3,7 +3,6 @@ package reader
 import (
 	"context"
 	"fmt"
-	"sync"
 
 	"RedisShake/internal/entry"
 	"RedisShake/internal/utils"
@@ -26,23 +25,13 @@ func NewScanClusterReader(ctx context.Context, opts *ScanReaderOptions) Reader {
 	return rd
 }
 
-func (rd *scanClusterReader) StartRead(ctx context.Context) chan *entry.Entry {
-	ch := make(chan *entry.Entry, 1024)
-	var wg sync.WaitGroup
+func (rd *scanClusterReader) StartRead(ctx context.Context) []chan *entry.Entry {
+	chs := make([]chan *entry.Entry, 0)
 	for _, r := range rd.readers {
-		wg.Add(1)
-		go func(r Reader) {
-			for e := range r.StartRead(ctx) {
-				ch <- e
-			}
-			wg.Done()
-		}(r)
+		ch := r.StartRead(ctx)
+		chs = append(chs, ch[0])
 	}
-	go func() {
-		wg.Wait()
-		close(ch)
-	}()
-	return ch
+	return chs
 }
 
 func (rd *scanClusterReader) Status() interface{} {
diff --git a/internal/reader/scan_standalone_reader.go b/internal/reader/scan_standalone_reader.go
index a4bea7d2..355edc34 100644
--- a/internal/reader/scan_standalone_reader.go
+++ b/internal/reader/scan_standalone_reader.go
@@ -85,7 +85,7 @@ func NewScanStandaloneReader(ctx context.Context, opts *ScanReaderOptions) Reade
 	return r
 }
 
-func (r *scanStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry {
+func (r *scanStandaloneReader) StartRead(ctx context.Context) []chan *entry.Entry {
 	r.ctx = ctx
 	if r.opts.Scan {
 		go r.scan()
@@ -95,7 +95,7 @@ func (r *scanStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry
 	}
 	go r.dump()
 	go r.restore()
-	return r.ch
+	return []chan *entry.Entry{r.ch}
 }
 
 func (r *scanStandaloneReader) subscript() {
diff --git a/internal/reader/sync_cluster_reader.go b/internal/reader/sync_cluster_reader.go
index 89986dad..c30d2db8 100644
--- a/internal/reader/sync_cluster_reader.go
+++ b/internal/reader/sync_cluster_reader.go
@@ -3,7 +3,6 @@ package reader
 import (
 	"context"
 	"fmt"
-	"sync"
 
 	"RedisShake/internal/entry"
 	"RedisShake/internal/log"
@@ -30,23 +29,13 @@ func NewSyncClusterReader(ctx context.Context, opts *SyncReaderOptions) Reader {
 	return rd
 }
 
-func (rd *syncClusterReader) StartRead(ctx context.Context) chan *entry.Entry {
-	ch := make(chan *entry.Entry, 1024)
-	var wg sync.WaitGroup
+func (rd *syncClusterReader) StartRead(ctx context.Context) []chan *entry.Entry {
+	chs := make([]chan *entry.Entry, 0)
 	for _, r := range rd.readers {
-		wg.Add(1)
-		go func(r Reader) {
-			defer wg.Done()
-			for e := range r.StartRead(ctx) {
-				ch <- e
-			}
-		}(r)
+		ch := r.StartRead(ctx)
+		chs = append(chs, ch[0])
 	}
-	go func() {
-		wg.Wait()
-		close(ch)
-	}()
-	return ch
+	return chs
 }
 
 func (rd *syncClusterReader) Status() interface{} {
diff --git a/internal/reader/sync_standalone_reader.go b/internal/reader/sync_standalone_reader.go
index 77542f7d..27b83293 100644
--- a/internal/reader/sync_standalone_reader.go
+++ b/internal/reader/sync_standalone_reader.go
@@ -93,7 +93,7 @@ func NewSyncStandaloneReader(ctx context.Context, opts *SyncReaderOptions) Reade
 	return r
 }
 
-func (r *syncStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry {
+func (r *syncStandaloneReader) StartRead(ctx context.Context) []chan *entry.Entry {
 	r.ctx = ctx
 	r.ch = make(chan *entry.Entry, 1024)
 	go func() {
@@ -113,7 +113,7 @@ func (r *syncStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry
 		close(r.ch)
 	}()
 
-	return r.ch
+	return []chan *entry.Entry{r.ch}
 }
 
 func (r *syncStandaloneReader) sendReplconfListenPort() {

From 3bb3299dc4277b4a3644847107a2b3a8b8dae7b8 Mon Sep 17 00:00:00 2001
From: OxalisCu <2127298698@qq.com>
Date: Mon, 14 Oct 2024 20:03:06 +0800
Subject: [PATCH 3/4] feat: parallel entry allocation

Signed-off-by: OxalisCu <2127298698@qq.com>
---
 internal/writer/redis_cluster_writer.go | 43 ++++++++++---------------
 1 file changed, 17 insertions(+), 26 deletions(-)

diff --git a/internal/writer/redis_cluster_writer.go b/internal/writer/redis_cluster_writer.go
index 14b2d46b..fbec7344 100644
--- a/internal/writer/redis_cluster_writer.go
+++ b/internal/writer/redis_cluster_writer.go
@@ -70,35 +70,26 @@ func (r *RedisClusterWriter) StartWrite(ctx context.Context) chan *entry.Entry {
 		chs[stat.Name] = w.StartWrite(ctx)
 	}
 
-	r.chWg = sync.WaitGroup{}
-	r.chWg.Add(1)
-	go func() {
-		for entry := range r.ch {
-			if len(entry.Slots) == 0 {
-				for _, writer := range r.writers {
-					writer.Write(entry)
-				}
-				continue
-			}
-			lastSlot := -1
-			for _, slot := range entry.Slots {
-				if lastSlot == -1 {
-					lastSlot = slot
-				}
-				if slot != lastSlot {
-					log.Panicf("CROSSSLOT Keys in request don't hash to the same slot. argv=%v", entry.Argv)
-				}
-			}
-			r.router[lastSlot].Write(entry)
-		}
-		r.chWg.Done()
-	}()
-
-	return r.ch
+	return nil
 }
 
 func (r *RedisClusterWriter) Write(entry *entry.Entry) {
-	r.ch <- entry
+	if len(entry.Slots) == 0 {
+		for _, writer := range r.writers {
+			writer.Write(entry)
+		}
+		return
+	}
+	lastSlot := -1
+	for _, slot := range entry.Slots {
+		if lastSlot == -1 {
+			lastSlot = slot
+		}
+		if slot != lastSlot {
+			log.Panicf("CROSSSLOT Keys in request don't hash to the same slot. argv=%v", entry.Argv)
+		}
+	}
+	r.router[lastSlot].Write(entry)
 }
 
 func (r *RedisClusterWriter) Consistent() bool {

From d22dbcc178349cc2816d55a986ad9766d66613d0 Mon Sep 17 00:00:00 2001
From: OxalisCu <2127298698@qq.com>
Date: Tue, 15 Oct 2024 23:53:03 +0800
Subject: [PATCH 4/4] feat: seperate ops log and status monitoring

---
 cmd/redis-shake/main.go        | 41 +++++++++++++++++++++++++++++++---
 internal/status/entry_count.go |  2 +-
 internal/status/status.go      | 33 ++++++---------------------
 3 files changed, 46 insertions(+), 30 deletions(-)

diff --git a/cmd/redis-shake/main.go b/cmd/redis-shake/main.go
index 9d36eca0..e30cd725 100644
--- a/cmd/redis-shake/main.go
+++ b/cmd/redis-shake/main.go
@@ -5,6 +5,7 @@ import (
 	_ "net/http/pprof"
 	"os"
 	"os/signal"
+	"sync/atomic"
 	"syscall"
 	"time"
 
@@ -116,8 +117,16 @@ func main() {
 	default:
 		log.Panicf("no writer config entry found")
 	}
+
 	// create status
-	status.Init(theReader, theWriter)
+	if config.Opt.Advanced.StatusPort != 0 {
+		status.Init(theReader, theWriter)
+	}
+	// create log entry count
+	logEntryCount := status.EntryCount{
+		ReadCount:  0,
+		WriteCount: 0,
+	}
 
 	log.Infof("start syncing...")
 
@@ -134,7 +143,13 @@ func main() {
 			for e := range ch {
 				// calc arguments
 				e.Parse()
-				status.AddReadCount(e.CmdName)
+
+				// update reader status
+				if config.Opt.Advanced.StatusPort != 0 {
+					status.AddReadCount(e.CmdName)
+				}
+				// update log entry count
+				atomic.AddUint64(&logEntryCount.ReadCount, 1)
 
 				// filter
 				if !filter.Filter(e) {
@@ -151,13 +166,33 @@ func main() {
 				for _, theEntry := range entries {
 					theEntry.Parse()
 					theWriter.Write(theEntry)
-					status.AddWriteCount(theEntry.CmdName)
+
+					// update writer status
+					if config.Opt.Advanced.StatusPort != 0 {
+						status.AddWriteCount(theEntry.CmdName)
+					}
+					// update log entry count
+					atomic.AddUint64(&logEntryCount.WriteCount, 1)
 				}
 			}
 			readerDone <- true
 		}(chr)
 	}
 
+	// caluate ops and log to screen
+	go func() {
+		if config.Opt.Advanced.LogInterval <= 0 {
+			log.Infof("log interval is 0, will not log to screen")
+			return
+		}
+		ticker := time.NewTicker(time.Duration(config.Opt.Advanced.LogInterval) * time.Second)
+		defer ticker.Stop()
+		for range ticker.C {
+			logEntryCount.UpdateOPS()
+			log.Infof("%s, %s", logEntryCount.String(), theReader.StatusString())
+		}
+	}()
+
 	ticker := time.NewTicker(1 * time.Second)
 	defer ticker.Stop()
 	readerCnt := len(chrs)
diff --git a/internal/status/entry_count.go b/internal/status/entry_count.go
index 02a933cc..42ad663a 100644
--- a/internal/status/entry_count.go
+++ b/internal/status/entry_count.go
@@ -18,7 +18,7 @@ type EntryCount struct {
 }
 
 // call this function every second
-func (e *EntryCount) updateOPS() {
+func (e *EntryCount) UpdateOPS() {
 	nowTimestampSec := float64(time.Now().UnixNano()) / 1e9
 	if e.lastUpdateTimestampSec != 0 {
 		timeIntervalSec := nowTimestampSec - e.lastUpdateTimestampSec
diff --git a/internal/status/status.go b/internal/status/status.go
index e7c97faf..42d2c084 100644
--- a/internal/status/status.go
+++ b/internal/status/status.go
@@ -2,9 +2,6 @@ package status
 
 import (
 	"time"
-
-	"RedisShake/internal/config"
-	"RedisShake/internal/log"
 )
 
 type Statusable interface {
@@ -32,9 +29,6 @@ var theWriter Statusable
 
 func AddReadCount(cmd string) {
 	ch <- func() {
-		if stat.PerCmdEntriesCount == nil {
-			stat.PerCmdEntriesCount = make(map[string]EntryCount)
-		}
 		cmdEntryCount, ok := stat.PerCmdEntriesCount[cmd]
 		if !ok {
 			cmdEntryCount = EntryCount{}
@@ -48,9 +42,6 @@ func AddReadCount(cmd string) {
 
 func AddWriteCount(cmd string) {
 	ch <- func() {
-		if stat.PerCmdEntriesCount == nil {
-			stat.PerCmdEntriesCount = make(map[string]EntryCount)
-		}
 		cmdEntryCount, ok := stat.PerCmdEntriesCount[cmd]
 		if !ok {
 			cmdEntryCount = EntryCount{}
@@ -68,6 +59,11 @@ func Init(r Statusable, w Statusable) {
 	setStatusPort()
 	stat.Time = time.Now().Format("2006-01-02 15:04:05")
 
+	// init per cmd entries count
+	if stat.PerCmdEntriesCount == nil {
+		stat.PerCmdEntriesCount = make(map[string]EntryCount)
+	}
+
 	// for update reader/writer stat
 	go func() {
 		ticker := time.NewTicker(1 * time.Second)
@@ -81,29 +77,14 @@ func Init(r Statusable, w Statusable) {
 				stat.Consistent = lastConsistent && theReader.StatusConsistent() && theWriter.StatusConsistent()
 				lastConsistent = stat.Consistent
 				// update OPS
-				stat.TotalEntriesCount.updateOPS()
+				stat.TotalEntriesCount.UpdateOPS()
 				for _, cmdEntryCount := range stat.PerCmdEntriesCount {
-					cmdEntryCount.updateOPS()
+					cmdEntryCount.UpdateOPS()
 				}
 			}
 		}
 	}()
 
-	// for log to screen
-	go func() {
-		if config.Opt.Advanced.LogInterval <= 0 {
-			log.Infof("log interval is 0, will not log to screen")
-			return
-		}
-		ticker := time.NewTicker(time.Duration(config.Opt.Advanced.LogInterval) * time.Second)
-		defer ticker.Stop()
-		for range ticker.C {
-			ch <- func() {
-				log.Infof("%s, %s", stat.TotalEntriesCount.String(), theReader.StatusString())
-			}
-		}
-	}()
-
 	// run all func in ch
 	go func() {
 		for f := range ch {