ingestion-csv-sample.go
package main

import (
	"context"
	"errors"
	"flag"
	"fmt"
	"net/http"
	"strconv"

	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/timestreamquery"
	"github.com/aws/aws-sdk-go-v2/service/timestreamwrite"
	"github.com/aws/aws-sdk-go-v2/service/timestreamwrite/types"

	"go_sample/utils"
)
/**
  This code sample runs the CRUD APIs and the WriteRecords API in a logical order,
  ingesting sample data from a CSV file.
*/
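// A minimal example invocation, as a sketch: the flag values below are
// illustrative placeholders only, and every flag falls back to the defaults
// defined in the utils package when omitted.
//
//	go run ingestion-csv-sample.go \
//		-database_name sample_devops_db \
//		-table_name sample_host_metrics \
//		-test_file ../data/sample.csv \
//		-region us-east-1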
func main() {
	databaseName := flag.String("database_name", utils.DATABASE_NAME, "database name string")
	tableName := flag.String("table_name", utils.TABLE_NAME, "table name string")
	testFileName := flag.String("test_file", utils.SAMPLE_DATA_CSV_FILE_PATH, "CSV file containing the data to ingest")
	region := flag.String("region", utils.REGION, "region")
	flag.Parse()

	// Use the SDK's default configuration and fail fast if it cannot be loaded.
	cfg, err := config.LoadDefaultConfig(context.TODO())
	utils.HandleError(err, "Failed to load the SDK's default configuration ", true)

	tr := utils.LoadHttpSettings()
	writeSvc := timestreamwrite.NewFromConfig(cfg, func(o *timestreamwrite.Options) {
		o.Region = *region
		o.HTTPClient = &http.Client{Transport: tr}
	})
	querySvc := timestreamquery.NewFromConfig(cfg, func(o *timestreamquery.Options) {
		o.Region = *region
		o.HTTPClient = &http.Client{Transport: tr}
	})
	timestreamBuilder := utils.TimestreamBuilder{WriteSvc: writeSvc, QuerySvc: querySvc}

	s3Svc := s3.NewFromConfig(cfg, func(o *s3.Options) {
		o.Region = *region
	})
	timestreamDependencyHelper := utils.TimestreamDependencyHelper{S3Svc: s3Svc}

	createdResourcesList := []utils.Resource{}

	// Make the bucket name unique by appending 5 random characters at the end
	s3BucketName := utils.ERROR_CONFIGURATION_S3_BUCKET_NAME_PREFIX + utils.GenerateRandomStringWithSize(5)
	err = timestreamDependencyHelper.CreateS3Bucket(s3BucketName, *region)
	utils.HandleError(err, fmt.Sprintf("Failed to create S3Bucket %s ", s3BucketName), true)
	createdResourcesList = append(createdResourcesList, utils.Resource{Type: "S3", Identifier: s3BucketName})
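	// Note (an assumption inferred from the ERROR_CONFIGURATION_* prefix and the
	// CreateTable call below, not stated in this file): the bucket created above is
	// passed to table creation as the S3 error-report location for magnetic store
	// writes, so rejected records can be inspected later.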
	var RESOURCE_NOT_FOUND *types.ResourceNotFoundException

	// Describe database.
	err = timestreamBuilder.DescribeDatabase(*databaseName)
	if err != nil {
		if errors.As(err, &RESOURCE_NOT_FOUND) {
			err = timestreamBuilder.CreateDatabase(*databaseName)
			if err != nil {
				utils.CleanUp(timestreamBuilder, timestreamDependencyHelper, "", "", s3BucketName)
				utils.HandleError(err, fmt.Sprintf("Failed to create Database %s\n", *databaseName), true)
			}
			createdResourcesList = append(createdResourcesList, utils.Resource{Type: "TIMESTREAM_DATABASE", Identifier: *databaseName})
		}
	}

	// Describe table.
	_, err = timestreamBuilder.DescribeTable(*databaseName, *tableName)
	if err != nil {
		if errors.As(err, &RESOURCE_NOT_FOUND) {
			err = timestreamBuilder.CreateTable(*databaseName, *tableName, s3BucketName)
			if err != nil {
				utils.CleanUp(timestreamBuilder, timestreamDependencyHelper, *databaseName, "", s3BucketName)
				utils.HandleError(err, fmt.Sprintf("Failed to create table %s in database %s ",
					*tableName, *databaseName), true)
			}
			createdResourcesList = append(createdResourcesList, utils.Resource{Type: "TIMESTREAM_TABLE", Identifier: *tableName})
		}
	} else {
		fmt.Println("Table already exists")
		fmt.Printf("Deleting created s3Bucket=%s as it was not used for creating the table.", s3BucketName)
		timestreamDependencyHelper.DeleteS3Bucket(s3BucketName)
		if len(createdResourcesList) > 0 && createdResourcesList[0].Identifier == s3BucketName {
			createdResourcesList = createdResourcesList[1:]
		}
	}
	total_records := int64(0)
	// sample query
	queryString := fmt.Sprintf("select count(*) as total_records from %s.%s", *databaseName, *tableName)
	// execute the query
	queryOutput, err := timestreamBuilder.QueryWithQueryString(queryString)
	if err != nil {
		fmt.Printf("Error while querying: %s", err.Error())
		utils.HandleError(err, fmt.Sprintf("Failed to query from table %s in database %s ",
			*tableName, *databaseName), true)
	} else {
		utils.ParseQueryResult(queryOutput, nil)
		if len(queryOutput.Rows) > 0 {
			total_records, _ = strconv.ParseInt(*queryOutput.Rows[0].Data[0].ScalarValue, 10, 64)
		}
	}
	if total_records >= 63000 {
		fmt.Println("Records are already ingested into the database, skipping ingestion from the provided CSV")
	} else {
		// Ingest records from the CSV file.
		err = timestreamBuilder.IngestRecordsFromCsv(*testFileName, *databaseName, *tableName)
		if err != nil {
			utils.CleanUp(timestreamBuilder, timestreamDependencyHelper, *databaseName, *tableName, s3BucketName)
			utils.HandleError(err, fmt.Sprintf("Failed to ingest data from csv path `%s` into table %s in database %s ",
				*testFileName, *tableName, *databaseName), true)
		} else {
			fmt.Println("Ingesting Records Complete")
		}
		// sample query
		queryString := fmt.Sprintf("select count(*) as total_records from %s.%s", *databaseName, *tableName)
		// execute the query
		queryOutput, err := timestreamBuilder.QueryWithQueryString(queryString)
		if err != nil {
			fmt.Printf("Error while querying: %s", err.Error())
		} else {
			utils.ParseQueryResult(queryOutput, nil)
			if len(queryOutput.Rows) > 0 {
				total_records, _ := strconv.ParseInt(*queryOutput.Rows[0].Data[0].ScalarValue, 10, 64)
				fmt.Println("Total ingested records count : ", total_records)
			}
		}
	}
	if len(createdResourcesList) > 0 {
		fmt.Println("The following resources were created and not cleaned up:")
		for _, resource := range createdResourcesList {
			fmt.Printf("\tResource Type : %s, Identifier/Name : %s\n", resource.Type, resource.Identifier)
		}
	} else {
		fmt.Println("Used existing resources to ingest data")
	}
}