Only "sequential" plan can work #649

Open
Yunuuuu opened this issue Sep 29, 2022 · 5 comments
Labels
waiting on (Waiting on a follow-up reply)

Comments

@Yunuuuu

Yunuuuu commented Sep 29, 2022


Describe the bug
I have the following code, which I have run under the "multicore" plan with workers set to 1 or 2. First, with 2 workers:

future::plan("multicore", workers = 2)
biomisc::run_absolute(
    cnv_data[Sample %in% samples],
    snv_data[Tumor_Sample_Barcode %in% samples],
    primary_disease = "Bladder Cancer",
    results_dir = here("results", "ABSOLUTE")
)

and on the same computer, but in another R session:

future::plan("multicore", workers = 1)
biomisc::run_absolute(
    cnv_data[Sample %in% samples],
    snv_data[Tumor_Sample_Barcode %in% samples],
    primary_disease = "Bladder Cancer",
    results_dir = here("results", "ABSOLUTE")
)

biomisc::run_absolute (https://github.com/Yunuuuu/biomisc/blob/main/R/run_absolute.R) contains the following code:

p <- progressr::progressor(
    along = absolute_filepath[["sample_id"]],
    auto_finish = FALSE
)
future.apply::future_lapply(
    absolute_filepath[["sample_id"]],
    function(sample_id) {
        p(type = "update")
        maf_fn <- absolute_filepath[["maf"]][[sample_id]]
        if (is.na(maf_fn)) {
            maf_fn <- NULL
        }
        absolute_safe(
            seg_dat_fn = absolute_filepath[["seg"]][[sample_id]],
            maf_fn = maf_fn,
            sample_name = sample_id,
            sigma_p = sigma_p, max_sigma_h = max_sigma_h,
            min_ploidy = min_ploidy, max_ploidy = max_ploidy,
            primary_disease = primary_disease, platform = platform,
            results_dir = run_absolute_dir,
            max_as_seg_count = max_as_seg_count,
            max_non_clonal = max_non_clonal,
            max_neg_genome = max_neg_genome,
            copy_num_type = copy_num_type,
            min_mut_af = min_mut_af
        )
    },
    future.globals = TRUE
)
p(type = "done")

When I use 2 workers, top indicates that R's CPU usage is only around 1-5%. So I ran another R session with 1 worker, which reaches up to 600-650% CPU usage:
[screenshot: top output showing the 1-worker session at ~600% CPU]
and after the second R session (started later) returned a result, the first R session is still running:
[screenshot: top output showing the first session still running]

Session information

R session output:

[R]> sessionInfo()
R version 4.2.1 (2022-06-23)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Ubuntu 22.04.1 LTS

Matrix products: default
BLAS/LAPACK: /usr/lib/x86_64-linux-gnu/libmkl_rt.so

locale:
 [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C               LC_TIME=zh_CN.UTF-8        LC_COLLATE=en_US.UTF-8
 [5] LC_MONETARY=zh_CN.UTF-8    LC_MESSAGES=en_US.UTF-8    LC_PAPER=zh_CN.UTF-8       LC_NAME=C
 [9] LC_ADDRESS=C               LC_TELEPHONE=C             LC_MEASUREMENT=zh_CN.UTF-8 LC_IDENTIFICATION=C

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base

other attached packages:
[1] here_1.0.1        data.table_1.14.3

loaded via a namespace (and not attached):
 [1] progressr_0.11.0     Rcpp_1.0.9           codetools_0.2-18     listenv_0.8.0        prettyunits_1.1.1    future_1.28.0
 [7] crayon_1.5.1         digest_0.6.29        rprojroot_2.0.3      parallelly_1.32.1    R6_2.5.1             lifecycle_1.0.2.9000
[13] future.apply_1.9.1   progress_1.2.2       rlang_1.0.5          cli_3.4.0            vctrs_0.4.1          ellipsis_0.3.2
[19] tools_4.2.1          hms_1.1.2            ABSOLUTE_1.0.6       numDeriv_2016.8-1.1  parallel_4.2.1       compiler_4.2.1
[25] pkgconfig_2.0.3      globals_0.16.1       biomisc_0.0.0.9000

the first R future session:

[R]> future::futureSessionInfo()
*** Package versions
future 1.28.0, parallelly 1.32.1, parallel 4.2.1, globals 0.16.1, listenv 0.8.0

*** Allocations
availableCores():
system  nproc
    16     16
availableWorkers():
$system
 [1] "localhost" "localhost" "localhost" "localhost" "localhost" "localhost" "localhost" "localhost" "localhost" "localhost"
[11] "localhost" "localhost" "localhost" "localhost" "localhost" "localhost"


*** Settings
- future.plan=<not set>
- future.fork.multithreading.enable=<not set>
- future.globals.maxSize=<not set>
- future.globals.onReference=<not set>
- future.resolve.recursive=<not set>
- future.rng.onMisuse=<not set>
- future.wait.timeout=<not set>
- future.wait.interval=<not set>
- future.wait.alpha=<not set>
- future.startup.script=<not set>

*** Backends
Number of workers: 2
List of future strategies:
1. multicore:
   - args: function (..., workers = 2, envir = parent.frame())
   - tweaked: TRUE
   - call: future::plan("multicore", workers = 2)

*** Basic tests
Main R session details:
      pid     r sysname           release                                     version nodename machine   login    user
1 1034146 4.2.1   Linux 5.15.0-47-generic #51-Ubuntu SMP Thu Aug 11 07:51:15 UTC 2022  host001  x86_64 user001 user001
  effective_user
1        user001

the second future R session

[R]> future::futureSessionInfo()
*** Package versions
future 1.28.0, parallelly 1.32.1, parallel 4.2.1, globals 0.16.1, listenv 0.8.0

*** Allocations
availableCores():
system  nproc
    16     16
availableWorkers():
$system
 [1] "localhost" "localhost" "localhost" "localhost" "localhost" "localhost" "localhost" "localhost" "localhost" "localhost"
[11] "localhost" "localhost" "localhost" "localhost" "localhost" "localhost"


*** Settings
- future.plan=<not set>
- future.fork.multithreading.enable=<not set>
- future.globals.maxSize=<not set>
- future.globals.onReference=<not set>
- future.resolve.recursive=<not set>
- future.rng.onMisuse=<not set>
- future.wait.timeout=<not set>
- future.wait.interval=<not set>
- future.wait.alpha=<not set>
- future.startup.script=<not set>

*** Backends
Number of workers: 1
List of future strategies:
1. multicore:
   - args: function (..., workers = 1, envir = parent.frame())
   - tweaked: TRUE
   - call: future::plan("multicore", workers = 1)

*** Basic tests
Main R session details:
      pid     r sysname           release                                     version nodename machine   login    user
1 1036230 4.2.1   Linux 5.15.0-47-generic #51-Ubuntu SMP Thu Aug 11 07:51:15 UTC 2022  host001  x86_64 user001 user001
  effective_user
1        user001
Worker R session details:
  worker     pid     r sysname           release                                     version nodename machine   login    user
1      1 1036230 4.2.1   Linux 5.15.0-47-generic #51-Ubuntu SMP Thu Aug 11 07:51:15 UTC 2022  host001  x86_64 user001 user001
  effective_user
1        user001
Number of unique worker PIDs: 1 (as expected)
Yunuuuu added the bug label on Sep 29, 2022
@Yunuuuu
Author

Yunuuuu commented Sep 29, 2022

Hi, I designed absolute_safe in the following way; I don't know whether these condition messages could lead to this problem:

absolute_safe <- function(seg_dat_fn, maf_fn,
                          sample_name, sigma_p, max_sigma_h,
                          min_ploidy, max_ploidy, primary_disease, platform,
                          results_dir, max_as_seg_count, max_non_clonal,
                          max_neg_genome, copy_num_type, min_mut_af) {
    absolute_args <- list(
        sample.name = sample_name,
        sigma.p = sigma_p, max.sigma.h = max_sigma_h,
        min.ploidy = min_ploidy, max.ploidy = max_ploidy,
        primary.disease = primary_disease, platform = platform,
        results.dir = results_dir, max.as.seg.count = max_as_seg_count,
        max.non.clonal = max_non_clonal,
        max.neg.genome = max_neg_genome,
        copy_num_type = copy_num_type
    )

    rlang::try_fetch(
        {
            suppressWarnings(rlang::inject(ABSOLUTE::RunAbsolute(
                seg.dat.fn = seg_dat_fn,
                !!!absolute_args,
                maf.fn = maf_fn,
                min.mut.af = min_mut_af
            )))
        },
        error = function(cnd) {
            if (any(grepl("mutations left", conditionMessage(cnd)))) {
                cli::cli_warn(c(
                    "Detected error in sample: {.field {sample_name}}",
                    "x" = conditionMessage(cnd),
                    "i" = "Trying to fix the error by removing the maf file ({.file {maf_fn}})"
                ))
                rlang::try_fetch(
                    {
                        suppressWarnings(rlang::inject(ABSOLUTE::RunAbsolute(
                            seg.dat.fn = seg_dat_fn,
                            !!!absolute_args,
                            maf.fn = NULL,
                            min.mut.af = NULL
                        )))
                        cli::cli_inform(c(
                            "v" = "Fixed {.field {sample_name}} successfully"
                        ))
                    },
                    error = function(cnd2) {
                        cli::cli_warn(c(
                            "Fixing {.field {sample_name}} failed",
                            "x" = conditionMessage(cnd2),
                            "i" = "Skipping this sample."
                        ))
                    }
                )
            } else {
                cli::cli_warn(c(
                    "Detected error in sample: {.field {sample_name}}",
                    "x" = conditionMessage(cnd),
                    "i" = "Skipping this sample"
                ))
            }
        }
    )
}

@HenrikBengtsson
Collaborator

Quick comments:

In case you're not already aware: when you use plan(multicore, workers = 1), it falls back to plan(sequential). To truly use a single forked parallel worker, use plan(multicore, workers = I(1)). That way you'd be comparing one versus two parallel workers.
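
For example, a minimal sketch that verifies which backend you actually got, by comparing the PID a future runs under against the main session's PID:

library(future)

plan(multicore, workers = 1)      # silently falls back to sequential
f <- future(Sys.getpid())
value(f) == Sys.getpid()          # TRUE: evaluated in the main R process

plan(multicore, workers = I(1))   # I() forces a true forked worker
f <- future(Sys.getpid())
value(f) == Sys.getpid()          # FALSE: evaluated in a forked child process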

When I use 2 workers, top indicates R usage is around 1-5%.
So I ran another R session with 1 worker, which reaches up to 600-650% CPU usage,
and after the second R session (started later) returned a result, the first R session is still running.

I'm not sure I fully understand. Are you saying you would not expect plan(multicore, workers = 1), which is plan(sequential), to use 500-600% of the CPU, but rather only 100% CPU? If so, yes, you're right, that shouldn't be the case. To me, that suggests something else in your code runs in parallel, e.g. one of the packages you use. This could happen if there's C/C++ code involved that runs multithreaded. I would check whether ABSOLUTE runs in parallel.
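
As a quick check for implicit multithreading, one option is the RhpcBLASctl package (a separate CRAN package, not part of future); a small sketch:

library(RhpcBLASctl)

blas_get_num_procs()      # how many threads the BLAS is allowed to use
blas_set_num_threads(1)   # cap the BLAS at a single thread
omp_set_num_threads(1)    # likewise for OpenMP-based native code

If capping these at 1 brings the CPU usage of the "sequential" run down to roughly 100%, the extra load comes from multithreaded native code rather than from future itself.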

If you're saying plan(multicore, workers = 1), which is plan(sequential), runs faster than plan(multicore, workers = 2), then, yes, that's surprising. It could be that too many progressr updates are generated in a short period of time, which could clog up the communication channel that multicore uses. But from your code it looks like there's only one progress update per sample processed, so I doubt you're running into that problem. There could be some other race condition going on. It could also be that your code, or code that you call, is not fork-safe.

As next steps:

  1. I would compare plan(multicore, workers = I(1)) to plan(multicore, workers = 2) and see if you get consistent results, i.e. two workers roughly twice as fast as one (see the sketch after this list). Don't run them at the same time, to minimize the risk of hitting the same data/files simultaneously.

  2. I would also try multisession ("regular" parallelization) rather than multicore (forked parallelization), i.e. plan(multisession, workers = I(1)) and plan(multisession, workers = 2). Looking at your code, and knowing a bit about ABSOLUTE, I don't think there's a benefit to forked processing, i.e. it's always safer to stay away from it if you can. Forked processing in R comes with a lot of hidden pitfalls, and as a developer there's nothing you can really do to protect yourself from them.
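
A rough timing harness for step 1 might look like the following sketch; xs and slow_task() are hypothetical placeholders for the vector of sample IDs and the per-sample ABSOLUTE call:

library(future)
library(future.apply)

xs <- seq_len(8)                               # placeholder sample IDs
slow_task <- function(x) { Sys.sleep(1); x }   # placeholder per-sample work

for (w in list(I(1), 2)) {
    plan(multicore, workers = w)
    print(system.time(future_lapply(xs, slow_task)))
}
plan(sequential)

With two workers, elapsed time should be roughly half of the one-worker run; if it isn't, something other than future is likely the bottleneck.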

HenrikBengtsson added the "waiting on" (Waiting on a follow-up reply) label and removed the bug label on Oct 4, 2022
@Yunuuuu
Author

Yunuuuu commented Oct 11, 2022

Thanks for your reply, @HenrikBengtsson. I have tested the combination of ABSOLUTE and future.

There were two outcomes:

  • if we use the multicore plan with more than one worker, the process never finishes (I have waited for several days)
  • if we use the multisession plan with more than one worker, the results depend on the sample size. With a small set (3 samples), the resulting files are identical to those returned by the sequential plan. But with a large set (796 samples), it produced a lot of NA values (without any error). The same analysis under the sequential plan was much faster and gave more correctly estimated values. I don't know why; it seems different ABSOLUTE processes clash with each other? (As running ABSOLUTE is time-consuming, I have not had time to test other sample sets.)

I finally decided to drop the future strategy from the ABSOLUTE step and use a plain lapply() to loop over the samples. Thanks for your kind help.

@Yunuuuu
Author

Yunuuuu commented Oct 11, 2022

The official ABSOLUTE documentation also posts some code to run ABSOLUTE in parallel, which runs foreach with doMC (multicore):

library(doMC)
registerDoMC(20)
foreach(scan = scans, .combine = c) %dopar% {
  DoAbsolute(scan, sif)
}
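
For reference, the same loop can be routed through future instead of doMC via the doFuture package (a separate CRAN package); a sketch, reusing the scans and sif objects from the snippet above:

library(doFuture)
registerDoFuture()                # make %dopar% dispatch via future
future::plan("multisession", workers = 2)

foreach(scan = scans, .combine = c) %dopar% {
  DoAbsolute(scan, sif)
}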

@Yunuuuu
Copy link
Author

Yunuuuu commented Nov 1, 2022

After continued use of future for everyday parallel functions on Linux, I found erroneous results, or future could not make full use of all the cores I set via the workers parameter. Previously I used future a lot on Windows, where the reference R BLAS is typically used. On Linux it is easy, and common, to build R against a high-performance BLAS such as MKL or OpenBLAS. There may be some conflict between these BLAS libraries and forked processing. Maybe future could emit a warning when it detects MKL or OpenBLAS in a forked process?
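
Two workarounds that may help when a multithreaded BLAS clashes with forking; a sketch (note that the environment variables generally must be set before the BLAS spins up its thread pool):

# (a) tell future to disable multithreading inside forked workers
#     (this option appears in the futureSessionInfo() output above and
#     relies on the RhpcBLASctl package being installed)
options(future.fork.multithreading.enable = FALSE)

# (b) or cap the BLAS/OpenMP thread pools explicitly
Sys.setenv(
    OMP_NUM_THREADS = "1",
    MKL_NUM_THREADS = "1",
    OPENBLAS_NUM_THREADS = "1"
)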

A similar finding also exists here: #390.
