Skip to content

Commit

Permalink
move index storage into interface (#1741)
Browse files Browse the repository at this point in the history
* move index storage into interface

Signed-off-by: Bob Callaway <bcallaway@google.com>

* fix nits, add unit tests

Signed-off-by: Bob Callaway <bcallaway@google.com>

---------

Signed-off-by: Bob Callaway <bcallaway@google.com>
  • Loading branch information
bobcallaway authored Oct 9, 2023
1 parent bcd004b commit 7fa3cd5
Show file tree
Hide file tree
Showing 8 changed files with 229 additions and 21 deletions.
2 changes: 2 additions & 0 deletions cmd/rekor-server/app/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ Memory and file-based signers should only be used for testing.`)

rootCmd.PersistentFlags().Bool("enable_retrieve_api", true, "enables Redis-based index API endpoint")
_ = rootCmd.PersistentFlags().MarkDeprecated("enable_retrieve_api", "this flag is deprecated in favor of enabled_api_endpoints (searchIndex)")
rootCmd.PersistentFlags().String("search_index.storage_provider", "redis",
`Index Storage provider to use. Valid options are: [redis].`)
rootCmd.PersistentFlags().String("redis_server.address", "127.0.0.1", "Redis server address")
rootCmd.PersistentFlags().Uint16("redis_server.port", 6379, "Redis server port")

Expand Down
24 changes: 15 additions & 9 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/sigstore/rekor/pkg/indexstorage"
"github.com/sigstore/rekor/pkg/log"
"github.com/sigstore/rekor/pkg/pubsub"
"github.com/sigstore/rekor/pkg/sharding"
Expand Down Expand Up @@ -145,9 +146,10 @@ func NewAPI(treeID uint) (*API, error) {
}

var (
api *API
storageClient storage.AttestationStorage
redisClient *redis.Client
api *API
attestationStorageClient storage.AttestationStorage
indexStorageClient indexstorage.IndexStorage
redisClient *redis.Client
)

func ConfigureAPI(treeID uint) {
Expand All @@ -159,21 +161,25 @@ func ConfigureAPI(treeID uint) {
}
if viper.GetBool("enable_retrieve_api") || viper.GetBool("enable_stable_checkpoint") ||
slices.Contains(viper.GetStringSlice("enabled_api_endpoints"), "searchIndex") {
redisClient = redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%v:%v", viper.GetString("redis_server.address"), viper.GetUint64("redis_server.port")),
Network: "tcp",
DB: 0, // default DB
})
indexStorageClient, err = indexstorage.NewIndexStorage(viper.GetString("search_index.storage_provider"))
if err != nil {
log.Logger.Panic(err)
}
}

if viper.GetBool("enable_attestation_storage") {
storageClient, err = storage.NewAttestationStorage()
attestationStorageClient, err = storage.NewAttestationStorage()
if err != nil {
log.Logger.Panic(err)
}
}

if viper.GetBool("enable_stable_checkpoint") {
redisClient = redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%v:%v", viper.GetString("redis_server.address"), viper.GetUint64("redis_server.port")),
Network: "tcp",
DB: 0, // default DB
})
checkpointPublisher := witness.NewCheckpointPublisher(context.Background(), api.logClient, api.logRanges.ActiveTreeID(),
viper.GetString("rekor_server.hostname"), api.signer, redisClient, viper.GetUint("publish_frequency"), CheckpointPublishCount)

Expand Down
6 changes: 3 additions & 3 deletions pkg/api/entries.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,14 @@ func logEntryFromLeaf(ctx context.Context, signer signature.Signer, _ trilliancl
attKey := entryWithAtt.AttestationKey()
// if we're given a key by the type logic, let's try that first
if attKey != "" {
att, fetchErr = storageClient.FetchAttestation(ctx, attKey)
att, fetchErr = attestationStorageClient.FetchAttestation(ctx, attKey)
if fetchErr != nil {
log.ContextLogger(ctx).Debugf("error fetching attestation by key, trying by UUID: %s %v", attKey, fetchErr)
}
}
// if looking up by key failed or we weren't able to generate a key, try looking up by uuid
if attKey == "" || fetchErr != nil {
att, fetchErr = storageClient.FetchAttestation(ctx, entryIDstruct.UUID)
att, fetchErr = attestationStorageClient.FetchAttestation(ctx, entryIDstruct.UUID)
if fetchErr != nil {
log.ContextLogger(ctx).Debugf("error fetching attestation by uuid: %s %v", entryIDstruct.UUID, fetchErr)
}
Expand Down Expand Up @@ -244,7 +244,7 @@ func createLogEntry(params entries.CreateLogEntryParams) (models.LogEntry, middl
IntegratedTime: swag.Int64(queuedLeaf.IntegrateTimestamp.AsTime().Unix()),
}

if redisClient != nil {
if indexStorageClient != nil {
go func() {
keys, err := entry.IndexKeys()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const (
malformedUUID = "UUID must be a 64-character hexadecimal string"
malformedPublicKey = "public key provided could not be parsed"
failedToGenerateCanonicalKey = "error generating canonicalized public key"
redisUnexpectedResult = "unexpected result from searching index"
indexStorageUnexpectedResult = "unexpected result from searching index"
lastSizeGreaterThanKnown = "the tree size requested(%d) was greater than what is currently observable(%d)"
signingError = "error signing"
sthGenerateError = "error generating signed tree head"
Expand Down
16 changes: 8 additions & 8 deletions pkg/api/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ func SearchIndexHandler(params index.SearchIndexParams) middleware.Responder {
if params.Query.Hash != "" {
// This must be a valid hash
sha := util.PrefixSHA(params.Query.Hash)
resultUUIDs, err := redisClient.LRange(httpReqCtx, strings.ToLower(sha), 0, -1).Result()
resultUUIDs, err := indexStorageClient.LookupIndices(httpReqCtx, strings.ToLower(sha))
if err != nil {
return handleRekorAPIError(params, http.StatusInternalServerError, fmt.Errorf("redis client: %w", err), redisUnexpectedResult)
return handleRekorAPIError(params, http.StatusInternalServerError, fmt.Errorf("index storage error: %w", err), indexStorageUnexpectedResult)
}
result.Add(resultUUIDs)
}
Expand All @@ -72,16 +72,16 @@ func SearchIndexHandler(params index.SearchIndexParams) middleware.Responder {
}

keyHash := sha256.Sum256(canonicalKey)
resultUUIDs, err := redisClient.LRange(httpReqCtx, strings.ToLower(hex.EncodeToString(keyHash[:])), 0, -1).Result()
resultUUIDs, err := indexStorageClient.LookupIndices(httpReqCtx, strings.ToLower(hex.EncodeToString(keyHash[:])))
if err != nil {
return handleRekorAPIError(params, http.StatusInternalServerError, fmt.Errorf("redis client: %w", err), redisUnexpectedResult)
return handleRekorAPIError(params, http.StatusInternalServerError, fmt.Errorf("index storage error: %w", err), indexStorageUnexpectedResult)
}
result.Add(resultUUIDs)
}
if params.Query.Email != "" {
resultUUIDs, err := redisClient.LRange(httpReqCtx, strings.ToLower(params.Query.Email.String()), 0, -1).Result()
resultUUIDs, err := indexStorageClient.LookupIndices(httpReqCtx, strings.ToLower(params.Query.Email.String()))
if err != nil {
return handleRekorAPIError(params, http.StatusInternalServerError, fmt.Errorf("redis client: %w", err), redisUnexpectedResult)
return handleRekorAPIError(params, http.StatusInternalServerError, fmt.Errorf("index storage error: %w", err), indexStorageUnexpectedResult)
}
result.Add(resultUUIDs)
}
Expand All @@ -100,15 +100,15 @@ func SearchIndexNotImplementedHandler(_ index.SearchIndexParams) middleware.Resp
}

func addToIndex(ctx context.Context, key, value string) error {
_, err := redisClient.LPush(ctx, key, value).Result()
err := indexStorageClient.WriteIndex(ctx, key, value)
if err != nil {
return fmt.Errorf("redis client: %w", err)
}
return nil
}

func storeAttestation(ctx context.Context, uuid string, attestation []byte) error {
return storageClient.StoreAttestation(ctx, uuid, attestation)
return attestationStorageClient.StoreAttestation(ctx, uuid, attestation)
}

// Uniq is a collection of unique elements.
Expand Down
38 changes: 38 additions & 0 deletions pkg/indexstorage/indexstorage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2023 The Sigstore Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package indexstorage

import (
"context"
"fmt"

"github.com/sigstore/rekor/pkg/indexstorage/redis"
"github.com/spf13/viper"
)

type IndexStorage interface {
LookupIndices(context.Context, string) ([]string, error) // Returns indices for specified key
WriteIndex(context.Context, string, string) error // Writes index for specified key
}

// NewIndexStorage instantiates a new IndexStorage provider based on the requested type
func NewIndexStorage(providerType string) (IndexStorage, error) {
switch providerType {
case redis.ProviderType:
return redis.NewProvider(viper.GetString("redis_server.address"), viper.GetString("redis_server.port"))
default:
return nil, fmt.Errorf("invalid index storage provider type: %v", providerType)
}
}
62 changes: 62 additions & 0 deletions pkg/indexstorage/redis/redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2023 The Sigstore Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package redis

import (
"context"
"errors"
"fmt"
"strings"

redis "github.com/redis/go-redis/v9"
)

const ProviderType = "redis"

// IndexStorageProvider implements indexstorage.IndexStorage
type IndexStorageProvider struct {
client *redis.Client
}

func NewProvider(address, port string) (*IndexStorageProvider, error) {
provider := &IndexStorageProvider{}
provider.client = redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%v:%v", address, port),
Network: "tcp",
DB: 0, // default DB
})
return provider, nil
}

// LookupIndices looks up and returns all indices for the specified key. The key value will be canonicalized
// by converting all characters into a lowercase value before looking up in Redis
func (isp *IndexStorageProvider) LookupIndices(ctx context.Context, key string) ([]string, error) {
if isp.client == nil {
return []string{}, errors.New("redis client has not been initialized")
}
return isp.client.LRange(ctx, strings.ToLower(key), 0, -1).Result()
}

// WriteIndex adds the index for the specified key. The key value will be canonicalized
// by converting all characters into a lowercase value before appending the index in Redis
func (isp *IndexStorageProvider) WriteIndex(ctx context.Context, key, index string) error {
if isp.client == nil {
return errors.New("redis client has not been initialized")
}
if _, err := isp.client.LPush(ctx, strings.ToLower(key), index).Result(); err != nil {
return fmt.Errorf("redis client: %w", err)
}
return nil
}
100 changes: 100 additions & 0 deletions pkg/indexstorage/redis/redis_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2023 The Sigstore Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package redis

import (
"context"
"errors"
"testing"

"github.com/go-redis/redismock/v9"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"go.uber.org/goleak"
)

func TestLookupIndices(t *testing.T) {
key := "87c1b129fbadd7b6e9abc0a9ef7695436d767aece042bec198a97e949fcbe14c"
value := []string{"1e1f2c881ae0608ec77ebf88a75c66d3099113a7343238f2f7a0ebb91a4ed335"}
redisClient, mock := redismock.NewClientMock()
mock.Regexp().ExpectLRange(key, 0, -1).SetVal(value)

isp := IndexStorageProvider{redisClient}

indices, err := isp.LookupIndices(context.Background(), key)
if err != nil {
t.Error(err)
}

less := func(a, b string) bool { return a < b }
if cmp.Diff(value, indices, cmpopts.SortSlices(less)) != "" {
t.Errorf("expected %s, got %s", value, indices)
}

if err := mock.ExpectationsWereMet(); err != nil {
t.Error(err)
}

mock.ClearExpect()
errRedis := errors.New("redis error")
mock.Regexp().ExpectLRange(key, 0, -1).SetErr(errRedis)
if _, err := isp.LookupIndices(context.Background(), key); err == nil {
t.Error("unexpected success")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Error(err)
}
}

func TestWriteIndex(t *testing.T) {
key := "87c1b129fbadd7b6e9abc0a9ef7695436d767aece042bec198a97e949fcbe14c"
value := []string{"1e1f2c881ae0608ec77ebf88a75c66d3099113a7343238f2f7a0ebb91a4ed335"}
redisClient, mock := redismock.NewClientMock()
mock.Regexp().ExpectLPush(key, value).SetVal(1)

isp := IndexStorageProvider{redisClient}
if err := isp.WriteIndex(context.Background(), key, value[0]); err != nil {
t.Error(err)
}

if err := mock.ExpectationsWereMet(); err != nil {
t.Error(err)
}

mock.ClearExpect()
errRedis := errors.New("redis error")
mock.Regexp().ExpectLPush(key, value).SetErr(errRedis)
if err := isp.WriteIndex(context.Background(), key, value[0]); err == nil {
t.Error("unexpected success")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Error(err)
}
}

func TestUninitializedClient(t *testing.T) {
// this is not initialized with a real Redis client
isp := IndexStorageProvider{}
if _, err := isp.LookupIndices(context.Background(), "key"); err == nil {
t.Error("unexpected success")
}
if err := isp.WriteIndex(context.Background(), "key", "value"); err == nil {
t.Error("unexpected success")
}
}

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}

0 comments on commit 7fa3cd5

Please sign in to comment.