
Commit 2a14ac4

Merge upstream/main into add-discovery-datasets

2 parents dc6fed9 + f6aa23e · commit 2a14ac4

16 files changed (+192 −95 lines)

CHANGELOG.md

Lines changed: 3 additions & 1 deletion
@@ -9,10 +9,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Breaking changes

 ### Bugfixes
+* Fix spans used in SQL storage indexer (technical preview). [#1334](https://github.com/elastic/package-registry/pull/1334)

 ### Added
 * Add new method `MustParsePackage` to create new packages, running more validations. [#1333](https://github.com/elastic/package-registry/pull/1333)
-* Allow to customize SQL storage indexer configurations (technical preview). [#1337](https://github.com/elastic/package-registry/pull/1337)
+* Allow to customize settings related to SQL storage indexer (technical preview). [#1334](https://github.com/elastic/package-registry/pull/1334) [#1337](https://github.com/elastic/package-registry/pull/1337)
+* Cleanup SQL storage indexer backup database only when a new index version is downloaded (technical preview). [#1337](https://github.com/elastic/package-registry/pull/1337)
 * Add support to discover content packages based on the datasets defined in the discovery parameter. [#1338](https://github.com/elastic/package-registry/pull/1338)

 ### Deprecated

categories.go

Lines changed: 10 additions & 3 deletions
@@ -92,9 +92,7 @@ func categoriesHandlerWithProxyMode(logger *zap.Logger, indexer Indexer, proxyMo
            return
        }

-       cacheHeaders(w, cacheTime)
-       jsonHeader(w)
-       w.Write(data)
+       serveJSONResponse(r.Context(), w, cacheTime, data)
    }
 }

@@ -242,3 +240,12 @@ func getCategoriesOutput(ctx context.Context, categories map[string]*packages.Ca

    return util.MarshalJSONPretty(outputCategories)
 }
+
+func serveJSONResponse(ctx context.Context, w http.ResponseWriter, cacheTime time.Duration, data []byte) {
+   span, _ := apm.StartSpan(ctx, "Serve JSON Response", "app")
+   defer span.End()
+
+   cacheHeaders(w, cacheTime)
+   jsonHeader(w)
+   w.Write(data)
+}
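For context, a minimal sketch of how another handler in the same package could reuse the new helper. The handler name and payload are illustrative; only `serveJSONResponse` itself comes from this commit, and the stdlib imports (`encoding/json`, `net/http`, `time`) are assumed to already be present in the file.

```go
// Illustrative only: a handler in the same package delegating to the new helper.
func statusHandler(cacheTime time.Duration) func(w http.ResponseWriter, r *http.Request) {
    return func(w http.ResponseWriter, r *http.Request) {
        data, err := json.Marshal(map[string]string{"status": "ok"})
        if err != nil {
            http.Error(w, "internal error", http.StatusInternalServerError)
            return
        }
        // serveJSONResponse starts the "Serve JSON Response" APM span, sets the
        // cache and JSON headers, and writes the payload in one place.
        serveJSONResponse(r.Context(), w, cacheTime, data)
    }
}
```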

dev/docker-compose-epr-gcs.yml

Lines changed: 2 additions & 0 deletions
@@ -26,6 +26,8 @@ services:
       - EPR_DISABLE_PACKAGE_VALIDATION=true
       - EPR_ADDRESS=0.0.0.0:8080
       # - EPR_LOG_LEVEL=debug
+      # - EPR_SQL_INDEXER_READ_PACKAGES_BATCH_SIZE=2000
+      # - EPR_SQL_INDEXER_DB_INSERT_BATCH_SIZE=2000
       # - EPR_SQL_INDEXER_DATABASE_FOLDER_PATH=/tmp
       # - EPR_SQL_INDEXER_SEARCH_CACHE_SIZE=100
       # - EPR_SQL_INDEXER_SEARCH_CACHE_TTL=10m

dev/docker-compose-epr.yml

Lines changed: 2 additions & 0 deletions
@@ -19,6 +19,8 @@ services:
       - EPR_DISABLE_PACKAGE_VALIDATION=true
       - EPR_ADDRESS=0.0.0.0:8080
       # - EPR_LOG_LEVEL=debug
+      # - EPR_SQL_INDEXER_READ_PACKAGES_BATCH_SIZE=2000
+      # - EPR_SQL_INDEXER_DB_INSERT_BATCH_SIZE=2000
       # - EPR_SQL_INDEXER_DATABASE_FOLDER_PATH=/tmp
       # - EPR_SQL_INDEXER_SEARCH_CACHE_SIZE=100
       # - EPR_SQL_INDEXER_SEARCH_CACHE_TTL=10m

dev/launch_epr_service_storage_indexer.sh

Lines changed: 2 additions & 0 deletions
@@ -108,6 +108,8 @@ export EPR_CONFIG="${CONFIG_PATH}"
 # export EPR_SQL_INDEXER_DATABASE_FOLDER_PATH=/tmp
 # export EPR_SQL_INDEXER_SEARCH_CACHE_SIZE=100
 # export EPR_SQL_INDEXER_SEARCH_CACHE_TTL=10m
+# export EPR_SQL_INDEXER_READ_PACKAGES_BATCH_SIZE=2000
+# export EPR_SQL_INDEXER_DB_INSERT_BATCH_SIZE=2000

 ./package-registry


index.go

Lines changed: 1 addition & 3 deletions
@@ -26,8 +26,6 @@ func indexHandler(cacheTime time.Duration) (func(w http.ResponseWriter, r *http.
        return nil, err
    }
    return func(w http.ResponseWriter, r *http.Request) {
-       w.Header().Set("Content-Type", "application/json")
-       cacheHeaders(w, cacheTime)
-       w.Write(body)
+       serveJSONResponse(r.Context(), w, cacheTime, body)
    }, nil
 }

internal/database/memoryrepository.go

Lines changed: 7 additions & 3 deletions
@@ -12,7 +12,11 @@ import (
    _ "modernc.org/sqlite" // Import the SQLite driver
 )

-func NewMemorySQLDB(path string) (*SQLiteRepository, error) {
+type MemorySQLDBOptions struct {
+   Path string
+}
+
+func NewMemorySQLDB(options MemorySQLDBOptions) (*SQLiteRepository, error) {
    db, err := sql.Open("sqlite", ":memory:")
    if err != nil {
        return nil, fmt.Errorf("failed to open database: %w", err)
@@ -22,14 +26,14 @@ func NewMemorySQLDB(path string) (*SQLiteRepository, error) {
        return nil, fmt.Errorf("failed to connect to database: %w", err)
    }

-   dbRepo, err := newSQLiteRepository(db)
+   dbRepo, err := newSQLiteRepository(sqlDBOptions{db: db})
    if err != nil {
        return nil, fmt.Errorf("failed to create SQLite repository: %w", err)
    }

    if err := dbRepo.Initialize(context.Background()); err != nil {
        return nil, fmt.Errorf("failed to create database: %w", err)
    }
-   dbRepo.path = "memory-" + path
+   dbRepo.path = "memory-" + options.Path
    return dbRepo, nil
 }
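A minimal sketch of a call site migrating to the options-based constructor, as it might look inside the internal/database package. The "main" label is made up and only affects the reported path; only `NewMemorySQLDB`, `MemorySQLDBOptions`, and `File` appear in this commit.

```go
// Illustrative only: NewMemorySQLDB now takes an options struct instead of a
// bare path string; Path is only used to label the in-memory database.
func openMemoryRepo() (*SQLiteRepository, error) {
    repo, err := NewMemorySQLDB(MemorySQLDBOptions{Path: "main"})
    if err != nil {
        return nil, fmt.Errorf("creating in-memory repository: %w", err)
    }
    // repo.File(ctx) would now report "memory-main".
    return repo, nil
}
```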

internal/database/model.go

Lines changed: 1 addition & 0 deletions
@@ -5,6 +5,7 @@
 package database

 type Package struct {
+   Cursor        string
    Name          string
    Version       string
    FormatVersion string
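A small sketch of the model with the new field, using only the fields visible in this hunk; the values are made up and the remaining fields are omitted.

```go
// Illustrative only: each stored package now records the index cursor it was
// loaded from, in addition to its name and version.
var examplePkg = Package{
    Cursor:        "cursor-1",
    Name:          "nginx",
    Version:       "1.0.0",
    FormatVersion: "3.0.0",
}
```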

internal/database/sqliterepository.go

Lines changed: 64 additions & 21 deletions
@@ -29,6 +29,7 @@ type keyDefinition struct {
 }

 var keys = []keyDefinition{
+   {"cursor", "TEXT NOT NULL"},
    {"name", "TEXT NOT NULL"},
    {"version", "TEXT NOT NULL"},
    {"formatVersion", "TEXT NOT NULL"},
@@ -47,20 +48,25 @@ var keys = []keyDefinition{
 const defaultMaxBulkAddBatch = 2000

 type SQLiteRepository struct {
-   db              *sql.DB
-   path            string
-   maxBulkAddBatch int
-   numberFields    int
+   db                  *sql.DB
+   path                string
+   maxBulkAddBatchSize int
+   maxTotalArgs        int
 }

 var _ Repository = new(SQLiteRepository)

-func NewFileSQLDB(path string) (*SQLiteRepository, error) {
+type FileSQLDBOptions struct {
+   Path             string
+   BatchSizeInserts int
+}
+
+func NewFileSQLDB(options FileSQLDBOptions) (*SQLiteRepository, error) {
    // NOTE: Even using sqlcache (with Ristretto or Redis), data column needs to be processed (Unmarshalled)
    // again for all the Get queries performed, so there is no advantage in time of using sqlcache with SQLite
    // for our use case.

-   db, err := sql.Open("sqlite", path)
+   db, err := sql.Open("sqlite", options.Path)
    if err != nil {
        return nil, fmt.Errorf("failed to open database: %w", err)
    }
@@ -69,19 +75,27 @@ func NewFileSQLDB(path string) (*SQLiteRepository, error) {
        return nil, fmt.Errorf("failed to connect to database: %w", err)
    }

-   dbRepo, err := newSQLiteRepository(db)
+   dbRepo, err := newSQLiteRepository(sqlDBOptions{
+       db:               db,
+       batchSizeInserts: options.BatchSizeInserts,
+   })
    if err != nil {
        return nil, fmt.Errorf("failed to create SQLite repository: %w", err)
    }
    if err := dbRepo.Initialize(context.Background()); err != nil {
        return nil, fmt.Errorf("failed to create database: %w", err)
    }
-   dbRepo.path = path
+   dbRepo.path = options.Path

    return dbRepo, nil
 }

-func newSQLiteRepository(db *sql.DB) (*SQLiteRepository, error) {
+type sqlDBOptions struct {
+   db               *sql.DB
+   batchSizeInserts int
+}
+
+func newSQLiteRepository(options sqlDBOptions) (*SQLiteRepository, error) {
    // https://www.sqlite.org/pragma.html#pragma_journal_mode
    // Not observed any performance difference with WAL mode, so keeping it as DELETE mode for now.
    // if _, err = db.Exec("PRAGMA journal_mode = WAL;"); err != nil {
@@ -106,10 +120,14 @@ func newSQLiteRepository(db *sql.DB) (*SQLiteRepository, error) {
    // if _, err := db.Exec("PRAGMA cache_size = -10000;"); err != nil {
    //     return nil, fmt.Errorf("failed to update cache_size: %w", err)
    // }
+   batchSize := defaultMaxBulkAddBatch
+   if options.batchSizeInserts > 0 {
+       batchSize = options.batchSizeInserts
+   }
    return &SQLiteRepository{
-       db:              db,
-       maxBulkAddBatch: defaultMaxBulkAddBatch,
-       numberFields:    len(keys),
+       db:                  options.db,
+       maxBulkAddBatchSize: batchSize,
+       maxTotalArgs:        batchSize * len(keys),
    }, nil
 }

@@ -119,6 +137,7 @@ func (r *SQLiteRepository) File(ctx context.Context) string {

 func (r *SQLiteRepository) Ping(ctx context.Context) error {
    span, ctx := apm.StartSpan(ctx, "SQL: Ping", "app")
+   span.Context.SetLabel("database.path", r.File(ctx))
    defer span.End()
    if r.db == nil {
        return errors.New("database is not initialized")
@@ -131,14 +150,15 @@ func (r *SQLiteRepository) Ping(ctx context.Context) error {

 func (r *SQLiteRepository) Initialize(ctx context.Context) error {
    span, ctx := apm.StartSpan(ctx, "SQL: Initialize", "app")
+   span.Context.SetLabel("database.path", r.File(ctx))
    defer span.End()
    createQuery := strings.Builder{}
    createQuery.WriteString("CREATE TABLE IF NOT EXISTS ")
    createQuery.WriteString("packages (")
    for _, i := range keys {
        createQuery.WriteString(fmt.Sprintf("%s %s, ", i.Name, i.SQLType))
    }
-   createQuery.WriteString("PRIMARY KEY (name, version));")
+   createQuery.WriteString("PRIMARY KEY (cursor, name, version));")
    if _, err := r.db.ExecContext(ctx, createQuery.String()); err != nil {
        return err
    }
@@ -159,19 +179,18 @@ func (r *SQLiteRepository) Initialize(ctx context.Context) error {

 func (r *SQLiteRepository) BulkAdd(ctx context.Context, database string, pkgs []*Package) error {
    span, ctx := apm.StartSpan(ctx, "SQL: Insert batches", "app")
+   span.Context.SetLabel("insert.batch.size", r.maxBulkAddBatchSize)
+   span.Context.SetLabel("database.path", r.File(ctx))
    defer span.End()

    if len(pkgs) == 0 {
        return nil
    }

    totalProcessed := 0
-   args := make([]any, 0, r.maxBulkAddBatch*r.numberFields)
+   args := make([]any, 0, r.maxTotalArgs)
    for {
        read := 0
-       // reuse args slice
-       args = args[:0]
-
        var sb strings.Builder
        sb.WriteString("INSERT INTO ")
        sb.WriteString(database)
@@ -184,7 +203,7 @@ func (r *SQLiteRepository) BulkAdd(ctx context.Context, database string, pkgs []
        }
        sb.WriteString(") values")

-       endBatch := totalProcessed + r.maxBulkAddBatch
+       endBatch := totalProcessed + r.maxBulkAddBatchSize
        for i := totalProcessed; i < endBatch && i < len(pkgs); i++ {
            sb.WriteString("(")
            for j := range keys {
@@ -206,6 +225,7 @@ func (r *SQLiteRepository) BulkAdd(ctx context.Context, database string, pkgs []
            discoveryFields := addCommasToString(pkgs[i].DiscoveryFields)

            args = append(args,
+               pkgs[i].Cursor,
                pkgs[i].Name,
                pkgs[i].Version,
                pkgs[i].FormatVersion,
@@ -240,6 +260,9 @@ func (r *SQLiteRepository) BulkAdd(ctx context.Context, database string, pkgs []
        if totalProcessed >= len(pkgs) {
            break
        }
+
+       // reuse args slice
+       args = args[:0]
    }

    return nil
@@ -263,6 +286,7 @@ func addCommasToString(s string) string {

 func (r *SQLiteRepository) All(ctx context.Context, database string, whereOptions WhereOptions) ([]*Package, error) {
    span, ctx := apm.StartSpan(ctx, "SQL: Get All", "app")
+   span.Context.SetLabel("database.path", r.File(ctx))
    defer span.End()

    var all []*Package
@@ -276,6 +300,7 @@ func (r *SQLiteRepository) All(ctx context.Context, database string, whereOption

 func (r *SQLiteRepository) AllFunc(ctx context.Context, database string, whereOptions WhereOptions, process func(ctx context.Context, pkg *Package) error) error {
    span, ctx := apm.StartSpan(ctx, "SQL: Get All (process each package)", "app")
+   span.Context.SetLabel("database.path", r.File(ctx))
    defer span.End()

    useBaseData := whereOptions == nil || !whereOptions.UseFullData()
@@ -301,8 +326,6 @@ func (r *SQLiteRepository) AllFunc(ctx context.Context, database string, whereOp
        clause, whereArgs = whereOptions.Where()
        query.WriteString(clause)
    }
-   // TODO: remove debug
-   // fmt.Println("Query:", query.String())
    rows, err := r.db.QueryContext(ctx, query.String(), whereArgs...)
    if err != nil {
        return err
@@ -313,6 +336,7 @@ func (r *SQLiteRepository) AllFunc(ctx context.Context, database string, whereOp
    var pkg Package
    for rows.Next() {
        if err := rows.Scan(
+           &pkg.Cursor,
            &pkg.Name,
            &pkg.Version,
            &pkg.FormatVersion,
@@ -345,6 +369,7 @@ func (r *SQLiteRepository) AllFunc(ctx context.Context, database string, whereOp

 func (r *SQLiteRepository) Drop(ctx context.Context, table string) error {
    span, ctx := apm.StartSpan(ctx, "SQL: Drop", "app")
+   span.Context.SetLabel("database.path", r.File(ctx))
    defer span.End()
    query := fmt.Sprintf("DROP TABLE IF EXISTS %s", table)
    _, err := r.db.ExecContext(ctx, query)
@@ -370,16 +395,34 @@ type FilterOptions struct {
 type SQLOptions struct {
    Filter *FilterOptions

+   CurrentCursor string
+
    IncludeFullData bool // If true, the query will return the full data field instead of the base data field
 }

 func (o *SQLOptions) Where() (string, []any) {
-   if o == nil || o.Filter == nil {
+   if o == nil {
        return "", nil
    }
    var sb strings.Builder
    var args []any
+   // Always filter by cursor
+   if o.CurrentCursor != "" {
+       sb.WriteString("cursor = ?")
+       args = append(args, o.CurrentCursor)
+   }
+
+   if o.Filter == nil {
+       if sb.Len() == 0 {
+           return "", nil
+       }
+       return fmt.Sprintf(" WHERE %s", sb.String()), args
+   }
+
    if o.Filter.Type != "" {
+       if sb.Len() > 0 {
+           sb.WriteString(" AND ")
+       }
        sb.WriteString("type = ?")
        args = append(args, o.Filter.Type)
    }
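To make the new knobs concrete, a hedged sketch from inside the internal/database package. The path, batch size, and cursor values are made up; only the options structs, `NewFileSQLDB`, and the cursor-only path of `SQLOptions.Where` shown above come from this commit.

```go
// Illustrative only: open a file-backed repository with a custom insert batch
// size, then build the cursor-scoped WHERE clause used by queries.
func exampleOpenAndFilter() error {
    repo, err := NewFileSQLDB(FileSQLDBOptions{
        Path:             "/tmp/packages.db", // SQLite database file (made up)
        BatchSizeInserts: 2000,               // overrides defaultMaxBulkAddBatch when > 0
    })
    if err != nil {
        return err
    }
    _ = repo

    // Cursor-only path of Where(): rows are always scoped to the cursor they
    // were inserted with.
    opts := &SQLOptions{CurrentCursor: "cursor-1"}
    clause, args := opts.Where()
    _ = clause // " WHERE cursor = ?"
    _ = args   // []any{"cursor-1"}
    return nil
}
```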

internal/storage/index.go

Lines changed: 4 additions & 4 deletions
@@ -116,7 +116,7 @@ func LoadPackagesAndCursorFromIndex(ctx context.Context, logger *zap.Logger, sto
    return anIndex, storageCursor.Current, nil
 }

-func loadSearchIndexAllBatches(ctx context.Context, logger *zap.Logger, storageClient *storage.Client, bucketName, rootStoragePath string, aCursor cursor, batchSize int, process func(packages.Packages) error) error {
+func loadSearchIndexAllBatches(ctx context.Context, logger *zap.Logger, storageClient *storage.Client, bucketName, rootStoragePath string, aCursor cursor, batchSize int, process func(context.Context, packages.Packages, string) error) error {
    span, ctx := apm.StartSpan(ctx, "LoadSearchIndexAll", "app")
    span.Context.SetLabel("load.method", "batches")
    span.Context.SetLabel("load.batch.size", batchSize)
@@ -174,7 +174,7 @@ func loadSearchIndexAllBatches(ctx context.Context, logger *zap.Logger, storageC
            count++

            if count >= batchSize {
-               err = process(packages)
+               err = process(ctx, packages, aCursor.Current)
                if err != nil {
                    return fmt.Errorf("error processing batch of packages: %w", err)
                }
@@ -193,15 +193,15 @@ func loadSearchIndexAllBatches(ctx context.Context, logger *zap.Logger, storageC
        }
    }
    if len(packages) > 0 {
-       err = process(packages)
+       err = process(ctx, packages, aCursor.Current)
        if err != nil {
            return fmt.Errorf("error processing final batch of packages: %w", err)
        }
    }
    return nil
 }

-func LoadPackagesAndCursorFromIndexBatches(ctx context.Context, logger *zap.Logger, storageClient *storage.Client, storageBucketInternal, currentCursor string, batchSize int, process func(packages.Packages) error) (string, error) {
+func LoadPackagesAndCursorFromIndexBatches(ctx context.Context, logger *zap.Logger, storageClient *storage.Client, storageBucketInternal, currentCursor string, batchSize int, process func(context.Context, packages.Packages, string) error) (string, error) {
    bucketName, rootStoragePath, err := extractBucketNameFromURL(storageBucketInternal)
    if err != nil {
        return "", fmt.Errorf("can't extract bucket name from URL (url: %s): %w", storageBucketInternal, err)
