Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: cloud integrations: agent check-in api #7004

Merged
merged 14 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 30 additions & 25 deletions pkg/query-service/app/cloudintegrations/availableServices.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package cloudintegrations

import (
"bytes"
"embed"
"encoding/json"
"fmt"
"io/fs"
"path"
Expand Down Expand Up @@ -86,23 +84,25 @@ func readAllServiceDefinitions() error {
continue
}

cloudProviderDirPath := path.Join(rootDirName, d.Name())
cloudServices, err := readServiceDefinitionsFromDir(cloudProviderDirPath)
cloudProvider := d.Name()

cloudProviderDirPath := path.Join(rootDirName, cloudProvider)
cloudServices, err := readServiceDefinitionsFromDir(cloudProvider, cloudProviderDirPath)
if err != nil {
return fmt.Errorf("couldn't read %s service definitions", d.Name())
return fmt.Errorf("couldn't read %s service definitions: %w", cloudProvider, err)
}

if len(cloudServices) < 1 {
return fmt.Errorf("no %s services could be read", d.Name())
return fmt.Errorf("no %s services could be read", cloudProvider)
}

availableServices[d.Name()] = cloudServices
availableServices[cloudProvider] = cloudServices
}

return nil
}

func readServiceDefinitionsFromDir(cloudProviderDirPath string) (
func readServiceDefinitionsFromDir(cloudProvider string, cloudProviderDirPath string) (
map[string]CloudServiceDetails, error,
) {
svcDefDirs, err := fs.ReadDir(serviceDefinitionFiles, cloudProviderDirPath)
Expand All @@ -118,7 +118,7 @@ func readServiceDefinitionsFromDir(cloudProviderDirPath string) (
}

svcDirPath := path.Join(cloudProviderDirPath, d.Name())
s, err := readServiceDefinition(svcDirPath)
s, err := readServiceDefinition(cloudProvider, svcDirPath)
if err != nil {
return nil, fmt.Errorf("couldn't read svc definition for %s: %w", d.Name(), err)
}
Expand All @@ -135,14 +135,14 @@ func readServiceDefinitionsFromDir(cloudProviderDirPath string) (
return svcDefs, nil
}

func readServiceDefinition(dirpath string) (*CloudServiceDetails, error) {
integrationJsonPath := path.Join(dirpath, "integration.json")
func readServiceDefinition(cloudProvider string, svcDirpath string) (*CloudServiceDetails, error) {
integrationJsonPath := path.Join(svcDirpath, "integration.json")

serializedSpec, err := serviceDefinitionFiles.ReadFile(integrationJsonPath)
if err != nil {
return nil, fmt.Errorf(
"couldn't find integration.json in %s: %w",
dirpath, err,
svcDirpath, err,
)
}

Expand All @@ -155,45 +155,50 @@ func readServiceDefinition(dirpath string) (*CloudServiceDetails, error) {
}

hydrated, err := integrations.HydrateFileUris(
integrationSpec, serviceDefinitionFiles, dirpath,
integrationSpec, serviceDefinitionFiles, svcDirpath,
)
if err != nil {
return nil, fmt.Errorf(
"couldn't hydrate files referenced in service definition %s: %w",
integrationJsonPath, err,
)
}
hydratedSpec := hydrated.(map[string]any)

// telemetry collection strategy can't be parsed directly
telemetryCollectionStrategyMap, ok := hydratedSpec["telemetry_collection_strategy"].(map[string]any)
if !ok {
return nil, fmt.Errorf("couldn't find telemetry_collection_strategy")
}
delete(hydratedSpec, "telemetry_collection_strategy")

hydratedSpec := hydrated.(map[string]interface{})
hydratedSpecJson, err := koanfJson.Parser().Marshal(hydratedSpec)
telemetryCollectionStrategy, err := ParseCloudTelemetryCollectionStrategyFromMap(
cloudProvider, telemetryCollectionStrategyMap,
)
if err != nil {
return nil, fmt.Errorf(
"couldn't serialize hydrated integration spec back to JSON %s: %w",
integrationJsonPath, err,
)
return nil, fmt.Errorf("couldn't parse telemetry_collection_strategy: %w", err)
}

var serviceDef CloudServiceDetails
decoder := json.NewDecoder(bytes.NewReader(hydratedSpecJson))
decoder.DisallowUnknownFields()
err = decoder.Decode(&serviceDef)
serviceDef, err := ParseStructWithJsonTagsFromMap[CloudServiceDetails](hydratedSpec)
if err != nil {
return nil, fmt.Errorf(
"couldn't parse hydrated JSON spec read from %s: %w",
integrationJsonPath, err,
)
}

serviceDef.TelemetryCollectionStrategy = telemetryCollectionStrategy

err = validateServiceDefinition(serviceDef)
if err != nil {
return nil, fmt.Errorf("invalid service definition %s: %w", serviceDef.Id, err)
}

return &serviceDef, nil
return serviceDef, nil

}

func validateServiceDefinition(s CloudServiceDetails) error {
func validateServiceDefinition(s *CloudServiceDetails) error {
// Validate dashboard data
seenDashboardIds := map[string]interface{}{}
for _, dd := range s.Assets.Dashboards {
Expand Down
95 changes: 92 additions & 3 deletions pkg/query-service/app/cloudintegrations/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/jmoiron/sqlx"
"go.signoz.io/signoz/pkg/query-service/model"
"golang.org/x/exp/maps"
)

var SupportedCloudProviders = []string{
Expand Down Expand Up @@ -92,6 +93,11 @@ type GenerateConnectionUrlRequest struct {
type SigNozAgentConfig struct {
// The region in which SigNoz agent should be installed.
Region string `json:"region"`

raj-k-singh marked this conversation as resolved.
Show resolved Hide resolved
IngestionUrl string `json:"ingestion_url"`
IngestionKey string `json:"ingestion_key"`
SigNozAPIUrl string `json:"signoz_api_url"`
SigNozAPIKey string `json:"signoz_api_key"`
}

type GenerateConnectionUrlResponse struct {
Expand All @@ -116,7 +122,7 @@ func (c *Controller) GenerateConnectionUrl(

// TODO(Raj): Add actual cloudformation template for AWS integration after it has been shipped.
connectionUrl := fmt.Sprintf(
"https://%s.console.aws.amazon.com/cloudformation/home?region=%s#/stacks/quickcreate?stackName=SigNozIntegration/",
"https://%s.console.aws.amazon.com/cloudformation/home?region=%s#/stacks/quickcreate?stackName=SigNozIntegration",
req.AgentConfig.Region, req.AgentConfig.Region,
)

Expand Down Expand Up @@ -163,7 +169,17 @@ type AgentCheckInRequest struct {
}

type AgentCheckInResponse struct {
Account AccountRecord `json:"account"`
AccountId string `json:"account_id"`
CloudAccountId string `json:"cloud_account_id"`
RemovedAt *time.Time `json:"removed_at"`

IntegrationConfig IntegrationConfigForAgent `json:"integration_config"`
}

type IntegrationConfigForAgent struct {
EnabledRegions []string `json:"enabled_regions"`

TelemetryConfig *CloudTelemetryCollectionStrategy `json:"telemetry,omitempty"`
}

func (c *Controller) CheckInAsAgent(
Expand Down Expand Up @@ -201,8 +217,81 @@ func (c *Controller) CheckInAsAgent(
return nil, model.WrapApiError(apiErr, "couldn't upsert cloud account")
}

// prepare and return integration config to be consumed by agent
telemetryConfig, err := NewCloudTelemetryCollectionConfig(cloudProvider)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't init cloud telemetry config: %w", err,
))
}

agentConfig := IntegrationConfigForAgent{
EnabledRegions: []string{},
TelemetryConfig: telemetryConfig,
}

if account.Config != nil && account.Config.EnabledRegions != nil {
agentConfig.EnabledRegions = account.Config.EnabledRegions
}

services, apiErr := listCloudProviderServices(cloudProvider)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't list cloud services")
}
svcDetailsById := map[string]*CloudServiceDetails{}
for _, svcDetails := range services {
svcDetailsById[svcDetails.Id] = &svcDetails
}

cloudAccountId := *account.CloudAccountId

svcConfigs, apiErr := c.serviceConfigRepo.getAllForAccount(
ctx, cloudProvider, cloudAccountId,
)
if apiErr != nil {
return nil, model.WrapApiError(
apiErr, "couldn't get service configs for cloud account",
)
}

// accumulated config in a fixed order to ensure same config generated across runs
configuredSvcIds := maps.Keys(svcConfigs)
slices.Sort(configuredSvcIds)

for _, svcId := range configuredSvcIds {
svcDetails := svcDetailsById[svcId]
svcConfig := svcConfigs[svcId]

if svcDetails != nil {
if svcConfig.Metrics != nil && svcConfig.Metrics.Enabled {
err := agentConfig.TelemetryConfig.MetricsCollectionConfig.UpdateWithServiceConfig(
svcDetails.TelemetryCollectionStrategy.MetricsCollectionConfig,
)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't accumulate metrics config for svc %s: %w", svcId, err,
))
}
}

if svcConfig.Logs != nil && svcConfig.Logs.Enabled {
err := agentConfig.TelemetryConfig.LogsCollectionConfig.UpdateWithServiceConfig(
svcDetails.TelemetryCollectionStrategy.LogsCollectionConfig,
)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't accumulate logs config for svc %s: %w", svcId, err,
))
}
}
}
}

return &AgentCheckInResponse{
Account: *account,
AccountId: account.Id,
CloudAccountId: cloudAccountId,
RemovedAt: account.RemovedAt,
IntegrationConfig: agentConfig,
}, nil
}

Expand Down
13 changes: 7 additions & 6 deletions pkg/query-service/app/cloudintegrations/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ func TestAgentCheckIns(t *testing.T) {
},
)
require.Nil(apiErr)
require.Equal(testAccountId1, resp1.Account.Id)
require.Equal(testCloudAccountId1, *resp1.Account.CloudAccountId)
require.Equal(testAccountId1, resp1.AccountId)
require.Equal(testCloudAccountId1, resp1.CloudAccountId)

// The agent should not be able to check in with a different
// cloud account id for the same account.
Expand Down Expand Up @@ -262,9 +262,10 @@ func makeTestConnectedAccount(t *testing.T, controller *Controller, cloudAccount
},
)
require.Nil(apiErr)
require.Equal(testAccountId, resp.Account.Id)
require.Equal(cloudAccountId, *resp.Account.CloudAccountId)

return &resp.Account
require.Equal(testAccountId, resp.AccountId)
require.Equal(cloudAccountId, resp.CloudAccountId)

acc, err := controller.accountsRepo.get(context.TODO(), "aws", resp.AccountId)
require.Nil(err)
return acc
}
Loading
Loading