diff options
Diffstat (limited to 'patch007-backport-fix-some-bugs.patch')
-rw-r--r-- | patch007-backport-fix-some-bugs.patch | 1425 |
1 files changed, 1425 insertions, 0 deletions
diff --git a/patch007-backport-fix-some-bugs.patch b/patch007-backport-fix-some-bugs.patch new file mode 100644 index 0000000..6281d09 --- /dev/null +++ b/patch007-backport-fix-some-bugs.patch @@ -0,0 +1,1425 @@ +From 90c5382aee07879a80309f257f04114201ccaac6 Mon Sep 17 00:00:00 2001 +From: ShuangxiDing <dingshuangxi888@gmail.com> +Date: Fri, 21 Jul 2023 20:28:58 +0800 +Subject: [PATCH 01/10] [ISSUE #7061] Support forward HAProxyMessage for Multi + Protocol server. (#7062) +MIME-Version: 1.0 +Content-Type: text/plain; charset=UTF-8 +Content-Transfer-Encoding: 8bit + +* Support dynamic modification of grpc tls mode to improve the scalability of ProtocolNegotiator + +* Support dynamic modification of grpc tls mode to improve the scalability of ProtocolNegotiator + +* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator + +* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator + +* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator + +* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator + +* Support proxy protocol for gRPC server. + +* Support proxy protocol for gRPC server. + +* Support proxy protocol for gRPC server. + +* Support proxy protocol for gRPC server. + +* Support proxy protocol for gRPC server. + +* Support proxy protocol for gRPC and Remoting server. + +* 回滚netty的升级 + +* Support proxy protocol for gRPC and Remoting server. + +* Support proxy protocol for gRPC and Remoting server. + +* Support proxy protocol for gRPC and Remoting server. + +* add grpc-netty-codec-haproxy in bazel + +* add grpc-netty-codec-haproxy in bazel + +* Support proxy protocol for gRPC and Remoting server. + +* Fix Test + +* add grpc-netty-codec-haproxy in bazel + +* add ProxyProtocolTest for Remoting + +* Support HAProxyMessage forward for multi protocol server. + +--------- + +Co-authored-by: 徒钟 <shuangxi.dsx@alibaba-inc.com> +--- + .../http2proxy/HAProxyMessageForwarder.java | 129 ++++++++++++++++++ + .../http2proxy/Http2ProtocolProxyHandler.java | 23 +++- + .../http2proxy/Http2ProxyBackendHandler.java | 2 + + .../http2proxy/Http2ProxyFrontendHandler.java | 28 ++-- + 4 files changed, 164 insertions(+), 18 deletions(-) + create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java + +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java +new file mode 100644 +index 000000000..8f139d3d9 +--- /dev/null ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java +@@ -0,0 +1,129 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.rocketmq.proxy.remoting.protocol.http2proxy; ++ ++import io.netty.buffer.ByteBuf; ++import io.netty.buffer.Unpooled; ++import io.netty.channel.Channel; ++import io.netty.channel.ChannelHandlerContext; ++import io.netty.channel.ChannelInboundHandlerAdapter; ++import io.netty.handler.codec.haproxy.HAProxyCommand; ++import io.netty.handler.codec.haproxy.HAProxyMessage; ++import io.netty.handler.codec.haproxy.HAProxyProtocolVersion; ++import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol; ++import io.netty.handler.codec.haproxy.HAProxyTLV; ++import io.netty.util.Attribute; ++import io.netty.util.DefaultAttributeMap; ++import org.apache.commons.codec.binary.Hex; ++import org.apache.commons.lang3.ArrayUtils; ++import org.apache.commons.lang3.StringUtils; ++import org.apache.commons.lang3.reflect.FieldUtils; ++import org.apache.rocketmq.acl.common.AclUtils; ++import org.apache.rocketmq.common.constant.HAProxyConstants; ++import org.apache.rocketmq.common.constant.LoggerName; ++import org.apache.rocketmq.logging.org.slf4j.Logger; ++import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; ++import org.apache.rocketmq.remoting.netty.AttributeKeys; ++ ++import java.lang.reflect.Field; ++import java.nio.charset.Charset; ++import java.util.ArrayList; ++import java.util.List; ++ ++public class HAProxyMessageForwarder extends ChannelInboundHandlerAdapter { ++ ++ private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME); ++ ++ private static final Field FIELD_ATTRIBUTE = ++ FieldUtils.getField(DefaultAttributeMap.class, "attributes", true); ++ ++ private final Channel outboundChannel; ++ ++ public HAProxyMessageForwarder(final Channel outboundChannel) { ++ this.outboundChannel = outboundChannel; ++ } ++ ++ @Override ++ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ++ try { ++ forwardHAProxyMessage(ctx.channel(), outboundChannel); ++ ctx.fireChannelRead(msg); ++ } catch (Exception e) { ++ log.error("Forward HAProxyMessage from Remoting to gRPC server error.", e); ++ throw e; ++ } finally { ++ ctx.pipeline().remove(this); ++ } ++ } ++ ++ private void forwardHAProxyMessage(Channel inboundChannel, Channel outboundChannel) throws Exception { ++ if (!inboundChannel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) { ++ return; ++ } ++ ++ if (!(inboundChannel instanceof DefaultAttributeMap)) { ++ return; ++ } ++ ++ Attribute<?>[] attributes = (Attribute<?>[]) FieldUtils.readField(FIELD_ATTRIBUTE, inboundChannel); ++ if (ArrayUtils.isEmpty(attributes)) { ++ return; ++ } ++ ++ String sourceAddress = null, destinationAddress = null; ++ int sourcePort = 0, destinationPort = 0; ++ List<HAProxyTLV> haProxyTLVs = new ArrayList<>(); ++ ++ for (Attribute<?> attribute : attributes) { ++ String attributeKey = attribute.key().name(); ++ if (!StringUtils.startsWith(attributeKey, HAProxyConstants.PROXY_PROTOCOL_PREFIX)) { ++ continue; ++ } ++ String attributeValue = (String) attribute.get(); ++ if (StringUtils.isEmpty(attributeValue)) { ++ continue; ++ } ++ if (attribute.key() == AttributeKeys.PROXY_PROTOCOL_ADDR) { ++ sourceAddress = attributeValue; ++ } ++ if (attribute.key() == AttributeKeys.PROXY_PROTOCOL_PORT) { ++ sourcePort = Integer.parseInt(attributeValue); ++ } ++ if (attribute.key() == AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR) { ++ destinationAddress = attributeValue; ++ } ++ if (attribute.key() == AttributeKeys.PROXY_PROTOCOL_SERVER_PORT) { ++ destinationPort = Integer.parseInt(attributeValue); ++ } ++ if (StringUtils.startsWith(attributeKey, HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX)) { ++ String typeString = StringUtils.substringAfter(attributeKey, HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX); ++ ByteBuf byteBuf = Unpooled.buffer(); ++ byteBuf.writeBytes(attributeValue.getBytes(Charset.defaultCharset())); ++ HAProxyTLV haProxyTLV = new HAProxyTLV(Hex.decodeHex(typeString)[0], byteBuf); ++ haProxyTLVs.add(haProxyTLV); ++ } ++ } ++ ++ HAProxyProxiedProtocol proxiedProtocol = AclUtils.isColon(sourceAddress) ? HAProxyProxiedProtocol.TCP6 : ++ HAProxyProxiedProtocol.TCP4; ++ ++ HAProxyMessage message = new HAProxyMessage(HAProxyProtocolVersion.V2, HAProxyCommand.PROXY, ++ proxiedProtocol, sourceAddress, destinationAddress, sourcePort, destinationPort, haProxyTLVs); ++ outboundChannel.writeAndFlush(message).sync(); ++ } ++} +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java +index 913f35c93..c37db92af 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java +@@ -24,13 +24,14 @@ import io.netty.channel.ChannelFuture; + import io.netty.channel.ChannelHandlerContext; + import io.netty.channel.ChannelInitializer; + import io.netty.channel.ChannelOption; ++import io.netty.handler.codec.haproxy.HAProxyMessageEncoder; + import io.netty.handler.ssl.ApplicationProtocolConfig; + import io.netty.handler.ssl.ApplicationProtocolNames; + import io.netty.handler.ssl.SslContext; + import io.netty.handler.ssl.SslContextBuilder; ++import io.netty.handler.ssl.SslHandler; + import io.netty.handler.ssl.SslProvider; + import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +-import javax.net.ssl.SSLException; + import org.apache.rocketmq.common.constant.LoggerName; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +@@ -38,8 +39,11 @@ import org.apache.rocketmq.proxy.config.ConfigurationManager; + import org.apache.rocketmq.proxy.config.ProxyConfig; + import org.apache.rocketmq.proxy.remoting.protocol.ProtocolHandler; + import org.apache.rocketmq.remoting.common.TlsMode; ++import org.apache.rocketmq.remoting.netty.AttributeKeys; + import org.apache.rocketmq.remoting.netty.TlsSystemConfig; + ++import javax.net.ssl.SSLException; ++ + public class Http2ProtocolProxyHandler implements ProtocolHandler { + private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME); + private static final String LOCAL_HOST = "127.0.0.1"; +@@ -101,11 +105,8 @@ public class Http2ProtocolProxyHandler implements ProtocolHandler { + .handler(new ChannelInitializer<Channel>() { + @Override + protected void initChannel(Channel ch) throws Exception { +- if (sslContext != null) { +- ch.pipeline() +- .addLast(sslContext.newHandler(ch.alloc(), LOCAL_HOST, config.getGrpcServerPort())); +- } +- ch.pipeline().addLast(new Http2ProxyBackendHandler(inboundChannel)); ++ ch.pipeline().addLast(null, Http2ProxyBackendHandler.HANDLER_NAME, ++ new Http2ProxyBackendHandler(inboundChannel)); + } + }) + .option(ChannelOption.AUTO_READ, false) +@@ -120,7 +121,15 @@ public class Http2ProtocolProxyHandler implements ProtocolHandler { + } + + final Channel outboundChannel = f.channel(); ++ if (inboundChannel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) { ++ ctx.pipeline().addLast(new HAProxyMessageForwarder(outboundChannel)); ++ outboundChannel.pipeline().addFirst(HAProxyMessageEncoder.INSTANCE); ++ } + +- ctx.pipeline().addLast(new Http2ProxyFrontendHandler(outboundChannel)); ++ SslHandler sslHandler = null; ++ if (sslContext != null) { ++ sslHandler = sslContext.newHandler(outboundChannel.alloc(), LOCAL_HOST, config.getGrpcServerPort()); ++ } ++ ctx.pipeline().addLast(new Http2ProxyFrontendHandler(outboundChannel, sslHandler)); + } + } +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java +index 0195b0c1c..fd5408fae 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java +@@ -29,6 +29,8 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + public class Http2ProxyBackendHandler extends ChannelInboundHandlerAdapter { + private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME); + ++ public static final String HANDLER_NAME = "Http2ProxyBackendHandler"; ++ + private final Channel inboundChannel; + + public Http2ProxyBackendHandler(Channel inboundChannel) { +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java +index 87147a322..9b37e85e5 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java +@@ -19,36 +19,42 @@ package org.apache.rocketmq.proxy.remoting.protocol.http2proxy; + + import io.netty.buffer.Unpooled; + import io.netty.channel.Channel; +-import io.netty.channel.ChannelFuture; + import io.netty.channel.ChannelFutureListener; + import io.netty.channel.ChannelHandlerContext; + import io.netty.channel.ChannelInboundHandlerAdapter; ++import io.netty.handler.ssl.SslHandler; + import org.apache.rocketmq.common.constant.LoggerName; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + + public class Http2ProxyFrontendHandler extends ChannelInboundHandlerAdapter { + private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME); ++ ++ public static final String HANDLER_NAME = "SslHandler"; ++ + // As we use inboundChannel.eventLoop() when building the Bootstrap this does not need to be volatile as + // the outboundChannel will use the same EventLoop (and therefore Thread) as the inboundChannel. + private final Channel outboundChannel; ++ private final SslHandler sslHandler; + +- public Http2ProxyFrontendHandler(final Channel outboundChannel) { ++ public Http2ProxyFrontendHandler(final Channel outboundChannel, final SslHandler sslHandler) { + this.outboundChannel = outboundChannel; ++ this.sslHandler = sslHandler; + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, Object msg) { + if (outboundChannel.isActive()) { +- outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() { +- @Override +- public void operationComplete(ChannelFuture future) { +- if (future.isSuccess()) { +- // was able to flush out data, start to read the next chunk +- ctx.channel().read(); +- } else { +- future.channel().close(); +- } ++ if (sslHandler != null && outboundChannel.pipeline().get(HANDLER_NAME) == null) { ++ outboundChannel.pipeline().addBefore(Http2ProxyBackendHandler.HANDLER_NAME, HANDLER_NAME, sslHandler); ++ } ++ ++ outboundChannel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> { ++ if (future.isSuccess()) { ++ // was able to flush out data, start to read the next chunk ++ ctx.channel().read(); ++ } else { ++ future.channel().close(); + } + }); + } +-- +2.32.0.windows.2 + + +From 8027cfc7cbb6c120d2fc045e0caa8debe1028a31 Mon Sep 17 00:00:00 2001 +From: maclong1989 <814742806@qq.com> +Date: Sun, 23 Jul 2023 09:15:05 +0800 +Subject: [PATCH 02/10] [ISSUE #7063] doc: fix typo in user_guide.md + +Signed-off-by: jiangyl3 <jiangyl3@asiainfo.com> +Co-authored-by: jiangyl3 <jiangyl3@asiainfo.com> +--- + docs/cn/msg_trace/user_guide.md | 2 +- + 1 file changed, 1 insertion(+), 1 deletion(-) + +diff --git a/docs/cn/msg_trace/user_guide.md b/docs/cn/msg_trace/user_guide.md +index d8314052b..9cf139fd3 100644 +--- a/docs/cn/msg_trace/user_guide.md ++++ b/docs/cn/msg_trace/user_guide.md +@@ -35,7 +35,7 @@ namesrvAddr=XX.XX.XX.XX:9876 + RocketMQ集群中每一个Broker节点均用于存储Client端收集并发送过来的消息轨迹数据。因此,对于RocketMQ集群中的Broker节点数量并无要求和限制。 + + ### 2.3 物理IO隔离模式 +-对于消息轨迹数据量较大的场景,可以在RocketMQ集群中选择其中一个Broker节点专用于存储消息轨迹,使得用户普通的消息数据与消息轨迹数据的物理IO完全隔离,互不影响。在该模式下,RockeMQ集群中至少有两个Broker节点,其中一个Broker节点定义为存储消息轨迹数据的服务端。 ++对于消息轨迹数据量较大的场景,可以在RocketMQ集群中选择其中一个Broker节点专用于存储消息轨迹,使得用户普通的消息数据与消息轨迹数据的物理IO完全隔离,互不影响。在该模式下,RocketMQ集群中至少有两个Broker节点,其中一个Broker节点定义为存储消息轨迹数据的服务端。 + + ### 2.4 启动开启消息轨迹的Broker + `nohup sh mqbroker -c ../conf/2m-noslave/broker-a.properties &` +-- +2.32.0.windows.2 + + +From 3102758487f3e21e977424d7f1b7187eb6c069cb Mon Sep 17 00:00:00 2001 +From: =?UTF-8?q?=E5=90=B4=E6=98=9F=E7=81=BF?= + <37405937+wuyoudexiao@users.noreply.github.com> +Date: Tue, 25 Jul 2023 13:47:53 +0800 +Subject: [PATCH 03/10] fix: npe in lockBatchMQ and unlockBatchMQ (#7078) + +Co-authored-by: wxc <wuxingcan666@foxmail.com> +--- + .../rocketmq/proxy/processor/ConsumerProcessor.java | 11 +++++++---- + 1 file changed, 7 insertions(+), 4 deletions(-) + +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java +index cc973813b..656a6339d 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java +@@ -19,6 +19,7 @@ package org.apache.rocketmq.proxy.processor; + + import java.util.ArrayList; + import java.util.HashMap; ++import java.util.HashSet; + import java.util.List; + import java.util.Map; + import java.util.Set; +@@ -425,13 +426,15 @@ public class ConsumerProcessor extends AbstractProcessor { + } + + protected Set<AddressableMessageQueue> buildAddressableSet(ProxyContext ctx, Set<MessageQueue> mqSet) { +- return mqSet.stream().map(mq -> { ++ Set<AddressableMessageQueue> addressableMessageQueueSet = new HashSet<>(mqSet.size()); ++ for (MessageQueue mq:mqSet) { + try { +- return serviceManager.getTopicRouteService().buildAddressableMessageQueue(ctx, mq); ++ addressableMessageQueueSet.add(serviceManager.getTopicRouteService().buildAddressableMessageQueue(ctx, mq)) ; + } catch (Exception e) { +- return null; ++ log.error("build addressable message queue fail, messageQueue = {}", mq, e); + } +- }).collect(Collectors.toSet()); ++ } ++ return addressableMessageQueueSet; + } + + protected HashMap<String, List<AddressableMessageQueue>> buildAddressableMapByBrokerName( +-- +2.32.0.windows.2 + + +From 047ef7498f2203a2234052603a99a114d8a65e17 Mon Sep 17 00:00:00 2001 +From: rongtong <jinrongtong5@163.com> +Date: Tue, 25 Jul 2023 14:00:22 +0800 +Subject: [PATCH 04/10] Ensuring consistency between broker and nameserver data + when deleting a topic (#7066) +MIME-Version: 1.0 +Content-Type: text/plain; charset=UTF-8 +Content-Transfer-Encoding: 8bit + +Co-authored-by: 尘央 <xinyuzhou.zxy@alibaba-inc.com> +--- + .../rocketmq/broker/BrokerController.java | 11 ++ + .../rocketmq/broker/out/BrokerOuterAPI.java | 62 ++++++++++ + .../processor/AdminBrokerProcessor.java | 26 ++-- + .../broker/topic/TopicConfigManager.java | 6 +- + .../apache/rocketmq/common/BrokerConfig.java | 14 +++ + .../common/namesrv/NamesrvConfig.java | 17 +++ + .../namesrv/routeinfo/RouteInfoManager.java | 64 ++++++++-- + .../routeinfo/RouteInfoManagerNewTest.java | 99 +++++++++++++++ + .../rocketmq/test/util/MQAdminTestUtils.java | 37 ++++++ + .../dledger/DLedgerProduceAndConsumeIT.java | 2 +- + .../test/route/CreateAndUpdateTopicIT.java | 114 ++++++++++++++++++ + 11 files changed, 429 insertions(+), 23 deletions(-) + rename test/src/test/java/org/apache/rocketmq/test/{base => }/dledger/DLedgerProduceAndConsumeIT.java (99%) + create mode 100644 test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +index 196401e26..972457194 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +@@ -1678,6 +1678,17 @@ public class BrokerController { + }, 1000, brokerConfig.getBrokerHeartbeatInterval(), TimeUnit.MILLISECONDS)); + } + ++ public synchronized void registerSingleTopicAll(final TopicConfig topicConfig) { ++ TopicConfig tmpTopic = topicConfig; ++ if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) ++ || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { ++ // Copy the topic config and modify the perm ++ tmpTopic = new TopicConfig(topicConfig); ++ tmpTopic.setPerm(topicConfig.getPerm() & this.brokerConfig.getBrokerPermission()); ++ } ++ this.brokerOuterAPI.registerSingleTopicAll(this.brokerConfig.getBrokerName(), tmpTopic, 3000); ++ } ++ + public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) { + this.registerIncrementBrokerData(Collections.singletonList(topicConfig), dataVersion); + } +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +index b6273e9ed..1793a83c0 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +@@ -42,6 +42,7 @@ import org.apache.rocketmq.common.LockCallback; + import org.apache.rocketmq.common.MixAll; + import org.apache.rocketmq.common.Pair; + import org.apache.rocketmq.common.ThreadFactoryImpl; ++import org.apache.rocketmq.common.TopicConfig; + import org.apache.rocketmq.common.UnlockCallback; + import org.apache.rocketmq.common.UtilAll; + import org.apache.rocketmq.common.constant.LoggerName; +@@ -120,12 +121,14 @@ import org.apache.rocketmq.remoting.protocol.header.namesrv.QueryDataVersionRequ + import org.apache.rocketmq.remoting.protocol.header.namesrv.QueryDataVersionResponseHeader; + import org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterBrokerRequestHeader; + import org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterBrokerResponseHeader; ++import org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterTopicRequestHeader; + import org.apache.rocketmq.remoting.protocol.header.namesrv.UnRegisterBrokerRequestHeader; + import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader; + import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; + import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; + import org.apache.rocketmq.remoting.protocol.namesrv.RegisterBrokerResult; + import org.apache.rocketmq.remoting.protocol.route.BrokerData; ++import org.apache.rocketmq.remoting.protocol.route.QueueData; + import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; + import org.apache.rocketmq.remoting.rpc.ClientMetadata; + import org.apache.rocketmq.remoting.rpc.RpcClient; +@@ -614,6 +617,65 @@ public class BrokerOuterAPI { + throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr); + } + ++ /** ++ * Register the topic route info of single topic to all name server nodes. ++ * This method is used to replace incremental broker registration feature. ++ */ ++ public void registerSingleTopicAll( ++ final String brokerName, ++ final TopicConfig topicConfig, ++ final int timeoutMills) { ++ String topic = topicConfig.getTopicName(); ++ RegisterTopicRequestHeader requestHeader = new RegisterTopicRequestHeader(); ++ requestHeader.setTopic(topic); ++ ++ TopicRouteData topicRouteData = new TopicRouteData(); ++ List<QueueData> queueDatas = new ArrayList<>(); ++ topicRouteData.setQueueDatas(queueDatas); ++ ++ final QueueData queueData = new QueueData(); ++ queueData.setBrokerName(brokerName); ++ queueData.setPerm(topicConfig.getPerm()); ++ queueData.setReadQueueNums(topicConfig.getReadQueueNums()); ++ queueData.setWriteQueueNums(topicConfig.getWriteQueueNums()); ++ queueData.setTopicSysFlag(topicConfig.getTopicSysFlag()); ++ queueDatas.add(queueData); ++ final byte[] topicRouteBody = topicRouteData.encode(); ++ ++ ++ List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); ++ final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); ++ for (final String namesrvAddr : nameServerAddressList) { ++ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_TOPIC_IN_NAMESRV, requestHeader); ++ request.setBody(topicRouteBody); ++ ++ try { ++ brokerOuterExecutor.execute(() -> { ++ try { ++ RemotingCommand response = BrokerOuterAPI.this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills); ++ assert response != null; ++ LOGGER.info("Register single topic %s to broker %s with response code %s", topic, brokerName, response.getCode()); ++ } catch (Exception e) { ++ LOGGER.warn(String.format("Register single topic %s to broker %s exception", topic, brokerName), e); ++ } finally { ++ countDownLatch.countDown(); ++ } ++ }); ++ } catch (Exception e) { ++ LOGGER.warn("Execute single topic registration task failed, topic {}, broker name {}", topic, brokerName); ++ countDownLatch.countDown(); ++ } ++ ++ } ++ ++ try { ++ if (!countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS)) { ++ LOGGER.warn("Registration single topic to one or more name servers timeout. Timeout threshold: {}ms", timeoutMills); ++ } ++ } catch (InterruptedException ignore) { ++ } ++ } ++ + public List<Boolean> needRegister( + final String clusterName, + final String brokerAddr, +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +index 892a71330..569a1c57b 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +@@ -441,13 +441,18 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { + + try { + this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); +- this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion()); ++ if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { ++ this.brokerController.registerSingleTopicAll(topicConfig); ++ } else { ++ this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion()); ++ } + response.setCode(ResponseCode.SUCCESS); + } catch (Exception e) { + LOGGER.error("Update / create topic failed for [{}]", request, e); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(e.getMessage()); + } ++ + return response; + } + +@@ -769,7 +774,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { + return response; + } + +- private synchronized RemotingCommand updateColdDataFlowCtrGroupConfig(ChannelHandlerContext ctx, RemotingCommand request) { ++ private synchronized RemotingCommand updateColdDataFlowCtrGroupConfig(ChannelHandlerContext ctx, ++ RemotingCommand request) { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + LOGGER.info("updateColdDataFlowCtrGroupConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + +@@ -876,7 +882,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { + } + MessageStore messageStore = this.brokerController.getMessageStore(); + if (messageStore instanceof DefaultMessageStore) { +- DefaultMessageStore defaultMessageStore = (DefaultMessageStore)messageStore; ++ DefaultMessageStore defaultMessageStore = (DefaultMessageStore) messageStore; + if (mode == LibC.MADV_NORMAL) { + defaultMessageStore.getMessageStoreConfig().setDataReadAheadEnable(true); + } else { +@@ -1835,13 +1841,13 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { + /** + * Reset consumer offset. + * +- * @param topic Required, not null. +- * @param group Required, not null. +- * @param queueId if target queue ID is negative, all message queues will be reset; +- * otherwise, only the target queue would get reset. +- * @param timestamp if timestamp is negative, offset would be reset to broker offset at the time being; +- * otherwise, binary search is performed to locate target offset. +- * @param offset Target offset to reset to if target queue ID is properly provided. ++ * @param topic Required, not null. ++ * @param group Required, not null. ++ * @param queueId if target queue ID is negative, all message queues will be reset; otherwise, only the target queue ++ * would get reset. ++ * @param timestamp if timestamp is negative, offset would be reset to broker offset at the time being; otherwise, ++ * binary search is performed to locate target offset. ++ * @param offset Target offset to reset to if target queue ID is properly provided. + * @return Affected queues and their new offset + */ + private RemotingCommand resetOffsetInner(String topic, String group, int queueId, long timestamp, Long offset) { +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +index e5fdd8675..e90530512 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +@@ -305,7 +305,11 @@ public class TopicConfigManager extends ConfigManager { + log.error("createTopicIfAbsent ", e); + } + if (createNew && register) { +- this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); ++ if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { ++ this.brokerController.registerSingleTopicAll(topicConfig); ++ } else { ++ this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); ++ } + } + return this.topicConfigTable.get(topicConfig.getTopicName()); + } +diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +index a4d82d1c5..02c692e2b 100644 +--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java ++++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +@@ -386,6 +386,12 @@ public class BrokerConfig extends BrokerIdentity { + */ + private boolean popResponseReturnActualRetryTopic = false; + ++ /** ++ * If both the deleteTopicWithBrokerRegistration flag in the NameServer configuration and this flag are set to true, ++ * it guarantees the ultimate consistency of data between the broker and the nameserver during topic deletion. ++ */ ++ private boolean enableSingleTopicRegister = false; ++ + public long getMaxPopPollingSize() { + return maxPopPollingSize; + } +@@ -1689,4 +1695,12 @@ public class BrokerConfig extends BrokerIdentity { + public void setPopResponseReturnActualRetryTopic(boolean popResponseReturnActualRetryTopic) { + this.popResponseReturnActualRetryTopic = popResponseReturnActualRetryTopic; + } ++ ++ public boolean isEnableSingleTopicRegister() { ++ return enableSingleTopicRegister; ++ } ++ ++ public void setEnableSingleTopicRegister(boolean enableSingleTopicRegister) { ++ this.enableSingleTopicRegister = enableSingleTopicRegister; ++ } + } +diff --git a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java +index 700febfe2..5b8a6dedb 100644 +--- a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java ++++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java +@@ -82,6 +82,15 @@ public class NamesrvConfig { + + private int waitSecondsForService = 45; + ++ /** ++ * If enable this flag, the topics that don't exist in broker registration payload will be deleted from name server. ++ * ++ * WARNING: ++ * 1. Enable this flag and "enableSingleTopicRegister" of broker config meanwhile to avoid losing topic route info unexpectedly. ++ * 2. This flag does not support static topic currently. ++ */ ++ private boolean deleteTopicWithBrokerRegistration = false; ++ + public boolean isOrderMessageEnable() { + return orderMessageEnable; + } +@@ -241,4 +250,12 @@ public class NamesrvConfig { + public void setWaitSecondsForService(int waitSecondsForService) { + this.waitSecondsForService = waitSecondsForService; + } ++ ++ public boolean isDeleteTopicWithBrokerRegistration() { ++ return deleteTopicWithBrokerRegistration; ++ } ++ ++ public void setDeleteTopicWithBrokerRegistration(boolean deleteTopicWithBrokerRegistration) { ++ this.deleteTopicWithBrokerRegistration = deleteTopicWithBrokerRegistration; ++ } + } +diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java +index ac27d76ce..0055a1cc8 100644 +--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java ++++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java +@@ -121,9 +121,18 @@ public class RouteInfoManager { + if (queueDatas == null || queueDatas.isEmpty()) { + return; + } ++ + try { + this.lock.writeLock().lockInterruptibly(); + if (this.topicQueueTable.containsKey(topic)) { ++ Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topic); ++ for (QueueData queueData : queueDatas) { ++ if (!this.brokerAddrTable.containsKey(queueData.getBrokerName())) { ++ log.warn("Register topic contains illegal broker, {}, {}", topic, queueData); ++ return; ++ } ++ queueDataMap.put(queueData.getBrokerName(), queueData); ++ } + log.info("Topic route already exist.{}, {}", topic, this.topicQueueTable.get(topic)); + } else { + // check and construct queue data map +@@ -299,7 +308,32 @@ public class RouteInfoManager { + + ConcurrentMap<String, TopicConfig> tcTable = + topicConfigWrapper.getTopicConfigTable(); ++ + if (tcTable != null) { ++ ++ TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper); ++ Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap(); ++ ++ // Delete the topics that don't exist in tcTable from the current broker ++ // Static topic is not supported currently ++ if (namesrvConfig.isDeleteTopicWithBrokerRegistration() && topicQueueMappingInfoMap.isEmpty()) { ++ final Set<String> oldTopicSet = topicSetOfBrokerName(brokerName); ++ final Set<String> newTopicSet = tcTable.keySet(); ++ final Sets.SetView<String> toDeleteTopics = Sets.difference(oldTopicSet, newTopicSet); ++ for (final String toDeleteTopic : toDeleteTopics) { ++ Map<String, QueueData> queueDataMap = topicQueueTable.get(toDeleteTopic); ++ final QueueData removedQD = queueDataMap.remove(brokerName); ++ if (removedQD != null) { ++ log.info("deleteTopic, remove one broker's topic {} {} {}", brokerName, toDeleteTopic, removedQD); ++ } ++ ++ if (queueDataMap.isEmpty()) { ++ log.info("deleteTopic, remove the topic all queue {}", toDeleteTopic); ++ topicQueueTable.remove(toDeleteTopic); ++ } ++ } ++ } ++ + for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) { + if (registerFirst || this.isTopicConfigChanged(clusterName, brokerAddr, + topicConfigWrapper.getDataVersion(), brokerName, +@@ -312,19 +346,17 @@ public class RouteInfoManager { + this.createAndUpdateQueueData(brokerName, topicConfig); + } + } +- } + +- if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) { +- TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper); +- Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap(); +- //the topicQueueMappingInfoMap should never be null, but can be empty +- for (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) { +- if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) { +- topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<>()); ++ if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) { ++ //the topicQueueMappingInfoMap should never be null, but can be empty ++ for (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) { ++ if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) { ++ topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<>()); ++ } ++ //Note asset brokerName equal entry.getValue().getBname() ++ //here use the mappingDetail.bname ++ topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue()); + } +- //Note asset brokerName equal entry.getValue().getBname() +- //here use the mappingDetail.bname +- topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue()); + } + } + } +@@ -374,6 +406,16 @@ public class RouteInfoManager { + return result; + } + ++ private Set<String> topicSetOfBrokerName(final String brokerName) { ++ Set<String> topicOfBroker = new HashSet<>(); ++ for (final Entry<String, Map<String, QueueData>> entry : this.topicQueueTable.entrySet()) { ++ if (entry.getValue().containsKey(brokerName)) { ++ topicOfBroker.add(entry.getKey()); ++ } ++ } ++ return topicOfBroker; ++ } ++ + public BrokerMemberGroup getBrokerMemberGroup(String clusterName, String brokerName) { + BrokerMemberGroup groupMember = new BrokerMemberGroup(clusterName, brokerName); + try { +diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java +index b53519e5f..6002d1f5a 100644 +--- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java ++++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java +@@ -22,6 +22,7 @@ import io.netty.channel.Channel; + import java.time.Duration; + import java.util.ArrayList; + import java.util.Arrays; ++import java.util.Collections; + import java.util.List; + import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.ConcurrentMap; +@@ -37,6 +38,7 @@ import org.apache.rocketmq.remoting.protocol.body.TopicList; + import org.apache.rocketmq.remoting.protocol.header.namesrv.UnRegisterBrokerRequestHeader; + import org.apache.rocketmq.remoting.protocol.namesrv.RegisterBrokerResult; + import org.apache.rocketmq.remoting.protocol.route.BrokerData; ++import org.apache.rocketmq.remoting.protocol.route.QueueData; + import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; + import org.junit.After; + import org.junit.Before; +@@ -624,6 +626,92 @@ public class RouteInfoManagerNewTest { + .containsValues(BrokerBasicInfo.defaultBroker().brokerAddr, BrokerBasicInfo.slaveBroker().brokerAddr); + } + ++ @Test ++ public void keepTopicWithBrokerRegistration() { ++ RegisterBrokerResult masterResult = registerBrokerWithNormalTopic(BrokerBasicInfo.defaultBroker(), "TestTopic", "TestTopic1"); ++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull(); ++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull(); ++ ++ masterResult = registerBrokerWithNormalTopic(BrokerBasicInfo.defaultBroker(), "TestTopic1"); ++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull(); ++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull(); ++ } ++ ++ @Test ++ public void deleteTopicWithBrokerRegistration() { ++ config.setDeleteTopicWithBrokerRegistration(true); ++ registerBrokerWithNormalTopic(BrokerBasicInfo.defaultBroker(), "TestTopic", "TestTopic1"); ++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull(); ++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull(); ++ ++ registerBrokerWithNormalTopic(BrokerBasicInfo.defaultBroker(), "TestTopic1"); ++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull(); ++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull(); ++ } ++ ++ @Test ++ public void deleteTopicWithBrokerRegistration2() { ++ // Register two brokers and delete a specific one by one ++ config.setDeleteTopicWithBrokerRegistration(true); ++ final BrokerBasicInfo master1 = BrokerBasicInfo.defaultBroker(); ++ final BrokerBasicInfo master2 = BrokerBasicInfo.defaultBroker().name(DEFAULT_BROKER + 1).addr(DEFAULT_ADDR + 9); ++ ++ registerBrokerWithNormalTopic(master1, "TestTopic", "TestTopic1"); ++ registerBrokerWithNormalTopic(master2, "TestTopic", "TestTopic1"); ++ ++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic").getBrokerDatas()).hasSize(2); ++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1").getBrokerDatas()).hasSize(2); ++ ++ ++ registerBrokerWithNormalTopic(master1, "TestTopic1"); ++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic").getBrokerDatas()).hasSize(1); ++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic").getBrokerDatas().get(0).getBrokerName()) ++ .isEqualTo(master2.brokerName); ++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1").getBrokerDatas()).hasSize(2); ++ ++ registerBrokerWithNormalTopic(master2, "TestTopic1"); ++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull(); ++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1").getBrokerDatas()).hasSize(2); ++ } ++ ++ @Test ++ public void registerSingleTopicWithBrokerRegistration() { ++ config.setDeleteTopicWithBrokerRegistration(true); ++ final BrokerBasicInfo master1 = BrokerBasicInfo.defaultBroker(); ++ ++ registerSingleTopicWithBrokerName(master1.brokerName, "TestTopic"); ++ ++ // Single topic registration failed because there is no broker connection exists ++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull(); ++ ++ // Register broker with TestTopic first and then register single topic TestTopic1 ++ registerBrokerWithNormalTopic(master1, "TestTopic"); ++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull(); ++ ++ registerSingleTopicWithBrokerName(master1.brokerName, "TestTopic1"); ++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull(); ++ ++ // Register the two topics to keep the route info ++ registerBrokerWithNormalTopic(master1, "TestTopic", "TestTopic1"); ++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull(); ++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull(); ++ ++ // Cancel the TestTopic1 with broker registration ++ registerBrokerWithNormalTopic(master1, "TestTopic"); ++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull(); ++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNull(); ++ ++ // Add TestTopic1 and cancel all the topics with broker un-registration ++ registerSingleTopicWithBrokerName(master1.brokerName, "TestTopic1"); ++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull(); ++ ++ routeInfoManager.unregisterBroker(master1.clusterName, master1.brokerAddr, master1.brokerName, 0); ++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull(); ++ assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNull(); ++ ++ ++ } ++ + private RegisterBrokerResult registerBrokerWithNormalTopic(BrokerBasicInfo brokerInfo, String... topics) { + ConcurrentHashMap<String, TopicConfig> topicConfigConcurrentHashMap = new ConcurrentHashMap<>(); + TopicConfig baseTopic = new TopicConfig("baseTopic"); +@@ -711,6 +799,17 @@ public class RouteInfoManagerNewTest { + return registerBrokerResult; + } + ++ private void registerSingleTopicWithBrokerName(String brokerName, String... topics) { ++ for (final String topic : topics) { ++ QueueData queueData = new QueueData(); ++ queueData.setBrokerName(brokerName); ++ queueData.setReadQueueNums(8); ++ queueData.setWriteQueueNums(8); ++ queueData.setPerm(6); ++ routeInfoManager.registerTopic(topic, Collections.singletonList(queueData)); ++ } ++ } ++ + static class BrokerBasicInfo { + String clusterName; + String brokerName; +diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java +index 11b00a72c..d3d5de9e2 100644 +--- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java ++++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java +@@ -17,6 +17,7 @@ + + package org.apache.rocketmq.test.util; + ++import java.util.Collections; + import java.util.HashMap; + import java.util.Map; + import java.util.Set; +@@ -38,6 +39,7 @@ import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats; + import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; + import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; + import org.apache.rocketmq.remoting.protocol.route.BrokerData; ++import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; + import org.apache.rocketmq.remoting.protocol.statictopic.TopicConfigAndQueueMapping; + import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingOne; + import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingUtils; +@@ -319,4 +321,39 @@ public class MQAdminTestUtils { + } + return consumeStats; + } ++ ++ /** ++ * Delete topic from broker only without cleaning route info from name server forwardly ++ * ++ * @param nameSrvAddr the namesrv addr to connect ++ * @param brokerName the specific broker ++ * @param topic the specific topic to delete ++ */ ++ public static void deleteTopicFromBrokerOnly(String nameSrvAddr, String brokerName, String topic) { ++ DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(); ++ mqAdminExt.setNamesrvAddr(nameSrvAddr); ++ ++ try { ++ mqAdminExt.start(); ++ String brokerAddr = CommandUtil.fetchMasterAddrByBrokerName(mqAdminExt, brokerName); ++ mqAdminExt.deleteTopicInBroker(Collections.singleton(brokerAddr), topic); ++ } catch (Exception ignored) { ++ } finally { ++ mqAdminExt.shutdown(); ++ } ++ } ++ ++ public static TopicRouteData examineTopicRouteInfo(String nameSrvAddr, String topicName) { ++ DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(); ++ mqAdminExt.setNamesrvAddr(nameSrvAddr); ++ TopicRouteData route = null; ++ try { ++ mqAdminExt.start(); ++ route = mqAdminExt.examineTopicRouteInfo(topicName); ++ } catch (Exception ignored) { ++ } finally { ++ mqAdminExt.shutdown(); ++ } ++ return route; ++ } + } +diff --git a/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java b/test/src/test/java/org/apache/rocketmq/test/dledger/DLedgerProduceAndConsumeIT.java +similarity index 99% +rename from test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java +rename to test/src/test/java/org/apache/rocketmq/test/dledger/DLedgerProduceAndConsumeIT.java +index 9e142eb61..43fefd616 100644 +--- a/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java ++++ b/test/src/test/java/org/apache/rocketmq/test/dledger/DLedgerProduceAndConsumeIT.java +@@ -14,7 +14,7 @@ + * See the License for the specific language governing permissions and + * limitations under the License. + */ +-package org.apache.rocketmq.test.base.dledger; ++package org.apache.rocketmq.test.dledger; + + import java.util.UUID; + import org.apache.rocketmq.broker.BrokerController; +diff --git a/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java +new file mode 100644 +index 000000000..7e3c7b871 +--- /dev/null ++++ b/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java +@@ -0,0 +1,114 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.rocketmq.test.route; ++ ++import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; ++import org.apache.rocketmq.test.base.BaseConf; ++import org.apache.rocketmq.test.util.MQAdminTestUtils; ++import org.junit.Test; ++ ++import static org.assertj.core.api.Assertions.assertThat; ++ ++public class CreateAndUpdateTopicIT extends BaseConf { ++ ++ @Test ++ public void testCreateOrUpdateTopic_EnableSingleTopicRegistration() { ++ String topic = "test-topic-without-broker-registration"; ++ brokerController1.getBrokerConfig().setEnableSingleTopicRegister(true); ++ brokerController2.getBrokerConfig().setEnableSingleTopicRegister(true); ++ brokerController3.getBrokerConfig().setEnableSingleTopicRegister(true); ++ ++ final boolean createResult = MQAdminTestUtils.createTopic(NAMESRV_ADDR, CLUSTER_NAME, topic, 8, null); ++ assertThat(createResult).isTrue(); ++ ++ TopicRouteData route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, topic); ++ assertThat(route.getBrokerDatas()).hasSize(3); ++ assertThat(route.getQueueDatas()).hasSize(3); ++ ++ brokerController1.getBrokerConfig().setEnableSingleTopicRegister(false); ++ brokerController2.getBrokerConfig().setEnableSingleTopicRegister(false); ++ brokerController3.getBrokerConfig().setEnableSingleTopicRegister(false); ++ ++ } ++ ++ @Test ++ public void testDeleteTopicFromNameSrvWithBrokerRegistration() { ++ namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(true); ++ brokerController1.getBrokerConfig().setEnableSingleTopicRegister(true); ++ brokerController2.getBrokerConfig().setEnableSingleTopicRegister(true); ++ brokerController3.getBrokerConfig().setEnableSingleTopicRegister(true); ++ ++ String testTopic1 = "test-topic-keep-route"; ++ String testTopic2 = "test-topic-delete-route"; ++ ++ boolean createResult = MQAdminTestUtils.createTopic(NAMESRV_ADDR, CLUSTER_NAME, testTopic1, 8, null); ++ assertThat(createResult).isTrue(); ++ ++ ++ createResult = MQAdminTestUtils.createTopic(NAMESRV_ADDR, CLUSTER_NAME, testTopic2, 8, null); ++ assertThat(createResult).isTrue(); ++ ++ ++ TopicRouteData route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, testTopic2); ++ assertThat(route.getBrokerDatas()).hasSize(3); ++ ++ MQAdminTestUtils.deleteTopicFromBrokerOnly(NAMESRV_ADDR, BROKER1_NAME, testTopic2); ++ ++ // Deletion is lazy, trigger broker registration ++ brokerController1.registerBrokerAll(false, false, true); ++ ++ // The route info of testTopic2 will be removed from broker1 after the registration ++ route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, testTopic2); ++ assertThat(route.getBrokerDatas()).hasSize(2); ++ assertThat(route.getQueueDatas().get(0).getBrokerName()).isEqualTo(BROKER2_NAME); ++ assertThat(route.getQueueDatas().get(1).getBrokerName()).isEqualTo(BROKER3_NAME); ++ ++ brokerController1.getBrokerConfig().setEnableSingleTopicRegister(false); ++ brokerController2.getBrokerConfig().setEnableSingleTopicRegister(false); ++ brokerController3.getBrokerConfig().setEnableSingleTopicRegister(false); ++ namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(false); ++ } ++ ++ @Test ++ public void testStaticTopicNotAffected() throws Exception { ++ namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(true); ++ brokerController1.getBrokerConfig().setEnableSingleTopicRegister(true); ++ brokerController2.getBrokerConfig().setEnableSingleTopicRegister(true); ++ brokerController3.getBrokerConfig().setEnableSingleTopicRegister(true); ++ ++ String testTopic = "test-topic-not-affected"; ++ String testStaticTopic = "test-static-topic"; ++ ++ boolean createResult = MQAdminTestUtils.createTopic(NAMESRV_ADDR, CLUSTER_NAME, testTopic, 8, null); ++ assertThat(createResult).isTrue(); ++ ++ TopicRouteData route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, testTopic); ++ assertThat(route.getBrokerDatas()).hasSize(3); ++ assertThat(route.getQueueDatas()).hasSize(3); ++ ++ MQAdminTestUtils.createStaticTopicWithCommand(testStaticTopic, 10, null, CLUSTER_NAME, NAMESRV_ADDR); ++ ++ assertThat(route.getBrokerDatas()).hasSize(3); ++ assertThat(route.getQueueDatas()).hasSize(3); ++ ++ brokerController1.getBrokerConfig().setEnableSingleTopicRegister(false); ++ brokerController2.getBrokerConfig().setEnableSingleTopicRegister(false); ++ brokerController3.getBrokerConfig().setEnableSingleTopicRegister(false); ++ namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(false); ++ } ++} +-- +2.32.0.windows.2 + + +From 32eb1d55570af81641a4a40d96ff5554329b93cb Mon Sep 17 00:00:00 2001 +From: gaoyf <gaoyf@users.noreply.github.com> +Date: Tue, 25 Jul 2023 15:26:20 +0800 +Subject: [PATCH 05/10] [ISSUE #7068] Fix failed to create syncer topic when + the proxy was just started (#7076) + +--- + .../apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java | 1 + + 1 file changed, 1 insertion(+) + +diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java +index f7d9b11ba..c68859b28 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java ++++ b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java +@@ -104,6 +104,7 @@ public class MQClientAPIFactory implements StartAndShutdown { + rpcHook); + + if (!mqClientAPIExt.updateNameServerAddressList()) { ++ mqClientAPIExt.fetchNameServerAddr(); + this.scheduledExecutorService.scheduleAtFixedRate( + mqClientAPIExt::fetchNameServerAddr, + Duration.ofSeconds(10).toMillis(), +-- +2.32.0.windows.2 + + +From d79737788078707168c0258c4af0d800de32c137 Mon Sep 17 00:00:00 2001 +From: Vincent Lee <cool8511@gmail.com> +Date: Thu, 27 Jul 2023 10:51:51 +0800 +Subject: [PATCH 06/10] [ISSUE #7056] Avoid close success channel if invokeSync + most time cost on get connection for channel (#7057) + +* fix: avoid close success channel if invokeSync most time cost on get channel + +Change-Id: I29741cf55ac6333bfa30fef755357b78a22b1325 + +* fix: ci style + +Change-Id: I8c9b86e9cb6f1463bf213e64c9b8c139afa794c8 +--- + .../rocketmq/remoting/netty/NettyRemotingClient.java | 11 ++++++++--- + 1 file changed, 8 insertions(+), 3 deletions(-) + +diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +index 9715b918a..8491f4354 100644 +--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java ++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +@@ -88,6 +88,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti + private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME); + + private static final long LOCK_TIMEOUT_MILLIS = 3000; ++ private static final long MIN_CLOSE_TIMEOUT_MILLIS = 100; + + private final NettyClientConfig nettyClientConfig; + private final Bootstrap bootstrap = new Bootstrap(); +@@ -524,13 +525,15 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti + final Channel channel = this.getAndCreateChannel(addr); + String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel); + if (channel != null && channel.isActive()) { ++ long left = timeoutMillis; + try { + doBeforeRpcHooks(channelRemoteAddr, request); + long costTime = System.currentTimeMillis() - beginStartTime; +- if (timeoutMillis < costTime) { ++ left -= costTime; ++ if (left <= 0) { + throw new RemotingTimeoutException("invokeSync call the addr[" + channelRemoteAddr + "] timeout"); + } +- RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime); ++ RemotingCommand response = this.invokeSyncImpl(channel, request, left); + doAfterRpcHooks(channelRemoteAddr, request, response); + this.updateChannelLastResponseTime(addr); + return response; +@@ -539,7 +542,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti + this.closeChannel(addr, channel); + throw e; + } catch (RemotingTimeoutException e) { +- if (nettyClientConfig.isClientCloseSocketIfTimeout()) { ++ // avoid close the success channel if left timeout is small, since it may cost too much time in get the success channel, the left timeout for read is small ++ boolean shouldClose = left > MIN_CLOSE_TIMEOUT_MILLIS || left > timeoutMillis / 4; ++ if (nettyClientConfig.isClientCloseSocketIfTimeout() && shouldClose) { + this.closeChannel(addr, channel); + LOGGER.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, channelRemoteAddr); + } +-- +2.32.0.windows.2 + + +From d0a69be563785ca815dc31ef1aab4c1bc5588c01 Mon Sep 17 00:00:00 2001 +From: zd46319 <zd46319@163.com> +Date: Thu, 27 Jul 2023 16:56:41 +0800 +Subject: [PATCH 07/10] [ISSUE #6810] Fix the bug of mistakenly deleting data + in clientChannelTable when the channel expire (#7073) + +--- + .../broker/client/ProducerManager.java | 5 ++- + .../broker/client/ProducerManagerTest.java | 34 +++++++++++++++++++ + 2 files changed, 38 insertions(+), 1 deletion(-) + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java +index 52d67bf28..f9fe1193e 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java +@@ -112,7 +112,10 @@ public class ProducerManager { + long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp(); + if (diff > CHANNEL_EXPIRED_TIMEOUT) { + it.remove(); +- clientChannelTable.remove(info.getClientId()); ++ Channel channelInClientTable = clientChannelTable.get(info.getClientId()); ++ if (channelInClientTable != null && channelInClientTable.equals(info.getChannel())) { ++ clientChannelTable.remove(info.getClientId()); ++ } + log.warn( + "ProducerManager#scanNotActiveChannel: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}", + RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group); +diff --git a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java +index dac5468c8..3d6091e02 100644 +--- a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java ++++ b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java +@@ -27,6 +27,7 @@ import org.junit.Before; + import org.junit.Test; + import org.junit.runner.RunWith; + import org.mockito.Mock; ++import org.mockito.Mockito; + import org.mockito.junit.MockitoJUnitRunner; + + import static org.assertj.core.api.Assertions.assertThat; +@@ -79,6 +80,39 @@ public class ProducerManagerTest { + assertThat(producerManager.findChannel("clientId")).isNull(); + } + ++ @Test ++ public void scanNotActiveChannelWithSameClientId() throws Exception { ++ producerManager.registerProducer(group, clientInfo); ++ Channel channel1 = Mockito.mock(Channel.class); ++ ClientChannelInfo clientInfo1 = new ClientChannelInfo(channel1, clientInfo.getClientId(), LanguageCode.JAVA, 0); ++ producerManager.registerProducer(group, clientInfo1); ++ AtomicReference<String> groupRef = new AtomicReference<>(); ++ AtomicReference<ClientChannelInfo> clientChannelInfoRef = new AtomicReference<>(); ++ producerManager.appendProducerChangeListener((event, group, clientChannelInfo) -> { ++ switch (event) { ++ case GROUP_UNREGISTER: ++ groupRef.set(group); ++ break; ++ case CLIENT_UNREGISTER: ++ clientChannelInfoRef.set(clientChannelInfo); ++ break; ++ default: ++ break; ++ } ++ }); ++ assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNotNull(); ++ assertThat(producerManager.getGroupChannelTable().get(group).get(channel1)).isNotNull(); ++ assertThat(producerManager.findChannel("clientId")).isNotNull(); ++ Field field = ProducerManager.class.getDeclaredField("CHANNEL_EXPIRED_TIMEOUT"); ++ field.setAccessible(true); ++ long channelExpiredTimeout = field.getLong(producerManager); ++ clientInfo.setLastUpdateTimestamp(System.currentTimeMillis() - channelExpiredTimeout - 10); ++ when(channel.close()).thenReturn(mock(ChannelFuture.class)); ++ producerManager.scanNotActiveChannel(); ++ assertThat(producerManager.getGroupChannelTable().get(group).get(channel1)).isNotNull(); ++ assertThat(producerManager.findChannel("clientId")).isNotNull(); ++ } ++ + @Test + public void doChannelCloseEvent() throws Exception { + producerManager.registerProducer(group, clientInfo); +-- +2.32.0.windows.2 + + +From d429bd72dfae0901f4325c8e9c6ce631286e40d4 Mon Sep 17 00:00:00 2001 +From: cnScarb <jjhfen00@163.com> +Date: Fri, 28 Jul 2023 09:46:39 +0800 +Subject: [PATCH 08/10] [ISSUE #7039] Fix retry message filter when subtype is + TAG (#7040) + +--- + .../broker/filter/ExpressionForRetryMessageFilter.java | 6 +++--- + 1 file changed, 3 insertions(+), 3 deletions(-) + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java +index d2d1087ef..bc01b21cb 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java +@@ -45,12 +45,12 @@ public class ExpressionForRetryMessageFilter extends ExpressionMessageFilter { + return true; + } + +- boolean isRetryTopic = subscriptionData.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX); +- +- if (!isRetryTopic && ExpressionType.isTagType(subscriptionData.getExpressionType())) { ++ if (ExpressionType.isTagType(subscriptionData.getExpressionType())) { + return true; + } + ++ boolean isRetryTopic = subscriptionData.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX); ++ + ConsumerFilterData realFilterData = this.consumerFilterData; + Map<String, String> tempProperties = properties; + boolean decoded = false; +-- +2.32.0.windows.2 + + +From 8baa51e85e569429293720b2ba7fcaee745abecc Mon Sep 17 00:00:00 2001 +From: Zack_Aayush <60972989+AayushSaini101@users.noreply.github.com> +Date: Sun, 30 Jul 2023 09:02:02 +0530 +Subject: [PATCH 09/10] [ISSUE #7091] Update the cd command in README (#7096) + +* Update the cd command + +* Removed extra space + +--------- + +Co-authored-by: Aayush <aaayush@redhat.com> +--- + README.md | 2 +- + 1 file changed, 1 insertion(+), 1 deletion(-) + +diff --git a/README.md b/README.md +index 393ef88e6..56d253ce1 100644 +--- a/README.md ++++ b/README.md +@@ -63,7 +63,7 @@ $ unzip rocketmq-all-5.1.3-bin-release.zip + + Prepare a terminal and change to the extracted `bin` directory: + ```shell +-$ cd rocketmq-all-5.1.3/bin ++$ cd rocketmq-all-5.1.3-bin-release/bin + ``` + + **1) Start NameServer** +-- +2.32.0.windows.2 + + +From 8bcc94829d2ef2597a8eeab3c6b7099432a0bea1 Mon Sep 17 00:00:00 2001 +From: weihubeats <weihubeats@163.com> +Date: Tue, 1 Aug 2023 10:15:07 +0800 +Subject: [PATCH 10/10] [ISSUE #7077] Schedule CQ offset invalid. offset=77, + cqMinOffset=0, cqMaxOffset=74, queueId=1 (#7084) + +* Adding null does not update + +* delete slave put correctDelayOffset + +* Remove duplicate delayOffset file loading + +* add loadWhenSyncDelayOffset + +* add method + +* add method +--- + .../rocketmq/broker/schedule/ScheduleMessageService.java | 6 ++++++ + .../org/apache/rocketmq/broker/slave/SlaveSynchronize.java | 2 +- + 2 files changed, 7 insertions(+), 1 deletion(-) + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java +index 2a4ace098..26f09dcd0 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java +@@ -223,6 +223,12 @@ public class ScheduleMessageService extends ConfigManager { + result = result && this.correctDelayOffset(); + return result; + } ++ ++ public boolean loadWhenSyncDelayOffset() { ++ boolean result = super.load(); ++ result = result && this.parseDelayLevel(); ++ return result; ++ } + + public boolean correctDelayOffset() { + try { +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java +index b9de5173b..53cdecdf8 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java +@@ -152,7 +152,7 @@ public class SlaveSynchronize { + .getMessageStoreConfig().getStorePathRootDir()); + try { + MixAll.string2File(delayOffset, fileName); +- this.brokerController.getScheduleMessageService().load(); ++ this.brokerController.getScheduleMessageService().loadWhenSyncDelayOffset(); + } catch (IOException e) { + LOGGER.error("Persist file Exception, {}", fileName, e); + } +-- +2.32.0.windows.2 + |