Skip to content

Commit

Permalink
Merge #28688 #29446
Browse files Browse the repository at this point in the history
28688: cli: periodically flush csv/tsv output r=knz a=knz

Fixes #28654.

The "sinkless" version of changefeeds continuously streams back
results to the user over pgwire. Prior to this patch, this data could
not be consumed effectively with `cockroach sql` using the tsv/csv
output, because the tsv/csv formatter buffers rows internally.

This patch makes tsv/csv output in `cockroach sql` an effective way to
consume changefeeds by ensuring an upper bound on the time rows stays
buffered inside the formatter. The flush period is fixed to 5 seconds.

For context, all the other formatters except for `table` are
line-buffered and thus flush on every row. `table` is a world of its
own which buffers *all* the rows until the query is complete, and that
is unlikely to change any time soon, so this patch doesn't touch that
either.

Release note (cli change): The `csv` and `tsv` formats for `cockroach`
commands that output result rows now buffer data for a maximum of 5
seconds. This makes it possible to e.g. view SQL changefeeds
interactively with `cockroach sql` and `cockroach demo`.

29446: cli: avoid deprecation warnings for deprecated flags r=knz a=knz

Suggested/recommended by @a-robinson.

The flags `--host`, `--advertise-host`, etc are now deprecated, but
there is no cost in continuing to support them. Also users migrating
from previous versions are not losing anything (or missing out) by
continuing to use them. Forcing a warning to appear when they are used
does not bring any tangible benefit and risks creating operator
fatigue.

So this patch removes the warning (but keeps the deprecated flags
hidden, so that new users are guided to the new flags).

Release note: None

Co-authored-by: Raphael 'kena' Poss <knz@cockroachlabs.com>
  • Loading branch information
craig[bot] and knz committed Aug 31, 2018
3 parents 094b873 + 1dc7788 + 5a41e06 commit f184661
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 69 deletions.
45 changes: 22 additions & 23 deletions pkg/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,44 +256,43 @@ func init() {
// special severity value DEFAULT instead.
pf.Lookup(logflags.LogToStderrName).NoOptDefVal = log.Severity_DEFAULT.String()

// Remember we are starting in the background as the `start` command will
// avoid printing some messages to standard output in that case.
_, startCtx.inBackground = envutil.EnvString(backgroundEnvVar, 1)
// If we are relaunching in the background due to the processing of --background,
// any warning about deprecated flags have been reported already; in that
// case we do not want warnings to be printed a second time.
markDeprecated := !startCtx.inBackground

{
f := StartCmd.Flags()

// Server flags.
VarFlag(f, addrSetter{&startCtx.serverListenAddr, &serverListenPort}, cliflags.ListenAddr)
VarFlag(f, addrSetter{&serverAdvertiseAddr, &serverAdvertisePort}, cliflags.AdvertiseAddr)
VarFlag(f, addrSetter{&serverHTTPAddr, &serverHTTPPort}, cliflags.ListenHTTPAddr)

// Backward-compatibility flags.

// These are deprecated but until we have qualitatively new
// functionality in the flags above, there is no need to nudge the
// user away from them with a deprecation warning. So we keep
// them, but hidden from docs so that they don't appear as
// redundant with the main flags.
VarFlag(f, aliasStrVar{&startCtx.serverListenAddr}, cliflags.ServerHost)
_ = f.MarkHidden(cliflags.ServerHost.Name)
VarFlag(f, aliasStrVar{&serverListenPort}, cliflags.ServerPort)
if markDeprecated {
_ = f.MarkDeprecated(cliflags.ServerHost.Name, "use --listen-addr/--advertise-addr instead.")
_ = f.MarkDeprecated(cliflags.ServerPort.Name, "use --listen-addr=...:<port> instead.")
}
_ = f.MarkHidden(cliflags.ServerPort.Name)

VarFlag(f, addrSetter{&serverAdvertiseAddr, &serverAdvertisePort}, cliflags.AdvertiseAddr)
VarFlag(f, aliasStrVar{&serverAdvertiseAddr}, cliflags.AdvertiseHost)
_ = f.MarkHidden(cliflags.AdvertiseHost.Name)
VarFlag(f, aliasStrVar{&serverAdvertisePort}, cliflags.AdvertisePort)
if markDeprecated {
_ = f.MarkDeprecated(cliflags.AdvertiseHost.Name, "use --advertise-addr instead.")
_ = f.MarkDeprecated(cliflags.AdvertisePort.Name, "use --advertise-addr=...:<port> instead.")
}
_ = f.MarkHidden(cliflags.AdvertisePort.Name)

VarFlag(f, &localityAdvertiseHosts, cliflags.LocalityAdvertiseAddr)
if markDeprecated {
_ = f.MarkDeprecated(cliflags.AdvertisePort.Name, "use --advertise-addr=...:<port> instead.")
}

VarFlag(f, addrSetter{&serverHTTPAddr, &serverHTTPPort}, cliflags.ListenHTTPAddr)
VarFlag(f, aliasStrVar{&serverHTTPAddr}, cliflags.ListenHTTPAddrAlias)
_ = f.MarkHidden(cliflags.ListenHTTPAddrAlias.Name)
VarFlag(f, aliasStrVar{&serverHTTPPort}, cliflags.ListenHTTPPort)
if markDeprecated {
_ = f.MarkDeprecated(cliflags.ListenHTTPAddrAlias.Name, "use --http-addr instead.")
_ = f.MarkDeprecated(cliflags.ListenHTTPPort.Name, "use --http-addr=...:<port> instead.")
}
_ = f.MarkHidden(cliflags.ListenHTTPPort.Name)

// More server flags.

VarFlag(f, &localityAdvertiseHosts, cliflags.LocalityAdvertiseAddr)

StringFlag(f, &serverCfg.Attrs, cliflags.Attrs, serverCfg.Attrs)
VarFlag(f, &serverCfg.Locality, cliflags.Locality)
Expand Down
86 changes: 67 additions & 19 deletions pkg/cli/format_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ import (
"reflect"
"strings"
"text/tabwriter"
"time"
"unicode/utf8"

"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/encoding/csv"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/olekukonko/tablewriter"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -300,36 +302,75 @@ func (p *asciiTableReporter) doneRows(w io.Writer, seenRows int) error {
func (p *asciiTableReporter) doneNoRows(_ io.Writer) error { return nil }

type csvReporter struct {
csvWriter *csv.Writer
mu struct {
syncutil.Mutex
csvWriter *csv.Writer
}
stop chan struct{}
}

// csvFlushInterval is the maximum time between flushes of the
// buffered CSV/TSV data.
const csvFlushInterval = 5 * time.Second

func makeCSVReporter(w io.Writer, format tableDisplayFormat) (*csvReporter, func()) {
r := &csvReporter{}
r.mu.csvWriter = csv.NewWriter(w)
if format == tableDisplayTSV {
r.mu.csvWriter.Comma = '\t'
}

// Set up a flush daemon. This is useful when e.g. visualizing data
// from change feeds.
r.stop = make(chan struct{}, 1)
go func() {
ticker := time.NewTicker(csvFlushInterval)
for {
select {
case <-ticker.C:
r.mu.Lock()
r.mu.csvWriter.Flush()
r.mu.Unlock()
case <-r.stop:
return
}
}
}()
cleanup := func() {
close(r.stop)
}
return r, cleanup
}

func (p *csvReporter) describe(w io.Writer, cols []string) error {
p.csvWriter = csv.NewWriter(w)
if cliCtx.tableDisplayFormat == tableDisplayTSV {
p.csvWriter.Comma = '\t'
}
p.mu.Lock()
if len(cols) == 0 {
_ = p.csvWriter.Write([]string{"# no columns"})
_ = p.mu.csvWriter.Write([]string{"# no columns"})
} else {
_ = p.csvWriter.Write(cols)
_ = p.mu.csvWriter.Write(cols)
}
p.mu.Unlock()
return nil
}

func (p *csvReporter) iter(_ io.Writer, _ int, row []string) error {
p.mu.Lock()
if len(row) == 0 {
_ = p.csvWriter.Write([]string{"# empty"})
_ = p.mu.csvWriter.Write([]string{"# empty"})
} else {
_ = p.csvWriter.Write(row)
_ = p.mu.csvWriter.Write(row)
}
p.mu.Unlock()
return nil
}

func (p *csvReporter) beforeFirstRow(_ io.Writer, _ rowStrIter) error { return nil }
func (p *csvReporter) doneNoRows(_ io.Writer) error { return nil }

func (p *csvReporter) doneRows(w io.Writer, seenRows int) error {
p.csvWriter.Flush()
p.mu.Lock()
p.mu.csvWriter.Flush()
p.mu.Unlock()
return nil
}

Expand Down Expand Up @@ -527,39 +568,46 @@ func (p *sqlReporter) doneRows(w io.Writer, seenRows int) error {
return nil
}

func makeReporter() (rowReporter, error) {
// makeReporter instantiates a table formatter. It returns the
// formatter and a cleanup function that must be called in all cases
// when the formatting completes.
func makeReporter(w io.Writer) (rowReporter, func(), error) {
switch cliCtx.tableDisplayFormat {
case tableDisplayTable:
return newASCIITableReporter(), nil
return newASCIITableReporter(), nil, nil

case tableDisplayTSV:
fallthrough
case tableDisplayCSV:
return &csvReporter{}, nil
reporter, cleanup := makeCSVReporter(w, cliCtx.tableDisplayFormat)
return reporter, cleanup, nil

case tableDisplayRaw:
return &rawReporter{}, nil
return &rawReporter{}, nil, nil

case tableDisplayHTML:
return &htmlReporter{escape: true, rowStats: true}, nil
return &htmlReporter{escape: true, rowStats: true}, nil, nil

case tableDisplayRecords:
return &recordReporter{}, nil
return &recordReporter{}, nil, nil

case tableDisplaySQL:
return &sqlReporter{}, nil
return &sqlReporter{}, nil, nil

default:
return nil, errors.Errorf("unhandled display format: %d", cliCtx.tableDisplayFormat)
return nil, nil, errors.Errorf("unhandled display format: %d", cliCtx.tableDisplayFormat)
}
}

// printQueryOutput takes a list of column names and a list of row
// contents writes a formatted table to 'w'.
func printQueryOutput(w io.Writer, cols []string, allRows rowStrIter) error {
reporter, err := makeReporter()
reporter, cleanup, err := makeReporter(w)
if err != nil {
return err
}
if cleanup != nil {
defer cleanup()
}
return render(reporter, w, cols, allRows, nil)
}
5 changes: 4 additions & 1 deletion pkg/cli/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,13 @@ Output the list of cluster settings known to this binary.
rows = append(rows, row)
}

reporter, err := makeReporter()
reporter, cleanup, err := makeReporter(os.Stdout)
if err != nil {
return err
}
if cleanup != nil {
defer cleanup()
}
if hr, ok := reporter.(*htmlReporter); ok {
hr.escape = false
hr.rowStats = false
Expand Down
24 changes: 0 additions & 24 deletions pkg/cli/interactive_tests/test_flags.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -38,30 +38,6 @@ interrupt
eexpect ":/# "
end_test

start_test "Check that --host causes a deprecation warning."
send "$argv start --insecure --host=localhost\r"
eexpect "host has been deprecated, use --listen-addr/--advertise-addr instead."
eexpect "node starting"
interrupt
eexpect ":/# "
end_test

start_test "Check that server --port causes a deprecation warning."
send "$argv start --insecure --port=26257\r"
eexpect "port has been deprecated, use --listen-addr=...:<port> instead."
eexpect "node starting"
interrupt
eexpect ":/# "
end_test

start_test "Check that server --advertise-port causes a deprecation warning."
send "$argv start --insecure --advertise-port=12345\r"
eexpect "advertise-port has been deprecated, use --advertise-addr=...:<port> instead."
eexpect "node starting"
interrupt
eexpect ":/# "
end_test

start_test "Check that not using --host nor --advertise causes a user warning."
send "$argv start --insecure\r"
eexpect "WARNING: neither --listen-addr nor --advertise-addr was specified"
Expand Down
9 changes: 7 additions & 2 deletions pkg/cli/sql_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -768,11 +768,16 @@ func runQueryAndFormatResults(conn *sqlConn, w io.Writer, fn queryFunc) error {
}

cols := getColumnStrings(rows, true)
reporter, err := makeReporter()
reporter, cleanup, err := makeReporter(w)
if err != nil {
return err
}
if err := render(reporter, w, cols, newRowIter(rows, true), noRowsHook); err != nil {
if err := func() error {
if cleanup != nil {
defer cleanup()
}
return render(reporter, w, cols, newRowIter(rows, true), noRowsHook)
}(); err != nil {
return err
}

Expand Down

0 comments on commit f184661

Please sign in to comment.