Author: wangkuntian Date: Fri Oct 13 16:25:17 2023 +0800 feat: add distributed traffic feature --- agent/l3/dvr_edge_ha_router.py | 4 +- agent/l3/extensions/rg_port_forwarding.py | 398 ++++++++++++++++++++++++++++ agent/l3/ha.py | 9 - agent/l3/ha_router.py | 74 +++--- agent/l3/keepalived_state_change.py | 12 +- agent/l3/router_info.py | 19 +- agent/linux/dhcp.py | 137 +++++++++- agent/linux/interface.py | 43 +-- agent/linux/keepalived.py | 1 + api/rpc/agentnotifiers/dhcp_rpc_agent_api.py | 9 + api/rpc/callbacks/resources.py | 3 + conf/agent/l3/keepalived.py | 2 + conf/common.py | 16 +- conf/policies/__init__.py | 2 + conf/policies/rg_port_forwarding.py | 76 ++++++ db/l3_attrs_db.py | 7 +- db/l3_db.py | 27 +- db/l3_hamode_db.py | 97 +++++++ db/migration/alembic_migrations/versions/EXPAND_HEAD | 2 +- .../train/expand/1c19a98b5eef_add_router_configurations.py | 36 +++ .../expand/cab12b72ed90_add_router_gateway_port_forwarding.py | 55 ++++ db/models/l3_attrs.py | 2 + db/models/rg_port_forwarding.py | 59 +++++ extensions/rg_port_forwarding.py | 119 +++++++++ objects/rg_port_forwarding.py | 87 ++++++ objects/router.py | 15 +- scheduler/l3_agent_scheduler.py | 107 ++++++-- services/l3_router/l3_router_plugin.py | 12 + services/rg_portforwarding/__init__.py | 0 services/rg_portforwarding/common/__init__.py | 0 services/rg_portforwarding/common/exceptions.py | 77 ++++++ services/rg_portforwarding/pf_plugin.py | 369 ++++++++++++++++++++++++++ 32 files changed, 1749 insertions(+), 127 deletions(-) diff --git a/agent/l3/dvr_edge_ha_router.py b/agent/l3/dvr_edge_ha_router.py index 71f740bef9..b92f70b70f 100644 --- a/agent/l3/dvr_edge_ha_router.py +++ b/agent/l3/dvr_edge_ha_router.py @@ -114,9 +114,7 @@ class DvrEdgeHaRouter(dvr_edge_router.DvrEdgeRouter, def _external_gateway_added(self, ex_gw_port, interface_name, ns_name, preserve_ips): - link_up = self.external_gateway_link_up() - self._plug_external_gateway(ex_gw_port, interface_name, ns_name, - link_up=link_up) + self._plug_external_gateway(ex_gw_port, interface_name, ns_name) def _is_this_snat_host(self): return self.agent_conf.agent_mode == constants.L3_AGENT_MODE_DVR_SNAT diff --git a/agent/l3/extensions/rg_port_forwarding.py b/agent/l3/extensions/rg_port_forwarding.py new file mode 100644 index 0000000000..a159e4df34 --- /dev/null +++ b/agent/l3/extensions/rg_port_forwarding.py @@ -0,0 +1,398 @@ +# Copyright (c) 2023 UnionTech +# All rights reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections +from typing import Optional, List +from oslo_concurrency import lockutils +from oslo_log import log as logging + +from neutron_lib import constants +from neutron_lib.rpc import Connection +from neutron_lib.context import Context +from neutron_lib.agent import l3_extension + +from neutron.agent.linux.ip_lib import IPDevice +from neutron.agent.l3.router_info import RouterInfo +from neutron.agent.linux.iptables_manager import IptablesManager + +from neutron.api.rpc.handlers import resources_rpc +from neutron.api.rpc.callbacks import resources, events +from neutron.api.rpc.callbacks.consumer import registry + +from neutron.common import coordination + +from neutron.objects.ports import Port +from neutron.objects.router import Router +from neutron.objects.rg_port_forwarding import RGPortForwarding + +LOG = logging.getLogger(__name__) + +PORT_FORWARDING_PREFIX = 'rg_portforwarding-' +DEFAULT_PORT_FORWARDING_CHAIN = 'rg-pf' +PORT_FORWARDING_CHAIN_PREFIX = 'pf-' + + +def _get_port_forwarding_chain_name(pf_id): + chain_name = PORT_FORWARDING_CHAIN_PREFIX + pf_id + return chain_name[:constants.MAX_IPTABLES_CHAIN_LEN_WRAP] + + +class RGPortForwardingMapping(object): + def __init__(self): + self.managed_port_forwardings = {} + self.router_pf_mapping = collections.defaultdict(set) + + @lockutils.synchronized('rg-port-forwarding-cache') + def check_port_forwarding_changes(self, new_pf: RGPortForwarding) -> bool: + old_pf = self.managed_port_forwardings.get(new_pf.id) + return old_pf != new_pf + + @lockutils.synchronized('rg-port-forwarding-cache') + def set_port_forwardings(self, port_forwardings: List[RGPortForwarding]): + for port_forwarding in port_forwardings: + self._set_router_port_forwarding(port_forwarding, + port_forwarding.router_id) + + def _set_router_port_forwarding(self, + port_forwarding: RGPortForwarding, + router_id: str): + self.router_pf_mapping[router_id].add(port_forwarding.id) + self.managed_port_forwardings[port_forwarding.id] = port_forwarding + + @lockutils.synchronized('rg-port-forwarding-cache') + def update_port_forwardings(self, port_forwardings): + for port_forwarding in port_forwardings: + self.managed_port_forwardings[port_forwarding.id] = port_forwarding + + @lockutils.synchronized('rg-port-forwarding-cache') + def del_port_forwardings(self, port_forwardings): + for port_forwarding in port_forwardings: + if not self.managed_port_forwardings.get(port_forwarding.id): + continue + self.managed_port_forwardings.pop(port_forwarding.id) + self.router_pf_mapping[port_forwarding.router_id].discard( + port_forwarding.id) + if not self.router_pf_mapping[port_forwarding.router_id]: + self.router_pf_mapping.pop(port_forwarding.router_id) + + @lockutils.synchronized('rg-port-forwarding-cache') + def clean_port_forwardings_by_router_id(self, router_id: str): + pf_ids = self.router_pf_mapping.pop(router_id, []) + for pf_id in pf_ids: + self.managed_port_forwardings.pop(pf_id, None) + + +class RGPortForwardingAgentExtension(l3_extension.L3AgentExtension): + SUPPORTED_RESOURCE_TYPES = [resources.RGPORTFORWARDING] + + def consume_api(self, agent_api): + self.agent_api = agent_api + + def initialize(self, connection, driver_type): + self.mapping = RGPortForwardingMapping() + self.resource_rpc = resources_rpc.ResourcesPullRpcApi() + self._register_rpc_consumers() + + def _register_rpc_consumers(self): + registry.register(self._handle_notification, + resources.RGPORTFORWARDING) + self._connection = Connection() + endpoints = [resources_rpc.ResourcesPushRpcCallback()] + topic = resources_rpc.resource_type_versioned_topic( + resources.RGPORTFORWARDING) + self._connection.create_consumer(topic, endpoints, fanout=True) + self._connection.consume_in_threads() + + def _handle_notification(self, context: Context, + resource_type: str, + forwardings, event_type): + for forwarding in forwardings: + self._process_port_forwarding_event( + context, forwarding, event_type) + + def _get_gw_port_and_ip(self, + ri: RouterInfo) -> (Optional[Port], Optional[str]): + ex_gw_port = ri.get_ex_gw_port() + ex_gw_port_ip = self._get_gw_port_ip(ex_gw_port) + if not ex_gw_port_ip: + LOG.error(f"Router {ri.router_id} external port " + f"{ex_gw_port['id']} does not have any IP addresses") + return None, None + return ex_gw_port, ex_gw_port_ip + + def _process_port_forwarding_event(self, context: Context, + port_forwarding: RGPortForwarding, + event_type: str): + router_id = port_forwarding.router_id + ri = self._get_router_info(router_id) + if not self._check_if_need_process(ri, force=True): + return + + ex_gw_port, ex_gw_port_ip = self._get_gw_port_and_ip(ri) + if not ex_gw_port or not ex_gw_port_ip: + return + + (interface_name, namespace, + iptables_manager) = self._get_resource_by_router(ri, ex_gw_port) + + if event_type == events.CREATED: + self._process_create([port_forwarding], ri, interface_name, + ex_gw_port_ip, namespace, iptables_manager) + elif event_type == events.UPDATED: + self._process_update([port_forwarding], interface_name, + ex_gw_port_ip, namespace, iptables_manager) + elif event_type == events.DELETED: + self._process_delete([port_forwarding], interface_name, + ex_gw_port_ip, namespace, iptables_manager) + + def ha_state_change(self, context: Context, data: Router) -> None: + pass + + def update_network(self, context: Context, data: dict) -> None: + pass + + def add_router(self, context: Context, data: Router) -> None: + LOG.info(f"call add_router for {data['id']}") + self.process_port_forwarding(context, data) + + def update_router(self, context: Context, data: Router) -> None: + LOG.info(f"call update_router for {data['id']}") + self.process_port_forwarding(context, data) + + def delete_router(self, context: Context, data: Router) -> None: + self.mapping.clean_port_forwardings_by_router_id(data['id']) + + def _get_router_info(self, router_id) -> Optional[RouterInfo]: + router_info = self.agent_api.get_router_info(router_id) + if router_info: + return router_info + LOG.debug("Router %s is not managed by this agent. " + "It was possibly deleted concurrently.", router_id) + + @staticmethod + def _check_if_need_process(ri: RouterInfo, force: bool = False) -> bool: + if not ri or not ri.get_ex_gw_port(): + return False + + if force: + return True + + is_distributed = ri.router.get('distributed') + agent_mode = ri.agent_conf.agent_mode + if (is_distributed and + agent_mode in [constants.L3_AGENT_MODE_DVR_NO_EXTERNAL, + constants.L3_AGENT_MODE_DVR]): + # just support centralized cases + return False + + if is_distributed and not ri.snat_namespace.exists(): + return False + + return True + + def process_port_forwarding(self, context: Context, data: Router): + ri = self._get_router_info(data['id']) + if not self._check_if_need_process(ri): + return + self.check_local_port_forwardings(context, ri) + + @staticmethod + def _get_gw_port_ip(gw_port: dict) -> Optional[str]: + fixed_ips = gw_port.get('fixed_ips', []) + if not fixed_ips: + return + return fixed_ips[0].get('ip_address', None) + + @staticmethod + def _get_resource_by_router(ri: RouterInfo, ex_gw_port: dict) -> ( + str, str, IptablesManager): + is_distributed = ri.router.get('distributed') + if is_distributed: + interface_name = ri.get_snat_external_device_interface_name( + ex_gw_port) + namespace = ri.snat_namespace.name + iptables_manager = ri.snat_iptables_manager + else: + interface_name = ri.get_external_device_interface_name(ex_gw_port) + namespace = ri.ns_name + iptables_manager = ri.iptables_manager + + return interface_name, namespace, iptables_manager + + def check_local_port_forwardings(self, context: Context, ri: RouterInfo): + pfs = self.resource_rpc.bulk_pull( + context, resources.RGPORTFORWARDING, + filter_kwargs={'router_id': ri.router_id}) + if not pfs: + return + ex_gw_port, ex_gw_port_ip = self._get_gw_port_and_ip(ri) + if not ex_gw_port_ip or not ex_gw_port_ip: + return + (interface_name, namespace, + iptables_manager) = self._get_resource_by_router(ri, ex_gw_port) + local_pfs = set(self.mapping.managed_port_forwardings.keys()) + new_pfs = [] + updated_pfs = [] + current_pfs = set() + for pf in pfs: + if pf.id in self.mapping.managed_port_forwardings: + if self.mapping.check_port_forwarding_changes(pf): + updated_pfs.append(pf) + else: + new_pfs.append(pf) + current_pfs.add(pf.id) + + remove_pf_ids_set = local_pfs - current_pfs + remove_pfs = [self.mapping.managed_port_forwardings[pf_id] + for pf_id in remove_pf_ids_set] + + self._process_create(new_pfs, ri, interface_name, ex_gw_port_ip, + namespace, iptables_manager) + + self._process_update(updated_pfs, interface_name, ex_gw_port_ip, + namespace, iptables_manager) + + self._process_delete(remove_pfs, interface_name, ex_gw_port_ip, + namespace, iptables_manager) + + @staticmethod + def _install_default_rules(iptables_manager: IptablesManager): + default_rule = '-j %s-%s' % (iptables_manager.wrap_name, + DEFAULT_PORT_FORWARDING_CHAIN) + LOG.info(f'Add default chain {DEFAULT_PORT_FORWARDING_CHAIN}') + LOG.info(f'Add default rule {default_rule}') + iptables_manager.ipv4['nat'].add_chain(DEFAULT_PORT_FORWARDING_CHAIN) + iptables_manager.ipv4['nat'].add_rule('PREROUTING', default_rule) + iptables_manager.apply() + + @staticmethod + def _get_rg_rules(port_forward: RGPortForwarding, wrap_name: str): + chain_rule_list = [] + pf_chain_name = _get_port_forwarding_chain_name(port_forward.id) + chain_rule_list.append( + (DEFAULT_PORT_FORWARDING_CHAIN, f'-j {wrap_name}-{pf_chain_name}')) + gw_ip_address = port_forward.gw_ip_address + protocol = port_forward.protocol + internal_ip_address = str(port_forward.internal_ip_address) + internal_port = port_forward.internal_port + external_port = port_forward.external_port + chain_rule = ( + pf_chain_name, + f'-d {gw_ip_address}/32 -p {protocol} -m {protocol} ' + f'--dport {external_port} ' + f'-j DNAT --to-destination {internal_ip_address}:{internal_port}' + ) + chain_rule_list.append(chain_rule) + return chain_rule_list + + def _rule_apply(self, + iptables_manager: IptablesManager, + port_forwarding: RGPortForwarding, + rule_tag: str): + iptables_manager.ipv4['nat'].clear_rules_by_tag(rule_tag) + if (DEFAULT_PORT_FORWARDING_CHAIN not in + iptables_manager.ipv4['nat'].chains): + self._install_default_rules(iptables_manager) + + for chain, rule in self._get_rg_rules(port_forwarding, + iptables_manager.wrap_name): + LOG.info(f'Add router gateway port forwarding ' + f'rule {rule} in {chain}') + if chain not in iptables_manager.ipv4['nat'].chains: + iptables_manager.ipv4['nat'].add_chain(chain) + iptables_manager.ipv4['nat'].add_rule(chain, rule, tag=rule_tag) + + def _store_local(self, pf_objs: List[RGPortForwarding], event_type: str): + if event_type == events.CREATED: + self.mapping.set_port_forwardings(pf_objs) + elif event_type == events.UPDATED: + self.mapping.update_port_forwardings(pf_objs) + elif event_type == events.DELETED: + self.mapping.del_port_forwardings(pf_objs) + + def _process_create(self, + port_forwardings: List[RGPortForwarding], + ri: RouterInfo, + interface_name: str, + interface_ip: str, + namespace: str, + iptables_manager: IptablesManager): + if not port_forwardings: + return + + ha_port = ri.router.get(constants.HA_INTERFACE_KEY, None) + if ha_port and ha_port['status'] == constants.PORT_STATUS_ACTIVE: + ri.enable_keepalived() + + for port_forwarding in port_forwardings: + if port_forwarding.id in self.mapping.managed_port_forwardings: + LOG.debug("Skip port forwarding %s for create, as it had been " + "managed by agent", port_forwarding.id) + continue + rule_tag = PORT_FORWARDING_PREFIX + port_forwarding.id + port_forwarding.gw_ip_address = interface_ip + self._rule_apply(iptables_manager, port_forwarding, rule_tag) + iptables_manager.apply() + self._store_local(port_forwardings, events.CREATED) + + def _process_update(self, + port_forwardings: List[RGPortForwarding], + interface_name: str, + interface_ip: str, + namespace: str, + iptables_manager: IptablesManager): + if not port_forwardings: + return + device = IPDevice(interface_name, namespace=namespace) + for port_forwarding in port_forwardings: + # check if port forwarding change from OVO and router rpc + if not self.mapping.check_port_forwarding_changes(port_forwarding): + LOG.debug("Skip port forwarding %s for update, as there is no " + "difference between the memory managed by agent", + port_forwarding.id) + continue + current_chain = _get_port_forwarding_chain_name(port_forwarding.id) + iptables_manager.ipv4['nat'].remove_chain(current_chain) + ori_pf = self.mapping.managed_port_forwardings[port_forwarding.id] + device.delete_socket_conntrack_state(interface_ip, + ori_pf.external_port, + protocol=ori_pf.protocol) + rule_tag = PORT_FORWARDING_PREFIX + port_forwarding.id + port_forwarding.gw_ip_address = interface_ip + self._rule_apply(iptables_manager, port_forwarding, rule_tag) + iptables_manager.apply() + self._store_local(port_forwardings, events.UPDATED) + + @coordination.synchronized('router-lock-ns-{namespace}') + def _process_delete(self, + port_forwardings: List[RGPortForwarding], + interface_name: str, + interface_ip: str, + namespace: str, + iptables_manager: IptablesManager): + if not port_forwardings: + return + device = IPDevice(interface_name, namespace=namespace) + for port_forwarding in port_forwardings: + current_chain = _get_port_forwarding_chain_name(port_forwarding.id) + iptables_manager.ipv4['nat'].remove_chain(current_chain) + device.delete_socket_conntrack_state( + interface_ip, + port_forwarding.external_port, + protocol=port_forwarding.protocol) + + iptables_manager.apply() + + self._store_local(port_forwardings, events.DELETED) diff --git a/agent/l3/ha.py b/agent/l3/ha.py index 17891dc983..182fa68175 100644 --- a/agent/l3/ha.py +++ b/agent/l3/ha.py @@ -163,15 +163,6 @@ class AgentMixin(object): 'agent %(host)s', state_change_data) - # Set external gateway port link up or down according to state - if state == 'master': - ri.set_external_gw_port_link_status(link_up=True, set_gw=True) - elif state == 'backup': - ri.set_external_gw_port_link_status(link_up=False) - else: - LOG.warning('Router %s has status %s, ' - 'no action to router gateway device.', - router_id, state) # TODO(dalvarez): Fix bug 1677279 by moving the IPv6 parameters # configuration to keepalived-state-change in order to remove the # dependency that currently exists on l3-agent running for the IPv6 diff --git a/agent/l3/ha_router.py b/agent/l3/ha_router.py index 0a21902771..ef10ff76e4 100644 --- a/agent/l3/ha_router.py +++ b/agent/l3/ha_router.py @@ -17,6 +17,7 @@ import shutil import signal import netaddr +from typing import Optional, List from neutron_lib.api.definitions import portbindings from neutron_lib import constants as n_consts from neutron_lib.utils import runtime @@ -137,6 +138,22 @@ class HaRouter(router.RouterInfo): else: return False + @property + def configurations(self) -> Optional[dict]: + return self.router.get('configurations', {}) + + @property + def master(self) -> Optional[str]: + if self.configurations: + return self.configurations.get('master_agent', None) + return None + + @property + def slaves(self) -> Optional[List[str]]: + if self.configurations: + return self.configurations.get('slave_agents', []) + return [] + def initialize(self, process_monitor): ha_port = self.router.get(n_consts.HA_INTERFACE_KEY) if not ha_port: @@ -162,19 +179,32 @@ class HaRouter(router.RouterInfo): throttle_restart_value=( self.agent_conf.ha_vrrp_advert_int * THROTTLER_MULTIPLIER)) + # The following call is required to ensure that if the state path does + # not exist it gets created. + self.keepalived_manager.get_full_config_file_path('test') + config = self.keepalived_manager.config interface_name = self.get_ha_device_name() subnets = self.ha_port.get('subnets', []) ha_port_cidrs = [subnet['cidr'] for subnet in subnets] + nopreempt = True + state = 'BACKUP' + priority = self.ha_priority + if self.slaves and self.master: + nopreempt = False + if self.master == self.agent_conf.host: + state = 'MASTER' + priority = keepalived.HA_DEFAULT_MASTER_PRIORITY + instance = keepalived.KeepalivedInstance( - 'BACKUP', + state, interface_name, self.ha_vr_id, ha_port_cidrs, - nopreempt=True, + nopreempt=nopreempt, advert_int=self.agent_conf.ha_vrrp_advert_int, - priority=self.ha_priority, + priority=priority, vrrp_health_check_interval=( self.agent_conf.ha_vrrp_health_check_interval), ha_conf_dir=self.keepalived_manager.get_conf_dir()) @@ -396,13 +426,16 @@ class HaRouter(router.RouterInfo): ha_device = self.get_ha_device_name() ha_cidr = self._get_primary_vip() config_dir = self.keepalived_manager.get_conf_dir() - state_change_log = ( - "%s/neutron-keepalived-state-change.log") % config_dir + state_change_log = f"{config_dir}/neutron-keepalived-state-change.log" def callback(pid_file): + LOG.info(f'Router: {self.router_id} master is {self.master}, ' + f'salves are {self.slaves}.') cmd = [ STATE_CHANGE_PROC_NAME, '--router_id=%s' % self.router_id, + '--master_agent=%s' % self.master, + '--slave_agents=%s' % ','.join(self.slaves), '--namespace=%s' % self.ha_namespace, '--conf_dir=%s' % config_dir, '--log-file=%s' % state_change_log, @@ -453,9 +486,7 @@ class HaRouter(router.RouterInfo): return port1_filtered == port2_filtered def external_gateway_added(self, ex_gw_port, interface_name): - link_up = self.external_gateway_link_up() - self._plug_external_gateway(ex_gw_port, interface_name, - self.ns_name, link_up=link_up) + self._plug_external_gateway(ex_gw_port, interface_name, self.ns_name) self._add_gateway_vip(ex_gw_port, interface_name) self._disable_ipv6_addressing_on_interface(interface_name) @@ -519,30 +550,3 @@ class HaRouter(router.RouterInfo): if (self.keepalived_manager.get_process().active and self.ha_state == 'master'): super(HaRouter, self).enable_radvd(internal_ports) - - def external_gateway_link_up(self): - # Check HA router ha_state for its gateway port link state. - # 'backup' instance will not link up the gateway port. - return self.ha_state == 'master' - - def set_external_gw_port_link_status(self, link_up, set_gw=False): - link_state = "up" if link_up else "down" - LOG.info('Set router %s gateway device link state to %s.', - self.router_id, link_state) - - ex_gw_port = self.get_ex_gw_port() - ex_gw_port_id = (ex_gw_port and ex_gw_port['id'] or - self.ex_gw_port and self.ex_gw_port['id']) - if ex_gw_port_id: - interface_name = self.get_external_device_name(ex_gw_port_id) - ns_name = self.get_gw_ns_name() - if (not self.driver.set_link_status( - interface_name, namespace=ns_name, link_up=link_up) and - link_up): - LOG.error('Gateway interface for router %s was not set up; ' - 'router will not work properly', self.router_id) - if link_up and set_gw: - preserve_ips = self.get_router_preserve_ips() - self._external_gateway_settings(ex_gw_port, interface_name, - ns_name, preserve_ips) - self.routes_updated([], self.routes) diff --git a/agent/l3/keepalived_state_change.py b/agent/l3/keepalived_state_change.py index 7fd9e4269e..8c10a8b00f 100644 --- a/agent/l3/keepalived_state_change.py +++ b/agent/l3/keepalived_state_change.py @@ -47,9 +47,12 @@ class KeepalivedUnixDomainConnection(agent_utils.UnixDomainHTTPConnection): class MonitorDaemon(daemon.Daemon): - def __init__(self, pidfile, router_id, user, group, namespace, conf_dir, - interface, cidr): + def __init__(self, pidfile, host, router_id, master, slaves, user, group, + namespace, conf_dir, interface, cidr): + self.host = host self.router_id = router_id + self.master = master + self.slaves = slaves self.namespace = namespace self.conf_dir = conf_dir self.interface = interface @@ -62,6 +65,8 @@ class MonitorDaemon(daemon.Daemon): user=user, group=group) def run(self): + LOG.debug(f'Router: {self.router_id} master is {self.master}, ' + f'salves are {self.slaves}.') self._thread_ip_monitor = threading.Thread( target=ip_lib.ip_monitor, args=(self.namespace, self.queue, self.event_stop, @@ -169,7 +174,10 @@ def main(): keepalived.register_l3_agent_keepalived_opts() configure(cfg.CONF) MonitorDaemon(cfg.CONF.pid_file, + cfg.CONF.host, cfg.CONF.router_id, + cfg.CONF.master_agent, + cfg.CONF.slave_agents, cfg.CONF.user, cfg.CONF.group, cfg.CONF.namespace, diff --git a/agent/l3/router_info.py b/agent/l3/router_info.py index ea2b488cd2..eabdcc6e54 100644 --- a/agent/l3/router_info.py +++ b/agent/l3/router_info.py @@ -697,16 +697,14 @@ class RouterInfo(BaseRouterInfo): return [common_utils.ip_to_cidr(ip['floating_ip_address']) for ip in floating_ips] - def _plug_external_gateway(self, ex_gw_port, interface_name, ns_name, - link_up=True): + def _plug_external_gateway(self, ex_gw_port, interface_name, ns_name): self.driver.plug(ex_gw_port['network_id'], ex_gw_port['id'], interface_name, ex_gw_port['mac_address'], namespace=ns_name, prefix=EXTERNAL_DEV_PREFIX, - mtu=ex_gw_port.get('mtu'), - link_up=link_up) + mtu=ex_gw_port.get('mtu')) def _get_external_gw_ips(self, ex_gw_port): gateway_ips = [] @@ -766,11 +764,7 @@ class RouterInfo(BaseRouterInfo): LOG.debug("External gateway added: port(%s), interface(%s), ns(%s)", ex_gw_port, interface_name, ns_name) self._plug_external_gateway(ex_gw_port, interface_name, ns_name) - self._external_gateway_settings(ex_gw_port, interface_name, - ns_name, preserve_ips) - def _external_gateway_settings(self, ex_gw_port, interface_name, - ns_name, preserve_ips): # Build up the interface and gateway IP addresses that # will be added to the interface. ip_cidrs = common_utils.fixed_ip_cidrs(ex_gw_port['fixed_ips']) @@ -815,19 +809,18 @@ class RouterInfo(BaseRouterInfo): return any(netaddr.IPAddress(gw_ip).version == 6 for gw_ip in gateway_ips) - def get_router_preserve_ips(self): + def external_gateway_added(self, ex_gw_port, interface_name): preserve_ips = self._list_floating_ip_cidrs() + list( self.centralized_port_forwarding_fip_set) preserve_ips.extend(self.agent.pd.get_preserve_ips(self.router_id)) - return preserve_ips - def external_gateway_added(self, ex_gw_port, interface_name): - preserve_ips = self.get_router_preserve_ips() self._external_gateway_added( ex_gw_port, interface_name, self.ns_name, preserve_ips) def external_gateway_updated(self, ex_gw_port, interface_name): - preserve_ips = self.get_router_preserve_ips() + preserve_ips = self._list_floating_ip_cidrs() + list( + self.centralized_port_forwarding_fip_set) + preserve_ips.extend(self.agent.pd.get_preserve_ips(self.router_id)) self._external_gateway_added( ex_gw_port, interface_name, self.ns_name, preserve_ips) diff --git a/agent/linux/dhcp.py b/agent/linux/dhcp.py index 249e1a8199..5b82fe328f 100644 --- a/agent/linux/dhcp.py +++ b/agent/linux/dhcp.py @@ -19,6 +19,7 @@ import os import re import shutil import time +from typing import List import netaddr from neutron_lib.api.definitions import extra_dhcp_opt as edo_ext @@ -40,6 +41,7 @@ from neutron.agent.linux import iptables_manager from neutron.cmd import runtime_checks as checks from neutron.common import ipv6_utils from neutron.common import utils as common_utils +from neutron.conf.common import NETWORK_HOST_OPTS from neutron.ipam import utils as ipam_utils LOG = logging.getLogger(__name__) @@ -81,6 +83,51 @@ def port_requires_dhcp_configuration(port): constants.DEVICE_OWNER_DHCP] +class Octopus(object): + def __init__(self): + self.tentacles = collections.defaultdict(Tentacle) + + def __str__(self): + lines = [''] + for subnet_id, tentacle in self.tentacles.items(): + line = (f"Subnet {subnet_id} has multi routers " + f"{len(tentacle.gateway_ports) > 1} " + f"{tentacle}") + lines.append(line) + return '\n'.join(lines) + + +class Tentacle(object): + def __init__(self, gateway_ip: str): + self.gateway_ip = gateway_ip + self.tags = [] + self.gateway_ports = collections.defaultdict(Sucker) + self.suckers = collections.defaultdict(Sucker) + + def __str__(self): + lines = [""] + for port_id, sucker in self.suckers.items(): + line = f" Port {port_id} {sucker}" + lines.append(line) + return '\n'.join(lines) + + +class Sucker(object): + def __init__(self, host: str, device_owner: str, ip_address: str): + self.host = host + self.device_owner = device_owner + self.ip_address = ip_address + self.tag = None + + def subnet_tag(self, subnet_id: str): + return f'{self.host}-subnet-{subnet_id}' + + def __str__(self): + return (f"ip: {self.ip_address} \t" + f"binding_host: {self.host} \t" + f"device_owner: {self.device_owner}") + + class DictModel(dict): """Convert dict into an object that provides attribute access to values.""" @@ -149,6 +196,10 @@ class DhcpBase(object): self.process_monitor = process_monitor self.device_manager = DeviceManager(self.conf, plugin) self.version = version + self.octopus = Octopus() + self._init_octopus() + self.compute_to_network = dict() + self._init_compute_to_network() @abc.abstractmethod def enable(self): @@ -193,6 +244,31 @@ class DhcpBase(object): """True if the metadata-proxy should be enabled for the network.""" raise NotImplementedError() + def _init_compute_to_network(self): + for network_node in self.conf.network_nodes: + self.conf.register_opts(NETWORK_HOST_OPTS, group=network_node) + network_group = self.conf.get(network_node, None) + if network_group: + compute_nodes = network_group.get('compute_nodes', []) + for compute_node in compute_nodes: + self.compute_to_network[compute_node] = network_node + + def _init_octopus(self): + for subnet in self.network.subnets: + self.octopus.tentacles[subnet.id] = Tentacle(subnet.gateway_ip) + + for port in self.network.ports: + for ip in port.fixed_ips: + host = port.get('binding:host_id') + device_owner = port.get('device_owner') + ip_address = ip.get('ip_address') + tentacle = self.octopus.tentacles[ip.subnet_id] + sucker = Sucker(host, device_owner, ip_address) + tentacle.suckers[port.id] = sucker + if (device_owner in (constants.DEVICE_OWNER_HA_REPLICATED_INT, + constants.DEVICE_OWNER_ROUTER_INTF)): + tentacle.gateway_ports[host] = sucker + @six.add_metaclass(abc.ABCMeta) class DhcpLocalProcess(DhcpBase): @@ -841,8 +917,18 @@ class Dnsmasq(DhcpLocalProcess): (port.mac_address, tag, name, ip_address, 'set:', self._PORT_TAG_PREFIX % port.id)) else: - buf.write('%s,%s%s,%s\n' % - (port.mac_address, tag, name, ip_address)) + subnet_tag = f'subnet-{alloc.subnet_id}' + if self.conf.enable_set_route_for_single_port: + tentacle = self.octopus.tentacles[alloc.subnet_id] + sucker = tentacle.suckers[port.id] + tentacle.tags.append(subnet_tag) + if sucker.device_owner.startswith( + constants.DEVICE_OWNER_COMPUTE_PREFIX): + subnet_tag = sucker.subnet_tag(alloc.subnet_id) + sucker.tag = subnet_tag + + buf.write(f'{port.mac_address},{tag}{name},{ip_address},' + f'set:{subnet_tag}\n') file_utils.replace_file(filename, buf.getvalue()) LOG.debug('Done building host file %s', filename) @@ -1059,7 +1145,8 @@ class Dnsmasq(DhcpLocalProcess): """Write a dnsmasq compatible options file.""" options, subnet_index_map = self._generate_opts_per_subnet() options += self._generate_opts_per_port(subnet_index_map) - + if self.conf.enable_set_route_for_single_port: + options += self._generate_opts_for_compute_port(options) name = self.get_conf_file_name('opts') file_utils.replace_file(name, '\n'.join(options)) return name @@ -1220,6 +1307,50 @@ class Dnsmasq(DhcpLocalProcess): vx_ips)))) return options + def _generate_opts_for_compute_port(self, options: List[str]) -> List[str]: + new_options = [] + LOG.debug(self.octopus) + if not self.compute_to_network: + LOG.warning('CONF.enable_set_route_for_single_port is True, ' + 'but not configured.') + return new_options + for subnet_id, tentacle in self.octopus.tentacles.items(): + if len(tentacle.tags) == 0: + continue + if len(tentacle.gateway_ports) <= 1: + LOG.info(f'Subnet {subnet_id} is not bound ' + f'to different routers, ' + f'so skip generate options for compute ports.') + continue + for port_id, sucker in tentacle.suckers.items(): + if not sucker.tag: + continue + if not sucker.device_owner.startswith( + constants.DEVICE_OWNER_COMPUTE_PREFIX): + continue + network_node = self.compute_to_network.get(sucker.host, None) + if not network_node: + LOG.warning(f'Compute host {sucker.host} not configured.') + continue + port = tentacle.gateway_ports.get(network_node, None) + if not port: + LOG.warning(f'Subnet {subnet_id} does not have gateway ' + f'port on network host {network_node}.') + continue + if tentacle.gateway_ip == port.ip_address: + continue + for option in options.copy(): + if subnet_id in option: + option = option.replace(f'subnet-{subnet_id}', + sucker.tag) + if ('option:classless-static-route' in option or + ',249,' in option or 'option:router' in option): + gateway_ip = option.split(',')[-1] + option = option.replace(gateway_ip, + port.ip_address) + new_options.append(option) + return new_options + def _make_subnet_interface_ip_map(self): subnet_lookup = dict( (netaddr.IPNetwork(subnet.cidr), subnet.id) diff --git a/agent/linux/interface.py b/agent/linux/interface.py index 3ac476d7ba..2e6455707c 100644 --- a/agent/linux/interface.py +++ b/agent/linux/interface.py @@ -259,17 +259,16 @@ class LinuxInterfaceDriver(object): @abc.abstractmethod def plug_new(self, network_id, port_id, device_name, mac_address, - bridge=None, namespace=None, prefix=None, mtu=None, - link_up=True): + bridge=None, namespace=None, prefix=None, mtu=None): """Plug in the interface only for new devices that don't exist yet.""" def plug(self, network_id, port_id, device_name, mac_address, - bridge=None, namespace=None, prefix=None, mtu=None, link_up=True): + bridge=None, namespace=None, prefix=None, mtu=None): if not ip_lib.device_exists(device_name, namespace=namespace): self._safe_plug_new( network_id, port_id, device_name, mac_address, bridge, - namespace, prefix, mtu, link_up) + namespace, prefix, mtu) else: LOG.info("Device %s already exists", device_name) if mtu: @@ -279,11 +278,11 @@ class LinuxInterfaceDriver(object): LOG.warning("No MTU configured for port %s", port_id) def _safe_plug_new(self, network_id, port_id, device_name, mac_address, - bridge=None, namespace=None, prefix=None, mtu=None, link_up=True): + bridge=None, namespace=None, prefix=None, mtu=None): try: self.plug_new( network_id, port_id, device_name, mac_address, bridge, - namespace, prefix, mtu, link_up) + namespace, prefix, mtu) except TypeError: LOG.warning("Interface driver's plug_new() method should now " "accept additional optional parameter 'link_up'. " @@ -321,27 +320,10 @@ class LinuxInterfaceDriver(object): LOG.warning("Interface driver cannot update MTU for ports") self._mtu_update_warn_logged = True - def set_link_status(self, device_name, namespace=None, link_up=True): - ns_dev = ip_lib.IPWrapper(namespace=namespace).device(device_name) - try: - utils.wait_until_true(ns_dev.exists, timeout=3) - except utils.WaitTimeout: - LOG.debug('Device %s may have been deleted concurrently', - device_name) - return False - - if link_up: - ns_dev.link.set_up() - else: - ns_dev.link.set_down() - - return True - class NullDriver(LinuxInterfaceDriver): def plug_new(self, network_id, port_id, device_name, mac_address, - bridge=None, namespace=None, prefix=None, mtu=None, - link_up=True): + bridge=None, namespace=None, prefix=None, mtu=None): pass def unplug(self, device_name, bridge=None, namespace=None, prefix=None): @@ -377,8 +359,7 @@ class OVSInterfaceDriver(LinuxInterfaceDriver): ovs.replace_port(device_name, *attrs) def plug_new(self, network_id, port_id, device_name, mac_address, - bridge=None, namespace=None, prefix=None, mtu=None, - link_up=True): + bridge=None, namespace=None, prefix=None, mtu=None): """Plug in the interface.""" if not bridge: bridge = self.conf.ovs_integration_bridge @@ -442,8 +423,8 @@ class OVSInterfaceDriver(LinuxInterfaceDriver): else: LOG.warning("No MTU configured for port %s", port_id) - if link_up: - ns_dev.link.set_up() + ns_dev.link.set_up() + if self.conf.ovs_use_veth: # ovs-dpdk does not do checksum calculations for veth interface # (bug 1832021) @@ -488,8 +469,7 @@ class BridgeInterfaceDriver(LinuxInterfaceDriver): DEV_NAME_PREFIX = 'ns-' def plug_new(self, network_id, port_id, device_name, mac_address, - bridge=None, namespace=None, prefix=None, mtu=None, - link_up=True): + bridge=None, namespace=None, prefix=None, mtu=None): """Plugin the interface.""" ip = ip_lib.IPWrapper() @@ -508,8 +488,7 @@ class BridgeInterfaceDriver(LinuxInterfaceDriver): LOG.warning("No MTU configured for port %s", port_id) root_veth.link.set_up() - if link_up: - ns_veth.link.set_up() + ns_veth.link.set_up() def unplug(self, device_name, bridge=None, namespace=None, prefix=None): """Unplug the interface.""" diff --git a/agent/linux/keepalived.py b/agent/linux/keepalived.py index f47a27f1d1..405a781f0b 100644 --- a/agent/linux/keepalived.py +++ b/agent/linux/keepalived.py @@ -32,6 +32,7 @@ from neutron.common import utils VALID_STATES = ['MASTER', 'BACKUP'] VALID_AUTH_TYPES = ['AH', 'PASS'] HA_DEFAULT_PRIORITY = 50 +HA_DEFAULT_MASTER_PRIORITY = 100 PRIMARY_VIP_RANGE_SIZE = 24 KEEPALIVED_SERVICE_NAME = 'keepalived' KEEPALIVED_EMAIL_FROM = 'neutron@openstack.local' diff --git a/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py b/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py index 29b61b2c9b..fbad5f1d9a 100644 --- a/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py +++ b/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py @@ -312,6 +312,15 @@ class DhcpAgentNotifyAPI(object): return False if set(orig.keys()) != set(new.keys()): return False + + if cfg.CONF.enable_set_route_for_single_port: + device_owner = new.get('device_owner', None) + orig_device_owner = orig.get('device_owner', None) + if (not orig_device_owner and device_owner and + device_owner.startswith( + constants.DEVICE_OWNER_COMPUTE_PREFIX)): + return True + for k in orig.keys(): if k in ('status', 'updated_at', 'revision_number'): continue diff --git a/api/rpc/callbacks/resources.py b/api/rpc/callbacks/resources.py index 734f05eb6f..de56211c53 100644 --- a/api/rpc/callbacks/resources.py +++ b/api/rpc/callbacks/resources.py @@ -15,6 +15,7 @@ from neutron.objects import conntrack_helper from neutron.objects.logapi import logging_resource as log_object from neutron.objects import network from neutron.objects import port_forwarding +from neutron.objects import rg_port_forwarding from neutron.objects import ports from neutron.objects.qos import policy from neutron.objects import securitygroup @@ -33,6 +34,7 @@ SUBNET = subnet.Subnet.obj_name() SECURITYGROUP = securitygroup.SecurityGroup.obj_name() SECURITYGROUPRULE = securitygroup.SecurityGroupRule.obj_name() PORTFORWARDING = port_forwarding.PortForwarding.obj_name() +RGPORTFORWARDING = rg_port_forwarding.RGPortForwarding.obj_name() CONNTRACKHELPER = conntrack_helper.ConntrackHelper.obj_name() @@ -47,6 +49,7 @@ _VALID_CLS = ( securitygroup.SecurityGroupRule, log_object.Log, port_forwarding.PortForwarding, + rg_port_forwarding.RGPortForwarding, conntrack_helper.ConntrackHelper, ) diff --git a/conf/agent/l3/keepalived.py b/conf/agent/l3/keepalived.py index bd46c723fc..5ff3492280 100644 --- a/conf/agent/l3/keepalived.py +++ b/conf/agent/l3/keepalived.py @@ -20,6 +20,8 @@ from neutron._i18n import _ CLI_OPTS = [ cfg.StrOpt('router_id', help=_('ID of the router')), + cfg.StrOpt('master_agent', help=_('The master agent of router')), + cfg.ListOpt('slave_agents', help=_('The slave agents of router')), cfg.StrOpt('namespace', help=_('Namespace of the router')), cfg.StrOpt('conf_dir', help=_('Path to the router directory')), cfg.StrOpt('monitor_interface', help=_('Interface to monitor')), diff --git a/conf/common.py b/conf/common.py index f885429613..45e0b48723 100644 --- a/conf/common.py +++ b/conf/common.py @@ -145,7 +145,21 @@ core_opts = [ "Setting to any positive integer means that on failure " "the connection is retried that many times. " "For example, setting to 3 means total attempts to " - "connect will be 4.")) + "connect will be 4.")), + cfg.BoolOpt('enable_set_route_for_single_port', default=False, + help=_("To set route path for every single port " + "when the same subnet has multi ports on router.")), + cfg.ListOpt('network_nodes', + default=[], + help=_("The list of network hosts to " + "make a network map " + "with compute node and network node.")), +] + +NETWORK_HOST_OPTS = [ + cfg.ListOpt('compute_nodes', + default=[], + help=_("The list of compute hosts.")) ] core_cli_opts = [ diff --git a/conf/policies/__init__.py b/conf/policies/__init__.py index aa4dda63d0..15cdaea45a 100644 --- a/conf/policies/__init__.py +++ b/conf/policies/__init__.py @@ -34,6 +34,7 @@ from neutron.conf.policies import port from neutron.conf.policies import qos from neutron.conf.policies import rbac from neutron.conf.policies import router +from neutron.conf.policies import rg_port_forwarding from neutron.conf.policies import security_group from neutron.conf.policies import segment from neutron.conf.policies import service_type @@ -63,6 +64,7 @@ def list_rules(): qos.list_rules(), rbac.list_rules(), router.list_rules(), + rg_port_forwarding.list_rules(), security_group.list_rules(), segment.list_rules(), service_type.list_rules(), diff --git a/conf/policies/rg_port_forwarding.py b/conf/policies/rg_port_forwarding.py new file mode 100644 index 0000000000..19e2cd5e2f --- /dev/null +++ b/conf/policies/rg_port_forwarding.py @@ -0,0 +1,76 @@ +# Copyright (c) 2023 UnionTech +# All rights reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from oslo_policy import policy +from neutron.conf.policies import base + +COLLECTION_PATH = '/routers/{router_id}/port_forwardings' +RESOURCE_PATH = '/routers/{router_id}/port_forwardings/{port_forwarding_id}' + +rules = [ + policy.DocumentedRuleDefault( + 'create_router_gateway_port_forwarding', + base.RULE_ADMIN_OR_PARENT_OWNER, + 'Create a router gateway port forwarding', + [ + { + 'method': 'POST', + 'path': COLLECTION_PATH, + }, + ] + ), + policy.DocumentedRuleDefault( + 'get_router_gateway_forwarding', + base.RULE_ADMIN_OR_PARENT_OWNER, + 'Get a router gateway port forwarding', + [ + { + 'method': 'GET', + 'path': COLLECTION_PATH, + }, + { + 'method': 'GET', + 'path': RESOURCE_PATH, + }, + ] + ), + policy.DocumentedRuleDefault( + 'update_router_gateway_port_forwarding', + base.RULE_ADMIN_OR_PARENT_OWNER, + 'Update a floating IP port forwarding', + [ + { + 'method': 'PUT', + 'path': RESOURCE_PATH, + }, + ] + ), + policy.DocumentedRuleDefault( + 'delete_router_gateway_port_forwarding', + base.RULE_ADMIN_OR_PARENT_OWNER, + 'Delete a floating IP port forwarding', + [ + { + 'method': 'DELETE', + 'path': RESOURCE_PATH, + }, + ] + ), +] + + +def list_rules(): + return rules diff --git a/db/l3_attrs_db.py b/db/l3_attrs_db.py index e6d4e298b1..f292b7aa32 100644 --- a/db/l3_attrs_db.py +++ b/db/l3_attrs_db.py @@ -19,6 +19,7 @@ from oslo_config import cfg from neutron._i18n import _ from neutron.db.models import l3_attrs +from neutron.objects.base import NeutronDbObject def get_attr_info(): @@ -29,7 +30,11 @@ def get_attr_info(): 'availability_zone_hints': { 'default': '[]', 'transform_to_db': az_validator.convert_az_list_to_string, - 'transform_from_db': az_validator.convert_az_string_to_list} + 'transform_from_db': az_validator.convert_az_string_to_list}, + 'configurations': { + 'default': '{}', + 'transform_to_db': NeutronDbObject.filter_to_json_str, + 'transform_from_db': NeutronDbObject.load_json_from_str} } diff --git a/db/l3_db.py b/db/l3_db.py index 565b422532..b625dc1959 100644 --- a/db/l3_db.py +++ b/db/l3_db.py @@ -47,6 +47,7 @@ from neutron.common import ipv6_utils from neutron.common import utils from neutron.db import _utils as db_utils from neutron.db.models import l3 as l3_models +from neutron.plugins.ml2 import models as ml2_models from neutron.db import models_v2 from neutron.db import standardattrdescription_db as st_attr from neutron.extensions import l3 @@ -1086,21 +1087,37 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase, # with subnet's gateway-ip, return that router. # Otherwise return the first router. RouterPort = l3_models.RouterPort + RouterPortBinding = orm.aliased(ml2_models.PortBinding, + name="router_port_binding") + ComputePortBinding = orm.aliased(ml2_models.PortBinding, + name="compute_port_binding") gw_port = orm.aliased(models_v2.Port, name="gw_port") # TODO(lujinluo): Need IPAllocation and Port object routerport_qry = context.session.query( - RouterPort.router_id, models_v2.IPAllocation.ip_address).join( - RouterPort.port, models_v2.Port.fixed_ips).filter( + RouterPort.router_id, models_v2.IPAllocation.ip_address, + RouterPortBinding.host, ComputePortBinding.host, + ).join( + RouterPort.port, models_v2.Port.fixed_ips + ).filter( models_v2.Port.network_id == internal_port['network_id'], RouterPort.port_type.in_(constants.ROUTER_INTERFACE_OWNERS), - models_v2.IPAllocation.subnet_id == internal_subnet['id'] - ).join(gw_port, gw_port.device_id == RouterPort.router_id).filter( + models_v2.IPAllocation.subnet_id == internal_subnet['id'], + ComputePortBinding.port_id == internal_port['id'], + ).join( + gw_port, gw_port.device_id == RouterPort.router_id + ).filter( gw_port.network_id == external_network_id, gw_port.device_owner == DEVICE_OWNER_ROUTER_GW + ).join( + RouterPortBinding, RouterPortBinding.port_id == models_v2.Port.id ).distinct() first_router_id = None - for router_id, interface_ip in routerport_qry: + for (router_id, interface_ip, + network_host, compute_host) in routerport_qry: + network_node = self.compute_to_network.get(compute_host, None) + if network_node and network_node == network_host: + return router_id if interface_ip == internal_subnet['gateway_ip']: return router_id if not first_router_id: diff --git a/db/l3_hamode_db.py b/db/l3_hamode_db.py index bff388e166..4c414016dd 100644 --- a/db/l3_hamode_db.py +++ b/db/l3_hamode_db.py @@ -32,6 +32,7 @@ from neutron_lib import constants from neutron_lib.db import api as db_api from neutron_lib import exceptions as n_exc from neutron_lib.exceptions import l3 as l3_exc +from neutron_lib.exceptions import agent as agent_exc from neutron_lib.exceptions import l3_ext_ha_mode as l3ha_exc from neutron_lib.objects import exceptions as obj_base from neutron_lib.plugins import utils as p_utils @@ -54,6 +55,7 @@ from neutron.db import l3_dvr_db from neutron.objects import base from neutron.objects import l3_hamode from neutron.objects import router as l3_obj +from neutron.objects import agent as agent_obj VR_ID_RANGE = set(range(1, 255)) @@ -378,12 +380,85 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin, if not self.get_ha_network(context, router['tenant_id']): self._create_ha_network(context, router['tenant_id']) + @staticmethod + def _check_router_configurations_creation(context, + configurations: dict, + is_ha: bool = True): + agents = agent_obj.Agent.get_objects(context, + binary='neutron-l3-agent') + agents = [agent['host'] for agent in agents] + master = configurations.get('master_agent', None) + slaves = configurations.get('slave_agents', []) + preferred_agent = configurations.get('preferred_agent', None) + if is_ha: + if master and slaves: + for agent in [master] + slaves: + if agent not in agents: + raise agent_exc.AgentNotFound(id=agent) + if master in slaves: + raise l3_exc.RouterAgentConflict() + else: + if master or slaves: + raise l3_exc.RouterAgentNotGiven() + else: + if preferred_agent and preferred_agent not in agents: + raise agent_exc.AgentNotFound(id=preferred_agent) + + @staticmethod + def _check_router_configurations_update(context, + configurations: dict, + old_configurations: dict, + is_ha: bool = True) -> bool: + if configurations == old_configurations: + return False + + agents = agent_obj.Agent.get_objects(context, + binary='neutron-l3-agent') + agents = [agent['host'] for agent in agents] + master = configurations.get('master_agent', None) + slaves = configurations.get('slave_agents', []) + preferred_agent = configurations.get('preferred_agent', None) + old_master = old_configurations.get('master_agent', None) + old_slaves = old_configurations.get('slave_agents', []) + old_preferred_agent = old_configurations.get('preferred_agent', None) + if is_ha: + if master: + if master != old_master: + if master not in agents: + raise agent_exc.AgentNotFound(id=master) + old_configurations['master_agent'] = master + if slaves: + if slaves != old_slaves: + for slave in slaves: + if slave not in agents: + raise agent_exc.AgentNotFound(id=slave) + old_configurations['slave_agents'] = slaves + if (old_configurations['master_agent'] in + old_configurations['slave_agents']): + raise l3_exc.RouterAgentConflict() + else: + if preferred_agent: + if preferred_agent != old_preferred_agent: + if preferred_agent not in agents: + raise agent_exc.AgentNotFound(id=preferred_agent) + old_configurations['preferred_agent'] = preferred_agent + else: + old_configurations['preferred_agent'] = None + + return True + @registry.receives(resources.ROUTER, [events.PRECOMMIT_CREATE], priority_group.PRIORITY_ROUTER_EXTENDED_ATTRIBUTE) def _precommit_router_create(self, resource, event, trigger, context, router, router_db, **kwargs): """Event handler to set ha flag and status on creation.""" is_ha = self._is_ha(router) + configurations = router.get('configurations', {}) + if configurations: + self._check_router_configurations_creation(context, configurations, + is_ha) + self.set_extra_attr_value(context, router_db, 'configurations', + configurations) router['ha'] = is_ha self.set_extra_attr_value(context, router_db, 'ha', is_ha) if not is_ha: @@ -465,6 +540,28 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin, self.set_extra_attr_value( payload.context, payload.desired_state, 'ha', requested_ha_state) + @registry.receives(resources.ROUTER, [events.PRECOMMIT_UPDATE], + priority_group.PRIORITY_ROUTER_EXTENDED_ATTRIBUTE) + def _validate_configurations(self, resource, event, trigger, payload=None): + old_configurations = payload.states[0].get('configurations', {}) + configurations = payload.request_body.get('configurations', {}) + + if not configurations: + return + + if payload.desired_state.admin_state_up: + msg = _('Cannot change configurations of active routers. Please ' + 'set router admin_state_up to False prior to upgrade') + raise n_exc.BadRequest(resource='router', msg=msg) + + need_update = self._check_router_configurations_update( + payload.context, configurations, old_configurations, + payload.states[0]['ha']) + + if need_update: + self.set_extra_attr_value(payload.context, payload.desired_state, + 'configurations', old_configurations) + @registry.receives(resources.ROUTER, [events.AFTER_UPDATE], priority_group.PRIORITY_ROUTER_EXTENDED_ATTRIBUTE) def _reconfigure_ha_resources(self, resource, event, trigger, context, diff --git a/db/migration/alembic_migrations/versions/EXPAND_HEAD b/db/migration/alembic_migrations/versions/EXPAND_HEAD index ffa2bbaaf6..0c8e4a2178 100644 --- a/db/migration/alembic_migrations/versions/EXPAND_HEAD +++ b/db/migration/alembic_migrations/versions/EXPAND_HEAD @@ -1 +1 @@ -c613d0b82681 +1c19a98b5eef diff --git a/db/migration/alembic_migrations/versions/train/expand/1c19a98b5eef_add_router_configurations.py b/db/migration/alembic_migrations/versions/train/expand/1c19a98b5eef_add_router_configurations.py new file mode 100644 index 0000000000..f12600ef4f --- /dev/null +++ b/db/migration/alembic_migrations/versions/train/expand/1c19a98b5eef_add_router_configurations.py @@ -0,0 +1,36 @@ +# Copyright 2023 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from alembic import op +import sqlalchemy as sa + +"""add router configurations + +Revision ID: 1c19a98b5eef +Revises: cab12b72ed90 +Create Date: 2023-08-01 10:05:56.412167 + +""" + +# revision identifiers, used by Alembic. +revision = '1c19a98b5eef' +down_revision = 'cab12b72ed90' + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.add_column('router_extra_attributes', + sa.Column('configurations', sa.String(length=4095))) + # ### end Alembic commands ### diff --git a/db/migration/alembic_migrations/versions/train/expand/cab12b72ed90_add_router_gateway_port_forwarding.py b/db/migration/alembic_migrations/versions/train/expand/cab12b72ed90_add_router_gateway_port_forwarding.py new file mode 100644 index 0000000000..ad511a7ed8 --- /dev/null +++ b/db/migration/alembic_migrations/versions/train/expand/cab12b72ed90_add_router_gateway_port_forwarding.py @@ -0,0 +1,55 @@ +# Copyright 2023 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from alembic import op +import sqlalchemy as sa + +"""add router gateway port forwarding + +Revision ID: cab12b72ed90 +Revises: c613d0b82681 +Create Date: 2023-07-04 10:27:54.485453 + +""" + +# revision identifiers, used by Alembic. +revision = 'cab12b72ed90' +down_revision = 'c613d0b82681' + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_table( + 'rgportforwardings', + sa.Column('id', sa.String(length=36), nullable=False), + sa.Column('router_id', sa.String(length=36), nullable=False), + sa.Column('external_port', sa.Integer(), nullable=False), + sa.Column('internal_neutron_port_id', sa.String(length=36), + nullable=False), + sa.Column('protocol', sa.String(length=40), nullable=False), + sa.Column('socket', sa.String(length=36), nullable=False), + sa.ForeignKeyConstraint(['internal_neutron_port_id'], ['ports.id'], + ondelete='CASCADE'), + sa.ForeignKeyConstraint(['router_id'], ['routers.id'], + ondelete='CASCADE'), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint( + 'internal_neutron_port_id', 'socket', 'protocol', + name='uniq_port_forwardings0internal_neutron_port_id0socket0protocol'), + sa.UniqueConstraint( + 'router_id', 'external_port', 'protocol', + name='uniq_rg_port_forwardings0router_id0external_port0protocol') + ) + # ### end Alembic commands ### diff --git a/db/models/l3_attrs.py b/db/models/l3_attrs.py index 6c30ac2c16..904f4ef08d 100644 --- a/db/models/l3_attrs.py +++ b/db/models/l3_attrs.py @@ -41,6 +41,8 @@ class RouterExtraAttributes(model_base.BASEV2): # Availability Zone support availability_zone_hints = sa.Column(sa.String(255)) + configurations = sa.Column(sa.String(4095)) + router = orm.relationship( 'Router', load_on_pending=True, backref=orm.backref("extra_attributes", lazy='joined', diff --git a/db/models/rg_port_forwarding.py b/db/models/rg_port_forwarding.py new file mode 100644 index 0000000000..e7963169a8 --- /dev/null +++ b/db/models/rg_port_forwarding.py @@ -0,0 +1,59 @@ +# Copyright (c) 2023 UnionTech +# All rights reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import sqlalchemy as sa +from sqlalchemy import orm +from neutron_lib.db import model_base +from neutron_lib.db import constants as db_const + +from neutron.db.models import l3 +from neutron.db import models_v2 + + +class RGPortForwarding(model_base.BASEV2, model_base.HasId): + __table_args__ = ( + sa.UniqueConstraint('router_id', 'external_port', 'protocol', + name='uniq_rg_port_forwardings0router_id0' + 'external_port0protocol'), + sa.UniqueConstraint('internal_neutron_port_id', 'socket', 'protocol', + name='uniq_port_forwardings0' + 'internal_neutron_port_id0socket0' + 'protocol') + ) + + router_id = sa.Column(sa.String(db_const.UUID_FIELD_SIZE), + sa.ForeignKey('routers.id', + ondelete="CASCADE"), + nullable=False) + external_port = sa.Column(sa.Integer, nullable=False) + internal_neutron_port_id = sa.Column( + sa.String(db_const.UUID_FIELD_SIZE), + sa.ForeignKey('ports.id', ondelete="CASCADE"), + nullable=False) + protocol = sa.Column(sa.String(40), nullable=False) + socket = sa.Column(sa.String(36), nullable=False) + port = orm.relationship( + models_v2.Port, load_on_pending=True, + backref=orm.backref("rg_port_forwardings", + lazy='subquery', uselist=True, + cascade='delete') + ) + router = orm.relationship( + l3.Router, load_on_pending=True, + backref=orm.backref("rg_port_forwardings", + lazy='subquery', uselist=True, + cascade='delete') + ) + revises_on_change = ('router', 'port',) diff --git a/extensions/rg_port_forwarding.py b/extensions/rg_port_forwarding.py new file mode 100644 index 0000000000..c9888cd1ec --- /dev/null +++ b/extensions/rg_port_forwarding.py @@ -0,0 +1,119 @@ +# Copyright (c) 2023 UnionTech +# All rights reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import six +import abc +import itertools +from typing import List + +from neutron_lib.context import Context +from neutron_lib.plugins import constants +from neutron_lib.plugins import directory +from neutron_lib.services import base as service_base +from neutron_lib.api.extensions import APIExtensionDescriptor +from neutron_lib.api.definitions import rg_port_forwarding as apidef + +from neutron.api.v2 import base +from neutron.api.v2 import resource_helper +from neutron.api.extensions import ResourceExtension + + +class Rg_port_forwarding(APIExtensionDescriptor): + api_definition = apidef + + @classmethod + def get_plugin_interface(cls): + return RGPortForwardingPluginBase + + @classmethod + def get_resources(cls): + special_mappings = {'routers': 'router'} + plural_mappings = resource_helper.build_plural_mappings( + special_mappings, + itertools.chain( + apidef.RESOURCE_ATTRIBUTE_MAP, + apidef.SUB_RESOURCE_ATTRIBUTE_MAP + ) + ) + + resources = resource_helper.build_resource_info( + plural_mappings, + apidef.RESOURCE_ATTRIBUTE_MAP, + constants.ROUTER_GATEWAY_PORTFORWARDING, + translate_name=True, + allow_bulk=True) + + plugin = directory.get_plugin(constants.ROUTER_GATEWAY_PORTFORWARDING) + + parent = apidef.SUB_RESOURCE_ATTRIBUTE_MAP[apidef.COLLECTION_NAME].get( + 'parent') + params = apidef.SUB_RESOURCE_ATTRIBUTE_MAP[apidef.COLLECTION_NAME].get( + 'parameters') + controller = base.create_resource( + apidef.COLLECTION_NAME, apidef.RESOURCE_NAME, plugin, params, + allow_bulk=True, parent=parent, allow_pagination=True, + allow_sorting=True) + + resource = ResourceExtension( + apidef.COLLECTION_NAME, controller, parent, attr_map=params) + resources.append(resource) + + return resources + + +@six.add_metaclass(abc.ABCMeta) +class RGPortForwardingPluginBase(service_base.ServicePluginBase): + path_prefix = apidef.API_PREFIX + + @classmethod + def get_plugin_type(cls): + return constants.ROUTER_GATEWAY_PORTFORWARDING + + def get_plugin_description(self): + return "Router Gateway Port Forwarding Service Plugin" + + @abc.abstractmethod + def create_router_gateway_port_forwarding(self, context: Context, + router_id: str, + gateway_port_forwarding: dict): + pass + + @abc.abstractmethod + def update_router_gateway_port_forwarding(self, context: Context, id: str, + router_id: str, + gateway_port_forwarding: dict): + pass + + @abc.abstractmethod + def get_router_gateway_port_forwarding(self, context: Context, id: str, + router_id: str, + fields: List[str] = None): + pass + + @abc.abstractmethod + def get_router_gateway_port_forwardings(self, context: Context, + router_id: str, + filters: List[str] = None, + fields: List[str] = None, + sorts: List[str] = None, + limit: int = None, + marker: str = None, + page_reverse: bool = False): + pass + + @abc.abstractmethod + def delete_router_gateway_port_forwarding(self, context: Context, id: str, + router_id: str): + pass diff --git a/objects/rg_port_forwarding.py b/objects/rg_port_forwarding.py new file mode 100644 index 0000000000..28cb4e1d4d --- /dev/null +++ b/objects/rg_port_forwarding.py @@ -0,0 +1,87 @@ +# Copyright (c) 2023 UnionTech +# All rights reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import netaddr +from neutron_lib import constants as lib_const +from oslo_versionedobjects import fields as obj_fields + +from neutron.objects import base, common_types +from neutron.db.models import rg_port_forwarding as models + +FIELDS_NOT_SUPPORT_FILTER = ['internal_ip_address', 'internal_port'] + + +@base.NeutronObjectRegistry.register +class RGPortForwarding(base.NeutronDbObject): + VERSION = '1.0' + + db_model = models.RGPortForwarding + + primary_keys = ['id'] + foreign_keys = { + 'Router': {'router_id': 'id'}, + 'Port': {'internal_port_id': 'id'} + } + fields_need_translation = { + 'socket': 'socket', + 'internal_port_id': 'internal_neutron_port_id' + } + + fields = { + 'id': common_types.UUIDField(), + 'router_id': common_types.UUIDField(nullable=False), + 'external_port': common_types.PortRangeField(nullable=False), + 'protocol': common_types.IpProtocolEnumField(nullable=False), + 'internal_port_id': common_types.UUIDField(nullable=False), + 'internal_ip_address': obj_fields.IPV4AddressField(), + 'internal_port': common_types.PortRangeField(nullable=False), + 'gw_ip_address': obj_fields.IPV4AddressField(), + } + + synthetic_fields = ['gw_ip_address'] + fields_no_update = {'id', 'router_id'} + + def __eq__(self, other): + for attr in self.fields: + if getattr(self, attr) != getattr(other, attr): + return False + return True + + def obj_load_attr(self, attrname): + super(RGPortForwarding, self).obj_load_attr(attrname) + + def from_db_object(self, db_obj): + super(RGPortForwarding, self).from_db_object(db_obj) + + @classmethod + def modify_fields_from_db(cls, db_obj): + result = super(RGPortForwarding, cls).modify_fields_from_db(db_obj) + if 'socket' in result: + groups = result['socket'].split(":") + result['internal_ip_address'] = netaddr.IPAddress( + groups[0], version=lib_const.IP_VERSION_4) + result['internal_port'] = int(groups[1]) + del result['socket'] + return result + + @classmethod + def modify_fields_to_db(cls, fields): + result = super(RGPortForwarding, cls).modify_fields_to_db(fields) + if 'internal_ip_address' in result and 'internal_port' in result: + result['socket'] = (f"{result['internal_ip_address']}:" + f"{result['internal_port']}") + del result['internal_ip_address'] + del result['internal_port'] + return result diff --git a/objects/router.py b/objects/router.py index 1373f89515..9590a109f6 100644 --- a/objects/router.py +++ b/objects/router.py @@ -18,6 +18,7 @@ from neutron_lib.api.definitions import availability_zone as az_def from neutron_lib.api.validators import availability_zone as az_validator from neutron_lib import constants as n_const from neutron_lib.utils import net as net_utils +from neutron_lib.objects import utils as obj_utils from oslo_versionedobjects import fields as obj_fields import six from sqlalchemy import func @@ -70,7 +71,8 @@ class RouterRoute(base.NeutronDbObject): @base.NeutronObjectRegistry.register class RouterExtraAttributes(base.NeutronDbObject): # Version 1.0: Initial version - VERSION = '1.0' + # Version 1.1: Add configurations + VERSION = '1.1' db_model = l3_attrs.RouterExtraAttributes @@ -80,7 +82,8 @@ class RouterExtraAttributes(base.NeutronDbObject): 'service_router': obj_fields.BooleanField(default=False), 'ha': obj_fields.BooleanField(default=False), 'ha_vr_id': obj_fields.IntegerField(nullable=True), - 'availability_zone_hints': obj_fields.ListOfStringsField(nullable=True) + 'availability_zone_hints': obj_fields.ListOfStringsField(nullable=True), + 'configurations': common_types.DictOfMiscValuesField(nullable=True), } primary_keys = ['router_id'] @@ -95,6 +98,9 @@ class RouterExtraAttributes(base.NeutronDbObject): result[az_def.AZ_HINTS] = ( az_validator.convert_az_string_to_list( result[az_def.AZ_HINTS])) + if 'configurations' in result: + result['configurations'] = cls.load_json_from_str( + result['configurations'], default={}) return result @classmethod @@ -104,6 +110,11 @@ class RouterExtraAttributes(base.NeutronDbObject): result[az_def.AZ_HINTS] = ( az_validator.convert_az_list_to_string( result[az_def.AZ_HINTS])) + if ('configurations' in result and + not isinstance(result['configurations'], + obj_utils.StringMatchingFilterObj)): + result['configurations'] = ( + cls.filter_to_json_str(result['configurations'])) return result @classmethod diff --git a/scheduler/l3_agent_scheduler.py b/scheduler/l3_agent_scheduler.py index 5810cf85b8..7a428aef03 100644 --- a/scheduler/l3_agent_scheduler.py +++ b/scheduler/l3_agent_scheduler.py @@ -14,13 +14,15 @@ # under the License. import abc -import collections +import random import functools import itertools -import random +import collections +from typing import List, Optional from neutron_lib.api.definitions import availability_zone as az_def from neutron_lib import constants as lib_const +from neutron_lib.context import Context from neutron_lib.db import api as lib_db_api from neutron_lib.exceptions import l3 as l3_exc from oslo_config import cfg @@ -31,8 +33,9 @@ import six from neutron.common import utils from neutron.conf.db import l3_hamode_db from neutron.db.models import l3agent as rb_model +from neutron.objects.agent import Agent from neutron.objects import l3agent as rb_obj - +from neutron.services.l3_router.l3_router_plugin import L3RouterPlugin LOG = logging.getLogger(__name__) cfg.CONF.register_opts(l3_hamode_db.L3_HA_OPTS) @@ -228,26 +231,25 @@ class L3Scheduler(object): if not candidates: return elif sync_router.get('ha', False): - chosen_agents = self._bind_ha_router(plugin, context, - router_id, - sync_router.get('tenant_id'), + chosen_agents = self._bind_ha_router(plugin, context, sync_router, candidates) if not chosen_agents: return chosen_agent = chosen_agents[-1] else: chosen_agent = self._choose_router_agent( - plugin, context, candidates) + context, plugin, candidates, sync_router) self.bind_router(plugin, context, router_id, chosen_agent.id) return chosen_agent @abc.abstractmethod - def _choose_router_agent(self, plugin, context, candidates): + def _choose_router_agent(self, context, plugin, candidates, sync_router): """Choose an agent from candidates based on a specific policy.""" pass @abc.abstractmethod - def _choose_router_agents_for_ha(self, plugin, context, candidates): + def _choose_router_agents_for_ha(self, context, plugin, candidates, + sync_router): """Choose agents from candidates based on a specific policy.""" pass @@ -315,19 +317,19 @@ class L3Scheduler(object): hosting_list = [tuple(host) for host in hosting] return list(set(candidates) - set(hosting_list)) - def _bind_ha_router(self, plugin, context, router_id, - tenant_id, candidates): + def _bind_ha_router(self, plugin, context, sync_router, candidates): """Bind a HA router to agents based on a specific policy.""" - + router_id = sync_router.get('id') + tenant_id = sync_router.get('tenant_id') candidates = self._filter_scheduled_agents(plugin, context, router_id, candidates) chosen_agents = self._choose_router_agents_for_ha( - plugin, context, candidates) + context, plugin, candidates, sync_router) for agent in chosen_agents: - self.create_ha_port_and_bind(plugin, context, router_id, - tenant_id, agent) + self.create_ha_port_and_bind(plugin, context, router_id, tenant_id, + agent) return chosen_agents @@ -335,10 +337,11 @@ class L3Scheduler(object): class ChanceScheduler(L3Scheduler): """Randomly allocate an L3 agent for a router.""" - def _choose_router_agent(self, plugin, context, candidates): + def _choose_router_agent(self, context, plugin, candidates, sync_router): return random.choice(candidates) - def _choose_router_agents_for_ha(self, plugin, context, candidates): + def _choose_router_agents_for_ha(self, context, plugin, candidates, + sync_router): num_agents = self._get_num_of_agents_for_ha(len(candidates)) return random.sample(candidates, num_agents) @@ -346,13 +349,14 @@ class ChanceScheduler(L3Scheduler): class LeastRoutersScheduler(L3Scheduler): """Allocate to an L3 agent with the least number of routers bound.""" - def _choose_router_agent(self, plugin, context, candidates): + def _choose_router_agent(self, context, plugin, candidates, sync_router): candidate_ids = [candidate['id'] for candidate in candidates] chosen_agent = plugin.get_l3_agent_with_min_routers( context, candidate_ids) return chosen_agent - def _choose_router_agents_for_ha(self, plugin, context, candidates): + def _choose_router_agents_for_ha(self, context, plugin, candidates, + sync_router): num_agents = self._get_num_of_agents_for_ha(len(candidates)) ordered_agents = plugin.get_l3_agents_ordered_by_num_routers( context, [candidate['id'] for candidate in candidates]) @@ -397,7 +401,8 @@ class AZLeastRoutersScheduler(LeastRoutersScheduler): return candidates - def _choose_router_agents_for_ha(self, plugin, context, candidates): + def _choose_router_agents_for_ha(self, context, plugin, candidates, + sync_router): ordered_agents = plugin.get_l3_agents_ordered_by_num_routers( context, [candidate['id'] for candidate in candidates]) num_agents = self._get_num_of_agents_for_ha(len(ordered_agents)) @@ -416,3 +421,65 @@ class AZLeastRoutersScheduler(LeastRoutersScheduler): if len(selected_agents) >= num_agents: break return selected_agents + + +class PreferredL3AgentRoutersScheduler(LeastRoutersScheduler): + + @staticmethod + def get_preferred_agent(sync_router: dict) -> Optional[str]: + configurations = sync_router.get('configurations', {}) + if configurations: + return configurations.get('preferred_agent', None) + return None + + @staticmethod + def get_agents(sync_router: dict) -> Optional[List[str]]: + configurations = sync_router.get('configurations', {}) + if configurations: + master = configurations.get('master_agent', None) + slaves = configurations.get('slave_agents', []) + if master and slaves: + return slaves + [master] + return [] + + def _choose_router_agent(self, context: Context, + plugin: L3RouterPlugin, + candidates: List[Agent], + sync_router: dict) -> Agent: + agent = self.get_preferred_agent(sync_router) + if agent: + new_candidates = [candidate for candidate in candidates + if candidate['host'] == agent] + if not new_candidates: + LOG.warning(f"Router {sync_router['id']} failed to " + f"schedule l3 agent on {agent}.") + else: + agent = new_candidates[0] + LOG.debug(f"Router {sync_router['id']} l3 agent is {agent}.") + return agent + agent = super()._choose_router_agent(context, plugin, candidates, + sync_router) + return agent + + def _choose_router_agents_for_ha(self, context: Context, + plugin: L3RouterPlugin, + candidates: List[Agent], + sync_router: dict) -> List[Agent]: + + agents = self.get_agents(sync_router) + if agents: + if self.max_ha_agents < len(agents): + agents = agents[len(agents) - self.max_ha_agents:] + new_candidates = [candidate for candidate in candidates if + candidate['host'] in agents] + if len(new_candidates) != len(agents): + LOG.warning(f"Router {sync_router['id']} failed to " + f"schedule l3 agents on {agents}.") + else: + LOG.debug(f"Router {sync_router['id']} l3 agents are " + f"{new_candidates}.") + return new_candidates + + return super( + PreferredL3AgentRoutersScheduler, self + )._choose_router_agents_for_ha(context, plugin, candidates, sync_router) diff --git a/services/l3_router/l3_router_plugin.py b/services/l3_router/l3_router_plugin.py index 2e8a762764..9825138261 100644 --- a/services/l3_router/l3_router_plugin.py +++ b/services/l3_router/l3_router_plugin.py @@ -38,6 +38,7 @@ from oslo_utils import importutils from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api from neutron.api.rpc.handlers import l3_rpc +from neutron.conf.common import NETWORK_HOST_OPTS from neutron.db import dns_db from neutron.db import extraroute_db from neutron.db import l3_dvr_ha_scheduler_db @@ -135,6 +136,17 @@ class L3RouterPlugin(service_base.ServicePluginBase, self.add_worker(rpc_worker) self.l3_driver_controller = driver_controller.DriverController(self) + self.compute_to_network = dict() + self._init_compute_to_network() + + def _init_compute_to_network(self): + for network_node in cfg.CONF.network_nodes: + cfg.CONF.register_opts(NETWORK_HOST_OPTS, group=network_node) + network_group = cfg.CONF.get(network_node, None) + if network_group: + compute_nodes = network_group.get('compute_nodes', []) + for compute_node in compute_nodes: + self.compute_to_network[compute_node] = network_node @property def supported_extension_aliases(self): diff --git a/services/rg_portforwarding/__init__.py b/services/rg_portforwarding/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/services/rg_portforwarding/common/__init__.py b/services/rg_portforwarding/common/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/services/rg_portforwarding/common/exceptions.py b/services/rg_portforwarding/common/exceptions.py new file mode 100644 index 0000000000..73cea68e32 --- /dev/null +++ b/services/rg_portforwarding/common/exceptions.py @@ -0,0 +1,77 @@ +# Copyright (c) 2023 UnionTech +# All rights reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from neutron._i18n import _ +from neutron_lib import exceptions + + +class PortForwardingNotSupportFilterField(exceptions.BadRequest): + message = _("Port Forwarding filter %(filter)s is not supported.") + + +class RouterDoesNotHaveGateway(exceptions.BadRequest): + message = _("Router %(router_id)s does not have any gateways.") + + +class RouterGatewayPortNotFound(exceptions.NotFound): + message = _("Router %(router_id)s 's gateway port %(gw_port_id)s " + "could not be found.") + + +class RouterGatewayPortDoesNotHaveAnyIPAddresses(exceptions.NotFound): + message = _("Router %(router_id)s 's gateway port %(gw_port_id)s " + "does not have any IP addresses.") + + +class RouterGatewayPortForwardingNotFound(exceptions.NotFound): + message = _("Router Gateway Port Forwarding %(id)s could not be found.") + + +class PortHasBindingFloatingIP(exceptions.InUse): + message = _("Cannot create port forwarding to floating IP " + "%(floating_ip_address)s (%(fip_id)s) with port %(port_id)s " + "using fixed IP %(fixed_ip)s, as that port already " + "has a binding floating IP.") + + +class InconsistentPortAndIP(exceptions.BadRequest): + message = _("Port %(port_id)s does not have ip address %(ip_address)s.") + + +class RouterGatewayPortForwardingAlreadyExists(exceptions.BadRequest): + message = _("A duplicate router gateway port forwarding entry " + "with same attributes already exists, " + "conflicting values are %(conflict)s.") + + +class PortNetworkNotBindOnRouter(exceptions.BadRequest): + message = _("Port %(port_id)s 's network %(network_id)s " + "not bind on router %(router_id)s.") + + +class RouterGatewayPortForwardingUpdateFailed(exceptions.BadRequest): + message = _("Another router port forwarding entry with the same " + "attributes already exists, conflicting " + "values are %(conflict)s.") + + +class DeletedRouterWithRGForwarding(exceptions.InUse): + message = _("Cant not delete router, " + "router %(router_id)s has port forwardings to remove.") + + +class DeletedRouterGatewayWithRGForwarding(exceptions.InUse): + message = _("Cant not delete or update router gateway, " + "router %(router_id)s has port forwardings to remove.") diff --git a/services/rg_portforwarding/pf_plugin.py b/services/rg_portforwarding/pf_plugin.py new file mode 100644 index 0000000000..ed8e68e53c --- /dev/null +++ b/services/rg_portforwarding/pf_plugin.py @@ -0,0 +1,369 @@ +# Copyright (c) 2023 UnionTech +# All rights reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from oslo_log import log as logging +from typing import List, Dict, Optional + +from neutron_lib import constants +from neutron_lib.context import Context +from neutron_lib.plugins import directory +from neutron_lib.db import resource_extend +from neutron_lib.plugins.constants import L3 +from neutron_lib.db.api import CONTEXT_WRITER +from neutron_lib.exceptions import PortNotFound +from neutron_lib.exceptions.l3 import RouterNotFound +from neutron_lib.callbacks import registry, resources +from neutron_lib.callbacks import events as lib_events +from neutron_lib.callbacks.events import DBEventPayload +from neutron_lib.api.definitions import rg_port_forwarding as apidef +from neutron_lib.objects.exceptions import NeutronDbObjectDuplicateEntry + +from neutron.db import db_base_plugin_common +from neutron.db.l3_dvr_db import is_distributed_router +from neutron.db.l3_hamode_db import is_ha_router + +from neutron.objects.base import Pager +from neutron.objects.ports import Port +from neutron.objects.router import Router, FloatingIP +from neutron.objects.rg_port_forwarding import RGPortForwarding +from neutron.objects.rg_port_forwarding import FIELDS_NOT_SUPPORT_FILTER + +from neutron.extensions.rg_port_forwarding import RGPortForwardingPluginBase +from neutron.services.l3_router.l3_router_plugin import L3RouterPlugin +from neutron.services.rg_portforwarding.common import exceptions + +from neutron.api.rpc.callbacks import events +from neutron.api.rpc.handlers import resources_rpc + +LOG = logging.getLogger(__name__) + + +@resource_extend.has_resource_extenders +@registry.has_registry_receivers +class RGPortForwardingPlugin(RGPortForwardingPluginBase): + required_service_plugins = ['router'] + + supported_extension_aliases = [apidef.ALIAS] + + __native_pagination_support = True + __native_sorting_support = True + __filter_validation_support = True + + def __init__(self): + super(RGPortForwardingPlugin, self).__init__() + self.push_api = resources_rpc.ResourcesPushRpcApi() + self.l3_plugin = directory.get_plugin(L3) + self.core_plugin = directory.get_plugin() + + @staticmethod + def _get_router(context: Context, router_id: str) -> Optional[Router]: + router = Router.get_object(context, id=router_id) + if not router: + raise RouterNotFound(router_id=router_id) + return router + + @staticmethod + def _get_router_gateway(context: Context, router: Router) -> str: + gw_port_id = router.get('gw_port_id', None) + if not gw_port_id: + raise exceptions.RouterDoesNotHaveGateway(router_id=router.id) + gw_port = Port.get_object( + context, id=gw_port_id) + if not gw_port: + raise exceptions.RouterGatewayPortNotFound(router_id=router.id, + gw_port_id=gw_port_id) + gw_port_ips = gw_port.get("fixed_ips", []) + if len(gw_port_ips) <= 0: + raise exceptions.RouterGatewayPortDoesNotHaveAnyIPAddresses( + router_id=router.id, gw_port_id=gw_port_id) + gw_ip_address = gw_port_ips[0].get('ip_address') + return gw_ip_address + + @staticmethod + def _get_port(context: Context, port_id: str) -> Optional[Port]: + port = Port.get_object(context, id=port_id) + if not port: + raise PortNotFound(port_id=port_id) + return port + + @staticmethod + def _get_ports(context: Context, router_id: str, port: Port, + device_owner: str) -> Optional[List[Port]]: + ports = Port.get_ports_by_router_and_network( + context, router_id, device_owner, port.network_id) + if not ports: + raise exceptions.PortNetworkNotBindOnRouter( + port_id=port.id, + network_id=port.network_id, + router_id=router_id) + return ports + + @staticmethod + def _validate_filter_for_port_forwarding(filters: Dict[str, str]) -> None: + if not filters: + return + for filter_member_key in filters.keys(): + if filter_member_key in FIELDS_NOT_SUPPORT_FILTER: + raise exceptions.PortForwardingNotSupportFilterField( + filter=filter_member_key) + + @staticmethod + def _check_port_has_binding_floating_ip(context: Context, port_id: str, + ip_address: str) -> None: + floatingip_objs = FloatingIP.get_objects( + context.elevated(), + fixed_port_id=port_id) + if floatingip_objs: + floating_ip_address = floatingip_objs[0].floating_ip_address + raise exceptions.PortHasBindingFloatingIP( + floating_ip_address=floating_ip_address, + fip_id=floatingip_objs[0].id, + port_id=port_id, + fixed_ip=ip_address) + + @staticmethod + def _get_device_owner(router: Router) -> str: + if is_distributed_router(router): + return constants.DEVICE_OWNER_DVR_INTERFACE + elif is_ha_router(router): + return constants.DEVICE_OWNER_HA_REPLICATED_INT + return constants.DEVICE_OWNER_ROUTER_INTF + + def _check_router_port(self, context: Context, router: Router, + port: Port): + device_owner = self._get_device_owner(router) + self._get_ports(context, router.id, port, device_owner) + + def _check_port(self, context: Context, port_id: str, ip: str) -> Port: + port = self._get_port(context, port_id) + self._check_port_has_binding_floating_ip(context, port_id, ip) + fixed_ips = port.get('fixed_ips', []) + result = list(map(lambda x: str(x.get('ip_address')) == ip, fixed_ips)) + if not any(result): + raise exceptions.InconsistentPortAndIP(port_id=port, ip_address=ip) + return port + + def _check_router(self, context: Context, router_id: str) -> (Router, str): + router = self._get_router(context, router_id) + gw_ip_address = self._get_router_gateway(context, router) + return router, gw_ip_address + + def _check_port_forwarding_create(self, context: Context, router_id: str, + pf_dict: Dict) -> None: + router, gw_ip_address = self._check_router(context, router_id) + pf_dict['router_id'] = router_id + pf_dict[apidef.GW_IP_ADDRESS] = gw_ip_address + internal_port_id = pf_dict[apidef.INTERNAL_PORT_ID] + internal_ip_address = pf_dict[apidef.INTERNAL_IP_ADDRESS] + internal_port = self._check_port(context, internal_port_id, + internal_ip_address) + self._check_router_port(context, router, internal_port) + + @staticmethod + def _check_port_forwarding(context: Context, pf_obj: RGPortForwarding): + pf_objs = RGPortForwarding.get_objects( + context, + router_id=pf_obj.router_id, + protocol=pf_obj.protocol) + + for obj in pf_objs: + if obj.id == pf_obj.get('id', None): + continue + # Ensure there are no conflicts on the outside + if obj.external_port == pf_obj.external_port: + raise exceptions.RouterGatewayPortForwardingAlreadyExists( + conflict={ + 'router_id': pf_obj.router_id, + 'protocol': pf_obj.protocol, + 'external_port': obj.external_port, + } + ) + # Ensure there are no conflicts in the inside + # socket: internal_ip_address + internal_port + if (obj.internal_port_id == pf_obj.internal_port_id and + obj.internal_ip_address == pf_obj.internal_ip_address and + obj.internal_port == pf_obj.internal_port): + raise exceptions.RouterGatewayPortForwardingAlreadyExists( + conflict={ + 'router_id': pf_obj.router_id, + 'protocol': pf_obj.protocol, + 'internal_port_id': obj.internal_port_id, + 'internal_ip_address': str(obj.internal_ip_address), + 'internal_port': obj.internal_port + } + ) + + @staticmethod + def _find_existing_rg_port_forwarding(context: Context, + router_id: str, + port_forwarding: Dict, + specify_params: List = None): + # Because the session had been flushed by NeutronDbObjectDuplicateEntry + # so if we want to use the context to get another db queries, we need + # to rollback first. + context.session.rollback() + if not specify_params: + specify_params = [ + { + 'router_id': router_id, + 'external_port': port_forwarding['external_port'], + 'protocol': port_forwarding['protocol'] + }, + { + 'internal_port_id': port_forwarding['internal_port_id'], + 'internal_ip_address': port_forwarding[ + 'internal_ip_address'], + 'internal_port': port_forwarding['internal_port'], + 'protocol': port_forwarding['protocol'] + }] + for param in specify_params: + objs = RGPortForwarding.get_objects(context, **param) + if objs: + return objs[0], param + + @db_base_plugin_common.make_result_with_fields + @db_base_plugin_common.convert_result_to_dict + def get_router_gateway_port_forwardings(self, context: Context, + router_id: str, + filters: List[str] = None, + fields: List[str] = None, + sorts: List[str] = None, + limit: int = None, + marker: str = None, + page_reverse: bool = False): + + router, gw_ip_address = self._check_router(context, router_id) + filters = filters or {} + self._validate_filter_for_port_forwarding(filters) + pager = Pager(sorts, limit, page_reverse, marker) + port_forwardings = RGPortForwarding.get_objects( + context, _pager=pager, router_id=router_id, **filters) + for pf in port_forwardings: + setattr(pf, 'gw_ip_address', gw_ip_address) + return port_forwardings + + @db_base_plugin_common.convert_result_to_dict + def create_router_gateway_port_forwarding(self, context: Context, + router_id: str, + gateway_port_forwarding: dict): + port_forwarding = gateway_port_forwarding.get(apidef.RESOURCE_NAME) + self._check_port_forwarding_create(context, router_id, port_forwarding) + with CONTEXT_WRITER.using(context): + pf_obj = RGPortForwarding(context, **port_forwarding) + self._check_port_forwarding(context, pf_obj) + try: + pf_obj.create() + except NeutronDbObjectDuplicateEntry: + _, conflict = self._find_existing_rg_port_forwarding( + context, router_id, port_forwarding) + raise exceptions.RouterGatewayPortForwardingAlreadyExists( + conflict=conflict + ) + self.push_api.push(context, [pf_obj], events.CREATED) + return pf_obj + + @db_base_plugin_common.convert_result_to_dict + def update_router_gateway_port_forwarding(self, context: Context, id: str, + router_id: str, + gateway_port_forwarding: dict): + + router = self._get_router(context, router_id) + gw_ip_address = self._get_router_gateway(context, router) + pf_obj = RGPortForwarding.get_object(context, id=id) + if not pf_obj: + raise exceptions.RouterGatewayPortForwardingNotFound(id=id) + + port_forwarding = gateway_port_forwarding.get(apidef.RESOURCE_NAME, {}) + port_forwarding[apidef.GW_IP_ADDRESS] = gw_ip_address + new_port_id = port_forwarding.get(apidef.INTERNAL_PORT_ID) + new_internal_ip = port_forwarding.get(apidef.INTERNAL_IP_ADDRESS, None) + + if new_port_id and new_port_id != pf_obj.internal_port_id: + self._check_port_has_binding_floating_ip(context, + new_port_id, + new_internal_ip) + + if any([new_internal_ip, new_port_id]): + port_forwarding.update({ + apidef.INTERNAL_IP_ADDRESS: new_internal_ip + if new_internal_ip else + str(pf_obj.internal_ip_address), + apidef.INTERNAL_PORT_ID: new_port_id + if new_port_id else pf_obj.internal_port + }) + + with CONTEXT_WRITER.using(context): + pf_obj.update_fields(port_forwarding, reset_changes=True) + self._check_port_forwarding(context, pf_obj) + try: + pf_obj.update() + except NeutronDbObjectDuplicateEntry: + _, conflict = self._find_existing_rg_port_forwarding( + context, router_id, port_forwarding) + raise exceptions.RouterGatewayPortForwardingAlreadyExists( + conflict=conflict + ) + self.push_api.push(context, [pf_obj], events.UPDATED) + return pf_obj + + @db_base_plugin_common.make_result_with_fields + @db_base_plugin_common.convert_result_to_dict + def get_router_gateway_port_forwarding(self, context: Context, id: str, + router_id: str, + fields: List[str] = None): + _, gw_ip_address = self._check_router(context, router_id) + pf_obj = RGPortForwarding.get_object(context, id=id) + if not pf_obj: + raise exceptions.RouterGatewayPortForwardingNotFound(id=id) + setattr(pf_obj, apidef.GW_IP_ADDRESS, gw_ip_address) + return pf_obj + + def delete_router_gateway_port_forwarding(self, context: Context, id: str, + router_id: str): + pf_obj = RGPortForwarding.get_object(context, id=id) + if not pf_obj: + raise exceptions.RouterGatewayPortForwardingNotFound(id=id) + with CONTEXT_WRITER.using(context): + pf_obj.delete() + self.push_api.push(context, [pf_obj], events.DELETED) + + @registry.receives(resources.ROUTER, [lib_events.BEFORE_DELETE]) + def _receive_router_before_delete(self, resource: str, event: str, + trigger: L3RouterPlugin, + payload: DBEventPayload): + router_id = payload.resource_id + context = payload.context + port_forwardings = RGPortForwarding.get_objects(context, + router_id=router_id) + if port_forwardings: + ex = exceptions.DeletedRouterWithRGForwarding(router_id=router_id) + LOG.info(ex.msg) + raise ex + + @registry.receives(resources.ROUTER_GATEWAY, [lib_events.BEFORE_DELETE, + lib_events.BEFORE_UPDATE]) + def _receive_router_gateway_before_delete(self, resource: str, event: str, + trigger: L3RouterPlugin, + payload: DBEventPayload): + router_id = payload.resource_id + context = payload.context + port_forwardings = RGPortForwarding.get_objects(context, + router_id=router_id) + if port_forwardings: + ex = exceptions.DeletedRouterGatewayWithRGForwarding( + router_id=router_id) + LOG.info(ex.msg) + raise ex