Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: implement COPY ... TO STDOUT for CSV & TEXT #94408

Merged
merged 6 commits into from
Feb 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/generated/sql/bnf/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ FILES = [
"column_table_def",
"comment",
"commit_transaction",
"copy_from_stmt",
"copy_stmt",
"create_as_col_qual_list",
"create_as_constraint_def",
"create_changefeed_stmt",
Expand Down
4 changes: 0 additions & 4 deletions docs/generated/sql/bnf/copy_from_stmt.bnf

This file was deleted.

10 changes: 10 additions & 0 deletions docs/generated/sql/bnf/copy_stmt.bnf
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
copy_stmt ::=
'COPY' table_name opt_column_list 'FROM' 'STDIN' 'WITH' copy_options ( ( copy_options ) )*
| 'COPY' table_name opt_column_list 'FROM' 'STDIN' copy_options ( ( copy_options ) )*
| 'COPY' table_name opt_column_list 'FROM' 'STDIN'
| 'COPY' table_name opt_column_list 'TO' 'STDOUT' 'WITH' copy_options ( ( copy_options ) )*
| 'COPY' table_name opt_column_list 'TO' 'STDOUT' copy_options ( ( copy_options ) )*
| 'COPY' table_name opt_column_list 'TO' 'STDOUT'
| 'COPY' '(' preparable_stmt ')' 'TO' 'STDOUT' 'WITH' copy_options ( ( copy_options ) )*
| 'COPY' '(' preparable_stmt ')' 'TO' 'STDOUT' copy_options ( ( copy_options ) )*
| 'COPY' '(' preparable_stmt ')' 'TO' 'STDOUT'
7 changes: 5 additions & 2 deletions docs/generated/sql/bnf/stmt_block.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ stmt ::=
stmt_without_legacy_transaction ::=
preparable_stmt
| analyze_stmt
| copy_from_stmt
| copy_stmt
| comment_stmt
| execute_stmt
| deallocate_stmt
Expand Down Expand Up @@ -63,8 +63,10 @@ analyze_stmt ::=
'ANALYZE' analyze_target
| 'ANALYSE' analyze_target

copy_from_stmt ::=
copy_stmt ::=
'COPY' table_name opt_column_list 'FROM' 'STDIN' opt_with_copy_options opt_where_clause
| 'COPY' table_name opt_column_list 'TO' 'STDOUT' opt_with_copy_options
| 'COPY' '(' preparable_stmt ')' 'TO' 'STDOUT' opt_with_copy_options

comment_stmt ::=
'COMMENT' 'ON' 'DATABASE' database_name 'IS' comment_text
Expand Down Expand Up @@ -1368,6 +1370,7 @@ unreserved_keyword ::=
| 'STATEMENTS'
| 'STATISTICS'
| 'STDIN'
| 'STDOUT'
| 'STOP'
| 'STORAGE'
| 'STORE'
Expand Down
2 changes: 1 addition & 1 deletion docs/generated/sql/bnf/stmt_without_legacy_transaction.bnf
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
stmt_without_legacy_transaction ::=
preparable_stmt
| analyze_stmt
| copy_from_stmt
| copy_stmt
| comment_stmt
| execute_stmt
| deallocate_stmt
Expand Down
6 changes: 6 additions & 0 deletions pkg/cli/clisqlclient/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ type DriverConn interface {
Query(ctx context.Context, query string, args ...interface{}) (driver.Rows, error)
Exec(ctx context.Context, query string, args ...interface{}) error
CopyFrom(ctx context.Context, reader io.Reader, query string) (int64, error)
CopyTo(ctx context.Context, w io.Writer, query string) error
}

type driverConnAdapter struct {
Expand Down Expand Up @@ -224,3 +225,8 @@ func (d *driverConnAdapter) CopyFrom(
}
return cmdTag.RowsAffected(), nil
}

// CopyTo runs the given COPY query through the CopyTo method of the wrapped
// connection's PgConn, handing it w as the destination writer. The command
// tag returned by the driver is discarded; only the error is reported.
func (d *driverConnAdapter) CopyTo(ctx context.Context, w io.Writer, query string) error {
	pgConn := d.c.conn.PgConn()
	if _, err := pgConn.CopyTo(ctx, w, query); err != nil {
		return err
	}
	return nil
}
6 changes: 6 additions & 0 deletions pkg/cli/clisqlclient/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ import (
"github.com/jackc/pgconn"
)

// BeginCopyTo issues a COPY TO query on conn, passing w to the driver as the
// destination writer, and returns the driver's command tag and error as-is.
//
// conn must be backed by *sqlConn; the type assertion is unchecked and panics
// for any other Conn implementation.
func BeginCopyTo(ctx context.Context, conn Conn, w io.Writer, query string) (CommandTag, error) {
	pgConn := conn.(*sqlConn).conn.PgConn()
	tag, err := pgConn.CopyTo(ctx, w, query)
	return tag, err
}

// CopyFromState represents an in progress COPY FROM.
type CopyFromState struct {
conn *pgconn.PgConn
Expand Down
12 changes: 12 additions & 0 deletions pkg/cli/clisqlshell/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -1925,6 +1925,8 @@ func (c *cliState) doCheckStatement(startState, contState, execState cliStateEnu
return nextState
}

// copyToRe recognizes COPY ... TO STDOUT statements without invoking the SQL
// parser. The `i` flag makes the match case-insensitive; the `s` flag lets `.`
// match newlines so that a statement entered across several lines (for
// example a multi-line subquery before TO STDOUT) is still detected.
var copyToRe = regexp.MustCompile(`(?is)COPY.*TO\s+STDOUT`)

// doRunStatements runs all the statements that have been accumulated by
// concatLines.
func (c *cliState) doRunStatements(nextState cliStateEnum) cliStateEnum {
Expand Down Expand Up @@ -1968,6 +1970,16 @@ func (c *cliState) doRunStatements(nextState cliStateEnum) cliStateEnum {
// Now run the statement/query.
c.exitErr = c.runWithInterruptableCtx(func(ctx context.Context) error {
if scanner.FirstLexicalToken(c.concatLines) == lexbase.COPY {
// Ideally this is parsed using the parser, but we've avoided doing so
// for clisqlshell to be small.
if copyToRe.MatchString(c.concatLines) {
defer c.maybeFlushOutput()
// We don't print the tag, following psql.
if _, err := clisqlclient.BeginCopyTo(ctx, c.conn, c.iCtx.queryOutput, c.concatLines); err != nil {
return err
}
return nil
}
return c.beginCopyFrom(ctx, c.concatLines)
}
q := clisqlclient.MakeQuery(c.concatLines)
Expand Down
30 changes: 16 additions & 14 deletions pkg/cli/interactive_tests/test_copy.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ eexpect root@
send "COPY t FROM STDIN CSV;SELECT 1;\r"
eexpect "COPY together with other statements in a query string is not supported"
eexpect root@
send "COPY t TO STDOUT CSV;SELECT 1;\r"
eexpect "COPY together with other statements in a query string is not supported"
eexpect root@
send "SELECT 1;COPY t TO STDOUT CSV;\r"
eexpect "COPY together with other statements in a query string is not supported"
eexpect root@
end_test

start_test "Copy in transaction"
Expand Down Expand Up @@ -100,16 +106,14 @@ send_eof
eexpect "COPY 1"
eexpect root@

send "SELECT * FROM t ORDER BY id ASC;\r"

eexpect "1 | text with semicolon;"
eexpect "2 | beat chef@;"
eexpect "3 | more&text"
eexpect "4 | epa! epa!"
eexpect "11 | cat"
eexpect "12 | dog"
send "COPY t TO STDOUT;\r"

eexpect "(6 rows)"
eexpect "1\ttext with semicolon;"
eexpect "2\tbeat chef@;"
eexpect "3\tmore&text"
eexpect "4\tepa! epa!"
eexpect "11\tcat"
eexpect "12\tdog"

eexpect root@

Expand Down Expand Up @@ -148,7 +152,6 @@ eexpect root@
send_eof
eexpect eof


spawn /bin/bash
send "PS1=':''/# '\r"
eexpect ":/# "
Expand Down Expand Up @@ -182,10 +185,9 @@ send "echo -e '2\\tb' >> /tmp/test_copy.sql\r"
eexpect ":/# "
send "$argv sql --insecure -f /tmp/test_copy.sql\r"
eexpect ":/# "
send "$argv sql --insecure -e 'SELECT * FROM t ORDER BY id'\r"
eexpect "1 | a"
eexpect "2 | b"
eexpect "(2 rows)"
send "$argv sql --insecure -e 'COPY t TO STDOUT'\r"
eexpect "1\ta"
eexpect "2\tb"
eexpect ":/# "

send "$argv sql --insecure -e 'TRUNCATE TABLE t'\r"
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/docgen/diagrams.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ var specs = []stmtSpec{
match: []*regexp.Regexp{regexp.MustCompile("'COMMIT'|'END'")},
},
{
name: "copy_from_stmt",
name: "copy_stmt",
inline: []string{"opt_with_copy_options", "copy_options_list", "opt_with", "opt_where_clause", "where_clause"},
exclude: []*regexp.Regexp{regexp.MustCompile("'WHERE'")},
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/gen/bnf.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ BNF_SRCS = [
"//docs/generated/sql/bnf:column_table_def.bnf",
"//docs/generated/sql/bnf:comment.bnf",
"//docs/generated/sql/bnf:commit_transaction.bnf",
"//docs/generated/sql/bnf:copy_from_stmt.bnf",
"//docs/generated/sql/bnf:copy_stmt.bnf",
"//docs/generated/sql/bnf:create_as_col_qual_list.bnf",
"//docs/generated/sql/bnf:create_as_constraint_def.bnf",
"//docs/generated/sql/bnf:create_changefeed_stmt.bnf",
Expand Down
2 changes: 1 addition & 1 deletion pkg/gen/diagrams.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ DIAGRAMS_SRCS = [
"//docs/generated/sql/bnf:column_table_def.html",
"//docs/generated/sql/bnf:comment.html",
"//docs/generated/sql/bnf:commit_transaction.html",
"//docs/generated/sql/bnf:copy_from.html",
"//docs/generated/sql/bnf:copy.html",
"//docs/generated/sql/bnf:create.html",
"//docs/generated/sql/bnf:create_as_col_qual_list.html",
"//docs/generated/sql/bnf:create_as_constraint_def.html",
Expand Down
2 changes: 1 addition & 1 deletion pkg/gen/docs.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ DOCS_SRCS = [
"//docs/generated/sql/bnf:column_table_def.bnf",
"//docs/generated/sql/bnf:comment.bnf",
"//docs/generated/sql/bnf:commit_transaction.bnf",
"//docs/generated/sql/bnf:copy_from_stmt.bnf",
"//docs/generated/sql/bnf:copy_stmt.bnf",
"//docs/generated/sql/bnf:create_as_col_qual_list.bnf",
"//docs/generated/sql/bnf:create_as_constraint_def.bnf",
"//docs/generated/sql/bnf:create_changefeed_stmt.bnf",
Expand Down
5 changes: 4 additions & 1 deletion pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ go_library(
"conn_io.go",
"control_jobs.go",
"control_schedules.go",
"copy.go",
"copy_file_upload.go",
"copy_from.go",
"copy_to.go",
"crdb_internal.go",
"create_database.go",
"create_extension.go",
Expand Down Expand Up @@ -584,7 +585,9 @@ go_test(
"conn_executor_savepoints_test.go",
"conn_executor_test.go",
"conn_io_test.go",
"copy_from_test.go",
"copy_test.go",
"copy_to_test.go",
"crdb_internal_test.go",
"create_function_test.go",
"create_stats_test.go",
Expand Down
111 changes: 103 additions & 8 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"io"
"math"
"math/rand"
"strconv"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -2130,6 +2131,19 @@ func (ex *connExecutor) execCmd() error {
if err != nil {
return err
}
case CopyOut:
ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionQueryReceived, tcmd.TimeReceived)
ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionStartParse, tcmd.ParseStart)
ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionEndParse, tcmd.ParseEnd)
res = ex.clientComm.CreateCopyInResult(pos)
ev, payload = ex.execCopyOut(ctx, tcmd)
// Note: we write to ex.statsCollector.phaseTimes, instead of ex.phaseTimes,
// because:
// - stats use ex.statsCollector, not ex.phasetimes.
// - ex.statsCollector merely contains a copy of the times, that
// was created when the statement started executing (via the
// reset() method).
ex.statsCollector.PhaseTimes().SetSessionPhaseTime(sessionphase.SessionQueryServiced, timeutil.Now())
case DrainRequest:
// We received a drain request. We terminate immediately if we're not in a
// transaction. If we are in a transaction, we'll finish as soon as a Sync
Expand Down Expand Up @@ -2354,6 +2368,8 @@ func (ex *connExecutor) updateTxnRewindPosMaybe(
canAdvance = true
case CopyIn:
// Can't advance.
case CopyOut:
// Can't advance.
case DrainRequest:
canAdvance = true
case Flush:
Expand Down Expand Up @@ -2421,6 +2437,92 @@ func isCopyToExternalStorage(cmd CopyIn) bool {
stmt.Table.Table() == UserFileUploadTable) && stmt.Table.SchemaName == CrdbInternalName
}

// execCopyOut runs a COPY ... TO command and reports its outcome to the
// connExecutor state machine.
//
// The statement is executed via runCopyTo inside execWithProfiling. If the
// planner has an open transaction it is reused; otherwise a fresh transaction
// is created for the statement and rolled back when execution finishes. On
// success the command tag followed by the number of rows is sent through
// cmd.Conn.SendCommandComplete.
//
// It returns (nil, nil) on success. On failure the error is logged and an
// eventNonRetriableErr (IsCommit: fsm.False) is returned together with a
// payload carrying the error.
func (ex *connExecutor) execCopyOut(
	ctx context.Context, cmd CopyOut,
) (fsm.Event, fsm.EventPayload) {
	err := func() error {
		ex.incrementStartedStmtCounter(cmd.Stmt)
		// copyErr and numOutputRows are read by the deferred logging closure
		// below, so they must be assigned (never shadowed) by the code that
		// runs the statement.
		var copyErr error
		var numOutputRows int

		// Log the query for sampling.
		ex.setCopyLoggingFields(cmd.ParsedStmt)
		defer func() {
			f := tree.NewFmtCtx(tree.FmtHideConstants)
			f.FormatNode(cmd.Stmt)
			stmtFingerprintID := appstatspb.ConstructStatementFingerprintID(
				f.CloseAndGetString(),
				copyErr != nil,
				ex.implicitTxn(),
				ex.planner.CurrentDatabase(),
			)
			// No query-level stats are gathered here, so the zero value is
			// handed to the logger.
			var stats topLevelQueryStats
			ex.planner.maybeLogStatement(
				ctx,
				ex.executorType,
				true, /* isCopy */
				int(ex.state.mu.autoRetryCounter),
				ex.extraTxnState.txnCounter,
				numOutputRows,
				0, /* bulkJobId */
				copyErr,
				ex.statsCollector.PhaseTimes().GetSessionPhaseTime(sessionphase.SessionQueryReceived),
				&ex.extraTxnState.hasAdminRoleCache,
				ex.server.TelemetryLoggingMetrics,
				stmtFingerprintID,
				&stats,
			)
		}()

		// Record the execution error in copyErr so that the deferred logging
		// above sees failures instead of always observing a nil error.
		copyErr = ex.execWithProfiling(ctx, cmd.Stmt, nil, func(ctx context.Context) error {
			// Re-use the current transaction if available.
			txn := ex.planner.Txn()
			if txn == nil || !txn.IsOpen() {
				// Setup an implicit transaction for COPY TO.
				txn = ex.server.cfg.DB.NewTxn(ctx, cmd.Stmt.String())
				defer func() {
					if err := txn.Rollback(ctx); err != nil {
						log.SqlExec.Errorf(ctx, "error rolling back implicit txn in %s: %+v", cmd, err)
					}
				}()
			}

			var err error
			if numOutputRows, err = runCopyTo(ctx, &ex.planner, txn, cmd); err != nil {
				return err
			}

			// Finalize execution by sending the statement tag and number of rows read.
			dummy := tree.CopyTo{}
			tag := []byte(dummy.StatementTag())
			tag = append(tag, ' ')
			tag = strconv.AppendInt(tag, int64(numOutputRows), 10 /* base */)
			return cmd.Conn.SendCommandComplete(tag)
		})
		return copyErr
	}()
	if err != nil {
		log.SqlExec.Errorf(ctx, "error executing %s: %+v", cmd, err)
		return eventNonRetriableErr{IsCommit: fsm.False}, eventNonRetriableErrPayload{
			err: err,
		}
	}
	ex.incrementExecutedStmtCounter(cmd.Stmt)
	return nil, nil
}

// setCopyLoggingFields populates the planner state needed for logging a COPY
// statement: it installs stmt as the planner's current statement, gives the
// eval context fresh (empty) annotations and placeholder info, and
// re-initializes the current plan against that statement.
func (ex *connExecutor) setCopyLoggingFields(stmt parser.Statement) {
	p := &ex.planner
	p.stmt = Statement{Statement: stmt}

	annotations := tree.MakeAnnotations(0)
	p.extendedEvalCtx.Context.Annotations = &annotations
	p.extendedEvalCtx.Context.Placeholders = &tree.PlaceholderInfo{}

	// The plan is initialized last so it observes the statement set above.
	p.curPlan.init(&p.stmt, &p.instrumentation)
}

// We handle the CopyFrom statement by creating a copyMachine and handing it
// control over the connection until the copying is done. The contract is that,
// when this is called, the pgwire.conn is not reading from the network
Expand Down Expand Up @@ -2497,14 +2599,7 @@ func (ex *connExecutor) execCopyIn(
ex.resetPlanner(ctx, p, txn, stmtTS)
}

// These fields need to be set for logging purposes.
ex.planner.stmt = Statement{
Statement: cmd.ParsedStmt,
}
ann := tree.MakeAnnotations(0)
ex.planner.extendedEvalCtx.Context.Annotations = &ann
ex.planner.extendedEvalCtx.Context.Placeholders = &tree.PlaceholderInfo{}
ex.planner.curPlan.init(&ex.planner.stmt, &ex.planner.instrumentation)
ex.setCopyLoggingFields(cmd.ParsedStmt)

var cm copyMachineInterface
var copyErr error
Expand Down
Loading