
feat: add job refresh #227

Merged
merged 45 commits into main from refresh-jobs on Apr 8, 2022
Conversation


@arinda-arif arinda-arif commented Mar 17, 2022

#180

Acceptance criteria:

  • Should be able to refresh:
    • selected jobs, or
    • all jobs in selected namespaces, or
    • all jobs in a project
  • After a refresh, ALL jobs in the requested project will be deployed
  • Dependencies should be correctly persisted
  • If dependency resolution fails for a job, it should not fail the whole refresh process
  • Users should know which jobs failed on refresh or deploy
  • Metrics for every step of the refresh process should be pushed

@arinda-arif arinda-arif marked this pull request as draft March 17, 2022 02:51
@arinda-arif arinda-arif linked an issue Mar 17, 2022 that may be closed by this pull request

arinda-arif commented Mar 17, 2022

raystack/proton#114


coveralls commented Mar 17, 2022

Pull Request Test Coverage Report for Build 2112938619

  • 270 of 397 (68.01%) changed or added relevant lines in 12 files are covered.
  • 17 unchanged lines in 1 file lost coverage.
  • Overall coverage increased (+0.2%) to 74.688%

Changes missing coverage (file / covered lines / changed or added lines / %):

  ext/scheduler/airflow2/airflow.go      3     4    75.0%
  job/priority_resolver.go               1     2    50.0%
  job/dependency_resolver.go           101   103    98.06%
  job/deployer.go                       40    42    95.24%
  models/project.go                      0     3     0.0%
  job/service.go                        92    97    94.85%
  models/job.go                         13    21    61.9%
  api/handler/v1beta1/observer.go        0    47     0.0%
  models/progress.go                     0    58     0.0%

Files with coverage reduction (file / new missed lines / %):

  models/job.go                         17          47.06%

Totals coverage status:
  Change from base Build 2100149915: 0.2%
  Covered Lines: 5872
  Relevant Lines: 7862

💛 - Coveralls

@arinda-arif arinda-arif marked this pull request as ready for review March 22, 2022 03:40
store/store.go Outdated (resolved)
job/service.go Outdated (resolved)
job/service.go Outdated
var (
errDependencyResolution = fmt.Errorf("dependency resolution")

resolveDependencyFailureGauge = promauto.NewGauge(prometheus.GaugeOpts{
Contributor:

we can have a single gauge with two values success & failure

Contributor Author:

Have changed it to a single gauge; the metric values are differentiated with a status label.

job/service.go Outdated

type AssetCompiler func(jobSpec models.JobSpec, scheduledAt time.Time) (models.JobAssets, error)

// DependencyResolver compiles static and runtime dependencies
type DependencyResolver interface {
Resolve(ctx context.Context, projectSpec models.ProjectSpec, jobSpec models.JobSpec, observer progress.Observer) (models.JobSpec, error)
ResolveAndPersist(ctx context.Context, projectSpec models.ProjectSpec, jobSpec models.JobSpec, observer progress.Observer) error
Contributor:

Instead of ResolveAndPersist, let's reuse Resolve and create a new function for Persist.

Contributor Author:

Agree, I think it will reduce the complexity. However, the existing Resolve function includes a resolveHookDependencies step, which is not needed here (as we are going to store the requested dependencies and fetch them all afterwards). Should we also decouple it from Resolve, or keep it until we refactor job Sync?

Contributor Author:

As discussed, Resolve is reused and a new Persist function is introduced instead of ResolveAndPersist.

job/service.go Outdated

// Refresh fetches all the requested jobs, resolves its dependencies, assign proper priority weights,
// compile all jobs in the project and upload them to the destination store.
func (srv *Service) Refresh(ctx context.Context, projectSpec models.ProjectSpec, namespaceJobNamePairs []models.NamespaceJobNamePair,
Contributor:

Why not work with the *pb.RefreshJobsRequest only and fetch all the relevant job specs? I don't see a need for this model.

Contributor Author:

I was limiting the usage of the protobuf model to the handler only and not passing it to the service layer. This namespaceJobNamePairs is only used to get all of the jobSpecs for dependency resolution. Should we do it early in the handler? What do you think?

Contributor Author:

As discussed, we still use NamespaceJobNamePair to avoid using an external contract in the service layer. However, we now pass NamespaceName instead of NamespaceSpec (and fetch the spec just before it is used).

job/service.go Outdated
// Resolve dependency
if err = srv.resolveAndPersistDependency(ctx, projectSpec, namespaceJobNamePairs, progressObserver); err != nil {
// if err is caused by dependency resolution, ignore this as error.
var merrs *multierror.Error
Contributor:

I don't see a need for this; just return the error if it fails.

Contributor Author:

There are several possible errors in that particular function: a multierror (errors from dependency resolution), asset compilation errors, or errors when fetching the job specs. If the error comes from dependency resolution, we should skip it, as we don't want the whole refresh process to be canceled. It looks like this is not clear enough; I will refactor it.

Contributor Author:

Have modified this so it no longer checks for multierror; it checks the error type instead.

job/service.go Outdated
func (srv *Service) Refresh(ctx context.Context, projectSpec models.ProjectSpec, namespaceJobNamePairs []models.NamespaceJobNamePair,
progressObserver progress.Observer) (err error) {
// Resolve dependency
if err = srv.resolveAndPersistDependency(ctx, projectSpec, namespaceJobNamePairs, progressObserver); err != nil {
Contributor:

As mentioned earlier, we can do these steps:

  1. fetch all job specs
  2. resolve dependencies
  3. persist dependencies

Contributor Author:

Agree, will change.

Contributor Author:

Modified this; however, persisting dependencies and resolving dependencies are still in the same parallel runner. Please check this.

}

func (r *dependencyResolver) FetchJobDependencies(ctx context.Context, projectSpec models.ProjectSpec,
observer progress.Observer) (map[uuid.UUID][]models.JobSpecDependency, error) {
Contributor:

return type to be map[JobID]

Contributor Author:

As discussed, we are still using uuid for now, to avoid having a very big change in one PR.

job/service.go Outdated
}

// Fetch dependency and enrich
jobDependencies, err := srv.dependencyResolver.FetchJobDependencies(ctx, projectSpec, progressObserver)
Contributor:

Similarly here: as you are refetching the dependencies that were constructed above, I believe your intention is to move this to a separate place later. It would be better to do that right now.

  1. FetchAllJobsAgain
  2. FetchAllJobDependencies
  3. EnrichJobsWithJobDependencies
  4. EnrichJobsWithHookDependencies // This needs to be optimized by not making a call to the plugin inside enrichment; rather, we cache the dependencies between plugins during bootstrap and use pluginService for this kind of purpose.
  5. PriorityResolution
  6. Grouping Namespaces // This step too: why do we keep passing the projectJobSpecRepo and fetching jobs? Let's just enrich the JobSpec model with Project & NamespaceSpec, or store ProjectID and NamespaceID in the JobSpec, which can be used for grouping.

I expect all the data fetches from datasources or third-party integrations to happen only once, and this data can be cached for optimization purposes. Here we are passing jobSpecRepo everywhere and fetching job specs again and again, which can be avoided.

Contributor Author:

Updated. Please help to recheck.
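The enrichment idea behind the grouping step above (storing the namespace on the JobSpec so grouping needs no extra repository calls) could be sketched as follows; the field names are assumptions for illustration:

```go
package main

import "fmt"

// JobSpec enriched with its namespace, as the reviewer suggests, so
// grouping requires no further fetches. Field names are assumed.
type JobSpec struct {
	Name          string
	NamespaceName string
}

// groupByNamespace buckets already-enriched specs in a single pass,
// instead of fetching jobs per namespace from a repository.
func groupByNamespace(specs []JobSpec) map[string][]JobSpec {
	grouped := map[string][]JobSpec{}
	for _, s := range specs {
		grouped[s.NamespaceName] = append(grouped[s.NamespaceName], s)
	}
	return grouped
}

func main() {
	specs := []JobSpec{
		{Name: "job-a", NamespaceName: "ns-1"},
		{Name: "job-b", NamespaceName: "ns-2"},
		{Name: "job-c", NamespaceName: "ns-1"},
	}
	grouped := groupByNamespace(specs)
	fmt.Println(len(grouped["ns-1"]), len(grouped["ns-2"])) // prints: 2 1
}
```

This is the caching principle from the comment in miniature: fetch (and enrich) once, then every later step works from the in-memory specs.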

job/service.go Outdated
return jobSpecs, nil
}

func (srv *Service) prepareJobSpecs(ctx context.Context, projectSpec models.ProjectSpec,
Contributor:

prepare is generic; better to have specific function names so readers can easily understand what's happening.

Contributor Author:

Renamed this to fetchJobSpecs.


// resolve specs in parallel
runner := parallel.NewRunner(parallel.WithTicket(ConcurrentTicketPerSec), parallel.WithLimit(ConcurrentLimit))
for _, jobSpec := range jobSpecs {
Contributor:

If possible, we can look for a mechanism to abstract out how the parallel running happens, keeping things declarative.

Contributor Author:

Decided not to refactor this part in this PR, as abstracting this out is quite tricky; will look for a better way to do this.
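One declarative shape such an abstraction could take is a generic helper that hides the ticketing and limiting details. This is a sketch under assumed names, not the repo's actual parallel package:

```go
package main

import (
	"fmt"
	"sync"
)

// runParallel applies fn to every item with at most limit concurrent
// goroutines and returns results in input order. It is a minimal
// stand-in for the parallel.Runner used in the PR.
func runParallel[T, R any](items []T, limit int, fn func(T) R) []R {
	results := make([]R, len(items))
	sem := make(chan struct{}, limit) // bounded concurrency
	var wg sync.WaitGroup
	for i, item := range items {
		wg.Add(1)
		go func(i int, item T) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot
			defer func() { <-sem }() // release it
			results[i] = fn(item)
		}(i, item)
	}
	wg.Wait()
	return results
}

func main() {
	jobs := []string{"job-a", "job-b", "job-c"}
	resolved := runParallel(jobs, 2, func(name string) string {
		return name + ":resolved" // stand-in for dependency resolution
	})
	fmt.Println(resolved) // prints: [job-a:resolved job-b:resolved job-c:resolved]
}
```

The call site then only states what to run and how wide, which is the declarative style the reviewer is asking about.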

return nil
}

func (r *dependencyResolver) FetchJobDependencies(ctx context.Context, projectSpec models.ProjectSpec,
Contributor:

I believe FetchJobDependencies can be simplified: the logic of fetching and then updating with inter/intra job dependencies.

Contributor Author:

Have modified this: FetchJobDependencies will just fetch the dependencies; adapting to the required model is handled in the enrichment part (in the deployer).

cmd/job_refresh.go (resolved)
if !resp.GetSuccess() {
deployFailedCounter++
if verbose {
l.Info(coloredError(fmt.Sprintf("%d. %s failed to be deployed: %s", deployCounter, resp.GetJobName(), resp.GetMessage())))
Contributor:

Errors can be warn messages.

Contributor Author:

updated

if !resp.GetSuccess() {
refreshFailedCounter++
if verbose {
l.Info(coloredError(fmt.Sprintf("error '%s': failed to refresh dependency, %s", resp.GetJobName(), resp.GetMessage())))
Contributor:

Failures should be considered warn logs.

Contributor Author:

updated

cmd/job_refresh.go (resolved)

namespaceJobNamePairs := sv.prepareNamespaceJobNamePairs(req.NamespaceJobs)

if err := sv.jobSvc.Refresh(respStream.Context(), req.ProjectName, namespaceJobNamePairs, observers); err != nil {
Contributor:

Can you split this into multiple lines? It will be more readable.

Contributor:

Can you change the proto request to accept a request for a project refresh, a bunch of namespaces in a project, or a bunch of jobs? Project is mandatory; the namespace and job lists are optional.
We can have different functions for refreshing all jobs of a project, all jobs of given namespaces, or a bunch of jobs in a project.

Contributor Author:

Is this what you meant?

message RefreshJobsRequest {
  string project_name = 1;
  repeated string namespace_names = 2;
  repeated string job_names = 3;
}

Currently, to refresh a bunch of jobs in a project, we need the namespace info of where the jobs belong (for auth). We can go with what you proposed, but it will still require the namespace to be provided in namespace_names (which is not straightforward), or we can avoid requiring the namespace for this case.

Contributor Author:

Updated; the proto PR: raystack/proton#127

@@ -16,3 +16,8 @@ type NamespaceSpec struct {
}

const AllNamespace = "*"

type NamespaceJobNamePair struct {
Contributor:

we don't need this struct, we can avoid this.

Contributor Author:

Removed this, as we no longer need the namespace to refresh specific jobs.

return srv.deployer.Deploy(ctx, projectSpec, progressObserver)
}

func (srv *Service) fetchJobSpecs(ctx context.Context, projectSpec models.ProjectSpec,
Contributor:

fetchJobSpecs should be split into 4 functions: fetchAllForAProject, fetchAllForGivenNamespaces, and fetchSpecsForGivenJobNames, with fetchJobSpecs as the wrapper function.

Contributor Author:

updated
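The wrapper-plus-specific-functions split the reviewer describes might be sketched like this; the in-memory data, precedence rules, and exact signatures are illustrative assumptions only:

```go
package main

import "fmt"

// JobSpec is a toy stand-in for models.JobSpec.
type JobSpec struct {
	Name          string
	NamespaceName string
}

// allSpecs plays the role of the project's job spec repository.
var allSpecs = []JobSpec{
	{Name: "job-a", NamespaceName: "ns-1"},
	{Name: "job-b", NamespaceName: "ns-2"},
	{Name: "job-c", NamespaceName: "ns-1"},
}

func fetchAllForProject() []JobSpec { return allSpecs }

func fetchAllForNamespaces(namespaces []string) []JobSpec {
	wanted := map[string]bool{}
	for _, ns := range namespaces {
		wanted[ns] = true
	}
	var out []JobSpec
	for _, s := range allSpecs {
		if wanted[s.NamespaceName] {
			out = append(out, s)
		}
	}
	return out
}

func fetchForJobNames(names []string) []JobSpec {
	wanted := map[string]bool{}
	for _, n := range names {
		wanted[n] = true
	}
	var out []JobSpec
	for _, s := range allSpecs {
		if wanted[s.Name] {
			out = append(out, s)
		}
	}
	return out
}

// fetchJobSpecs is the wrapper: explicit job names win over namespaces,
// and an empty request means the whole project (assumed precedence).
func fetchJobSpecs(namespaces, jobNames []string) []JobSpec {
	switch {
	case len(jobNames) > 0:
		return fetchForJobNames(jobNames)
	case len(namespaces) > 0:
		return fetchAllForNamespaces(namespaces)
	default:
		return fetchAllForProject()
	}
}

func main() {
	fmt.Println(
		len(fetchJobSpecs(nil, nil)),                // whole project
		len(fetchJobSpecs([]string{"ns-1"}, nil)),   // by namespace
		len(fetchJobSpecs(nil, []string{"job-b"})), // by job name
	) // prints: 3 2 1
}
```

Each specific function stays small and self-describing, which addresses the earlier "prepare is generic" comment as well.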

job/service.go Outdated
defer resolveDependencyHistogram.Observe(time.Since(start).Seconds())

// compile assets before resolving in parallel
for i, jSpec := range jobSpecs {
Contributor:

Compiling assets can also be part of the parallel run; any reason not to do this? Also, we decided not to fail on a failure of a single job spec and to proceed with the others, but here we are returning.

Contributor Author:

updated

job/deployer.go Outdated
return deployError
}

func (d *deployer) enrichJobSpecWithJobDependencies(ctx context.Context, jobSpecs []models.JobSpec, jobDependencies []models.JobIDDependenciesPair) ([]models.JobSpec, error) {
Contributor:

We can refactor this method:

  1. createJobSpecMap can be dissolved into fetchJobSpecsForDependentJobs, which can go into the main function call; the map creation will then just create a JobSpecMap given jobSpecs and dependentJobSpecs. Here we can optimize by grouping the jobs by dependent project and fetching only once. Or why not fetch the job specs through a foreign key constraint? Then we can avoid this fetch altogether.
  2. We will have a function to enrich a single jobSpec with JobDependencies, which will be reused here. Whenever there is nesting, it's better to break it down into multiple functions.

Contributor Author:

Updated as discussed: we are not fetching the job specs through a foreign key constraint (to avoid a performance issue); jobSpecMap creation and enrichment are now done when fetching job specs with job dependencies.

job/service.go Outdated
// resolve dependency and persist
if err := srv.resolveDependency(ctx, projectSpec, jobSpecs, progressObserver); err != nil {
// if err is caused by other than asset compilation, ignore the error.
if err != nil {
Contributor:

Why fail in case of asset compilation?

Contributor Author:

This should have been removed by now. Fixing.

Contributor Author:

removed

}
)

cmd.Flags().StringVarP(&projectName, "project", "p", projectName, "Optimus project name")
Contributor:

Looks like we are accepting the flags but they don't work, as we are always reading from config. @deryrahman is working on fixing the loading of specs from a single place; let's do the same here.

Member:

Yes, it's a known issue @sravankorumilli; it will be fixed in #274. Let's merge this once that PR is approved.

Contributor Author:

For now I have removed the project flag.

@arinda-arif arinda-arif merged commit 52f15fc into main Apr 8, 2022
@arinda-arif arinda-arif deleted the refresh-jobs branch April 8, 2022 04:02
Successfully merging this pull request may close these issues.

Support Recompiling & Refreshing of Jobs in the scheduler.