diff --git a/contrib/candler/tickcandler/all_test.go b/contrib/candler/tickcandler/all_test.go index dcd6fb05..9fb74c96 100644 --- a/contrib/candler/tickcandler/all_test.go +++ b/contrib/candler/tickcandler/all_test.go @@ -148,5 +148,5 @@ func createTickBucket(symbol, rootDir string, catalogDir *Directory, txnPipe *ex buffer, _ := io.Serialize([]byte{}, row) w.WriteRecords([]time.Time{ts}, buffer, dsv) } - wf.RequestFlush() + wf.RequestFlush(txnPipe) } diff --git a/contrib/polygon/backfill/backfiller/backfiller.go b/contrib/polygon/backfill/backfiller/backfiller.go index 67a115a2..3a156eb1 100644 --- a/contrib/polygon/backfill/backfiller/backfiller.go +++ b/contrib/polygon/backfill/backfiller/backfiller.go @@ -72,7 +72,7 @@ func init() { func main() { rootDir, triggers, walRotateInterval := initConfig() - _, shutdownPending, walWG := initWriter(rootDir, triggers, walRotateInterval) + metadata, shutdownPending, walWG := initWriter(rootDir, triggers, walRotateInterval) // free memory in the background every 1 minute for long running backfills with very high parallelism go func() { @@ -237,7 +237,7 @@ func main() { *shutdownPending = true } walWG.Wait() - executor.FinishAndWait() + executor.FinishAndWait(metadata.TXNPipe) log.Info("[polygon] api call time %s", backfill.ApiCallTime) log.Info("[polygon] wait time %s", backfill.WaitTime) diff --git a/executor/wal.go b/executor/wal.go index bc48b6f5..5bd303b0 100644 --- a/executor/wal.go +++ b/executor/wal.go @@ -890,16 +890,16 @@ func (wf *WALFileType) SyncWAL(WALRefresh, PrimaryRefresh time.Duration, walRota // The function blocks if there are no current queued flushes, and // returns if there is already one queued which will handle the data // present in the write channel, as it will flush as soon as possible. -func (wf *WALFileType) RequestFlush() { +func (wf *WALFileType) RequestFlush(txnPipe *TransactionPipe) { if !haveWALWriter { - wf.FlushToWAL(ThisInstance.TXNPipe) + wf.FlushToWAL(txnPipe) return } // if there's already a queued flush, no need to queue another - if len(ThisInstance.TXNPipe.flushChannel) > 0 { + if len(txnPipe.flushChannel) > 0 { return } f := make(chan struct{}) - ThisInstance.TXNPipe.flushChannel <- f + txnPipe.flushChannel <- f <-f } diff --git a/executor/writer.go b/executor/writer.go index ae83dc59..b0e5a428 100644 --- a/executor/writer.go +++ b/executor/writer.go @@ -285,6 +285,9 @@ func WriteCSM(csm io.ColumnSeriesMap, isVariableLength bool) (err error) { func WriteCSMInner(csm io.ColumnSeriesMap, isVariableLength bool) (err error) { start := time.Now() cDir := ThisInstance.CatalogDir + txnPipe := ThisInstance.TXNPipe + walfile := ThisInstance.WALFile + for tbk, cs := range csm { tf, err := tbk.GetTimeFrame() if err != nil { @@ -366,7 +369,7 @@ func WriteCSMInner(csm io.ColumnSeriesMap, isVariableLength bool) (err error) { /* Create a writer for this TimeBucket */ - w, err := NewWriter(tbi, ThisInstance.TXNPipe, cDir, ThisInstance.WALFile) + w, err := NewWriter(tbi, txnPipe, cDir, walfile) if err != nil { return err } @@ -374,8 +377,7 @@ func WriteCSMInner(csm io.ColumnSeriesMap, isVariableLength bool) (err error) { rowData := cs.ToRowSeries(tbk, alignData).GetData() w.WriteRecords(times, rowData, dbDSV) } - walfile := ThisInstance.WALFile - walfile.RequestFlush() + walfile.RequestFlush(txnPipe) metrics.WriteCSMDuration.Observe(time.Since(start).Seconds()) return nil } diff --git a/executor/written.go b/executor/written.go index 8d2d26ef..eec5bcd9 100644 --- a/executor/written.go +++ b/executor/written.go @@ -72,10 +72,10 @@ func fire(trig trigger.Trigger, key string, records []trigger.Record) { // FinishAndWait closes the writtenIndexes channel, and waits // for the remaining triggers to fire, returning -func FinishAndWait() { +func FinishAndWait(txnPipe *TransactionPipe) { triggerWg.Wait() for { - if len(ThisInstance.TXNPipe.writeChannel) == 0 && len(c) == 0 { + if len(txnPipe.writeChannel) == 0 && len(c) == 0 { close(c) <-done return diff --git a/sqlparser/insertintostatement.go b/sqlparser/insertintostatement.go index 94ee2736..cfd8eb69 100644 --- a/sqlparser/insertintostatement.go +++ b/sqlparser/insertintostatement.go @@ -122,7 +122,7 @@ func (is *InsertIntoStatement) Materialize(catDir *catalog.Directory) (outputCol } writer.WriteRecords(indexTime, data, targetDSV) - wal.RequestFlush() + wal.RequestFlush(tgc) outputColumnSeries = io.NewColumnSeries() outputColumnSeries.AddColumn("Epoch", diff --git a/uda/adjust/caloader.go b/uda/adjust/caloader.go index 03629d3f..668923f3 100644 --- a/uda/adjust/caloader.go +++ b/uda/adjust/caloader.go @@ -113,7 +113,7 @@ func NewCorporateActions(symbol string) *Actions { } func (act *Actions) Load(catalogDir *catalog.Directory) error { - if executor.ThisInstance == nil || catalogDir == nil { + if catalogDir == nil { return nil } query := planner.NewQuery(catalogDir)