From e11e29452ba6f71154099c4b1f8cd8846afb8aba Mon Sep 17 00:00:00 2001 From: Jose Esparza <28990958+pebeto@users.noreply.github.com> Date: Sat, 18 May 2024 14:08:45 -0500 Subject: [PATCH 1/6] Adding channel to control mlflow logging processing during multiprocessing --- Project.toml | 4 +++- src/MLJFlow.jl | 1 + src/base.jl | 15 ++++++++++++++- src/types.jl | 1 + test/base.jl | 4 ++-- test/multiprocessing.jl | 38 ++++++++++++++++++++++++++++++++++++++ test/runtests.jl | 4 +++- 7 files changed, 62 insertions(+), 5 deletions(-) create mode 100644 test/multiprocessing.jl diff --git a/Project.toml b/Project.toml index bf446bc..e5b43f7 100644 --- a/Project.toml +++ b/Project.toml @@ -7,6 +7,7 @@ version = "0.4.2" MLFlowClient = "64a0f543-368b-4a9a-827a-e71edb2a0b83" MLJBase = "a7f614a8-145f-11e9-1d2a-a57a1082229d" MLJModelInterface = "e80e1ace-859a-464e-9ed9-23947d8ae3ea" +MLJTuning = "03970b2e-30c4-11ea-3135-d1576263f10f" [compat] MLFlowClient = "0.5.1" @@ -18,8 +19,9 @@ julia = "1.6" MLFlowClient = "64a0f543-368b-4a9a-827a-e71edb2a0b83" MLJDecisionTreeInterface = "c6f25543-311c-4c74-83dc-3ea6d1015661" MLJModels = "d491faf4-2d78-11e9-2867-c94bc002c0b7" +MLJTuning = "03970b2e-30c4-11ea-3135-d1576263f10f" StatisticalMeasures = "a19d573c-0a75-4610-95b3-7071388c7541" Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" [targets] -test = ["Test", "MLFlowClient", "MLJModels", "MLJDecisionTreeInterface", "StatisticalMeasures"] +test = ["Test", "MLFlowClient", "MLJModels", "MLJDecisionTreeInterface", "StatisticalMeasures", "MLJTuning"] diff --git a/src/MLJFlow.jl b/src/MLJFlow.jl index f42d1b6..4274732 100644 --- a/src/MLJFlow.jl +++ b/src/MLJFlow.jl @@ -5,6 +5,7 @@ using MLJModelInterface: flat_params using MLFlowClient: MLFlow, logparam, logmetric, createrun, MLFlowRun, updaterun, logartifact, getorcreateexperiment +using .Threads: nthreads import Base: show import MLJBase: save, log_evaluation diff --git a/src/base.jl b/src/base.jl index 82413a0..5cbb19a 100644 --- a/src/base.jl +++ b/src/base.jl @@ -1,4 +1,12 @@ -function log_evaluation(logger::Logger, performance_evaluation) +const TASK_QUEUE = Channel{Tuple}() + +function process_queue() + for (f, l, p) in TASK_QUEUE + f(l, p) + end +end + +function _log_evaluation(logger::Logger, performance_evaluation) experiment = getorcreateexperiment(logger.service, logger.experiment_name; artifact_location=logger.artifact_location) run = createrun(logger.service, experiment; @@ -19,6 +27,11 @@ function log_evaluation(logger::Logger, performance_evaluation) updaterun(logger.service, run, "FINISHED") end + +function log_evaluation(logger::Logger, performance_evaluation) + @sync put!(TASK_QUEUE, (_log_evaluation, logger, performance_evaluation)) +end + function save(logger::Logger, machine:: Machine) io = IOBuffer() save(io, machine) diff --git a/src/types.jl b/src/types.jl index a491479..33bc032 100644 --- a/src/types.jl +++ b/src/types.jl @@ -32,6 +32,7 @@ end function Logger(apiroot; experiment_name="MLJ experiment", artifact_location=nothing, verbosity=1) service = MLFlow(apiroot) + @async process_queue() Logger(service, verbosity, experiment_name, artifact_location) end diff --git a/test/base.jl b/test/base.jl index 02d3f00..6140474 100644 --- a/test/base.jl +++ b/test/base.jl @@ -12,8 +12,8 @@ measures=[LogLoss(), Accuracy()], verbosity=1, logger=logger) @testset "log_evaluation" begin - runs = searchruns(logger.service, - getexperiment(logger.service, logger.experiment_name)) + experiment = getexperiment(logger.service, logger.experiment_name) + runs = searchruns(logger.service, experiment) @test typeof(runs[1]) == MLFlowRun end diff --git a/test/multiprocessing.jl b/test/multiprocessing.jl new file mode 100644 index 0000000..4076a19 --- /dev/null +++ b/test/multiprocessing.jl @@ -0,0 +1,38 @@ +@testset verbose = true "multiprocessing" begin + logger = MLJFlow.Logger(ENV["MLFLOW_TRACKING_URI"]; + experiment_name="MLJFlow multiprocessing tests", + artifact_location="/tmp/mlj-test") + + X, y = make_moons(100) + DecisionTreeClassifier = @load DecisionTreeClassifier pkg=DecisionTree + + model = DecisionTreeClassifier() + r = range(model, :max_depth, lower=1, upper=6) + + function test_tuned_model(acceleration_method) + tuned_model = TunedModel( + model=model, + range=r, + logger=logger, + acceleration=acceleration_method, + n=100, + ) + tuned_model_mach = machine(tuned_model, X, y) + fit!(tuned_model_mach) + + experiment = getorcreateexperiment(logger.service, logger.experiment_name) + runs = searchruns(logger.service, experiment) + + @assert length(runs) == 100 + + deleteexperiment(logger.service, experiment) + end + + @testset "log_evaluation_with_cpu_threads" begin + test_tuned_model(CPUThreads()) + end + + @testset "log_evaluation_with_cpu_processes" begin + test_tuned_model(CPUProcesses()) + end +end diff --git a/test/runtests.jl b/test/runtests.jl index fd090ce..5122038 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,9 +1,11 @@ using Test +using .Threads using MLJFlow using MLJBase using MLJModels +using MLJTuning using MLFlowClient using MLJModelInterface using StatisticalMeasures @@ -21,4 +23,4 @@ end include("base.jl") include("types.jl") include("service.jl") - +include("multiprocessing.jl") From c7ee07dc983f1c2c8481a4458cda7cff36956ffc Mon Sep 17 00:00:00 2001 From: Jose Esparza <28990958+pebeto@users.noreply.github.com> Date: Mon, 20 May 2024 00:41:20 -0500 Subject: [PATCH 2/6] Adding `result_channel` to create an awaitable resource, ensuring correct concurrency workflow --- src/base.jl | 15 +++++++-------- src/types.jl | 16 +++++++++++++++- 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/src/base.jl b/src/base.jl index 5cbb19a..56b7655 100644 --- a/src/base.jl +++ b/src/base.jl @@ -1,10 +1,4 @@ -const TASK_QUEUE = Channel{Tuple}() - -function process_queue() - for (f, l, p) in TASK_QUEUE - f(l, p) - end -end +const LOGGING_TASKS_CHANNEL = Channel{Tuple}() function _log_evaluation(logger::Logger, performance_evaluation) experiment = getorcreateexperiment(logger.service, logger.experiment_name; @@ -29,7 +23,12 @@ end function log_evaluation(logger::Logger, performance_evaluation) - @sync put!(TASK_QUEUE, (_log_evaluation, logger, performance_evaluation)) + result_channel = Channel{MLFlowRun}(1) + + put!(LOGGING_TASKS_CHANNEL, (_log_evaluation, logger, performance_evaluation, result_channel)) + wait(result_channel) + + return take!(result_channel) end function save(logger::Logger, machine:: Machine) diff --git a/src/types.jl b/src/types.jl index 33bc032..6aa19c8 100644 --- a/src/types.jl +++ b/src/types.jl @@ -32,7 +32,21 @@ end function Logger(apiroot; experiment_name="MLJ experiment", artifact_location=nothing, verbosity=1) service = MLFlow(apiroot) - @async process_queue() + + # NOTE: This background loop allows to execute the logging operations from + # the LOGGING_TASKS_CHANNEL. The execution result is sent back to the + # requesting thread through the result_channel. + # Until May 2024, mlflow does not support concurrent experiment creation, + # which does not allow to run the logging operations in multi-threading and + # multi-processing. + # + # Its usage can be seen in the `log_evaluation` function in `base.jl`. + Threads.@spawn begin + for (logging_function, logger, performance_evaluation, result_channel) in LOGGING_TASKS_CHANNEL + result = logging_function(logger, performance_evaluation) + put!(result_channel, result) + end + end Logger(service, verbosity, experiment_name, artifact_location) end From 0e5a16c7630c997deb42781f9b0a4a9571ab527e Mon Sep 17 00:00:00 2001 From: Jose Esparza <28990958+pebeto@users.noreply.github.com> Date: Mon, 20 May 2024 21:11:57 -0500 Subject: [PATCH 3/6] Moving `LOGGING_TASKS_CHANNEL` from global to `Logger` instance --- src/base.jl | 5 +---- src/types.jl | 9 +++++++-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/base.jl b/src/base.jl index 56b7655..d42f0eb 100644 --- a/src/base.jl +++ b/src/base.jl @@ -1,5 +1,3 @@ -const LOGGING_TASKS_CHANNEL = Channel{Tuple}() - function _log_evaluation(logger::Logger, performance_evaluation) experiment = getorcreateexperiment(logger.service, logger.experiment_name; artifact_location=logger.artifact_location) @@ -21,11 +19,10 @@ function _log_evaluation(logger::Logger, performance_evaluation) updaterun(logger.service, run, "FINISHED") end - function log_evaluation(logger::Logger, performance_evaluation) result_channel = Channel{MLFlowRun}(1) - put!(LOGGING_TASKS_CHANNEL, (_log_evaluation, logger, performance_evaluation, result_channel)) + put!(logger._LOGGING_TASKS_CHANNEL, (_log_evaluation, logger, performance_evaluation, result_channel)) wait(result_channel) return take!(result_channel) diff --git a/src/types.jl b/src/types.jl index 6aa19c8..9fcb0de 100644 --- a/src/types.jl +++ b/src/types.jl @@ -28,7 +28,9 @@ struct Logger verbosity::Int experiment_name::String artifact_location::Union{String,Nothing} + _LOGGING_TASKS_CHANNEL::Channel{Tuple} end + function Logger(apiroot; experiment_name="MLJ experiment", artifact_location=nothing, verbosity=1) service = MLFlow(apiroot) @@ -41,15 +43,18 @@ function Logger(apiroot; experiment_name="MLJ experiment", # multi-processing. # # Its usage can be seen in the `log_evaluation` function in `base.jl`. + _LOGGING_TASKS_CHANNEL = Channel{Tuple}() + Threads.@spawn begin - for (logging_function, logger, performance_evaluation, result_channel) in LOGGING_TASKS_CHANNEL + for (logging_function, logger, performance_evaluation, result_channel) in _LOGGING_TASKS_CHANNEL result = logging_function(logger, performance_evaluation) put!(result_channel, result) end end - Logger(service, verbosity, experiment_name, artifact_location) + Logger(service, verbosity, experiment_name, artifact_location, _LOGGING_TASKS_CHANNEL) end + function show(io::IO, logger::MLJFlow.Logger) print(io, "MLFLowLogger(\"$(logger.service.apiroot)\",\n" * From 6332639f2552b459dabf50927e91ba350cd66956 Mon Sep 17 00:00:00 2001 From: Jose Esparza <28990958+pebeto@users.noreply.github.com> Date: Mon, 20 May 2024 21:46:42 -0500 Subject: [PATCH 4/6] Separating open and closing responsibilities from `LOGGIN_CHANNEL` --- src/base.jl | 2 +- src/types.jl | 56 +++++++++++++++++++++++++++++++++++----------------- 2 files changed, 39 insertions(+), 19 deletions(-) diff --git a/src/base.jl b/src/base.jl index d42f0eb..b1a2722 100644 --- a/src/base.jl +++ b/src/base.jl @@ -22,7 +22,7 @@ end function log_evaluation(logger::Logger, performance_evaluation) result_channel = Channel{MLFlowRun}(1) - put!(logger._LOGGING_TASKS_CHANNEL, (_log_evaluation, logger, performance_evaluation, result_channel)) + put!(logger._LOGGING_CHANNEL, (_log_evaluation, logger, performance_evaluation, result_channel)) wait(result_channel) return take!(result_channel) diff --git a/src/types.jl b/src/types.jl index 9fcb0de..703216b 100644 --- a/src/types.jl +++ b/src/types.jl @@ -28,12 +28,45 @@ struct Logger verbosity::Int experiment_name::String artifact_location::Union{String,Nothing} - _LOGGING_TASKS_CHANNEL::Channel{Tuple} + _LOGGING_CHANNEL::Channel{Tuple} end function Logger(apiroot; experiment_name="MLJ experiment", artifact_location=nothing, verbosity=1) service = MLFlow(apiroot) + LOGGING_CHANNEL = open_logging_channel() + + Logger(service, verbosity, experiment_name, artifact_location, LOGGING_CHANNEL) +end + +function show(io::IO, logger::MLJFlow.Logger) + print(io, + "MLFLowLogger(\"$(logger.service.apiroot)\",\n" * + " experiment_name=\"$(logger.experiment_name)\",\n" * + " artifact_location=\"$(logger.artifact_location)\",\n" * + ") using MLFlow API version $(logger.service.apiversion)" + ) +end + +""" + close(logger::Logger) + +Each logger instance has a background loop that allows to execute the logging +operations from the `_LOGGING_TASKS_CHANNEL`. This function closes the channel +to stop the background loop. +""" +function close(logger::Logger) + close(logger._LOGGING_CHANNEL) +end + +""" + open_logging_channel(logger::Logger) + +To allow safe concurrent logging operations, this function opens the +`_LOGGING_TASKS_CHANNEL` of the logger and starts a background worker. +""" +function open_logging_channel() + LOGGING_CHANNEL = Channel{Tuple}() # NOTE: This background loop allows to execute the logging operations from # the LOGGING_TASKS_CHANNEL. The execution result is sent back to the @@ -43,23 +76,10 @@ function Logger(apiroot; experiment_name="MLJ experiment", # multi-processing. # # Its usage can be seen in the `log_evaluation` function in `base.jl`. - _LOGGING_TASKS_CHANNEL = Channel{Tuple}() - - Threads.@spawn begin - for (logging_function, logger, performance_evaluation, result_channel) in _LOGGING_TASKS_CHANNEL - result = logging_function(logger, performance_evaluation) - put!(result_channel, result) - end + Threads.@spawn for (logging_function, logger, performance_evaluation, result_channel) in LOGGING_CHANNEL + result = logging_function(logger, performance_evaluation) + put!(result_channel, result) end - Logger(service, verbosity, experiment_name, artifact_location, _LOGGING_TASKS_CHANNEL) -end - -function show(io::IO, logger::MLJFlow.Logger) - print(io, - "MLFLowLogger(\"$(logger.service.apiroot)\",\n" * - " experiment_name=\"$(logger.experiment_name)\",\n" * - " artifact_location=\"$(logger.artifact_location)\",\n" * - ") using MLFlow API version $(logger.service.apiversion)" - ) + return LOGGING_CHANNEL end From 420b23589c9aa1a284075426688aa64cd7131d37 Mon Sep 17 00:00:00 2001 From: Jose Esparza <28990958+pebeto@users.noreply.github.com> Date: Tue, 21 May 2024 02:09:25 -0500 Subject: [PATCH 5/6] Some format changes regarding logging Channel --- src/MLJFlow.jl | 8 +++----- src/base.jl | 2 +- src/types.jl | 20 ++++++++++---------- test/runtests.jl | 1 - 4 files changed, 14 insertions(+), 17 deletions(-) diff --git a/src/MLJFlow.jl b/src/MLJFlow.jl index 4274732..61a59fa 100644 --- a/src/MLJFlow.jl +++ b/src/MLJFlow.jl @@ -1,11 +1,9 @@ module MLJFlow -using MLJBase: Model, Machine, name +using MLJBase: Model, Machine, name using MLJModelInterface: flat_params -using MLFlowClient: MLFlow, logparam, logmetric, - createrun, MLFlowRun, updaterun, logartifact, - getorcreateexperiment -using .Threads: nthreads +using MLFlowClient: MLFlow, logparam, logmetric, createrun, MLFlowRun, + updaterun, logartifact, getorcreateexperiment import Base: show import MLJBase: save, log_evaluation diff --git a/src/base.jl b/src/base.jl index b1a2722..977c1be 100644 --- a/src/base.jl +++ b/src/base.jl @@ -22,7 +22,7 @@ end function log_evaluation(logger::Logger, performance_evaluation) result_channel = Channel{MLFlowRun}(1) - put!(logger._LOGGING_CHANNEL, (_log_evaluation, logger, performance_evaluation, result_channel)) + put!(logger._logging_channel, (_log_evaluation, logger, performance_evaluation, result_channel)) wait(result_channel) return take!(result_channel) diff --git a/src/types.jl b/src/types.jl index 703216b..5082190 100644 --- a/src/types.jl +++ b/src/types.jl @@ -28,15 +28,15 @@ struct Logger verbosity::Int experiment_name::String artifact_location::Union{String,Nothing} - _LOGGING_CHANNEL::Channel{Tuple} + _logging_channel::Channel{Tuple} end function Logger(apiroot; experiment_name="MLJ experiment", artifact_location=nothing, verbosity=1) service = MLFlow(apiroot) - LOGGING_CHANNEL = open_logging_channel() + logging_channel = open_logging_channel() - Logger(service, verbosity, experiment_name, artifact_location, LOGGING_CHANNEL) + Logger(service, verbosity, experiment_name, artifact_location, logging_channel) end function show(io::IO, logger::MLJFlow.Logger) @@ -52,34 +52,34 @@ end close(logger::Logger) Each logger instance has a background loop that allows to execute the logging -operations from the `_LOGGING_TASKS_CHANNEL`. This function closes the channel +operations from the `_logging_channel`. This function closes the channel to stop the background loop. """ function close(logger::Logger) - close(logger._LOGGING_CHANNEL) + close(logger._logging_channel) end """ open_logging_channel(logger::Logger) To allow safe concurrent logging operations, this function opens the -`_LOGGING_TASKS_CHANNEL` of the logger and starts a background worker. +`_logging_channel` of the logger and starts a background worker. """ function open_logging_channel() - LOGGING_CHANNEL = Channel{Tuple}() + logging_channel = Channel{Tuple}() # NOTE: This background loop allows to execute the logging operations from - # the LOGGING_TASKS_CHANNEL. The execution result is sent back to the + # the logging_channel. The execution result is sent back to the # requesting thread through the result_channel. # Until May 2024, mlflow does not support concurrent experiment creation, # which does not allow to run the logging operations in multi-threading and # multi-processing. # # Its usage can be seen in the `log_evaluation` function in `base.jl`. - Threads.@spawn for (logging_function, logger, performance_evaluation, result_channel) in LOGGING_CHANNEL + Threads.@spawn for (logging_function, logger, performance_evaluation, result_channel) in logging_channel result = logging_function(logger, performance_evaluation) put!(result_channel, result) end - return LOGGING_CHANNEL + return logging_channel end diff --git a/test/runtests.jl b/test/runtests.jl index 5122038..cbe8a7a 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,5 +1,4 @@ using Test -using .Threads using MLJFlow From f4d3e306400482997615f01c383e1da1d4882896 Mon Sep 17 00:00:00 2001 From: Jose Esparza <28990958+pebeto@users.noreply.github.com> Date: Tue, 21 May 2024 09:10:31 -0500 Subject: [PATCH 6/6] Removing `MLJTuning.jl` dependency, but holding it on the test suite --- Project.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Project.toml b/Project.toml index e5b43f7..247952f 100644 --- a/Project.toml +++ b/Project.toml @@ -7,7 +7,6 @@ version = "0.4.2" MLFlowClient = "64a0f543-368b-4a9a-827a-e71edb2a0b83" MLJBase = "a7f614a8-145f-11e9-1d2a-a57a1082229d" MLJModelInterface = "e80e1ace-859a-464e-9ed9-23947d8ae3ea" -MLJTuning = "03970b2e-30c4-11ea-3135-d1576263f10f" [compat] MLFlowClient = "0.5.1" @@ -24,4 +23,5 @@ StatisticalMeasures = "a19d573c-0a75-4610-95b3-7071388c7541" Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" [targets] -test = ["Test", "MLFlowClient", "MLJModels", "MLJDecisionTreeInterface", "StatisticalMeasures", "MLJTuning"] +test = ["Test", "MLJModels", "MLJTuning", "MLFlowClient", + "StatisticalMeasures", "MLJDecisionTreeInterface"]