Skip to content

Commit

Permalink
Merge pull request #494 from kafkaex/v1.0.0-api-migration
Browse files Browse the repository at this point in the history
V1.0.0 api migration
  • Loading branch information
Argonus authored Oct 16, 2024
2 parents df3d446 + 178da3b commit dd1983c
Show file tree
Hide file tree
Showing 73 changed files with 1,571 additions and 473 deletions.
1 change: 0 additions & 1 deletion .credo.exs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@
{Credo.Check.Warning.ExpensiveEmptyEnumCheck},
{Credo.Check.Warning.IExPry},
{Credo.Check.Warning.IoInspect},
{Credo.Check.Warning.LazyLogging},
{Credo.Check.Warning.OperationOnSameValues},
{Credo.Check.Warning.OperationWithConstantResult},
{Credo.Check.Warning.UnusedEnumOperation},
Expand Down
3 changes: 2 additions & 1 deletion .formatter.exs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Used by "mix format"
[
inputs: ["mix.exs", "{config,lib,test}/**/*.{ex,exs}"]
inputs: ["mix.exs", "{config,lib,test}/**/*.{ex,exs}"],
line_length: 120
]
100 changes: 100 additions & 0 deletions .github/workflows/integration.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
name: CI Integration

on:
push:
branches: [ master ]
pull_request:
branches: [ master ]

jobs:
setup:
name: test | setup dependencies
runs-on: ubuntu-20.04
env:
MIX_ENV: test
strategy:
matrix:
pair:
- elixir: 1.16
otp: 26.1

steps:
- name: Cancel previous runs
uses: styfle/cancel-workflow-action@0.9.0
with:
access_token: ${{ github.token }}
- name: Checkout Github repo
uses: actions/checkout@v2
- name: Setup elixir & erlang environment
uses: erlef/setup-beam@v1
with:
elixir-version: ${{matrix.pair.elixir}} # Define the elixir version [required]
otp-version: ${{matrix.pair.otp}} # Define the OTP version [required]

- name: Retrieve Mix Dependencies Cache
uses: actions/cache@v2
id: mix-cache # id to use in retrieve action
with:
path: deps
key: ${{ runner.os }}-${{ matrix.pair.otp }}-${{ matrix.pair.elixir }}-mix-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }}

- name: Retrieve Mix Dependencies Compilation Cache
uses: actions/cache@v2
id: mix-deps-compile-cache # id to use in retrieve action
with:
path: _build
key: ${{ runner.os }}-${{ matrix.pair.otp }}-${{ matrix.pair.elixir }}-mix-deps-compile-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }}
- name: Install Mix Dependencies
run: |
mix local.rebar --force
mix local.hex --force
mix deps.get
- name: Compile Mix Dependencies
run: mix deps.compile

test:
name: runner / Test
needs: [setup]

runs-on: ubuntu-20.04
env:
MIX_ENV: test

strategy:
fail-fast: false
matrix:
pair:
- elixir: 1.16
otp: 26.1

steps:
- uses: actions/checkout@v2
- name: Setup elixir & erlang environment
uses: erlef/setup-beam@v1
with:
elixir-version: ${{matrix.pair.elixir}} # Define the elixir version [required]
otp-version: ${{matrix.pair.otp}} # Define the OTP version [required]

- name: Retrieve Mix Dependencies Cache
uses: actions/cache@v2
id: mix-cache # id to use in retrieve action
with:
path: deps
key: ${{ runner.os }}-${{ matrix.pair.otp }}-${{ matrix.pair.elixir }}-mix-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }}

- name: Retrieve Mix Dependencies Compilation Cache
uses: actions/cache@v2
id: mix-deps-compile-cache # id to use in retrieve action
with:
path: _build
key: ${{ runner.os }}-${{ matrix.pair.otp }}-${{ matrix.pair.elixir }}-mix-deps-compile-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }}

- name: Docker-compose up
run: ./scripts/docker_up.sh

- name: Docker ps
run: docker ps -a

- name: Run Tests
run: ./scripts/ci_tests.sh
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,4 @@ jobs:
run: docker ps -a

- name: Run Tests
run: ./scripts/ci_tests.sh
run: mix test
48 changes: 34 additions & 14 deletions lib/kafka_ex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ defmodule KafkaEx do
def describe_group(consumer_group_name, opts \\ []) do
worker_name = Keyword.get(opts, :worker_name, Config.default_worker())

case Server.call(worker_name, {:describe_groups, [consumer_group_name]}) do
case Server.call(worker_name, {:describe_groups, [consumer_group_name], opts}) do
{:ok, [group]} -> {:ok, group}
{:error, error} -> {:error, error}
end
Expand Down Expand Up @@ -227,8 +227,9 @@ defmodule KafkaEx do
"""
@spec latest_offset(binary, integer, atom | pid) ::
[OffsetResponse.t()] | :topic_not_found
def latest_offset(topic, partition, name \\ Config.default_worker()),
do: offset(topic, partition, :latest, name)
def latest_offset(topic, partition, name \\ Config.default_worker()) do
offset(topic, partition, :latest, name)
end

@doc """
Get the offset of the earliest message still persistent in Kafka
Expand All @@ -240,10 +241,10 @@ defmodule KafkaEx do
[%KafkaEx.Protocol.Offset.Response{partition_offsets: [%{error_code: 0, offset: [0], partition: 0}], topic: "foo"}]
```
"""
@spec earliest_offset(binary, integer, atom | pid) ::
[OffsetResponse.t()] | :topic_not_found
def earliest_offset(topic, partition, name \\ Config.default_worker()),
do: offset(topic, partition, :earliest, name)
@spec earliest_offset(binary, integer, atom | pid) :: [OffsetResponse.t()] | :topic_not_found
def earliest_offset(topic, partition, name \\ Config.default_worker()) do
offset(topic, partition, :earliest, name)
end

@doc """
Get the offset of the message sent at the specified date/time
Expand All @@ -255,14 +256,14 @@ defmodule KafkaEx do
[%KafkaEx.Protocol.Offset.Response{partition_offsets: [%{error_code: 0, offset: [256], partition: 0}], topic: "foo"}]
```
"""
@spec offset(
binary,
number,
:calendar.datetime() | :earliest | :latest,
atom | pid
) :: [OffsetResponse.t()] | :topic_not_found
@type valid_timestamp :: :earliest | :latest | :calendar.datetime()
@spec offset(binary, number, valid_timestamp, atom | pid) :: [OffsetResponse.t()] | :topic_not_found
def offset(topic, partition, time, name \\ Config.default_worker()) do
Server.call(name, {:offset, topic, partition, time})
case Server.call(name, {:offset, topic, partition, time}) do
{:ok, response} -> parse_offset_value(response)
{:error, :topic_not_found} -> :topic_not_found
result -> result
end
end

@wait_time 10
Expand Down Expand Up @@ -812,4 +813,23 @@ defmodule KafkaEx do
end
end
end

# -------------------------------------------------------------------
# Backwards compatibility
# -------------------------------------------------------------------
defp parse_offset_value([%KafkaEx.New.Structs.Offset{} | _] = offsets) do
Enum.map(offsets, fn offset ->
%OffsetResponse{
topic: offset.topic,
partition_offsets:
Enum.map(offset.partition_offsets, fn value ->
%{
partition: value.partition,
error_code: value.error_code,
offset: [value.offset]
}
end)
}
end)
end
end
3 changes: 1 addition & 2 deletions lib/kafka_ex/consumer_group.ex
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,7 @@ defmodule KafkaEx.ConsumerGroup do
opts = Keyword.put(opts, :supervisor_pid, self())

children = [
{KafkaEx.ConsumerGroup.Manager,
{{gen_consumer_module, consumer_module}, group_name, topics, opts}}
{KafkaEx.ConsumerGroup.Manager, {{gen_consumer_module, consumer_module}, group_name, topics, opts}}
]

Supervisor.init(children,
Expand Down
4 changes: 1 addition & 3 deletions lib/kafka_ex/gen_consumer/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,7 @@ defmodule KafkaEx.GenConsumer.Supervisor do
child_spec_builder = fn topic, partition ->
%{
id: gen_consumer_module,
start:
{gen_consumer_module, :start_link,
[consumer_module, group_name, topic, partition, opts]},
start: {gen_consumer_module, :start_link, [consumer_module, group_name, topic, partition, opts]},
shutdown: Keyword.get(opts, :shutdown, @default_worker_shutdown)
}
end
Expand Down
Loading

0 comments on commit dd1983c

Please sign in to comment.