Skip to content

Commit

Permalink
Add kafka config to Lightning.Config (#2335)
Browse files Browse the repository at this point in the history
* move kafka config to Lightning.Config

---------

Co-authored-by: Stuart Corbishley <corbish@gmail.com>
  • Loading branch information
midigofrank and stuartc authored Jul 30, 2024
1 parent 5138287 commit e2d780d
Show file tree
Hide file tree
Showing 11 changed files with 169 additions and 101 deletions.
90 changes: 89 additions & 1 deletion lib/lightning/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,54 @@ defmodule Lightning.Config do
[]
end
end

# Reports whether Kafka trigger support is switched on.
# Defaults to false when the :enabled key is absent from config.
@impl true
def kafka_triggers_enabled? do
  Keyword.get(kafka_trigger_config(), :enabled, false)
end

# Retention period (seconds) for Kafka duplicate-tracking records;
# nil when not configured.
@impl true
def kafka_duplicate_tracking_retention_seconds do
  Keyword.get(kafka_trigger_config(), :duplicate_tracking_retention_seconds)
end

# Delay (ms) before processing the next message candidate set;
# nil when not configured.
@impl true
def kafka_next_message_candidate_set_delay_milliseconds do
  Keyword.get(
    kafka_trigger_config(),
    :next_message_candidate_set_delay_milliseconds
  )
end

# Delay (ms) applied when no message candidate set is available;
# nil when not configured.
@impl true
def kafka_no_message_candidate_set_delay_milliseconds do
  Keyword.get(
    kafka_trigger_config(),
    :no_message_candidate_set_delay_milliseconds
  )
end

# Configured Kafka consumer count; nil when not configured.
@impl true
def kafka_number_of_consumers do
  Keyword.get(kafka_trigger_config(), :number_of_consumers)
end

# Number of workers handling message candidate sets;
# nil when not configured.
@impl true
def kafka_number_of_message_candidate_set_workers do
  Keyword.get(
    kafka_trigger_config(),
    :number_of_message_candidate_set_workers
  )
end

# Message throughput limit (messages/second); nil when not configured.
@impl true
def kafka_number_of_messages_per_second do
  Keyword.get(kafka_trigger_config(), :number_of_messages_per_second)
end

# Configured Kafka processor count; nil when not configured.
@impl true
def kafka_number_of_processors do
  Keyword.get(kafka_trigger_config(), :number_of_processors)
end

# Reads the :kafka_triggers keyword list from the :lightning app env,
# falling back to an empty list when the key is unset.
defp kafka_trigger_config do
  case Application.fetch_env(:lightning, :kafka_triggers) do
    {:ok, config} -> config
    :error -> []
  end
end
end

defmodule Utils do
Expand Down Expand Up @@ -176,12 +224,20 @@ defmodule Lightning.Config do
@callback get_extension_mod(key :: atom()) :: any()
@callback grace_period() :: integer()
@callback instance_admin_email() :: String.t()
@callback kafka_duplicate_tracking_retention_seconds() :: integer()
@callback kafka_next_message_candidate_set_delay_milliseconds() :: integer()
@callback kafka_no_message_candidate_set_delay_milliseconds() :: integer()
@callback kafka_number_of_consumers() :: integer()
@callback kafka_number_of_message_candidate_set_workers() :: integer()
@callback kafka_number_of_messages_per_second() :: float()
@callback kafka_number_of_processors() :: integer()
@callback kafka_triggers_enabled?() :: boolean()
@callback oauth_provider(key :: atom()) :: keyword() | nil
@callback purge_deleted_after_days() :: integer()
@callback repo_connection_token_signer() :: Joken.Signer.t()
# NOTE: reset_password_token_validity_in_days was previously declared twice;
# the duplicate @callback (redundant and a source of compiler warnings) has
# been removed.
@callback reset_password_token_validity_in_days() :: integer()
@callback run_token_signer() :: Joken.Signer.t()
@callback usage_tracking() :: Keyword.t()
@callback usage_tracking_cron_opts() :: [Oban.Plugins.Cron.cron_input()]
@callback worker_secret() :: binary() | nil
@callback worker_token_signer() :: Joken.Signer.t()
Expand Down Expand Up @@ -272,6 +328,38 @@ defmodule Lightning.Config do
impl().usage_tracking_cron_opts()
end

@doc """
Delegates to the configured implementation to report whether Kafka
triggers are enabled.
"""
def kafka_triggers_enabled? do
  adapter = impl()
  adapter.kafka_triggers_enabled?()
end

@doc """
Delegates to the configured implementation for the duplicate-tracking
retention period in seconds.
"""
def kafka_duplicate_tracking_retention_seconds do
  adapter = impl()
  adapter.kafka_duplicate_tracking_retention_seconds()
end

@doc """
Delegates to the configured implementation for the next-message-candidate-set
delay in milliseconds.
"""
def kafka_next_message_candidate_set_delay_milliseconds do
  adapter = impl()
  adapter.kafka_next_message_candidate_set_delay_milliseconds()
end

@doc """
Delegates to the configured implementation for the no-message-candidate-set
delay in milliseconds.
"""
def kafka_no_message_candidate_set_delay_milliseconds do
  adapter = impl()
  adapter.kafka_no_message_candidate_set_delay_milliseconds()
end

@doc """
Delegates to the configured implementation for the Kafka consumer count.
"""
def kafka_number_of_consumers do
  adapter = impl()
  adapter.kafka_number_of_consumers()
end

@doc """
Delegates to the configured implementation for the number of message
candidate set workers.
"""
def kafka_number_of_message_candidate_set_workers do
  adapter = impl()
  adapter.kafka_number_of_message_candidate_set_workers()
end

@doc """
Delegates to the configured implementation for the per-second message
throughput limit.
"""
def kafka_number_of_messages_per_second do
  adapter = impl()
  adapter.kafka_number_of_messages_per_second()
end

@doc """
Delegates to the configured implementation for the Kafka processor count.
"""
def kafka_number_of_processors do
  adapter = impl()
  adapter.kafka_number_of_processors()
end

# Resolves the behaviour implementation module from the :lightning app env,
# defaulting to the bundled API module when none is configured.
defp impl do
  case Application.fetch_env(:lightning, __MODULE__) do
    {:ok, module} -> module
    :error -> API
  end
end
Expand Down
12 changes: 3 additions & 9 deletions lib/lightning/kafka_triggers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,8 @@ defmodule Lightning.KafkaTriggers do

offset_reset_policy = determine_offset_reset_policy(trigger)

number_of_consumers =
Application.get_env(:lightning, :kafka_triggers)[:number_of_consumers]

number_of_processors =
Application.get_env(:lightning, :kafka_triggers)[:number_of_processors]
number_of_consumers = Lightning.Config.kafka_number_of_consumers()
number_of_processors = Lightning.Config.kafka_number_of_processors()

%{
id: trigger.id,
Expand Down Expand Up @@ -208,10 +205,7 @@ defmodule Lightning.KafkaTriggers do
end

def convert_rate_limit do
per_second =
Application.get_env(:lightning, :kafka_triggers)[
:number_of_messages_per_second
]
per_second = Lightning.Config.kafka_number_of_messages_per_second()

seconds_in_interval = 10

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule Lightning.KafkaTriggers.DuplicateTrackingCleanupWorker do
@moduledoc """
Repsonsible for cleaing up stale TriggerKafkaMessageRecords entries.
Responsible for cleaning up stale TriggerKafkaMessageRecords entries.
TriggerKafkaMessageRecords are used to deduplicate incoming messages from
a Kafka cluster.
"""
Expand All @@ -16,10 +16,7 @@ defmodule Lightning.KafkaTriggers.DuplicateTrackingCleanupWorker do
@impl Oban.Worker
def perform(_opts) do
retention_period =
Application.get_env(
:lightning,
:kafka_triggers
)[:duplicate_tracking_retention_seconds]
Lightning.Config.kafka_duplicate_tracking_retention_seconds()

threshold_time = DateTime.utc_now() |> DateTime.add(-retention_period)

Expand Down
39 changes: 24 additions & 15 deletions lib/lightning/kafka_triggers/message_candidate_set_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ defmodule Lightning.KafkaTriggers.MessageCandidateSetSupervisor do
@moduledoc """
Starts the server and worker processes responsible for converting messages
received from Kafka clusters. There are two sets of workers and servers. This
is to accommodate messages that have keys (grouped into MessageCandidateSets)
ans those that do not, which are processed individually.
is to accommodate messages that have keys (grouped into MessageCandidateSets)
and those that do not, which are processed individually.
"""
use Supervisor

Expand All @@ -19,17 +19,36 @@ defmodule Lightning.KafkaTriggers.MessageCandidateSetSupervisor do
@impl true
def init(opts) do
number_of_workers = Keyword.get(opts, :number_of_workers, 1)
# TODO: move the config upwards (to the supervisor), and pass in the exact
# config values needed by the children
config = Keyword.get(opts, :config, Lightning.Config)

child_opts = [
number_of_workers: number_of_workers,
no_message_candidate_set_delay_milliseconds:
config.kafka_no_message_candidate_set_delay_milliseconds(),
next_message_candidate_set_delay_milliseconds:
config.kafka_next_message_candidate_set_delay_milliseconds()
]

mcs_children =
generate_child_specs(MessageCandidateSetServer, number_of_workers)
generate_child_specs(MessageCandidateSetServer, child_opts)

message_children =
generate_child_specs(MessageServer, number_of_workers)
generate_child_specs(MessageServer, child_opts)

Supervisor.init(mcs_children ++ message_children, strategy: :one_for_one)
end

def generate_child_specs(server, number_of_workers) do
def generate_child_specs(server, opts) do
no_set_delay =
opts |> Keyword.fetch!(:no_message_candidate_set_delay_milliseconds)

next_set_delay =
opts |> Keyword.fetch!(:next_message_candidate_set_delay_milliseconds)

number_of_workers = opts |> Keyword.fetch!(:number_of_workers)

{worker, id_prefix} =
case server do
MessageCandidateSetServer ->
Expand All @@ -39,16 +58,6 @@ defmodule Lightning.KafkaTriggers.MessageCandidateSetSupervisor do
{MessageWorker, "message_worker"}
end

no_set_delay =
Application.get_env(:lightning, :kafka_triggers)[
:no_message_candidate_set_delay_milliseconds
]

next_set_delay =
Application.get_env(:lightning, :kafka_triggers)[
:next_message_candidate_set_delay_milliseconds
]

workers =
0..(number_of_workers - 1)
|> Enum.map(fn index ->
Expand Down
6 changes: 2 additions & 4 deletions lib/lightning/kafka_triggers/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@ defmodule Lightning.KafkaTriggers.Supervisor do

@impl true
def init(_init_arg) do
enabled = Application.get_env(:lightning, :kafka_triggers)[:enabled]
enabled = Lightning.Config.kafka_triggers_enabled?()

number_of_workers =
Application.get_env(:lightning, :kafka_triggers)[
:number_of_message_candidate_set_workers
]
Lightning.Config.kafka_number_of_message_candidate_set_workers()

children =
if enabled do
Expand Down
2 changes: 1 addition & 1 deletion lib/lightning_web/live/workflow_live/components.ex
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ defmodule LightningWeb.WorkflowLive.Components do
label="Trigger type"
class=""
options={
if Application.get_env(:lightning, :kafka_triggers)[:enabled] do
if Lightning.Config.kafka_triggers_enabled?() do
[
"Cron Schedule (UTC)": "cron",
"Kafka Consumer": "kafka",
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ defmodule Lightning.MixProject do
{:mimic, "~> 1.7.2", only: :test},
{:mix_test_watch, "~> 1.0", only: [:test, :dev], runtime: false},
{:mock, "~> 0.3.8", only: :test},
{:mox, "~> 1.0.2", only: :test},
{:mox, "~> 1.1.0", only: :test},
{:oauth2, "~> 2.1"},
{:oban, "~> 2.13"},
{:opentelemetry_exporter, "~> 1.6.0"},
Expand Down
4 changes: 2 additions & 2 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
"cloak_ecto": {:hex, :cloak_ecto, "1.3.0", "0de127c857d7452ba3c3367f53fb814b0410ff9c680a8d20fbe8b9a3c57a1118", [:mix], [{:cloak, "~> 1.1.1", [hex: :cloak, repo: "hexpm", optional: false]}, {:ecto, "~> 3.0", [hex: :ecto, repo: "hexpm", optional: false]}], "hexpm", "314beb0c123b8a800418ca1d51065b27ba3b15f085977e65c0f7b2adab2de1cc"},
"combine": {:hex, :combine, "0.10.0", "eff8224eeb56498a2af13011d142c5e7997a80c8f5b97c499f84c841032e429f", [:mix], [], "hexpm", "1b1dbc1790073076580d0d1d64e42eae2366583e7aecd455d1215b0d16f2451b"},
"comeonin": {:hex, :comeonin, "5.4.0", "246a56ca3f41d404380fc6465650ddaa532c7f98be4bda1b4656b3a37cc13abe", [:mix], [], "hexpm", "796393a9e50d01999d56b7b8420ab0481a7538d0caf80919da493b4a6e51faf1"},
"cors_plug": {:hex, :cors_plug, "3.0.3", "7c3ac52b39624bc616db2e937c282f3f623f25f8d550068b6710e58d04a0e330", [:mix], [{:plug, "~> 1.13", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "3f2d759e8c272ed3835fab2ef11b46bddab8c1ab9528167bd463b6452edf830d"},
"connection": {:hex, :connection, "1.1.0", "ff2a49c4b75b6fb3e674bfc5536451607270aac754ffd1bdfe175abe4a6d7a68", [:mix], [], "hexpm", "722c1eb0a418fbe91ba7bd59a47e28008a189d47e37e0e7bb85585a016b2869c"},
"cors_plug": {:hex, :cors_plug, "3.0.3", "7c3ac52b39624bc616db2e937c282f3f623f25f8d550068b6710e58d04a0e330", [:mix], [{:plug, "~> 1.13", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "3f2d759e8c272ed3835fab2ef11b46bddab8c1ab9528167bd463b6452edf830d"},
"cowboy": {:hex, :cowboy, "2.12.0", "f276d521a1ff88b2b9b4c54d0e753da6c66dd7be6c9fca3d9418b561828a3731", [:make, :rebar3], [{:cowlib, "2.13.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "8a7abe6d183372ceb21caa2709bec928ab2b72e18a3911aa1771639bef82651e"},
"cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"},
"cowlib": {:hex, :cowlib, "2.13.0", "db8f7505d8332d98ef50a3ef34b34c1afddec7506e4ee4dd4a3a266285d282ca", [:make, :rebar3], [], "hexpm", "e1e1284dc3fc030a64b1ad0d8382ae7e99da46c3246b815318a4b848873800a4"},
Expand Down Expand Up @@ -84,7 +84,7 @@
"mint": {:hex, :mint, "1.5.1", "8db5239e56738552d85af398798c80648db0e90f343c8469f6c6d8898944fb6f", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "4a63e1e76a7c3956abd2c72f370a0d0aecddc3976dea5c27eccbecfa5e7d5b1e"},
"mix_test_watch": {:hex, :mix_test_watch, "1.1.1", "eee6fc570d77ad6851c7bc08de420a47fd1e449ef5ccfa6a77ef68b72e7e51ad", [:mix], [{:file_system, "~> 0.2.1 or ~> 0.3", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "f82262b54dee533467021723892e15c3267349849f1f737526523ecba4e6baae"},
"mock": {:hex, :mock, "0.3.8", "7046a306b71db2488ef54395eeb74df0a7f335a7caca4a3d3875d1fc81c884dd", [:mix], [{:meck, "~> 0.9.2", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "7fa82364c97617d79bb7d15571193fc0c4fe5afd0c932cef09426b3ee6fe2022"},
"mox": {:hex, :mox, "1.0.2", "dc2057289ac478b35760ba74165b4b3f402f68803dd5aecd3bfd19c183815d64", [:mix], [], "hexpm", "f9864921b3aaf763c8741b5b8e6f908f44566f1e427b2630e89e9a73b981fef2"},
"mox": {:hex, :mox, "1.1.0", "0f5e399649ce9ab7602f72e718305c0f9cdc351190f72844599545e4996af73c", [:mix], [], "hexpm", "d44474c50be02d5b72131070281a5d3895c0e7a95c780e90bc0cfe712f633a13"},
"nimble_options": {:hex, :nimble_options, "1.0.2", "92098a74df0072ff37d0c12ace58574d26880e522c22801437151a159392270e", [:mix], [], "hexpm", "fd12a8db2021036ce12a309f26f564ec367373265b53e25403f0ee697380f1b8"},
"nimble_ownership": {:hex, :nimble_ownership, "0.3.1", "99d5244672fafdfac89bfad3d3ab8f0d367603ce1dc4855f86a1c75008bce56f", [:mix], [], "hexpm", "4bf510adedff0449a1d6e200e43e57a814794c8b5b6439071274d248d272a549"},
"nimble_parsec": {:hex, :nimble_parsec, "1.3.1", "2c54013ecf170e249e9291ed0a62e5832f70a476c61da16f6aac6dca0189f2af", [:mix], [], "hexpm", "2682e3c0b2eb58d90c6375fc0cc30bc7be06f365bf72608804fb9cffa5e1b167"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ defmodule Lightning.KafkaTriggers.DuplicateTrackingCleanupWorkerTest do

setup do
retention_period =
Application.get_env(
:lightning,
:kafka_triggers
)[:duplicate_tracking_retention_seconds]
Lightning.Config.kafka_duplicate_tracking_retention_seconds()

now = DateTime.utc_now()
retain_offset = -retention_period + 2
Expand Down
Loading

0 comments on commit e2d780d

Please sign in to comment.