summaryrefslogtreecommitdiff
path: root/patch007-backport-fix-some-bugs.patch
diff options
context:
space:
mode:
Diffstat (limited to 'patch007-backport-fix-some-bugs.patch')
-rw-r--r--patch007-backport-fix-some-bugs.patch1425
1 files changed, 1425 insertions, 0 deletions
diff --git a/patch007-backport-fix-some-bugs.patch b/patch007-backport-fix-some-bugs.patch
new file mode 100644
index 0000000..6281d09
--- /dev/null
+++ b/patch007-backport-fix-some-bugs.patch
@@ -0,0 +1,1425 @@
+From 90c5382aee07879a80309f257f04114201ccaac6 Mon Sep 17 00:00:00 2001
+From: ShuangxiDing <dingshuangxi888@gmail.com>
+Date: Fri, 21 Jul 2023 20:28:58 +0800
+Subject: [PATCH 01/10] [ISSUE #7061] Support forward HAProxyMessage for Multi
+ Protocol server. (#7062)
+MIME-Version: 1.0
+Content-Type: text/plain; charset=UTF-8
+Content-Transfer-Encoding: 8bit
+
+* Support dynamic modification of grpc tls mode to improve the scalability of ProtocolNegotiator
+
+* Support dynamic modification of grpc tls mode to improve the scalability of ProtocolNegotiator
+
+* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator
+
+* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator
+
+* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator
+
+* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator
+
+* Support proxy protocol for gRPC server.
+
+* Support proxy protocol for gRPC server.
+
+* Support proxy protocol for gRPC server.
+
+* Support proxy protocol for gRPC server.
+
+* Support proxy protocol for gRPC server.
+
+* Support proxy protocol for gRPC and Remoting server.
+
+* 回滚netty的升级
+
+* Support proxy protocol for gRPC and Remoting server.
+
+* Support proxy protocol for gRPC and Remoting server.
+
+* Support proxy protocol for gRPC and Remoting server.
+
+* add grpc-netty-codec-haproxy in bazel
+
+* add grpc-netty-codec-haproxy in bazel
+
+* Support proxy protocol for gRPC and Remoting server.
+
+* Fix Test
+
+* add grpc-netty-codec-haproxy in bazel
+
+* add ProxyProtocolTest for Remoting
+
+* Support HAProxyMessage forward for multi protocol server.
+
+---------
+
+Co-authored-by: 徒钟 <shuangxi.dsx@alibaba-inc.com>
+---
+ .../http2proxy/HAProxyMessageForwarder.java | 129 ++++++++++++++++++
+ .../http2proxy/Http2ProtocolProxyHandler.java | 23 +++-
+ .../http2proxy/Http2ProxyBackendHandler.java | 2 +
+ .../http2proxy/Http2ProxyFrontendHandler.java | 28 ++--
+ 4 files changed, 164 insertions(+), 18 deletions(-)
+ create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java
+
+diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java
+new file mode 100644
+index 000000000..8f139d3d9
+--- /dev/null
++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java
+@@ -0,0 +1,129 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.rocketmq.proxy.remoting.protocol.http2proxy;
++
++import io.netty.buffer.ByteBuf;
++import io.netty.buffer.Unpooled;
++import io.netty.channel.Channel;
++import io.netty.channel.ChannelHandlerContext;
++import io.netty.channel.ChannelInboundHandlerAdapter;
++import io.netty.handler.codec.haproxy.HAProxyCommand;
++import io.netty.handler.codec.haproxy.HAProxyMessage;
++import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
++import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
++import io.netty.handler.codec.haproxy.HAProxyTLV;
++import io.netty.util.Attribute;
++import io.netty.util.DefaultAttributeMap;
++import org.apache.commons.codec.binary.Hex;
++import org.apache.commons.lang3.ArrayUtils;
++import org.apache.commons.lang3.StringUtils;
++import org.apache.commons.lang3.reflect.FieldUtils;
++import org.apache.rocketmq.acl.common.AclUtils;
++import org.apache.rocketmq.common.constant.HAProxyConstants;
++import org.apache.rocketmq.common.constant.LoggerName;
++import org.apache.rocketmq.logging.org.slf4j.Logger;
++import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
++import org.apache.rocketmq.remoting.netty.AttributeKeys;
++
++import java.lang.reflect.Field;
++import java.nio.charset.Charset;
++import java.util.ArrayList;
++import java.util.List;
++
++public class HAProxyMessageForwarder extends ChannelInboundHandlerAdapter {
++
++ private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
++
++ private static final Field FIELD_ATTRIBUTE =
++ FieldUtils.getField(DefaultAttributeMap.class, "attributes", true);
++
++ private final Channel outboundChannel;
++
++ public HAProxyMessageForwarder(final Channel outboundChannel) {
++ this.outboundChannel = outboundChannel;
++ }
++
++ @Override
++ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
++ try {
++ forwardHAProxyMessage(ctx.channel(), outboundChannel);
++ ctx.fireChannelRead(msg);
++ } catch (Exception e) {
++ log.error("Forward HAProxyMessage from Remoting to gRPC server error.", e);
++ throw e;
++ } finally {
++ ctx.pipeline().remove(this);
++ }
++ }
++
++ private void forwardHAProxyMessage(Channel inboundChannel, Channel outboundChannel) throws Exception {
++ if (!inboundChannel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) {
++ return;
++ }
++
++ if (!(inboundChannel instanceof DefaultAttributeMap)) {
++ return;
++ }
++
++ Attribute<?>[] attributes = (Attribute<?>[]) FieldUtils.readField(FIELD_ATTRIBUTE, inboundChannel);
++ if (ArrayUtils.isEmpty(attributes)) {
++ return;
++ }
++
++ String sourceAddress = null, destinationAddress = null;
++ int sourcePort = 0, destinationPort = 0;
++ List<HAProxyTLV> haProxyTLVs = new ArrayList<>();
++
++ for (Attribute<?> attribute : attributes) {
++ String attributeKey = attribute.key().name();
++ if (!StringUtils.startsWith(attributeKey, HAProxyConstants.PROXY_PROTOCOL_PREFIX)) {
++ continue;
++ }
++ String attributeValue = (String) attribute.get();
++ if (StringUtils.isEmpty(attributeValue)) {
++ continue;
++ }
++ if (attribute.key() == AttributeKeys.PROXY_PROTOCOL_ADDR) {
++ sourceAddress = attributeValue;
++ }
++ if (attribute.key() == AttributeKeys.PROXY_PROTOCOL_PORT) {
++ sourcePort = Integer.parseInt(attributeValue);
++ }
++ if (attribute.key() == AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR) {
++ destinationAddress = attributeValue;
++ }
++ if (attribute.key() == AttributeKeys.PROXY_PROTOCOL_SERVER_PORT) {
++ destinationPort = Integer.parseInt(attributeValue);
++ }
++ if (StringUtils.startsWith(attributeKey, HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX)) {
++ String typeString = StringUtils.substringAfter(attributeKey, HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX);
++ ByteBuf byteBuf = Unpooled.buffer();
++ byteBuf.writeBytes(attributeValue.getBytes(Charset.defaultCharset()));
++ HAProxyTLV haProxyTLV = new HAProxyTLV(Hex.decodeHex(typeString)[0], byteBuf);
++ haProxyTLVs.add(haProxyTLV);
++ }
++ }
++
++ HAProxyProxiedProtocol proxiedProtocol = AclUtils.isColon(sourceAddress) ? HAProxyProxiedProtocol.TCP6 :
++ HAProxyProxiedProtocol.TCP4;
++
++ HAProxyMessage message = new HAProxyMessage(HAProxyProtocolVersion.V2, HAProxyCommand.PROXY,
++ proxiedProtocol, sourceAddress, destinationAddress, sourcePort, destinationPort, haProxyTLVs);
++ outboundChannel.writeAndFlush(message).sync();
++ }
++}
+diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
+index 913f35c93..c37db92af 100644
+--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
+@@ -24,13 +24,14 @@ import io.netty.channel.ChannelFuture;
+ import io.netty.channel.ChannelHandlerContext;
+ import io.netty.channel.ChannelInitializer;
+ import io.netty.channel.ChannelOption;
++import io.netty.handler.codec.haproxy.HAProxyMessageEncoder;
+ import io.netty.handler.ssl.ApplicationProtocolConfig;
+ import io.netty.handler.ssl.ApplicationProtocolNames;
+ import io.netty.handler.ssl.SslContext;
+ import io.netty.handler.ssl.SslContextBuilder;
++import io.netty.handler.ssl.SslHandler;
+ import io.netty.handler.ssl.SslProvider;
+ import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+-import javax.net.ssl.SSLException;
+ import org.apache.rocketmq.common.constant.LoggerName;
+ import org.apache.rocketmq.logging.org.slf4j.Logger;
+ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+@@ -38,8 +39,11 @@ import org.apache.rocketmq.proxy.config.ConfigurationManager;
+ import org.apache.rocketmq.proxy.config.ProxyConfig;
+ import org.apache.rocketmq.proxy.remoting.protocol.ProtocolHandler;
+ import org.apache.rocketmq.remoting.common.TlsMode;
++import org.apache.rocketmq.remoting.netty.AttributeKeys;
+ import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
+
++import javax.net.ssl.SSLException;
++
+ public class Http2ProtocolProxyHandler implements ProtocolHandler {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
+ private static final String LOCAL_HOST = "127.0.0.1";
+@@ -101,11 +105,8 @@ public class Http2ProtocolProxyHandler implements ProtocolHandler {
+ .handler(new ChannelInitializer<Channel>() {
+ @Override
+ protected void initChannel(Channel ch) throws Exception {
+- if (sslContext != null) {
+- ch.pipeline()
+- .addLast(sslContext.newHandler(ch.alloc(), LOCAL_HOST, config.getGrpcServerPort()));
+- }
+- ch.pipeline().addLast(new Http2ProxyBackendHandler(inboundChannel));
++ ch.pipeline().addLast(null, Http2ProxyBackendHandler.HANDLER_NAME,
++ new Http2ProxyBackendHandler(inboundChannel));
+ }
+ })
+ .option(ChannelOption.AUTO_READ, false)
+@@ -120,7 +121,15 @@ public class Http2ProtocolProxyHandler implements ProtocolHandler {
+ }
+
+ final Channel outboundChannel = f.channel();
++ if (inboundChannel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) {
++ ctx.pipeline().addLast(new HAProxyMessageForwarder(outboundChannel));
++ outboundChannel.pipeline().addFirst(HAProxyMessageEncoder.INSTANCE);
++ }
+
+- ctx.pipeline().addLast(new Http2ProxyFrontendHandler(outboundChannel));
++ SslHandler sslHandler = null;
++ if (sslContext != null) {
++ sslHandler = sslContext.newHandler(outboundChannel.alloc(), LOCAL_HOST, config.getGrpcServerPort());
++ }
++ ctx.pipeline().addLast(new Http2ProxyFrontendHandler(outboundChannel, sslHandler));
+ }
+ }
+diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java
+index 0195b0c1c..fd5408fae 100644
+--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java
++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java
+@@ -29,6 +29,8 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+ public class Http2ProxyBackendHandler extends ChannelInboundHandlerAdapter {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
+
++ public static final String HANDLER_NAME = "Http2ProxyBackendHandler";
++
+ private final Channel inboundChannel;
+
+ public Http2ProxyBackendHandler(Channel inboundChannel) {
+diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java
+index 87147a322..9b37e85e5 100644
+--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java
++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java
+@@ -19,36 +19,42 @@ package org.apache.rocketmq.proxy.remoting.protocol.http2proxy;
+
+ import io.netty.buffer.Unpooled;
+ import io.netty.channel.Channel;
+-import io.netty.channel.ChannelFuture;
+ import io.netty.channel.ChannelFutureListener;
+ import io.netty.channel.ChannelHandlerContext;
+ import io.netty.channel.ChannelInboundHandlerAdapter;
++import io.netty.handler.ssl.SslHandler;
+ import org.apache.rocketmq.common.constant.LoggerName;
+ import org.apache.rocketmq.logging.org.slf4j.Logger;
+ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+
+ public class Http2ProxyFrontendHandler extends ChannelInboundHandlerAdapter {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
++
++ public static final String HANDLER_NAME = "SslHandler";
++
+ // As we use inboundChannel.eventLoop() when building the Bootstrap this does not need to be volatile as
+ // the outboundChannel will use the same EventLoop (and therefore Thread) as the inboundChannel.
+ private final Channel outboundChannel;
++ private final SslHandler sslHandler;
+
+- public Http2ProxyFrontendHandler(final Channel outboundChannel) {
++ public Http2ProxyFrontendHandler(final Channel outboundChannel, final SslHandler sslHandler) {
+ this.outboundChannel = outboundChannel;
++ this.sslHandler = sslHandler;
+ }
+
+ @Override
+ public void channelRead(final ChannelHandlerContext ctx, Object msg) {
+ if (outboundChannel.isActive()) {
+- outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
+- @Override
+- public void operationComplete(ChannelFuture future) {
+- if (future.isSuccess()) {
+- // was able to flush out data, start to read the next chunk
+- ctx.channel().read();
+- } else {
+- future.channel().close();
+- }
++ if (sslHandler != null && outboundChannel.pipeline().get(HANDLER_NAME) == null) {
++ outboundChannel.pipeline().addBefore(Http2ProxyBackendHandler.HANDLER_NAME, HANDLER_NAME, sslHandler);
++ }
++
++ outboundChannel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> {
++ if (future.isSuccess()) {
++ // was able to flush out data, start to read the next chunk
++ ctx.channel().read();
++ } else {
++ future.channel().close();
+ }
+ });
+ }
+--
+2.32.0.windows.2
+
+
+From 8027cfc7cbb6c120d2fc045e0caa8debe1028a31 Mon Sep 17 00:00:00 2001
+From: maclong1989 <814742806@qq.com>
+Date: Sun, 23 Jul 2023 09:15:05 +0800
+Subject: [PATCH 02/10] [ISSUE #7063] doc: fix typo in user_guide.md
+
+Signed-off-by: jiangyl3 <jiangyl3@asiainfo.com>
+Co-authored-by: jiangyl3 <jiangyl3@asiainfo.com>
+---
+ docs/cn/msg_trace/user_guide.md | 2 +-
+ 1 file changed, 1 insertion(+), 1 deletion(-)
+
+diff --git a/docs/cn/msg_trace/user_guide.md b/docs/cn/msg_trace/user_guide.md
+index d8314052b..9cf139fd3 100644
+--- a/docs/cn/msg_trace/user_guide.md
++++ b/docs/cn/msg_trace/user_guide.md
+@@ -35,7 +35,7 @@ namesrvAddr=XX.XX.XX.XX:9876
+ RocketMQ集群中每一个Broker节点均用于存储Client端收集并发送过来的消息轨迹数据。因此,对于RocketMQ集群中的Broker节点数量并无要求和限制。
+
+ ### 2.3 物理IO隔离模式
+-对于消息轨迹数据量较大的场景,可以在RocketMQ集群中选择其中一个Broker节点专用于存储消息轨迹,使得用户普通的消息数据与消息轨迹数据的物理IO完全隔离,互不影响。在该模式下,RockeMQ集群中至少有两个Broker节点,其中一个Broker节点定义为存储消息轨迹数据的服务端。
++对于消息轨迹数据量较大的场景,可以在RocketMQ集群中选择其中一个Broker节点专用于存储消息轨迹,使得用户普通的消息数据与消息轨迹数据的物理IO完全隔离,互不影响。在该模式下,RocketMQ集群中至少有两个Broker节点,其中一个Broker节点定义为存储消息轨迹数据的服务端。
+
+ ### 2.4 启动开启消息轨迹的Broker
+ `nohup sh mqbroker -c ../conf/2m-noslave/broker-a.properties &`
+--
+2.32.0.windows.2
+
+
+From 3102758487f3e21e977424d7f1b7187eb6c069cb Mon Sep 17 00:00:00 2001
+From: =?UTF-8?q?=E5=90=B4=E6=98=9F=E7=81=BF?=
+ <37405937+wuyoudexiao@users.noreply.github.com>
+Date: Tue, 25 Jul 2023 13:47:53 +0800
+Subject: [PATCH 03/10] fix: npe in lockBatchMQ and unlockBatchMQ (#7078)
+
+Co-authored-by: wxc <wuxingcan666@foxmail.com>
+---
+ .../rocketmq/proxy/processor/ConsumerProcessor.java | 11 +++++++----
+ 1 file changed, 7 insertions(+), 4 deletions(-)
+
+diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
+index cc973813b..656a6339d 100644
+--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
+@@ -19,6 +19,7 @@ package org.apache.rocketmq.proxy.processor;
+
+ import java.util.ArrayList;
+ import java.util.HashMap;
++import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+@@ -425,13 +426,15 @@ public class ConsumerProcessor extends AbstractProcessor {
+ }
+
+ protected Set<AddressableMessageQueue> buildAddressableSet(ProxyContext ctx, Set<MessageQueue> mqSet) {
+- return mqSet.stream().map(mq -> {
++ Set<AddressableMessageQueue> addressableMessageQueueSet = new HashSet<>(mqSet.size());
++ for (MessageQueue mq:mqSet) {
+ try {
+- return serviceManager.getTopicRouteService().buildAddressableMessageQueue(ctx, mq);
++ addressableMessageQueueSet.add(serviceManager.getTopicRouteService().buildAddressableMessageQueue(ctx, mq)) ;
+ } catch (Exception e) {
+- return null;
++ log.error("build addressable message queue fail, messageQueue = {}", mq, e);
+ }
+- }).collect(Collectors.toSet());
++ }
++ return addressableMessageQueueSet;
+ }
+
+ protected HashMap<String, List<AddressableMessageQueue>> buildAddressableMapByBrokerName(
+--
+2.32.0.windows.2
+
+
+From 047ef7498f2203a2234052603a99a114d8a65e17 Mon Sep 17 00:00:00 2001
+From: rongtong <jinrongtong5@163.com>
+Date: Tue, 25 Jul 2023 14:00:22 +0800
+Subject: [PATCH 04/10] Ensuring consistency between broker and nameserver data
+ when deleting a topic (#7066)
+MIME-Version: 1.0
+Content-Type: text/plain; charset=UTF-8
+Content-Transfer-Encoding: 8bit
+
+Co-authored-by: 尘央 <xinyuzhou.zxy@alibaba-inc.com>
+---
+ .../rocketmq/broker/BrokerController.java | 11 ++
+ .../rocketmq/broker/out/BrokerOuterAPI.java | 62 ++++++++++
+ .../processor/AdminBrokerProcessor.java | 26 ++--
+ .../broker/topic/TopicConfigManager.java | 6 +-
+ .../apache/rocketmq/common/BrokerConfig.java | 14 +++
+ .../common/namesrv/NamesrvConfig.java | 17 +++
+ .../namesrv/routeinfo/RouteInfoManager.java | 64 ++++++++--
+ .../routeinfo/RouteInfoManagerNewTest.java | 99 +++++++++++++++
+ .../rocketmq/test/util/MQAdminTestUtils.java | 37 ++++++
+ .../dledger/DLedgerProduceAndConsumeIT.java | 2 +-
+ .../test/route/CreateAndUpdateTopicIT.java | 114 ++++++++++++++++++
+ 11 files changed, 429 insertions(+), 23 deletions(-)
+ rename test/src/test/java/org/apache/rocketmq/test/{base => }/dledger/DLedgerProduceAndConsumeIT.java (99%)
+ create mode 100644 test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java
+
+diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+index 196401e26..972457194 100644
+--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
++++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+@@ -1678,6 +1678,17 @@ public class BrokerController {
+ }, 1000, brokerConfig.getBrokerHeartbeatInterval(), TimeUnit.MILLISECONDS));
+ }
+
++ public synchronized void registerSingleTopicAll(final TopicConfig topicConfig) {
++ TopicConfig tmpTopic = topicConfig;
++ if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
++ || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
++ // Copy the topic config and modify the perm
++ tmpTopic = new TopicConfig(topicConfig);
++ tmpTopic.setPerm(topicConfig.getPerm() & this.brokerConfig.getBrokerPermission());
++ }
++ this.brokerOuterAPI.registerSingleTopicAll(this.brokerConfig.getBrokerName(), tmpTopic, 3000);
++ }
++
+ public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {
+ this.registerIncrementBrokerData(Collections.singletonList(topicConfig), dataVersion);
+ }
+diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+index b6273e9ed..1793a83c0 100644
+--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
++++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+@@ -42,6 +42,7 @@ import org.apache.rocketmq.common.LockCallback;
+ import org.apache.rocketmq.common.MixAll;
+ import org.apache.rocketmq.common.Pair;
+ import org.apache.rocketmq.common.ThreadFactoryImpl;
++import org.apache.rocketmq.common.TopicConfig;
+ import org.apache.rocketmq.common.UnlockCallback;
+ import org.apache.rocketmq.common.UtilAll;
+ import org.apache.rocketmq.common.constant.LoggerName;
+@@ -120,12 +121,14 @@ import org.apache.rocketmq.remoting.protocol.header.namesrv.QueryDataVersionRequ
+ import org.apache.rocketmq.remoting.protocol.header.namesrv.QueryDataVersionResponseHeader;
+ import org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterBrokerRequestHeader;
+ import org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterBrokerResponseHeader;
++import org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterTopicRequestHeader;
+ import org.apache.rocketmq.remoting.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
+ import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
+ import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
+ import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
+ import org.apache.rocketmq.remoting.protocol.namesrv.RegisterBrokerResult;
+ import org.apache.rocketmq.remoting.protocol.route.BrokerData;
++import org.apache.rocketmq.remoting.protocol.route.QueueData;
+ import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
+ import org.apache.rocketmq.remoting.rpc.ClientMetadata;
+ import org.apache.rocketmq.remoting.rpc.RpcClient;
+@@ -614,6 +617,65 @@ public class BrokerOuterAPI {
+ throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr);
+ }
+
++ /**
++ * Register the topic route info of single topic to all name server nodes.
++ * This method is used to replace incremental broker registration feature.
++ */
++ public void registerSingleTopicAll(
++ final String brokerName,
++ final TopicConfig topicConfig,
++ final int timeoutMills) {
++ String topic = topicConfig.getTopicName();
++ RegisterTopicRequestHeader requestHeader = new RegisterTopicRequestHeader();
++ requestHeader.setTopic(topic);
++
++ TopicRouteData topicRouteData = new TopicRouteData();
++ List<QueueData> queueDatas = new ArrayList<>();
++ topicRouteData.setQueueDatas(queueDatas);
++
++ final QueueData queueData = new QueueData();
++ queueData.setBrokerName(brokerName);
++ queueData.setPerm(topicConfig.getPerm());
++ queueData.setReadQueueNums(topicConfig.getReadQueueNums());
++ queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
++ queueData.setTopicSysFlag(topicConfig.getTopicSysFlag());
++ queueDatas.add(queueData);
++ final byte[] topicRouteBody = topicRouteData.encode();
++
++
++ List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
++ final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
++ for (final String namesrvAddr : nameServerAddressList) {
++ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_TOPIC_IN_NAMESRV, requestHeader);
++ request.setBody(topicRouteBody);
++
++ try {
++ brokerOuterExecutor.execute(() -> {
++ try {
++ RemotingCommand response = BrokerOuterAPI.this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
++ assert response != null;
++ LOGGER.info("Register single topic %s to broker %s with response code %s", topic, brokerName, response.getCode());
++ } catch (Exception e) {
++ LOGGER.warn(String.format("Register single topic %s to broker %s exception", topic, brokerName), e);
++ } finally {
++ countDownLatch.countDown();
++ }
++ });
++ } catch (Exception e) {
++ LOGGER.warn("Execute single topic registration task failed, topic {}, broker name {}", topic, brokerName);
++ countDownLatch.countDown();
++ }
++
++ }
++
++ try {
++ if (!countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS)) {
++ LOGGER.warn("Registration single topic to one or more name servers timeout. Timeout threshold: {}ms", timeoutMills);
++ }
++ } catch (InterruptedException ignore) {
++ }
++ }
++
+ public List<Boolean> needRegister(
+ final String clusterName,
+ final String brokerAddr,
+diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+index 892a71330..569a1c57b 100644
+--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
++++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+@@ -441,13 +441,18 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
+
+ try {
+ this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
+- this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
++ if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
++ this.brokerController.registerSingleTopicAll(topicConfig);
++ } else {
++ this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
++ }
+ response.setCode(ResponseCode.SUCCESS);
+ } catch (Exception e) {
+ LOGGER.error("Update / create topic failed for [{}]", request, e);
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark(e.getMessage());
+ }
++
+ return response;
+ }
+
+@@ -769,7 +774,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
+ return response;
+ }
+
+- private synchronized RemotingCommand updateColdDataFlowCtrGroupConfig(ChannelHandlerContext ctx, RemotingCommand request) {
++ private synchronized RemotingCommand updateColdDataFlowCtrGroupConfig(ChannelHandlerContext ctx,
++ RemotingCommand request) {
+ final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ LOGGER.info("updateColdDataFlowCtrGroupConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+@@ -876,7 +882,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
+ }
+ MessageStore messageStore = this.brokerController.getMessageStore();
+ if (messageStore instanceof DefaultMessageStore) {
+- DefaultMessageStore defaultMessageStore = (DefaultMessageStore)messageStore;
++ DefaultMessageStore defaultMessageStore = (DefaultMessageStore) messageStore;
+ if (mode == LibC.MADV_NORMAL) {
+ defaultMessageStore.getMessageStoreConfig().setDataReadAheadEnable(true);
+ } else {
+@@ -1835,13 +1841,13 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
+ /**
+ * Reset consumer offset.
+ *
+- * @param topic Required, not null.
+- * @param group Required, not null.
+- * @param queueId if target queue ID is negative, all message queues will be reset;
+- * otherwise, only the target queue would get reset.
+- * @param timestamp if timestamp is negative, offset would be reset to broker offset at the time being;
+- * otherwise, binary search is performed to locate target offset.
+- * @param offset Target offset to reset to if target queue ID is properly provided.
++ * @param topic Required, not null.
++ * @param group Required, not null.
++ * @param queueId if target queue ID is negative, all message queues will be reset; otherwise, only the target queue
++ * would get reset.
++ * @param timestamp if timestamp is negative, offset would be reset to broker offset at the time being; otherwise,
++ * binary search is performed to locate target offset.
++ * @param offset Target offset to reset to if target queue ID is properly provided.
+ * @return Affected queues and their new offset
+ */
+ private RemotingCommand resetOffsetInner(String topic, String group, int queueId, long timestamp, Long offset) {
+diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+index e5fdd8675..e90530512 100644
+--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
++++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+@@ -305,7 +305,11 @@ public class TopicConfigManager extends ConfigManager {
+ log.error("createTopicIfAbsent ", e);
+ }
+ if (createNew && register) {
+- this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
++ if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
++ this.brokerController.registerSingleTopicAll(topicConfig);
++ } else {
++ this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
++ }
+ }
+ return this.topicConfigTable.get(topicConfig.getTopicName());
+ }
+diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+index a4d82d1c5..02c692e2b 100644
+--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
++++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+@@ -386,6 +386,12 @@ public class BrokerConfig extends BrokerIdentity {
+ */
+ private boolean popResponseReturnActualRetryTopic = false;
+
++ /**
++ * If both the deleteTopicWithBrokerRegistration flag in the NameServer configuration and this flag are set to true,
++ * it guarantees the ultimate consistency of data between the broker and the nameserver during topic deletion.
++ */
++ private boolean enableSingleTopicRegister = false;
++
+ public long getMaxPopPollingSize() {
+ return maxPopPollingSize;
+ }
+@@ -1689,4 +1695,12 @@ public class BrokerConfig extends BrokerIdentity {
+ public void setPopResponseReturnActualRetryTopic(boolean popResponseReturnActualRetryTopic) {
+ this.popResponseReturnActualRetryTopic = popResponseReturnActualRetryTopic;
+ }
++
++ public boolean isEnableSingleTopicRegister() {
++ return enableSingleTopicRegister;
++ }
++
++ public void setEnableSingleTopicRegister(boolean enableSingleTopicRegister) {
++ this.enableSingleTopicRegister = enableSingleTopicRegister;
++ }
+ }
+diff --git a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
+index 700febfe2..5b8a6dedb 100644
+--- a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
++++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
+@@ -82,6 +82,15 @@ public class NamesrvConfig {
+
+ private int waitSecondsForService = 45;
+
++ /**
++ * If enable this flag, the topics that don't exist in broker registration payload will be deleted from name server.
++ *
++ * WARNING:
++ * 1. Enable this flag and "enableSingleTopicRegister" of broker config meanwhile to avoid losing topic route info unexpectedly.
++ * 2. This flag does not support static topic currently.
++ */
++ private boolean deleteTopicWithBrokerRegistration = false;
++
+ public boolean isOrderMessageEnable() {
+ return orderMessageEnable;
+ }
+@@ -241,4 +250,12 @@ public class NamesrvConfig {
+ public void setWaitSecondsForService(int waitSecondsForService) {
+ this.waitSecondsForService = waitSecondsForService;
+ }
++
++ public boolean isDeleteTopicWithBrokerRegistration() {
++ return deleteTopicWithBrokerRegistration;
++ }
++
++ public void setDeleteTopicWithBrokerRegistration(boolean deleteTopicWithBrokerRegistration) {
++ this.deleteTopicWithBrokerRegistration = deleteTopicWithBrokerRegistration;
++ }
+ }
+diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+index ac27d76ce..0055a1cc8 100644
+--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
++++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+@@ -121,9 +121,18 @@ public class RouteInfoManager {
+ if (queueDatas == null || queueDatas.isEmpty()) {
+ return;
+ }
++
+ try {
+ this.lock.writeLock().lockInterruptibly();
+ if (this.topicQueueTable.containsKey(topic)) {
++ Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topic);
++ for (QueueData queueData : queueDatas) {
++ if (!this.brokerAddrTable.containsKey(queueData.getBrokerName())) {
++ log.warn("Register topic contains illegal broker, {}, {}", topic, queueData);
++ return;
++ }
++ queueDataMap.put(queueData.getBrokerName(), queueData);
++ }
+ log.info("Topic route already exist.{}, {}", topic, this.topicQueueTable.get(topic));
+ } else {
+ // check and construct queue data map
+@@ -299,7 +308,32 @@ public class RouteInfoManager {
+
+ ConcurrentMap<String, TopicConfig> tcTable =
+ topicConfigWrapper.getTopicConfigTable();
++
+ if (tcTable != null) {
++
++ TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);
++ Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap();
++
++ // Delete the topics that don't exist in tcTable from the current broker
++ // Static topic is not supported currently
++ if (namesrvConfig.isDeleteTopicWithBrokerRegistration() && topicQueueMappingInfoMap.isEmpty()) {
++ final Set<String> oldTopicSet = topicSetOfBrokerName(brokerName);
++ final Set<String> newTopicSet = tcTable.keySet();
++ final Sets.SetView<String> toDeleteTopics = Sets.difference(oldTopicSet, newTopicSet);
++ for (final String toDeleteTopic : toDeleteTopics) {
++ Map<String, QueueData> queueDataMap = topicQueueTable.get(toDeleteTopic);
++ final QueueData removedQD = queueDataMap.remove(brokerName);
++ if (removedQD != null) {
++ log.info("deleteTopic, remove one broker's topic {} {} {}", brokerName, toDeleteTopic, removedQD);
++ }
++
++ if (queueDataMap.isEmpty()) {
++ log.info("deleteTopic, remove the topic all queue {}", toDeleteTopic);
++ topicQueueTable.remove(toDeleteTopic);
++ }
++ }
++ }
++
+ for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
+ if (registerFirst || this.isTopicConfigChanged(clusterName, brokerAddr,
+ topicConfigWrapper.getDataVersion(), brokerName,
+@@ -312,19 +346,17 @@ public class RouteInfoManager {
+ this.createAndUpdateQueueData(brokerName, topicConfig);
+ }
+ }
+- }
+
+- if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {
+- TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);
+- Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap();
+- //the topicQueueMappingInfoMap should never be null, but can be empty
+- for (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) {
+- if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
+- topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<>());
++ if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {
++ //the topicQueueMappingInfoMap should never be null, but can be empty
++ for (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) {
++ if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
++ topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<>());
++ }
++ //Note asset brokerName equal entry.getValue().getBname()
++ //here use the mappingDetail.bname
++ topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());
+ }
+- //Note asset brokerName equal entry.getValue().getBname()
+- //here use the mappingDetail.bname
+- topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());
+ }
+ }
+ }
+@@ -374,6 +406,16 @@ public class RouteInfoManager {
+ return result;
+ }
+
++ private Set<String> topicSetOfBrokerName(final String brokerName) {
++ Set<String> topicOfBroker = new HashSet<>();
++ for (final Entry<String, Map<String, QueueData>> entry : this.topicQueueTable.entrySet()) {
++ if (entry.getValue().containsKey(brokerName)) {
++ topicOfBroker.add(entry.getKey());
++ }
++ }
++ return topicOfBroker;
++ }
++
+ public BrokerMemberGroup getBrokerMemberGroup(String clusterName, String brokerName) {
+ BrokerMemberGroup groupMember = new BrokerMemberGroup(clusterName, brokerName);
+ try {
+diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java
+index b53519e5f..6002d1f5a 100644
+--- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java
++++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java
+@@ -22,6 +22,7 @@ import io.netty.channel.Channel;
+ import java.time.Duration;
+ import java.util.ArrayList;
+ import java.util.Arrays;
++import java.util.Collections;
+ import java.util.List;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ConcurrentMap;
+@@ -37,6 +38,7 @@ import org.apache.rocketmq.remoting.protocol.body.TopicList;
+ import org.apache.rocketmq.remoting.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
+ import org.apache.rocketmq.remoting.protocol.namesrv.RegisterBrokerResult;
+ import org.apache.rocketmq.remoting.protocol.route.BrokerData;
++import org.apache.rocketmq.remoting.protocol.route.QueueData;
+ import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
+ import org.junit.After;
+ import org.junit.Before;
+@@ -624,6 +626,92 @@ public class RouteInfoManagerNewTest {
+ .containsValues(BrokerBasicInfo.defaultBroker().brokerAddr, BrokerBasicInfo.slaveBroker().brokerAddr);
+ }
+
++ @Test
++ public void keepTopicWithBrokerRegistration() {
++ RegisterBrokerResult masterResult = registerBrokerWithNormalTopic(BrokerBasicInfo.defaultBroker(), "TestTopic", "TestTopic1");
++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull();
++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
++
++ masterResult = registerBrokerWithNormalTopic(BrokerBasicInfo.defaultBroker(), "TestTopic1");
++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull();
++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
++ }
++
++ @Test
++ public void deleteTopicWithBrokerRegistration() {
++ config.setDeleteTopicWithBrokerRegistration(true);
++ registerBrokerWithNormalTopic(BrokerBasicInfo.defaultBroker(), "TestTopic", "TestTopic1");
++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull();
++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
++
++ registerBrokerWithNormalTopic(BrokerBasicInfo.defaultBroker(), "TestTopic1");
++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull();
++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
++ }
++
++ @Test
++ public void deleteTopicWithBrokerRegistration2() {
++ // Register two brokers and delete a specific one by one
++ config.setDeleteTopicWithBrokerRegistration(true);
++ final BrokerBasicInfo master1 = BrokerBasicInfo.defaultBroker();
++ final BrokerBasicInfo master2 = BrokerBasicInfo.defaultBroker().name(DEFAULT_BROKER + 1).addr(DEFAULT_ADDR + 9);
++
++ registerBrokerWithNormalTopic(master1, "TestTopic", "TestTopic1");
++ registerBrokerWithNormalTopic(master2, "TestTopic", "TestTopic1");
++
++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic").getBrokerDatas()).hasSize(2);
++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1").getBrokerDatas()).hasSize(2);
++
++
++ registerBrokerWithNormalTopic(master1, "TestTopic1");
++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic").getBrokerDatas()).hasSize(1);
++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic").getBrokerDatas().get(0).getBrokerName())
++ .isEqualTo(master2.brokerName);
++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1").getBrokerDatas()).hasSize(2);
++
++ registerBrokerWithNormalTopic(master2, "TestTopic1");
++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull();
++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1").getBrokerDatas()).hasSize(2);
++ }
++
++ @Test
++ public void registerSingleTopicWithBrokerRegistration() {
++ config.setDeleteTopicWithBrokerRegistration(true);
++ final BrokerBasicInfo master1 = BrokerBasicInfo.defaultBroker();
++
++ registerSingleTopicWithBrokerName(master1.brokerName, "TestTopic");
++
++ // Single topic registration failed because there is no broker connection exists
++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull();
++
++ // Register broker with TestTopic first and then register single topic TestTopic1
++ registerBrokerWithNormalTopic(master1, "TestTopic");
++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull();
++
++ registerSingleTopicWithBrokerName(master1.brokerName, "TestTopic1");
++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
++
++ // Register the two topics to keep the route info
++ registerBrokerWithNormalTopic(master1, "TestTopic", "TestTopic1");
++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull();
++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
++
++ // Cancel the TestTopic1 with broker registration
++ registerBrokerWithNormalTopic(master1, "TestTopic");
++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull();
++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNull();
++
++ // Add TestTopic1 and cancel all the topics with broker un-registration
++ registerSingleTopicWithBrokerName(master1.brokerName, "TestTopic1");
++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
++
++ routeInfoManager.unregisterBroker(master1.clusterName, master1.brokerAddr, master1.brokerName, 0);
++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull();
++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNull();
++
++
++ }
++
+ private RegisterBrokerResult registerBrokerWithNormalTopic(BrokerBasicInfo brokerInfo, String... topics) {
+ ConcurrentHashMap<String, TopicConfig> topicConfigConcurrentHashMap = new ConcurrentHashMap<>();
+ TopicConfig baseTopic = new TopicConfig("baseTopic");
+@@ -711,6 +799,17 @@ public class RouteInfoManagerNewTest {
+ return registerBrokerResult;
+ }
+
++ private void registerSingleTopicWithBrokerName(String brokerName, String... topics) {
++ for (final String topic : topics) {
++ QueueData queueData = new QueueData();
++ queueData.setBrokerName(brokerName);
++ queueData.setReadQueueNums(8);
++ queueData.setWriteQueueNums(8);
++ queueData.setPerm(6);
++ routeInfoManager.registerTopic(topic, Collections.singletonList(queueData));
++ }
++ }
++
+ static class BrokerBasicInfo {
+ String clusterName;
+ String brokerName;
+diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
+index 11b00a72c..d3d5de9e2 100644
+--- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
++++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
+@@ -17,6 +17,7 @@
+
+ package org.apache.rocketmq.test.util;
+
++import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.Map;
+ import java.util.Set;
+@@ -38,6 +39,7 @@ import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
+ import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
+ import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
+ import org.apache.rocketmq.remoting.protocol.route.BrokerData;
++import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
+ import org.apache.rocketmq.remoting.protocol.statictopic.TopicConfigAndQueueMapping;
+ import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingOne;
+ import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingUtils;
+@@ -319,4 +321,39 @@ public class MQAdminTestUtils {
+ }
+ return consumeStats;
+ }
++
++ /**
++ * Delete topic from broker only without cleaning route info from name server forwardly
++ *
++ * @param nameSrvAddr the namesrv addr to connect
++ * @param brokerName the specific broker
++ * @param topic the specific topic to delete
++ */
++ public static void deleteTopicFromBrokerOnly(String nameSrvAddr, String brokerName, String topic) {
++ DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
++ mqAdminExt.setNamesrvAddr(nameSrvAddr);
++
++ try {
++ mqAdminExt.start();
++ String brokerAddr = CommandUtil.fetchMasterAddrByBrokerName(mqAdminExt, brokerName);
++ mqAdminExt.deleteTopicInBroker(Collections.singleton(brokerAddr), topic);
++ } catch (Exception ignored) {
++ } finally {
++ mqAdminExt.shutdown();
++ }
++ }
++
++ public static TopicRouteData examineTopicRouteInfo(String nameSrvAddr, String topicName) {
++ DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
++ mqAdminExt.setNamesrvAddr(nameSrvAddr);
++ TopicRouteData route = null;
++ try {
++ mqAdminExt.start();
++ route = mqAdminExt.examineTopicRouteInfo(topicName);
++ } catch (Exception ignored) {
++ } finally {
++ mqAdminExt.shutdown();
++ }
++ return route;
++ }
+ }
+diff --git a/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java b/test/src/test/java/org/apache/rocketmq/test/dledger/DLedgerProduceAndConsumeIT.java
+similarity index 99%
+rename from test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java
+rename to test/src/test/java/org/apache/rocketmq/test/dledger/DLedgerProduceAndConsumeIT.java
+index 9e142eb61..43fefd616 100644
+--- a/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java
++++ b/test/src/test/java/org/apache/rocketmq/test/dledger/DLedgerProduceAndConsumeIT.java
+@@ -14,7 +14,7 @@
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-package org.apache.rocketmq.test.base.dledger;
++package org.apache.rocketmq.test.dledger;
+
+ import java.util.UUID;
+ import org.apache.rocketmq.broker.BrokerController;
+diff --git a/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java
+new file mode 100644
+index 000000000..7e3c7b871
+--- /dev/null
++++ b/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java
+@@ -0,0 +1,114 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.rocketmq.test.route;
++
++import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
++import org.apache.rocketmq.test.base.BaseConf;
++import org.apache.rocketmq.test.util.MQAdminTestUtils;
++import org.junit.Test;
++
++import static org.assertj.core.api.Assertions.assertThat;
++
++public class CreateAndUpdateTopicIT extends BaseConf {
++
++ @Test
++ public void testCreateOrUpdateTopic_EnableSingleTopicRegistration() {
++ String topic = "test-topic-without-broker-registration";
++ brokerController1.getBrokerConfig().setEnableSingleTopicRegister(true);
++ brokerController2.getBrokerConfig().setEnableSingleTopicRegister(true);
++ brokerController3.getBrokerConfig().setEnableSingleTopicRegister(true);
++
++ final boolean createResult = MQAdminTestUtils.createTopic(NAMESRV_ADDR, CLUSTER_NAME, topic, 8, null);
++ assertThat(createResult).isTrue();
++
++ TopicRouteData route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, topic);
++ assertThat(route.getBrokerDatas()).hasSize(3);
++ assertThat(route.getQueueDatas()).hasSize(3);
++
++ brokerController1.getBrokerConfig().setEnableSingleTopicRegister(false);
++ brokerController2.getBrokerConfig().setEnableSingleTopicRegister(false);
++ brokerController3.getBrokerConfig().setEnableSingleTopicRegister(false);
++
++ }
++
++ @Test
++ public void testDeleteTopicFromNameSrvWithBrokerRegistration() {
++ namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(true);
++ brokerController1.getBrokerConfig().setEnableSingleTopicRegister(true);
++ brokerController2.getBrokerConfig().setEnableSingleTopicRegister(true);
++ brokerController3.getBrokerConfig().setEnableSingleTopicRegister(true);
++
++ String testTopic1 = "test-topic-keep-route";
++ String testTopic2 = "test-topic-delete-route";
++
++ boolean createResult = MQAdminTestUtils.createTopic(NAMESRV_ADDR, CLUSTER_NAME, testTopic1, 8, null);
++ assertThat(createResult).isTrue();
++
++
++ createResult = MQAdminTestUtils.createTopic(NAMESRV_ADDR, CLUSTER_NAME, testTopic2, 8, null);
++ assertThat(createResult).isTrue();
++
++
++ TopicRouteData route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, testTopic2);
++ assertThat(route.getBrokerDatas()).hasSize(3);
++
++ MQAdminTestUtils.deleteTopicFromBrokerOnly(NAMESRV_ADDR, BROKER1_NAME, testTopic2);
++
++ // Deletion is lazy, trigger broker registration
++ brokerController1.registerBrokerAll(false, false, true);
++
++ // The route info of testTopic2 will be removed from broker1 after the registration
++ route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, testTopic2);
++ assertThat(route.getBrokerDatas()).hasSize(2);
++ assertThat(route.getQueueDatas().get(0).getBrokerName()).isEqualTo(BROKER2_NAME);
++ assertThat(route.getQueueDatas().get(1).getBrokerName()).isEqualTo(BROKER3_NAME);
++
++ brokerController1.getBrokerConfig().setEnableSingleTopicRegister(false);
++ brokerController2.getBrokerConfig().setEnableSingleTopicRegister(false);
++ brokerController3.getBrokerConfig().setEnableSingleTopicRegister(false);
++ namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(false);
++ }
++
++ @Test
++ public void testStaticTopicNotAffected() throws Exception {
++ namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(true);
++ brokerController1.getBrokerConfig().setEnableSingleTopicRegister(true);
++ brokerController2.getBrokerConfig().setEnableSingleTopicRegister(true);
++ brokerController3.getBrokerConfig().setEnableSingleTopicRegister(true);
++
++ String testTopic = "test-topic-not-affected";
++ String testStaticTopic = "test-static-topic";
++
++ boolean createResult = MQAdminTestUtils.createTopic(NAMESRV_ADDR, CLUSTER_NAME, testTopic, 8, null);
++ assertThat(createResult).isTrue();
++
++ TopicRouteData route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, testTopic);
++ assertThat(route.getBrokerDatas()).hasSize(3);
++ assertThat(route.getQueueDatas()).hasSize(3);
++
++ MQAdminTestUtils.createStaticTopicWithCommand(testStaticTopic, 10, null, CLUSTER_NAME, NAMESRV_ADDR);
++
++ assertThat(route.getBrokerDatas()).hasSize(3);
++ assertThat(route.getQueueDatas()).hasSize(3);
++
++ brokerController1.getBrokerConfig().setEnableSingleTopicRegister(false);
++ brokerController2.getBrokerConfig().setEnableSingleTopicRegister(false);
++ brokerController3.getBrokerConfig().setEnableSingleTopicRegister(false);
++ namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(false);
++ }
++}
+--
+2.32.0.windows.2
+
+
+From 32eb1d55570af81641a4a40d96ff5554329b93cb Mon Sep 17 00:00:00 2001
+From: gaoyf <gaoyf@users.noreply.github.com>
+Date: Tue, 25 Jul 2023 15:26:20 +0800
+Subject: [PATCH 05/10] [ISSUE #7068] Fix failed to create syncer topic when
+ the proxy was just started (#7076)
+
+---
+ .../apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java | 1 +
+ 1 file changed, 1 insertion(+)
+
+diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java
+index f7d9b11ba..c68859b28 100644
+--- a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java
++++ b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java
+@@ -104,6 +104,7 @@ public class MQClientAPIFactory implements StartAndShutdown {
+ rpcHook);
+
+ if (!mqClientAPIExt.updateNameServerAddressList()) {
++ mqClientAPIExt.fetchNameServerAddr();
+ this.scheduledExecutorService.scheduleAtFixedRate(
+ mqClientAPIExt::fetchNameServerAddr,
+ Duration.ofSeconds(10).toMillis(),
+--
+2.32.0.windows.2
+
+
+From d79737788078707168c0258c4af0d800de32c137 Mon Sep 17 00:00:00 2001
+From: Vincent Lee <cool8511@gmail.com>
+Date: Thu, 27 Jul 2023 10:51:51 +0800
+Subject: [PATCH 06/10] [ISSUE #7056] Avoid close success channel if invokeSync
+ most time cost on get connection for channel (#7057)
+
+* fix: avoid close success channel if invokeSync most time cost on get channel
+
+Change-Id: I29741cf55ac6333bfa30fef755357b78a22b1325
+
+* fix: ci style
+
+Change-Id: I8c9b86e9cb6f1463bf213e64c9b8c139afa794c8
+---
+ .../rocketmq/remoting/netty/NettyRemotingClient.java | 11 ++++++++---
+ 1 file changed, 8 insertions(+), 3 deletions(-)
+
+diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+index 9715b918a..8491f4354 100644
+--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+@@ -88,6 +88,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
+ private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
+
+ private static final long LOCK_TIMEOUT_MILLIS = 3000;
++ private static final long MIN_CLOSE_TIMEOUT_MILLIS = 100;
+
+ private final NettyClientConfig nettyClientConfig;
+ private final Bootstrap bootstrap = new Bootstrap();
+@@ -524,13 +525,15 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
+ final Channel channel = this.getAndCreateChannel(addr);
+ String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel);
+ if (channel != null && channel.isActive()) {
++ long left = timeoutMillis;
+ try {
+ doBeforeRpcHooks(channelRemoteAddr, request);
+ long costTime = System.currentTimeMillis() - beginStartTime;
+- if (timeoutMillis < costTime) {
++ left -= costTime;
++ if (left <= 0) {
+ throw new RemotingTimeoutException("invokeSync call the addr[" + channelRemoteAddr + "] timeout");
+ }
+- RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
++ RemotingCommand response = this.invokeSyncImpl(channel, request, left);
+ doAfterRpcHooks(channelRemoteAddr, request, response);
+ this.updateChannelLastResponseTime(addr);
+ return response;
+@@ -539,7 +542,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
+ this.closeChannel(addr, channel);
+ throw e;
+ } catch (RemotingTimeoutException e) {
+- if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
++ // avoid close the success channel if left timeout is small, since it may cost too much time in get the success channel, the left timeout for read is small
++ boolean shouldClose = left > MIN_CLOSE_TIMEOUT_MILLIS || left > timeoutMillis / 4;
++ if (nettyClientConfig.isClientCloseSocketIfTimeout() && shouldClose) {
+ this.closeChannel(addr, channel);
+ LOGGER.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, channelRemoteAddr);
+ }
+--
+2.32.0.windows.2
+
+
+From d0a69be563785ca815dc31ef1aab4c1bc5588c01 Mon Sep 17 00:00:00 2001
+From: zd46319 <zd46319@163.com>
+Date: Thu, 27 Jul 2023 16:56:41 +0800
+Subject: [PATCH 07/10] [ISSUE #6810] Fix the bug of mistakenly deleting data
+ in clientChannelTable when the channel expire (#7073)
+
+---
+ .../broker/client/ProducerManager.java | 5 ++-
+ .../broker/client/ProducerManagerTest.java | 34 +++++++++++++++++++
+ 2 files changed, 38 insertions(+), 1 deletion(-)
+
+diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
+index 52d67bf28..f9fe1193e 100644
+--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
++++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
+@@ -112,7 +112,10 @@ public class ProducerManager {
+ long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp();
+ if (diff > CHANNEL_EXPIRED_TIMEOUT) {
+ it.remove();
+- clientChannelTable.remove(info.getClientId());
++ Channel channelInClientTable = clientChannelTable.get(info.getClientId());
++ if (channelInClientTable != null && channelInClientTable.equals(info.getChannel())) {
++ clientChannelTable.remove(info.getClientId());
++ }
+ log.warn(
+ "ProducerManager#scanNotActiveChannel: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
+ RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
+diff --git a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
+index dac5468c8..3d6091e02 100644
+--- a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
++++ b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
+@@ -27,6 +27,7 @@ import org.junit.Before;
+ import org.junit.Test;
+ import org.junit.runner.RunWith;
+ import org.mockito.Mock;
++import org.mockito.Mockito;
+ import org.mockito.junit.MockitoJUnitRunner;
+
+ import static org.assertj.core.api.Assertions.assertThat;
+@@ -79,6 +80,39 @@ public class ProducerManagerTest {
+ assertThat(producerManager.findChannel("clientId")).isNull();
+ }
+
++ @Test
++ public void scanNotActiveChannelWithSameClientId() throws Exception {
++ producerManager.registerProducer(group, clientInfo);
++ Channel channel1 = Mockito.mock(Channel.class);
++ ClientChannelInfo clientInfo1 = new ClientChannelInfo(channel1, clientInfo.getClientId(), LanguageCode.JAVA, 0);
++ producerManager.registerProducer(group, clientInfo1);
++ AtomicReference<String> groupRef = new AtomicReference<>();
++ AtomicReference<ClientChannelInfo> clientChannelInfoRef = new AtomicReference<>();
++ producerManager.appendProducerChangeListener((event, group, clientChannelInfo) -> {
++ switch (event) {
++ case GROUP_UNREGISTER:
++ groupRef.set(group);
++ break;
++ case CLIENT_UNREGISTER:
++ clientChannelInfoRef.set(clientChannelInfo);
++ break;
++ default:
++ break;
++ }
++ });
++ assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNotNull();
++ assertThat(producerManager.getGroupChannelTable().get(group).get(channel1)).isNotNull();
++ assertThat(producerManager.findChannel("clientId")).isNotNull();
++ Field field = ProducerManager.class.getDeclaredField("CHANNEL_EXPIRED_TIMEOUT");
++ field.setAccessible(true);
++ long channelExpiredTimeout = field.getLong(producerManager);
++ clientInfo.setLastUpdateTimestamp(System.currentTimeMillis() - channelExpiredTimeout - 10);
++ when(channel.close()).thenReturn(mock(ChannelFuture.class));
++ producerManager.scanNotActiveChannel();
++ assertThat(producerManager.getGroupChannelTable().get(group).get(channel1)).isNotNull();
++ assertThat(producerManager.findChannel("clientId")).isNotNull();
++ }
++
+ @Test
+ public void doChannelCloseEvent() throws Exception {
+ producerManager.registerProducer(group, clientInfo);
+--
+2.32.0.windows.2
+
+
+From d429bd72dfae0901f4325c8e9c6ce631286e40d4 Mon Sep 17 00:00:00 2001
+From: cnScarb <jjhfen00@163.com>
+Date: Fri, 28 Jul 2023 09:46:39 +0800
+Subject: [PATCH 08/10] [ISSUE #7039] Fix retry message filter when subtype is
+ TAG (#7040)
+
+---
+ .../broker/filter/ExpressionForRetryMessageFilter.java | 6 +++---
+ 1 file changed, 3 insertions(+), 3 deletions(-)
+
+diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java
+index d2d1087ef..bc01b21cb 100644
+--- a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java
++++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java
+@@ -45,12 +45,12 @@ public class ExpressionForRetryMessageFilter extends ExpressionMessageFilter {
+ return true;
+ }
+
+- boolean isRetryTopic = subscriptionData.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
+-
+- if (!isRetryTopic && ExpressionType.isTagType(subscriptionData.getExpressionType())) {
++ if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
+ return true;
+ }
+
++ boolean isRetryTopic = subscriptionData.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
++
+ ConsumerFilterData realFilterData = this.consumerFilterData;
+ Map<String, String> tempProperties = properties;
+ boolean decoded = false;
+--
+2.32.0.windows.2
+
+
+From 8baa51e85e569429293720b2ba7fcaee745abecc Mon Sep 17 00:00:00 2001
+From: Zack_Aayush <60972989+AayushSaini101@users.noreply.github.com>
+Date: Sun, 30 Jul 2023 09:02:02 +0530
+Subject: [PATCH 09/10] [ISSUE #7091] Update the cd command in README (#7096)
+
+* Update the cd command
+
+* Removed extra space
+
+---------
+
+Co-authored-by: Aayush <aaayush@redhat.com>
+---
+ README.md | 2 +-
+ 1 file changed, 1 insertion(+), 1 deletion(-)
+
+diff --git a/README.md b/README.md
+index 393ef88e6..56d253ce1 100644
+--- a/README.md
++++ b/README.md
+@@ -63,7 +63,7 @@ $ unzip rocketmq-all-5.1.3-bin-release.zip
+
+ Prepare a terminal and change to the extracted `bin` directory:
+ ```shell
+-$ cd rocketmq-all-5.1.3/bin
++$ cd rocketmq-all-5.1.3-bin-release/bin
+ ```
+
+ **1) Start NameServer**
+--
+2.32.0.windows.2
+
+
+From 8bcc94829d2ef2597a8eeab3c6b7099432a0bea1 Mon Sep 17 00:00:00 2001
+From: weihubeats <weihubeats@163.com>
+Date: Tue, 1 Aug 2023 10:15:07 +0800
+Subject: [PATCH 10/10] [ISSUE #7077] Schedule CQ offset invalid. offset=77,
+ cqMinOffset=0, cqMaxOffset=74, queueId=1 (#7084)
+
+* Adding null does not update
+
+* delete slave put correctDelayOffset
+
+* Remove duplicate delayOffset file loading
+
+* add loadWhenSyncDelayOffset
+
+* add method
+
+* add method
+---
+ .../rocketmq/broker/schedule/ScheduleMessageService.java | 6 ++++++
+ .../org/apache/rocketmq/broker/slave/SlaveSynchronize.java | 2 +-
+ 2 files changed, 7 insertions(+), 1 deletion(-)
+
+diff --git a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
+index 2a4ace098..26f09dcd0 100644
+--- a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
++++ b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
+@@ -223,6 +223,12 @@ public class ScheduleMessageService extends ConfigManager {
+ result = result && this.correctDelayOffset();
+ return result;
+ }
++
++ public boolean loadWhenSyncDelayOffset() {
++ boolean result = super.load();
++ result = result && this.parseDelayLevel();
++ return result;
++ }
+
+ public boolean correctDelayOffset() {
+ try {
+diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
+index b9de5173b..53cdecdf8 100644
+--- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
++++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
+@@ -152,7 +152,7 @@ public class SlaveSynchronize {
+ .getMessageStoreConfig().getStorePathRootDir());
+ try {
+ MixAll.string2File(delayOffset, fileName);
+- this.brokerController.getScheduleMessageService().load();
++ this.brokerController.getScheduleMessageService().loadWhenSyncDelayOffset();
+ } catch (IOException e) {
+ LOGGER.error("Persist file Exception, {}", fileName, e);
+ }
+--
+2.32.0.windows.2
+