Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add performance test #515

Merged
merged 74 commits into from
Mar 13, 2023
Merged

Add performance test #515

merged 74 commits into from
Mar 13, 2023

Conversation

varsill
Copy link
Contributor

@varsill varsill commented Jan 24, 2023

No description provided.

@varsill varsill added the no-changelog This label has to be added if changes from the PR are not meant to be placed in the CHANGELOG.md label Jan 24, 2023
@varsill varsill requested a review from FelonEkonom February 27, 2023 09:44
queues_lengths_from_multiple_tries
|> Enum.zip()
|> Enum.map(&Tuple.to_list(&1))
|> Enum.map(&(Enum.sum(&1) / length(&1)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you divide expressions like this by 1_000_00 in other places (eg. in InProgressMemory), but not there?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I express InProgressMemory in MB, and in case of queue's length I don't think it's worth to use any unit prefix :D

Comment on lines 209 to 239
[first_branch | rest_of_branches] = group_of_branches

branch_ending_with_newly_added_element =
first_branch
|> via_in(Pad.ref(:input, 0))
|> child(
{:filter, current_level.level, filter_no},
%BranchedFilter{
number_of_reductions: reductions,
generator: prepare_generator(params[:max_random]),
dispatcher: prepare_dispatcher()
},
get_if_exists: true
)

unfinished_branches =
Enum.map(0..(output_pads - 1), fn pad_no ->
get_child({:filter, current_level.level, filter_no})
|> via_out(Pad.ref(:output, pad_no))
end)

newly_finished_branches =
Enum.with_index(rest_of_branches, 1)
|> Enum.map(fn {branch, branch_index_in_group} ->
branch
|> via_in(Pad.ref(:input, branch_index_in_group))
|> get_child({:filter, current_level.level, filter_no})
end)

{unfinished_branches,
newly_finished_branches ++ [branch_ending_with_newly_added_element]}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
[first_branch | rest_of_branches] = group_of_branches
branch_ending_with_newly_added_element =
first_branch
|> via_in(Pad.ref(:input, 0))
|> child(
{:filter, current_level.level, filter_no},
%BranchedFilter{
number_of_reductions: reductions,
generator: prepare_generator(params[:max_random]),
dispatcher: prepare_dispatcher()
},
get_if_exists: true
)
unfinished_branches =
Enum.map(0..(output_pads - 1), fn pad_no ->
get_child({:filter, current_level.level, filter_no})
|> via_out(Pad.ref(:output, pad_no))
end)
newly_finished_branches =
Enum.with_index(rest_of_branches, 1)
|> Enum.map(fn {branch, branch_index_in_group} ->
branch
|> via_in(Pad.ref(:input, branch_index_in_group))
|> get_child({:filter, current_level.level, filter_no})
end)
{unfinished_branches,
newly_finished_branches ++ [branch_ending_with_newly_added_element]}
newly_finished_branches =
Enum.with_index(group_of_branches)
|> Enum.map(fn {branch, branch_index_in_group ->
branch
|> via_in(Pad.ref(:input, branch_index_in_group))
|> child(
{:filter, current_level.level, filter_no},
%BranchedFilter{
number_of_reductions: reductions,
generator: prepare_generator(params[:max_random]),
dispatcher: prepare_dispatcher()
},
get_if_exists: true
)
end)
unfinished_branches =
Enum.map(0..(output_pads - 1), fn pad_no ->
get_child({:filter, current_level.level, filter_no})
|> via_out(Pad.ref(:output, pad_no))
end)
{unfinished_branches, newly_finished_branches}

Copy link
Contributor Author

@varsill varsill Feb 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good spot ;)
[EDIT] Well, in fact it's not possible to write it that way. In the children specification (by which I mean the argument of the spec action) there cannot be duplicated children - and what get_if_exists does is that it simply does not allow to spawn children, that are already in the parent's state (that is - the children that were spawned in the previous spec actions). Briefly speaking, the specification must be "correct" as get_if_exists won't remove duplicates in the specification itself.

get_if_exists cannot affect the duplicates in the specification because we would have some kind of a undefined behaviour. Consider the following scenario:

spec = [
  {child(:x, X, get_if_exists?: true), group: {:first_group, :temporary}},
{child(:x, X, get_if_exists?: true), group: {:second_group, :temporary}}
]

To which children group should the child :x belong in case get_if_exists?: true would remove the duplicates from the spec?

if rem(how_many_output_pads, input_pads) != 0,
do: raise("Wrong branched pipeline specification!")

{unfinished_branches_list, newly_finished_branches_list} =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
{unfinished_branches_list, newly_finished_branches_list} =
{unfinished_branches_lists, newly_finished_branches_lists} =

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

how_many_buffers_to_output = state.generator.(length(state.buffers))

if how_many_buffers_to_output > 0 do
output_pads = Map.keys(ctx.pads) |> Enum.filter(&match?({Membrane.Pad, :output, _}, &1))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
output_pads = Map.keys(ctx.pads) |> Enum.filter(&match?({Membrane.Pad, :output, _}, &1))
output_pads = Map.keys(ctx.pads) |> Enum.filter(&match?(Pad.ref(:output, _id), &1))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

state.workload_simulation.()
{[forward: buffer], state}
how_many_buffers_to_output = state.generator.(length(state.buffers))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a code formatting suggestion

Suggested change
how_many_buffers_to_output = state.generator.(length(state.buffers))
how_many_buffers_to_output = length(state.buffers) |> state.generator.()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done here and in the linear_filter.ex

Comment on lines +49 to +61
{actions, rest_of_buffers} =
Enum.zip(output_pads, how_many_buffers_per_pad)
|> Enum.map_reduce(buffers, fn {pad, how_many_buffers_per_pad}, buffers_left ->
{buffers_for_this_pad, rest_buffers} = Enum.split(buffers_left, how_many_buffers_per_pad)
action = {:buffer, {pad, buffers_for_this_pad}}
{action, rest_buffers}
end)

if rest_of_buffers != [] do
raise("The dispatcher function is working improperly!")
end

actions
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Second argument of Enum.zip/2 should be enumerable, but how_many_buffers_per_pad sounds like it is a number

Suggested change
{actions, rest_of_buffers} =
Enum.zip(output_pads, how_many_buffers_per_pad)
|> Enum.map_reduce(buffers, fn {pad, how_many_buffers_per_pad}, buffers_left ->
{buffers_for_this_pad, rest_buffers} = Enum.split(buffers_left, how_many_buffers_per_pad)
action = {:buffer, {pad, buffers_for_this_pad}}
{action, rest_buffers}
end)
if rest_of_buffers != [] do
raise("The dispatcher function is working improperly!")
end
actions
Enum.chunk_every(buffers, how_many_buffers_per_pad)
|> Enum.zip(output_pads)
|> Enum.map(fn {buffers_chunk, pad} -> {:buffer, {pad, buffers_chunk}} end)

Copy link
Contributor Author

@varsill varsill Feb 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, how_many_buffers_per_pad is not an integer - it's a list of integers which sums to the number of buffers to be dispatched. [1, 4, 3] is a valid how_many_buffers_per_pad list for 3 pads and 8 buffers, and such a list means, that one buffer should be sent through the first pad, 4 buffers should be sent through the second pad and 3 buffers should be sent through the third pad.

@@ -4,13 +4,17 @@ defmodule Benchmark.Run.Pipeline do

@impl true
def handle_init(_ctx, options) do
{[spec: options[:spec]], %{monitoring_process: options[:monitoring_process]}}
{[spec: options[:spec]], %{monitoring_process: options[:monitoring_process], memory: []}}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Field memory in state seems to be unused

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indeed, thanks for spotting

Comment on lines 145 to 160
child(%Membrane.Testing.Source{
output:
{1,
fn state, _size ->
if state < params[:number_of_buffers] do
{[
buffer:
{:output,
%Membrane.Buffer{payload: :crypto.strong_rand_bytes(params[:buffer_size])}},
redemand: :output
], state + 1}
else
{[end_of_stream: :output], state}
end
end}
}),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

too much nesting

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

final_level =
Enum.reduce(struct, initial_level, fn {input_pads, output_pads}, current_level ->
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

body of the lambda in this reduce is giant, maybe we can split it to some private functions

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

task =
Task.async(fn ->
Enum.each(1..n, fn _x -> test_function() end)
:erlang.process_info(self())[:reductions]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

process_info/1 (without options) is only for debugging, probably for performance reasons

Suggested change
:erlang.process_info(self())[:reductions]
Process.info(self(), :reductions) |> elem(1)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@varsill varsill requested review from mat-hek and FelonEkonom March 6, 2023 12:33
benchmark/run.exs Outdated Show resolved Hide resolved
varsill and others added 2 commits March 6, 2023 14:13
Co-authored-by: Feliks Pobiedziński <38541925+FelonEkonom@users.noreply.github.com>
@varsill varsill merged commit 209552c into master Mar 13, 2023
@varsill varsill deleted the performance_test branch March 13, 2023 12:59
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
no-changelog This label has to be added if changes from the PR are not meant to be placed in the CHANGELOG.md
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants