Replace Hammer for rate limiting with custom ets bucket #3571

Merged
merged 1 commit into from
Dec 6, 2023
3 changes: 0 additions & 3 deletions config/runtime.exs
@@ -570,9 +570,6 @@ config :ref_inspector,
config :ua_inspector,
init: {Plausible.Release, :configure_ua_inspector}

- config :hammer,
- backend: {Hammer.Backend.ETS, [expiry_ms: 60_000 * 60 * 4, cleanup_interval_ms: 60_000 * 10]}
-
if config_env() in [:dev, :staging, :prod] do
config :kaffy,
otp_app: :plausible,
1 change: 1 addition & 0 deletions lib/plausible/application.ex
@@ -12,6 +12,7 @@ defmodule Plausible.Application do
Plausible.IngestRepo,
Plausible.AsyncInsertRepo,
Plausible.ImportDeletionRepo,
+ {Plausible.RateLimit, clean_period: :timer.minutes(10)},
Plausible.Ingestion.Counters,
{Finch, name: Plausible.Finch, pools: finch_pool_config()},
{Phoenix.PubSub, name: Plausible.PubSub},
82 changes: 82 additions & 0 deletions lib/plausible/rate_limit.ex
@@ -0,0 +1,82 @@
defmodule Plausible.RateLimit do
@moduledoc """
Thin wrapper around `:ets.update_counter/4` and a
clean-up process to act as a rate limiter.
"""

use GenServer

@doc """
Starts the process that creates and cleans the ETS table.

Accepts the following options:
- `GenServer.options()`
- `:table` for the ETS table name, defaults to `#{__MODULE__}`
- `:clean_period` for how often to perform garbage collection
"""
@spec start_link([GenServer.option() | {:table, atom} | {:clean_period, pos_integer}]) ::
GenServer.on_start()
def start_link(opts) do
{gen_opts, opts} =
Keyword.split(opts, [:debug, :name, :timeout, :spawn_opt, :hibernate_after])

GenServer.start_link(__MODULE__, opts, gen_opts)
end

@doc """
Checks the rate-limit for a key.
"""
@spec check_rate(:ets.table(), key, scale, limit, increment) :: {:allow, count} | {:deny, limit}
when key: term,
scale: pos_integer,
limit: pos_integer,
increment: pos_integer,
count: pos_integer
def check_rate(table \\ __MODULE__, key, scale, limit, increment \\ 1) do
bucket = div(now(), scale)
full_key = {key, bucket}
expires_at = (bucket + 1) * scale
count = :ets.update_counter(table, full_key, increment, {full_key, 0, expires_at})
if count <= limit, do: {:allow, count}, else: {:deny, limit}
end

@impl true
def init(opts) do
clean_period = Keyword.fetch!(opts, :clean_period)
table = Keyword.get(opts, :table, __MODULE__)

^table =
:ets.new(table, [
:named_table,
:set,
:public,
{:read_concurrency, true},
{:write_concurrency, true},
{:decentralized_counters, true}
])
Member

I'm looking at how Hammer's table is configured:

[
  id: #Reference<0.563007245.2435186714.162753>,
  decentralized_counters: false,
  read_concurrency: false,
  write_concurrency: false,
  compressed: false,
  memory: 7351569,
  owner: #PID<0.3706.0>,
  heir: :none,
  name: :hammer_ets_buckets,
  size: 389610,
  node: :"plausible@plausible-app-01",
  named_table: true,
  type: :ordered_set,
  keypos: 1,
  protection: :public
]

I'm sure you've benchmarked the hell out of it. What was the difference between using set and ordered_set?

Contributor Author

I've only benchmarked different implementations (https://github.com/ruslandoga/rate_limit); I haven't benchmarked set vs ordered_set directly. I'll do a benchmark now.

Contributor Author

$ env MIX_ENV=bench mix run bench/basic.exs
Operating System: macOS
CPU Information: Apple M1
Number of Available Cores: 8
Available memory: 8 GB
Elixir 1.15.7
Erlang 26.1.2

Benchmark suite executing with the following configuration:
warmup: 2 s
time: 5 s
memory time: 0 ns
reduction time: 0 ns
parallel: 1
inputs: none specified
Estimated total run time: 14 s

Benchmarking ordered_set ...
Benchmarking set ...

Name                  ips        average  deviation         median         99th %
set                5.20 M      192.26 ns ±15809.05%         166 ns         209 ns
ordered_set        3.73 M      268.42 ns ±12499.97%         209 ns         292 ns

Comparison: 
set                5.20 M
ordered_set        3.73 M - 1.40x slower +76.16 ns
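
For reference, a comparison like the one above could be sketched without Benchee as follows. This is a minimal sketch, not the `bench/basic.exs` from the linked repo: the module name `SetVsOrderedSet`, the key shape, and the iteration count are assumptions made for illustration, and `:timer.tc/1` gives much rougher numbers than a proper benchmark run.

```elixir
# Hypothetical micro-benchmark; NOT the bench/basic.exs used for the numbers above.
defmodule SetVsOrderedSet do
  @iterations 100_000

  # Creates an anonymous (unnamed) public ETS table of the given type.
  def new_table(type) do
    :ets.new(:bench, [type, :public, {:write_concurrency, true}])
  end

  # Increments the counter stored under `key`, inserting {key, 0, 0} first
  # if the key is missing, and returns the new count.
  def hit(table, key) do
    :ets.update_counter(table, key, 1, {key, 0, 0})
  end

  # Returns the average time per update_counter call in nanoseconds.
  def run(type) do
    table = new_table(type)

    {micros, :ok} =
      :timer.tc(fn ->
        # Spread the updates over 1_000 distinct keys (an arbitrary choice).
        Enum.each(1..@iterations, fn i -> hit(table, {"key", rem(i, 1_000)}) end)
      end)

    :ets.delete(table)
    micros * 1_000 / @iterations
  end
end

for type <- [:set, :ordered_set] do
  IO.puts("#{type}: #{Float.round(SetVsOrderedSet.run(type), 1)} ns/op")
end
```

Absolute numbers will differ by machine; only the relative difference between the two table types is meaningful.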

schedule(clean_period)
{:ok, %{table: table, clean_period: clean_period}}
end

@impl true
def handle_info(:clean, state) do
clean(state.table)
schedule(state.clean_period)
{:noreply, state}
Member

Would it make sense to hibernate the process, since all it's doing is waiting another 10 minutes for the next cleanup?

Contributor Author

I haven't used process hibernation before. I'll read up on that.

Contributor Author

From a quick look, process hibernation forces a garbage collection (we have opts as garbage) and compacts memory usage. I think it wouldn't hurt, but at the same time the gains don't seem significant either (cleaning up opts and compacting a two-element map).
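
If hibernation were added, it would probably look something like the sketch below. `HibernatingCleaner` and its `runs` counter are hypothetical and exist only for illustration; the only change relative to the callbacks in this PR is the `:hibernate` element in the return tuples.

```elixir
# Hypothetical stand-in for the clean-up process; not the module from this PR.
defmodule HibernatingCleaner do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    clean_period = Keyword.fetch!(opts, :clean_period)
    Process.send_after(self(), :clean, clean_period)
    # Hibernating right after init lets `opts` be garbage collected.
    {:ok, %{clean_period: clean_period, runs: 0}, :hibernate}
  end

  @impl true
  def handle_info(:clean, state) do
    # The real module would call clean(state.table) here.
    Process.send_after(self(), :clean, state.clean_period)
    # Go back to sleep until the next :clean message arrives.
    {:noreply, %{state | runs: state.runs + 1}, :hibernate}
  end
end
```

Since `start_link/1` in this PR already forwards `:hibernate_after` to `GenServer.start_link/3`, a similar effect might also be achievable without code changes by passing, say, `hibernate_after: 1_000` in the child spec.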

Member

Buckle up, we're going live :)

Contributor Author

😵

end

defp schedule(clean_period) do
Process.send_after(self(), :clean, clean_period)
end

defp clean(table) do
ms = [{{{:_, :_}, :_, :"$1"}, [], [{:<, :"$1", {:const, now()}}]}]
:ets.select_delete(table, ms)
end

@compile inline: [now: 0]
defp now do
System.system_time(:millisecond)
end
end
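
To make the bucketing in `check_rate/5` and the clean-up match spec easier to follow, here is a small worked example. `BucketSketch` is a hypothetical helper that takes `now` as an argument (the real module reads the system clock), and the table contents are made up.

```elixir
# Hypothetical helper mirroring the arithmetic in check_rate/5.
defmodule BucketSketch do
  # Returns the ETS key and expiry timestamp for `key` at time `now` (ms).
  def bucket_key(key, scale, now) do
    bucket = div(now, scale)
    expires_at = (bucket + 1) * scale
    {{key, bucket}, expires_at}
  end
end

# With a 60_000 ms window, timestamps 120_000..179_999 all land in bucket 2,
# which expires at 180_000.
{{"login:ip:1.2.3.4", 2}, 180_000} = BucketSketch.bucket_key("login:ip:1.2.3.4", 60_000, 125_000)
{{"login:ip:1.2.3.4", 2}, 180_000} = BucketSketch.bucket_key("login:ip:1.2.3.4", 60_000, 179_999)
# One millisecond later a new bucket (and therefore a fresh counter) starts.
{{"login:ip:1.2.3.4", 3}, 240_000} = BucketSketch.bucket_key("login:ip:1.2.3.4", 60_000, 180_000)

# Rows have the shape {{key, bucket}, count, expires_at}.
table = :ets.new(:sketch, [:set, :public])
:ets.insert(table, [{{"a", 2}, 5, 180_000}, {{"b", 3}, 1, 240_000}])

# Same match spec as clean/1, with a fixed `now` of 200_000:
# delete every row whose expires_at is earlier than `now`.
now = 200_000
ms = [{{{:_, :_}, :_, :"$1"}, [], [{:<, :"$1", {:const, now}}]}]
1 = :ets.select_delete(table, ms)
[{{"b", 3}, 1, 240_000}] = :ets.tab2list(table)
```

Note that this is a fixed-window counter: each key gets a fresh count at every multiple of `scale`, so a burst that straddles a window boundary can briefly exceed `limit` requests per `scale` milliseconds.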
20 changes: 5 additions & 15 deletions lib/plausible/site/gate_keeper.ex
@@ -5,7 +5,7 @@ defmodule Plausible.Site.GateKeeper do
@type t() :: {:allow, Plausible.Site.t()} | {:deny, policy()}

@moduledoc """
- Thin wrapper around Hammer for gate keeping domain-specific events
+ Thin wrapper around `Plausible.RateLimit` for gate keeping domain-specific events
during the ingestion phase. When the site is allowed, gate keeping
check returns `:allow`, otherwise a `:deny` tagged tuple is returned
with one of the following policy markers:
@@ -24,7 +24,7 @@ defmodule Plausible.Site.GateKeeper do
* when the underlying rate limiting mechanism returns
an internal error: :allow
"""
- alias Plausible.Site
+ alias Plausible.{Site, RateLimit}
alias Plausible.Site.Cache

require Logger
@@ -68,19 +68,9 @@ defmodule Plausible.Site.GateKeeper do
key = Keyword.get(opts, :key, key(site.domain))
scale_ms = site.ingest_rate_limit_scale_seconds * 1_000

- case Hammer.check_rate(key, scale_ms, threshold) do
- {:deny, _} ->
- :throttle
-
- {:allow, _} ->
- {:allow, site}
-
- {:error, reason} ->
- Logger.error(
- "Error checking rate limit for '#{key}': #{inspect(reason)}. Falling back to: :allow"
- )
-
- {:allow, site}
+ case RateLimit.check_rate(key, scale_ms, threshold) do
+ {:deny, _} -> :throttle
+ {:allow, _} -> {:allow, site}
end
end
end
6 changes: 3 additions & 3 deletions lib/plausible_web/controllers/auth_controller.ex
@@ -2,7 +2,7 @@ defmodule PlausibleWeb.AuthController do
use PlausibleWeb, :controller
use Plausible.Repo

- alias Plausible.Auth
+ alias Plausible.{Auth, RateLimit}
alias Plausible.Billing.Quota
alias PlausibleWeb.TwoFactor

@@ -304,14 +304,14 @@ defmodule PlausibleWeb.AuthController do
defp check_ip_rate_limit(conn) do
ip_address = PlausibleWeb.RemoteIp.get(conn)

- case Hammer.check_rate("login:ip:#{ip_address}", @login_interval, @login_limit) do
+ case RateLimit.check_rate("login:ip:#{ip_address}", @login_interval, @login_limit) do
{:allow, _} -> :ok
{:deny, _} -> {:rate_limit, :ip_address}
end
end

defp check_user_rate_limit(user) do
- case Hammer.check_rate("login:user:#{user.id}", @login_interval, @login_limit) do
+ case RateLimit.check_rate("login:user:#{user.id}", @login_interval, @login_limit) do
{:allow, _} -> :ok
{:deny, _} -> {:rate_limit, :user}
end
7 changes: 6 additions & 1 deletion lib/plausible_web/plugs/authorize_stats_api.ex
@@ -3,6 +3,7 @@ defmodule PlausibleWeb.AuthorizeStatsApiPlug do
use Plausible.Repo
alias Plausible.Auth
alias Plausible.Sites
+ alias Plausible.RateLimit
alias PlausibleWeb.Api.Helpers, as: H

def init(options) do
@@ -101,7 +102,11 @@ defmodule PlausibleWeb.AuthorizeStatsApiPlug do

@one_hour 60 * 60 * 1000
defp check_api_key_rate_limit(api_key) do
- case Hammer.check_rate("api_request:#{api_key.id}", @one_hour, api_key.hourly_request_limit) do
+ case RateLimit.check_rate(
+ "api_request:#{api_key.id}",
+ @one_hour,
+ api_key.hourly_request_limit
+ ) do
{:allow, _} -> :ok
{:deny, _} -> {:error, :rate_limit, api_key.hourly_request_limit}
end
1 change: 0 additions & 1 deletion mix.exs
@@ -90,7 +90,6 @@ defmodule Plausible.MixProject do
{:locus, "~> 2.3"},
{:gen_cycle, "~> 1.0.4"},
{:hackney, "~> 1.8"},
- {:hammer, "~> 6.0"},
{:httpoison, "~> 1.4"},
{:jason, "~> 1.3"},
{:kaffy, "~> 0.10.2", only: [:dev, :test, :staging, :prod]},
2 changes: 0 additions & 2 deletions mix.lock
@@ -61,7 +61,6 @@
"gproc": {:hex, :gproc, "0.8.0", "cea02c578589c61e5341fce149ea36ccef236cc2ecac8691fba408e7ea77ec2f", [:rebar3], [], "hexpm", "580adafa56463b75263ef5a5df4c86af321f68694e7786cb057fd805d1e2a7de"},
"grpcbox": {:hex, :grpcbox, "0.16.0", "b83f37c62d6eeca347b77f9b1ec7e9f62231690cdfeb3a31be07cd4002ba9c82", [:rebar3], [{:acceptor_pool, "~>1.0.0", [hex: :acceptor_pool, repo: "hexpm", optional: false]}, {:chatterbox, "~>0.13.0", [hex: :ts_chatterbox, repo: "hexpm", optional: false]}, {:ctx, "~>0.6.0", [hex: :ctx, repo: "hexpm", optional: false]}, {:gproc, "~>0.8.0", [hex: :gproc, repo: "hexpm", optional: false]}], "hexpm", "294df743ae20a7e030889f00644001370a4f7ce0121f3bbdaf13cf3169c62913"},
"hackney": {:hex, :hackney, "1.20.1", "8d97aec62ddddd757d128bfd1df6c5861093419f8f7a4223823537bad5d064e2", [:rebar3], [{:certifi, "~> 2.12.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.4.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "fe9094e5f1a2a2c0a7d10918fee36bfec0ec2a979994cff8cfe8058cd9af38e3"},
- "hammer": {:hex, :hammer, "6.1.0", "f263e3c3e9946bd410ea0336b2abe0cb6260af4afb3a221e1027540706e76c55", [:make, :mix], [{:poolboy, "~> 1.5", [hex: :poolboy, repo: "hexpm", optional: false]}], "hexpm", "b47e415a562a6d072392deabcd58090d8a41182cf9044cdd6b0d0faaaf68ba57"},
"heroicons": {:hex, :heroicons, "0.5.3", "ee8ae8335303df3b18f2cc07f46e1cb6e761ba4cf2c901623fbe9a28c0bc51dd", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:phoenix_live_view, ">= 0.18.2", [hex: :phoenix_live_view, repo: "hexpm", optional: false]}], "hexpm", "a210037e8a09ac17e2a0a0779d729e89c821c944434c3baa7edfc1f5b32f3502"},
"hpack": {:hex, :hpack_erl, "0.2.3", "17670f83ff984ae6cd74b1c456edde906d27ff013740ee4d9efaa4f1bf999633", [:rebar3], [], "hexpm", "06f580167c4b8b8a6429040df36cc93bba6d571faeaec1b28816523379cbb23a"},
"hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"},
@@ -118,7 +117,6 @@
"plug": {:hex, :plug, "1.14.2", "cff7d4ec45b4ae176a227acd94a7ab536d9b37b942c8e8fa6dfc0fff98ff4d80", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "842fc50187e13cf4ac3b253d47d9474ed6c296a8732752835ce4a86acdf68d13"},
"plug_cowboy": {:hex, :plug_cowboy, "2.6.1", "9a3bbfceeb65eff5f39dab529e5cd79137ac36e913c02067dba3963a26efe9b2", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "de36e1a21f451a18b790f37765db198075c25875c64834bcc82d90b309eb6613"},
"plug_crypto": {:hex, :plug_crypto, "1.2.5", "918772575e48e81e455818229bf719d4ab4181fcbf7f85b68a35620f78d89ced", [:mix], [], "hexpm", "26549a1d6345e2172eb1c233866756ae44a9609bd33ee6f99147ab3fd87fd842"},
- "poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm", "dad79704ce5440f3d5a3681c8590b9dc25d1a561e8f5a9c995281012860901e3"},
"postgrex": {:hex, :postgrex, "0.17.1", "01c29fd1205940ee55f7addb8f1dc25618ca63a8817e56fac4f6846fc2cddcbe", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "14b057b488e73be2beee508fb1955d8db90d6485c6466428fe9ccf1d6692a555"},
"prom_ex": {:hex, :prom_ex, "1.9.0", "63e6dda6c05cdeec1f26c48443dcc38ffd2118b3665ae8d2bd0e5b79f2aea03e", [:mix], [{:absinthe, ">= 1.6.0", [hex: :absinthe, repo: "hexpm", optional: true]}, {:broadway, ">= 1.0.2", [hex: :broadway, repo: "hexpm", optional: true]}, {:ecto, ">= 3.5.0", [hex: :ecto, repo: "hexpm", optional: true]}, {:finch, "~> 0.15", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: false]}, {:oban, ">= 2.4.0", [hex: :oban, repo: "hexpm", optional: true]}, {:octo_fetch, "~> 0.3", [hex: :octo_fetch, repo: "hexpm", optional: false]}, {:phoenix, ">= 1.5.0", [hex: :phoenix, repo: "hexpm", optional: true]}, {:phoenix_live_view, ">= 0.14.0", [hex: :phoenix_live_view, repo: "hexpm", optional: true]}, {:plug, ">= 1.12.1", [hex: :plug, repo: "hexpm", optional: true]}, {:plug_cowboy, "~> 2.5", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:telemetry, ">= 1.0.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}, {:telemetry_metrics_prometheus_core, "~> 1.0", [hex: :telemetry_metrics_prometheus_core, repo: "hexpm", optional: false]}, {:telemetry_poller, "~> 1.0", [hex: :telemetry_poller, repo: "hexpm", optional: false]}], "hexpm", "01f3d4f69ec93068219e686cc65e58a29c42bea5429a8ff4e2121f19db178ee6"},
"public_suffix": {:git, "https://github.com/axelson/publicsuffix-elixir", "fa40c243d4b5d8598b90cff268bc4e33f3bb63f1", []},
67 changes: 67 additions & 0 deletions test/plausible/rate_limit_test.exs
@@ -0,0 +1,67 @@
defmodule Plausible.RateLimitTest do
use ExUnit.Case, async: true
alias Plausible.RateLimit

@table __MODULE__

defp key, do: "key:#{System.unique_integer([:positive])}"

@tag :slow
test "garbage collection" do
start_supervised!({RateLimit, clean_period: _100_ms = 100, table: @table})

key = key()
scale = _50_ms = 50
limit = 10

for _ <- 1..3 do
assert {:allow, 1} = RateLimit.check_rate(@table, key, scale, limit)
assert [{{^key, _bucket}, _count = 1, expires_at}] = :ets.tab2list(@table)

assert expires_at > System.system_time(:millisecond)
assert expires_at <= System.system_time(:millisecond) + 50

:timer.sleep(150)

assert :ets.tab2list(@table) == []
end
end

describe "check_rate/3" do
setup do
start_supervised!({RateLimit, clean_period: :timer.minutes(1), table: @table})
:ok
end

test "increments" do
key = key()
scale = :timer.seconds(10)
limit = 10

assert {:allow, 1} = RateLimit.check_rate(@table, key, scale, limit)
assert {:allow, 2} = RateLimit.check_rate(@table, key, scale, limit)
assert {:allow, 3} = RateLimit.check_rate(@table, key, scale, limit)
end

test "resets" do
key = key()
scale = 10
limit = 10

assert {:allow, 1} = RateLimit.check_rate(@table, key, scale, limit)
:timer.sleep(scale * 2 + 1)
assert {:allow, 1} = RateLimit.check_rate(@table, key, scale, limit)
end

test "denies" do
key = key()
scale = :timer.seconds(10)
limit = 3

assert {:allow, 1} = RateLimit.check_rate(@table, key, scale, limit)
assert {:allow, 2} = RateLimit.check_rate(@table, key, scale, limit)
assert {:allow, 3} = RateLimit.check_rate(@table, key, scale, limit)
assert {:deny, 3} = RateLimit.check_rate(@table, key, scale, limit)
end
end
end
51 changes: 0 additions & 51 deletions test/plausible/site/gate_keeper_test.exs
@@ -4,8 +4,6 @@ defmodule Plausible.Site.GateKeeperTest do
alias Plausible.Site.Cache
alias Plausible.Site.GateKeeper

- import ExUnit.CaptureLog
-
setup %{test: test} do
{:ok, _} = start_test_cache(test)
opts = [cache_opts: [cache_name: test, force?: true]]
@@ -87,55 +85,6 @@ defmodule Plausible.Site.GateKeeperTest do
assert {:deny, :not_found} = GateKeeper.check(domain, opts)
end

- test "rate limiter policy switches to allow when RL backend errors bubble-up", %{
- test: test,
- opts: opts
- } do
- domain = "causingerrors.example.com"
-
- site =
- add_site_and_refresh_cache(test,
- domain: domain,
- ingest_rate_limit_threshold: 1,
- ingest_rate_limit_scale_seconds: 600
- )
-
- site_id = site.id
-
- assert {:allow, %Plausible.Site{id: ^site_id, from_cache?: true}} =
- GateKeeper.check(domain, opts)
-
- assert {:deny, :throttle} = GateKeeper.check(domain, opts)
-
- {:ok, :broken} = break_hammer(site)
-
- log =
- capture_log(fn ->
- assert {:allow, %Plausible.Site{id: ^site_id, from_cache?: true}} =
- GateKeeper.check(domain, opts)
- end)
-
- assert log =~ "Error checking rate limit for 'ingest:site:causingerrors.example.com'"
- assert log =~ "Falling back to: :allow"
- end
-
- # We need a way to force Hammer to error-out on Hammer.check_rate/3.
- # This is tricky because we don't configure multiple backends,
- # so the easiest (and naive) way to do it, without mocking, is to
- # plant a hand-crafted ets entry that makes it throw an exception
- # when it gets to it. This should not affect any shared state tests
- # because the rogue entry is only stored for a specific key.
- # The drawback of doing this, the test will break if we
- # ever change the underlying Rate Limiting backend/implementation.
- defp break_hammer(site) do
- scale_ms = site.ingest_rate_limit_scale_seconds * 1_000
- rogue_key = site.domain
- our_key = GateKeeper.key(rogue_key)
- {_, key} = Hammer.Utils.stamp_key(our_key, scale_ms)
- true = :ets.insert(:hammer_ets_buckets, {key, 1, "TOTALLY-WRONG", "ABSOLUTELY-BREAKING"})
- {:ok, :broken}
- end
-
defp start_test_cache(cache_name) do
%{start: {m, f, a}} = Cache.child_spec(cache_name: cache_name)
apply(m, f, a)