AWS DynamoDB Sink connector

Objective

Quickly test the AWS DynamoDB Sink connector.

AWS Setup

This project assumes ~/.aws/credentials is set up; see the connect service in the docker-compose.yml file:

    connect:
      <snip>
      volumes:
        - $HOME/.aws/credentials:/root/.aws/credentials:ro
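
Because the container mounts the host credentials file read-only, the connector has nothing to authenticate with if that file is missing. A minimal pre-flight check could look like the sketch below. The function name `check_aws_credentials` is hypothetical and not part of `dynamodb.sh`; it only verifies that the file exists and is readable, not that the keys in it are valid.

```shell
# Hypothetical helper (not part of dynamodb.sh): verify that the credentials
# file mounted into the connect container exists and is readable.
# Takes an optional path; defaults to $HOME/.aws/credentials.
check_aws_credentials() {
    local creds_file="${1:-$HOME/.aws/credentials}"
    if [ ! -r "$creds_file" ]; then
        echo "ERROR: $creds_file is missing or unreadable" >&2
        return 1
    fi
    echo "OK: found $creds_file"
}
```

Calling `check_aws_credentials` with no argument before `./dynamodb.sh` checks the default path used by the volume mapping above.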

How to run

Simply run:

$ ./dynamodb.sh

Details of what the script is doing

Sending messages to topic topic1

$ seq -f "{\"f1\": \"value%g\"}" 10 | docker exec -i connect kafka-avro-console-producer --broker-list broker:9092 --property schema.registry.url=http://schema-registry:8081 --topic topic1 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
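
The `seq -f` format string is what produces the ten JSON records: `%g` is replaced by each sequence number in turn. To preview the payload without producing anything to Kafka, the generator can be wrapped in a small function. The name `gen_records` is hypothetical and introduced only for illustration.

```shell
# Hypothetical helper: print N JSON records, one per line, each with the
# single string field f1 expected by the "myrecord" Avro schema.
gen_records() {
    seq -f "{\"f1\": \"value%g\"}" "$1"
}

gen_records 3
# {"f1": "value1"}
# {"f1": "value2"}
# {"f1": "value3"}
```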

Creating AWS DynamoDB Sink connector

$ curl -X PUT \
     -H "Content-Type: application/json" \
     --data '{
               "connector.class": "io.confluent.connect.aws.dynamodb.DynamoDbSinkConnector",
               "tasks.max": "1",
               "topics": "topic1",
               "aws.dynamodb.region": "us-east-1",
               "aws.dynamodb.endpoint": "https://dynamodb.us-east-1.amazonaws.com",
               "confluent.license": "",
               "confluent.topic.bootstrap.servers": "broker:9092",
               "confluent.topic.replication.factor": "1"
          }' \
     http://localhost:8083/connectors/dynamodb-sink/config | jq .
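
After the PUT succeeds, it can help to confirm that the connector and its task actually started. The Kafka Connect REST API exposes this at `/connectors/<name>/status`. The sketch below assumes `jq` is installed (it is already used above); the function name `connector_states` is hypothetical.

```shell
# Hypothetical helper: read a Kafka Connect status document on stdin and print
# the connector state followed by one "task <id>: <state>" line per task.
connector_states() {
    jq -r '"connector: \(.connector.state)", (.tasks[] | "task \(.id): \(.state)")'
}
```

For example, `curl -s http://localhost:8083/connectors/dynamodb-sink/status | connector_states` should report `RUNNING` for both the connector and task 0 once startup has completed.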

Verify data is in DynamoDB

$ aws dynamodb scan --table-name topic1 --region us-east-1

Results:

{
    "Items": [
        {
            "f1": {
                "S": "value1"
            },
            "offset": {
                "N": "0"
            },
            "partition": {
                "N": "0"
            }
        },
        {
            "f1": {
                "S": "value2"
            },
            "offset": {
                "N": "1"
            },
            "partition": {
                "N": "0"
            }
        },
        {
            "f1": {
                "S": "value3"
            },
            "offset": {
                "N": "2"
            },
            "partition": {
                "N": "0"
            }
        },
        {
            "f1": {
                "S": "value4"
            },
            "offset": {
                "N": "3"
            },
            "partition": {
                "N": "0"
            }
        },
        {
            "f1": {
                "S": "value5"
            },
            "offset": {
                "N": "4"
            },
            "partition": {
                "N": "0"
            }
        },
        {
            "f1": {
                "S": "value6"
            },
            "offset": {
                "N": "5"
            },
            "partition": {
                "N": "0"
            }
        },
        {
            "f1": {
                "S": "value7"
            },
            "offset": {
                "N": "6"
            },
            "partition": {
                "N": "0"
            }
        },
        {
            "f1": {
                "S": "value8"
            },
            "offset": {
                "N": "7"
            },
            "partition": {
                "N": "0"
            }
        },
        {
            "f1": {
                "S": "value9"
            },
            "offset": {
                "N": "8"
            },
            "partition": {
                "N": "0"
            }
        },
        {
            "f1": {
                "S": "value10"
            },
            "offset": {
                "N": "9"
            },
            "partition": {
                "N": "0"
            }
        }
    ],
    "Count": 10,
    "ScannedCount": 10,
    "ConsumedCapacity": null
}
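
Rather than reading the scan output by eye, the `Count` field can be compared against the number of records produced. This is a sketch only: it assumes `jq` is available, and the function name `assert_item_count` is hypothetical.

```shell
# Hypothetical helper: read `aws dynamodb scan` JSON on stdin and succeed only
# if the reported Count equals the expected number of records.
assert_item_count() {
    local expected="$1"
    local actual
    actual="$(jq -r '.Count')"
    if [ "$actual" != "$expected" ]; then
        echo "expected $expected items, found $actual" >&2
        return 1
    fi
    echo "found $actual items"
}
```

Usage: `aws dynamodb scan --table-name topic1 --region us-east-1 | assert_item_count 10`.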

N.B: Control Center is reachable at http://127.0.0.1:9021