Skip to content

Commit

Permalink
Add GH workflow and goreleaser, run gofmt
Browse files Browse the repository at this point in the history
  • Loading branch information
bobby569 committed Jul 12, 2024
1 parent 09ce9e4 commit 166be03
Show file tree
Hide file tree
Showing 12 changed files with 158 additions and 70 deletions.
18 changes: 18 additions & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
name: Test

on: [push]

jobs:
go-test:
runs-on: ubuntu-latest
steps:
- name: set up go
uses: actions/setup-go@v2
with:
go-version: 1.22
- name: checkout the code
uses: actions/checkout@v2
- name: check format
run: if [ "$(gofmt -s -l . | wc -l)" -gt 0 ]; then exit 1; fi
# - name: test
# run: make test
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
.idea
dynamodb-sync

dist/
23 changes: 23 additions & 0 deletions .goreleaser.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@

version: 2

before:
hooks:
- rm -rf ./dist
- go mod tidy
- go generate ./...

builds:
- env: [CGO_ENABLED=0]
goos:
- linux
goarch:
- amd64
- arm64

changelog:
sort: asc
filters:
exclude:
- "^docs:"
- "^test:"
11 changes: 11 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
.PHONY: clean
clean:
rm -f dynamodb-sync

.PHONY: test
test:
go test -v ./...

.PHONY: build
build: clean
GOOS=linux GOARCH=amd64 go build
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
# dynamodb-sync
[![Test build status](https://github.com/thumbtack/dynamodb-sync/workflows/Test/badge.svg)](https://github.com/thumbtack/dynamodb-sync/actions?query=workflow%3ATest)

## dynamodb-sync
3 changes: 2 additions & 1 deletion checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ func (sync *syncState) isCheckpointFound(key primaryKey) bool {

// Update the checkpoint for `key's` local state
// sync : timestamp,
// checkpoint[`shardId`]: `sequenceNumber`
//
// checkpoint[`shardId`]: `sequenceNumber`
func (sync *syncState) updateCheckpointLocal(
key primaryKey,
sequenceNumber string,
Expand Down
15 changes: 15 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
module github.com/thumbtack/dynamodb-sync

go 1.22

require (
github.com/aws/aws-sdk-go v1.54.1
github.com/sirupsen/logrus v1.9.3
golang.org/x/time v0.5.0
)

require (
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/stretchr/testify v1.9.0 // indirect
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
)
27 changes: 27 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
github.com/aws/aws-sdk-go v1.54.1 h1:+ULL7oLC+v3T00fOMIohUarPI3SR3oyDd6FBEvgdhvs=
github.com/aws/aws-sdk-go v1.54.1/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
34 changes: 10 additions & 24 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,8 @@ package main
import (
"encoding/json"
"errors"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"strings"

//"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/thumbtack/go/lib/metrics"
//"github.com/thumbtack/go/lib/monitoring"
"io/ioutil"
"net/http"
_ "net/http/pprof"
Expand All @@ -40,6 +36,7 @@ import (
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodbstreams"
Expand All @@ -61,7 +58,6 @@ var ddbTable = os.Getenv(paramCheckpointTable)
var ddbRegion = os.Getenv(paramCheckpointRegion)
var ddbEndpoint = os.Getenv(paramCheckpointEndpoint)
var ddbClient = ddbConfigConnect(ddbRegion, ddbEndpoint, maxRetries, *logger)
var metricsClient = newMetricsClient()

type config struct {
SrcTable string `json:"src_table"`
Expand All @@ -77,7 +73,7 @@ type config struct {
ReadQps int64 `json:"read_qps"`
WriteQps int64 `json:"write_qps"`
UpdateCheckpointThreshold int `json:"update_checkpoint_threshold"`
EnableStreaming *bool `json:"enable_streaming"`
EnableStreaming *bool `json:"enable_streaming"`
}

// Config file is read and dumped into this struct
Expand Down Expand Up @@ -108,12 +104,12 @@ func NewSyncState(tableConfig config) *syncState {
var stream *dynamodbstreams.DynamoDBStreams

tr := &http.Transport{
MaxIdleConns: 2048,
MaxConnsPerHost: 1024,
MaxIdleConns: 2048,
MaxConnsPerHost: 1024,
}
httpClient := &http.Client{
Timeout:8*time.Second,
Transport:tr}
Timeout: 8 * time.Second,
Transport: tr}

srcSess := session.Must(
session.NewSession(
Expand Down Expand Up @@ -169,8 +165,8 @@ func NewSyncState(tableConfig config) *syncState {
}

type appConfig struct {
sync []config
verbose bool
sync []config
verbose bool
}

// The primary key of the Checkpoint ddb table, of the stream etc
Expand Down Expand Up @@ -199,15 +195,6 @@ func ddbConfigConnect(region string, endpoint string, maxRetries int, logger log
)))
}

func newMetricsClient() (client metrics.Client) {
client, err := metrics.NewAlfredAppClient()
if err != nil {
logger.WithFields(logging.Fields{"Error":err}).Error("Error in initializing metrics")
os.Exit(1)
}
return client
}

// app constructor
func NewApp() *appConfig {
logger.SetLevel(logging.InfoLevel)
Expand Down Expand Up @@ -239,8 +226,8 @@ func NewApp() *appConfig {
}

return &appConfig{
sync: tableConfig,
verbose: true,
sync: tableConfig,
verbose: true,
}
}

Expand Down Expand Up @@ -277,7 +264,6 @@ func setDefaults(tableConfig []config) ([]config, error) {
continue
}


if tableConfig[i].ReadQps == 0 {
tableConfig[i].ReadQps = 500
}
Expand Down
9 changes: 5 additions & 4 deletions replicate.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package main

import (
"golang.org/x/time/rate"
"sync"
"time"

"golang.org/x/time/rate"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodbstreams"
Expand Down Expand Up @@ -196,8 +197,8 @@ func (sync *syncState) streamSync(key primaryKey, streamArn string) error {
numShards := 0

type shardStats struct {
numShards int
tableName string
numShards int
tableName string
}

type activeShardStats struct {
Expand Down Expand Up @@ -227,7 +228,7 @@ func (sync *syncState) streamSync(key primaryKey, streamArn string) error {
}

numShards += len(result.StreamDescription.Shards)

for _, shard := range result.StreamDescription.Shards {
sync.checkpointLock.RLock()
_, ok := sync.expiredShards[*shard.ShardId]
Expand Down
2 changes: 1 addition & 1 deletion shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
func backoff(i int, s string) {
wait := math.Pow(2, float64(i))
logger.WithFields(logging.Fields{
"Backoff Caller": s,
"Backoff Caller": s,
"Backoff Time(seconds)": wait,
}).Info("Backing off")
time.Sleep(time.Duration(wait) * time.Second)
Expand Down
79 changes: 40 additions & 39 deletions table.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package main

import (
"errors"
"golang.org/x/time/rate"
"os"
"strings"
"sync"
"time"

"golang.org/x/time/rate"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/dynamodb"
logging "github.com/sirupsen/logrus"
Expand All @@ -23,36 +24,36 @@ func (sync *syncState) writeBatch(
batch map[string][]*dynamodb.WriteRequest,
key primaryKey, rl *rate.Limiter, reqCapacity float64,
writeBatchSize int64) []*dynamodb.ConsumedCapacity {
i := 0
r := rl.ReserveN(time.Now(), int(reqCapacity))
if !r.OK() {
r = rl.ReserveN(time.Now(), int(writeBatchSize))
}
time.Sleep(r.Delay())
consumedCapacity := make([]*dynamodb.ConsumedCapacity, 0)

for len(batch) > 0 {
output,_ := sync.dstDynamo.BatchWriteItem(
&dynamodb.BatchWriteItemInput{
RequestItems: batch,
})

consumedCapacity = append(consumedCapacity, output.ConsumedCapacity...)

if output.UnprocessedItems != nil {
logger.WithFields(logging.Fields{
"Unprocessed Items Size": len(output.UnprocessedItems),
"Source Table": key.sourceTable,
"Destination Table": key.dstTable,
}).Debug("Some items failed to be processed")
// exponential backoff before retrying
backoff(i, "BatchWrite")
i++
// Retry writing items that were not processed
batch = output.UnprocessedItems
}
i := 0
r := rl.ReserveN(time.Now(), int(reqCapacity))
if !r.OK() {
r = rl.ReserveN(time.Now(), int(writeBatchSize))
}
time.Sleep(r.Delay())
consumedCapacity := make([]*dynamodb.ConsumedCapacity, 0)

for len(batch) > 0 {
output, _ := sync.dstDynamo.BatchWriteItem(
&dynamodb.BatchWriteItemInput{
RequestItems: batch,
})

consumedCapacity = append(consumedCapacity, output.ConsumedCapacity...)

if output.UnprocessedItems != nil {
logger.WithFields(logging.Fields{
"Unprocessed Items Size": len(output.UnprocessedItems),
"Source Table": key.sourceTable,
"Destination Table": key.dstTable,
}).Debug("Some items failed to be processed")
// exponential backoff before retrying
backoff(i, "BatchWrite")
i++
// Retry writing items that were not processed
batch = output.UnprocessedItems
}
return consumedCapacity
}
return consumedCapacity
}

// Group items from the `items` channel into
Expand Down Expand Up @@ -192,13 +193,13 @@ func (sync *syncState) updateCapacity(

var err error
logger.WithFields(logging.Fields{
"Table":tableName,
"New Read Capacity": newThroughput.readCapacity,
"Table": tableName,
"New Read Capacity": newThroughput.readCapacity,
"New Write Capacity": newThroughput.writeCapacity}).Info("Updating capacity")
input := &dynamodb.UpdateTableInput{
TableName: aws.String(tableName),
ProvisionedThroughput: &dynamodb.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(newThroughput.readCapacity),
ReadCapacityUnits: aws.Int64(newThroughput.readCapacity),
WriteCapacityUnits: aws.Int64(newThroughput.writeCapacity),
},
}
Expand Down Expand Up @@ -274,9 +275,9 @@ func (sync *syncState) getCapacity(tableName string, dynamo *dynamodb.DynamoDB)
} else {
result := output.Table.ProvisionedThroughput
logger.WithFields(logging.Fields{
"Table": tableName,
"Read Capacity": *result.ReadCapacityUnits,
"Write Capacity": *result.WriteCapacityUnits,
"Table": tableName,
"Read Capacity": *result.ReadCapacityUnits,
"Write Capacity": *result.WriteCapacityUnits,
}).Info("Fetched provisioned throughput of table")
return provisionedThroughput{
*result.ReadCapacityUnits,
Expand Down Expand Up @@ -316,9 +317,9 @@ func (sync *syncState) createTable(key primaryKey, properties *dynamodb.Describe
}).Info("Creating table")

input := &dynamodb.CreateTableInput{
TableName: aws.String(sync.tableConfig.DstTable),
KeySchema: properties.Table.KeySchema,
AttributeDefinitions: properties.Table.AttributeDefinitions,
TableName: aws.String(sync.tableConfig.DstTable),
KeySchema: properties.Table.KeySchema,
AttributeDefinitions: properties.Table.AttributeDefinitions,
ProvisionedThroughput: &dynamodb.ProvisionedThroughput{
ReadCapacityUnits: properties.Table.ProvisionedThroughput.ReadCapacityUnits,
WriteCapacityUnits: properties.Table.ProvisionedThroughput.WriteCapacityUnits,
Expand Down

0 comments on commit 166be03

Please sign in to comment.