-
Notifications
You must be signed in to change notification settings - Fork 594
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(connector): add DynamoDB sink #16670
Conversation
src/connector/src/sink/dynamodb.rs
Outdated
| ScalarRefImpl::Struct(_) | ||
| ScalarRefImpl::Jsonb(_)) => AttributeValue::S(string.to_text()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- I'm not sure, the struct here must be named, since in
create sink
? - Does this mean providing an
TimestampHandlingMode
-like option forjsonb
format to DynamoDB sink?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can consider struct as a list of named key values pairs, right?
struct <
id varchar,
name varchar
>
Map<String, Value> {
"id": "123",
"name": "jinser"
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I think the current implementation is like this
refactor dynamodb sink to use batch_write_item
DynamoDB can be deployed locally via docker image: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DynamoDBLocal.html |
Should I add it to this PR? Or should I open another PR? |
@xiangjinwu @fuyufjh @yuhao-su I've made the changes, please take a review when you have time 😃 |
Both are fine, up to you 😃 |
async fn write_chunk(&mut self) -> Result<()> { | ||
if !self.request_items.is_empty() { | ||
let table = self.table.clone(); | ||
let req_items = std::mem::take(&mut self.request_items) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it safe to use take here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There seems to be no problem, is there anything to worry about
9a903db
to
7e8f23c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
generally LGTM
#[serde(flatten)] | ||
pub aws_auth_props: AwsAuthProps, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does it enable IAM like auth?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM for Cargo.lock
Co-authored-by: xxhZs <1060434431@qq.com> Co-authored-by: Xinhao Xu <84456268+xxhZs@users.noreply.github.com>
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
1:1 mapping DynamoDB table sink.
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.
new sink: