Skip to content
This repository has been archived by the owner on Jan 8, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1193 from hashicorp/f-pruning
Browse files Browse the repository at this point in the history
internal/server/singleprocess/state: Add ability to prune jobs and deployments
  • Loading branch information
evanphx authored Mar 19, 2021
2 parents d1e2cda + eb8db62 commit 0b97ac0
Show file tree
Hide file tree
Showing 11 changed files with 802 additions and 15 deletions.
5 changes: 5 additions & 0 deletions .changelog/1193.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
```release-note:improvement
server: Prune old deployments and jobs from server memory. This limits the number
of deployments and jobs to 10,000. The data for the old entries is still stored on disk
but it is not indexed in memory, to allow data recovery should it be needed.
```
35 changes: 35 additions & 0 deletions internal/server/singleprocess/prune.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package singleprocess

import (
"context"
"sync"
"time"

"github.com/hashicorp/go-hclog"
)

func (s *service) runPrune(
ctx context.Context,
wg *sync.WaitGroup,
funclog hclog.Logger,
) {
defer wg.Done()

funclog.Info("starting")
defer funclog.Info("exiting")

tk := time.NewTicker(10 * time.Minute)
defer tk.Stop()

for {
select {
case <-ctx.Done():
return
case <-tk.C:
err := s.state.Prune()
if err != nil {
funclog.Error("error pruning data", "error", err)
}
}
}
}
5 changes: 5 additions & 0 deletions internal/server/singleprocess/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ func New(opts ...Option) (pb.WaypointServer, error) {
s.bgWg.Add(1)
go s.runPollQueuer(s.bgCtx, &s.bgWg, log.Named("poll_queuer"))

// Start out state pruning background goroutine. This calls
// Prune on the state every 10 minutes.
s.bgWg.Add(1)
go s.runPrune(s.bgCtx, &s.bgWg, log.Named("prune"))

return &s, nil
}

Expand Down
77 changes: 74 additions & 3 deletions internal/server/singleprocess/state/app_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"reflect"
"strings"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -41,6 +42,16 @@ type appOperation struct {
// Bucket is the global bucket for all records of this operation.
Bucket []byte

// The number of records that should be indexed off disk. This allows
// dormant records to remain on disk but not indexed.
MaximumIndexedRecords int

// This guards indexedRecords for manipulation during pruning
pruneMu sync.Mutex

// Holds how many records we've indexed at runtime.
indexedRecords int

// seq is the previous sequence number to set. This is initialized by the
// index init on server boot and `sync/atomic` should be used to increment
// it on each use.
Expand Down Expand Up @@ -87,6 +98,13 @@ func (op *appOperation) register() {
dbBuckets = append(dbBuckets, op.Bucket)
dbIndexers = append(dbIndexers, op.indexInit)
schemas = append(schemas, op.memSchema)

if op.MaximumIndexedRecords > 0 {
pruneFns = append(pruneFns, func(memTxn *memdb.Txn) (string, int, error) {
cnt, err := op.pruneOld(memTxn, op.MaximumIndexedRecords)
return op.memTableName(), cnt, err
})
}
}

// Put inserts or updates an operation record.
Expand Down Expand Up @@ -128,6 +146,23 @@ func (op *appOperation) Get(s *State, ref *pb.Ref_Operation) (interface{}, error
"unknown operation reference type: %T", ref.Target)
}

// Check if we are tracking this value in the indexes before returning
// it. When pruning, we leave the values on disk but remove them
// from the indexes.
raw, err := memTxn.First(
op.memTableName(),
opIdIndexName,
id,
)
if err != nil {
return err
}

if raw == nil {
return status.Errorf(codes.NotFound,
"value with given id not found: %s", id)
}

return op.dbGet(tx, []byte(id), result)
})
if err != nil {
Expand Down Expand Up @@ -469,7 +504,18 @@ func (op *appOperation) appSeq(ref *pb.Ref_Application) *uint64 {
// persisted on disk.
func (op *appOperation) indexInit(s *State, dbTxn *bolt.Tx, memTxn *memdb.Txn) error {
bucket := dbTxn.Bucket(op.Bucket)
return bucket.ForEach(func(k, v []byte) error {
c := bucket.Cursor()

var cnt int

// This algorithm depends on boltdb's iteration order. Specificly that the keys are
// lexically order AND because we're using ULID's for the keys in production, the newest
// records will have the higher lexical value and thusly be at the end of the database.
//
// So we just start at the end and insert records until we hit the maximum, knowing
// we'll be inserted the newest records.

for k, v := c.Last(); k != nil; k, v = c.Prev() {
result := op.newStruct()
if err := proto.Unmarshal(v, result); err != nil {
return err
Expand All @@ -489,8 +535,14 @@ func (op *appOperation) indexInit(s *State, dbTxn *bolt.Tx, memTxn *memdb.Txn) e
}
}

return nil
})
cnt++

if op.MaximumIndexedRecords > 0 && cnt >= op.MaximumIndexedRecords {
break
}
}

return nil
}

// indexPut writes an index record for a single operation record.
Expand Down Expand Up @@ -543,9 +595,28 @@ func (op *appOperation) indexPut(
StartTime: startTime,
CompleteTime: completeTime,
}

// If there is no maximum, don't track the record count.
if op.MaximumIndexedRecords != 0 {
op.pruneMu.Lock()
op.indexedRecords++
op.pruneMu.Unlock()
}

return rec, txn.Insert(op.memTableName(), rec)
}

func (op *appOperation) pruneOld(memTxn *memdb.Txn, max int) (int, error) {
return pruneOld(memTxn, pruneOp{
lock: &op.pruneMu,
table: op.memTableName(),
index: opIdIndexName,
indexArgs: []interface{}{""},
max: max,
cur: &op.indexedRecords,
})
}

func (op *appOperation) valueField(value interface{}, field string) interface{} {
fv := op.valueFieldReflect(value, field)
if !fv.IsValid() {
Expand Down
2 changes: 2 additions & 0 deletions internal/server/singleprocess/state/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
var deploymentOp = &appOperation{
Struct: (*pb.Deployment)(nil),
Bucket: []byte("deployment"),

MaximumIndexedRecords: 10000,
}

func init() {
Expand Down
45 changes: 45 additions & 0 deletions internal/server/singleprocess/state/deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,53 @@ package state

import (
"testing"

pb "github.com/hashicorp/waypoint/internal/server/gen"
serverptypes "github.com/hashicorp/waypoint/internal/server/ptypes"
"github.com/stretchr/testify/require"
)

func TestDeployment(t *testing.T) {
deploymentOp.Test(t)
}

func TestDeploymentPrune(t *testing.T) {
t.Run("prunes old records", func(t *testing.T) {
require := require.New(t)

s := TestState(t)
defer s.Close()

require.NoError(s.DeploymentPut(false, serverptypes.TestValidDeployment(t, &pb.Deployment{
Id: "A",
})))

require.NoError(s.DeploymentPut(false, serverptypes.TestValidDeployment(t, &pb.Deployment{
Id: "B",
})))

require.NoError(s.DeploymentPut(false, serverptypes.TestValidDeployment(t, &pb.Deployment{
Id: "C",
})))

memTxn := s.inmem.Txn(true)
defer memTxn.Abort()

cnt, err := deploymentOp.pruneOld(memTxn, 2)
require.NoError(err)

memTxn.Commit()

require.Equal(1, cnt)
require.Equal(2, deploymentOp.indexedRecords)

dep, err := s.DeploymentGet(&pb.Ref_Operation{
Target: &pb.Ref_Operation_Id{
Id: "A",
},
})

require.Error(err)
require.Nil(dep)
})
}
70 changes: 58 additions & 12 deletions internal/server/singleprocess/state/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ const (
jobQueueTimeIndexName = "queue-time"
jobTargetIdIndexName = "target-id"
jobSingletonIdIndexName = "singleton-id"

maximumJobsIndexed = 10000
)

func init() {
Expand Down Expand Up @@ -159,6 +161,17 @@ type jobIndex struct {
OutputBuffer *logbuffer.Buffer
}

// A helper, pulled out rather than on a value to allow it to be used against
// pb.Job,s and jobIndex's alike.
func jobIsCompleted(state pb.Job_State) bool {
switch state {
case pb.Job_ERROR, pb.Job_SUCCESS:
return true
default:
return false
}
}

// Job is the exported structure that is returned for most state APIs
// and gives callers access to more information than the pure job structure.
type Job struct {
Expand Down Expand Up @@ -848,26 +861,35 @@ func (s *State) JobIsAssignable(ctx context.Context, jobpb *pb.Job) (bool, error
// jobIndexInit initializes the config index from persisted data.
func (s *State) jobIndexInit(dbTxn *bolt.Tx, memTxn *memdb.Txn) error {
bucket := dbTxn.Bucket(jobBucket)
return bucket.ForEach(func(k, v []byte) error {
c := bucket.Cursor()

var cnt int

for k, v := c.Last(); k != nil; k, v = c.Prev() {
var value pb.Job
if err := proto.Unmarshal(v, &value); err != nil {
return err
}

idx, err := s.jobIndexSet(memTxn, k, &value)
if err != nil {
return err
}

// If the job was running or waiting, set it as assigned.
if value.State == pb.Job_RUNNING || value.State == pb.Job_WAITING {
if err := s.jobAssignedSet(memTxn, idx, true); err != nil {
// if we still have headroom for more indexed jobs OR the job hasn't finished yet,
// index it.
if cnt < maximumJobsIndexed || !jobIsCompleted(value.State) {
cnt++
idx, err := s.jobIndexSet(memTxn, k, &value)
if err != nil {
return err
}

// If the job was running or waiting, set it as assigned.
if value.State == pb.Job_RUNNING || value.State == pb.Job_WAITING {
if err := s.jobAssignedSet(memTxn, idx, true); err != nil {
return err
}
}
}
}

return nil
})
return nil
}

// jobIndexSet writes an index record for a single job.
Expand Down Expand Up @@ -1012,7 +1034,31 @@ func (s *State) jobCreate(dbTxn *bolt.Tx, memTxn *memdb.Txn, jobpb *pb.Job) erro

// Insert into the DB
_, err = s.jobIndexSet(memTxn, id, jobpb)
return err
if err != nil {
return err
}

s.pruneMu.Lock()
defer s.pruneMu.Unlock()

s.indexedJobs++

return nil
}

func (s *State) jobsPruneOld(memTxn *memdb.Txn, max int) (int, error) {
return pruneOld(memTxn, pruneOp{
lock: &s.pruneMu,
table: jobTableName,
index: jobQueueTimeIndexName,
indexArgs: []interface{}{pb.Job_QUEUED, time.Unix(0, 0)},
max: max,
cur: &s.indexedJobs,
check: func(raw interface{}) bool {
job := raw.(*jobIndex)
return !jobIsCompleted(job.State)
},
})
}

func (s *State) jobById(dbTxn *bolt.Tx, id string) (*pb.Job, error) {
Expand Down
Loading

0 comments on commit 0b97ac0

Please sign in to comment.