Estamos usando o Docker Compose para implementar os seguintes componentes:
-
PostgreSQL - Pré-requisito para usar um postgres externo
- pgoutput A partir da versão 10 do postgres o debezium consegue usar a replicação logica sem necessidade de adicionar plugins. Baste declarar no conector o nome do plugin: "plugin.name": "pgoutput"
- postgresql,permissões de usuários, permissoões de replicação
- decoderbufs
- wal2json
-
Kafka
- ZooKeeper
- Kafka Broker
- Kafka Connect with Debezium and Elasticsearch Connectors
-
Elasticsearch
Imagens docker do debezium 1.6 estavéis na criação do projeto e o uso de Dockerfile para gerar a imagen do debezium connector com o plugin do ElasticsearchSinkConnector 11.1.0.
docker-compose up --build -d
# aguarde os serviços estarem no ar e criar primeiro o conector do postgres(source).
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @reqs/connections/source.json
Acesse localhost:9000, devemos ter os topics criados e com as configurações da segunda imagem.
./start.sh
Caso o elasticsearch tenho usuário e senha, add os parâmetros abaixo:
"connection.username": "usuario",
"connection.password": "senha",
# Verifique o conteúdo do PostgreSQL database:
docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DATABASE -c "SELECT * FROM users"'
# Verifique o conteúdo do Elasticsearch database:
curl http://localhost:9200/users-*/_search?pretty
docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DATABASE'
test_db=# INSERT INTO users (email) VALUES ('apple@gmail.com');
# Verifique o conteúdo do Elasticsearch database:
curl http://localhost:9200/users-*/_search?q=id:6
{
...
"hits": {
"total": 1,
"max_score": 1.0,
"hits": [
{
"_index": "users",
"_type": "_doc",
"_id": "6",
"_score": 1.0,
"_source": {
"id": 6,
"email": "apple@gmail.com"
}
}
]
}
}
test_db=# UPDATE users SET email = 'tesla@gmail.com' WHERE id = 6;
# Verifique o conteúdo do Elasticsearch database:
curl http://localhost:9200/users-*/_search?q=id:6
{
...
"hits": {
"total": 1,
"max_score": 1.0,
"hits": [
{
"_index": "users",
"_type": "_doc",
"_id": "6",
"_score": 1.0,
"_source": {
"id": 6,
"email": "tesla@gmail.com"
}
}
]
}
}
test_db=# DELETE FROM users WHERE id = 6;
# Verifique o conteúdo do Elasticsearch database:
curl http://localhost:9200/users-*/_search?q=id:6
{
...
"hits": {
"total": 1,
"max_score": 1.0,
"hits": []
}
}
Verificar os plugins:
curl http://localhost:8083/connector-plugins -s | jq
Verificar os conectores:
curl http://localhost:8083/connectors -s | jq
Verificar a conexão com as bases:
curl http://localhost:8083/connectors/test_db-connector/status -s | jq
Deletar conector kafka:
curl -X DELETE http://localhost:8083/connectors/nomedoconector
https://github.com/YegorZaremba/sync-postgresql-with-elasticsearch-example