Skip to content

Commit

Permalink
refactor(journal): merged tables into a single 'journal' table
Browse files Browse the repository at this point in the history
This commit merges the runtime and persistent tables
into a single 'journal' table with additional logic to
distinguish journal entries from the active session
and previous sessions.

Signed-off-by: Jason Jerome <jajerome@redhat.com>
  • Loading branch information
DuckBoss committed Jun 15, 2023
1 parent 19f82d4 commit 78c3b88
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 71 deletions.
77 changes: 23 additions & 54 deletions internal/messagejournal/messagejournal.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@ import (
"github.com/redhatinsights/yggdrasil/ipc"
)

const runtimeTableName string = "runtime"
const persistentTableName string = "persistent"
const messageJournalTableName string = "journal"

//go:embed migrations/*.sql
var embeddedMigrationData embed.FS

// MessageJournal is a data structure representing the collection
// of message journal entries received from worker emitted events and messages.
// It also stores the date time of when the journal was initialized to track
// events and messages in the active session.
type MessageJournal struct {
database *sql.DB
database *sql.DB
initializedAt time.Time
}

// Filter is a data structure representing the filtering options
Expand All @@ -52,20 +54,11 @@ func New(databaseFilePath string) (*MessageJournal, error) {
return nil, fmt.Errorf("database migration error: %w", err)
}

messageJournal := MessageJournal{database: db}
messageJournal := MessageJournal{database: db, initializedAt: time.Now().UTC()}
if err = db.Ping(); err != nil {
return nil, fmt.Errorf("message journal database not connected: %w", err)
}

// Clear the runtime message journal table for the active session.
clearRuntimeTableResult, err := db.Exec(fmt.Sprintf("DELETE FROM %s", runtimeTableName))
if err != nil {
return nil, fmt.Errorf("could not clear journal entries from table '%v': %w", runtimeTableName, err)
}
if _, err = clearRuntimeTableResult.RowsAffected(); err != nil {
return nil, fmt.Errorf("could not identify affected rows for table '%v': %w", runtimeTableName, err)
}

return &messageJournal, nil
}

Expand Down Expand Up @@ -103,27 +96,12 @@ func (j *MessageJournal) AddEntry(entry yggdrasil.WorkerMessage) (*yggdrasil.Wor
message_id, sent, worker_name, response_to, worker_event, worker_message)
values (?,?,?,?,?,?)`

runtimeAction, err := j.database.Prepare(fmt.Sprintf(insertEntryTemplate, runtimeTableName))
if err != nil {
return nil, fmt.Errorf("cannot prepare statement for table '%v': %w", runtimeTableName, err)
}
persistentAction, err := j.database.Prepare(fmt.Sprintf(insertEntryTemplate, persistentTableName))
insertAction, err := j.database.Prepare(fmt.Sprintf(insertEntryTemplate, messageJournalTableName))
if err != nil {
return nil, fmt.Errorf("cannot prepare statement for table '%v': %w", persistentTableName, err)
return nil, fmt.Errorf("cannot prepare statement for table '%v': %w", messageJournalTableName, err)
}

runtimeResult, err := runtimeAction.Exec(
entry.MessageID,
entry.Sent,
entry.WorkerName,
entry.ResponseTo,
entry.WorkerEvent.EventName,
entry.WorkerEvent.EventMessage,
)
if err != nil {
return nil, fmt.Errorf("could not insert journal entry into table '%v': %w", runtimeTableName, err)
}
persistentResult, err := persistentAction.Exec(
persistentResult, err := insertAction.Exec(
entry.MessageID,
entry.Sent,
entry.WorkerName,
Expand All @@ -132,20 +110,14 @@ func (j *MessageJournal) AddEntry(entry yggdrasil.WorkerMessage) (*yggdrasil.Wor
entry.WorkerEvent.EventMessage,
)
if err != nil {
return nil, fmt.Errorf("could not insert journal entry into table '%v': %w", persistentTableName, err)
return nil, fmt.Errorf("could not insert journal entry into table '%v': %w", messageJournalTableName, err)
}

runtimeEntryID, err := runtimeResult.LastInsertId()
entryID, err := persistentResult.LastInsertId()
if err != nil {
return nil, fmt.Errorf("could not select last insert ID '%v' for table '%v': %w", runtimeEntryID, runtimeTableName, err)
return nil, fmt.Errorf("could not select last insert ID '%v' for table '%v': %w", entryID, messageJournalTableName, err)
}
log.Debugf("new message journal entry (id: %v) added to table: '%v'", runtimeEntryID, runtimeTableName)

persistentEntryID, err := persistentResult.LastInsertId()
if err != nil {
return nil, fmt.Errorf("could not select last insert ID '%v' for table '%v': %w", persistentEntryID, persistentTableName, err)
}
log.Debugf("new message journal entry (id: %v) added to table: '%v'", persistentEntryID, persistentTableName)
log.Debugf("new message journal entry (id: %v) added: '%v'", entryID, entry.MessageID)

return &entry, nil
}
Expand Down Expand Up @@ -225,20 +197,14 @@ func (j *MessageJournal) GetEntries(filter Filter) ([]map[string]string, error)
// required to filter journal entry messages from the message journal database
// when they are retrieved in the 'GetEntries' method.
func (j *MessageJournal) buildDynamicGetEntriesQuery(filter Filter) (string, error) {
var tableName string
if filter.Persistent {
tableName = persistentTableName
} else {
tableName = runtimeTableName
}

queryTemplate := template.New("dynamicGetEntriesQuery")
queryTemplateParse, err := queryTemplate.Parse(
`SELECT * FROM {{.Table}}
{{if .MessageID}} INTERSECT SELECT * FROM {{.Table}} WHERE message_id='{{.MessageID}}'{{end}}
{{if .Worker}} INTERSECT SELECT * FROM {{.Table}} WHERE worker_name='{{.Worker}}'{{end}}
{{if .From}} INTERSECT SELECT * FROM {{.Table}} WHERE sent>='{{.From}}'{{end}}
{{if .To}} INTERSECT SELECT * FROM {{.Table}} WHERE sent<='{{.To}}'{{end}}
{{if not .Persistent}} INTERSECT SELECT * FROM {{.Table}} WHERE sent>='{{.InitializedAt}}'{{end}}
ORDER BY sent`,
)
if err != nil {
Expand All @@ -247,13 +213,16 @@ func (j *MessageJournal) buildDynamicGetEntriesQuery(filter Filter) (string, err
var compiledQuery bytes.Buffer
queryCompileErr := queryTemplateParse.Execute(&compiledQuery,
struct {
Table string
MessageID string
Worker string
From string
To string
Table string
InitializedAt string
Persistent bool
MessageID string
Worker string
From string
To string
}{
tableName, filter.MessageID, filter.Worker, filter.From, filter.To,
messageJournalTableName, j.initializedAt.String(), filter.Persistent,
filter.MessageID, filter.Worker, filter.From, filter.To,
})
if queryCompileErr != nil {
return "", fmt.Errorf("cannot compile query template: %w", queryCompileErr)
Expand Down
8 changes: 4 additions & 4 deletions internal/messagejournal/messagejournal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestGetEntries(t *testing.T) {
description: "get journal entries - unfiltered empty",
entries: []yggdrasil.WorkerMessage{},
input: Filter{
Persistent: false,
Persistent: true,
TruncateLength: 100,
MessageID: "",
Worker: "",
Expand All @@ -75,7 +75,7 @@ func TestGetEntries(t *testing.T) {
placeholderWorkerMessageEntry,
},
input: Filter{
Persistent: false,
Persistent: true,
TruncateLength: 100,
MessageID: "",
Worker: "",
Expand All @@ -99,7 +99,7 @@ func TestGetEntries(t *testing.T) {
placeholderWorkerMessageEntry,
},
input: Filter{
Persistent: false,
Persistent: true,
TruncateLength: 100,
MessageID: "test-invalid-filtered-message-id",
Worker: "",
Expand Down Expand Up @@ -127,7 +127,7 @@ func TestGetEntries(t *testing.T) {
},
},
input: Filter{
Persistent: false,
Persistent: true,
TruncateLength: 100,
MessageID: "test-filtered-message-id",
Worker: "",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
DROP TABLE IF EXISTS runtime;
DROP TABLE IF EXISTS persistent;
DROP TABLE IF EXISTS journal;
12 changes: 1 addition & 11 deletions internal/messagejournal/migrations/000001_messagejournal.up.sql
Original file line number Diff line number Diff line change
@@ -1,14 +1,4 @@
DROP TABLE IF EXISTS runtime;
CREATE TABLE IF NOT EXISTS runtime (
id INTEGER NOT NULL PRIMARY KEY,
message_id VARCHAR(36) NOT NULL,
sent DATETIME NOT NULL,
worker_name VARCHAR(128) NOT NULL,
response_to VARCHAR(36),
worker_event INTEGER,
worker_message TEXT
);
CREATE TABLE IF NOT EXISTS persistent (
CREATE TABLE IF NOT EXISTS journal (
id INTEGER NOT NULL PRIMARY KEY,
message_id VARCHAR(36) NOT NULL,
sent DATETIME NOT NULL,
Expand Down

0 comments on commit 78c3b88

Please sign in to comment.