Skip to content

Commit

Permalink
v2 BlobMetadataStore operations for BlobVerificationInfos (#878)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Nov 11, 2024
1 parent 9891981 commit 3453468
Show file tree
Hide file tree
Showing 17 changed files with 387 additions and 48 deletions.
5 changes: 3 additions & 2 deletions api/clients/config_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package clients

import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// Claude generated tests... don't blame the copy paster.
Expand Down
68 changes: 45 additions & 23 deletions common/aws/dynamodb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const (

var (
once sync.Once
clientRef *Client
clientRef *client
ErrConditionFailed = errors.New("condition failed")
)

Expand All @@ -47,12 +47,34 @@ type QueryResult struct {
LastEvaluatedKey Key
}

type Client struct {
type Client interface {
DeleteTable(ctx context.Context, tableName string) error
PutItem(ctx context.Context, tableName string, item Item) error
PutItemWithCondition(ctx context.Context, tableName string, item Item, condition string, expressionAttributeNames map[string]string, expressionAttributeValues map[string]types.AttributeValue) error
PutItems(ctx context.Context, tableName string, items []Item) ([]Item, error)
UpdateItem(ctx context.Context, tableName string, key Key, item Item) (Item, error)
UpdateItemWithCondition(ctx context.Context, tableName string, key Key, item Item, condition expression.ConditionBuilder) (Item, error)
IncrementBy(ctx context.Context, tableName string, key Key, attr string, value uint64) (Item, error)
GetItem(ctx context.Context, tableName string, key Key) (Item, error)
GetItems(ctx context.Context, tableName string, keys []Key) ([]Item, error)
QueryIndex(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpressionValues) ([]Item, error)
Query(ctx context.Context, tableName string, keyCondition string, expAttributeValues ExpressionValues) ([]Item, error)
QueryWithInput(ctx context.Context, input *dynamodb.QueryInput) ([]Item, error)
QueryIndexCount(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpressionValues) (int32, error)
QueryIndexWithPagination(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpressionValues, limit int32, exclusiveStartKey map[string]types.AttributeValue) (QueryResult, error)
DeleteItem(ctx context.Context, tableName string, key Key) error
DeleteItems(ctx context.Context, tableName string, keys []Key) ([]Key, error)
TableExists(ctx context.Context, name string) error
}

type client struct {
dynamoClient *dynamodb.Client
logger logging.Logger
}

func NewClient(cfg commonaws.ClientConfig, logger logging.Logger) (*Client, error) {
var _ Client = (*client)(nil)

func NewClient(cfg commonaws.ClientConfig, logger logging.Logger) (*client, error) {
var err error
once.Do(func() {
createClient := func(service, region string, options ...interface{}) (aws.Endpoint, error) {
Expand Down Expand Up @@ -85,12 +107,12 @@ func NewClient(cfg commonaws.ClientConfig, logger logging.Logger) (*Client, erro
return
}
dynamoClient := dynamodb.NewFromConfig(awsConfig)
clientRef = &Client{dynamoClient: dynamoClient, logger: logger.With("component", "DynamodbClient")}
clientRef = &client{dynamoClient: dynamoClient, logger: logger.With("component", "DynamodbClient")}
})
return clientRef, err
}

func (c *Client) DeleteTable(ctx context.Context, tableName string) error {
func (c *client) DeleteTable(ctx context.Context, tableName string) error {
_, err := c.dynamoClient.DeleteTable(ctx, &dynamodb.DeleteTableInput{
TableName: aws.String(tableName)})
if err != nil {
Expand All @@ -99,7 +121,7 @@ func (c *Client) DeleteTable(ctx context.Context, tableName string) error {
return nil
}

func (c *Client) PutItem(ctx context.Context, tableName string, item Item) (err error) {
func (c *client) PutItem(ctx context.Context, tableName string, item Item) (err error) {
_, err = c.dynamoClient.PutItem(ctx, &dynamodb.PutItemInput{
TableName: aws.String(tableName), Item: item,
})
Expand All @@ -109,7 +131,7 @@ func (c *Client) PutItem(ctx context.Context, tableName string, item Item) (err
return nil
}

func (c *Client) PutItemWithCondition(
func (c *client) PutItemWithCondition(
ctx context.Context,
tableName string,
item Item,
Expand All @@ -135,11 +157,11 @@ func (c *Client) PutItemWithCondition(

// PutItems puts items in batches of 25 items (which is a limit DynamoDB imposes)
// It returns the items that failed to be put.
func (c *Client) PutItems(ctx context.Context, tableName string, items []Item) ([]Item, error) {
func (c *client) PutItems(ctx context.Context, tableName string, items []Item) ([]Item, error) {
return c.writeItems(ctx, tableName, items, update)
}

func (c *Client) UpdateItem(ctx context.Context, tableName string, key Key, item Item) (Item, error) {
func (c *client) UpdateItem(ctx context.Context, tableName string, key Key, item Item) (Item, error) {
update := expression.UpdateBuilder{}
for itemKey, itemValue := range item {
// Ignore primary key updates
Expand Down Expand Up @@ -170,7 +192,7 @@ func (c *Client) UpdateItem(ctx context.Context, tableName string, key Key, item
return resp.Attributes, err
}

func (c *Client) UpdateItemWithCondition(
func (c *client) UpdateItemWithCondition(
ctx context.Context,
tableName string,
key Key,
Expand Down Expand Up @@ -214,7 +236,7 @@ func (c *Client) UpdateItemWithCondition(
}

// IncrementBy increments the attribute by the value for item that matches with the key
func (c *Client) IncrementBy(ctx context.Context, tableName string, key Key, attr string, value uint64) (Item, error) {
func (c *client) IncrementBy(ctx context.Context, tableName string, key Key, attr string, value uint64) (Item, error) {
// ADD numeric values
f, err := strconv.ParseFloat(strconv.FormatUint(value, 10), 64)
if err != nil {
Expand Down Expand Up @@ -243,7 +265,7 @@ func (c *Client) IncrementBy(ctx context.Context, tableName string, key Key, att
return resp.Attributes, nil
}

func (c *Client) GetItem(ctx context.Context, tableName string, key Key) (Item, error) {
func (c *client) GetItem(ctx context.Context, tableName string, key Key) (Item, error) {
resp, err := c.dynamoClient.GetItem(ctx, &dynamodb.GetItemInput{Key: key, TableName: aws.String(tableName)})
if err != nil {
return nil, err
Expand All @@ -254,7 +276,7 @@ func (c *Client) GetItem(ctx context.Context, tableName string, key Key) (Item,

// GetItems returns the items for the given keys
// Note: ordering of items is not guaranteed
func (c *Client) GetItems(ctx context.Context, tableName string, keys []Key) ([]Item, error) {
func (c *client) GetItems(ctx context.Context, tableName string, keys []Key) ([]Item, error) {
items, err := c.readItems(ctx, tableName, keys)
if err != nil {
return nil, err
Expand All @@ -264,7 +286,7 @@ func (c *Client) GetItems(ctx context.Context, tableName string, keys []Key) ([]
}

// QueryIndex returns all items in the index that match the given key
func (c *Client) QueryIndex(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpressionValues) ([]Item, error) {
func (c *client) QueryIndex(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpressionValues) ([]Item, error) {
response, err := c.dynamoClient.Query(ctx, &dynamodb.QueryInput{
TableName: aws.String(tableName),
IndexName: aws.String(indexName),
Expand All @@ -279,7 +301,7 @@ func (c *Client) QueryIndex(ctx context.Context, tableName string, indexName str
}

// Query returns all items in the primary index that match the given expression
func (c *Client) Query(ctx context.Context, tableName string, keyCondition string, expAttributeValues ExpressionValues) ([]Item, error) {
func (c *client) Query(ctx context.Context, tableName string, keyCondition string, expAttributeValues ExpressionValues) ([]Item, error) {
response, err := c.dynamoClient.Query(ctx, &dynamodb.QueryInput{
TableName: aws.String(tableName),
KeyConditionExpression: aws.String(keyCondition),
Expand All @@ -293,7 +315,7 @@ func (c *Client) Query(ctx context.Context, tableName string, keyCondition strin
}

// QueryWithInput is a wrapper for the Query function that allows for a custom query input
func (c *Client) QueryWithInput(ctx context.Context, input *dynamodb.QueryInput) ([]Item, error) {
func (c *client) QueryWithInput(ctx context.Context, input *dynamodb.QueryInput) ([]Item, error) {
response, err := c.dynamoClient.Query(ctx, input)
if err != nil {
return nil, err
Expand All @@ -302,7 +324,7 @@ func (c *Client) QueryWithInput(ctx context.Context, input *dynamodb.QueryInput)
}

// QueryIndexCount returns the count of the items in the index that match the given key
func (c *Client) QueryIndexCount(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpressionValues) (int32, error) {
func (c *client) QueryIndexCount(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpressionValues) (int32, error) {
response, err := c.dynamoClient.Query(ctx, &dynamodb.QueryInput{
TableName: aws.String(tableName),
IndexName: aws.String(indexName),
Expand All @@ -320,7 +342,7 @@ func (c *Client) QueryIndexCount(ctx context.Context, tableName string, indexNam
// QueryIndexWithPagination returns all items in the index that match the given key
// Results are limited to the given limit and the pagination token is returned
// When limit is 0, all items are returned
func (c *Client) QueryIndexWithPagination(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpressionValues, limit int32, exclusiveStartKey map[string]types.AttributeValue) (QueryResult, error) {
func (c *client) QueryIndexWithPagination(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpressionValues, limit int32, exclusiveStartKey map[string]types.AttributeValue) (QueryResult, error) {
var queryInput *dynamodb.QueryInput

// Fetch all items if limit is 0
Expand Down Expand Up @@ -362,7 +384,7 @@ func (c *Client) QueryIndexWithPagination(ctx context.Context, tableName string,
}, nil
}

func (c *Client) DeleteItem(ctx context.Context, tableName string, key Key) error {
func (c *client) DeleteItem(ctx context.Context, tableName string, key Key) error {
_, err := c.dynamoClient.DeleteItem(ctx, &dynamodb.DeleteItemInput{Key: key, TableName: aws.String(tableName)})
if err != nil {
return err
Expand All @@ -373,15 +395,15 @@ func (c *Client) DeleteItem(ctx context.Context, tableName string, key Key) erro

// DeleteItems deletes items in batches of 25 items (which is a limit DynamoDB imposes)
// It returns the items that failed to be deleted.
func (c *Client) DeleteItems(ctx context.Context, tableName string, keys []Key) ([]Key, error) {
func (c *client) DeleteItems(ctx context.Context, tableName string, keys []Key) ([]Key, error) {
return c.writeItems(ctx, tableName, keys, delete)
}

// writeItems writes items in batches of 25 items (which is a limit DynamoDB imposes)
// update and delete operations are supported.
// For update operation, requestItems is []Item.
// For delete operation, requestItems is []Key.
func (c *Client) writeItems(ctx context.Context, tableName string, requestItems []map[string]types.AttributeValue, operation batchOperation) ([]map[string]types.AttributeValue, error) {
func (c *client) writeItems(ctx context.Context, tableName string, requestItems []map[string]types.AttributeValue, operation batchOperation) ([]map[string]types.AttributeValue, error) {
startIndex := 0
failedItems := make([]map[string]types.AttributeValue, 0)
for startIndex < len(requestItems) {
Expand Down Expand Up @@ -422,7 +444,7 @@ func (c *Client) writeItems(ctx context.Context, tableName string, requestItems
return failedItems, nil
}

func (c *Client) readItems(ctx context.Context, tableName string, keys []Key) ([]Item, error) {
func (c *client) readItems(ctx context.Context, tableName string, keys []Key) ([]Item, error) {
startIndex := 0
items := make([]Item, 0)
for startIndex < len(keys) {
Expand Down Expand Up @@ -457,7 +479,7 @@ func (c *Client) readItems(ctx context.Context, tableName string, keys []Key) ([
}

// TableExists checks if a table exists and can be described
func (c *Client) TableExists(ctx context.Context, name string) error {
func (c *client) TableExists(ctx context.Context, name string) error {
if name == "" {
return errors.New("table name is empty")
}
Expand Down
2 changes: 1 addition & 1 deletion common/aws/dynamodb/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
var (
dockertestPool *dockertest.Pool
dockertestResource *dockertest.Resource
dynamoClient *commondynamodb.Client
dynamoClient commondynamodb.Client
clientConfig commonaws.ClientConfig

deployLocalStack bool
Expand Down
105 changes: 105 additions & 0 deletions common/aws/mock/dynamodb_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package mock

import (
"context"

"github.com/Layr-Labs/eigenda/common/aws/dynamodb"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression"
awsdynamodb "github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/stretchr/testify/mock"
)

type MockDynamoDBClient struct {
mock.Mock
}

var _ dynamodb.Client = (*MockDynamoDBClient)(nil)

func (c *MockDynamoDBClient) DeleteTable(ctx context.Context, tableName string) error {
args := c.Called()
return args.Error(0)
}

func (c *MockDynamoDBClient) PutItem(ctx context.Context, tableName string, item dynamodb.Item) error {
args := c.Called()
return args.Error(0)
}

func (c *MockDynamoDBClient) PutItemWithCondition(ctx context.Context, tableName string, item dynamodb.Item, condition string, expressionAttributeNames map[string]string, expressionAttributeValues map[string]types.AttributeValue) error {
args := c.Called()
return args.Error(0)
}

func (c *MockDynamoDBClient) PutItems(ctx context.Context, tableName string, items []dynamodb.Item) ([]dynamodb.Item, error) {
args := c.Called(ctx, tableName, items)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).([]dynamodb.Item), args.Error(1)
}

func (c *MockDynamoDBClient) UpdateItem(ctx context.Context, tableName string, key dynamodb.Key, item dynamodb.Item) (dynamodb.Item, error) {
args := c.Called()
return args.Get(0).(dynamodb.Item), args.Error(1)
}

func (c *MockDynamoDBClient) UpdateItemWithCondition(ctx context.Context, tableName string, key dynamodb.Key, item dynamodb.Item, condition expression.ConditionBuilder) (dynamodb.Item, error) {
args := c.Called()
return args.Get(0).(dynamodb.Item), args.Error(1)
}

func (c *MockDynamoDBClient) IncrementBy(ctx context.Context, tableName string, key dynamodb.Key, attr string, value uint64) (dynamodb.Item, error) {
args := c.Called()
return args.Get(0).(dynamodb.Item), args.Error(1)
}

func (c *MockDynamoDBClient) GetItem(ctx context.Context, tableName string, key dynamodb.Key) (dynamodb.Item, error) {
args := c.Called()
return args.Get(0).(dynamodb.Item), args.Error(1)
}

func (c *MockDynamoDBClient) GetItems(ctx context.Context, tableName string, keys []dynamodb.Key) ([]dynamodb.Item, error) {
args := c.Called()
return args.Get(0).([]dynamodb.Item), args.Error(1)
}

func (c *MockDynamoDBClient) QueryIndex(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues dynamodb.ExpressionValues) ([]dynamodb.Item, error) {
args := c.Called()
return args.Get(0).([]dynamodb.Item), args.Error(1)
}

func (c *MockDynamoDBClient) Query(ctx context.Context, tableName string, keyCondition string, expAttributeValues dynamodb.ExpressionValues) ([]dynamodb.Item, error) {
args := c.Called()
return args.Get(0).([]dynamodb.Item), args.Error(1)
}

func (c *MockDynamoDBClient) QueryWithInput(ctx context.Context, input *awsdynamodb.QueryInput) ([]dynamodb.Item, error) {
args := c.Called()
return args.Get(0).([]dynamodb.Item), args.Error(1)
}

func (c *MockDynamoDBClient) QueryIndexCount(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues dynamodb.ExpressionValues) (int32, error) {
args := c.Called()
return args.Get(0).(int32), args.Error(1)
}

func (c *MockDynamoDBClient) QueryIndexWithPagination(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues dynamodb.ExpressionValues, limit int32, exclusiveStartKey map[string]types.AttributeValue) (dynamodb.QueryResult, error) {
args := c.Called()
return args.Get(0).(dynamodb.QueryResult), args.Error(1)
}

func (c *MockDynamoDBClient) DeleteItem(ctx context.Context, tableName string, key dynamodb.Key) error {
args := c.Called()
return args.Error(0)
}

func (c *MockDynamoDBClient) DeleteItems(ctx context.Context, tableName string, keys []dynamodb.Key) ([]dynamodb.Key, error) {
args := c.Called()
return args.Get(0).([]dynamodb.Key), args.Error(1)
}

func (c *MockDynamoDBClient) TableExists(ctx context.Context, name string) error {
args := c.Called()
return args.Error(0)
}
File renamed without changes.
9 changes: 5 additions & 4 deletions common/aws/test/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@ package test

import (
"context"
"math/rand"
"os"
"testing"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/common/aws"
"github.com/Layr-Labs/eigenda/common/aws/mock"
"github.com/Layr-Labs/eigenda/common/aws/s3"
"github.com/Layr-Labs/eigenda/common/mock"
tu "github.com/Layr-Labs/eigenda/common/testutils"
"github.com/Layr-Labs/eigenda/inabox/deploy"
"github.com/ory/dockertest/v3"
"github.com/stretchr/testify/assert"
"math/rand"
"os"
"testing"
)

var (
Expand Down
4 changes: 2 additions & 2 deletions common/store/dynamo_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ import (
)

type dynamodbBucketStore[T any] struct {
client *commondynamodb.Client
client commondynamodb.Client
tableName string
}

func NewDynamoParamStore[T any](client *commondynamodb.Client, tableName string) common.KVStore[T] {
func NewDynamoParamStore[T any](client commondynamodb.Client, tableName string) common.KVStore[T] {
return &dynamodbBucketStore[T]{
client: client,
tableName: tableName,
Expand Down
2 changes: 1 addition & 1 deletion common/store/dynamo_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var (
deployLocalStack bool
localStackPort = "4566"

dynamoClient *dynamodb.Client
dynamoClient dynamodb.Client
dynamoParamStore common.KVStore[common.RateBucketParams]
bucketTableName = "BucketStore"
)
Expand Down
Loading

0 comments on commit 3453468

Please sign in to comment.