diff --git a/blt/middleware/ip_restrict.py b/blt/middleware/ip_restrict.py new file mode 100644 index 000000000..c256d3557 --- /dev/null +++ b/blt/middleware/ip_restrict.py @@ -0,0 +1,137 @@ +import ipaddress + +from django.core.cache import cache +from django.http import HttpResponseForbidden +from user_agents import parse + +from website.models import IP, Blocked + + +class IPRestrictMiddleware: + """ + Middleware to restrict access based on client IP addresses and user agents. + """ + + def __init__(self, get_response): + self.get_response = get_response + + def blocked_ips(self): + """ + Retrieve blocked IP addresses from cache or database. + """ + blocked_ips = cache.get("blocked_ips") + if blocked_ips is None: + blocked_addresses = Blocked.objects.values_list("address", flat=True) + blocked_ips = set(filter(None, blocked_addresses)) + cache.set("blocked_ips", blocked_ips, timeout=86400) + return blocked_ips + + def ip_in_ips(self, ip, blocked_ips): + if blocked_ips is None: + return False + return ip in blocked_ips + + def blocked_ip_network(self): + """ + Retrieve blocked IP networks from cache or database. + """ + blocked_ip_network = cache.get("blocked_ip_network") + if blocked_ip_network is None: + blocked_network = Blocked.objects.values_list("ip_network", flat=True) + blocked_ip_network = [ + ipaddress.ip_network(range_str, strict=False) + for range_str in filter(None, blocked_network) + ] + cache.set("blocked_ip_network", blocked_ip_network, timeout=86400) + return blocked_ip_network or [] + + def ip_in_range(self, ip, blocked_ip_network): + """ + Check if the IP address is within any of the blocked IP networks. + """ + if not blocked_ip_network: + return False + ip_obj = ipaddress.ip_address(ip) + return any(ip_obj in ip_range for ip_range in blocked_ip_network if ip_range) + + def blocked_agents(self): + """ + Retrieve blocked user agents from cache or database. + """ + blocked_agents = cache.get("blocked_agents") + if blocked_agents is None or blocked_agents == []: + blocked_user_agents = Blocked.objects.values_list("user_agent_string", flat=True) + if blocked_user_agents: + blocked_agents = set(blocked_user_agents) + cache.set("blocked_agents", blocked_agents, timeout=86400) + return blocked_agents + else: + return None + return blocked_agents + + def is_user_agent_blocked(self, user_agent, blocked_agents): + """ + Check if the user agent is in the list of blocked user agents. + """ + user_agent_str = str(user_agent).strip() + + if not blocked_agents: + return False + blocked_agents = [str(agent).strip() for agent in blocked_agents if str(agent).strip()] + + for blocked_agent in blocked_agents: + blocked_agent_str = str(blocked_agent).strip() + if blocked_agent_str.lower() in user_agent_str.lower(): + return True + + return False + + def delete_all_info(self): + Blocked.objects.all().delete() + cache.delete("blocked_ips") + cache.delete("blocked_ip_network") + cache.delete("blocked_agents") + + def __call__(self, request): + """ + Process the request and restrict access based on IP address and user agent. + """ + ip = request.META.get("REMOTE_ADDR") + agent = request.META.get("HTTP_USER_AGENT", "") + user_agent = parse(agent) + # If you want to clear everything use this + # self.delete_all_info() + + if ( + self.ip_in_ips(ip, self.blocked_ips()) + or self.ip_in_range(ip, self.blocked_ip_network()) + or self.is_user_agent_blocked(user_agent, self.blocked_agents()) + ): + if self.ip_in_ips(ip, self.blocked_ips()) or self.ip_in_range( + ip, self.blocked_ip_network() + ): + return HttpResponseForbidden( + "Your IP address is restricted from accessing this site." + ) + if self.is_user_agent_blocked(user_agent, self.blocked_agents()): + return HttpResponseForbidden( + "Your user agent is restricted from accessing this site." + ) + + x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR") + if x_forwarded_for: + ip = x_forwarded_for.split(",")[0].strip() + else: + ip = request.META.get("REMOTE_ADDR") + + if ip: + ip_record, created = IP.objects.get_or_create( + address=ip, defaults={"agent": parse(agent), "count": 1, "path": request.path} + ) + if not created: + ip_record.agent = parse(agent) + ip_record.count += 1 + ip_record.path = request.path + ip_record.save() + + return self.get_response(request) diff --git a/blt/settings.py b/blt/settings.py index 3a5917db7..fb8a382cc 100644 --- a/blt/settings.py +++ b/blt/settings.py @@ -118,6 +118,7 @@ "django.middleware.security.SecurityMiddleware", "whitenoise.middleware.WhiteNoiseMiddleware", "tz_detect.middleware.TimezoneMiddleware", + "blt.middleware.ip_restrict.IPRestrictMiddleware", ) TESTING = len(sys.argv) > 1 and sys.argv[1] == "test" @@ -360,7 +361,7 @@ }, }, } - +DEFAULT_FILE_STORAGE = "django.core.files.storage.FileSystemStorage" USERS_AVATAR_PATH = "avatars" AVATAR_PATH = os.path.join(MEDIA_ROOT, USERS_AVATAR_PATH) diff --git a/website/admin.py b/website/admin.py index 9dbd3354e..0eb98807f 100644 --- a/website/admin.py +++ b/website/admin.py @@ -1,12 +1,14 @@ from django.contrib import admin from django.contrib.auth.models import User from django.template.defaultfilters import truncatechars +from django.utils import timezone from import_export import resources from import_export.admin import ImportExportModelAdmin from website.models import ( IP, Bid, + Blocked, ChatBotLog, Company, CompanyAdmin, @@ -281,8 +283,60 @@ def issue_description(self, obj): return obj.issue.description +def block_ip(modeladmin, request, queryset): + for ip in queryset: + Blocked.objects.create(address=ip.address, count=ip.count, created=timezone.now()) + + modeladmin.message_user(request, "Selected IPs have been blocked successfully.") + + +block_ip.short_description = "Block selected IPs" + + +def unblock_ip(modeladmin, request, queryset): + for ip in queryset: + Blocked.objects.filter(ip=ip.address).delete() + modeladmin.message_user(request, "Selected IPs have ben unblocked successfully") + + +unblock_ip.short_description = "Unblock selected IPs" + + +def block_user_agent(modeladmin, request, queryset): + for ip in queryset: + Blocked.objects.create(user_agent_string=ip.agent, count=ip.count, created=timezone.now()) + + modeladmin.message_user(request, "Selected UserAgent have been blocked successfully.") + + +block_user_agent.short_description = "Block selected UserAgent" + + +def unblock_user_agent(modeladmin, request, queryset): + for ip in queryset: + Blocked.objects.filter(user_agent_string=ip.agent).delete() + + modeladmin.message_user(request, "Selected UserAgent have been unblocked successfully.") + + +unblock_user_agent.short_description = "Unblock selected UserAgent" + + class IPAdmin(admin.ModelAdmin): - list_display = ("id", "address", "user", "issuenumber", "created", "agent", "path") + list_display = ( + "id", + "address", + "user", + "issuenumber", + "count", + "created", + "agent", + "path", + "method", + "referer", + ) + + actions = [block_ip, unblock_ip, block_user_agent, unblock_user_agent] class MonitorAdmin(admin.ModelAdmin): @@ -310,6 +364,17 @@ class SuggestionVotesAdmin(admin.ModelAdmin): list_display = ("user", "suggestion", "up_vote", "down_vote") +class BlockedAdmin(admin.ModelAdmin): + list_display = ( + "address", + "reason_for_block", + "ip_network", + "user_agent_string", + "count", + "created", + ) + + class ProjectAdmin(admin.ModelAdmin): list_display = ( "id", @@ -320,7 +385,6 @@ class ProjectAdmin(admin.ModelAdmin): "created", "modified", ) - search_fields = ["name", "description", "slug"] @@ -348,6 +412,7 @@ class TagAdmin(admin.ModelAdmin): admin.site.register(IssueScreenshot, IssueScreenshotAdmin) admin.site.register(HuntPrize) admin.site.register(ChatBotLog, ChatBotLogAdmin) +admin.site.register(Blocked, BlockedAdmin) admin.site.register(Suggestion, SuggestionAdmin) admin.site.register(SuggestionVotes, SuggestionVotesAdmin) diff --git a/website/migrations/0129_blocked.py b/website/migrations/0129_blocked.py new file mode 100644 index 000000000..1bcf36a7c --- /dev/null +++ b/website/migrations/0129_blocked.py @@ -0,0 +1,38 @@ +# Generated by Django 5.1 on 2024-08-13 08:55 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("website", "0128_userprofile_discounted_hourly_rate_and_more"), + ] + + operations = [ + migrations.CreateModel( + name="Blocked", + fields=[ + ( + "id", + models.AutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("address", models.GenericIPAddressField(blank=True, null=True)), + ( + "reason_for_block", + models.TextField(blank=True, max_length=255, null=True), + ), + ("ip_network", models.GenericIPAddressField(blank=True, null=True)), + ( + "user_agent_string", + models.CharField(blank=True, default="", max_length=255, null=True), + ), + ("count", models.IntegerField(default=1)), + ("created", models.DateField(blank=True, null=True)), + ], + ), + ] diff --git a/website/models.py b/website/models.py index 1dc35e92d..cfeb8db02 100644 --- a/website/models.py +++ b/website/models.py @@ -10,6 +10,7 @@ from colorthief import ColorThief from django.conf import settings from django.contrib.auth.models import User +from django.core.cache import cache from django.core.exceptions import MultipleObjectsReturned, ValidationError from django.core.files.base import ContentFile from django.core.files.storage import default_storage @@ -805,6 +806,35 @@ def __str__(self): return f"{self.user.username} - {self.amount} BACON" +class Blocked(models.Model): + address = models.GenericIPAddressField(null=True, blank=True) + reason_for_block = models.TextField(blank=True, null=True, max_length=255) + ip_network = models.GenericIPAddressField(null=True, blank=True) + user_agent_string = models.CharField(max_length=255, default="", null=True, blank=True) + count = models.IntegerField(default=1) + created = models.DateField(null=True, blank=True) + + def __str__(self): + return f"user agent : {self.user_agent_string} | IP : {self.address}" + + +@receiver(post_save, sender=Blocked) +@receiver(post_delete, sender=Blocked) +def clear_blocked_cache(sender, instance=None, **kwargs): + """ + Clears the cache when a Blocked instance is created, updated, or deleted. + """ + cache.delete("blocked_ips") + cache.delete("blocked_ip_network") + cache.delete("blocked_agents") + blocked_ips = Blocked.objects.values_list("address", flat=True) + blocked_ip_network = Blocked.objects.values_list("ip_network", flat=True) + blocked_agents = Blocked.objects.values_list("user_agent_string", flat=True) + cache.set("blocked_ips", blocked_ips, timeout=86400) + cache.set("blocked_ip_network", blocked_ip_network, timeout=86400) + cache.set("blocked_agents", blocked_agents, timeout=86400) + + class TimeLog(models.Model): user = models.ForeignKey( settings.AUTH_USER_MODEL, on_delete=models.CASCADE, related_name="timelogs"