This repository has been archived by the owner on May 24, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 19
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #240 from peteeckel/feature/arpa-network
Extend the database model with a CIDR field for .arpa zones
- Loading branch information
Showing
9 changed files
with
249 additions
and
41 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
from .network import * |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
from django.db import models | ||
from django.db.models import Lookup | ||
|
||
from netaddr import AddrFormatError, IPNetwork | ||
|
||
|
||
class NetContains(Lookup): | ||
lookup_name = "net_contains" | ||
|
||
def as_sql(self, qn, connection): | ||
lhs, lhs_params = self.process_lhs(qn, connection) | ||
rhs, rhs_params = self.process_rhs(qn, connection) | ||
params = lhs_params + rhs_params | ||
return "%s >> %s" % (lhs, rhs), params | ||
|
||
|
||
class NetContainsOrEquals(Lookup): | ||
lookup_name = "net_contains_or_equals" | ||
|
||
def as_sql(self, qn, connection): | ||
lhs, lhs_params = self.process_lhs(qn, connection) | ||
rhs, rhs_params = self.process_rhs(qn, connection) | ||
params = lhs_params + rhs_params | ||
return "%s >>= %s" % (lhs, rhs), params | ||
|
||
|
||
class NetContained(Lookup): | ||
lookup_name = "net_contained" | ||
|
||
def as_sql(self, qn, connection): | ||
lhs, lhs_params = self.process_lhs(qn, connection) | ||
rhs, rhs_params = self.process_rhs(qn, connection) | ||
params = lhs_params + rhs_params | ||
return "%s << %s" % (lhs, rhs), params | ||
|
||
|
||
class NetContainedOrEqual(Lookup): | ||
lookup_name = "net_contained_or_equal" | ||
|
||
def as_sql(self, qn, connection): | ||
lhs, lhs_params = self.process_lhs(qn, connection) | ||
rhs, rhs_params = self.process_rhs(qn, connection) | ||
params = lhs_params + rhs_params | ||
return "%s <<= %s" % (lhs, rhs), params | ||
|
||
|
||
class NetworkFormField(models.Field): | ||
def to_python(self, value): | ||
if not value: | ||
return None | ||
|
||
if isinstance(value, IPNetwork): | ||
return value | ||
|
||
try: | ||
ip_network = IPNetwork(value) | ||
except AddrFormatError as exc: | ||
raise ValidationError(exc) | ||
|
||
return ip_network | ||
|
||
|
||
class NetworkField(models.Field): | ||
description = "IPv4/v6 network associated with a reverse lookup zone" | ||
|
||
def python_type(self): | ||
return IPNetwork | ||
|
||
def from_db_value(self, value, expression, connection): | ||
return self.to_python(value) | ||
|
||
def to_python(self, value): | ||
if not value: | ||
return value | ||
|
||
try: | ||
ip_network = IPNetwork(value) | ||
except (AddressFormatError, TypeError, ValueError) as exc: | ||
raise ValidationError(exc) | ||
|
||
return ip_network | ||
|
||
def get_prep_value(self, value): | ||
if not value: | ||
return None | ||
|
||
if isinstance(value, list): | ||
return [str(self.to_python(v)) for v in value] | ||
|
||
return str(self.to_python(value)) | ||
|
||
def form_class(self): | ||
return NetworkFormField | ||
|
||
def formfield(self, **kwargs): | ||
defaults = {"form_class": self.form_class()} | ||
defaults.update(kwargs) | ||
|
||
return super().formfield(**defaults) | ||
|
||
def db_type(self, connection): | ||
return "cidr" | ||
|
||
|
||
NetworkField.register_lookup(NetContains) | ||
NetworkField.register_lookup(NetContained) | ||
NetworkField.register_lookup(NetContainsOrEquals) | ||
NetworkField.register_lookup(NetContainedOrEqual) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
# Generated by Django 4.0.8 on 2022-10-26 18:55 | ||
|
||
from netaddr import IPNetwork, AddrFormatError | ||
|
||
from django.db import migrations | ||
import netbox_dns.fields.network | ||
|
||
|
||
def update_zone_arpa_network(apps, schema_editor): | ||
Zone = apps.get_model("netbox_dns", "Zone") | ||
|
||
for zone in Zone.objects.filter(name__endswith=".arpa"): | ||
name = zone.name | ||
|
||
if name.endswith(".in-addr.arpa"): | ||
address = ".".join(reversed(name.replace(".in-addr.arpa", "").split("."))) | ||
mask = len(address.split(".")) * 8 | ||
|
||
try: | ||
zone.arpa_network = IPNetwork(f"{address}/{mask}") | ||
except AddrFormatError: | ||
uone.arpa_network = None | ||
|
||
elif name.endswith("ip6.arpa"): | ||
address = "".join(reversed(name.replace(".ip6.arpa", "").split("."))) | ||
mask = len(address) | ||
address = address + "0" * (32 - mask) | ||
|
||
try: | ||
zone.arpa_network = IPNetwork( | ||
f"{':'.join([(address[i:i+4]) for i in range(0, mask, 4)])}::/{mask*4}" | ||
) | ||
except AddrFormatError: | ||
zone.arpa_network = None | ||
|
||
zone.save() | ||
|
||
|
||
class Migration(migrations.Migration): | ||
|
||
dependencies = [ | ||
("netbox_dns", "0017_alter_record_ttl"), | ||
] | ||
|
||
operations = [ | ||
migrations.AddField( | ||
model_name="zone", | ||
name="arpa_network", | ||
field=netbox_dns.fields.network.NetworkField(blank=True, null=True), | ||
), | ||
migrations.RunPython(update_zone_arpa_network), | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters