Skip to content

Commit

Permalink
Merge pull request #486 from McStork/dns-tcp-rebase
Browse files Browse the repository at this point in the history
DNS - implementing TCP
  • Loading branch information
andrewkroh committed Dec 16, 2015
2 parents 7ec8245 + 7b8861b commit 9cf0f3d
Show file tree
Hide file tree
Showing 5 changed files with 740 additions and 15 deletions.
211 changes: 202 additions & 9 deletions packetbeat/protos/dns/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
// messages.
//
// Future Additions:
// * Implement TcpProtocolPlugin.
// * Publish a message when packets are received that cannot be decoded.
// * Publish a message when Query packets are received that cannot be decoded.
// * Add EDNS and DNSSEC support (consider using miekg/dns instead
// of gopacket).
// * Consider adding ICMP support to
Expand All @@ -30,6 +29,7 @@ import (
"github.com/elastic/beats/packetbeat/config"
"github.com/elastic/beats/packetbeat/procs"
"github.com/elastic/beats/packetbeat/protos"
"github.com/elastic/beats/packetbeat/protos/tcp"

"github.com/tsg/gopacket"
"github.com/tsg/gopacket/layers"
Expand All @@ -45,8 +45,11 @@ const (

// Notes that are added to messages during exceptional conditions.
const (
NonDnsPacketMsg = "Packet's data could not be decoded as DNS."
DuplicateQueryMsg = "Another query with the same DNS ID from this client " +
NonDnsPacketMsg = "Packet's data could not be decoded as DNS."
NonDnsCompleteMsg = "Message's data could not be decoded as DNS."
NonDnsResponsePacketMsg = "Response packet's data could not be decoded as DNS."
EmptyMsg = "Message's data is empty."
DuplicateQueryMsg = "Another query with the same DNS ID from this client " +
"was received so this query was closed without receiving a response."
OrphanedResponseMsg = "Response was received without an associated query."
NoResponse = "No response to this query was received."
Expand All @@ -60,6 +63,8 @@ const (
TransportUdp
)

const DecodeOffset = 2

var TransportNames = []string{
"tcp",
"udp",
Expand Down Expand Up @@ -166,7 +171,7 @@ type DnsMessage struct {
// DnsStream contains DNS data from one side of a TCP transmission. A pair
// of DnsStream's are used to represent the full conversation.
type DnsStream struct {
tcptuple *common.TcpTuple
tcpTuple *common.TcpTuple

data []byte

Expand Down Expand Up @@ -318,7 +323,7 @@ func (dns *Dns) ParseUdp(pkt *protos.Packet) {
logp.Debug("dns", "Parsing packet addressed with %s of length %d.",
pkt.Tuple.String(), len(pkt.Payload))

dnsPkt, err := decodeDnsPacket(pkt.Payload)
dnsPkt, err := decodeDnsData(TransportUdp, pkt.Payload)
if err != nil {
// This means that malformed requests or responses are being sent or
// that someone is attempting to the DNS port for non-DNS traffic. Both
Expand All @@ -344,6 +349,10 @@ func (dns *Dns) ParseUdp(pkt *protos.Packet) {
}
}

func (dns *Dns) ConnectionTimeout() time.Duration {
return dns.transactionTimeout
}

func (dns *Dns) receivedDnsRequest(tuple *DnsTuple, msg *DnsMessage) {
logp.Debug("dns", "Processing query. %s", tuple)

Expand Down Expand Up @@ -690,20 +699,204 @@ func nameToString(name []byte) string {
return string(s)
}

// decodeDnsPacket decodes a byte array into a DNS struct. If an error occurs
// decodeDnsData decodes a byte array into a DNS struct. If an error occurs
// then the returnd dns pointer will be nil. This method recovers from panics
// and is concurrency-safe.
func decodeDnsPacket(data []byte) (dns *layers.DNS, err error) {
func decodeDnsData(transport Transport, data []byte) (dns *layers.DNS, err error) {
var offset int
if transport == TransportTcp {
offset = DecodeOffset
}

// Recover from any panics that occur while parsing a packet.
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic: %v", r)
}
}()

d := &layers.DNS{}
err = d.DecodeFromBytes(data, gopacket.NilDecodeFeedback)
err = d.DecodeFromBytes(data[offset:], gopacket.NilDecodeFeedback)
if err != nil {
return nil, err
}
return d, nil
}

// TCP implementation

func (dns *Dns) Parse(pkt *protos.Packet, tcpTuple *common.TcpTuple, dir uint8, private protos.ProtocolData) protos.ProtocolData {
defer logp.Recover("DNS ParseTcp")

logp.Debug("dns", "Parsing packet addressed with %s of length %d.",
pkt.Tuple.String(), len(pkt.Payload))

priv := dnsPrivateData{}

if private != nil {
var ok bool
priv, ok = private.(dnsPrivateData)
if !ok {
priv = dnsPrivateData{}
}
}

payload := pkt.Payload

stream := &priv.Data[dir]

if *stream == nil {
*stream = &DnsStream{
tcpTuple: tcpTuple,
data: payload,
message: &DnsMessage{Ts: pkt.Ts, Tuple: pkt.Tuple},
}
if len(payload) <= DecodeOffset {
logp.Debug("dns", EmptyMsg+" addresses %s",
tcpTuple.String())

return priv
}
} else {
(*stream).data = append((*stream).data, payload...)
dataLength := len((*stream).data)
if dataLength > tcp.TCP_MAX_DATA_IN_STREAM {
logp.Debug("dns", "Stream data too large, dropping DNS stream")
return priv
}
if dataLength <= DecodeOffset {
logp.Debug("dns", EmptyMsg+" addresses %s",
tcpTuple.String())
return priv
}
}

data, err := decodeDnsData(TransportTcp, (*stream).data)

if err != nil {
logp.Debug("dns", NonDnsCompleteMsg+" addresses %s, length %d",
tcpTuple.String(), len((*stream).data))

// wait for decoding with the next segment
return priv
}

dns.messageComplete(tcpTuple, dir, *stream, data)
return priv
}

func (dns *Dns) messageComplete(tcpTuple *common.TcpTuple, dir uint8, s *DnsStream, decodedData *layers.DNS) {
dns.handleDns(s.message, tcpTuple, dir, s.data, decodedData)

s.PrepareForNewMessage()
}

func (dns *Dns) handleDns(m *DnsMessage, tcpTuple *common.TcpTuple, dir uint8, data []byte, decodedData *layers.DNS) {
dnsTuple := DnsTupleFromIpPort(&m.Tuple, TransportTcp, decodedData.ID)
m.CmdlineTuple = procs.ProcWatcher.FindProcessesTuple(tcpTuple.IpPort())
m.Data = decodedData
m.Length = len(data)

if decodedData.QR == Query {
dns.receivedDnsRequest(&dnsTuple, m)
} else /* Response */ {
dns.receivedDnsResponse(&dnsTuple, m)
}
}

func (stream *DnsStream) PrepareForNewMessage() {
stream.message = nil
}

func (dns *Dns) ReceivedFin(tcpTuple *common.TcpTuple, dir uint8, private protos.ProtocolData) protos.ProtocolData {
if private == nil {
return private
}
dnsData, ok := private.(dnsPrivateData)
if !ok {
return private
}
if dnsData.Data[dir] == nil {
return dnsData
}
stream := dnsData.Data[dir]
if stream.message != nil {
decodedData, err := decodeDnsData(TransportTcp, stream.data)

if err == nil {
dns.messageComplete(tcpTuple, dir, stream, decodedData)
} else /*Failed decode */ {
if dir == tcp.TcpDirectionReverse {
dns.publishDecodeFailureNotes(dnsData)
stream.PrepareForNewMessage()
}
logp.Debug("dns", NonDnsCompleteMsg+" addresses %s, length %d",
tcpTuple.String(), len(stream.data))
}
}

return dnsData
}

func (dns *Dns) GapInStream(tcpTuple *common.TcpTuple, dir uint8, nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool) {
dnsData, ok := private.(dnsPrivateData)

if !ok {
return private, false
}

stream := dnsData.Data[dir]

if stream == nil || stream.message == nil {
return private, false
}

decodedData, err := decodeDnsData(TransportTcp, stream.data)

// Add Notes if the failed stream is the response
if err != nil {
if dir == tcp.TcpDirectionReverse {
dns.publishDecodeFailureNotes(dnsData)
}

// drop the stream because it is binary and it would be rare to have a decodable message later
logp.Debug("dns", NonDnsCompleteMsg+" addresses %s, length %d",
tcpTuple.String(), len(stream.data))
return private, true
}

// publish and ignore the gap. No case should reach this code though ...
dns.messageComplete(tcpTuple, dir, stream, decodedData)
return private, false
}

// Add Notes to the query stream about a failure to decode the response
func (dns *Dns) publishDecodeFailureNotes(dnsData dnsPrivateData) {
streamOrigin := dnsData.Data[tcp.TcpDirectionOriginal]
streamReverse := dnsData.Data[tcp.TcpDirectionReverse]

if streamOrigin == nil || streamReverse == nil {
return
}

dataOrigin, err := decodeDnsData(TransportTcp, streamOrigin.data)
tupleReverse := streamReverse.message.Tuple

if err == nil {
dnsTupleReverse := DnsTupleFromIpPort(&tupleReverse, TransportTcp, dataOrigin.ID)
hashDnsTupleOrigin := (&dnsTupleReverse).RevHashable()

trans := dns.deleteTransaction(hashDnsTupleOrigin)

if trans == nil { // happens when a Gap is followed by Fin
return
}

trans.Notes = append(trans.Notes, NonDnsResponsePacketMsg)

dns.publishTransaction(trans)
dns.deleteTransaction(hashDnsTupleOrigin)
} else {
logp.Debug("dns", "Unabled to decode response with adresses %s has no associated query", streamReverse.tcpTuple.String())
}
}
Loading

0 comments on commit 9cf0f3d

Please sign in to comment.