Skip to content

Commit

Permalink
Merge pull request #377 from rbino/fix-375
Browse files Browse the repository at this point in the history
data_updater: serialize MessageTracker and DataUpdater.Server starts
  • Loading branch information
bettio authored May 14, 2020
2 parents cd7cbce + 216a4ba commit 6de679d
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 13 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,18 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [0.11.1] - Unreleased
### Added
- [data_updater_plant] Add `DATA_UPDATER_PLANT_AMQP_DATA_QUEUE_TOTAL_COUNT` environment variable,
this must be equal to the total number of queues in the Astarte instance.

### Fixed
- Wait for schema_version agreement before applying any schema change (such as creating tables or a
new realm). (see [#312](https://github.com/astarte-platform/astarte/issues/312).
- [appengine_api] Fix the metric counting discarded channel events, it was not correctly increased.
- [data_update_plant] Validate UTF8 strings coming from the broker (i.e. interface and path) to
avoid passing invalid strings to the database.
- [data_updater_plant] Fix a bug that was sometimes stalling a data updater queue process (see
[#375](https://github.com/astarte-platform/astarte/issues/375).

## [0.11.0] - 2020-04-13
### Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,17 @@ See the moduledoc for `Conform.Schema.Validator` for more details and examples.
hidden: false,
to: "astarte_data_updater_plant.data_queue_range_start"
],
data_queue_range_start: [
commented: true,
datatype: :integer,
required: false,
default: 1,
env_var: "DATA_UPDATER_PLANT_AMQP_DATA_QUEUE_TOTAL_COUNT",
doc:
"Returns the total number of data queues in the whole Astarte cluster. This should have the same value of DATA_QUEUE_COUNT in the VerneMQ plugin",
hidden: false,
to: "astarte_data_updater_plant.data_queue_total_count"
],
data_queue_range_end: [
commented: true,
datatype: :integer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ defmodule Astarte.DataUpdaterPlant.AMQPDataConsumer do
# API

def start_link(args \\ []) do
GenServer.start_link(__MODULE__, args)
index = Keyword.fetch!(args, :queue_index)
GenServer.start_link(__MODULE__, args, name: get_queue_via_tuple(index))
end

def ack(pid, delivery_tag) do
Expand All @@ -57,6 +58,39 @@ defmodule Astarte.DataUpdaterPlant.AMQPDataConsumer do
GenServer.call(pid, {:requeue, delivery_tag})
end

def start_message_tracker(realm, encoded_device_id) do
with {:ok, via_tuple} <- fetch_queue_via_tuple(realm, encoded_device_id) do
GenServer.call(via_tuple, {:start_message_tracker, realm, encoded_device_id})
end
end

def start_data_updater(realm, encoded_device_id, message_tracker) do
with {:ok, via_tuple} <- fetch_queue_via_tuple(realm, encoded_device_id) do
GenServer.call(via_tuple, {:start_data_updater, realm, encoded_device_id, message_tracker})
end
end

defp get_queue_via_tuple(queue_index) when is_integer(queue_index) do
{:via, Registry, {Registry.AMQPDataConsumer, {:queue_index, queue_index}}}
end

defp fetch_queue_via_tuple(realm, encoded_device_id)
when is_binary(realm) and is_binary(encoded_device_id) do
# This is the same sharding algorithm used in astarte_vmq_plugin
# Make sure they stay in sync
queue_index =
{realm, encoded_device_id}
|> :erlang.phash2(Config.data_queue_total_count())

if queue_index >= Config.data_queue_range_start() and
queue_index <= Config.data_queue_range_end() do
{:ok, get_queue_via_tuple(queue_index)}
else
# This device is handled by a differente DUP instance
{:error, :unhandled_device}
end
end

# Server callbacks

def init(args) do
Expand Down Expand Up @@ -90,6 +124,16 @@ defmodule Astarte.DataUpdaterPlant.AMQPDataConsumer do
{:reply, res, chan}
end

def handle_call({:start_message_tracker, realm, device_id}, _from, chan) do
res = DataUpdater.get_message_tracker(realm, device_id)
{:reply, res, chan}
end

def handle_call({:start_data_updater, realm, device_id, message_tracker}, _from, chan) do
res = DataUpdater.get_data_updater_process(realm, device_id, message_tracker)
{:reply, res, chan}
end

# Confirmation sent by the broker after registering this process as a consumer
def handle_info({:basic_consume_ok, %{consumer_tag: _consumer_tag}}, chan) do
{:noreply, chan}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ defmodule Astarte.DataUpdaterPlant.AMQPDataConsumer.Supervisor do

for queue_index <- queue_range_start..queue_range_end do
queue_name = "#{queue_prefix}#{queue_index}"
args = [queue_name: queue_name]
args = [queue_name: queue_name, queue_index: queue_index]
Supervisor.child_spec({AMQPDataConsumer, args}, id: {AMQPDataConsumer, queue_index})
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ defmodule Astarte.DataUpdaterPlant.Config do
Application.get_env(:astarte_data_updater_plant, :data_queue_range_end, 0)
end

@doc """
Returns the total number of data queues in all the Astarte cluster. Defaults to 1.
"""
def data_queue_total_count do
Application.get_env(:astarte_data_updater_plant, :data_queue_total_count, 1)
end

@doc """
Returns the AMQP consumer prefetch count for the consumer. Defaults to 300.
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ defmodule Astarte.DataUpdaterPlant.ConsumersSupervisor do
Logger.info("AMQPDataConsumer supervisor init.", tag: "data_consumer_sup_init")

children = [
{Registry, [keys: :unique, name: Registry.AMQPDataConsumer]},
{AMQPDataConsumer.ConnectionManager, amqp_opts: Config.amqp_consumer_options()},
AMQPDataConsumer.Supervisor
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

defmodule Astarte.DataUpdaterPlant.DataUpdater do
alias Astarte.Core.Device
alias Astarte.DataUpdaterPlant.AMQPDataConsumer
alias Astarte.DataUpdaterPlant.DataUpdater.Server
alias Astarte.DataUpdaterPlant.MessageTracker
require Logger
Expand Down Expand Up @@ -104,19 +105,19 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater do
simple_trigger,
trigger_target
) do
message_tracker = get_message_tracker(realm, encoded_device_id)
message_tracker = get_message_tracker(realm, encoded_device_id, offload_start: true)

get_data_updater_process(realm, encoded_device_id, message_tracker)
get_data_updater_process(realm, encoded_device_id, message_tracker, offload_start: true)
|> GenServer.call(
{:handle_install_volatile_trigger, object_id, object_type, parent_id, trigger_id,
simple_trigger, trigger_target}
)
end

def handle_delete_volatile_trigger(realm, encoded_device_id, trigger_id) do
message_tracker = get_message_tracker(realm, encoded_device_id)
message_tracker = get_message_tracker(realm, encoded_device_id, offload_start: true)

get_data_updater_process(realm, encoded_device_id, message_tracker)
get_data_updater_process(realm, encoded_device_id, message_tracker, offload_start: true)
|> GenServer.call({:handle_delete_volatile_trigger, trigger_id})
end

Expand All @@ -127,13 +128,19 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater do
|> GenServer.call({:dump_state})
end

defp get_data_updater_process(realm, encoded_device_id, message_tracker) do
def get_data_updater_process(realm, encoded_device_id, message_tracker, opts \\ []) do
with {:ok, device_id} <- Device.decode_device_id(encoded_device_id) do
case Registry.lookup(Registry.DataUpdater, {realm, device_id}) do
[] ->
name = {:via, Registry, {Registry.DataUpdater, {realm, device_id}}}
{:ok, pid} = Server.start(realm, device_id, message_tracker, name: name)
pid
if Keyword.get(opts, :offload_start) do
# We pass through AMQPDataConsumer to start the process to make sure that
# that start is serialized
AMQPDataConsumer.start_data_updater(realm, encoded_device_id, message_tracker)
else
name = {:via, Registry, {Registry.DataUpdater, {realm, device_id}}}
{:ok, pid} = Server.start(realm, device_id, message_tracker, name: name)
pid
end

[{pid, nil}] ->
pid
Expand All @@ -149,14 +156,20 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater do
end
end

defp get_message_tracker(realm, encoded_device_id) do
def get_message_tracker(realm, encoded_device_id, opts \\ []) do
with {:ok, device_id} <- Device.decode_device_id(encoded_device_id) do
device = {realm, device_id}

case Registry.lookup(Registry.MessageTracker, device) do
[] ->
acknowledger = self()
spawn_message_tracker(acknowledger, device)
if Keyword.get(opts, :offload_start) do
# We pass through AMQPDataConsumer to start the process to make sure that
# that start is serialized and acknowledger is the right process
AMQPDataConsumer.start_message_tracker(realm, encoded_device_id)
else
acknowledger = self()
spawn_message_tracker(acknowledger, device)
end

[{pid, nil}] ->
pid
Expand Down

0 comments on commit 6de679d

Please sign in to comment.