Skip to content

Commit

Permalink
Merge pull request #9811 from nelljerram/qos-openstack
Browse files Browse the repository at this point in the history
[RS-2320] Add QoS support in OpenStack code
  • Loading branch information
nelljerram authored Feb 14, 2025
2 parents 058e696 + 2d6dd0a commit 18cbece
Show file tree
Hide file tree
Showing 6 changed files with 231 additions and 29 deletions.
6 changes: 3 additions & 3 deletions networking-calico/devstack/plugin.sh
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ EOF
# support to the core DevStack repository.
iniset $NEUTRON_CONF DEFAULT core_plugin calico

# Reset service_plugins to be empty, as the Calico plugin
# itself supports the 'router' extension.
inidelete $NEUTRON_CONF DEFAULT service_plugins
# Calico itself implements the 'router' extension, but we need a service plugin
# for QoS.
iniset $NEUTRON_CONF DEFAULT service_plugins qos

# Propagate ENABLE_DEBUG_LOG_LEVEL to neutron.conf, so that
# it applies to the Calico DHCP agent on each compute node.
Expand Down
6 changes: 6 additions & 0 deletions networking-calico/networking_calico/common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@
help="When in a multi-region OpenStack deployment, a unique "
"name for the region that this node (controller or "
"compute) belongs to."),
# Max connection options, in advance of max connection support being added
# properly to the Neutron API.
cfg.IntOpt('max_ingress_connections_per_port', default=0,
help="If non-zero, a maximum number of ingress connections to impose on each port."),
cfg.IntOpt('max_egress_connections_per_port', default=0,
help="If non-zero, a maximum number of egress connections to impose on each port."),
]


Expand Down
3 changes: 3 additions & 0 deletions networking-calico/networking_calico/plugins/calico/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ def __init__(self):
# Add the ability to handle floating IPs.
self._supported_extension_aliases.extend(["router"])

# Add the ability to handle QoS.
self._supported_extension_aliases.extend(["qos", "qos-bw-limit-direction"])

# Suppress the Neutron server's DHCP agent scheduling. This is useful
# because it suppresses many WARNING logs that would otherwise appear,
# but that are actually spurious in a Calico/OpenStack deployment.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
# Ocata and earlier.
from neutron.db.l3_db import FloatingIP

from neutron.db.qos import models as qos_models

from networking_calico.common import config as calico_config
from networking_calico.compat import cfg
from networking_calico.compat import log
Expand Down Expand Up @@ -263,6 +265,7 @@ def add_extra_port_information(self, context, port):
self.add_port_interface_name(port)
self.add_port_project_data(port, context)
self.add_port_sg_names(port, context)
self.add_port_qos(port, context)

return port

Expand Down Expand Up @@ -303,6 +306,52 @@ def add_port_sg_names(self, port, context):
)
port[PORT_KEY_SG_NAMES][sg['id']] = sg_name

def add_port_qos(self, port, context):
"""add_port_qos
Determine and store QoS parameters for a port.
This method assumes it's being called from within a database
transaction and does not take out another one.
"""
qos = {}

qos_policy_id = port.get('qos_policy_id')
if qos_policy_id:
qos_policy = context.session.query(
qos_models.QosPolicy
).filter_by(
id=qos_policy_id
).first()
if qos_policy:
for r in qos_policy['rules']:
if r['type'] == "bandwidth_limit":
direction = r.get('direction', 'egress')
if r['max_kbps'] != 0:
if direction == "egress":
qos['egressBandwidth'] = r['max_kbps'] * 1000
else:
qos['ingressBandwidth'] = r['max_kbps'] * 1000
if r['max_burst_kbps'] != 0:
if direction == "egress":
qos['egressBurst'] = r['max_burst_kbps'] * 1000
else:
qos['ingressBurst'] = r['max_burst_kbps'] * 1000
elif r['type'] == "packet_rate_limit":
direction = r.get('direction', 'egress')
if r['max_kpps'] != 0:
if direction == "egress":
qos['egressPacketRate'] = r['max_kpps'] * 1000
else:
qos['ingressPacketRate'] = r['max_kpps'] * 1000

if cfg.CONF.calico.max_ingress_connections_per_port != 0:
qos['ingressMaxConnections'] = cfg.CONF.calico.max_ingress_connections_per_port
if cfg.CONF.calico.max_egress_connections_per_port != 0:
qos['egressMaxConnections'] = cfg.CONF.calico.max_egress_connections_per_port

port['qos'] = qos

def add_port_project_data(self, port, context):
"""add_port_project_data
Expand Down Expand Up @@ -442,6 +491,9 @@ def endpoint_spec(port):
if ip_nats:
data['ipNATs'] = ip_nats

if port['qos']:
data['qosControls'] = port['qos']

# Return that data.
return data

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
sys.modules['neutron.db'] = m_neutron.db
sys.modules['neutron.db.models'] = m_neutron.db.models
sys.modules['neutron.db.models.l3'] = m_neutron.db.models.l3
sys.modules['neutron.db.qos'] = m_neutron.db.qos
sys.modules['neutron.openstack'] = m_neutron.openstack
sys.modules['neutron.openstack.common'] = m_neutron.openstack.common
sys.modules['neutron.openstack.common.db'] = m_neutron.openstack.common.db
Expand Down Expand Up @@ -255,9 +256,7 @@ def setUp(self):
self.db = mech_calico.plugin_dir.get_plugin()
self.db_context = mech_calico.ctx.get_admin_context()
self.db_context.to_dict.return_value = {}
self.db_context.session.query.return_value.filter_by.side_effect = (
self.db_query
)
self.db_context.session.query.side_effect = self.db_query

# Arrange what the DB's get_ports will return.
self.db.get_ports.side_effect = self.get_ports
Expand Down Expand Up @@ -610,31 +609,110 @@ def get_port_security_group_bindings(self, context, filters):
return [b for b in self.port_security_group_bindings
if b['port_id'] in allowed_ids]

def db_query(self, **kw):
def db_query(self, model, **kw):
m = mock.MagicMock()
if 'IPAllocation' in str(model.name):
m.filter_by.side_effect = self.db_query_ip_allocation
return m
if 'FloatingIP' in str(model.name):
m.filter_by.side_effect = self.db_query_floating_ip
return m
if 'Network' in str(model.name):
m.filter_by.side_effect = self.db_query_network
return m
if 'QosPolicy' in str(model.name):
m.filter_by.side_effect = self.db_query_qos_policy
return m
raise Exception("db_query model=%r kw=%r" % (model, kw))

def db_query_ip_allocation(self, **kw):
# 'port_id' query key for IPAllocations
if kw.get('port_id', None):
for port in self.osdb_ports:
if port['id'] == kw['port_id']:
return port['fixed_ips']
# 'fixed_port_id query key for FloatingIPs
elif kw.get('fixed_port_id', None):
fips = []
for fip in floating_ports:
if fip['fixed_port_id'] == kw['fixed_port_id']:
fips.append(fip)
return fips
for port in self.osdb_ports:
if port['id'] == kw['port_id']:
return port['fixed_ips']

def db_query_floating_ip(self, **kw):
fips = []
for fip in floating_ports:
if fip['fixed_port_id'] == kw['fixed_port_id']:
fips.append(fip)
return fips

def db_query_network(self, **kw):
# 'id' query key for Networks
elif kw.get('id', None):
for network in self.osdb_networks:
if network['id'] == kw['id']:
network_mock = mock.MagicMock()
network_mock.first.return_value = network
return network_mock
else:
raise Exception("db_query doesn't know how to handle kw=%r" % kw)

for network in self.osdb_networks:
if network['id'] == kw['id']:
network_mock = mock.MagicMock()
network_mock.first.return_value = network
return network_mock
return None

def db_query_qos_policy(self, **kw):
policies = {
# Example from
# https://docs.openstack.org/api-ref/network/v2/index.html#id695.
'1': {
"project_id": "8d4c70a21fed4aeba121a1a429ba0d04",
"tenant_id": "8d4c70a21fed4aeba121a1a429ba0d04",
"id": "46ebaec0-0570-43ac-82f6-60d2b03168c4",
"is_default": False,
"name": "10Mbit",
"description": "This policy limits the ports to 10Mbit max.",
"revision_number": 3,
"created_at": "2018-04-03T21:26:39Z",
"updated_at": "2018-04-03T21:26:39Z",
"shared": False,
"rules": [
{
"id": "5f126d84-551a-4dcf-bb01-0e9c0df0c793",
"qos_policy_id": "46ebaec0-0570-43ac-82f6-60d2b03168c4",
"max_kbps": 10000,
"max_burst_kbps": 0,
"type": "bandwidth_limit"
},
{
"id": "5f126d84-551a-4dcf-bb01-0e9c0df0c794",
"qos_policy_id": "46ebaec0-0570-43ac-82f6-60d2b03168c4",
"dscp_mark": 26,
"type": "dscp_marking"
}
],
"tags": ["tag1,tag2"]
},
# A policy that will set all possible fields.
'2': {
"id": "2",
"rules": [
{
"max_kbps": 1,
"max_burst_kbps": 2,
"direction": "ingress",
"type": "bandwidth_limit"
},
{
"max_kbps": 3,
"max_burst_kbps": 4,
"direction": "egress",
"type": "bandwidth_limit"
},
{
"max_kpps": 5,
"direction": "ingress",
"type": "packet_rate_limit"
},
{
"max_kpps": 6,
"direction": "egress",
"type": "packet_rate_limit"
},
],
},
}

m = mock.MagicMock()
m.first.return_value = policies.get(kw['id'], None)
return m


class FixedUUID(object):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,8 @@ def setUp(self):
lib.m_compat.cfg.CONF.calico.etcd_compaction_period_mins = 0
lib.m_compat.cfg.CONF.calico.project_name_cache_max = 0
lib.m_compat.cfg.CONF.calico.openstack_region = self.region
lib.m_compat.cfg.CONF.calico.max_ingress_connections_per_port = 0
lib.m_compat.cfg.CONF.calico.max_egress_connections_per_port = 0
calico_config._reset_globals()
datamodel_v2._reset_globals()

Expand Down Expand Up @@ -525,8 +527,7 @@ def test_start_two_ports(self):
# Delete lib.port1.
context = self.make_context()
context._port = lib.port1
context._plugin_context.session.query.return_value.filter_by.\
side_effect = self.db_query
context._plugin_context.session.query.side_effect = self.db_query
self.driver.delete_port_postcommit(context)
self.assertEtcdWrites({})
self.assertEtcdDeletes(set([ep_deadbeef_key_v3]))
Expand Down Expand Up @@ -921,6 +922,68 @@ def test_start_two_ports(self):
self.assertEtcdWrites(expected_writes)
self.assertEtcdDeletes(set())

# Add a QoS policy.
context._port['qos_policy_id'] = '1'
self.osdb_ports[0]['qos_policy_id'] = '1'
self.driver.update_port_postcommit(context)

# Expected changes
ep_hello_value_v3['spec']['qosControls'] = {
'egressBandwidth': 10000000,
}
expected_writes = {
ep_hello_key_v3: ep_hello_value_v3,
sg_1_key_v3: sg_1_value_v3,
}
self.assertEtcdWrites(expected_writes)
self.assertEtcdDeletes(set())

# Add configuration for max connections.
lib.m_compat.cfg.CONF.calico.max_ingress_connections_per_port = 10
lib.m_compat.cfg.CONF.calico.max_egress_connections_per_port = 20
self.driver.update_port_postcommit(context)

# Expected changes
ep_hello_value_v3['spec']['qosControls'] = {
'egressBandwidth': 10000000,
'ingressMaxConnections': 10,
'egressMaxConnections': 20,
}
expected_writes = {
ep_hello_key_v3: ep_hello_value_v3,
sg_1_key_v3: sg_1_value_v3,
}
self.assertEtcdWrites(expected_writes)
self.assertEtcdDeletes(set())

# Change to a QoS policy that will set all possible settings.
context._port['qos_policy_id'] = '2'
self.osdb_ports[0]['qos_policy_id'] = '2'
self.driver.update_port_postcommit(context)

# Expected changes
ep_hello_value_v3['spec']['qosControls'] = {
'ingressBandwidth': 1000,
'egressBandwidth': 3000,
'ingressBurst': 2000,
'egressBurst': 4000,
'ingressPacketRate': 5000,
'egressPacketRate': 6000,
'ingressMaxConnections': 10,
'egressMaxConnections': 20,
}
expected_writes = {
ep_hello_key_v3: ep_hello_value_v3,
sg_1_key_v3: sg_1_value_v3,
}
self.assertEtcdWrites(expected_writes)
self.assertEtcdDeletes(set())

# Reset for future tests.
lib.m_compat.cfg.CONF.calico.max_ingress_connections_per_port = 0
lib.m_compat.cfg.CONF.calico.max_egress_connections_per_port = 0
del self.osdb_ports[0]['qos_policy_id']

# Reset the state for safety.
self.osdb_ports[0]['fixed_ips'] = old_ips

Expand Down

0 comments on commit 18cbece

Please sign in to comment.