Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support root nameserver and additional record types #40

Merged
merged 8 commits into from
Oct 14, 2024
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## UNRELEASED

#### Nothworthy Changes

* Added support for more record types: ALIAS, DS, NAPTR, TLSA, TLSA
* Added support for root nameservers

## v0.0.1 - 2022-01-14 - Moving

#### Nothworthy Changes
Expand Down
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,15 @@ providers:

#### Records

TransipProvider A, AAAA, CAA, CNAME, MX, NS, SRV, SPF, SSHFP, and TXT
TransipProvider A, AAAA, ALIAS, CAA, CNAME, DS, MX, NAPTR, NS, SPF, SRV, SSHFP, TLSA, TXT

#### Root NS records

TransipProvider support root NS record management.
**notes:**
- Transip currently only supports FQDN values for root nameservers.
- Transip has no TTL for root nameservers, so the TTL value from the source is ignored


#### Dynamic

Expand Down
266 changes: 248 additions & 18 deletions octodns_transip/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
from collections import defaultdict, namedtuple
from logging import getLogger

from requests.utils import is_ipv4_address
from transip import TransIP
from transip.exceptions import TransIPHTTPError
from transip.v6.objects import DnsEntry
from transip.v6.objects import DnsEntry, Nameserver
from urllib3.util.ssl_ import is_ipaddress

from octodns.provider import ProviderException
from octodns.provider import ProviderException, SupportsException
from octodns.provider.base import BaseProvider
from octodns.record import Record

Expand All @@ -31,17 +33,53 @@ class TransipNewZoneException(TransipException):
pass


class TransipRetrieveRecordsException(ProviderException):
pass


class TransipRetrieveNameserverException(ProviderException):
pass


class TransipSaveRecordsException(ProviderException):
pass


class TransipSaveNameserverException(ProviderException):
pass


class TransipProvider(BaseProvider):
SUPPORTS_GEO = False
SUPPORTS_DYNAMIC = False
SUPPORTS_ROOT_NS = True
SUPPORTS = set(
('A', 'AAAA', 'CNAME', 'MX', 'NS', 'SRV', 'SPF', 'TXT', 'SSHFP', 'CAA')
(
'A',
'AAAA',
'CNAME',
'MX',
'NS',
'SRV',
'SPF',
'TXT',
'SSHFP',
'CAA',
'TLSA',
'NAPTR',
'ALIAS',
'DS',
)
)
# unsupported by OctoDNS: 'TLSA'
MIN_TTL = 120
TIMEOUT = 15
ROOT_RECORD = '@'

# TransIP root nameservers don't have TTL configurable.
# This value is enforced on root NS records to prevent TTL-only changes
# See root NS handling in _process_desired_zone for more information
ROOT_NS_TTL = 3600

def __init__(self, id, account, key=None, key_file=None, *args, **kwargs):
self.log = getLogger('TransipProvider[{}]'.format(id))
self.log.debug('__init__: id=%s, account=%s, token=***', id, account)
Expand All @@ -68,10 +106,9 @@ def populate(self, zone, target=False, lenient=False):
)

before = len(zone.records)

try:
domain = self._client.domains.get(zone.name.strip('.'))
records = domain.dns.list()

except TransIPHTTPError as e:
if e.response_code == 404 and target is False:
# Zone not found in account, and not a target so just
Expand All @@ -93,9 +130,61 @@ def populate(self, zone, target=False, lenient=False):
)
)

self.log.debug(
'populate: found %s records for zone %s', len(records), zone.name
)
# Retrieve dns records from transip api
try:
records = domain.dns.list()
self.log.debug(
'populate: found %s records for zone %s',
len(records),
zone.name,
)
except TransIPHTTPError as e:
self.log.error('populate: (%s) %s ', e.response_code, e.message)
raise TransipRetrieveRecordsException(
(
'populate: ({}) failed to get ' + 'dns records for zone: {}'
).format(e.response_code, zone.name)
)

# Retrieve nameservers from transip api
try:
nameservers = domain.nameservers.list()
self.log.debug(
'populate: found %s root nameservers for zone %s',
len(nameservers),
zone.name,
)
except TransIPHTTPError as e:
self.log.error('populate: (%s) %s ', e.response_code, e.message)
raise TransipRetrieveNameserverException(
(
'populate: ({}) failed to get '
+ 'root nameservers for zone: {}'
).format(e.response_code, zone.name)
)

# If nameservers are found, add them as ROOT NS records
if nameservers:
values = []
for ns in nameservers:
if ns.hostname != '':
values.append(ns.hostname + '.')
if ns.ipv4 != '':
values.append(ns.ipv4 + '.')
if ns.ipv6 != '':
values.append(ns.ipv6 + '.')

record = Record.new(
zone,
'',
{'type': 'NS', 'ttl': self.ROOT_NS_TTL, 'values': values},
source=self,
lenient=lenient,
)
zone.add_record(record, lenient=lenient)
zone.root_ns

# If records are found, add them to the zone
if records:
values = defaultdict(lambda: defaultdict(list))
for record in records:
Expand All @@ -116,12 +205,41 @@ def populate(self, zone, target=False, lenient=False):
lenient=lenient,
)
zone.add_record(record, lenient=lenient)

self.log.info(
'populate: found %s records', len(zone.records) - before
)

return True

def _process_desired_zone(self, desired):

for record in desired.records:
if record._type == 'NS' and record.name == '':

# Check values for FQDN, IP's are not supported
values = record.values

for value in values:
if is_ipaddress(value.strip(".")):
msg = f'ip address not supported for root NS value for {record.fqdn}'
raise SupportsException(f'{self.id}: {msg}')

# TransIP root nameservers don't have TTL configurable.
# Check if TTL differs and enforce our fixed value if needed.
if record.ttl != self.ROOT_NS_TTL:
updated_record = record.copy()
updated_record.ttl = self.ROOT_NS_TTL
msg = f'TTL value not supported for root NS values for {record.fqdn}'
fallback = f'modified to fixed value ({self.ROOT_NS_TTL})'
# Not using self.supports_warn_or_except(msg, fallback)
# because strict_mode shouldn't be disabled just for an ignored value
# so always return a warning even in strict_mode
self.log.warning('%s; %s', msg, fallback)
desired.add_record(updated_record, replace=True)

return super()._process_desired_zone(desired)

def _apply(self, plan):
desired = plan.desired
changes = plan.changes
Expand All @@ -135,11 +253,44 @@ def _apply(self, plan):
'Unhandled error: ({}) {}'.format(e.response_code, e.message)
)

for change in changes:
record = change.data['new']
ross marked this conversation as resolved.
Show resolved Hide resolved

if change.data['name'] == '' and change.data['record_type'] == 'NS':
values = (
record.get('values')
if record.get('values')
else [record.get('value')]
)
ross marked this conversation as resolved.
Show resolved Hide resolved

nameservers = []
for value in values:
nameservers.append(
Nameserver(
domain.nameservers, _attr_for_nameserver(value)
)
)
try:
domain.nameservers.replace(nameservers)
except TransIPHTTPError as e:
self.log.warning(
'_apply: Set Nameservers returned one or more errors: {}'.format(
e
)
)
raise TransipSaveNameserverException(
'Unhandled error: ({}) {}'.format(
e.response_code, e.message
)
)

records = []
for record in plan.desired.records:
if record._type in self.SUPPORTS:
# Root records have '@' as name
name = record.name
if name == '' and record._type == 'NS':
continue
if name == '':
name = self.ROOT_RECORD

Expand All @@ -154,13 +305,13 @@ def _apply(self, plan):
self.log.warning(
'_apply: Set DNS returned one or more errors: {}'.format(e)
)
raise TransipException(
raise TransipSaveRecordsException(
'Unhandled error: ({}) {}'.format(e.response_code, e.message)
)


def _data_for(type_, records, current_zone):
if type_ == 'CNAME':
if type_ == 'CNAME' or type_ == 'ALIAS':
return {
'type': type_,
'ttl': records[0].expire,
Expand Down Expand Up @@ -198,12 +349,51 @@ def format_caa(record):
def format_txt(record):
return record.content.replace(';', '\\;')

def format_tlsa(record):
(
certificate_usage,
selector,
matching_type,
certificate_association_data,
) = record.content.split(' ', 4)
return {
'certificate_usage': certificate_usage,
'selector': selector,
'matching_type': matching_type,
'certificate_association_data': certificate_association_data,
}

def format_naptr(record):
order, preference, flags, service, regexp, replacement = (
record.content.split(' ', 6)
)
return {
'order': order,
'preference': preference,
'flags': flags,
'service': service,
'regexp': regexp,
'replacement': replacement,
}

def format_ds(record):
key_tag, algorithm, digest_type, digest = record.content.split(' ', 4)
return {
'key_tag': key_tag,
'algorithm': algorithm,
'digest_type': digest_type,
'digest': digest,
}

value_formatter = {
'MX': format_mx,
'SRV': format_srv,
'SSHFP': format_sshfp,
'CAA': format_caa,
'TXT': format_txt,
'TLSA': format_tlsa,
'NAPTR': format_naptr,
'DS': format_ds,
}.get(type_, lambda r: r.content)

return {
Expand Down Expand Up @@ -231,16 +421,56 @@ def _get_lowest_ttl(records):

def _entries_for(name, record):
values = record.values if hasattr(record, 'values') else [record.value]

def entry_mx(v):
return f'{v.preference} {v.exchange}'

def entry_srv(v):
return f'{v.priority} {v.weight} {v.port} {v.target}'

def entry_sshfp(v):
return f'{v.algorithm} {v.fingerprint_type} {v.fingerprint}'

def entry_caa(v):
return f'{v.flags} {v.tag} {v.value}'

def entry_txt(v):
return v.replace('\\;', ';')

def entry_tlsa(v):
return f'{v.certificate_usage} {v.selector} {v.matching_type} {v.certificate_association_data}'

def entry_naptr(v):
return f'{v.order} {v.preference} {v.flags} {v.service} {v.regexp} {v.replacement}'

def entry_ds(v):
return f'{v.key_tag} {v.algorithm} {v.digest_type} {v.digest}'

formatter = {
'MX': lambda v: f'{v.preference} {v.exchange}',
'SRV': lambda v: f'{v.priority} {v.weight} {v.port} {v.target}',
'SSHFP': lambda v: (
f'{v.algorithm} {v.fingerprint_type} {v.fingerprint}'
),
'CAA': lambda v: f'{v.flags} {v.tag} {v.value}',
'TXT': lambda v: v.replace('\\;', ';'),
'MX': entry_mx,
'SRV': entry_srv,
'SSHFP': entry_sshfp,
'CAA': entry_caa,
'TXT': entry_txt,
'TLSA': entry_tlsa,
'NAPTR': entry_naptr,
'DS': entry_ds,
}.get(record._type, lambda r: r)

return [
DNSEntry(name, record.ttl, record._type, formatter(value))
for value in values
]


def _attr_for_nameserver(nameserver):
nameserver = nameserver.strip('.')
return {
'hostname': nameserver if not is_ipaddress(nameserver) else '',
'ipv4': nameserver if is_ipv4_address(nameserver) else '',
'ipv6': (
nameserver
if is_ipaddress(nameserver) and not is_ipv4_address(nameserver)
else ''
),
}
Loading
Loading