From d3ea741044129306885bcf28694dd2fd0869e413 Mon Sep 17 00:00:00 2001 From: Matt Enlow Date: Fri, 12 Jan 2024 09:27:42 -0700 Subject: [PATCH 1/4] Replace Enum.map(&Task.async(...)) with Task.async_stream --- lib/credo/check/consistency/collector.ex | 8 ++++---- lib/credo/check/design/duplicated_code.ex | 19 ++++++------------ lib/credo/cli/output/summary.ex | 7 ++----- lib/credo/sources.ex | 24 ++++++----------------- 4 files changed, 18 insertions(+), 40 deletions(-) diff --git a/lib/credo/check/consistency/collector.ex b/lib/credo/check/consistency/collector.ex index 8bd2e8736..45c3ceef9 100644 --- a/lib/credo/check/consistency/collector.ex +++ b/lib/credo/check/consistency/collector.ex @@ -155,8 +155,8 @@ defmodule Credo.Check.Consistency.Collector do ) do frequencies_per_source_file = source_files - |> Enum.map(&Task.async(fn -> {&1, collector.collect_matches(&1, params)} end)) - |> Enum.map(&Task.await(&1, :infinity)) + |> Task.async_stream(&{&1, collector.collect_matches(&1, params)}, timeout: :infinity) + |> Enum.map(fn {:ok, frequencies} -> frequencies end) frequencies = total_frequencies(frequencies_per_source_file) @@ -167,8 +167,8 @@ defmodule Credo.Check.Consistency.Collector do result = frequencies_per_source_file |> source_files_with_issues(most_frequent_match) - |> Enum.map(&Task.async(fn -> issue_formatter.(most_frequent_match, &1, params) end)) - |> Enum.flat_map(&Task.await(&1, :infinity)) + |> Task.async_stream(&issue_formatter.(most_frequent_match, &1, params), timeout: :infinity) + |> Enum.flat_map(fn {:ok, issue} -> issue end) result else diff --git a/lib/credo/check/design/duplicated_code.ex b/lib/credo/check/design/duplicated_code.ex index 476c9e92f..8a29ce19e 100644 --- a/lib/credo/check/design/duplicated_code.ex +++ b/lib/credo/check/design/duplicated_code.ex @@ -54,18 +54,11 @@ defmodule Credo.Check.Design.DuplicatedCode do defp append_issues_via_issue_service(found_hashes, source_files, nodes_threshold, params, exec) when is_map(found_hashes) do found_hashes - |> Enum.map( - &Task.async(fn -> - do_append_issues_via_issue_service( - &1, - source_files, - nodes_threshold, - params, - exec - ) - end) + |> Task.async_stream( + &do_append_issues_via_issue_service(&1, source_files, nodes_threshold, params, exec), + timeout: :infinity ) - |> Enum.map(&Task.await(&1, :infinity)) + |> Enum.map(fn {:ok, result} -> result end) end defp do_append_issues_via_issue_service( @@ -97,8 +90,8 @@ defmodule Credo.Check.Design.DuplicatedCode do chunked_nodes = source_files |> Enum.chunk_every(30) - |> Enum.map(&Task.async(fn -> calculate_hashes_for_chunk(&1, mass_threshold) end)) - |> Enum.map(&Task.await(&1, :infinity)) + |> Task.async_stream(&calculate_hashes_for_chunk(&1, mass_threshold), timeout: :infinity) + |> Enum.map(fn {:ok, hashes} -> hashes end) nodes = Enum.reduce(chunked_nodes, %{}, fn current_hashes, existing_hashes -> diff --git a/lib/credo/cli/output/summary.ex b/lib/credo/cli/output/summary.ex index dcce0f2cc..a733ed711 100644 --- a/lib/credo/cli/output/summary.ex +++ b/lib/credo/cli/output/summary.ex @@ -165,13 +165,10 @@ defmodule Credo.CLI.Output.Summary do Credo.Code.prewalk(source_file, &scope_count_traverse/2, 0) end - defp scope_count([]), do: 0 - defp scope_count(source_files) when is_list(source_files) do source_files - |> Enum.map(&Task.async(fn -> scope_count(&1) end)) - |> Enum.map(&Task.await/1) - |> Enum.reduce(&(&1 + &2)) + |> Task.async_stream(&scope_count/1) + |> Enum.reduce(0, fn {:ok, n}, sum -> n + sum end) end @def_ops [:defmodule, :def, :defp, :defmacro] diff --git a/lib/credo/sources.ex b/lib/credo/sources.ex index 860976ddc..10c0200af 100644 --- a/lib/credo/sources.ex +++ b/lib/credo/sources.ex @@ -170,24 +170,12 @@ defmodule Credo.Sources do end defp read_files(filenames, parse_timeout) do - tasks = Enum.map(filenames, &Task.async(fn -> to_source_file(&1) end)) - - task_dictionary = - tasks - |> Enum.zip(filenames) - |> Enum.into(%{}) - - tasks_with_results = Task.yield_many(tasks, parse_timeout) - - results = - Enum.map(tasks_with_results, fn {task, res} -> - # Shutdown the tasks that did not reply nor exit - {task, res || Task.shutdown(task, :brutal_kill)} - end) - - Enum.map(results, fn - {_task, {:ok, value}} -> value - {task, nil} -> SourceFile.timed_out(task_dictionary[task]) + filenames + |> Task.async_stream(&to_source_file/1, timeout: parse_timeout, on_timeout: :kill_task) + |> Stream.zip(filenames) + |> Enum.map(fn + {{:exit, :timeout}, filename} -> SourceFile.timed_out(filename) + {{:ok, value}, _} -> value end) end From fd9ce5e21e4dde4c1ccf16cf82ff7dcf1b037093 Mon Sep 17 00:00:00 2001 From: Matt Enlow Date: Fri, 12 Jan 2024 09:49:29 -0700 Subject: [PATCH 2/4] optimization: remove ordering --- lib/credo/check.ex | 3 ++- lib/credo/check/consistency/collector.ex | 4 ++-- lib/credo/check/design/duplicated_code.ex | 7 ++++--- lib/credo/check/runner.ex | 7 +------ lib/credo/cli/output/summary.ex | 2 +- 5 files changed, 10 insertions(+), 13 deletions(-) diff --git a/lib/credo/check.ex b/lib/credo/check.ex index 3e09f81e5..101fb3e4e 100644 --- a/lib/credo/check.ex +++ b/lib/credo/check.ex @@ -402,7 +402,8 @@ defmodule Credo.Check do source_files |> Task.async_stream(fn source -> run_on_source_file(exec, source, params) end, max_concurrency: exec.max_concurrent_check_runs, - timeout: :infinity + timeout: :infinity, + ordered: false ) |> Stream.run() diff --git a/lib/credo/check/consistency/collector.ex b/lib/credo/check/consistency/collector.ex index 45c3ceef9..a4696d329 100644 --- a/lib/credo/check/consistency/collector.ex +++ b/lib/credo/check/consistency/collector.ex @@ -155,7 +155,7 @@ defmodule Credo.Check.Consistency.Collector do ) do frequencies_per_source_file = source_files - |> Task.async_stream(&{&1, collector.collect_matches(&1, params)}, timeout: :infinity) + |> Task.async_stream(&{&1, collector.collect_matches(&1, params)}, timeout: :infinity, ordered: false) |> Enum.map(fn {:ok, frequencies} -> frequencies end) frequencies = total_frequencies(frequencies_per_source_file) @@ -167,7 +167,7 @@ defmodule Credo.Check.Consistency.Collector do result = frequencies_per_source_file |> source_files_with_issues(most_frequent_match) - |> Task.async_stream(&issue_formatter.(most_frequent_match, &1, params), timeout: :infinity) + |> Task.async_stream(&issue_formatter.(most_frequent_match, &1, params), timeout: :infinity, ordered: false) |> Enum.flat_map(fn {:ok, issue} -> issue end) result diff --git a/lib/credo/check/design/duplicated_code.ex b/lib/credo/check/design/duplicated_code.ex index 8a29ce19e..1f5aeb399 100644 --- a/lib/credo/check/design/duplicated_code.ex +++ b/lib/credo/check/design/duplicated_code.ex @@ -56,9 +56,10 @@ defmodule Credo.Check.Design.DuplicatedCode do found_hashes |> Task.async_stream( &do_append_issues_via_issue_service(&1, source_files, nodes_threshold, params, exec), - timeout: :infinity + timeout: :infinity, + ordered: false ) - |> Enum.map(fn {:ok, result} -> result end) + |> Stream.run() end defp do_append_issues_via_issue_service( @@ -90,7 +91,7 @@ defmodule Credo.Check.Design.DuplicatedCode do chunked_nodes = source_files |> Enum.chunk_every(30) - |> Task.async_stream(&calculate_hashes_for_chunk(&1, mass_threshold), timeout: :infinity) + |> Task.async_stream(&calculate_hashes_for_chunk(&1, mass_threshold), timeout: :infinity, ordered: false) |> Enum.map(fn {:ok, hashes} -> hashes end) nodes = diff --git a/lib/credo/check/runner.ex b/lib/credo/check/runner.ex index 6c309dd27..e44388d0b 100644 --- a/lib/credo/check/runner.ex +++ b/lib/credo/check/runner.ex @@ -20,12 +20,7 @@ defmodule Credo.Check.Runner do |> fix_deprecated_notation_for_checks_without_params() check_tuples - |> Task.async_stream( - fn check_tuple -> - run_check(exec, check_tuple) - end, - timeout: :infinity - ) + |> Task.async_stream(&run_check(exec, &1), timeout: :infinity, ordered: false) |> Stream.run() :ok diff --git a/lib/credo/cli/output/summary.ex b/lib/credo/cli/output/summary.ex index a733ed711..a48c51f2d 100644 --- a/lib/credo/cli/output/summary.ex +++ b/lib/credo/cli/output/summary.ex @@ -167,7 +167,7 @@ defmodule Credo.CLI.Output.Summary do defp scope_count(source_files) when is_list(source_files) do source_files - |> Task.async_stream(&scope_count/1) + |> Task.async_stream(&scope_count/1, ordered: false) |> Enum.reduce(0, fn {:ok, n}, sum -> n + sum end) end From a07f9a373b72d897e5b040b26a982f2000c71cba Mon Sep 17 00:00:00 2001 From: Matt Enlow Date: Fri, 12 Jan 2024 12:03:39 -0700 Subject: [PATCH 3/4] simplify duplicated code pipeline --- lib/credo/check/design/duplicated_code.ex | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/lib/credo/check/design/duplicated_code.ex b/lib/credo/check/design/duplicated_code.ex index 1f5aeb399..d87355053 100644 --- a/lib/credo/check/design/duplicated_code.ex +++ b/lib/credo/check/design/duplicated_code.ex @@ -88,14 +88,11 @@ defmodule Credo.Check.Design.DuplicatedCode do end defp duplicate_nodes(source_files, mass_threshold) do - chunked_nodes = + nodes = source_files |> Enum.chunk_every(30) |> Task.async_stream(&calculate_hashes_for_chunk(&1, mass_threshold), timeout: :infinity, ordered: false) - |> Enum.map(fn {:ok, hashes} -> hashes end) - - nodes = - Enum.reduce(chunked_nodes, %{}, fn current_hashes, existing_hashes -> + |> Enum.reduce(%{}, fn {:ok, current_hashes}, existing_hashes -> Map.merge(existing_hashes, current_hashes, fn _hash, node_items1, node_items2 -> node_items1 ++ node_items2 end) @@ -197,10 +194,7 @@ defmodule Credo.Check.Design.DuplicatedCode do else hash = ast |> Credo.Code.remove_metadata() |> to_hash node_item = %{node: ast, filename: filename, mass: nil} - node_items = Map.get(existing_hashes, hash, []) - - updated_hashes = Map.put(existing_hashes, hash, node_items ++ [node_item]) - + updated_hashes = Map.update(existing_hashes, hash, [node_item], &[node_item | &1]) {ast, updated_hashes} end end From 91dc1d6f409a3b004f194a8c666a96a5fc0e25bf Mon Sep 17 00:00:00 2001 From: Matt Enlow Date: Fri, 12 Jan 2024 12:04:12 -0700 Subject: [PATCH 4/4] format --- lib/credo/check/consistency/collector.ex | 10 ++++++++-- lib/credo/check/design/duplicated_code.ex | 5 ++++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/lib/credo/check/consistency/collector.ex b/lib/credo/check/consistency/collector.ex index a4696d329..6adb263af 100644 --- a/lib/credo/check/consistency/collector.ex +++ b/lib/credo/check/consistency/collector.ex @@ -155,7 +155,10 @@ defmodule Credo.Check.Consistency.Collector do ) do frequencies_per_source_file = source_files - |> Task.async_stream(&{&1, collector.collect_matches(&1, params)}, timeout: :infinity, ordered: false) + |> Task.async_stream(&{&1, collector.collect_matches(&1, params)}, + timeout: :infinity, + ordered: false + ) |> Enum.map(fn {:ok, frequencies} -> frequencies end) frequencies = total_frequencies(frequencies_per_source_file) @@ -167,7 +170,10 @@ defmodule Credo.Check.Consistency.Collector do result = frequencies_per_source_file |> source_files_with_issues(most_frequent_match) - |> Task.async_stream(&issue_formatter.(most_frequent_match, &1, params), timeout: :infinity, ordered: false) + |> Task.async_stream(&issue_formatter.(most_frequent_match, &1, params), + timeout: :infinity, + ordered: false + ) |> Enum.flat_map(fn {:ok, issue} -> issue end) result diff --git a/lib/credo/check/design/duplicated_code.ex b/lib/credo/check/design/duplicated_code.ex index d87355053..342522e76 100644 --- a/lib/credo/check/design/duplicated_code.ex +++ b/lib/credo/check/design/duplicated_code.ex @@ -91,7 +91,10 @@ defmodule Credo.Check.Design.DuplicatedCode do nodes = source_files |> Enum.chunk_every(30) - |> Task.async_stream(&calculate_hashes_for_chunk(&1, mass_threshold), timeout: :infinity, ordered: false) + |> Task.async_stream(&calculate_hashes_for_chunk(&1, mass_threshold), + timeout: :infinity, + ordered: false + ) |> Enum.reduce(%{}, fn {:ok, current_hashes}, existing_hashes -> Map.merge(existing_hashes, current_hashes, fn _hash, node_items1, node_items2 -> node_items1 ++ node_items2