Skip to content

Commit

Permalink
Refactored the delete topic logic. (#186)
Browse files Browse the repository at this point in the history
* Refactored the delete topic logic.

* Modified the tests to use the plural variant of the delete topic command.

* Bugfix when returning futures.

* Refactoring for the ensure_kafka_futures_method.

* Bugfix when re-raising exceptions on waiting for futures.

* Fixed an endless loop in the futures method.

* Fix for the timeout issue.

* Comment fixes.

* Made the exception trace easier to read.
  • Loading branch information
ognjen-j authored May 28, 2021
1 parent 2031a10 commit de315a2
Show file tree
Hide file tree
Showing 8 changed files with 349 additions and 578 deletions.
31 changes: 23 additions & 8 deletions esque/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,16 +455,33 @@ def delete_consumer_group(state: State, consumergroup_id: Tuple[str]):


@delete.command("topic")
@click.argument(
"topic-name", metavar="TOPIC_NAME", required=False, type=click.STRING, autocompletion=list_topics, nargs=-1
)
@click.argument("topic-name", metavar="TOPIC_NAME", required=False, type=click.STRING, autocompletion=list_topics)
@default_options
def delete_topic(state: State, topic_name: str):
"""Delete a topic
"""Delete a single topic
WARNING: This command cannot be undone, and all data in the topic will be lost.
"""
topic_names = list(topic_name) + get_piped_stdin_arguments()
topic_controller = state.cluster.topic_controller
current_topics = [topic.name for topic in topic_controller.list_topics(get_topic_objects=False)]
if topic_name not in current_topics:
click.echo(click.style(f"Topic [{topic_name}] doesn't exist on the cluster.", fg="red"))
else:
click.echo(f"Deleting {click.style(topic_name, fg='green')}")
if ensure_approval("Are you sure?", no_verify=state.no_verify):
topic_controller.delete_topics([Topic(topic_name)])
click.echo(click.style(f"Topic '{topic_name}' successfully deleted.", fg="green"))


@delete.command("topics")
@click.argument("topic-list", metavar="TOPIC_LIST", required=False, autocompletion=list_topics, nargs=-1)
@default_options
def delete_topics(state: State, topic_list: Tuple[str]):
"""Delete multiple topics
WARNING: This command cannot be undone, and all data in the topics will be lost.
"""
topic_names = list(topic_list) + get_piped_stdin_arguments()
topic_controller = state.cluster.topic_controller
current_topics = [topic.name for topic in topic_controller.list_topics(get_topic_objects=False)]
existing_topics: List[str] = []
Expand All @@ -478,9 +495,7 @@ def delete_topic(state: State, topic_name: str):
click.echo(click.style("The provided list contains no existing topics.", fg="red"))
else:
if ensure_approval("Are you sure?", no_verify=state.no_verify):
for topic_name in existing_topics:
topic_controller.delete_topic(Topic(topic_name))
assert topic_name not in (t.name for t in topic_controller.list_topics(get_topic_objects=False))
topic_controller.delete_topics([Topic(topic_name) for topic_name in existing_topics])
click.echo(click.style(f"Topics '{existing_topics}' successfully deleted.", fg="green"))


Expand Down
1 change: 1 addition & 0 deletions esque/clients/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def _setup_config(self):
{
"group.id": self._group_id,
"error_cb": log_error,
"session.timeout.ms": 10_000,
# We need to commit offsets manually once we"re sure it got saved
# to the sink
"enable.auto.commit": self._enable_auto_commit,
Expand Down
18 changes: 15 additions & 3 deletions esque/controller/topic_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from confluent_kafka.cimpl import KafkaException, NewTopic, TopicPartition

from esque.config import ESQUE_GROUP_ID, Config
from esque.errors import TopicDeletionException
from esque.helpers import ensure_kafka_future_done
from esque.resources.topic import Partition, Topic, TopicDiff

Expand Down Expand Up @@ -96,9 +97,20 @@ def _get_altered_config(self, topic: Topic) -> Dict[str, str]:
altered_config[name] = value
return altered_config

def delete_topic(self, topic: Topic):
future = self.cluster.confluent_client.delete_topics([topic.name])[topic.name]
ensure_kafka_future_done(future)
def delete_topic(self, topic: Topic) -> bool:
return self.delete_topics([topic])

def delete_topics(self, topics: List[Topic]) -> bool:
futures = self.cluster.confluent_client.delete_topics([topic.name for topic in topics], operation_timeout=60)
errors: List[str] = []
for topic_name, future in futures.items():
try:
future.result()
except KafkaException as e:
errors.append(f"[{topic_name}]: {e.args[0].str()}")
if errors:
raise TopicDeletionException("The following exceptions occurred:\n " + "\n ".join(sorted(errors)))
return True

def get_cluster_topic(
self, topic_name: str, *, retrieve_last_timestamp: bool = False, retrieve_partition_data: bool = True
Expand Down
4 changes: 4 additions & 0 deletions esque/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ class InvalidReplicationFactorException(KafkaException):
pass


class TopicDeletionException(ExceptionWithMessage):
pass


class ValidationException(ExceptionWithMessage):
pass

Expand Down
31 changes: 16 additions & 15 deletions esque/helpers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import logging
from concurrent.futures import Future, wait
from itertools import islice
from typing import Type, TypeVar
from typing import List, Type, TypeVar

import confluent_kafka
import pendulum
Expand Down Expand Up @@ -31,22 +30,24 @@ def set_instance(cls: Type[T], instance: T):


def ensure_kafka_future_done(future: Future, timeout: int = 60 * 5) -> Future:
# Clients, such as confluents AdminClient, may return a done future with an exception
done, not_done = wait({future}, timeout=timeout)

if not_done:
raise FutureTimeoutException("Future timed out after {} seconds".format(timeout))
return ensure_kafka_futures_done(futures=[future], timeout=timeout)[0]

result = next(islice(done, 1))

exception = result.exception()
def ensure_kafka_futures_done(futures: List[Future], timeout: int = 60 * 5) -> List[Future]:
# Clients, such as confluents AdminClient, may return a done future with an exception
done, not_done = wait(futures, timeout=timeout)

if exception is None:
return result
elif isinstance(exception, confluent_kafka.KafkaException):
raise_for_kafka_error(exception.args[0])
else:
raise exception
if not_done:
raise FutureTimeoutException("{} future(s) timed out after {} seconds".format(len(not_done), timeout))
for result in list(done):
exception = result.exception()
if exception is None:
continue
if isinstance(exception, confluent_kafka.KafkaException):
raise_for_kafka_error(exception.args[0])
elif isinstance(exception, BaseException):
raise exception
return list(done)


def unpack_confluent_config(config):
Expand Down
Loading

0 comments on commit de315a2

Please sign in to comment.