Skip to content

Add support for Kinesis #46

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 22, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -14,6 +14,8 @@ At the moment, this library provides the following:
* `KMS`: allows to list KMS keys and generate a unique symmetric data key for use outside of AWS KMS
* `SSM`: allows to retrieve a parameter from AWS Systems Manager
* `V4 signature`: allows to sign requests to amazon AWS services
* `KinesisClient`: allows all APIs for Kinesis available by AWS.


## Want to contribute?

@@ -251,6 +253,52 @@ export default function () {
}
```


### Kinesis

Consult the `KinesisClient` [dedicated k6 documentation page](https://k6.io/docs/javascript-api/jslib/aws/kinesisclient) for more details on its methods and how to use it.

```javascript
import exec from 'k6/execution'

import { AWSConfig, KinesisClient } from 'https://jslib.k6.io/aws/0.7.0/kinesis.js'
import encoding from 'k6/encoding';
import { describe, expect } from 'https://jslib.k6.io/k6chaijs/4.3.4.2/index.js';
import { fail } from 'k6';

const awsConfig = new AWSConfig({
region: __ENV.AWS_REGION,
accessKeyId: __ENV.AWS_ACCESS_KEY_ID,
secretAccessKey: __ENV.AWS_SECRET_ACCESS_KEY,
sessionToken: __ENV.AWS_SESSION_TOKEN,
})
const kinesis = new KinesisClient(awsConfig)

export default function () {
describe('01. List Kinesis streams', () => {
try {
const res = kinesis.listStreams()
expect(res.StreamNames.length,"number of streams").to.equal(6);
} catch(err) {
fail(err)
}

})

describe('02. List kinesis stream with arguments', () => {
try {
const res = kinesis.listStreams({Limit: 1})
expect(res.StreamNames.length,"number of streams").to.equal(1);
} catch(err) {
fail(err)
}
})
}

```



## Development

### Contributing
2 changes: 1 addition & 1 deletion build/aws.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion build/aws.js.map

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion build/index.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion build/index.js.map

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions build/kinesis.js

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions build/kinesis.js.map

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion build/kms.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion build/kms.js.map

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion build/s3.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion build/s3.js.map

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion build/secrets-manager.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion build/secrets-manager.js.map

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion build/signature.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion build/signature.js.map

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion build/sqs.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion build/sqs.js.map

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion build/ssm.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion build/ssm.js.map

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@ services:
- "127.0.0.1:4566:4566" # LocalStack Gateway
- "127.0.0.1:4510-4559:4510-4559" # external services port range
environment:
- SERVICES=s3,secretsmanager,sqs,kms,ssm
- SERVICES=s3,secretsmanager,sqs,kms,ssm,kinesis
- S3_SKIP_SIGNATURE_VALIDATION=0 # enable signature validation
- DEBUG=${DEBUG-}
- PERSISTENCE=${PERSISTENCE-}
117 changes: 117 additions & 0 deletions examples/kinesis.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import exec from 'k6/execution'

import { AWSConfig, KinesisClient } from '../build/kinesis.js'
import encoding from 'k6/encoding'
import { describe, expect } from 'https://jslib.k6.io/k6chaijs/4.3.4.2/index.js'
import { fail } from 'k6'

const dummyStream = `kinesis-test-stream-provisioned`

const awsConfig = new AWSConfig({
region: __ENV.AWS_REGION,
accessKeyId: __ENV.AWS_ACCESS_KEY_ID,
secretAccessKey: __ENV.AWS_SECRET_ACCESS_KEY,
sessionToken: __ENV.AWS_SESSION_TOKEN,
})

const kinesis = new KinesisClient(awsConfig)

const getShardIds = () => {
const res = kinesis.listShards(dummyStream)
const shardIds = res.Shards.map((shard) => shard.ShardId)

return shardIds
}

const getShardIterator = (shardId) => {
const res = kinesis.getShardIterator(dummyStream, shardId, `TRIM_HORIZON`)
return res.ShardIterator
}

export default function () {
describe('01. Create kinesis Stream', () => {
try {
// Valid Values: PROVISIONED | ON_DEMAND
kinesis.createStream(dummyStream, {
ShardCount: 10,
StreamModeDetails: {
StreamMode: 'PROVISIONED',
},
})
} catch (err) {
fail(err)
}
})

describe('02. List Kinesis streams', () => {
try {
const res = kinesis.listStreams()
expect(res.StreamNames.length, 'number of streams').to.equal(1)
} catch (err) {
fail(err)
}
})

describe('03. List kinesis stream with arguments', () => {
try {
const res = kinesis.listStreams({ limit: 1 })
expect(res.StreamNames.length, 'number of streams').to.equal(1)
} catch (err) {
fail(err)
}
sleep(2)
})

describe('04. publish to kinesis Stream', () => {
try {
for (let i = 0; i < 50; i++) {
const res = kinesis.putRecords({
StreamName: dummyStream,
Records: [
{
Data: encoding.b64encode(JSON.stringify({ this: 'is', a: 'test' })),
PartitionKey: 'partitionKey1',
},
{
Data: encoding.b64encode(
JSON.stringify([{ this: 'is', second: 'test' }])
),
PartitionKey: 'partitionKey2',
},
],
})
expect(res.FailedRecordCount, `Failed Records to publish`).to.equal(0)
expect(res.Records.length, `Total Records`).to.equal(2)
}
} catch (err) {
fail(err)
}
})

describe('05. Gets an Amazon Kinesis read all data ', () => {
try {
const shards = getShardIds()
shards.map((shard) => {
let iterator = getShardIterator(shard)
while (true) {
const res = kinesis.getRecords({ ShardIterator: iterator })
iterator = res.NextShardIterator

if (!res.MillisBehindLatest || res.MillisBehindLatest == `0`) {
break
}
}
})
} catch (err) {
fail(err)
}
})

describe('06. Delete kinesis Stream', () => {
try {
kinesis.deleteStream({ StreamName: dummyStream })
} catch (err) {
fail(err)
}
})
}
39 changes: 20 additions & 19 deletions examples/s3.js
Original file line number Diff line number Diff line change
@@ -21,23 +21,24 @@ export default function () {
// gives us access to.
const buckets = s3.listBuckets()

// If our test bucket does not exist, abort the execution.
if (buckets.filter((b) => b.name === testBucketName).length == 0) {
exec.test.abort()
}

// Let's upload our test file to the bucket
s3.putObject(testBucketName, testFileKey, testFile)

// Let's list the test bucket objects
const objects = s3.listObjects(testBucketName)

// And verify it does contain our test object
if (objects.filter((o) => o.key === testFileKey).length == 0) {
exec.test.abort()
}

// Let's redownload it verify it's correct, and delete it
const obj = s3.getObject(testBucketName, testFileKey)
s3.deleteObject(testBucketName, testFileKey)
console.log(buckets)
// // If our test bucket does not exist, abort the execution.
// if (buckets.filter((b) => b.name === testBucketName).length == 0) {
// exec.test.abort()
// }

// // Let's upload our test file to the bucket
// s3.putObject(testBucketName, testFileKey, testFile)

// // Let's list the test bucket objects
// const objects = s3.listObjects(testBucketName)

// // And verify it does contain our test object
// if (objects.filter((o) => o.key === testFileKey).length == 0) {
// exec.test.abort()
// }

// // Let's redownload it verify it's correct, and delete it
// const obj = s3.getObject(testBucketName, testFileKey)
// s3.deleteObject(testBucketName, testFileKey)
}
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -16,3 +16,4 @@ export {
SystemsManagerServiceError,
} from './internal/ssm'
export { SQSClient } from './sqs'
export { KinesisClient } from './internal/kinesis'
758 changes: 758 additions & 0 deletions src/internal/kinesis.ts

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions src/kinesis.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Import only symbols we wish to re-export publicly
import { AWSConfig, InvalidAWSConfigError } from './internal/config'
import { InvalidSignatureError } from './internal/signature'
import { KinesisClient } from './internal/kinesis'

// Re-Export public symbols
export {
InvalidSignatureError,
// Aws Config
AWSConfig,
InvalidAWSConfigError,
// Kinesis
KinesisClient
}
2 changes: 2 additions & 0 deletions tests/index.js
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ import { s3TestSuite } from './internal/s3.js'
import { secretsManagerTestSuite } from './internal/secrets-manager.js'
import { kmsTestSuite } from './internal/kms.js'
import { ssmTestSuite } from './internal/ssm.js'
import { kinesisTestSuite } from './internal/kinesis.js'
import { signatureV4TestSuite } from './internal/signature.js'
import { sqsTestSuite } from './internal/sqs.js'

@@ -98,4 +99,5 @@ export default function testSuite(data) {
sqsTestSuite(data)
ssmTestSuite(data)
signatureV4TestSuite(data)
kinesisTestSuite(data)
}
99 changes: 99 additions & 0 deletions tests/internal/kinesis.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import exec from 'k6/execution'

import { AWSConfig, KinesisClient } from '../../build/kinesis.js'
import encoding from 'k6/encoding'
import { describe, expect } from 'https://jslib.k6.io/k6chaijs/4.3.4.2/index.js'
import { fail, sleep } from 'k6'

const dummyStream = `kinesis-test-stream-dummy`

export function kinesisTestSuite(data) {
const kinesis = new KinesisClient(data.awsConfig)

describe('[kinesis] create a stream', () => {
try {
// Valid Values: PROVISIONED | ON_DEMAND
kinesis.createStream(dummyStream, {
ShardCount: 10,
StreamModeDetails: {
StreamMode: 'PROVISIONED',
},
})
} catch (err) {
fail(err)
}
})

describe('[kinesis] list streams', () => {
try {
const res = kinesis.listStreams()
expect(res.StreamNames.length, 'number of streams').to.equal(1)
} catch (err) {
fail(err)
}
})

describe('[kinesis] kist streams with arguments', () => {
try {
const res = kinesis.listStreams({ limit: 1 })
expect(res.StreamNames.length, 'number of streams').to.equal(1)
} catch (err) {
fail(err)
}
sleep(2)
})

describe('[kinesis] publish to a stream', () => {
try {
for (let i = 0; i < 50; i++) {
const records = [
{
Data: encoding.b64encode(JSON.stringify({ this: 'is', a: 'test' })),
PartitionKey: 'partitionKey1',
},
{
Data: encoding.b64encode(JSON.stringify([{ this: 'is', second: 'test' }])),
PartitionKey: 'partitionKey2',
},
]

const res = kinesis.putRecords(records, { streamName: dummyStream })
expect(res.FailedRecordCount, `Failed Records to publish`).to.equal(0)
expect(res.Records.length, `Total Records`).to.equal(2)
}
} catch (err) {
fail(err)
}
})

describe('[kinesis] Gets read all data from shards', () => {
try {
kinesis.listShards(dummyStream).Shards.map((shard) => {
let iterator = kinesis.getShardIterator(
dummyStream,
shard.Id,
`TRIM_HORIZON`
).ShardIterator

while (true) {
const res = kinesis.getRecords(iterator)
iterator = res.NextShardIterator

if (!res.MillisBehindLatest || res.MillisBehindLatest == `0`) {
break
}
}
})
} catch (err) {
fail(err)
}
})

describe('[kinesis] delete a stream', () => {
try {
kinesis.deleteStream(dummyStream)
} catch (err) {
fail(err)
}
})
}
1 change: 1 addition & 0 deletions webpack.config.js
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@ module.exports = {
sqs: path.resolve(__dirname, 'src/sqs.ts'),
ssm: path.resolve(__dirname, 'src/ssm.ts'),
kms: path.resolve(__dirname, 'src/kms.ts'),
kinesis: path.resolve(__dirname, 'src/kinesis.ts'),

// AWS signature v4
signature: path.resolve(__dirname, 'src/signature.ts'),