From 043502a93d188e17ec2e6892fcee0d422f35b6dd Mon Sep 17 00:00:00 2001 From: nabarun Date: Thu, 23 Jun 2022 16:39:21 +0530 Subject: [PATCH] Close files in CSV writer --- cmd/geth/config.go | 2 +- cmd/geth/main.go | 2 +- cmd/geth/usage.go | 2 +- cmd/utils/flags.go | 4 +- statediff/indexer/database/file/config.go | 2 +- statediff/indexer/database/file/csv_writer.go | 38 +++++++++++++------ statediff/indexer/database/file/interfaces.go | 13 +++++-- statediff/types/schema.go | 2 +- 8 files changed, 43 insertions(+), 22 deletions(-) diff --git a/cmd/geth/config.go b/cmd/geth/config.go index 6ece54ed66d3..8871790598ee 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -219,7 +219,7 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { indexerConfig = file.Config{ Mode: fileMode, - OutputDir: ctx.GlobalString(utils.StateDiffFileCsvOutput.Name), + OutputDir: ctx.GlobalString(utils.StateDiffFileCsvDir.Name), FilePath: ctx.GlobalString(utils.StateDiffFilePath.Name), WatchedAddressesFilePath: ctx.GlobalString(utils.StateDiffWatchedAddressesFilePath.Name), } diff --git a/cmd/geth/main.go b/cmd/geth/main.go index b9d18b6af8e0..29f983c9dc59 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -172,7 +172,7 @@ var ( utils.StateDiffWritingFlag, utils.StateDiffWorkersFlag, utils.StateDiffFileMode, - utils.StateDiffFileCsvOutput, + utils.StateDiffFileCsvDir, utils.StateDiffFilePath, utils.StateDiffKnownGapsFilePath, utils.StateDiffWaitForSync, diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index 7251a3c561bf..407f569b4d25 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -245,7 +245,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{ utils.StateDiffWritingFlag, utils.StateDiffWorkersFlag, utils.StateDiffFileMode, - utils.StateDiffFileCsvOutput, + utils.StateDiffFileCsvDir, utils.StateDiffFilePath, utils.StateDiffKnownGapsFilePath, utils.StateDiffWaitForSync, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index cb3cb1601acd..9689fd8cd7a8 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -907,8 +907,8 @@ var ( Usage: "Statediff file writing mode (current options: csv, sql)", Value: "csv", } - StateDiffFileCsvOutput = cli.StringFlag{ - Name: "statediff.file.csvoutput", + StateDiffFileCsvDir = cli.StringFlag{ + Name: "statediff.file.csvdir", Usage: "Full path of output directory to write statediff data out to when operating in csv file mode", } StateDiffFilePath = cli.StringFlag{ diff --git a/statediff/indexer/database/file/config.go b/statediff/indexer/database/file/config.go index 5b1ccb70dc6a..2a69b4555cb8 100644 --- a/statediff/indexer/database/file/config.go +++ b/statediff/indexer/database/file/config.go @@ -45,7 +45,7 @@ func ResolveFileMode(str string) (FileMode, error) { } } -// Config holds params for writing CSV files out to a directory +// Config holds params for writing out CSV or SQL files type Config struct { Mode FileMode OutputDir string diff --git a/statediff/indexer/database/file/csv_writer.go b/statediff/indexer/database/file/csv_writer.go index a33a36106b19..ae4f68e1391d 100644 --- a/statediff/indexer/database/file/csv_writer.go +++ b/statediff/indexer/database/file/csv_writer.go @@ -67,6 +67,7 @@ type CSVWriter struct { type fileWriter struct { *csv.Writer + file *os.File } // fileWriters wraps the file writers for each output table @@ -77,13 +78,13 @@ func newFileWriter(path string) (ret fileWriter, err error) { if err != nil { return } - ret = fileWriter{csv.NewWriter(file)} - return -} -func (tx fileWriters) write(tbl *types.Table, args ...interface{}) error { - row := tbl.ToCsvRow(args...) - return tx[tbl.Name].Write(row) + ret = fileWriter{ + Writer: csv.NewWriter(file), + file: file, + } + + return } func makeFileWriters(dir string, tables []*types.Table) (fileWriters, error) { @@ -101,6 +102,21 @@ func makeFileWriters(dir string, tables []*types.Table) (fileWriters, error) { return writers, nil } +func (tx fileWriters) write(tbl *types.Table, args ...interface{}) error { + row := tbl.ToCsvRow(args...) + return tx[tbl.Name].Write(row) +} + +func (tx fileWriters) close() error { + for _, w := range tx { + err := w.file.Close() + if err != nil { + return err + } + } + return nil +} + func (tx fileWriters) flush() error { for _, w := range tx { w.Flush() @@ -137,10 +153,10 @@ func (csw *CSVWriter) Loop() { for { select { case row := <-csw.rows: - // TODO: Check available buffer size and flush - csw.writers.flush() - - csw.writers.write(&row.table, row.values...) + err := csw.writers.write(&row.table, row.values...) + if err != nil { + panic(fmt.Sprintf("error writing csv buffer: %v", err)) + } case <-csw.quitChan: if err := csw.writers.flush(); err != nil { panic(fmt.Sprintf("error writing csv buffer to file: %v", err)) @@ -168,7 +184,7 @@ func TableFile(dir, name string) string { return filepath.Join(dir, name+".csv") func (csw *CSVWriter) Close() error { close(csw.quitChan) <-csw.doneChan - return nil + return csw.writers.close() } func (csw *CSVWriter) upsertNode(node nodeinfo.Info) { diff --git a/statediff/indexer/database/file/interfaces.go b/statediff/indexer/database/file/interfaces.go index 700df4eac1f3..f751a2c196f8 100644 --- a/statediff/indexer/database/file/interfaces.go +++ b/statediff/indexer/database/file/interfaces.go @@ -25,14 +25,13 @@ import ( // Writer interface required by the file indexer type FileWriter interface { + // Methods used to control the writer Loop() Close() error Flush() + + // Methods to write out data to tables upsertNode(node nodeinfo.Info) - upsertIPLD(ipld models.IPLDModel) - upsertIPLDDirect(blockNumber, key string, value []byte) - upsertIPLDNode(blockNumber string, i node.Node) - upsertIPLDRaw(blockNumber string, codec, mh uint64, raw []byte) (string, string, error) upsertHeaderCID(header models.HeaderModel) upsertUncleCID(uncle models.UncleModel) upsertTransactionCID(transaction models.TxModel) @@ -42,4 +41,10 @@ type FileWriter interface { upsertStateCID(stateNode models.StateNodeModel) upsertStateAccount(stateAccount models.StateAccountModel) upsertStorageCID(storageCID models.StorageNodeModel) + upsertIPLD(ipld models.IPLDModel) + + // Methods to upsert IPLD in different ways + upsertIPLDDirect(blockNumber, key string, value []byte) + upsertIPLDNode(blockNumber string, i node.Node) + upsertIPLDRaw(blockNumber string, codec, mh uint64, raw []byte) (string, string, error) } diff --git a/statediff/types/schema.go b/statediff/types/schema.go index d2018a98dbb1..e7eba3526075 100644 --- a/statediff/types/schema.go +++ b/statediff/types/schema.go @@ -161,7 +161,7 @@ var TableLog = Table{ } var TableStateAccount = Table{ - "eth.state_account", + "eth.state_accounts", []column{ {name: "block_number", typ: bigint}, {name: "header_id", typ: varchar},