-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmonitor.py
184 lines (146 loc) · 6.31 KB
/
monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
#!/usr/bin/env python3
import docker
import time
import os
import logging
import ipaddress
from pathlib import Path
from collections import defaultdict
from xml.etree import ElementTree as ET
# Logging setup: INFO-level records rendered as "timestamp - level - message".
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)

# Runtime settings
# DNS suffix appended to container names; overridable via the DOMAIN environment variable.
DOMAIN = os.getenv("DOMAIN", "home.arpa")
# Path of the generated hosts file.
HOSTS_FILE = "/etc/docker_hosts"
# Pause between polling passes, in seconds.
UPDATE_INTERVAL = 2
# Define subnet to IP mapping structure
class SubnetBasedIPs:
    """Addresses of one host, each tagged with the subnet it lives on.

    ``networks`` holds ``(ip_network, ip)`` pairs in insertion order, where
    ``ip`` is kept exactly as it was passed to :meth:`add_ip`.
    """

    def __init__(self):
        self.networks = []  # List of (network, ip) tuples

    def add_ip(self, ip, subnet):
        """Record ``ip`` as this host's address on ``subnet``.

        ``subnet`` is parsed with ``strict=False``, so a value with host bits
        set (e.g. ``192.168.1.7/24``) is accepted and masked. A subnet that
        cannot be parsed is logged and the address is not recorded.
        """
        try:
            network = ipaddress.ip_network(subnet, strict=False)
        except ValueError as e:
            logging.error(f"Invalid subnet {subnet} for IP {ip}: {e}")
            return
        self.networks.append((network, ip))

    def _default_ip(self):
        """Return the first recorded address, or ``None`` when nothing is recorded."""
        return self.networks[0][1] if self.networks else None

    def get_best_ip(self, client_ip):
        """Return the recorded address that best suits ``client_ip``.

        The address whose subnet contains the client wins; when several
        recorded subnets contain it (overlapping ranges), the most specific
        one (longest prefix) is chosen so the result does not depend on the
        order in which addresses were added. If no subnet matches, or
        ``client_ip`` is not a valid address (which is logged), the first
        recorded address is returned. Returns ``None`` when nothing is recorded.
        """
        try:
            client = ipaddress.ip_address(client_ip)
        except ValueError as e:
            logging.error(f"Invalid client IP {client_ip}: {e}")
            return self._default_ip()

        # Membership is simply False across address families (v4 vs v6).
        matches = [(network, ip) for network, ip in self.networks if client in network]
        if not matches:
            # If no matching subnet found, return the first IP
            return self._default_ip()
        # max() keeps the earliest item among equal keys, so insertion order
        # still breaks ties between equally specific subnets.
        return max(matches, key=lambda pair: pair[0].prefixlen)[1]
def get_macvlan_networks():
    """Get all Docker networks that use the macvlan driver.

    NOTE(review): this file defines ``get_macvlan_networks`` a second time
    further down. That later definition rebinds the name when the module is
    loaded, so this copy is effectively dead code and a candidate for removal.

    Returns:
        A list of ``(network_name, subnet)`` tuples, one per IPAM config
        entry that declares a ``Subnet``.
    """
    client = docker.from_env()
    try:
        macvlan_networks = []
        for network in client.networks.list():
            if network.attrs.get('Driver') != 'macvlan':
                continue
            # Treat a missing or null IPAM / Config as "no subnets" rather
            # than raising on it.
            ipam = network.attrs.get('IPAM') or {}
            for cfg in ipam.get('Config') or []:
                subnet = cfg.get('Subnet')
                if subnet:
                    macvlan_networks.append((network.name, subnet))
        return macvlan_networks
    finally:
        # A new client is created on every call; release its connections.
        client.close()
def _subnet_for_ip(ip, subnets):
    """Return the entry of ``subnets`` that contains ``ip``, else the last entry."""
    try:
        address = ipaddress.ip_address(ip)
    except ValueError:
        return subnets[-1]
    for subnet in subnets:
        try:
            if address in ipaddress.ip_network(subnet, strict=False):
                return subnet
        except ValueError:
            # Unparsable subnet string: it cannot be a match, try the next one.
            continue
    return subnets[-1]


def get_container_ips():
    """Collect the macvlan addresses of all listed containers.

    Returns:
        A ``defaultdict`` mapping ``"<container name>.<DOMAIN>"`` to a
        ``SubnetBasedIPs`` holding that container's addresses on macvlan
        networks. It is empty when no macvlan network exists (a warning is
        logged) or when no container is attached to one.
    """
    container_ips = defaultdict(SubnetBasedIPs)
    macvlan_networks = get_macvlan_networks()
    if not macvlan_networks:
        logging.warning("No macvlan networks found!")
        return container_ips
    logging.info(f"Monitoring macvlan networks: {macvlan_networks}")

    # A network may declare several subnets (e.g. IPv4 + IPv6). Keep all of
    # them; a plain name -> subnet dict would silently keep only the last one
    # and tag addresses with the wrong subnet.
    subnets_by_network = defaultdict(list)
    for name, subnet in macvlan_networks:
        subnets_by_network[name].append(subnet)

    client = docker.from_env()
    try:
        for container in client.containers.list():
            hostname = container.attrs['Config'].get('Hostname')
            container_name = container.name
            networks = container.attrs['NetworkSettings']['Networks'] or {}
            logging.debug(f"Checking container: {container_name} (hostname: {hostname})")
            for network_name, network_config in networks.items():
                subnets = subnets_by_network.get(network_name)
                if not subnets:
                    continue
                ip = network_config.get('IPAddress')
                # Pick the declared subnet that actually contains this address.
                subnet = _subnet_for_ip(ip, subnets) if ip else subnets[-1]
                logging.debug(f"Found IP for {network_name} ({subnet}): {ip}")
                if ip and container_name:
                    container_ips[f"{container_name}.{DOMAIN}"].add_ip(ip, subnet)
    finally:
        # A new client is created on every call; release its connections.
        client.close()
    return container_ips
def get_macvlan_networks():
    """Get all Docker networks that use the macvlan driver.

    NOTE(review): an earlier definition with the same name exists above in
    this file; this later one is what the name is bound to after the module
    has loaded.

    Each discovered subnet is logged at INFO level.

    Returns:
        A list of ``(network_name, subnet)`` tuples, one per IPAM config
        entry that declares a ``Subnet``.
    """
    client = docker.from_env()
    try:
        macvlan_networks = []
        for network in client.networks.list():
            if network.attrs.get('Driver') != 'macvlan':
                continue
            # Treat a missing or null IPAM / Config as "no subnets" rather
            # than raising on it.
            ipam = network.attrs.get('IPAM') or {}
            for cfg in ipam.get('Config') or []:
                subnet = cfg.get('Subnet')
                if subnet:
                    macvlan_networks.append((network.name, subnet))
                    logging.info(f"Found macvlan network: {network.name} with subnet: {subnet}")
        return macvlan_networks
    finally:
        # A new client is created on every call; release its connections.
        client.close()
def generate_hosts_entries(container_ips, subnets=None):
    """Generate all possible host entries for CoreDNS to use.

    For every subnet a representative client address is derived (its first
    host address), and each host is asked which of its addresses it would
    hand to that client via ``get_best_ip``.

    Args:
        container_ips: Mapping of hostname to an object exposing
            ``get_best_ip(client_ip)``.
        subnets: Optional iterable of CIDR strings to generate entries for.
            When omitted, the subnets of all macvlan networks are queried
            through ``get_macvlan_networks()``.

    Returns:
        A list of ``"<ip> <hostname>"`` strings without duplicates, in
        first-seen order.
    """
    if subnets is None:
        # Get all unique subnets from macvlan networks
        subnets = [subnet for _, subnet in get_macvlan_networks()]
    else:
        subnets = list(subnets)
    logging.info(f"Generated entries for subnets: {subnets}")

    # Resolve one probe address per subnet up front; it does not depend on
    # the hostname, so there is no reason to recompute it per host.
    probes = []
    for subnet in subnets:
        try:
            # strict=False matches how SubnetBasedIPs.add_ip parses subnets.
            network = ipaddress.ip_network(subnet, strict=False)
        except ValueError as e:
            logging.error(f"Error generating entry for subnet {subnet}: {e}")
            continue
        # hosts() may return a list (tiny networks) or be empty, so wrap it
        # in iter() and fall back to the network address.
        probe_ip = next(iter(network.hosts()), network.network_address)
        probes.append((subnet, str(probe_ip)))

    entries = []
    seen = set()
    for hostname, subnet_ips in container_ips.items():
        # Generate entries for each detected subnet
        for subnet, probe_ip in probes:
            try:
                best_ip = subnet_ips.get_best_ip(probe_ip)
            except Exception as e:
                # Best effort: one bad host/subnet pair must not drop the rest.
                logging.error(f"Error generating entry for {hostname} subnet {subnet}: {e}")
                continue
            if not best_ip:
                continue
            entry = f"{best_ip} {hostname}"
            if entry not in seen:
                seen.add(entry)
                entries.append(entry)
    return entries
def _render_hosts_content(entries):
    """Build the hosts-file text: header, localhost line, then sorted unique entries."""
    lines = [
        "# Auto-generated by docker-hosts-provider",
        "127.0.0.1 localhost",
        "",
    ]
    # Sorting makes the output stable from run to run, so an unchanged
    # mapping always renders to identical text.
    lines.extend(sorted(set(entries)))
    return "\n".join(lines) + "\n"


def update_hosts_file(container_ips):
    """Write the hosts file at ``HOSTS_FILE`` for the given container mapping.

    The file is only rewritten when its content would change, so its
    modification time is not touched on every polling pass.

    Args:
        container_ips: Mapping passed through to ``generate_hosts_entries``.

    Raises:
        OSError: If the directory cannot be created or the file cannot be
            written (logged before being re-raised).
    """
    hosts_content = _render_hosts_content(generate_hosts_entries(container_ips))

    try:
        with open(HOSTS_FILE, encoding="utf-8") as f:
            if f.read() == hosts_content:
                logging.debug(f"{HOSTS_FILE} is already up to date")
                return
    except (OSError, UnicodeDecodeError):
        # Missing or unreadable file: fall through and (re)create it.
        pass

    logging.info("Writing hosts file with content:")
    logging.info(hosts_content)
    try:
        # Ensure directory exists (dirname is empty for a bare file name)
        directory = os.path.dirname(HOSTS_FILE)
        if directory:
            os.makedirs(directory, exist_ok=True)
        with open(HOSTS_FILE, 'w', encoding="utf-8") as f:
            f.write(hosts_content)
    except OSError as e:
        logging.error(f"Error writing hosts file: {e}")
        raise
    logging.info(f"Successfully updated {HOSTS_FILE}")
def main():
    """Poll Docker forever and keep the hosts file in sync with it.

    Every ``UPDATE_INTERVAL`` seconds the current container addresses are
    collected, logged, and written out through ``update_hosts_file``. Any
    exception raised during a pass is logged with its traceback and the
    loop carries on with the next pass.
    """
    logging.info("Starting Docker DNS monitor...")
    # TODO: Avahi publishing (AvahiPublisher) is currently disabled.
    while True:
        try:
            current_ips = get_container_ips()
            if current_ips:
                logging.info("Current DNS mappings:")
                for hostname, subnet_ips in sorted(current_ips.items()):
                    for network, ip in subnet_ips.networks:
                        logging.info(f" {hostname} -> {ip} (subnet: {network})")
            else:
                logging.warning("No containers found on macvlan networks.")
            # Refresh the file even when the mapping is empty; otherwise the
            # entries of the last container to stop would stay in the file
            # indefinitely.
            update_hosts_file(current_ips)
        except Exception as e:
            # Top-level boundary: log with traceback and keep polling.
            logging.exception(f"Error: {e}")
        time.sleep(UPDATE_INTERVAL)


if __name__ == "__main__":
    main()