forked from vdesabou/kafka-docker-playground
-
Notifications
You must be signed in to change notification settings - Fork 1
/
salesforce-bulkapi-sink-with-bulkapi-source-repro.sh
executable file
·181 lines (155 loc) · 9.25 KB
/
salesforce-bulkapi-sink-with-bulkapi-source-repro.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
#!/bin/bash
set -e
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )"
source ${DIR}/../../scripts/utils.sh
SALESFORCE_USERNAME=${SALESFORCE_USERNAME:-$1}
SALESFORCE_PASSWORD=${SALESFORCE_PASSWORD:-$2}
CONSUMER_KEY=${CONSUMER_KEY:-$3}
CONSUMER_PASSWORD=${CONSUMER_PASSWORD:-$4}
SECURITY_TOKEN=${SECURITY_TOKEN:-$5}
SALESFORCE_INSTANCE=${SALESFORCE_INSTANCE:-"https://login.salesforce.com"}
# second account (for Bulk API sink)
SALESFORCE_USERNAME_ACCOUNT2=${SALESFORCE_USERNAME_ACCOUNT2:-$6}
SALESFORCE_PASSWORD_ACCOUNT2=${SALESFORCE_PASSWORD_ACCOUNT2:-$7}
SECURITY_TOKEN_ACCOUNT2=${SECURITY_TOKEN_ACCOUNT2:-$8}
SALESFORCE_INSTANCE_ACCOUNT2=${SALESFORCE_INSTANCE_ACCOUNT2:-"https://login.salesforce.com"}
if [ -z "$SALESFORCE_USERNAME" ]
then
logerror "SALESFORCE_USERNAME is not set. Export it as environment variable or pass it as argument"
exit 1
fi
if [ -z "$SALESFORCE_PASSWORD" ]
then
logerror "SALESFORCE_PASSWORD is not set. Export it as environment variable or pass it as argument"
exit 1
fi
if [ -z "$CONSUMER_KEY" ]
then
logerror "CONSUMER_KEY is not set. Export it as environment variable or pass it as argument"
exit 1
fi
if [ -z "$CONSUMER_PASSWORD" ]
then
logerror "CONSUMER_PASSWORD is not set. Export it as environment variable or pass it as argument"
exit 1
fi
if [ -z "$SECURITY_TOKEN" ]
then
logerror "SECURITY_TOKEN is not set. Export it as environment variable or pass it as argument"
exit 1
fi
if [ -z "$SALESFORCE_USERNAME_ACCOUNT2" ]
then
logerror "SALESFORCE_USERNAME_ACCOUNT2 is not set. Export it as environment variable or pass it as argument"
exit 1
fi
if [ -z "$SALESFORCE_PASSWORD_ACCOUNT2" ]
then
logerror "SALESFORCE_PASSWORD_ACCOUNT2 is not set. Export it as environment variable or pass it as argument"
exit 1
fi
if [ -z "$SECURITY_TOKEN_ACCOUNT2" ]
then
logerror "SECURITY_TOKEN_ACCOUNT2 is not set. Export it as environment variable or pass it as argument"
exit 1
fi
${DIR}/../../environment/plaintext/start.sh "${PWD}/docker-compose.plaintext.yml"
log "Login with sfdx CLI"
docker exec sfdx-cli sh -c "sfdx sfpowerkit:auth:login -u \"$SALESFORCE_USERNAME\" -p \"$SALESFORCE_PASSWORD\" -r \"$SALESFORCE_INSTANCE\" -s \"$SECURITY_TOKEN\""
LEAD_FIRSTNAME=John_$RANDOM
LEAD_LASTNAME=Doe_$RANDOM
log "Add a Lead to Salesforce: $LEAD_FIRSTNAME $LEAD_LASTNAME"
docker exec sfdx-cli sh -c "sfdx force:data:record:create -u \"$SALESFORCE_USERNAME\" -s Lead -v \"FirstName='$LEAD_FIRSTNAME' LastName='$LEAD_LASTNAME' Company=Confluent PackageId__c=$LEAD_FIRSTNAME $LEAD_LASTNAM\""
log "Creating Salesforce Bulk API Source connector"
curl -X PUT \
-H "Content-Type: application/json" \
--data '{
"connector.class": "io.confluent.connect.salesforce.SalesforceBulkApiSourceConnector",
"kafka.topic": "sfdc-bulkapi-leads",
"tasks.max": "1",
"curl.logging": "true",
"salesforce.object" : "Lead",
"salesforce.instance" : "'"$SALESFORCE_INSTANCE_ACCOUNT2"'",
"salesforce.username" : "'"$SALESFORCE_USERNAME"'",
"salesforce.password" : "'"$SALESFORCE_PASSWORD"'",
"salesforce.password.token" : "'"$SECURITY_TOKEN"'",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"confluent.license": "",
"confluent.topic.bootstrap.servers": "broker:9092",
"confluent.topic.replication.factor": "1"
}' \
http://localhost:8083/connectors/salesforce-bulkapi-source/config | jq .
sleep 10
log "Verify we have received the data in sfdc-bulkapi-leads topic"
timeout 60 docker exec broker kafka-console-consumer -bootstrap-server broker:9092 --topic sfdc-bulkapi-leads --from-beginning --max-messages 1
log "Creating Salesforce Bulk API Sink connector"
curl -X PUT \
-H "Content-Type: application/json" \
--data '{
"connector.class": "io.confluent.connect.salesforce.SalesforceBulkApiSinkConnector",
"topics": "sfdc-bulkapi-leads",
"tasks.max": "1",
"curl.logging": "true",
"salesforce.object" : "Lead",
"salesforce.instance" : "'"$SALESFORCE_INSTANCE_ACCOUNT2"'",
"salesforce.username" : "'"$SALESFORCE_USERNAME_ACCOUNT2"'",
"salesforce.password" : "'"$SALESFORCE_PASSWORD_ACCOUNT2"'",
"salesforce.password.token" : "'"$SECURITY_TOKEN_ACCOUNT2"'",
"salesforce.ignore.fields" : "CleanStatus",
"salesforce.ignore.reference.fields" : "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"reporter.bootstrap.servers": "broker:9092",
"reporter.error.topic.name": "error-responses",
"reporter.error.topic.replication.factor": 1,
"reporter.result.topic.name": "success-responses",
"reporter.result.topic.replication.factor": 1,
"transforms" : "InsertField",
"transforms.InsertField.type" : "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.static.field" : "_EventType",
"transforms.InsertField.static.value" : "created",
"confluent.license": "",
"confluent.topic.bootstrap.servers": "broker:9092",
"confluent.topic.replication.factor": "1",
"salesforce.use.custom.id.field": "true",
"override.event.type": "true",
"salesforce.sink.object.operation": "upsert",
"salesforce.custom.id.field.name": "PackageId__c"
}' \
http://localhost:8083/connectors/salesforce-bulkapi-sink/config | jq .
# FIXTHIS getting this if a record has empty PackageId__c:
# [2020-08-26 09:25:46,023] ERROR WorkerSinkTask{id=salesforce-bulkapi-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
# org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
# at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:567)
# at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:325)
# at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
# at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:200)
# at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
# at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
# at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
# at java.util.concurrent.FutureTask.run(FutureTask.java:266)
# at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
# at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
# at java.lang.Thread.run(Thread.java:748)
# Caused by: org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.connect.errors.DataException: PackageId__c is not a valid field name
# at io.confluent.connect.salesforce.SalesforceBulkApiSinkTask.handleException(SalesforceBulkApiSinkTask.java:497)
# at io.confluent.connect.salesforce.SalesforceBulkApiSinkTask.batchRecords(SalesforceBulkApiSinkTask.java:429)
# at io.confluent.connect.salesforce.SalesforceBulkApiSinkTask.put(SalesforceBulkApiSinkTask.java:111)
# at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:545)
# ... 10 more
# Caused by: org.apache.kafka.connect.errors.DataException: PackageId__c is not a valid field name
# at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
# at org.apache.kafka.connect.data.Struct.get(Struct.java:74)
# at io.confluent.connect.salesforce.SalesforceBulkApiSinkTask.validateSinkRecord(SalesforceBulkApiSinkTask.java:481)
# at io.confluent.connect.salesforce.SalesforceBulkApiSinkTask.batchRecords(SalesforceBulkApiSinkTask.java:410)
# ... 12 more
sleep 10
log "Verify topic success-responses"
timeout 60 docker exec broker kafka-console-consumer -bootstrap-server broker:9092 --topic success-responses --from-beginning --max-messages 1
# log "Verify topic error-responses"
# timeout 20 docker exec broker kafka-console-consumer -bootstrap-server broker:9092 --topic error-responses --from-beginning --max-messages 1
log "Login with sfdx CLI on the account #2"
docker exec sfdx-cli sh -c "sfdx sfpowerkit:auth:login -u \"$SALESFORCE_USERNAME_ACCOUNT2\" -p \"$SALESFORCE_PASSWORD_ACCOUNT2\" -r \"$SALESFORCE_INSTANCE_ACCOUNT2\" -s \"$SECURITY_TOKEN_ACCOUNT2\""
log "Get the Lead created on account #2"
docker exec sfdx-cli sh -c "sfdx force:data:record:get -u \"$SALESFORCE_USERNAME_ACCOUNT2\" -s Lead -w \"FirstName='$LEAD_FIRSTNAME' LastName='$LEAD_LASTNAME' Company=Confluent\""