Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Notify distributor when a new task added #678

Conversation

ThisIsClark
Copy link
Collaborator

@ThisIsClark ThisIsClark commented Aug 28, 2021

What this PR does / why we need it:
When a new performance task created, notify distributor to distribute it instead of scan db periodically.

Which issue this PR fixes (optional, in fixes #<issue number>(, fixes #<issue_number>, ...) format, will close that issue when PR gets merged): fixes #

Special notes for your reviewer:

Release note:

Test case:
1. Start node1, node1 would be the leader
2. Start node2, node1 would still be the leader
3. Register a devide, node1 would be notified the a new job created
4. Node1 down, node2 would be the leader
5. Register a devide, node2 would be notified the a new job created
6. Node1 up, node2 would still be the leader
7. Register a devide, node2 would be notified the a new job created

@ThisIsClark ThisIsClark force-pushed the new_task_notification branch from 6326b85 to 01dd9e6 Compare August 30, 2021 02:14
@ThisIsClark ThisIsClark changed the title Notify distributor when a new task added [WIP] Notify distributor when a new task added Aug 30, 2021
Copy link
Collaborator

@joseph-v joseph-v left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@@ -76,6 +79,15 @@ def schedule_boot_jobs(self):
LOG.error("Failed to initialize periodic tasks, reason: %s.",
six.text_type(e))
raise e
metrics_task_server = service. \
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sub routine is started only by leader.. if we keep metrics_task_service here only leader will have this service running..

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created a new service here, to get notification when new job created.

@@ -46,18 +46,9 @@ def main():
task_server = service.TaskService.create(binary='delfin-task',
coordination=True)
leader_election = service.LeaderElectionService.create()
metrics_task_server = service. \
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

metric worker node need not start resource task and alert task.. if we start new service inside scheduler start we cant control the service start during multi process spawn

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reversed

@@ -46,18 +46,9 @@ def main():
task_server = service.TaskService.create(binary='delfin-task',
coordination=True)
leader_election = service.LeaderElectionService.create()
metrics_task_server = service. \
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

metric worker node need not start resource task and alert task.. if we start new service inside scheduler start we cant control the service start during multi process spawn

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reversed

@@ -74,6 +74,16 @@ def __call__(self):
else:
LOG.debug("Periodic job distribution completed.")

def distribute_new_job(self, task_id):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about function name 'assign_new_job' ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My thought is that the file is task_distributor, so I named this function with distribute

@@ -109,8 +111,8 @@ def create(self, req, body):
capabilities = self.driver_api.get_capabilities(
context=ctxt, storage_id=storage['id'])
validation.validate_capabilities(capabilities)
_create_performance_monitoring_task(ctxt, storage['id'],
capabilities)
self.perf_job_manager.create_perf_job(ctxt, storage['id'],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason why we move this function to task manager.. API itself can create task table.. Why we need to distribute create task?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_create_performance_monitoring_task only do one thing: add the job to db
self.perf_job_manager.create_perf_job do two things: add the job to db and put the task id to RabbitMQ

@ThisIsClark ThisIsClark force-pushed the new_task_notification branch 2 times, most recently from 8c34e9b to b2ac417 Compare August 30, 2021 07:31
@codecov
Copy link

codecov bot commented Aug 30, 2021

Codecov Report

Merging #678 (df0d160) into perf_coll_fw_enhance (903138a) will decrease coverage by 0.06%.
The diff coverage is 54.71%.

@@                   Coverage Diff                    @@
##           perf_coll_fw_enhance     #678      +/-   ##
========================================================
- Coverage                 70.18%   70.11%   -0.07%     
========================================================
  Files                       159      161       +2     
  Lines                     14936    14976      +40     
  Branches                   1822     1822              
========================================================
+ Hits                      10483    10501      +18     
- Misses                     3846     3868      +22     
  Partials                    607      607              
Impacted Files Coverage Δ
...in/leader_election/distributor/perf_job_manager.py 0.00% <0.00%> (ø)
delfin/task_manager/metrics_manager.py 0.00% <0.00%> (ø)
delfin/task_manager/metrics_rpcapi.py 64.70% <25.00%> (-5.30%) ⬇️
delfin/task_manager/scheduler/schedule_manager.py 55.10% <33.33%> (-1.42%) ⬇️
...in/leader_election/distributor/task_distributor.py 85.71% <66.66%> (-4.29%) ⬇️
delfin/task_manager/perf_job_controller.py 85.71% <85.71%> (ø)
delfin/api/v1/storages.py 76.35% <100.00%> (-1.22%) ⬇️

@ThisIsClark ThisIsClark changed the title [WIP] Notify distributor when a new task added Notify distributor when a new task added Aug 30, 2021
@ThisIsClark ThisIsClark force-pushed the new_task_notification branch from b2ac417 to 4bbd926 Compare August 30, 2021 08:41
@@ -74,6 +74,17 @@ def __call__(self):
else:
LOG.debug("Periodic job distribution completed.")

def distribute_new_job(self, task_id):
executor = CONF.host
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Getting executor should be a function , which generates the executor based on some alogorithm.. This Applicable to all reference of CONF.host in distributor.. Are we planning that logic in a different PR?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would in another PR

@@ -74,6 +74,17 @@ def __call__(self):
else:
LOG.debug("Periodic job distribution completed.")

def distribute_new_job(self, task_id):
executor = CONF.host
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Getting executor should be a function , which generates the executor based on some alogorithm.. This Applicable to all reference of CONF.host in distributor.. Are we planning that logic in a different PR?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

RPC_API_VERSION = '1.0'

def __init__(self):
self.target = messaging.Target(topic='JobGenerator',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be put under metrics_manager , under same topic as CONF.host

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modified

db.task_create(context=context, values=task)
# Add it to RabbitMQ
filters = {'storage_id': storage_id}
task_id = db.task_get_all(context, filters=filters)[0].get('id')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Below steps can be handled within metrics_rpc_api ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modified

@@ -76,6 +79,15 @@ def schedule_boot_jobs(self):
LOG.error("Failed to initialize periodic tasks, reason: %s.",
six.text_type(e))
raise e
job_generator = service. \
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, This service is not required. We can put the JobGenerator under metrics_manager itself

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This service should be only on leader node, so I create a new service here

@ThisIsClark ThisIsClark force-pushed the new_task_notification branch 2 times, most recently from 6b946f1 to 02f9b2f Compare August 30, 2021 12:07
@ThisIsClark ThisIsClark force-pushed the new_task_notification branch from 02f9b2f to df0d160 Compare August 30, 2021 12:33
Copy link
Member

@NajmudheenCT NajmudheenCT left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@sushanthakumar
Copy link
Collaborator

LGTM

@ThisIsClark ThisIsClark merged commit d2b4140 into sodafoundation:perf_coll_fw_enhance Aug 31, 2021
kumarashit added a commit that referenced this pull request Sep 14, 2021
* Make job scheduler local to task process (#674)

* Make job scheduler local to task process

* Notify distributor when a new task added (#678)

* Remove db-scan for new task creation (#680)

* Use consistent hash to manage the topic (#681)

* Remove the periodically call from task distributor (#686)

* Start one historic collection immediate when a job is rescheduled (#685)

* Start one historic collection immediate when a job is rescheduled

* Remove failed task distributor (#687)

* Improving Failed job handling and telemetry job removal (#689)

Co-authored-by: ThisIsClark <liuyuchibubao@gmail.com>
Co-authored-by: Ashit Kumar <akopensrc@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants