Skip to content

Commit

Permalink
Add support for strings in identifiers (#15)
Browse files Browse the repository at this point in the history
Also make topic_id optional for create_topic
Update examples to use string identifier
  • Loading branch information
FireMasterK committed Jun 26, 2024
1 parent 7bb8512 commit dec9d8a
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 31 deletions.
10 changes: 5 additions & 5 deletions python_examples/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
# Assuming there's a Python module for iggy with similar functionalities.
from iggy_py import IggyClient, ReceiveMessage

STREAM_ID = 1
TOPIC_ID = 1
STREAM_NAME = "sample-stream"
TOPIC_NAME = "sample-topic"
PARTITION_ID = 1


Expand All @@ -20,15 +20,15 @@ async def main():

async def consume_messages(client: IggyClient):
interval = 0.5 # 500 milliseconds in seconds for asyncio.sleep
print(f"Messages will be consumed from stream: {STREAM_ID}, topic: {TOPIC_ID}, partition: {PARTITION_ID} with interval {interval * 1000} ms.")
print(f"Messages will be consumed from stream: {STREAM_NAME}, topic: {TOPIC_NAME}, partition: {PARTITION_ID} with interval {interval * 1000} ms.")

offset = 0
messages_per_batch = 10
while True:
try:
polled_messages = client.poll_messages(
stream_id=STREAM_ID,
topic_id=TOPIC_ID,
stream_id=STREAM_NAME,
topic_id=TOPIC_NAME,
partition_id=PARTITION_ID,
count=messages_per_batch,
auto_commit=False
Expand Down
17 changes: 8 additions & 9 deletions python_examples/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
# Assuming we have a Python module for iggy with similar functionality as the Rust one.
from iggy_py import IggyClient, SendMessage as Message

STREAM_ID = 1
TOPIC_ID = 1
STREAM_NAME = "sample-stream"
TOPIC_NAME = "sample-topic"
PARTITION_ID = 1


Expand All @@ -18,17 +18,16 @@ async def main():

def init_system(client: IggyClient):
try:
client.create_stream(stream_id=STREAM_ID, name="sample-stream")
client.create_stream(name=STREAM_NAME)
print("Stream was created.")
except Exception as e:
print("stream error {}", e)

try:
client.create_topic(
stream_id=STREAM_ID, # Assuming a method exists to create a numeric Identifier.
topic_id=TOPIC_ID,
stream_id=STREAM_NAME, # Assuming a method exists to create a numeric Identifier.
partitions_count=1,
name="sample-topic",
name=STREAM_NAME,
compression_algorithm="none",
)
print("Topic was created.")
Expand All @@ -38,7 +37,7 @@ def init_system(client: IggyClient):

async def produce_messages(client: IggyClient):
interval = 0.5 # 500 milliseconds in seconds for asyncio.sleep
print(f"Messages will be sent to stream: {STREAM_ID}, topic: {TOPIC_ID}, partition: {PARTITION_ID} with interval {interval * 1000} ms.")
print(f"Messages will be sent to stream: {STREAM_NAME}, topic: {TOPIC_NAME}, partition: {PARTITION_ID} with interval {interval * 1000} ms.")

current_id = 0
messages_per_batch = 10
Expand All @@ -51,8 +50,8 @@ async def produce_messages(client: IggyClient):
messages.append(message)
try:
client.send_messages(
stream_id=STREAM_ID,
topic_id=TOPIC_ID,
stream_id=STREAM_NAME,
topic_id=TOPIC_NAME,
partitioning=PARTITION_ID,
messages=messages
)
Expand Down
51 changes: 34 additions & 17 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,23 @@ pub struct IggyClient {
runtime: Runtime,
}

#[derive(FromPyObject)]
enum PyIdentifier {
#[pyo3(transparent, annotation = "str")]
String(String),
#[pyo3(transparent, annotation = "int")]
Int(u32),
}

impl From<PyIdentifier> for Identifier {
fn from(py_identifier: PyIdentifier) -> Self {
match py_identifier {
PyIdentifier::String(s) => Identifier::from_str(&s).unwrap(),
PyIdentifier::Int(i) => Identifier::numeric(i).unwrap(),
}
}
}

#[pymethods]
impl IggyClient {
/// Constructs a new IggyClient.
Expand Down Expand Up @@ -71,8 +88,9 @@ impl IggyClient {
/// Creates a new stream with the provided ID and name.
///
/// Returns Ok(()) on successful stream creation or a PyRuntimeError on failure.
fn create_stream(&self, stream_id: u32, name: String) -> PyResult<()> {
let create_stream_future = self.inner.create_stream(&name, Some(stream_id));
#[pyo3(signature = (name, stream_id = None))]
fn create_stream(&self, name: String, stream_id: Option<u32>) -> PyResult<()> {
let create_stream_future = self.inner.create_stream(&name, stream_id);
let _create_stream = self
.runtime
.block_on(async move { create_stream_future.await })
Expand All @@ -84,19 +102,18 @@ impl IggyClient {
///
/// Returns Ok(()) on successful topic creation or a PyRuntimeError on failure.
#[pyo3(
signature = (stream_id, topic_id, partitions_count, name, compression_algorithm, replication_factor = None)
signature = (stream_id, name, partitions_count, compression_algorithm, topic_id = None, replication_factor = None)
)]
fn create_topic(
&self,
stream_id: u32,
topic_id: u32,
partitions_count: u32,
stream_id: PyIdentifier,
name: String,
partitions_count: u32,
compression_algorithm: String,
topic_id: Option<u32>,
replication_factor: Option<u8>,
) -> PyResult<()> {
let stream_id = Identifier::numeric(stream_id)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{:?}", e)))?;
let stream_id = Identifier::from(stream_id);
let compression_algorithm = CompressionAlgorithm::from_str(&compression_algorithm)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{:?}", e)))?;

Expand All @@ -106,7 +123,7 @@ impl IggyClient {
partitions_count,
compression_algorithm,
replication_factor,
Some(topic_id),
topic_id,
IggyExpiry::NeverExpire,
None,
);
Expand All @@ -122,8 +139,8 @@ impl IggyClient {
/// Returns Ok(()) on successful sending or a PyRuntimeError on failure.
fn send_messages(
&self,
stream_id: u32,
topic_id: u32,
stream_id: PyIdentifier,
topic_id: PyIdentifier,
partitioning: u32,
messages: &Bound<'_, PyList>,
) -> PyResult<()> {
Expand All @@ -136,8 +153,8 @@ impl IggyClient {
.map(|message| message.inner)
.collect::<Vec<_>>();

let stream_id = Identifier::numeric(stream_id).unwrap();
let topic_id = Identifier::numeric(topic_id).unwrap();
let stream_id = Identifier::from(stream_id);
let topic_id = Identifier::from(topic_id);
let partitioning = Partitioning::partition_id(partitioning);

let send_message_future =
Expand All @@ -154,15 +171,15 @@ impl IggyClient {
/// Returns a list of received messages or a PyRuntimeError on failure.
fn poll_messages(
&self,
stream_id: u32,
topic_id: u32,
stream_id: PyIdentifier,
topic_id: PyIdentifier,
partition_id: u32,
count: u32,
auto_commit: bool,
) -> PyResult<Vec<ReceiveMessage>> {
let consumer = RustConsumer::default();
let stream_id = Identifier::numeric(stream_id).unwrap();
let topic_id = Identifier::numeric(topic_id).unwrap();
let stream_id = Identifier::from(stream_id);
let topic_id = Identifier::from(topic_id);
let strategy = PollingStrategy::next();

let poll_messages = self.inner.poll_messages(
Expand Down

0 comments on commit dec9d8a

Please sign in to comment.