Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VAULT-15394: Generate activity log precomputed queries #20778

Merged
merged 2 commits into from
Jun 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions vault/logical_system_activity_write_testonly.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package vault
import (
"context"
"fmt"
"io"
"sync"
"time"

Expand Down Expand Up @@ -320,7 +321,23 @@ func (m *multipleMonthsActivityClients) addRepeatedClients(monthsAgo int32, c *g
func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[generation.WriteOptions]struct{}, activityLog *ActivityLog) ([]string, error) {
now := timeutil.StartOfMonth(time.Now().UTC())
paths := []string{}

_, writePQ := opts[generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES]
_, writeDistinctClients := opts[generation.WriteOptions_WRITE_DISTINCT_CLIENTS]

pqOpts := pqOptions{}
if writePQ || writeDistinctClients {
pqOpts.byNamespace = make(map[string]*processByNamespace)
pqOpts.byMonth = make(map[int64]*processMonth)
pqOpts.activePeriodEnd = m.latestTimestamp(now)
pqOpts.endTime = timeutil.EndOfMonth(pqOpts.activePeriodEnd)
pqOpts.activePeriodStart = m.earliestTimestamp(now)
}

for i, month := range m.months {
if month.generationParameters == nil {
continue
}
var timestamp time.Time
if i > 0 {
timestamp = timeutil.StartOfMonth(timeutil.MonthsPreviousTo(i, now))
Expand Down Expand Up @@ -349,6 +366,14 @@ func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[gene
paths = append(paths, entityPath)
}
}

if writePQ || writeDistinctClients {
reader := newProtoSegmentReader(segments)
err = activityLog.segmentToPrecomputedQuery(ctx, timestamp, reader, pqOpts)
if err != nil {
return nil, err
}
}
}
wg := sync.WaitGroup{}
err := activityLog.refreshFromStoredLog(ctx, &wg, now)
Expand All @@ -358,6 +383,25 @@ func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[gene
return paths, nil
}

func (m *multipleMonthsActivityClients) latestTimestamp(now time.Time) time.Time {
for i, month := range m.months {
if month.generationParameters != nil {
return timeutil.StartOfMonth(timeutil.MonthsPreviousTo(i, now))
}
}
return time.Time{}
}

func (m *multipleMonthsActivityClients) earliestTimestamp(now time.Time) time.Time {
for i := len(m.months) - 1; i >= 0; i-- {
month := m.months[i]
if month.generationParameters != nil {
return timeutil.StartOfMonth(timeutil.MonthsPreviousTo(i, now))
}
}
return time.Time{}
}

func newMultipleMonthsActivityClients(numberOfMonths int) *multipleMonthsActivityClients {
m := &multipleMonthsActivityClients{
months: make([]*singleMonthActivityClients, numberOfMonths),
Expand All @@ -369,3 +413,34 @@ func newMultipleMonthsActivityClients(numberOfMonths int) *multipleMonthsActivit
}
return m
}

func newProtoSegmentReader(segments map[int][]*activity.EntityRecord) SegmentReader {
allRecords := make([][]*activity.EntityRecord, 0, len(segments))
for _, records := range segments {
if segments == nil {
continue
}
allRecords = append(allRecords, records)
}
return &sliceSegmentReader{
records: allRecords,
}
}

type sliceSegmentReader struct {
records [][]*activity.EntityRecord
i int
}

func (p *sliceSegmentReader) ReadToken(ctx context.Context) (*activity.TokenCount, error) {
return nil, io.EOF
}

func (p *sliceSegmentReader) ReadEntity(ctx context.Context) (*activity.EntityActivityLog, error) {
if p.i == len(p.records) {
return nil, io.EOF
}
record := p.records[p.i]
p.i++
return &activity.EntityActivityLog{Clients: record}, nil
}
261 changes: 146 additions & 115 deletions vault/logical_system_activity_write_testonly_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (
"context"
"sort"
"testing"
"time"

"github.com/hashicorp/vault/helper/namespace"
"github.com/hashicorp/vault/helper/timeutil"
"github.com/hashicorp/vault/sdk/logical"
"github.com/hashicorp/vault/vault/activity"
"github.com/hashicorp/vault/vault/activity/generation"
Expand Down Expand Up @@ -441,136 +443,165 @@ func Test_singleMonthActivityClients_populateSegments(t *testing.T) {
}
}

// Test_multipleMonthsActivityClients_write_entities writes 4 months of data
// splitting some months across segments and using empty segments and skipped
// segments. Entities are written and then storage is queried. The test verifies
// that the correct timestamps are present in the activity log and that the correct
// segment numbers for each month contain the correct number of clients
func Test_multipleMonthsActivityClients_write_entities(t *testing.T) {
// Test_handleActivityWriteData writes 4 months of data splitting some months
// across segments and using empty segments and skipped segments. Entities and
// precomputed queries are written. written and then storage is queried. The
// test verifies that the correct timestamps are present in the activity log and
// that the correct segment numbers for each month contain the correct number of
// clients
func Test_handleActivityWriteData(t *testing.T) {
index5 := int32(5)
index4 := int32(4)
data := &generation.ActivityLogMockInput{
Write: []generation.WriteOptions{
generation.WriteOptions_WRITE_ENTITIES,
data := []*generation.Data{
{
// segments: 0:[x,y], 1:[z]
Month: &generation.Data_MonthsAgo{MonthsAgo: 3},
Clients: &generation.Data_All{All: &generation.Clients{Clients: []*generation.Client{{Count: 3}}}},
NumSegments: 2,
},
Data: []*generation.Data{
{
// segments: 0:[x,y], 1:[z]
Month: &generation.Data_MonthsAgo{MonthsAgo: 3},
Clients: &generation.Data_All{All: &generation.Clients{Clients: []*generation.Client{{Count: 3}}}},
NumSegments: 2,
},
{
// segments: 1:[a,b,c], 2:[d,e]
Month: &generation.Data_MonthsAgo{MonthsAgo: 2},
Clients: &generation.Data_All{All: &generation.Clients{Clients: []*generation.Client{{Count: 5}}}},
NumSegments: 3,
SkipSegmentIndexes: []int32{0},
{
// segments: 1:[a,b,c], 2:[d,e]
Month: &generation.Data_MonthsAgo{MonthsAgo: 2},
Clients: &generation.Data_All{All: &generation.Clients{Clients: []*generation.Client{{Count: 5}}}},
NumSegments: 3,
SkipSegmentIndexes: []int32{0},
},
{
// segments: 5:[f,g]
Month: &generation.Data_MonthsAgo{MonthsAgo: 1},
Clients: &generation.Data_Segments{
Segments: &generation.Segments{Segments: []*generation.Segment{{
SegmentIndex: &index5,
Clients: &generation.Clients{Clients: []*generation.Client{{Count: 2}}},
}}},
},
{
// segments: 5:[f,g]
Month: &generation.Data_MonthsAgo{MonthsAgo: 1},
Clients: &generation.Data_Segments{
Segments: &generation.Segments{Segments: []*generation.Segment{{
},
{
// segments: 1:[], 2:[], 4:[n], 5:[o]
Month: &generation.Data_CurrentMonth{},
EmptySegmentIndexes: []int32{1, 2},
Clients: &generation.Data_Segments{
Segments: &generation.Segments{Segments: []*generation.Segment{
{
SegmentIndex: &index5,
Clients: &generation.Clients{Clients: []*generation.Client{{Count: 2}}},
}}},
},
},
{
// segments: 1:[], 2:[], 4:[n], 5:[o]
Month: &generation.Data_CurrentMonth{},
EmptySegmentIndexes: []int32{1, 2},
Clients: &generation.Data_Segments{
Segments: &generation.Segments{Segments: []*generation.Segment{
{
SegmentIndex: &index5,
Clients: &generation.Clients{Clients: []*generation.Client{{Count: 1}}},
},
{
SegmentIndex: &index4,
Clients: &generation.Clients{Clients: []*generation.Client{{Count: 1}}},
},
}},
},
Clients: &generation.Clients{Clients: []*generation.Client{{Count: 1}}},
},
{
SegmentIndex: &index4,
Clients: &generation.Clients{Clients: []*generation.Client{{Count: 1}}},
},
}},
},
},
}

core, _, _ := TestCoreUnsealed(t)
marshaled, err := protojson.Marshal(data)
require.NoError(t, err)
req := logical.TestRequest(t, logical.CreateOperation, "internal/counters/activity/write")
req.Data = map[string]interface{}{"input": string(marshaled)}
resp, err := core.systemBackend.HandleRequest(namespace.RootContext(nil), req)
require.NoError(t, err)
paths := resp.Data["paths"].([]string)
require.Len(t, paths, 9)
t.Run("write entitites", func(t *testing.T) {
core, _, _ := TestCoreUnsealed(t)
marshaled, err := protojson.Marshal(&generation.ActivityLogMockInput{
Data: data,
Write: []generation.WriteOptions{generation.WriteOptions_WRITE_ENTITIES},
})
require.NoError(t, err)
req := logical.TestRequest(t, logical.CreateOperation, "internal/counters/activity/write")
req.Data = map[string]interface{}{"input": string(marshaled)}
resp, err := core.systemBackend.HandleRequest(namespace.RootContext(nil), req)
require.NoError(t, err)
paths := resp.Data["paths"].([]string)
require.Len(t, paths, 9)

times, err := core.activityLog.availableLogs(context.Background())
require.NoError(t, err)
require.Len(t, times, 4)
times, err := core.activityLog.availableLogs(context.Background())
require.NoError(t, err)
require.Len(t, times, 4)

sortPaths := func(monthPaths []string) {
sort.Slice(monthPaths, func(i, j int) bool {
iVal, _ := parseSegmentNumberFromPath(monthPaths[i])
jVal, _ := parseSegmentNumberFromPath(monthPaths[j])
return iVal < jVal
})
}
sortPaths := func(monthPaths []string) {
sort.Slice(monthPaths, func(i, j int) bool {
iVal, _ := parseSegmentNumberFromPath(monthPaths[i])
jVal, _ := parseSegmentNumberFromPath(monthPaths[j])
return iVal < jVal
})
}

month0Paths := paths[0:4]
month1Paths := paths[4:5]
month2Paths := paths[5:7]
month3Paths := paths[7:9]
sortPaths(month0Paths)
sortPaths(month1Paths)
sortPaths(month2Paths)
sortPaths(month3Paths)
entities := func(paths []string) map[int][]*activity.EntityRecord {
segments := make(map[int][]*activity.EntityRecord)
for _, path := range paths {
segmentNum, _ := parseSegmentNumberFromPath(path)
entry, err := core.activityLog.view.Get(context.Background(), path)
require.NoError(t, err)
if entry == nil {
segments[segmentNum] = []*activity.EntityRecord{}
continue
month0Paths := paths[0:4]
month1Paths := paths[4:5]
month2Paths := paths[5:7]
month3Paths := paths[7:9]
sortPaths(month0Paths)
sortPaths(month1Paths)
sortPaths(month2Paths)
sortPaths(month3Paths)
entities := func(paths []string) map[int][]*activity.EntityRecord {
segments := make(map[int][]*activity.EntityRecord)
for _, path := range paths {
segmentNum, _ := parseSegmentNumberFromPath(path)
entry, err := core.activityLog.view.Get(context.Background(), path)
require.NoError(t, err)
if entry == nil {
segments[segmentNum] = []*activity.EntityRecord{}
continue
}
activities := &activity.EntityActivityLog{}
err = proto.Unmarshal(entry.Value, activities)
require.NoError(t, err)
segments[segmentNum] = activities.Clients
}
activities := &activity.EntityActivityLog{}
err = proto.Unmarshal(entry.Value, activities)
require.NoError(t, err)
segments[segmentNum] = activities.Clients
return segments
}
return segments
}
month0Entities := entities(month0Paths)
require.Len(t, month0Entities, 4)
require.Contains(t, month0Entities, 1)
require.Contains(t, month0Entities, 2)
require.Contains(t, month0Entities, 4)
require.Contains(t, month0Entities, 5)
require.Len(t, month0Entities[1], 0)
require.Len(t, month0Entities[2], 0)
require.Len(t, month0Entities[4], 1)
require.Len(t, month0Entities[5], 1)
month0Entities := entities(month0Paths)
require.Len(t, month0Entities, 4)
require.Contains(t, month0Entities, 1)
require.Contains(t, month0Entities, 2)
require.Contains(t, month0Entities, 4)
require.Contains(t, month0Entities, 5)
require.Len(t, month0Entities[1], 0)
require.Len(t, month0Entities[2], 0)
require.Len(t, month0Entities[4], 1)
require.Len(t, month0Entities[5], 1)

month1Entities := entities(month1Paths)
require.Len(t, month1Entities, 1)
require.Contains(t, month1Entities, 5)
require.Len(t, month1Entities[5], 2)
month1Entities := entities(month1Paths)
require.Len(t, month1Entities, 1)
require.Contains(t, month1Entities, 5)
require.Len(t, month1Entities[5], 2)

month2Entities := entities(month2Paths)
require.Len(t, month2Entities, 2)
require.Contains(t, month2Entities, 1)
require.Contains(t, month2Entities, 2)
require.Len(t, month2Entities[1], 3)
require.Len(t, month2Entities[2], 2)

month3Entities := entities(month3Paths)
require.Len(t, month3Entities, 2)
require.Contains(t, month3Entities, 0)
require.Contains(t, month3Entities, 1)
require.Len(t, month3Entities[0], 2)
require.Len(t, month3Entities[1], 1)
})
t.Run("write precomputed queries", func(t *testing.T) {
core, _, _ := TestCoreUnsealed(t)
marshaled, err := protojson.Marshal(&generation.ActivityLogMockInput{
Data: data,
Write: []generation.WriteOptions{generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES},
})
require.NoError(t, err)
req := logical.TestRequest(t, logical.CreateOperation, "internal/counters/activity/write")
req.Data = map[string]interface{}{"input": string(marshaled)}
_, err = core.systemBackend.HandleRequest(namespace.RootContext(nil), req)
require.NoError(t, err)

month2Entities := entities(month2Paths)
require.Len(t, month2Entities, 2)
require.Contains(t, month2Entities, 1)
require.Contains(t, month2Entities, 2)
require.Len(t, month2Entities[1], 3)
require.Len(t, month2Entities[2], 2)
queries, err := core.activityLog.queryStore.QueriesAvailable(context.Background())
require.NoError(t, err)
require.True(t, queries)

month3Entities := entities(month3Paths)
require.Len(t, month3Entities, 2)
require.Contains(t, month3Entities, 0)
require.Contains(t, month3Entities, 1)
require.Len(t, month3Entities[0], 2)
require.Len(t, month3Entities[1], 1)
now := time.Now().UTC()
start := timeutil.StartOfMonth(timeutil.MonthsPreviousTo(3, now))
end := timeutil.EndOfMonth(now)
pq, err := core.activityLog.queryStore.Get(context.Background(), start, end)
require.NoError(t, err)
require.NotNil(t, pq)
require.Equal(t, end, pq.EndTime)
require.Equal(t, start, pq.StartTime)
require.Len(t, pq.Namespaces, 1)
require.Equal(t, uint64(12), pq.Namespaces[0].Entities)
require.Len(t, pq.Months, 4)
})
}