Skip to content

Commit

Permalink
Merge #29445
Browse files Browse the repository at this point in the history
29445: release-2.1: cli: periodically flush csv/tsv output r=knz a=knz

Backport 1/1 commits from #28688.

/cc @cockroachdb/release

---

Fixes #28654.

The "sinkless" version of changefeeds continuously streams back
results to the user over pgwire. Prior to this patch, this data could
not be consumed effectively with `cockroach sql` using the tsv/csv
output, because the tsv/csv formatter buffers rows internally.

This patch makes tsv/csv output in `cockroach sql` an effective way to
consume changefeeds by ensuring an upper bound on the time rows stays
buffered inside the formatter. The flush period is fixed to 5 seconds.

For context, all the other formatters except for `table` are
line-buffered and thus flush on every row. `table` is a world of its
own which buffers *all* the rows until the query is complete, and that
is unlikely to change any time soon, so this patch doesn't touch that
either.

Release note (cli change): The `csv` and `tsv` formats for `cockroach`
commands that output result rows now buffer data for a maximum of 5
seconds. This makes it possible to e.g. view SQL changefeeds
interactively with `cockroach sql` and `cockroach demo`.


Co-authored-by: Raphael 'kena' Poss <knz@cockroachlabs.com>
  • Loading branch information
craig[bot] and knz committed Sep 1, 2018
2 parents 5231e15 + 053b08c commit 0011569
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 22 deletions.
86 changes: 67 additions & 19 deletions pkg/cli/format_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ import (
"reflect"
"strings"
"text/tabwriter"
"time"
"unicode/utf8"

"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/encoding/csv"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/olekukonko/tablewriter"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -300,36 +302,75 @@ func (p *asciiTableReporter) doneRows(w io.Writer, seenRows int) error {
func (p *asciiTableReporter) doneNoRows(_ io.Writer) error { return nil }

type csvReporter struct {
csvWriter *csv.Writer
mu struct {
syncutil.Mutex
csvWriter *csv.Writer
}
stop chan struct{}
}

// csvFlushInterval is the maximum time between flushes of the
// buffered CSV/TSV data.
const csvFlushInterval = 5 * time.Second

func makeCSVReporter(w io.Writer, format tableDisplayFormat) (*csvReporter, func()) {
r := &csvReporter{}
r.mu.csvWriter = csv.NewWriter(w)
if format == tableDisplayTSV {
r.mu.csvWriter.Comma = '\t'
}

// Set up a flush daemon. This is useful when e.g. visualizing data
// from change feeds.
r.stop = make(chan struct{}, 1)
go func() {
ticker := time.NewTicker(csvFlushInterval)
for {
select {
case <-ticker.C:
r.mu.Lock()
r.mu.csvWriter.Flush()
r.mu.Unlock()
case <-r.stop:
return
}
}
}()
cleanup := func() {
close(r.stop)
}
return r, cleanup
}

func (p *csvReporter) describe(w io.Writer, cols []string) error {
p.csvWriter = csv.NewWriter(w)
if cliCtx.tableDisplayFormat == tableDisplayTSV {
p.csvWriter.Comma = '\t'
}
p.mu.Lock()
if len(cols) == 0 {
_ = p.csvWriter.Write([]string{"# no columns"})
_ = p.mu.csvWriter.Write([]string{"# no columns"})
} else {
_ = p.csvWriter.Write(cols)
_ = p.mu.csvWriter.Write(cols)
}
p.mu.Unlock()
return nil
}

func (p *csvReporter) iter(_ io.Writer, _ int, row []string) error {
p.mu.Lock()
if len(row) == 0 {
_ = p.csvWriter.Write([]string{"# empty"})
_ = p.mu.csvWriter.Write([]string{"# empty"})
} else {
_ = p.csvWriter.Write(row)
_ = p.mu.csvWriter.Write(row)
}
p.mu.Unlock()
return nil
}

func (p *csvReporter) beforeFirstRow(_ io.Writer, _ rowStrIter) error { return nil }
func (p *csvReporter) doneNoRows(_ io.Writer) error { return nil }

func (p *csvReporter) doneRows(w io.Writer, seenRows int) error {
p.csvWriter.Flush()
p.mu.Lock()
p.mu.csvWriter.Flush()
p.mu.Unlock()
return nil
}

Expand Down Expand Up @@ -527,39 +568,46 @@ func (p *sqlReporter) doneRows(w io.Writer, seenRows int) error {
return nil
}

func makeReporter() (rowReporter, error) {
// makeReporter instantiates a table formatter. It returns the
// formatter and a cleanup function that must be called in all cases
// when the formatting completes.
func makeReporter(w io.Writer) (rowReporter, func(), error) {
switch cliCtx.tableDisplayFormat {
case tableDisplayTable:
return newASCIITableReporter(), nil
return newASCIITableReporter(), nil, nil

case tableDisplayTSV:
fallthrough
case tableDisplayCSV:
return &csvReporter{}, nil
reporter, cleanup := makeCSVReporter(w, cliCtx.tableDisplayFormat)
return reporter, cleanup, nil

case tableDisplayRaw:
return &rawReporter{}, nil
return &rawReporter{}, nil, nil

case tableDisplayHTML:
return &htmlReporter{escape: true, rowStats: true}, nil
return &htmlReporter{escape: true, rowStats: true}, nil, nil

case tableDisplayRecords:
return &recordReporter{}, nil
return &recordReporter{}, nil, nil

case tableDisplaySQL:
return &sqlReporter{}, nil
return &sqlReporter{}, nil, nil

default:
return nil, errors.Errorf("unhandled display format: %d", cliCtx.tableDisplayFormat)
return nil, nil, errors.Errorf("unhandled display format: %d", cliCtx.tableDisplayFormat)
}
}

// printQueryOutput takes a list of column names and a list of row
// contents writes a formatted table to 'w'.
func printQueryOutput(w io.Writer, cols []string, allRows rowStrIter) error {
reporter, err := makeReporter()
reporter, cleanup, err := makeReporter(w)
if err != nil {
return err
}
if cleanup != nil {
defer cleanup()
}
return render(reporter, w, cols, allRows, nil)
}
5 changes: 4 additions & 1 deletion pkg/cli/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,13 @@ Output the list of cluster settings known to this binary.
rows = append(rows, row)
}

reporter, err := makeReporter()
reporter, cleanup, err := makeReporter(os.Stdout)
if err != nil {
return err
}
if cleanup != nil {
defer cleanup()
}
if hr, ok := reporter.(*htmlReporter); ok {
hr.escape = false
hr.rowStats = false
Expand Down
9 changes: 7 additions & 2 deletions pkg/cli/sql_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -768,11 +768,16 @@ func runQueryAndFormatResults(conn *sqlConn, w io.Writer, fn queryFunc) error {
}

cols := getColumnStrings(rows, true)
reporter, err := makeReporter()
reporter, cleanup, err := makeReporter(w)
if err != nil {
return err
}
if err := render(reporter, w, cols, newRowIter(rows, true), noRowsHook); err != nil {
if err := func() error {
if cleanup != nil {
defer cleanup()
}
return render(reporter, w, cols, newRowIter(rows, true), noRowsHook)
}(); err != nil {
return err
}

Expand Down

0 comments on commit 0011569

Please sign in to comment.