Skip to content

Commit

Permalink
feat: kafka json append only sink (#3682)
Browse files Browse the repository at this point in the history
* stage

Signed-off-by: tabVersion <tabvision@bupt.icu>

* add datum_to_json_object

Signed-off-by: Peng Chen <peng@singularity-data.com>

* stage

Signed-off-by: tabVersion <tabvision@bupt.icu>

* finishing

Signed-off-by: tabVersion <tabvision@bupt.icu>

* make risedev happy

Signed-off-by: tabVersion <tabvision@bupt.icu>

* add ut

Signed-off-by: tabVersion <tabvision@bupt.icu>

* pending error: Transaction error: Operation not valid in state Ready

Signed-off-by: tabVersion <tabvision@bupt.icu>

* fix by changing epoch number

Signed-off-by: tabVersion <tabvision@bupt.icu>

* add doc for sink trait

Signed-off-by: tabVersion <tabvision@bupt.icu>

* add comment in exec

Signed-off-by: tabVersion <tabvision@bupt.icu>

* remove take_snapshot

Signed-off-by: tabVersion <tabvision@bupt.icu>

* fix warning

Signed-off-by: tabVersion <tabvision@bupt.icu>

* fix table_id

Signed-off-by: tabVersion <tabvision@bupt.icu>

Co-authored-by: Peng Chen <peng@singularity-data.com>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Jul 12, 2022
1 parent 0b65e68 commit fd0b7e2
Show file tree
Hide file tree
Showing 14 changed files with 793 additions and 28 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ message SourceNode {
message SinkNode {
uint32 table_id = 1;
repeated int32 column_ids = 2;
map<string, string> properties = 3;
}

message ProjectNode {
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ pub enum ErrorCode {
},
#[error("Invalid Parameter Value: {0}")]
InvalidParameterValue(String),
#[error("MySQL error: {0}")]
#[error("Sink error: {0}")]
SinkError(BoxedError),

/// This error occurs when the meta node receives heartbeat from a previous removed worker
Expand Down
2 changes: 2 additions & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,11 @@ static_assertions = "1"
tempfile = "3"
thiserror = "1"
tokio = { version = "=0.2.0-alpha.3", package = "madsim-tokio", features = ["rt", "rt-multi-thread", "sync", "macros", "time", "signal", "fs"] }
tokio-retry = "0.3"
tokio-stream = "0.1"
tokio-util = { version = "0.7", features = ["codec", "io"] }
tonic = { version = "=0.2.0-alpha.3", package = "madsim-tonic" }
tracing = { version = "0.1" }
twox-hash = "1"
url = "2"
urlencoding = "2"
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#![feature(generators)]
#![feature(proc_macro_hygiene)]
#![feature(stmt_expr_attributes)]
#![feature(box_patterns)]
#![warn(clippy::dbg_macro)]
#![warn(clippy::disallowed_methods)]
#![warn(clippy::doc_markdown)]
Expand Down
Loading

0 comments on commit fd0b7e2

Please sign in to comment.