Skip to content

Commit

Permalink
chore: Enable Java compat tests for records types (#176)
Browse files Browse the repository at this point in the history
Fixes #100 (and unlocks first non-alpha release 🎉).
  • Loading branch information
aiven-anton authored May 7, 2024
1 parent 7112ea0 commit 6de3df4
Show file tree
Hide file tree
Showing 63 changed files with 313 additions and 57 deletions.
1 change: 1 addition & 0 deletions codegen/generate_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
from kio.static.primitive import i32Timedelta
from kio.static.primitive import i64Timedelta
from kio.static.primitive import TZAware
from kio.static.primitive import Records
from kio.static.constants import ErrorCode
from kio.static.constants import EntityType
'''
Expand Down
10 changes: 1 addition & 9 deletions codegen/generate_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,7 @@ def main() -> None:
entity_snake_case=to_snake_case(entity_type.__name__),
)
)
if (
entity_type.__type__ is not EntityType.nested
and entity_type.__name__
not in {
"ProduceRequest", # Records
"FetchResponse", # Records
"FetchSnapshotResponse", # Records
}
):
if entity_type.__type__ is not EntityType.nested:
module_code[module_path].append(
test_code_java.format(
entity_type=entity_type.__name__,
Expand Down
4 changes: 3 additions & 1 deletion codegen/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,10 @@ def get_type_hint(self, optional: bool = False) -> str:
hint = "f64"
case Primitive.string:
hint = "str"
case Primitive.bytes_ | Primitive.records:
case Primitive.bytes_:
hint = "bytes"
case Primitive.records:
hint = "Records"
case Primitive.bool_:
hint = "bool"
case Primitive.uuid:
Expand Down
1 change: 0 additions & 1 deletion container/compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
version: '2'
name: kio
services:
# Adopted from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.record.BaseRecords;
import org.apache.kafka.common.record.MemoryRecords;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.commons.text.CaseUtils;
Expand Down Expand Up @@ -88,7 +89,10 @@ T create(JsonNode json) throws Exception {
List<?> list = new CollectionCreator(rootMessageInfo, fieldValue, fieldName, fieldSchema).createList();
setter.invoke(instance, list);
} else if (BaseRecords.class.isAssignableFrom(parameterType)) {
throw new Exception("Not implemented");
ByteBuffer buffer = getByteBuffer(fieldValue, fieldName);
if (buffer != null) {
setter.invoke(instance, MemoryRecords.readableRecords(buffer));
}
} else {
Object o = new ObjectCreator<>(
rootMessageInfo, new EntityClass<>(parameterType), fieldSchema).create(fieldValue);
Expand Down
3 changes: 2 additions & 1 deletion src/kio/schema/fetch/v0/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from kio.schema.types import TopicName
from kio.static.constants import EntityType
from kio.static.constants import ErrorCode
from kio.static.primitive import Records
from kio.static.primitive import i16
from kio.static.primitive import i32
from kio.static.primitive import i64
Expand All @@ -30,7 +31,7 @@ class PartitionData:
"""The error code, or 0 if there was no fetch error."""
high_watermark: i64 = field(metadata={"kafka_type": "int64"})
"""The current high water mark."""
records: bytes | None = field(metadata={"kafka_type": "records"})
records: Records | None = field(metadata={"kafka_type": "records"})
"""The record data."""


Expand Down
3 changes: 2 additions & 1 deletion src/kio/schema/fetch/v1/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from kio.schema.types import TopicName
from kio.static.constants import EntityType
from kio.static.constants import ErrorCode
from kio.static.primitive import Records
from kio.static.primitive import i16
from kio.static.primitive import i32
from kio.static.primitive import i32Timedelta
Expand All @@ -31,7 +32,7 @@ class PartitionData:
"""The error code, or 0 if there was no fetch error."""
high_watermark: i64 = field(metadata={"kafka_type": "int64"})
"""The current high water mark."""
records: bytes | None = field(metadata={"kafka_type": "records"})
records: Records | None = field(metadata={"kafka_type": "records"})
"""The record data."""


Expand Down
3 changes: 2 additions & 1 deletion src/kio/schema/fetch/v10/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from kio.schema.types import TopicName
from kio.static.constants import EntityType
from kio.static.constants import ErrorCode
from kio.static.primitive import Records
from kio.static.primitive import i16
from kio.static.primitive import i32
from kio.static.primitive import i32Timedelta
Expand Down Expand Up @@ -51,7 +52,7 @@ class PartitionData:
"""The current log start offset."""
aborted_transactions: tuple[AbortedTransaction, ...] | None
"""The aborted transactions."""
records: bytes | None = field(metadata={"kafka_type": "records"})
records: Records | None = field(metadata={"kafka_type": "records"})
"""The record data."""


Expand Down
3 changes: 2 additions & 1 deletion src/kio/schema/fetch/v11/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from kio.schema.types import TopicName
from kio.static.constants import EntityType
from kio.static.constants import ErrorCode
from kio.static.primitive import Records
from kio.static.primitive import i16
from kio.static.primitive import i32
from kio.static.primitive import i32Timedelta
Expand Down Expand Up @@ -56,7 +57,7 @@ class PartitionData:
metadata={"kafka_type": "int32"}, default=BrokerId(-1)
)
"""The preferred read replica for the consumer to use on its next fetch request"""
records: bytes | None = field(metadata={"kafka_type": "records"})
records: Records | None = field(metadata={"kafka_type": "records"})
"""The record data."""


Expand Down
3 changes: 2 additions & 1 deletion src/kio/schema/fetch/v12/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from kio.schema.types import TopicName
from kio.static.constants import EntityType
from kio.static.constants import ErrorCode
from kio.static.primitive import Records
from kio.static.primitive import i16
from kio.static.primitive import i32
from kio.static.primitive import i32Timedelta
Expand Down Expand Up @@ -100,7 +101,7 @@ class PartitionData:
metadata={"kafka_type": "int32"}, default=BrokerId(-1)
)
"""The preferred read replica for the consumer to use on its next fetch request"""
records: bytes | None = field(metadata={"kafka_type": "records"})
records: Records | None = field(metadata={"kafka_type": "records"})
"""The record data."""


Expand Down
3 changes: 2 additions & 1 deletion src/kio/schema/fetch/v13/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from kio.schema.types import ProducerId
from kio.static.constants import EntityType
from kio.static.constants import ErrorCode
from kio.static.primitive import Records
from kio.static.primitive import i16
from kio.static.primitive import i32
from kio.static.primitive import i32Timedelta
Expand Down Expand Up @@ -101,7 +102,7 @@ class PartitionData:
metadata={"kafka_type": "int32"}, default=BrokerId(-1)
)
"""The preferred read replica for the consumer to use on its next fetch request"""
records: bytes | None = field(metadata={"kafka_type": "records"})
records: Records | None = field(metadata={"kafka_type": "records"})
"""The record data."""


Expand Down
3 changes: 2 additions & 1 deletion src/kio/schema/fetch/v14/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from kio.schema.types import ProducerId
from kio.static.constants import EntityType
from kio.static.constants import ErrorCode
from kio.static.primitive import Records
from kio.static.primitive import i16
from kio.static.primitive import i32
from kio.static.primitive import i32Timedelta
Expand Down Expand Up @@ -101,7 +102,7 @@ class PartitionData:
metadata={"kafka_type": "int32"}, default=BrokerId(-1)
)
"""The preferred read replica for the consumer to use on its next fetch request"""
records: bytes | None = field(metadata={"kafka_type": "records"})
records: Records | None = field(metadata={"kafka_type": "records"})
"""The record data."""


Expand Down
3 changes: 2 additions & 1 deletion src/kio/schema/fetch/v15/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from kio.schema.types import ProducerId
from kio.static.constants import EntityType
from kio.static.constants import ErrorCode
from kio.static.primitive import Records
from kio.static.primitive import i16
from kio.static.primitive import i32
from kio.static.primitive import i32Timedelta
Expand Down Expand Up @@ -101,7 +102,7 @@ class PartitionData:
metadata={"kafka_type": "int32"}, default=BrokerId(-1)
)
"""The preferred read replica for the consumer to use on its next fetch request"""
records: bytes | None = field(metadata={"kafka_type": "records"})
records: Records | None = field(metadata={"kafka_type": "records"})
"""The record data."""


Expand Down
3 changes: 2 additions & 1 deletion src/kio/schema/fetch/v2/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from kio.schema.types import TopicName
from kio.static.constants import EntityType
from kio.static.constants import ErrorCode
from kio.static.primitive import Records
from kio.static.primitive import i16
from kio.static.primitive import i32
from kio.static.primitive import i32Timedelta
Expand All @@ -31,7 +32,7 @@ class PartitionData:
"""The error code, or 0 if there was no fetch error."""
high_watermark: i64 = field(metadata={"kafka_type": "int64"})
"""The current high water mark."""
records: bytes | None = field(metadata={"kafka_type": "records"})
records: Records | None = field(metadata={"kafka_type": "records"})
"""The record data."""


Expand Down
3 changes: 2 additions & 1 deletion src/kio/schema/fetch/v3/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from kio.schema.types import TopicName
from kio.static.constants import EntityType
from kio.static.constants import ErrorCode
from kio.static.primitive import Records
from kio.static.primitive import i16
from kio.static.primitive import i32
from kio.static.primitive import i32Timedelta
Expand All @@ -31,7 +32,7 @@ class PartitionData:
"""The error code, or 0 if there was no fetch error."""
high_watermark: i64 = field(metadata={"kafka_type": "int64"})
"""The current high water mark."""
records: bytes | None = field(metadata={"kafka_type": "records"})
records: Records | None = field(metadata={"kafka_type": "records"})
"""The record data."""


Expand Down
3 changes: 2 additions & 1 deletion src/kio/schema/fetch/v4/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from kio.schema.types import TopicName
from kio.static.constants import EntityType
from kio.static.constants import ErrorCode
from kio.static.primitive import Records
from kio.static.primitive import i16
from kio.static.primitive import i32
from kio.static.primitive import i32Timedelta
Expand Down Expand Up @@ -49,7 +50,7 @@ class PartitionData:
"""The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)"""
aborted_transactions: tuple[AbortedTransaction, ...] | None
"""The aborted transactions."""
records: bytes | None = field(metadata={"kafka_type": "records"})
records: Records | None = field(metadata={"kafka_type": "records"})
"""The record data."""


Expand Down
3 changes: 2 additions & 1 deletion src/kio/schema/fetch/v5/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from kio.schema.types import TopicName
from kio.static.constants import EntityType
from kio.static.constants import ErrorCode
from kio.static.primitive import Records
from kio.static.primitive import i16
from kio.static.primitive import i32
from kio.static.primitive import i32Timedelta
Expand Down Expand Up @@ -51,7 +52,7 @@ class PartitionData:
"""The current log start offset."""
aborted_transactions: tuple[AbortedTransaction, ...] | None
"""The aborted transactions."""
records: bytes | None = field(metadata={"kafka_type": "records"})
records: Records | None = field(metadata={"kafka_type": "records"})
"""The record data."""


Expand Down
3 changes: 2 additions & 1 deletion src/kio/schema/fetch/v6/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from kio.schema.types import TopicName
from kio.static.constants import EntityType
from kio.static.constants import ErrorCode
from kio.static.primitive import Records
from kio.static.primitive import i16
from kio.static.primitive import i32
from kio.static.primitive import i32Timedelta
Expand Down Expand Up @@ -51,7 +52,7 @@ class PartitionData:
"""The current log start offset."""
aborted_transactions: tuple[AbortedTransaction, ...] | None
"""The aborted transactions."""
records: bytes | None = field(metadata={"kafka_type": "records"})
records: Records | None = field(metadata={"kafka_type": "records"})
"""The record data."""


Expand Down
3 changes: 2 additions & 1 deletion src/kio/schema/fetch/v7/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from kio.schema.types import TopicName
from kio.static.constants import EntityType
from kio.static.constants import ErrorCode
from kio.static.primitive import Records
from kio.static.primitive import i16
from kio.static.primitive import i32
from kio.static.primitive import i32Timedelta
Expand Down Expand Up @@ -51,7 +52,7 @@ class PartitionData:
"""The current log start offset."""
aborted_transactions: tuple[AbortedTransaction, ...] | None
"""The aborted transactions."""
records: bytes | None = field(metadata={"kafka_type": "records"})
records: Records | None = field(metadata={"kafka_type": "records"})
"""The record data."""


Expand Down
3 changes: 2 additions & 1 deletion src/kio/schema/fetch/v8/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from kio.schema.types import TopicName
from kio.static.constants import EntityType
from kio.static.constants import ErrorCode
from kio.static.primitive import Records
from kio.static.primitive import i16
from kio.static.primitive import i32
from kio.static.primitive import i32Timedelta
Expand Down Expand Up @@ -51,7 +52,7 @@ class PartitionData:
"""The current log start offset."""
aborted_transactions: tuple[AbortedTransaction, ...] | None
"""The aborted transactions."""
records: bytes | None = field(metadata={"kafka_type": "records"})
records: Records | None = field(metadata={"kafka_type": "records"})
"""The record data."""


Expand Down
3 changes: 2 additions & 1 deletion src/kio/schema/fetch/v9/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from kio.schema.types import TopicName
from kio.static.constants import EntityType
from kio.static.constants import ErrorCode
from kio.static.primitive import Records
from kio.static.primitive import i16
from kio.static.primitive import i32
from kio.static.primitive import i32Timedelta
Expand Down Expand Up @@ -51,7 +52,7 @@ class PartitionData:
"""The current log start offset."""
aborted_transactions: tuple[AbortedTransaction, ...] | None
"""The aborted transactions."""
records: bytes | None = field(metadata={"kafka_type": "records"})
records: Records | None = field(metadata={"kafka_type": "records"})
"""The record data."""


Expand Down
3 changes: 2 additions & 1 deletion src/kio/schema/fetch_snapshot/v0/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from kio.schema.types import TopicName
from kio.static.constants import EntityType
from kio.static.constants import ErrorCode
from kio.static.primitive import Records
from kio.static.primitive import i16
from kio.static.primitive import i32
from kio.static.primitive import i32Timedelta
Expand Down Expand Up @@ -61,7 +62,7 @@ class PartitionSnapshot:
"""The total size of the snapshot."""
position: i64 = field(metadata={"kafka_type": "int64"})
"""The starting byte position within the snapshot included in the Bytes field."""
unaligned_records: bytes = field(metadata={"kafka_type": "records"})
unaligned_records: Records = field(metadata={"kafka_type": "records"})
"""Snapshot data in records format which may not be aligned on an offset boundary"""


Expand Down
3 changes: 2 additions & 1 deletion src/kio/schema/produce/v0/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from kio.schema.request_header.v1.header import RequestHeader
from kio.schema.types import TopicName
from kio.static.constants import EntityType
from kio.static.primitive import Records
from kio.static.primitive import i16
from kio.static.primitive import i32
from kio.static.primitive import i32Timedelta
Expand All @@ -25,7 +26,7 @@ class PartitionProduceData:
__header_schema__: ClassVar[type[RequestHeader]] = RequestHeader
index: i32 = field(metadata={"kafka_type": "int32"})
"""The partition index."""
records: bytes | None = field(metadata={"kafka_type": "records"})
records: Records | None = field(metadata={"kafka_type": "records"})
"""The record data to be produced."""


Expand Down
3 changes: 2 additions & 1 deletion src/kio/schema/produce/v1/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from kio.schema.request_header.v1.header import RequestHeader
from kio.schema.types import TopicName
from kio.static.constants import EntityType
from kio.static.primitive import Records
from kio.static.primitive import i16
from kio.static.primitive import i32
from kio.static.primitive import i32Timedelta
Expand All @@ -25,7 +26,7 @@ class PartitionProduceData:
__header_schema__: ClassVar[type[RequestHeader]] = RequestHeader
index: i32 = field(metadata={"kafka_type": "int32"})
"""The partition index."""
records: bytes | None = field(metadata={"kafka_type": "records"})
records: Records | None = field(metadata={"kafka_type": "records"})
"""The record data to be produced."""


Expand Down
Loading

0 comments on commit 6de3df4

Please sign in to comment.