Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit c38f305

Browse files
#15 - Store user group information
1 parent 3efa8c6 commit c38f305

File tree

6 files changed

+193
-36
lines changed

6 files changed

+193
-36
lines changed

Diff for: README.md

+4
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ The following parameters can be set in config files or in env variables:
2424
- UBAHN_UPDATE_TOPIC: the update ubahn entity Kafka message topic, default value is 'u-bahn.action.update'
2525
- UBAHN_DELETE_TOPIC: the delete ubahn entity Kafka message topic, default value is 'u-bahn.action.delete'
2626
- UBAHN_AGGREGATE_TOPIC: the ubahn entity aggregate topic, that contains create, update and delete topics. Default value is 'u-bahn.action.aggregate'
27+
- GROUPS_MEMBER_ADD_TOPIC: the add groups member Kafka message topic, default value is 'groups.notification.member.add'
28+
- GROUPS_MEMBER_DELETE_TOPIC: the delete groups member Kafka message topic, default value is 'groups.notification.member.delete'
29+
- GROUPS_MEMBERSHIP_TYPE: the groups membership type that should be processed, default value is 'user'
2730
- ES.HOST: Elasticsearch host, default value is 'localhost:9200'
2831
- ES.AWS_REGION: The Amazon region to use when using AWS Elasticsearch service, default value is 'us-east-1'
2932
- ES.API_VERSION: Elasticsearch API version, default value is '7.4'
@@ -49,6 +52,7 @@ The following parameters can be set in config files or in env variables:
4952
- ES.USER_ROLE_PROPERTY_NAME: the user property name of role, default value is 'roles',
5053
- ES.USER_SKILL_PROPERTY_NAME: the user property name of skill, default value is 'skills'
5154
- ES.ORGANIZATION_SKILLPROVIDER_PROPERTY_NAME: the org property name of org skill providers, default value is 'skillProviders'
55+
- ES.USER_GROUP_PROPERTY_NAME: the user property name of group, default value is 'groups'
5256

5357
There is a `/health` endpoint that checks for the health of the app. This sets up an expressjs server and listens on the environment variable `PORT`. It's not part of the configuration file and needs to be passed as an environment variable
5458

Diff for: VERIFICATION.md

+59-16
Original file line numberDiff line numberDiff line change
@@ -24,41 +24,38 @@ docker exec -it ubahn-data-processor-es_kafka /opt/kafka/bin/kafka-console-produ
2424
```
2525

2626
1. start kafka server, start elasticsearch, initialize Elasticsearch, start processor app
27-
2. start kafka-console-producer to write messages to `u-bahn.action.create`
28-
topic:
29-
`docker exec -it ubahn-data-processor-es_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic u-bahn.action.create`
30-
3. write message:
31-
`{"topic":"u-bahn.action.create","originator":"u-bahn-api","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"resource":"user","id":"391a3656-9a01-47d4-8c6d-64b68c44f212","handle":"user"}}`
27+
2. write message:
28+
`{"topic":"u-bahn.action.aggregate","originator":"u-bahn-api","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"resource":"user","id":"391a3656-9a01-47d4-8c6d-64b68c44f212","handle":"user","originalTopic":"u-bahn.action.create"}}`
3229
4. Watch the app console, It will show message successfully handled.
3330
5. Run Command `npm run view-data user 391a3656-9a01-47d4-8c6d-64b68c44f212` to verify the elastic data.
3431

3532
6. write message:
36-
`{"topic":"u-bahn.action.create","originator":"u-bahn-api","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"resource":"achievement","userId":"391a3656-9a01-47d4-8c6d-64b68c44f212","achievementsProviderId":"c77326d8-ef16-4be0-b844-d5c384b7bb8b","name":"achievement","uri":"https://google.com","certifierId":"b8726ca1-557e-4502-8f9b-25044b9c123d","certifiedDate":"2019-07-08T00:00:00.000Z"}}`
33+
`{"topic":"u-bahn.action.aggregate","originator":"u-bahn-api","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"resource":"achievement","userId":"391a3656-9a01-47d4-8c6d-64b68c44f212","achievementsProviderId":"c77326d8-ef16-4be0-b844-d5c384b7bb8b","name":"achievement","uri":"https://google.com","certifierId":"b8726ca1-557e-4502-8f9b-25044b9c123d","certifiedDate":"2019-07-08T00:00:00.000Z","originalTopic":"u-bahn.action.create"}}`
3734
7. Watch the app console, It will show message successfully handled.
3835
8. Run Command `npm run view-data user 391a3656-9a01-47d4-8c6d-64b68c44f212` to verify the elastic data.
3936

4037
9. write message:
41-
`{"topic":"u-bahn.action.create","originator":"u-bahn-api","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"resource":"achievementprovider","id":"7b4f98b1-5831-45fe-a71f-8454d11eb8e8","name":"achievementprovider"}}`
38+
`{"topic":"u-bahn.action.aggregate","originator":"u-bahn-api","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"resource":"achievementprovider","id":"7b4f98b1-5831-45fe-a71f-8454d11eb8e8","name":"achievementprovider","originalTopic":"u-bahn.action.create"}}`
4239
10. Watch the app console, It will show message successfully handled.
4340
11. Run Command `npm run view-data achievementprovider 7b4f98b1-5831-45fe-a71f-8454d11eb8e8` to verify the elastic data.
4441

4542
12. write message:
46-
`{"topic":"u-bahn.action.create","originator":"u-bahn-api","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"resource":"attributegroup","id":"720c34f9-0fd4-46fd-9293-4a8cfdcd3e96","organizationId":"017733ad-4704-4c7e-ae60-36b3332731df","name":"attributegroup"}}`
43+
`{"topic":"u-bahn.action.aggregate","originator":"u-bahn-api","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"resource":"attributegroup","id":"720c34f9-0fd4-46fd-9293-4a8cfdcd3e96","organizationId":"017733ad-4704-4c7e-ae60-36b3332731df","name":"attributegroup","originalTopic":"u-bahn.action.create"}}`
4744
13. Watch the app console, It will show message successfully handled.
4845
14. Run Command `npm run view-data attributegroup 720c34f9-0fd4-46fd-9293-4a8cfdcd3e96` to verify the elastic data.
4946

5047
15. write message:
51-
`{"topic":"u-bahn.action.create","originator":"u-bahn-api","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"resource":"externalprofile","userId":"391a3656-9a01-47d4-8c6d-64b68c44f212","organizationId":"e2aecf8b-532d-4625-b8e2-575110b9f944","uri":"https:google.com"}}`
48+
`{"topic":"u-bahn.action.aggregate","originator":"u-bahn-api","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"resource":"externalprofile","userId":"391a3656-9a01-47d4-8c6d-64b68c44f212","organizationId":"e2aecf8b-532d-4625-b8e2-575110b9f944","uri":"https:google.com","originalTopic":"u-bahn.action.create"}}`
5249
16. Watch the app console, It will show message successfully handled.
5350
17. Run Command `npm run view-data user 391a3656-9a01-47d4-8c6d-64b68c44f212` to verify the elastic data.
5451

5552
18. write message:
56-
`{"topic":"u-bahn.action.create","originator":"u-bahn-api","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"resource":"organization","id":"603d4264-cdb0-47f1-914e-f053abc60422","name":"organization"}}`
53+
`{"topic":"u-bahn.action.aggregate","originator":"u-bahn-api","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"resource":"organization","id":"603d4264-cdb0-47f1-914e-f053abc60422","name":"organization","originalTopic":"u-bahn.action.create"}}`
5754
19. Watch the app console, It will show message successfully handled.
5855
20. Run Command `npm run view-data organization 603d4264-cdb0-47f1-914e-f053abc60422` to verify the elastic data.
5956

6057
21. write message:
61-
`{"topic":"u-bahn.action.create","originator":"u-bahn-api","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"resource":"role","id":"188446f1-02dc-4fc7-b74e-ab7ea3033a57","name":"role"}}`
58+
`{"topic":"u-bahn.action.aggregate","originator":"u-bahn-api","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"resource":"role","id":"188446f1-02dc-4fc7-b74e-ab7ea3033a57","name":"role","originalTopic":"u-bahn.action.create"}}`
6259
22. Watch the app console, It will show message successfully handled.
6360
23. Run Command `npm run view-data role 188446f1-02dc-4fc7-b74e-ab7ea3033a57` to verify the elastic data.
6461

@@ -89,17 +86,18 @@ topic:
8986

9087
39. Repeat step 3 again and you will see error message in app console indicate conflict error.
9188

92-
40. start kafka-console-producer to write messages to `u-bahn.action.update`
93-
topic:
94-
`docker exec -it ubahn-data-processor-es_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic u-bahn.action.update`
9589

90+
91+
92+
93+
40. Now, let's perform the update operations and verify.
9694
41. write message to update the user:
97-
`{"topic":"u-bahn.action.update","originator":"u-bahn-api","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"resource":"user","id":"391a3656-9a01-47d4-8c6d-64b68c44f212","handle":"update_user"}}`
95+
`{"topic":"u-bahn.action.aggregate","originator":"u-bahn-api","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"resource":"user","id":"391a3656-9a01-47d4-8c6d-64b68c44f212","handle":"update_user","originalTopic":"u-bahn.action.update"}}`
9896
42. Watch the app console, It will show message successfully handled.
9997
43. Run Command `npm run view-data user 391a3656-9a01-47d4-8c6d-64b68c44f212` to verify the elastic data has been updated.
10098

10199
44. write message to update the achievement:
102-
`{"topic":"u-bahn.action.update","originator":"u-bahn-api","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"resource":"achievement","userId":"391a3656-9a01-47d4-8c6d-64b68c44f212","achievementsProviderId":"c77326d8-ef16-4be0-b844-d5c384b7bb8b","name":"update_name","uri":"https://facebook.com"}}`
100+
`{"topic":"u-bahn.action.aggregate","originator":"u-bahn-api","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"resource":"achievement","userId":"391a3656-9a01-47d4-8c6d-64b68c44f212","achievementsProviderId":"c77326d8-ef16-4be0-b844-d5c384b7bb8b","name":"update_name","uri":"https://facebook.com","originalTopic":"u-bahn.action.update"}}`
103101
45. Watch the app console, It will show message successfully handled.
104102
46. Run Command `npm run view-data user 391a3656-9a01-47d4-8c6d-64b68c44f212` to verify the elastic data has been updated.
105103

@@ -211,3 +209,48 @@ topic:
211209
`{"topic":"u-bahn.action.delete","originator":"u-bahn-api","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"resource":"user","id":"391a3656-9a01-47d4-8c6d-64b68c44f212"}}`
212210
109. Watch the app console, It will show message successfully handled.
213211
110. Run Command `npm run view-data user 391a3656-9a01-47d4-8c6d-64b68c44f212` to verify the elastic data has been deleted.
212+
213+
214+
# Verification (with groups)
215+
216+
1. start kafka server, start elasticsearch, initialize Elasticsearch, start processor app
217+
2. start kafka-console-producer to write messages to `u-bahn.action.aggregate`
218+
topic:
219+
`docker exec -it ubahn-data-processor-es_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic u-bahn.action.create`
220+
3. write message:
221+
`{"topic":"u-bahn.action.aggregate","originator":"u-bahn-api","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"resource":"user","id":"391a3656-9a01-47d4-8c6d-64b68c44f212","handle":"user","originalTopic":"u-bahn.action.aggregate"}}`
222+
4. Watch the app console, It will show message successfully handled.
223+
5. Run Command `npm run view-data user 391a3656-9a01-47d4-8c6d-64b68c44f212` to verify the elastic data.
224+
225+
6. start kafka-console-producer to write messages to `groups.notification.member.add`
226+
topic:
227+
`docker exec -it ubahn-data-processor-es_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic groups.notification.member.add`
228+
7. write message:
229+
`{"topic":"groups.notification.member.add","originator":"u-bahn-api","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"id":"c2f302cf-759a-4847-8acd-843e258359db","groupId":"036cc9c1-189a-4cf6-853b-0f5bc9b4ce75","oldId":"20000309","name":"House Stark","createdAt":"2020-09-11T13:14:54.108Z","createdBy":"8547899","universalUID":"391a3656-9a01-47d4-8c6d-64b68c44f212","membershipType":"user"}}`
230+
8. Watch the app console, It will show message successfully handled.
231+
9. Run Command `npm run view-data user 391a3656-9a01-47d4-8c6d-64b68c44f212` to verify the elastic data.
232+
233+
234+
10. Repeat again and you will see error message in app console indicate conflict error.
235+
write message:
236+
`{"topic":"groups.notification.member.add","originator":"u-bahn-api","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"id":"c2f302cf-759a-4847-8acd-843e258359db","groupId":"036cc9c1-189a-4cf6-853b-0f5bc9b4ce75","oldId":"20000309","name":"House Stark","createdAt":"2020-09-11T13:14:54.108Z","createdBy":"8547899","universalUID":"391a3656-9a01-47d4-8c6d-64b68c44f212","membershipType":"user"}}`
237+
238+
11. start kafka-console-producer to write messages to `groups.notification.member.delete`
239+
topic:
240+
`docker exec -it ubahn-data-processor-es_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic groups.notification.member.delete`
241+
12. write message to remove the groups user:
242+
`{"topic":"groups.notification.member.delete","originator":"u-bahn-api","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"groupId":"036cc9c1-189a-4cf6-853b-0f5bc9b4ce75","name":".NET Taas Project","oldId":"20000335","memberId":"00000000","universalUID":"391a3656-9a01-47d4-8c6d-64b68c44f212"}}`
243+
13. Watch the app console, It will show message successfully handled.
244+
14. Run Command `npm run view-data user 391a3656-9a01-47d4-8c6d-64b68c44f212` to verify the elastic data
245+
246+
15. Repeat again and you will see error message in app console indicate not found error.
247+
write message:
248+
`{"topic":"groups.notification.member.delete","originator":"u-bahn-api","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"groupId":"036cc9c1-189a-4cf6-853b-0f5bc9b4ce75","name":".NET Taas Project","oldId":"20000335","memberId":"00000000","universalUID":"391a3656-9a01-47d4-8c6d-64b68c44f212"}}`
249+
250+
16. start kafka-console-producer to write messages to `u-bahn.action.delete`
251+
topic:
252+
`docker exec -it ubahn-data-processor-es_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic u-bahn.action.delete`
253+
17. write message to delete the user:
254+
`{"topic":"u-bahn.action.delete","originator":"u-bahn-api","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"resource":"user","id":"391a3656-9a01-47d4-8c6d-64b68c44f212"}}`
255+
18. Watch the app console, It will show message successfully handled.
256+
19. Run Command `npm run view-data user 391a3656-9a01-47d4-8c6d-64b68c44f212` to verify the elastic data has been deleted.

Diff for: config/default.js

+4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ module.exports = {
1818
UBAHN_UPDATE_TOPIC: process.env.UBAHN_UPDATE_TOPIC || 'u-bahn.action.update',
1919
UBAHN_DELETE_TOPIC: process.env.UBAHN_DELETE_TOPIC || 'u-bahn.action.delete',
2020
UBAHN_AGGREGATE_TOPIC: process.env.UBAHN_AGGREGATE_TOPIC || 'u-bahn.action.aggregate',
21+
GROUPS_MEMBER_ADD_TOPIC: process.env.GROUPS_MEMBER_ADD_TOPIC || 'groups.notification.member.add',
22+
GROUPS_MEMBER_DELETE_TOPIC: process.env.GROUPS_MEMBER_DELETE_TOPIC || 'groups.notification.member.delete',
23+
GROUPS_MEMBERSHIP_TYPE: process.env.GROUPS_MEMBERSHIP_TYPE || 'user',
2124

2225
ES: {
2326
HOST: process.env.ES_HOST || 'localhost:9200',
@@ -45,6 +48,7 @@ module.exports = {
4548
USER_ATTRIBUTE_PROPERTY_NAME: process.env.USER_ATTRIBUTE_PROPERTY_NAME || 'attributes',
4649
USER_ROLE_PROPERTY_NAME: process.env.USER_ROLE_PROPERTY_NAME || 'roles',
4750
USER_SKILL_PROPERTY_NAME: process.env.USER_SKILL_PROPERTY_NAME || 'skills',
51+
USER_GROUP_PROPERTY_NAME: process.env.USER_GROUP_PROPERTY_NAME || 'groups',
4852

4953
ORGANIZATION_SKILLPROVIDER_PROPERTY_NAME: process.env.ORGANIZATION_SKILLPROVIDER_PROPERTY_NAME || 'skillProviders'
5054
}

Diff for: docker-kafka-es/docker-compose.yml

+8-7
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,12 @@ services:
1212
- "9092:9092"
1313
environment:
1414
KAFKA_ADVERTISED_HOST_NAME: localhost
15-
# KAFKA_CREATE_TOPICS: "u-bahn.action.create:1:1,u-bahn.action.update:1:1,u-bahn.action.delete:1:1"
16-
KAFKA_CREATE_TOPICS: "u-bahn.action.aggregate:1:1"
15+
KAFKA_CREATE_TOPICS: "u-bahn.action.create:1:1,u-bahn.action.update:1:1,u-bahn.action.delete:1:1,groups.notification.member.add:1:1,groups.notification.member.delete:1:1"
1716
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
18-
# esearch:
19-
# image: docker.elastic.co/elasticsearch/elasticsearch:7.4.2
20-
# container_name: ubahn-data-processor-es_es
21-
# ports:
22-
# - "9200:9200"
17+
esearch:
18+
image: docker.elastic.co/elasticsearch/elasticsearch:7.4.2
19+
container_name: ubahn-data-processor-es_es
20+
ports:
21+
- "9200:9200"
22+
environment:
23+
- discovery.type=single-node

Diff for: src/app.js

+27-13
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const healthcheck = require('topcoder-healthcheck-dropin')
1010
const logger = require('./common/logger')
1111
const helper = require('./common/helper')
1212
const ProcessorService = require('./services/ProcessorService')
13+
const GroupsProcessorService = require('./services/GroupsProcessorService')
1314
const Mutex = require('async-mutex').Mutex
1415

1516
// Start kafka consumer
@@ -69,18 +70,31 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a
6970
}
7071
const transactionId = _.uniqueId('transaction_')
7172
try {
72-
switch (messageJSON.payload.originalTopic) {
73-
case config.UBAHN_CREATE_TOPIC:
74-
await ProcessorService.processCreate(messageJSON, transactionId)
75-
break
76-
case config.UBAHN_UPDATE_TOPIC:
77-
await ProcessorService.processUpdate(messageJSON, transactionId)
78-
break
79-
case config.UBAHN_DELETE_TOPIC:
80-
await ProcessorService.processDelete(messageJSON, transactionId)
81-
break
82-
default:
83-
throw new Error(`Unknown original topic: ${messageJSON.payload.originalTopic}`)
73+
if (messageJSON.payload.originalTopic) {
74+
switch (messageJSON.payload.originalTopic) {
75+
case config.UBAHN_CREATE_TOPIC:
76+
await ProcessorService.processCreate(messageJSON, transactionId)
77+
break
78+
case config.UBAHN_UPDATE_TOPIC:
79+
await ProcessorService.processUpdate(messageJSON, transactionId)
80+
break
81+
case config.UBAHN_DELETE_TOPIC:
82+
await ProcessorService.processDelete(messageJSON, transactionId)
83+
break
84+
default:
85+
throw new Error(`Unknown original topic: ${messageJSON.payload.originalTopic}`)
86+
}
87+
} else {
88+
switch (topic) {
89+
case config.GROUPS_MEMBER_ADD_TOPIC:
90+
await GroupsProcessorService.processMemberAdd(messageJSON, transactionId)
91+
break
92+
case config.GROUPS_MEMBER_DELETE_TOPIC:
93+
await GroupsProcessorService.processMemberDelete(messageJSON, transactionId)
94+
break
95+
default:
96+
throw new Error(`Unknown topic: ${topic}`)
97+
}
8498
}
8599

86100
logger.debug(`Successfully processed message with count ${messageCount}`)
@@ -109,7 +123,7 @@ const check = () => {
109123
}
110124

111125
// const topics = [config.UBAHN_CREATE_TOPIC, config.UBAHN_UPDATE_TOPIC, config.UBAHN_DELETE_TOPIC]
112-
const topics = [config.UBAHN_AGGREGATE_TOPIC]
126+
const topics = [config.UBAHN_AGGREGATE_TOPIC, config.GROUPS_MEMBER_ADD_TOPIC, config.GROUPS_MEMBER_DELETE_TOPIC]
113127

114128
consumer
115129
.init([{

0 commit comments

Comments
 (0)