Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use is_deleted and use final #2

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,20 @@ INS 606024 (74776.2/s), DEL 139 (7.7/s), LIST 1266 (39.7/s), ROWS 5308 (high=100
INS 658128 (68341.9/s), DEL 141 (7.7/s), LIST 1416 (39.4/s), ROWS 5459 (high=1001), ERR: 0/0/0, 10 sec
```

- manual buffered insert and using insert and FINAL
```
INS 190350 (26272.7/s), DEL 9650 (17331.6/s), LIST 0 (NaN/s), ROWS 0 (high=0), ERR: 0/0/0, 1 sec
INS 248993 (15707.8/s), DEL 12612 (19995.1/s), LIST 25 (6.9/s), ROWS 1023 (high=1001), ERR: 0/0/0, 2 sec
INS 319932 (13438.5/s), DEL 16139 (10769.3/s), LIST 64 (8.3/s), ROWS 3077 (high=1001), ERR: 0/0/0, 3 sec
INS 381250 (12001.2/s), DEL 19243 (12229.0/s), LIST 135 (11.4/s), ROWS 4157 (high=1001), ERR: 0/0/0, 4 sec
INS 457516 (12111.4/s), DEL 22977 (14331.6/s), LIST 239 (15.0/s), ROWS 5277 (high=1001), ERR: 0/0/0, 5 sec
INS 571961 (12076.5/s), DEL 28533 (17193.4/s), LIST 337 (16.9/s), ROWS 5375 (high=1001), ERR: 0/0/0, 6 sec
INS 648152 (12037.2/s), DEL 32342 (18833.9/s), LIST 448 (18.7/s), ROWS 5486 (high=1001), ERR: 0/0/0, 7 sec
INS 749165 (11789.8/s), DEL 37413 (18412.8/s), LIST 546 (19.6/s), ROWS 5584 (high=1001), ERR: 0/0/0, 8 sec
INS 825432 (11905.7/s), DEL 41148 (18178.4/s), LIST 672 (21.0/s), ROWS 5711 (high=1001), ERR: 0/0/0, 9 sec
INS 938205 (11811.2/s), DEL 46630 (17887.1/s), LIST 769 (21.4/s), ROWS 5808 (high=1001), ERR: 0/0/0, 10 sec
```

## Configs

- `insertThread` - number of goroutine to insert
Expand All @@ -71,4 +85,5 @@ INS 658128 (68341.9/s), DEL 141 (7.7/s), LIST 1416 (39.4/s), ROWS 5459 (high=100
- `randomInsertEvery` - insert random pattern every n insert
- `listingTotal` - number of listing query to perform, will stop 10 sec
- `stopAnywaySec` - stop anyway after n second all insert done
- `debug` - throw panic on error
- `debug` - throw panic on error
- `clickhouse.WithStdAsync` if `useDriverAsync` or `wait_for_async_insert`
67 changes: 40 additions & 27 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func (c ClickhouseConf) Connect() (a *Ch.Adapter, err error) {
`max_execution_time`: 60,
`allow_experimental_lightweight_delete`: 1,
`async_insert`: 1,
//`async_insert_busy_timeout_ms`: 1000, // 1 sec
`async_insert_stale_timeout_ms`: 1000,
`async_insert_max_query_number`: 40_000, // 40k block

//`wait_for_async_insert`:1,
},
DialTimeout: 5 * time.Second,
Expand Down Expand Up @@ -77,13 +81,14 @@ func (c ClickhouseConf) Connect() (a *Ch.Adapter, err error) {
func main() {

const createTable = `
CREATE TABLE IF NOT EXISTS ver3(
CREATE TABLE IF NOT EXISTS ver4(
root LowCardinality(String) CODEC(LZ4HC),
bucket LowCardinality(String) CODEC(LZ4HC),
key String CODEC(LZ4HC),
version_id String CODEC(LZ4HC),
ver DateTime64 CODEC(LZ4HC)
) engine=ReplacingMergeTree(ver)
ver DateTime64 CODEC(LZ4HC),
is_deleted UInt8 CODEC(LZ4HC)
) engine=ReplacingMergeTree(ver, is_deleted)
ORDER BY (root, bucket, key, version_id)
`

Expand All @@ -100,7 +105,7 @@ ORDER BY (root, bucket, key, version_id)
L.PanicIf(err, `table creation`)

// truncate table
const truncateTable = `TRUNCATE TABLE ver3`
const truncateTable = `TRUNCATE TABLE ver4`
_, err = ch.Exec(truncateTable)
L.PanicIf(err, `table truncation`)

Expand Down Expand Up @@ -141,22 +146,22 @@ ORDER BY (root, bucket, key, version_id)
},
}

var timedBuffer *chBuffer.TimedBuffer
if !insertUseAsync {
const insertEvery = 40_000
timedBuffer = chBuffer.NewTimedBuffer(ch.DB, insertEvery, 1*time.Second, func(tx *sql.Tx) *sql.Stmt {
const insertQuery = `
INSERT INTO ver4 VALUES(?, ?, ?, ?, ?, ?)
`
stmt, err := tx.Prepare(insertQuery)
L.IsError(err, `failed to tx.Prepare: `+insertQuery)
return stmt
})
}
for z := 0; z < insertThread; z++ {
thread := z
eg.Go(func() error {
defer fmt.Println(`insert thread done`, thread)
var timedBuffer *chBuffer.TimedBuffer
if !insertUseAsync {
const insertEvery = 40_000
timedBuffer = chBuffer.NewTimedBuffer(ch.DB, insertEvery, 1*time.Second, func(tx *sql.Tx) *sql.Stmt {
const insertQuery = `
INSERT INTO ver3 VALUES(?, ?, ?, ?, ?)
`
stmt, err := tx.Prepare(insertQuery)
L.IsError(err, `failed to tx.Prepare: `+insertQuery)
return stmt
})
}
for z := 0; z < insertTotal/insertThread; z++ {
atomic.AddUint64(&insertDur, track(func() {
var key string
Expand All @@ -169,7 +174,7 @@ INSERT INTO ver3 VALUES(?, ?, ?, ?, ?)
if insertUseAsync {
// slow and high cpu usage, better use ch-timed-buffer, wait_for_async_insert=0 even worse
const insertQuery = `
INSERT INTO ver3 SETTINGS async_insert=1, wait_for_async_insert=1 VALUES (?, ?, ?, ?, ?)
INSERT INTO ver4 SETTINGS async_insert=1, wait_for_async_insert=1 VALUES (?, ?, ?, ?, ?, 0)
`
_, err := ch.Exec(insertQuery, root, bucket, key, verId, time.Now().Format(`2006-01-02 15:04:05.000000`))
if isError(err) {
Expand All @@ -178,7 +183,7 @@ INSERT INTO ver3 SETTINGS async_insert=1, wait_for_async_insert=1 VALUES (?, ?,
}
} else {
// fast insert but slow to do other queries, also high cpu usage
if !timedBuffer.Insert([]any{root, bucket, key, verId, time.Now().Format(`2006-01-02 15:04:05.000000`)}) {
if !timedBuffer.Insert([]any{root, bucket, key, verId, time.Now().Format(`2006-01-02 15:04:05.000000`), uint8(0)}) {
atomic.AddUint64(&insertErr, 1)
return
}
Expand All @@ -192,9 +197,6 @@ INSERT INTO ver3 SETTINGS async_insert=1, wait_for_async_insert=1 VALUES (?, ?,
if atomic.AddUint32(&insertThreadDone, 1) == insertThread {
close(deleteQueue)
}
if !insertUseAsync {
timedBuffer.Close()
}
return nil
})
}
Expand All @@ -209,15 +211,23 @@ INSERT INTO ver3 SETTINGS async_insert=1, wait_for_async_insert=1 VALUES (?, ?,
for z := 0; z < deleteThread; z++ {
thread := z
eg.Go(func() error {

defer fmt.Println(`delete thread done`, thread)
for key := range deleteQueue {
atomic.AddUint64(&deleteDur, track(func() {
const deleteQuery = `
DELETE FROM ver3 WHERE root=? AND bucket=? AND key=? AND version_id=?
if insertUseAsync {
const deleteQuery = `
DELETE FROM ver4 WHERE root=? AND bucket=? AND key=? AND version_id=?
`
_, err := ch.Exec(deleteQuery, root, bucket, key[0], key[1])
if isError(err) {
atomic.AddUint64(&deleteErr, 1)
_, err := ch.Exec(deleteQuery, root, bucket, key[0], key[1])
if isError(err) {
atomic.AddUint64(&deleteErr, 1)
}
} else {
if !timedBuffer.Insert([]any{root, bucket, key[0], key[1], time.Now().Format(`2006-01-02 15:04:05.000000`), uint8(1)}) {
atomic.AddUint64(&insertErr, 1)
return
}
}
atomic.AddUint64(&deleteDone, 1)
}))
Expand Down Expand Up @@ -299,7 +309,7 @@ DELETE FROM ver3 WHERE root=? AND bucket=? AND key=? AND version_id=?
parts := len(strings.Split(pattern, `/`)) + 1
const listingQuery = `
SELECT arrayStringConcat(splitByChar('/', key, ?),'/'), MAX(version_id)
FROM ver3
FROM ver4 FINAL
WHERE root=?
AND bucket=?
AND key LIKE ?
Expand Down Expand Up @@ -366,6 +376,9 @@ LIMIT 1001
if insertThreadDone >= insertThread && len(deleteQueue) == 0 {
exitAfterSec--
if exitAfterSec < 0 {
if !insertUseAsync {
timedBuffer.Close()
}
return nil
}
}
Expand Down
41 changes: 41 additions & 0 deletions testing.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
CREATE TABLE IF NOT EXISTS ver4
(
`root` LowCardinality(String) CODEC(LZ4HC),
`bucket` LowCardinality(String) CODEC(LZ4HC),
`key` String CODEC(LZ4HC),
`version_id` String CODEC(LZ4HC),
`ver` DateTime64 CODEC(LZ4HC),
`is_deleted` UInt8 CODEC(LZ4HC)
)
ENGINE = ReplacingMergeTree(ver, is_deleted)
ORDER BY (root, bucket, key, version_id);

INSERT INTO ver4 VALUES('root1', 'bucket1', 'a', 'a', '2023-05-24', 0);
INSERT INTO ver4 VALUES('root1', 'bucket1', 'a', 'a', '2023-05-24', 1);
SELECT * FROM ver4;
┌─root──┬─bucket──┬─key─┬─version_id─┬─────────────────────ver─┬─is_deleted─┐
│ root1 │ bucket1 │ a │ a │ 2023-05-24 00:00:00.000 │ 0 │
└───────┴─────────┴─────┴────────────┴─────────────────────────┴────────────┘
┌─root──┬─bucket──┬─key─┬─version_id─┬─────────────────────ver─┬─is_deleted─┐
│ root1 │ bucket1 │ a │ a │ 2023-05-24 00:00:00.000 │ 1 │
└───────┴─────────┴─────┴────────────┴─────────────────────────┴────────────┘

SELECT * FROM ver4 WHERE is_deleted = 0;
┌─root──┬─bucket──┬─key─┬─version_id─┬─────────────────────ver─┬─is_deleted─┐
│ root1 │ bucket1 │ a │ a │ 2023-05-24 00:00:00.000 │ 0 │
└───────┴─────────┴─────┴────────────┴─────────────────────────┴────────────┘

-- doesn't merge
SELECT * FROM ver4 FINAL WHERE is_deleted = 0 ;
SELECT * FROM ver4;
┌─root──┬─bucket──┬─key─┬─version_id─┬─────────────────────ver─┬─is_deleted─┐
│ root1 │ bucket1 │ a │ a │ 2023-05-24 00:00:00.000 │ 0 │
└───────┴─────────┴─────┴────────────┴─────────────────────────┴────────────┘
┌─root──┬─bucket──┬─key─┬─version_id─┬─────────────────────ver─┬─is_deleted─┐
│ root1 │ bucket1 │ a │ a │ 2023-05-24 00:00:00.000 │ 1 │
└───────┴─────────┴─────┴────────────┴─────────────────────────┴────────────┘

-- does merge
SELECT * FROM ver4 FINAL ;
-- empty