From a48ee9dd4231d8b8815cef93556ef0974fe8bd3d Mon Sep 17 00:00:00 2001 From: HridoyRoy Date: Mon, 27 Jun 2022 13:07:34 -0700 Subject: [PATCH 1/2] activity log refactoring port --- vault/activity_log.go | 721 +++++++++++++----------------- vault/activity_log_util_common.go | 120 +++++ 2 files changed, 424 insertions(+), 417 deletions(-) create mode 100644 vault/activity_log_util_common.go diff --git a/vault/activity_log.go b/vault/activity_log.go index 0af4fc4ed8a2..b5cac367bc36 100644 --- a/vault/activity_log.go +++ b/vault/activity_log.go @@ -1504,22 +1504,15 @@ type ResponseNamespace struct { } type ResponseMonth struct { - Timestamp string `json:"timestamp"` - Counts *ResponseCounts `json:"counts"` - Namespaces []*ResponseMonthlyNamespace `json:"namespaces"` - NewClients *ResponseNewClients `json:"new_clients" mapstructure:"new_clients"` + Timestamp string `json:"timestamp"` + Counts *ResponseCounts `json:"counts"` + Namespaces []*ResponseNamespace `json:"namespaces"` + NewClients *ResponseNewClients `json:"new_clients" mapstructure:"new_clients"` } type ResponseNewClients struct { - Counts *ResponseCounts `json:"counts"` - Namespaces []*ResponseMonthlyNamespace `json:"namespaces"` -} - -type ResponseMonthlyNamespace struct { - NamespaceID string `json:"namespace_id"` - NamespacePath string `json:"namespace_path"` - Counts *ResponseCounts `json:"counts"` - Mounts []*ResponseMount `json:"mounts"` + Counts *ResponseCounts `json:"counts"` + Namespaces []*ResponseNamespace `json:"namespaces"` } type ResponseMount struct { @@ -1557,12 +1550,19 @@ func (a *ActivityLog) DefaultStartTime(endTime time.Time) time.Time { } func (a *ActivityLog) handleQuery(ctx context.Context, startTime, endTime time.Time, limitNamespaces int) (map[string]interface{}, error) { - queryNS, err := namespace.FromContext(ctx) - if err != nil { - return nil, err - } - - pq, err := a.queryStore.Get(ctx, startTime, endTime) + var computePartial bool + // If the endTime of the query is the current month, request data from the queryStore + // with the endTime equal to the end of the last month, and add in the current month + // data. + precomputedQueryEndTime := endTime + if timeutil.IsCurrentMonth(endTime, time.Now().UTC()) { + precomputedQueryEndTime = timeutil.EndOfMonth(timeutil.MonthsPreviousTo(1, timeutil.StartOfMonth(endTime))) + computePartial = true + } + + // From the precomputed queries stored in the queryStore (computed at the end of each month) + // get the query associated with the start and end time specified + pq, err := a.queryStore.Get(ctx, startTime, precomputedQueryEndTime) if err != nil { return nil, err } @@ -1570,207 +1570,80 @@ func (a *ActivityLog) handleQuery(ctx context.Context, startTime, endTime time.T return nil, nil } - responseData := make(map[string]interface{}) - responseData["start_time"] = pq.StartTime.Format(time.RFC3339) - responseData["end_time"] = pq.EndTime.Format(time.RFC3339) - byNamespace := make([]*ResponseNamespace, 0) - - totalEntities := 0 - totalTokens := 0 + // Calculate the namespace response breakdowns and totals for entities and tokens from the initial + // namespace data. + totalEntities, totalTokens, byNamespaceResponse, err := a.calculateByNamespaceResponseForQuery(ctx, pq.Namespaces) + if err != nil { + return nil, err + } - for _, nsRecord := range pq.Namespaces { - ns, err := NamespaceByID(ctx, nsRecord.NamespaceID, a.core) + // If we need to add the current month's client counts into the total, compute the namespace + // breakdown for the current month as well. + var partialByMonth map[int64]*processMonth + var partialByNamespace map[string]*processByNamespace + var totalEntitiesCurrent int + var totalTokensCurrent int + var byNamespaceResponseCurrent []*ResponseNamespace + if computePartial { + // Traverse through current month's activitylog data and group clients + // into months and namespaces + partialByMonth, partialByNamespace = a.populateNamespaceAndMonthlyBreakdowns() + + // Convert the byNamespace breakdowns into structs that are + // consumable by the /activity endpoint, so as to reuse code between these two + // endpoints. + byNamespaceComputation := a.transformALNamespaceBreakdowns(partialByNamespace) + + // Calculate the namespace response breakdowns and totals for entities and tokens from the initial + // namespace data. + totalEntitiesCurrent, totalTokensCurrent, byNamespaceResponseCurrent, err = a.calculateByNamespaceResponseForQuery(ctx, byNamespaceComputation) if err != nil { return nil, err } - if a.includeInResponse(queryNS, ns) { - mountResponse := make([]*ResponseMount, 0, len(nsRecord.Mounts)) - for _, mountRecord := range nsRecord.Mounts { - mountResponse = append(mountResponse, &ResponseMount{ - MountPath: mountRecord.MountPath, - Counts: &ResponseCounts{ - DistinctEntities: int(mountRecord.Counts.EntityClients), - EntityClients: int(mountRecord.Counts.EntityClients), - NonEntityClients: int(mountRecord.Counts.NonEntityClients), - NonEntityTokens: int(mountRecord.Counts.NonEntityClients), - Clients: int(mountRecord.Counts.EntityClients + mountRecord.Counts.NonEntityClients), - }, - }) - } - // Sort the mounts in descending order of usage - sort.Slice(mountResponse, func(i, j int) bool { - return mountResponse[i].Counts.Clients > mountResponse[j].Counts.Clients - }) - - var displayPath string - if ns == nil { - displayPath = fmt.Sprintf("deleted namespace %q", nsRecord.NamespaceID) - } else { - displayPath = ns.Path - } - byNamespace = append(byNamespace, &ResponseNamespace{ - NamespaceID: nsRecord.NamespaceID, - NamespacePath: displayPath, - Counts: ResponseCounts{ - DistinctEntities: int(nsRecord.Entities), - EntityClients: int(nsRecord.Entities), - NonEntityTokens: int(nsRecord.NonEntityTokens), - NonEntityClients: int(nsRecord.NonEntityTokens), - Clients: int(nsRecord.Entities + nsRecord.NonEntityTokens), - }, - Mounts: mountResponse, - }) - totalEntities += int(nsRecord.Entities) - totalTokens += int(nsRecord.NonEntityTokens) - } + // Add the current month's namespace data the precomputed query namespaces + byNamespaceResponse = append(byNamespaceResponse, byNamespaceResponseCurrent...) } + // Sort clients within each namespace + a.sortALResponseNamespaces(byNamespaceResponse) - sort.Slice(byNamespace, func(i, j int) bool { - return byNamespace[i].Counts.Clients > byNamespace[j].Counts.Clients - }) if limitNamespaces > 0 { - if limitNamespaces > len(byNamespace) { - limitNamespaces = len(byNamespace) - } - byNamespace = byNamespace[:limitNamespaces] - // recalculate total entities and tokens - totalEntities = 0 - totalTokens = 0 - for _, namespaceData := range byNamespace { - totalEntities += namespaceData.Counts.DistinctEntities - totalTokens += namespaceData.Counts.NonEntityTokens - } - } - - responseData["by_namespace"] = byNamespace - responseData["total"] = &ResponseCounts{ - DistinctEntities: totalEntities, - EntityClients: totalEntities, - NonEntityTokens: totalTokens, - NonEntityClients: totalTokens, - Clients: totalEntities + totalTokens, + totalEntities, totalTokens, byNamespaceResponse = a.limitNamespacesInALResponse(byNamespaceResponse, limitNamespaces) } - prepareNSResponse := func(nsRecords []*activity.MonthlyNamespaceRecord) ([]*ResponseMonthlyNamespace, error) { - nsResponse := make([]*ResponseMonthlyNamespace, 0, len(nsRecords)) - for _, nsRecord := range nsRecords { - if int(nsRecord.Counts.EntityClients) == 0 && int(nsRecord.Counts.NonEntityClients) == 0 { - continue - } - - ns, err := NamespaceByID(ctx, nsRecord.NamespaceID, a.core) - if err != nil { - return nil, err - } - if a.includeInResponse(queryNS, ns) { - mountResponse := make([]*ResponseMount, 0, len(nsRecord.Mounts)) - for _, mountRecord := range nsRecord.Mounts { - if int(mountRecord.Counts.EntityClients) == 0 && int(mountRecord.Counts.NonEntityClients) == 0 { - continue - } - - mountResponse = append(mountResponse, &ResponseMount{ - MountPath: mountRecord.MountPath, - Counts: &ResponseCounts{ - EntityClients: int(mountRecord.Counts.EntityClients), - NonEntityClients: int(mountRecord.Counts.NonEntityClients), - Clients: int(mountRecord.Counts.EntityClients + mountRecord.Counts.NonEntityClients), - }, - }) - } - - var displayPath string - if ns == nil { - displayPath = fmt.Sprintf("deleted namespace %q", nsRecord.NamespaceID) - } else { - displayPath = ns.Path - } - nsResponse = append(nsResponse, &ResponseMonthlyNamespace{ - NamespaceID: nsRecord.NamespaceID, - NamespacePath: displayPath, - Counts: &ResponseCounts{ - EntityClients: int(nsRecord.Counts.EntityClients), - NonEntityClients: int(nsRecord.Counts.NonEntityClients), - Clients: int(nsRecord.Counts.EntityClients + nsRecord.Counts.NonEntityClients), - }, - Mounts: mountResponse, - }) - } + distinctEntitiesResponse := totalEntities + if computePartial { + currentMonth, err := a.computeCurrentMonthForBillingPeriod(partialByMonth, startTime, endTime) + if err != nil { + return nil, err } - return nsResponse, nil + pq.Months = append(pq.Months, currentMonth) + distinctEntitiesResponse += pq.Months[len(pq.Months)-1].NewClients.Counts.EntityClients } - months := make([]*ResponseMonth, 0, len(pq.Months)) - for _, monthsRecord := range pq.Months { - newClientsResponse := &ResponseNewClients{} - if int(monthsRecord.NewClients.Counts.EntityClients+monthsRecord.NewClients.Counts.NonEntityClients) != 0 { - newClientsNSResponse, err := prepareNSResponse(monthsRecord.NewClients.Namespaces) - if err != nil { - return nil, err - } - newClientsResponse.Counts = &ResponseCounts{ - EntityClients: int(monthsRecord.NewClients.Counts.EntityClients), - NonEntityClients: int(monthsRecord.NewClients.Counts.NonEntityClients), - Clients: int(monthsRecord.NewClients.Counts.EntityClients + monthsRecord.NewClients.Counts.NonEntityClients), - } - newClientsResponse.Namespaces = newClientsNSResponse - } - - monthResponse := &ResponseMonth{ - Timestamp: time.Unix(monthsRecord.Timestamp, 0).UTC().Format(time.RFC3339), - } - if int(monthsRecord.Counts.EntityClients+monthsRecord.Counts.NonEntityClients) != 0 { - nsResponse, err := prepareNSResponse(monthsRecord.Namespaces) - if err != nil { - return nil, err - } - monthResponse.Counts = &ResponseCounts{ - EntityClients: int(monthsRecord.Counts.EntityClients), - NonEntityClients: int(monthsRecord.Counts.NonEntityClients), - Clients: int(monthsRecord.Counts.EntityClients + monthsRecord.Counts.NonEntityClients), - } - monthResponse.Namespaces = nsResponse - monthResponse.NewClients = newClientsResponse - months = append(months, monthResponse) - } + // Now populate the response based on breakdowns. + responseData := make(map[string]interface{}) + responseData["start_time"] = pq.StartTime.Format(time.RFC3339) + responseData["end_time"] = pq.EndTime.Format(time.RFC3339) + responseData["by_namespace"] = byNamespaceResponse + responseData["total"] = &ResponseCounts{ + DistinctEntities: distinctEntitiesResponse, + EntityClients: totalEntities + totalEntitiesCurrent, + NonEntityTokens: totalTokens + totalTokensCurrent, + NonEntityClients: totalTokens + totalTokensCurrent, + Clients: totalEntities + totalEntitiesCurrent + totalTokens + totalTokensCurrent, } - // Sort the months in ascending order of timestamps - sort.Slice(months, func(i, j int) bool { - firstTimestamp, errOne := time.Parse(time.RFC3339, months[i].Timestamp) - secondTimestamp, errTwo := time.Parse(time.RFC3339, months[j].Timestamp) - if errOne == nil && errTwo == nil { - return firstTimestamp.Before(secondTimestamp) - } - // Keep the nondeterministic ordering in storage - a.logger.Error("unable to parse activity log timestamps", "timestamp", - months[i].Timestamp, "error", errOne, "timestamp", months[j].Timestamp, "error", errTwo) - return i < j - }) - - // Within each month sort everything by descending order of activity - for _, month := range months { - sort.Slice(month.Namespaces, func(i, j int) bool { - return month.Namespaces[i].Counts.Clients > month.Namespaces[j].Counts.Clients - }) - - for _, ns := range month.Namespaces { - sort.Slice(ns.Mounts, func(i, j int) bool { - return ns.Mounts[i].Counts.Clients > ns.Mounts[j].Counts.Clients - }) - } + // Create and populate the month response structs based on the monthly breakdown. + months, err := a.prepareMonthsResponseForQuery(ctx, pq.Months) + if err != nil { + return nil, err + } - sort.Slice(month.NewClients.Namespaces, func(i, j int) bool { - return month.NewClients.Namespaces[i].Counts.Clients > month.NewClients.Namespaces[j].Counts.Clients - }) + // Sort the months and clients within each month before adding the months to the response + a.sortActivityLogMonthsResponse(months) - for _, ns := range month.NewClients.Namespaces { - sort.Slice(ns.Mounts, func(i, j int) bool { - return ns.Mounts[i].Counts.Clients > ns.Mounts[j].Counts.Clients - }) - } - } + // Modify the final month output to make response more consumable based on API request months = modifyResponseMonths(months, startTime, endTime) responseData["months"] = months @@ -1937,40 +1810,28 @@ func newByNamespace() *processByNamespace { } } -type processNamespace struct { - Counts *processCounts - Mounts map[string]*processMount -} - -func newProcessNamespace() *processNamespace { - return &processNamespace{ - Counts: newProcessCounts(), - Mounts: make(map[string]*processMount), - } -} - type processNewClients struct { Counts *processCounts - Namespaces map[string]*processNamespace + Namespaces map[string]*processByNamespace } func newProcessNewClients() *processNewClients { return &processNewClients{ Counts: newProcessCounts(), - Namespaces: make(map[string]*processNamespace), + Namespaces: make(map[string]*processByNamespace), } } type processMonth struct { Counts *processCounts - Namespaces map[string]*processNamespace + Namespaces map[string]*processByNamespace NewClients *processNewClients } func newProcessMonth() *processMonth { return &processMonth{ Counts: newProcessCounts(), - Namespaces: make(map[string]*processNamespace), + Namespaces: make(map[string]*processByNamespace), NewClients: newProcessNewClients(), } } @@ -2000,7 +1861,7 @@ func processClientRecord(e *activity.EntityRecord, byNamespace map[string]*proce } if _, present := byMonth[monthTimestamp].Namespaces[e.NamespaceID]; !present { - byMonth[monthTimestamp].Namespaces[e.NamespaceID] = newProcessNamespace() + byMonth[monthTimestamp].Namespaces[e.NamespaceID] = newByNamespace() } if _, present := byMonth[monthTimestamp].Namespaces[e.NamespaceID].Mounts[e.MountAccessor]; !present { @@ -2008,7 +1869,7 @@ func processClientRecord(e *activity.EntityRecord, byNamespace map[string]*proce } if _, present := byMonth[monthTimestamp].NewClients.Namespaces[e.NamespaceID]; !present { - byMonth[monthTimestamp].NewClients.Namespaces[e.NamespaceID] = newProcessNamespace() + byMonth[monthTimestamp].NewClients.Namespaces[e.NamespaceID] = newByNamespace() } if _, present := byMonth[monthTimestamp].NewClients.Namespaces[e.NamespaceID].Mounts[e.MountAccessor]; !present { @@ -2230,68 +2091,7 @@ func (a *ActivityLog) precomputedQueryWorker(ctx context.Context) error { Namespaces: make([]*activity.NamespaceRecord, 0, len(byNamespace)), Months: make([]*activity.MonthRecord, 0, len(byMonth)), } - - processNamespaces := func(nsMap map[string]*processNamespace) []*activity.MonthlyNamespaceRecord { - nsRecord := make([]*activity.MonthlyNamespaceRecord, 0, len(nsMap)) - for nsID, nsData := range nsMap { - // Process mount specific data within a namespace within a given month - mountRecord := make([]*activity.MountRecord, 0, len(nsMap[nsID].Mounts)) - for mountAccessor, mountData := range nsMap[nsID].Mounts { - var displayPath string - if mountAccessor == "" { - displayPath = "no mount accessor (pre-1.10 upgrade?)" - } else { - valResp := a.core.router.ValidateMountByAccessor(mountAccessor) - if valResp == nil { - displayPath = fmt.Sprintf("deleted mount; accessor %q", mountAccessor) - } else { - displayPath = valResp.MountPath - } - } - - mountRecord = append(mountRecord, &activity.MountRecord{ - MountPath: displayPath, - Counts: &activity.CountsRecord{ - EntityClients: len(mountData.Counts.Entities), - NonEntityClients: int(mountData.Counts.Tokens) + len(mountData.Counts.NonEntities), - }, - }) - } - - // Process ns specific data within a given month - nsRecord = append(nsRecord, &activity.MonthlyNamespaceRecord{ - NamespaceID: nsID, - Counts: &activity.CountsRecord{ - EntityClients: len(nsData.Counts.Entities), - NonEntityClients: int(nsData.Counts.Tokens) + len(nsData.Counts.NonEntities), - }, - Mounts: mountRecord, - }) - } - return nsRecord - } - - for timestamp, monthData := range byMonth { - newClientsNSRecord := processNamespaces(monthData.NewClients.Namespaces) - newClientRecord := &activity.NewClientRecord{ - Counts: &activity.CountsRecord{ - EntityClients: len(monthData.NewClients.Counts.Entities), - NonEntityClients: int(monthData.NewClients.Counts.Tokens) + len(monthData.NewClients.Counts.NonEntities), - }, - Namespaces: newClientsNSRecord, - } - - // Process all the months - pq.Months = append(pq.Months, &activity.MonthRecord{ - Timestamp: timestamp, - Counts: &activity.CountsRecord{ - EntityClients: len(monthData.Counts.Entities), - NonEntityClients: int(monthData.Counts.Tokens) + len(monthData.Counts.NonEntities), - }, - Namespaces: processNamespaces(monthData.Namespaces), - NewClients: newClientRecord, - }) - } + pq.Months = a.transformMonthBreakdowns(byMonth) for nsID, entry := range byNamespace { mountRecord := make([]*activity.MountRecord, 0, len(entry.Mounts)) @@ -2445,211 +2245,298 @@ func (c *Core) activeEntityGaugeCollector(ctx context.Context) ([]metricsutil.Ga return a.PartialMonthMetrics(ctx) } -// partialMonthClientCount returns the number of clients used so far this month. -// If activity log is not enabled, the response will be nil -func (a *ActivityLog) partialMonthClientCount(ctx context.Context) (map[string]interface{}, error) { - a.fragmentLock.RLock() - defer a.fragmentLock.RUnlock() - - if !a.enabled { - // nothing to count - return nil, nil - } - +// populateNamespaceAndMonthlyBreakdowns traverses the partial month data +// stored in memory and groups them by months and namespaces. +func (a *ActivityLog) populateNamespaceAndMonthlyBreakdowns() (map[int64]*processMonth, map[string]*processByNamespace) { // Parse the monthly clients and prepare the breakdowns. byNamespace := make(map[string]*processByNamespace) byMonth := make(map[int64]*processMonth) for _, e := range a.partialMonthClientTracker { processClientRecord(e, byNamespace, byMonth, time.Now()) } + return byMonth, byNamespace +} - queryNS, err := namespace.FromContext(ctx) - if err != nil { - return nil, err +func (a *ActivityLog) transformMonthBreakdowns(byMonth map[int64]*processMonth) []*activity.MonthRecord { + monthly := make([]*activity.MonthRecord, 0) + processByNamespaces := func(nsMap map[string]*processByNamespace) []*activity.MonthlyNamespaceRecord { + nsRecord := make([]*activity.MonthlyNamespaceRecord, 0, len(nsMap)) + for nsID, nsData := range nsMap { + // Process mount specific data within a namespace within a given month + mountRecord := make([]*activity.MountRecord, 0, len(nsMap[nsID].Mounts)) + for mountAccessor, mountData := range nsMap[nsID].Mounts { + var displayPath string + if mountAccessor == "" { + displayPath = "no mount accessor (pre-1.10 upgrade?)" + } else { + valResp := a.core.router.ValidateMountByAccessor(mountAccessor) + if valResp == nil { + displayPath = fmt.Sprintf("deleted mount; accessor %q", mountAccessor) + } else { + displayPath = valResp.MountPath + } + } + + mountRecord = append(mountRecord, &activity.MountRecord{ + MountPath: displayPath, + Counts: &activity.CountsRecord{ + EntityClients: len(mountData.Counts.Entities), + NonEntityClients: int(mountData.Counts.Tokens) + len(mountData.Counts.NonEntities), + }, + }) + } + + // Process ns specific data within a given month + nsRecord = append(nsRecord, &activity.MonthlyNamespaceRecord{ + NamespaceID: nsID, + Counts: &activity.CountsRecord{ + EntityClients: len(nsData.Counts.Entities), + NonEntityClients: int(nsData.Counts.Tokens) + len(nsData.Counts.NonEntities), + }, + Mounts: mountRecord, + }) + } + return nsRecord } + for timestamp, monthData := range byMonth { + newClientsNSRecord := processByNamespaces(monthData.NewClients.Namespaces) + newClientRecord := &activity.NewClientRecord{ + Counts: &activity.CountsRecord{ + EntityClients: len(monthData.NewClients.Counts.Entities), + NonEntityClients: int(monthData.NewClients.Counts.Tokens) + len(monthData.NewClients.Counts.NonEntities), + }, + Namespaces: newClientsNSRecord, + } - // Now populate the response based on breakdowns. - responseData := make(map[string]interface{}) + // Process all the months + monthly = append(monthly, &activity.MonthRecord{ + Timestamp: timestamp, + Counts: &activity.CountsRecord{ + EntityClients: len(monthData.Counts.Entities), + NonEntityClients: int(monthData.Counts.Tokens) + len(monthData.Counts.NonEntities), + }, + Namespaces: processByNamespaces(monthData.Namespaces), + NewClients: newClientRecord, + }) + } + return monthly +} +func (a *ActivityLog) calculateByNamespaceResponseForQuery(ctx context.Context, byNamespace []*activity.NamespaceRecord) (int, int, []*ResponseNamespace, error) { + queryNS, err := namespace.FromContext(ctx) + if err != nil { + return 0, 0, nil, err + } byNamespaceResponse := make([]*ResponseNamespace, 0) totalEntities := 0 totalTokens := 0 - for nsID, nsRecord := range byNamespace { - ns, err := NamespaceByID(ctx, nsID, a.core) + for _, nsRecord := range byNamespace { + ns, err := NamespaceByID(ctx, nsRecord.NamespaceID, a.core) if err != nil { - return nil, err + return 0, 0, nil, err } - if a.includeInResponse(queryNS, ns) { mountResponse := make([]*ResponseMount, 0, len(nsRecord.Mounts)) - for mountPath, mountRecord := range nsRecord.Mounts { + for _, mountRecord := range nsRecord.Mounts { mountResponse = append(mountResponse, &ResponseMount{ - MountPath: mountPath, + MountPath: mountRecord.MountPath, Counts: &ResponseCounts{ - EntityClients: len(mountRecord.Counts.Entities), - NonEntityClients: len(mountRecord.Counts.NonEntities), - Clients: len(mountRecord.Counts.Entities) + len(mountRecord.Counts.NonEntities), + DistinctEntities: int(mountRecord.Counts.EntityClients), + EntityClients: int(mountRecord.Counts.EntityClients), + NonEntityClients: int(mountRecord.Counts.NonEntityClients), + NonEntityTokens: int(mountRecord.Counts.NonEntityClients), + Clients: int(mountRecord.Counts.EntityClients + mountRecord.Counts.NonEntityClients), }, }) } + // Sort the mounts in descending order of usage sort.Slice(mountResponse, func(i, j int) bool { return mountResponse[i].Counts.Clients > mountResponse[j].Counts.Clients }) var displayPath string if ns == nil { - displayPath = fmt.Sprintf("deleted namespace %q", nsID) + displayPath = fmt.Sprintf("deleted namespace %q", nsRecord.NamespaceID) } else { displayPath = ns.Path } byNamespaceResponse = append(byNamespaceResponse, &ResponseNamespace{ - NamespaceID: nsID, + NamespaceID: nsRecord.NamespaceID, NamespacePath: displayPath, Counts: ResponseCounts{ - DistinctEntities: len(nsRecord.Counts.Entities), - EntityClients: len(nsRecord.Counts.Entities), - NonEntityTokens: len(nsRecord.Counts.NonEntities), - NonEntityClients: len(nsRecord.Counts.NonEntities), - Clients: len(nsRecord.Counts.Entities) + len(nsRecord.Counts.NonEntities), + DistinctEntities: int(nsRecord.Entities), + EntityClients: int(nsRecord.Entities), + NonEntityTokens: int(nsRecord.NonEntityTokens), + NonEntityClients: int(nsRecord.NonEntityTokens), + Clients: int(nsRecord.Entities + nsRecord.NonEntityTokens), }, Mounts: mountResponse, }) - totalEntities += len(nsRecord.Counts.Entities) - totalTokens += len(nsRecord.Counts.NonEntities) + totalEntities += int(nsRecord.Entities) + totalTokens += int(nsRecord.NonEntityTokens) } } + return totalEntities, totalTokens, byNamespaceResponse, nil +} - sort.Slice(byNamespaceResponse, func(i, j int) bool { - return byNamespaceResponse[i].Counts.Clients > byNamespaceResponse[j].Counts.Clients - }) - responseData["by_namespace"] = byNamespaceResponse - responseData["distinct_entities"] = totalEntities - responseData["entity_clients"] = totalEntities - responseData["non_entity_tokens"] = totalTokens - responseData["non_entity_clients"] = totalTokens - responseData["clients"] = totalEntities + totalTokens - +func (a *ActivityLog) prepareMonthsResponseForQuery(ctx context.Context, byMonth []*activity.MonthRecord) ([]*ResponseMonth, error) { months := make([]*ResponseMonth, 0, len(byMonth)) - prepareNSResponse := func(processedNamespaces map[string]*processNamespace) ([]*ResponseMonthlyNamespace, error) { - nsResponse := make([]*ResponseMonthlyNamespace, 0, len(processedNamespaces)) - for nsID, nsRecord := range processedNamespaces { - if len(nsRecord.Counts.Entities) == 0 && len(nsRecord.Counts.NonEntities) == 0 { - continue - } - - ns, err := NamespaceByID(ctx, nsID, a.core) - if err != nil { - return nil, err - } - if a.includeInResponse(queryNS, ns) { - mountResponse := make([]*ResponseMount, 0, len(nsRecord.Mounts)) - for mountPath, mountRecord := range nsRecord.Mounts { - if len(mountRecord.Counts.Entities) == 0 && len(mountRecord.Counts.NonEntities) == 0 { - continue - } - - mountResponse = append(mountResponse, &ResponseMount{ - MountPath: mountPath, - Counts: &ResponseCounts{ - EntityClients: len(mountRecord.Counts.Entities), - NonEntityClients: len(mountRecord.Counts.NonEntities), - Clients: len(mountRecord.Counts.Entities) + len(mountRecord.Counts.NonEntities), - }, - }) - } - - var displayPath string - if ns == nil { - displayPath = fmt.Sprintf("deleted namespace %q", nsID) - } else { - displayPath = ns.Path - } - nsResponse = append(nsResponse, &ResponseMonthlyNamespace{ - NamespaceID: nsID, - NamespacePath: displayPath, - Counts: &ResponseCounts{ - EntityClients: len(nsRecord.Counts.Entities), - NonEntityClients: len(nsRecord.Counts.NonEntities), - Clients: len(nsRecord.Counts.Entities) + len(nsRecord.Counts.NonEntities), - }, - Mounts: mountResponse, - }) - } - } - return nsResponse, nil - } - - for timestamp, month := range byMonth { + for _, monthsRecord := range byMonth { newClientsResponse := &ResponseNewClients{} - if len(month.NewClients.Counts.Entities) != 0 || len(month.NewClients.Counts.NonEntities) != 0 { - newClientsNSResponse, err := prepareNSResponse(month.NewClients.Namespaces) + if int(monthsRecord.NewClients.Counts.EntityClients+monthsRecord.NewClients.Counts.NonEntityClients) != 0 { + newClientsNSResponse, err := a.prepareNamespaceResponse(ctx, monthsRecord.NewClients.Namespaces) if err != nil { return nil, err } newClientsResponse.Counts = &ResponseCounts{ - EntityClients: len(month.NewClients.Counts.Entities), - NonEntityClients: len(month.NewClients.Counts.NonEntities), - Clients: len(month.NewClients.Counts.Entities) + len(month.NewClients.Counts.NonEntities), + EntityClients: int(monthsRecord.NewClients.Counts.EntityClients), + NonEntityClients: int(monthsRecord.NewClients.Counts.NonEntityClients), + Clients: int(monthsRecord.NewClients.Counts.EntityClients + monthsRecord.NewClients.Counts.NonEntityClients), } newClientsResponse.Namespaces = newClientsNSResponse } - monthResponse := &ResponseMonth{} - if len(month.Counts.Entities) != 0 || len(month.Counts.NonEntities) != 0 { - nsResponse, err := prepareNSResponse(month.Namespaces) + monthResponse := &ResponseMonth{ + Timestamp: time.Unix(monthsRecord.Timestamp, 0).UTC().Format(time.RFC3339), + } + if int(monthsRecord.Counts.EntityClients+monthsRecord.Counts.NonEntityClients) != 0 { + nsResponse, err := a.prepareNamespaceResponse(ctx, monthsRecord.Namespaces) if err != nil { return nil, err } - - monthResponse.Timestamp = time.Unix(timestamp, 0).UTC().Format(time.RFC3339) monthResponse.Counts = &ResponseCounts{ - EntityClients: len(month.Counts.Entities), - NonEntityClients: len(month.Counts.NonEntities), - Clients: len(month.Counts.Entities) + len(month.Counts.NonEntities), + EntityClients: int(monthsRecord.Counts.EntityClients), + NonEntityClients: int(monthsRecord.Counts.NonEntityClients), + Clients: int(monthsRecord.Counts.EntityClients + monthsRecord.Counts.NonEntityClients), } monthResponse.Namespaces = nsResponse monthResponse.NewClients = newClientsResponse - months = append(months, monthResponse) } } + return months, nil +} - // Sort the months in ascending order of timestamps - sort.Slice(months, func(i, j int) bool { - firstTimestamp, errOne := time.Parse(time.RFC3339, months[i].Timestamp) - secondTimestamp, errTwo := time.Parse(time.RFC3339, months[j].Timestamp) - if errOne == nil && errTwo == nil { - return firstTimestamp.Before(secondTimestamp) +// prepareNamespaceResponse populates the namespace portion of the activity log response struct +// from +func (a *ActivityLog) prepareNamespaceResponse(ctx context.Context, nsRecords []*activity.MonthlyNamespaceRecord) ([]*ResponseNamespace, error) { + queryNS, err := namespace.FromContext(ctx) + if err != nil { + return nil, err + } + nsResponse := make([]*ResponseNamespace, 0, len(nsRecords)) + for _, nsRecord := range nsRecords { + if int(nsRecord.Counts.EntityClients) == 0 && int(nsRecord.Counts.NonEntityClients) == 0 { + continue } - // Keep the nondeterministic ordering in storage - a.logger.Error("unable to parse activity log timestamps for partial client count", - "timestamp", months[i].Timestamp, "error", errOne, "timestamp", months[j].Timestamp, "error", errTwo) - return i < j - }) - // Within each month sort everything by descending order of activity - for _, month := range months { - sort.Slice(month.Namespaces, func(i, j int) bool { - return month.Namespaces[i].Counts.Clients > month.Namespaces[j].Counts.Clients - }) + ns, err := NamespaceByID(ctx, nsRecord.NamespaceID, a.core) + if err != nil { + return nil, err + } + if a.includeInResponse(queryNS, ns) { + mountResponse := make([]*ResponseMount, 0, len(nsRecord.Mounts)) + for _, mountRecord := range nsRecord.Mounts { + if int(mountRecord.Counts.EntityClients) == 0 && int(mountRecord.Counts.NonEntityClients) == 0 { + continue + } - for _, ns := range month.Namespaces { - sort.Slice(ns.Mounts, func(i, j int) bool { - return ns.Mounts[i].Counts.Clients > ns.Mounts[j].Counts.Clients + mountResponse = append(mountResponse, &ResponseMount{ + MountPath: mountRecord.MountPath, + Counts: &ResponseCounts{ + EntityClients: int(mountRecord.Counts.EntityClients), + NonEntityClients: int(mountRecord.Counts.NonEntityClients), + Clients: int(mountRecord.Counts.EntityClients + mountRecord.Counts.NonEntityClients), + }, + }) + } + + var displayPath string + if ns == nil { + displayPath = fmt.Sprintf("deleted namespace %q", nsRecord.NamespaceID) + } else { + displayPath = ns.Path + } + nsResponse = append(nsResponse, &ResponseNamespace{ + NamespaceID: nsRecord.NamespaceID, + NamespacePath: displayPath, + Counts: ResponseCounts{ + EntityClients: int(nsRecord.Counts.EntityClients), + NonEntityClients: int(nsRecord.Counts.NonEntityClients), + Clients: int(nsRecord.Counts.EntityClients + nsRecord.Counts.NonEntityClients), + }, + Mounts: mountResponse, }) } + } + return nsResponse, nil +} - sort.Slice(month.NewClients.Namespaces, func(i, j int) bool { - return month.NewClients.Namespaces[i].Counts.Clients > month.NewClients.Namespaces[j].Counts.Clients - }) +// partialMonthClientCount returns the number of clients used so far this month. +// If activity log is not enabled, the response will be nil +func (a *ActivityLog) partialMonthClientCount(ctx context.Context) (map[string]interface{}, error) { + a.fragmentLock.RLock() + defer a.fragmentLock.RUnlock() - for _, ns := range month.NewClients.Namespaces { - sort.Slice(ns.Mounts, func(i, j int) bool { - return ns.Mounts[i].Counts.Clients > ns.Mounts[j].Counts.Clients - }) + if !a.enabled { + // nothing to count + return nil, nil + } + + // Traverse through current month's activitylog data and group clients + // into months and namespaces + byMonth, byNamespace := a.populateNamespaceAndMonthlyBreakdowns() + + // Convert the byNamespace breakdowns into structs that are + // consumable by the /activity endpoint, so as to reuse code between these two + // endpoints. + byNamespaceComputation := a.transformALNamespaceBreakdowns(byNamespace) + + // Calculate the namespace response breakdowns and totals for entities and tokens from the initial + // namespace data. + totalEntities, totalTokens, byNamespaceResponse, err := a.calculateByNamespaceResponseForQuery(ctx, byNamespaceComputation) + if err != nil { + return nil, err + } + + // Sort clients within each namespace + a.sortALResponseNamespaces(byNamespaceResponse) + + // Now populate the response based on breakdowns. + responseData := make(map[string]interface{}) + responseData["by_namespace"] = byNamespaceResponse + responseData["distinct_entities"] = totalEntities + responseData["entity_clients"] = totalEntities + responseData["non_entity_tokens"] = totalTokens + responseData["non_entity_clients"] = totalTokens + responseData["clients"] = totalEntities + totalTokens + + // The partialMonthClientCount should not have more than one month worth of data. + // If it does, something has gone wrong and we should warn that the activity log data + // might be inaccurate. + if len(byMonth) != 1 { + monthTimestamps := make([]string, 0) + for timestamp := range byMonth { + dateTimeString := time.Unix(timestamp, 0).UTC().Format(time.RFC3339) + monthTimestamps = append(monthTimestamps, dateTimeString) } + a.logger.Error("more or less than one month of data recorded in current month's activity log", "timestamps", monthTimestamps) + } + + // Convert the byMonth breakdowns into structs that are + // consumable by the /activity endpoint, so as to reuse code between these two + // endpoints. + monthlyComputation := a.transformMonthBreakdowns(byMonth) + + // Create and populate the month response structs based on the monthly breakdown. + months, err := a.prepareMonthsResponseForQuery(ctx, monthlyComputation) + if err != nil { + return nil, err } + + // Sort the months and clients within each month before adding the months to the response + a.sortActivityLogMonthsResponse(months) responseData["months"] = months return responseData, nil diff --git a/vault/activity_log_util_common.go b/vault/activity_log_util_common.go new file mode 100644 index 000000000000..c12abace2e9f --- /dev/null +++ b/vault/activity_log_util_common.go @@ -0,0 +1,120 @@ +package vault + +import ( + "sort" + "time" + + "github.com/hashicorp/vault/vault/activity" +) + +// sortALResponseNamespaces sorts the namespaces for activity log responses. +func (a *ActivityLog) sortALResponseNamespaces(byNamespaceResponse []*ResponseNamespace) { + sort.Slice(byNamespaceResponse, func(i, j int) bool { + return byNamespaceResponse[i].Counts.Clients > byNamespaceResponse[j].Counts.Clients + }) +} + +// transformALNamespaceBreakdowns takes the namespace breakdowns stored in the intermediary +// struct used in precomputation segment traversal and to store the current month data and +// reorganizes it into query structs. This helper is used by the partial month endpoint so as to +// not have to maintain two separate response data computations for two separate APIs. +func (a *ActivityLog) transformALNamespaceBreakdowns(nsData map[string]*processByNamespace) []*activity.NamespaceRecord { + byNamespace := make([]*activity.NamespaceRecord, 0) + for nsID, ns := range nsData { + + nsRecord := activity.NamespaceRecord{ + NamespaceID: nsID, + Entities: uint64(len(ns.Counts.Entities)), + NonEntityTokens: uint64(len(ns.Counts.NonEntities) + int(ns.Counts.Tokens)), + Mounts: a.transformActivityLogMounts(ns.Mounts), + } + byNamespace = append(byNamespace, &nsRecord) + } + return byNamespace +} + +// limitNamespacesInALResponse will truncate the number of namespaces shown in the activity +// endpoints to the number specified in limitNamespaces (the API filtering parameter) +func (a *ActivityLog) limitNamespacesInALResponse(byNamespaceResponse []*ResponseNamespace, limitNamespaces int) (int, int, []*ResponseNamespace) { + if limitNamespaces > len(byNamespaceResponse) { + limitNamespaces = len(byNamespaceResponse) + } + byNamespaceResponse = byNamespaceResponse[:limitNamespaces] + // recalculate total entities and tokens + totalEntities := 0 + totalTokens := 0 + for _, namespaceData := range byNamespaceResponse { + totalEntities += namespaceData.Counts.DistinctEntities + totalTokens += namespaceData.Counts.NonEntityTokens + } + return totalEntities, totalTokens, byNamespaceResponse +} + +// transformActivityLogMounts is a helper used to reformat data for transformMonthlyNamespaceBreakdowns. +// For more details, please see the function comment for transformMonthlyNamespaceBreakdowns +func (a *ActivityLog) transformActivityLogMounts(mts map[string]*processMount) []*activity.MountRecord { + mounts := make([]*activity.MountRecord, 0) + for mountpath, mountCounts := range mts { + mount := activity.MountRecord{ + MountPath: mountpath, + Counts: &activity.CountsRecord{ + EntityClients: len(mountCounts.Counts.Entities), + NonEntityClients: len(mountCounts.Counts.NonEntities) + int(mountCounts.Counts.Tokens), + }, + } + mounts = append(mounts, &mount) + } + return mounts +} + +// sortActivityLogMonthsResponse contains the sorting logic for the months +// portion of the activity log response. +func (a *ActivityLog) sortActivityLogMonthsResponse(months []*ResponseMonth) { + // Sort the months in ascending order of timestamps + sort.Slice(months, func(i, j int) bool { + firstTimestamp, errOne := time.Parse(time.RFC3339, months[i].Timestamp) + secondTimestamp, errTwo := time.Parse(time.RFC3339, months[j].Timestamp) + if errOne == nil && errTwo == nil { + return firstTimestamp.Before(secondTimestamp) + } + // Keep the nondeterministic ordering in storage + a.logger.Error("unable to parse activity log timestamps", "timestamp", + months[i].Timestamp, "error", errOne, "timestamp", months[j].Timestamp, "error", errTwo) + return i < j + }) + + // Within each month sort everything by descending order of activity + for _, month := range months { + sort.Slice(month.Namespaces, func(i, j int) bool { + return month.Namespaces[i].Counts.Clients > month.Namespaces[j].Counts.Clients + }) + + for _, ns := range month.Namespaces { + sort.Slice(ns.Mounts, func(i, j int) bool { + return ns.Mounts[i].Counts.Clients > ns.Mounts[j].Counts.Clients + }) + } + + sort.Slice(month.NewClients.Namespaces, func(i, j int) bool { + return month.NewClients.Namespaces[i].Counts.Clients > month.NewClients.Namespaces[j].Counts.Clients + }) + + for _, ns := range month.NewClients.Namespaces { + sort.Slice(ns.Mounts, func(i, j int) bool { + return ns.Mounts[i].Counts.Clients > ns.Mounts[j].Counts.Clients + }) + } + } +} + +// TODO +// computeCurrentMonthForBillingPeriod computes the current month's data with respect +// to a billing period. This function is currently a stub with the bare minimum amount +// of data to get the pre-existing tests to pass. It will be filled out in a separate PR +// and this comment will be removed. +func (a *ActivityLog) computeCurrentMonthForBillingPeriod(byMonth map[int64]*processMonth, startTime time.Time, endTime time.Time) (*activity.MonthRecord, error) { + return &activity.MonthRecord{ + NewClients: &activity.NewClientRecord{Counts: &activity.CountsRecord{EntityClients: 0, NonEntityClients: 0}}, + Counts: &activity.CountsRecord{EntityClients: 0, NonEntityClients: 0}, + }, nil +} From e58cb59df75e4a844ac1034c10bcc5c029a2b1b2 Mon Sep 17 00:00:00 2001 From: HridoyRoy Date: Mon, 27 Jun 2022 13:10:26 -0700 Subject: [PATCH 2/2] changelog --- changelog/16162.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 changelog/16162.txt diff --git a/changelog/16162.txt b/changelog/16162.txt new file mode 100644 index 000000000000..5e3c348eae46 --- /dev/null +++ b/changelog/16162.txt @@ -0,0 +1,3 @@ +```release-note:improvement +core/activity: refactor activity log api to reuse partial api functions in activity endpoint when current month is specified +``` \ No newline at end of file