Add async support in os.rm/cp/filedirs #6989
base: dev
/gemini review
Code Review
This pull request introduces asynchronous support for several file system operations like rm, cp, and filedirs. This is achieved by adding a new async_task module that manages a worker thread and a task queue. The implementation is robust, featuring object pooling for thread synchronization primitives and graceful shutdown. The new asynchronous capability is immediately put to good use by making temporary file deletions non-blocking, which should provide a nice performance improvement. The changes to the threading module to support serialization of thread objects are also well-implemented. Overall, this is a great feature addition.
function async_task.cp(srcpath, dstpath, opt)
    opt = opt or {}
    local ok, errors = async_task._ensure_started()
    if not ok then
        return false, errors
    end

    -- post task
    srcpath = path.absolute(tostring(srcpath))
    dstpath = path.absolute(tostring(dstpath))

    local cmd = {kind = "cp", srcpath = srcpath, dstpath = dstpath}
    local cmd_event, cmd_result
    local is_detach = opt.detach

    -- create event and result for non-detach mode
    if not is_detach then
        cmd_event = _get_event()
        cmd_result = _get_sharedata()

        -- serialize thread objects for passing to worker thread
        cmd.event_data = thread._serialize_object(cmd_event)
        cmd.result_data = thread._serialize_object(cmd_result)
    end

    task_mutex:lock()
    task_queue:push(cmd)
    local queue_size = task_queue:size()
    task_mutex:unlock()

    if is_detach then
        -- We cache some tasks before executing them to avoid frequent thread switching.
        if queue_size > 10 then
            task_event:post()
        end
        return true
    else
        -- wait for completion
        task_event:post()
        cmd_event:wait(-1)
        local result = cmd_result:get()
        _put_event(cmd_event)
        _put_sharedata(cmd_result)
        if result and result.ok then
            return true
        else
            return false, result and result.errors or "unknown error"
        end
    end
end

-- remove files or directories
function async_task.rm(filepath, opt)
    opt = opt or {}
    local ok, errors = async_task._ensure_started()
    if not ok then
        return false, errors
    end

    -- post task
    filepath = path.absolute(tostring(filepath))

    local cmd = {kind = "rm", filepath = filepath}
    local cmd_event, cmd_result
    local is_detach = opt.detach

    -- create event and result for non-detach mode
    if not is_detach then
        cmd_event = _get_event()
        cmd_result = _get_sharedata()

        -- serialize thread objects for passing to worker thread
        cmd.event_data = thread._serialize_object(cmd_event)
        cmd.result_data = thread._serialize_object(cmd_result)
    end

    task_mutex:lock()
    task_queue:push(cmd)
    local queue_size = task_queue:size()
    task_mutex:unlock()

    if is_detach then
        -- We cache some tasks before executing them to avoid frequent thread switching.
        if queue_size > 10 then
            task_event:post()
        end
        return true
    else
        -- wait for completion
        task_event:post()
        cmd_event:wait(-1)
        local result = cmd_result:get()
        _put_event(cmd_event)
        _put_sharedata(cmd_result)
        if result and result.ok then
            return true
        else
            return false, result and result.errors or "unknown error"
        end
    end
end

-- remove directories
function async_task.rmdir(dir, opt)
    opt = opt or {}
    local ok, errors = async_task._ensure_started()
    if not ok then
        return false, errors
    end

    -- post task
    dir = path.absolute(tostring(dir))

    local cmd = {kind = "rmdir", dir = dir}
    local cmd_event, cmd_result
    local is_detach = opt.detach

    -- create event and result for non-detach mode
    if not is_detach then
        cmd_event = _get_event()
        cmd_result = _get_sharedata()

        -- serialize thread objects for passing to worker thread
        cmd.event_data = thread._serialize_object(cmd_event)
        cmd.result_data = thread._serialize_object(cmd_result)
    end

    task_mutex:lock()
    task_queue:push(cmd)
    local queue_size = task_queue:size()
    task_mutex:unlock()

    if is_detach then
        -- We cache some tasks before executing them to avoid frequent thread switching.
        if queue_size > 10 then
            task_event:post()
        end
        return true
    else
        -- wait for completion
        task_event:post()
        cmd_event:wait(-1)
        local result = cmd_result:get()
        _put_event(cmd_event)
        _put_sharedata(cmd_result)
        if result and result.ok then
            return true
        else
            return false, result and result.errors or "unknown error"
        end
    end
end
The functions async_task.cp, async_task.rm, and async_task.rmdir share a large amount of boilerplate code for posting a task to the worker thread and handling the response. This code duplication can make maintenance harder, as any change to the task posting logic needs to be replicated in multiple places.
Consider refactoring this common logic into a helper function. This function could handle task submission, synchronization for non-detached tasks, and result handling. Each of the public functions (cp, rm, etc.) would then become a thin wrapper that prepares the command table and calls this new helper function.
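One possible shape for that helper is sketched below. The name `_post_task` is hypothetical and not part of the PR. So that the sketch runs in plain Lua, the objects the quoted code relies on (`path`, `thread`, `task_queue`, `task_mutex`, `task_event`, and the `_get_*`/`_put_*` pool functions) are replaced by single-threaded stand-ins; in the real module the helper would use the existing upvalues instead.

```lua
-- Stand-ins (assumptions, not xmake's real objects) so the sketch is runnable.
local path = {absolute = function (p) return p end}
local thread = {_serialize_object = function (obj) return obj end}
local task_queue = {items = {}}
function task_queue:push(item) table.insert(self.items, item) end
function task_queue:size() return #self.items end
local task_mutex = {lock = function () end, unlock = function () end}
local task_event = {posted = 0}
function task_event:post()
    -- fake worker: drain the queue inline and report success for every task
    self.posted = self.posted + 1
    for _, cmd in ipairs(task_queue.items) do
        if cmd.result_data then
            cmd.result_data.value = {ok = true}
        end
    end
    task_queue.items = {}
end
local function _get_event() return {wait = function () end} end
local function _get_sharedata()
    return {get = function (self) return self.value end}
end
local function _put_event() end
local function _put_sharedata() end

local async_task = {}
function async_task._ensure_started() return true end

-- Hypothetical shared helper: post one command to the worker and,
-- unless opt.detach is set, block until its result is available.
local function _post_task(cmd, opt)
    opt = opt or {}
    local ok, errors = async_task._ensure_started()
    if not ok then
        return false, errors
    end

    -- create event and result for non-detach mode
    local cmd_event, cmd_result
    local is_detach = opt.detach
    if not is_detach then
        cmd_event = _get_event()
        cmd_result = _get_sharedata()
        cmd.event_data = thread._serialize_object(cmd_event)
        cmd.result_data = thread._serialize_object(cmd_result)
    end

    task_mutex:lock()
    task_queue:push(cmd)
    local queue_size = task_queue:size()
    task_mutex:unlock()

    if is_detach then
        -- batch detached tasks, as in the original code
        if queue_size > 10 then
            task_event:post()
        end
        return true
    end

    -- wait for completion, then return the pooled objects
    task_event:post()
    cmd_event:wait(-1)
    local result = cmd_result:get()
    _put_event(cmd_event)
    _put_sharedata(cmd_result)
    if result and result.ok then
        return true
    end
    return false, result and result.errors or "unknown error"
end

-- The public functions become thin wrappers that only build the command table.
function async_task.cp(srcpath, dstpath, opt)
    return _post_task({
        kind = "cp",
        srcpath = path.absolute(tostring(srcpath)),
        dstpath = path.absolute(tostring(dstpath))
    }, opt)
end

function async_task.rm(filepath, opt)
    return _post_task({kind = "rm", filepath = path.absolute(tostring(filepath))}, opt)
end

function async_task.rmdir(dir, opt)
    return _post_task({kind = "rmdir", dir = path.absolute(tostring(dir))}, opt)
end
```

One behavioral difference from the quoted code: the wrappers normalize paths before `_ensure_started()` is checked rather than after it. That should be harmless as long as `path.absolute` has no side effects.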
#6868