Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix some bugs #776

Merged
merged 5 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 32 additions & 14 deletions cmd/redis-shake/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"os/signal"
"syscall"
"time"

"RedisShake/internal/config"
"RedisShake/internal/entry"
Expand Down Expand Up @@ -120,20 +121,37 @@ func main() {
ch := theReader.StartRead(ctx)
go waitShutdown(cancel)

for e := range ch {
// calc arguments
e.Parse()
status.AddReadCount(e.CmdName)

// filter
log.Debugf("function before: %v", e)
entries := luaRuntime.RunFunction(e)
log.Debugf("function after: %v", entries)

for _, entry := range entries {
entry.Parse()
theWriter.Write(entry)
status.AddWriteCount(entry.CmdName)
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
Loop:
for {
select {
case e, ok := <-ch:
if !ok {
// ch has been closed, exit the loop
break Loop
}
// calc arguments
e.Parse()
status.AddReadCount(e.CmdName)

// filter
log.Debugf("function before: %v", e)
entries := luaRuntime.RunFunction(e)
log.Debugf("function after: %v", entries)

for _, theEntry := range entries {
theEntry.Parse()
theWriter.Write(theEntry)
status.AddWriteCount(theEntry.CmdName)
}
case <-ticker.C:
pingEntry := entry.NewEntry()
pingEntry.DbId = 0
pingEntry.CmdName = "PING"
pingEntry.Argv = []string{"PING"}
pingEntry.Group = "connection"
theWriter.Write(pingEntry)
}
}

Expand Down
6 changes: 4 additions & 2 deletions internal/client/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,17 @@ func NewRedisClient(ctx context.Context, address string, username string, passwo
Timeout: 5 * time.Minute,
KeepAlive: 5 * time.Minute,
}
ctxWithDeadline, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
var err error
if Tls {
tlsDialer := &tls.Dialer{
NetDialer: dialer,
Config: &tls.Config{InsecureSkipVerify: true},
}
conn, err = tlsDialer.DialContext(ctx, "tcp", address)
conn, err = tlsDialer.DialContext(ctxWithDeadline, "tcp", address)
} else {
conn, err = dialer.DialContext(ctx, "tcp", address)
conn, err = dialer.DialContext(ctxWithDeadline, "tcp", address)
}
if err != nil {
log.Panicf("dial failed. address=[%s], tls=[%v], err=[%v]", address, Tls, err)
Expand Down
2 changes: 1 addition & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type AdvancedOptions struct {
Ncpu int `mapstructure:"ncpu" default:"0"`

PprofPort int `mapstructure:"pprof_port" default:"0"`
StatusPort int `mapstructure:"status_port" default:"6479"`
StatusPort int `mapstructure:"status_port" default:"0"`

// log
LogFile string `mapstructure:"log_file" default:"shake.log"`
Expand Down
54 changes: 43 additions & 11 deletions internal/rdb/rdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,26 @@ import (
)

const (
kFlagFunction2 = 245 // function library data
kFlagFunction = 246 // old function library data for 7.0 rc1 and rc2
kFlagModuleAux = 247 // Module auxiliary data.
kFlagIdle = 0xf8 // LRU idle time.
kFlagFreq = 0xf9 // LFU frequency.
kFlagAUX = 0xfa // RDB aux field.
kFlagResizeDB = 0xfb // Hash table resize hint.
kFlagExpireMs = 0xfc // Expire time in milliseconds.
kFlagExpire = 0xfd // Old expire time in seconds.
kFlagSelect = 0xfe // DB number of the following keys.
kEOF = 0xff // End of the RDB file.
kFlagFunction2 = 245 // function library data
kFlagFunction = 246 // old function library data for 7.0 rc1 and rc2
kFlagModuleAux = 247 // RDB_OPCODE_MODULE_AUX: Module auxiliary data.
kFlagIdle = 248 // RDB_OPCODE_IDLE: LRU idle time.
kFlagFreq = 249 // RDB_OPCODE_FREQ: LFU frequency.
kFlagAUX = 250 // RDB_OPCODE_AUX: RDB aux field.
kFlagResizeDB = 251 // RDB_OPCODE_RESIZEDB: Hash table resize hint.
kFlagExpireMs = 252 // RDB_OPCODE_EXPIRETIME_MS: Expire time in milliseconds.
kFlagExpire = 253 // RDB_OPCODE_EXPIRETIME: Old expire time in seconds.
kFlagSelect = 254 // RDB_OPCODE_SELECTDB: DB number of the following keys.
kEOF = 255 // RDB_OPCODE_EOF: End of the RDB file.
)

const (
kRDBModuleOpcodeEOF = 0 // RDB_MODULE_OPCODE_EOF: End of module value.
kRDBModuleOpcodeSINT = 1 // RDB_MODULE_OPCODE_SINT: Signed integer.
kRDBModuleOpcodeUINT = 2 // RDB_MODULE_OPCODE_UINT: Unsigned integer.
kRDBModuleOpcodeFLOAT = 3 // RDB_MODULE_OPCODE_FLOAT: Float.
kRDBModuleOpcodeDOUBLE = 4 // RDB_MODULE_OPCODE_DOUBLE: Double.
kRDBModuleOpcodeSTRING = 5 // RDB_MODULE_OPCODE_STRING: String.
)

type Loader struct {
Expand Down Expand Up @@ -113,7 +122,30 @@ func (ld *Loader) parseRDBEntry(ctx context.Context, rd *bufio.Reader) {
defer ticker.Stop()
for {
typeByte := structure.ReadByte(rd)
log.Debugf("RDB type byte is: [%d]", typeByte)
switch typeByte {
case kFlagModuleAux:
moduleId := structure.ReadLength(rd) // module id
moduleName := types.ModuleTypeNameByID(moduleId)
log.Debugf("[%s] RDB module aux: module_id=[%d], module_name=[%s]", ld.name, moduleId, moduleName)
_ = structure.ReadLength(rd) // when_opcode
_ = structure.ReadLength(rd) // when
opcode := structure.ReadLength(rd)
for opcode != kRDBModuleOpcodeEOF {
switch opcode {
case kRDBModuleOpcodeSINT, kRDBModuleOpcodeUINT:
_ = structure.ReadLength(rd)
case kRDBModuleOpcodeFLOAT:
_ = structure.ReadFloat(rd)
case kRDBModuleOpcodeDOUBLE:
_ = structure.ReadDouble(rd)
case kRDBModuleOpcodeSTRING:
_ = structure.ReadString(rd)
default:
log.Panicf("module aux opcode not found. module_name=[%s], opcode=[%d]", moduleName, opcode)
}
opcode = structure.ReadLength(rd)
}
case kFlagIdle:
ld.idle = int64(structure.ReadLength(rd))
case kFlagFreq:
Expand Down
2 changes: 1 addition & 1 deletion internal/rdb/types/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func ParseObject(rd io.Reader, typeByte byte, key string) RedisObject {
return nil
}

func moduleTypeNameByID(moduleId uint64) string {
func ModuleTypeNameByID(moduleId uint64) string {
nameList := make([]byte, 9)
moduleId >>= 10
for i := 8; i >= 0; i-- {
Expand Down
2 changes: 1 addition & 1 deletion internal/rdb/types/module2.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func PareseModuleType(rd io.Reader, key string, typeByte byte) ModuleObject {
log.Panicf("module type with version 1 is not supported, key=[%s]", key)
}
moduleId := structure.ReadLength(rd)
moduleName := moduleTypeNameByID(moduleId)
moduleName := ModuleTypeNameByID(moduleId)
switch moduleName {
case "exstrtype":
o := new(TairStringObject)
Expand Down
7 changes: 7 additions & 0 deletions internal/reader/sync_standalone_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -133,6 +134,12 @@ func (r *syncStandaloneReader) sendPSync() {

// format: \n\n\n+<reply>\r\n
for {
select {
case <-r.ctx.Done():
close(r.ch)
runtime.Goexit() // stop goroutine
default:
}
bytes, err := r.rd.Peek(1)
if err != nil {
log.Panicf(err.Error())
Expand Down
Loading