Skip to content

Commit

Permalink
Fixes 4934: module streams for introspected repositories
Browse files Browse the repository at this point in the history
  • Loading branch information
jlsherrill committed Dec 16, 2024
1 parent b78164d commit a3c0075
Show file tree
Hide file tree
Showing 20 changed files with 869 additions and 41 deletions.
2 changes: 1 addition & 1 deletion .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,6 @@ packages:
TemplateDao:
config:
filename: "templates_mock.go"
ModuleStreamsDao:
ModuleStreamDao:
config:
filename: "modules_streams_mock.go"
2 changes: 1 addition & 1 deletion db/migrations.latest
Original file line number Diff line number Diff line change
@@ -1 +1 @@
20241113084850
20241202091225
5 changes: 5 additions & 0 deletions db/migrations/20241202091225_AddModuleStreams.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
BEGIN;

DROP TABLE IF EXISTS module_streams, repositories_module_streams;

COMMIT;
45 changes: 45 additions & 0 deletions db/migrations/20241202091225_AddModuleStreams.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
BEGIN;

CREATE TABLE IF NOT EXISTS module_streams (
uuid UUID UNIQUE NOT NULL PRIMARY KEY,
created_at TIMESTAMP WITH TIME ZONE,
updated_at TIMESTAMP WITH TIME ZONE,
name text NOT NULL,
stream text NOT NULL,
version text NOT NULL,
context text NOT NULL,
arch text NOT NULL,
summary text NOT NULL,
description text NOT NULL,
package_names text[] NOT NULL,
packages text[] NOT NULL,
hash_value text NOT NULL,
profiles jsonb NOT NULL DEFAULT '{}'::jsonb
);

CREATE TABLE IF NOT EXISTS repositories_module_streams (
repository_uuid UUID NOT NULL,
module_stream_uuid UUID NOT NULL
);

ALTER TABLE ONLY repositories_module_streams
DROP CONSTRAINT IF EXISTS repositories_module_streams_pkey,
ADD CONSTRAINT repositories_module_streams_pkey PRIMARY KEY (repository_uuid, module_stream_uuid);

ALTER TABLE ONLY repositories_module_streams
DROP CONSTRAINT IF EXISTS fk_repositories_module_streams_mstream,
ADD CONSTRAINT fk_repositories_module_streams_mstream
FOREIGN KEY (module_stream_uuid) REFERENCES module_streams(uuid)
ON DELETE CASCADE;

ALTER TABLE ONLY repositories_module_streams
DROP CONSTRAINT IF EXISTS fk_repositories_module_streams_repository,
ADD CONSTRAINT fk_repositories_module_streams_repository
FOREIGN KEY (repository_uuid) REFERENCES repositories(uuid)
ON DELETE CASCADE;

ALTER TABLE ONLY module_streams
DROP CONSTRAINT IF EXISTS fk_module_streams_uniq,
ADD CONSTRAINT fk_module_streams_uniq UNIQUE (hash_value);

COMMIT;
8 changes: 7 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
module github.com/content-services/content-sources-backend

go 1.22.7
go 1.23

toolchain go1.23.4

replace github.com/content-services/yummy => /home/jlsherri/git/yummy/

replace github.com/content-services/tang => /home/jlsherri/git/tang/

require (
github.com/ProtonMail/go-crypto v1.1.3
Expand Down
8 changes: 8 additions & 0 deletions pkg/api/module_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ type SearchSnapshotModuleStreamsRequest struct {
Search string `json:"search"` // Search string to search rpm names
}

type SearchModuleStreamsRequest struct {
UUIDs []string `json:"uuids" validate:"required"` // List of repository UUIDs to search
URLs []string `json:"urls" validate:"required"` // List of repository URLs to search
RpmNames []string `json:"rpm_names" validate:"required"` // List of rpm names to search
SortBy string `json:"sort_by"` // SortBy sets the sort order of the result
Search string `json:"search"` // Search string to search rpm names
}

type Stream struct {
Name string `json:"name"` // Name of the module
Stream string `json:"stream"` // Module stream version
Expand Down
13 changes: 8 additions & 5 deletions pkg/dao/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ type DaoRegistry struct {
AdminTask AdminTaskDao
Domain DomainDao
PackageGroup PackageGroupDao
ModuleStream ModuleStreamDao
Environment EnvironmentDao
Template TemplateDao
ModuleStreams ModuleStreamsDao
}

func GetDaoRegistry(db *gorm.DB) *DaoRegistry {
Expand All @@ -37,9 +37,9 @@ func GetDaoRegistry(db *gorm.DB) *DaoRegistry {
Rpm: &rpmDaoImpl{
db: db,
},
ModuleStreams: &moduleStreamsImpl{db: db},
Repository: repositoryDaoImpl{db: db},
Metrics: metricsDaoImpl{db: db},
ModuleStream: &moduleStreamsImpl{db: db},
Repository: repositoryDaoImpl{db: db},
Metrics: metricsDaoImpl{db: db},
Snapshot: &snapshotDaoImpl{
db: db,
pulpClient: pulp_client.GetPulpClientWithDomain(""),
Expand Down Expand Up @@ -76,8 +76,11 @@ type RepositoryConfigDao interface {
BulkImport(ctx context.Context, reposToImport []api.RepositoryRequest) ([]api.RepositoryImportResponse, []error)
}

type ModuleStreamsDao interface {
type ModuleStreamDao interface {
SearchRepositoryModuleStreams(ctx context.Context, orgID string, request api.SearchModuleStreamsRequest) (api.SearchModuleStreamsCollectionResponse, error)
SearchSnapshotModuleStreams(ctx context.Context, orgID string, request api.SearchSnapshotModuleStreamsRequest) (api.SearchModuleStreamsCollectionResponse, error)
InsertForRepository(ctx context.Context, repoUuid string, pkgGroups []yum.ModuleMD) (int64, error)
OrphanCleanup(ctx context.Context) error
}

type RpmDao interface {
Expand Down
237 changes: 232 additions & 5 deletions pkg/dao/module_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,115 @@ package dao
import (
"context"
"fmt"
"strings"

"github.com/content-services/content-sources-backend/pkg/api"
"github.com/content-services/content-sources-backend/pkg/config"
ce "github.com/content-services/content-sources-backend/pkg/errors"
"github.com/content-services/content-sources-backend/pkg/models"
"github.com/content-services/tang/pkg/tangy"
"github.com/content-services/yummy/pkg/yum"
"github.com/lib/pq"
"golang.org/x/exp/slices"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

func GetModuleStreamsDao(db *gorm.DB) ModuleStreamDao {
// Return DAO instance
return &moduleStreamsImpl{db: db}
}

type moduleStreamsImpl struct {
db *gorm.DB
}

func GetModuleStreamsDao(db *gorm.DB) ModuleStreamsDao {
// Return DAO instance
return &moduleStreamsImpl{
db: db,
func (r *moduleStreamsImpl) SearchRepositoryModuleStreams(ctx context.Context, orgID string, request api.SearchModuleStreamsRequest) (api.SearchModuleStreamsCollectionResponse, error) {
if orgID == "" {
return api.SearchModuleStreamsCollectionResponse{}, fmt.Errorf("orgID can not be an empty string")
}
dbWithCtx := r.db.WithContext(ctx)
if request.RpmNames == nil {
request.RpmNames = []string{}
}
if len(request.UUIDs) == 0 && len(request.URLs) == 0 {
return api.SearchModuleStreamsCollectionResponse{}, &ce.DaoError{
BadValidation: true,
Message: "must contain at least 1 Repository UUID or URL",
}
}

uuids := []string{}
if request.UUIDs != nil {
uuids = request.UUIDs
}

urls := []string{}
for _, url := range request.URLs {
url = models.CleanupURL(url)
urls = append(urls, url)
}

uuidsValid, urlsValid, uuid, url := checkForValidRepoUuidsUrls(ctx, uuids, urls, r.db)
if !uuidsValid {
return api.SearchModuleStreamsCollectionResponse{}, &ce.DaoError{
NotFound: true,
Message: "Could not find repository with UUID: " + uuid,
}
}
if !urlsValid {
return api.SearchModuleStreamsCollectionResponse{}, &ce.DaoError{
NotFound: true,
Message: "Could not find repository with URL: " + url,
}
}

streams := []models.ModuleStream{}

newestStreams := dbWithCtx.Model(&models.ModuleStream{}).
Select("DISTINCT ON (name, stream) uuid").
Joins("inner join repositories_module_streams on module_streams.uuid = repositories_module_streams.module_stream_uuid").
Where("repositories_module_streams.repository_uuid in (?)", readableRepositoryQuery(dbWithCtx, orgID, urls, uuids))

if len(request.RpmNames) > 0 {
// we are checking if two arrays have things in common, so we have to conver to pq array type
newestStreams = newestStreams.Where("module_streams.package_names && ?", pq.Array(request.RpmNames))
}
if request.Search != "" {
newestStreams = newestStreams.Where("module_streams.name ilike ?", fmt.Sprintf("%%%s%%", request.Search))
}
newestStreams = newestStreams.Order("name, stream, version DESC")

order := convertSortByToSQL(request.SortBy, map[string]string{"name": "name"}, "name asc")
result := dbWithCtx.Model(&models.ModuleStream{}).Where("uuid in (?)", newestStreams).Order(fmt.Sprintf("%v, stream", order)).Find(&streams)

if result.Error != nil {
return api.SearchModuleStreamsCollectionResponse{}, result.Error
}
return ModuleStreamsToCollectionResponse(streams), nil
}

func ModuleStreamsToCollectionResponse(modules []models.ModuleStream) (response api.SearchModuleStreamsCollectionResponse) {
mapping := make(map[string][]api.Stream)
for _, mod := range modules {
mapping[mod.Name] = append(mapping[mod.Name], api.Stream{
Name: mod.Name,
Stream: mod.Stream,
Context: mod.Context,
Arch: mod.Arch,
Version: mod.Version,
Description: mod.Description,
Profiles: mod.Profiles,
})
}

for k, v := range mapping {
response.Data = append(response.Data, api.SearchModuleStreams{
ModuleName: k,
Streams: v,
})
}
return response
}

func (r *moduleStreamsImpl) SearchSnapshotModuleStreams(ctx context.Context, orgID string, request api.SearchSnapshotModuleStreamsRequest) (api.SearchModuleStreamsCollectionResponse, error) {
Expand All @@ -31,7 +123,7 @@ func (r *moduleStreamsImpl) SearchSnapshotModuleStreams(ctx context.Context, org
request.RpmNames = []string{}
}

if request.UUIDs == nil || len(request.UUIDs) == 0 {
if len(request.UUIDs) == 0 {
return api.SearchModuleStreamsCollectionResponse{}, &ce.DaoError{
BadValidation: true,
Message: "must contain at least 1 snapshot UUID",
Expand Down Expand Up @@ -102,3 +194,138 @@ func (r *moduleStreamsImpl) SearchSnapshotModuleStreams(ctx context.Context, org
},
}, nil
}

func (r moduleStreamsImpl) fetchRepo(ctx context.Context, uuid string) (models.Repository, error) {
found := models.Repository{}
if err := r.db.WithContext(ctx).
Where("UUID = ?", uuid).
First(&found).
Error; err != nil {
return found, err
}
return found, nil
}

// Converts an rpm NVREA into just the name
func extractRpmName(nvrea string) string {
// rubygem-bson-debugsource-0:4.3.0-2.module+el8.1.0+3656+f80bfa1d.x86_64
split := strings.Split(nvrea, "-")
if len(split) < 3 {
return nvrea
}
split = split[0 : len(split)-2]
return strings.Join(split, "-")
}

func ModuleMdToModuleStreams(moduleMds []yum.ModuleMD) (moduleStreams []models.ModuleStream) {
for _, m := range moduleMds {
mStream := models.ModuleStream{
Name: m.Data.Name,
Stream: m.Data.Stream,
Version: m.Data.Version,
Context: m.Data.Context,
Arch: m.Data.Arch,
Summary: m.Data.Summary,
Description: m.Data.Description,
Profiles: map[string][]string{},
PackageNames: []string{},
Packages: m.Data.Artifacts.Rpms,
}
for _, p := range m.Data.Artifacts.Rpms {
mStream.PackageNames = append(mStream.PackageNames, extractRpmName(p))
}
slices.Sort(mStream.PackageNames) // Sort the package names so the hash is consistent
mStream.HashValue = generateHash(mStream.ToHashString())
for pName, p := range m.Data.Profiles {
mStream.Profiles[pName] = p.Rpms
}

moduleStreams = append(moduleStreams, mStream)
}
return moduleStreams
}

// InsertForRepository inserts a set of yum module streams for a given repository
// and removes any that are not in the list. This will involve inserting the package groups
// if not present, and adding or removing any associations to the Repository
// Returns a count of new package groups added to the system (not the repo), as well as any error
func (r moduleStreamsImpl) InsertForRepository(ctx context.Context, repoUuid string, modules []yum.ModuleMD) (int64, error) {
var (
err error
repo models.Repository
)
ctxDb := r.db.WithContext(ctx)

// Retrieve Repository record
if repo, err = r.fetchRepo(ctx, repoUuid); err != nil {
return 0, fmt.Errorf("failed to fetchRepo: %w", err)
}

moduleStreams := ModuleMdToModuleStreams(modules)

err = ctxDb.Model(&models.ModuleStream{}).Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "hash_value"}},
DoNothing: true}).
Create(moduleStreams).Error
if err != nil {
return 0, fmt.Errorf("failed to insert module streams: %w", err)
}

hashes := make([]string, len(moduleStreams))
for _, m := range moduleStreams {
hashes = append(hashes, m.HashValue)
}
uuids := make([]string, len(moduleStreams))

// insert any modules streams, ignoring any hash conflicts
if err = r.db.WithContext(ctx).
Where("hash_value in (?)", hashes).
Model(&models.ModuleStream{}).
Pluck("uuid", &uuids).Error; err != nil {
return 0, fmt.Errorf("failed retrieving existing ids in module_streams: %w", err)
}

// Delete repository module stream entries not needed
err = r.deleteUnneeded(ctx, repo, uuids)
if err != nil {
return 0, fmt.Errorf("failed to delete unneeded module streams: %w", err)
}

// Add any needed repo module stream entries
repoModStreams := make([]models.RepositoryModuleStream, len(moduleStreams))
for i, uuid := range uuids {
repoModStreams[i] = models.RepositoryModuleStream{
RepositoryUUID: repo.UUID,
ModuleStreamUUID: uuid,
}
}
err = ctxDb.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "repository_uuid"}, {Name: "module_stream_uuid"}},
DoNothing: true}).
Create(repoModStreams).Error
if err != nil {
return 0, fmt.Errorf("failed to insert repo module streams: %w", err)
}
return int64(len(repoModStreams)), nil
}

// deleteUnneeded removes any RepositoryPackageGroup entries that are not in the list of package_group_uuids
func (r moduleStreamsImpl) deleteUnneeded(ctx context.Context, repo models.Repository, moduleStreamUUIDs []string) error {
if err := r.db.WithContext(ctx).Model(&models.RepositoryModuleStream{}).
Where("repository_uuid = ?", repo.UUID).
Where("module_stream_uuid NOT IN (?)", moduleStreamUUIDs).
Error; err != nil {
return err
}
return nil
}

func (r moduleStreamsImpl) OrphanCleanup(ctx context.Context) error {
if err := r.db.WithContext(ctx).
Model(&models.ModuleStream{}).
Where("NOT EXISTS (select from repositories_module_streams where module_streams.uuid = repositories_module_streams.module_stream_uuid )").
Delete(&models.ModuleStream{}).Error; err != nil {
return err
}
return nil
}
Loading

0 comments on commit a3c0075

Please sign in to comment.