Skip to content

Commit

Permalink
starting to integrate the new asset db cache
Browse files Browse the repository at this point in the history
  • Loading branch information
caffix committed Nov 22, 2024
1 parent 8f4a14b commit 1e610c5
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 12 deletions.
15 changes: 12 additions & 3 deletions engine/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"log/slog"
"os"
"time"

"github.com/caffix/queue"
et "github.com/owasp-amass/amass/v4/engine/types"
Expand Down Expand Up @@ -48,13 +49,19 @@ func (d *dis) Shutdown() {
}

func (d *dis) collectEvents() {
t := time.NewTicker(100 * time.Millisecond)
defer t.Stop()
loop:
for {
select {
case <-d.done:
break loop
case <-d.completed.Signal():
d.completed.Process(d.completedCallback)
case <-t.C:
if d.completed.Len() > 0 {
d.completed.Process(d.completedCallback)
}
}
}
d.completed.Process(d.completedCallback)
Expand Down Expand Up @@ -84,12 +91,14 @@ func (d *dis) DispatchEvent(e *et.Event) error {
}

e.Dispatcher = d
a := e.Asset.Asset
a := e.Entity.Asset
// do not schedule the same asset more than once
if p, hit := e.Session.Cache().GetAsset(a); p != nil && hit {
c := e.Session.Cache()
ents, err := c.FindEntityByContent(a, c.StartTime())
if len(ents) > 0 {
return errors.New("this event was processed previously")
}
e.Session.Cache().SetAsset(e.Asset)
_, _ = c.CreateAsset(a)

ap, err := d.reg.GetPipeline(a.AssetType())
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions engine/sessions/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ package sessions
import (
"fmt"
"log/slog"
"os"
"sync"
"time"

Expand Down Expand Up @@ -94,6 +95,9 @@ func (r *manager) CancelSession(id uuid.UUID) {
if c := r.sessions[id].Cache(); c != nil {
c.Close()
}
if dir := r.sessions[id].tmpdir; dir != "" {
os.RemoveAll(dir)
}
if db := s.DB(); db != nil {
if err := db.Close(); err != nil {
s.Log().Error(fmt.Sprintf("failed to close the database for session %s: %v", id, err))
Expand Down
32 changes: 28 additions & 4 deletions engine/sessions/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import (
"errors"
"fmt"
"log/slog"
"math/rand"
"os"
"path/filepath"
"strings"
"time"

"github.com/google/uuid"
"github.com/owasp-amass/amass/v4/config"
Expand All @@ -19,6 +22,7 @@ import (
assetdb "github.com/owasp-amass/asset-db"
"github.com/owasp-amass/asset-db/cache"
"github.com/owasp-amass/asset-db/repository"
"github.com/owasp-amass/asset-db/repository/sqlrepo"
)

type Session struct {
Expand All @@ -31,6 +35,7 @@ type Session struct {
dsn string
dbtype string
c *cache.Cache
tmpdir string
stats *et.SessionStats
done chan struct{}
}
Expand All @@ -57,8 +62,13 @@ func CreateSession(cfg *config.Config) (et.Session, error) {
return nil, err
}

var err error
s.c, err = cache.New(s.db.Repo)
c, dir, err := createFileCacheRepo()
if err != nil {
return nil, err
}
s.tmpdir = dir

s.c, err = cache.New(c, s.db.Repo, time.Minute)
if err != nil || s.c == nil {
return nil, errors.New("failed to create the session cache")
}
Expand Down Expand Up @@ -137,12 +147,12 @@ func (s *Session) selectDBMS() error {
if db.System == "postgres" {
// Construct the connection string for a Postgres database.
s.dsn = fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s", db.Host, db.Port, db.Username, db.Password, db.DBName)
s.dbtype = repository.Postgres
s.dbtype = sqlrepo.Postgres
} else if db.System == "sqlite" || db.System == "sqlite3" {
// Define the connection path for an SQLite database.
path := filepath.Join(config.OutputDirectory(s.cfg.Dir), "amass.sqlite")
s.dsn = path
s.dbtype = repository.SQLite
s.dbtype = sqlrepo.SQLite
}
// Break the loop once the primary database is found.
break
Expand All @@ -160,3 +170,17 @@ func (s *Session) selectDBMS() error {
s.db = store
return nil
}

func createFileCacheRepo() (repository.Repository, string, error) {
dir, err := os.MkdirTemp("", fmt.Sprintf("test-%d", rand.Intn(100)))
if err != nil {
return nil, "", errors.New("failed to create the temp dir")
}

c := assetdb.New(sqlrepo.SQLite, filepath.Join(dir, "cache.sqlite"))
if c == nil {
return nil, "", errors.New("failed to create the cache db")
}

return c.Repo, dir, nil
}
2 changes: 1 addition & 1 deletion engine/types/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

type Event struct {
Name string
Asset *dbt.Asset
Entity *dbt.Entity
Meta interface{}
Dispatcher Dispatcher
Session Session
Expand Down
10 changes: 9 additions & 1 deletion engine/types/registry.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © by Jeff Foley 2023-2024. All rights reserved.
// Copyright © by Jeff Foley 2017-2024. All rights reserved.
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
// SPDX-License-Identifier: Apache-2.0

Expand All @@ -7,6 +7,7 @@ package types
import (
"context"
"log/slog"
"time"

"github.com/caffix/pipeline"
"github.com/caffix/queue"
Expand Down Expand Up @@ -51,6 +52,9 @@ func NewPipelineQueue() *PipelineQueue {

// Next implements the pipeline InputSource interface.
func (pq *PipelineQueue) Next(ctx context.Context) bool {
t := time.NewTicker(100 * time.Millisecond)
defer t.Stop()

if pq.Queue.Len() > 0 {
return true
}
Expand All @@ -59,6 +63,10 @@ func (pq *PipelineQueue) Next(ctx context.Context) bool {
select {
case <-ctx.Done():
return false
case <-t.C:
if pq.Queue.Len() > 0 {
return true
}
case <-pq.Queue.Signal():
if pq.Queue.Len() > 0 {
return true
Expand Down
2 changes: 0 additions & 2 deletions engine/types/sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/owasp-amass/amass/v4/engine/cache"

Check failure on line 13 in engine/types/sessions.go

View workflow job for this annotation

GitHub Actions / Coverage

github.com/owasp-amass/asset-db@v0.14.1: missing go.sum entry for go.mod file; to add it:

Check failure on line 13 in engine/types/sessions.go

View workflow job for this annotation

GitHub Actions / Test (ubuntu-latest, 1.23.1)

github.com/owasp-amass/asset-db@v0.14.1: missing go.sum entry for go.mod file; to add it:
"github.com/owasp-amass/amass/v4/engine/pubsub"
"github.com/owasp-amass/amass/v4/engine/sessions/scope"
assetdb "github.com/owasp-amass/asset-db"
)

type Session interface {
Expand All @@ -22,7 +21,6 @@ type Session interface {
PubSub() *pubsub.Logger
Config() *config.Config
Scope() *scope.Scope
DB() *assetdb.AssetDB
Cache() cache.Cache
Stats() *SessionStats
Done() bool
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ require (
github.com/miekg/dns v1.1.62
github.com/nyaruka/phonenumbers v1.4.1
github.com/openrdap/rdap v0.9.1
github.com/owasp-amass/asset-db v0.13.0
github.com/owasp-amass/asset-db v0.14.1
github.com/owasp-amass/open-asset-model v0.12.0
github.com/owasp-amass/resolve v0.8.1
github.com/rubenv/sql-migrate v1.7.0
Expand Down

0 comments on commit 1e610c5

Please sign in to comment.