From 435c0064ec05eb75353557d792e3fe0dcca939bf Mon Sep 17 00:00:00 2001 From: Zeke Gabrielse Date: Sun, 15 Dec 2024 21:39:25 -0600 Subject: [PATCH] refactor logging --- db/queries/licenses.sql | 6 +-- db/queries/nodes.sql | 7 +-- internal/db/licenses.sql.go | 18 ++++---- internal/db/nodes.sql.go | 32 ++++++++++++-- internal/db/store.go | 45 ++++++++++++------- internal/licenses/manager.go | 86 +++++++++++++++++++++--------------- 6 files changed, 124 insertions(+), 70 deletions(-) diff --git a/db/queries/licenses.sql b/db/queries/licenses.sql index 00fd006..117b340 100644 --- a/db/queries/licenses.sql +++ b/db/queries/licenses.sql @@ -31,7 +31,7 @@ UPDATE licenses SET node_id = NULL, last_released_at = unixepoch() WHERE node_id = ?; --- name: ClaimUnclaimedLicenseFIFO :one +-- name: ClaimLicenseFIFO :one UPDATE licenses SET node_id = ?, last_claimed_at = unixepoch(), claims = claims + 1 WHERE id = ( @@ -43,7 +43,7 @@ WHERE id = ( ) RETURNING *; --- name: ClaimUnclaimedLicenseLIFO :one +-- name: ClaimLicenseLIFO :one UPDATE licenses SET node_id = ?, last_claimed_at = unixepoch(), claims = claims + 1 WHERE id = ( @@ -55,7 +55,7 @@ WHERE id = ( ) RETURNING *; --- name: ClaimUnclaimedLicenseRandom :one +-- name: ClaimLicenseRandom :one UPDATE licenses SET node_id = ?, last_claimed_at = unixepoch(), claims = claims + 1 WHERE id = ( diff --git a/db/queries/nodes.sql b/db/queries/nodes.sql index c2843b5..b1fed43 100644 --- a/db/queries/nodes.sql +++ b/db/queries/nodes.sql @@ -1,7 +1,7 @@ -- name: InsertNode :one INSERT INTO nodes (fingerprint, last_heartbeat_at, created_at) VALUES (?, NULL, unixepoch()) -RETURNING id, fingerprint, last_heartbeat_at, created_at; +RETURNING *; -- name: GetNodeByFingerprint :one SELECT id, fingerprint, last_heartbeat_at, created_at @@ -16,6 +16,7 @@ WHERE fingerprint = ?; -- name: DeleteNodeByFingerprint :exec DELETE FROM nodes WHERE fingerprint = ?; --- name: DeleteInactiveNodes :exec +-- name: DeleteInactiveNodes :many DELETE FROM nodes -WHERE last_heartbeat_at <= strftime('%s', 'now', ?); +WHERE last_heartbeat_at <= strftime('%s', 'now', ?) +RETURNING *; diff --git a/internal/db/licenses.sql.go b/internal/db/licenses.sql.go index 4acb686..e0d568d 100644 --- a/internal/db/licenses.sql.go +++ b/internal/db/licenses.sql.go @@ -25,7 +25,7 @@ func (q *Queries) ClaimLicense(ctx context.Context, arg ClaimLicenseParams) erro return err } -const claimUnclaimedLicenseFIFO = `-- name: ClaimUnclaimedLicenseFIFO :one +const claimLicenseFIFO = `-- name: ClaimLicenseFIFO :one UPDATE licenses SET node_id = ?, last_claimed_at = unixepoch(), claims = claims + 1 WHERE id = ( @@ -38,8 +38,8 @@ WHERE id = ( RETURNING id, file, "key", claims, last_claimed_at, last_released_at, node_id, created_at ` -func (q *Queries) ClaimUnclaimedLicenseFIFO(ctx context.Context, nodeID *int64) (License, error) { - row := q.db.QueryRowContext(ctx, claimUnclaimedLicenseFIFO, nodeID) +func (q *Queries) ClaimLicenseFIFO(ctx context.Context, nodeID *int64) (License, error) { + row := q.db.QueryRowContext(ctx, claimLicenseFIFO, nodeID) var i License err := row.Scan( &i.ID, @@ -54,7 +54,7 @@ func (q *Queries) ClaimUnclaimedLicenseFIFO(ctx context.Context, nodeID *int64) return i, err } -const claimUnclaimedLicenseLIFO = `-- name: ClaimUnclaimedLicenseLIFO :one +const claimLicenseLIFO = `-- name: ClaimLicenseLIFO :one UPDATE licenses SET node_id = ?, last_claimed_at = unixepoch(), claims = claims + 1 WHERE id = ( @@ -67,8 +67,8 @@ WHERE id = ( RETURNING id, file, "key", claims, last_claimed_at, last_released_at, node_id, created_at ` -func (q *Queries) ClaimUnclaimedLicenseLIFO(ctx context.Context, nodeID *int64) (License, error) { - row := q.db.QueryRowContext(ctx, claimUnclaimedLicenseLIFO, nodeID) +func (q *Queries) ClaimLicenseLIFO(ctx context.Context, nodeID *int64) (License, error) { + row := q.db.QueryRowContext(ctx, claimLicenseLIFO, nodeID) var i License err := row.Scan( &i.ID, @@ -83,7 +83,7 @@ func (q *Queries) ClaimUnclaimedLicenseLIFO(ctx context.Context, nodeID *int64) return i, err } -const claimUnclaimedLicenseRandom = `-- name: ClaimUnclaimedLicenseRandom :one +const claimLicenseRandom = `-- name: ClaimLicenseRandom :one UPDATE licenses SET node_id = ?, last_claimed_at = unixepoch(), claims = claims + 1 WHERE id = ( @@ -96,8 +96,8 @@ WHERE id = ( RETURNING id, file, "key", claims, last_claimed_at, last_released_at, node_id, created_at ` -func (q *Queries) ClaimUnclaimedLicenseRandom(ctx context.Context, nodeID *int64) (License, error) { - row := q.db.QueryRowContext(ctx, claimUnclaimedLicenseRandom, nodeID) +func (q *Queries) ClaimLicenseRandom(ctx context.Context, nodeID *int64) (License, error) { + row := q.db.QueryRowContext(ctx, claimLicenseRandom, nodeID) var i License err := row.Scan( &i.ID, diff --git a/internal/db/nodes.sql.go b/internal/db/nodes.sql.go index ebf081b..e5c0dd4 100644 --- a/internal/db/nodes.sql.go +++ b/internal/db/nodes.sql.go @@ -9,14 +9,38 @@ import ( "context" ) -const deleteInactiveNodes = `-- name: DeleteInactiveNodes :exec +const deleteInactiveNodes = `-- name: DeleteInactiveNodes :many DELETE FROM nodes WHERE last_heartbeat_at <= strftime('%s', 'now', ?) +RETURNING id, fingerprint, last_heartbeat_at, created_at ` -func (q *Queries) DeleteInactiveNodes(ctx context.Context, strftime interface{}) error { - _, err := q.db.ExecContext(ctx, deleteInactiveNodes, strftime) - return err +func (q *Queries) DeleteInactiveNodes(ctx context.Context, strftime interface{}) ([]Node, error) { + rows, err := q.db.QueryContext(ctx, deleteInactiveNodes, strftime) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Node + for rows.Next() { + var i Node + if err := rows.Scan( + &i.ID, + &i.Fingerprint, + &i.LastHeartbeatAt, + &i.CreatedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil } const deleteNodeByFingerprint = `-- name: DeleteNodeByFingerprint :exec diff --git a/internal/db/store.go b/internal/db/store.go index 29b27be..538ed5d 100644 --- a/internal/db/store.go +++ b/internal/db/store.go @@ -142,7 +142,9 @@ func (s *Store) PingNodeHeartbeatByFingerprint(ctx context.Context, fingerprint return s.queries.PingNodeHeartbeatByFingerprint(ctx, fingerprint) } -// TODO(ezekg) allow event data? e.g. license.lease_extended {from:x,to:y} +// TODO(ezekg) allow event data? e.g. license.lease_extended {from:x,to:y} or license.leased {node:n} or node.heartbeat_ping {count:n} +// +// but doing so would pose problems for future aggregation... func (s *Store) InsertAuditLog(ctx context.Context, eventTypeId EventTypeId, entityTypeId EntityTypeId, entityID string) error { params := InsertAuditLogParams{ EventTypeID: int64(eventTypeId), @@ -153,13 +155,13 @@ func (s *Store) InsertAuditLog(ctx context.Context, eventTypeId EventTypeId, ent return s.queries.InsertAuditLog(ctx, params) } -type InsertAuditLogsParams = []struct { +type BulkInsertAuditLogParams struct { EventTypeID EventTypeId EntityTypeID EntityTypeId EntityID string } -func (s *Store) InsertAuditLogs(ctx context.Context, logs InsertAuditLogsParams) error { +func (s *Store) BulkInsertAuditLogs(ctx context.Context, logs []BulkInsertAuditLogParams) error { tx, err := s.BeginTx(ctx) if err != nil { return fmt.Errorf("failed to begin transaction: %w", err) @@ -174,8 +176,7 @@ func (s *Store) InsertAuditLogs(ctx context.Context, logs InsertAuditLogsParams) EntityID: log.EntityID, } - err := qtx.queries.InsertAuditLog(ctx, params) - if err != nil { + if err := qtx.queries.InsertAuditLog(ctx, params); err != nil { return fmt.Errorf("failed to insert audit log: %w", err) } } @@ -187,8 +188,8 @@ func (s *Store) InsertAuditLogs(ctx context.Context, logs InsertAuditLogsParams) return nil } -func (s *Store) ClaimUnclaimedLicenseFIFO(ctx context.Context, nodeID *int64) (*License, error) { - license, err := s.queries.ClaimUnclaimedLicenseFIFO(ctx, nodeID) +func (s *Store) ClaimLicenseFIFO(ctx context.Context, nodeID *int64) (*License, error) { + license, err := s.queries.ClaimLicenseFIFO(ctx, nodeID) if err != nil { return nil, err } @@ -196,8 +197,8 @@ func (s *Store) ClaimUnclaimedLicenseFIFO(ctx context.Context, nodeID *int64) (* return &license, nil } -func (s *Store) ClaimUnclaimedLicenseLIFO(ctx context.Context, nodeID *int64) (*License, error) { - license, err := s.queries.ClaimUnclaimedLicenseLIFO(ctx, nodeID) +func (s *Store) ClaimLicenseLIFO(ctx context.Context, nodeID *int64) (*License, error) { + license, err := s.queries.ClaimLicenseLIFO(ctx, nodeID) if err != nil { return nil, err } @@ -205,8 +206,8 @@ func (s *Store) ClaimUnclaimedLicenseLIFO(ctx context.Context, nodeID *int64) (* return &license, nil } -func (s *Store) ClaimUnclaimedLicenseRandom(ctx context.Context, nodeID *int64) (*License, error) { - license, err := s.queries.ClaimUnclaimedLicenseRandom(ctx, nodeID) +func (s *Store) ClaimLicenseRandom(ctx context.Context, nodeID *int64) (*License, error) { + license, err := s.queries.ClaimLicenseRandom(ctx, nodeID) if err != nil { return nil, err } @@ -214,6 +215,19 @@ func (s *Store) ClaimUnclaimedLicenseRandom(ctx context.Context, nodeID *int64) return &license, nil } +func (s *Store) ClaimLicenseByStrategy(ctx context.Context, strategy string, nodeID *int64) (*License, error) { + switch strategy { + case "fifo": + return s.ClaimLicenseFIFO(ctx, nodeID) + case "lifo": + return s.ClaimLicenseLIFO(ctx, nodeID) + case "rand": + return s.ClaimLicenseRandom(ctx, nodeID) + default: + return s.ClaimLicenseFIFO(ctx, nodeID) + } +} + func (s *Store) GetLicenseByNodeID(ctx context.Context, nodeID *int64) (*License, error) { license, err := s.queries.GetLicenseByNodeID(ctx, nodeID) if err != nil { @@ -236,14 +250,15 @@ func (s *Store) ReleaseLicensesFromInactiveNodes(ctx context.Context, ttl time.D return licenses, nil } -func (s *Store) DeleteInactiveNodes(ctx context.Context, ttl time.Duration) error { +func (s *Store) DeleteInactiveNodes(ctx context.Context, ttl time.Duration) ([]Node, error) { t := fmt.Sprintf("-%d seconds", int(ttl.Seconds())) - if err := s.queries.DeleteInactiveNodes(ctx, t); err != nil { + nodes, err := s.queries.DeleteInactiveNodes(ctx, t) + if err != nil { slog.Error("failed to delete inactive nodes", "error", err) - return err + return nil, err } - return nil + return nodes, nil } diff --git a/internal/licenses/manager.go b/internal/licenses/manager.go index 535a3a1..d0842c4 100644 --- a/internal/licenses/manager.go +++ b/internal/licenses/manager.go @@ -7,7 +7,6 @@ import ( "fmt" "log/slog" "os" - "strconv" "time" "github.com/keygen-sh/keygen-go/v3" @@ -76,10 +75,12 @@ func (m *manager) AddLicense(ctx context.Context, licenseFilePath string, licens if err != nil { if errors.Is(err, os.ErrNotExist) { slog.Warn("license file not found", "filePath", licenseFilePath) + return fmt.Errorf("license file not found at '%s'", licenseFilePath) } slog.Error("failed to read license file", "filePath", licenseFilePath, "error", err) + return fmt.Errorf("failed to read license file: %w", err) } @@ -144,6 +145,7 @@ func (m *manager) RemoveLicense(ctx context.Context, id string) error { } slog.Debug("removed license successfully", "licenseID", id) + return nil } @@ -180,10 +182,12 @@ func (m *manager) GetLicenseByID(ctx context.Context, id string) (*db.License, e } slog.Debug("failed to fetch license by ID", "licenseID", id, "error", err) + return nil, err } slog.Debug("fetched license successfully", "licenseID", id) + return license, nil } @@ -195,15 +199,18 @@ func (m *manager) ClaimLicense(ctx context.Context, fingerprint string) (*Licens qtx := m.store.WithTx(tx) defer tx.Rollback() - node, err := m.fetchOrCreateNode(ctx, *qtx, fingerprint) + node, err := m.findOrCreateNode(ctx, *qtx, fingerprint) if err != nil { return nil, fmt.Errorf("failed to fetch or create node: %w", err) } claimedLicense, err := qtx.GetLicenseByNodeID(ctx, &node.ID) + + // extend the lease if the node already has a lease on a license if err == nil { if !m.config.ExtendOnHeartbeat { // if heartbeat is disabled, we can't extend the claimed license - slog.Warn("failed to claim license due to conflict due to heartbeat disabled", "nodeID", node.ID, "Fingerprint", node.Fingerprint) + slog.Warn("failed to claim license due to conflict due to heartbeat disabled", "nodeID", node.ID, "nodeFingerprint", node.Fingerprint) + return &LicenseOperationResult{Status: OperationStatusConflict}, nil } @@ -216,7 +223,7 @@ func (m *manager) ClaimLicense(ctx context.Context, fingerprint string) (*Licens } if m.config.EnabledAudit { - if err := m.store.InsertAuditLogs(ctx, db.InsertAuditLogsParams{ + if err := m.store.BulkInsertAuditLogs(ctx, []db.BulkInsertAuditLogParams{ {EventTypeID: db.EventTypeLicenseLeaseExtended, EntityTypeID: db.EntityTypeLicense, EntityID: claimedLicense.ID}, {EventTypeID: db.EventTypeNodeHeartbeatPing, EntityTypeID: db.EntityTypeNode, EntityID: node.Fingerprint}, }); err != nil { @@ -232,11 +239,11 @@ func (m *manager) ClaimLicense(ctx context.Context, fingerprint string) (*Licens }, nil } - // claim a new license based on the strategy - newLicense, err := m.selectLicenseClaimStrategy(ctx, *qtx, &node.ID) + // claim a new lease on a license if node doesn't have a lease + newLicense, err := qtx.ClaimLicenseByStrategy(ctx, m.config.Strategy, &node.ID) if err != nil { if errors.Is(err, sql.ErrNoRows) { - slog.Warn("no licenses available for claim", "Fingerprint", node.Fingerprint) + slog.Warn("no licenses available in pool", "nodeId", node.ID, "nodeFingerprint", node.Fingerprint) return &LicenseOperationResult{Status: OperationStatusNoLicensesAvailable}, nil } @@ -244,7 +251,6 @@ func (m *manager) ClaimLicense(ctx context.Context, fingerprint string) (*Licens return nil, fmt.Errorf("failed to claim license: %w", err) } - // ping node heartbeat if err := qtx.PingNodeHeartbeatByFingerprint(ctx, fingerprint); err != nil { return nil, fmt.Errorf("failed to update node claim: %w", err) } @@ -254,7 +260,7 @@ func (m *manager) ClaimLicense(ctx context.Context, fingerprint string) (*Licens } if m.config.EnabledAudit { - if err := m.store.InsertAuditLogs(ctx, db.InsertAuditLogsParams{ + if err := m.store.BulkInsertAuditLogs(ctx, []db.BulkInsertAuditLogParams{ {EventTypeID: db.EventTypeLicenseLeased, EntityTypeID: db.EntityTypeLicense, EntityID: newLicense.ID}, {EventTypeID: db.EventTypeNodeHeartbeatPing, EntityTypeID: db.EntityTypeNode, EntityID: node.Fingerprint}, }); err != nil { @@ -292,7 +298,7 @@ func (m *manager) ReleaseLicense(ctx context.Context, fingerprint string) (*Lice claimedLicense, err := qtx.GetLicenseByNodeID(ctx, &node.ID) if err != nil { if errors.Is(err, sql.ErrNoRows) { - slog.Warn("license release failed - claimed license not found", "Fingerprint", node.Fingerprint) + slog.Warn("license release failed - claimed license not found", "nodeFingerprint", node.Fingerprint) return &LicenseOperationResult{Status: OperationStatusNotFound}, nil } @@ -319,6 +325,7 @@ func (m *manager) ReleaseLicense(ctx context.Context, fingerprint string) (*Lice } slog.Info("license released successfully", "licenseID", claimedLicense.ID) + return &LicenseOperationResult{Status: OperationStatusSuccess}, nil } @@ -326,24 +333,24 @@ func (m *manager) Config() *Config { return m.config } -func (m *manager) fetchOrCreateNode(ctx context.Context, store db.Store, fingerprint string) (*db.Node, error) { +func (m *manager) findOrCreateNode(ctx context.Context, store db.Store, fingerprint string) (*db.Node, error) { node, err := store.GetNodeByFingerprint(ctx, fingerprint) if err != nil { if errors.Is(err, sql.ErrNoRows) { node, err = store.InsertNode(ctx, fingerprint) if err != nil { - slog.Error("failed to insert node", "Fingerprint", fingerprint, "error", err) + slog.Error("failed to insert node", "nodeFingerprint", fingerprint, "error", err) return nil, fmt.Errorf("failed to insert node: %w", err) } if m.config.EnabledAudit { - if err := store.InsertAuditLog(ctx, db.EventTypeNodeActivated, db.EntityTypeNode, strconv.FormatInt(node.ID, 10)); err != nil { - slog.Warn("failed to insert audit log", "nodeID", node.ID, "Fingerprint", node.Fingerprint, "error", err) + if err := store.InsertAuditLog(ctx, db.EventTypeNodeActivated, db.EntityTypeNode, node.Fingerprint); err != nil { + slog.Warn("failed to insert audit log", "nodeID", node.ID, "nodeFingerprint", node.Fingerprint, "error", err) } } } else { - slog.Error("failed to fetch node", "Fingerprint", fingerprint, "error", err) + slog.Error("failed to find node", "nodeFingerprint", fingerprint, "error", err) return nil, fmt.Errorf("failed to fetch node: %w", err) } @@ -352,19 +359,6 @@ func (m *manager) fetchOrCreateNode(ctx context.Context, store db.Store, fingerp return node, nil } -func (m *manager) selectLicenseClaimStrategy(ctx context.Context, store db.Store, nodeID *int64) (*db.License, error) { - switch m.config.Strategy { - case "fifo": - return store.ClaimUnclaimedLicenseFIFO(ctx, nodeID) - case "lifo": - return store.ClaimUnclaimedLicenseLIFO(ctx, nodeID) - case "rand": - return store.ClaimUnclaimedLicenseRandom(ctx, nodeID) - default: - return store.ClaimUnclaimedLicenseFIFO(ctx, nodeID) - } -} - func (m *manager) CullInactiveNodes(ctx context.Context, ttl time.Duration) error { tx, err := m.store.BeginTx(ctx) if err != nil { @@ -373,32 +367,52 @@ func (m *manager) CullInactiveNodes(ctx context.Context, ttl time.Duration) erro qtx := m.store.WithTx(tx) defer tx.Rollback() - releasedLicenses, err := qtx.ReleaseLicensesFromInactiveNodes(ctx, ttl) + licenses, err := qtx.ReleaseLicensesFromInactiveNodes(ctx, ttl) if err != nil { slog.Error("failed to release licenses from inactive nodes", "error", err) + return err } - if err := qtx.DeleteInactiveNodes(ctx, ttl); err != nil { + // FIXME(ezekg) soft-delete i.e. deactivate nodes? add config? + nodes, err := qtx.DeleteInactiveNodes(ctx, ttl) + if err != nil { slog.Error("failed to delete inactive nodes", "error", err) + return err } if err := tx.Commit(); err != nil { slog.Error("failed to commit transaction", "error", err) + return err } if m.config.EnabledAudit { - for _, lic := range releasedLicenses { - if err := m.store.InsertAuditLog(ctx, db.EventTypeNodeCulled, db.EntityTypeLicense, lic.ID); err != nil { - slog.Error("failed to insert audit log", "licenseID", lic.ID, "error", err) - } + var logs []db.BulkInsertAuditLogParams + + for _, license := range licenses { + logs = append(logs, db.BulkInsertAuditLogParams{ + EventTypeID: db.EventTypeLicenseLeaseExpired, + EntityTypeID: db.EntityTypeLicense, + EntityID: license.ID, + }) + } + + for _, node := range nodes { + logs = append(logs, db.BulkInsertAuditLogParams{ + EventTypeID: db.EventTypeNodeCulled, + EntityTypeID: db.EntityTypeNode, + EntityID: node.Fingerprint, + }) + } + + if err := m.store.BulkInsertAuditLogs(ctx, logs); err != nil { + slog.Warn("failed to insert audit logs", "error", err) } } - licenseCount := len(releasedLicenses) - slog.Debug("successfully released licenses and deleted inactive nodes", "count", licenseCount) + slog.Debug("successfully released licenses and culled inactive nodes", "licenseCount", len(licenses), "nodeCount", len(nodes)) return nil }