Skip to content

Commit

Permalink
Implementation of Simple Consumer for Python Client (#588)
Browse files Browse the repository at this point in the history
* finish simple_consumer

* fix style issues

* delete private info

* convert comments to English

* add state enum & change_invisible_duration

* extract example

* add more tests

* fix style issue
  • Loading branch information
yanchaomei authored Aug 26, 2023
1 parent 20fd6fb commit d450648
Show file tree
Hide file tree
Showing 11 changed files with 830 additions and 22 deletions.
58 changes: 58 additions & 0 deletions python/examples/simple_consumer_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio

from rocketmq.client_config import ClientConfig
from rocketmq.filter_expression import FilterExpression
from rocketmq.log import logger
from rocketmq.protocol.definition_pb2 import Resource
from rocketmq.rpc_client import Endpoints
from rocketmq.session_credentials import (SessionCredentials,
SessionCredentialsProvider)
from rocketmq.simple_consumer import SimpleConsumer


async def test():
credentials = SessionCredentials("username", "password")
credentials_provider = SessionCredentialsProvider(credentials)
client_config = ClientConfig(
endpoints=Endpoints("endpoint"),
session_credentials_provider=credentials_provider,
ssl_enabled=True,
)
topic = Resource()
topic.name = "normal_topic"

consumer_group = "yourConsumerGroup"
subscription = {topic.name: FilterExpression("*")}
simple_consumer = (await SimpleConsumer.Builder()
.set_client_config(client_config)
.set_consumer_group(consumer_group)
.set_await_duration(15)
.set_subscription_expression(subscription)
.build())
logger.info(simple_consumer)
# while True:
message_views = await simple_consumer.receive(16, 15)
logger.info(message_views)
for message in message_views:
logger.info(message.body)
logger.info(f"Received a message, topic={message.topic}, message-id={message.message_id}, body-size={len(message.body)}")
await simple_consumer.ack(message)
logger.info(f"Message is acknowledged successfully, message-id={message.message_id}")

if __name__ == "__main__":
asyncio.run(test())
25 changes: 21 additions & 4 deletions python/rocketmq/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import asyncio
import threading
from typing import Set

from protocol import definition_pb2, service_pb2
from protocol.definition_pb2 import Code as ProtoCode
Expand Down Expand Up @@ -60,7 +59,7 @@ class Client:
"""
Main client class which handles interaction with the server.
"""
def __init__(self, client_config: ClientConfig, topics: Set[str]):
def __init__(self, client_config: ClientConfig):
"""
Initialization method for the Client class.
Expand All @@ -70,7 +69,6 @@ def __init__(self, client_config: ClientConfig, topics: Set[str]):
self.client_config = client_config
self.client_id = ClientIdEncoder.generate()
self.endpoints = client_config.endpoints
self.topics = topics

#: A cache to store topic routes.
self.topic_route_cache = {}
Expand All @@ -83,13 +81,16 @@ def __init__(self, client_config: ClientConfig, topics: Set[str]):
#: A dictionary to store isolated items.
self.isolated = dict()

def get_topics(self):
raise NotImplementedError("This method should be implemented by the subclass.")

async def start(self):
"""
Start method which initiates fetching of topic routes and schedules heartbeats.
"""
# get topic route
logger.debug(f"Begin to start the rocketmq client, client_id={self.client_id}")
for topic in self.topics:
for topic in self.get_topics():
self.topic_route_cache[topic] = await self.fetch_topic_route(topic)
scheduler = ScheduleWithFixedDelay(self.heartbeat, 3, 12)
scheduler_sync_settings = ScheduleWithFixedDelay(self.sync_settings, 3, 12)
Expand Down Expand Up @@ -489,6 +490,22 @@ async def change_invisible_duration(
request, metadata, timeout_seconds
)

async def receive_message(
self,
endpoints: Endpoints,
request: service_pb2.ReceiveMessageRequest,
timeout_seconds: int,
):
rpc_client = self.__get_rpc_client(
endpoints, self.__client.client_config.ssl_enabled
)
metadata = Signature.sign(self.__client.client_config, self.__client.client_id)

response = await rpc_client.receive_message(
request, metadata, timeout_seconds
)
return response

def telemetry(
self,
endpoints: Endpoints,
Expand Down
73 changes: 73 additions & 0 deletions python/rocketmq/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import re
from typing import List

from filter_expression import ExpressionType
from google.protobuf.duration_pb2 import Duration
from message import MessageView
from rocketmq.client import Client
from rocketmq.protocol.definition_pb2 import \
FilterExpression as ProtoFilterExpression
from rocketmq.protocol.definition_pb2 import FilterType
from rocketmq.protocol.definition_pb2 import Resource as ProtoResource
from rocketmq.protocol.service_pb2 import \
ReceiveMessageRequest as ProtoReceiveMessageRequest


class ReceiveMessageResult:
def __init__(self, endpoints, messages: List['MessageView']):
self.endpoints = endpoints
self.messages = messages


class Consumer(Client):
CONSUMER_GROUP_REGEX = re.compile(r"^[%a-zA-Z0-9_-]+$")

def __init__(self, client_config, consumer_group):
super().__init__(client_config)
self.consumer_group = consumer_group

async def receive_message(self, request, mq, await_duration):
tolerance = self.client_config.request_timeout
timeout = tolerance + await_duration
results = await self.client_manager.receive_message(mq.broker.endpoints, request, timeout)

messages = [MessageView.from_protobuf(message, mq) for message in results]
return ReceiveMessageResult(mq.broker.endpoints, messages)

@staticmethod
def _wrap_filter_expression(filter_expression):
filter_type = FilterType.TAG
if filter_expression.type == ExpressionType.Sql92:
filter_type = FilterType.SQL
return ProtoFilterExpression(
type=filter_type,
expression=filter_expression.expression
)

def wrap_receive_message_request(self, batch_size, mq, filter_expression, await_duration, invisible_duration):
group = ProtoResource()
group.name = self.consumer_group
return ProtoReceiveMessageRequest(
group=group,
message_queue=mq.to_protobuf(),
filter_expression=self._wrap_filter_expression(filter_expression),
long_polling_timeout=Duration(seconds=await_duration),
batch_size=batch_size,
auto_renew=False,
invisible_duration=Duration(seconds=invisible_duration)
)
13 changes: 8 additions & 5 deletions python/rocketmq/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def to_protobuf(self):
:return: The protobuf representation of the broker.
"""
return ProtoBroker(
Name=self.name, Id=self.id, Endpoints=self.endpoints.to_protobuf()
name=self.name, id=self.id, endpoints=self.endpoints.to_protobuf()
)


Expand All @@ -76,8 +76,8 @@ def __init__(self, name=None, resource=None):
:param resource: The resource object.
"""
if resource is not None:
self.namespace = resource.ResourceNamespace
self.name = resource.Name
self.namespace = resource.resource_namespace
self.name = resource.name
else:
self.namespace = ""
self.name = name
Expand All @@ -87,7 +87,10 @@ def to_protobuf(self):
:return: The protobuf representation of the resource.
"""
return ProtoResource(ResourceNamespace=self.namespace, Name=self.name)
resource = ProtoResource()
resource.name = self.name
resource.resource_namespace = self.namespace
return resource

def __str__(self):
return f"{self.namespace}.{self.name}" if self.namespace else self.name
Expand Down Expand Up @@ -219,7 +222,7 @@ def __init__(self, message_queue):
:param message_queue: The initial message queue to be encapsulated.
"""
self._topic_resource = Resource(message_queue.topic)
self._topic_resource = Resource(message_queue.topic.name, message_queue.topic)
self.queue_id = message_queue.id
self.permission = PermissionHelper.from_protobuf(message_queue.permission)
self.accept_message_types = [
Expand Down
35 changes: 35 additions & 0 deletions python/rocketmq/filter_expression.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from enum import Enum


class ExpressionType(Enum):
Tag = 1
Sql92 = 2


class FilterExpression:
def __init__(self, expression, expression_type=ExpressionType.Tag):
self._expression = expression
self._type = expression_type

@property
def type(self):
return self._type

@property
def expression(self):
return self._expression
Loading

0 comments on commit d450648

Please sign in to comment.