Skip to content

Commit

Permalink
added get outcome thread logic in generic worker
Browse files Browse the repository at this point in the history
  • Loading branch information
Meetatgoogle committed Mar 5, 2025
1 parent 47ba05f commit 1a552fa
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 16 deletions.
14 changes: 10 additions & 4 deletions py/sight/demo/w1.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ def warn(*args, **kwargs):
from sight.sight import Sight
from sight.widgets.decision import decision
from sight.widgets.decision import proposal
from helpers.logs.logs_handler import logger as logging


FLAGS = flags.FLAGS

Expand Down Expand Up @@ -214,10 +216,10 @@ async def propose_actions_wrapper(sight: Sight, question_label: str) -> None:
# both base and treatment are considerred to be same dict here
propose_actions(sight, question_label, sample, sample)))

print("waiting for all get outcome to finish.....")
logging.info("waiting for all get outcome to finish.....")
diff_time_series = await asyncio.gather(*tasks)
print("all get outcome are finished.....")
print(f'Combine Series : {diff_time_series}')
logging.info("all get outcome are finished.....")
logging.info(f'Combine Series : {diff_time_series}')


def driver(sight: Sight) -> None:
Expand All @@ -229,7 +231,7 @@ def driver(sight: Sight) -> None:

for _ in range(1):
next_point = decision.decision_point(get_question_label(), sight)
print('next_point : ', next_point)
logging.info('next_point : %s', next_point)

# using next_points to propose actions
asyncio.run(
Expand Down Expand Up @@ -274,6 +276,10 @@ def main(argv: Sequence[str]) -> None:
sight.set_object_code_loc(sight_obj, frame)
sight.log_object(sight_obj, True)

# this thread checks the outcome for proposed action from server
decision.init_sight_polling_thread(sight.id,
get_question_label_to_propose_actions())

decision.run(sight=sight,
question_label=get_question_label(),
driver_fn=driver)
Expand Down
12 changes: 7 additions & 5 deletions py/sight/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@
from sight.utils.proto_conversion import convert_proto_to_dict
from sight.widgets.decision.resource_lock import RWLockDictWrapper
from sight_service.proto import service_pb2
from helpers.logs.logs_handler import logger as logging

POLL_LIMIT = 300 # POLL_TIME_INTERVAL th part of second
POLL_TIME_INTERVAL = 6 # seconds

POLL_LIMIT = 10 # POLL_TIME_INTERVAL th part of second
POLL_TIME_INTERVAL = 2 # seconds
global_outcome_mapping = RWLockDictWrapper()


Expand Down Expand Up @@ -90,7 +92,7 @@ def poll_network_batch_outcome(sight_id, question_label):
# print("pending action ids : ", pending_action_ids)
if len(pending_action_ids):
counter = POLL_LIMIT
print(f'BATCH POLLING THE IDS FOR => {len(pending_action_ids)}')
logging.info(f'BATCH POLLING THE IDS FOR => %s',{len(pending_action_ids)})
# print(f'BATCH POLLING THE IDS FOR => {pending_action_ids}')
outcome_of_action_ids = get_all_outcomes(sight_id, question_label,
pending_action_ids)
Expand All @@ -103,8 +105,8 @@ def poll_network_batch_outcome(sight_id, question_label):
global_outcome_mapping.update(new_dict)

else:
print(
f'Not sending request as no pending ids ...=> {pending_action_ids} with counter => {counter}'
logging.info(
f'Not sending request as no pending ids ...=> %s with counter => %s', pending_action_ids, counter
)
if counter <= 0:
return
Expand Down
2 changes: 1 addition & 1 deletion py/sight/widgets/decision/decision.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ def init_sight_polling_thread(sight_id, question_label):
# print
status_update_thread = threading.Thread(target=poll_network_batch_outcome,
args=(sight_id, question_label))
print('*************** starting thread ************')
logging.info('*************** starting thread ************')
status_update_thread.start()


Expand Down
11 changes: 6 additions & 5 deletions py/sight/widgets/decision/trials.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,12 @@ def launch(

response = service.call(lambda s, meta: s.Launch(req, 300, metadata=meta))
# start polling thread, fetching outcome from server for proposed actions
if (decision_configuration.optimizer_type == sight_pb2.
DecisionConfigurationStart.OptimizerType.OT_WORKLIST_SCHEDULER and
response.display_string == "Worklist Scheduler SUCCESS!"):
decision.init_sight_polling_thread(sight.id,
decision_configuration.question_label)
# as we are awaiting till we get response back for this proposal of workerlist_scheduler, removing this thread
# if (decision_configuration.optimizer_type == sight_pb2.
# DecisionConfigurationStart.OptimizerType.OT_WORKLIST_SCHEDULER and
# response.display_string == "Worklist Scheduler SUCCESS!"):
# decision.init_sight_polling_thread(sight.id,
# decision_configuration.question_label)
logging.info('##### Launch response=%s #####', response)

logging.debug('<<<<<<<<< Out %s method of %s file.', method_name, _file_name)
Expand Down
4 changes: 3 additions & 1 deletion sight_service/service_root.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,14 +307,16 @@ def Close(self, request, context):

logging.info('request in close is : %s', request)
with self.optimizers.instances_lock.gen_rlock():
# there is an issue with this : even one of the worker calls the close,
# fixed now - there is an issue with this : even one of the worker calls the close,
# this will call the close on the optimizer - need to fix this
# logging.info("request => %s", request)
if request.HasField("question_label"):
instance = self.optimizers.get_instance(request.client_id,
request.question_label)
# print('*********lenght of instances : ', len(instances))
if instance:
# for question, obj in instances.items():
# logging.info('instance found : %s', instance)
obj = instance.close(request)
else:
logging.info(
Expand Down

0 comments on commit 1a552fa

Please sign in to comment.