Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement a series of features #538

Merged
merged 2 commits into from
Jun 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions python/rocketmq/client_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


class ClientConfig:
def __init__(self, endpoints: str, session_credentials_provider, ssl_enabled: bool):
self.__endpoints = endpoints
self.__session_credentials_provider = session_credentials_provider
self.__ssl_enabled = ssl_enabled

@property
def session_credentials_provider(self):
return self.__session_credentials_provider

@property
def endpoints(self):
return self.__endpoints

@property
def ssl_enabled(self):
return self.__ssl_enabled
47 changes: 47 additions & 0 deletions python/rocketmq/client_id_encoder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import socket
import threading
import time

import rocketmq.utils


class ClientIdEncoder:
__INDEX = 0
__INDEX_LOCK = threading.Lock()
__CLIENT_ID_SEPARATOR = "@"

@staticmethod
def __get_and_increment_sequence():
with ClientIdEncoder.__INDEX_LOCK:
temp = ClientIdEncoder.__INDEX
ClientIdEncoder.__INDEX += 1
return temp

@staticmethod
def generate() -> str:
index = ClientIdEncoder.__get_and_increment_sequence()
return (
socket.gethostname()
+ ClientIdEncoder.__CLIENT_ID_SEPARATOR
+ str(os.getpid())
+ ClientIdEncoder.__CLIENT_ID_SEPARATOR
+ str(index)
+ ClientIdEncoder.__CLIENT_ID_SEPARATOR
+ str(rocketmq.utils.number_to_base(time.monotonic_ns(), 36))
)
137 changes: 137 additions & 0 deletions python/rocketmq/client_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import threading

from protocol import service_pb2
from rocketmq.client import Client
from rocketmq.rpc_client import Endpoints, RpcClient


class ClientManager:
def __init__(self, client: Client):
self.__client = client
self.__rpc_clients = {}
self.__rpc_clients_lock = threading.Lock()

def __get_rpc_client(self, endpoints: Endpoints, ssl_enabled: bool) -> RpcClient:
with self.__rpc_clients_lock:
rpc_client = self.__rpc_clients.get(endpoints)
if rpc_client:
return rpc_client
rpc_client = RpcClient(endpoints.get_target(), ssl_enabled)
self.__rpc_clients[endpoints] = rpc_client
return rpc_client

async def query_route(
self,
endpoints: Endpoints,
request: service_pb2.QueryRouteRequest,
timeout_seconds: int,
):
rpc_client = self.__get_rpc_client(
endpoints, self.__client.client_config.ssl_enabled
)
return await rpc_client.query_route(request, timeout_seconds)

async def heartbeat(
self,
endpoints: Endpoints,
request: service_pb2.HeartbeatRequest,
timeout_seconds: int,
):
rpc_client = self.__get_rpc_client(
endpoints, self.__client.client_config.ssl_enabled
)
return await rpc_client.heartbeat(request, timeout_seconds)

async def send_message(
self,
endpoints: Endpoints,
request: service_pb2.SendMessageRequest,
timeout_seconds: int,
):
rpc_client = self.__get_rpc_client(
endpoints, self.__client.client_config.ssl_enabled
)
return await rpc_client.send_message(request, timeout_seconds)

async def query_assignment(
self,
endpoints: Endpoints,
request: service_pb2.QueryAssignmentRequest,
timeout_seconds: int,
):
rpc_client = self.__get_rpc_client(
endpoints, self.__client.client_config.ssl_enabled
)
return await rpc_client.query_assignment(request, timeout_seconds)

async def ack_message(
self,
endpoints: Endpoints,
request: service_pb2.AckMessageRequest,
timeout_seconds: int,
):
rpc_client = self.__get_rpc_client(
endpoints, self.__client.client_config.ssl_enabled
)
return await rpc_client.ack_message(request, timeout_seconds)

async def forward_message_to_dead_letter_queue(
self,
endpoints: Endpoints,
request: service_pb2.ForwardMessageToDeadLetterQueueRequest,
timeout_seconds: int,
):
rpc_client = self.__get_rpc_client(
endpoints, self.__client.client_config.ssl_enabled
)
return await rpc_client.forward_message_to_dead_letter_queue(
request, timeout_seconds
)

async def end_transaction(
self,
endpoints: Endpoints,
request: service_pb2.EndTransactionRequest,
timeout_seconds: int,
):
rpc_client = self.__get_rpc_client(
endpoints, self.__client.client_config.ssl_enabled
)
return await rpc_client.end_transaction(request, timeout_seconds)

async def notify_client_termination(
self,
endpoints: Endpoints,
request: service_pb2.NotifyClientTerminationRequest,
timeout_seconds: int,
):
rpc_client = self.__get_rpc_client(
endpoints, self.__client.client_config.ssl_enabled
)
return await rpc_client.notify_client_termination(request, timeout_seconds)

async def change_invisible_duration(
self,
endpoints: Endpoints,
request: service_pb2.ChangeInvisibleDurationRequest,
timeout_seconds: int,
):
rpc_client = self.__get_rpc_client(
endpoints, self.__client.client_config.ssl_enabled
)
return await rpc_client.change_invisible_duration(request, timeout_seconds)
38 changes: 38 additions & 0 deletions python/rocketmq/log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import os

logger = logging.getLogger("rocketmqlogger")
logger.setLevel(logging.DEBUG)

log_path = os.path.join(
os.path.expanduser("~"), "logs", "rocketmq", "rocketmq-client.log"
)
file_handler = logging.FileHandler(log_path)
file_handler.setLevel(logging.DEBUG)

console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)

formatter = logging.Formatter(
"%(asctime)s [%(levelname)s] [%(process)d] [%(threadName)s] [%(filename)s#%(funcName)s:%(lineno)d] %(message)s"
)
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)

logger.addHandler(file_handler)
logger.addHandler(console_handler)
125 changes: 125 additions & 0 deletions python/rocketmq/message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from rocketmq.message_id import MessageId


class Message:
def __init__(
self,
topic: str,
body: bytes,
properties: map = None,
tag: str = None,
keys: str = None,
message_group: str = None,
delivery_timestamp: int = None,
):
if properties is None:
properties = {}
self.__topic = topic
self.__body = body
self.__properties = properties
self.__tag = tag
self.__keys = keys
self.__message_group = message_group
self.__delivery_timestamp = delivery_timestamp

@property
def topic(self):
return self.__topic

@property
def body(self):
return self.__body

@property
def properties(self):
return self.__properties

@property
def tag(self):
return self.__tag

@property
def keys(self):
return self.__keys

@property
def message_group(self):
return self.__message_group

@property
def delivery_timestamp(self):
return self.__delivery_timestamp


class MessageView:
def __init__(
self,
message_id: MessageId,
topic: str,
body: bytes,
properties: map,
tag: str,
keys: str,
message_group: str,
delivery_timestamp: int,
born_host: str,
delivery_attempt: int,
):
self.__message_id = message_id
self.__topic = topic
self.__body = body
self.__properties = properties
self.__tag = tag
self.__keys = keys
self.__message_group = message_group
self.__delivery_timestamp = delivery_timestamp
self.__born_host = born_host
self.__delivery_attempt = delivery_attempt

@property
def topic(self):
return self.__topic

@property
def message_id(self):
return self.__message_id

@property
def born_host(self):
return self.__born_host

@property
def keys(self):
return self.__keys

@property
def properties(self):
return self.__properties

@property
def tag(self):
return self.__tag

@property
def message_group(self):
return self.__message_group

@property
def delivery_timestamp(self):
return self.__delivery_timestamp
Loading