Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 75 additions & 23 deletions client/doublezerod/internal/pim/cmd/send/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,34 @@ import (
"golang.org/x/net/ipv4"
)

var iface = flag.String("iface", "", "interface to use")
var (
iface = flag.String("iface", "", "interface to use")
group = flag.String("group", "", "multicast group to join/prune")
upstreamNeighbor = flag.String("upstream", "", "upstream neighbor address (for JoinPrune messages)")
rpAddress = flag.String("rp", "10.0.0.0", "RP address defaults to 10.0.0.0")
join = flag.Bool("join", false, "send a join message")
prune = flag.Bool("prune", false, "send a prune message")
holdtime = flag.Int("holdtime", 120, "holdtime for JoinPrune messages (default 120 seconds)")
)

func main() {
flag.Parse()
if *iface == "" {
log.Fatalf("interface not specified")
}

if *group == "" {
log.Fatalf("multicast group not specified")
}

if *upstreamNeighbor == "" {
log.Fatalf("upstream neighbor address not specified")
}

if (!*join && !*prune) || (*join && *prune) {
log.Fatalf("either -join or -prune must be specified")
}

c, err := net.ListenPacket("ip4:103", "0.0.0.0")
if err != nil {
log.Fatalf("failed to listen: %v", err)
Expand Down Expand Up @@ -88,32 +109,63 @@ func main() {

buf = gopacket.NewSerializeBuffer()

join := &pim.JoinPruneMessage{
UpstreamNeighborAddress: net.IP([]byte{169, 254, 1, 3}),
NumGroups: 1,
Reserved: 0,
Holdtime: 210,
Groups: []pim.Group{
{
AddressFamily: 1,
NumJoinedSources: 1,
NumPrunedSources: 0,
MaskLength: 32,
MulticastGroupAddress: net.IP([]byte{239, 0, 0, 3}),
Joins: []pim.SourceAddress{
{
AddressFamily: 1,
Flags: 7,
MaskLength: 32,
EncodingType: 0,
Address: pim.RpAddress,
var msg *pim.JoinPruneMessage
if *join {
msg = &pim.JoinPruneMessage{
UpstreamNeighborAddress: net.ParseIP(*upstreamNeighbor).To4(),
NumGroups: 1,
Reserved: 0,
Holdtime: uint16(*holdtime),
Groups: []pim.Group{
{
AddressFamily: 1,
NumJoinedSources: 1,
NumPrunedSources: 0,
MaskLength: 32,
MulticastGroupAddress: net.ParseIP(*group).To4(),
Joins: []pim.SourceAddress{
{
AddressFamily: 1,
Flags: 7,
MaskLength: 32,
EncodingType: 0,
Address: net.ParseIP(*rpAddress).To4(),
},
},
Prunes: []pim.SourceAddress{},
},
Prunes: []pim.SourceAddress{},
},
}}
}
}
if *prune {
msg = &pim.JoinPruneMessage{
UpstreamNeighborAddress: net.ParseIP(*upstreamNeighbor).To4(),
NumGroups: 1,
Reserved: 0,
Holdtime: uint16(*holdtime),
Groups: []pim.Group{
{
AddressFamily: 1,
NumJoinedSources: 0,
NumPrunedSources: 1,
MaskLength: 32,
MulticastGroupAddress: net.ParseIP(*group).To4(),
Joins: []pim.SourceAddress{},
Prunes: []pim.SourceAddress{
{
AddressFamily: 1,
Flags: 7,
MaskLength: 32,
EncodingType: 0,
Address: net.ParseIP(*rpAddress).To4(),
},
},
},
},
}
}

err = join.SerializeTo(buf, opts)
err = msg.SerializeTo(buf, opts)
if err != nil {
log.Fatalf("failed to serialize PIM JoinPrune msg %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions client/doublezerod/internal/pim/pim.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func decodePimHelloMessage(data []byte, p gopacket.PacketBuilder) error {
return err
}
if addr == nil {
return errors.New("Invalid address")
return errors.New("invalid address")
}

hello.SecondaryAddress = append(hello.SecondaryAddress, addr)
Expand All @@ -218,7 +218,7 @@ func decodePimJoinPruneMessage(data []byte, p gopacket.PacketBuilder) error {
return err
}
if addr == nil {
return errors.New("Invalid address")
return errors.New("invalid address")
}
joinPrune.UpstreamNeighborAddress = addr
data = data[len(addr)+2:]
Expand Down
15 changes: 10 additions & 5 deletions client/doublezerod/internal/pim/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ var (
RpAddress = net.IP([]byte{10, 0, 0, 0})
)

const (
joinHoldtime = uint16(120) // ask upstream router to keep join state for 120 seconds
pruneHoldtime = uint16(5) // ask upstream router to flush join state after 5 seconds
)

type RawConner interface {
WriteTo(h *ipv4.Header, b []byte, cm *ipv4.ControlMessage) error
Close() error
Expand Down Expand Up @@ -62,7 +67,7 @@ func (s *PIMServer) Start(conn RawConner, iface string, tunnelAddr net.IP, group
if err != nil {
slog.Error("failed to send PIM hello msg", "error", err)
}
joinPruneMsgBuf, err := constructJoinPruneMessage(tunnelAddr, groups, RpAddress, nil)
joinPruneMsgBuf, err := constructJoinPruneMessage(tunnelAddr, groups, RpAddress, nil, joinHoldtime)
if err != nil {
slog.Error("failed to serialize PIM join msg", "error", err)
}
Expand All @@ -83,7 +88,7 @@ func (s *PIMServer) Start(conn RawConner, iface string, tunnelAddr net.IP, group
if err != nil {
slog.Error("failed to send PIM hello msg", "error", err)
}
joinPruneMsgBuf, err := constructJoinPruneMessage(tunnelAddr, groups, RpAddress, nil)
joinPruneMsgBuf, err := constructJoinPruneMessage(tunnelAddr, groups, RpAddress, nil, joinHoldtime)
if err != nil {
slog.Error("failed to serialize PIM join msg", "error", err)
}
Expand All @@ -92,7 +97,7 @@ func (s *PIMServer) Start(conn RawConner, iface string, tunnelAddr net.IP, group
slog.Error("failed to send PIM join msg", "error", err)
}
case <-s.done:
joinPruneMsgBuf, err := constructJoinPruneMessage(tunnelAddr, groups, nil, RpAddress)
joinPruneMsgBuf, err := constructJoinPruneMessage(tunnelAddr, groups, nil, RpAddress, pruneHoldtime)
if err != nil {
slog.Error("failed to serialize PIM prune msg", "error", err)
}
Expand Down Expand Up @@ -142,7 +147,7 @@ func constructHelloMessage() (gopacket.SerializeBuffer, error) {
}

// TODO: at some point this could require multiple groups with joins/prunes mixed together
func constructJoinPruneMessage(upstreamNeighbor net.IP, multicastGroupAddresses []net.IP, joinSourceAddress net.IP, pruneSourceAddress net.IP) (gopacket.SerializeBuffer, error) {
func constructJoinPruneMessage(upstreamNeighbor net.IP, multicastGroupAddresses []net.IP, joinSourceAddress net.IP, pruneSourceAddress net.IP, holdtime uint16) (gopacket.SerializeBuffer, error) {
numGroups := len(multicastGroupAddresses)
opts := gopacket.SerializeOptions{}
buf := gopacket.NewSerializeBuffer()
Expand All @@ -152,7 +157,7 @@ func constructJoinPruneMessage(upstreamNeighbor net.IP, multicastGroupAddresses
UpstreamNeighborAddress: upstreamNeighbor,
NumGroups: uint8(numGroups),
Reserved: 0,
Holdtime: 210,
Holdtime: holdtime,
Groups: groups,
}

Expand Down
8 changes: 4 additions & 4 deletions client/doublezerod/internal/pim/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func checkJoinMesssage(t *testing.T, b []byte) {
Header: pim.PIMHeader{
Version: 2,
Type: pim.JoinPrune,
Checksum: 0x2deb,
Checksum: 0x2e45,
},
}

Expand All @@ -134,7 +134,7 @@ func checkJoinMesssage(t *testing.T, b []byte) {
UpstreamNeighborAddress: net.IP([]byte{169, 254, 0, 0}),
NumGroups: 1,
Reserved: 0,
Holdtime: 210,
Holdtime: 120,
Groups: []pim.Group{
{
GroupID: 0,
Expand Down Expand Up @@ -172,7 +172,7 @@ func checkPruneMesssage(t *testing.T, b []byte) {
Header: pim.PIMHeader{
Version: 2,
Type: pim.JoinPrune,
Checksum: 0x2deb,
Checksum: 0x2eb8,
},
}

Expand All @@ -185,7 +185,7 @@ func checkPruneMesssage(t *testing.T, b []byte) {
UpstreamNeighborAddress: net.IP([]byte{169, 254, 0, 0}),
NumGroups: 1,
Reserved: 0,
Holdtime: 210,
Holdtime: 5,
Groups: []pim.Group{
{
GroupID: 0,
Expand Down
16 changes: 8 additions & 8 deletions client/doublezerod/internal/runtime/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1386,7 +1386,7 @@ func TestMulticastSubscriber(t *testing.T) {
Header: pim.PIMHeader{
Version: 2,
Type: pim.JoinPrune,
Checksum: 0x2eeb,
Checksum: 0x2f45,
},
}
if diff := cmp.Diff(got, want, cmpopts.IgnoreFields(pim.PIMMessage{}, "BaseLayer")); diff != "" {
Expand All @@ -1398,7 +1398,7 @@ func TestMulticastSubscriber(t *testing.T) {
UpstreamNeighborAddress: net.IP([]byte{169, 254, 0, 0}),
NumGroups: 1,
Reserved: 0,
Holdtime: 210,
Holdtime: 120,
Groups: []pim.Group{{
GroupID: 0,
AddressFamily: 1,
Expand Down Expand Up @@ -1456,7 +1456,7 @@ func TestMulticastSubscriber(t *testing.T) {
Header: pim.PIMHeader{
Version: 2,
Type: pim.JoinPrune,
Checksum: 0x2eeb,
Checksum: 0x2fb8,
},
}
if diff := cmp.Diff(got, want, cmpopts.IgnoreFields(pim.PIMMessage{}, "BaseLayer")); diff != "" {
Expand All @@ -1468,7 +1468,7 @@ func TestMulticastSubscriber(t *testing.T) {
UpstreamNeighborAddress: net.IP([]byte{169, 254, 0, 0}),
NumGroups: 1,
Reserved: 0,
Holdtime: 210,
Holdtime: 5,
Groups: []pim.Group{{
GroupID: 0,
AddressFamily: 1,
Expand Down Expand Up @@ -2205,7 +2205,7 @@ func verifyPimJoinMessageSent(t *testing.T, pimJoinPruneChan chan []byte, upstre
Header: pim.PIMHeader{
Version: 2,
Type: pim.JoinPrune,
Checksum: 0x2deb,
Checksum: 0x2e45,
},
}
if diff := cmp.Diff(got, want, cmpopts.IgnoreFields(pim.PIMMessage{}, "BaseLayer")); diff != "" {
Expand All @@ -2217,7 +2217,7 @@ func verifyPimJoinMessageSent(t *testing.T, pimJoinPruneChan chan []byte, upstre
UpstreamNeighborAddress: upstreamNeighbor,
NumGroups: 1,
Reserved: 0,
Holdtime: 210,
Holdtime: 120,
Groups: []pim.Group{{
GroupID: 0,
AddressFamily: 1,
Expand Down Expand Up @@ -2266,7 +2266,7 @@ func verifyPruneMessageSent(t *testing.T, pimJoinPruneChan chan []byte, upstream
Header: pim.PIMHeader{
Version: 2,
Type: pim.JoinPrune,
Checksum: 0x2deb,
Checksum: 0x2eb8,
},
}
if diff := cmp.Diff(got, want, cmpopts.IgnoreFields(pim.PIMMessage{}, "BaseLayer")); diff != "" {
Expand All @@ -2278,7 +2278,7 @@ func verifyPruneMessageSent(t *testing.T, pimJoinPruneChan chan []byte, upstream
UpstreamNeighborAddress: upstreamNeighbor,
NumGroups: 1,
Reserved: 0,
Holdtime: 210,
Holdtime: 5,
Groups: []pim.Group{{
GroupID: 0,
AddressFamily: 1,
Expand Down