Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mojaloop/#3636): batching implementation for position prepare messages #968

Merged
merged 88 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from 86 commits
Commits
Show all changes
88 commits
Select commit Hold shift + click to select a range
1f02497
feat: added skeleton and comments for prepare bin implementation
vijayg10 Sep 6, 2023
078f212
chore: some changes
vijayg10 Sep 6, 2023
44e4735
feat: added new position handler v2
vijayg10 Sep 7, 2023
83db860
feat: added some functionality to new position handler v2
vijayg10 Sep 7, 2023
4221b8c
feat: added some todos
vijayg10 Sep 7, 2023
6caa4e4
feat: changed new handler name
vijayg10 Sep 8, 2023
f8ac314
fix: lint
vijayg10 Sep 8, 2023
c5377d6
feat: add some changes
vijayg10 Sep 8, 2023
0112ccb
fix: lint
vijayg10 Sep 8, 2023
4424513
chore: added copyright header
vijayg10 Sep 8, 2023
451982e
feat: added some logic
vijayg10 Sep 8, 2023
026c4ef
fix: lint
vijayg10 Sep 8, 2023
41de3fc
feat: added unit tests
vijayg10 Sep 11, 2023
ca1fd41
feat: some improvements
vijayg10 Sep 11, 2023
39e74ee
feat: added bin processor logic
vijayg10 Sep 12, 2023
c300269
fix: lint
vijayg10 Sep 12, 2023
47a821c
feat; refactor
vijayg10 Sep 13, 2023
56e1b53
feat: added limit
vijayg10 Sep 13, 2023
7cf5cc4
fix: unit tests
vijayg10 Sep 13, 2023
4a2cd64
Merge branch 'main' of https://github.com/mojaloop/central-ledger int…
vijayg10 Sep 14, 2023
0ebb926
feat: added integration test
vijayg10 Sep 14, 2023
ea75e60
feat: refactor
vijayg10 Sep 14, 2023
8e775e9
fix: unit tests
vijayg10 Sep 14, 2023
98c48bc
fix: integration tests
vijayg10 Sep 14, 2023
6318ce1
feat: add position prepare bin logic (#970)
kleyow Sep 14, 2023
ba418ea
chore: add tests (#972)
kleyow Sep 15, 2023
e29b8c3
fix: metrics
vijayg10 Sep 15, 2023
6c84aa7
Merge branch feature/position-prepare-binnning of https://github.com/…
vijayg10 Sep 15, 2023
2190012
fix: some issue with bin processor
vijayg10 Sep 18, 2023
26f4b37
feat(mojaloop/#3489): add cached calls for participant currency, upda…
kleyow Sep 19, 2023
e958543
chore: improve coverage and edit metric and query name (#974)
kleyow Sep 21, 2023
d26d222
chore(snapshot): 17.3.0-snapshot.0
vijayg10 Sep 22, 2023
39c5281
fix: test downgrade stream lib
vijayg10 Sep 22, 2023
ac12981
chore(snapshot): 17.3.0-snapshot.1
vijayg10 Sep 22, 2023
8df11e9
fix: notification event action
vijayg10 Sep 25, 2023
8dbcdd2
chore(snapshot): 17.3.0-snapshot.2
vijayg10 Sep 25, 2023
71faf0d
fix: notification event
vijayg10 Sep 25, 2023
531bc0a
chore(snapshot): 17.3.0-snapshot.3
vijayg10 Sep 25, 2023
983dc51
fix: notification messages
vijayg10 Sep 25, 2023
72062f6
chore(snapshot): 17.3.0-snapshot.4
vijayg10 Sep 25, 2023
e9227c3
chore: updated conf
vijayg10 Sep 26, 2023
d393f71
chore: update deps
vijayg10 Sep 26, 2023
e24886e
chore(snapshot): 17.3.0-snapshot.5
vijayg10 Sep 26, 2023
b8b4cb2
fix(mojaloop/#3529): fix high latency (#981)
aaronreynoza Oct 2, 2023
c9032a4
chore(release): 17.2.1 [skip ci]
Oct 2, 2023
695f5e4
feat(mojaloop/#3489): add negative integration tests (#975)
sri-miriyala Sep 27, 2023
48d1ddb
Merge branch main of https://github.com/mojaloop/central-ledger into …
vijayg10 Oct 10, 2023
bc09d45
chore: foreach to for loop
vijayg10 Oct 10, 2023
c3a70c4
chore: cleanup
vijayg10 Oct 10, 2023
fdeb31e
Update src/handlers/index.js
vijayg10 Oct 10, 2023
ceda96a
chore: added readme doc
vijayg10 Oct 10, 2023
557df15
fix: lint
vijayg10 Oct 10, 2023
d8a27a2
fix: unit tests
vijayg10 Oct 10, 2023
72c333e
fix: audit
vijayg10 Oct 10, 2023
e9b191f
chore: cleanup
vijayg10 Oct 10, 2023
ab06b45
fix(mojaloop/#3533): helm v15.2.0-rc fixes (#982)
mdebarros Oct 27, 2023
627a795
chore(release): 17.3.0 [skip ci]
Oct 27, 2023
31ad569
chore: remove uuid4 (#976)
marco-ippolito Nov 1, 2023
898cf29
chore(release): 17.3.1 [skip ci]
Nov 1, 2023
92079d1
fix(mojaloop/#3615): upgrade dependencies (#985)
oderayi Nov 6, 2023
bdc531b
chore(release): 17.3.2 [skip ci]
Nov 6, 2023
3b34e67
fix: remove unneeded async/await
oderayi Nov 9, 2023
12acf12
feat: add validation for participantCurrencyIds / accountIds mapping
oderayi Nov 9, 2023
982bf6e
chore: remove resolved TODOs
oderayi Nov 9, 2023
7871fa8
test: Add test coverage
oderayi Nov 9, 2023
ae88d68
chore: remove whitespaces
oderayi Nov 9, 2023
008b751
fix: fixrebase merge conflict
oderayi Nov 9, 2023
bd499f2
chore: update dependencies
oderayi Nov 9, 2023
3689a6c
chore: resolve conflicts
oderayi Nov 9, 2023
518439f
doc: fix typo
oderayi Nov 10, 2023
2b2c429
chore(snapshot): 17.4.0-snapshot.0
oderayi Nov 10, 2023
b339177
chore(snapshot): 17.4.0-snapshot.1
oderayi Nov 13, 2023
27f7491
test: update unit tests
oderayi Nov 13, 2023
c6aae40
chore(snapshot): 17.4.0-snapshot.2
oderayi Nov 13, 2023
378dc42
fix: route error callback for batch correctly
oderayi Nov 14, 2023
eb02c2b
chore(snapshot): 17.4.0-snapshot.3
oderayi Nov 14, 2023
7b21d9b
fix(batch): fix error callback message routing. update tests
oderayi Nov 14, 2023
5c00e8e
chore(snapshot): 17.4.0-snapshot.4
oderayi Nov 14, 2023
cc92e28
fix(batch): fix message routing / properties for more error cases
oderayi Nov 15, 2023
1533b5a
chore(snapshot): 17.4.0-snapshot.5
oderayi Nov 15, 2023
da295b6
test: update test coverage
oderayi Nov 15, 2023
6079f61
chore(snapshot): 17.4.0-snapshot.6
oderayi Nov 15, 2023
3cd3ced
fix: route bulk-prepare messages to non-batch prepare handler
oderayi Nov 16, 2023
06049e8
chore(snapshot): 17.4.0-snapshot.7
oderayi Nov 16, 2023
e325497
doc: update comment
oderayi Nov 16, 2023
d04cfef
chore: update deps. update README for batch.
oderayi Nov 20, 2023
d827219
doc: Address TODO regarding participanLimits query optimization
oderayi Nov 21, 2023
2b7be99
feat: default to transformGeneralTopicName when batch prepare topic i…
oderayi Dec 5, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,48 @@ NOTE: Only POSITION.PREPARE is supported at this time, with additional event-typ
}
```

### Batch Processing Configuration Guide

Batch processing can be enabled in the transfer execution flow. Follow the steps below to enable batch processing for a more efficient transfer execution:

- **Step 1:** **Create a New Kafka Topic**

Create a new Kafka topic named `topic-transfer-position-batch` to handle batch processing events.
- **Step 2:** **Configure Action Type Mapping**

Point the prepare handler to the newly created topic for the action type `prepare` using the `KAFKA.EVENT_TYPE_ACTION_TOPIC_MAP` configuration as shown below:
```
"KAFKA": {
"EVENT_TYPE_ACTION_TOPIC_MAP" : {
"POSITION":{
"PREPARE": "topic-transfer-position-batch",
"BULK_PREPARE": "topic-transfer-position"
}
}
}
```
_NOTE_: `BULK_PREPARE` configuration property is added to aid routing of `bulk-prepare` events to non-batch handler since the batch handler does not support `bulk-prepare` events.

- **Step 3:** **Run Batch Processing Handlers**

Run the position batch handler along with the existing position handler using the following configuration options:
- **Configure Event Type Action Topic Map for Batch Handler:**

Configure the `EVENT_TYPE_ACTION_TOPIC_MAP` parameter for the position batch handler to consume events from the new topic (topic-transfer-position-batch).

- **Set Batch Size:**

Adjust the batch size using the parameter `KAFKA.CONSUMER.TRANSFER.POSITION_BATCH.config.options.batchSize`. This parameter determines the number of messages fetched in each batch.

- **Set Consume Timeout:**

Configure the consume timeout using the parameter `KAFKA.CONSUMER.TRANSFER.POSITION_BATCH.config.options.consumeTimeout`. This parameter specifies the number of milliseconds to wait for a batch of messages to be fetched if the specified batch size of messages is not immediately available.

Example Command to Run Handlers:
```
node src/handlers/index.js handler --positionbatch
```

## API

For endpoint documentation, see the [API documentation](API.md).
Expand Down Expand Up @@ -187,6 +229,25 @@ If you want to run integration tests in a repetitive manner, you can startup the

If you want to run override position topic tests you can repeat the above and use `npm run test:int-override` after configuring settings found [here](#kafka-position-event-type-action-topic-map)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may want to include a section in the docs about running Batch processing in production (i.e. using the event-type-action-topic map to override prepares, etc, and handle the migration for each action as batch processing support is supported - with also the reason why this is important). It doesn't have to be here, it could be part of the docs PR (mojaloop/documentation#415) - in which case we should potentially link to it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a section in README


#### For running integration tests for batch processing interactively
- Run dependecies
```
docker-compose up -d mysql kafka init-kafka kafka-debug-console
npm run wait-4-docker
```
- Run central-ledger services
```
nvm use
npm run migrate
env "CLEDG_KAFKA__EVENT_TYPE_ACTION_TOPIC_MAP__POSITION__PREPARE=topic-transfer-position-batch" npm start
```
- Additionally, run position batch handler in a new terminal
```
env "CLEDG_KAFKA__EVENT_TYPE_ACTION_TOPIC_MAP__POSITION__PREPARE=topic-transfer-position-batch" "CLEDG_HANDLERS__API__DISABLED=true" node src/handlers/index.js handler --positionbatch
```
- Run tests using `npx tape 'test/integration-override/**/handlerBatch.test.js'`


If you want to just run all of the integration suite non-interactively then use npm run `test:integration`.
It will handle docker start up, migration, service starting and testing. Be sure to exit any previously ran handlers or docker commands.

Expand Down
33 changes: 31 additions & 2 deletions config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@
"KAFKA": {
"EVENT_TYPE_ACTION_TOPIC_MAP" : {
"POSITION":{
"PREPARE": null
"PREPARE": null,
"BULK_PREPARE": null
}
},
"TOPIC_TEMPLATES": {
Expand Down Expand Up @@ -289,7 +290,35 @@
"group.id": "cl-group-transfer-position",
"metadata.broker.list": "localhost:9092",
"socket.keepalive.enable": true,
"allow.auto.create.topics": true
"allow.auto.create.topics": true,
"partition.assignment.strategy": "cooperative-sticky",
"enable.auto.commit": false
},
"topicConf": {
"auto.offset.reset": "earliest"
}
}
},
"POSITION_BATCH": {
"config": {
"options": {
"mode": 2,
"batchSize": 10,
"pollFrequency": 10,
"recursiveTimeout": 100,
"messageCharset": "utf8",
"messageAsJSON": true,
"sync": true,
"consumeTimeout": 10
},
"rdkafkaConf": {
"client.id": "cl-con-transfer-position-batch",
"group.id": "cl-group-transfer-position-batch",
"metadata.broker.list": "localhost:9092",
"socket.keepalive.enable": true,
"allow.auto.create.topics": true,
"partition.assignment.strategy": "cooperative-sticky",
"enable.auto.commit": false
},
"topicConf": {
"auto.offset.reset": "earliest"
Expand Down
Loading
Loading