Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Fix multiple issues by switching database drivers #63

Merged
merged 8 commits into from
Jun 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ PostgreSQL's built-in `COPY` functionality for bulk inserting data
into [TimescaleDB.](//github.com/timescale/timescaledb/)

### Getting started
You need the Go runtime (1.6+) installed, then simply `go get` this repo:
You need the Go runtime (1.13+) installed, then simply `go get` this repo:
```bash
$ go install github.com/timescale/timescaledb-parallel-copy/cmd/timescaledb-parallel-copy@latest
```
Expand Down Expand Up @@ -83,3 +83,16 @@ Usage of timescaledb-parallel-copy:

### Contributing
We welcome contributions to this utility, which like TimescaleDB is released under the Apache2 Open Source License. The same [Contributors Agreement](//github.com/timescale/timescaledb/blob/master/CONTRIBUTING.md) applies; please sign the [Contributor License Agreement](https://cla-assistant.io/timescale/timescaledb-parallel-copy) (CLA) if you're a new contributor.

#### Running Tests

Some of the tests require a running Postgres database. Set the `TEST_CONNINFO`
environment variable to point at the database you want to run tests against.
(Assume that the tests may be destructive; in particular, it is not advisable to
point the tests at any production database.)

For example:
```
$ createdb gotest
$ TEST_CONNINFO='dbname=gotest user=myuser' go test -v ./...
```
133 changes: 25 additions & 108 deletions cmd/timescaledb-parallel-copy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,26 @@
package main

import (
"bufio"
"flag"
"fmt"
"io"
"log"
"net"
"os"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/jmoiron/sqlx"
_ "github.com/lib/pq"
_ "github.com/jackc/pgx/v4/stdlib"

"github.com/timescale/timescaledb-parallel-copy/internal/batch"
"github.com/timescale/timescaledb-parallel-copy/internal/db"
)

const (
binName = "timescaledb-parallel-copy"
version = "0.3.0"
version = "0.4.0-dev"
tabCharStr = "\\t"
)

Expand All @@ -39,7 +40,6 @@ var (
skipHeader bool
headerLinesCnt int

tokenSize int
workers int
limit int64
batchSize int
Expand All @@ -51,10 +51,6 @@ var (
rowCount int64
)

type batch struct {
rows []string
}

// Parse args
func init() {
var dbName string
Expand All @@ -71,7 +67,6 @@ func init() {
flag.BoolVar(&skipHeader, "skip-header", false, "Skip the first line of the input")
flag.IntVar(&headerLinesCnt, "header-line-count", 1, "Number of header lines")

flag.IntVar(&tokenSize, "token-size", bufio.MaxScanTokenSize, "Maximum size to use for tokens. By default, this is 64KB, so any value less than that will be ignored")
flag.IntVar(&batchSize, "batch-size", 5000, "Number of rows per insert")
flag.Int64Var(&limit, "limit", 0, "Number of rows to insert overall; 0 means to insert all")
flag.IntVar(&workers, "workers", 1, "Number of parallel requests to make")
Expand Down Expand Up @@ -114,33 +109,35 @@ func main() {
}
}

var scanner *bufio.Scanner
var reader io.Reader
if len(fromFile) > 0 {
file, err := os.Open(fromFile)
if err != nil {
log.Fatal(err)
}
defer file.Close()

scanner = bufio.NewScanner(file)
reader = file
} else {
scanner = bufio.NewScanner(os.Stdin)
reader = os.Stdin
}

if headerLinesCnt <= 0 {
fmt.Printf("WARNING: provided --header-line-count (%d) must be greater than 0\n", headerLinesCnt)
os.Exit(1)
}

if tokenSize != 0 && tokenSize < bufio.MaxScanTokenSize {
fmt.Printf("WARNING: provided --token-size (%d) is smaller than default (%d), ignoring\n", tokenSize, bufio.MaxScanTokenSize)
} else if tokenSize > bufio.MaxScanTokenSize {
buf := make([]byte, tokenSize)
scanner.Buffer(buf, tokenSize)
var skip int
if skipHeader {
skip = headerLinesCnt

if verbose {
fmt.Printf("Skipping the first %d lines of the input.\n", headerLinesCnt)
}
}

var wg sync.WaitGroup
batchChan := make(chan *batch, workers*2)
batchChan := make(chan net.Buffers, workers*2)

// Generate COPY workers
for i := 0; i < workers; i++ {
Expand All @@ -154,11 +151,16 @@ func main() {
}

start := time.Now()
rowsRead := scan(batchSize, scanner, batchChan)
if err := batch.Scan(batchSize, skip, limit, reader, batchChan); err != nil {
log.Fatalf("Error reading input: %s", err.Error())
}

close(batchChan)
wg.Wait()
end := time.Now()
took := end.Sub(start)

rowsRead := atomic.LoadInt64(&rowCount)
rowRate := float64(rowsRead) / float64(took.Seconds())

res := fmt.Sprintf("COPY %d", rowsRead)
Expand Down Expand Up @@ -190,62 +192,18 @@ func report() {

}

// scan reads lines from a bufio.Scanner, each which should be in CSV format
// with a delimiter specified by --split (comma by default)
func scan(itemsPerBatch int, scanner *bufio.Scanner, batchChan chan *batch) int64 {
rows := make([]string, 0, itemsPerBatch)
var linesRead int64

if skipHeader {
if verbose {
fmt.Printf("Skipping the first %d lines of the input.\n", headerLinesCnt)
}
for i := 0; i < headerLinesCnt; i++ {
scanner.Scan()
}
}

for scanner.Scan() {
if limit != 0 && linesRead >= limit {
break
}

rows = append(rows, scanner.Text())
if len(rows) >= itemsPerBatch { // dispatch to COPY worker & reset
batchChan <- &batch{rows}
rows = make([]string, 0, itemsPerBatch)
}
linesRead++
}

if err := scanner.Err(); err != nil {
log.Fatalf("Error reading input: %s", err.Error())
}

// Finished reading input, make sure last batch goes out.
if len(rows) > 0 {
batchChan <- &batch{rows}
}

return linesRead
}

// processBatches reads batches from channel c and copies them to the target
// server while tracking stats on the write.
func processBatches(wg *sync.WaitGroup, c chan *batch) {
func processBatches(wg *sync.WaitGroup, c chan net.Buffers) {
dbx, err := db.Connect(postgresConnect, overrides...)
if err != nil {
panic(err)
}
defer dbx.Close()

delimStr := "'" + splitCharacter + "'"
useSplitChar := splitCharacter
if splitCharacter == tabCharStr {
delimStr = "E" + delimStr
// Need to covert the string-ified version of the character to actual
// character for correct split
useSplitChar = "\t"
}

var copyCmd string
Expand All @@ -257,7 +215,7 @@ func processBatches(wg *sync.WaitGroup, c chan *batch) {

for batch := range c {
start := time.Now()
rows, err := processBatch(dbx, batch, copyCmd, useSplitChar)
rows, err := db.CopyFromLines(dbx, &batch, copyCmd)
if err != nil {
panic(err)
}
Expand All @@ -270,44 +228,3 @@ func processBatches(wg *sync.WaitGroup, c chan *batch) {
}
wg.Done()
}

func processBatch(db *sqlx.DB, b *batch, copyCmd, splitChar string) (int64, error) {
tx, err := db.Begin()
if err != nil {
return 0, err
}

stmt, err := tx.Prepare(copyCmd)
if err != nil {
return 0, err
}

for _, line := range b.rows {
// For some reason this is only needed for tab splitting
if splitChar == "\t" {
sp := strings.Split(line, splitChar)
args := make([]interface{}, len(sp))
for i, v := range sp {
args[i] = v
}
_, err = stmt.Exec(args...)
} else {
_, err = stmt.Exec(line)
}

if err != nil {
return 0, err
}
}

err = stmt.Close()
if err != nil {
return 0, err
}

err = tx.Commit()
if err != nil {
return 0, err
}
return int64(len(b.rows)), nil
}
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
module github.com/timescale/timescaledb-parallel-copy

go 1.12
go 1.13

require (
github.com/jackc/pgconn v1.1.0
github.com/jackc/pgconn v1.12.1
github.com/jackc/pgx/v4 v4.16.1
github.com/jmoiron/sqlx v1.2.0
github.com/lib/pq v1.2.0
golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7 // indirect
github.com/lib/pq v1.10.6 // indirect
google.golang.org/appengine v1.6.5 // indirect
)
Loading