Skip to content

Commit

Permalink
Refactor kinesis to fit the style and organization of the library
Browse files Browse the repository at this point in the history
  • Loading branch information
oleiade committed May 9, 2023
1 parent d5a5614 commit 85a8e00
Show file tree
Hide file tree
Showing 3 changed files with 585 additions and 353 deletions.
60 changes: 24 additions & 36 deletions examples/kinesis.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import exec from 'k6/execution'

import { AWSConfig, KinesisClient } from '../build/kinesis.js'
import encoding from 'k6/encoding';
import { describe, expect } from 'https://jslib.k6.io/k6chaijs/4.3.4.2/index.js';
import { fail } from 'k6';
import encoding from 'k6/encoding'
import { describe, expect } from 'https://jslib.k6.io/k6chaijs/4.3.4.2/index.js'
import { fail } from 'k6'

const dummyStream = `kinesis-test-stream-provisioned`

Expand All @@ -17,35 +17,26 @@ const awsConfig = new AWSConfig({
const kinesis = new KinesisClient(awsConfig)

const getShardIds = () => {
const res = kinesis.listShards({
StreamName: dummyStream,
})
const shardIds = res.Shards.map(shard => shard.ShardId);
const res = kinesis.listShards(dummyStream)
const shardIds = res.Shards.map((shard) => shard.ShardId)

return shardIds
}

const getShardIterator = (shardId) => {
const res = kinesis.getShardIterator({
StreamName: dummyStream,
ShardId: shardId,
ShardIteratorType: `TRIM_HORIZON`
})
const res = kinesis.getShardIterator(dummyStream, shardId, `TRIM_HORIZON`)
return res.ShardIterator
}

export default function () {
describe('01. Create kinesis Stream', () => {
try {
// Valid Values: PROVISIONED | ON_DEMAND
kinesis.createStream({

"ShardCount": 10,
"StreamModeDetails": {
"StreamMode": "PROVISIONED"
kinesis.createStream(dummyStream, {
ShardCount: 10,
StreamModeDetails: {
StreamMode: 'PROVISIONED',
},
"StreamName": dummyStream

})
} catch (err) {
fail(err)
Expand All @@ -55,54 +46,52 @@ export default function () {
describe('02. List Kinesis streams', () => {
try {
const res = kinesis.listStreams()
expect(res.StreamNames.length, "number of streams").to.equal(1);
expect(res.StreamNames.length, 'number of streams').to.equal(1)
} catch (err) {
fail(err)
}
})


describe('03. List kinesis stream with arguments', () => {
try {
const res = kinesis.listStreams({ Limit: 1 })
expect(res.StreamNames.length, "number of streams").to.equal(1);
const res = kinesis.listStreams({ limit: 1 })
expect(res.StreamNames.length, 'number of streams').to.equal(1)
} catch (err) {
fail(err)
}
sleep(2)
})


describe('04. publish to kinesis Stream', () => {
try {
for (let i = 0; i < 50; i++) {
const res = kinesis.putRecords({
StreamName: dummyStream,
Records: [
{
Data: encoding.b64encode(JSON.stringify({ 'this': 'is', 'a': 'test' })),
PartitionKey: "partitionKey1"
Data: encoding.b64encode(JSON.stringify({ this: 'is', a: 'test' })),
PartitionKey: 'partitionKey1',
},
{
Data: encoding.b64encode(JSON.stringify([{ 'this': 'is', 'second': 'test' }])),
PartitionKey: "partitionKey2"
}
]
Data: encoding.b64encode(
JSON.stringify([{ this: 'is', second: 'test' }])
),
PartitionKey: 'partitionKey2',
},
],
})
expect(res.FailedRecordCount, `Failed Records to publish`).to.equal(0);
expect(res.Records.length, `Total Records`).to.equal(2);
expect(res.FailedRecordCount, `Failed Records to publish`).to.equal(0)
expect(res.Records.length, `Total Records`).to.equal(2)
}
} catch (err) {
fail(err)
}
})



describe('05. Gets an Amazon Kinesis read all data ', () => {
try {
const shards = getShardIds()
shards.map(shard => {
shards.map((shard) => {
let iterator = getShardIterator(shard)
while (true) {
const res = kinesis.getRecords({ ShardIterator: iterator })
Expand All @@ -118,7 +107,6 @@ export default function () {
}
})


describe('06. Delete kinesis Stream', () => {
try {
kinesis.deleteStream({ StreamName: dummyStream })
Expand Down
Loading

0 comments on commit 85a8e00

Please sign in to comment.