
Added customer config #4

Merged: 9 commits, Jul 29, 2024
3 changes: 2 additions & 1 deletion databricks/sdk/chaosgenius/__init__.py
@@ -1,5 +1,6 @@
"Library for Pulling Client Data."
from .customer_config import CustomerConfig
from .data_puller import DataPuller
from .logger import LogSparkDBHandler

__all__ = ["DataPuller", "LogSparkDBHandler"]
__all__ = ["CustomerConfig", "DataPuller", "LogSparkDBHandler"]
74 changes: 74 additions & 0 deletions databricks/sdk/chaosgenius/customer_config.py
@@ -0,0 +1,74 @@
import json
from logging import Logger
from typing import Optional

import pandas as pd
from pyspark.sql.session import SparkSession


class CustomerConfig:
Member: Can you also add the set function, in case we are going to need it. It can be handy.

"""
Customer Config class.

Entity type: workspace, cluster, warehouse, job etc
Entity ID: ID of above
Include Entity: "yes"/"no"
Entity Config: JSON {"something": "else"}
"""

    def __init__(self, sparkSession: SparkSession, logger: Logger):
        self.logger = logger
        self.logger.info("Creating customer config table.")
        self.sparkSession = sparkSession

        try:
            sparkSession.sql(
                """
                CREATE TABLE IF NOT EXISTS chaosgenius.default.customer_config (
                    entity_type string,
                    entity_id string,
                    include_entity string,
                    entity_config string
                )
                """
            )
        except Exception:
            self.logger.error("Unable to create customer config table.", exc_info=True)

    def get(
        self,
        entity_type: Optional[str] = None,
        entity_ids: Optional[list[str]] = None,
        include_entity: Optional[str] = None,
    ) -> pd.DataFrame:
        try:
            where_query = ""
            if entity_type is not None:
                where_query += f"where entity_type = '{entity_type}'"

            if entity_ids is not None:
                if where_query == "":
                    where_query += "where "
                else:
                    where_query += " and "
                entity_ids_string = ",".join(map(lambda x: f"'{x}'", entity_ids))
                where_query += f"entity_id in ({entity_ids_string})"

            if include_entity is not None:
                if where_query == "":
                    where_query += "where "
                else:
                    where_query += " and "
                where_query += f"include_entity = '{include_entity}'"

            df = self.sparkSession.sql(
                f"select * from chaosgenius.default.customer_config {where_query}"
            ).toPandas()
            df["entity_config"] = (
                df["entity_config"].replace("", "{}").apply(lambda x: json.loads(x))
            )
            return df
        except Exception:
            self.logger.error("Unable to get config.", exc_info=True)
            return pd.DataFrame(
                columns=["entity_type", "entity_id", "include_entity", "entity_config"]
            )
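
The review asks for a set function to complement get(). A minimal sketch of what such a method could look like, assuming it should upsert one row per (entity_type, entity_id) pair into the same table and keeping the string-interpolated SQL style of get(); the method and its MERGE-based upsert are illustrative, not part of the merged diff:

    def set(
        self,
        entity_type: str,
        entity_id: str,
        include_entity: str = "yes",
        entity_config: Optional[dict] = None,
    ) -> bool:
        """Hypothetical upsert of a single config row (not in this PR)."""
        try:
            config_json = json.dumps(entity_config or {})
            # MERGE gives upsert semantics on Delta tables: calling set twice
            # for the same entity updates the row instead of duplicating it.
            # The f-string interpolation mirrors get(); parameterized SQL
            # would be safer against quoting issues.
            self.sparkSession.sql(
                f"""
                MERGE INTO chaosgenius.default.customer_config AS t
                USING (
                    SELECT '{entity_type}' AS entity_type,
                           '{entity_id}' AS entity_id,
                           '{include_entity}' AS include_entity,
                           '{config_json}' AS entity_config
                ) AS s
                ON t.entity_type = s.entity_type AND t.entity_id = s.entity_id
                WHEN MATCHED THEN UPDATE SET *
                WHEN NOT MATCHED THEN INSERT *
                """
            )
            return True
        except Exception:
            self.logger.error("Unable to set config.", exc_info=True)
            return False

Paired with get(), usage would then look like this (IDs invented):

    config.set("cluster", "0414-123456-abcdefg", include_entity="no")
    excluded = config.get(entity_type="cluster", include_entity="no")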
175 changes: 166 additions & 9 deletions databricks/sdk/chaosgenius/data_puller.py
@@ -1,4 +1,5 @@
"""Utilities for pulling data."""

import datetime as dt
import logging
import json
@@ -8,7 +9,13 @@
 from pyspark.sql.session import SparkSession
 from pyspark.sql import DataFrame as SparkDataFrame
 from databricks.sdk import WorkspaceClient
+from databricks.sdk.chaosgenius.customer_config import CustomerConfig
 from databricks.sdk.service import sql as databricks_sql
+from databricks.sdk.service.compute import ClusterDetails, InstancePoolAndStats
+from databricks.sdk.service.iam import User
+from databricks.sdk.service.sql import EndpointInfo
+from databricks.sdk.service.jobs import BaseJob

PANDAS_CHUNK_SIZE = 10000

@@ -20,12 +27,14 @@ def __init__(
        self,
        workspace_id: str,
        workspace_client: WorkspaceClient,
+       customer_config: CustomerConfig,
        spark_session: Optional[SparkSession],
        save_to_csv: bool = False,
        logger: Optional[logging.Logger] = None,
    ) -> None:
        self._workspace_id = workspace_id
        self._workspace_client = workspace_client
+       self._customer_config = customer_config
        self._spark_session = spark_session
        self._logger = logger if logger else logging.getLogger("data_puller")
        self._pull_time = dt.datetime.now()
@@ -52,21 +61,23 @@ def __init__(
        )

        logger.info("Getting cluster list")
-       self._cluster_list = [i for i in self._workspace_client.clusters.list()]
+       self._cluster_list = self._get_full_cluster_list()
        logger.info(f"Total clusters: {len(self._cluster_list)}")

        logger.info("Getting instance pools list")
-       self._ip_list = [i for i in self._workspace_client.instance_pools.list()]
+       self._ip_list = self._get_full_instance_pool_info()
        logger.info(f"Total pools: {len(self._ip_list)}")

        logger.info("Getting warehouses list")
-       self._wh_list = [i for i in self._workspace_client.warehouses.list()]
+       self._wh_list = self._get_full_warehouse_info()
        logger.info(f"Total warehouses: {len(self._wh_list)}")

        logger.info("Getting jobs list")
-       self._job_list = [
-           i for i in self._workspace_client.jobs.list(expand_tasks=True)
-       ]
+       self._job_list = self._get_full_jobs_info()
        logger.info(f"Total jobs: {len(self._job_list)}")

        logger.info("Getting users list")
-       self._user_list = [i for i in self._workspace_client.users.list()]
+       self._user_list = self._get_full_user_info()
        logger.info(f"Total users: {len(self._user_list)}")

        logger.info("Starting data pull")
@@ -80,6 +91,152 @@ def __init__(
        self._add_status_entry("overall", status=status, data={"results": results})
        logger.info("Completed data pull.")

    def _get_full_cluster_list(self) -> list[ClusterDetails]:
        self._logger.info("Getting workspace clusters.")
        cl = [i for i in self._workspace_client.clusters.list()]
        self._logger.info(f"Current cluster count: {len(cl)}")

        self._logger.info("Adding clusters from customer config.")
        ci_add_l = self._customer_config.get(
            entity_type="cluster", include_entity="yes"
        )["entity_id"]
        self._logger.info(f"Num additional clusters: {len(ci_add_l)}.")
        for ci in ci_add_l:
            if ci not in [i.cluster_id for i in cl]:
                self._logger.info(f"Additional cluster ID {ci} not in list. Getting info.")
                cl.append(self._workspace_client.clusters.get(ci))
        self._logger.info(f"Current cluster count: {len(cl)}")

Member: You can create one cluster ID set after getting the information and then check ci against it:

    cluster_ids = set(i.cluster_id for i in cl)
    for ci in ci_add_l:
        if ci not in cluster_ids:

        self._logger.info("Removing clusters from customer config.")
        ci_remove_l = self._customer_config.get(
            entity_type="cluster", include_entity="no"
        )["entity_id"]
        self._logger.info(f"Clusters to be removed: {len(ci_remove_l)}.")
        for ci in ci_remove_l:
            for i, c in enumerate(cl):
                if ci == c.cluster_id:
                    self._logger.info(f"Removing cluster ID {ci}.")
                    cl.pop(i)
                    break
        self._logger.info(f"Current cluster count: {len(cl)}")

Member: Suggested change, using a set for the removal pass as well:

    ci_remove_ids = set(ci_remove_l)
    final_cluster = [
        cluster for cluster in cl if cluster.cluster_id not in ci_remove_ids
    ]
    self._logger.info(f"Current cluster count: {len(final_cluster)}")
    return final_cluster

        return cl
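
Taken together, the two suggestions swap the linear membership scans for set lookups, turning the add/remove reconciliation from O(n*m) into roughly O(n+m). Applied to the method above, the body after the initial workspace fetch would read roughly as follows (an editorial sketch combining both suggestions, not the merged code):

    cluster_ids = set(i.cluster_id for i in cl)
    for ci in ci_add_l:
        if ci not in cluster_ids:
            self._logger.info(f"Additional cluster ID {ci} not in list. Getting info.")
            cl.append(self._workspace_client.clusters.get(ci))
    self._logger.info(f"Current cluster count: {len(cl)}")

    ci_remove_ids = set(ci_remove_l)
    final_cluster = [c for c in cl if c.cluster_id not in ci_remove_ids]
    self._logger.info(f"Current cluster count: {len(final_cluster)}")
    return final_cluster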

    def _get_full_instance_pool_info(self) -> list[InstancePoolAndStats]:
        self._logger.info("Getting workspace instance pools.")
        ip_list = [i for i in self._workspace_client.instance_pools.list()]
        self._logger.info(f"Current instance pool count: {len(ip_list)}")

        self._logger.info("Adding instance pools from customer config.")
        ip_add_l = self._customer_config.get(
            entity_type="instance_pool", include_entity="yes"
        )["entity_id"]
        self._logger.info(f"Num additional instance pools: {len(ip_add_l)}.")
        for ip in ip_add_l:
            if ip not in [i.instance_pool_id for i in ip_list]:
                self._logger.info(
                    f"Additional instance pool ID {ip} not in list. Getting info."
                )
                ip_list.append(self._workspace_client.instance_pools.get(ip))
        self._logger.info(f"Current instance pool count: {len(ip_list)}")

        self._logger.info("Removing instance pools from customer config.")
        ip_remove_l = self._customer_config.get(
            entity_type="instance_pool", include_entity="no"
        )["entity_id"]
        self._logger.info(f"Instance pools to be removed: {len(ip_remove_l)}.")
        for ip in ip_remove_l:
            for i, c in enumerate(ip_list):
                if ip == c.instance_pool_id:
                    self._logger.info(f"Removing instance pool ID {ip}.")
                    ip_list.pop(i)
                    break
        self._logger.info(f"Current instance pool count: {len(ip_list)}")
        return ip_list

    def _get_full_warehouse_info(self) -> list[EndpointInfo]:
        self._logger.info("Getting workspace warehouses.")
        wh_list = [i for i in self._workspace_client.warehouses.list()]
        self._logger.info(f"Current warehouse count: {len(wh_list)}")

        self._logger.info("Adding warehouses from customer config.")
        wh_add_l = self._customer_config.get(
            entity_type="warehouse", include_entity="yes"
        )["entity_id"]
        self._logger.info(f"Num additional warehouses: {len(wh_add_l)}.")
        for wh in wh_add_l:
            if wh not in [i.id for i in wh_list]:
                self._logger.info(f"Additional warehouse ID {wh} not in list. Getting info.")
                wh_list.append(self._workspace_client.warehouses.get(wh))
        self._logger.info(f"Current warehouse count: {len(wh_list)}")

        self._logger.info("Removing warehouses from customer config.")
        wh_remove_l = self._customer_config.get(
            entity_type="warehouse", include_entity="no"
        )["entity_id"]
        self._logger.info(f"Warehouses to be removed: {len(wh_remove_l)}.")
        for wh in wh_remove_l:
            for i, c in enumerate(wh_list):
                if wh == c.id:
                    self._logger.info(f"Removing warehouse ID {wh}.")
                    wh_list.pop(i)
                    break
        self._logger.info(f"Current warehouse count: {len(wh_list)}")
        return wh_list

    def _get_full_jobs_info(self) -> list[BaseJob]:
        self._logger.info("Getting workspace jobs.")
        job_list = [i for i in self._workspace_client.jobs.list(expand_tasks=True)]
        self._logger.info(f"Current job count: {len(job_list)}")

        self._logger.info("Adding jobs from customer config.")
        job_add_l = self._customer_config.get(
            entity_type="job", include_entity="yes"
        )["entity_id"]
        self._logger.info(f"Num additional jobs: {len(job_add_l)}.")
        for job in job_add_l:
            if job not in [i.job_id for i in job_list]:
                self._logger.info(f"Additional job ID {job} not in list. Getting info.")
                job_list.append(self._workspace_client.jobs.get(job))
        self._logger.info(f"Current job count: {len(job_list)}")

        self._logger.info("Removing jobs from customer config.")
        job_remove_l = self._customer_config.get(
            entity_type="job", include_entity="no"
        )["entity_id"]
        self._logger.info(f"Jobs to be removed: {len(job_remove_l)}.")
        for job in job_remove_l:
            for i, c in enumerate(job_list):
                if job == c.job_id:
                    self._logger.info(f"Removing job ID {job}.")
                    job_list.pop(i)
                    break
        self._logger.info(f"Current job count: {len(job_list)}")
        return job_list

    def _get_full_user_info(self) -> list[User]:
        self._logger.info("Getting workspace users.")
        user_list = [i for i in self._workspace_client.users.list()]
        self._logger.info(f"Current user count: {len(user_list)}")

        self._logger.info("Adding users from customer config.")
        user_add_l = self._customer_config.get(
            entity_type="user", include_entity="yes"
        )["entity_id"]
        self._logger.info(f"Num additional users: {len(user_add_l)}.")
        for user in user_add_l:
            if user not in [i.id for i in user_list]:
                self._logger.info(f"Additional user ID {user} not in list. Getting info.")
                user_list.append(self._workspace_client.users.get(user))
        self._logger.info(f"Current user count: {len(user_list)}")

        self._logger.info("Removing users from customer config.")
        user_remove_l = self._customer_config.get(
            entity_type="user", include_entity="no"
        )["entity_id"]
        self._logger.info(f"Users to be removed: {len(user_remove_l)}.")
        for user in user_remove_l:
            for i, c in enumerate(user_list):
                if user == c.id:
                    self._logger.info(f"Removing user ID {user}.")
                    user_list.pop(i)
                    break
        self._logger.info(f"Current user count: {len(user_list)}")
        return user_list
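
The five _get_full_* methods instantiate the same reconciliation pattern for different entity types. If the duplication becomes a burden, a single generic helper could replace them; the sketch below is an editorial illustration with assumed names, not code from this PR:

    def _reconcile_with_config(self, entity_type, listed, get_id, fetch_one):
        """Add config-included entities missing from `listed`; drop excluded ones.

        entity_type: value matched against the customer_config table
        listed: entities returned by the workspace list call
        get_id: extracts an entity's ID
        fetch_one: fetches a single entity by ID
        """
        add_ids = self._customer_config.get(
            entity_type=entity_type, include_entity="yes"
        )["entity_id"]
        known = set(get_id(e) for e in listed)
        listed += [fetch_one(i) for i in add_ids if i not in known]

        remove_ids = set(
            self._customer_config.get(entity_type=entity_type, include_entity="no")[
                "entity_id"
            ]
        )
        return [e for e in listed if get_id(e) not in remove_ids]

The cluster variant would then reduce to:

    self._cluster_list = self._reconcile_with_config(
        "cluster",
        [i for i in self._workspace_client.clusters.list()],
        lambda c: c.cluster_id,
        self._workspace_client.clusters.get,
    )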

    def _get_start_end_time(self) -> tuple[int, int]:
        try:
            df = self._spark_session.sql(
@@ -216,7 +373,7 @@ def get_clusters_events(
                self._save_iterator_in_chunks(
                    iterator=cluster_events,
                    metadata={"cluster_id": cluster.cluster_id},
-                   table_name="clusters_events"
+                   table_name="clusters_events",
                )
                results.append((cluster.cluster_id, True))
            except Exception:
@@ -365,7 +522,7 @@ def get_job_runs_list(self) -> bool:
                self._save_iterator_in_chunks(
                    iterator=job_runs,
                    metadata={"job_id": job.job_id},
-                   table_name="jobs_runs_list"
+                   table_name="jobs_runs_list",
                )
                job_results.append((job.job_id, True))
            except Exception: