From 22587eb889a894ebc436cae9a7e9f1264c93e576 Mon Sep 17 00:00:00 2001 From: nikitajain1998 Date: Mon, 2 Dec 2024 12:45:44 +0530 Subject: [PATCH 1/7] updated initialization process --- config-files/init.go | 358 +++++++++++++++++++++++++++++++++++++ config.yaml | 4 + config/config.go | 81 +++------ go.mod | 28 ++- go.sum | 30 ++++ initializer/initializer.go | 2 +- models/model.go | 34 ++-- service/spanner/spanner.go | 29 ++- storage/spanner.go | 9 +- storage/storage.go | 18 +- 10 files changed, 508 insertions(+), 85 deletions(-) create mode 100644 config-files/init.go create mode 100644 config.yaml diff --git a/config-files/init.go b/config-files/init.go new file mode 100644 index 0000000..6c8bc14 --- /dev/null +++ b/config-files/init.go @@ -0,0 +1,358 @@ +package main + +import ( + "context" + "flag" + "fmt" + "log" + "os" + "regexp" + "strings" + + "cloud.google.com/go/spanner" + Admindatabase "cloud.google.com/go/spanner/admin/database/apiv1" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + dynamodbtypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/cloudspannerecosystem/dynamodb-adapter/models" + "google.golang.org/genproto/googleapis/spanner/admin/database/v1" + "gopkg.in/yaml.v3" +) + +var readFile = os.ReadFile + +var ( + adapterTableDDL = ` + CREATE TABLE dynamodb_adapter_table_ddl ( + column STRING(MAX) NOT NULL, + tableName STRING(MAX) NOT NULL, + dataType STRING(MAX) NOT NULL, + originalColumn STRING(MAX) NOT NULL, + partitionKey STRING(MAX), + sortKey STRING(MAX), + spannerIndexName STRING(MAX), + actualTable STRING(MAX) + ) PRIMARY KEY (tableName, column)` +) + +func main() { + dryRun := flag.Bool("dry_run", false, "Run the program in dry-run mode to output DDL and queries without making changes") + flag.Parse() + + config, err := loadConfig("../config.yaml") + if err != nil { + log.Fatalf("Error loading configuration: %v", err) + } + // Construct database name + databaseName := fmt.Sprintf( + "projects/%s/instances/%s/databases/%s", + config.Spanner.ProjectID, config.Spanner.InstanceID, config.Spanner.DatabaseName, + ) + if *dryRun { + fmt.Println("-- Dry Run Mode: Generating Spanner DDL and Insert Queries Only --") + runDryRun(databaseName) + } else { + fmt.Println("-- Executing Setup on Spanner --") + executeSetup(databaseName) + } +} + +func loadConfig(filename string) (*models.Config, error) { + // Read the file + data, err := readFile(filename) + if err != nil { + return nil, fmt.Errorf("failed to read config file: %w", err) + } + + // Unmarshal YAML data into config struct + var config models.Config + if err := yaml.Unmarshal(data, &config); err != nil { + return nil, fmt.Errorf("failed to unmarshal config: %w", err) + } + + return &config, nil +} + +func runDryRun(databaseName string) { + fmt.Println("-- Spanner DDL to create the adapter table --") + fmt.Println(adapterTableDDL) + + client := createDynamoClient() + tables, err := listDynamoTables(client) + if err != nil { + log.Fatalf("Failed to list DynamoDB tables: %v", err) + } + + for _, tableName := range tables { + fmt.Printf("Processing table: %s\n", tableName) + generateInsertQueries(tableName, client) + } +} + +func executeSetup(databaseName string) { + ctx := context.Background() + + // Create Spanner database and adapter table + if err := createDatabase(ctx, databaseName); err != nil { + log.Fatalf("Failed to create database: %v", err) + } + if err := createTable(ctx, databaseName, adapterTableDDL); err != 
nil { + log.Fatalf("Failed to create adapter table: %v", err) + } + + // Fetch and migrate data + client := createDynamoClient() + tables, err := listDynamoTables(client) + if err != nil { + log.Fatalf("Failed to list DynamoDB tables: %v", err) + } + + for _, tableName := range tables { + if err := migrateDynamoTableToSpanner(ctx, databaseName, tableName, client); err != nil { + log.Printf("Failed to migrate table %s: %v", tableName, err) + } + } + fmt.Println("Migration complete.") +} + +func createDatabase(ctx context.Context, db string) error { + matches := regexp.MustCompile("^(.*)/databases/(.*)$").FindStringSubmatch(db) + if matches == nil || len(matches) != 3 { + return fmt.Errorf("invalid database ID: %s", db) + } + + adminClient, err := Admindatabase.NewDatabaseAdminClient(ctx) + if err != nil { + return fmt.Errorf("failed to create Spanner Admin client: %v", err) + } + defer adminClient.Close() + + op, err := adminClient.CreateDatabase(ctx, &database.CreateDatabaseRequest{ + Parent: matches[1], + CreateStatement: "CREATE DATABASE `" + matches[2] + "`", + }) + + if err != nil { + if strings.Contains(err.Error(), "AlreadyExists") { + log.Printf("Database `%s` already exists. Skipping creation.", matches[2]) + return nil + } + return fmt.Errorf("failed to initiate database creation: %v", err) + } + + if op == nil { + return fmt.Errorf("received nil operation for database creation") + } + + _, err = op.Wait(ctx) + if err != nil { + return fmt.Errorf("error while waiting for database creation to complete: %v", err) + } + + log.Printf("Database `%s` created successfully.", matches[2]) + return nil +} + +func createTable(ctx context.Context, db, ddl string) error { + adminClient, err := Admindatabase.NewDatabaseAdminClient(ctx) + if err != nil { + return fmt.Errorf("failed to create Spanner Admin client: %v", err) + } + defer adminClient.Close() + + client, err := spanner.NewClient(ctx, db) + if err != nil { + return fmt.Errorf("failed to create Spanner client: %v", err) + } + defer client.Close() + + stmt := spanner.Statement{ + SQL: `SELECT COUNT(*) + FROM INFORMATION_SCHEMA.TABLES + WHERE TABLE_NAME = @tableName`, + Params: map[string]interface{}{ + "tableName": "dynamodb_adapter_table_ddl", + }, + } + iter := client.Single().Query(ctx, stmt) + defer iter.Stop() + + var tableCount int64 + err = iter.Do(func(row *spanner.Row) error { + return row.Columns(&tableCount) + }) + if err != nil { + return fmt.Errorf("failed to query table existence: %w", err) + } + + if tableCount > 0 { + fmt.Println("Table `dynamodb_adapter_table_ddl` already exists. 
Skipping creation.") + return nil + } + + op, err := adminClient.UpdateDatabaseDdl(ctx, &database.UpdateDatabaseDdlRequest{ + Database: db, + Statements: []string{ddl}, + }) + if err != nil { + return fmt.Errorf("failed to create table: %w", err) + } + + return op.Wait(ctx) +} + +func listDynamoTables(client *dynamodb.Client) ([]string, error) { + output, err := client.ListTables(context.TODO(), &dynamodb.ListTablesInput{}) + if err != nil { + return nil, err + } + return output.TableNames, nil +} + +func migrateDynamoTableToSpanner(ctx context.Context, db, tableName string, client *dynamodb.Client) error { + // Fetch attributes, partition key, and sort key from DynamoDB table + config, err := loadConfig("../config.yaml") + if err != nil { + log.Fatalf("Error loading configuration: %v", err) + } + models.SpannerTableMap[tableName] = config.Spanner.InstanceID + attributes, partitionKey, sortKey, err := fetchTableAttributes(client, tableName) + if err != nil { + return fmt.Errorf("failed to fetch attributes for table %s: %v", tableName, err) + } + + // Generate Spanner index name and actual table name + // spannerIndexName := fmt.Sprintf("index_%s", tableName) + actualTable := tableName + + // Prepare mutations to insert data into the adapter table + var mutations []*spanner.Mutation + for column, dataType := range attributes { + mutations = append(mutations, spanner.InsertOrUpdate( + "dynamodb_adapter_table_ddl", + []string{ + "column", "tableName", "dataType", "originalColumn", + "partitionKey", "sortKey", "spannerIndexName", "actualTable", + }, + []interface{}{ + column, tableName, dataType, column, + partitionKey, sortKey, column, actualTable, + }, + )) + } + + // Perform batch insert into Spanner + if err := spannerBatchInsert(ctx, db, mutations); err != nil { + return fmt.Errorf("failed to insert metadata for table %s into Spanner: %v", tableName, err) + } + + log.Printf("Successfully migrated metadata for DynamoDB table %s to Spanner.", tableName) + return nil +} + +func fetchTableAttributes(client *dynamodb.Client, tableName string) (map[string]string, string, string, error) { + // Fetch table description + output, err := client.DescribeTable(context.TODO(), &dynamodb.DescribeTableInput{ + TableName: aws.String(tableName), + }) + if err != nil { + return nil, "", "", fmt.Errorf("failed to describe table %s: %w", tableName, err) + } + + // Extract partition key and sort key + var partitionKey, sortKey string + for _, keyElement := range output.Table.KeySchema { + switch keyElement.KeyType { + case dynamodbtypes.KeyTypeHash: + partitionKey = aws.ToString(keyElement.AttributeName) + case dynamodbtypes.KeyTypeRange: + sortKey = aws.ToString(keyElement.AttributeName) + } + } + + // Extract attributes from the table + attributes := make(map[string]string) + scanOutput, err := client.Scan(context.TODO(), &dynamodb.ScanInput{ + TableName: aws.String(tableName), + }) + if err != nil { + return nil, "", "", fmt.Errorf("failed to scan table %s: %w", tableName, err) + } + + for _, item := range scanOutput.Items { + for attr, value := range item { + attributes[attr] = inferDynamoDBType(value) + } + } + + return attributes, partitionKey, sortKey, nil +} + +func inferDynamoDBType(attr dynamodbtypes.AttributeValue) string { + switch attr.(type) { + case *dynamodbtypes.AttributeValueMemberS: + return "S" + case *dynamodbtypes.AttributeValueMemberN: + return "N" + case *dynamodbtypes.AttributeValueMemberB: + return "B" + case *dynamodbtypes.AttributeValueMemberBOOL: + return "BOOL" + case 
*dynamodbtypes.AttributeValueMemberSS: + return "SS" + case *dynamodbtypes.AttributeValueMemberNS: + return "NS" + case *dynamodbtypes.AttributeValueMemberBS: + return "BS" + case *dynamodbtypes.AttributeValueMemberNULL: + return "NULL" + case *dynamodbtypes.AttributeValueMemberM: + return "M" + case *dynamodbtypes.AttributeValueMemberL: + return "L" + default: + log.Printf("Unknown DynamoDB attribute type: %T\n", attr) + return "Unknown" + } +} + +func spannerBatchInsert(ctx context.Context, db string, mutations []*spanner.Mutation) error { + client, err := spanner.NewClient(ctx, db) + if err != nil { + return err + } + defer client.Close() + + _, err = client.Apply(ctx, mutations) + return err +} + +func createDynamoClient() *dynamodb.Client { + cfg, err := config.LoadDefaultConfig(context.TODO()) + if err != nil { + log.Fatalf("Failed to load AWS configuration: %v", err) + } + return dynamodb.NewFromConfig(cfg) +} + +func generateInsertQueries(tableName string, client *dynamodb.Client) { + attributes, partitionKey, sortKey, err := fetchTableAttributes(client, tableName) + if err != nil { + log.Printf("Failed to fetch attributes for table %s: %v", tableName, err) + return + } + + spannerIndexName := fmt.Sprintf("index_%s", tableName) + actualTable := tableName + + for column, dataType := range attributes { + query := fmt.Sprintf( + `INSERT INTO dynamodb_adapter_table_ddl + (column, tableName, dataType, originalColumn, partitionKey, sortKey, spannerIndexName, actualTable) + VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s');`, + column, tableName, dataType, column, partitionKey, sortKey, spannerIndexName, actualTable, + ) + fmt.Println(query) + } +} diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..b7a89eb --- /dev/null +++ b/config.yaml @@ -0,0 +1,4 @@ +spanner: + project_id: "your-project-id" + instance_id: "your-instance-id" + database_name: "your-database-name" diff --git a/config/config.go b/config/config.go index 8b8fbd0..e4f1e50 100644 --- a/config/config.go +++ b/config/config.go @@ -17,16 +17,12 @@ package config import ( - "encoding/json" + "fmt" "os" - "strings" - "sync" - rice "github.com/GeertJohan/go.rice" "github.com/cloudspannerecosystem/dynamodb-adapter/models" "github.com/cloudspannerecosystem/dynamodb-adapter/pkg/errors" - "github.com/cloudspannerecosystem/dynamodb-adapter/pkg/logger" - "github.com/cloudspannerecosystem/dynamodb-adapter/utils" + "gopkg.in/yaml.v2" ) // Configuration struct @@ -39,63 +35,38 @@ type Configuration struct { // ConfigurationMap pointer var ConfigurationMap *Configuration -// DbConfigMap dynamo to Spanner -var DbConfigMap map[string]models.TableConfig - -var once sync.Once - func init() { ConfigurationMap = new(Configuration) } -// InitConfig loads ConfigurationMap and DbConfigMap in memory based on -// ACTIVE_ENV. If ACTIVE_ENV is not set or and empty string the environment -// is defaulted to staging. 
-// -// These config files are read from rice-box -func InitConfig(box *rice.Box) { - once.Do(func() { - env := strings.ToLower(os.Getenv("ACTIVE_ENV")) - if env == "" { - env = "staging" - } +var readFile = os.ReadFile + +func InitConfig() { + GlobalConfig, err := loadConfig("config.yaml") + if err != nil { + fmt.Errorf("failed to read config file: %w", err) + } + models.GlobalConfig = GlobalConfig +} + +func loadConfig(filename string) (*models.Config, error) { + data, err := readFile(filename) + if err != nil { + return nil, fmt.Errorf("failed to read config file: %w", err) + } - ConfigurationMap = new(Configuration) + // Unmarshal YAML data into config struct + var config models.Config + if err := yaml.Unmarshal(data, &config); err != nil { + return nil, fmt.Errorf("failed to unmarshal config: %w", err) + } - ba, err := box.Bytes(env + "/tables.json") - if err != nil { - logger.LogFatal(err) - } - err = json.Unmarshal(ba, &DbConfigMap) - if err != nil { - logger.LogFatal(err) - } - ba, err = box.Bytes(env + "/config.json") - if err != nil { - logger.LogFatal(err) - } - err = json.Unmarshal(ba, ConfigurationMap) - if err != nil { - logger.LogFatal(err) - } - ba, err = box.Bytes(env + "/spanner.json") - if err != nil { - logger.LogFatal(err) - } - tmp := make(map[string]string) - err = json.Unmarshal(ba, &tmp) - if err != nil { - logger.LogFatal(err) - } - for k, v := range tmp { - models.SpannerTableMap[utils.ChangeTableNameForSpanner(k)] = v - } - }) + return &config, nil } -//GetTableConf returns table configuration from global map object +// GetTableConf returns table configuration from global map object func GetTableConf(tableName string) (models.TableConfig, error) { - tableConf, ok := DbConfigMap[tableName] + tableConf, ok := models.DbConfigMap[tableName] if !ok { return models.TableConfig{}, errors.New("ResourceNotFoundException", tableName) } @@ -104,7 +75,7 @@ func GetTableConf(tableName string) (models.TableConfig, error) { return tableConf, nil } else if tableConf.ActualTable != "" { actualTable := tableConf.ActualTable - tableConf = DbConfigMap[actualTable] + tableConf = models.DbConfigMap[actualTable] tableConf.ActualTable = actualTable return tableConf, nil } diff --git a/go.mod b/go.mod index f96a094..efe56d5 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,8 @@ module github.com/cloudspannerecosystem/dynamodb-adapter -go 1.17 +go 1.21 + +toolchain go1.22.3 require ( cloud.google.com/go/pubsub v1.17.0 @@ -38,6 +40,7 @@ require ( github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect github.com/ajg/form v1.5.1 // indirect github.com/andybalholm/brotli v1.0.0 // indirect + github.com/aws/aws-sdk-go-v2 v1.32.5 github.com/daaku/go.zipexe v1.0.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/gin-contrib/sse v0.1.0 // indirect @@ -88,3 +91,26 @@ require ( gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect moul.io/http2curl v1.0.1-0.20190925090545-5cd742060b0e // indirect ) + +require ( + github.com/aws/aws-sdk-go-v2/service/dynamodb v1.37.1 + github.com/aws/smithy-go v1.22.1 // indirect +) + +require ( + github.com/aws/aws-sdk-go-v2/config v1.28.5 + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.24 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.24 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.5 // indirect +) + +require ( + github.com/aws/aws-sdk-go-v2/credentials v1.17.46 
// indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.20 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.5 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.24.6 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.5 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.33.1 // indirect +) diff --git a/go.sum b/go.sum index d47c92f..057beda 100644 --- a/go.sum +++ b/go.sum @@ -78,6 +78,36 @@ github.com/antonmedv/expr v1.9.0 h1:j4HI3NHEdgDnN9p6oI6Ndr0G5QryMY0FNxT4ONrFDGU= github.com/antonmedv/expr v1.9.0/go.mod h1:5qsM3oLGDND7sDmQGDXHkYfkjYMUX14qsgqmHhwGEk8= github.com/aws/aws-sdk-go v1.40.43 h1:froMtO2//9kCu1sK+dOfAcwxUu91p5KgUP4AL7SDwUQ= github.com/aws/aws-sdk-go v1.40.43/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q= +github.com/aws/aws-sdk-go-v2 v1.32.5 h1:U8vdWJuY7ruAkzaOdD7guwJjD06YSKmnKCJs7s3IkIo= +github.com/aws/aws-sdk-go-v2 v1.32.5/go.mod h1:P5WJBrYqqbWVaOxgH0X/FYYD47/nooaPOZPlQdmiN2U= +github.com/aws/aws-sdk-go-v2/config v1.28.5 h1:Za41twdCXbuyyWv9LndXxZZv3QhTG1DinqlFsSuvtI0= +github.com/aws/aws-sdk-go-v2/config v1.28.5/go.mod h1:4VsPbHP8JdcdUDmbTVgNL/8w9SqOkM5jyY8ljIxLO3o= +github.com/aws/aws-sdk-go-v2/credentials v1.17.46 h1:AU7RcriIo2lXjUfHFnFKYsLCwgbz1E7Mm95ieIRDNUg= +github.com/aws/aws-sdk-go-v2/credentials v1.17.46/go.mod h1:1FmYyLGL08KQXQ6mcTlifyFXfJVCNJTVGuQP4m0d/UA= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.20 h1:sDSXIrlsFSFJtWKLQS4PUWRvrT580rrnuLydJrCQ/yA= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.20/go.mod h1:WZ/c+w0ofps+/OUqMwWgnfrgzZH1DZO1RIkktICsqnY= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.24 h1:4usbeaes3yJnCFC7kfeyhkdkPtoRYPa/hTmCqMpKpLI= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.24/go.mod h1:5CI1JemjVwde8m2WG3cz23qHKPOxbpkq0HaoreEgLIY= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.24 h1:N1zsICrQglfzaBnrfM0Ys00860C+QFwu6u/5+LomP+o= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.24/go.mod h1:dCn9HbJ8+K31i8IQ8EWmWj0EiIk0+vKiHNMxTTYveAg= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= +github.com/aws/aws-sdk-go-v2/service/dynamodb v1.37.1 h1:vucMirlM6D+RDU8ncKaSZ/5dGrXNajozVwpmWNPn2gQ= +github.com/aws/aws-sdk-go-v2/service/dynamodb v1.37.1/go.mod h1:fceORfs010mNxZbQhfqUjUeHlTwANmIT4mvHamuUaUg= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 h1:iXtILhvDxB6kPvEXgsDhGaZCSC6LQET5ZHSdJozeI0Y= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1/go.mod h1:9nu0fVANtYiAePIBh2/pFUSwtJ402hLnp854CNoDOeE= +github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.5 h1:3Y457U2eGukmjYjeHG6kanZpDzJADa2m0ADqnuePYVQ= +github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.5/go.mod h1:CfwEHGkTjYZpkQ/5PvcbEtT7AJlG68KkEvmtwU8z3/U= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.5 h1:wtpJ4zcwrSbwhECWQoI/g6WM9zqCcSpHDJIWSbMLOu4= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.5/go.mod h1:qu/W9HXQbbQ4+1+JcZp0ZNPV31ym537ZJN+fiS7Ti8E= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.6 h1:3zu537oLmsPfDMyjnUS2g+F2vITgy5pB74tHI+JBNoM= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.6/go.mod h1:WJSZH2ZvepM6t6jwu4w/Z45Eoi75lPN7DcydSRtJg6Y= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.5 
h1:K0OQAsDywb0ltlFrZm0JHPY3yZp/S9OaoLU33S7vPS8= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.5/go.mod h1:ORITg+fyuMoeiQFiVGoqB3OydVTLkClw/ljbblMq6Cc= +github.com/aws/aws-sdk-go-v2/service/sts v1.33.1 h1:6SZUVRQNvExYlMLbHdlKB48x0fLbc2iVROyaNEwBHbU= +github.com/aws/aws-sdk-go-v2/service/sts v1.33.1/go.mod h1:GqWyYCwLXnlUB1lOAXQyNSPqPLQJvmo8J0DWBzp9mtg= +github.com/aws/smithy-go v1.22.1 h1:/HPHZQ0g7f4eUeK6HKglFz8uwVfZKgoI25rb/J+dnro= +github.com/aws/smithy-go v1.22.1/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= diff --git a/initializer/initializer.go b/initializer/initializer.go index 7f99035..e957cbe 100644 --- a/initializer/initializer.go +++ b/initializer/initializer.go @@ -27,7 +27,7 @@ import ( // InitAll - this will initialize all the project object // Config, storage and all other global objects are initialize func InitAll(box *rice.Box) error { - config.InitConfig(box) + config.InitConfig() storage.InitializeDriver() err := spanner.ParseDDL(true) if err != nil { diff --git a/models/model.go b/models/model.go index ae00328..50160e1 100644 --- a/models/model.go +++ b/models/model.go @@ -22,6 +22,18 @@ import ( "github.com/aws/aws-sdk-go/service/dynamodb" ) +type SpannerConfig struct { + ProjectID string `yaml:"project_id"` + InstanceID string `yaml:"instance_id"` + DatabaseName string `yaml:"database_name"` +} + +type Config struct { + Spanner SpannerConfig `yaml:"spanner"` +} + +var GlobalConfig *Config + // Meta struct type Meta struct { TableName string `json:"TableName"` @@ -72,7 +84,7 @@ type GetItemMeta struct { Key map[string]*dynamodb.AttributeValue `json:"Key"` } -//BatchGetMeta struct +// BatchGetMeta struct type BatchGetMeta struct { RequestItems map[string]BatchGetWithProjectionMeta `json:"RequestItems"` } @@ -135,7 +147,7 @@ type UpdateAttr struct { ExpressionAttributeValues map[string]*dynamodb.AttributeValue `json:"ExpressionAttributeValues"` } -//ScanMeta for Scan request +// ScanMeta for Scan request type ScanMeta struct { TableName string `json:"TableName"` IndexName string `json:"IndexName"` @@ -158,39 +170,41 @@ type TableConfig struct { Indices map[string]TableConfig `json:"Indices,omitempty"` GCSSourcePath string `json:"GcsSourcePath,omitempty"` DDBIndexName string `json:"DdbIndexName,omitempty"` - SpannerIndexName string `json:"Table,omitempty"` + SpannerIndexName string `json:"SpannerIndexName,omitempty"` IsPadded bool `json:"IsPadded,omitempty"` IsComplement bool `json:"IsComplement,omitempty"` TableSource string `json:"TableSource,omitempty"` ActualTable string `json:"ActualTable,omitempty"` } -//BatchWriteItem for Batch Operation +// BatchWriteItem for Batch Operation type BatchWriteItem struct { RequestItems map[string][]BatchWriteSubItems `json:"RequestItems"` } -//BatchWriteItemResponse for Batch Operation +// BatchWriteItemResponse for Batch Operation type BatchWriteItemResponse struct { UnprocessedItems map[string][]BatchWriteSubItems `json:"UnprocessedItems"` } -//BatchWriteSubItems is for BatchWriteItem +// BatchWriteSubItems is for BatchWriteItem type BatchWriteSubItems struct { DelReq BatchDeleteItem `json:"DeleteRequest"` PutReq BatchPutItem `json:"PutRequest"` } -//BatchDeleteItem is for BatchWriteSubItems +// BatchDeleteItem is for 
BatchWriteSubItems type BatchDeleteItem struct { Key map[string]*dynamodb.AttributeValue `json:"Key"` } -//BatchPutItem is for BatchWriteSubItems +// BatchPutItem is for BatchWriteSubItems type BatchPutItem struct { Item map[string]*dynamodb.AttributeValue `json:"Item"` } +var DbConfigMap map[string]TableConfig + // TableDDL - this contains the DDL var TableDDL map[string]map[string]string @@ -208,10 +222,10 @@ var OriginalColResponse map[string]string func init() { TableDDL = make(map[string]map[string]string) - TableDDL["dynamodb_adapter_table_ddl"] = map[string]string{"tableName": "STRING(MAX)", "column": "STRING(MAX)", "dataType": "STRING(MAX)", "originalColumn": "STRING(MAX)"} + TableDDL["dynamodb_adapter_table_ddl"] = map[string]string{"tableName": "STRING(MAX)", "column": "STRING(MAX)", "dataType": "STRING(MAX)", "originalColumn": "STRING(MAX)", "partitionKey": "STRING(MAX)", "sortKey": "STRING(MAX)", "spannerIndexName": "STRING(MAX)", "actualTable": "STRING(MAX)"} TableDDL["dynamodb_adapter_config_manager"] = map[string]string{"tableName": "STRING(MAX)", "config": "STRING(MAX)", "cronTime": "STRING(MAX)", "uniqueValue": "STRING(MAX)", "enabledStream": "STRING(MAX)", "pubsubTopic": "STRING(MAX)"} TableColumnMap = make(map[string][]string) - TableColumnMap["dynamodb_adapter_table_ddl"] = []string{"tableName", "column", "dataType", "originalColumn"} + TableColumnMap["dynamodb_adapter_table_ddl"] = []string{"tableName", "column", "dataType", "originalColumn", "partitionKey", "sortKey", "spannerIndexName", "actualTable"} TableColumnMap["dynamodb_adapter_config_manager"] = []string{"tableName", "config", "cronTime", "uniqueValue", "enabledStream", "pubsubTopic"} TableColChangeMap = make(map[string]struct{}) ColumnToOriginalCol = make(map[string]string) diff --git a/service/spanner/spanner.go b/service/spanner/spanner.go index 51df05d..bf61a6e 100644 --- a/service/spanner/spanner.go +++ b/service/spanner/spanner.go @@ -16,25 +16,36 @@ package spanner import ( "context" + "strings" + "cloud.google.com/go/spanner" "github.com/cloudspannerecosystem/dynamodb-adapter/models" "github.com/cloudspannerecosystem/dynamodb-adapter/storage" - - "cloud.google.com/go/spanner" ) // ParseDDL - this will parse DDL of spannerDB and set all the table configs in models // This fetches the spanner schema config from dynamodb_adapter_table_ddl table and stored it in // global map object which is used to read and write data into spanner tables -func ParseDDL(updateDB bool) error { +// InitConfig loads ConfigurationMap and DbConfigMap in memory based on +// ACTIVE_ENV. If ACTIVE_ENV is not set or and empty string the environment +// is defaulted to staging. 
+// +// These config files are read from rice-box + +func ParseDDL(updateDB bool) error { stmt := spanner.Statement{} + stmt.SQL = "SELECT * FROM dynamodb_adapter_table_ddl" - ms, err := storage.GetStorageInstance().ExecuteSpannerQuery(context.Background(), "dynamodb_adapter_table_ddl", []string{"tableName", "column", "dataType", "originalColumn"}, false, stmt) + ms, err := storage.GetStorageInstance().ExecuteSpannerQuery(context.Background(), "dynamodb_adapter_table_ddl", []string{"tableName", "column", "dataType", "originalColumn", "partitionKey", "sortKey", "spannerIndexName", "actualTable"}, false, stmt) + if err != nil { return err } + if models.DbConfigMap == nil { + models.DbConfigMap = make(map[string]models.TableConfig) + } if len(ms) > 0 { for i := 0; i < len(ms); i++ { @@ -43,6 +54,16 @@ func ParseDDL(updateDB bool) error { column = strings.Trim(column, "`") dataType := ms[i]["dataType"].(string) originalColumn, ok := ms[i]["originalColumn"].(string) + partitionKey := ms[i]["partitionKey"].(string) + sortKey, _ := ms[i]["sortKey"].(string) // Optional, check if available + spannerIndexName, _ := ms[i]["spannerIndexName"].(string) + models.DbConfigMap[tableName] = models.TableConfig{ + PartitionKey: partitionKey, + SortKey: sortKey, + SpannerIndexName: spannerIndexName, + ActualTable: tableName, + } + if ok { originalColumn = strings.Trim(originalColumn, "`") if column != originalColumn && originalColumn != "" { diff --git a/storage/spanner.go b/storage/spanner.go index 544f857..6194eae 100755 --- a/storage/spanner.go +++ b/storage/spanner.go @@ -115,11 +115,15 @@ func (s Storage) SpannerGet(ctx context.Context, tableName string, pKeys, sKeys // ExecuteSpannerQuery - this will execute query on spanner database func (s Storage) ExecuteSpannerQuery(ctx context.Context, table string, cols []string, isCountQuery bool, stmt spanner.Statement) ([]map[string]interface{}, error) { + colDLL, ok := models.TableDDL[utils.ChangeTableNameForSpanner(table)] + if !ok { return nil, errors.New("ResourceNotFoundException", table) } + itr := s.getSpannerClient(table).Single().WithTimestampBound(spanner.ExactStaleness(time.Second*10)).Query(ctx, stmt) + defer itr.Stop() allRows := []map[string]interface{}{} for { @@ -146,6 +150,7 @@ func (s Storage) ExecuteSpannerQuery(ctx context.Context, table string, cols []s } allRows = append(allRows, singleRow) } + return allRows, nil } @@ -733,7 +738,7 @@ func evaluateStatementFromRowMap(conditionalExpression, colName string, rowMap m return true } _, ok := rowMap[colName] - return !ok + return !ok } if strings.HasPrefix(conditionalExpression, "attribute_exists") || strings.HasPrefix(conditionalExpression, "if_exists") { if len(rowMap) == 0 { @@ -745,7 +750,7 @@ func evaluateStatementFromRowMap(conditionalExpression, colName string, rowMap m return rowMap[conditionalExpression] } -//parseRow - Converts Spanner row and datatypes to a map removing null columns from the result. +// parseRow - Converts Spanner row and datatypes to a map removing null columns from the result. 
func parseRow(r *spanner.Row, colDDL map[string]string) (map[string]interface{}, error) { singleRow := make(map[string]interface{}) if r == nil { diff --git a/storage/storage.go b/storage/storage.go index ab81de5..52cad37 100755 --- a/storage/storage.go +++ b/storage/storage.go @@ -23,10 +23,8 @@ import ( "syscall" "cloud.google.com/go/spanner" - "github.com/cloudspannerecosystem/dynamodb-adapter/config" "github.com/cloudspannerecosystem/dynamodb-adapter/models" "github.com/cloudspannerecosystem/dynamodb-adapter/pkg/logger" - "github.com/cloudspannerecosystem/dynamodb-adapter/utils" ) // Storage object for intracting with storage package @@ -37,10 +35,10 @@ type Storage struct { // storage - global instance of storage var storage *Storage -func initSpannerDriver(instance string) *spanner.Client { +func initSpannerDriver() *spanner.Client { conf := spanner.ClientConfig{} - str := "projects/" + config.ConfigurationMap.GoogleProjectID + "/instances/" + instance + "/databases/" + config.ConfigurationMap.SpannerDb + str := "projects/" + models.GlobalConfig.Spanner.ProjectID + "/instances/" + models.GlobalConfig.Spanner.InstanceID + "/databases/" + models.GlobalConfig.Spanner.DatabaseName Client, err := spanner.NewClientWithConfig(context.Background(), str, conf) if err != nil { logger.LogFatal(err) @@ -50,14 +48,10 @@ func initSpannerDriver(instance string) *spanner.Client { // InitializeDriver - this will Initialize databases object in global map func InitializeDriver() { - storage = new(Storage) storage.spannerClient = make(map[string]*spanner.Client) - for _, v := range models.SpannerTableMap { - if _, ok := storage.spannerClient[v]; !ok { - storage.spannerClient[v] = initSpannerDriver(v) - } - } + storage.spannerClient[models.GlobalConfig.Spanner.InstanceID] = initSpannerDriver() + } // Close - This gracefully returns the session pool objects, when driver gets exit signal @@ -85,6 +79,6 @@ func GetStorageInstance() *Storage { return storage } -func (s Storage) getSpannerClient(tableName string) *spanner.Client { - return s.spannerClient[models.SpannerTableMap[utils.ChangeTableNameForSpanner(tableName)]] +func (s Storage) getSpannerClient(_ string) *spanner.Client { + return s.spannerClient[models.GlobalConfig.Spanner.InstanceID] } From 7da05d5d78488a98f1eaa2d206e4a6cf5f83d7ff Mon Sep 17 00:00:00 2001 From: nikitajain1998 Date: Mon, 2 Dec 2024 13:12:48 +0530 Subject: [PATCH 2/7] updated readme file --- README.md | 193 +++++++++++------------------------------------------- 1 file changed, 40 insertions(+), 153 deletions(-) diff --git a/README.md b/README.md index 3b9d84d..953065b 100644 --- a/README.md +++ b/README.md @@ -59,16 +59,19 @@ DynamoDB Adapter currently supports the following DynamoDB data types ## Configuration -DynamoDB Adapter requires two tables to store metadata and configuration for -the project: `dynamodb_adapter_table_ddl` and -`dynamodb_adapter_config_manager`. There are also three configuration files -required by the adapter: `config.json`, `spanner.json`, `tables.json`. +### config.yaml +This file defines the necessary settings for the adapter. A sample configuration might look like this: -By default there are two folders **production** and **staging** in -[config-files](./config-files). This is configurable by using the enviroment -variable `ACTIVE_ENV` and can be set to other environment names, so long as -there is a matching directory in the `config-files` directory. If `ACTIVE_ENV` -is not set the default environtment is **staging**. 
+ + spanner: + project_id: "my-project-id" + instance_id: "my-instance-id" + database_name: "my-database-name" + +The fields are: +project_id: The Google Cloud project ID. +instance_id: The Spanner instance ID. +database_name: The database name in Spanner. ### dynamodb_adapter_table_ddl @@ -79,121 +82,43 @@ present in DynamoDB. This mapping is required because DynamoDB supports the special characters in column names while Cloud Spanner only supports underscores(_). For more: [Spanner Naming Conventions](https://cloud.google.com/spanner/docs/data-definition-language#naming_conventions) -```sql -CREATE TABLE -dynamodb_adapter_table_ddl -( - column STRING(MAX), - tableName STRING(MAX), - dataType STRING(MAX), - originalColumn STRING(MAX), -) PRIMARY KEY (tableName, column) -``` - -![dynamodb_adapter_table_ddl sample data](images/config_spanner.png) - -### dynamodb_adapter_config_manager - -`dynamodb_adapter_config_manager` contains the Pub/Sub configuration used for -DynamoDB Stream compatability. It is used to do some additional operation -required on the change of data in tables. It can trigger New and Old data on -given Pub/Sub topic. - -```sql -CREATE TABLE -dynamodb_adapter_config_manager -( - tableName STRING(MAX), - config STRING(MAX), - cronTime STRING(MAX), - enabledStream STRING(MAX), - pubsubTopic STRING(MAX), - uniqueValue STRING(MAX), -) PRIMARY KEY (tableName) -``` - -### config-files/{env}/config.json +### Initialization Modes +DynamoDB Adapter supports two modes of initialization: -`config.json` contains the basic settings for DynamoDB Adapter; GCP Project, -Cloud Spanner Database and query record limit. +#### Dry Run Mode +This mode generates the Spanner queries required to: -| Key | Description -| ----------------- | ----------- -| GoogleProjectID | Your Google Project ID -| SpannerDb | Your Spanner Database Name -| QueryLimit | Default limit for the number of records returned in query +Create the dynamodb_adapter_table_ddl table in Spanner. +Insert metadata for all DynamoDB tables into dynamodb_adapter_table_ddl. +These queries are printed to the console without executing them on Spanner, allowing you to review them before making changes. -For example: - -```json -{ - "GoogleProjectID" : "first-project", - "SpannerDb" : "test-db", - "QueryLimit" : 5000 -} +```sh +go run init.go --dry_run ``` -### config-files/{env}/spanner.json - -`spanner.json` is a key/value mapping file for table names with a Cloud Spanner -instance ids. This enables the adapter to query data for a particular table on -different Cloud Spanner instances. +#### Execution Mode +This mode executes the Spanner queries generated during the dry run on the Spanner instance. It will: -For example: +Create the dynamodb_adapter_table_ddl table in Spanner if it does not exist. +Insert metadata for all DynamoDB tables into the dynamodb_adapter_table_ddl table. -```json -{ - "dynamodb_adapter_table_ddl": "spanner-2 ", - "dynamodb_adapter_config_manager": "spanner-2", - "tableName1": "spanner-1", - "tableName2": "spanner-1" - ... - ... -} +```sh +go run init.go ``` -### config-files/{env}/tables.json - -`tables.json` contains the description of the tables as they appear in -DynamoDB. This includes all table's primary key, columns and index information. -This file supports the update and query operations by providing the primary -key, sort key and any other indexes present. 
- -| Key | Description -| ----------------- | ----------- -| tableName | Name of the table in DynamoDB -| partitionKey | Primary key of the table in DynamoDB -| sortKey | Sorting key of the table in DynamoDB -| attributeTypes | Key/Value list of column names and type -| indices | Collection of index objects that represent the indexes present in the DynamoDB table - -For example: - -```json -{ - "tableName": { - "partitionKey": "primary key or Partition key", - "sortKey": "sorting key of dynamoDB adapter", - "attributeTypes": { - "column_a": "N", - "column_b": "S", - "column_of_bytes": "B", - "my_boolean_column": "BOOL" - }, - "indices": { - "indexName1": { - "sortKey": "sort key for indexName1", - "partitionKey": "partition key for indexName1" - }, - "another_index": { - "sortKey": "sort key for another_index", - "partitionKey": "partition key for another_index" - } - } - }, - ..... - ..... -} +### Prerequisites for Initialization +AWS CLI: +Configure AWS credentials: +```sh +aws configure set aws_access_key_id YOUR_ACCESS_KEY +aws configure set aws_secret_access_key YOUR_SECRET_KEY +aws configure set default.region YOUR_REGION +``` +Google Cloud CLI: +Authenticate and set up your environment: +```sh +gcloud auth application-default login +gcloud config set project [MY_PROJECT_NAME] ``` ## Starting DynamoDB Adapter @@ -219,49 +144,11 @@ gcloud config set project [MY_PROJECT NAME] ``` ```sh -export ACTIVE_ENV=PRODUCTION go run main.go ``` -### Internal Startup Stages - -When DynamoDB Adapter starts up the following steps are performed: - -* Stage 1 - Configuration is loaded according the Environment Variable - *ACTIVE_ENV* -* Stage 2 - Connections to Cloud Spanner instances are initialized. - Connections to all the instances are started it doesn't need to start the - connection again and again for every request. -* Stage 3 - `dynamodb_adapter_table_ddl` is parsed and will stored in ram for - faster access of data. -* Stage 4 - `dynamodb_adapter_config_manager` is loaded into ram. The adapter - will check every 1 min if configuration has been changed, if data is changed - it will be updated in memory. -* Stage 5 - Start the API listener to accept DynamoDB operations. - -## Advanced - -### Embedding the Configuration -The rice-box package can be used to increase preformance by converting the -configuration files into Golang source code and there by compiling them into -the binary. If they are not found in the binary rice-box will look to the -disk for the configuration files. -#### Install rice package - -This package is required to load the config files. This is required in the -first step of the running DynamoDB Adapter. - -Follow the [link](https://github.com/GeertJohan/go.rice#installation). - -#### run command for creating the file - -This is required to increase the performance when any config file is changed -so that configuration files can be loaded directly from go file. 
- -```sh -rice embed-go ``` ## API Documentation From 7ff2a0af1f0349536a99be84e9e21de204d706aa Mon Sep 17 00:00:00 2001 From: nikitajain1998 Date: Wed, 4 Dec 2024 10:10:02 +0530 Subject: [PATCH 3/7] spanner datatype changes --- api/v1/db.go | 4 ++-- config/config_test.go | 28 ++++++++++++++-------------- models/model.go | 3 ++- service/services/services.go | 2 +- storage/spanner.go | 4 ++-- 5 files changed, 21 insertions(+), 20 deletions(-) diff --git a/api/v1/db.go b/api/v1/db.go index e499c65..9a7c8de 100644 --- a/api/v1/db.go +++ b/api/v1/db.go @@ -59,7 +59,7 @@ func RouteRequest(c *gin.Context) { case "UpdateItem": Update(c) default: - c.JSON(errors.New("ValidationException", "Invalid X-Amz-Target header value of" + amzTarget). + c.JSON(errors.New("ValidationException", "Invalid X-Amz-Target header value of"+amzTarget). HTTPResponse("X-Amz-Target Header not supported")) } } @@ -186,7 +186,7 @@ func queryResponse(query models.Query, c *gin.Context) { } if query.Limit == 0 { - query.Limit = config.ConfigurationMap.QueryLimit + query.Limit = models.GlobalConfig.Spanner.QueryLimit } query.ExpressionAttributeNames = ChangeColumnToSpannerExpressionName(query.TableName, query.ExpressionAttributeNames) query = ReplaceHashRangeExpr(query) diff --git a/config/config_test.go b/config/config_test.go index 2be07e0..25aaa70 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -1,16 +1,16 @@ -// Copyright 2020 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// // Copyright 2020 Google LLC +// // +// // Licensed under the Apache License, Version 2.0 (the "License"); +// // you may not use this file except in compliance with the License. +// // You may obtain a copy of the License at +// // +// // http://www.apache.org/licenses/LICENSE-2.0 +// // +// // Unless required by applicable law or agreed to in writing, software +// // distributed under the License is distributed on an "AS IS" BASIS, +// // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// // See the License for the specific language governing permissions and +// // limitations under the License. 
package config @@ -22,7 +22,7 @@ import ( ) func TestGetTableConf(t *testing.T) { - DbConfigMap = map[string]models.TableConfig{ + models.DbConfigMap = map[string]models.TableConfig{ "employee_data": { PartitionKey: "emp_id", SortKey: "emp_name", diff --git a/models/model.go b/models/model.go index 50160e1..77de6e6 100644 --- a/models/model.go +++ b/models/model.go @@ -26,6 +26,7 @@ type SpannerConfig struct { ProjectID string `yaml:"project_id"` InstanceID string `yaml:"instance_id"` DatabaseName string `yaml:"database_name"` + QueryLimit int64 `yaml:"query_limit"` } type Config struct { @@ -222,7 +223,7 @@ var OriginalColResponse map[string]string func init() { TableDDL = make(map[string]map[string]string) - TableDDL["dynamodb_adapter_table_ddl"] = map[string]string{"tableName": "STRING(MAX)", "column": "STRING(MAX)", "dataType": "STRING(MAX)", "originalColumn": "STRING(MAX)", "partitionKey": "STRING(MAX)", "sortKey": "STRING(MAX)", "spannerIndexName": "STRING(MAX)", "actualTable": "STRING(MAX)"} + TableDDL["dynamodb_adapter_table_ddl"] = map[string]string{"tableName": "S", "column": "S", "dataType": "S", "originalColumn": "S", "partitionKey": "S", "sortKey": "S", "spannerIndexName": "S", "actualTable": "S"} TableDDL["dynamodb_adapter_config_manager"] = map[string]string{"tableName": "STRING(MAX)", "config": "STRING(MAX)", "cronTime": "STRING(MAX)", "uniqueValue": "STRING(MAX)", "enabledStream": "STRING(MAX)", "pubsubTopic": "STRING(MAX)"} TableColumnMap = make(map[string][]string) TableColumnMap["dynamodb_adapter_table_ddl"] = []string{"tableName", "column", "dataType", "originalColumn", "partitionKey", "sortKey", "spannerIndexName", "actualTable"} diff --git a/service/services/services.go b/service/services/services.go index 9d41351..1d3625e 100644 --- a/service/services/services.go +++ b/service/services/services.go @@ -539,7 +539,7 @@ func Scan(ctx context.Context, scanData models.ScanMeta) (map[string]interface{} query.TableName = scanData.TableName query.Limit = scanData.Limit if query.Limit == 0 { - query.Limit = config.ConfigurationMap.QueryLimit + query.Limit = models.GlobalConfig.Spanner.QueryLimit } query.StartFrom = scanData.StartFrom query.RangeValMap = scanData.ExpressionAttributeMap diff --git a/storage/spanner.go b/storage/spanner.go index 6194eae..2c177ae 100755 --- a/storage/spanner.go +++ b/storage/spanner.go @@ -767,7 +767,7 @@ func parseRow(r *spanner.Row, colDDL map[string]string) (map[string]interface{}, return nil, errors.New("ResourceNotFoundException", k) } switch v { - case "STRING(MAX)": + case "S": var s spanner.NullString err := r.Column(i, &s) if err != nil { @@ -833,7 +833,7 @@ func parseRow(r *spanner.Row, colDDL map[string]string) (map[string]interface{}, } singleRow[k] = m } - case "INT64": + case "N": var s spanner.NullInt64 err := r.Column(i, &s) if err != nil { From 9d1bc95f02ef5c8266f48680abbd24512fe0c236 Mon Sep 17 00:00:00 2001 From: nikitajain1998 Date: Mon, 9 Dec 2024 09:15:22 +0530 Subject: [PATCH 4/7] Added DDL capability for tables --- config-files/init.go | 374 ++++++++++++++++++++++++++++++------------- utils/utils.go | 18 +++ 2 files changed, 285 insertions(+), 107 deletions(-) diff --git a/config-files/init.go b/config-files/init.go index 6c8bc14..13daa7f 100644 --- a/config-files/init.go +++ b/config-files/init.go @@ -16,12 +16,15 @@ import ( "github.com/aws/aws-sdk-go-v2/service/dynamodb" dynamodbtypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/cloudspannerecosystem/dynamodb-adapter/models" + 
"github.com/cloudspannerecosystem/dynamodb-adapter/utils" "google.golang.org/genproto/googleapis/spanner/admin/database/v1" "gopkg.in/yaml.v3" ) +// Define a global variable for reading files (mockable for tests) var readFile = os.ReadFile +// DDL statement to create the DynamoDB adapter table in Spanner var ( adapterTableDDL = ` CREATE TABLE dynamodb_adapter_table_ddl ( @@ -36,36 +39,41 @@ var ( ) PRIMARY KEY (tableName, column)` ) +// Entry point for the application func main() { + // Parse command-line arguments for dry-run mode dryRun := flag.Bool("dry_run", false, "Run the program in dry-run mode to output DDL and queries without making changes") flag.Parse() + // Load configuration from a YAML file config, err := loadConfig("../config.yaml") if err != nil { log.Fatalf("Error loading configuration: %v", err) } - // Construct database name + + // Build the Spanner database name databaseName := fmt.Sprintf( "projects/%s/instances/%s/databases/%s", config.Spanner.ProjectID, config.Spanner.InstanceID, config.Spanner.DatabaseName, ) + + // Decide execution mode based on the dry-run flag if *dryRun { fmt.Println("-- Dry Run Mode: Generating Spanner DDL and Insert Queries Only --") - runDryRun(databaseName) + runDryRun() } else { fmt.Println("-- Executing Setup on Spanner --") executeSetup(databaseName) } } +// Load configuration from a YAML file func loadConfig(filename string) (*models.Config, error) { - // Read the file data, err := readFile(filename) if err != nil { return nil, fmt.Errorf("failed to read config file: %w", err) } - // Unmarshal YAML data into config struct var config models.Config if err := yaml.Unmarshal(data, &config); err != nil { return nil, fmt.Errorf("failed to unmarshal config: %w", err) @@ -74,7 +82,8 @@ func loadConfig(filename string) (*models.Config, error) { return &config, nil } -func runDryRun(databaseName string) { +// Run in dry-run mode to output DDL and insert queries without making changes +func runDryRun() { fmt.Println("-- Spanner DDL to create the adapter table --") fmt.Println(adapterTableDDL) @@ -84,24 +93,78 @@ func runDryRun(databaseName string) { log.Fatalf("Failed to list DynamoDB tables: %v", err) } + // Process each DynamoDB table for _, tableName := range tables { fmt.Printf("Processing table: %s\n", tableName) + + // Generate and print table-specific DDL + ddl := generateTableDDL(tableName, client) + fmt.Printf("-- DDL for table: %s --\n%s\n", tableName, ddl) + + // Generate and print insert queries generateInsertQueries(tableName, client) } } +// Generate DDL statement for a specific DynamoDB table +func generateTableDDL(tableName string, client *dynamodb.Client) string { + attributes, partitionKey, sortKey, err := fetchTableAttributes(client, tableName) + if err != nil { + log.Printf("Failed to fetch attributes for table %s: %v", tableName, err) + return "" + } + + var columns []string + for column, dataType := range attributes { + columns = append(columns, fmt.Sprintf("%s %s", column, utils.ConvertDynamoTypeToSpannerType(dataType))) + } + primaryKey := fmt.Sprintf("PRIMARY KEY (%s%s)", partitionKey, func() string { + if sortKey != "" { + return ", " + sortKey + } + return "" + }()) + + return fmt.Sprintf( + "CREATE TABLE %s (\n\t%s\n) %s", + tableName, strings.Join(columns, ",\n\t"), primaryKey, + ) +} + +// Generate insert queries for a given DynamoDB table +func generateInsertQueries(tableName string, client *dynamodb.Client) { + attributes, partitionKey, sortKey, err := fetchTableAttributes(client, tableName) + if err != nil { + 
log.Printf("Failed to fetch attributes for table %s: %v", tableName, err) + return + } + + for column, dataType := range attributes { + query := fmt.Sprintf( + `INSERT INTO dynamodb_adapter_table_ddl + (column, tableName, dataType, originalColumn, partitionKey, sortKey, spannerIndexName, actualTable) + VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s');`, + column, tableName, dataType, column, partitionKey, sortKey, column, tableName, + ) + fmt.Println(query) + } +} + +// Execute the setup process: create database, tables, and migrate data func executeSetup(databaseName string) { ctx := context.Background() - // Create Spanner database and adapter table + // Create the Spanner database if it doesn't exist if err := createDatabase(ctx, databaseName); err != nil { log.Fatalf("Failed to create database: %v", err) } + + // Create the adapter table if err := createTable(ctx, databaseName, adapterTableDDL); err != nil { log.Fatalf("Failed to create adapter table: %v", err) } - // Fetch and migrate data + // Process each DynamoDB table client := createDynamoClient() tables, err := listDynamoTables(client) if err != nil { @@ -109,70 +172,157 @@ func executeSetup(databaseName string) { } for _, tableName := range tables { - if err := migrateDynamoTableToSpanner(ctx, databaseName, tableName, client); err != nil { - log.Printf("Failed to migrate table %s: %v", tableName, err) + // Generate and apply table-specific DDL + ddl := generateTableDDL(tableName, client) + if err := createTable(ctx, databaseName, ddl); err != nil { + log.Printf("Failed to create table %s: %v", tableName, err) + continue } + + // Migrate table metadata to Spanner + migrateDynamoTableToSpanner(ctx, databaseName, tableName, client) } - fmt.Println("Migration complete.") + + fmt.Println("Initial setup complete.") } +// migrateDynamoTableToSpanner migrates a DynamoDB table schema and metadata to Spanner. 
+func migrateDynamoTableToSpanner(ctx context.Context, db, tableName string, client *dynamodb.Client) error { + // Load configuration + config, err := loadConfig("../config.yaml") + if err != nil { + return fmt.Errorf("error loading configuration: %v", err) + } + models.SpannerTableMap[tableName] = config.Spanner.InstanceID + + // Fetch table attributes and keys from DynamoDB + attributes, partitionKey, sortKey, err := fetchTableAttributes(client, tableName) + if err != nil { + return fmt.Errorf("failed to fetch attributes for table %s: %v", tableName, err) + } + + // Fetch the current Spanner schema for the table + spannerSchema, err := fetchSpannerSchema(ctx, db, tableName) + if err != nil { + return fmt.Errorf("failed to fetch Spanner schema for table %s: %v", tableName, err) + } + + // Generate and apply DDL statements for missing columns + var ddlStatements []string + for column, dynamoType := range attributes { + if _, exists := spannerSchema[column]; !exists { + spannerType := utils.ConvertDynamoTypeToSpannerType(dynamoType) + ddlStatements = append(ddlStatements, fmt.Sprintf("ALTER TABLE %s ADD COLUMN %s %s", tableName, column, spannerType)) + } + } + if len(ddlStatements) > 0 { + if err := applySpannerDDL(ctx, db, ddlStatements); err != nil { + return fmt.Errorf("failed to apply DDL to table %s: %v", tableName, err) + } + log.Printf("Schema updated for table %s in Spanner.", tableName) + } + + // Check for columns that are in Spanner but not in DynamoDB (columns that should be dropped) + var dropColumnStatements []string + for column := range spannerSchema { + if _, exists := attributes[column]; !exists { + dropColumnStatements = append(dropColumnStatements, fmt.Sprintf("ALTER TABLE %s DROP COLUMN %s", tableName, column)) + } + } + + // Apply DDL to drop removed columns + if len(dropColumnStatements) > 0 { + if err := applySpannerDDL(ctx, db, dropColumnStatements); err != nil { + return fmt.Errorf("failed to apply DROP COLUMN DDL to table %s: %v", tableName, err) + } + log.Printf("Removed columns from table %s in Spanner.", tableName) + } + + // Prepare mutations to insert metadata into the adapter table + var mutations []*spanner.Mutation + for column, dataType := range attributes { + mutations = append(mutations, spanner.InsertOrUpdate( + "dynamodb_adapter_table_ddl", + []string{"column", "tableName", "dataType", "originalColumn", "partitionKey", "sortKey", "spannerIndexName", "actualTable"}, + []interface{}{column, tableName, dataType, column, partitionKey, sortKey, column, tableName}, + )) + } + + // Perform batch insert into Spanner + if err := spannerBatchInsert(ctx, db, mutations); err != nil { + return fmt.Errorf("failed to insert metadata for table %s into Spanner: %v", tableName, err) + } + + log.Printf("Successfully migrated metadata for table %s to Spanner.", tableName) + return nil +} + +// createDatabase creates a new Spanner database if it does not exist. 
func createDatabase(ctx context.Context, db string) error { + // Parse database ID matches := regexp.MustCompile("^(.*)/databases/(.*)$").FindStringSubmatch(db) if matches == nil || len(matches) != 3 { return fmt.Errorf("invalid database ID: %s", db) } + parent, dbName := matches[1], matches[2] + // Create Spanner Admin client adminClient, err := Admindatabase.NewDatabaseAdminClient(ctx) if err != nil { return fmt.Errorf("failed to create Spanner Admin client: %v", err) } defer adminClient.Close() + // Initiate database creation op, err := adminClient.CreateDatabase(ctx, &database.CreateDatabaseRequest{ - Parent: matches[1], - CreateStatement: "CREATE DATABASE `" + matches[2] + "`", + Parent: parent, + CreateStatement: "CREATE DATABASE `" + dbName + "`", }) - if err != nil { if strings.Contains(err.Error(), "AlreadyExists") { - log.Printf("Database `%s` already exists. Skipping creation.", matches[2]) + log.Printf("Database `%s` already exists. Skipping creation.", dbName) return nil } return fmt.Errorf("failed to initiate database creation: %v", err) } - if op == nil { - return fmt.Errorf("received nil operation for database creation") + // Wait for database creation to complete + if _, err = op.Wait(ctx); err != nil { + return fmt.Errorf("error while waiting for database creation: %v", err) } - - _, err = op.Wait(ctx) - if err != nil { - return fmt.Errorf("error while waiting for database creation to complete: %v", err) - } - - log.Printf("Database `%s` created successfully.", matches[2]) + log.Printf("Database `%s` created successfully.", dbName) return nil } +// createTable creates a table in Spanner if it does not already exist. func createTable(ctx context.Context, db, ddl string) error { + // Extract table name from DDL + re := regexp.MustCompile(`CREATE TABLE (\w+)`) + matches := re.FindStringSubmatch(ddl) + if len(matches) < 2 { + return fmt.Errorf("unable to extract table name from DDL: %s", ddl) + } + tableName := matches[1] + + // Create Spanner Admin client adminClient, err := Admindatabase.NewDatabaseAdminClient(ctx) if err != nil { return fmt.Errorf("failed to create Spanner Admin client: %v", err) } defer adminClient.Close() + // Create Spanner client client, err := spanner.NewClient(ctx, db) if err != nil { return fmt.Errorf("failed to create Spanner client: %v", err) } defer client.Close() + // Check if the table already exists stmt := spanner.Statement{ - SQL: `SELECT COUNT(*) - FROM INFORMATION_SCHEMA.TABLES - WHERE TABLE_NAME = @tableName`, + SQL: `SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = @tableName`, Params: map[string]interface{}{ - "tableName": "dynamodb_adapter_table_ddl", + "tableName": tableName, }, } iter := client.Single().Query(ctx, stmt) @@ -185,12 +335,12 @@ func createTable(ctx context.Context, db, ddl string) error { if err != nil { return fmt.Errorf("failed to query table existence: %w", err) } - if tableCount > 0 { - fmt.Println("Table `dynamodb_adapter_table_ddl` already exists. Skipping creation.") + log.Printf("Table `%s` already exists. 
Skipping creation.", tableName) return nil } + // Create the table op, err := adminClient.UpdateDatabaseDdl(ctx, &database.UpdateDatabaseDdlRequest{ Database: db, Statements: []string{ddl}, @@ -198,7 +348,6 @@ func createTable(ctx context.Context, db, ddl string) error { if err != nil { return fmt.Errorf("failed to create table: %w", err) } - return op.Wait(ctx) } @@ -210,49 +359,10 @@ func listDynamoTables(client *dynamodb.Client) ([]string, error) { return output.TableNames, nil } -func migrateDynamoTableToSpanner(ctx context.Context, db, tableName string, client *dynamodb.Client) error { - // Fetch attributes, partition key, and sort key from DynamoDB table - config, err := loadConfig("../config.yaml") - if err != nil { - log.Fatalf("Error loading configuration: %v", err) - } - models.SpannerTableMap[tableName] = config.Spanner.InstanceID - attributes, partitionKey, sortKey, err := fetchTableAttributes(client, tableName) - if err != nil { - return fmt.Errorf("failed to fetch attributes for table %s: %v", tableName, err) - } - - // Generate Spanner index name and actual table name - // spannerIndexName := fmt.Sprintf("index_%s", tableName) - actualTable := tableName - - // Prepare mutations to insert data into the adapter table - var mutations []*spanner.Mutation - for column, dataType := range attributes { - mutations = append(mutations, spanner.InsertOrUpdate( - "dynamodb_adapter_table_ddl", - []string{ - "column", "tableName", "dataType", "originalColumn", - "partitionKey", "sortKey", "spannerIndexName", "actualTable", - }, - []interface{}{ - column, tableName, dataType, column, - partitionKey, sortKey, column, actualTable, - }, - )) - } - - // Perform batch insert into Spanner - if err := spannerBatchInsert(ctx, db, mutations); err != nil { - return fmt.Errorf("failed to insert metadata for table %s into Spanner: %v", tableName, err) - } - - log.Printf("Successfully migrated metadata for DynamoDB table %s to Spanner.", tableName) - return nil -} - +// fetchTableAttributes retrieves attributes and key schema (partition and sort keys) of a DynamoDB table. +// It describes the table to get its key schema and scans the table to infer attribute types. func fetchTableAttributes(client *dynamodb.Client, tableName string) (map[string]string, string, string, error) { - // Fetch table description + // Describe the DynamoDB table to get its key schema and attributes. output, err := client.DescribeTable(context.TODO(), &dynamodb.DescribeTableInput{ TableName: aws.String(tableName), }) @@ -260,19 +370,21 @@ func fetchTableAttributes(client *dynamodb.Client, tableName string) (map[string return nil, "", "", fmt.Errorf("failed to describe table %s: %w", tableName, err) } - // Extract partition key and sort key var partitionKey, sortKey string + // Extract the partition key and sort key from the table's key schema. for _, keyElement := range output.Table.KeySchema { switch keyElement.KeyType { case dynamodbtypes.KeyTypeHash: - partitionKey = aws.ToString(keyElement.AttributeName) + partitionKey = aws.ToString(keyElement.AttributeName) // Partition key case dynamodbtypes.KeyTypeRange: - sortKey = aws.ToString(keyElement.AttributeName) + sortKey = aws.ToString(keyElement.AttributeName) // Sort key } } - // Extract attributes from the table + // Map to store inferred attribute types. attributes := make(map[string]string) + + // Scan the table to retrieve data and infer attribute types. 
scanOutput, err := client.Scan(context.TODO(), &dynamodb.ScanInput{ TableName: aws.String(tableName), }) @@ -280,6 +392,7 @@ func fetchTableAttributes(client *dynamodb.Client, tableName string) (map[string return nil, "", "", fmt.Errorf("failed to scan table %s: %w", tableName, err) } + // Iterate through the items and infer the attribute types. for _, item := range scanOutput.Items { for attr, value := range item { attributes[attr] = inferDynamoDBType(value) @@ -289,70 +402,117 @@ func fetchTableAttributes(client *dynamodb.Client, tableName string) (map[string return attributes, partitionKey, sortKey, nil } +// inferDynamoDBType determines the type of a DynamoDB attribute based on its value. func inferDynamoDBType(attr dynamodbtypes.AttributeValue) string { + // Check the attribute type and return the corresponding DynamoDB type. switch attr.(type) { case *dynamodbtypes.AttributeValueMemberS: - return "S" + return "S" // String type case *dynamodbtypes.AttributeValueMemberN: - return "N" + return "N" // Number type case *dynamodbtypes.AttributeValueMemberB: - return "B" + return "B" // Binary type case *dynamodbtypes.AttributeValueMemberBOOL: - return "BOOL" + return "BOOL" // Boolean type case *dynamodbtypes.AttributeValueMemberSS: - return "SS" + return "SS" // String Set type case *dynamodbtypes.AttributeValueMemberNS: - return "NS" + return "NS" // Number Set type case *dynamodbtypes.AttributeValueMemberBS: - return "BS" + return "BS" // Binary Set type case *dynamodbtypes.AttributeValueMemberNULL: - return "NULL" + return "NULL" // Null type case *dynamodbtypes.AttributeValueMemberM: - return "M" + return "M" // Map type case *dynamodbtypes.AttributeValueMemberL: - return "L" + return "L" // List type default: log.Printf("Unknown DynamoDB attribute type: %T\n", attr) - return "Unknown" + return "Unknown" // Unknown type } } -func spannerBatchInsert(ctx context.Context, db string, mutations []*spanner.Mutation) error { - client, err := spanner.NewClient(ctx, db) +// spannerBatchInsert applies a batch of mutations to a Spanner database. +func spannerBatchInsert(ctx context.Context, databaseName string, mutations []*spanner.Mutation) error { + // Create a Spanner client. + client, err := spanner.NewClient(ctx, databaseName) if err != nil { - return err + return fmt.Errorf("failed to create Spanner client: %w", err) } - defer client.Close() + defer client.Close() // Ensure the client is closed after the operation. + // Apply the batch of mutations to the database. _, err = client.Apply(ctx, mutations) return err } +// createDynamoClient initializes a DynamoDB client using default AWS configuration. func createDynamoClient() *dynamodb.Client { + // Load the default AWS configuration. cfg, err := config.LoadDefaultConfig(context.TODO()) if err != nil { - log.Fatalf("Failed to load AWS configuration: %v", err) + log.Fatalf("failed to load AWS config: %v", err) } - return dynamodb.NewFromConfig(cfg) + return dynamodb.NewFromConfig(cfg) // Return the configured client. } -func generateInsertQueries(tableName string, client *dynamodb.Client) { - attributes, partitionKey, sortKey, err := fetchTableAttributes(client, tableName) +// fetchSpannerSchema retrieves the schema of a Spanner table by querying the INFORMATION_SCHEMA. +func fetchSpannerSchema(ctx context.Context, db, tableName string) (map[string]string, error) { + // Create a Spanner client. 
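+	// The schema is read with a single-use read-only transaction (client.Single) below,
+	// so only read access to the database is required.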
+ client, err := spanner.NewClient(ctx, db) if err != nil { - log.Printf("Failed to fetch attributes for table %s: %v", tableName, err) - return + return nil, fmt.Errorf("failed to create Spanner client: %v", err) } + defer client.Close() // Ensure the client is closed after the operation. - spannerIndexName := fmt.Sprintf("index_%s", tableName) - actualTable := tableName + // Query the schema information for the specified table. + stmt := spanner.Statement{ + SQL: `SELECT COLUMN_NAME, SPANNER_TYPE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = @tableName`, + Params: map[string]interface{}{ + "tableName": tableName, + }, + } + iter := client.Single().Query(ctx, stmt) + defer iter.Stop() // Ensure the iterator is stopped after use. - for column, dataType := range attributes { - query := fmt.Sprintf( - `INSERT INTO dynamodb_adapter_table_ddl - (column, tableName, dataType, originalColumn, partitionKey, sortKey, spannerIndexName, actualTable) - VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s');`, - column, tableName, dataType, column, partitionKey, sortKey, spannerIndexName, actualTable, - ) - fmt.Println(query) + // Map to store the schema information. + schema := make(map[string]string) + err = iter.Do(func(row *spanner.Row) error { + var columnName, spannerType string + // Extract column name and type from the row. + if err := row.Columns(&columnName, &spannerType); err != nil { + return err + } + schema[columnName] = spannerType + return nil + }) + if err != nil { + return nil, err + } + return schema, nil +} + +// applySpannerDDL executes DDL statements on a Spanner database. +func applySpannerDDL(ctx context.Context, db string, ddlStatements []string) error { + // Create a Spanner Admin client. + adminClient, err := Admindatabase.NewDatabaseAdminClient(ctx) + if err != nil { + return fmt.Errorf("failed to create Spanner Admin client: %v", err) } + defer adminClient.Close() // Ensure the client is closed after the operation. + + // Initiate the DDL update operation. + op, err := adminClient.UpdateDatabaseDdl(ctx, &database.UpdateDatabaseDdlRequest{ + Database: db, + Statements: ddlStatements, + }) + if err != nil { + return fmt.Errorf("failed to initiate DDL update: %v", err) + } + + // Wait for the DDL update operation to complete. 
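+	// UpdateDatabaseDdl returns a long-running operation; Wait blocks until the
+	// schema change has been applied.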
+ if err := op.Wait(ctx); err != nil { + return fmt.Errorf("error while waiting for DDL update to complete: %v", err) + } + return nil } diff --git a/utils/utils.go b/utils/utils.go index 63f5c9a..52e0c18 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -172,3 +172,21 @@ func ChangeTableNameForSpanner(tableName string) string { tableName = strings.ReplaceAll(tableName, "-", "_") return tableName } + +// Convert DynamoDB data types to equivalent Spanner types +func ConvertDynamoTypeToSpannerType(dynamoType string) string { + switch dynamoType { + case "S": + return "STRING(MAX)" + case "N": + return "FLOAT64" + case "B": + return "BYTES(MAX)" + case "BOOL": + return "BOOL" + case "NULL": + return "NULL" + default: + return "UNKNOWN" + } +} From 7efdc27c7c1c5aa7f323eb3c7c498f31ef986899 Mon Sep 17 00:00:00 2001 From: nikitajain1998 Date: Thu, 19 Dec 2024 19:42:03 +0530 Subject: [PATCH 5/7] Added scripts to test apis --- config-files/README.MD | 119 ++++++++++++++ config-files/testScript.go | 323 +++++++++++++++++++++++++++++++++++++ 2 files changed, 442 insertions(+) create mode 100644 config-files/README.MD create mode 100644 config-files/testScript.go diff --git a/config-files/README.MD b/config-files/README.MD new file mode 100644 index 0000000..d6c8fca --- /dev/null +++ b/config-files/README.MD @@ -0,0 +1,119 @@ + +# DynamoDB Adapter Demo Setup + +This README outlines the steps to set up a test environment for using the DynamoDB adapter with Cloud Spanner. Follow these instructions to create the necessary tables, insert sample data, and configure the adapter. + +## 1. Create a Sample Table for Demo Operations + +The following SQL statement creates a table `employee_table` in Cloud Spanner, which will be used to demonstrate DynamoDB operations through the adapter: + +```sql +CREATE TABLE employee_table ( + emp_name STRING(MAX), + emp_id FLOAT64, + emp_image BYTES(MAX), + isHired BOOL +) PRIMARY KEY(emp_id); +``` + +## 2. Create `dynamodb_adapter_table_ddl` Table + +The DynamoDB adapter requires a table to store metadata about the schema for other tables. Use the following SQL statement to create this table: + +```sql +CREATE TABLE dynamodb_adapter_table_ddl ( + column STRING(MAX), + tableName STRING(MAX), + dataType STRING(MAX), + originalColumn STRING(MAX), +) PRIMARY KEY (tableName, column); +``` + +## 3. Insert Data into `dynamodb_adapter_table_ddl` + +Once the `dynamodb_adapter_table_ddl` table is created, insert the metadata for `employee_table` as follows: + + + +```sql +INSERT INTO dynamodb_adapter_table_ddl (column, tableName, dataType, originalColumn) +VALUES ('emp_name', 'employee_table', 'STRING(MAX)', 'emp_name'); + +INSERT INTO dynamodb_adapter_table_ddl (column, tableName, dataType, originalColumn) +VALUES ('emp_id', 'employee_table', 'FLOAT64', 'emp_id'); + +INSERT INTO dynamodb_adapter_table_ddl (column, tableName, dataType, originalColumn) +VALUES ('emp_image', 'employee_table', 'BYTES(MAX)', 'emp_image'); + +INSERT INTO dynamodb_adapter_table_ddl (column, tableName, dataType, originalColumn) +VALUES ('isHired', 'employee_table', 'BOOL', 'isHired'); +``` + +## 4. Configuration Files + +The DynamoDB adapter requires several configuration files to function properly. Below are examples of these configuration files: + +### 4.1. `config.json` + +The `config.json` file contains the basic settings for the DynamoDB Adapter. 
Replace the `GoogleProjectID` and `SpannerDb` values with your own project and database names:
+
+```json
+{
+    "GoogleProjectID": "cassandra-to-spanner",
+    "SpannerDb": "cluster10",
+    "QueryLimit": 5000
+}
+```
+
+### 4.2. `spanner.json`
+
+The `spanner.json` file maps table names to Cloud Spanner instance IDs. Define the instance where each table resides:
+
+```json
+{
+    "dynamodb_adapter_table_ddl": "spanner-instance-dev",
+    "dynamodb_adapter_config_manager": "spanner-instance-dev",
+    "employee_table": "spanner-instance-dev"
+}
+```
+
+### 4.3. `tables.json`
+
+The `tables.json` file describes each DynamoDB table's schema, including its partition and sort keys. Here's an example for `employee_table`, which uses `emp_id` as its partition key and has no sort key:
+
+```json
+{
+    "employee_table": {
+        "partitionKey": "emp_id",
+        "sortKey": "",
+        "spannerIndexName": "emp_id",
+        "actualTable": "employee_table"
+    }
+}
+```
+
+## 5. Build the Project
+
+Once the configuration files are set up, build the DynamoDB Adapter project by running:
+
+```bash
+go build
+```
+
+## 6. Start the Adapter
+
+After building the project, start the DynamoDB Adapter by running the following command:
+
+```bash
+./dynamodb-adapter
+```
+
+---
+
+Follow these steps to set up your DynamoDB Adapter environment and perform the demo operations against `employee_table`.
diff --git a/config-files/testScript.go b/config-files/testScript.go
new file mode 100644
index 0000000..601a424
--- /dev/null
+++ b/config-files/testScript.go
@@ -0,0 +1,323 @@
+package main
+
+import (
+	"fmt"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/aws/aws-sdk-go/service/dynamodb"
+)
+
+func main() {
+	// Set up the DynamoDB client to point to the proxy
+	sess := session.Must(session.NewSession(&aws.Config{
+		Region:   aws.String("us-west-2"),                // Replace with your region if needed
+		Endpoint: aws.String("http://localhost:9050/v1"), // Proxy URL
+	}))
+
+	// Create DynamoDB service client
+	svc := dynamodb.New(sess)
+
+	//createItem(svc)     //PutItem
+	readItems(svc) //Scan
+	//updateItem(svc)     //UpdateItem
+	//deleteItem(svc)     //DeleteItem
+	//getItem(svc)        //GetItem
+	//queryItems(svc)     //QueryItem
+	//batchGetItem(svc)   //BatchGetItem
+	//batchWriteItem(svc) //BatchWriteItem
+}
+
+func createItem(svc *dynamodb.DynamoDB) {
+	item := map[string]*dynamodb.AttributeValue{
+		"emp_id": {
+			N: aws.String("123"),
+		},
+		"emp_name": {
+			S: aws.String("test"),
+		},
+		"isHired": {
+			BOOL: aws.Bool(false),
+		},
+		"emp_image": {
+			B: []byte("binary_data_here"),
+		},
+	}
+
+	input := &dynamodb.PutItemInput{
+		TableName: aws.String("employee_table"),
+		Item:      item,
+	}
+
+	// Perform the PutItem operation
+	_, err := svc.PutItem(input)
+	if err != nil {
+		fmt.Println("Error putting item:", err)
+		return
+	}
+
+	fmt.Println("Successfully added item:", item)
+}
+
+func checkItemExists(svc *dynamodb.DynamoDB, empID string) bool {
+	input := &dynamodb.GetItemInput{
+		TableName: aws.String("employee_table"),
+		Key: map[string]*dynamodb.AttributeValue{
+			"emp_id": {
+				N: aws.String(empID),
+			},
+		},
+	}
+
+	result, err := svc.GetItem(input)
+	if err != nil {
+		fmt.Println("Error checking item:", err)
+		return false
+	}
+
+	return result.Item != nil
+}
+
+func readItems(svc *dynamodb.DynamoDB) {
+	// Define the input for the Scan operation
+	input := &dynamodb.ScanInput{
+		TableName: aws.String("employee_table"), // Table name
+	}
+
+	// Perform the Scan operation
+	result, err := svc.Scan(input)
+	if err != nil {
+		fmt.Println("Error performing scan:", err)
+		return
+	}
+
+	// Print the scan results
+	fmt.Println("Scan result:", result)
+}
+
+func updateItem(svc *dynamodb.DynamoDB) {
+	key := map[string]*dynamodb.AttributeValue{
+		"emp_id": {
+			N: aws.String("123"),
+		},
+	}
+
+	// Build the update request for emp_name
+	update := &dynamodb.UpdateItemInput{
+		TableName:        aws.String("employee_table"),
+		Key:              key,
+		UpdateExpression: aws.String("SET emp_name = :empName"),
+		ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
+			":empName": {
+				S: aws.String("value"), // New value for emp_name
+			},
+		},
+		ReturnValues: aws.String("UPDATED_NEW"),
+	}
+
+	// Perform the UpdateItem operation
+	result, err := svc.UpdateItem(update)
+	if err != nil {
+		fmt.Println("Error updating item:", err)
+		return
+	}
+
+	// Print the result of the update
+	fmt.Println("Successfully updated item:", result)
+}
+
+func deleteItem(svc *dynamodb.DynamoDB) {
+	// Define the input for the DeleteItem operation
+	input := &dynamodb.DeleteItemInput{
+		TableName: aws.String("employee_table"),
+		Key: map[string]*dynamodb.AttributeValue{
+			"emp_id": {
+				N: aws.String("123"),
+			},
+		},
+	}
+
+	// Perform the DeleteItem operation
+	_, err := svc.DeleteItem(input)
+	if err != nil {
+		fmt.Println("Error deleting item:", err)
+		return
+	}
+
+	// Print a success message
+	fmt.Println("Successfully deleted item with emp_id:", "123")
+}
+
+func getItem(svc *dynamodb.DynamoDB) {
+	// Define the key for the GetItem operation
+	input := &dynamodb.GetItemInput{
+		TableName: aws.String("employee_table"), // Table name
+		Key: map[string]*dynamodb.AttributeValue{
+			"emp_id": {
+				N: aws.String("123"), // Primary key (emp_id is numeric)
+			},
+		},
+	}
+
+	// Perform the GetItem operation
+	result, err := svc.GetItem(input)
+	if err != nil {
+		fmt.Println("Error getting item:", err)
+		return
+	}
+
+	// Print the GetItem result
+	fmt.Println("GetItem result:", result.Item)
+}
+
+func queryItems(svc *dynamodb.DynamoDB) {
+	// Define the input for the Query operation
+	input := &dynamodb.QueryInput{
+		TableName:              aws.String("employee_table"),
+		KeyConditionExpression: aws.String("emp_id = :emp_id"),
+		ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
+			":emp_id": {
+				N: aws.String("123"), // Primary key value
+			},
+		},
+	}
+
+	// Perform the Query operation
+	result, err := svc.Query(input)
+	if err != nil {
+		fmt.Println("Error performing query:", err)
+		return
+	}
+
+	// Print the Query results
+	fmt.Println("Query result:", result.Items)
+}
+
+func batchGetItem(svc *dynamodb.DynamoDB) {
+	// Define the input for the BatchGetItem operation
+	input := &dynamodb.BatchGetItemInput{
+		RequestItems: map[string]*dynamodb.KeysAndAttributes{
+			"employee_table": {
+				Keys: []map[string]*dynamodb.AttributeValue{
+					{
+						"emp_id": {
+							N: aws.String("124"),
+						},
+					},
+					{
+						"emp_id": {
+							N: aws.String("156"),
+						},
+					},
+				},
+			},
+		},
+	}
+
+	// Perform the BatchGetItem operation
+	result, err := svc.BatchGetItem(input)
+	if err != nil {
+		fmt.Println("Error performing batch get:", err)
+		return
+	}
+
+	// Print the BatchGetItem result
+	for tableName, items := range result.Responses {
+		fmt.Printf("Table: %s\n", tableName)
+		for _, item := range items {
+			fmt.Println(item)
+		}
+	}
+}
+
+func batchWriteItem(svc *dynamodb.DynamoDB) {
+	// Define the input for the BatchWriteItem operation
+	input := &dynamodb.BatchWriteItemInput{
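+		// Note: DynamoDB's BatchWriteItem accepts at most 25 write requests per call.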
+ RequestItems: map[string][]*dynamodb.WriteRequest{ + "employee_table": { + { + PutRequest: &dynamodb.PutRequest{ + Item: map[string]*dynamodb.AttributeValue{ + "emp_id": { + N: aws.String("124"), + }, + "emp_name": { + S: aws.String("jacob"), + }, + "isHired": { + BOOL: aws.Bool(true), + }, + }, + }, + }, + { + PutRequest: &dynamodb.PutRequest{ + Item: map[string]*dynamodb.AttributeValue{ + "emp_id": { + N: aws.String("156"), + }, + "emp_name": { + S: aws.String("sara"), + }, + "emp_image": { + B: []byte("binary_data_here"), + }, + "isHired": { + BOOL: aws.Bool(true), + }, + }, + }, + }, + { + PutRequest: &dynamodb.PutRequest{ + Item: map[string]*dynamodb.AttributeValue{ + "emp_id": { + N: aws.String("15666"), + }, + "emp_name": { + S: aws.String("jennifer"), + }, + "emp_image": { + B: []byte("binary_data_here"), + }, + "isHired": { + BOOL: aws.Bool(true), + }, + }, + }, + }, + { + PutRequest: &dynamodb.PutRequest{ + Item: map[string]*dynamodb.AttributeValue{ + "emp_id": { + N: aws.String("1564"), + }, + "emp_name": { + S: aws.String("john"), + }, + "emp_image": { + B: []byte("binary_data_here"), + }, + "isHired": { + BOOL: aws.Bool(true), + }, + }, + }, + }, + }, + }, + } + + // Perform the BatchWriteItem operation + _, err := svc.BatchWriteItem(input) + if err != nil { + fmt.Println("Error performing batch write:", err) + return + } + + // Print success message + fmt.Println("Successfully performed batch write operation") +} From c95f953573efe58771eb3455c3898c75cc8de22e Mon Sep 17 00:00:00 2001 From: nikitajain1998 Date: Fri, 20 Dec 2024 11:05:17 +0530 Subject: [PATCH 6/7] refactored --- {config-files => samples}/README.MD | 0 config-files/testScript.go => samples/main.go | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename {config-files => samples}/README.MD (100%) rename config-files/testScript.go => samples/main.go (100%) diff --git a/config-files/README.MD b/samples/README.MD similarity index 100% rename from config-files/README.MD rename to samples/README.MD diff --git a/config-files/testScript.go b/samples/main.go similarity index 100% rename from config-files/testScript.go rename to samples/main.go From 6b80cc9fa576ed4c1b13d22ed05f2516ed88bfbc Mon Sep 17 00:00:00 2001 From: nikitajain1998 Date: Thu, 26 Dec 2024 23:56:47 +0530 Subject: [PATCH 7/7] updated readme file --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 953065b..d6c0468 100644 --- a/README.md +++ b/README.md @@ -113,6 +113,7 @@ Configure AWS credentials: aws configure set aws_access_key_id YOUR_ACCESS_KEY aws configure set aws_secret_access_key YOUR_SECRET_KEY aws configure set default.region YOUR_REGION +aws configure set aws_session_token YOUR_SESSION_TOKEN ``` Google Cloud CLI: Authenticate and set up your environment: