Skip to content

Commit

Permalink
Fix Scanner error: token limit exceeded
Browse files Browse the repository at this point in the history
  • Loading branch information
charithabandi authored and jchappelow committed Jul 15, 2024
1 parent 4702a91 commit c674ee9
Show file tree
Hide file tree
Showing 9 changed files with 24 additions and 5 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ api/openapi-spec/api/v0/api.swagger.json
.goreleaser.yaml
.env
test/stress/stress
**/debug.test*

gen/
# helm charts deps
Expand Down
9 changes: 7 additions & 2 deletions cmd/kwil-admin/cmds/snapshot/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ WHERE pg_stat_activity.datname = 'kwild'

func createCmd() *cobra.Command {
var snapshotDir, dbName, dbUser, dbPass, dbHost, dbPort string
var maxRowSize int
cmd := &cobra.Command{
Use: "create",
Short: "Creates a snapshot of the database.",
Expand All @@ -61,7 +62,7 @@ func createCmd() *cobra.Command {
return fmt.Errorf("failed to expand snapshot directory path: %w", err)
}

return pgDump(cmd.Context(), dbName, dbUser, dbPass, dbHost, dbPort, snapshotDir)
return pgDump(cmd.Context(), dbName, dbUser, dbPass, dbHost, dbPort, maxRowSize, snapshotDir)
},
}

Expand All @@ -71,6 +72,7 @@ func createCmd() *cobra.Command {
cmd.Flags().StringVar(&dbPass, "password", "", "Password for the database user")
cmd.Flags().StringVar(&dbHost, "host", "localhost", "Host of the database")
cmd.Flags().StringVar(&dbPort, "port", "5432", "Port of the database")
cmd.Flags().IntVar(&maxRowSize, "max-row-size", 4*1024*1024, "Maximum row size to read from pg_dump (default: 4MB). Adjust this accordingly if you encounter 'bufio.Scanner: token too long' error.")

return cmd
}
Expand All @@ -86,7 +88,7 @@ func expandPath(path string) (string, error) {
return filepath.Abs(path)
}

func pgDump(ctx context.Context, dbName, dbUser, dbPass, dbHost, dbPort, snapshotDir string) (err error) {
func pgDump(ctx context.Context, dbName, dbUser, dbPass, dbHost, dbPort string, maxRowSize int, snapshotDir string) (err error) {
// Check if the snapshot directory exists, if not create it
err = os.MkdirAll(snapshotDir, 0755)
if err != nil {
Expand Down Expand Up @@ -170,7 +172,10 @@ func pgDump(ctx context.Context, dbName, dbUser, dbPass, dbHost, dbPort, snapsho
var totalBytes int64

// Pass the output of pg_dump through scanner to sanitize it
buf := make([]byte, maxRowSize)
scanner := bufio.NewScanner(pgDumpOutput)
scanner.Buffer(buf, maxRowSize)

for scanner.Scan() {
line := scanner.Text()

Expand Down
2 changes: 2 additions & 0 deletions cmd/kwild/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ type SnapshotConfig struct {
RecurringHeight uint64 `mapstructure:"recurring_height"`
MaxSnapshots uint64 `mapstructure:"max_snapshots"`
SnapshotDir string `mapstructure:"snapshot_dir"`
MaxRowSize int `mapstructure:"max_row_size"`
}

type ChainRPCConfig struct {
Expand Down Expand Up @@ -579,6 +580,7 @@ func DefaultConfig() *KwildConfig {
RecurringHeight: 14400, // 1 day at 6s block time
MaxSnapshots: 3,
SnapshotDir: SnapshotDirName,
MaxRowSize: 4 * 1024 * 1024,
},
GenesisState: "",
},
Expand Down
2 changes: 2 additions & 0 deletions cmd/kwild/config/default_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ recurring_height = 10000
# Maximum number of snapshots to store
max_snapshots = 3

# Max row size that can be parsed by the snapshot store
max_row_size = 4194304
#######################################################################
### Logging Config Options ###
#######################################################################
Expand Down
1 change: 1 addition & 0 deletions cmd/kwild/config/test_data/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ recurring_height = 10000
# Maximum number of snapshots to store
max_snapshots = 3

max_row_size = 4194304
#######################################################################
### Logging Config Options ###
#######################################################################
Expand Down
1 change: 1 addition & 0 deletions cmd/kwild/server/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,7 @@ func buildSnapshotter(d *coreDependencies) *statesync.SnapshotStore {
SnapshotDir: cfg.Snapshots.SnapshotDir,
RecurringHeight: cfg.Snapshots.RecurringHeight,
MaxSnapshots: int(cfg.Snapshots.MaxSnapshots),
MaxRowSize: cfg.Snapshots.MaxRowSize,
}

ss, err := statesync.NewSnapshotStore(snapshotCfg, dbCfg, *d.log.Named("snapshot-store"))
Expand Down
7 changes: 6 additions & 1 deletion internal/statesync/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,15 @@ const (
type Snapshotter struct {
dbConfig *DBConfig
snapshotDir string
maxRowSize int
log log.Logger
}

func NewSnapshotter(cfg *DBConfig, dir string, logger log.Logger) *Snapshotter {
func NewSnapshotter(cfg *DBConfig, dir string, MaxRowSize int, logger log.Logger) *Snapshotter {
return &Snapshotter{
dbConfig: cfg,
snapshotDir: dir,
maxRowSize: MaxRowSize,
log: logger,
}
}
Expand Down Expand Up @@ -190,7 +192,10 @@ func (s *Snapshotter) sanitizeDump(height uint64, format uint32) ([]byte, error)
defer outputFile.Close()

// Scanner to read the dump file line by line
buf := make([]byte, s.maxRowSize)
scanner := bufio.NewScanner(dumpInst1)
scanner.Buffer(buf, s.maxRowSize)

var inCopyBlock bool
var lineHashes []hashedLine
var offset int64
Expand Down
3 changes: 2 additions & 1 deletion internal/statesync/snapshotter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ import (
const (
dump1File = "test_data/dump1.sql"
dump2File = "test_data/dump2.sql"
tokenSize = 64 * 1024 // 64KB
)

func TestSanitizeLogicalDump(t *testing.T) {
dir := t.TempDir()
logger := log.NewStdOut(log.DebugLevel)
// Create a snapshotter
snapshotter := NewSnapshotter(nil, dir, logger)
snapshotter := NewSnapshotter(nil, dir, tokenSize, logger)

// Create snapshot directory
height := uint64(1)
Expand Down
3 changes: 2 additions & 1 deletion internal/statesync/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type SnapshotConfig struct {
SnapshotDir string
MaxSnapshots int
RecurringHeight uint64
MaxRowSize int
}

type SnapshotStore struct {
Expand All @@ -80,7 +81,7 @@ type DBSnapshotter interface {
}

func NewSnapshotStore(cfg *SnapshotConfig, dbCfg *DBConfig, logger log.Logger) (*SnapshotStore, error) {
snapshotter := NewSnapshotter(dbCfg, cfg.SnapshotDir, logger)
snapshotter := NewSnapshotter(dbCfg, cfg.SnapshotDir, cfg.MaxRowSize, logger)
ss := &SnapshotStore{
cfg: cfg,
snapshots: make(map[uint64]*Snapshot),
Expand Down

0 comments on commit c674ee9

Please sign in to comment.