Skip to content

Latest commit

 

History

History
 
 

connect-azure-service-bus-source

Azure Service Bus Source connector

asciinema

Objective

Quickly test Azure Service Bus Source connector.

How to run

Simply run:

$ ./azure-service-bus.sh

Details of what the script is doing

Logging to Azure using browser (or using environment variables AZ_USER and AZ_PASS if set)

az login

All the Service Bus setup is automated:

AZURE_NAME=playground$USER$TRAVIS_JOB_NUMBER
AZURE_NAME=${AZURE_NAME//[-._]/}
AZURE_RESOURCE_GROUP=$AZURE_NAME
AZURE_SERVICE_BUS_NAMESPACE=$AZURE_NAME
AZURE_SERVICE_BUS_QUEUE_NAME=$AZURE_NAME
AZURE_REGION=westeurope

# Creating Azure Resource Group $AZURE_RESOURCE_GROUP"
az group create \
    --name $AZURE_RESOURCE_GROUP \
    --location $AZURE_REGION
# Creating Azure Service Bus namespace"
az servicebus namespace create \
    --name $AZURE_SERVICE_BUS_NAMESPACE \
    --resource-group $AZURE_RESOURCE_GROUP \
    --location $AZURE_REGION
# Creating Azure Service Bus Queue"
az servicebus queue create \
    --name $AZURE_SERVICE_BUS_QUEUE_NAME \
    --resource-group $AZURE_RESOURCE_GROUP \
    --namespace-name $AZURE_SERVICE_BUS_NAMESPACE
# Get SAS key for RootManageSharedAccessKey"
AZURE_SAS_KEY=$(az servicebus namespace authorization-rule keys list \
    --resource-group $AZURE_RESOURCE_GROUP \
    --namespace-name $AZURE_SERVICE_BUS_NAMESPACE \
    --name "RootManageSharedAccessKey" | jq -r '.primaryKey')

The connector is created with:

$ curl -X PUT \
     -H "Content-Type: application/json" \
     --data '{
                "connector.class": "io.confluent.connect.azure.servicebus.ServiceBusSourceConnector",
                "kafka.topic": "servicebus-topic",
                "tasks.max": "1",
                "azure.servicebus.sas.keyname": "RootManageSharedAccessKey",
                "azure.servicebus.sas.key": "'"$AZURE_SAS_KEY"'",
                "azure.servicebus.namespace": "'"$AZURE_SERVICE_BUS_NAMESPACE"'",
                "azure.servicebus.entity.name": "'"$AZURE_SERVICE_BUS_QUEUE_NAME"'",
                "azure.servicebus.subscription" : "",
                "azure.servicebus.max.message.count" : "10",
                "azure.servicebus.max.waiting.time.seconds" : "30",
                "confluent.license": "",
                "confluent.topic.bootstrap.servers": "broker:9092",
                "confluent.topic.replication.factor": "1"
          }' \
     http://localhost:8083/connectors/azure-service-bus-source/config | jq .

Inject data in Service Bus, using QueuesGettingStarted java program

$ SB_SAMPLES_CONNECTIONSTRING="Endpoint=sb://$AZURE_SERVICE_BUS_NAMESPACE.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=$AZURE_SAS_KEY"
$ docker exec -e SB_SAMPLES_CONNECTIONSTRING="$SB_SAMPLES_CONNECTIONSTRING" -e AZURE_SERVICE_BUS_QUEUE_NAME="$AZURE_SERVICE_BUS_QUEUE_NAME" simple-send bash -c "java -jar queuesgettingstarted-1.0.0-jar-with-dependencies.jar"

Verifying topic servicebus-topic

$ timeout 60 docker exec connect kafka-avro-console-consumer -bootstrap-server broker:9092 --property schema.registry.url=http://schema-registry:8081 --topic servicebus-topic --from-beginning --max-messages 10

Results:

{
    "contentType": "application/json",
    "correlationId": null,
    "deadLetterSource": null,
    "deliveryCount": 0,
    "enqueuedTimeUtc": 1584370640,
    "getTo": null,
    "label": "Scientist",
    "lockToken": {
        "string": "b840ea3f-94ac-4085-baba-f03ed929602b"
    },
    "lockedUntilUtc": {
        "long": 1584370700108
    },
    "messageBody": "{\"firstName\":\"Isaac\",\"name\":\"Newton\"}",
    "messageProperties": null,
    "partitionKey": null,
    "replyTo": null,
    "replyToSessionId": null,
    "sequenceNumber": {
        "long": 7
    },
    "sessionId": null,
    "timeToLive": 120000
}

Deleting resource group:

$ az group delete --name $AZURE_RESOURCE_GROUP --yes

N.B: Control Center is reachable at http://127.0.0.1:9021