diff --git a/docs/deployment/provision/overview.mdx b/docs/deployment/provision/overview.mdx
index be567c4725..ce4d803f5f 100644
--- a/docs/deployment/provision/overview.mdx
+++ b/docs/deployment/provision/overview.mdx
@@ -27,7 +27,8 @@ Provisioning in Keep is controlled through environment variables and configurati
 | Provisioning Type      | Environment Variable           | Purpose                                                                   |
 | ---------------------- | ------------------------------ | ------------------------------------------------------------------------- |
 | **Provider**           | `KEEP_PROVIDERS`               | JSON string containing provider configurations with deduplication rules   |
-| **Workflow**           | `KEEP_WORKFLOW`                | One workflow to provision right from the env variable.                    |
+| **Providers**          | `KEEP_PROVIDERS_DIRECTORY`     | Directory path containing provider configuration files                    |
+| **Workflow**           | `KEEP_WORKFLOW`                | A single workflow provisioned directly from the environment variable      |
 | **Workflows**          | `KEEP_WORKFLOWS_DIRECTORY`     | Directory path containing workflow configuration files                    |
 | **Dashboard**          | `KEEP_DASHBOARDS`              | JSON string containing dashboard configurations                           |
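+
+For example, in a Docker Compose deployment these variables might be set as follows (a minimal sketch; the service name, provider name, authentication fields, and paths are illustrative):
+
+```yaml
+services:
+  keep-backend:
+    environment:
+      # Inline JSON keyed by provider name; alternatively, point KEEP_PROVIDERS_DIRECTORY at a folder of provider YAML files
+      KEEP_PROVIDERS: '{"my-prometheus": {"type": "prometheus", "authentication": {"url": "http://prometheus:9090"}}}'
+      KEEP_WORKFLOWS_DIRECTORY: /app/workflows
+```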
 
diff --git a/docs/deployment/provision/provider.mdx b/docs/deployment/provision/provider.mdx
index c73b91cbdf..9173a1db6b 100644
--- a/docs/deployment/provision/provider.mdx
+++ b/docs/deployment/provision/provider.mdx
@@ -99,29 +99,15 @@ deduplication_rules:
 Keep supports a wide range of provider types. Each provider type has its own specific configuration requirements.
 To see the full list of supported providers and their detailed configuration options, please refer to our comprehensive provider documentation.
 
-
 ### Update Provisioned Providers
 
-#### Using KEEP_PROVIDERS
-
-Provider configurations can be updated dynamically by changing the `KEEP_PROVIDERS` environment variable.
-
-On every restart, Keep reads this environment variable and determines which providers need to be added or removed.
-
-This process allows for flexible management of data sources without requiring manual intervention. By simply updating the `KEEP_PROVIDERS` variable and restarting the application, you can efficiently add new providers, remove existing ones, or modify their configurations.
-
-The high-level provisioning mechanism:
-1. Keep reads the `KEEP_PROVIDERS` value.
-2. Keep checks if there are any provisioned providers that are no longer in the `KEEP_PROVIDERS` value, and deletes them.
-3. Keep installs all providers from the `KEEP_PROVIDERS` value.
-
-#### Using KEEP_PROVIDERS_DIRECTORY
+Keep uses a consistent process for updating provider configurations regardless of whether you use `KEEP_PROVIDERS` or `KEEP_PROVIDERS_DIRECTORY`.
 
-Provider configurations can be updated dynamically by changing the YAML files in the `KEEP_PROVIDERS_DIRECTORY` directory.
+#### Provisioning Process
 
-On every restart, Keep reads the YAML files in the `KEEP_PROVIDERS_DIRECTORY` directory and determines which providers need to be added or removed.
+When Keep starts or restarts, it follows these steps to manage provider configurations (see the example provider file after the list):
 
-The high-level provisioning mechanism:
-1. Keep reads the YAML files in the `KEEP_PROVIDERS_DIRECTORY` directory.
-2. Keep checks if there are any provisioned providers that are no longer in the YAML files, and deletes them.
-3. Keep installs all providers from the YAML files.
+1. **Read Configurations**: Loads provider definitions from either the `KEEP_PROVIDERS` environment variable or YAML files in the `KEEP_PROVIDERS_DIRECTORY`.
+2. **Create New Providers**: Installs any providers listed in the configuration that are not already present.
+3. **Update When Changed**: If an existing provider's configuration has changed, Keep reapplies the configuration, including deduplication rules. If errors occur during this update, changes are automatically rolled back.
+4. **Delete Providers**: Deletes any previously provisioned providers that are no longer present in the loaded configuration.
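+
+For example, a provider file in `KEEP_PROVIDERS_DIRECTORY` might look like the following sketch (the provider name, authentication values, and rule fields are illustrative; the `KEEP_PROVIDERS` JSON uses the same fields, keyed by the provider name):
+
+```yaml
+name: my-victoriametrics
+type: victoriametrics
+authentication:
+  VMAlertHost: http://localhost
+  VMAlertPort: 1234
+deduplication_rules:
+  my-dedup-rule:
+    description: Deduplicate by service
+    fingerprint_fields:
+      - fingerprint
+      - service
+    full_deduplication: false
+```
+
+Editing this file and restarting Keep reapplies the configuration, including its deduplication rules; removing the file, or a rule from it, deletes the corresponding provisioned provider or rule on the next restart.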
diff --git a/keep/api/alert_deduplicator/deduplication_rules_provisioning.py b/keep/api/alert_deduplicator/deduplication_rules_provisioning.py
index 4b26d3b98c..d1741e4479 100644
--- a/keep/api/alert_deduplicator/deduplication_rules_provisioning.py
+++ b/keep/api/alert_deduplicator/deduplication_rules_provisioning.py
@@ -4,23 +4,28 @@
 
 import keep.api.core.db as db
 from keep.api.core.config import config
-from keep.providers.providers_factory import ProvidersFactory
+from keep.api.models.db.provider import Provider
 
 logger = logging.getLogger(__name__)
 
 
-def provision_deduplication_rules(deduplication_rules: dict[str, any], tenant_id: str):
+def provision_deduplication_rules(
+    deduplication_rules: dict[str, any], tenant_id: str, provider: Provider
+):
     """
     Provisions deduplication rules for a given tenant.
 
     Args:
         deduplication_rules (dict[str, any]): A dictionary where the keys are rule names and the values are
-        DeduplicationRuleRequestDto objects.
+            DeduplicationRuleRequestDto objects.
         tenant_id (str): The ID of the tenant for which deduplication rules are being provisioned.
+        provider (Provider): The provider for which the deduplication rules are being provisioned.
     """
-    enrich_with_providers_info(deduplication_rules, tenant_id)
+    enrich_with_provider_info(deduplication_rules, provider)
 
-    all_deduplication_rules_from_db = db.get_all_deduplication_rules(tenant_id)
+    all_deduplication_rules_from_db = db.get_all_deduplication_rules_by_provider(
+        tenant_id, provider.id, provider.type
+    )
     provisioned_deduplication_rules = [
         rule for rule in all_deduplication_rules_from_db if rule.is_provisioned
     ]
@@ -29,17 +34,6 @@ def provision_deduplication_rules(deduplication_rules: dict[str, any], tenant_id
     }
     actor = "system"
 
-    # delete rules that are not in the env
-    for provisioned_deduplication_rule in provisioned_deduplication_rules:
-        if str(provisioned_deduplication_rule.name) not in deduplication_rules:
-            logger.info(
-                "Deduplication rule with name '%s' is not in the env, deleting from DB",
-                provisioned_deduplication_rule.name,
-            )
-            db.delete_deduplication_rule(
-                rule_id=str(provisioned_deduplication_rule.id), tenant_id=tenant_id
-            )
-
     for (
         deduplication_rule_name,
         deduplication_rule_to_provision,
@@ -97,47 +91,36 @@ def provision_deduplication_rules(deduplication_rules: dict[str, any], tenant_id
             is_provisioned=True,
         )
 
+        logger.info(
+            "Provisioned deduplication rules %s successfully",
+            deduplication_rule_name,
+        )
 
-def provision_deduplication_rules_from_env(tenant_id: str):
-    """
-    Provisions deduplication rules from environment variables for a given tenant.
-    This function reads deduplication rules from environment variables, validates them,
-    and then provisions them into the database. It handles the following:
-    - Deletes deduplication rules from the database that are not present in the environment variables.
-    - Updates existing deduplication rules in the database if they are present in the environment variables.
-    - Creates new deduplication rules in the database if they are not already present.
-    Args:
-        tenant_id (str): The ID of the tenant for which deduplication rules are being provisioned.
-    Raises:
-        ValueError: If the deduplication rules from the environment variables are invalid.
-    """
-
-    deduplication_rules_from_env_dict = get_deduplication_rules_to_provision()
-
-    if not deduplication_rules_from_env_dict:
-        logger.info("No deduplication rules found in env. Nothing to provision.")
-        return
-
-    provision_deduplication_rules(deduplication_rules_from_env_dict, tenant_id)
+    # Delete provisioned deduplication rules that are no longer in the configuration
+    for rule_name, rule in provisioned_deduplication_rules_from_db_dict.items():
+        if rule_name not in deduplication_rules:
+            logger.info(
+                "Deduplication rule with name '%s' no longer in configuration, deleting from DB",
+                rule_name,
+            )
+            db.delete_deduplication_rule(rule_id=str(rule.id), tenant_id=tenant_id)
+            logger.info(
+                "Deleted deduplication rule %s successfully",
+                rule_name,
+            )
 
 
-def enrich_with_providers_info(deduplication_rules: dict[str, any], tenant_id: str):
+def enrich_with_provider_info(deduplication_rules: dict[str, any], provider: Provider):
     """
     Enriches passed deduplication rules with provider ID and type information.
 
     Args:
         deduplication_rules (dict[str, any]): A list of deduplication rules to be enriched.
-        tenant_id (str): The ID of the tenant for which deduplication rules are being provisioned.
+        provider (Provider): The provider for which the deduplication rules are being provisioned.
     """
 
-    installed_providers = ProvidersFactory.get_installed_providers(tenant_id)
-    installed_providers_dict = {
-        provider.details.get("name"): provider for provider in installed_providers
-    }
-
     for rule_name, rule in deduplication_rules.items():
         logger.info(f"Enriching deduplication rule: {rule_name}")
-        provider = installed_providers_dict.get(rule.get("provider_name"))
         rule["provider_id"] = provider.id
         rule["provider_type"] = provider.type
 
diff --git a/keep/api/config.py b/keep/api/config.py
index 98a16dd4b3..13f136afda 100644
--- a/keep/api/config.py
+++ b/keep/api/config.py
@@ -2,9 +2,6 @@
 import os
 
 import keep.api.logging
-from keep.api.alert_deduplicator.deduplication_rules_provisioning import (
-    provision_deduplication_rules_from_env,
-)
 from keep.api.api import AUTH_TYPE
 from keep.api.core.db_on_start import migrate_db, try_create_single_tenant
 from keep.api.core.dependencies import SINGLE_TENANT_UUID
@@ -33,9 +30,6 @@ def provision_resources():
         logger.info("Workflows provisioned successfully")
         provision_dashboards(SINGLE_TENANT_UUID)
         logger.info("Dashboards provisioned successfully")
-        logger.info("Provisioning deduplication rules")
-        provision_deduplication_rules_from_env(SINGLE_TENANT_UUID)
-        logger.info("Deduplication rules provisioned successfully")
     else:
         logger.info("Provisioning resources is disabled")
 
diff --git a/keep/api/core/db.py b/keep/api/core/db.py
index 32d7fe511f..70e5da7358 100644
--- a/keep/api/core/db.py
+++ b/keep/api/core/db.py
@@ -40,7 +40,7 @@
 from sqlalchemy.dialects.postgresql import insert as pg_insert
 from sqlalchemy.dialects.sqlite import insert as sqlite_insert
 from sqlalchemy.exc import IntegrityError, OperationalError
-from sqlalchemy.orm import joinedload, subqueryload, foreign
+from sqlalchemy.orm import foreign, joinedload, subqueryload
 from sqlalchemy.orm.exc import StaleDataError
 from sqlalchemy.sql import exists, expression
 from sqlmodel import Session, SQLModel, col, or_, select, text
@@ -2354,6 +2354,18 @@ def get_all_deduplication_rules(tenant_id):
     return rules
 
 
+def get_all_deduplication_rules_by_provider(tenant_id, provider_id, provider_type):
+    with Session(engine) as session:
+        rules = session.exec(
+            select(AlertDeduplicationRule).where(
+                AlertDeduplicationRule.tenant_id == tenant_id,
+                AlertDeduplicationRule.provider_id == provider_id,
+                AlertDeduplicationRule.provider_type == provider_type,
+            )
+        ).all()
+    return rules
+
+
 def get_deduplication_rule_by_id(tenant_id, rule_id: str):
     rule_uuid = __convert_to_uuid(rule_id)
     if not rule_uuid:
diff --git a/keep/providers/providers_service.py b/keep/providers/providers_service.py
index 816eec6a97..492933deb5 100644
--- a/keep/providers/providers_service.py
+++ b/keep/providers/providers_service.py
@@ -3,7 +3,7 @@
 import os
 import time
 import uuid
-from typing import Any, Dict, List
+from typing import Any, Dict, List, Optional
 
 from fastapi import HTTPException
 from sqlalchemy.exc import IntegrityError
@@ -14,7 +14,10 @@
 )
 from keep.api.core.config import config
 from keep.api.core.db import (
+    delete_deduplication_rule,
     engine,
+    existed_or_new_session,
+    get_all_deduplication_rules_by_provider,
     get_all_provisioned_providers,
     get_provider_by_name,
     get_provider_logs,
@@ -207,22 +210,6 @@ def install_provider(
             except IntegrityError as e:
                 if "FOREIGN KEY constraint" in str(e):
                     raise
-                try:
-                    # if the provider is already installed, delete the secret
-                    logger.warning(
-                        "Provider already installed, deleting secret",
-                        extra={"error": str(e)},
-                    )
-                    secret_manager.delete_secret(
-                        secret_name=secret_name,
-                    )
-                    logger.warning("Secret deleted")
-                except Exception:
-                    logger.exception("Failed to delete the secret")
-                    pass
-                raise HTTPException(
-                    status_code=409, detail="Provider already installed"
-                )
 
             if provider_model.consumer:
                 try:
@@ -244,106 +231,182 @@ def update_provider(
         provider_id: str,
         provider_info: Dict[str, Any],
         updated_by: str,
-        session: Session,
+        session: Optional[Session] = None,
+        allow_provisioned: bool = False,
+        validate_scopes: bool = True,
     ) -> Dict[str, Any]:
-        provider = session.exec(
-            select(Provider).where(
-                (Provider.tenant_id == tenant_id) & (Provider.id == provider_id)
-            )
-        ).one_or_none()
+        with existed_or_new_session(session) as session:
+            provider = session.exec(
+                select(Provider).where(
+                    (Provider.tenant_id == tenant_id) & (Provider.id == provider_id)
+                )
+            ).one_or_none()
 
-        if not provider:
-            raise HTTPException(404, detail="Provider not found")
+            if not provider:
+                raise HTTPException(404, detail="Provider not found")
 
-        if provider.provisioned:
-            raise HTTPException(403, detail="Cannot update a provisioned provider")
+            if provider.provisioned and not allow_provisioned:
+                raise HTTPException(403, detail="Cannot update a provisioned provider")
 
-        pulling_enabled = provider_info.pop("pulling_enabled", True)
+            pulling_enabled = provider_info.pop("pulling_enabled", True)
 
-        # if pulling_enabled is "true" or "false" cast it to boolean
-        if isinstance(pulling_enabled, str):
-            pulling_enabled = pulling_enabled.lower() == "true"
+            # if pulling_enabled is "true" or "false" cast it to boolean
+            if isinstance(pulling_enabled, str):
+                pulling_enabled = pulling_enabled.lower() == "true"
 
-        provider_config = {
-            "authentication": provider_info,
-            "name": provider.name,
-        }
+            provider_config = {
+                "authentication": provider_info,
+                "name": provider.name,
+            }
 
-        context_manager = ContextManager(tenant_id=tenant_id)
-        try:
-            provider_instance = ProvidersFactory.get_provider(
-                context_manager, provider_id, provider.type, provider_config
+            context_manager = ContextManager(tenant_id=tenant_id)
+            try:
+                provider_instance = ProvidersFactory.get_provider(
+                    context_manager, provider_id, provider.type, provider_config
+                )
+            except Exception as e:
+                raise HTTPException(status_code=400, detail=str(e))
+
+            if validate_scopes:
+                validated_scopes = provider_instance.validate_scopes()
+            else:
+                validated_scopes = {}
+
+            secret_manager = SecretManagerFactory.get_secret_manager(context_manager)
+            secret_manager.write_secret(
+                secret_name=provider.configuration_key,
+                secret_value=json.dumps(provider_config),
             )
-        except Exception as e:
-            raise HTTPException(status_code=400, detail=str(e))
 
-        validated_scopes = provider_instance.validate_scopes()
+            provider.installed_by = updated_by
+            provider.validatedScopes = validated_scopes
+            provider.pulling_enabled = pulling_enabled
+            session.commit()
 
-        secret_manager = SecretManagerFactory.get_secret_manager(context_manager)
-        secret_manager.write_secret(
-            secret_name=provider.configuration_key,
-            secret_value=json.dumps(provider_config),
-        )
+            return {
+                "details": provider_config,
+                "validatedScopes": validated_scopes,
+            }
 
-        provider.installed_by = updated_by
-        provider.validatedScopes = validated_scopes
-        provider.pulling_enabled = pulling_enabled
-        session.commit()
+    @staticmethod
+    def upsert_provider(
+        tenant_id: str,
+        provider_name: str,
+        provider_type: str,
+        provider_config: Dict[str, Any],
+        provisioned: bool = False,
+        validate_scopes: bool = True,
+        provisioned_providers_names: List[str] = [],
+    ) -> Dict[str, Any]:
+        installed_provider_info = None
+        try:
+            # First check if the provider is already installed
+            # If it is, update it, otherwise install it
+            if provider_name in provisioned_providers_names:
+                logger.info(
+                    f"Provider {provider_name} already provisioned, updating..."
+                )
+                provider = get_provider_by_name(tenant_id, provider_name)
+                installed_provider_info = ProvidersService.update_provider(
+                    tenant_id=tenant_id,
+                    provider_id=provider.id,
+                    provider_info=provider_config,
+                    updated_by="system",
+                    allow_provisioned=True,
+                    validate_scopes=validate_scopes,
+                )
+                logger.info(f"Provider {provider_name} updated successfully")
+            else:
+                logger.info(f"Provider {provider_name} not existing, installing...")
+                installed_provider_info = ProvidersService.install_provider(
+                    tenant_id=tenant_id,
+                    installed_by="system",
+                    provider_id=provider_type,
+                    provider_name=provider_name,
+                    provider_type=provider_type,
+                    provider_config=provider_config,
+                    provisioned=provisioned,
+                    validate_scopes=validate_scopes,
+                )
+                logger.info(f"Provider {provider_name} provisioned successfully")
+        except Exception as e:
+            logger.error(
+                "Error provisioning provider from env var",
+                extra={"exception": e},
+            )
+            raise HTTPException(status_code=400, detail=str(e))
 
-        return {
-            "details": provider_config,
-            "validatedScopes": validated_scopes,
-        }
+        return installed_provider_info
 
     @staticmethod
     def delete_provider(
-        tenant_id: str, provider_id: str, session: Session, allow_provisioned=False
+        tenant_id: str,
+        provider_id: str,
+        session: Optional[Session] = None,
+        allow_provisioned=False,
     ):
-        provider_model: Provider = session.exec(
-            select(Provider).where(
-                (Provider.tenant_id == tenant_id) & (Provider.id == provider_id)
-            )
-        ).one_or_none()
+        with existed_or_new_session(session) as session:
+            provider_model: Optional[Provider] = session.exec(
+                select(Provider).where(
+                    (Provider.tenant_id == tenant_id) & (Provider.id == provider_id)
+                )
+            ).one_or_none()
 
-        if not provider_model:
-            raise HTTPException(404, detail="Provider not found")
+            if provider_model is None:
+                raise HTTPException(404, detail="Provider not found")
 
-        if provider_model.provisioned and not allow_provisioned:
-            raise HTTPException(403, detail="Cannot delete a provisioned provider")
+            if provider_model.provisioned and not allow_provisioned:
+                raise HTTPException(403, detail="Cannot delete a provisioned provider")
 
-        context_manager = ContextManager(tenant_id=tenant_id)
-        secret_manager = SecretManagerFactory.get_secret_manager(context_manager)
-        config = secret_manager.read_secret(
-            provider_model.configuration_key, is_json=True
-        )
+            # Delete all associated deduplication rules
+            try:
+                deduplication_rules = get_all_deduplication_rules_by_provider(
+                    tenant_id, provider_model.id, provider_model.type
+                )
+                for rule in deduplication_rules:
+                    logger.info(
+                        f"Deleting deduplication rule {rule.name} for provider {provider_model.name}"
+                    )
+                    delete_deduplication_rule(str(rule.id), tenant_id)
+            except Exception as e:
+                logger.exception(
+                    "Failed to delete deduplication rules for provider",
+                    extra={"exception": e},
+                )
 
-        try:
-            secret_manager.delete_secret(provider_model.configuration_key)
-        except Exception:
-            logger.exception("Failed to delete the provider secret")
+            context_manager = ContextManager(tenant_id=tenant_id)
+            secret_manager = SecretManagerFactory.get_secret_manager(context_manager)
+            config = secret_manager.read_secret(
+                provider_model.configuration_key, is_json=True
+            )
 
-        if provider_model.consumer:
             try:
-                event_subscriber = EventSubscriber.get_instance()
-                event_subscriber.remove_consumer(provider_model)
+                secret_manager.delete_secret(provider_model.configuration_key)
             except Exception:
-                logger.exception("Failed to unregister provider as a consumer")
+                logger.exception("Failed to delete the provider secret")
 
-        try:
-            provider = ProvidersFactory.get_provider(
-                context_manager, provider_model.id, provider_model.type, config
-            )
-            provider.clean_up()
-        except NotImplementedError:
-            logger.info(
-                "Being deleted provider of type %s does not have a clean_up method",
-                provider_model.type,
-            )
-        except Exception:
-            logger.exception(msg="Provider deleted but failed to clean up provider")
+            if provider_model.consumer:
+                try:
+                    event_subscriber = EventSubscriber.get_instance()
+                    event_subscriber.remove_consumer(provider_model)
+                except Exception:
+                    logger.exception("Failed to unregister provider as a consumer")
+
+            try:
+                provider = ProvidersFactory.get_provider(
+                    context_manager, provider_model.id, provider_model.type, config
+                )
+                provider.clean_up()
+            except NotImplementedError:
+                logger.info(
+                    "Being deleted provider of type %s does not have a clean_up method",
+                    provider_model.type,
+                )
+            except Exception:
+                logger.exception(msg="Provider deleted but failed to clean up provider")
 
-        session.delete(provider_model)
-        session.commit()
+            session.delete(provider_model)
+            session.commit()
 
     @staticmethod
     def validate_provider_scopes(
@@ -380,11 +443,36 @@ def is_provider_installed(tenant_id: str, provider_name: str) -> bool:
         return provider is not None
 
     @staticmethod
+    def provision_provider_deduplication_rules(
+        tenant_id: str,
+        provider: Provider,
+        deduplication_rules: Dict[str, Dict[str, Any]],
+    ):
+        # Provision the deduplication rules
+        deduplication_rules_dict: dict[str, dict] = {}
+        for rule_name, rule_config in deduplication_rules.items():
+            logger.info(f"Provisioning deduplication rule {rule_name}")
+            rule_config["name"] = rule_name
+            rule_config["provider_name"] = provider.name
+            rule_config["provider_type"] = provider.type
+            deduplication_rules_dict[rule_name] = rule_config
+
+        try:
+            # Provision deduplication rules
+            provision_deduplication_rules(
+                deduplication_rules=deduplication_rules_dict,
+                tenant_id=tenant_id,
+                provider=provider,
+            )
+        except Exception as e:
+            logger.exception(f"Failed to provision deduplication rules: {e}")
+
+    @staticmethod
     def install_webhook(
         tenant_id: str, provider_type: str, provider_id: str, session: Session
     ) -> bool:
         context_manager = ContextManager(
-            tenant_id=tenant_id, workflow_id=""  # this is not in a workflow scope
+            tenant_id=tenant_id,
+            workflow_id="",  # this is not in a workflow scope
         )
         secret_manager = SecretManagerFactory.get_secret_manager(context_manager)
         provider_secret_name = f"{tenant_id}_{provider_type}_{provider_id}"
@@ -455,10 +543,6 @@ def provision_providers(tenant_id: str):
         provisioned_providers_dir = os.environ.get("KEEP_PROVIDERS_DIRECTORY")
         provisioned_providers_json = os.environ.get("KEEP_PROVIDERS")
 
-        if not (provisioned_providers_dir or provisioned_providers_json):
-            logger.info("No providers for provisioning found")
-            return
-
         if (
             provisioned_providers_dir is not None
             and provisioned_providers_json is not None
@@ -476,154 +560,172 @@ def provision_providers(tenant_id: str):
 
         # Get all existing provisioned providers
         provisioned_providers = get_all_provisioned_providers(tenant_id)
+        provisioned_providers_names = [
+            provider.name for provider in provisioned_providers
+        ]
+        incoming_providers_names = set()
 
-        ### Provisioning from env var
-        if provisioned_providers_json is not None:
-            # Avoid circular import
-            from keep.parser.parser import Parser
+        if not (provisioned_providers_dir or provisioned_providers_json):
+            if provisioned_providers:
+                logger.info(
+                    "No providers for provisioning found. Deleting all provisioned providers."
+                )
+            else:
+                logger.info("No providers for provisioning found. Nothing to do.")
+                return
 
-            parser = Parser()
-            context_manager = ContextManager(tenant_id=tenant_id)
-            parser._parse_providers_from_env(context_manager)
-            env_providers = context_manager.providers_context
+        try:
+            ### Provisioning from env var
+            if provisioned_providers_json is not None:
+                # Avoid circular import
+                from keep.parser.parser import Parser
 
-            # Un-provisioning other providers.
-            for provider in provisioned_providers:
-                if provider.name not in env_providers:
-                    with Session(engine) as session:
-                        try:
-                            logger.info(f"Deleting provider {provider.name}")
-                            ProvidersService.delete_provider(
-                                tenant_id, provider.id, session, allow_provisioned=True
-                            )
-                            logger.info(f"Provider {provider.name} deleted")
-                        except Exception as e:
-                            logger.exception(
-                                "Failed to delete provisioned provider that does not exist in the env var",
-                                extra={"exception": e},
+                parser = Parser()
+                context_manager = ContextManager(tenant_id=tenant_id)
+                parser._parse_providers_from_env(context_manager)
+                env_providers = context_manager.providers_context
+
+                for provider_name, provider_info in env_providers.items():
+                    # Track the incoming provider name up front so that a failed upsert
+                    # does not result in deleting the existing provisioned provider
+                    incoming_providers_names.add(provider_name)
+
+                    try:
+                        provider_type = provider_info.get("type")
+                        if not provider_type:
+                            logger.error(
+                                f"Provider {provider_name} does not have a type"
                             )
                             continue
 
-            for provider_name, provider_config in env_providers.items():
-                logger.info(f"Provisioning provider {provider_name}")
-                if ProvidersService.is_provider_installed(tenant_id, provider_name):
-                    logger.info(f"Provider {provider_name} already installed")
-                    continue
+                        provider_config = provider_info.get("authentication", {})
+
+                        # Perform upsert operation for the provider
+                        ProvidersService.upsert_provider(
+                            tenant_id=tenant_id,
+                            provider_name=provider_name,
+                            provider_type=provider_type,
+                            provider_config=provider_config,
+                            provisioned=True,
+                            validate_scopes=False,
+                            provisioned_providers_names=provisioned_providers_names,
+                        )
+                    except Exception as e:
+                        logger.error(
+                            "Error provisioning provider from env var",
+                            extra={"exception": e},
+                        )
+                        continue
 
-                logger.info(f"Installing provider {provider_name}")
-                try:
-                    installed_provider = ProvidersService.install_provider(
-                        tenant_id=tenant_id,
-                        installed_by="system",
-                        provider_id=provider_config["type"],
-                        provider_name=provider_name,
-                        provider_type=provider_config["type"],
-                        provider_config=provider_config["authentication"],
-                        provisioned=True,
-                        validate_scopes=False,
-                    )
-                    ProvidersService.install_webhook(
-                        tenant_id,
-                        installed_provider["type"],
-                        installed_provider["id"],
-                        session,
+                    provider = get_provider_by_name(tenant_id, provider_name)
+
+                    # Configure deduplication rules
+                    deduplication_rules = provider_info.get("deduplication_rules", {})
+                    logger.info(
+                        f"Provisioning deduplication rules for provider {provider_name}"
                     )
-                    logger.info(f"Provider {provider_name} provisioned successfully")
-                except Exception as e:
-                    logger.error(
-                        "Error provisioning provider from env var",
-                        extra={"exception": e},
+                    ProvidersService.provision_provider_deduplication_rules(
+                        tenant_id=tenant_id,
+                        provider=provider,
+                        deduplication_rules=deduplication_rules,
                     )
 
-        ### Provisioning from the directory
-        if provisioned_providers_dir is not None:
-            installed_providers = []
-            for file in os.listdir(provisioned_providers_dir):
-                if file.endswith((".yaml", ".yml")):
-                    logger.info(f"Provisioning provider from {file}")
-                    provider_path = os.path.join(provisioned_providers_dir, file)
+            ### Provisioning from the directory
+            if provisioned_providers_dir is not None:
+                for file in os.listdir(provisioned_providers_dir):
+                    if file.endswith((".yaml", ".yml")):
+                        logger.info(f"Provisioning provider from {file}")
+                        provider_path = os.path.join(provisioned_providers_dir, file)
 
-                    try:
-                        with open(provider_path, "r") as yaml_file:
-                            provider_yaml = cyaml.safe_load(yaml_file.read())
-                            provider_name = provider_yaml["name"]
-                            provider_type = provider_yaml["type"]
-                            provider_config = provider_yaml.get("authentication", {})
-
-                            # Skip if already installed
-                            if ProvidersService.is_provider_installed(
-                                tenant_id, provider_name
-                            ):
-                                logger.info(
-                                    f"Provider {provider_name} already installed"
+                        try:
+                            with open(provider_path, "r") as yaml_file:
+                                provider_info = cyaml.safe_load(yaml_file.read())
+                                provider_name = provider_info.get("name")
+                                if not provider_name:
+                                    logger.error(
+                                        f"Provider {provider_path} does not have a name"
+                                    )
+                                    continue
+
+                                # Track the incoming provider name up front so that a failed
+                                # upsert does not result in deleting the existing provisioned provider
+                                incoming_providers_names.add(provider_name)
+
+                                provider_type = provider_info.get("type")
+                                if not provider_type:
+                                    logger.error(
+                                        f"Provider {provider_path} does not have a type"
+                                    )
+                                    continue
+
+                                provider_config = provider_info.get(
+                                    "authentication", {}
                                 )
-                                # Add to installed providers list. This is necessary, otherwise the provider
-                                # will be un-provisioned on the process un-provisioning outdated providers.
-                                installed_providers.append(provider_name)
-                                continue
 
-                            logger.info(f"Installing provider {provider_name}")
-                            ProvidersService.install_provider(
+                            # Perform upsert operation for the provider
+                            installed_provider_info = ProvidersService.upsert_provider(
                                 tenant_id=tenant_id,
-                                installed_by="system",
-                                provider_id=provider_type,
                                 provider_name=provider_name,
                                 provider_type=provider_type,
                                 provider_config=provider_config,
                                 provisioned=True,
                                 validate_scopes=False,
+                                provisioned_providers_names=provisioned_providers_names,
                             )
-                            logger.info(
-                                f"Provider {provider_name} provisioned successfully"
+                        except Exception as e:
+                            logger.error(
+                                "Error provisioning provider from directory",
+                                extra={"exception": e},
                             )
-                            installed_providers.append(provider_name)
+                            continue
 
-                            # Configure deduplication rules
-                            deduplication_rules = provider_yaml.get(
-                                "deduplication_rules", {}
-                            )
-                            if deduplication_rules:
-                                logger.info(
-                                    f"Provisioning deduplication rules for provider {provider_name}"
-                                )
+                        provider = installed_provider_info["provider"]
 
-                                deduplication_rules_dict: dict[str, dict] = {}
-                                for (
-                                    rule_name,
-                                    rule_config,
-                                ) in deduplication_rules.items():
-                                    logger.info(
-                                        f"Provisioning deduplication rule {rule_name}"
-                                    )
-                                    rule_config["name"] = rule_name
-                                    rule_config["provider_name"] = provider_name
-                                    rule_config["provider_type"] = provider_type
-                                    deduplication_rules_dict[rule_name] = rule_config
-
-                                # Provision deduplication rules
-                                provision_deduplication_rules(
-                                    deduplication_rules=deduplication_rules_dict,
-                                    tenant_id=tenant_id,
-                                )
-                    except Exception as e:
-                        logger.error(
-                            "Error provisioning provider from directory",
-                            extra={"exception": e},
+                        # Configure deduplication rules
+                        deduplication_rules = provider_info.get(
+                            "deduplication_rules", {}
                         )
+                        logger.info(
+                            f"Provisioning deduplication rules for provider {provider_name}"
+                        )
+                        ProvidersService.provision_provider_deduplication_rules(
+                            tenant_id=tenant_id,
+                            provider=provider,
+                            deduplication_rules=deduplication_rules,
+                        )
 
-            # Un-provisioning other providers.
+            # Delete providers that are not in the incoming list
             for provider in provisioned_providers:
-                if provider.name not in installed_providers:
-                    with Session(engine) as session:
+                if provider.name not in incoming_providers_names:
+                    try:
                         logger.info(
-                            f"Deprovisioning provider {provider.name} as its file no longer exists or is outside the providers directory"
+                            f"Provider {provider.name} not found in incoming provisioned providers, deleting..."
                         )
                         ProvidersService.delete_provider(
-                            tenant_id, provider.id, session, allow_provisioned=True
+                            tenant_id=tenant_id,
+                            provider_id=provider.id,
+                            allow_provisioned=True,
                         )
-                        logger.info(
-                            f"Provider {provider.name} deprovisioned successfully"
+                        logger.info(f"Provider {provider.name} deleted successfully")
+                    except Exception as e:
+                        logger.error(
+                            f"Error deleting provider {provider.name}",
+                            extra={"exception": e},
                         )
+                        continue
+
+            logger.info(
+                "Providers provisioning completed. Provisioned providers: %s",
+                (
+                    ", ".join(incoming_providers_names)
+                    if incoming_providers_names
+                    else "None"
+                ),
+            )
+        except Exception as e:
+            logger.error("Provisioning failed", extra={"exception": e})
 
     @staticmethod
     def get_provider_logs(
diff --git a/tests/deduplication/test_deduplications.py b/tests/deduplication/test_deduplications.py
index 1f367868d7..f95117bdf7 100644
--- a/tests/deduplication/test_deduplications.py
+++ b/tests/deduplication/test_deduplications.py
@@ -10,8 +10,8 @@
 
 from keep.api.core.db import get_last_alerts
 from keep.api.core.dependencies import SINGLE_TENANT_UUID
-from keep.api.models.alert import DeduplicationRuleDto, AlertStatus
-from keep.api.models.db.alert import AlertDeduplicationRule, AlertDeduplicationEvent, Alert
+from keep.api.models.alert import AlertStatus
+from keep.api.models.db.alert import Alert, AlertDeduplicationRule
 from keep.api.utils.enrichment_helpers import convert_db_alerts_to_dto_alerts
 from keep.providers.providers_factory import ProvidersFactory
 from tests.fixtures.client import client, setup_api_key, test_app  # noqa
@@ -857,7 +857,9 @@ def test_full_deduplication_last_received(db_session, create_alert):
     db_session.exec(text("DELETE FROM alertdeduplicationrule"))
     dedup = AlertDeduplicationRule(
         name="Test Rule",
-        fingerprint_fields=["service",],
+        fingerprint_fields=[
+            "service",
+        ],
         full_deduplication=True,
         ignore_fields=["fingerprint", "lastReceived", "id"],
         is_provisioned=True,
@@ -879,30 +881,30 @@ def test_full_deduplication_last_received(db_session, create_alert):
         None,
         AlertStatus.FIRING,
         dt1,
-        {
-            "source": ["keep"],
-            "service": "service"
-        },
+        {"source": ["keep"], "service": "service"},
     )
 
     assert db_session.query(Alert).count() == 1
     alerts = get_last_alerts(SINGLE_TENANT_UUID)
     alerts_dto = convert_db_alerts_to_dto_alerts(alerts)
 
-    assert alerts_dto[0].lastReceived == dt1.astimezone(pytz.UTC).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
+    assert (
+        alerts_dto[0].lastReceived
+        == dt1.astimezone(pytz.UTC).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
+    )
 
     create_alert(
         None,
         AlertStatus.FIRING,
         dt2,
-        {
-            "source": ["keep"],
-            "service": "service"
-        },
+        {"source": ["keep"], "service": "service"},
     )
 
     assert db_session.query(Alert).count() == 1
     alerts = get_last_alerts(SINGLE_TENANT_UUID)
     alerts_dto = convert_db_alerts_to_dto_alerts(alerts)
 
-    assert alerts_dto[0].lastReceived == dt2.astimezone(pytz.UTC).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
+    assert (
+        alerts_dto[0].lastReceived
+        == dt2.astimezone(pytz.UTC).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
+    )
diff --git a/tests/deduplication/test_deduplications_provisioning.py b/tests/deduplication/test_deduplications_provisioning.py
deleted file mode 100644
index edc0736431..0000000000
--- a/tests/deduplication/test_deduplications_provisioning.py
+++ /dev/null
@@ -1,227 +0,0 @@
-import json
-from uuid import UUID
-import pytest
-from keep.api.alert_deduplicator.deduplication_rules_provisioning import (
-    provision_deduplication_rules_from_env,
-)
-from unittest.mock import patch
-from keep.api.models.db.alert import AlertDeduplicationRule
-from keep.api.models.provider import Provider
-
-
-@pytest.fixture
-def setup(monkeypatch):
-    providers_in_env_var = {
-        "Installed Prometheus provider": {
-            "type": "prometheus",
-            "deduplication_rules": {
-                "provisioned fake existing deduplication rule": {
-                    "description": "new description",
-                    "fingerprint_fields": ["source"],
-                    "full_deduplication": True,
-                    "ignore_fields": ["ignore_field"],
-                }
-            }
-        },
-        "Installed Grafana provider": {
-            "type": "grafana",
-            "deduplication_rules": {
-                "fake new deduplication rule": {
-                    "description": "fake new deduplication rule description",
-                    "fingerprint_fields": ["fingerprint"],
-                    "full_deduplication": False,
-                }
-            }
-        },
-    }
-
-    deduplication_rules_in_db = [
-        AlertDeduplicationRule(
-            id=UUID("f3a2b76c8430491da71684de9cf257ab"),
-            tenant_id="fake_tenant_id",
-            name="provisioned fake existing deduplication rule",
-            description="provisioned fake existing deduplication rule description",
-            provider_id="edc4d65d53204cefb511321be98f748e",
-            provider_type="prometheus",
-            last_updated_by="system",
-            created_by="system",
-            fingerprint_fields=["fingerprint", "source", "service"],
-            full_deduplication=False,
-            is_provisioned=True,
-        ),
-        AlertDeduplicationRule(
-            id=UUID("a5d8f32b6c7049efb913c21da7e845fd"),
-            tenant_id="fake_tenant_id",
-            name="provisioned fake deduplication rule to delete",
-            description="fake new deduplication rule description",
-            provider_id="a1b2c3d4e5f64789ab1234567890abcd",
-            provider_type="grafana",
-            last_updated_by="system",
-            created_by="system",
-            fingerprint_fields=["fingerprint"],
-            full_deduplication=False,
-            is_provisioned=True,
-        ),
-        AlertDeduplicationRule(
-            id=UUID("c7e3d28f95104b6a8f12dc45eb7639fa"),
-            tenant_id="fake_tenant_id",
-            name="not provisioned fake deduplication rule",
-            description="not provisioned fake deduplication rule",
-            provider_id="a1b2c3d4e5f64789ab1234567890abcd",
-            provider_type="grafana",
-            last_updated_by="user",
-            created_by="user",
-            fingerprint_fields=["fingerprint"],
-            full_deduplication=False,
-            is_provisioned=False,
-        ),
-    ]
-    installed_providers = [
-        Provider(
-            id="edc4d65d53204cefb511321be98f748e",
-            display_name="Prometheus",
-            type="prometheus",
-            details={"name": "Installed Prometheus provider"},
-            can_query=True,
-            can_notify=True,
-        ),
-        Provider(
-            id="p2b2c3d4e5f64789ab1234567890abcd",
-            display_name="Prometheus",
-            type="prometheus",
-            details={"name": "Installed Prometheus provider second"},
-            can_query=True,
-            can_notify=True,
-        ),
-        Provider(
-            id="a1b2c3d4e5f64789ab1234567890abcd",
-            display_name="Grafana",
-            type="grafana",
-            details={"name": "Installed Grafana provider"},
-            can_query=True,
-            can_notify=True,
-        )
-    ]
-
-    linked_providers = [
-        Provider(
-            id="abcda1b2c3d4e5f64789ab1234567890",
-            display_name="Grafana",
-            type="grafana",
-            can_query=True,
-            can_notify=True,
-        )
-    ]
-
-    with patch(
-        "keep.api.core.db.get_all_deduplication_rules",
-        return_value=deduplication_rules_in_db,
-    ) as mock_get_all, patch(
-        "keep.api.core.db.delete_deduplication_rule", return_value=None
-    ) as mock_delete, patch(
-        "keep.api.core.db.update_deduplication_rule", return_value=None
-    ) as mock_update, patch(
-        "keep.api.core.db.create_deduplication_rule", return_value=None
-    ) as mock_create, patch(
-        "keep.providers.providers_factory.ProvidersFactory.get_installed_providers",
-        return_value=installed_providers,
-    ) as mock_get_providers, patch(
-        "keep.providers.providers_factory.ProvidersFactory.get_linked_providers",
-        return_value=linked_providers,
-    ) as mock_get_linked_providers:
-
-        fake_tenant_id = "fake_tenant_id"
-        monkeypatch.setenv(
-            "KEEP_PROVIDERS", json.dumps(providers_in_env_var)
-        )
-
-        yield {
-            "mock_get_all": mock_get_all,
-            "mock_delete": mock_delete,
-            "mock_update": mock_update,
-            "mock_create": mock_create,
-            "mock_get_providers": mock_get_providers,
-            "mock_get_linked_providers": mock_get_linked_providers,
-            "fake_tenant_id": fake_tenant_id,
-            "providers_in_env_var": providers_in_env_var,
-            "deduplication_rules_in_db": deduplication_rules_in_db,
-            "linked_providers": linked_providers,
-            "installed_providers": installed_providers,
-        }
-
-
-def test_provisioning_of_new_rule(setup):
-    """
-    Test the provisioning of new deduplication rules from the environment.
-    """
-    provision_deduplication_rules_from_env(setup["fake_tenant_id"])
-    setup["mock_create"].assert_called_once_with(
-        tenant_id=setup["fake_tenant_id"],
-        name="fake new deduplication rule",
-        description="fake new deduplication rule description",
-        provider_id="a1b2c3d4e5f64789ab1234567890abcd",
-        provider_type="grafana",
-        created_by="system",
-        enabled=True,
-        fingerprint_fields=["fingerprint"],
-        full_deduplication=False,
-        ignore_fields=[],
-        priority=0,
-        is_provisioned=True,
-    )
-
-
-def test_provisioning_of_existing_rule(setup):
-    """
-    Test the provisioning of new deduplication rules from the environment.
-    """
-    provision_deduplication_rules_from_env(setup["fake_tenant_id"])
-    setup["mock_update"].assert_called_once_with(
-        tenant_id=setup["fake_tenant_id"],
-        rule_id=str(UUID("f3a2b76c8430491da71684de9cf257ab")),
-        name="provisioned fake existing deduplication rule",
-        description="new description",
-        provider_id="edc4d65d53204cefb511321be98f748e",
-        provider_type="prometheus",
-        last_updated_by="system",
-        enabled=True,
-        fingerprint_fields=["source"],
-        full_deduplication=True,
-        ignore_fields=["ignore_field"],
-        priority=0,
-    )
-
-
-def test_deletion_of_provisioned_rule_not_in_env(setup):
-    """
-    Test the provisioning of new deduplication rules from the environment.
-    """
-    provision_deduplication_rules_from_env(setup["fake_tenant_id"])
-    setup["mock_delete"].assert_called_once_with(
-        tenant_id=setup["fake_tenant_id"],
-        rule_id=str(UUID("a5d8f32b6c7049efb913c21da7e845fd")),
-    )
-
-def test_not_throwing_error_if_env_var_empty(setup, monkeypatch):
-    monkeypatch.setenv(
-        "KEEP_PROVIDERS", ''
-    )
-    try:
-        provision_deduplication_rules_from_env(setup["fake_tenant_id"])
-    except Exception as e:
-        pytest.fail(f"provision_deduplication_rules_from_env raised an exception: {e}")
-
-def test_not_throwing_error_if_providers_do_not_have_dedup_rules(setup, monkeypatch):
-    providers_in_env_var = {
-        "Installed Prometheus provider": {
-            "type": "prometheus"
-        }
-    }
-
-    monkeypatch.setenv(
-        "KEEP_PROVIDERS", json.dumps(providers_in_env_var)
-    )
-    try:
-        provision_deduplication_rules_from_env(setup["fake_tenant_id"])
-    except Exception as e:
-        pytest.fail(f"provision_deduplication_rules_from_env raised an exception: {e}")    
diff --git a/tests/test_providers_yaml_provisioning.py b/tests/test_providers_yaml_provisioning.py
index 990e1ccbb5..6e95ddb299 100644
--- a/tests/test_providers_yaml_provisioning.py
+++ b/tests/test_providers_yaml_provisioning.py
@@ -69,19 +69,23 @@ def test_provision_provider_from_yaml(temp_providers_dir, sample_provider_yaml,
 
     # Mock environment variables
     with patch.dict(os.environ, {"KEEP_PROVIDERS_DIRECTORY": temp_providers_dir}):
-        with patch(
-            "keep.providers.providers_service.ProvidersService.is_provider_installed",
-            return_value=False,
-        ), patch(
-            "keep.providers.providers_service.ProvidersService.install_provider",
-            return_value=mock_provider,
-        ) as mock_install, patch(
-            "keep.providers.providers_service.provision_deduplication_rules"
-        ) as mock_provision_rules, patch(
-            "keep.api.core.db.get_all_provisioned_providers", return_value=[]
-        ), patch(
-            "keep.providers.providers_factory.ProvidersFactory.get_installed_providers",
-            return_value=[mock_provider],
+        with (
+            patch(
+                "keep.providers.providers_service.ProvidersService.is_provider_installed",
+                return_value=False,
+            ),
+            patch(
+                "keep.providers.providers_service.ProvidersService.install_provider",
+                return_value=mock_provider,
+            ) as mock_install,
+            patch(
+                "keep.providers.providers_service.ProvidersService.provision_provider_deduplication_rules"
+            ) as mock_provision_provider_rules,
+            patch("keep.api.core.db.get_all_provisioned_providers", return_value=[]),
+            patch(
+                "keep.providers.providers_factory.ProvidersFactory.get_installed_providers",
+                return_value=[mock_provider],
+            ),
         ):
             # Call the provisioning function
             ProvidersService.provision_providers("test-tenant")
@@ -98,38 +102,11 @@ def test_provision_provider_from_yaml(temp_providers_dir, sample_provider_yaml,
             }
 
             # Verify deduplication rules provisioning was called
-            mock_provision_rules.assert_called_once()
-            call_args = mock_provision_rules.call_args[1]
+            mock_provision_provider_rules.assert_called_once()
+            call_args = mock_provision_provider_rules.call_args[1]
             assert call_args["tenant_id"] == "test-tenant"
-            assert len(call_args["deduplication_rules"]) > 0
-            rule = list(call_args["deduplication_rules"].values())[0]
-            assert rule["description"] == "Test deduplication rule"
-            assert rule["fingerprint_fields"] == ["fingerprint", "source"]
-            assert rule["full_deduplication"] is True
-            assert rule["ignore_fields"] == ["name"]
-
-
-def test_skip_existing_provider(temp_providers_dir, sample_provider_yaml):
-    """Test that existing providers are skipped during provisioning"""
-    # Create a YAML file
-    provider_file = os.path.join(temp_providers_dir, "test_provider.yaml")
-    with open(provider_file, "w") as f:
-        f.write(sample_provider_yaml)
-
-    # Mock environment variables
-    with patch.dict(os.environ, {"KEEP_PROVIDERS_DIRECTORY": temp_providers_dir}):
-        # Mock database operations to simulate existing provider
-        with patch(
-            "keep.providers.providers_service.ProvidersService.is_provider_installed",
-            return_value=True,
-        ), patch(
-            "keep.providers.providers_service.ProvidersService.install_provider"
-        ) as mock_install:
-            # Call the provisioning function
-            ProvidersService.provision_providers("test-tenant")
-
-            # Verify provider installation was not called
-            mock_install.assert_not_called()
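+            # The per-provider call now receives both the provider object and its deduplication rules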
+            assert "provider" in call_args
+            assert "deduplication_rules" in call_args
 
 
 def test_invalid_yaml_file(temp_providers_dir):
@@ -142,12 +119,15 @@ def test_invalid_yaml_file(temp_providers_dir):
     # Mock environment variables
     with patch.dict(os.environ, {"KEEP_PROVIDERS_DIRECTORY": temp_providers_dir}):
         # Mock database operations
-        with patch(
-            "keep.providers.providers_service.ProvidersService.is_provider_installed",
-            return_value=False,
-        ), patch(
-            "keep.providers.providers_service.ProvidersService.install_provider"
-        ) as mock_install:
+        with (
+            patch(
+                "keep.providers.providers_service.ProvidersService.is_provider_installed",
+                return_value=False,
+            ),
+            patch(
+                "keep.providers.providers_service.ProvidersService.install_provider"
+            ) as mock_install,
+        ):
             # Call the provisioning function
             ProvidersService.provision_providers("test-tenant")
 
@@ -172,14 +152,229 @@ def test_missing_required_fields(temp_providers_dir):
     # Mock environment variables
     with patch.dict(os.environ, {"KEEP_PROVIDERS_DIRECTORY": temp_providers_dir}):
         # Mock database operations
-        with patch(
-            "keep.providers.providers_service.ProvidersService.is_provider_installed",
-            return_value=False,
-        ), patch(
-            "keep.providers.providers_service.ProvidersService.install_provider"
-        ) as mock_install:
+        with (
+            patch(
+                "keep.providers.providers_service.ProvidersService.is_provider_installed",
+                return_value=False,
+            ),
+            patch(
+                "keep.providers.providers_service.ProvidersService.install_provider"
+            ) as mock_install,
+        ):
             # Call the provisioning function
             ProvidersService.provision_providers("test-tenant")
 
             # Verify provider installation was not called
             mock_install.assert_not_called()
+
+
+def test_provider_yaml_with_multiple_deduplication_rules(temp_providers_dir, caplog):
+    """Test provisioning a provider from YAML file with multiple deduplication rules"""
+    yaml_content = """
+name: test-victoriametrics
+type: victoriametrics
+authentication:
+  VMAlertHost: http://localhost
+  VMAlertPort: 1234
+deduplication_rules:
+  rule1:
+    description: First deduplication rule
+    fingerprint_fields:
+      - fingerprint
+      - source
+    full_deduplication: true
+    ignore_fields:
+      - name
+  rule2:
+    description: Second deduplication rule
+    fingerprint_fields:
+      - alert_id
+      - service
+    full_deduplication: false
+    ignore_fields:
+      - lastReceived
+"""
+    # Create a YAML file
+    provider_file = os.path.join(temp_providers_dir, "test_provider.yaml")
+    with open(provider_file, "w") as f:
+        f.write(yaml_content)
+
+    # Mock provider
+    mock_provider = MagicMock(
+        type="victoriametrics",
+        id="test-provider-id",
+        details={
+            "name": "test-victoriametrics",
+            "authentication": {"VMAlertHost": "http://localhost", "VMAlertPort": 1234},
+        },
+        validatedScopes={},
+    )
+
+    # Mock environment variables and services
+    with patch.dict(os.environ, {"KEEP_PROVIDERS_DIRECTORY": temp_providers_dir}):
+        with (
+            patch(
+                "keep.providers.providers_service.ProvidersService.is_provider_installed",
+                return_value=False,
+            ),
+            patch(
+                "keep.providers.providers_service.ProvidersService.install_provider",
+                return_value=mock_provider,
+            ) as mock_install,
+            patch(
+                "keep.providers.providers_service.ProvidersService.provision_provider_deduplication_rules"
+            ) as mock_provision_provider_rules,
+            patch("keep.api.core.db.get_all_provisioned_providers", return_value=[]),
+        ):
+            # Call the provisioning function
+            ProvidersService.provision_providers("test-tenant")
+
+            # Verify provider installation
+            mock_install.assert_called_once()
+
+            # Verify deduplication rules provisioning
+            mock_provision_provider_rules.assert_called_once()
+            call_args = mock_provision_provider_rules.call_args[1]
+            assert call_args["tenant_id"] == "test-tenant"
+
+            rules = call_args["deduplication_rules"]
+            assert len(rules) == 2
+
+            rule1 = rules["rule1"]
+            assert rule1["description"] == "First deduplication rule"
+            assert rule1["fingerprint_fields"] == ["fingerprint", "source"]
+            assert rule1["full_deduplication"] is True
+            assert rule1["ignore_fields"] == ["name"]
+
+            rule2 = rules["rule2"]
+            assert rule2["description"] == "Second deduplication rule"
+            assert rule2["fingerprint_fields"] == ["alert_id", "service"]
+            assert rule2["full_deduplication"] is False
+            assert rule2["ignore_fields"] == ["lastReceived"]
+
+
+def test_provider_yaml_with_empty_deduplication_rules(temp_providers_dir, caplog):
+    """Test provisioning a provider from YAML file with empty deduplication rules"""
+    yaml_content = """
+name: test-victoriametrics
+type: victoriametrics
+authentication:
+  VMAlertHost: http://localhost
+  VMAlertPort: 1234
+deduplication_rules: {}
+"""
+    # Create a YAML file
+    provider_file = os.path.join(temp_providers_dir, "test_provider.yaml")
+    with open(provider_file, "w") as f:
+        f.write(yaml_content)
+
+    # Mock provider
+    mock_provider = MagicMock(
+        type="victoriametrics",
+        id="test-provider-id",
+        details={
+            "name": "test-victoriametrics",
+            "authentication": {"VMAlertHost": "http://localhost", "VMAlertPort": 1234},
+        },
+        validatedScopes={},
+    )
+
+    # Mock environment variables and services
+    with patch.dict(os.environ, {"KEEP_PROVIDERS_DIRECTORY": temp_providers_dir}):
+        with (
+            patch(
+                "keep.providers.providers_service.ProvidersService.is_provider_installed",
+                return_value=False,
+            ),
+            patch(
+                "keep.providers.providers_service.ProvidersService.install_provider",
+                return_value=mock_provider,
+            ) as mock_install,
+            patch(
+                "keep.providers.providers_service.ProvidersService.provision_provider_deduplication_rules"
+            ) as mock_provision_provider_rules,
+            patch("keep.api.core.db.get_all_provisioned_providers", return_value=[]),
+        ):
+            # Call the provisioning function
+            ProvidersService.provision_providers("test-tenant")
+
+            # Verify provider installation was called
+            mock_install.assert_called_once()
+
+            # Verify deduplication rules provisioning was called with empty rules
+            mock_provision_provider_rules.assert_called_once()
+            call_args = mock_provision_provider_rules.call_args[1]
+            assert call_args["tenant_id"] == "test-tenant"
+            assert call_args["deduplication_rules"] == {}
+
+
+def test_provider_yaml_with_invalid_deduplication_rules(temp_providers_dir, caplog):
+    """Test provisioning a provider from YAML file with invalid deduplication rules"""
+    yaml_content = """
+name: test-victoriametrics
+type: victoriametrics
+authentication:
+  VMAlertHost: http://localhost
+  VMAlertPort: 1234
+deduplication_rules:
+  invalid_rule:
+    # Missing required fields
+    description: Invalid rule
+"""
+    # Create a YAML file
+    provider_file = os.path.join(temp_providers_dir, "test_provider.yaml")
+    with open(provider_file, "w") as f:
+        f.write(yaml_content)
+
+    # Mock provider
+    mock_provider = MagicMock(
+        type="victoriametrics",
+        id="test-provider-id",
+        details={
+            "name": "test-victoriametrics",
+            "authentication": {"VMAlertHost": "http://localhost", "VMAlertPort": 1234},
+        },
+        validatedScopes={},
+    )
+
+    # Mock environment variables and services
+    with patch.dict(os.environ, {"KEEP_PROVIDERS_DIRECTORY": temp_providers_dir}):
+        with (
+            patch(
+                "keep.providers.providers_service.ProvidersService.is_provider_installed",
+                return_value=False,
+            ),
+            patch(
+                "keep.providers.providers_service.ProvidersService.install_provider",
+                return_value=mock_provider,
+            ) as mock_install,
+            patch(
+                "keep.providers.providers_service.ProvidersService.provision_provider_deduplication_rules"
+            ) as mock_provision_provider_rules,
+            patch("keep.api.core.db.get_all_provisioned_providers", return_value=[]),
+            patch(
+                "sqlmodel.Session",
+                MagicMock(
+                    return_value=MagicMock(
+                        __enter__=MagicMock(return_value=MagicMock()),
+                        __exit__=MagicMock(),
+                    )
+                ),
+            ),
+        ):
+            # Call the provisioning function
+            ProvidersService.provision_providers("test-tenant")
+
+            # Verify provider installation was called
+            mock_install.assert_called_once()
+
+            # Verify deduplication rules provisioning was called
+            mock_provision_provider_rules.assert_called_once()
+            call_args = mock_provision_provider_rules.call_args[1]
+            assert call_args["tenant_id"] == "test-tenant"
+
+            # Even invalid rules should be passed through; validation happens in provision_deduplication_rules
+            assert len(call_args["deduplication_rules"]) == 1
+            rule = call_args["deduplication_rules"]["invalid_rule"]
+            assert rule["description"] == "Invalid rule"
+            assert "fingerprint_fields" not in rule
diff --git a/tests/test_provisioning.py b/tests/test_provisioning.py
index e987e29271..e173f97b2d 100644
--- a/tests/test_provisioning.py
+++ b/tests/test_provisioning.py
@@ -162,7 +162,7 @@ def test_reprovision_workflow(monkeypatch, db_session, client, test_app):
     [
         {
             "AUTH_TYPE": "NOAUTH",
-            "KEEP_PROVIDERS": '{"keepVictoriaMetrics":{"type":"victoriametrics","authentication":{"VMAlertHost":"http://localhost","VMAlertPort": 1234}},"keepClickhouse1":{"type":"clickhouse","authentication":{"host":"http://localhost","port":1234,"username":"keep","password":"keep","database":"keep-db"}}}',
+            "KEEP_PROVIDERS": '{"keepVictoriaMetrics1":{"type":"victoriametrics","authentication":{"VMAlertHost":"http://localhost","VMAlertPort": 1234}},"keepClickhouse1":{"type":"clickhouse","authentication":{"host":"http://localhost","port":1234,"username":"keep","password":"keep","database":"keep-db"}}}',
         },
     ],
     indirect=True,
@@ -183,7 +183,7 @@ def test_provision_provider(db_session, client, test_app):
     [
         {
             "AUTH_TYPE": "NOAUTH",
-            "KEEP_PROVIDERS": '{"keepVictoriaMetrics":{"type":"victoriametrics","authentication":{"VMAlertHost":"http://localhost","VMAlertPort": 1234}},"keepClickhouse1":{"type":"clickhouse","authentication":{"host":"http://localhost","port":1234,"username":"keep","password":"keep","database":"keep-db"}}}',
+            "KEEP_PROVIDERS": '{"keepVictoriaMetric2":{"type":"victoriametrics","authentication":{"VMAlertHost":"http://localhost","VMAlertPort": 1234}},"keepClickhouse1":{"type":"clickhouse","authentication":{"host":"http://localhost","port":1234,"username":"keep","password":"keep","database":"keep-db"}}}',
         },
     ],
     indirect=True,
@@ -191,7 +191,6 @@ def test_provision_provider(db_session, client, test_app):
 def test_reprovision_provider(monkeypatch, db_session, client, test_app):
     response = client.get("/providers", headers={"x-api-key": "someapikey"})
     assert response.status_code == 200
-    # 3 workflows and 3 provisioned workflows
     providers = response.json()
     provisioned_providers = [
         p for p in providers.get("installed_providers") if p.get("provisioned")
@@ -371,3 +370,438 @@ def test_provision_provider_with_empty_tenant_table(db_session, client, test_app
     )
 
     db_session.execute(text("PRAGMA foreign_keys = OFF;"))
+
+
+@pytest.mark.parametrize(
+    "test_app",
+    [{"AUTH_TYPE": "NOAUTH"}],
+    indirect=True,
+)
+def test_no_provisioned_providers_and_unset_env_vars(
+    monkeypatch, db_session, client, test_app
+):
+    """Test behavior when there are no provisioned providers and env vars are unset"""
+    # Import necessary modules
+    from unittest.mock import patch
+
+    from keep.providers.providers_service import ProvidersService
+
+    # Mock get_all_provisioned_providers to return an empty list
+    with (
+        patch(
+            "keep.providers.providers_service.get_all_provisioned_providers",
+            return_value=[],
+        ) as mock_get_providers,
+        patch(
+            "keep.providers.providers_service.ProvidersService.delete_provider"
+        ) as mock_delete_provider,
+    ):
+        # Call provision_providers without setting any env vars
+        ProvidersService.provision_providers("test-tenant")
+
+        # Verify get_all_provisioned_providers was called
+        mock_get_providers.assert_called_once_with("test-tenant")
+
+        # Verify delete_provider was not called since there were no providers to delete
+        mock_delete_provider.assert_not_called()
+
+
+@pytest.mark.parametrize(
+    "test_app",
+    [{"AUTH_TYPE": "NOAUTH"}],
+    indirect=True,
+)
+def test_delete_provisioned_providers_when_env_vars_unset(
+    monkeypatch, db_session, client, test_app
+):
+    """Test deleting provisioned providers when env vars are unset"""
+    # Import necessary modules
+    from unittest.mock import MagicMock, patch
+
+    from keep.providers.providers_service import ProvidersService
+
+    # Create a mock provider
+    mock_provider = MagicMock(id="test-id", name="test-provider", type="test-type")
+
+    # Mock get_all_provisioned_providers to return our mock provider
+    with (
+        patch(
+            "keep.providers.providers_service.get_all_provisioned_providers",
+            return_value=[mock_provider],
+        ) as mock_get_providers,
+        patch(
+            "keep.providers.providers_service.ProvidersService.delete_provider"
+        ) as mock_delete_provider,
+    ):
+        # Call provision_providers without setting any env vars
+        ProvidersService.provision_providers("test-tenant")
+
+        # Verify get_all_provisioned_providers was called
+        mock_get_providers.assert_called_once_with("test-tenant")
+
+        # Verify delete_provider was called with correct parameters
+        mock_delete_provider.assert_called_once_with(
+            tenant_id="test-tenant",
+            provider_id="test-id",
+            allow_provisioned=True,
+        )
+
+
+@pytest.mark.parametrize(
+    "test_app",
+    [
+        {
+            "AUTH_TYPE": "NOAUTH",
+            "KEEP_PROVIDERS": '{"existingProvider":{"type":"victoriametrics","authentication":{"VMAlertHost":"http://localhost","VMAlertPort": 1234}}}',
+        },
+    ],
+    indirect=True,
+)
+def test_replace_existing_provisioned_provider(
+    monkeypatch, db_session, client, test_app
+):
+    """Test that when a new provider is provisioned via KEEP_PROVIDERS without including
+    the current provisioned provider, it removes the current one and installs the new one
+    """
+
+    # First verify the initial provider is installed
+    response = client.get("/providers", headers={"x-api-key": "someapikey"})
+    assert response.status_code == 200
+    providers = response.json()
+    provisioned_providers = [
+        p for p in providers.get("installed_providers") if p.get("provisioned")
+    ]
+    assert len(provisioned_providers) == 1
+    # Provider name is in the details
+    provider_details = provisioned_providers[0].get("details", {})
+    assert provider_details.get("name") == "existingProvider"
+    assert provisioned_providers[0]["type"] == "victoriametrics"
+
+    # Change environment variable to new provider config that doesn't include the existing one
+    monkeypatch.setenv(
+        "KEEP_PROVIDERS",
+        '{"newProvider":{"type":"prometheus","authentication":{"url":"http://localhost:9090"}}}',
+    )
+
+    # Reload the app to apply the new environment changes
+    importlib.reload(sys.modules["keep.api.api"])
+    from keep.api.api import get_app
+
+    app = get_app()
+
+    # Manually trigger the startup event
+    for event_handler in app.router.on_startup:
+        asyncio.run(event_handler())
+
+    # Manually trigger resource provisioning
+    from keep.api.config import provision_resources
+
+    provision_resources()
+
+    client = TestClient(app)
+
+    # Verify that the old provider is gone and new provider is installed
+    response = client.get("/providers", headers={"x-api-key": "someapikey"})
+    assert response.status_code == 200
+    providers = response.json()
+    provisioned_providers = [
+        p for p in providers.get("installed_providers") if p.get("provisioned")
+    ]
+    assert len(provisioned_providers) == 1
+    provider_details = provisioned_providers[0].get("details", {})
+    assert provider_details.get("name") == "newProvider"
+    assert provisioned_providers[0]["type"] == "prometheus"
+
+
+@pytest.mark.parametrize(
+    "test_app",
+    [
+        {
+            "AUTH_TYPE": "NOAUTH",
+            "KEEP_PROVIDERS": '{"vm_provider":{"type":"victoriametrics","authentication":{"VMAlertHost":"http://localhost","VMAlertPort":1234},"deduplication_rules":{"rule1":{"description":"First rule","fingerprint_fields":["fingerprint","source"],"ignore_fields":["name"]}}}}',
+        },
+    ],
+    indirect=True,
+)
+def test_delete_deduplication_rules_when_reprovisioning(
+    monkeypatch, db_session, client, test_app
+):
+    """Test that deduplication rules are deleted when reprovisioning a provider without rules"""
+
+    # First verify initial provider and rule are installed
+    response = client.get("/deduplications", headers={"x-api-key": "someapikey"})
+    assert response.status_code == 200
+    rules = response.json()
+    assert len(rules) - 1 == 1  # Subtract 1 to exclude the default rule
+    assert rules[1]["name"] == "rule1"
+
+    # Update provider config without any deduplication rules
+    monkeypatch.setenv(
+        "KEEP_PROVIDERS",
+        '{"vm_provider":{"type":"victoriametrics","authentication":{"VMAlertHost":"http://localhost","VMAlertPort":1234}}}',
+    )
+
+    # Reload the app to apply the new environment changes
+    importlib.reload(sys.modules["keep.api.api"])
+    from keep.api.api import get_app
+
+    app = get_app()
+
+    # Manually trigger the startup event
+    for event_handler in app.router.on_startup:
+        asyncio.run(event_handler())
+
+    # Manually trigger resource provisioning
+    from keep.api.config import provision_resources
+
+    provision_resources()
+
+    client = TestClient(app)
+
+    # Verify the rule was deleted
+    response = client.get("/deduplications", headers={"x-api-key": "someapikey"})
+    assert response.status_code == 200
+    rules = response.json()
+    assert len(rules) == 0
+
+
+@pytest.mark.parametrize(
+    "test_app",
+    [
+        {
+            "AUTH_TYPE": "NOAUTH",
+            "KEEP_PROVIDERS": '{"vm_provider":{"type":"victoriametrics","authentication":{"VMAlertHost":"http://localhost","VMAlertPort":1234},"deduplication_rules":{"rule1":{"description":"First rule","fingerprint_fields":["fingerprint","source"]},"rule2":{"description":"Second rule","fingerprint_fields":["alert_id"]}}}}',
+        },
+    ],
+    indirect=True,
+)
+def test_provision_provider_with_multiple_deduplication_rules(
+    db_session, client, test_app
+):
+    """Test provisioning a provider with multiple deduplication rules"""
+
+    # Verify the provider and rules are installed
+    response = client.get("/deduplications", headers={"x-api-key": "someapikey"})
+    assert response.status_code == 200
+    rules = response.json()
+    assert len(rules) - 1 == 2  # Subtract 1 to exclude the default rule
+
+    rule1 = next(r for r in rules[1:] if r["name"] == "rule1")
+    assert rule1["description"] == "First rule"
+    assert rule1["fingerprint_fields"] == ["fingerprint", "source"]
+    assert rule1["is_provisioned"] is True
+
+    rule2 = next(r for r in rules if r["name"] == "rule2")
+    assert rule2["description"] == "Second rule"
+    assert rule2["fingerprint_fields"] == ["alert_id"]
+    assert rule2["is_provisioned"] is True
+
+    # Verify both rules are associated with the same provider
+    assert rule1["provider_type"] == "victoriametrics"
+    assert rule2["provider_type"] == "victoriametrics"
+
+
+@pytest.mark.parametrize(
+    "test_app",
+    [
+        {
+            "AUTH_TYPE": "NOAUTH",
+            "KEEP_PROVIDERS": '{"vm_provider":{"type":"victoriametrics","authentication":{"VMAlertHost":"http://localhost","VMAlertPort":1234},"deduplication_rules":{"rule1":{"description":"First rule","fingerprint_fields":["fingerprint","source"]},"rule2":{"description":"Second rule","fingerprint_fields":["alert_id"]}}}}',
+        },
+    ],
+    indirect=True,
+)
+def test_update_deduplication_rules_when_reprovisioning(
+    monkeypatch, db_session, client, test_app
+):
+    """Test that old deduplication rules are deleted and new ones are created when reprovisioning a provider with different rules"""
+
+    # First verify initial provider and both rules are installed
+    response = client.get("/deduplications", headers={"x-api-key": "someapikey"})
+    assert response.status_code == 200
+    rules = response.json()
+    assert len(rules) - 1 == 2  # Subtract 1 to exclude the default rule
+
+    rule_names = [r["name"] for r in rules]
+    assert "rule1" in rule_names
+    assert "rule2" in rule_names
+
+    # Update provider config: rule2 removed, rule1 updated, and new rule3 added
+    monkeypatch.setenv(
+        "KEEP_PROVIDERS",
+        '{"vm_provider":{"type":"victoriametrics","authentication":{"VMAlertHost":"http://localhost","VMAlertPort":1234},"deduplication_rules":{"rule1":{"description":"Updated first rule","fingerprint_fields":["fingerprint","source","severity"]},"rule3":{"description":"New rule","fingerprint_fields":["alert_id","group"]}}}}',
+    )
+
+    # Reload the app to apply the new environment changes
+    importlib.reload(sys.modules["keep.api.api"])
+    from keep.api.api import get_app
+
+    app = get_app()
+
+    # Manually trigger the startup event
+    for event_handler in app.router.on_startup:
+        asyncio.run(event_handler())
+
+    # Manually trigger resource provisioning
+    from keep.api.config import provision_resources
+
+    provision_resources()
+
+    client = TestClient(app)
+
+    # Verify the rules were updated correctly
+    response = client.get("/deduplications", headers={"x-api-key": "someapikey"})
+    assert response.status_code == 200
+    rules = response.json()
+
+    rule_names = [r["name"] for r in rules]
+    assert "rule1" in rule_names
+    assert "rule2" not in rule_names  # rule2 should be deleted
+    assert "rule3" in rule_names  # rule3 should be added
+
+    # Verify rule1 was updated
+    rule1 = next(r for r in rules if r["name"] == "rule1")
+    assert rule1["description"] == "Updated first rule"
+    assert rule1["fingerprint_fields"] == ["fingerprint", "source", "severity"]
+
+    # Verify rule3 was added
+    rule3 = next(r for r in rules if r["name"] == "rule3")
+    assert rule3["description"] == "New rule"
+    assert rule3["fingerprint_fields"] == ["alert_id", "group"]
+
+    # Verify both rules are associated with the same provider
+    assert rule1["provider_type"] == "victoriametrics"
+    assert rule3["provider_type"] == "victoriametrics"
+
+
+@pytest.mark.parametrize(
+    "test_app",
+    [
+        {
+            "AUTH_TYPE": "NOAUTH",
+            "KEEP_PROVIDERS": '{"vm_provider":{"type":"victoriametrics","authentication":{"VMAlertHost":"http://localhost","VMAlertPort":1234},"deduplication_rules":{"vm_rule1":{"description":"VM Rule","fingerprint_fields":["fingerprint"]}}}, "pagerduty_provider":{"type":"pagerduty","authentication":{"api_key":"somekey","routing_key":"routingkey123"},"deduplication_rules":{"pd_rule1":{"description":"PD Rule","fingerprint_fields":["id"]}}}}',
+        },
+    ],
+    indirect=True,
+)
+def test_multiple_providers_with_deduplication_rules(
+    monkeypatch, db_session, client, test_app
+):
+    """Test that deduplication rules for different providers don't interfere with each other"""
+
+    # First verify both providers and their rules are installed
+    response = client.get("/deduplications", headers={"x-api-key": "someapikey"})
+    assert response.status_code == 200
+    rules = response.json()
+
+    rule_names = [r["name"] for r in rules]
+    assert "vm_rule1" in rule_names
+    assert "pd_rule1" in rule_names
+
+    # Update only the vm_provider, removing its rule and adding a new one
+    monkeypatch.setenv(
+        "KEEP_PROVIDERS",
+        '{"vm_provider":{"type":"victoriametrics","authentication":{"VMAlertHost":"http://localhost","VMAlertPort":1234},"deduplication_rules":{"vm_rule2":{"description":"New VM Rule","fingerprint_fields":["name"]}}}, "pagerduty_provider":{"type":"pagerduty","authentication":{"api_key":"somekey"},"deduplication_rules":{"pd_rule1":{"description":"PD Rule","fingerprint_fields":["id"]}}}}',
+    )
+
+    # Reload the app to apply the new environment changes
+    importlib.reload(sys.modules["keep.api.api"])
+    from keep.api.api import get_app
+
+    app = get_app()
+
+    # Manually trigger the startup event
+    for event_handler in app.router.on_startup:
+        asyncio.run(event_handler())
+
+    # Manually trigger resource provisioning
+    from keep.api.config import provision_resources
+
+    provision_resources()
+
+    client = TestClient(app)
+
+    # Verify the rules were updated correctly
+    response = client.get("/deduplications", headers={"x-api-key": "someapikey"})
+    assert response.status_code == 200
+    rules = response.json()
+
+    rule_names = [r["name"] for r in rules]
+    assert "vm_rule1" not in rule_names  # vm_rule1 should be deleted
+    assert "vm_rule2" in rule_names  # vm_rule2 should be added
+    assert "pd_rule1" in rule_names  # pd_rule1 should be kept
+
+    # Verify vm_rule2 was added correctly
+    vm_rule2 = next(r for r in rules if r["name"] == "vm_rule2")
+    assert vm_rule2["description"] == "New VM Rule"
+    assert vm_rule2["fingerprint_fields"] == ["name"]
+    assert vm_rule2["provider_type"] == "victoriametrics"
+
+    # Verify pd_rule1 was kept unchanged
+    pd_rule1 = next(r for r in rules if r["name"] == "pd_rule1")
+    assert pd_rule1["description"] == "PD Rule"
+    assert pd_rule1["fingerprint_fields"] == ["id"]
+    assert pd_rule1["provider_type"] == "pagerduty"
+
+
+@pytest.mark.parametrize(
+    "test_app",
+    [
+        {
+            "AUTH_TYPE": "NOAUTH",
+            "KEEP_PROVIDERS": '{"vm_provider":{"type":"victoriametrics","authentication":{"VMAlertHost":"http://localhost","VMAlertPort":1234},"deduplication_rules":{"vm_rule1":{"description":"VM Rule","fingerprint_fields":["fingerprint"]}}}, "pagerduty_provider":{"type":"pagerduty","authentication":{"api_key":"somekey","routing_key":"routingkey123"},"deduplication_rules":{"pd_rule1":{"description":"PD Rule","fingerprint_fields":["id"]}}}}',
+        },
+    ],
+    indirect=True,
+)
+def test_deleting_provider_removes_deduplication_rules(
+    monkeypatch, db_session, client, test_app
+):
+    """Test that when a provider is deleted, its associated deduplication rules are deleted as well"""
+
+    # First verify both providers and their rules are installed
+    response = client.get("/deduplications", headers={"x-api-key": "someapikey"})
+    assert response.status_code == 200
+    rules = response.json()
+
+    rule_names = [r["name"] for r in rules]
+    assert "vm_rule1" in rule_names
+    assert "pd_rule1" in rule_names
+
+    # Remove the pagerduty_provider completely
+    monkeypatch.setenv(
+        "KEEP_PROVIDERS",
+        '{"vm_provider":{"type":"victoriametrics","authentication":{"VMAlertHost":"http://localhost","VMAlertPort":1234},"deduplication_rules":{"vm_rule1":{"description":"VM Rule","fingerprint_fields":["fingerprint"]}}}}',
+    )
+
+    # Reload the app to apply the new environment changes
+    importlib.reload(sys.modules["keep.api.api"])
+    from keep.api.api import get_app
+
+    app = get_app()
+
+    # Manually trigger the startup event
+    for event_handler in app.router.on_startup:
+        asyncio.run(event_handler())
+
+    # Manually trigger resource provisioning
+    from keep.api.config import provision_resources
+
+    provision_resources()
+
+    client = TestClient(app)
+
+    # Verify the rules were updated correctly
+    response = client.get("/deduplications", headers={"x-api-key": "someapikey"})
+    assert response.status_code == 200
+    rules = response.json()
+
+    rule_names = [r["name"] for r in rules]
+    assert "vm_rule1" in rule_names  # vm_rule1 should still exist
+    assert "pd_rule1" not in rule_names  # pd_rule1 should be deleted
+
+    # Verify vm_rule1 is unchanged
+    vm_rule1 = next(r for r in rules if r["name"] == "vm_rule1")
+    assert vm_rule1["description"] == "VM Rule"
+    assert vm_rule1["fingerprint_fields"] == ["fingerprint"]
+    assert vm_rule1["provider_type"] == "victoriametrics"