Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: stop using ThisInstance singleton #447

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/candler/tickcandler/all_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,5 +148,5 @@ func createTickBucket(symbol, rootDir string, catalogDir *Directory, txnPipe *ex
buffer, _ := io.Serialize([]byte{}, row)
w.WriteRecords([]time.Time{ts}, buffer, dsv)
}
wf.RequestFlush()
wf.RequestFlush(txnPipe)
}
4 changes: 2 additions & 2 deletions contrib/polygon/backfill/backfiller/backfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func init() {

func main() {
rootDir, triggers, walRotateInterval := initConfig()
_, shutdownPending, walWG := initWriter(rootDir, triggers, walRotateInterval)
metadata, shutdownPending, walWG := initWriter(rootDir, triggers, walRotateInterval)

// free memory in the background every 1 minute for long running backfills with very high parallelism
go func() {
Expand Down Expand Up @@ -237,7 +237,7 @@ func main() {
*shutdownPending = true
}
walWG.Wait()
executor.FinishAndWait()
executor.FinishAndWait(metadata.TXNPipe)

log.Info("[polygon] api call time %s", backfill.ApiCallTime)
log.Info("[polygon] wait time %s", backfill.WaitTime)
Expand Down
8 changes: 4 additions & 4 deletions executor/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -890,16 +890,16 @@ func (wf *WALFileType) SyncWAL(WALRefresh, PrimaryRefresh time.Duration, walRota
// The function blocks if there are no current queued flushes, and
// returns if there is already one queued which will handle the data
// present in the write channel, as it will flush as soon as possible.
func (wf *WALFileType) RequestFlush() {
func (wf *WALFileType) RequestFlush(txnPipe *TransactionPipe) {
if !haveWALWriter {
wf.FlushToWAL(ThisInstance.TXNPipe)
wf.FlushToWAL(txnPipe)
return
}
// if there's already a queued flush, no need to queue another
if len(ThisInstance.TXNPipe.flushChannel) > 0 {
if len(txnPipe.flushChannel) > 0 {
return
}
f := make(chan struct{})
ThisInstance.TXNPipe.flushChannel <- f
txnPipe.flushChannel <- f
<-f
}
8 changes: 5 additions & 3 deletions executor/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,9 @@ func WriteCSM(csm io.ColumnSeriesMap, isVariableLength bool) (err error) {
func WriteCSMInner(csm io.ColumnSeriesMap, isVariableLength bool) (err error) {
start := time.Now()
cDir := ThisInstance.CatalogDir
txnPipe := ThisInstance.TXNPipe
walfile := ThisInstance.WALFile

for tbk, cs := range csm {
tf, err := tbk.GetTimeFrame()
if err != nil {
Expand Down Expand Up @@ -366,16 +369,15 @@ func WriteCSMInner(csm io.ColumnSeriesMap, isVariableLength bool) (err error) {
/*
Create a writer for this TimeBucket
*/
w, err := NewWriter(tbi, ThisInstance.TXNPipe, cDir, ThisInstance.WALFile)
w, err := NewWriter(tbi, txnPipe, cDir, walfile)
if err != nil {
return err
}

rowData := cs.ToRowSeries(tbk, alignData).GetData()
w.WriteRecords(times, rowData, dbDSV)
}
walfile := ThisInstance.WALFile
walfile.RequestFlush()
walfile.RequestFlush(txnPipe)
metrics.WriteCSMDuration.Observe(time.Since(start).Seconds())
return nil
}
4 changes: 2 additions & 2 deletions executor/written.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ func fire(trig trigger.Trigger, key string, records []trigger.Record) {

// FinishAndWait closes the writtenIndexes channel, and waits
// for the remaining triggers to fire, returning
func FinishAndWait() {
func FinishAndWait(txnPipe *TransactionPipe) {
triggerWg.Wait()
for {
if len(ThisInstance.TXNPipe.writeChannel) == 0 && len(c) == 0 {
if len(txnPipe.writeChannel) == 0 && len(c) == 0 {
close(c)
<-done
return
Expand Down
2 changes: 1 addition & 1 deletion sqlparser/insertintostatement.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (is *InsertIntoStatement) Materialize(catDir *catalog.Directory) (outputCol
}

writer.WriteRecords(indexTime, data, targetDSV)
wal.RequestFlush()
wal.RequestFlush(tgc)

outputColumnSeries = io.NewColumnSeries()
outputColumnSeries.AddColumn("Epoch",
Expand Down
2 changes: 1 addition & 1 deletion uda/adjust/caloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func NewCorporateActions(symbol string) *Actions {
}

func (act *Actions) Load(catalogDir *catalog.Directory) error {
if executor.ThisInstance == nil || catalogDir == nil {
if catalogDir == nil {
return nil
}
query := planner.NewQuery(catalogDir)
Expand Down