Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new option consume jobs #11

Merged
merged 7 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 36 additions & 31 deletions .github/workflows/elixir.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
name: CI Pipeline

on:
push:
branches: ["main"]
pull_request:
push:
branches: ["main"]

env:
Expand All @@ -16,46 +15,52 @@ permissions:
jobs:
build:
name: Build and test
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
strategy:
fail-fast: true
fail-fast: false
matrix:
otp: ["24.3", "25.1", "26.1"]
elixir: ["1.14.5", "1.15.6"]
include:
- elixir: "1.14"
otp: "23.0"

# Latest versions.
- elixir: "1.17"
otp: "27.0"
lint: lint
coverage: coverage
steps:
- uses: actions/checkout@v3
- name: Set up Elixir
- name: Check out this repository
uses: actions/checkout@v4

- name: Set up Erlang and Elixir
uses: erlef/setup-beam@v1
with:
elixir-version: ${{matrix.elixir}}
otp-version: ${{matrix.otp}}
elixir-version: ${{matrix.elixir}}

- name: Restore dependencies cache
- name: Cache Mix dependencies
uses: actions/cache@v3
id: dependency-cache
id: cache-deps
with:
path: deps
key: ${{ runner.os }}-mix-${{ hashFiles('**/mix.lock') }}
restore-keys: ${{ runner.os }}-mix-

- name: Install dependencies
if: steps.dependency-cache.outputs.cache-hit != 'true'
run: |
mix local.rebar --force
mix local.hex --force
mix deps.get
path: |
deps
_build
key: |
mix-${{ runner.os }}-${{matrix.elixir}}-${{matrix.otp}}-${{ hashFiles('**/mix.lock') }}
restore-keys: |
mix-${{ runner.os }}-${{matrix.elixir}}-${{matrix.otp}}-

# - name: Run the formatter
# run: mix format --check-formatted
- run: mix do deps.get --check-locked, deps.compile
if: steps.cache-deps.outputs.cache-hit != 'true'

- name: Check for unused dependencies
run: mix deps.unlock --check-unused
- run: mix format --check-formatted
if: ${{ matrix.lint }}

- name: Compile Mix dependencies
run: mix deps.compile
- run: mix deps.unlock --check-unused
if: ${{ matrix.lint }}

- name: Compile code and check for warnings
run: mix compile --warnings-as-errors
- run: mix compile --warnings-as-errors
if: ${{ matrix.lint }}

- name: Run test suite with coverage
run: mix coveralls
- run: mix coveralls
if: ${{matrix.coverage}}
10 changes: 10 additions & 0 deletions Changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# Changelog

## 2.1.2 - Patch release

_Released 2024-08-20_

- Replace `:only_new` and `:only_latest` options with a `:jobs` option. Set `:jobs` to `:new` to only process new jobs, and `:latest`
to only process the latest available job.
- Add deprecation warnings to `:only_new` and `:only_latest` option. The options are still accepted but will be removed in the
next major release.
- If the client receives an `{:error, reason}` tuple, reschedule another fetch instead of blowing up.

## 2.1.1 - Patch release

_Released 2023-12-28_
Expand Down
11 changes: 9 additions & 2 deletions lib/off_broadway/splunk/options.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ defmodule OffBroadway.Splunk.Options do
The report or alert name for the Splunk job we want to consume events from.
"""
],
jobs: [
default: :all,
type: {:in, [:all, :new, :latest]},
doc: """
Which jobs to add to the initial queue. Possible values are: `:all`, `:new`, `:latest`.
"""
],
receive_interval: [
type: :non_neg_integer,
doc: """
Expand All @@ -35,14 +42,14 @@ defmodule OffBroadway.Splunk.Options do
If set to `true`, the pipeline will skip adding any existing jobs to the initial queue.
""",
type: :boolean,
default: false
deprecated: "Use jobs: :new instead."
],
only_latest: [
doc: """
If set to `true`, the pipeline will only add the most recent job to the initial queue.
""",
type: :boolean,
default: false
deprecated: "Use jobs: :latest instead."
],
shutdown_timeout: [
type: :timeout,
Expand Down
31 changes: 20 additions & 11 deletions lib/off_broadway/splunk/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -142,20 +142,21 @@ defmodule OffBroadway.Splunk.Producer do
refetch_timer: nil,
refetch_interval: opts[:refetch_interval],
name: opts[:name],
jobs: opts[:jobs],
current_job: nil,
first_fetch: true,
completed_jobs: MapSet.new(),
queue: :queue.new(),
splunk_client: {client, client_opts},
broadway: opts[:broadway][:name],
only_new: opts[:only_new],
only_latest: opts[:only_latest],
shutdown_timeout: opts[:shutdown_timeout]
}}
end

@impl true
def prepare_for_start(_module, broadway_opts) do
{producer_module, client_opts} = broadway_opts[:producer][:module]
client_opts = preprocess_options(client_opts)

case NimbleOptions.validate(client_opts, OffBroadway.Splunk.Options.definition()) do
{:error, error} ->
Expand All @@ -177,6 +178,15 @@ defmodule OffBroadway.Splunk.Producer do
end
end

# NOTE Remove next major release when :only_new and :only_latest are removed.
defp preprocess_options(opts) do
Enum.reduce(opts, [], fn
{:only_new, true}, acc -> Keyword.put_new(acc, :jobs, :new)
{:only_latest, true}, acc -> Keyword.put_new(acc, :jobs, :latest)
{key, value}, acc -> Keyword.put(acc, key, value)
end)
end

defp format_error(%ValidationError{keys_path: [], message: message}) do
"invalid configuration given to OffBroadway.Splunk.Producer.prepare_for_start/2, " <>
message
Expand Down Expand Up @@ -436,11 +446,10 @@ defmodule OffBroadway.Splunk.Producer do
|> Enum.filter(& &1.is_done)
|> Enum.sort_by(& &1.published, {:asc, DateTime})

# This flag can only be true *once*, on the first fetch.
# If set, add all current jobs to "completed jobs", and set the flag false
# so it will never trigger again.
# If the `:jobs` option is set to `:new`, add all existing jobs to
# "completed jobs", and wait for new to arrive.
completed_jobs =
if state.only_new do
if state.first_fetch && state.jobs == :new do
Enum.reduce(jobs, state.completed_jobs, fn job, acc ->
MapSet.put(acc, job)
end)
Expand All @@ -460,18 +469,18 @@ defmodule OffBroadway.Splunk.Producer do
end
end,
state.queue,
:queue.from_list(only_latest?(jobs, state.only_latest))
:queue.from_list(only_latest?(jobs, state.jobs))
)

%{state | queue: new_queue, completed_jobs: completed_jobs, only_new: false}
%{state | queue: new_queue, completed_jobs: completed_jobs, first_fetch: false}
end

defp update_queue_from_response({:ok, _response}, state), do: state
defp update_queue_from_response({:error, _reason}, state), do: state

@spec only_latest?(list :: list(), flag :: boolean()) :: list()
defp only_latest?(list, true), do: Enum.take(list, -1)
defp only_latest?(list, false), do: list
@spec only_latest?(list :: list(), flag :: atom()) :: list()
defp only_latest?(list, :latest), do: Enum.take(list, -1)
defp only_latest?(list, _), do: list

@spec merge_non_nil_fields(map_a :: map(), map_b :: map()) :: map()
defp merge_non_nil_fields(map_a, map_b) do
Expand Down
Loading
Loading