Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddns - validate configured zone in _authenticate if possible #812

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions lexicon/providers/ddns.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
# dnspython is an optional dependency of lexicon; do not throw an ImportError if
# the dependency is unmet.
try:
import dns.resolver
import dns.rdatatype
import dns.rdataclass
import dns.message
import dns.query
import dns.tsigkeyring
Expand Down Expand Up @@ -42,16 +45,36 @@ def __init__(self, config):
alg, keyid, secret = self._get_provider_option("auth_token").split(":")
self.keyring = dns.tsigkeyring.from_text({keyid: (alg, secret)})
self.endpoint = self._get_provider_option("ddns_server")
self.zone = self._get_provider_option("domain")
self.domain = self._get_provider_option("domain")
self.cached_zone_content = {}

@property
def zone(self):
return self.domain

def _run_query(self, message):
return dns.query.tcp(message, self.endpoint, timeout=10)

def _authenticate(self):
if not self.endpoint:
raise AuthenticationError("No DDNS server provided, use --ddns-server")
pass

record = dns.resolver.resolve(self.domain, rdtype="SOA", raise_on_no_answer=False)
if not record.answer.authority:
# response has no authority, we can not validate
return

for authority in record.answer.authority:
if authority.rdclass == dns.rdataclass.IN and authority.rdtype == dns.rdatatype.SOA:
break
else:
# no soa found - we can't validate
return

zone = authority.name.to_text(omit_final_dot=True).lower()
if zone != self.zone.lower():
raise AuthenticationError(
"The dns zone configured ({}) for the domain {} mismatches the dns zone retrieved via SOA ({})".format(self.zone, self.domain, zone))

# Create record. If record already exists with the same content, do nothing
def _create_record(self, rtype, name, content):
Expand Down