Skip to content

Commit 6185b13

Browse files
authored
Merge pull request #95 from redcordlau/master
feat(kinesis): support Kinesis PutRecords
2 parents a1c6cb7 + 7d80b8c commit 6185b13

File tree

9 files changed

+331
-12
lines changed

9 files changed

+331
-12
lines changed

README.md

+6
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,12 @@ custom:
9595
queryStringParam: myKey # use query string param
9696
streamName: { Ref: 'YourStream' }
9797
cors: true
98+
- kinesis: # PutRecords
99+
path: /kinesis
100+
method: post
101+
action: PutRecords
102+
streamName: { Ref: 'YourStream' }
103+
cors: true
98104

99105
resources:
100106
Resources:

__tests__/integration/kinesis/multiple-integrations/service/serverless.yml

+12
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,18 @@ custom:
4343
queryStringParam: myKey # use query string param
4444
streamName: { Ref: 'YourStream' }
4545
cors: true
46+
- kinesis:
47+
path: /kinesis6
48+
action: PutRecord
49+
method: post
50+
streamName: { Ref: 'YourStream' }
51+
cors: true
52+
- kinesis:
53+
path: /kinesis7
54+
action: PutRecords
55+
method: post
56+
streamName: { Ref: 'YourStream' }
57+
cors: true
4658

4759
resources:
4860
Resources:

__tests__/integration/kinesis/multiple-integrations/tests.js

+40
Original file line numberDiff line numberDiff line change
@@ -96,4 +96,44 @@ describe('Multiple Kinesis Proxy Integrations Test', () => {
9696
expect(body).to.have.own.property('ShardId')
9797
expect(body).to.have.own.property('SequenceNumber')
9898
})
99+
100+
it('should get correct response from kinesis proxy endpoints with action "PutRecord" with default partitionkey', async () => {
101+
const stream = 'kinesis6'
102+
const testEndpoint = `${endpoint}/${stream}`
103+
const response = await fetch(testEndpoint, {
104+
method: 'POST',
105+
headers: { 'Content-Type': 'application/json' },
106+
body: JSON.stringify({ message: `data for stream ${stream}` })
107+
})
108+
expect(response.headers.get('access-control-allow-origin')).to.deep.equal('*')
109+
expect(response.status).to.be.equal(200)
110+
const body = await response.json()
111+
expect(body).to.have.own.property('ShardId')
112+
expect(body).to.have.own.property('SequenceNumber')
113+
})
114+
115+
it('should get correct response from kinesis proxy endpoints with action "PutRecords" with default partitionkey', async () => {
116+
const stream = 'kinesis7'
117+
const testEndpoint = `${endpoint}/${stream}`
118+
const response = await fetch(testEndpoint, {
119+
method: 'POST',
120+
headers: { 'Content-Type': 'application/json' },
121+
body: JSON.stringify({
122+
records: [
123+
{ data: 'some data', 'partition-key': 'some key' },
124+
{ data: 'some other data', 'partition-key': 'some key' }
125+
]
126+
})
127+
})
128+
expect(response.headers.get('access-control-allow-origin')).to.deep.equal('*')
129+
expect(response.status).to.be.equal(200)
130+
const body = await response.json()
131+
expect(body).to.have.own.property('FailedRecordCount')
132+
expect(body).to.have.own.property('Records')
133+
expect(body.Records.length).to.be.equal(2)
134+
expect(body.Records[0]).to.have.own.property('ShardId')
135+
expect(body.Records[0]).to.have.own.property('SequenceNumber')
136+
expect(body.Records[1]).to.have.own.property('ShardId')
137+
expect(body.Records[1]).to.have.own.property('SequenceNumber')
138+
})
99139
})

lib/apiGateway/schema.js

+7-1
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,13 @@ const allowedProxies = ['kinesis', 'sqs', 's3', 'sns', 'dynamodb', 'eventbridge'
240240

241241
const proxiesSchemas = {
242242
kinesis: Joi.object({
243-
kinesis: proxy.append({ streamName: stringOrRef.required(), partitionKey, request, response })
243+
kinesis: proxy.append({
244+
action: Joi.string().valid('PutRecord', 'PutRecords'),
245+
streamName: stringOrRef.required(),
246+
partitionKey,
247+
request,
248+
response
249+
})
244250
}),
245251
s3: Joi.object({
246252
s3: proxy.append({

lib/apiGateway/validate.test.js

+21
Original file line numberDiff line numberDiff line change
@@ -1068,6 +1068,27 @@ describe('#validateServiceProxies()', () => {
10681068

10691069
expect(() => serverlessApigatewayServiceProxy.validateServiceProxies()).to.not.throw()
10701070
})
1071+
1072+
it('should throw error if action is not "PutRecord" and "PutRecords"', () => {
1073+
serverlessApigatewayServiceProxy.serverless.service.custom = {
1074+
apiGatewayServiceProxies: [
1075+
{
1076+
kinesis: {
1077+
streamName: 'yourStream',
1078+
path: 'kinesis',
1079+
method: 'post',
1080+
action: 'xxxxxx',
1081+
request: { template: [] }
1082+
}
1083+
}
1084+
]
1085+
}
1086+
1087+
expect(() => serverlessApigatewayServiceProxy.validateServiceProxies()).to.throw(
1088+
serverless.classes.Error,
1089+
'child "kinesis" fails because [child "action" fails because ["action" must be one of [PutRecord, PutRecords]]]'
1090+
)
1091+
})
10711092
})
10721093

10731094
describe('s3', () => {

lib/package/kinesis/compileIamRoleToKinesis.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ module.exports = {
5252
},
5353
{
5454
Effect: 'Allow',
55-
Action: ['kinesis:PutRecord'],
55+
Action: ['kinesis:PutRecord', 'kinesis:PutRecords'],
5656
Resource: policyResource
5757
}
5858
]

lib/package/kinesis/compileIamRoleToKinesis.test.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ describe('#compileIamRoleToKinesis()', () => {
7979
},
8080
{
8181
Effect: 'Allow',
82-
Action: ['kinesis:PutRecord'],
82+
Action: ['kinesis:PutRecord', 'kinesis:PutRecords'],
8383
Resource: [
8484
{
8585
'Fn::Sub': [

lib/package/kinesis/compileMethodsToKinesis.js

+32-9
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@ module.exports = {
5353
Type: 'AWS',
5454
Credentials: roleArn,
5555
Uri: {
56-
'Fn::Sub': 'arn:${AWS::Partition}:apigateway:${AWS::Region}:kinesis:action/PutRecord'
56+
'Fn::Sub':
57+
'arn:${AWS::Partition}:apigateway:${AWS::Region}:kinesis:action/' +
58+
(http.action || 'PutRecord')
5759
},
5860
PassthroughBehavior: 'NEVER',
5961
RequestTemplates: this.getKinesisIntegrationRequestTemplates(http)
@@ -133,15 +135,36 @@ module.exports = {
133135
buildDefaultKinesisRequestTemplate(http) {
134136
const objectRequestParam = this.getKinesisObjectRequestParameter(http)
135137

136-
return {
137-
'Fn::Sub': [
138-
'{"StreamName":"${StreamName}","Data":"${Data}","PartitionKey":"${PartitionKey}"}',
139-
{
140-
StreamName: http.streamName,
141-
Data: '$util.base64Encode($input.body)',
142-
PartitionKey: `${objectRequestParam}`
138+
switch (http.action) {
139+
case 'PutRecords':
140+
return {
141+
'Fn::Sub': [
142+
'{"StreamName":"${StreamName}","Records":${Records}}',
143+
{
144+
StreamName: http.streamName,
145+
Records: `[
146+
#foreach($elem in $input.path('$.records'))
147+
{
148+
"Data": "$util.base64Encode($elem.data)",
149+
"PartitionKey": "$elem.partition-key"
150+
}#if($foreach.hasNext),#end
151+
#end
152+
]`
153+
}
154+
]
155+
}
156+
case 'PutRecord':
157+
default:
158+
return {
159+
'Fn::Sub': [
160+
'{"StreamName":"${StreamName}","Data":"${Data}","PartitionKey":"${PartitionKey}"}',
161+
{
162+
StreamName: http.streamName,
163+
Data: '$util.base64Encode($input.body)',
164+
PartitionKey: `${objectRequestParam}`
165+
}
166+
]
143167
}
144-
]
145168
}
146169
}
147170
}

0 commit comments

Comments
 (0)