-
Notifications
You must be signed in to change notification settings - Fork 0
/
kinesis_util.go
80 lines (71 loc) · 1.89 KB
/
kinesis_util.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package main
import (
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/firehose"
"github.com/aws/aws-sdk-go/service/kinesis"
)
// newSession creates an AWS session whose config sets Region from the
// package-level region value. It goes through session.Must, so a failure to
// build the session panics rather than being returned.
func newSession() *session.Session {
	cfg := &aws.Config{Region: aws.String(region)}
	return session.Must(session.NewSession(cfg))
}
// newClient builds a Kinesis service client from the given session.
func newClient(sess *session.Session) *kinesis.Kinesis {
	client := kinesis.New(sess)
	return client
}
// newFirehoseClient builds a Firehose service client from the given session.
func newFirehoseClient(sess *session.Session) *firehose.Firehose {
	client := firehose.New(sess)
	return client
}
// SafeShard is a plain-string copy of the identifying fields of a Kinesis
// shard. Every field is a value rather than a pointer, so it can be read
// without nil checks; a field with nothing copied into it stays "".
type SafeShard struct {
	// ShardId and ParentShardId are the shard's own ID and its parent's ID.
	ShardId, ParentShardId string
	// StartingHashKey and EndingHashKey are the bounds of the shard's hash
	// key range.
	StartingHashKey, EndingHashKey string
}
// ParseShard copies the identifying fields of a Kinesis shard into a
// SafeShard, dereferencing each optional pointer only when it is set.
//
// Any field that is nil on s, including a nil HashKeyRange, is left as the
// empty string. A nil s yields an empty, non-nil SafeShard rather than a
// nil-pointer panic.
func ParseShard(s *kinesis.Shard) *SafeShard {
	ss := &SafeShard{}
	if s == nil {
		// Nothing to copy; keep the result usable without nil checks.
		return ss
	}

	if s.ShardId != nil {
		ss.ShardId = *s.ShardId
	}
	if s.ParentShardId != nil {
		ss.ParentShardId = *s.ParentShardId
	}

	// The hash key bounds live one level down, behind another optional pointer.
	if r := s.HashKeyRange; r != nil {
		if r.StartingHashKey != nil {
			ss.StartingHashKey = *r.StartingHashKey
		}
		if r.EndingHashKey != nil {
			ss.EndingHashKey = *r.EndingHashKey
		}
	}
	return ss
}
// getShards returns the IDs of every shard in the named stream.
//
// DescribeStream returns the shard list in pages, so the pages are walked
// with DescribeStreamPages and the IDs from all of them are collected; a
// single DescribeStream call would silently truncate large streams.
// Shards without an ID are skipped.
//
// On failure the returned slice is nil and the error wraps the SDK error
// with the stream name (the original error is reachable via errors.As /
// errors.Unwrap). The error is not printed here; reporting is left to the
// caller.
func getShards(client *kinesis.Kinesis, streamName string) ([]string, error) {
	params := &kinesis.DescribeStreamInput{
		StreamName: &streamName,
	}

	// Start non-nil so a stream with no shards still yields an empty slice.
	shards := []string{}
	err := client.DescribeStreamPages(params, func(page *kinesis.DescribeStreamOutput, _ bool) bool {
		if page == nil || page.StreamDescription == nil {
			return true
		}
		for _, s := range page.StreamDescription.Shards {
			if s != nil && s.ShardId != nil {
				shards = append(shards, *s.ShardId)
			}
		}
		// Keep going until the paginator reports no more pages.
		return true
	})
	if err != nil {
		return nil, fmt.Errorf("describing stream %q: %w", streamName, err)
	}
	return shards, nil
}
// getShardIterator requests a shard iterator of type "LATEST" for the given
// shard of the named stream and returns the ShardIterator field of the
// response. If the request fails, the SDK error is returned unchanged and
// the iterator is nil.
func getShardIterator(client *kinesis.Kinesis, streamName string, shardId string) (*string, error) {
	input := &kinesis.GetShardIteratorInput{
		StreamName:        aws.String(streamName),
		ShardId:           aws.String(shardId),
		ShardIteratorType: aws.String("LATEST"),
	}

	out, err := client.GetShardIterator(input)
	if err != nil {
		return nil, err
	}
	return out.ShardIterator, nil
}