Skip to content
This repository has been archived by the owner on Jan 26, 2023. It is now read-only.

Commit

Permalink
Merge pull request #29 from asecurityteam/add-bulk-fetch
Browse files Browse the repository at this point in the history
Bulk paginated api for backfilling
  • Loading branch information
ensary authored Sep 19, 2019
2 parents 6ed0361 + 1507f33 commit 11d8fd0
Show file tree
Hide file tree
Showing 8 changed files with 624 additions and 45 deletions.
128 changes: 128 additions & 0 deletions api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,125 @@ paths:
#! if eq .Response.Body.errorType "InvalidInput" !# 400
#! else !# 500
#! end !#, "bodyPassthrough": true}'
/v1/cloud/asset:
get:
summary: "Retrieve a list of known cloud assets at point in time in reverse order by time (newest to oldest) split into pages of 'count' items."
parameters:
- name: "time"
in: "query"
description: "The point in time for the first page of results to fetch"
required: true
schema:
type: "string"
format: "date-time"
- name: "count"
in: "query"
description: "Maximum number of matching cloud assets to return per page. 100 by default"
required: false
schema:
type: "integer"
minimum: 1
default: 100
- name: "type"
in: "query"
description: "AWS resource type. Currently supported values: AWS::EC2::Instance, AWS::ElasticLoadBalancing::LoadBalancer, AWS::ElasticLoadBalancingV2::LoadBalancer"
required: true
schema:
type: "string"
responses:
200:
description: "First page of the list of all assets found at the given time, limited to count"
content:
application/json:
schema:
$ref: "#/components/schemas/BulkCloudAssets"
400:
description: "Invalid input"
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
404:
description: "The asset is not found"
x-transportd:
backend: app
enabled:
- "accesslog"
- "requestvalidation"
- "responsevalidation"
- "lambda"
lambda:
arn: "fetchAllAssetsByTime"
async: false
request: >
{
"time": "#!index .Request.Query.time 0!#",
"count": #!if .Request.Query.count !# #!index .Request.Query.count 0!# #! else !# 100 #! end !# ,
"type": "#!index .Request.Query.type 0!#"
}
success: '{"status": 200, "bodyPassthrough": true}'
error: >
{
"status":
#! if eq .Response.Body.errorType "InvalidInput" !# 400,
#! else !#
#! if eq .Response.Body.errorType "NotFound" !# 404,
#! else !# 500,
#! end !#
#! end !#
"bodyPassthrough": true
}
/v1/cloud/bulk/{PageToken}:
get:
summary: "Retrieve the next page of bulk cloud assets at point in time in reverse order by time (newest to oldest)"
parameters:
- name: "PageToken"
in: "path"
description: "The token for the page in the list provided by a previous cloud asset fetch call"
required: true
schema:
type: "string"
responses:
200:
description: "The page from the list of all assets found at the given time"
content:
application/json:
schema:
$ref: "#/components/schemas/BulkCloudAssets"
400:
description: "Invalid input"
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
404:
description: "The asset is not found"
x-transportd:
backend: app
enabled:
- "accesslog"
- "requestvalidation"
- "responsevalidation"
- "lambda"
lambda:
arn: "fetchMoreAssetsByPageToken"
async: false
request: >
{
"pageToken": "#!.Request.URL.PageToken!#"
}
success: '{"status": 200, "bodyPassthrough": true}'
error: >
{
"status":
#! if eq .Response.Body.errorType "InvalidInput" !# 400,
#! else !#
#! if eq .Response.Body.errorType "NotFound" !# 404,
#! else !# 500,
#! end !#
#! end !#
"bodyPassthrough": true
}
/v1/cloud/ip/{ipAddress}:
get:
summary: "Retrieve a cloud asset at a point in time by IP Address"
Expand Down Expand Up @@ -363,6 +482,15 @@ components:
enum: [ADDED, DELETED]
required:
- changeType
BulkCloudAssets:
type: object
properties:
nextPageToken:
type: string
response:
type: array
items:
$ref: "#/components/schemas/CloudAssetDetails"
CloudAssets:
type: object
properties:
Expand Down
24 changes: 18 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ func main() {
StatFn: domain.StatFromContext,
Fetcher: dbStorage,
}
fetchAllAssetsByTime := &v1.CloudFetchAllAssetsByTimeHandler{
LogFn: domain.LoggerFromContext,
StatFn: domain.StatFromContext,
Fetcher: dbStorage,
}
fetchAllAssetsByTimePage := &v1.CloudFetchAllAssetsByTimePageHandler{
LogFn: domain.LoggerFromContext,
StatFn: domain.StatFromContext,
Fetcher: dbStorage,
}
createPartition := &v1.CreatePartitionHandler{
LogFn: domain.LoggerFromContext,
Generator: dbStorage,
Expand All @@ -51,12 +61,14 @@ func main() {
Deleter: dbStorage,
}
handlers := map[string]serverfull.Function{
"insert": serverfull.NewFunction(insert.Handle),
"fetchByIP": serverfull.NewFunction(fetchByIP.Handle),
"fetchByHostname": serverfull.NewFunction(fetchByHostname.Handle),
"createPartition": serverfull.NewFunction(createPartition.Handle),
"getPartitions": serverfull.NewFunction(getPartitions.Handle),
"deletePartitions": serverfull.NewFunction(deletePartitions.Handle),
"insert": serverfull.NewFunction(insert.Handle),
"fetchByIP": serverfull.NewFunction(fetchByIP.Handle),
"fetchByHostname": serverfull.NewFunction(fetchByHostname.Handle),
"fetchAllAssetsByTime": serverfull.NewFunction(fetchAllAssetsByTime.Handle),
"fetchMoreAssetsByPageToken": serverfull.NewFunction(fetchAllAssetsByTimePage.Handle),
"createPartition": serverfull.NewFunction(createPartition.Handle),
"getPartitions": serverfull.NewFunction(getPartitions.Handle),
"deletePartitions": serverfull.NewFunction(deletePartitions.Handle),
}

fetcher := &serverfull.StaticFetcher{Functions: handlers}
Expand Down
5 changes: 5 additions & 0 deletions pkg/domain/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ type CloudAssetByHostnameFetcher interface {
FetchByHostname(ctx context.Context, when time.Time, hostname string) ([]CloudAssetDetails, error)
}

// CloudAllAssetsByTimeFetcher fetches details for all cloud assets based on limit and optional offset with a given point in time
type CloudAllAssetsByTimeFetcher interface {
FetchAll(ctx context.Context, when time.Time, count uint, offset uint, assetType string) ([]CloudAssetDetails, error)
}

// Partition represents a database partition with the specified time range
type Partition struct {
Name string
Expand Down
165 changes: 165 additions & 0 deletions pkg/handlers/v1/cloud_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,33 @@ package v1

import (
"context"
"encoding/base32"
"encoding/json"
"errors"
"fmt"
"time"

"github.com/asecurityteam/asset-inventory-api/pkg/domain"
"github.com/asecurityteam/asset-inventory-api/pkg/logs"
)

const (
awsEC2 = "AWS::EC2::Instance"
awsELB = "AWS::ElasticLoadBalancing::LoadBalancer"
awsALB = "AWS::ElasticLoadBalancingV2::LoadBalancer"
)

// CloudAssets represents a list of assets
type CloudAssets struct {
Assets []CloudAssetDetails `json:"assets"`
}

// PagedCloudAssets represents a list of assets with next page token for bulk requests
type PagedCloudAssets struct {
CloudAssets
NextPageToken string `json:"nextPageToken"`
}

// CloudAssetDetails represent an asset and associated attributes
type CloudAssetDetails struct {
PrivateIPAddresses []string `json:"privateIpAddresses"`
Expand All @@ -38,6 +53,47 @@ type CloudAssetFetchByHostnameParameters struct {
Timestamp string `json:"time"`
}

// CloudAssetFetchAllByTimestampParameters represents the incoming payload for bulk fetching cloud assets for point in time with optional pagination
type CloudAssetFetchAllByTimestampParameters struct {
Timestamp string `json:"time"`
Count uint `json:"count"`
Offset uint `json:"offset"`
Type string `json:"type"`
}

func (p *CloudAssetFetchAllByTimestampParameters) toNextPageToken() (string, error) {
nextPageParameters := CloudAssetFetchAllByTimestampParameters{
Timestamp: p.Timestamp,
Count: p.Count,
Offset: p.Offset + p.Count,
Type: p.Type,
}
js, err := json.Marshal(nextPageParameters)
if err != nil {
return "", err
}
token := base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(js)
return token, nil
}

func fetchAllByTimeStampParametersForToken(token string) (*CloudAssetFetchAllByTimestampParameters, error) {
js, err := base32.StdEncoding.WithPadding(base32.NoPadding).DecodeString(token)
if err != nil {
return nil, err
}
ret := CloudAssetFetchAllByTimestampParameters{}
err = json.Unmarshal(js, &ret)
if err != nil {
return nil, err
}
return &ret, nil
}

// CloudAssetFetchAllByTimeStampPageParameters represents the request for subsequent pages of bulk fetching cloud assets
type CloudAssetFetchAllByTimeStampPageParameters struct {
PageToken string `json:"pageToken"`
}

// CloudFetchByIPHandler defines a lambda handler for fetching cloud assets with a given IP address
type CloudFetchByIPHandler struct {
LogFn domain.LogFn
Expand Down Expand Up @@ -108,6 +164,115 @@ func (h *CloudFetchByHostnameHandler) Handle(ctx context.Context, input CloudAss
return extractOutput(assets), nil
}

func validateAssetType(input string) (string, error) {
switch input {
case awsEC2, awsELB, awsALB:
return input, nil
default:
return "", fmt.Errorf("unknown asset type %s", input)
}
}

// CloudFetchAllAssetsByTimeHandler defines a lambda handler for bulk fetching cloud assets known at specific point in time
type CloudFetchAllAssetsByTimeHandler struct {
LogFn domain.LogFn
StatFn domain.StatFn
Fetcher domain.CloudAllAssetsByTimeFetcher
}

// Handle handles fetching cloud assets with pagination
func (h *CloudFetchAllAssetsByTimeHandler) Handle(ctx context.Context, input CloudAssetFetchAllByTimestampParameters) (PagedCloudAssets, error) {
logger := h.LogFn(ctx)

ts, e := time.Parse(time.RFC3339Nano, input.Timestamp)
if e != nil {
logger.Info(logs.InvalidInput{Reason: e.Error()})
return PagedCloudAssets{}, InvalidInput{Field: "time", Cause: e}
}

if input.Count == 0 {
e = errors.New("missing or malformed required parameter count")
logger.Info(logs.InvalidInput{Reason: e.Error()})
return PagedCloudAssets{}, InvalidInput{Field: "count", Cause: e}
}

assetType, e := validateAssetType(input.Type)
if e != nil {
logger.Info(logs.InvalidInput{Reason: e.Error()})
return PagedCloudAssets{}, InvalidInput{Field: "type", Cause: e}
}

var offset uint // do not offset as this is the first page
assets, e := h.Fetcher.FetchAll(ctx, ts, input.Count, offset, assetType)
if e != nil {
logger.Error(logs.StorageError{Reason: e.Error()})
return PagedCloudAssets{}, e
}
if len(assets) == 0 {
return PagedCloudAssets{}, NotFound{ID: "any"}
}

nextPageToken, e := input.toNextPageToken()
if e != nil {
logger.Error(logs.StorageError{Reason: e.Error()})
}
return PagedCloudAssets{extractOutput(assets), nextPageToken}, nil
}

// CloudFetchAllAssetsByTimePageHandler defines a lambda handler for bulk fetching subsequent pages of cloud assets known at specific point in time
type CloudFetchAllAssetsByTimePageHandler struct {
LogFn domain.LogFn
StatFn domain.StatFn
Fetcher domain.CloudAllAssetsByTimeFetcher
}

// Handle handles subsequent page fetching of cloud assets
func (h *CloudFetchAllAssetsByTimePageHandler) Handle(ctx context.Context, input CloudAssetFetchAllByTimeStampPageParameters) (PagedCloudAssets, error) {
logger := h.LogFn(ctx)
params, e := fetchAllByTimeStampParametersForToken(input.PageToken)
if e != nil {
logger.Info(logs.InvalidInput{Reason: e.Error()})
return PagedCloudAssets{}, InvalidInput{Field: "pageToken", Cause: e}
}
ts, e := time.Parse(time.RFC3339Nano, params.Timestamp)
if e != nil {
logger.Info(logs.InvalidInput{Reason: e.Error()})
return PagedCloudAssets{}, InvalidInput{Field: "pageToken", Cause: e}
}

//generic error to report to caller to avoid exposing the internal token structure NB, the specific error is still logged
tokenError := errors.New("malformed pageToken")
if params.Count == 0 {
e = errors.New("missing or malformed required parameter count")
logger.Info(logs.InvalidInput{Reason: e.Error()})
return PagedCloudAssets{}, InvalidInput{Field: "pageToken", Cause: tokenError}
}

if params.Offset == 0 {
e = errors.New("missing or malformed required parameter offset")
logger.Info(logs.InvalidInput{Reason: e.Error()})
return PagedCloudAssets{}, InvalidInput{Field: "pageToken", Cause: tokenError}
}

assetType, e := validateAssetType(params.Type)
if e != nil {
logger.Info(logs.InvalidInput{Reason: e.Error()})
return PagedCloudAssets{}, InvalidInput{Field: "pageToken", Cause: tokenError}
}

assets, e := h.Fetcher.FetchAll(ctx, ts, params.Count, params.Offset, assetType)
if e != nil {
logger.Error(logs.StorageError{Reason: e.Error()})
return PagedCloudAssets{}, e
}
if len(assets) == 0 {
return PagedCloudAssets{}, NotFound{ID: "any"}
}

nextPageToken, _ := params.toNextPageToken()
return PagedCloudAssets{extractOutput(assets), nextPageToken}, nil
}

func extractOutput(assets []domain.CloudAssetDetails) CloudAssets {
cloudAssets := CloudAssets{
Assets: make([]CloudAssetDetails, len(assets)),
Expand Down
Loading

0 comments on commit 11d8fd0

Please sign in to comment.