1
1
import base64
2
2
import json
3
- from typing import Iterator
3
+ from typing import Iterator , Optional
4
4
5
5
from aws_lambda_powertools .utilities .data_classes .common import DictWrapper
6
6
7
7
8
8
class KinesisFirehoseRecordMetadata (DictWrapper ):
9
9
@property
10
- def shard_id (self ) -> str :
10
+ def shard_id (self ) -> Optional [ str ] :
11
11
"""Kinesis stream shard ID; present only when Kinesis Stream is source"""
12
12
return self .get ("shardId" )
13
13
14
14
@property
15
- def partition_key (self ) -> str :
15
+ def partition_key (self ) -> Optional [ str ]
16
16
"""Kinesis stream partition key; present only when Kinesis Stream is source"""
17
17
return self .get ("partitionKey" )
18
18
19
19
@property
20
- def approximate_arrival_timestamp (self ) -> str :
20
+ def approximate_arrival_timestamp (self ) -> Optional [ str ]
21
21
"""Kinesis stream approximate arrival ISO timestamp; present only when Kinesis Stream is source"""
22
22
return self .get ("approximateArrivalTimestamp" )
23
23
24
24
@property
25
- def sequence_number (self ) -> str :
25
+ def sequence_number (self ) - > Optional [ str ]
26
26
"""Kinesis stream sequence number; present only when Kinesis Stream is source"""
27
27
return self .get ("sequenceNumber" )
28
28
29
29
@property
30
- def subsequence_number (self ) -> str :
30
+ def subsequence_number (self ) - > Optional [ str ]
31
31
"""Kinesis stream sub-sequence number; present only when Kinesis Stream is source
32
32
33
33
Note: this will only be present for Kinesis streams using record aggregation
@@ -50,9 +50,9 @@ def record_id(self) -> str:
50
50
def data (self ) -> str :
51
51
"""The data blob, base64-encoded"""
52
52
return self ["data" ]
53
-
53
+
54
54
@property
55
- def metadata (self ) -> KinesisFirehoseRecordMetadata :
55
+ def metadata (self ) -> Optional [ KinesisFirehoseRecordMetadata ] :
56
56
"""Optional: metadata associated with this record; present only when Kinesis Stream is source"""
57
57
return KinesisFirehoseRecordMetadata (self .get ('kinesisRecordMetadata' , {}))
58
58
@@ -93,7 +93,7 @@ def delivery_stream_arn(self) -> str:
93
93
return self ["deliveryStreamArn" ]
94
94
95
95
@property
96
- def source_kinesis_stream_arn (self ) -> str :
96
+ def source_kinesis_stream_arn (self ) -> Optional [ str ] :
97
97
"""ARN of the Kinesis Stream; present only when Kinesis Stream is source"""
98
98
return self .get ("sourceKinesisStreamArn" )
99
99
0 commit comments