Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

cdb: debug tool to read raw extent data from store #171

Merged
merged 5 commits into from
Apr 26, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ cherami-cassandra-tool: $(DEPS)
cherami-store-tool: $(DEPS)
go build -i $(EMBED) -o cherami-store-tool cmd/tools/store/main.go

bins: cherami-server cherami-replicator-server cherami-cli cherami-admin cherami-replicator-tool cherami-cassandra-tool cherami-store-tool
cdb: $(DEPS)
go build -i $(EMBED) -o cdb cmd/tools/cdb/*.go

bins: cherami-server cherami-replicator-server cherami-cli cherami-admin cherami-replicator-tool cherami-cassandra-tool cherami-store-tool cdb

cover_profile: lint bins
@mkdir -p $(BUILD)
Expand Down
225 changes: 225 additions & 0 deletions cmd/tools/cdb/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
package main

import (
"flag"
"fmt"
"math"
"os"
"path/filepath"
"strconv"
"time"

"github.com/uber/cherami-server/storage"
"github.com/uber/cherami-server/storage/manyrocks"
)

type arguments struct {
store string
extent string
baseDir string
start storage.Key
end storage.Key
num int
printVal bool
formatTime bool
help bool
}

func main() {

args := parseArgs()

if args == nil {
return
}

var db storage.ExtentStore
var err error

switch args.store {
case "manyrocks":
db, err = manyrocks.OpenExtentDB(storage.ExtentUUID(args.extent), fmt.Sprintf("%s/%s", args.baseDir, args.extent))
if err != nil {
fmt.Printf("error opening db (%s): %v\n", args.baseDir, err)
return
}

default:
fmt.Printf("unsupported store: %s\n", args.store)
return
}

defer db.CloseExtentDB()

dumpExtentDB(db, args)

return
}

func dumpExtentDB(db storage.ExtentStore, args *arguments) {

addr, key, err := db.SeekCeiling(args.start)

if err != nil {
fmt.Printf("db.SeekCeiling(%x) error: %v\n", args.start, err)
return
}

var val storage.Value

if args.printVal {

_, val, _, _, err = db.Get(addr)

if err != nil {
fmt.Printf("db.Get(%x) errored: %v\n", addr, err)
return
}
}

var num int
for (addr != storage.EOX) && (key < args.end) && (num < args.num) {

// fmt.Printf("key = %d %x %v %p\n", key, key, key, key)

if isSealExtentKey(key) {

sealSeqNum := deconstructSealExtentKey(key)

if sealSeqNum == seqNumUnspecifiedSeal {
fmt.Printf("0x%016v => SEALED (seqnum: unspecified)\n", key)
} else {
fmt.Printf("0x%016v => SEALED (seqnum: %d)\n", key, sealSeqNum)
}

} else {

ts, seqnum := deconstructKey(key)

if args.printVal {

var enqTime, payload, vTime string

msg, errd := deserializeMessage(val)
enq := msg.GetEnqueueTimeUtc()

if errd != nil { // corrupt message?

payload = fmt.Sprintf("ERROR deserializing data: %v (val=%v)", errd, val)

} else {

if args.formatTime {
enqTime = time.Unix(0, enq).Format(time.RFC3339Nano)
} else {
enqTime = strconv.FormatInt(enq, 16)
}

payload = fmt.Sprintf("seq=%d enq=%v data=%d bytes",
msg.GetSequenceNumber(), enqTime, len(msg.GetPayload().GetData()))
// payload = msg.String()
}

if args.formatTime {
vTime = time.Unix(0, ts).Format(time.RFC3339Nano)
} else {
vTime = strconv.FormatInt(ts, 16)
}

fmt.Printf("0x%016v => #%d ts=%v payload:[%v]\n", key, seqnum, vTime, payload)

} else {

fmt.Printf("0x%016v => #%d ts=%v\n", key, seqnum, ts)
}
}

num++

if args.printVal {

if key, val, addr, _, err = db.Get(addr); err != nil {
fmt.Printf("db.Get(%x) errored: %v\n", addr, err)
break
}

} else {

if addr, key, err = db.Next(addr); err != nil {
fmt.Printf("db.Next(%x) errored: %v\n", addr, err)
break
}
}
}

fmt.Printf("summary: dumped %d keys in range [%v, %v)\n", num, args.start, args.end)
return
}

func parseArgs() (args *arguments) {

args = &arguments{}

flag.StringVar(&args.store, "store", "manyrocks", "store")
flag.StringVar(&args.extent, "x", "", "extent")
flag.StringVar(&args.baseDir, "base", ".", "base dir")

var start, end, num string

flag.StringVar(&start, "s", "-1", "start range")
flag.StringVar(&end, "e", "-1", "end range")
flag.StringVar(&num, "n", "-1", "number of values")

flag.BoolVar(&args.printVal, "v", false, "deserialize payload")
flag.BoolVar(&args.formatTime, "t", false, "format time")

flag.BoolVar(&args.help, "?", false, "help")
flag.BoolVar(&args.help, "help", false, "help")

flag.Parse()

switch {
case args.extent == "":
cwd, _ := os.Getwd()

args.extent = filepath.Base(cwd)
args.baseDir = filepath.Dir(cwd)

case args.baseDir == "":
args.baseDir, _ = os.Getwd()
}

switch i, err := strconv.ParseInt(start, 0, 64); {
case err != nil:
fmt.Printf("error parsing start arg (%s): %v\n", start, err)
return nil
case i < 0:
args.start = 0
default:
args.start = storage.Key(i)
}

switch i, err := strconv.ParseInt(end, 0, 64); {
case err != nil:
fmt.Printf("error parsing end arg (%s): %v\n", end, err)
return nil
case i < 0:
args.end = storage.Key(math.MaxInt64)
default:
args.end = storage.Key(i)
}

switch i, err := strconv.ParseInt(num, 0, 64); {
case err != nil:
fmt.Printf("error parsing num arg (%s): %v\n", num, err)
return nil
case i < 0:
args.num = math.MaxInt64
default:
args.num = int(i)
}

// fmt.Printf("args=%v\n", args)

return args
}
54 changes: 54 additions & 0 deletions cmd/tools/cdb/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package main

import (
"math"

"github.com/uber/cherami-server/storage"
"github.com/uber/cherami-thrift/.generated/go/store"

"github.com/apache/thrift/lib/go/thrift"
)

// -- decode message/address -- //
const (
seqNumBits = 26

invalidKey = math.MaxInt64

seqNumBitmask = (int64(1) << seqNumBits) - 1
timestampBitmask = math.MaxInt64 &^ seqNumBitmask
seqNumMax = int64(math.MaxInt64-2) & seqNumBitmask

seqNumUnspecifiedSeal = int64(math.MaxInt64 - 1)
)

func deconstructKey(key storage.Key) (visibilityTime int64, seqNum int64) {
return int64(int64(key) & timestampBitmask), int64(int64(key) & seqNumBitmask)
}

func deconstructSealExtentKey(key storage.Key) (seqNum int64) {

seqNum = int64(key) & seqNumBitmask

// we use the special seqnum ('MaxInt64 - 1') when the extent has been sealed
// at an "unspecified" seqnum; check for this case, and return appropriate value
if seqNum == (seqNumBitmask - 1) {
seqNum = seqNumUnspecifiedSeal
}

return
}

func isSealExtentKey(key storage.Key) bool {
return key != storage.InvalidKey && (int64(key)&timestampBitmask) == timestampBitmask
}

func deserializeMessage(data []byte) (*store.AppendMessage, error) {
msg := &store.AppendMessage{}
deserializer := thrift.NewTDeserializer()
if err := deserializer.Read(msg, data); err != nil {
return nil, err
}

return msg, nil
}
45 changes: 45 additions & 0 deletions storage/manyrocks/manyrocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
"github.com/uber-common/bark"
"github.com/uber/cherami-server/common"
s "github.com/uber/cherami-server/storage"

"github.com/Sirupsen/logrus"
)

const rocksdbNotExistError = "does not exist (create_if_missing is false)"
Expand Down Expand Up @@ -115,6 +117,49 @@ func New(opts *Opts, log bark.Logger) (*ManyRocks, error) {
}, nil
}

// OpenExtentDB gets a handle to the raw extent DB
func OpenExtentDB(id s.ExtentUUID, path string) (*Rock, error) {

// setup RocksDB options
opts := gorocksdb.NewDefaultOptions()

opts.SetCreateIfMissing(false)

db, err := gorocksdb.OpenDb(opts, path)
if err != nil {
return nil, err
}

// setup read/write options used with IO
readOpts := gorocksdb.NewDefaultReadOptions()

writeOpts := gorocksdb.NewDefaultWriteOptions()

return &Rock{
id: id,
path: path,
keyPattern: s.IncreasingKeys,
notify: func(key s.Key, addr s.Address) {},
db: db,
opts: opts,
readOpts: readOpts,
writeOpts: writeOpts,
store: &ManyRocks{
logger: bark.NewLoggerFromLogrus(logrus.StandardLogger()),
},
}, nil
}

// CloseExtentDB closes the handle to the raw extent DB
func (t *Rock) CloseExtentDB() {

t.writeOpts.Destroy()
t.readOpts.Destroy()
t.db.Close()
t.opts.Destroy()
t.notify = nil
}

// getDBPath returns the base-dir to use for the DB
func (t *ManyRocks) getDBPath(id s.ExtentUUID) string {
return fmt.Sprintf("%s/%v", t.opts.BaseDir, id) // NB: 'baseDir' should already created
Expand Down
11 changes: 11 additions & 0 deletions storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ type StoreManager interface {
GetExtentInfo(id ExtentUUID) (*ExtentInfo, error)
}

// OpenExtentDB gets a handle to the raw extent DB
// func OpenExtentDB(id ExtentUUID, path string) (ExtentStore, error)

// -- EXTENT STORE -- //
// The extent-store is designed and expected to be used concurrently by
// at most one-writer and any number of readers.
Expand Down Expand Up @@ -302,6 +305,14 @@ type ExtentStore interface {
// Returns:
// none
Close()

// CloseExtentDB is closes the handle when opened using OpenExtentDB
// Args:
// none
//
// Returns:
// none
CloseExtentDB()
}

// -- Misc utility functions -- //
Expand Down