6
6
7
7
8
8
class KinesisFirehoseRecordMetadata (DictWrapper ):
9
+ @property
10
+ def _metadata (self ) -> dict :
11
+ """Optional: metadata associated with this record; present only when Kinesis Stream is source"""
12
+ return self ["kinesisRecordMetadata" ] # could raise KeyError
13
+
9
14
@property
10
15
def shard_id (self ) -> Optional [str ]:
11
16
"""Kinesis stream shard ID; present only when Kinesis Stream is source"""
12
- return self .get ("shardId" )
17
+ return self ._metadata . get ("shardId" )
13
18
14
19
@property
15
- def partition_key (self ) -> Optional [str ]
20
+ def partition_key (self ) -> Optional [str ]:
16
21
"""Kinesis stream partition key; present only when Kinesis Stream is source"""
17
- return self .get ("partitionKey" )
22
+ return self ._metadata . get ("partitionKey" )
18
23
19
24
@property
20
- def approximate_arrival_timestamp (self ) -> Optional [str ]
25
+ def approximate_arrival_timestamp (self ) -> Optional [str ]:
21
26
"""Kinesis stream approximate arrival ISO timestamp; present only when Kinesis Stream is source"""
22
- return self .get ("approximateArrivalTimestamp" )
27
+ return self ._metadata . get ("approximateArrivalTimestamp" )
23
28
24
29
@property
25
- def sequence_number (self ) - > Optional [str ]
30
+ def sequence_number (self ) -> Optional [str ]:
26
31
"""Kinesis stream sequence number; present only when Kinesis Stream is source"""
27
- return self .get ("sequenceNumber" )
32
+ return self ._metadata . get ("sequenceNumber" )
28
33
29
34
@property
30
- def subsequence_number (self ) - > Optional [str ]
35
+ def subsequence_number (self ) -> Optional [str ]:
31
36
"""Kinesis stream sub-sequence number; present only when Kinesis Stream is source
32
-
37
+
33
38
Note: this will only be present for Kinesis streams using record aggregation
34
39
"""
35
- return self .get ("subsequenceNumber" )
40
+ return self ._metadata . get ("subsequenceNumber" )
36
41
37
42
38
43
class KinesisFirehoseRecord (DictWrapper ):
39
44
@property
40
45
def approximate_arrival_timestamp (self ) -> float :
41
46
"""The approximate time that the record was inserted into the delivery stream"""
42
47
return float (self ["approximateArrivalTimestamp" ])
43
-
48
+
44
49
@property
45
50
def record_id (self ) -> str :
46
51
"""Record ID; uniquely identifies this record within the current batch"""
@@ -52,9 +57,9 @@ def data(self) -> str:
52
57
return self ["data" ]
53
58
54
59
@property
55
- def metadata (self ) -> Optional [ KinesisFirehoseRecordMetadata ] :
60
+ def metadata (self ) -> KinesisFirehoseRecordMetadata :
56
61
"""Optional: metadata associated with this record; present only when Kinesis Stream is source"""
57
- return KinesisFirehoseRecordMetadata (self .get ( 'kinesisRecordMetadata' , {}) )
62
+ return KinesisFirehoseRecordMetadata (self ._data )
58
63
59
64
@property
60
65
def data_as_bytes (self ) -> bytes :
0 commit comments