diff options
Diffstat (limited to '0001-add-distributed-traffic-feature-support.patch')
-rw-r--r-- | 0001-add-distributed-traffic-feature-support.patch | 2583 |
1 files changed, 2583 insertions, 0 deletions
diff --git a/0001-add-distributed-traffic-feature-support.patch b/0001-add-distributed-traffic-feature-support.patch new file mode 100644 index 0000000..7c37c36 --- /dev/null +++ b/0001-add-distributed-traffic-feature-support.patch @@ -0,0 +1,2583 @@ +Author: wangkuntian <wangkuntian@uniontech.com> +Date: Fri Oct 13 16:25:17 2023 +0800 + + feat: add distributed traffic feature +--- + agent/l3/dvr_edge_ha_router.py | 4 +- + agent/l3/extensions/rg_port_forwarding.py | 398 ++++++++++++++++++++++++++++ + agent/l3/ha.py | 9 - + agent/l3/ha_router.py | 74 +++--- + agent/l3/keepalived_state_change.py | 12 +- + agent/l3/router_info.py | 19 +- + agent/linux/dhcp.py | 137 +++++++++- + agent/linux/interface.py | 43 +-- + agent/linux/keepalived.py | 1 + + api/rpc/agentnotifiers/dhcp_rpc_agent_api.py | 9 + + api/rpc/callbacks/resources.py | 3 + + conf/agent/l3/keepalived.py | 2 + + conf/common.py | 16 +- + conf/policies/__init__.py | 2 + + conf/policies/rg_port_forwarding.py | 76 ++++++ + db/l3_attrs_db.py | 7 +- + db/l3_db.py | 27 +- + db/l3_hamode_db.py | 97 +++++++ + db/migration/alembic_migrations/versions/EXPAND_HEAD | 2 +- + .../train/expand/1c19a98b5eef_add_router_configurations.py | 36 +++ + .../expand/cab12b72ed90_add_router_gateway_port_forwarding.py | 55 ++++ + db/models/l3_attrs.py | 2 + + db/models/rg_port_forwarding.py | 59 +++++ + extensions/rg_port_forwarding.py | 119 +++++++++ + objects/rg_port_forwarding.py | 87 ++++++ + objects/router.py | 15 +- + scheduler/l3_agent_scheduler.py | 107 ++++++-- + services/l3_router/l3_router_plugin.py | 12 + + services/rg_portforwarding/__init__.py | 0 + services/rg_portforwarding/common/__init__.py | 0 + services/rg_portforwarding/common/exceptions.py | 77 ++++++ + services/rg_portforwarding/pf_plugin.py | 369 ++++++++++++++++++++++++++ + 32 files changed, 1749 insertions(+), 127 deletions(-) + +diff --git a/agent/l3/dvr_edge_ha_router.py b/agent/l3/dvr_edge_ha_router.py +index 71f740bef9..b92f70b70f 100644 +--- a/agent/l3/dvr_edge_ha_router.py ++++ b/agent/l3/dvr_edge_ha_router.py +@@ -114,9 +114,7 @@ class DvrEdgeHaRouter(dvr_edge_router.DvrEdgeRouter, + + def _external_gateway_added(self, ex_gw_port, interface_name, + ns_name, preserve_ips): +- link_up = self.external_gateway_link_up() +- self._plug_external_gateway(ex_gw_port, interface_name, ns_name, +- link_up=link_up) ++ self._plug_external_gateway(ex_gw_port, interface_name, ns_name) + + def _is_this_snat_host(self): + return self.agent_conf.agent_mode == constants.L3_AGENT_MODE_DVR_SNAT +diff --git a/agent/l3/extensions/rg_port_forwarding.py b/agent/l3/extensions/rg_port_forwarding.py +new file mode 100644 +index 0000000000..a159e4df34 +--- /dev/null ++++ b/agent/l3/extensions/rg_port_forwarding.py +@@ -0,0 +1,398 @@ ++# Copyright (c) 2023 UnionTech ++# All rights reserved ++# ++# Licensed under the Apache License, Version 2.0 (the "License"); you may ++# not use this file except in compliance with the License. You may obtain ++# a copy of the License at ++# ++# http://www.apache.org/licenses/LICENSE-2.0 ++# ++# Unless required by applicable law or agreed to in writing, software ++# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT ++# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the ++# License for the specific language governing permissions and limitations ++# under the License. ++ ++import collections ++from typing import Optional, List ++from oslo_concurrency import lockutils ++from oslo_log import log as logging ++ ++from neutron_lib import constants ++from neutron_lib.rpc import Connection ++from neutron_lib.context import Context ++from neutron_lib.agent import l3_extension ++ ++from neutron.agent.linux.ip_lib import IPDevice ++from neutron.agent.l3.router_info import RouterInfo ++from neutron.agent.linux.iptables_manager import IptablesManager ++ ++from neutron.api.rpc.handlers import resources_rpc ++from neutron.api.rpc.callbacks import resources, events ++from neutron.api.rpc.callbacks.consumer import registry ++ ++from neutron.common import coordination ++ ++from neutron.objects.ports import Port ++from neutron.objects.router import Router ++from neutron.objects.rg_port_forwarding import RGPortForwarding ++ ++LOG = logging.getLogger(__name__) ++ ++PORT_FORWARDING_PREFIX = 'rg_portforwarding-' ++DEFAULT_PORT_FORWARDING_CHAIN = 'rg-pf' ++PORT_FORWARDING_CHAIN_PREFIX = 'pf-' ++ ++ ++def _get_port_forwarding_chain_name(pf_id): ++ chain_name = PORT_FORWARDING_CHAIN_PREFIX + pf_id ++ return chain_name[:constants.MAX_IPTABLES_CHAIN_LEN_WRAP] ++ ++ ++class RGPortForwardingMapping(object): ++ def __init__(self): ++ self.managed_port_forwardings = {} ++ self.router_pf_mapping = collections.defaultdict(set) ++ ++ @lockutils.synchronized('rg-port-forwarding-cache') ++ def check_port_forwarding_changes(self, new_pf: RGPortForwarding) -> bool: ++ old_pf = self.managed_port_forwardings.get(new_pf.id) ++ return old_pf != new_pf ++ ++ @lockutils.synchronized('rg-port-forwarding-cache') ++ def set_port_forwardings(self, port_forwardings: List[RGPortForwarding]): ++ for port_forwarding in port_forwardings: ++ self._set_router_port_forwarding(port_forwarding, ++ port_forwarding.router_id) ++ ++ def _set_router_port_forwarding(self, ++ port_forwarding: RGPortForwarding, ++ router_id: str): ++ self.router_pf_mapping[router_id].add(port_forwarding.id) ++ self.managed_port_forwardings[port_forwarding.id] = port_forwarding ++ ++ @lockutils.synchronized('rg-port-forwarding-cache') ++ def update_port_forwardings(self, port_forwardings): ++ for port_forwarding in port_forwardings: ++ self.managed_port_forwardings[port_forwarding.id] = port_forwarding ++ ++ @lockutils.synchronized('rg-port-forwarding-cache') ++ def del_port_forwardings(self, port_forwardings): ++ for port_forwarding in port_forwardings: ++ if not self.managed_port_forwardings.get(port_forwarding.id): ++ continue ++ self.managed_port_forwardings.pop(port_forwarding.id) ++ self.router_pf_mapping[port_forwarding.router_id].discard( ++ port_forwarding.id) ++ if not self.router_pf_mapping[port_forwarding.router_id]: ++ self.router_pf_mapping.pop(port_forwarding.router_id) ++ ++ @lockutils.synchronized('rg-port-forwarding-cache') ++ def clean_port_forwardings_by_router_id(self, router_id: str): ++ pf_ids = self.router_pf_mapping.pop(router_id, []) ++ for pf_id in pf_ids: ++ self.managed_port_forwardings.pop(pf_id, None) ++ ++ ++class RGPortForwardingAgentExtension(l3_extension.L3AgentExtension): ++ SUPPORTED_RESOURCE_TYPES = [resources.RGPORTFORWARDING] ++ ++ def consume_api(self, agent_api): ++ self.agent_api = agent_api ++ ++ def initialize(self, connection, driver_type): ++ self.mapping = RGPortForwardingMapping() ++ self.resource_rpc = resources_rpc.ResourcesPullRpcApi() ++ self._register_rpc_consumers() ++ ++ def _register_rpc_consumers(self): ++ registry.register(self._handle_notification, ++ resources.RGPORTFORWARDING) ++ self._connection = Connection() ++ endpoints = [resources_rpc.ResourcesPushRpcCallback()] ++ topic = resources_rpc.resource_type_versioned_topic( ++ resources.RGPORTFORWARDING) ++ self._connection.create_consumer(topic, endpoints, fanout=True) ++ self._connection.consume_in_threads() ++ ++ def _handle_notification(self, context: Context, ++ resource_type: str, ++ forwardings, event_type): ++ for forwarding in forwardings: ++ self._process_port_forwarding_event( ++ context, forwarding, event_type) ++ ++ def _get_gw_port_and_ip(self, ++ ri: RouterInfo) -> (Optional[Port], Optional[str]): ++ ex_gw_port = ri.get_ex_gw_port() ++ ex_gw_port_ip = self._get_gw_port_ip(ex_gw_port) ++ if not ex_gw_port_ip: ++ LOG.error(f"Router {ri.router_id} external port " ++ f"{ex_gw_port['id']} does not have any IP addresses") ++ return None, None ++ return ex_gw_port, ex_gw_port_ip ++ ++ def _process_port_forwarding_event(self, context: Context, ++ port_forwarding: RGPortForwarding, ++ event_type: str): ++ router_id = port_forwarding.router_id ++ ri = self._get_router_info(router_id) ++ if not self._check_if_need_process(ri, force=True): ++ return ++ ++ ex_gw_port, ex_gw_port_ip = self._get_gw_port_and_ip(ri) ++ if not ex_gw_port or not ex_gw_port_ip: ++ return ++ ++ (interface_name, namespace, ++ iptables_manager) = self._get_resource_by_router(ri, ex_gw_port) ++ ++ if event_type == events.CREATED: ++ self._process_create([port_forwarding], ri, interface_name, ++ ex_gw_port_ip, namespace, iptables_manager) ++ elif event_type == events.UPDATED: ++ self._process_update([port_forwarding], interface_name, ++ ex_gw_port_ip, namespace, iptables_manager) ++ elif event_type == events.DELETED: ++ self._process_delete([port_forwarding], interface_name, ++ ex_gw_port_ip, namespace, iptables_manager) ++ ++ def ha_state_change(self, context: Context, data: Router) -> None: ++ pass ++ ++ def update_network(self, context: Context, data: dict) -> None: ++ pass ++ ++ def add_router(self, context: Context, data: Router) -> None: ++ LOG.info(f"call add_router for {data['id']}") ++ self.process_port_forwarding(context, data) ++ ++ def update_router(self, context: Context, data: Router) -> None: ++ LOG.info(f"call update_router for {data['id']}") ++ self.process_port_forwarding(context, data) ++ ++ def delete_router(self, context: Context, data: Router) -> None: ++ self.mapping.clean_port_forwardings_by_router_id(data['id']) ++ ++ def _get_router_info(self, router_id) -> Optional[RouterInfo]: ++ router_info = self.agent_api.get_router_info(router_id) ++ if router_info: ++ return router_info ++ LOG.debug("Router %s is not managed by this agent. " ++ "It was possibly deleted concurrently.", router_id) ++ ++ @staticmethod ++ def _check_if_need_process(ri: RouterInfo, force: bool = False) -> bool: ++ if not ri or not ri.get_ex_gw_port(): ++ return False ++ ++ if force: ++ return True ++ ++ is_distributed = ri.router.get('distributed') ++ agent_mode = ri.agent_conf.agent_mode ++ if (is_distributed and ++ agent_mode in [constants.L3_AGENT_MODE_DVR_NO_EXTERNAL, ++ constants.L3_AGENT_MODE_DVR]): ++ # just support centralized cases ++ return False ++ ++ if is_distributed and not ri.snat_namespace.exists(): ++ return False ++ ++ return True ++ ++ def process_port_forwarding(self, context: Context, data: Router): ++ ri = self._get_router_info(data['id']) ++ if not self._check_if_need_process(ri): ++ return ++ self.check_local_port_forwardings(context, ri) ++ ++ @staticmethod ++ def _get_gw_port_ip(gw_port: dict) -> Optional[str]: ++ fixed_ips = gw_port.get('fixed_ips', []) ++ if not fixed_ips: ++ return ++ return fixed_ips[0].get('ip_address', None) ++ ++ @staticmethod ++ def _get_resource_by_router(ri: RouterInfo, ex_gw_port: dict) -> ( ++ str, str, IptablesManager): ++ is_distributed = ri.router.get('distributed') ++ if is_distributed: ++ interface_name = ri.get_snat_external_device_interface_name( ++ ex_gw_port) ++ namespace = ri.snat_namespace.name ++ iptables_manager = ri.snat_iptables_manager ++ else: ++ interface_name = ri.get_external_device_interface_name(ex_gw_port) ++ namespace = ri.ns_name ++ iptables_manager = ri.iptables_manager ++ ++ return interface_name, namespace, iptables_manager ++ ++ def check_local_port_forwardings(self, context: Context, ri: RouterInfo): ++ pfs = self.resource_rpc.bulk_pull( ++ context, resources.RGPORTFORWARDING, ++ filter_kwargs={'router_id': ri.router_id}) ++ if not pfs: ++ return ++ ex_gw_port, ex_gw_port_ip = self._get_gw_port_and_ip(ri) ++ if not ex_gw_port_ip or not ex_gw_port_ip: ++ return ++ (interface_name, namespace, ++ iptables_manager) = self._get_resource_by_router(ri, ex_gw_port) ++ local_pfs = set(self.mapping.managed_port_forwardings.keys()) ++ new_pfs = [] ++ updated_pfs = [] ++ current_pfs = set() ++ for pf in pfs: ++ if pf.id in self.mapping.managed_port_forwardings: ++ if self.mapping.check_port_forwarding_changes(pf): ++ updated_pfs.append(pf) ++ else: ++ new_pfs.append(pf) ++ current_pfs.add(pf.id) ++ ++ remove_pf_ids_set = local_pfs - current_pfs ++ remove_pfs = [self.mapping.managed_port_forwardings[pf_id] ++ for pf_id in remove_pf_ids_set] ++ ++ self._process_create(new_pfs, ri, interface_name, ex_gw_port_ip, ++ namespace, iptables_manager) ++ ++ self._process_update(updated_pfs, interface_name, ex_gw_port_ip, ++ namespace, iptables_manager) ++ ++ self._process_delete(remove_pfs, interface_name, ex_gw_port_ip, ++ namespace, iptables_manager) ++ ++ @staticmethod ++ def _install_default_rules(iptables_manager: IptablesManager): ++ default_rule = '-j %s-%s' % (iptables_manager.wrap_name, ++ DEFAULT_PORT_FORWARDING_CHAIN) ++ LOG.info(f'Add default chain {DEFAULT_PORT_FORWARDING_CHAIN}') ++ LOG.info(f'Add default rule {default_rule}') ++ iptables_manager.ipv4['nat'].add_chain(DEFAULT_PORT_FORWARDING_CHAIN) ++ iptables_manager.ipv4['nat'].add_rule('PREROUTING', default_rule) ++ iptables_manager.apply() ++ ++ @staticmethod ++ def _get_rg_rules(port_forward: RGPortForwarding, wrap_name: str): ++ chain_rule_list = [] ++ pf_chain_name = _get_port_forwarding_chain_name(port_forward.id) ++ chain_rule_list.append( ++ (DEFAULT_PORT_FORWARDING_CHAIN, f'-j {wrap_name}-{pf_chain_name}')) ++ gw_ip_address = port_forward.gw_ip_address ++ protocol = port_forward.protocol ++ internal_ip_address = str(port_forward.internal_ip_address) ++ internal_port = port_forward.internal_port ++ external_port = port_forward.external_port ++ chain_rule = ( ++ pf_chain_name, ++ f'-d {gw_ip_address}/32 -p {protocol} -m {protocol} ' ++ f'--dport {external_port} ' ++ f'-j DNAT --to-destination {internal_ip_address}:{internal_port}' ++ ) ++ chain_rule_list.append(chain_rule) ++ return chain_rule_list ++ ++ def _rule_apply(self, ++ iptables_manager: IptablesManager, ++ port_forwarding: RGPortForwarding, ++ rule_tag: str): ++ iptables_manager.ipv4['nat'].clear_rules_by_tag(rule_tag) ++ if (DEFAULT_PORT_FORWARDING_CHAIN not in ++ iptables_manager.ipv4['nat'].chains): ++ self._install_default_rules(iptables_manager) ++ ++ for chain, rule in self._get_rg_rules(port_forwarding, ++ iptables_manager.wrap_name): ++ LOG.info(f'Add router gateway port forwarding ' ++ f'rule {rule} in {chain}') ++ if chain not in iptables_manager.ipv4['nat'].chains: ++ iptables_manager.ipv4['nat'].add_chain(chain) ++ iptables_manager.ipv4['nat'].add_rule(chain, rule, tag=rule_tag) ++ ++ def _store_local(self, pf_objs: List[RGPortForwarding], event_type: str): ++ if event_type == events.CREATED: ++ self.mapping.set_port_forwardings(pf_objs) ++ elif event_type == events.UPDATED: ++ self.mapping.update_port_forwardings(pf_objs) ++ elif event_type == events.DELETED: ++ self.mapping.del_port_forwardings(pf_objs) ++ ++ def _process_create(self, ++ port_forwardings: List[RGPortForwarding], ++ ri: RouterInfo, ++ interface_name: str, ++ interface_ip: str, ++ namespace: str, ++ iptables_manager: IptablesManager): ++ if not port_forwardings: ++ return ++ ++ ha_port = ri.router.get(constants.HA_INTERFACE_KEY, None) ++ if ha_port and ha_port['status'] == constants.PORT_STATUS_ACTIVE: ++ ri.enable_keepalived() ++ ++ for port_forwarding in port_forwardings: ++ if port_forwarding.id in self.mapping.managed_port_forwardings: ++ LOG.debug("Skip port forwarding %s for create, as it had been " ++ "managed by agent", port_forwarding.id) ++ continue ++ rule_tag = PORT_FORWARDING_PREFIX + port_forwarding.id ++ port_forwarding.gw_ip_address = interface_ip ++ self._rule_apply(iptables_manager, port_forwarding, rule_tag) ++ iptables_manager.apply() ++ self._store_local(port_forwardings, events.CREATED) ++ ++ def _process_update(self, ++ port_forwardings: List[RGPortForwarding], ++ interface_name: str, ++ interface_ip: str, ++ namespace: str, ++ iptables_manager: IptablesManager): ++ if not port_forwardings: ++ return ++ device = IPDevice(interface_name, namespace=namespace) ++ for port_forwarding in port_forwardings: ++ # check if port forwarding change from OVO and router rpc ++ if not self.mapping.check_port_forwarding_changes(port_forwarding): ++ LOG.debug("Skip port forwarding %s for update, as there is no " ++ "difference between the memory managed by agent", ++ port_forwarding.id) ++ continue ++ current_chain = _get_port_forwarding_chain_name(port_forwarding.id) ++ iptables_manager.ipv4['nat'].remove_chain(current_chain) ++ ori_pf = self.mapping.managed_port_forwardings[port_forwarding.id] ++ device.delete_socket_conntrack_state(interface_ip, ++ ori_pf.external_port, ++ protocol=ori_pf.protocol) ++ rule_tag = PORT_FORWARDING_PREFIX + port_forwarding.id ++ port_forwarding.gw_ip_address = interface_ip ++ self._rule_apply(iptables_manager, port_forwarding, rule_tag) ++ iptables_manager.apply() ++ self._store_local(port_forwardings, events.UPDATED) ++ ++ @coordination.synchronized('router-lock-ns-{namespace}') ++ def _process_delete(self, ++ port_forwardings: List[RGPortForwarding], ++ interface_name: str, ++ interface_ip: str, ++ namespace: str, ++ iptables_manager: IptablesManager): ++ if not port_forwardings: ++ return ++ device = IPDevice(interface_name, namespace=namespace) ++ for port_forwarding in port_forwardings: ++ current_chain = _get_port_forwarding_chain_name(port_forwarding.id) ++ iptables_manager.ipv4['nat'].remove_chain(current_chain) ++ device.delete_socket_conntrack_state( ++ interface_ip, ++ port_forwarding.external_port, ++ protocol=port_forwarding.protocol) ++ ++ iptables_manager.apply() ++ ++ self._store_local(port_forwardings, events.DELETED) +diff --git a/agent/l3/ha.py b/agent/l3/ha.py +index 17891dc983..182fa68175 100644 +--- a/agent/l3/ha.py ++++ b/agent/l3/ha.py +@@ -163,15 +163,6 @@ class AgentMixin(object): + 'agent %(host)s', + state_change_data) + +- # Set external gateway port link up or down according to state +- if state == 'master': +- ri.set_external_gw_port_link_status(link_up=True, set_gw=True) +- elif state == 'backup': +- ri.set_external_gw_port_link_status(link_up=False) +- else: +- LOG.warning('Router %s has status %s, ' +- 'no action to router gateway device.', +- router_id, state) + # TODO(dalvarez): Fix bug 1677279 by moving the IPv6 parameters + # configuration to keepalived-state-change in order to remove the + # dependency that currently exists on l3-agent running for the IPv6 +diff --git a/agent/l3/ha_router.py b/agent/l3/ha_router.py +index 0a21902771..ef10ff76e4 100644 +--- a/agent/l3/ha_router.py ++++ b/agent/l3/ha_router.py +@@ -17,6 +17,7 @@ import shutil + import signal + + import netaddr ++from typing import Optional, List + from neutron_lib.api.definitions import portbindings + from neutron_lib import constants as n_consts + from neutron_lib.utils import runtime +@@ -137,6 +138,22 @@ class HaRouter(router.RouterInfo): + else: + return False + ++ @property ++ def configurations(self) -> Optional[dict]: ++ return self.router.get('configurations', {}) ++ ++ @property ++ def master(self) -> Optional[str]: ++ if self.configurations: ++ return self.configurations.get('master_agent', None) ++ return None ++ ++ @property ++ def slaves(self) -> Optional[List[str]]: ++ if self.configurations: ++ return self.configurations.get('slave_agents', []) ++ return [] ++ + def initialize(self, process_monitor): + ha_port = self.router.get(n_consts.HA_INTERFACE_KEY) + if not ha_port: +@@ -162,19 +179,32 @@ class HaRouter(router.RouterInfo): + throttle_restart_value=( + self.agent_conf.ha_vrrp_advert_int * THROTTLER_MULTIPLIER)) + ++ # The following call is required to ensure that if the state path does ++ # not exist it gets created. ++ self.keepalived_manager.get_full_config_file_path('test') ++ + config = self.keepalived_manager.config + + interface_name = self.get_ha_device_name() + subnets = self.ha_port.get('subnets', []) + ha_port_cidrs = [subnet['cidr'] for subnet in subnets] ++ nopreempt = True ++ state = 'BACKUP' ++ priority = self.ha_priority ++ if self.slaves and self.master: ++ nopreempt = False ++ if self.master == self.agent_conf.host: ++ state = 'MASTER' ++ priority = keepalived.HA_DEFAULT_MASTER_PRIORITY ++ + instance = keepalived.KeepalivedInstance( +- 'BACKUP', ++ state, + interface_name, + self.ha_vr_id, + ha_port_cidrs, +- nopreempt=True, ++ nopreempt=nopreempt, + advert_int=self.agent_conf.ha_vrrp_advert_int, +- priority=self.ha_priority, ++ priority=priority, + vrrp_health_check_interval=( + self.agent_conf.ha_vrrp_health_check_interval), + ha_conf_dir=self.keepalived_manager.get_conf_dir()) +@@ -396,13 +426,16 @@ class HaRouter(router.RouterInfo): + ha_device = self.get_ha_device_name() + ha_cidr = self._get_primary_vip() + config_dir = self.keepalived_manager.get_conf_dir() +- state_change_log = ( +- "%s/neutron-keepalived-state-change.log") % config_dir ++ state_change_log = f"{config_dir}/neutron-keepalived-state-change.log" + + def callback(pid_file): ++ LOG.info(f'Router: {self.router_id} master is {self.master}, ' ++ f'salves are {self.slaves}.') + cmd = [ + STATE_CHANGE_PROC_NAME, + '--router_id=%s' % self.router_id, ++ '--master_agent=%s' % self.master, ++ '--slave_agents=%s' % ','.join(self.slaves), + '--namespace=%s' % self.ha_namespace, + '--conf_dir=%s' % config_dir, + '--log-file=%s' % state_change_log, +@@ -453,9 +486,7 @@ class HaRouter(router.RouterInfo): + return port1_filtered == port2_filtered + + def external_gateway_added(self, ex_gw_port, interface_name): +- link_up = self.external_gateway_link_up() +- self._plug_external_gateway(ex_gw_port, interface_name, +- self.ns_name, link_up=link_up) ++ self._plug_external_gateway(ex_gw_port, interface_name, self.ns_name) + self._add_gateway_vip(ex_gw_port, interface_name) + self._disable_ipv6_addressing_on_interface(interface_name) + +@@ -519,30 +550,3 @@ class HaRouter(router.RouterInfo): + if (self.keepalived_manager.get_process().active and + self.ha_state == 'master'): + super(HaRouter, self).enable_radvd(internal_ports) +- +- def external_gateway_link_up(self): +- # Check HA router ha_state for its gateway port link state. +- # 'backup' instance will not link up the gateway port. +- return self.ha_state == 'master' +- +- def set_external_gw_port_link_status(self, link_up, set_gw=False): +- link_state = "up" if link_up else "down" +- LOG.info('Set router %s gateway device link state to %s.', +- self.router_id, link_state) +- +- ex_gw_port = self.get_ex_gw_port() +- ex_gw_port_id = (ex_gw_port and ex_gw_port['id'] or +- self.ex_gw_port and self.ex_gw_port['id']) +- if ex_gw_port_id: +- interface_name = self.get_external_device_name(ex_gw_port_id) +- ns_name = self.get_gw_ns_name() +- if (not self.driver.set_link_status( +- interface_name, namespace=ns_name, link_up=link_up) and +- link_up): +- LOG.error('Gateway interface for router %s was not set up; ' +- 'router will not work properly', self.router_id) +- if link_up and set_gw: +- preserve_ips = self.get_router_preserve_ips() +- self._external_gateway_settings(ex_gw_port, interface_name, +- ns_name, preserve_ips) +- self.routes_updated([], self.routes) +diff --git a/agent/l3/keepalived_state_change.py b/agent/l3/keepalived_state_change.py +index 7fd9e4269e..8c10a8b00f 100644 +--- a/agent/l3/keepalived_state_change.py ++++ b/agent/l3/keepalived_state_change.py +@@ -47,9 +47,12 @@ class KeepalivedUnixDomainConnection(agent_utils.UnixDomainHTTPConnection): + + + class MonitorDaemon(daemon.Daemon): +- def __init__(self, pidfile, router_id, user, group, namespace, conf_dir, +- interface, cidr): ++ def __init__(self, pidfile, host, router_id, master, slaves, user, group, ++ namespace, conf_dir, interface, cidr): ++ self.host = host + self.router_id = router_id ++ self.master = master ++ self.slaves = slaves + self.namespace = namespace + self.conf_dir = conf_dir + self.interface = interface +@@ -62,6 +65,8 @@ class MonitorDaemon(daemon.Daemon): + user=user, group=group) + + def run(self): ++ LOG.debug(f'Router: {self.router_id} master is {self.master}, ' ++ f'salves are {self.slaves}.') + self._thread_ip_monitor = threading.Thread( + target=ip_lib.ip_monitor, + args=(self.namespace, self.queue, self.event_stop, +@@ -169,7 +174,10 @@ def main(): + keepalived.register_l3_agent_keepalived_opts() + configure(cfg.CONF) + MonitorDaemon(cfg.CONF.pid_file, ++ cfg.CONF.host, + cfg.CONF.router_id, ++ cfg.CONF.master_agent, ++ cfg.CONF.slave_agents, + cfg.CONF.user, + cfg.CONF.group, + cfg.CONF.namespace, +diff --git a/agent/l3/router_info.py b/agent/l3/router_info.py +index ea2b488cd2..eabdcc6e54 100644 +--- a/agent/l3/router_info.py ++++ b/agent/l3/router_info.py +@@ -697,16 +697,14 @@ class RouterInfo(BaseRouterInfo): + return [common_utils.ip_to_cidr(ip['floating_ip_address']) + for ip in floating_ips] + +- def _plug_external_gateway(self, ex_gw_port, interface_name, ns_name, +- link_up=True): ++ def _plug_external_gateway(self, ex_gw_port, interface_name, ns_name): + self.driver.plug(ex_gw_port['network_id'], + ex_gw_port['id'], + interface_name, + ex_gw_port['mac_address'], + namespace=ns_name, + prefix=EXTERNAL_DEV_PREFIX, +- mtu=ex_gw_port.get('mtu'), +- link_up=link_up) ++ mtu=ex_gw_port.get('mtu')) + + def _get_external_gw_ips(self, ex_gw_port): + gateway_ips = [] +@@ -766,11 +764,7 @@ class RouterInfo(BaseRouterInfo): + LOG.debug("External gateway added: port(%s), interface(%s), ns(%s)", + ex_gw_port, interface_name, ns_name) + self._plug_external_gateway(ex_gw_port, interface_name, ns_name) +- self._external_gateway_settings(ex_gw_port, interface_name, +- ns_name, preserve_ips) + +- def _external_gateway_settings(self, ex_gw_port, interface_name, +- ns_name, preserve_ips): + # Build up the interface and gateway IP addresses that + # will be added to the interface. + ip_cidrs = common_utils.fixed_ip_cidrs(ex_gw_port['fixed_ips']) +@@ -815,19 +809,18 @@ class RouterInfo(BaseRouterInfo): + return any(netaddr.IPAddress(gw_ip).version == 6 + for gw_ip in gateway_ips) + +- def get_router_preserve_ips(self): ++ def external_gateway_added(self, ex_gw_port, interface_name): + preserve_ips = self._list_floating_ip_cidrs() + list( + self.centralized_port_forwarding_fip_set) + preserve_ips.extend(self.agent.pd.get_preserve_ips(self.router_id)) +- return preserve_ips + +- def external_gateway_added(self, ex_gw_port, interface_name): +- preserve_ips = self.get_router_preserve_ips() + self._external_gateway_added( + ex_gw_port, interface_name, self.ns_name, preserve_ips) + + def external_gateway_updated(self, ex_gw_port, interface_name): +- preserve_ips = self.get_router_preserve_ips() ++ preserve_ips = self._list_floating_ip_cidrs() + list( ++ self.centralized_port_forwarding_fip_set) ++ preserve_ips.extend(self.agent.pd.get_preserve_ips(self.router_id)) + self._external_gateway_added( + ex_gw_port, interface_name, self.ns_name, preserve_ips) + +diff --git a/agent/linux/dhcp.py b/agent/linux/dhcp.py +index 249e1a8199..5b82fe328f 100644 +--- a/agent/linux/dhcp.py ++++ b/agent/linux/dhcp.py +@@ -19,6 +19,7 @@ import os + import re + import shutil + import time ++from typing import List + + import netaddr + from neutron_lib.api.definitions import extra_dhcp_opt as edo_ext +@@ -40,6 +41,7 @@ from neutron.agent.linux import iptables_manager + from neutron.cmd import runtime_checks as checks + from neutron.common import ipv6_utils + from neutron.common import utils as common_utils ++from neutron.conf.common import NETWORK_HOST_OPTS + from neutron.ipam import utils as ipam_utils + + LOG = logging.getLogger(__name__) +@@ -81,6 +83,51 @@ def port_requires_dhcp_configuration(port): + constants.DEVICE_OWNER_DHCP] + + ++class Octopus(object): ++ def __init__(self): ++ self.tentacles = collections.defaultdict(Tentacle) ++ ++ def __str__(self): ++ lines = [''] ++ for subnet_id, tentacle in self.tentacles.items(): ++ line = (f"Subnet {subnet_id} has multi routers " ++ f"{len(tentacle.gateway_ports) > 1} " ++ f"{tentacle}") ++ lines.append(line) ++ return '\n'.join(lines) ++ ++ ++class Tentacle(object): ++ def __init__(self, gateway_ip: str): ++ self.gateway_ip = gateway_ip ++ self.tags = [] ++ self.gateway_ports = collections.defaultdict(Sucker) ++ self.suckers = collections.defaultdict(Sucker) ++ ++ def __str__(self): ++ lines = [""] ++ for port_id, sucker in self.suckers.items(): ++ line = f" Port {port_id} {sucker}" ++ lines.append(line) ++ return '\n'.join(lines) ++ ++ ++class Sucker(object): ++ def __init__(self, host: str, device_owner: str, ip_address: str): ++ self.host = host ++ self.device_owner = device_owner ++ self.ip_address = ip_address ++ self.tag = None ++ ++ def subnet_tag(self, subnet_id: str): ++ return f'{self.host}-subnet-{subnet_id}' ++ ++ def __str__(self): ++ return (f"ip: {self.ip_address} \t" ++ f"binding_host: {self.host} \t" ++ f"device_owner: {self.device_owner}") ++ ++ + class DictModel(dict): + """Convert dict into an object that provides attribute access to values.""" + +@@ -149,6 +196,10 @@ class DhcpBase(object): + self.process_monitor = process_monitor + self.device_manager = DeviceManager(self.conf, plugin) + self.version = version ++ self.octopus = Octopus() ++ self._init_octopus() ++ self.compute_to_network = dict() ++ self._init_compute_to_network() + + @abc.abstractmethod + def enable(self): +@@ -193,6 +244,31 @@ class DhcpBase(object): + """True if the metadata-proxy should be enabled for the network.""" + raise NotImplementedError() + ++ def _init_compute_to_network(self): ++ for network_node in self.conf.network_nodes: ++ self.conf.register_opts(NETWORK_HOST_OPTS, group=network_node) ++ network_group = self.conf.get(network_node, None) ++ if network_group: ++ compute_nodes = network_group.get('compute_nodes', []) ++ for compute_node in compute_nodes: ++ self.compute_to_network[compute_node] = network_node ++ ++ def _init_octopus(self): ++ for subnet in self.network.subnets: ++ self.octopus.tentacles[subnet.id] = Tentacle(subnet.gateway_ip) ++ ++ for port in self.network.ports: ++ for ip in port.fixed_ips: ++ host = port.get('binding:host_id') ++ device_owner = port.get('device_owner') ++ ip_address = ip.get('ip_address') ++ tentacle = self.octopus.tentacles[ip.subnet_id] ++ sucker = Sucker(host, device_owner, ip_address) ++ tentacle.suckers[port.id] = sucker ++ if (device_owner in (constants.DEVICE_OWNER_HA_REPLICATED_INT, ++ constants.DEVICE_OWNER_ROUTER_INTF)): ++ tentacle.gateway_ports[host] = sucker ++ + + @six.add_metaclass(abc.ABCMeta) + class DhcpLocalProcess(DhcpBase): +@@ -841,8 +917,18 @@ class Dnsmasq(DhcpLocalProcess): + (port.mac_address, tag, name, ip_address, + 'set:', self._PORT_TAG_PREFIX % port.id)) + else: +- buf.write('%s,%s%s,%s\n' % +- (port.mac_address, tag, name, ip_address)) ++ subnet_tag = f'subnet-{alloc.subnet_id}' ++ if self.conf.enable_set_route_for_single_port: ++ tentacle = self.octopus.tentacles[alloc.subnet_id] ++ sucker = tentacle.suckers[port.id] ++ tentacle.tags.append(subnet_tag) ++ if sucker.device_owner.startswith( ++ constants.DEVICE_OWNER_COMPUTE_PREFIX): ++ subnet_tag = sucker.subnet_tag(alloc.subnet_id) ++ sucker.tag = subnet_tag ++ ++ buf.write(f'{port.mac_address},{tag}{name},{ip_address},' ++ f'set:{subnet_tag}\n') + + file_utils.replace_file(filename, buf.getvalue()) + LOG.debug('Done building host file %s', filename) +@@ -1059,7 +1145,8 @@ class Dnsmasq(DhcpLocalProcess): + """Write a dnsmasq compatible options file.""" + options, subnet_index_map = self._generate_opts_per_subnet() + options += self._generate_opts_per_port(subnet_index_map) +- ++ if self.conf.enable_set_route_for_single_port: ++ options += self._generate_opts_for_compute_port(options) + name = self.get_conf_file_name('opts') + file_utils.replace_file(name, '\n'.join(options)) + return name +@@ -1220,6 +1307,50 @@ class Dnsmasq(DhcpLocalProcess): + vx_ips)))) + return options + ++ def _generate_opts_for_compute_port(self, options: List[str]) -> List[str]: ++ new_options = [] ++ LOG.debug(self.octopus) ++ if not self.compute_to_network: ++ LOG.warning('CONF.enable_set_route_for_single_port is True, ' ++ 'but not configured.') ++ return new_options ++ for subnet_id, tentacle in self.octopus.tentacles.items(): ++ if len(tentacle.tags) == 0: ++ continue ++ if len(tentacle.gateway_ports) <= 1: ++ LOG.info(f'Subnet {subnet_id} is not bound ' ++ f'to different routers, ' ++ f'so skip generate options for compute ports.') ++ continue ++ for port_id, sucker in tentacle.suckers.items(): ++ if not sucker.tag: ++ continue ++ if not sucker.device_owner.startswith( ++ constants.DEVICE_OWNER_COMPUTE_PREFIX): ++ continue ++ network_node = self.compute_to_network.get(sucker.host, None) ++ if not network_node: ++ LOG.warning(f'Compute host {sucker.host} not configured.') ++ continue ++ port = tentacle.gateway_ports.get(network_node, None) ++ if not port: ++ LOG.warning(f'Subnet {subnet_id} does not have gateway ' ++ f'port on network host {network_node}.') ++ continue ++ if tentacle.gateway_ip == port.ip_address: ++ continue ++ for option in options.copy(): ++ if subnet_id in option: ++ option = option.replace(f'subnet-{subnet_id}', ++ sucker.tag) ++ if ('option:classless-static-route' in option or ++ ',249,' in option or 'option:router' in option): ++ gateway_ip = option.split(',')[-1] ++ option = option.replace(gateway_ip, ++ port.ip_address) ++ new_options.append(option) ++ return new_options ++ + def _make_subnet_interface_ip_map(self): + subnet_lookup = dict( + (netaddr.IPNetwork(subnet.cidr), subnet.id) +diff --git a/agent/linux/interface.py b/agent/linux/interface.py +index 3ac476d7ba..2e6455707c 100644 +--- a/agent/linux/interface.py ++++ b/agent/linux/interface.py +@@ -259,17 +259,16 @@ class LinuxInterfaceDriver(object): + + @abc.abstractmethod + def plug_new(self, network_id, port_id, device_name, mac_address, +- bridge=None, namespace=None, prefix=None, mtu=None, +- link_up=True): ++ bridge=None, namespace=None, prefix=None, mtu=None): + """Plug in the interface only for new devices that don't exist yet.""" + + def plug(self, network_id, port_id, device_name, mac_address, +- bridge=None, namespace=None, prefix=None, mtu=None, link_up=True): ++ bridge=None, namespace=None, prefix=None, mtu=None): + if not ip_lib.device_exists(device_name, + namespace=namespace): + self._safe_plug_new( + network_id, port_id, device_name, mac_address, bridge, +- namespace, prefix, mtu, link_up) ++ namespace, prefix, mtu) + else: + LOG.info("Device %s already exists", device_name) + if mtu: +@@ -279,11 +278,11 @@ class LinuxInterfaceDriver(object): + LOG.warning("No MTU configured for port %s", port_id) + + def _safe_plug_new(self, network_id, port_id, device_name, mac_address, +- bridge=None, namespace=None, prefix=None, mtu=None, link_up=True): ++ bridge=None, namespace=None, prefix=None, mtu=None): + try: + self.plug_new( + network_id, port_id, device_name, mac_address, bridge, +- namespace, prefix, mtu, link_up) ++ namespace, prefix, mtu) + except TypeError: + LOG.warning("Interface driver's plug_new() method should now " + "accept additional optional parameter 'link_up'. " +@@ -321,27 +320,10 @@ class LinuxInterfaceDriver(object): + LOG.warning("Interface driver cannot update MTU for ports") + self._mtu_update_warn_logged = True + +- def set_link_status(self, device_name, namespace=None, link_up=True): +- ns_dev = ip_lib.IPWrapper(namespace=namespace).device(device_name) +- try: +- utils.wait_until_true(ns_dev.exists, timeout=3) +- except utils.WaitTimeout: +- LOG.debug('Device %s may have been deleted concurrently', +- device_name) +- return False +- +- if link_up: +- ns_dev.link.set_up() +- else: +- ns_dev.link.set_down() +- +- return True +- + + class NullDriver(LinuxInterfaceDriver): + def plug_new(self, network_id, port_id, device_name, mac_address, +- bridge=None, namespace=None, prefix=None, mtu=None, +- link_up=True): ++ bridge=None, namespace=None, prefix=None, mtu=None): + pass + + def unplug(self, device_name, bridge=None, namespace=None, prefix=None): +@@ -377,8 +359,7 @@ class OVSInterfaceDriver(LinuxInterfaceDriver): + ovs.replace_port(device_name, *attrs) + + def plug_new(self, network_id, port_id, device_name, mac_address, +- bridge=None, namespace=None, prefix=None, mtu=None, +- link_up=True): ++ bridge=None, namespace=None, prefix=None, mtu=None): + """Plug in the interface.""" + if not bridge: + bridge = self.conf.ovs_integration_bridge +@@ -442,8 +423,8 @@ class OVSInterfaceDriver(LinuxInterfaceDriver): + else: + LOG.warning("No MTU configured for port %s", port_id) + +- if link_up: +- ns_dev.link.set_up() ++ ns_dev.link.set_up() ++ + if self.conf.ovs_use_veth: + # ovs-dpdk does not do checksum calculations for veth interface + # (bug 1832021) +@@ -488,8 +469,7 @@ class BridgeInterfaceDriver(LinuxInterfaceDriver): + DEV_NAME_PREFIX = 'ns-' + + def plug_new(self, network_id, port_id, device_name, mac_address, +- bridge=None, namespace=None, prefix=None, mtu=None, +- link_up=True): ++ bridge=None, namespace=None, prefix=None, mtu=None): + """Plugin the interface.""" + ip = ip_lib.IPWrapper() + +@@ -508,8 +488,7 @@ class BridgeInterfaceDriver(LinuxInterfaceDriver): + LOG.warning("No MTU configured for port %s", port_id) + + root_veth.link.set_up() +- if link_up: +- ns_veth.link.set_up() ++ ns_veth.link.set_up() + + def unplug(self, device_name, bridge=None, namespace=None, prefix=None): + """Unplug the interface.""" +diff --git a/agent/linux/keepalived.py b/agent/linux/keepalived.py +index f47a27f1d1..405a781f0b 100644 +--- a/agent/linux/keepalived.py ++++ b/agent/linux/keepalived.py +@@ -32,6 +32,7 @@ from neutron.common import utils + VALID_STATES = ['MASTER', 'BACKUP'] + VALID_AUTH_TYPES = ['AH', 'PASS'] + HA_DEFAULT_PRIORITY = 50 ++HA_DEFAULT_MASTER_PRIORITY = 100 + PRIMARY_VIP_RANGE_SIZE = 24 + KEEPALIVED_SERVICE_NAME = 'keepalived' + KEEPALIVED_EMAIL_FROM = 'neutron@openstack.local' +diff --git a/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py b/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py +index 29b61b2c9b..fbad5f1d9a 100644 +--- a/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py ++++ b/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py +@@ -312,6 +312,15 @@ class DhcpAgentNotifyAPI(object): + return False + if set(orig.keys()) != set(new.keys()): + return False ++ ++ if cfg.CONF.enable_set_route_for_single_port: ++ device_owner = new.get('device_owner', None) ++ orig_device_owner = orig.get('device_owner', None) ++ if (not orig_device_owner and device_owner and ++ device_owner.startswith( ++ constants.DEVICE_OWNER_COMPUTE_PREFIX)): ++ return True ++ + for k in orig.keys(): + if k in ('status', 'updated_at', 'revision_number'): + continue +diff --git a/api/rpc/callbacks/resources.py b/api/rpc/callbacks/resources.py +index 734f05eb6f..de56211c53 100644 +--- a/api/rpc/callbacks/resources.py ++++ b/api/rpc/callbacks/resources.py +@@ -15,6 +15,7 @@ from neutron.objects import conntrack_helper + from neutron.objects.logapi import logging_resource as log_object + from neutron.objects import network + from neutron.objects import port_forwarding ++from neutron.objects import rg_port_forwarding + from neutron.objects import ports + from neutron.objects.qos import policy + from neutron.objects import securitygroup +@@ -33,6 +34,7 @@ SUBNET = subnet.Subnet.obj_name() + SECURITYGROUP = securitygroup.SecurityGroup.obj_name() + SECURITYGROUPRULE = securitygroup.SecurityGroupRule.obj_name() + PORTFORWARDING = port_forwarding.PortForwarding.obj_name() ++RGPORTFORWARDING = rg_port_forwarding.RGPortForwarding.obj_name() + CONNTRACKHELPER = conntrack_helper.ConntrackHelper.obj_name() + + +@@ -47,6 +49,7 @@ _VALID_CLS = ( + securitygroup.SecurityGroupRule, + log_object.Log, + port_forwarding.PortForwarding, ++ rg_port_forwarding.RGPortForwarding, + conntrack_helper.ConntrackHelper, + ) + +diff --git a/conf/agent/l3/keepalived.py b/conf/agent/l3/keepalived.py +index bd46c723fc..5ff3492280 100644 +--- a/conf/agent/l3/keepalived.py ++++ b/conf/agent/l3/keepalived.py +@@ -20,6 +20,8 @@ from neutron._i18n import _ + + CLI_OPTS = [ + cfg.StrOpt('router_id', help=_('ID of the router')), ++ cfg.StrOpt('master_agent', help=_('The master agent of router')), ++ cfg.ListOpt('slave_agents', help=_('The slave agents of router')), + cfg.StrOpt('namespace', help=_('Namespace of the router')), + cfg.StrOpt('conf_dir', help=_('Path to the router directory')), + cfg.StrOpt('monitor_interface', help=_('Interface to monitor')), +diff --git a/conf/common.py b/conf/common.py +index f885429613..45e0b48723 100644 +--- a/conf/common.py ++++ b/conf/common.py +@@ -145,7 +145,21 @@ core_opts = [ + "Setting to any positive integer means that on failure " + "the connection is retried that many times. " + "For example, setting to 3 means total attempts to " +- "connect will be 4.")) ++ "connect will be 4.")), ++ cfg.BoolOpt('enable_set_route_for_single_port', default=False, ++ help=_("To set route path for every single port " ++ "when the same subnet has multi ports on router.")), ++ cfg.ListOpt('network_nodes', ++ default=[], ++ help=_("The list of network hosts to " ++ "make a network map " ++ "with compute node and network node.")), ++] ++ ++NETWORK_HOST_OPTS = [ ++ cfg.ListOpt('compute_nodes', ++ default=[], ++ help=_("The list of compute hosts.")) + ] + + core_cli_opts = [ +diff --git a/conf/policies/__init__.py b/conf/policies/__init__.py +index aa4dda63d0..15cdaea45a 100644 +--- a/conf/policies/__init__.py ++++ b/conf/policies/__init__.py +@@ -34,6 +34,7 @@ from neutron.conf.policies import port + from neutron.conf.policies import qos + from neutron.conf.policies import rbac + from neutron.conf.policies import router ++from neutron.conf.policies import rg_port_forwarding + from neutron.conf.policies import security_group + from neutron.conf.policies import segment + from neutron.conf.policies import service_type +@@ -63,6 +64,7 @@ def list_rules(): + qos.list_rules(), + rbac.list_rules(), + router.list_rules(), ++ rg_port_forwarding.list_rules(), + security_group.list_rules(), + segment.list_rules(), + service_type.list_rules(), +diff --git a/conf/policies/rg_port_forwarding.py b/conf/policies/rg_port_forwarding.py +new file mode 100644 +index 0000000000..19e2cd5e2f +--- /dev/null ++++ b/conf/policies/rg_port_forwarding.py +@@ -0,0 +1,76 @@ ++# Copyright (c) 2023 UnionTech ++# All rights reserved ++# ++# Licensed under the Apache License, Version 2.0 (the "License"); you may ++# not use this file except in compliance with the License. You may obtain ++# a copy of the License at ++# ++# http://www.apache.org/licenses/LICENSE-2.0 ++# ++# Unless required by applicable law or agreed to in writing, software ++# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT ++# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the ++# License for the specific language governing permissions and limitations ++# under the License. ++ ++ ++from oslo_policy import policy ++from neutron.conf.policies import base ++ ++COLLECTION_PATH = '/routers/{router_id}/port_forwardings' ++RESOURCE_PATH = '/routers/{router_id}/port_forwardings/{port_forwarding_id}' ++ ++rules = [ ++ policy.DocumentedRuleDefault( ++ 'create_router_gateway_port_forwarding', ++ base.RULE_ADMIN_OR_PARENT_OWNER, ++ 'Create a router gateway port forwarding', ++ [ ++ { ++ 'method': 'POST', ++ 'path': COLLECTION_PATH, ++ }, ++ ] ++ ), ++ policy.DocumentedRuleDefault( ++ 'get_router_gateway_forwarding', ++ base.RULE_ADMIN_OR_PARENT_OWNER, ++ 'Get a router gateway port forwarding', ++ [ ++ { ++ 'method': 'GET', ++ 'path': COLLECTION_PATH, ++ }, ++ { ++ 'method': 'GET', ++ 'path': RESOURCE_PATH, ++ }, ++ ] ++ ), ++ policy.DocumentedRuleDefault( ++ 'update_router_gateway_port_forwarding', ++ base.RULE_ADMIN_OR_PARENT_OWNER, ++ 'Update a floating IP port forwarding', ++ [ ++ { ++ 'method': 'PUT', ++ 'path': RESOURCE_PATH, ++ }, ++ ] ++ ), ++ policy.DocumentedRuleDefault( ++ 'delete_router_gateway_port_forwarding', ++ base.RULE_ADMIN_OR_PARENT_OWNER, ++ 'Delete a floating IP port forwarding', ++ [ ++ { ++ 'method': 'DELETE', ++ 'path': RESOURCE_PATH, ++ }, ++ ] ++ ), ++] ++ ++ ++def list_rules(): ++ return rules +diff --git a/db/l3_attrs_db.py b/db/l3_attrs_db.py +index e6d4e298b1..f292b7aa32 100644 +--- a/db/l3_attrs_db.py ++++ b/db/l3_attrs_db.py +@@ -19,6 +19,7 @@ from oslo_config import cfg + + from neutron._i18n import _ + from neutron.db.models import l3_attrs ++from neutron.objects.base import NeutronDbObject + + + def get_attr_info(): +@@ -29,7 +30,11 @@ def get_attr_info(): + 'availability_zone_hints': { + 'default': '[]', + 'transform_to_db': az_validator.convert_az_list_to_string, +- 'transform_from_db': az_validator.convert_az_string_to_list} ++ 'transform_from_db': az_validator.convert_az_string_to_list}, ++ 'configurations': { ++ 'default': '{}', ++ 'transform_to_db': NeutronDbObject.filter_to_json_str, ++ 'transform_from_db': NeutronDbObject.load_json_from_str} + } + + +diff --git a/db/l3_db.py b/db/l3_db.py +index 565b422532..b625dc1959 100644 +--- a/db/l3_db.py ++++ b/db/l3_db.py +@@ -47,6 +47,7 @@ from neutron.common import ipv6_utils + from neutron.common import utils + from neutron.db import _utils as db_utils + from neutron.db.models import l3 as l3_models ++from neutron.plugins.ml2 import models as ml2_models + from neutron.db import models_v2 + from neutron.db import standardattrdescription_db as st_attr + from neutron.extensions import l3 +@@ -1086,21 +1087,37 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase, + # with subnet's gateway-ip, return that router. + # Otherwise return the first router. + RouterPort = l3_models.RouterPort ++ RouterPortBinding = orm.aliased(ml2_models.PortBinding, ++ name="router_port_binding") ++ ComputePortBinding = orm.aliased(ml2_models.PortBinding, ++ name="compute_port_binding") + gw_port = orm.aliased(models_v2.Port, name="gw_port") + # TODO(lujinluo): Need IPAllocation and Port object + routerport_qry = context.session.query( +- RouterPort.router_id, models_v2.IPAllocation.ip_address).join( +- RouterPort.port, models_v2.Port.fixed_ips).filter( ++ RouterPort.router_id, models_v2.IPAllocation.ip_address, ++ RouterPortBinding.host, ComputePortBinding.host, ++ ).join( ++ RouterPort.port, models_v2.Port.fixed_ips ++ ).filter( + models_v2.Port.network_id == internal_port['network_id'], + RouterPort.port_type.in_(constants.ROUTER_INTERFACE_OWNERS), +- models_v2.IPAllocation.subnet_id == internal_subnet['id'] +- ).join(gw_port, gw_port.device_id == RouterPort.router_id).filter( ++ models_v2.IPAllocation.subnet_id == internal_subnet['id'], ++ ComputePortBinding.port_id == internal_port['id'], ++ ).join( ++ gw_port, gw_port.device_id == RouterPort.router_id ++ ).filter( + gw_port.network_id == external_network_id, + gw_port.device_owner == DEVICE_OWNER_ROUTER_GW ++ ).join( ++ RouterPortBinding, RouterPortBinding.port_id == models_v2.Port.id + ).distinct() + + first_router_id = None +- for router_id, interface_ip in routerport_qry: ++ for (router_id, interface_ip, ++ network_host, compute_host) in routerport_qry: ++ network_node = self.compute_to_network.get(compute_host, None) ++ if network_node and network_node == network_host: ++ return router_id + if interface_ip == internal_subnet['gateway_ip']: + return router_id + if not first_router_id: +diff --git a/db/l3_hamode_db.py b/db/l3_hamode_db.py +index bff388e166..4c414016dd 100644 +--- a/db/l3_hamode_db.py ++++ b/db/l3_hamode_db.py +@@ -32,6 +32,7 @@ from neutron_lib import constants + from neutron_lib.db import api as db_api + from neutron_lib import exceptions as n_exc + from neutron_lib.exceptions import l3 as l3_exc ++from neutron_lib.exceptions import agent as agent_exc + from neutron_lib.exceptions import l3_ext_ha_mode as l3ha_exc + from neutron_lib.objects import exceptions as obj_base + from neutron_lib.plugins import utils as p_utils +@@ -54,6 +55,7 @@ from neutron.db import l3_dvr_db + from neutron.objects import base + from neutron.objects import l3_hamode + from neutron.objects import router as l3_obj ++from neutron.objects import agent as agent_obj + + + VR_ID_RANGE = set(range(1, 255)) +@@ -378,12 +380,85 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin, + if not self.get_ha_network(context, router['tenant_id']): + self._create_ha_network(context, router['tenant_id']) + ++ @staticmethod ++ def _check_router_configurations_creation(context, ++ configurations: dict, ++ is_ha: bool = True): ++ agents = agent_obj.Agent.get_objects(context, ++ binary='neutron-l3-agent') ++ agents = [agent['host'] for agent in agents] ++ master = configurations.get('master_agent', None) ++ slaves = configurations.get('slave_agents', []) ++ preferred_agent = configurations.get('preferred_agent', None) ++ if is_ha: ++ if master and slaves: ++ for agent in [master] + slaves: ++ if agent not in agents: ++ raise agent_exc.AgentNotFound(id=agent) ++ if master in slaves: ++ raise l3_exc.RouterAgentConflict() ++ else: ++ if master or slaves: ++ raise l3_exc.RouterAgentNotGiven() ++ else: ++ if preferred_agent and preferred_agent not in agents: ++ raise agent_exc.AgentNotFound(id=preferred_agent) ++ ++ @staticmethod ++ def _check_router_configurations_update(context, ++ configurations: dict, ++ old_configurations: dict, ++ is_ha: bool = True) -> bool: ++ if configurations == old_configurations: ++ return False ++ ++ agents = agent_obj.Agent.get_objects(context, ++ binary='neutron-l3-agent') ++ agents = [agent['host'] for agent in agents] ++ master = configurations.get('master_agent', None) ++ slaves = configurations.get('slave_agents', []) ++ preferred_agent = configurations.get('preferred_agent', None) ++ old_master = old_configurations.get('master_agent', None) ++ old_slaves = old_configurations.get('slave_agents', []) ++ old_preferred_agent = old_configurations.get('preferred_agent', None) ++ if is_ha: ++ if master: ++ if master != old_master: ++ if master not in agents: ++ raise agent_exc.AgentNotFound(id=master) ++ old_configurations['master_agent'] = master ++ if slaves: ++ if slaves != old_slaves: ++ for slave in slaves: ++ if slave not in agents: ++ raise agent_exc.AgentNotFound(id=slave) ++ old_configurations['slave_agents'] = slaves ++ if (old_configurations['master_agent'] in ++ old_configurations['slave_agents']): ++ raise l3_exc.RouterAgentConflict() ++ else: ++ if preferred_agent: ++ if preferred_agent != old_preferred_agent: ++ if preferred_agent not in agents: ++ raise agent_exc.AgentNotFound(id=preferred_agent) ++ old_configurations['preferred_agent'] = preferred_agent ++ else: ++ old_configurations['preferred_agent'] = None ++ ++ return True ++ + @registry.receives(resources.ROUTER, [events.PRECOMMIT_CREATE], + priority_group.PRIORITY_ROUTER_EXTENDED_ATTRIBUTE) + def _precommit_router_create(self, resource, event, trigger, context, + router, router_db, **kwargs): + """Event handler to set ha flag and status on creation.""" + is_ha = self._is_ha(router) ++ configurations = router.get('configurations', {}) ++ if configurations: ++ self._check_router_configurations_creation(context, configurations, ++ is_ha) ++ self.set_extra_attr_value(context, router_db, 'configurations', ++ configurations) + router['ha'] = is_ha + self.set_extra_attr_value(context, router_db, 'ha', is_ha) + if not is_ha: +@@ -465,6 +540,28 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin, + self.set_extra_attr_value( + payload.context, payload.desired_state, 'ha', requested_ha_state) + ++ @registry.receives(resources.ROUTER, [events.PRECOMMIT_UPDATE], ++ priority_group.PRIORITY_ROUTER_EXTENDED_ATTRIBUTE) ++ def _validate_configurations(self, resource, event, trigger, payload=None): ++ old_configurations = payload.states[0].get('configurations', {}) ++ configurations = payload.request_body.get('configurations', {}) ++ ++ if not configurations: ++ return ++ ++ if payload.desired_state.admin_state_up: ++ msg = _('Cannot change configurations of active routers. Please ' ++ 'set router admin_state_up to False prior to upgrade') ++ raise n_exc.BadRequest(resource='router', msg=msg) ++ ++ need_update = self._check_router_configurations_update( ++ payload.context, configurations, old_configurations, ++ payload.states[0]['ha']) ++ ++ if need_update: ++ self.set_extra_attr_value(payload.context, payload.desired_state, ++ 'configurations', old_configurations) ++ + @registry.receives(resources.ROUTER, [events.AFTER_UPDATE], + priority_group.PRIORITY_ROUTER_EXTENDED_ATTRIBUTE) + def _reconfigure_ha_resources(self, resource, event, trigger, context, +diff --git a/db/migration/alembic_migrations/versions/EXPAND_HEAD b/db/migration/alembic_migrations/versions/EXPAND_HEAD +index ffa2bbaaf6..0c8e4a2178 100644 +--- a/db/migration/alembic_migrations/versions/EXPAND_HEAD ++++ b/db/migration/alembic_migrations/versions/EXPAND_HEAD +@@ -1 +1 @@ +-c613d0b82681 ++1c19a98b5eef +diff --git a/db/migration/alembic_migrations/versions/train/expand/1c19a98b5eef_add_router_configurations.py b/db/migration/alembic_migrations/versions/train/expand/1c19a98b5eef_add_router_configurations.py +new file mode 100644 +index 0000000000..f12600ef4f +--- /dev/null ++++ b/db/migration/alembic_migrations/versions/train/expand/1c19a98b5eef_add_router_configurations.py +@@ -0,0 +1,36 @@ ++# Copyright 2023 OpenStack Foundation ++# ++# Licensed under the Apache License, Version 2.0 (the "License"); you may ++# not use this file except in compliance with the License. You may obtain ++# a copy of the License at ++# ++# http://www.apache.org/licenses/LICENSE-2.0 ++# ++# Unless required by applicable law or agreed to in writing, software ++# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT ++# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the ++# License for the specific language governing permissions and limitations ++# under the License. ++# ++ ++from alembic import op ++import sqlalchemy as sa ++ ++"""add router configurations ++ ++Revision ID: 1c19a98b5eef ++Revises: cab12b72ed90 ++Create Date: 2023-08-01 10:05:56.412167 ++ ++""" ++ ++# revision identifiers, used by Alembic. ++revision = '1c19a98b5eef' ++down_revision = 'cab12b72ed90' ++ ++ ++def upgrade(): ++ # ### commands auto generated by Alembic - please adjust! ### ++ op.add_column('router_extra_attributes', ++ sa.Column('configurations', sa.String(length=4095))) ++ # ### end Alembic commands ### +diff --git a/db/migration/alembic_migrations/versions/train/expand/cab12b72ed90_add_router_gateway_port_forwarding.py b/db/migration/alembic_migrations/versions/train/expand/cab12b72ed90_add_router_gateway_port_forwarding.py +new file mode 100644 +index 0000000000..ad511a7ed8 +--- /dev/null ++++ b/db/migration/alembic_migrations/versions/train/expand/cab12b72ed90_add_router_gateway_port_forwarding.py +@@ -0,0 +1,55 @@ ++# Copyright 2023 OpenStack Foundation ++# ++# Licensed under the Apache License, Version 2.0 (the "License"); you may ++# not use this file except in compliance with the License. You may obtain ++# a copy of the License at ++# ++# http://www.apache.org/licenses/LICENSE-2.0 ++# ++# Unless required by applicable law or agreed to in writing, software ++# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT ++# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the ++# License for the specific language governing permissions and limitations ++# under the License. ++# ++ ++from alembic import op ++import sqlalchemy as sa ++ ++"""add router gateway port forwarding ++ ++Revision ID: cab12b72ed90 ++Revises: c613d0b82681 ++Create Date: 2023-07-04 10:27:54.485453 ++ ++""" ++ ++# revision identifiers, used by Alembic. ++revision = 'cab12b72ed90' ++down_revision = 'c613d0b82681' ++ ++ ++def upgrade(): ++ # ### commands auto generated by Alembic - please adjust! ### ++ op.create_table( ++ 'rgportforwardings', ++ sa.Column('id', sa.String(length=36), nullable=False), ++ sa.Column('router_id', sa.String(length=36), nullable=False), ++ sa.Column('external_port', sa.Integer(), nullable=False), ++ sa.Column('internal_neutron_port_id', sa.String(length=36), ++ nullable=False), ++ sa.Column('protocol', sa.String(length=40), nullable=False), ++ sa.Column('socket', sa.String(length=36), nullable=False), ++ sa.ForeignKeyConstraint(['internal_neutron_port_id'], ['ports.id'], ++ ondelete='CASCADE'), ++ sa.ForeignKeyConstraint(['router_id'], ['routers.id'], ++ ondelete='CASCADE'), ++ sa.PrimaryKeyConstraint('id'), ++ sa.UniqueConstraint( ++ 'internal_neutron_port_id', 'socket', 'protocol', ++ name='uniq_port_forwardings0internal_neutron_port_id0socket0protocol'), ++ sa.UniqueConstraint( ++ 'router_id', 'external_port', 'protocol', ++ name='uniq_rg_port_forwardings0router_id0external_port0protocol') ++ ) ++ # ### end Alembic commands ### +diff --git a/db/models/l3_attrs.py b/db/models/l3_attrs.py +index 6c30ac2c16..904f4ef08d 100644 +--- a/db/models/l3_attrs.py ++++ b/db/models/l3_attrs.py +@@ -41,6 +41,8 @@ class RouterExtraAttributes(model_base.BASEV2): + # Availability Zone support + availability_zone_hints = sa.Column(sa.String(255)) + ++ configurations = sa.Column(sa.String(4095)) ++ + router = orm.relationship( + 'Router', load_on_pending=True, + backref=orm.backref("extra_attributes", lazy='joined', +diff --git a/db/models/rg_port_forwarding.py b/db/models/rg_port_forwarding.py +new file mode 100644 +index 0000000000..e7963169a8 +--- /dev/null ++++ b/db/models/rg_port_forwarding.py +@@ -0,0 +1,59 @@ ++# Copyright (c) 2023 UnionTech ++# All rights reserved ++# ++# Licensed under the Apache License, Version 2.0 (the "License"); you may ++# not use this file except in compliance with the License. You may obtain ++# a copy of the License at ++# ++# http://www.apache.org/licenses/LICENSE-2.0 ++# ++# Unless required by applicable law or agreed to in writing, software ++# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT ++# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the ++# License for the specific language governing permissions and limitations ++# under the License. ++ ++import sqlalchemy as sa ++from sqlalchemy import orm ++from neutron_lib.db import model_base ++from neutron_lib.db import constants as db_const ++ ++from neutron.db.models import l3 ++from neutron.db import models_v2 ++ ++ ++class RGPortForwarding(model_base.BASEV2, model_base.HasId): ++ __table_args__ = ( ++ sa.UniqueConstraint('router_id', 'external_port', 'protocol', ++ name='uniq_rg_port_forwardings0router_id0' ++ 'external_port0protocol'), ++ sa.UniqueConstraint('internal_neutron_port_id', 'socket', 'protocol', ++ name='uniq_port_forwardings0' ++ 'internal_neutron_port_id0socket0' ++ 'protocol') ++ ) ++ ++ router_id = sa.Column(sa.String(db_const.UUID_FIELD_SIZE), ++ sa.ForeignKey('routers.id', ++ ondelete="CASCADE"), ++ nullable=False) ++ external_port = sa.Column(sa.Integer, nullable=False) ++ internal_neutron_port_id = sa.Column( ++ sa.String(db_const.UUID_FIELD_SIZE), ++ sa.ForeignKey('ports.id', ondelete="CASCADE"), ++ nullable=False) ++ protocol = sa.Column(sa.String(40), nullable=False) ++ socket = sa.Column(sa.String(36), nullable=False) ++ port = orm.relationship( ++ models_v2.Port, load_on_pending=True, ++ backref=orm.backref("rg_port_forwardings", ++ lazy='subquery', uselist=True, ++ cascade='delete') ++ ) ++ router = orm.relationship( ++ l3.Router, load_on_pending=True, ++ backref=orm.backref("rg_port_forwardings", ++ lazy='subquery', uselist=True, ++ cascade='delete') ++ ) ++ revises_on_change = ('router', 'port',) +diff --git a/extensions/rg_port_forwarding.py b/extensions/rg_port_forwarding.py +new file mode 100644 +index 0000000000..c9888cd1ec +--- /dev/null ++++ b/extensions/rg_port_forwarding.py +@@ -0,0 +1,119 @@ ++# Copyright (c) 2023 UnionTech ++# All rights reserved ++# ++# Licensed under the Apache License, Version 2.0 (the "License"); you may ++# not use this file except in compliance with the License. You may obtain ++# a copy of the License at ++# ++# http://www.apache.org/licenses/LICENSE-2.0 ++# ++# Unless required by applicable law or agreed to in writing, software ++# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT ++# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the ++# License for the specific language governing permissions and limitations ++# under the License. ++ ++import six ++import abc ++import itertools ++from typing import List ++ ++from neutron_lib.context import Context ++from neutron_lib.plugins import constants ++from neutron_lib.plugins import directory ++from neutron_lib.services import base as service_base ++from neutron_lib.api.extensions import APIExtensionDescriptor ++from neutron_lib.api.definitions import rg_port_forwarding as apidef ++ ++from neutron.api.v2 import base ++from neutron.api.v2 import resource_helper ++from neutron.api.extensions import ResourceExtension ++ ++ ++class Rg_port_forwarding(APIExtensionDescriptor): ++ api_definition = apidef ++ ++ @classmethod ++ def get_plugin_interface(cls): ++ return RGPortForwardingPluginBase ++ ++ @classmethod ++ def get_resources(cls): ++ special_mappings = {'routers': 'router'} ++ plural_mappings = resource_helper.build_plural_mappings( ++ special_mappings, ++ itertools.chain( ++ apidef.RESOURCE_ATTRIBUTE_MAP, ++ apidef.SUB_RESOURCE_ATTRIBUTE_MAP ++ ) ++ ) ++ ++ resources = resource_helper.build_resource_info( ++ plural_mappings, ++ apidef.RESOURCE_ATTRIBUTE_MAP, ++ constants.ROUTER_GATEWAY_PORTFORWARDING, ++ translate_name=True, ++ allow_bulk=True) ++ ++ plugin = directory.get_plugin(constants.ROUTER_GATEWAY_PORTFORWARDING) ++ ++ parent = apidef.SUB_RESOURCE_ATTRIBUTE_MAP[apidef.COLLECTION_NAME].get( ++ 'parent') ++ params = apidef.SUB_RESOURCE_ATTRIBUTE_MAP[apidef.COLLECTION_NAME].get( ++ 'parameters') ++ controller = base.create_resource( ++ apidef.COLLECTION_NAME, apidef.RESOURCE_NAME, plugin, params, ++ allow_bulk=True, parent=parent, allow_pagination=True, ++ allow_sorting=True) ++ ++ resource = ResourceExtension( ++ apidef.COLLECTION_NAME, controller, parent, attr_map=params) ++ resources.append(resource) ++ ++ return resources ++ ++ ++@six.add_metaclass(abc.ABCMeta) ++class RGPortForwardingPluginBase(service_base.ServicePluginBase): ++ path_prefix = apidef.API_PREFIX ++ ++ @classmethod ++ def get_plugin_type(cls): ++ return constants.ROUTER_GATEWAY_PORTFORWARDING ++ ++ def get_plugin_description(self): ++ return "Router Gateway Port Forwarding Service Plugin" ++ ++ @abc.abstractmethod ++ def create_router_gateway_port_forwarding(self, context: Context, ++ router_id: str, ++ gateway_port_forwarding: dict): ++ pass ++ ++ @abc.abstractmethod ++ def update_router_gateway_port_forwarding(self, context: Context, id: str, ++ router_id: str, ++ gateway_port_forwarding: dict): ++ pass ++ ++ @abc.abstractmethod ++ def get_router_gateway_port_forwarding(self, context: Context, id: str, ++ router_id: str, ++ fields: List[str] = None): ++ pass ++ ++ @abc.abstractmethod ++ def get_router_gateway_port_forwardings(self, context: Context, ++ router_id: str, ++ filters: List[str] = None, ++ fields: List[str] = None, ++ sorts: List[str] = None, ++ limit: int = None, ++ marker: str = None, ++ page_reverse: bool = False): ++ pass ++ ++ @abc.abstractmethod ++ def delete_router_gateway_port_forwarding(self, context: Context, id: str, ++ router_id: str): ++ pass +diff --git a/objects/rg_port_forwarding.py b/objects/rg_port_forwarding.py +new file mode 100644 +index 0000000000..28cb4e1d4d +--- /dev/null ++++ b/objects/rg_port_forwarding.py +@@ -0,0 +1,87 @@ ++# Copyright (c) 2023 UnionTech ++# All rights reserved ++# ++# Licensed under the Apache License, Version 2.0 (the "License"); you may ++# not use this file except in compliance with the License. You may obtain ++# a copy of the License at ++# ++# http://www.apache.org/licenses/LICENSE-2.0 ++# ++# Unless required by applicable law or agreed to in writing, software ++# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT ++# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the ++# License for the specific language governing permissions and limitations ++# under the License. ++ ++import netaddr ++from neutron_lib import constants as lib_const ++from oslo_versionedobjects import fields as obj_fields ++ ++from neutron.objects import base, common_types ++from neutron.db.models import rg_port_forwarding as models ++ ++FIELDS_NOT_SUPPORT_FILTER = ['internal_ip_address', 'internal_port'] ++ ++ ++@base.NeutronObjectRegistry.register ++class RGPortForwarding(base.NeutronDbObject): ++ VERSION = '1.0' ++ ++ db_model = models.RGPortForwarding ++ ++ primary_keys = ['id'] ++ foreign_keys = { ++ 'Router': {'router_id': 'id'}, ++ 'Port': {'internal_port_id': 'id'} ++ } ++ fields_need_translation = { ++ 'socket': 'socket', ++ 'internal_port_id': 'internal_neutron_port_id' ++ } ++ ++ fields = { ++ 'id': common_types.UUIDField(), ++ 'router_id': common_types.UUIDField(nullable=False), ++ 'external_port': common_types.PortRangeField(nullable=False), ++ 'protocol': common_types.IpProtocolEnumField(nullable=False), ++ 'internal_port_id': common_types.UUIDField(nullable=False), ++ 'internal_ip_address': obj_fields.IPV4AddressField(), ++ 'internal_port': common_types.PortRangeField(nullable=False), ++ 'gw_ip_address': obj_fields.IPV4AddressField(), ++ } ++ ++ synthetic_fields = ['gw_ip_address'] ++ fields_no_update = {'id', 'router_id'} ++ ++ def __eq__(self, other): ++ for attr in self.fields: ++ if getattr(self, attr) != getattr(other, attr): ++ return False ++ return True ++ ++ def obj_load_attr(self, attrname): ++ super(RGPortForwarding, self).obj_load_attr(attrname) ++ ++ def from_db_object(self, db_obj): ++ super(RGPortForwarding, self).from_db_object(db_obj) ++ ++ @classmethod ++ def modify_fields_from_db(cls, db_obj): ++ result = super(RGPortForwarding, cls).modify_fields_from_db(db_obj) ++ if 'socket' in result: ++ groups = result['socket'].split(":") ++ result['internal_ip_address'] = netaddr.IPAddress( ++ groups[0], version=lib_const.IP_VERSION_4) ++ result['internal_port'] = int(groups[1]) ++ del result['socket'] ++ return result ++ ++ @classmethod ++ def modify_fields_to_db(cls, fields): ++ result = super(RGPortForwarding, cls).modify_fields_to_db(fields) ++ if 'internal_ip_address' in result and 'internal_port' in result: ++ result['socket'] = (f"{result['internal_ip_address']}:" ++ f"{result['internal_port']}") ++ del result['internal_ip_address'] ++ del result['internal_port'] ++ return result +diff --git a/objects/router.py b/objects/router.py +index 1373f89515..9590a109f6 100644 +--- a/objects/router.py ++++ b/objects/router.py +@@ -18,6 +18,7 @@ from neutron_lib.api.definitions import availability_zone as az_def + from neutron_lib.api.validators import availability_zone as az_validator + from neutron_lib import constants as n_const + from neutron_lib.utils import net as net_utils ++from neutron_lib.objects import utils as obj_utils + from oslo_versionedobjects import fields as obj_fields + import six + from sqlalchemy import func +@@ -70,7 +71,8 @@ class RouterRoute(base.NeutronDbObject): + @base.NeutronObjectRegistry.register + class RouterExtraAttributes(base.NeutronDbObject): + # Version 1.0: Initial version +- VERSION = '1.0' ++ # Version 1.1: Add configurations ++ VERSION = '1.1' + + db_model = l3_attrs.RouterExtraAttributes + +@@ -80,7 +82,8 @@ class RouterExtraAttributes(base.NeutronDbObject): + 'service_router': obj_fields.BooleanField(default=False), + 'ha': obj_fields.BooleanField(default=False), + 'ha_vr_id': obj_fields.IntegerField(nullable=True), +- 'availability_zone_hints': obj_fields.ListOfStringsField(nullable=True) ++ 'availability_zone_hints': obj_fields.ListOfStringsField(nullable=True), ++ 'configurations': common_types.DictOfMiscValuesField(nullable=True), + } + + primary_keys = ['router_id'] +@@ -95,6 +98,9 @@ class RouterExtraAttributes(base.NeutronDbObject): + result[az_def.AZ_HINTS] = ( + az_validator.convert_az_string_to_list( + result[az_def.AZ_HINTS])) ++ if 'configurations' in result: ++ result['configurations'] = cls.load_json_from_str( ++ result['configurations'], default={}) + return result + + @classmethod +@@ -104,6 +110,11 @@ class RouterExtraAttributes(base.NeutronDbObject): + result[az_def.AZ_HINTS] = ( + az_validator.convert_az_list_to_string( + result[az_def.AZ_HINTS])) ++ if ('configurations' in result and ++ not isinstance(result['configurations'], ++ obj_utils.StringMatchingFilterObj)): ++ result['configurations'] = ( ++ cls.filter_to_json_str(result['configurations'])) + return result + + @classmethod +diff --git a/scheduler/l3_agent_scheduler.py b/scheduler/l3_agent_scheduler.py +index 5810cf85b8..7a428aef03 100644 +--- a/scheduler/l3_agent_scheduler.py ++++ b/scheduler/l3_agent_scheduler.py +@@ -14,13 +14,15 @@ + # under the License. + + import abc +-import collections ++import random + import functools + import itertools +-import random ++import collections ++from typing import List, Optional + + from neutron_lib.api.definitions import availability_zone as az_def + from neutron_lib import constants as lib_const ++from neutron_lib.context import Context + from neutron_lib.db import api as lib_db_api + from neutron_lib.exceptions import l3 as l3_exc + from oslo_config import cfg +@@ -31,8 +33,9 @@ import six + from neutron.common import utils + from neutron.conf.db import l3_hamode_db + from neutron.db.models import l3agent as rb_model ++from neutron.objects.agent import Agent + from neutron.objects import l3agent as rb_obj +- ++from neutron.services.l3_router.l3_router_plugin import L3RouterPlugin + + LOG = logging.getLogger(__name__) + cfg.CONF.register_opts(l3_hamode_db.L3_HA_OPTS) +@@ -228,26 +231,25 @@ class L3Scheduler(object): + if not candidates: + return + elif sync_router.get('ha', False): +- chosen_agents = self._bind_ha_router(plugin, context, +- router_id, +- sync_router.get('tenant_id'), ++ chosen_agents = self._bind_ha_router(plugin, context, sync_router, + candidates) + if not chosen_agents: + return + chosen_agent = chosen_agents[-1] + else: + chosen_agent = self._choose_router_agent( +- plugin, context, candidates) ++ context, plugin, candidates, sync_router) + self.bind_router(plugin, context, router_id, chosen_agent.id) + return chosen_agent + + @abc.abstractmethod +- def _choose_router_agent(self, plugin, context, candidates): ++ def _choose_router_agent(self, context, plugin, candidates, sync_router): + """Choose an agent from candidates based on a specific policy.""" + pass + + @abc.abstractmethod +- def _choose_router_agents_for_ha(self, plugin, context, candidates): ++ def _choose_router_agents_for_ha(self, context, plugin, candidates, ++ sync_router): + """Choose agents from candidates based on a specific policy.""" + pass + +@@ -315,19 +317,19 @@ class L3Scheduler(object): + hosting_list = [tuple(host) for host in hosting] + return list(set(candidates) - set(hosting_list)) + +- def _bind_ha_router(self, plugin, context, router_id, +- tenant_id, candidates): ++ def _bind_ha_router(self, plugin, context, sync_router, candidates): + """Bind a HA router to agents based on a specific policy.""" +- ++ router_id = sync_router.get('id') ++ tenant_id = sync_router.get('tenant_id') + candidates = self._filter_scheduled_agents(plugin, context, router_id, + candidates) + + chosen_agents = self._choose_router_agents_for_ha( +- plugin, context, candidates) ++ context, plugin, candidates, sync_router) + + for agent in chosen_agents: +- self.create_ha_port_and_bind(plugin, context, router_id, +- tenant_id, agent) ++ self.create_ha_port_and_bind(plugin, context, router_id, tenant_id, ++ agent) + + return chosen_agents + +@@ -335,10 +337,11 @@ class L3Scheduler(object): + class ChanceScheduler(L3Scheduler): + """Randomly allocate an L3 agent for a router.""" + +- def _choose_router_agent(self, plugin, context, candidates): ++ def _choose_router_agent(self, context, plugin, candidates, sync_router): + return random.choice(candidates) + +- def _choose_router_agents_for_ha(self, plugin, context, candidates): ++ def _choose_router_agents_for_ha(self, context, plugin, candidates, ++ sync_router): + num_agents = self._get_num_of_agents_for_ha(len(candidates)) + return random.sample(candidates, num_agents) + +@@ -346,13 +349,14 @@ class ChanceScheduler(L3Scheduler): + class LeastRoutersScheduler(L3Scheduler): + """Allocate to an L3 agent with the least number of routers bound.""" + +- def _choose_router_agent(self, plugin, context, candidates): ++ def _choose_router_agent(self, context, plugin, candidates, sync_router): + candidate_ids = [candidate['id'] for candidate in candidates] + chosen_agent = plugin.get_l3_agent_with_min_routers( + context, candidate_ids) + return chosen_agent + +- def _choose_router_agents_for_ha(self, plugin, context, candidates): ++ def _choose_router_agents_for_ha(self, context, plugin, candidates, ++ sync_router): + num_agents = self._get_num_of_agents_for_ha(len(candidates)) + ordered_agents = plugin.get_l3_agents_ordered_by_num_routers( + context, [candidate['id'] for candidate in candidates]) +@@ -397,7 +401,8 @@ class AZLeastRoutersScheduler(LeastRoutersScheduler): + + return candidates + +- def _choose_router_agents_for_ha(self, plugin, context, candidates): ++ def _choose_router_agents_for_ha(self, context, plugin, candidates, ++ sync_router): + ordered_agents = plugin.get_l3_agents_ordered_by_num_routers( + context, [candidate['id'] for candidate in candidates]) + num_agents = self._get_num_of_agents_for_ha(len(ordered_agents)) +@@ -416,3 +421,65 @@ class AZLeastRoutersScheduler(LeastRoutersScheduler): + if len(selected_agents) >= num_agents: + break + return selected_agents ++ ++ ++class PreferredL3AgentRoutersScheduler(LeastRoutersScheduler): ++ ++ @staticmethod ++ def get_preferred_agent(sync_router: dict) -> Optional[str]: ++ configurations = sync_router.get('configurations', {}) ++ if configurations: ++ return configurations.get('preferred_agent', None) ++ return None ++ ++ @staticmethod ++ def get_agents(sync_router: dict) -> Optional[List[str]]: ++ configurations = sync_router.get('configurations', {}) ++ if configurations: ++ master = configurations.get('master_agent', None) ++ slaves = configurations.get('slave_agents', []) ++ if master and slaves: ++ return slaves + [master] ++ return [] ++ ++ def _choose_router_agent(self, context: Context, ++ plugin: L3RouterPlugin, ++ candidates: List[Agent], ++ sync_router: dict) -> Agent: ++ agent = self.get_preferred_agent(sync_router) ++ if agent: ++ new_candidates = [candidate for candidate in candidates ++ if candidate['host'] == agent] ++ if not new_candidates: ++ LOG.warning(f"Router {sync_router['id']} failed to " ++ f"schedule l3 agent on {agent}.") ++ else: ++ agent = new_candidates[0] ++ LOG.debug(f"Router {sync_router['id']} l3 agent is {agent}.") ++ return agent ++ agent = super()._choose_router_agent(context, plugin, candidates, ++ sync_router) ++ return agent ++ ++ def _choose_router_agents_for_ha(self, context: Context, ++ plugin: L3RouterPlugin, ++ candidates: List[Agent], ++ sync_router: dict) -> List[Agent]: ++ ++ agents = self.get_agents(sync_router) ++ if agents: ++ if self.max_ha_agents < len(agents): ++ agents = agents[len(agents) - self.max_ha_agents:] ++ new_candidates = [candidate for candidate in candidates if ++ candidate['host'] in agents] ++ if len(new_candidates) != len(agents): ++ LOG.warning(f"Router {sync_router['id']} failed to " ++ f"schedule l3 agents on {agents}.") ++ else: ++ LOG.debug(f"Router {sync_router['id']} l3 agents are " ++ f"{new_candidates}.") ++ return new_candidates ++ ++ return super( ++ PreferredL3AgentRoutersScheduler, self ++ )._choose_router_agents_for_ha(context, plugin, candidates, sync_router) +diff --git a/services/l3_router/l3_router_plugin.py b/services/l3_router/l3_router_plugin.py +index 2e8a762764..9825138261 100644 +--- a/services/l3_router/l3_router_plugin.py ++++ b/services/l3_router/l3_router_plugin.py +@@ -38,6 +38,7 @@ from oslo_utils import importutils + + from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api + from neutron.api.rpc.handlers import l3_rpc ++from neutron.conf.common import NETWORK_HOST_OPTS + from neutron.db import dns_db + from neutron.db import extraroute_db + from neutron.db import l3_dvr_ha_scheduler_db +@@ -135,6 +136,17 @@ class L3RouterPlugin(service_base.ServicePluginBase, + + self.add_worker(rpc_worker) + self.l3_driver_controller = driver_controller.DriverController(self) ++ self.compute_to_network = dict() ++ self._init_compute_to_network() ++ ++ def _init_compute_to_network(self): ++ for network_node in cfg.CONF.network_nodes: ++ cfg.CONF.register_opts(NETWORK_HOST_OPTS, group=network_node) ++ network_group = cfg.CONF.get(network_node, None) ++ if network_group: ++ compute_nodes = network_group.get('compute_nodes', []) ++ for compute_node in compute_nodes: ++ self.compute_to_network[compute_node] = network_node + + @property + def supported_extension_aliases(self): +diff --git a/services/rg_portforwarding/__init__.py b/services/rg_portforwarding/__init__.py +new file mode 100644 +index 0000000000..e69de29bb2 +diff --git a/services/rg_portforwarding/common/__init__.py b/services/rg_portforwarding/common/__init__.py +new file mode 100644 +index 0000000000..e69de29bb2 +diff --git a/services/rg_portforwarding/common/exceptions.py b/services/rg_portforwarding/common/exceptions.py +new file mode 100644 +index 0000000000..73cea68e32 +--- /dev/null ++++ b/services/rg_portforwarding/common/exceptions.py +@@ -0,0 +1,77 @@ ++# Copyright (c) 2023 UnionTech ++# All rights reserved ++# ++# Licensed under the Apache License, Version 2.0 (the "License"); you may ++# not use this file except in compliance with the License. You may obtain ++# a copy of the License at ++# ++# http://www.apache.org/licenses/LICENSE-2.0 ++# ++# Unless required by applicable law or agreed to in writing, software ++# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT ++# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the ++# License for the specific language governing permissions and limitations ++# under the License. ++ ++from neutron._i18n import _ ++from neutron_lib import exceptions ++ ++ ++class PortForwardingNotSupportFilterField(exceptions.BadRequest): ++ message = _("Port Forwarding filter %(filter)s is not supported.") ++ ++ ++class RouterDoesNotHaveGateway(exceptions.BadRequest): ++ message = _("Router %(router_id)s does not have any gateways.") ++ ++ ++class RouterGatewayPortNotFound(exceptions.NotFound): ++ message = _("Router %(router_id)s 's gateway port %(gw_port_id)s " ++ "could not be found.") ++ ++ ++class RouterGatewayPortDoesNotHaveAnyIPAddresses(exceptions.NotFound): ++ message = _("Router %(router_id)s 's gateway port %(gw_port_id)s " ++ "does not have any IP addresses.") ++ ++ ++class RouterGatewayPortForwardingNotFound(exceptions.NotFound): ++ message = _("Router Gateway Port Forwarding %(id)s could not be found.") ++ ++ ++class PortHasBindingFloatingIP(exceptions.InUse): ++ message = _("Cannot create port forwarding to floating IP " ++ "%(floating_ip_address)s (%(fip_id)s) with port %(port_id)s " ++ "using fixed IP %(fixed_ip)s, as that port already " ++ "has a binding floating IP.") ++ ++ ++class InconsistentPortAndIP(exceptions.BadRequest): ++ message = _("Port %(port_id)s does not have ip address %(ip_address)s.") ++ ++ ++class RouterGatewayPortForwardingAlreadyExists(exceptions.BadRequest): ++ message = _("A duplicate router gateway port forwarding entry " ++ "with same attributes already exists, " ++ "conflicting values are %(conflict)s.") ++ ++ ++class PortNetworkNotBindOnRouter(exceptions.BadRequest): ++ message = _("Port %(port_id)s 's network %(network_id)s " ++ "not bind on router %(router_id)s.") ++ ++ ++class RouterGatewayPortForwardingUpdateFailed(exceptions.BadRequest): ++ message = _("Another router port forwarding entry with the same " ++ "attributes already exists, conflicting " ++ "values are %(conflict)s.") ++ ++ ++class DeletedRouterWithRGForwarding(exceptions.InUse): ++ message = _("Cant not delete router, " ++ "router %(router_id)s has port forwardings to remove.") ++ ++ ++class DeletedRouterGatewayWithRGForwarding(exceptions.InUse): ++ message = _("Cant not delete or update router gateway, " ++ "router %(router_id)s has port forwardings to remove.") +diff --git a/services/rg_portforwarding/pf_plugin.py b/services/rg_portforwarding/pf_plugin.py +new file mode 100644 +index 0000000000..ed8e68e53c +--- /dev/null ++++ b/services/rg_portforwarding/pf_plugin.py +@@ -0,0 +1,369 @@ ++# Copyright (c) 2023 UnionTech ++# All rights reserved ++# ++# Licensed under the Apache License, Version 2.0 (the "License"); you may ++# not use this file except in compliance with the License. You may obtain ++# a copy of the License at ++# ++# http://www.apache.org/licenses/LICENSE-2.0 ++# ++# Unless required by applicable law or agreed to in writing, software ++# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT ++# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the ++# License for the specific language governing permissions and limitations ++# under the License. ++ ++ ++from oslo_log import log as logging ++from typing import List, Dict, Optional ++ ++from neutron_lib import constants ++from neutron_lib.context import Context ++from neutron_lib.plugins import directory ++from neutron_lib.db import resource_extend ++from neutron_lib.plugins.constants import L3 ++from neutron_lib.db.api import CONTEXT_WRITER ++from neutron_lib.exceptions import PortNotFound ++from neutron_lib.exceptions.l3 import RouterNotFound ++from neutron_lib.callbacks import registry, resources ++from neutron_lib.callbacks import events as lib_events ++from neutron_lib.callbacks.events import DBEventPayload ++from neutron_lib.api.definitions import rg_port_forwarding as apidef ++from neutron_lib.objects.exceptions import NeutronDbObjectDuplicateEntry ++ ++from neutron.db import db_base_plugin_common ++from neutron.db.l3_dvr_db import is_distributed_router ++from neutron.db.l3_hamode_db import is_ha_router ++ ++from neutron.objects.base import Pager ++from neutron.objects.ports import Port ++from neutron.objects.router import Router, FloatingIP ++from neutron.objects.rg_port_forwarding import RGPortForwarding ++from neutron.objects.rg_port_forwarding import FIELDS_NOT_SUPPORT_FILTER ++ ++from neutron.extensions.rg_port_forwarding import RGPortForwardingPluginBase ++from neutron.services.l3_router.l3_router_plugin import L3RouterPlugin ++from neutron.services.rg_portforwarding.common import exceptions ++ ++from neutron.api.rpc.callbacks import events ++from neutron.api.rpc.handlers import resources_rpc ++ ++LOG = logging.getLogger(__name__) ++ ++ ++@resource_extend.has_resource_extenders ++@registry.has_registry_receivers ++class RGPortForwardingPlugin(RGPortForwardingPluginBase): ++ required_service_plugins = ['router'] ++ ++ supported_extension_aliases = [apidef.ALIAS] ++ ++ __native_pagination_support = True ++ __native_sorting_support = True ++ __filter_validation_support = True ++ ++ def __init__(self): ++ super(RGPortForwardingPlugin, self).__init__() ++ self.push_api = resources_rpc.ResourcesPushRpcApi() ++ self.l3_plugin = directory.get_plugin(L3) ++ self.core_plugin = directory.get_plugin() ++ ++ @staticmethod ++ def _get_router(context: Context, router_id: str) -> Optional[Router]: ++ router = Router.get_object(context, id=router_id) ++ if not router: ++ raise RouterNotFound(router_id=router_id) ++ return router ++ ++ @staticmethod ++ def _get_router_gateway(context: Context, router: Router) -> str: ++ gw_port_id = router.get('gw_port_id', None) ++ if not gw_port_id: ++ raise exceptions.RouterDoesNotHaveGateway(router_id=router.id) ++ gw_port = Port.get_object( ++ context, id=gw_port_id) ++ if not gw_port: ++ raise exceptions.RouterGatewayPortNotFound(router_id=router.id, ++ gw_port_id=gw_port_id) ++ gw_port_ips = gw_port.get("fixed_ips", []) ++ if len(gw_port_ips) <= 0: ++ raise exceptions.RouterGatewayPortDoesNotHaveAnyIPAddresses( ++ router_id=router.id, gw_port_id=gw_port_id) ++ gw_ip_address = gw_port_ips[0].get('ip_address') ++ return gw_ip_address ++ ++ @staticmethod ++ def _get_port(context: Context, port_id: str) -> Optional[Port]: ++ port = Port.get_object(context, id=port_id) ++ if not port: ++ raise PortNotFound(port_id=port_id) ++ return port ++ ++ @staticmethod ++ def _get_ports(context: Context, router_id: str, port: Port, ++ device_owner: str) -> Optional[List[Port]]: ++ ports = Port.get_ports_by_router_and_network( ++ context, router_id, device_owner, port.network_id) ++ if not ports: ++ raise exceptions.PortNetworkNotBindOnRouter( ++ port_id=port.id, ++ network_id=port.network_id, ++ router_id=router_id) ++ return ports ++ ++ @staticmethod ++ def _validate_filter_for_port_forwarding(filters: Dict[str, str]) -> None: ++ if not filters: ++ return ++ for filter_member_key in filters.keys(): ++ if filter_member_key in FIELDS_NOT_SUPPORT_FILTER: ++ raise exceptions.PortForwardingNotSupportFilterField( ++ filter=filter_member_key) ++ ++ @staticmethod ++ def _check_port_has_binding_floating_ip(context: Context, port_id: str, ++ ip_address: str) -> None: ++ floatingip_objs = FloatingIP.get_objects( ++ context.elevated(), ++ fixed_port_id=port_id) ++ if floatingip_objs: ++ floating_ip_address = floatingip_objs[0].floating_ip_address ++ raise exceptions.PortHasBindingFloatingIP( ++ floating_ip_address=floating_ip_address, ++ fip_id=floatingip_objs[0].id, ++ port_id=port_id, ++ fixed_ip=ip_address) ++ ++ @staticmethod ++ def _get_device_owner(router: Router) -> str: ++ if is_distributed_router(router): ++ return constants.DEVICE_OWNER_DVR_INTERFACE ++ elif is_ha_router(router): ++ return constants.DEVICE_OWNER_HA_REPLICATED_INT ++ return constants.DEVICE_OWNER_ROUTER_INTF ++ ++ def _check_router_port(self, context: Context, router: Router, ++ port: Port): ++ device_owner = self._get_device_owner(router) ++ self._get_ports(context, router.id, port, device_owner) ++ ++ def _check_port(self, context: Context, port_id: str, ip: str) -> Port: ++ port = self._get_port(context, port_id) ++ self._check_port_has_binding_floating_ip(context, port_id, ip) ++ fixed_ips = port.get('fixed_ips', []) ++ result = list(map(lambda x: str(x.get('ip_address')) == ip, fixed_ips)) ++ if not any(result): ++ raise exceptions.InconsistentPortAndIP(port_id=port, ip_address=ip) ++ return port ++ ++ def _check_router(self, context: Context, router_id: str) -> (Router, str): ++ router = self._get_router(context, router_id) ++ gw_ip_address = self._get_router_gateway(context, router) ++ return router, gw_ip_address ++ ++ def _check_port_forwarding_create(self, context: Context, router_id: str, ++ pf_dict: Dict) -> None: ++ router, gw_ip_address = self._check_router(context, router_id) ++ pf_dict['router_id'] = router_id ++ pf_dict[apidef.GW_IP_ADDRESS] = gw_ip_address ++ internal_port_id = pf_dict[apidef.INTERNAL_PORT_ID] ++ internal_ip_address = pf_dict[apidef.INTERNAL_IP_ADDRESS] ++ internal_port = self._check_port(context, internal_port_id, ++ internal_ip_address) ++ self._check_router_port(context, router, internal_port) ++ ++ @staticmethod ++ def _check_port_forwarding(context: Context, pf_obj: RGPortForwarding): ++ pf_objs = RGPortForwarding.get_objects( ++ context, ++ router_id=pf_obj.router_id, ++ protocol=pf_obj.protocol) ++ ++ for obj in pf_objs: ++ if obj.id == pf_obj.get('id', None): ++ continue ++ # Ensure there are no conflicts on the outside ++ if obj.external_port == pf_obj.external_port: ++ raise exceptions.RouterGatewayPortForwardingAlreadyExists( ++ conflict={ ++ 'router_id': pf_obj.router_id, ++ 'protocol': pf_obj.protocol, ++ 'external_port': obj.external_port, ++ } ++ ) ++ # Ensure there are no conflicts in the inside ++ # socket: internal_ip_address + internal_port ++ if (obj.internal_port_id == pf_obj.internal_port_id and ++ obj.internal_ip_address == pf_obj.internal_ip_address and ++ obj.internal_port == pf_obj.internal_port): ++ raise exceptions.RouterGatewayPortForwardingAlreadyExists( ++ conflict={ ++ 'router_id': pf_obj.router_id, ++ 'protocol': pf_obj.protocol, ++ 'internal_port_id': obj.internal_port_id, ++ 'internal_ip_address': str(obj.internal_ip_address), ++ 'internal_port': obj.internal_port ++ } ++ ) ++ ++ @staticmethod ++ def _find_existing_rg_port_forwarding(context: Context, ++ router_id: str, ++ port_forwarding: Dict, ++ specify_params: List = None): ++ # Because the session had been flushed by NeutronDbObjectDuplicateEntry ++ # so if we want to use the context to get another db queries, we need ++ # to rollback first. ++ context.session.rollback() ++ if not specify_params: ++ specify_params = [ ++ { ++ 'router_id': router_id, ++ 'external_port': port_forwarding['external_port'], ++ 'protocol': port_forwarding['protocol'] ++ }, ++ { ++ 'internal_port_id': port_forwarding['internal_port_id'], ++ 'internal_ip_address': port_forwarding[ ++ 'internal_ip_address'], ++ 'internal_port': port_forwarding['internal_port'], ++ 'protocol': port_forwarding['protocol'] ++ }] ++ for param in specify_params: ++ objs = RGPortForwarding.get_objects(context, **param) ++ if objs: ++ return objs[0], param ++ ++ @db_base_plugin_common.make_result_with_fields ++ @db_base_plugin_common.convert_result_to_dict ++ def get_router_gateway_port_forwardings(self, context: Context, ++ router_id: str, ++ filters: List[str] = None, ++ fields: List[str] = None, ++ sorts: List[str] = None, ++ limit: int = None, ++ marker: str = None, ++ page_reverse: bool = False): ++ ++ router, gw_ip_address = self._check_router(context, router_id) ++ filters = filters or {} ++ self._validate_filter_for_port_forwarding(filters) ++ pager = Pager(sorts, limit, page_reverse, marker) ++ port_forwardings = RGPortForwarding.get_objects( ++ context, _pager=pager, router_id=router_id, **filters) ++ for pf in port_forwardings: ++ setattr(pf, 'gw_ip_address', gw_ip_address) ++ return port_forwardings ++ ++ @db_base_plugin_common.convert_result_to_dict ++ def create_router_gateway_port_forwarding(self, context: Context, ++ router_id: str, ++ gateway_port_forwarding: dict): ++ port_forwarding = gateway_port_forwarding.get(apidef.RESOURCE_NAME) ++ self._check_port_forwarding_create(context, router_id, port_forwarding) ++ with CONTEXT_WRITER.using(context): ++ pf_obj = RGPortForwarding(context, **port_forwarding) ++ self._check_port_forwarding(context, pf_obj) ++ try: ++ pf_obj.create() ++ except NeutronDbObjectDuplicateEntry: ++ _, conflict = self._find_existing_rg_port_forwarding( ++ context, router_id, port_forwarding) ++ raise exceptions.RouterGatewayPortForwardingAlreadyExists( ++ conflict=conflict ++ ) ++ self.push_api.push(context, [pf_obj], events.CREATED) ++ return pf_obj ++ ++ @db_base_plugin_common.convert_result_to_dict ++ def update_router_gateway_port_forwarding(self, context: Context, id: str, ++ router_id: str, ++ gateway_port_forwarding: dict): ++ ++ router = self._get_router(context, router_id) ++ gw_ip_address = self._get_router_gateway(context, router) ++ pf_obj = RGPortForwarding.get_object(context, id=id) ++ if not pf_obj: ++ raise exceptions.RouterGatewayPortForwardingNotFound(id=id) ++ ++ port_forwarding = gateway_port_forwarding.get(apidef.RESOURCE_NAME, {}) ++ port_forwarding[apidef.GW_IP_ADDRESS] = gw_ip_address ++ new_port_id = port_forwarding.get(apidef.INTERNAL_PORT_ID) ++ new_internal_ip = port_forwarding.get(apidef.INTERNAL_IP_ADDRESS, None) ++ ++ if new_port_id and new_port_id != pf_obj.internal_port_id: ++ self._check_port_has_binding_floating_ip(context, ++ new_port_id, ++ new_internal_ip) ++ ++ if any([new_internal_ip, new_port_id]): ++ port_forwarding.update({ ++ apidef.INTERNAL_IP_ADDRESS: new_internal_ip ++ if new_internal_ip else ++ str(pf_obj.internal_ip_address), ++ apidef.INTERNAL_PORT_ID: new_port_id ++ if new_port_id else pf_obj.internal_port ++ }) ++ ++ with CONTEXT_WRITER.using(context): ++ pf_obj.update_fields(port_forwarding, reset_changes=True) ++ self._check_port_forwarding(context, pf_obj) ++ try: ++ pf_obj.update() ++ except NeutronDbObjectDuplicateEntry: ++ _, conflict = self._find_existing_rg_port_forwarding( ++ context, router_id, port_forwarding) ++ raise exceptions.RouterGatewayPortForwardingAlreadyExists( ++ conflict=conflict ++ ) ++ self.push_api.push(context, [pf_obj], events.UPDATED) ++ return pf_obj ++ ++ @db_base_plugin_common.make_result_with_fields ++ @db_base_plugin_common.convert_result_to_dict ++ def get_router_gateway_port_forwarding(self, context: Context, id: str, ++ router_id: str, ++ fields: List[str] = None): ++ _, gw_ip_address = self._check_router(context, router_id) ++ pf_obj = RGPortForwarding.get_object(context, id=id) ++ if not pf_obj: ++ raise exceptions.RouterGatewayPortForwardingNotFound(id=id) ++ setattr(pf_obj, apidef.GW_IP_ADDRESS, gw_ip_address) ++ return pf_obj ++ ++ def delete_router_gateway_port_forwarding(self, context: Context, id: str, ++ router_id: str): ++ pf_obj = RGPortForwarding.get_object(context, id=id) ++ if not pf_obj: ++ raise exceptions.RouterGatewayPortForwardingNotFound(id=id) ++ with CONTEXT_WRITER.using(context): ++ pf_obj.delete() ++ self.push_api.push(context, [pf_obj], events.DELETED) ++ ++ @registry.receives(resources.ROUTER, [lib_events.BEFORE_DELETE]) ++ def _receive_router_before_delete(self, resource: str, event: str, ++ trigger: L3RouterPlugin, ++ payload: DBEventPayload): ++ router_id = payload.resource_id ++ context = payload.context ++ port_forwardings = RGPortForwarding.get_objects(context, ++ router_id=router_id) ++ if port_forwardings: ++ ex = exceptions.DeletedRouterWithRGForwarding(router_id=router_id) ++ LOG.info(ex.msg) ++ raise ex ++ ++ @registry.receives(resources.ROUTER_GATEWAY, [lib_events.BEFORE_DELETE, ++ lib_events.BEFORE_UPDATE]) ++ def _receive_router_gateway_before_delete(self, resource: str, event: str, ++ trigger: L3RouterPlugin, ++ payload: DBEventPayload): ++ router_id = payload.resource_id ++ context = payload.context ++ port_forwardings = RGPortForwarding.get_objects(context, ++ router_id=router_id) ++ if port_forwardings: ++ ex = exceptions.DeletedRouterGatewayWithRGForwarding( ++ router_id=router_id) ++ LOG.info(ex.msg) ++ raise ex |