Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

use container ports in tasks SRV records #519

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
3 changes: 2 additions & 1 deletion config.json.sample
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@
"srvRecordDefaultWeight": 1,
"IPSources": ["mesos", "host"],
"EnforceRFC952": false,
"EnumerationOn": true
"EnumerationOn": true,
"SRVPreferContainerPorts": false
}
6 changes: 5 additions & 1 deletion docs/docs/configuration-parameters.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ The configuration file should include the following fields:
"SOARetry": 600,
"SOAExpire": 86400,
"SOAMinttl": 60,
"IPSources": ["netinfo", "mesos", "host"]
"IPSources": ["netinfo", "mesos", "host"],
"SRVPreferContainerPorts": false
}
```

Expand Down Expand Up @@ -114,3 +115,6 @@ sorted by priority. If you use **Docker**, and enable the `netinfo` IPSource, it
- `mesos`: Mesos containerizer IP. **DEPRECATED**
- `docker`: Docker containerizer IP. **DEPRECATED**
- `netinfo`: Mesos 0.25 NetworkInfo.

`SRVPreferContainerPorts` is a boolean field that controls whether Mesos-DNS use the container ports from the containerinfos port mapping definitions. It return the container port only if a match is found, the host port otherwise. This behavioir requires the `netinfo` IPSource.

59 changes: 32 additions & 27 deletions records/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,38 +108,42 @@ type Config struct {
httpConfigMap httpcli.ConfigMap

MesosAuthentication httpcli.AuthMechanism

// Use container ports from portmapping definitions.
SRVPreferContainerPorts bool
}

// NewConfig return the default config of the resolver
func NewConfig() Config {
return Config{
ZkDetectionTimeout: 30,
RefreshSeconds: 60,
TTL: 60,
SRVRecordDefaultWeight: 1,
Domain: "mesos",
Port: 53,
Timeout: 5,
StateTimeoutSeconds: 300,
SOARname: "root.ns1.mesos",
SOAMname: "ns1.mesos",
SOARefresh: 60,
SOARetry: 600,
SOAExpire: 86400,
SOAMinttl: 60,
ZoneResolvers: map[string][]string{},
Resolvers: []string{"8.8.8.8"},
Listener: "0.0.0.0",
HTTPListener: "0.0.0.0",
HTTPPort: 8123,
DNSOn: true,
HTTPOn: true,
ExternalOn: true,
SetTruncateBit: true,
RecurseOn: true,
IPSources: []string{"netinfo", "mesos", "host"},
EnumerationOn: true,
MesosAuthentication: httpcli.AuthNone,
ZkDetectionTimeout: 30,
RefreshSeconds: 60,
TTL: 60,
SRVRecordDefaultWeight: 1,
Domain: "mesos",
Port: 53,
Timeout: 5,
StateTimeoutSeconds: 300,
SOARname: "root.ns1.mesos",
SOAMname: "ns1.mesos",
SOARefresh: 60,
SOARetry: 600,
SOAExpire: 86400,
SOAMinttl: 60,
ZoneResolvers: map[string][]string{},
Resolvers: []string{"8.8.8.8"},
Listener: "0.0.0.0",
HTTPListener: "0.0.0.0",
HTTPPort: 8123,
DNSOn: true,
HTTPOn: true,
ExternalOn: true,
SetTruncateBit: true,
RecurseOn: true,
IPSources: []string{"netinfo", "mesos", "host"},
EnumerationOn: true,
MesosAuthentication: httpcli.AuthNone,
SRVPreferContainerPorts: false,
}
}

Expand Down Expand Up @@ -309,6 +313,7 @@ func (c Config) log() {
"MesosAuthentication is set to none. This is probably not intentional")
}
}
logging.Verbose.Println(" - SRVPreferContainerPorts: ", c.SRVPreferContainerPorts)
}

func readCACertFile(caCertFile string) (caPool *x509.CertPool, err error) {
Expand Down
30 changes: 18 additions & 12 deletions records/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (rg *RecordGenerator) ParseState(c Config, masters ...string) error {
hostSpec = labels.RFC952
}

return rg.InsertState(sj, c.Domain, c.SOAMname, c.Listener, masters, c.IPSources, hostSpec)
return rg.InsertState(sj, c.Domain, c.SOAMname, c.Listener, masters, c.IPSources, c.SRVPreferContainerPorts, hostSpec)
}

// hashes a given name using a truncated sha1 hash
Expand All @@ -202,7 +202,7 @@ func hashString(s string) string {
}

// InsertState transforms a StateJSON into RecordGenerator RRs
func (rg *RecordGenerator) InsertState(sj state.State, domain, ns, listener string, masters, ipSources []string, spec labels.Func) error {
func (rg *RecordGenerator) InsertState(sj state.State, domain, ns, listener string, masters, ipSources []string, srvPreferContainerPorts bool, spec labels.Func) error {
rg.SlaveIPs = map[string][]string{}
rg.SRVs = rrs{}
rg.As = rrs{}
Expand All @@ -211,7 +211,7 @@ func (rg *RecordGenerator) InsertState(sj state.State, domain, ns, listener stri
rg.slaveRecords(sj, domain, spec)
rg.listenerRecord(listener, ns)
rg.masterRecord(domain, masters, sj.Leader)
rg.taskRecords(sj, domain, spec, ipSources)
rg.taskRecords(sj, domain, spec, ipSources, srvPreferContainerPorts)

return nil
}
Expand Down Expand Up @@ -368,7 +368,7 @@ func (rg *RecordGenerator) listenerRecord(listener string, ns string) {
}
}

func (rg *RecordGenerator) taskRecords(sj state.State, domain string, spec labels.Func, ipSources []string) {
func (rg *RecordGenerator) taskRecords(sj state.State, domain string, spec labels.Func, ipSources []string, srvPreferContainerPorts bool) {
for _, f := range sj.Frameworks {
enumerableFramework := &EnumerableFramework{
Name: f.Name,
Expand All @@ -382,21 +382,22 @@ func (rg *RecordGenerator) taskRecords(sj state.State, domain string, spec label

// only do running and discoverable tasks
if ok && (task.State == "TASK_RUNNING") {
rg.taskRecord(task, f, domain, spec, ipSources, enumerableFramework)
rg.taskRecord(task, f, domain, spec, ipSources, srvPreferContainerPorts, enumerableFramework)
}
}
}
}

type context struct {
taskName string
taskID string
slaveID string
taskIPs []net.IP
slaveIPs []string
taskName string
taskID string
slaveID string
taskIPs []net.IP
slaveIPs []string
srvPreferContainerPorts bool
}

func (rg *RecordGenerator) taskRecord(task state.Task, f state.Framework, domain string, spec labels.Func, ipSources []string, enumFW *EnumerableFramework) {
func (rg *RecordGenerator) taskRecord(task state.Task, f state.Framework, domain string, spec labels.Func, ipSources []string, srvPreferContainerPorts bool, enumFW *EnumerableFramework) {

newTask := &EnumerableTask{ID: task.ID, Name: task.Name}

Expand All @@ -409,6 +410,7 @@ func (rg *RecordGenerator) taskRecord(task state.Task, f state.Framework, domain
slaveIDTail(task.SlaveID),
task.IPs(ipSources...),
task.SlaveIPs,
srvPreferContainerPorts,
}

// use DiscoveryInfo name if defined instead of task name
Expand Down Expand Up @@ -488,7 +490,11 @@ func (rg *RecordGenerator) taskContextRecord(ctx context, task state.Task, f sta
}

for _, port := range task.DiscoveryInfo.Ports.DiscoveryPorts {
target := canonical + tail + ":" + strconv.Itoa(port.Number)
p := port.Number
if ctx.srvPreferContainerPorts {
p = state.MapPort(task, p)
}
target := canonical + tail + ":" + strconv.Itoa(p)
recordName(withProtocol(port.Protocol, fname, spec,
withNamedPort(port.Name, spec, asSRV(target))))
}
Expand Down
18 changes: 10 additions & 8 deletions records/generator_perf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,14 @@ func BenchmarkTaskRecord_withoutDiscoveryInfo(b *testing.B) {
taskCount = 1000
)
type params struct {
task state.Task
f state.Framework
domain string
spec labels.Func
ipSources []string
enumFW EnumerableFramework
rg RecordGenerator
task state.Task
f state.Framework
domain string
spec labels.Func
ipSources []string
enumFW EnumerableFramework
rg RecordGenerator
srvPreferContainerPorts bool
}
var (
initialState = params{
Expand All @@ -67,6 +68,7 @@ func BenchmarkTaskRecord_withoutDiscoveryInfo(b *testing.B) {
As: rrs{},
SRVs: rrs{},
},
srvPreferContainerPorts: false,
}
slaves = make([]string, clusterSize)
tasks = make([]string, taskCount)
Expand All @@ -87,6 +89,6 @@ func BenchmarkTaskRecord_withoutDiscoveryInfo(b *testing.B) {
tt.task.Name = tasks[ti]
tt.task.SlaveIPs = []string{slaves[si]}
tt.task.SlaveID = "ID-" + slaves[si]
tt.rg.taskRecord(tt.task, tt.f, tt.domain, tt.spec, tt.ipSources, &tt.enumFW)
tt.rg.taskRecord(tt.task, tt.f, tt.domain, tt.spec, tt.ipSources, tt.srvPreferContainerPorts, &tt.enumFW)
}
}
20 changes: 13 additions & 7 deletions records/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func expectRecords(rg *RecordGenerator, expect []expectedRR) (eA, eAAAA, eSRV rr
return
}

func testRecordGenerator(t *testing.T, spec labels.Func, ipSources []string) RecordGenerator {
func testRecordGenerator(t *testing.T, spec labels.Func, ipSources []string, srvPreferContainerPorts bool) RecordGenerator {
var sj state.State

b, err := ioutil.ReadFile("../factories/fake.json")
Expand All @@ -240,7 +240,7 @@ func testRecordGenerator(t *testing.T, spec labels.Func, ipSources []string) Rec
masters := []string{"144.76.157.37:5050"}

var rg RecordGenerator
if err := rg.InsertState(sj, "mesos", "mesos-dns.mesos.", "127.0.0.1", masters, ipSources, spec); err != nil {
if err := rg.InsertState(sj, "mesos", "mesos-dns.mesos.", "127.0.0.1", masters, ipSources, srvPreferContainerPorts, spec); err != nil {
t.Fatal(err)
}

Expand All @@ -249,11 +249,12 @@ func testRecordGenerator(t *testing.T, spec labels.Func, ipSources []string) Rec

// ensure we are parsing what we think we are
func TestInsertState(t *testing.T) {
rg := testRecordGenerator(t, labels.RFC952, []string{"netinfo", "docker", "mesos", "host"})
rgDocker := testRecordGenerator(t, labels.RFC952, []string{"docker", "host"})
rgMesos := testRecordGenerator(t, labels.RFC952, []string{"mesos", "host"})
rgSlave := testRecordGenerator(t, labels.RFC952, []string{"host"})
rgNetinfo := testRecordGenerator(t, labels.RFC952, []string{"netinfo"})
rg := testRecordGenerator(t, labels.RFC952, []string{"netinfo", "docker", "mesos", "host"}, false)
rgDocker := testRecordGenerator(t, labels.RFC952, []string{"docker", "host"}, false)
rgMesos := testRecordGenerator(t, labels.RFC952, []string{"mesos", "host"}, false)
rgSlave := testRecordGenerator(t, labels.RFC952, []string{"host"}, false)
rgNetinfo := testRecordGenerator(t, labels.RFC952, []string{"netinfo"}, false)
rgContainerPorts := testRecordGenerator(t, labels.RFC952, []string{"netinfo"}, true)

for i, tt := range []struct {
rrs rrs
Expand Down Expand Up @@ -333,6 +334,11 @@ func TestInsertState(t *testing.T) {
{rgNetinfo.AAAAs, "toy-store.ipv6-framework.mesos.", []string{"fd01:b::1:8000:2"}},
{rgNetinfo.AAAAs, "toy-store.ipv6-framework.slave.mesos.", []string{"2001:db8::1"}},

{rgContainerPorts.As, "toy-store.ipv6-framework.mesos.", []string{"12.0.1.2"}},

{rgContainerPorts.AAAAs, "toy-store.ipv6-framework.mesos.", []string{"fd01:b::1:8000:2"}},
{rgContainerPorts.AAAAs, "toy-store.ipv6-framework.slave.mesos.", []string{"2001:db8::1"}},

{rgDocker.As, "liquor-store.marathon.mesos.", []string{"10.3.0.1", "10.3.0.2"}},
{rgDocker.As, "liquor-store.marathon.slave.mesos.", []string{"1.2.3.11", "1.2.3.12"}},
{rgDocker.As, "nginx.marathon.mesos.", []string{"1.2.3.11"}},
Expand Down
2 changes: 1 addition & 1 deletion records/state/client/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func LoadMasterStateFailover(initialMasterIP string, stateLoader func(ip string)
}
return sj, nil
}
err = errors.New("fetched state does not contain leader information")
err = errors.New("Fetched state does not contain leader information")
return sj, err
}

Expand Down
24 changes: 23 additions & 1 deletion records/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ type ContainerStatus struct {
// NetworkInfo holds the network configuration for a single interface
// as defined in the /state Mesos HTTP endpoint.
type NetworkInfo struct {
IPAddresses []IPAddress `json:"ip_addresses,omitempty"`
IPAddresses []IPAddress `json:"ip_addresses,omitempty"`
PortMappings []PortMapping `json:"port_mappings,omitempty"`
// back-compat with 0.25 IPAddress format
IPAddress string `json:"ip_address,omitempty"`
}
Expand All @@ -82,6 +83,13 @@ type IPAddress struct {
IPAddress string `json:"ip_address,omitempty"`
}

// PortMapping holds a port for a task defined in the /state Mesos HTTP endpoint.
type PortMapping struct {
Protocol string `json:"protocol,omitempty"`
HostPort int `json:"host_port"`
ContainerPort int `json:"container_port"`
}

// Task holds a task as defined in the /state Mesos HTTP endpoint.
type Task struct {
FrameworkID string `json:"framework_id"`
Expand Down Expand Up @@ -200,6 +208,20 @@ func statusIPs(st []Status, src func(*Status) []string) []string {
return nil
}

// MapPort returns the mapped port if available, or the host port
// listening on.
func MapPort(t Task, hostport int) int {
ni := t.Statuses[0].ContainerStatus.NetworkInfos
for n := range ni {
for m := range ni[n].PortMappings {
if ni[n].PortMappings[m].HostPort == hostport && ni[n].PortMappings[m].ContainerPort > 0 {
return ni[n].PortMappings[m].ContainerPort
vixns marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
return hostport
}

// labels returns all given Status.[]Labels' values whose keys are equal
// to the given key
func labels(key string) func(*Status) []string {
Expand Down
2 changes: 1 addition & 1 deletion resolver/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ func fakeDNS() (*Resolver, error) {
}

spec := labels.RFC952
err = res.rs.InsertState(sj, "mesos", "mesos-dns.mesos.", "127.0.0.1", res.config.Masters, res.config.IPSources, spec)
err = res.rs.InsertState(sj, "mesos", "mesos-dns.mesos.", "127.0.0.1", res.config.Masters, res.config.IPSources, res.config.SRVPreferContainerPorts, spec)
if err != nil {
return nil, err
}
Expand Down