Exactly once support for clickhouse sink #147

Open · wants to merge 5 commits into main
165 changes: 165 additions & 0 deletions pkg/providers/clickhouse/README.md
@@ -0,0 +1,165 @@
## **Exactly Once Support**

The `ExactlyOnceSink` is a wrapper layer that provides exactly-once delivery semantics for a data pipeline. This ensures that each batch of data is processed and delivered to the destination exactly once, even in the presence of retries or failures.

The core components of this implementation are:
- **Deduper**: Manages deduplication and state transitions for each data partition.
- **InsertBlockStore**: Tracks the state of processed data blocks (e.g., "before" and "after" states).
- **Sink**: Responsible for pushing the actual data to the destination.
- **Batch**: Represents incoming data to be processed.

Key assumptions for exactly-once support in ClickHouse:

- ClickHouse is replicated, i.e. runs with either ZooKeeper or ClickHouse Keeper.
- The `KeeperMap` table engine is enabled on the target database.
- The source provides `PartID` metadata, splitting a logical table into sequences of related changes.
- The source provides an `_offset` column whose values grow monotonically within a part (see the sketch below).
- The source provides at-least-once delivery semantics.
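
To make the offset assumption concrete, here is a minimal sketch (a hypothetical helper, not part of this PR) of the property the deduplication logic relies on:

```go
// offsetsMonotonic reports whether a part's offsets grow monotonically,
// which is what the exactly-once logic assumes about the `_offset` column.
// Hypothetical helper for illustration only.
func offsetsMonotonic(offsets []uint64) bool {
	for i := 1; i < len(offsets); i++ {
		if offsets[i] < offsets[i-1] {
			return false
		}
	}
	return true
}
```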

The diagrams below explain the flow, structure, and interactions within the system.

---

### **1. Sequence Diagram**
**Purpose:** The sequence diagram illustrates the interaction between components (`deduper`, `store`, `sink`, and `batch`) over time. It focuses on the sequence of operations performed to achieve exactly-once semantics.

Key Highlights:
- The `deduper` retrieves the last processed block and evaluates the current batch to determine actions.
- Possible actions include skipping already processed items, retrying partially processed blocks, or inserting new data.
- State transitions in the `store` occur before and after data is pushed to the sink.

```mermaid
sequenceDiagram
participant deduper
participant store
participant sink
participant batch

deduper->>store: Get(lastBlock, lastStatus)
alt LastBlock is nil
deduper->>store: Set(currBlock, Before)
deduper->>sink: Push(batch)
deduper->>store: Set(currBlock, After)
else LastBlock exists
deduper->>batch: Split(batch into skip, retry, insert)
alt Retry Needed
deduper->>sink: Push(retry)
deduper->>store: Set(lastBlock, After)
end
alt Insert Needed
deduper->>store: Set(currBlock, Before)
deduper->>sink: Push(insert)
deduper->>store: Set(currBlock, After)
end
end
```
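
A compact Go sketch of the same flow (simplified signatures, not the PR's exact code; `blockFor` is a hypothetical helper deriving a batch's offset range, and the real `Process` returns a closure for errgroup):

```go
// process mirrors the sequence diagram above; a simplified sketch.
func (d *deduper) process(batch []abstract.ChangeItem) error {
	lastBlock, lastStatus, err := d.store.Get(d.part)
	if err != nil {
		return err
	}
	if lastBlock == nil {
		// Fresh part: record the block, push, then confirm it.
		curr := blockFor(batch) // hypothetical: derive the batch's offset range
		if err := d.store.Set(d.part, curr, InsertBlockStatusBefore); err != nil {
			return err
		}
		if err := d.sink.Push(batch); err != nil {
			return err
		}
		return d.store.Set(d.part, curr, InsertBlockStatusAfter)
	}
	skip, retry, insert := d.splitBatch(batch, lastBlock)
	_ = skip // already delivered, only logged
	// Retry applies when the last block was started but never confirmed
	// (an assumption consistent with the before/after states).
	if len(retry) > 0 && lastStatus == InsertBlockStatusBefore {
		if err := d.sink.Push(retry); err != nil {
			return err
		}
		if err := d.store.Set(d.part, lastBlock, InsertBlockStatusAfter); err != nil {
			return err
		}
	}
	if len(insert) > 0 {
		curr := blockFor(insert)
		if err := d.store.Set(d.part, curr, InsertBlockStatusBefore); err != nil {
			return err
		}
		if err := d.sink.Push(insert); err != nil {
			return err
		}
		return d.store.Set(d.part, curr, InsertBlockStatusAfter)
	}
	return nil
}
```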

---

### **2. Class Diagram**
**Purpose:** This diagram shows the relationships and dependencies between the key components (`deduper`, `sink`, `store`, and `batch`).

Key Highlights:
- The `Deduper` is the central orchestrator, responsible for processing batches and coordinating actions.
- The `InsertBlockStore` is used to store the state of data blocks to support deduplication.
- The `Sink` is the final destination for processed data.

```mermaid
classDiagram
class Deduper {
- Sinker sink
- InsertBlockStore store
- TablePartID part
+ Process(batch []ChangeItem) func() error
- splitBatch(batch []ChangeItem, lastBlock *InsertBlock) (skip, retry, insert)
}

class InsertBlockStore {
+ Get(part TablePartID) (block *InsertBlock, status InsertBlockStatus, err error)
+ Set(part TablePartID, block *InsertBlock, status InsertBlockStatus) error
}

class Sinker {
+ Push(batch []ChangeItem) error
}

class Batch {
- Offset() (uint64, bool)
}

Deduper --> InsertBlockStore : uses
Deduper --> Sinker : uses
Deduper --> Batch : processes
```

---

### **3. Component Diagram**
**Purpose:** The component diagram highlights the interaction between the `ExactlyOnceSink` wrapper and the existing system components.

Key Highlights:
- The `ExactlyOnceSink` manages the deduplication and state tracking for incoming batches.
- It wraps a non-exactly-once sink and uses the `deduper` and `InsertBlockStore` to ensure exactly-once delivery.
- The `deduper` performs the actual processing for each data partition.

```mermaid
graph TD
subgraph "Exactly Once"
direction LR
A[ExactlyOnceSink] -->|Uses| D[deduper]
A -->|Uses| C[InsertBlockStore]
D -->|Split| F[InsertBlock]
end

subgraph "Sink System"
direction LR
G[Non-Exactly Once Sink abstract.Sinker]
end

D -->|Push| G
```
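
In code, this wrapping corresponds to the constructors added in this PR; a wiring sketch (the `db`, `plainSink`, and `logger` arguments are assumed to come from the provider, as `provider.go` does below):

```go
// wrapExactlyOnce shows how the pieces fit together; provider.go in
// this PR performs essentially the same wiring.
func wrapExactlyOnce(db *sql.DB, plainSink abstract.Sinker, logger log.Logger) (abstract.Sinker, error) {
	store, err := NewKeeperBlockStore("exactly_once_keeper", db)
	if err != nil {
		return nil, xerrors.Errorf("unable to init keeper block store: %w", err)
	}
	return NewExactlyOnce(plainSink, store, logger), nil
}
```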

---

### **4. Data Flow Diagram**
**Purpose:** The data flow diagram visualizes how data (e.g., batches and blocks) flows through the system and is processed.

Key Highlights:
- Data is split into different actions (skip, retry, or insert) based on its offset and the state of the last processed block.
- State transitions (`before` and `after`) are stored in the `InsertBlockStore` to track progress.

```mermaid
flowchart TD
Input[Input: Batch of Items] --> Deduper[Deduper Logic]
Deduper -->|New Block| StoredBefore[Store: Before]
Deduper -->|Skip Items| Log[Log Skipped Items]
Deduper -->|Retry Block| Sink[Sink: Push Batch]
StoredBefore -->|Insert Block| Sink
Sink --> StoreAfter[Store: After]
```

---

### **5. Decision Tree Diagram**
**Purpose:** The decision tree diagram outlines the branching logic of the `splitBatch` function, which determines how items in the batch are categorized.

Key Highlights:
- Each item in the batch is evaluated to decide whether it should be skipped, retried, or inserted.
- The decision is based on the offset of the item compared to the range of the last processed block.

```mermaid
graph TD
Start[Start: Iterate Batch] --> OffsetCheck[Check Item Offset]
OffsetCheck -->|Offset < lastBlock.min| Skip[Add Item to Skip]
OffsetCheck -->|Offset >= lastBlock.min AND Offset < lastBlock.max| Retry[Add Item to Retry]
OffsetCheck -->|Offset >= lastBlock.max| Insert[Add Item to Insert]
Skip --> Next[Next Item]
Retry --> Next
Insert --> Next
Next -->|More Items| OffsetCheck
Next -->|No More Items| End[Split Complete]
```
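
The same branching in compact Go form (a sketch over raw offsets; the real `splitBatch` operates on `ChangeItem`s):

```go
// splitOffsets categorizes offsets against the last block's [min, max]
// range, exactly as in the decision tree above.
func splitOffsets(offsets []uint64, min, max uint64) (skip, retry, insert []uint64) {
	for _, off := range offsets {
		switch {
		case off < min:
			skip = append(skip, off) // already delivered
		case off < max:
			retry = append(retry, off) // possibly delivered; push again
		default:
			insert = append(insert, off) // new data
		}
	}
	return skip, retry, insert
}
```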

---

2 changes: 2 additions & 0 deletions pkg/providers/clickhouse/model/model_ch_destination.go
@@ -92,6 +92,8 @@ type ChDestination struct {
	InflightBuffer     int // deprecated: use BufferTriggingSize instead. Items' count triggering a buffer flush
	BufferTriggingSize uint64
	RootCACertPaths    []string

	ExactlyOnce bool // experimental: uses a KeeperMap table to store offsets and keep insert blocks consistent.
}

type InsertParams struct {
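
For reference, opting in might look like this on the destination model (a sketch; other fields elided):

```go
dst := &model.ChDestination{
	// ...connection and buffering settings elided...
	ExactlyOnce: true, // enable the experimental exactly-once path
}
```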
15 changes: 15 additions & 0 deletions pkg/providers/clickhouse/provider.go
@@ -68,6 +68,21 @@ func (p *Provider) Sink(config middlewares.Config) (abstract.Sinker, error) {
	if err != nil {
		return nil, xerrors.Errorf("failed to create ClickHouse sinker: %w", err)
	}
	dst, ok := p.transfer.Dst.(*model.ChDestination)
	if !ok {
		return nil, xerrors.Errorf("unexpected destination type: %T", p.transfer.Dst)
	}
	if dst.ExactlyOnce {
		// The keeper-backed block store lives in the destination database.
		db, err := makeShardConnection(dst.ToStorageParams(), "")
		if err != nil {
			return nil, xerrors.Errorf("unable to init keeper db connection: %w", err)
		}
		keeperStore, err := NewKeeperBlockStore("exactly_once_keeper", db)
		if err != nil {
			return nil, xerrors.Errorf("unable to init keeper block store: %w", err)
		}
		return NewExactlyOnce(s, keeperStore, log.With(p.logger, log.Any("component", "exactly_once"))), nil
	}
	return s, nil
}

53 changes: 53 additions & 0 deletions pkg/providers/clickhouse/sink_exactly_once.go
@@ -0,0 +1,53 @@
package clickhouse

import (
	"golang.org/x/sync/errgroup"

	"github.com/doublecloud/transfer/pkg/abstract"
	"go.ytsaurus.tech/library/go/core/log"
)

var (
	_ abstract.Sinker = (*ExactlyOnceSink)(nil)
)

// ExactlyOnceSink wraps a plain sinker and deduplicates pushes per
// table part via an InsertBlockStore.
type ExactlyOnceSink struct {
	sink  abstract.Sinker
	store InsertBlockStore
	lgr   log.Logger

	sm map[abstract.TablePartID]*deduper
}

func (e *ExactlyOnceSink) Close() error {
	return e.sink.Close()
}

func (e *ExactlyOnceSink) Push(items []abstract.ChangeItem) error {
	// Group incoming items by table part, so each part is deduplicated
	// against its own stored insert block.
	partitionBatches := map[abstract.TablePartID][]abstract.ChangeItem{}
	for _, item := range items {
		partitionBatches[item.TablePartID()] = append(partitionBatches[item.TablePartID()], item)
	}
	// Parts are processed concurrently; a deduper is created lazily per part.
	gr := errgroup.Group{}
	for part, batch := range partitionBatches {
		if _, ok := e.sm[part]; !ok {
			e.sm[part] = newDeduper(
				part,
				e.sink,
				e.store,
				e.lgr,
			)
		}
		gr.Go(e.sm[part].Process(batch))
	}
	return gr.Wait()
}

func NewExactlyOnce(sink abstract.Sinker, store InsertBlockStore, lgr log.Logger) *ExactlyOnceSink {
	return &ExactlyOnceSink{
		sink:  sink,
		store: store,
		lgr:   lgr,
		sm:    map[abstract.TablePartID]*deduper{},
	}
}
32 changes: 32 additions & 0 deletions pkg/providers/clickhouse/sink_exactly_once_block_store.go
@@ -0,0 +1,32 @@
package clickhouse

import (
	"fmt"

	"github.com/doublecloud/transfer/pkg/abstract"
)

// InsertBlockStore persists the last insert block and its status for
// each table part, enabling deduplication across retries and restarts.
type InsertBlockStore interface {
	Get(id abstract.TablePartID) (*InsertBlock, InsertBlockStatus, error)
	Set(id abstract.TablePartID, block *InsertBlock, status InsertBlockStatus) error
}

type InsertBlockStatus string

const (
	InsertBlockStatusEmpty  = InsertBlockStatus("")
	InsertBlockStatusBefore = InsertBlockStatus("before")
	InsertBlockStatusAfter  = InsertBlockStatus("after")
)

var (
	_ fmt.Stringer = (*InsertBlock)(nil)
)

// InsertBlock describes the offset range covered by a single insert.
type InsertBlock struct {
	min, max uint64
}

func (b InsertBlock) String() string {
	return fmt.Sprintf("%v->%v", b.min, b.max)
}
72 changes: 72 additions & 0 deletions pkg/providers/clickhouse/sink_exactly_once_block_store_keeper.go
@@ -0,0 +1,72 @@
package clickhouse

import (
	"database/sql"
	"fmt"

	"github.com/doublecloud/transfer/library/go/core/xerrors"
	"github.com/doublecloud/transfer/pkg/abstract"
)

var (
	_ InsertBlockStore = (*KeeperBlockStore)(nil)
)

// KeeperBlockStore keeps per-part insert blocks in a ClickHouse
// KeeperMap table, so the state is replicated via (Zoo)Keeper.
type KeeperBlockStore struct {
	table string
	db    *sql.DB
}

func (k *KeeperBlockStore) Get(id abstract.TablePartID) (*InsertBlock, InsertBlockStatus, error) {
	lastBlockRow := k.db.QueryRow(fmt.Sprintf(`
SELECT min_offset, max_offset, status
FROM %s
WHERE part_id = ?;
`, k.table), id.FqtnWithPartID())
	var block InsertBlock
	var status InsertBlockStatus
	if err := lastBlockRow.Scan(&block.min, &block.max, &status); err != nil {
		if xerrors.Is(err, sql.ErrNoRows) {
			// No state recorded yet for this part.
			return nil, InsertBlockStatusEmpty, nil
		}
		return nil, "", xerrors.Errorf("unable to get block: %w", err)
	}
	return &block, status, nil
}

// Set updates the stored block for a part. Note that ALTER ... UPDATE
// mutates an existing row; it does not create one.
func (k *KeeperBlockStore) Set(id abstract.TablePartID, block *InsertBlock, status InsertBlockStatus) error {
	_, err := k.db.Exec(fmt.Sprintf(`
ALTER TABLE %s
UPDATE min_offset = ?, max_offset = ?, status = ?
WHERE part_id = ?;
`, k.table), block.min, block.max, status, id.FqtnWithPartID())
	if err != nil {
		return xerrors.Errorf("unable to set block: %w", err)
	}
	return nil
}

func NewKeeperBlockStore(
	table string,
	db *sql.DB,
) (*KeeperBlockStore, error) {
	ddl := fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %[1]s
(
	part_id    String,
	min_offset UInt64,
	max_offset UInt64,
	status     String
)
ENGINE = KeeperMap('/%[1]s', 128)
PRIMARY KEY part_id
`, table)
	_, err := db.Exec(ddl)
	if err != nil {
		return nil, xerrors.Errorf("unable to init keeper block store table: %s: %w", ddl, err)
	}
	return &KeeperBlockStore{
		table: table,
		db:    db,
	}, nil
}
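
A usage sketch of the store's before/after protocol (same package; `part` is assumed to come from an item's `TablePartID()`, and a row for the part is assumed to exist, since `Set` issues an ALTER ... UPDATE):

```go
// roundTrip sketches how the deduper drives the store around one insert.
func roundTrip(db *sql.DB, part abstract.TablePartID) error {
	store, err := NewKeeperBlockStore("exactly_once_keeper", db)
	if err != nil {
		return err
	}
	block := &InsertBlock{min: 100, max: 200}
	if err := store.Set(part, block, InsertBlockStatusBefore); err != nil {
		return err
	}
	// ...push the corresponding batch to the sink here...
	if err := store.Set(part, block, InsertBlockStatusAfter); err != nil {
		return err
	}
	last, status, err := store.Get(part)
	if err != nil {
		return err
	}
	_ = last   // "100->200"
	_ = status // "after"
	return nil
}
```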