From 22127148662f7a6b41723ac1b94ece83f170a2d0 Mon Sep 17 00:00:00 2001 From: Kiswono Prayogo Date: Wed, 24 May 2023 05:22:05 +0700 Subject: [PATCH] use is_deleted and use final --- README.md | 17 +++++++++++++- main.go | 67 ++++++++++++++++++++++++++++++++--------------------- testing.txt | 41 ++++++++++++++++++++++++++++++++ 3 files changed, 97 insertions(+), 28 deletions(-) create mode 100644 testing.txt diff --git a/README.md b/README.md index 098b2d2..8e6f1b5 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,20 @@ INS 606024 (74776.2/s), DEL 139 (7.7/s), LIST 1266 (39.7/s), ROWS 5308 (high=100 INS 658128 (68341.9/s), DEL 141 (7.7/s), LIST 1416 (39.4/s), ROWS 5459 (high=1001), ERR: 0/0/0, 10 sec ``` +- manual buffered insert and using insert and FINAL +``` +INS 190350 (26272.7/s), DEL 9650 (17331.6/s), LIST 0 (NaN/s), ROWS 0 (high=0), ERR: 0/0/0, 1 sec +INS 248993 (15707.8/s), DEL 12612 (19995.1/s), LIST 25 (6.9/s), ROWS 1023 (high=1001), ERR: 0/0/0, 2 sec +INS 319932 (13438.5/s), DEL 16139 (10769.3/s), LIST 64 (8.3/s), ROWS 3077 (high=1001), ERR: 0/0/0, 3 sec +INS 381250 (12001.2/s), DEL 19243 (12229.0/s), LIST 135 (11.4/s), ROWS 4157 (high=1001), ERR: 0/0/0, 4 sec +INS 457516 (12111.4/s), DEL 22977 (14331.6/s), LIST 239 (15.0/s), ROWS 5277 (high=1001), ERR: 0/0/0, 5 sec +INS 571961 (12076.5/s), DEL 28533 (17193.4/s), LIST 337 (16.9/s), ROWS 5375 (high=1001), ERR: 0/0/0, 6 sec +INS 648152 (12037.2/s), DEL 32342 (18833.9/s), LIST 448 (18.7/s), ROWS 5486 (high=1001), ERR: 0/0/0, 7 sec +INS 749165 (11789.8/s), DEL 37413 (18412.8/s), LIST 546 (19.6/s), ROWS 5584 (high=1001), ERR: 0/0/0, 8 sec +INS 825432 (11905.7/s), DEL 41148 (18178.4/s), LIST 672 (21.0/s), ROWS 5711 (high=1001), ERR: 0/0/0, 9 sec +INS 938205 (11811.2/s), DEL 46630 (17887.1/s), LIST 769 (21.4/s), ROWS 5808 (high=1001), ERR: 0/0/0, 10 sec +``` + ## Configs - `insertThread` - number of goroutine to insert @@ -71,4 +85,5 @@ INS 658128 (68341.9/s), DEL 141 (7.7/s), LIST 1416 (39.4/s), ROWS 5459 (high=100 - `randomInsertEvery` - insert random pattern every n insert - `listingTotal` - number of listing query to perform, will stop 10 sec - `stopAnywaySec` - stop anyway after n second all insert done -- `debug` - throw panic on error \ No newline at end of file +- `debug` - throw panic on error +- `clickhouse.WithStdAsync` if `useDriverAsync` or `wait_for_async_insert` \ No newline at end of file diff --git a/main.go b/main.go index 4f1da18..9b16705 100644 --- a/main.go +++ b/main.go @@ -42,6 +42,10 @@ func (c ClickhouseConf) Connect() (a *Ch.Adapter, err error) { `max_execution_time`: 60, `allow_experimental_lightweight_delete`: 1, `async_insert`: 1, + //`async_insert_busy_timeout_ms`: 1000, // 1 sec + `async_insert_stale_timeout_ms`: 1000, + `async_insert_max_query_number`: 40_000, // 40k block + //`wait_for_async_insert`:1, }, DialTimeout: 5 * time.Second, @@ -77,13 +81,14 @@ func (c ClickhouseConf) Connect() (a *Ch.Adapter, err error) { func main() { const createTable = ` -CREATE TABLE IF NOT EXISTS ver3( +CREATE TABLE IF NOT EXISTS ver4( root LowCardinality(String) CODEC(LZ4HC), bucket LowCardinality(String) CODEC(LZ4HC), key String CODEC(LZ4HC), version_id String CODEC(LZ4HC), - ver DateTime64 CODEC(LZ4HC) -) engine=ReplacingMergeTree(ver) + ver DateTime64 CODEC(LZ4HC), + is_deleted UInt8 CODEC(LZ4HC) +) engine=ReplacingMergeTree(ver, is_deleted) ORDER BY (root, bucket, key, version_id) ` @@ -100,7 +105,7 @@ ORDER BY (root, bucket, key, version_id) L.PanicIf(err, `table creation`) // truncate table - const truncateTable = `TRUNCATE TABLE ver3` + const truncateTable = `TRUNCATE TABLE ver4` _, err = ch.Exec(truncateTable) L.PanicIf(err, `table truncation`) @@ -141,22 +146,22 @@ ORDER BY (root, bucket, key, version_id) }, } + var timedBuffer *chBuffer.TimedBuffer + if !insertUseAsync { + const insertEvery = 40_000 + timedBuffer = chBuffer.NewTimedBuffer(ch.DB, insertEvery, 1*time.Second, func(tx *sql.Tx) *sql.Stmt { + const insertQuery = ` +INSERT INTO ver4 VALUES(?, ?, ?, ?, ?, ?) +` + stmt, err := tx.Prepare(insertQuery) + L.IsError(err, `failed to tx.Prepare: `+insertQuery) + return stmt + }) + } for z := 0; z < insertThread; z++ { thread := z eg.Go(func() error { defer fmt.Println(`insert thread done`, thread) - var timedBuffer *chBuffer.TimedBuffer - if !insertUseAsync { - const insertEvery = 40_000 - timedBuffer = chBuffer.NewTimedBuffer(ch.DB, insertEvery, 1*time.Second, func(tx *sql.Tx) *sql.Stmt { - const insertQuery = ` -INSERT INTO ver3 VALUES(?, ?, ?, ?, ?) -` - stmt, err := tx.Prepare(insertQuery) - L.IsError(err, `failed to tx.Prepare: `+insertQuery) - return stmt - }) - } for z := 0; z < insertTotal/insertThread; z++ { atomic.AddUint64(&insertDur, track(func() { var key string @@ -169,7 +174,7 @@ INSERT INTO ver3 VALUES(?, ?, ?, ?, ?) if insertUseAsync { // slow and high cpu usage, better use ch-timed-buffer, wait_for_async_insert=0 even worse const insertQuery = ` -INSERT INTO ver3 SETTINGS async_insert=1, wait_for_async_insert=1 VALUES (?, ?, ?, ?, ?) +INSERT INTO ver4 SETTINGS async_insert=1, wait_for_async_insert=1 VALUES (?, ?, ?, ?, ?, 0) ` _, err := ch.Exec(insertQuery, root, bucket, key, verId, time.Now().Format(`2006-01-02 15:04:05.000000`)) if isError(err) { @@ -178,7 +183,7 @@ INSERT INTO ver3 SETTINGS async_insert=1, wait_for_async_insert=1 VALUES (?, ?, } } else { // fast insert but slow to do other queries, also high cpu usage - if !timedBuffer.Insert([]any{root, bucket, key, verId, time.Now().Format(`2006-01-02 15:04:05.000000`)}) { + if !timedBuffer.Insert([]any{root, bucket, key, verId, time.Now().Format(`2006-01-02 15:04:05.000000`), uint8(0)}) { atomic.AddUint64(&insertErr, 1) return } @@ -192,9 +197,6 @@ INSERT INTO ver3 SETTINGS async_insert=1, wait_for_async_insert=1 VALUES (?, ?, if atomic.AddUint32(&insertThreadDone, 1) == insertThread { close(deleteQueue) } - if !insertUseAsync { - timedBuffer.Close() - } return nil }) } @@ -209,15 +211,23 @@ INSERT INTO ver3 SETTINGS async_insert=1, wait_for_async_insert=1 VALUES (?, ?, for z := 0; z < deleteThread; z++ { thread := z eg.Go(func() error { + defer fmt.Println(`delete thread done`, thread) for key := range deleteQueue { atomic.AddUint64(&deleteDur, track(func() { - const deleteQuery = ` -DELETE FROM ver3 WHERE root=? AND bucket=? AND key=? AND version_id=? + if insertUseAsync { + const deleteQuery = ` +DELETE FROM ver4 WHERE root=? AND bucket=? AND key=? AND version_id=? ` - _, err := ch.Exec(deleteQuery, root, bucket, key[0], key[1]) - if isError(err) { - atomic.AddUint64(&deleteErr, 1) + _, err := ch.Exec(deleteQuery, root, bucket, key[0], key[1]) + if isError(err) { + atomic.AddUint64(&deleteErr, 1) + } + } else { + if !timedBuffer.Insert([]any{root, bucket, key[0], key[1], time.Now().Format(`2006-01-02 15:04:05.000000`), uint8(1)}) { + atomic.AddUint64(&insertErr, 1) + return + } } atomic.AddUint64(&deleteDone, 1) })) @@ -299,7 +309,7 @@ DELETE FROM ver3 WHERE root=? AND bucket=? AND key=? AND version_id=? parts := len(strings.Split(pattern, `/`)) + 1 const listingQuery = ` SELECT arrayStringConcat(splitByChar('/', key, ?),'/'), MAX(version_id) -FROM ver3 +FROM ver4 FINAL WHERE root=? AND bucket=? AND key LIKE ? @@ -366,6 +376,9 @@ LIMIT 1001 if insertThreadDone >= insertThread && len(deleteQueue) == 0 { exitAfterSec-- if exitAfterSec < 0 { + if !insertUseAsync { + timedBuffer.Close() + } return nil } } diff --git a/testing.txt b/testing.txt new file mode 100644 index 0000000..85b6c80 --- /dev/null +++ b/testing.txt @@ -0,0 +1,41 @@ +CREATE TABLE IF NOT EXISTS ver4 +( + `root` LowCardinality(String) CODEC(LZ4HC), + `bucket` LowCardinality(String) CODEC(LZ4HC), + `key` String CODEC(LZ4HC), + `version_id` String CODEC(LZ4HC), + `ver` DateTime64 CODEC(LZ4HC), + `is_deleted` UInt8 CODEC(LZ4HC) +) +ENGINE = ReplacingMergeTree(ver, is_deleted) +ORDER BY (root, bucket, key, version_id); + +INSERT INTO ver4 VALUES('root1', 'bucket1', 'a', 'a', '2023-05-24', 0); +INSERT INTO ver4 VALUES('root1', 'bucket1', 'a', 'a', '2023-05-24', 1); +SELECT * FROM ver4; +┌─root──┬─bucket──┬─key─┬─version_id─┬─────────────────────ver─┬─is_deleted─┐ +│ root1 │ bucket1 │ a │ a │ 2023-05-24 00:00:00.000 │ 0 │ +└───────┴─────────┴─────┴────────────┴─────────────────────────┴────────────┘ +┌─root──┬─bucket──┬─key─┬─version_id─┬─────────────────────ver─┬─is_deleted─┐ +│ root1 │ bucket1 │ a │ a │ 2023-05-24 00:00:00.000 │ 1 │ +└───────┴─────────┴─────┴────────────┴─────────────────────────┴────────────┘ + + SELECT * FROM ver4 WHERE is_deleted = 0; +┌─root──┬─bucket──┬─key─┬─version_id─┬─────────────────────ver─┬─is_deleted─┐ +│ root1 │ bucket1 │ a │ a │ 2023-05-24 00:00:00.000 │ 0 │ +└───────┴─────────┴─────┴────────────┴─────────────────────────┴────────────┘ + +-- doesn't merge +SELECT * FROM ver4 FINAL WHERE is_deleted = 0 ; +SELECT * FROM ver4; +┌─root──┬─bucket──┬─key─┬─version_id─┬─────────────────────ver─┬─is_deleted─┐ +│ root1 │ bucket1 │ a │ a │ 2023-05-24 00:00:00.000 │ 0 │ +└───────┴─────────┴─────┴────────────┴─────────────────────────┴────────────┘ +┌─root──┬─bucket──┬─key─┬─version_id─┬─────────────────────ver─┬─is_deleted─┐ +│ root1 │ bucket1 │ a │ a │ 2023-05-24 00:00:00.000 │ 1 │ +└───────┴─────────┴─────┴────────────┴─────────────────────────┴────────────┘ + +-- does merge +SELECT * FROM ver4 FINAL ; +-- empty +