-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
8896093
commit 44df2b0
Showing
10 changed files
with
702 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
from .models import kafka_backends # noqa: F401 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
"""Exceptions raised by the kafka service.""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,332 @@ | ||
"""KafkaBackend class with methods for supported APIs.""" | ||
|
||
import uuid | ||
from datetime import datetime | ||
from typing import Any, Dict, List, Optional, Tuple | ||
|
||
from moto.core.base_backend import BackendDict, BaseBackend | ||
from moto.core.common_models import BaseModel | ||
from moto.utilities.utils import get_partition | ||
|
||
from ..utilities.tagging_service import TaggingService | ||
|
||
|
||
class FakeKafkaCluster(BaseModel):
    """In-memory representation of a single MSK cluster.

    Models both cluster flavours: PROVISIONED clusters carry broker/node
    configuration, while SERVERLESS clusters only carry ``serverless_config``.
    Unused attributes for the other flavour are simply left as ``None``.
    """

    def __init__(
        self,
        cluster_name: str,
        account_id: str,
        region_name: str,
        cluster_type: str,
        tags: Optional[Dict[str, str]] = None,
        broker_node_group_info: Optional[Dict[str, Any]] = None,
        kafka_version: Optional[str] = None,
        number_of_broker_nodes: Optional[int] = None,
        configuration_info: Optional[Dict[str, Any]] = None,
        serverless_config: Optional[Dict[str, Any]] = None,
        encryption_info: Optional[Dict[str, Any]] = None,
        enhanced_monitoring: str = "DEFAULT",
        open_monitoring: Optional[Dict[str, Any]] = None,
        logging_info: Optional[Dict[str, Any]] = None,
        storage_mode: str = "LOCAL",
        current_version: str = "1.0",
        client_authentication: Optional[Dict[str, Any]] = None,
        state: str = "CREATING",
        active_operation_arn: Optional[str] = None,
        zookeeper_connect_string: Optional[str] = None,
        zookeeper_connect_string_tls: Optional[str] = None,
    ):
        # --- Attributes common to both cluster types ---
        self.cluster_id = str(uuid.uuid4())  # random id; embedded in the ARN
        self.cluster_name = cluster_name
        self.account_id = account_id
        self.region_name = region_name
        self.cluster_type = cluster_type
        self.tags = {} if tags is None else tags
        self.state = state
        self.creation_time = datetime.now().isoformat()
        self.current_version = current_version
        self.active_operation_arn = active_operation_arn
        # ARN depends on cluster_type/region/account set above.
        self.arn = self._generate_arn()

        # --- PROVISIONED-only attributes ---
        self.broker_node_group_info = broker_node_group_info
        self.kafka_version = kafka_version
        self.number_of_broker_nodes = number_of_broker_nodes
        self.configuration_info = configuration_info
        self.encryption_info = encryption_info
        self.enhanced_monitoring = enhanced_monitoring
        self.open_monitoring = open_monitoring
        self.logging_info = logging_info
        self.storage_mode = storage_mode
        self.client_authentication = client_authentication
        self.zookeeper_connect_string = zookeeper_connect_string
        self.zookeeper_connect_string_tls = zookeeper_connect_string_tls

        # --- SERVERLESS-only attributes ---
        self.serverless_config = serverless_config

    def _generate_arn(self) -> str:
        """Build the cluster ARN; serverless clusters use a distinct resource type."""
        if self.cluster_type == "PROVISIONED":
            resource_type = "cluster"
        else:
            resource_type = "serverless-cluster"
        partition = get_partition(self.region_name)
        return (
            f"arn:{partition}:kafka:{self.region_name}:"
            f"{self.account_id}:{resource_type}/{self.cluster_id}"
        )
|
||
|
||
class KafkaBackend(BaseBackend):
    """Implementation of Kafka (MSK) APIs.

    Clusters are stored in-memory, keyed by ARN. Tags are delegated to a
    ``TaggingService`` instance rather than the per-cluster ``tags`` dict.
    """

    def __init__(self, region_name: str, account_id: str):
        super().__init__(region_name, account_id)
        # Maps cluster ARN -> FakeKafkaCluster.
        self.clusters: Dict[str, FakeKafkaCluster] = {}
        self.tagger = TaggingService()

    def create_cluster_v2(
        self,
        cluster_name: str,
        tags: Optional[Dict[str, str]],
        provisioned: Optional[Dict[str, Any]],
        serverless: Optional[Dict[str, Any]],
    ) -> Tuple[str, str, str, str]:
        """Create a PROVISIONED or SERVERLESS cluster (CreateClusterV2).

        Exactly one of ``provisioned``/``serverless`` is expected; if both are
        given, ``provisioned`` wins (mirrors the original branch order).

        :returns: tuple of (cluster ARN, cluster name, state, cluster type).
        :raises ValueError: if neither configuration is provided.
        """
        if provisioned:
            cluster_type = "PROVISIONED"
            broker_node_group_info = provisioned.get("brokerNodeGroupInfo")
            kafka_version = provisioned.get("kafkaVersion", "default-kafka-version")
            number_of_broker_nodes = int(provisioned.get("numberOfBrokerNodes", 1))
            storage_mode = provisioned.get("storageMode", "LOCAL")
            serverless_config = None
        elif serverless:
            cluster_type = "SERVERLESS"
            broker_node_group_info = None
            kafka_version = None
            number_of_broker_nodes = None
            storage_mode = None
            serverless_config = serverless
        else:
            # Previously this path crashed with an UnboundLocalError because
            # none of the locals above were assigned; fail explicitly instead.
            raise ValueError(
                "Either 'provisioned' or 'serverless' configuration must be provided."
            )

        new_cluster = FakeKafkaCluster(
            cluster_name=cluster_name,
            account_id=self.account_id,
            region_name=self.region_name,
            cluster_type=cluster_type,
            broker_node_group_info=broker_node_group_info,
            kafka_version=kafka_version,
            number_of_broker_nodes=number_of_broker_nodes,
            serverless_config=serverless_config,
            tags=tags,
            state="CREATING",
            storage_mode=storage_mode if storage_mode else "LOCAL",
            current_version="1.0",
        )

        self.clusters[new_cluster.arn] = new_cluster

        if tags:
            self.tag_resource(new_cluster.arn, tags)

        return (
            new_cluster.arn,
            new_cluster.cluster_name,
            new_cluster.state,
            new_cluster.cluster_type,
        )

    def describe_cluster_v2(self, cluster_arn: str) -> Dict[str, Any]:
        """Return the ClusterInfo structure for DescribeClusterV2.

        Includes a ``provisioned`` or ``serverless`` sub-document depending on
        the cluster type. Raises ``KeyError`` for an unknown ARN.
        """
        cluster = self.clusters[cluster_arn]

        cluster_info: Dict[str, Any] = {
            # Placeholder operation ARN; operations are not modeled.
            "activeOperationArn": "arn:aws:kafka:region:account-id:operation/active-operation",
            "clusterArn": cluster.arn,
            "clusterName": cluster.cluster_name,
            "clusterType": cluster.cluster_type,
            "creationTime": cluster.creation_time,
            "currentVersion": cluster.current_version,
            "state": cluster.state,
            "stateInfo": {
                "code": "string",
                "message": "Cluster state details.",
            },
            "tags": self.list_tags_for_resource(cluster.arn),
        }

        if cluster.cluster_type == "PROVISIONED":
            cluster_info.update(
                {
                    "provisioned": {
                        "brokerNodeGroupInfo": cluster.broker_node_group_info or {},
                        "clientAuthentication": cluster.client_authentication or {},
                        "currentBrokerSoftwareInfo": {
                            "configurationArn": (cluster.configuration_info or {}).get(
                                "arn", "string"
                            ),
                            "configurationRevision": (
                                cluster.configuration_info or {}
                            ).get("revision", 1),
                            "kafkaVersion": cluster.kafka_version,
                        },
                        "encryptionInfo": cluster.encryption_info or {},
                        "enhancedMonitoring": cluster.enhanced_monitoring,
                        "openMonitoring": cluster.open_monitoring or {},
                        "loggingInfo": cluster.logging_info or {},
                        "numberOfBrokerNodes": cluster.number_of_broker_nodes or 0,
                        "zookeeperConnectString": cluster.zookeeper_connect_string
                        or "zookeeper.example.com:2181",
                        "zookeeperConnectStringTls": cluster.zookeeper_connect_string_tls
                        or "zookeeper.example.com:2181",
                        "storageMode": cluster.storage_mode,
                        "customerActionStatus": "NONE",
                    }
                }
            )

        elif cluster.cluster_type == "SERVERLESS":
            serverless_config = cluster.serverless_config or {}
            cluster_info.update(
                {
                    "serverless": {
                        "vpcConfigs": serverless_config.get("vpcConfigs", []),
                        "clientAuthentication": serverless_config.get(
                            "clientAuthentication", {}
                        ),
                    }
                }
            )

        return cluster_info

    def list_clusters_v2(
        self,
        cluster_name_filter: Optional[str],
        cluster_type_filter: Optional[str],
        max_results: Optional[int],
        next_token: Optional[str],
    ) -> Tuple[List[Dict[str, Any]], Optional[str]]:
        """Return summaries of all clusters (ListClustersV2).

        NOTE(review): filters and pagination parameters are accepted but not
        applied; next token is always ``None``.
        """
        cluster_info_list = [
            {
                "clusterArn": cluster.arn,
                "clusterName": cluster.cluster_name,
                "clusterType": cluster.cluster_type,
                "state": cluster.state,
                "creationTime": cluster.creation_time,
            }
            for cluster in self.clusters.values()
        ]

        return cluster_info_list, None

    def create_cluster(
        self,
        broker_node_group_info: Dict[str, Any],
        client_authentication: Optional[Dict[str, Any]],
        cluster_name: str,
        configuration_info: Optional[Dict[str, Any]] = None,
        encryption_info: Optional[Dict[str, Any]] = None,
        enhanced_monitoring: str = "DEFAULT",
        open_monitoring: Optional[Dict[str, Any]] = None,
        kafka_version: str = "2.8.1",
        logging_info: Optional[Dict[str, Any]] = None,
        number_of_broker_nodes: int = 1,
        tags: Optional[Dict[str, str]] = None,
        storage_mode: str = "LOCAL",
    ) -> Tuple[str, str, str]:
        """Create a PROVISIONED cluster (legacy CreateCluster API).

        :returns: tuple of (cluster ARN, cluster name, state).
        """
        new_cluster = FakeKafkaCluster(
            cluster_name=cluster_name,
            account_id=self.account_id,
            region_name=self.region_name,
            cluster_type="PROVISIONED",
            broker_node_group_info=broker_node_group_info,
            client_authentication=client_authentication,
            kafka_version=kafka_version,
            number_of_broker_nodes=number_of_broker_nodes,
            configuration_info=configuration_info,
            encryption_info=encryption_info,
            enhanced_monitoring=enhanced_monitoring,
            open_monitoring=open_monitoring,
            logging_info=logging_info,
            storage_mode=storage_mode,
        )

        self.clusters[new_cluster.arn] = new_cluster

        if tags:
            self.tag_resource(new_cluster.arn, tags)

        return new_cluster.arn, new_cluster.cluster_name, new_cluster.state

    def describe_cluster(self, cluster_arn: str) -> Dict[str, Any]:
        """Return the ClusterInfo structure for the legacy DescribeCluster API.

        Raises ``KeyError`` for an unknown ARN.
        """
        cluster = self.clusters[cluster_arn]

        return {
            # Placeholder operation ARN; operations are not modeled.
            "activeOperationArn": "arn:aws:kafka:region:account-id:operation/active-operation",
            "brokerNodeGroupInfo": cluster.broker_node_group_info or {},
            "clientAuthentication": cluster.client_authentication or {},
            "clusterArn": cluster.arn,
            "clusterName": cluster.cluster_name,
            "creationTime": cluster.creation_time,
            "currentBrokerSoftwareInfo": {
                "configurationArn": (cluster.configuration_info or {}).get(
                    "arn", "string"
                ),
                "configurationRevision": (cluster.configuration_info or {}).get(
                    "revision", 1
                ),
                "kafkaVersion": cluster.kafka_version,
            },
            "currentVersion": cluster.current_version,
            "encryptionInfo": cluster.encryption_info or {},
            "enhancedMonitoring": cluster.enhanced_monitoring,
            "openMonitoring": cluster.open_monitoring or {},
            "loggingInfo": cluster.logging_info or {},
            "numberOfBrokerNodes": cluster.number_of_broker_nodes or 0,
            "state": cluster.state,
            "stateInfo": {
                "code": "string",
                "message": "Cluster state details.",
            },
            "tags": self.list_tags_for_resource(cluster.arn),
            "zookeeperConnectString": cluster.zookeeper_connect_string
            or "zookeeper.example.com:2181",
            "zookeeperConnectStringTls": cluster.zookeeper_connect_string_tls
            or "zookeeper.example.com:2181",
            "storageMode": cluster.storage_mode,
            "customerActionStatus": "NONE",
        }

    def list_clusters(
        self,
        cluster_name_filter: Optional[str],
        max_results: Optional[int],
        next_token: Optional[str],
    ) -> List[Dict[str, Any]]:
        """Return summaries of all clusters (legacy ListClusters API).

        NOTE(review): filter and pagination parameters are accepted but not
        applied.
        """
        # Iterate values directly; the dict key (ARN) is already on the cluster.
        cluster_info_list = [
            {
                "clusterArn": cluster.arn,
                "clusterName": cluster.cluster_name,
                "state": cluster.state,
                "creationTime": cluster.creation_time,
                "clusterType": cluster.cluster_type,
            }
            for cluster in self.clusters.values()
        ]

        return cluster_info_list

    def delete_cluster(self, cluster_arn: str, current_version: str) -> Tuple[str, str]:
        """Remove a cluster and return its ARN and final state.

        ``current_version`` is accepted for API parity but not validated.
        Raises ``KeyError`` for an unknown ARN.
        """
        cluster = self.clusters.pop(cluster_arn)
        return cluster_arn, cluster.state

    def list_tags_for_resource(self, resource_arn: str) -> Dict[str, str]:
        """Return all tags on ``resource_arn`` as a plain dict."""
        return self.tagger.get_tag_dict_for_resource(resource_arn)

    def tag_resource(self, resource_arn: str, tags: Dict[str, str]) -> None:
        """Apply ``tags`` to ``resource_arn`` (converted to Key/Value records)."""
        tags_list = [{"Key": k, "Value": v} for k, v in tags.items()]
        self.tagger.tag_resource(resource_arn, tags_list)

    def untag_resource(self, resource_arn: str, tag_keys: List[str]) -> None:
        """Remove the given tag keys from ``resource_arn``."""
        self.tagger.untag_resource_using_names(resource_arn, tag_keys)
||
|
||
# Region/account-scoped registry of KafkaBackend instances for the "kafka" service.
kafka_backends = BackendDict(KafkaBackend, "kafka")
Oops, something went wrong.