Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Garbage collect the completed workflow after persisted to database #1802

Merged
merged 11 commits into from
Aug 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion backend/Dockerfile.persistenceagent
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,7 @@ COPY --from=builder /go/src/github.com/kubeflow/pipelines/third_party/license.tx

ENV NAMESPACE ""

CMD persistence_agent --logtostderr=true --namespace=${NAMESPACE}
# Set Workflow TTL to 7 days
ENV TTL_SECONDS_AFTER_WORKFLOW_FINISH 604800

CMD persistence_agent --logtostderr=true --namespace=${NAMESPACE} --ttlSecondsAfterWorkflowFinish=${TTL_SECONDS_AFTER_WORKFLOW_FINISH}
41 changes: 22 additions & 19 deletions backend/src/agent/persistence/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,30 @@ import (
)

var (
masterURL string
kubeconfig string
initializeTimeout time.Duration
timeout time.Duration
mlPipelineAPIServerName string
mlPipelineAPIServerPort string
mlPipelineAPIServerBasePath string
mlPipelineServiceHttpPort string
mlPipelineServiceGRPCPort string
namespace string
masterURL string
kubeconfig string
initializeTimeout time.Duration
timeout time.Duration
mlPipelineAPIServerName string
mlPipelineAPIServerPort string
mlPipelineAPIServerBasePath string
mlPipelineServiceHttpPort string
mlPipelineServiceGRPCPort string
namespace string
ttlSecondsAfterWorkflowFinish int64
)

const (
kubeconfigFlagName = "kubeconfig"
masterFlagName = "master"
initializationTimeoutFlagName = "initializeTimeout"
timeoutFlagName = "timeout"
mlPipelineAPIServerBasePathFlagName = "mlPipelineAPIServerBasePath"
mlPipelineAPIServerNameFlagName = "mlPipelineAPIServerName"
mlPipelineAPIServerHttpPortFlagName = "mlPipelineServiceHttpPort"
mlPipelineAPIServerGRPCPortFlagName = "mlPipelineServiceGRPCPort"
namespaceFlagName = "namespace"
kubeconfigFlagName = "kubeconfig"
masterFlagName = "master"
initializationTimeoutFlagName = "initializeTimeout"
timeoutFlagName = "timeout"
mlPipelineAPIServerBasePathFlagName = "mlPipelineAPIServerBasePath"
mlPipelineAPIServerNameFlagName = "mlPipelineAPIServerName"
mlPipelineAPIServerHttpPortFlagName = "mlPipelineServiceHttpPort"
mlPipelineAPIServerGRPCPortFlagName = "mlPipelineServiceGRPCPort"
namespaceFlagName = "namespace"
ttlSecondsAfterWorkflowFinishFlagName = "ttlSecondsAfterWorkflowFinish"
)

func main() {
Expand Down Expand Up @@ -122,4 +124,5 @@ func init() {
flag.StringVar(&mlPipelineAPIServerBasePath, mlPipelineAPIServerBasePathFlagName,
"/apis/v1beta1", "The base path for the ML pipeline API server.")
flag.StringVar(&namespace, namespaceFlagName, "", "The namespace name used for Kubernetes informers to obtain the listers.")
flag.Int64Var(&ttlSecondsAfterWorkflowFinish, ttlSecondsAfterWorkflowFinishFlagName, 604800 /* 7 days */, "The TTL for Argo workflow to persist after workflow finish.")
}
10 changes: 5 additions & 5 deletions backend/src/agent/persistence/persistence_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ type PersistenceAgent struct {

// NewPersistenceAgent returns a new persistence agent.
func NewPersistenceAgent(
swfInformerFactory swfinformers.SharedInformerFactory,
workflowInformerFactory workflowinformers.SharedInformerFactory,
pipelineClient *client.PipelineClient,
time util.TimeInterface) *PersistenceAgent {
swfInformerFactory swfinformers.SharedInformerFactory,
workflowInformerFactory workflowinformers.SharedInformerFactory,
pipelineClient *client.PipelineClient,
time util.TimeInterface) *PersistenceAgent {
// obtain references to shared informers
swfInformer := swfInformerFactory.Scheduledworkflow().V1beta1().ScheduledWorkflows()
workflowInformer := workflowInformerFactory.Argoproj().V1alpha1().Workflows()
Expand All @@ -64,7 +64,7 @@ func NewPersistenceAgent(

workflowWorker := worker.NewPersistenceWorker(time, workflowregister.Kind,
workflowInformer.Informer(), true,
worker.NewWorkflowSaver(workflowClient, pipelineClient))
worker.NewWorkflowSaver(workflowClient, pipelineClient, ttlSecondsAfterWorkflowFinish))

agent := &PersistenceAgent{
swfClient: swfClient,
Expand Down
10 changes: 5 additions & 5 deletions backend/src/agent/persistence/worker/persistence_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestPersistenceWorker_Success(t *testing.T) {
pipelineClient := client.NewPipelineClientFake()

// Set up peristence worker
saver := NewWorkflowSaver(workflowClient, pipelineClient)
saver := NewWorkflowSaver(workflowClient, pipelineClient, 100)
eventHandler := NewFakeEventHandler()
worker := NewPersistenceWorker(
util.NewFakeTimeForEpoch(),
Expand Down Expand Up @@ -84,7 +84,7 @@ func TestPersistenceWorker_NotFoundError(t *testing.T) {
pipelineClient := client.NewPipelineClientFake()

// Set up peristence worker
saver := NewWorkflowSaver(workflowClient, pipelineClient)
saver := NewWorkflowSaver(workflowClient, pipelineClient, 100)
eventHandler := NewFakeEventHandler()
worker := NewPersistenceWorker(
util.NewFakeTimeForEpoch(),
Expand Down Expand Up @@ -115,7 +115,7 @@ func TestPersistenceWorker_GetWorklowError(t *testing.T) {
pipelineClient := client.NewPipelineClientFake()

// Set up peristence worker
saver := NewWorkflowSaver(workflowClient, pipelineClient)
saver := NewWorkflowSaver(workflowClient, pipelineClient, 100)
eventHandler := NewFakeEventHandler()
worker := NewPersistenceWorker(
util.NewFakeTimeForEpoch(),
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestPersistenceWorker_ReportWorkflowRetryableError(t *testing.T) {
"My Retriable Error"))

// Set up peristence worker
saver := NewWorkflowSaver(workflowClient, pipelineClient)
saver := NewWorkflowSaver(workflowClient, pipelineClient, 100)
eventHandler := NewFakeEventHandler()
worker := NewPersistenceWorker(
util.NewFakeTimeForEpoch(),
Expand Down Expand Up @@ -181,7 +181,7 @@ func TestPersistenceWorker_ReportWorkflowNonRetryableError(t *testing.T) {
"My Permanent Error"))

// Set up peristence worker
saver := NewWorkflowSaver(workflowClient, pipelineClient)
saver := NewWorkflowSaver(workflowClient, pipelineClient, 100)
eventHandler := NewFakeEventHandler()
worker := NewPersistenceWorker(
util.NewFakeTimeForEpoch(),
Expand Down
24 changes: 16 additions & 8 deletions backend/src/agent/persistence/worker/workflow_saver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,24 @@ import (
"github.com/kubeflow/pipelines/backend/src/common/util"
log "github.com/sirupsen/logrus"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"time"
)

// WorkflowSaver provides a function to persist a workflow to a database.
type WorkflowSaver struct {
client client.WorkflowClientInterface
pipelineClient client.PipelineClientInterface
metricsReporter *MetricsReporter
client client.WorkflowClientInterface
pipelineClient client.PipelineClientInterface
metricsReporter *MetricsReporter
ttlSecondsAfterWorkflowFinish int64
}

func NewWorkflowSaver(client client.WorkflowClientInterface,
pipelineClient client.PipelineClientInterface) *WorkflowSaver {
pipelineClient client.PipelineClientInterface, ttlSecondsAfterWorkflowFinish int64) *WorkflowSaver {
return &WorkflowSaver{
client: client,
pipelineClient: pipelineClient,
metricsReporter: NewMetricsReporter(pipelineClient),
client: client,
pipelineClient: pipelineClient,
metricsReporter: NewMetricsReporter(pipelineClient),
ttlSecondsAfterWorkflowFinish: ttlSecondsAfterWorkflowFinish,
}
}

Expand All @@ -53,7 +56,12 @@ func (s *WorkflowSaver) Save(key string, namespace string, name string, nowEpoch
"Workflow (%s): transient failure: %v", key, err)

}

if wf.PersistedFinalState() && time.Now().Unix()-wf.FinishedAt() < s.ttlSecondsAfterWorkflowFinish {
hongye-sun marked this conversation as resolved.
Show resolved Hide resolved
// Skip persisting the workflow if the workflow is finished
// and the workflow hasn't being passing the TTL
log.Infof("Skip syncing Workflow (%v): workflow marked as persisted.", name)
return nil
}
// Save this Workflow to the database.
err = s.pipelineClient.ReportWorkflow(wf)
retry := util.HasCustomCode(err, util.CUSTOM_CODE_TRANSIENT)
Expand Down
83 changes: 68 additions & 15 deletions backend/src/agent/persistence/worker/workflow_saver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package worker
import (
"fmt"
"testing"
"time"

workflowapi "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/kubeflow/pipelines/backend/src/agent/persistence/client"
Expand All @@ -39,9 +40,7 @@ func TestWorkflow_Save_Success(t *testing.T) {

workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow)

saver := NewWorkflowSaver(
workflowFake,
pipelineFake)
saver := NewWorkflowSaver(workflowFake, pipelineFake, 100)

err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20)

Expand All @@ -53,9 +52,7 @@ func TestWorkflow_Save_NotFoundDuringGet(t *testing.T) {
workflowFake := client.NewWorkflowClientFake()
pipelineFake := client.NewPipelineClientFake()

saver := NewWorkflowSaver(
workflowFake,
pipelineFake)
saver := NewWorkflowSaver(workflowFake, pipelineFake, 100)

err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20)

Expand All @@ -70,9 +67,7 @@ func TestWorkflow_Save_ErrorDuringGet(t *testing.T) {

workflowFake.Put("MY_NAMESPACE", "MY_NAME", nil)

saver := NewWorkflowSaver(
workflowFake,
pipelineFake)
saver := NewWorkflowSaver(workflowFake, pipelineFake, 100)

err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20)

Expand All @@ -97,9 +92,7 @@ func TestWorkflow_Save_PermanentFailureWhileReporting(t *testing.T) {

workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow)

saver := NewWorkflowSaver(
workflowFake,
pipelineFake)
saver := NewWorkflowSaver(workflowFake, pipelineFake, 100)

err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20)

Expand All @@ -124,13 +117,73 @@ func TestWorkflow_Save_TransientFailureWhileReporting(t *testing.T) {

workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow)

saver := NewWorkflowSaver(
workflowFake,
pipelineFake)
saver := NewWorkflowSaver(workflowFake, pipelineFake, 100)

err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20)

assert.Equal(t, true, util.HasCustomCode(err, util.CUSTOM_CODE_TRANSIENT))
assert.NotNil(t, err)
assert.Contains(t, err.Error(), "transient failure")
}

func TestWorkflow_Save_SkippedDueToFinalStatue(t *testing.T) {
workflowFake := client.NewWorkflowClientFake()
pipelineFake := client.NewPipelineClientFake()

// Add this will result in failure unless reporting is skipped
pipelineFake.SetError(util.NewCustomError(fmt.Errorf("Error"), util.CUSTOM_CODE_PERMANENT,
"My Permanent Error"))

workflow := util.NewWorkflow(&workflowapi.Workflow{
ObjectMeta: metav1.ObjectMeta{
Namespace: "MY_NAMESPACE",
Name: "MY_NAME",
Labels: map[string]string{util.LabelKeyWorkflowPersistedFinalState: "true"},
},
Status: workflowapi.WorkflowStatus{
FinishedAt: metav1.Now(),
},
})

workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow)

saver := NewWorkflowSaver(workflowFake, pipelineFake, 100)

err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20)

assert.Equal(t, false, util.HasCustomCode(err, util.CUSTOM_CODE_TRANSIENT))
assert.Equal(t, nil, err)
}

func TestWorkflow_Save_FinalStatueNotSkippedDueToExceedTTL(t *testing.T) {
workflowFake := client.NewWorkflowClientFake()
pipelineFake := client.NewPipelineClientFake()

// Add this will result in failure unless reporting is skipped
pipelineFake.SetError(util.NewCustomError(fmt.Errorf("Error"), util.CUSTOM_CODE_PERMANENT,
"My Permanent Error"))

workflow := util.NewWorkflow(&workflowapi.Workflow{
ObjectMeta: metav1.ObjectMeta{
Namespace: "MY_NAMESPACE",
Name: "MY_NAME",
Labels: map[string]string{util.LabelKeyWorkflowPersistedFinalState: "true"},
},
Status: workflowapi.WorkflowStatus{
FinishedAt: metav1.Now(),
},
})

workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow)

saver := NewWorkflowSaver(workflowFake, pipelineFake, 1)

// Sleep 2 seconds to make sure workflow passed TTL
time.Sleep(2 * time.Second)

err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20)

assert.Equal(t, false, util.HasCustomCode(err, util.CUSTOM_CODE_TRANSIENT))
assert.NotNil(t, err)
assert.Contains(t, err.Error(), "permanent failure")
}
Loading