Skip to content

Commit

Permalink
Merge branch 'master' of github.com:stueccles/analytics-elixir
Browse files Browse the repository at this point in the history
  • Loading branch information
Stuart Eccles committed Apr 22, 2020
2 parents 5b1d343 + 7e11778 commit f7c21d3
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 31 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ segment-*.tar
.DS_Store
.formatter.exs
.vscode/

.elixir_ls/
14 changes: 9 additions & 5 deletions lib/segment.ex
Original file line number Diff line number Diff line change
Expand Up @@ -155,25 +155,29 @@ defmodule Segment do
| Segment.Analytics.Alias.t()
| Segment.Analytics.Group.t()
| Segment.Analytics.Page.t()
require Logger
@service Application.get_env(:segment, :sender_impl, Segment.Analytics.Batcher)

@doc """
Start the configured GenServer for handling Segment events with the Segment HTTP Source API Write Key
By default if nothing is configured it will start `Segment.Analytics.Batcher`
"""
@spec start_link(String.t()) :: GenServer.on_start()
defdelegate start_link(api_key), to: @service
def start_link(api_key) do
Segment.Config.service().start_link(api_key)
end

@doc """
Start the configured GenServer for handling Segment events with the Segment HTTP Source API Write Key and a custom Tesla Adapter.
By default if nothing is configured it will start `Segment.Analytics.Batcher`
"""
@spec start_link(String.t(), Telsa.adapter()) :: GenServer.on_start()
defdelegate start_link(api_key, adapter), to: @service
def start_link(api_key, adapter) do
Segment.Config.service().start_link(api_key, adapter)
end

@spec child_spec(map()) :: map()
defdelegate child_spec(opts), to: @service
def child_spec(opts) do
Segment.Config.service().child_spec(opts)
end
end
6 changes: 3 additions & 3 deletions lib/segment/analytics.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ defmodule Segment.Analytics do

@type segment_id :: String.t() | integer()

@service Application.get_env(:segment, :sender_impl, Segment.Analytics.Batcher)

@doc """
Make a call to Segment with an event. Should be of type `Track, Identify, Screen, Alias, Group or Page`
"""
Expand Down Expand Up @@ -165,5 +163,7 @@ defmodule Segment.Analytics do
end

@spec call(Segment.segment_event()) :: :ok
defdelegate call(event), to: @service
def call(event) do
Segment.Config.service().call(event)
end
end
20 changes: 10 additions & 10 deletions lib/segment/batcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ defmodule Segment.Analytics.Batcher do
use GenServer
alias Segment.Analytics.{Track, Identify, Screen, Alias, Group, Page}

@max_batch_size Application.get_env(:segment, :max_batch_size, 100)
@batch_every_ms Application.get_env(:segment, :batch_every_ms, 2000)

@doc """
Start the `Segment.Analytics.Batcher` GenServer with an Segment HTTP Source API Write Key
"""
Expand Down Expand Up @@ -96,7 +93,7 @@ defmodule Segment.Analytics.Batcher do

# Helpers
defp schedule_batch_send do
Process.send_after(self(), :process_batch, @batch_every_ms)
Process.send_after(self(), :process_batch, Segment.Config.batch_every_ms())
end

defp enqueue(event) do
Expand All @@ -106,13 +103,16 @@ defmodule Segment.Analytics.Batcher do
defp extract_batch(queue, 0),
do: {[], queue}

defp extract_batch(queue, length) when length >= @max_batch_size do
:queue.split(@max_batch_size, queue)
|> split_result()
end
defp extract_batch(queue, length) do
max_batch_size = Segment.Config.max_batch_size()

defp extract_batch(queue, length),
do: :queue.split(length, queue) |> split_result()
if length >= max_batch_size do
:queue.split(max_batch_size, queue)
|> split_result()
else
:queue.split(length, queue) |> split_result()
end
end

defp split_result({q1, q2}), do: {:queue.to_list(q1), q2}
end
26 changes: 14 additions & 12 deletions lib/segment/client/http.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,14 @@ defmodule Segment.Http do
use Retry

@segment_api_url "https://api.segment.io/v1/"
@send_to_http Application.get_env(:segment, :send_to_http, true)
@retry_attempts Application.get_env(:segment, :retry_attempts, 3)
@retry_expiry Application.get_env(:segment, :retry_expiry, 10_000)
@retry_start Application.get_env(:segment, :retry_start, 100)

@doc """
Create a Tesla client with the Segment Source Write API Key
"""
@spec client(String.t()) :: client()
def client(api_key) do
adapter =
case @send_to_http do
case Segment.Config.send_to_http() do
true ->
Application.get_env(:segment, :tesla)[:adapter] ||
{Tesla.Adapter.Hackney, [recv_timeout: 30_000]}
Expand Down Expand Up @@ -89,7 +85,7 @@ defmodule Segment.Http do
"""
@spec send(String.t(), Segment.segment_event()) :: :ok | :error
def send(client, event) do
case make_request(client, event.type, prepare_events(event), @retry_attempts) do
case make_request(client, event.type, prepare_events(event), Segment.Config.retry_attempts()) do
{:ok, %{status: status}} when status == 200 ->
:ok

Expand All @@ -98,7 +94,10 @@ defmodule Segment.Http do
:error

{:error, err} ->
Logger.error("[Segment] Call Failed after #{@retry_attempts} retries. #{inspect(err)}")
Logger.error(
"[Segment] Call Failed after #{Segment.Config.retry_attempts()} retries. #{inspect(err)}"
)

:error

err ->
Expand All @@ -120,7 +119,7 @@ defmodule Segment.Http do
|> add_if(:context, context)
|> add_if(:integrations, integrations)

case make_request(client, "batch", data, @retry_attempts) do
case make_request(client, "batch", data, Segment.Config.retry_attempts()) do
{:ok, %{status: status}} when status == 200 ->
:ok

Expand All @@ -133,9 +132,9 @@ defmodule Segment.Http do

{:error, err} ->
Logger.error(
"[Segment] Batch call of #{length(events)} events failed after #{@retry_attempts} retries. #{
inspect(err)
}"
"[Segment] Batch call of #{length(events)} events failed after #{
Segment.Config.retry_attempts()
} retries. #{inspect(err)}"
)

:error
Expand All @@ -147,7 +146,10 @@ defmodule Segment.Http do
end

defp make_request(client, url, data, retries) when retries > 0 do
retry with: linear_backoff(@retry_start, 2) |> cap(@retry_expiry) |> Stream.take(retries) do
retry with:
linear_backoff(Segment.Config.retry_start(), 2)
|> cap(Segment.Config.retry_expiry())
|> Stream.take(retries) do
Tesla.post(client, url, data)
after
result -> result
Expand Down
31 changes: 31 additions & 0 deletions lib/segment/config.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
defmodule Segment.Config do
@moduledoc false

def service do
Application.get_env(:segment, :sender_impl, Segment.Analytics.Batcher)
end

def max_batch_size() do
Application.get_env(:segment, :max_batch_size, 100)
end

def batch_every_ms() do
Application.get_env(:segment, :batch_every_ms, 2000)
end

def send_to_http() do
Application.get_env(:segment, :send_to_http, true)
end

def retry_attempts() do
Application.get_env(:segment, :retry_attempts, 3)
end

def retry_expiry() do
Application.get_env(:segment, :retry_expiry, 10_000)
end

def retry_start() do
Application.get_env(:segment, :retry_start, 100)
end
end

0 comments on commit f7c21d3

Please sign in to comment.