Skip to content

Commit

Permalink
feat: add proto for parallel step logs
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Sukhin <vladislav@kubeshop.io>
  • Loading branch information
vsukhin committed Dec 2, 2024
1 parent 6b89ab0 commit bcffbd5
Show file tree
Hide file tree
Showing 11 changed files with 552 additions and 192 deletions.
13 changes: 7 additions & 6 deletions cmd/api-server/commons/commons.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,12 +289,13 @@ func ReadProContext(ctx context.Context, cfg *config.Config, grpcClient cloud.Te
LogStreamWorkerCount: cfg.TestkubeProLogStreamWorkerCount,
WorkflowNotificationsWorkerCount: cfg.TestkubeProWorkflowNotificationsWorkerCount,
WorkflowServiceNotificationsWorkerCount: cfg.TestkubeProWorkflowServiceNotificationsWorkerCount,
SkipVerify: cfg.TestkubeProSkipVerify,
EnvID: cfg.TestkubeProEnvID,
OrgID: cfg.TestkubeProOrgID,
Migrate: cfg.TestkubeProMigrate,
ConnectionTimeout: cfg.TestkubeProConnectionTimeout,
DashboardURI: cfg.TestkubeDashboardURI,
WorkflowParallelStepNotificationsWorkerCount: cfg.TestkubeProWorkflowParallelStepNotificationsWorkerCount,
SkipVerify: cfg.TestkubeProSkipVerify,
EnvID: cfg.TestkubeProEnvID,
OrgID: cfg.TestkubeProOrgID,
Migrate: cfg.TestkubeProMigrate,
ConnectionTimeout: cfg.TestkubeProConnectionTimeout,
DashboardURI: cfg.TestkubeDashboardURI,
}

if cfg.TestkubeProAPIKey == "" || grpcClient == nil {
Expand Down
22 changes: 22 additions & 0 deletions cmd/api-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,27 @@ func main() {
}
return notifications.Channel(), nil
}
getTestWorkflowParallelStepNotificationsStream := func(ctx context.Context, executionID, ref string, parallelStepIndex int) (<-chan testkube.TestWorkflowExecutionNotification, error) {
execution, err := testWorkflowResultsRepository.Get(ctx, executionID)
if err != nil {
return nil, err
}

if execution.Result != nil && execution.Result.IsFinished() {
return nil, errors.New("test workflow execution is finished")
}

notifications := executionWorker.Notifications(ctx, fmt.Sprintf("%s-%s-%d", execution.Id, ref, parallelStepIndex), executionworkertypes.NotificationsOptions{
Hints: executionworkertypes.Hints{
Namespace: execution.Namespace,
ScheduledAt: common.Ptr(execution.ScheduledAt),
},
})
if notifications.Err() != nil {
return nil, notifications.Err()
}
return notifications.Channel(), nil
}
getDeprecatedLogStream := func(ctx context.Context, executionID string) (chan output.Output, error) {
return nil, errors.New("deprecated features have been disabled")
}
Expand All @@ -359,6 +380,7 @@ func main() {
getDeprecatedLogStream,
getTestWorkflowNotificationsStream,
getTestWorkflowServiceNotificationsStream,
getTestWorkflowParallelStepNotificationsStream,
clusterId,
cfg.TestkubeClusterName,
features,
Expand Down
87 changes: 44 additions & 43 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,49 +40,50 @@ type Config struct {
LogsStorage string `envconfig:"LOGS_STORAGE" default:""`
WorkflowStorage string `envconfig:"WORKFLOW_STORAGE" default:"crd"`
// WhitelistedContainers is a list of containers from which logs should be collected.
WhitelistedContainers []string `envconfig:"WHITELISTED_CONTAINERS" default:"init,logs,scraper"`
NatsEmbedded bool `envconfig:"NATS_EMBEDDED" default:"false"`
NatsEmbeddedStoreDir string `envconfig:"NATS_EMBEDDED_STORE_DIR" default:"/app/nats"`
NatsURI string `envconfig:"NATS_URI" default:"nats://localhost:4222"`
NatsSecure bool `envconfig:"NATS_SECURE" default:"false"`
NatsSkipVerify bool `envconfig:"NATS_SKIP_VERIFY" default:"false"`
NatsCertFile string `envconfig:"NATS_CERT_FILE" default:""`
NatsKeyFile string `envconfig:"NATS_KEY_FILE" default:""`
NatsCAFile string `envconfig:"NATS_CA_FILE" default:""`
NatsConnectTimeout time.Duration `envconfig:"NATS_CONNECT_TIMEOUT" default:"5s"`
JobServiceAccountName string `envconfig:"JOB_SERVICE_ACCOUNT_NAME" default:""`
JobTemplateFile string `envconfig:"JOB_TEMPLATE_FILE" default:""`
DisableTestTriggers bool `envconfig:"DISABLE_TEST_TRIGGERS" default:"false"`
TestkubeDefaultExecutors string `envconfig:"TESTKUBE_DEFAULT_EXECUTORS" default:""`
TestkubeEnabledExecutors string `envconfig:"TESTKUBE_ENABLED_EXECUTORS" default:""`
TestkubeTemplateJob string `envconfig:"TESTKUBE_TEMPLATE_JOB" default:""`
TestkubeContainerTemplateJob string `envconfig:"TESTKUBE_CONTAINER_TEMPLATE_JOB" default:""`
TestkubeContainerTemplateScraper string `envconfig:"TESTKUBE_CONTAINER_TEMPLATE_SCRAPER" default:""`
TestkubeContainerTemplatePVC string `envconfig:"TESTKUBE_CONTAINER_TEMPLATE_PVC" default:""`
TestkubeTemplateSlavePod string `envconfig:"TESTKUBE_TEMPLATE_SLAVE_POD" default:""`
TestkubeConfigDir string `envconfig:"TESTKUBE_CONFIG_DIR" default:"config"`
TestkubeAnalyticsEnabled bool `envconfig:"TESTKUBE_ANALYTICS_ENABLED" default:"false"`
TestkubeReadonlyExecutors bool `envconfig:"TESTKUBE_READONLY_EXECUTORS" default:"false"`
TestkubeNamespace string `envconfig:"TESTKUBE_NAMESPACE" default:"testkube"`
TestkubeProAPIKey string `envconfig:"TESTKUBE_PRO_API_KEY" default:""`
TestkubeProURL string `envconfig:"TESTKUBE_PRO_URL" default:""`
TestkubeProTLSInsecure bool `envconfig:"TESTKUBE_PRO_TLS_INSECURE" default:"false"`
TestkubeProWorkerCount int `envconfig:"TESTKUBE_PRO_WORKER_COUNT" default:"50"`
TestkubeProLogStreamWorkerCount int `envconfig:"TESTKUBE_PRO_LOG_STREAM_WORKER_COUNT" default:"25"`
TestkubeProWorkflowNotificationsWorkerCount int `envconfig:"TESTKUBE_PRO_WORKFLOW_NOTIFICATIONS_STREAM_WORKER_COUNT" default:"25"`
TestkubeProWorkflowServiceNotificationsWorkerCount int `envconfig:"TESTKUBE_PRO_WORKFLOW_SERVICE_NOTIFICATIONS_STREAM_WORKER_COUNT" default:"25"`
TestkubeProSkipVerify bool `envconfig:"TESTKUBE_PRO_SKIP_VERIFY" default:"false"`
TestkubeProEnvID string `envconfig:"TESTKUBE_PRO_ENV_ID" default:""`
TestkubeProOrgID string `envconfig:"TESTKUBE_PRO_ORG_ID" default:""`
TestkubeProMigrate string `envconfig:"TESTKUBE_PRO_MIGRATE" default:"false"`
TestkubeProConnectionTimeout int `envconfig:"TESTKUBE_PRO_CONNECTION_TIMEOUT" default:"10"`
TestkubeProCertFile string `envconfig:"TESTKUBE_PRO_CERT_FILE" default:""`
TestkubeProKeyFile string `envconfig:"TESTKUBE_PRO_KEY_FILE" default:""`
TestkubeProTLSSecret string `envconfig:"TESTKUBE_PRO_TLS_SECRET" default:""`
TestkubeProRunnerCustomCASecret string `envconfig:"TESTKUBE_PRO_RUNNER_CUSTOM_CA_SECRET" default:""`
TestkubeWatcherNamespaces string `envconfig:"TESTKUBE_WATCHER_NAMESPACES" default:""`
TestkubeRegistry string `envconfig:"TESTKUBE_REGISTRY" default:""`
TestkubePodStartTimeout time.Duration `envconfig:"TESTKUBE_POD_START_TIMEOUT" default:"30m"`
WhitelistedContainers []string `envconfig:"WHITELISTED_CONTAINERS" default:"init,logs,scraper"`
NatsEmbedded bool `envconfig:"NATS_EMBEDDED" default:"false"`
NatsEmbeddedStoreDir string `envconfig:"NATS_EMBEDDED_STORE_DIR" default:"/app/nats"`
NatsURI string `envconfig:"NATS_URI" default:"nats://localhost:4222"`
NatsSecure bool `envconfig:"NATS_SECURE" default:"false"`
NatsSkipVerify bool `envconfig:"NATS_SKIP_VERIFY" default:"false"`
NatsCertFile string `envconfig:"NATS_CERT_FILE" default:""`
NatsKeyFile string `envconfig:"NATS_KEY_FILE" default:""`
NatsCAFile string `envconfig:"NATS_CA_FILE" default:""`
NatsConnectTimeout time.Duration `envconfig:"NATS_CONNECT_TIMEOUT" default:"5s"`
JobServiceAccountName string `envconfig:"JOB_SERVICE_ACCOUNT_NAME" default:""`
JobTemplateFile string `envconfig:"JOB_TEMPLATE_FILE" default:""`
DisableTestTriggers bool `envconfig:"DISABLE_TEST_TRIGGERS" default:"false"`
TestkubeDefaultExecutors string `envconfig:"TESTKUBE_DEFAULT_EXECUTORS" default:""`
TestkubeEnabledExecutors string `envconfig:"TESTKUBE_ENABLED_EXECUTORS" default:""`
TestkubeTemplateJob string `envconfig:"TESTKUBE_TEMPLATE_JOB" default:""`
TestkubeContainerTemplateJob string `envconfig:"TESTKUBE_CONTAINER_TEMPLATE_JOB" default:""`
TestkubeContainerTemplateScraper string `envconfig:"TESTKUBE_CONTAINER_TEMPLATE_SCRAPER" default:""`
TestkubeContainerTemplatePVC string `envconfig:"TESTKUBE_CONTAINER_TEMPLATE_PVC" default:""`
TestkubeTemplateSlavePod string `envconfig:"TESTKUBE_TEMPLATE_SLAVE_POD" default:""`
TestkubeConfigDir string `envconfig:"TESTKUBE_CONFIG_DIR" default:"config"`
TestkubeAnalyticsEnabled bool `envconfig:"TESTKUBE_ANALYTICS_ENABLED" default:"false"`
TestkubeReadonlyExecutors bool `envconfig:"TESTKUBE_READONLY_EXECUTORS" default:"false"`
TestkubeNamespace string `envconfig:"TESTKUBE_NAMESPACE" default:"testkube"`
TestkubeProAPIKey string `envconfig:"TESTKUBE_PRO_API_KEY" default:""`
TestkubeProURL string `envconfig:"TESTKUBE_PRO_URL" default:""`
TestkubeProTLSInsecure bool `envconfig:"TESTKUBE_PRO_TLS_INSECURE" default:"false"`
TestkubeProWorkerCount int `envconfig:"TESTKUBE_PRO_WORKER_COUNT" default:"50"`
TestkubeProLogStreamWorkerCount int `envconfig:"TESTKUBE_PRO_LOG_STREAM_WORKER_COUNT" default:"25"`
TestkubeProWorkflowNotificationsWorkerCount int `envconfig:"TESTKUBE_PRO_WORKFLOW_NOTIFICATIONS_STREAM_WORKER_COUNT" default:"25"`
TestkubeProWorkflowServiceNotificationsWorkerCount int `envconfig:"TESTKUBE_PRO_WORKFLOW_SERVICE_NOTIFICATIONS_STREAM_WORKER_COUNT" default:"25"`
TestkubeProWorkflowParallelStepNotificationsWorkerCount int `envconfig:"TESTKUBE_PRO_WORKFLOW_PARALLEL_STEP_NOTIFICATIONS_STREAM_WORKER_COUNT" default:"25"`
TestkubeProSkipVerify bool `envconfig:"TESTKUBE_PRO_SKIP_VERIFY" default:"false"`
TestkubeProEnvID string `envconfig:"TESTKUBE_PRO_ENV_ID" default:""`
TestkubeProOrgID string `envconfig:"TESTKUBE_PRO_ORG_ID" default:""`
TestkubeProMigrate string `envconfig:"TESTKUBE_PRO_MIGRATE" default:"false"`
TestkubeProConnectionTimeout int `envconfig:"TESTKUBE_PRO_CONNECTION_TIMEOUT" default:"10"`
TestkubeProCertFile string `envconfig:"TESTKUBE_PRO_CERT_FILE" default:""`
TestkubeProKeyFile string `envconfig:"TESTKUBE_PRO_KEY_FILE" default:""`
TestkubeProTLSSecret string `envconfig:"TESTKUBE_PRO_TLS_SECRET" default:""`
TestkubeProRunnerCustomCASecret string `envconfig:"TESTKUBE_PRO_RUNNER_CUSTOM_CA_SECRET" default:""`
TestkubeWatcherNamespaces string `envconfig:"TESTKUBE_WATCHER_NAMESPACES" default:""`
TestkubeRegistry string `envconfig:"TESTKUBE_REGISTRY" default:""`
TestkubePodStartTimeout time.Duration `envconfig:"TESTKUBE_POD_START_TIMEOUT" default:"30m"`
// TestkubeImageCredentialsCacheTTL is the duration for which the image pull credentials should be cached provided as a Go duration string.
// If set to 0, the cache is disabled.
TestkubeImageCredentialsCacheTTL time.Duration `envconfig:"TESTKUBE_IMAGE_CREDENTIALS_CACHE_TTL" default:"30m"`
Expand Down
27 changes: 14 additions & 13 deletions internal/config/procontext.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
package config

type ProContext struct {
APIKey string
URL string
TLSInsecure bool
WorkerCount int
LogStreamWorkerCount int
WorkflowNotificationsWorkerCount int
WorkflowServiceNotificationsWorkerCount int
SkipVerify bool
EnvID string
OrgID string
Migrate string
ConnectionTimeout int
DashboardURI string
APIKey string
URL string
TLSInsecure bool
WorkerCount int
LogStreamWorkerCount int
WorkflowNotificationsWorkerCount int
WorkflowServiceNotificationsWorkerCount int
WorkflowParallelStepNotificationsWorkerCount int
SkipVerify bool
EnvID string
OrgID string
Migrate string
ConnectionTimeout int
DashboardURI string
}
6 changes: 6 additions & 0 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ type Agent struct {
testWorkflowServiceNotificationsResponseBuffer chan *cloud.TestWorkflowServiceNotificationsResponse
testWorkflowServiceNotificationsFunc func(ctx context.Context, executionID, serviceName string, serviceIndex int) (<-chan testkube.TestWorkflowExecutionNotification, error)

testWorkflowParallelStepNotificationsWorkerCount int

Check failure on line 63 in pkg/agent/agent.go

View workflow job for this annotation

GitHub Actions / Lint Go

field `testWorkflowParallelStepNotificationsWorkerCount` is unused (unused)
testWorkflowParallelStepNotificationsRequestBuffer chan *cloud.TestWorkflowParallelStepNotificationsRequest

Check failure on line 64 in pkg/agent/agent.go

View workflow job for this annotation

GitHub Actions / Lint Go

field `testWorkflowParallelStepNotificationsRequestBuffer` is unused (unused)
testWorkflowParallelStepNotificationsResponseBuffer chan *cloud.TestWorkflowParallelStepNotificationsResponse

Check failure on line 65 in pkg/agent/agent.go

View workflow job for this annotation

GitHub Actions / Lint Go

field `testWorkflowParallelStepNotificationsResponseBuffer` is unused (unused)
testWorkflowParallelStepNotificationsFunc func(ctx context.Context, executionID, ref string, parallelStepIndexIndex int) (<-chan testkube.TestWorkflowExecutionNotification, error)

Check failure on line 66 in pkg/agent/agent.go

View workflow job for this annotation

GitHub Actions / Lint Go

field `testWorkflowParallelStepNotificationsFunc` is unused (unused)

events chan testkube.Event
sendTimeout time.Duration
receiveTimeout time.Duration
Expand All @@ -79,6 +84,7 @@ func NewAgent(logger *zap.SugaredLogger,
logStreamFunc func(ctx context.Context, executionID string) (chan output.Output, error),
workflowNotificationsFunc func(ctx context.Context, executionID string) (<-chan testkube.TestWorkflowExecutionNotification, error),
workflowServiceNotificationsFunc func(ctx context.Context, executionID, serviceName string, serviceIndex int) (<-chan testkube.TestWorkflowExecutionNotification, error),
workflowParallelStepNotificationsFunc func(ctx context.Context, executionID, ref string, parallelStepIndex int) (<-chan testkube.TestWorkflowExecutionNotification, error),
clusterID string,
clusterName string,
features featureflags.FeatureFlags,
Expand Down
4 changes: 3 additions & 1 deletion pkg/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,12 @@ func TestCommandExecution(t *testing.T) {
var logStreamFunc func(ctx context.Context, executionID string) (chan output.Output, error)
var workflowNotificationsStreamFunc func(ctx context.Context, executionID string) (<-chan testkube.TestWorkflowExecutionNotification, error)
var workflowServiceNotificationsStreamFunc func(ctx context.Context, executionID, serviceName string, serviceIndex int) (<-chan testkube.TestWorkflowExecutionNotification, error)
var workflowParallelStepNotificationsStreamFunc func(ctx context.Context, executionID, ref string, parallelStepIndexIndex int) (<-chan testkube.TestWorkflowExecutionNotification, error)

logger, _ := zap.NewDevelopment()
proContext := config.ProContext{APIKey: "api-key", WorkerCount: 5, LogStreamWorkerCount: 5, WorkflowNotificationsWorkerCount: 5}
agent, err := agent.NewAgent(logger.Sugar(), m, grpcClient, logStreamFunc, workflowNotificationsStreamFunc, workflowServiceNotificationsStreamFunc, "", "", featureflags.FeatureFlags{}, &proContext, "")
agent, err := agent.NewAgent(logger.Sugar(), m, grpcClient, logStreamFunc, workflowNotificationsStreamFunc,
workflowServiceNotificationsStreamFunc, workflowParallelStepNotificationsStreamFunc, "", "", featureflags.FeatureFlags{}, &proContext, "")
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/agent/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@ func TestEventLoop(t *testing.T) {
var logStreamFunc func(ctx context.Context, executionID string) (chan output.Output, error)
var workflowNotificationsStreamFunc func(ctx context.Context, executionID string) (<-chan testkube.TestWorkflowExecutionNotification, error)
var workflowServiceNotificationsStreamFunc func(ctx context.Context, executionID, serviceName string, serviceIndex int) (<-chan testkube.TestWorkflowExecutionNotification, error)
var workflowParallelStepNotificationsStreamFunc func(ctx context.Context, executionID, ref string, parallelStepIndexIndex int) (<-chan testkube.TestWorkflowExecutionNotification, error)

proContext := config.ProContext{APIKey: "api-key", WorkerCount: 5, LogStreamWorkerCount: 5, WorkflowNotificationsWorkerCount: 5}
agent, err := agent.NewAgent(logger.Sugar(), nil, grpcClient, logStreamFunc, workflowNotificationsStreamFunc, workflowServiceNotificationsStreamFunc, "", "", featureflags.FeatureFlags{}, &proContext, "")
agent, err := agent.NewAgent(logger.Sugar(), nil, grpcClient, logStreamFunc, workflowNotificationsStreamFunc,
workflowServiceNotificationsStreamFunc, workflowParallelStepNotificationsStreamFunc, "", "", featureflags.FeatureFlags{}, &proContext, "")
assert.NoError(t, err)
go func() {
l, err := agent.Load()
Expand Down
Loading

0 comments on commit bcffbd5

Please sign in to comment.