From 3e05226523031d0cc047628f7749ac13278c0ad1 Mon Sep 17 00:00:00 2001
From: Julian P Samaroo <jpsamaroo@jpsamaroo.me>
Date: Tue, 20 Jun 2023 13:47:48 -0500
Subject: [PATCH] Some latency improvements

---
 src/processor.jl   |  1 +
 src/sch/dynamic.jl | 15 ++++++++++++---
 src/sch/eager.jl   |  2 +-
 src/thunk.jl       |  2 ++
 4 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/src/processor.jl b/src/processor.jl
index 024c66c53..02efdca7a 100644
--- a/src/processor.jl
+++ b/src/processor.jl
@@ -157,6 +157,7 @@ iscompatible_arg(proc::ThreadProc, opts, x) = true
 function execute!(proc::ThreadProc, @nospecialize(f), @nospecialize(args...); @nospecialize(kwargs...))
     tls = get_tls()
     task = Task() do
+        @nospecialize f args kwargs
         set_tls!(tls)
         TimespanLogging.prof_task_put!(tls.sch_handle.thunk_id.id)
         @invokelatest f(args...; kwargs...)
diff --git a/src/sch/dynamic.jl b/src/sch/dynamic.jl
index 679514b83..58cc5bdf6 100644
--- a/src/sch/dynamic.jl
+++ b/src/sch/dynamic.jl
@@ -64,6 +64,7 @@ function dynamic_listener!(ctx, state, wid)
             end
             res = try
                 (false, lock(state.lock) do
+                    @nospecialize f data
                     Base.invokelatest(f, ctx, state, task, tid, data)
                 end)
             catch err
@@ -101,6 +102,7 @@ const DYNAMIC_EXEC_LOCK = Threads.ReentrantLock()
 
 "Executes an arbitrary function within the scheduler, returning the result."
 function exec!(f, h::SchedulerHandle, args...)
+    @nospecialize f args
     failed, res = lock(DYNAMIC_EXEC_LOCK) do
         put!(h.out_chan, (h.thunk_id.id, f, args))
         take!(h.inp_chan)
@@ -195,11 +197,18 @@ function _get_dag_ids(ctx, state, task, tid, _)
 end
 
 "Adds a new Thunk to the DAG."
-add_thunk!(f, h::SchedulerHandle, args...; future=nothing, ref=nothing, options...) =
+function add_thunk!(f, h::SchedulerHandle, args...; future=nothing, ref=nothing, options...)
+    @nospecialize f args
     exec!(_add_thunk!, h, f, args, options, future, ref)
-function _add_thunk!(ctx, state, task, tid, (f, args, options, future, ref))
+end
+function _add_thunk!(ctx, state, task, tid, payload)
+    @nospecialize payload
+    f, args, options, future, ref = payload
     timespan_start(ctx, :add_thunk, tid, 0)
-    _args = map(pos_arg->pos_arg[1] => (pos_arg[2] isa ThunkID ? state.thunk_dict[pos_arg[2].id] : pos_arg[2]), args)
+    _args = Pair{Union{Symbol,Nothing},Any}[]
+    for (pos, arg) in args
+        push!(_args, pos => (arg isa ThunkID ? state.thunk_dict[arg.id] : arg))
+    end
     GC.@preserve _args begin
         thunk = Thunk(f, _args...; options...)
         # Create a `DRef` to `thunk` so that the caller can preserve it
diff --git a/src/sch/eager.jl b/src/sch/eager.jl
index 1bafc0874..a57c324f0 100644
--- a/src/sch/eager.jl
+++ b/src/sch/eager.jl
@@ -97,7 +97,7 @@ function eager_thunk()
             added_future, future, uid, ref, f, args, opts = take!(EAGER_THUNK_CHAN)
             # preserve inputs until they enter the scheduler
             tid = GC.@preserve args begin
-                _args = map(args) do pos_x
+                _args = Base.mapany(args) do pos_x
                     pos, x = pos_x
                     if x isa Dagger.EagerThunk
                         return pos => ThunkID(EAGER_ID_MAP[x.uid], x.thunk_ref)
diff --git a/src/thunk.jl b/src/thunk.jl
index c89e89523..1edcdceeb 100644
--- a/src/thunk.jl
+++ b/src/thunk.jl
@@ -77,6 +77,7 @@ mutable struct Thunk
                    propagates=(),
                    kwargs...
                   )
+        @nospecialize f xs
         if !isa(f, Chunk) && (!isnothing(processor) || !isnothing(scope))
             f = tochunk(f,
                         something(processor, OSProc()),
@@ -138,6 +139,7 @@ Options(;options...) = Options((;options...))
 Options(options...) = Options((;options...))
 
 function args_kwargs_to_pairs(args, kwargs)
+    @nospecialize args kwargs
     args_kwargs = Pair{Union{Symbol,Nothing},Any}[]
     for arg in args
         push!(args_kwargs, nothing => arg)