Use of nest_asyncio? #3

Open
consideRatio opened this issue Jan 11, 2022 · 10 comments

@consideRatio
Member

consideRatio commented Jan 11, 2022

This is a dedicated issue for the discussion in jupyterhub/nbgitpuller#194 (comment) surrounding the following code snippet.

import nest_asyncio

# this allows us to nest usage of the event_loop from asyncio
# being used by tornado in jupyter distro
# Ref: https://medium.com/@vyshali.enukonda/how-to-get-around-runtimeerror-this-event-loop-is-already-running-3f26f67e762e
nest_asyncio.apply()

Goal

  1. Clarify what made us need a workaround like nest_asyncio in the first place.

    I currently don't have a good grasp of where in the codebase we end up erroring without nest_asyncio.apply() called ahead of time.

    @sean-morris can you provide some more details on this?

  2. Get a recommendation and go with it

    I figure we should ask Min RK on the use of this workaround.

    It is my understanding that by using it, we modify asyncio core functionality, and my main concern is whether just installing nbgitpuller would automatically make a change that breaks other code. Before doing this, I figure we should try to clarify and pinpoint what code, and where, makes us need the workaround. I'd like it to be more explicit wherever we declare that we use it.

  3. If we go with the workaround: decide on location in codebase

    We decide on the location. It just needs to run once, and it can run from anywhere. So, perhaps an __init__.py file not related to a specific plugin makes the most sense. I think it can also be in nbgitpuller if the most primitive structure that all plugins will need requires it, and a plugin has been detected.

    Mainly I'd like to avoid having it in nbgitpuller in situations when users don't even use plugins.

@consideRatio
Member Author

consideRatio commented Jan 14, 2022

I understand that pluggy, the library we use to define/collect/run plugin logic on how to handle custom git puller logic, isn't capable of accepting an async function. This can be traced back to: pytest-dev/pluggy#320

Due to this, the hook specification is a non-async function. The implementation of a hook now looks like this.

@hookimpl
def handle_files(helper_args, query_line_args):
    """
    This begins the event loop that will both download the compressed archive and send messages
    about the progress of the download to the UI.
    :param dict helper_args: the function, helper_args["progress_func"], that writes messages to
    the progress stream in the browser window and the download_q, helper_args["download_q"] the progress function uses.
    :param dict query_line_args: this includes all the arguments included on the nbgitpuller URL
    :return two parameter json unzip_dir and origin_repo_path
    :rtype json object
    """
    loop = asyncio.get_event_loop()
    hfh = HandleFilesHelper(helper_args, query_line_args)
    tasks = hfh.handle_files_helper(), helper_args["wait_for_sync_progress_queue"]()
    result_handle, _ = loop.run_until_complete(asyncio.gather(*tasks))
    return result_handle

The code above schedules a locally defined async function (handle_files_helper) to run in an event loop.

async def handle_files_helper(self):
    """
    This calls the async generator function and handle the storing of messages from the gener() function
    into the download_q
    :return json object with the directory name of the download and
    the origin_repo_path
    :rtype json object
    """
    try:
        async for line in self.handle_download_and_extraction():
            self.download_q.put_nowait(line)
            await asyncio.sleep(0.1)
    except Exception as e:
        self.download_q.put_nowait(e)
        raise e
    # mark the end of the queue with a None value
    self.download_q.put_nowait(None)
    return {"output_dir": self.dir_names[0], "origin_repo_path": self.origin_repo}
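
For reference, here is a minimal sketch of one plausible way a synchronous hook like handle_files ends up needing nest_asyncio. It assumes the hook is called from code that is already running on the event loop (as tornado's would be); whether that is exactly what happens in nbgitpuller is still the open question in this issue. The names inner, sync_hook and handler are made up for illustration.

```python
import asyncio


async def inner():
    # Hypothetical stand-in for handle_files_helper().
    return "done"


def sync_hook():
    # Hypothetical synchronous hook that tries to drive a coroutine itself,
    # the same way handle_files calls loop.run_until_complete().
    loop = asyncio.get_event_loop()
    coro = inner()
    try:
        return loop.run_until_complete(coro)
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return f"RuntimeError: {exc}"


async def handler():
    # Stands in for an async request handler already running on the loop.
    return sync_hook()


result = asyncio.run(handler())
print(result)
```

Without nest_asyncio.apply(), the nested run_until_complete call is rejected because the loop is already running.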

Current strategy to avoid blocking issues

  1. Git pulling is done either from a main function, or from an async get function

  2. The async get function, invoked as part of an incoming web request, runs the gitpuller logic in a dedicated thread.

                gp = GitPuller(repo, repo_dir, branch=branch, depth=depth, parent=self.settings['nbapp'])
                q = Queue()
    
                def pull():
                    try:
                        for line in gp.pull():
                            q.put_nowait(line)
                        # Sentinel when we're done
                        q.put_nowait(None)
                    except Exception as e:
                        q.put_nowait(e)
                        raise e
                self.gp_thread = threading.Thread(target=pull)
                self.gp_thread.start()
  3. While the dedicated thread does work, the get function has an await (previously yield) that repeatedly sleeps and polls a progress message queue that is written to by the new thread.
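
The three steps above can be sketched end to end roughly as follows. This is a simplified illustration and not the actual handler code: blocking_work stands in for gp.pull(), and the polling loop is an assumption about what "repeatedly sleeps and polls" looks like.

```python
import asyncio
import threading
from queue import Empty, Queue


def blocking_work():
    # Hypothetical stand-in for GitPuller.pull(): a blocking generator
    # that yields progress lines.
    for i in range(3):
        yield f"step {i}"


async def get():
    q = Queue()

    def pull():
        try:
            for line in blocking_work():
                q.put_nowait(line)
            # Sentinel when we're done
            q.put_nowait(None)
        except Exception as e:
            q.put_nowait(e)
            raise

    thread = threading.Thread(target=pull)
    thread.start()

    received = []
    while True:
        try:
            item = q.get_nowait()
        except Empty:
            # Nothing to report yet: give control back to the event loop.
            await asyncio.sleep(0.01)
            continue
        if item is None:
            break
        if isinstance(item, Exception):
            raise item
        received.append(item)  # the real handler would emit this to the UI

    thread.join()
    return received


messages = asyncio.run(get())
```

The event loop is never blocked here: the blocking work lives in the thread, and the async function only sleeps and reads from the queue.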

Do we need async pluggy?

I don't think so. We can block the dedicated thread and can write to a message queue.

Plugin implementation now and suggestion

First let's clarify responsibilities:

  • Core gitpuller logic's responsibility: to update a persistent folder based on a git source (temp local folder if a content provider plugin is used)
  • Plugin logic's responsibility: to provide a git repo to a temp local folder

It seems like the current approach has two separate strategies to avoid blocking.

  • Core gitpuller logic relies on: running in a dedicated thread
  • Plugin logic relies on
    • Running code in an event loop
    • Duplicating the progress message reporting logic

My conclusion is that the problem arises from not using the same strategy, and that we should:

  1. Rely on a single strategy, specifically the dedicated thread strategy
  2. Rely on a progress message queue to communicate progress up to a polling function that repeatedly sleeps

Practically I think self.gp_thread.start() should start work for all logic - both the optional content provider plugin work and the core gitpuller logic. That in turn means that we should stop passing the _wait_for_sync_progress_queue function to the plugin etc, but only pass a progress_message_queue which it can write to whenever it feels like it.
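
A rough sketch of what this could look like, assuming a synchronous plugin hook that only receives a progress message queue. All names here (handle_files taking a queue, core_pull, run_all, the example paths) are hypothetical and only meant to show the shape of the idea.

```python
import threading
from queue import Queue


def handle_files(progress_message_queue, query_line_args):
    # Hypothetical synchronous plugin hook: it may block, since it runs in
    # the dedicated thread, and it reports progress by writing to the queue.
    progress_message_queue.put_nowait("Downloading archive")
    progress_message_queue.put_nowait("Extracting archive")
    return {"output_dir": "example-dir", "origin_repo_path": "/tmp/example-origin"}


def core_pull(repo):
    # Hypothetical stand-in for the core gitpuller logic.
    yield f"Pulling from {repo}"


def run_all(q, content_provider=None):
    # Single entry point for the thread: plugin work first, then core work.
    try:
        repo = "https://example.org/repo.git"
        if content_provider is not None:
            results = content_provider(q, {})
            repo = "file://" + results["origin_repo_path"]
        for line in core_pull(repo):
            q.put_nowait(line)
        # Sentinel when we're done
        q.put_nowait(None)
    except Exception as e:
        q.put_nowait(e)
        raise


q = Queue()
thread = threading.Thread(target=run_all, args=(q, handle_files))
thread.start()
thread.join()

lines = []
while (item := q.get_nowait()) is not None:
    lines.append(item)
```

Since the hook is a plain function in this sketch, its result is just a return value and no event loop is involved.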

@sean-morris
Collaborator

I think I follow you here @consideRatio. I can take a look at what you suggest.

I am about to commit to a solution that I came up with late yesterday that uses aiopluggy instead of pluggy. In this scenario, we are able to have an async hookimpl. I had to write my own code to do plugin discovery because this functionality does not exist in aiopluggy as it did in pluggy.

I realized as I was responding to this issue yesterday, that I could solve this problem with aiopluggy. I had tried this solution early on and abandoned it for a bunch of reasons that I now realize the solution to. Anyway. I will commit this, take a look and I will work on what you suggest here.

@consideRatio
Member Author

consideRatio commented Jan 14, 2022

@sean-morris while https://github.com/Amsterdam/aiopluggy could have been relevant, I think we should steer clear of using it because the last commit was 4 years ago. An example of an issue with relying on such a project is that it may not support modern versions of Python or similar, which we should support in this actively maintained project.

I think if you expand the following...

            def pull():
                try:
                    for line in gp.pull():
                        q.put_nowait(line)
                    # Sentinel when we're done
                    q.put_nowait(None)
                except Exception as e:
                    q.put_nowait(e)
                    raise e
            self.gp_thread = threading.Thread(target=pull)
            self.gp_thread.start()

To be like...

             def pull():
                 try:
+                    # TODO: do content provider plugin work here
+                    # TODO: let the hook communicate back to this async function via
+                    #       a progress message queue which is repeatedly polled in this
+                    #       thread to report back to the UI
                     for line in gp.pull():
                         q.put_nowait(line)
                     # Sentinel when we're done
                     q.put_nowait(None)
                 except Exception as e:
                     q.put_nowait(e)
                     raise e
             self.gp_thread = threading.Thread(target=pull)
             self.gp_thread.start()

@sean-morris
Collaborator

sean-morris commented Jan 15, 2022

@consideRatio @manics Got it. I committed the changes. It is much cleaner. The only issue I see is how to get the returned JSON structure from the hookimpl, handle_files, back to the nbgitpuller call. I hacked it a bit just so we can see it working by setting helper_args["handle_files_output"] as the last line.

  @hookimpl
  def handle_files(helper_args, query_line_args):
      hfh = HandleFilesHelper(helper_args, query_line_args)
      output_info = yield from hfh.handle_files_helper()
      helper_args["handle_files_output"] = output_info

Then in nbgitpuller:

 def pull():
      if content_provider is not None:
          ...
          helper_args = dict()
          helper_args["repo_parent_dir"] = repo_parent_dir

          try:
              for line in plugin_manager.hook.handle_files(helper_args=helper_args,query_line_args=query_line_args):
                  q.put_nowait(line)
          except Exception as e:
              q.put_nowait(e)
              raise e

          results = helper_args["handle_files_output"]  # <-- PULLING THE RESULT HERE
          self.repo_dir = repo_parent_dir + results["output_dir"]
          self.repo = "file://" + results["origin_repo_path"]

      gp = GitPuller(self.repo, self.repo_dir, branch=branch, depth=depth, parent=self.settings['nbapp'])

      try:
          for line in gp.pull():
              q.put_nowait(line)
          # Sentinel when we're done
          q.put_nowait(None)
      except Exception as e:
          q.put_nowait(e)
          raise e

A better/cleaner method for getting the results from handle_files may be to set up the downloader plugins as classes with an instance variable, results, that is set as the handle_files function completes and then accessed via the object reference on the nbgitpuller side.

The only issue with this is that we have to write our own plugin discovery method because pluggy's function can't handle automated discovery via entry points if the hookimpl function is in a Class. I wrote a mock-up of what the discovery might take; it is doable but not sure if that is where we want/need to go.
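
One more option, sketched in plain Python and independent of how pluggy hands back hook results: a generator's return value is discarded by a for loop, but it can be captured from StopIteration (or with yield from). The helper forward_progress and the example values below are hypothetical.

```python
def handle_files():
    # Hypothetical generator-style hook: yields progress lines and
    # returns the final result.
    yield "downloading"
    yield "extracting"
    return {"output_dir": "example-dir", "origin_repo_path": "/tmp/example-origin"}


def forward_progress(gen, sink):
    # Forward every yielded line to sink, and hand back the generator's
    # return value, which a plain for loop would silently drop.
    while True:
        try:
            sink.append(next(gen))
        except StopIteration as stop:
            return stop.value


progress = []
results = forward_progress(handle_files(), progress)
```

In the thread function, sink.append would be q.put_nowait, and results could then be used without going through helper_args.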

@consideRatio
Member Author

consideRatio commented Jan 17, 2022

Refactoring plugin logic execution

Currently we have logic in handlers.py to execute the plugin logic, which runs when a user visits a /git-pull URL. But in practice we should of course behave the same if you run the gitpuller CLI. In this comment I suggest how to declare the logic only once, which makes me think we may avoid some trouble of passing around relevant state.


handlers.py + pull.py both have logic to use the GitPuller class and its method pull defined in the pull.py file. The plugin logic is relevant to both. I suggest that instead of adding logic to do the plugin things in handlers.py, you do it in pull.py. That way, you get the logic to be part of both the CLI and the web request triggered execution of GitPuller().pull().

By doing that, you can also make use of the GitPuller class as an object to store state as well, and you may not need to pass anything back and forth etc.

I think it is absolutely critical that we avoid duplicating the implementation logic in two locations, and my suggestion above didn't reflect that.

             def pull():
                 try:
-                    # TODO: do content provider plugin work here
-                    # TODO: let the hook communicate back to this async function via
-                    #       a progress message queue which is repeatedly polled in this
-                    #       thread to report back to the UI
+                    # FIXME: Let the GitPuller object doing the work also do the plugin work,
+                    #        practically leaving this code as it was.
                     for line in gp.pull():
                         q.put_nowait(line)
                     # Sentinel when we're done
                     q.put_nowait(None)
                 except Exception as e:
                     q.put_nowait(e)
                     raise e
             self.gp_thread = threading.Thread(target=pull)
             self.gp_thread.start()
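
To make the idea concrete, here is a minimal sketch of a GitPuller-like class that does the plugin work inside pull() and keeps the resulting state on itself. It is a heavily simplified, hypothetical stand-in for the real class in pull.py; the content_provider argument and _pull_git method are made up for illustration.

```python
class GitPuller:
    """Simplified, hypothetical stand-in for the real GitPuller class."""

    def __init__(self, repo, repo_dir, content_provider=None):
        self.repo = repo
        self.repo_dir = repo_dir
        # Hypothetical: a callable returning a generator of progress lines
        # whose return value describes where the content ended up.
        self.content_provider = content_provider

    def pull(self):
        if self.content_provider is not None:
            # Plugin work happens here, so both the CLI and the web handler
            # get it just by iterating over pull().
            results = yield from self.content_provider()
            self.repo = "file://" + results["origin_repo_path"]
            self.repo_dir = results["output_dir"]
        yield from self._pull_git()

    def _pull_git(self):
        # Stand-in for the core git logic.
        yield f"Pulling {self.repo} into {self.repo_dir}"


def example_provider():
    yield "Downloading archive"
    return {"output_dir": "example-dir", "origin_repo_path": "/tmp/example-origin"}


gp = GitPuller("https://example.org/repo.git", "repo", content_provider=example_provider)
lines = list(gp.pull())
```

With this shape, the pull() function in handlers.py can stay as it was, because the callers only ever see a stream of progress lines.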

@sean-morris
Collaborator

ok. I am following. We are stepping into the middle of it! I will throw this up and we can check it out.

@sean-morris
Collaborator

@consideRatio Ok, I committed the changes to pull everything into pull.py. I have not tried the CLI yet but will. We still end up needing to get the results from the downloader plugin. My suggestion is to either pass a reference to the GitPuller class (which I would say is weird) or implement the plugins as classes and write our own discovery.

@consideRatio
Member Author

Hmm, I don't get this yet.

We still end up needing to get the results from the downloader plugin.

Can you clarify what information needs to be passed from where to where?

@sean-morris
Collaborator

sean-morris commented Jan 19, 2022 via email
