Skip to content

Commit

Permalink
Ensure error logs are always printed (#822)
Browse files Browse the repository at this point in the history
* Add macro ensuring errors are logged

* mix format

* Refactor logs

* Fix small bug

* Update lib/membrane/core/macros.ex

Co-authored-by: Łukasz Kita <lukasz.kita0@gmail.com>

* Implement CR suggestions

* Remove leftover

* Release v1.1.0

---------

Co-authored-by: Łukasz Kita <lukasz.kita0@gmail.com>
  • Loading branch information
FelonEkonom and varsill authored Jun 11, 2024
1 parent c7afa10 commit 5f53b95
Show file tree
Hide file tree
Showing 16 changed files with 328 additions and 206 deletions.
4 changes: 1 addition & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
# Changelog

## 1.1.0-rc1
## 1.1.0
* Add new callbacks `handle_child_setup_completed/3` and `handle_child_playing/3` in Bins and Pipelines. [#801](https://github.com/membraneframework/membrane_core/pull/801)

## 1.1.0-rc0
* Deprecate `handle_spec_started/3` callback in Bins and Pipelines. [#708](https://github.com/membraneframework/membrane_core/pull/708)
* Handle buffers from input pads having `flow_control: :auto` only if demand on all output pads having `flow_control: :auto` is positive. [#693](https://github.com/membraneframework/membrane_core/pull/693)
* Set `:ratio` dependency version to `"~> 3.0 or ~> 4.0"`. [#780](https://github.com/membraneframework/membrane_core/pull/780)
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ Apart from plugins, Membrane has stream formats, which live in `membrane_X_forma
The API for creating pipelines (and custom elements too) is provided by [membrane_core](https://github.com/membraneframework/membrane_core). To install it, add the following line to your `deps` in `mix.exs` and run `mix deps.get`

```elixir
{:membrane_core, "~> 1.0"}
{:membrane_core, "~> 1.1"}
```

**Standalone libraries**
Expand Down
41 changes: 28 additions & 13 deletions lib/membrane/clock.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ defmodule Membrane.Clock do
alias Membrane.Core.Message
alias Membrane.Time

require Membrane.Core.Utils, as: Utils

@typedoc @moduledoc
@type t :: pid

Expand Down Expand Up @@ -115,6 +117,12 @@ defmodule Membrane.Clock do

@impl GenServer
def init(options) do
Utils.log_on_error do
do_init(options)
end
end

defp do_init(options) do
proxy_opts = get_proxy_options(options[:proxy], options[:proxy_for])

state =
Expand All @@ -131,7 +139,13 @@ defmodule Membrane.Clock do
end

@impl GenServer
def handle_cast({:proxy_for, proxy_for}, %{proxy: true} = state) do
def handle_cast(request, state) do
Utils.log_on_error do
do_handle_cast(request, state)
end
end

defp do_handle_cast({:proxy_for, proxy_for}, %{proxy: true} = state) do
if state.proxy_for, do: unsubscribe(state.proxy_for)

state = %{state | proxy_for: proxy_for}
Expand All @@ -147,8 +161,7 @@ defmodule Membrane.Clock do
{:noreply, state}
end

@impl GenServer
def handle_cast({:clock_subscribe, pid}, state) do
defp do_handle_cast({:clock_subscribe, pid}, state) do
state
|> update_in([:subscribers, pid], fn
nil ->
Expand All @@ -162,8 +175,7 @@ defmodule Membrane.Clock do
~> {:noreply, &1}
end

@impl GenServer
def handle_cast({:clock_unsubscribe, pid}, state) do
defp do_handle_cast({:clock_unsubscribe, pid}, state) do
if Map.has_key?(state.subscribers, pid) do
{subs, state} =
state |> Bunch.Access.get_updated_in([:subscribers, pid, :subscriptions], &(&1 - 1))
Expand All @@ -175,24 +187,27 @@ defmodule Membrane.Clock do
~> {:noreply, &1}
end

@impl GenServer
def handle_info({:membrane_clock_update, till_next}, %{proxy: false} = state) do
@impl true
def handle_info(msg, state) do
Utils.log_on_error do
do_handle_info(msg, state)
end
end

defp do_handle_info({:membrane_clock_update, till_next}, %{proxy: false} = state) do
{:noreply, handle_clock_update(till_next, state)}
end

@impl GenServer
def handle_info({:membrane_clock_ratio, pid, ratio}, %{proxy: true, proxy_for: pid} = state) do
defp do_handle_info({:membrane_clock_ratio, pid, ratio}, %{proxy: true, proxy_for: pid} = state) do
{:noreply, broadcast_and_update_ratio(ratio, state)}
end

@impl GenServer
# When ratio from previously proxied clock comes in after unsubscribing
def handle_info({:membrane_clock_ratio, _pid, _ratio}, %{proxy: true} = state) do
defp do_handle_info({:membrane_clock_ratio, _pid, _ratio}, %{proxy: true} = state) do
{:noreply, state}
end

@impl GenServer
def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
defp do_handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
{:noreply, handle_unsubscribe(pid, state)}
end

Expand Down
40 changes: 27 additions & 13 deletions lib/membrane/core/bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ defmodule Membrane.Core.Bin do

alias Membrane.ResourceGuard

require Membrane.Core.Utils, as: Utils
require Membrane.Core.Message
require Membrane.Core.Telemetry
require Membrane.Logger
Expand Down Expand Up @@ -79,6 +80,12 @@ defmodule Membrane.Core.Bin do

@impl GenServer
def init(options) do
Utils.log_on_error do
do_init(options)
end
end

defp do_init(options) do
Process.link(options.parent_supervisor)
%{name: name, module: module} = options

Expand Down Expand Up @@ -144,13 +151,17 @@ defmodule Membrane.Core.Bin do

@impl GenServer
def handle_continue(:setup, state) do
state = Parent.LifecycleController.handle_setup(state)
{:noreply, state}
Utils.log_on_error do
state = Parent.LifecycleController.handle_setup(state)
{:noreply, state}
end
end

@impl GenServer
def handle_info(message, state) do
do_handle_info(message, state)
Utils.log_on_error do
do_handle_info(message, state)
end
end

@compile {:inline, do_handle_info: 2}
Expand All @@ -162,7 +173,6 @@ defmodule Membrane.Core.Bin do

defp do_handle_info(Message.new(:parent_notification, notification), state) do
state = Child.LifecycleController.handle_parent_notification(notification, state)

{:noreply, state}
end

Expand Down Expand Up @@ -251,22 +261,26 @@ defmodule Membrane.Core.Bin do
end

@impl GenServer
def handle_call(
Message.new(:handle_link, [direction, this, other, params]),
_from,
state
) do
def handle_call(request, from, state) do
Utils.log_on_error do
do_handle_call(request, from, state)
end
end

defp do_handle_call(
Message.new(:handle_link, [direction, this, other, params]),
_from,
state
) do
{reply, state} = PadController.handle_link(direction, this, other, params, state)
{:reply, reply, state}
end

@impl GenServer
def handle_call(Message.new(:get_clock), _from, state) do
defp do_handle_call(Message.new(:get_clock), _from, state) do
{:reply, state.synchronization.clock, state}
end

@impl GenServer
def handle_call(Message.new(:get_child_pid, child_name), _from, state) do
defp do_handle_call(Message.new(:get_child_pid, child_name), _from, state) do
reply =
with %State{children: %{^child_name => %{pid: child_pid}}} <- state do
{:ok, child_pid}
Expand Down
24 changes: 2 additions & 22 deletions lib/membrane/core/callback_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,6 @@ defmodule Membrane.Core.CallbackHandler do
Error handling actions returned by callback #{inspect(state.module)}.#{callback}
""")

log_debug_orginal_error(actions, e, __STACKTRACE__)

reraise e, __STACKTRACE__
end

Expand All @@ -197,32 +195,14 @@ defmodule Membrane.Core.CallbackHandler do
rescue
e ->
Membrane.Logger.error("""
Error handling action #{inspect(action)} returned by callback #{inspect(state.module)}.#{callback}
Error handling action returned by callback #{inspect(state.module)}.#{callback}.
Action: #{inspect(action, pretty: true)}
""")

log_debug_orginal_error(action, e, __STACKTRACE__)

reraise e, __STACKTRACE__
end
end)

handler_module.handle_end_of_actions(state)
end

# We log it, because sometimes, for some reason, crashing process doesn't cause
# printing error logs on stderr, so this debug log allows us to get some info
# about what happened in case of process crash
defp log_debug_orginal_error(action_or_actions, error, stacktrace) do
action_or_actions =
if(is_list(action_or_actions), do: "actions ", else: "action ") <>
inspect(action_or_actions, limit: :infinity)

Membrane.Logger.debug("""
Error while handling #{action_or_actions}
Orginal error:
#{inspect(error, pretty: true, limit: :infinity)}
#{Exception.format_stacktrace(stacktrace)}
""")
end
end
53 changes: 33 additions & 20 deletions lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ defmodule Membrane.Core.Element do

alias Membrane.Core.{SubprocessSupervisor, TimerController}

require Membrane.Core.Utils, as: Utils
require Membrane.Core.Message, as: Message
require Membrane.Core.Stalker, as: Stalker
require Membrane.Core.Telemetry, as: Telemetry
Expand Down Expand Up @@ -94,6 +95,12 @@ defmodule Membrane.Core.Element do

@impl GenServer
def init(options) do
Utils.log_on_error do
do_init(options)
end
end

defp do_init(options) do
Process.link(options.parent_supervisor)

observability_config = %{
Expand Down Expand Up @@ -155,50 +162,56 @@ defmodule Membrane.Core.Element do

@impl GenServer
def handle_continue(:setup, state) do
state = LifecycleController.handle_setup(state)
{:noreply, state}
Utils.log_on_error do
state = LifecycleController.handle_setup(state)
{:noreply, state}
end
end

@impl GenServer
def handle_call(Message.new(:get_clock), _from, state) do
def handle_call(request, from, state) do
Utils.log_on_error do
do_handle_call(request, from, state)
end
end

defp do_handle_call(Message.new(:get_clock), _from, state) do
{:reply, state.synchronization.clock, state}
end

@impl GenServer
def handle_call(
Message.new(:handle_link, [direction, this, other, params]),
_from,
state
) do
defp do_handle_call(
Message.new(:handle_link, [direction, this, other, params]),
_from,
state
) do
{reply, state} = PadController.handle_link(direction, this, other, params, state)
{:reply, reply, state}
end

@impl GenServer
def handle_call(Message.new(:set_stream_sync, sync), _from, state) do
defp do_handle_call(Message.new(:set_stream_sync, sync), _from, state) do
state = put_in(state.synchronization.stream_sync, sync)
{:reply, :ok, state}
end

@impl GenServer
def handle_call(Message.new(:get_child_pid, _child_name), _from, state) do
defp do_handle_call(Message.new(:get_child_pid, _child_name), _from, state) do
{:reply, {:error, :element_cannot_have_children}, state}
end

@impl GenServer
def handle_call(message, {pid, _tag}, _state) do
defp do_handle_call(message, {pid, _tag}, _state) do
raise Membrane.ElementError,
"Received invalid message #{inspect(message)} from #{inspect(pid)}"
end

@impl GenServer
def handle_info(message, state) do
Telemetry.report_metric(
:queue_len,
:erlang.process_info(self(), :message_queue_len) |> elem(1)
)
Utils.log_on_error do
Telemetry.report_metric(
:queue_len,
:erlang.process_info(self(), :message_queue_len) |> elem(1)
)

do_handle_info(message, state)
do_handle_info(message, state)
end
end

@compile {:inline, do_handle_info: 2}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ defmodule Membrane.Core.Element.AtomicDemand.DistributedAtomic.Worker do

use GenServer

require Membrane.Core.Utils, as: Utils

@type t :: pid()

@spec start_link(any()) :: {:ok, t}
Expand All @@ -18,25 +20,33 @@ defmodule Membrane.Core.Element.AtomicDemand.DistributedAtomic.Worker do

@impl true
def handle_call({:add_get, atomic_ref, value}, _from, _state) do
result = :atomics.add_get(atomic_ref, 1, value)
{:reply, result, nil}
Utils.log_on_error do
result = :atomics.add_get(atomic_ref, 1, value)
{:reply, result, nil}
end
end

@impl true
def handle_call({:sub_get, atomic_ref, value}, _from, _state) do
result = :atomics.sub_get(atomic_ref, 1, value)
{:reply, result, nil}
Utils.log_on_error do
result = :atomics.sub_get(atomic_ref, 1, value)
{:reply, result, nil}
end
end

@impl true
def handle_call({:get, atomic_ref}, _from, _state) do
result = :atomics.get(atomic_ref, 1)
{:reply, result, nil}
Utils.log_on_error do
result = :atomics.get(atomic_ref, 1)
{:reply, result, nil}
end
end

@impl true
def handle_cast({:put, atomic_ref, value}, _state) do
:atomics.put(atomic_ref, 1, value)
{:noreply, nil}
Utils.log_on_error do
:atomics.put(atomic_ref, 1, value)
{:noreply, nil}
end
end
end
Loading

0 comments on commit 5f53b95

Please sign in to comment.