-
Notifications
You must be signed in to change notification settings - Fork 160
/
router.go
189 lines (178 loc) · 6.22 KB
/
router.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
// Copyright 2016 ETH Zurich
// Copyright 2018 ETH Zurich, Anapaya Systems
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file contains the main router processing loop.
package main
import (
"sync"
"github.com/scionproto/scion/go/border/brconf"
"github.com/scionproto/scion/go/border/internal/metrics"
"github.com/scionproto/scion/go/border/rcmn"
"github.com/scionproto/scion/go/border/rctrl"
"github.com/scionproto/scion/go/border/rctx"
"github.com/scionproto/scion/go/border/rpkt"
"github.com/scionproto/scion/go/lib/assert"
"github.com/scionproto/scion/go/lib/common"
"github.com/scionproto/scion/go/lib/fatal"
"github.com/scionproto/scion/go/lib/log"
"github.com/scionproto/scion/go/lib/ringbuf"
_ "github.com/scionproto/scion/go/lib/scrypto" // Make sure math/rand is seeded
)
const processBufCnt = 128
type Router struct {
// Id is the SCION element ID, e.g. "br4-ff00:0:2f".
Id string
// confDir is the directory containing the configuration file.
confDir string
// freePkts is a ring-buffer of unused packets.
freePkts *ringbuf.Ring
// sRevInfoQ is a channel for handling SignedRevInfo payloads.
sRevInfoQ chan rpkt.RawSRevCallbackArgs
// pktErrorQ is a channel for handling packet errors
pktErrorQ chan pktErrorArgs
// setCtxMtx serializes modifications to the router context. Topology updates
// can either be caused by a sighup reload, receiving an updated dynamic or
// static topology from the discovery service, or from dropping an expired
// dynamic topology.
setCtxMtx sync.Mutex
}
func NewRouter(id, confDir string) (*Router, error) {
r := &Router{Id: id, confDir: confDir}
if err := r.setup(); err != nil {
return nil, err
}
return r, nil
}
// Start sets up networking, and starts go routines for handling the main packet
// processing as well as various other router functions.
func (r *Router) Start() {
go func() {
defer log.LogPanicAndExit()
r.PacketError()
}()
go func() {
defer log.LogPanicAndExit()
rctrl.Control(r.sRevInfoQ, cfg.General.ReconnectToDispatcher)
}()
if err := r.startDiscovery(); err != nil {
fatal.Fatal(common.NewBasicError("Unable to start discovery", err))
}
}
// ReloadConfig handles reloading the configuration when SIGHUP is received.
func (r *Router) ReloadConfig() error {
var err error
var config *brconf.BRConf
if config, err = r.loadNewConfig(); err != nil {
return common.NewBasicError("Unable to load config", err)
}
if err := r.setupCtxFromConfig(config); err != nil {
return common.NewBasicError("Unable to set up new context", err)
}
return nil
}
func (r *Router) handleSock(s *rctx.Sock, stop, stopped chan struct{}) {
defer log.LogPanicAndExit()
defer close(stopped)
pkts := make(ringbuf.EntryList, processBufCnt)
dst := s.Conn.LocalAddr()
log.Debug("handleSock starting", "addr", dst)
for {
n, _ := s.Ring.Read(pkts, true)
if n < 0 {
log.Debug("handleSock stopping", "addr", dst)
return
}
for i := 0; i < n; i++ {
rp := pkts[i].(*rpkt.RtrPkt)
r.processPacket(rp)
rp.Release()
pkts[i] = nil
}
}
}
// processPacket is the heart of the router's packet handling. It delegates
// everything from parsing the incoming packet, to routing the outgoing packet.
func (r *Router) processPacket(rp *rpkt.RtrPkt) {
if assert.On {
assert.Must(rp.DirFrom != rcmn.DirUnset, "DirFrom must be set")
assert.Must(rp.Ingress.Dst != nil, "Ingress.Dst must be set")
assert.Must(rp.Ingress.Src != nil, "Ingress.Src must be set")
assert.Must(rp.Ctx != nil, "Context must be set")
if rp.DirFrom == rcmn.DirLocal {
assert.Must(rp.Ingress.IfID == 0, "Ingress.IfID must not be set for DirFrom==DirLocal")
} else {
assert.Must(rp.Ingress.IfID > 0, "Ingress.IfID must be set for DirFrom==DirExternal")
}
}
l := metrics.ProcessLabels{
In: metrics.IntfToLabel(rp.Ingress.IfID),
}
// Assign a pseudorandom ID to the packet, for correlating log entries.
rp.Id = log.RandId(4)
rp.Logger = log.New("rpkt", rp.Id)
// XXX(kormat): uncomment for debugging:
//rp.Debug("processPacket", "raw", rp.Raw)
if err := rp.Parse(); err != nil {
r.handlePktError(rp, err, "Error parsing packet")
l.Result = metrics.ErrParse
metrics.Process.PktsWith(l).Inc()
return
}
// Validation looks for errors in the packet that didn't break basic
// parsing.
valid, err := rp.Validate()
if err != nil {
r.handlePktError(rp, err, "Error validating packet")
l.Result = metrics.ErrValidate
metrics.Process.PktsWith(l).Inc()
return
}
if !valid {
rp.Error("Error validating packet, no specific error")
l.Result = metrics.ErrValidate
metrics.Process.PktsWith(l).Inc()
return
}
// Check if the packet needs to be processed locally, and if so register
// hooks for doing so.
if err := rp.NeedsLocalProcessing(); err != nil {
rp.Error("Error checking for local processing", "err", err)
l.Result = metrics.ErrProcessLocal
metrics.Process.PktsWith(l).Inc()
return
}
// Parse the packet payload, if a previous step has registered a relevant
// hook for doing so.
if _, err := rp.Payload(true); err != nil {
// Any errors at this point are application-level, and hence not
// calling handlePktError, as no SCMP errors will be sent.
rp.Error("Error parsing payload", "err", err)
l.Result = metrics.ErrParsePayload
metrics.Process.PktsWith(l).Inc()
return
}
// Process the packet, if a previous step has registered a relevant hook for doing so.
if err := rp.Process(); err != nil {
r.handlePktError(rp, err, "Error processing packet")
l.Result = metrics.ErrProcess
metrics.Process.PktsWith(l).Inc()
return
}
// Forward the packet. Packets destined to self are forwarded to the local dispatcher.
if err := rp.Route(); err != nil {
r.handlePktError(rp, err, "Error routing packet")
l.Result = metrics.ErrRoute
metrics.Process.PktsWith(l).Inc()
}
}