Skip to content

Commit

Permalink
cleanup arango backend, add ingestion of builder, add support for sou…
Browse files Browse the repository at this point in the history
…rce ingestion for Occurrences, return IDs during ingestion and query (#1048)

* add ingest builder

Signed-off-by: pxp928 <parth.psu@gmail.com>

* add ingest and query builder for arango

Signed-off-by: pxp928 <parth.psu@gmail.com>

* reorg and cleanup code

Signed-off-by: pxp928 <parth.psu@gmail.com>

* add constants for collections

Signed-off-by: pxp928 <parth.psu@gmail.com>

---------

Signed-off-by: pxp928 <parth.psu@gmail.com>
  • Loading branch information
pxp928 authored Jul 12, 2023
1 parent 97bb3d4 commit 2b9306d
Show file tree
Hide file tree
Showing 23 changed files with 2,012 additions and 1,372 deletions.
93 changes: 30 additions & 63 deletions pkg/assembler/backends/arangodb/artifact.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ import (

func (c *arangoClient) Artifacts(ctx context.Context, artifactSpec *model.ArtifactSpec) ([]*model.Artifact, error) {
values := map[string]any{}
arangoQueryBuilder := newForQuery("artifacts", "art")
arangoQueryBuilder := newForQuery(artifactsStr, "art")
if artifactSpec.Algorithm != nil {
arangoQueryBuilder.filter("algorithm", "art", "==", "@algorithm")
arangoQueryBuilder.filter("art", "algorithm", "==", "@algorithm")
values["algorithm"] = strings.ToLower(*artifactSpec.Algorithm)
}
if artifactSpec.Digest != nil {
arangoQueryBuilder.filter("digest", "art", "==", "@digest")
arangoQueryBuilder.filter("art", "digest", "==", "@digest")
values["digest"] = strings.ToLower(*artifactSpec.Digest)
}
arangoQueryBuilder.query.WriteString("\n")
Expand All @@ -46,39 +46,24 @@ func (c *arangoClient) Artifacts(ctx context.Context, artifactSpec *model.Artifa
fmt.Println(arangoQueryBuilder.string())
cursor, err := executeQueryWithRetry(ctx, c.db, arangoQueryBuilder.string(), values, "Artifacts")
if err != nil {
return nil, fmt.Errorf("failed to create vertex documents: %w", err)
return nil, fmt.Errorf("failed to query for artifacts: %w", err)
}
defer cursor.Close()

var collectedArtifacts []*model.Artifact
for {
var doc *model.Artifact
_, err := cursor.ReadDocument(ctx, &doc)
if err != nil {
if driver.IsNoMoreDocuments(err) {
break
} else {
return nil, fmt.Errorf("failed to query artifact: %w", err)
}
} else {
collectedArtifacts = append(collectedArtifacts, doc)
}
}
return getArtifacts(ctx, cursor)
}

return collectedArtifacts, nil
func getArtifactQueryValues(artifact *model.ArtifactInputSpec) map[string]any {
values := map[string]any{}
values["algorithm"] = strings.ToLower(artifact.Algorithm)
values["digest"] = strings.ToLower(artifact.Digest)
return values
}

func (c *arangoClient) IngestArtifacts(ctx context.Context, artifacts []*model.ArtifactInputSpec) ([]*model.Artifact, error) {

listOfValues := []map[string]any{}

for i := range artifacts {
values := map[string]any{}

values["algorithm"] = strings.ToLower(artifacts[i].Algorithm)
values["digest"] = strings.ToLower(artifacts[i].Digest)

listOfValues = append(listOfValues, values)
listOfValues = append(listOfValues, getArtifactQueryValues(artifacts[i]))
}

var documents []string
Expand Down Expand Up @@ -113,45 +98,39 @@ RETURN NEW`

cursor, err := executeQueryWithRetry(ctx, c.db, sb.String(), nil, "IngestArtifacts")
if err != nil {
return nil, fmt.Errorf("failed to create vertex documents: %w", err)
return nil, fmt.Errorf("failed to ingest artifact: %w", err)
}
defer cursor.Close()

var createdArtifacts []*model.Artifact
for {
var doc *model.Artifact
_, err := cursor.ReadDocument(ctx, &doc)
if err != nil {
if driver.IsNoMoreDocuments(err) {
break
} else {
return nil, fmt.Errorf("failed to ingest artifact: %w", err)
}
} else {
createdArtifacts = append(createdArtifacts, doc)
}
}
return createdArtifacts, nil
return getArtifacts(ctx, cursor)

}

func (c *arangoClient) IngestArtifact(ctx context.Context, artifact *model.ArtifactInputSpec) (*model.Artifact, error) {
values := map[string]any{}
values["algorithm"] = strings.ToLower(artifact.Algorithm)
values["digest"] = strings.ToLower(artifact.Digest)

query := `
UPSERT { algorithm:@algorithm, digest:@digest }
INSERT { algorithm:@algorithm, digest:@digest }
UPDATE {} IN artifacts OPTIONS { indexHint: "byArtAndDigest" }
RETURN NEW`

cursor, err := executeQueryWithRetry(ctx, c.db, query, values, "IngestArtifact")
cursor, err := executeQueryWithRetry(ctx, c.db, query, getArtifactQueryValues(artifact), "IngestArtifact")
if err != nil {
return nil, fmt.Errorf("failed to create vertex documents: %w", err)
return nil, fmt.Errorf("failed to ingest artifact: %w", err)
}
defer cursor.Close()

createdArtifacts, err := getArtifacts(ctx, cursor)
if err != nil {
return nil, fmt.Errorf("failed to get artifacts from arango cursor: %w", err)
}
if len(createdArtifacts) == 1 {
return createdArtifacts[0], nil
} else {
return nil, fmt.Errorf("number of artifacts ingested is greater than one")
}
}

func getArtifacts(ctx context.Context, cursor driver.Cursor) ([]*model.Artifact, error) {
var createdArtifacts []*model.Artifact
for {
var doc *model.Artifact
Expand All @@ -160,23 +139,11 @@ RETURN NEW`
if driver.IsNoMoreDocuments(err) {
break
} else {
return nil, fmt.Errorf("failed to ingest artifact: %w", err)
return nil, fmt.Errorf("failed to get artifact from cursor: %w", err)
}
} else {
createdArtifacts = append(createdArtifacts, doc)
}
}
if len(createdArtifacts) == 1 {
return createdArtifacts[0], nil
} else {
return nil, fmt.Errorf("number of artifacts ingested is greater than one")
}
}

func generateModelArtifact(algorithm, digest string) *model.Artifact {
artifact := model.Artifact{
Algorithm: algorithm,
Digest: digest,
}
return &artifact
return createdArtifacts, nil
}
Loading

0 comments on commit 2b9306d

Please sign in to comment.