summaryrefslogtreecommitdiff
path: root/patch005-backport-fix-some-bugs.patch
diff options
context:
space:
mode:
Diffstat (limited to 'patch005-backport-fix-some-bugs.patch')
-rw-r--r--patch005-backport-fix-some-bugs.patch1538
1 files changed, 1538 insertions, 0 deletions
diff --git a/patch005-backport-fix-some-bugs.patch b/patch005-backport-fix-some-bugs.patch
new file mode 100644
index 0000000..a75556f
--- /dev/null
+++ b/patch005-backport-fix-some-bugs.patch
@@ -0,0 +1,1538 @@
+From 15c6889bb0abd014c06ef1452f791db9daa1ea08 Mon Sep 17 00:00:00 2001
+From: lizhimins <707364882@qq.com>
+Date: Tue, 11 Jul 2023 17:04:00 +0800
+Subject: [PATCH 1/8] fix receive message activity attempt id not correct
+ (#7012)
+
+fix receive message activity attempt id not correct
+---
+ .../proxy/grpc/v2/consumer/ReceiveMessageActivity.java | 2 +-
+ .../proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java | 6 +++---
+ 2 files changed, 4 insertions(+), 4 deletions(-)
+
+diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
+index a504179a9..cf58bb87a 100644
+--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
+@@ -130,7 +130,7 @@ public class ReceiveMessageActivity extends AbstractMessingActivity {
+ subscriptionData,
+ fifo,
+ new PopMessageResultFilterImpl(maxAttempts),
+- request.getAttemptId(),
++ request.hasAttemptId() ? request.getAttemptId() : null,
+ timeRemaining
+ ).thenAccept(popResult -> {
+ if (proxyConfig.isEnableProxyAutoRenew() && request.getAutoRenew()) {
+diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
+index 2e562504a..7fd9a9ffd 100644
+--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
++++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
+@@ -57,6 +57,7 @@ import static org.mockito.ArgumentMatchers.anyBoolean;
+ import static org.mockito.ArgumentMatchers.anyInt;
+ import static org.mockito.ArgumentMatchers.anyLong;
+ import static org.mockito.ArgumentMatchers.anyString;
++import static org.mockito.ArgumentMatchers.isNull;
+ import static org.mockito.Mockito.doNothing;
+ import static org.mockito.Mockito.mock;
+ import static org.mockito.Mockito.when;
+@@ -89,7 +90,7 @@ public class ReceiveMessageActivityTest extends BaseActivityTest {
+ .setRequestTimeout(Durations.fromSeconds(3))
+ .build());
+ when(this.messagingProcessor.popMessage(any(), any(), anyString(), anyString(), anyInt(), anyLong(),
+- pollTimeCaptor.capture(), anyInt(), any(), anyBoolean(), any(), anyString(), anyLong()))
++ pollTimeCaptor.capture(), anyInt(), any(), anyBoolean(), any(), isNull(), anyLong()))
+ .thenReturn(CompletableFuture.completedFuture(new PopResult(PopStatus.NO_NEW_MSG, Collections.emptyList())));
+
+
+@@ -223,7 +224,6 @@ public class ReceiveMessageActivityTest extends BaseActivityTest {
+ assertEquals(Code.ILLEGAL_INVISIBLE_TIME, getResponseCodeFromReceiveMessageResponseList(responseArgumentCaptor.getAllValues()));
+ }
+
+-
+ @Test
+ public void testReceiveMessage() {
+ StreamObserver<ReceiveMessageResponse> receiveStreamObserver = mock(ServerCallStreamObserver.class);
+@@ -245,7 +245,7 @@ public class ReceiveMessageActivityTest extends BaseActivityTest {
+ any(),
+ anyBoolean(),
+ any(),
+- anyString(),
++ isNull(),
+ anyLong())).thenReturn(CompletableFuture.completedFuture(popResult));
+
+ this.receiveMessageActivity.receiveMessage(
+--
+2.32.0.windows.2
+
+
+From b4496be68705c1c0b282a07a1adeab4fffd670fe Mon Sep 17 00:00:00 2001
+From: ShuangxiDing <dingshuangxi888@gmail.com>
+Date: Tue, 11 Jul 2023 19:09:09 +0800
+Subject: [PATCH 2/8] [ISSUE #7010] Fix the HandshakeHandler returns when
+ detect haproxy version need more data (#7011)
+MIME-Version: 1.0
+Content-Type: text/plain; charset=UTF-8
+Content-Transfer-Encoding: 8bit
+
+* Support dynamic modification of grpc tls mode to improve the scalability of ProtocolNegotiator
+
+* Support dynamic modification of grpc tls mode to improve the scalability of ProtocolNegotiator
+
+* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator
+
+* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator
+
+* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator
+
+* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator
+
+* Support proxy protocol for gRPC server.
+
+* Support proxy protocol for gRPC server.
+
+* Support proxy protocol for gRPC server.
+
+* Support proxy protocol for gRPC server.
+
+* Support proxy protocol for gRPC server.
+
+* Support proxy protocol for gRPC and Remoting server.
+
+* 回滚netty的升级
+
+* Support proxy protocol for gRPC and Remoting server.
+
+* Support proxy protocol for gRPC and Remoting server.
+
+* Support proxy protocol for gRPC and Remoting server.
+
+* add grpc-netty-codec-haproxy in bazel
+
+* add grpc-netty-codec-haproxy in bazel
+
+* Support proxy protocol for gRPC and Remoting server.
+
+* Fix Test
+
+* add grpc-netty-codec-haproxy in bazel
+
+* add ProxyProtocolTest for Remoting
+
+* Move AttributeKey from RemotingHelper to AttributeKey.
+
+* Fix the needs more data for HandshakeHandler.
+
+* Fix the needs more data for HandshakeHandler.
+
+* Fix the needs more data for HandshakeHandler.
+
+* Fix the needs more data for HandshakeHandler.
+
+---------
+
+Co-authored-by: 徒钟 <shuangxi.dsx@alibaba-inc.com>
+---
+ .../remoting/MultiProtocolRemotingServer.java | 2 +-
+ .../activity/AbstractRemotingActivity.java | 16 +++++++------
+ .../activity/ClientManagerActivity.java | 24 ++++++++++---------
+ .../AbstractRemotingActivityTest.java | 10 ++++----
+ .../remoting/common/RemotingHelper.java | 21 ++++------------
+ .../remoting/netty/AttributeKeys.java | 11 ++++++++-
+ .../remoting/netty/NettyRemotingServer.java | 23 ++++++------------
+ 7 files changed, 51 insertions(+), 56 deletions(-)
+
+diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
+index 858b1f022..12d728fff 100644
+--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
+@@ -78,7 +78,7 @@ public class MultiProtocolRemotingServer extends NettyRemotingServer {
+ @Override
+ protected ChannelPipeline configChannel(SocketChannel ch) {
+ return ch.pipeline()
+- .addLast(this.getDefaultEventExecutorGroup(), HANDSHAKE_HANDLER_NAME, this.getHandshakeHandler())
++ .addLast(this.getDefaultEventExecutorGroup(), HANDSHAKE_HANDLER_NAME, new HandshakeHandler())
+ .addLast(this.getDefaultEventExecutorGroup(),
+ new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
+ new ProtocolNegotiationHandler(this.remotingProtocolHandler)
+diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
+index 78cd203ec..ce4a63397 100644
+--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
+@@ -19,9 +19,6 @@ package org.apache.rocketmq.proxy.remoting.activity;
+
+ import io.netty.channel.Channel;
+ import io.netty.channel.ChannelHandlerContext;
+-import java.util.HashMap;
+-import java.util.Map;
+-import java.util.Optional;
+ import org.apache.rocketmq.acl.common.AclException;
+ import org.apache.rocketmq.client.exception.MQBrokerException;
+ import org.apache.rocketmq.client.exception.MQClientException;
+@@ -41,11 +38,16 @@ import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+ import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType;
+ import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
+ import org.apache.rocketmq.remoting.common.RemotingHelper;
++import org.apache.rocketmq.remoting.netty.AttributeKeys;
+ import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+ import org.apache.rocketmq.remoting.protocol.RequestCode;
+ import org.apache.rocketmq.remoting.protocol.ResponseCode;
+
++import java.util.HashMap;
++import java.util.Map;
++import java.util.Optional;
++
+ public abstract class AbstractRemotingActivity implements NettyRequestProcessor {
+ protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+ protected final MessagingProcessor messagingProcessor;
+@@ -126,13 +128,13 @@ public abstract class AbstractRemotingActivity implements NettyRequestProcessor
+ .setProtocolType(ChannelProtocolType.REMOTING.getName())
+ .setChannel(channel)
+ .setLocalAddress(NetworkUtil.socketAddress2String(ctx.channel().localAddress()))
+- .setRemoteAddress(NetworkUtil.socketAddress2String(ctx.channel().remoteAddress()));
++ .setRemoteAddress(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+- Optional.ofNullable(RemotingHelper.getAttributeValue(RemotingHelper.LANGUAGE_CODE_KEY, channel))
++ Optional.ofNullable(RemotingHelper.getAttributeValue(AttributeKeys.LANGUAGE_CODE_KEY, channel))
+ .ifPresent(language -> context.setLanguage(language.name()));
+- Optional.ofNullable(RemotingHelper.getAttributeValue(RemotingHelper.CLIENT_ID_KEY, channel))
++ Optional.ofNullable(RemotingHelper.getAttributeValue(AttributeKeys.CLIENT_ID_KEY, channel))
+ .ifPresent(context::setClientID);
+- Optional.ofNullable(RemotingHelper.getAttributeValue(RemotingHelper.VERSION_KEY, channel))
++ Optional.ofNullable(RemotingHelper.getAttributeValue(AttributeKeys.VERSION_KEY, channel))
+ .ifPresent(version -> context.setClientVersion(MQVersion.getVersionDesc(version)));
+
+ return context;
+diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
+index 1eb81ce92..c671593a3 100644
+--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
+@@ -19,13 +19,20 @@ package org.apache.rocketmq.proxy.remoting.activity;
+
+ import io.netty.channel.Channel;
+ import io.netty.channel.ChannelHandlerContext;
+-import java.util.Set;
+ import org.apache.rocketmq.broker.client.ClientChannelInfo;
+ import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
+ import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
+ import org.apache.rocketmq.broker.client.ProducerChangeListener;
+ import org.apache.rocketmq.broker.client.ProducerGroupEvent;
++import org.apache.rocketmq.proxy.common.ProxyContext;
++import org.apache.rocketmq.proxy.processor.MessagingProcessor;
++import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel;
++import org.apache.rocketmq.proxy.remoting.channel.RemotingChannelManager;
++import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
+ import org.apache.rocketmq.remoting.common.RemotingHelper;
++import org.apache.rocketmq.remoting.exception.RemotingCommandException;
++import org.apache.rocketmq.remoting.netty.AttributeKeys;
++import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+ import org.apache.rocketmq.remoting.protocol.RequestCode;
+ import org.apache.rocketmq.remoting.protocol.ResponseCode;
+ import org.apache.rocketmq.remoting.protocol.header.UnregisterClientRequestHeader;
+@@ -33,13 +40,8 @@ import org.apache.rocketmq.remoting.protocol.header.UnregisterClientResponseHead
+ import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumerData;
+ import org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData;
+ import org.apache.rocketmq.remoting.protocol.heartbeat.ProducerData;
+-import org.apache.rocketmq.proxy.common.ProxyContext;
+-import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+-import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel;
+-import org.apache.rocketmq.proxy.remoting.channel.RemotingChannelManager;
+-import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
+-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
++
++import java.util.Set;
+
+ public class ClientManagerActivity extends AbstractRemotingActivity {
+
+@@ -108,9 +110,9 @@ public class ClientManagerActivity extends AbstractRemotingActivity {
+ if (channel instanceof RemotingChannel) {
+ RemotingChannel remotingChannel = (RemotingChannel) channel;
+ Channel parent = remotingChannel.parent();
+- RemotingHelper.setPropertyToAttr(parent, RemotingHelper.CLIENT_ID_KEY, clientChannelInfo.getClientId());
+- RemotingHelper.setPropertyToAttr(parent, RemotingHelper.LANGUAGE_CODE_KEY, clientChannelInfo.getLanguage());
+- RemotingHelper.setPropertyToAttr(parent, RemotingHelper.VERSION_KEY, clientChannelInfo.getVersion());
++ RemotingHelper.setPropertyToAttr(parent, AttributeKeys.CLIENT_ID_KEY, clientChannelInfo.getClientId());
++ RemotingHelper.setPropertyToAttr(parent, AttributeKeys.LANGUAGE_CODE_KEY, clientChannelInfo.getLanguage());
++ RemotingHelper.setPropertyToAttr(parent, AttributeKeys.VERSION_KEY, clientChannelInfo.getVersion());
+ }
+
+ }
+diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java
+index 663a83e3c..b2bd3a35f 100644
+--- a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java
++++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java
+@@ -21,7 +21,6 @@ import io.netty.channel.Channel;
+ import io.netty.channel.ChannelFuture;
+ import io.netty.channel.ChannelHandlerContext;
+ import io.netty.channel.ChannelPromise;
+-import java.util.concurrent.CompletableFuture;
+ import org.apache.rocketmq.acl.common.AclException;
+ import org.apache.rocketmq.client.exception.MQBrokerException;
+ import org.apache.rocketmq.client.exception.MQClientException;
+@@ -35,6 +34,7 @@ import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType;
+ import org.apache.rocketmq.proxy.service.channel.SimpleChannel;
+ import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext;
+ import org.apache.rocketmq.remoting.common.RemotingHelper;
++import org.apache.rocketmq.remoting.netty.AttributeKeys;
+ import org.apache.rocketmq.remoting.protocol.LanguageCode;
+ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+ import org.apache.rocketmq.remoting.protocol.RequestCode;
+@@ -48,6 +48,8 @@ import org.mockito.Mock;
+ import org.mockito.Spy;
+ import org.mockito.junit.MockitoJUnitRunner;
+
++import java.util.concurrent.CompletableFuture;
++
+ import static org.assertj.core.api.Assertions.assertThat;
+ import static org.mockito.ArgumentMatchers.any;
+ import static org.mockito.ArgumentMatchers.anyLong;
+@@ -82,9 +84,9 @@ public class AbstractRemotingActivityTest extends InitConfigTest {
+ }
+ };
+ Channel channel = ctx.channel();
+- RemotingHelper.setPropertyToAttr(channel, RemotingHelper.CLIENT_ID_KEY, CLIENT_ID);
+- RemotingHelper.setPropertyToAttr(channel, RemotingHelper.LANGUAGE_CODE_KEY, LanguageCode.JAVA);
+- RemotingHelper.setPropertyToAttr(channel, RemotingHelper.VERSION_KEY, MQVersion.CURRENT_VERSION);
++ RemotingHelper.setPropertyToAttr(channel, AttributeKeys.CLIENT_ID_KEY, CLIENT_ID);
++ RemotingHelper.setPropertyToAttr(channel, AttributeKeys.LANGUAGE_CODE_KEY, LanguageCode.JAVA);
++ RemotingHelper.setPropertyToAttr(channel, AttributeKeys.VERSION_KEY, MQVersion.CURRENT_VERSION);
+ }
+
+ @Test
+diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
+index d0750b678..363b22eac 100644
+--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
+@@ -22,7 +22,6 @@ import io.netty.channel.ChannelFutureListener;
+ import io.netty.util.Attribute;
+ import io.netty.util.AttributeKey;
+ import org.apache.commons.lang3.StringUtils;
+-import org.apache.rocketmq.common.constant.HAProxyConstants;
+ import org.apache.rocketmq.common.constant.LoggerName;
+ import org.apache.rocketmq.common.utils.NetworkUtil;
+ import org.apache.rocketmq.logging.org.slf4j.Logger;
+@@ -31,8 +30,8 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+ import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
++import org.apache.rocketmq.remoting.netty.AttributeKeys;
+ import org.apache.rocketmq.remoting.netty.NettySystemConfig;
+-import org.apache.rocketmq.remoting.protocol.LanguageCode;
+ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+ import org.apache.rocketmq.remoting.protocol.RequestCode;
+ import org.apache.rocketmq.remoting.protocol.ResponseCode;
+@@ -51,16 +50,6 @@ public class RemotingHelper {
+ public static final String DEFAULT_CIDR_ALL = "0.0.0.0/0";
+
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
+- private static final AttributeKey<String> REMOTE_ADDR_KEY = AttributeKey.valueOf("RemoteAddr");
+-
+- private static final AttributeKey<String> PROXY_PROTOCOL_ADDR = AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_ADDR);
+- private static final AttributeKey<String> PROXY_PROTOCOL_PORT = AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_PORT);
+-
+- public static final AttributeKey<String> CLIENT_ID_KEY = AttributeKey.valueOf("ClientId");
+-
+- public static final AttributeKey<Integer> VERSION_KEY = AttributeKey.valueOf("Version");
+-
+- public static final AttributeKey<LanguageCode> LANGUAGE_CODE_KEY = AttributeKey.valueOf("LanguageCode");
+
+ public static final Map<Integer, String> REQUEST_CODE_MAP = new HashMap<Integer, String>() {
+ {
+@@ -213,7 +202,7 @@ public class RemotingHelper {
+ if (StringUtils.isNotBlank(addr)) {
+ return addr;
+ }
+- Attribute<String> att = channel.attr(REMOTE_ADDR_KEY);
++ Attribute<String> att = channel.attr(AttributeKeys.REMOTE_ADDR_KEY);
+ if (att == null) {
+ // mocked in unit test
+ return parseChannelRemoteAddr0(channel);
+@@ -227,11 +216,11 @@ public class RemotingHelper {
+ }
+
+ private static String getProxyProtocolAddress(Channel channel) {
+- if (!channel.hasAttr(PROXY_PROTOCOL_ADDR)) {
++ if (!channel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) {
+ return null;
+ }
+- String proxyProtocolAddr = getAttributeValue(PROXY_PROTOCOL_ADDR, channel);
+- String proxyProtocolPort = getAttributeValue(PROXY_PROTOCOL_PORT, channel);
++ String proxyProtocolAddr = getAttributeValue(AttributeKeys.PROXY_PROTOCOL_ADDR, channel);
++ String proxyProtocolPort = getAttributeValue(AttributeKeys.PROXY_PROTOCOL_PORT, channel);
+ if (StringUtils.isBlank(proxyProtocolAddr) || proxyProtocolPort == null) {
+ return null;
+ }
+diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java
+index 4e69ab82d..ebdde31f4 100644
+--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java
++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java
+@@ -19,12 +19,21 @@ package org.apache.rocketmq.remoting.netty;
+
+ import io.netty.util.AttributeKey;
+ import org.apache.rocketmq.common.constant.HAProxyConstants;
++import org.apache.rocketmq.remoting.protocol.LanguageCode;
+
+ import java.util.Map;
+ import java.util.concurrent.ConcurrentHashMap;
+
+ public class AttributeKeys {
+
++ public static final AttributeKey<String> REMOTE_ADDR_KEY = AttributeKey.valueOf("RemoteAddr");
++
++ public static final AttributeKey<String> CLIENT_ID_KEY = AttributeKey.valueOf("ClientId");
++
++ public static final AttributeKey<Integer> VERSION_KEY = AttributeKey.valueOf("Version");
++
++ public static final AttributeKey<LanguageCode> LANGUAGE_CODE_KEY = AttributeKey.valueOf("LanguageCode");
++
+ public static final AttributeKey<String> PROXY_PROTOCOL_ADDR =
+ AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_ADDR);
+
+@@ -40,6 +49,6 @@ public class AttributeKeys {
+ private static final Map<String, AttributeKey<String>> ATTRIBUTE_KEY_MAP = new ConcurrentHashMap<>();
+
+ public static AttributeKey<String> valueOf(String name) {
+- return ATTRIBUTE_KEY_MAP.computeIfAbsent(name, AttributeKeys::valueOf);
++ return ATTRIBUTE_KEY_MAP.computeIfAbsent(name, AttributeKey::valueOf);
+ }
+ }
+diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+index 445f06cc6..8ae87a6fa 100644
+--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+@@ -37,6 +37,7 @@ import io.netty.channel.epoll.EpollServerSocketChannel;
+ import io.netty.channel.nio.NioEventLoopGroup;
+ import io.netty.channel.socket.SocketChannel;
+ import io.netty.channel.socket.nio.NioServerSocketChannel;
++import io.netty.handler.codec.ByteToMessageDecoder;
+ import io.netty.handler.codec.ProtocolDetectionResult;
+ import io.netty.handler.codec.ProtocolDetectionState;
+ import io.netty.handler.codec.haproxy.HAProxyMessage;
+@@ -73,6 +74,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+ import java.io.IOException;
+ import java.net.InetSocketAddress;
+ import java.security.cert.CertificateException;
++import java.util.List;
+ import java.util.NoSuchElementException;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ConcurrentMap;
+@@ -115,7 +117,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
+ public static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
+
+ // sharable handlers
+- private HandshakeHandler handshakeHandler;
++ private TlsModeHandler tlsModeHandler;
+ private NettyEncoder encoder;
+ private NettyConnectManageHandler connectionManageHandler;
+ private NettyServerHandler serverHandler;
+@@ -265,7 +267,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
+ */
+ protected ChannelPipeline configChannel(SocketChannel ch) {
+ return ch.pipeline()
+- .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
++ .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, new HandshakeHandler())
+ .addLast(defaultEventExecutorGroup,
+ encoder,
+ new NettyDecoder(),
+@@ -402,7 +404,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
+ }
+
+ private void prepareSharableHandlers() {
+- handshakeHandler = new HandshakeHandler();
++ tlsModeHandler = new TlsModeHandler(TlsSystemConfig.tlsMode);
+ encoder = new NettyEncoder();
+ connectionManageHandler = new NettyConnectManageHandler();
+ serverHandler = new NettyServerHandler();
+@@ -429,10 +431,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
+ return defaultEventExecutorGroup;
+ }
+
+- public HandshakeHandler getHandshakeHandler() {
+- return handshakeHandler;
+- }
+-
+ public NettyEncoder getEncoder() {
+ return encoder;
+ }
+@@ -449,17 +447,13 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
+ return distributionHandler;
+ }
+
+- @ChannelHandler.Sharable
+- public class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf> {
+-
+- private final TlsModeHandler tlsModeHandler;
++ public class HandshakeHandler extends ByteToMessageDecoder {
+
+ public HandshakeHandler() {
+- tlsModeHandler = new TlsModeHandler(TlsSystemConfig.tlsMode);
+ }
+
+ @Override
+- protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) {
++ protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
+ try {
+ ProtocolDetectionResult<HAProxyProtocolVersion> detectionResult = HAProxyMessageDecoder.detectProtocol(byteBuf);
+ if (detectionResult.state() == ProtocolDetectionState.NEEDS_MORE_DATA) {
+@@ -479,9 +473,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
+ } catch (NoSuchElementException e) {
+ log.error("Error while removing HandshakeHandler", e);
+ }
+-
+- // Hand over this message to the next .
+- ctx.fireChannelRead(byteBuf.retain());
+ } catch (Exception e) {
+ log.error("process proxy protocol negotiator failed.", e);
+ throw e;
+--
+2.32.0.windows.2
+
+
+From 1f0f3b2d6d16de7b315c702d33f7d3557c0fc25c Mon Sep 17 00:00:00 2001
+From: Ji Juntao <juntao.jjt@alibaba-inc.com>
+Date: Tue, 11 Jul 2023 21:13:06 +0800
+Subject: [PATCH 3/8] [ISSUE #7013] Polish ColdDataCheckService's logic (#7014)
+
+* polish coldCtrl
+
+* remove the catch.
+---
+ .../src/main/java/org/apache/rocketmq/store/CommitLog.java | 7 ++++++-
+ 1 file changed, 6 insertions(+), 1 deletion(-)
+
+diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+index 5a5c90c5a..e6ee3bacc 100644
+--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
++++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+@@ -2089,6 +2089,11 @@ public class CommitLog implements Swappable {
+ } else {
+ this.waitForRunning(defaultMessageStore.getMessageStoreConfig().getTimerColdDataCheckIntervalMs());
+ }
++
++ if (pageSize < 0) {
++ initPageSize();
++ }
++
+ long beginClockTimestamp = this.systemClock.now();
+ scanFilesInPageCache();
+ long costTime = this.systemClock.now() - beginClockTimestamp;
+@@ -2182,7 +2187,7 @@ public class CommitLog implements Swappable {
+ }
+
+ private void initPageSize() {
+- if (pageSize < 0) {
++ if (pageSize < 0 && defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable()) {
+ try {
+ if (!MixAll.isWindows()) {
+ pageSize = LibC.INSTANCE.getpagesize();
+--
+2.32.0.windows.2
+
+
+From d206590692bfdffca6bc58327e9533bc4bb68122 Mon Sep 17 00:00:00 2001
+From: Lei Zhiyuan <leizhiyuan@gmail.com>
+Date: Thu, 13 Jul 2023 11:17:38 +0800
+Subject: [PATCH 4/8] [ISSUE #6979] Fix opaque will be duplicate in multi
+ client scene (#6985)
+
+---
+ .../proxy/processor/DefaultMessagingProcessor.java | 14 ++++++++++++--
+ 1 file changed, 12 insertions(+), 2 deletions(-)
+
+diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
+index 1b3f0af4e..188cb7b9b 100644
+--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
+@@ -235,13 +235,23 @@ public class DefaultMessagingProcessor extends AbstractStartAndShutdown implemen
+ @Override
+ public CompletableFuture<RemotingCommand> request(ProxyContext ctx, String brokerName, RemotingCommand request,
+ long timeoutMillis) {
+- return this.requestBrokerProcessor.request(ctx, brokerName, request, timeoutMillis);
++ int originalRequestOpaque = request.getOpaque();
++ request.setOpaque(RemotingCommand.createNewRequestId());
++ return this.requestBrokerProcessor.request(ctx, brokerName, request, timeoutMillis).thenApply(r -> {
++ request.setOpaque(originalRequestOpaque);
++ return r;
++ });
+ }
+
+ @Override
+ public CompletableFuture<Void> requestOneway(ProxyContext ctx, String brokerName, RemotingCommand request,
+ long timeoutMillis) {
+- return this.requestBrokerProcessor.requestOneway(ctx, brokerName, request, timeoutMillis);
++ int originalRequestOpaque = request.getOpaque();
++ request.setOpaque(RemotingCommand.createNewRequestId());
++ return this.requestBrokerProcessor.requestOneway(ctx, brokerName, request, timeoutMillis).thenApply(r -> {
++ request.setOpaque(originalRequestOpaque);
++ return r;
++ });
+ }
+
+ @Override
+--
+2.32.0.windows.2
+
+
+From 33cb22e1c0fa7ba980567117230fe443ff5dbd62 Mon Sep 17 00:00:00 2001
+From: lizhimins <707364882@qq.com>
+Date: Thu, 13 Jul 2023 15:37:24 +0800
+Subject: [PATCH 5/8] [ISSUE #7018] fix append in tiered storage when message
+ offset incorrect (#7019)
+
+* fix append in tiered storage when message offset incorrect
+---
+ .../tieredstore/TieredDispatcher.java | 25 ++++++++++------
+ .../tieredstore/file/CompositeFlatFile.java | 30 +++++++++----------
+ .../file/CompositeQueueFlatFile.java | 2 +-
+ .../file/CompositeQueueFlatFileTest.java | 4 +--
+ 4 files changed, 34 insertions(+), 27 deletions(-)
+
+diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
+index 2a8e2ed71..6584b0e89 100644
+--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
+@@ -308,9 +308,18 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
+ doRedispatchRequestToWriteMap(
+ result, flatFile, dispatchOffset, newCommitLogOffset, size, tagCode, message.getByteBuffer());
+ message.release();
+- if (result != AppendResult.SUCCESS) {
+- dispatchOffset--;
+- break;
++
++ switch (result) {
++ case SUCCESS:
++ continue;
++ case FILE_CLOSED:
++ tieredFlatFileManager.destroyCompositeFile(flatFile.getMessageQueue());
++ logger.info("TieredDispatcher#dispatchFlatFile: file has been close and destroy, " +
++ "topic: {}, queueId: {}", topic, queueId);
++ return;
++ default:
++ dispatchOffset--;
++ break;
+ }
+ }
+
+@@ -341,15 +350,13 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
+
+ switch (result) {
+ case SUCCESS:
+- break;
+- case OFFSET_INCORRECT:
+ long offset = MessageBufferUtil.getQueueOffset(message);
+ if (queueOffset != offset) {
+- logger.error("[Bug] Commitlog offset incorrect, " +
+- "result={}, topic={}, queueId={}, offset={}, msg offset={}",
+- result, topic, queueId, queueOffset, offset);
++ logger.error("Message cq offset in commitlog does not meet expectations, " +
++ "result={}, topic={}, queueId={}, cq offset={}, msg offset={}",
++ AppendResult.OFFSET_INCORRECT, topic, queueId, queueOffset, offset);
+ }
+- return;
++ break;
+ case BUFFER_FULL:
+ logger.debug("Commitlog buffer full, result={}, topic={}, queueId={}, offset={}",
+ result, topic, queueId, queueOffset);
+diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
+index 1243f7721..8f8ba98b1 100644
+--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
+@@ -29,6 +29,7 @@ import java.util.concurrent.CompletableFuture;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.TimeUnit;
++import java.util.concurrent.atomic.AtomicLong;
+ import java.util.concurrent.locks.ReentrantLock;
+ import org.apache.commons.lang3.StringUtils;
+ import org.apache.commons.lang3.tuple.Pair;
+@@ -58,7 +59,7 @@ public class CompositeFlatFile implements CompositeAccess {
+ * dispatched to the current chunk, indicating the progress of the message distribution.
+ * It's consume queue current offset.
+ */
+- protected volatile long dispatchOffset;
++ protected final AtomicLong dispatchOffset;
+
+ protected final ReentrantLock compositeFlatFileLock;
+ protected final TieredMessageStoreConfig storeConfig;
+@@ -75,6 +76,7 @@ public class CompositeFlatFile implements CompositeAccess {
+ this.storeConfig = fileQueueFactory.getStoreConfig();
+ this.readAheadFactor = this.storeConfig.getReadAheadMinFactor();
+ this.metadataStore = TieredStoreUtil.getMetadataStore(this.storeConfig);
++ this.dispatchOffset = new AtomicLong();
+ this.compositeFlatFileLock = new ReentrantLock();
+ this.inFlightRequestMap = new ConcurrentHashMap<>();
+ this.commitLog = new TieredCommitLog(fileQueueFactory, filePath);
+@@ -83,8 +85,8 @@ public class CompositeFlatFile implements CompositeAccess {
+ }
+
+ protected void recoverMetadata() {
+- if (!consumeQueue.isInitialized() && this.dispatchOffset != -1) {
+- consumeQueue.setBaseOffset(this.dispatchOffset * TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
++ if (!consumeQueue.isInitialized() && this.dispatchOffset.get() != -1) {
++ consumeQueue.setBaseOffset(this.dispatchOffset.get() * TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+ }
+ }
+
+@@ -144,7 +146,7 @@ public class CompositeFlatFile implements CompositeAccess {
+ }
+
+ public long getDispatchOffset() {
+- return dispatchOffset;
++ return dispatchOffset.get();
+ }
+
+ @Override
+@@ -309,7 +311,7 @@ public class CompositeFlatFile implements CompositeAccess {
+ if (!consumeQueue.isInitialized()) {
+ consumeQueue.setBaseOffset(offset * TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+ }
+- dispatchOffset = offset;
++ dispatchOffset.set(offset);
+ }
+
+ @Override
+@@ -323,14 +325,9 @@ public class CompositeFlatFile implements CompositeAccess {
+ return AppendResult.FILE_CLOSED;
+ }
+
+- long queueOffset = MessageBufferUtil.getQueueOffset(message);
+- if (dispatchOffset != queueOffset) {
+- return AppendResult.OFFSET_INCORRECT;
+- }
+-
+ AppendResult result = commitLog.append(message, commit);
+ if (result == AppendResult.SUCCESS) {
+- dispatchOffset = queueOffset + 1;
++ dispatchOffset.incrementAndGet();
+ }
+ return result;
+ }
+@@ -483,14 +480,17 @@ public class CompositeFlatFile implements CompositeAccess {
+ }
+
+ public void destroy() {
+- closed = true;
+- commitLog.destroy();
+- consumeQueue.destroy();
+ try {
++ closed = true;
++ compositeFlatFileLock.lock();
++ commitLog.destroy();
++ consumeQueue.destroy();
+ metadataStore.deleteFileSegment(filePath, FileSegmentType.COMMIT_LOG);
+ metadataStore.deleteFileSegment(filePath, FileSegmentType.CONSUME_QUEUE);
+ } catch (Exception e) {
+- LOGGER.error("CompositeFlatFile#destroy: clean metadata failed: ", e);
++ LOGGER.error("CompositeFlatFile#destroy: delete file failed", e);
++ } finally {
++ compositeFlatFileLock.unlock();
+ }
+ }
+ }
+diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
+index c0cf79069..f6c0afed0 100644
+--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
+@@ -64,7 +64,7 @@ public class CompositeQueueFlatFile extends CompositeFlatFile {
+ if (queueMetadata.getMaxOffset() < queueMetadata.getMinOffset()) {
+ queueMetadata.setMaxOffset(queueMetadata.getMinOffset());
+ }
+- this.dispatchOffset = queueMetadata.getMaxOffset();
++ this.dispatchOffset.set(queueMetadata.getMaxOffset());
+ }
+
+ public void persistMetadata() {
+diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
+index 9735a535e..8322c72ed 100644
+--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
+@@ -73,8 +73,8 @@ public class CompositeQueueFlatFileTest {
+ CompositeQueueFlatFile flatFile = new CompositeQueueFlatFile(tieredFileAllocator, mq);
+ ByteBuffer message = MessageBufferUtilTest.buildMockedMessageBuffer();
+ AppendResult result = flatFile.appendCommitLog(message);
+- Assert.assertEquals(AppendResult.OFFSET_INCORRECT, result);
+- Assert.assertEquals(0L, flatFile.commitLog.getFlatFile().getFileToWrite().getAppendPosition());
++ Assert.assertEquals(AppendResult.SUCCESS, result);
++ Assert.assertEquals(122L, flatFile.commitLog.getFlatFile().getFileToWrite().getAppendPosition());
+ Assert.assertEquals(0L, flatFile.commitLog.getFlatFile().getFileToWrite().getCommitPosition());
+
+ flatFile = new CompositeQueueFlatFile(tieredFileAllocator, mq);
+--
+2.32.0.windows.2
+
+
+From 70a66eda2c08eb5fca38356659cb6de1ac75e25e Mon Sep 17 00:00:00 2001
+From: ShuangxiDing <dingshuangxi888@gmail.com>
+Date: Fri, 14 Jul 2023 16:46:40 +0800
+Subject: [PATCH 6/8] Fix LEAK: HAProxyMessage.release() was not called before
+ it's garbage-collected (#7025)
+MIME-Version: 1.0
+Content-Type: text/plain; charset=UTF-8
+Content-Transfer-Encoding: 8bit
+
+Call HAProxyMessage.release() after reading it.
+
+---------
+
+Co-authored-by: 徒钟 <shuangxi.dsx@alibaba-inc.com>
+---
+ .../grpc/ProxyAndTlsProtocolNegotiator.java | 52 ++++++++++---------
+ .../remoting/netty/NettyRemotingServer.java | 46 ++++++++--------
+ 2 files changed, 53 insertions(+), 45 deletions(-)
+
+diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
+index ceb9becc0..ee167bd7b 100644
+--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
+@@ -160,7 +160,7 @@ public class ProxyAndTlsProtocolNegotiator implements InternalProtocolNegotiator
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ if (msg instanceof HAProxyMessage) {
+- replaceEventWithMessage((HAProxyMessage) msg);
++ handleWithMessage((HAProxyMessage) msg);
+ ctx.fireUserEventTriggered(pne);
+ } else {
+ super.channelRead(ctx, msg);
+@@ -174,30 +174,34 @@ public class ProxyAndTlsProtocolNegotiator implements InternalProtocolNegotiator
+ *
+ * @param msg
+ */
+- private void replaceEventWithMessage(HAProxyMessage msg) {
+- Attributes.Builder builder = InternalProtocolNegotiationEvent.getAttributes(pne).toBuilder();
+- if (StringUtils.isNotBlank(msg.sourceAddress())) {
+- builder.set(AttributeKeys.PROXY_PROTOCOL_ADDR, msg.sourceAddress());
+- }
+- if (msg.sourcePort() > 0) {
+- builder.set(AttributeKeys.PROXY_PROTOCOL_PORT, String.valueOf(msg.sourcePort()));
+- }
+- if (StringUtils.isNotBlank(msg.destinationAddress())) {
+- builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR, msg.destinationAddress());
+- }
+- if (msg.destinationPort() > 0) {
+- builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT, String.valueOf(msg.destinationPort()));
+- }
+- if (CollectionUtils.isNotEmpty(msg.tlvs())) {
+- msg.tlvs().forEach(tlv -> {
+- Attributes.Key<String> key = AttributeKeys.valueOf(
+- HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue()));
+- String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
+- builder.set(key, value);
+- });
++ private void handleWithMessage(HAProxyMessage msg) {
++ try {
++ Attributes.Builder builder = InternalProtocolNegotiationEvent.getAttributes(pne).toBuilder();
++ if (StringUtils.isNotBlank(msg.sourceAddress())) {
++ builder.set(AttributeKeys.PROXY_PROTOCOL_ADDR, msg.sourceAddress());
++ }
++ if (msg.sourcePort() > 0) {
++ builder.set(AttributeKeys.PROXY_PROTOCOL_PORT, String.valueOf(msg.sourcePort()));
++ }
++ if (StringUtils.isNotBlank(msg.destinationAddress())) {
++ builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR, msg.destinationAddress());
++ }
++ if (msg.destinationPort() > 0) {
++ builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT, String.valueOf(msg.destinationPort()));
++ }
++ if (CollectionUtils.isNotEmpty(msg.tlvs())) {
++ msg.tlvs().forEach(tlv -> {
++ Attributes.Key<String> key = AttributeKeys.valueOf(
++ HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue()));
++ String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
++ builder.set(key, value);
++ });
++ }
++ pne = InternalProtocolNegotiationEvent
++ .withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build());
++ } finally {
++ msg.release();
+ }
+- pne = InternalProtocolNegotiationEvent
+- .withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build());
+ }
+ }
+
+diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+index 8ae87a6fa..90e358ce3 100644
+--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+@@ -758,7 +758,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ if (msg instanceof HAProxyMessage) {
+- fillChannelWithMessage((HAProxyMessage) msg, ctx.channel());
++ handleWithMessage((HAProxyMessage) msg, ctx.channel());
+ } else {
+ super.channelRead(ctx, msg);
+ }
+@@ -771,26 +771,30 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
+ * @param msg
+ * @param channel
+ */
+- private void fillChannelWithMessage(HAProxyMessage msg, Channel channel) {
+- if (StringUtils.isNotBlank(msg.sourceAddress())) {
+- channel.attr(AttributeKeys.PROXY_PROTOCOL_ADDR).set(msg.sourceAddress());
+- }
+- if (msg.sourcePort() > 0) {
+- channel.attr(AttributeKeys.PROXY_PROTOCOL_PORT).set(String.valueOf(msg.sourcePort()));
+- }
+- if (StringUtils.isNotBlank(msg.destinationAddress())) {
+- channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR).set(msg.destinationAddress());
+- }
+- if (msg.destinationPort() > 0) {
+- channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT).set(String.valueOf(msg.destinationPort()));
+- }
+- if (CollectionUtils.isNotEmpty(msg.tlvs())) {
+- msg.tlvs().forEach(tlv -> {
+- AttributeKey<String> key = AttributeKeys.valueOf(
+- HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue()));
+- String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
+- channel.attr(key).set(value);
+- });
++ private void handleWithMessage(HAProxyMessage msg, Channel channel) {
++ try {
++ if (StringUtils.isNotBlank(msg.sourceAddress())) {
++ channel.attr(AttributeKeys.PROXY_PROTOCOL_ADDR).set(msg.sourceAddress());
++ }
++ if (msg.sourcePort() > 0) {
++ channel.attr(AttributeKeys.PROXY_PROTOCOL_PORT).set(String.valueOf(msg.sourcePort()));
++ }
++ if (StringUtils.isNotBlank(msg.destinationAddress())) {
++ channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR).set(msg.destinationAddress());
++ }
++ if (msg.destinationPort() > 0) {
++ channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT).set(String.valueOf(msg.destinationPort()));
++ }
++ if (CollectionUtils.isNotEmpty(msg.tlvs())) {
++ msg.tlvs().forEach(tlv -> {
++ AttributeKey<String> key = AttributeKeys.valueOf(
++ HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue()));
++ String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
++ channel.attr(key).set(value);
++ });
++ }
++ } finally {
++ msg.release();
+ }
+ }
+ }
+--
+2.32.0.windows.2
+
+
+From 5914ff8dbb9d37e2cb48ef9c4f0256c6185b4659 Mon Sep 17 00:00:00 2001
+From: lyx <56945247+lyx2000@users.noreply.github.com>
+Date: Sat, 15 Jul 2023 18:15:57 +0800
+Subject: [PATCH 7/8] [ISSUE #6968] fix grpc acl bug (#6969)
+
+* feat(acl): fix acl bug
+
+Signed-off-by: lyx <1419360299@qq.com>
+
+# Conflicts:
+# proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
+
+* add access test for two client
+
+Signed-off-by: lyx <1419360299@qq.com>
+
+* use specific acl config
+
+Signed-off-by: lyx <1419360299@qq.com>
+
+* Recovering unchange file
+
+Signed-off-by: lyx <1419360299@qq.com>
+
+* let test pass
+
+Signed-off-by: lyx <1419360299@qq.com>
+
+---------
+
+Signed-off-by: lyx <1419360299@qq.com>
+---
+ .../acl/plain/PlainAccessResource.java | 21 +-
+ .../acl/RemotingClientAccessTest.java | 189 ++++++++++++++++++
+ .../access_acl_conf/acl/plain_acl.yml | 31 +++
+ acl/src/test/resources/conf/acl/plain_acl.yml | 1 -
+ pom.xml | 1 +
+ .../apache/rocketmq/proxy/ProxyStartup.java | 17 +-
+ .../proxy/grpc/GrpcServerBuilder.java | 10 +-
+ .../remoting/RemotingProtocolServer.java | 15 +-
+ 8 files changed, 255 insertions(+), 30 deletions(-)
+ create mode 100644 acl/src/test/java/org/apache/rocketmq/acl/RemotingClientAccessTest.java
+ create mode 100644 acl/src/test/resources/access_acl_conf/acl/plain_acl.yml
+
+diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
+index cdbd9ea9b..72aa8ca71 100644
+--- a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
++++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
+@@ -223,7 +223,7 @@ public class PlainAccessResource implements AccessResource {
+ if (!request.hasGroup()) {
+ throw new AclException("Consumer heartbeat doesn't have group");
+ } else {
+- accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB);
++ accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB);
+ }
+ }
+ } else if (SendMessageRequest.getDescriptor().getFullName().equals(rpcFullName)) {
+@@ -240,15 +240,15 @@ public class PlainAccessResource implements AccessResource {
+ accessResource.addResourceAndPerm(topic, Permission.PUB);
+ } else if (ReceiveMessageRequest.getDescriptor().getFullName().equals(rpcFullName)) {
+ ReceiveMessageRequest request = (ReceiveMessageRequest) messageV3;
+- accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB);
++ accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB);
+ accessResource.addResourceAndPerm(request.getMessageQueue().getTopic(), Permission.SUB);
+ } else if (AckMessageRequest.getDescriptor().getFullName().equals(rpcFullName)) {
+ AckMessageRequest request = (AckMessageRequest) messageV3;
+- accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB);
++ accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB);
+ accessResource.addResourceAndPerm(request.getTopic(), Permission.SUB);
+ } else if (ForwardMessageToDeadLetterQueueRequest.getDescriptor().getFullName().equals(rpcFullName)) {
+ ForwardMessageToDeadLetterQueueRequest request = (ForwardMessageToDeadLetterQueueRequest) messageV3;
+- accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB);
++ accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB);
+ accessResource.addResourceAndPerm(request.getTopic(), Permission.SUB);
+ } else if (EndTransactionRequest.getDescriptor().getFullName().equals(rpcFullName)) {
+ EndTransactionRequest request = (EndTransactionRequest) messageV3;
+@@ -264,7 +264,7 @@ public class PlainAccessResource implements AccessResource {
+ }
+ if (command.getSettings().hasSubscription()) {
+ Subscription subscription = command.getSettings().getSubscription();
+- accessResource.addResourceAndPerm(subscription.getGroup(), Permission.SUB);
++ accessResource.addGroupResourceAndPerm(subscription.getGroup(), Permission.SUB);
+ for (SubscriptionEntry entry : subscription.getSubscriptionsList()) {
+ accessResource.addResourceAndPerm(entry.getTopic(), Permission.SUB);
+ }
+@@ -275,17 +275,17 @@ public class PlainAccessResource implements AccessResource {
+ }
+ } else if (NotifyClientTerminationRequest.getDescriptor().getFullName().equals(rpcFullName)) {
+ NotifyClientTerminationRequest request = (NotifyClientTerminationRequest) messageV3;
+- accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB);
++ accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB);
+ } else if (QueryRouteRequest.getDescriptor().getFullName().equals(rpcFullName)) {
+ QueryRouteRequest request = (QueryRouteRequest) messageV3;
+ accessResource.addResourceAndPerm(request.getTopic(), Permission.ANY);
+ } else if (QueryAssignmentRequest.getDescriptor().getFullName().equals(rpcFullName)) {
+ QueryAssignmentRequest request = (QueryAssignmentRequest) messageV3;
+- accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB);
++ accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB);
+ accessResource.addResourceAndPerm(request.getTopic(), Permission.SUB);
+ } else if (ChangeInvisibleDurationRequest.getDescriptor().getFullName().equals(rpcFullName)) {
+ ChangeInvisibleDurationRequest request = (ChangeInvisibleDurationRequest) messageV3;
+- accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB);
++ accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB);
+ accessResource.addResourceAndPerm(request.getTopic(), Permission.SUB);
+ }
+ } catch (Throwable t) {
+@@ -299,6 +299,11 @@ public class PlainAccessResource implements AccessResource {
+ addResourceAndPerm(resourceName, permission);
+ }
+
++ private void addGroupResourceAndPerm(Resource resource, byte permission) {
++ String resourceName = NamespaceUtil.wrapNamespace(resource.getResourceNamespace(), resource.getName());
++ addResourceAndPerm(getRetryTopic(resourceName), permission);
++ }
++
+ public static PlainAccessResource build(PlainAccessConfig plainAccessConfig, RemoteAddressStrategy remoteAddressStrategy) {
+ PlainAccessResource plainAccessResource = new PlainAccessResource();
+ plainAccessResource.setAccessKey(plainAccessConfig.getAccessKey());
+diff --git a/acl/src/test/java/org/apache/rocketmq/acl/RemotingClientAccessTest.java b/acl/src/test/java/org/apache/rocketmq/acl/RemotingClientAccessTest.java
+new file mode 100644
+index 000000000..88c5e09a9
+--- /dev/null
++++ b/acl/src/test/java/org/apache/rocketmq/acl/RemotingClientAccessTest.java
+@@ -0,0 +1,189 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.rocketmq.acl;
++
++import java.io.File;
++import java.io.IOException;
++import java.nio.ByteBuffer;
++import org.apache.rocketmq.acl.common.AclClientRPCHook;
++import org.apache.rocketmq.acl.common.AclException;
++import org.apache.rocketmq.acl.common.SessionCredentials;
++import org.apache.rocketmq.acl.plain.AclTestHelper;
++import org.apache.rocketmq.acl.plain.PlainAccessResource;
++import org.apache.rocketmq.acl.plain.PlainAccessValidator;
++import org.apache.rocketmq.remoting.exception.RemotingCommandException;
++import org.apache.rocketmq.remoting.protocol.RemotingCommand;
++import org.apache.rocketmq.remoting.protocol.RequestCode;
++import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
++import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
++import org.junit.After;
++import org.junit.Assert;
++import org.junit.Before;
++import org.junit.Test;
++
++public class RemotingClientAccessTest {
++
++ private PlainAccessValidator plainAccessValidator;
++ private AclClientRPCHook aclClient;
++ private SessionCredentials sessionCredentials;
++
++ private File confHome;
++
++ private String clientAddress = "10.7.1.3";
++
++ @Before
++ public void init() throws IOException {
++ String folder = "access_acl_conf";
++ confHome = AclTestHelper.copyResources(folder, true);
++ System.setProperty("rocketmq.home.dir", confHome.getAbsolutePath());
++ System.setProperty("rocketmq.acl.plain.file", "/access_acl_conf/acl/plain_acl.yml".replace("/", File.separator));
++
++ plainAccessValidator = new PlainAccessValidator();
++ sessionCredentials = new SessionCredentials();
++ sessionCredentials.setAccessKey("rocketmq3");
++ sessionCredentials.setSecretKey("12345678");
++ aclClient = new AclClientRPCHook(sessionCredentials);
++ }
++
++ @After
++ public void cleanUp() {
++ AclTestHelper.recursiveDelete(confHome);
++ }
++
++ @Test(expected = AclException.class)
++ public void testProduceDenyTopic() {
++ SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
++ messageRequestHeader.setTopic("topicD");
++ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
++ aclClient.doBeforeRequest(clientAddress, remotingCommand);
++
++ ByteBuffer buf = remotingCommand.encodeHeader();
++ buf.getInt();
++ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
++ buf.position(0);
++ try {
++ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), clientAddress);
++ plainAccessValidator.validate(accessResource);
++ } catch (RemotingCommandException e) {
++ e.printStackTrace();
++ Assert.fail("Should not throw IOException");
++ }
++ }
++
++ @Test
++ public void testProduceAuthorizedTopic() {
++ SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
++ messageRequestHeader.setTopic("topicA");
++ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
++ aclClient.doBeforeRequest(clientAddress, remotingCommand);
++
++ ByteBuffer buf = remotingCommand.encodeHeader();
++ buf.getInt();
++ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
++ buf.position(0);
++ try {
++ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), clientAddress);
++ plainAccessValidator.validate(accessResource);
++ } catch (RemotingCommandException e) {
++ e.printStackTrace();
++ Assert.fail("Should not throw IOException");
++ }
++ }
++
++
++ @Test(expected = AclException.class)
++ public void testConsumeDenyTopic() {
++ PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader();
++ pullMessageRequestHeader.setTopic("topicD");
++ pullMessageRequestHeader.setConsumerGroup("groupB");
++ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader);
++ aclClient.doBeforeRequest("", remotingCommand);
++ ByteBuffer buf = remotingCommand.encodeHeader();
++ buf.getInt();
++ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
++ buf.position(0);
++ try {
++ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
++ plainAccessValidator.validate(accessResource);
++ } catch (RemotingCommandException e) {
++ e.printStackTrace();
++ Assert.fail("Should not throw IOException");
++ }
++
++ }
++
++ @Test
++ public void testConsumeAuthorizedTopic() {
++ PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader();
++ pullMessageRequestHeader.setTopic("topicB");
++ pullMessageRequestHeader.setConsumerGroup("groupB");
++ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader);
++ aclClient.doBeforeRequest("", remotingCommand);
++ ByteBuffer buf = remotingCommand.encodeHeader();
++ buf.getInt();
++ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
++ buf.position(0);
++ try {
++ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
++ plainAccessValidator.validate(accessResource);
++ } catch (RemotingCommandException e) {
++ e.printStackTrace();
++ Assert.fail("Should not throw IOException");
++ }
++ }
++
++ @Test(expected = AclException.class)
++ public void testConsumeInDeniedGroup() {
++ PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader();
++ pullMessageRequestHeader.setTopic("topicB");
++ pullMessageRequestHeader.setConsumerGroup("groupD");
++ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader);
++ aclClient.doBeforeRequest("", remotingCommand);
++ ByteBuffer buf = remotingCommand.encodeHeader();
++ buf.getInt();
++ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
++ buf.position(0);
++ try {
++ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
++ plainAccessValidator.validate(accessResource);
++ } catch (RemotingCommandException e) {
++ e.printStackTrace();
++ Assert.fail("Should not throw IOException");
++ }
++ }
++
++ @Test
++ public void testConsumeInAuthorizedGroup() {
++ PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader();
++ pullMessageRequestHeader.setTopic("topicB");
++ pullMessageRequestHeader.setConsumerGroup("groupB");
++ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader);
++ aclClient.doBeforeRequest("", remotingCommand);
++ ByteBuffer buf = remotingCommand.encodeHeader();
++ buf.getInt();
++ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
++ buf.position(0);
++ try {
++ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
++ plainAccessValidator.validate(accessResource);
++ } catch (RemotingCommandException e) {
++ e.printStackTrace();
++ Assert.fail("Should not throw IOException");
++ }
++ }
++
++}
+diff --git a/acl/src/test/resources/access_acl_conf/acl/plain_acl.yml b/acl/src/test/resources/access_acl_conf/acl/plain_acl.yml
+new file mode 100644
+index 000000000..28a8c4888
+--- /dev/null
++++ b/acl/src/test/resources/access_acl_conf/acl/plain_acl.yml
+@@ -0,0 +1,31 @@
++# Licensed to the Apache Software Foundation (ASF) under one or more
++# contributor license agreements. See the NOTICE file distributed with
++# this work for additional information regarding copyright ownership.
++# The ASF licenses this file to You under the Apache License, Version 2.0
++# (the "License"); you may not use this file except in compliance with
++# the License. You may obtain a copy of the License at
++#
++# http://www.apache.org/licenses/LICENSE-2.0
++#
++# Unless required by applicable law or agreed to in writing, software
++# distributed under the License is distributed on an "AS IS" BASIS,
++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++# See the License for the specific language governing permissions and
++# limitations under the License.
++
++accounts:
++ - accessKey: rocketmq3
++ secretKey: 12345678
++ admin: false
++ defaultTopicPerm: DENY
++ defaultGroupPerm: DENY
++ topicPerms:
++ - topicA=PUB
++ - topicB=SUB
++ - topicC=PUB|SUB
++ - topicD=DENY
++ groupPerms:
++ - groupB=SUB
++ - groupC=PUB|SUB
++ - groupD=DENY
++
+diff --git a/acl/src/test/resources/conf/acl/plain_acl.yml b/acl/src/test/resources/conf/acl/plain_acl.yml
+index 5641a94bf..34e46696d 100644
+--- a/acl/src/test/resources/conf/acl/plain_acl.yml
++++ b/acl/src/test/resources/conf/acl/plain_acl.yml
+@@ -41,4 +41,3 @@ accounts:
+ whiteRemoteAddress: 192.168.1.*
+ # if it is admin, it could access all resources
+ admin: true
+-
+diff --git a/pom.xml b/pom.xml
+index 12bc2dbd5..4d5dd1dec 100644
+--- a/pom.xml
++++ b/pom.xml
+@@ -147,6 +147,7 @@
+ <awaitility.version>4.1.0</awaitility.version>
+ <truth.version>0.30</truth.version>
+ <s3mock-junit4.version>2.11.0</s3mock-junit4.version>
++ <rocketmq-client-java.version>5.0.5</rocketmq-client-java.version>
+
+ <!-- Build plugin dependencies -->
+ <versions-maven-plugin.version>2.2</versions-maven-plugin.version>
+diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
+index ea13bb808..06d5f4525 100644
+--- a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
+@@ -29,11 +29,14 @@ import org.apache.commons.cli.DefaultParser;
+ import org.apache.commons.cli.Option;
+ import org.apache.commons.cli.Options;
+ import org.apache.commons.lang3.StringUtils;
++import org.apache.rocketmq.acl.AccessValidator;
++import org.apache.rocketmq.acl.plain.PlainAccessValidator;
+ import org.apache.rocketmq.broker.BrokerController;
+ import org.apache.rocketmq.broker.BrokerStartup;
+ import org.apache.rocketmq.common.MixAll;
+ import org.apache.rocketmq.common.constant.LoggerName;
+ import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
++import org.apache.rocketmq.common.utils.ServiceProvider;
+ import org.apache.rocketmq.logging.org.slf4j.Logger;
+ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+ import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
+@@ -75,16 +78,17 @@ public class ProxyStartup {
+
+ MessagingProcessor messagingProcessor = createMessagingProcessor();
+
++ List<AccessValidator> accessValidators = loadAccessValidators();
+ // create grpcServer
+ GrpcServer grpcServer = GrpcServerBuilder.newBuilder(executor, ConfigurationManager.getProxyConfig().getGrpcServerPort())
+ .addService(createServiceProcessor(messagingProcessor))
+ .addService(ChannelzService.newInstance(100))
+ .addService(ProtoReflectionService.newInstance())
+- .configInterceptor()
++ .configInterceptor(accessValidators)
+ .build();
+ PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(grpcServer);
+
+- RemotingProtocolServer remotingServer = new RemotingProtocolServer(messagingProcessor);
++ RemotingProtocolServer remotingServer = new RemotingProtocolServer(messagingProcessor, accessValidators);
+ PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(remotingServer);
+
+ // start servers one by one.
+@@ -109,6 +113,15 @@ public class ProxyStartup {
+ log.info(new Date() + " rocketmq-proxy startup successfully");
+ }
+
++ protected static List<AccessValidator> loadAccessValidators() {
++ List<AccessValidator> accessValidators = ServiceProvider.load(AccessValidator.class);
++ if (accessValidators.isEmpty()) {
++ log.info("ServiceProvider loaded no AccessValidator, using default org.apache.rocketmq.acl.plain.PlainAccessValidator");
++ accessValidators.add(new PlainAccessValidator());
++ }
++ return accessValidators;
++ }
++
+ protected static void initConfiguration(CommandLineArgument commandLineArgument) throws Exception {
+ if (StringUtils.isNotBlank(commandLineArgument.getProxyConfigPath())) {
+ System.setProperty(Configuration.CONFIG_PATH_PROPERTY, commandLineArgument.getProxyConfigPath());
+diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
+index 437b9216b..9cddd3013 100644
+--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
+@@ -28,9 +28,7 @@ import java.util.List;
+ import java.util.concurrent.ThreadPoolExecutor;
+ import java.util.concurrent.TimeUnit;
+ import org.apache.rocketmq.acl.AccessValidator;
+-import org.apache.rocketmq.acl.plain.PlainAccessValidator;
+ import org.apache.rocketmq.common.constant.LoggerName;
+-import org.apache.rocketmq.common.utils.ServiceProvider;
+ import org.apache.rocketmq.logging.org.slf4j.Logger;
+ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+ import org.apache.rocketmq.proxy.config.ConfigurationManager;
+@@ -98,14 +96,8 @@ public class GrpcServerBuilder {
+ return new GrpcServer(this.serverBuilder.build());
+ }
+
+- public GrpcServerBuilder configInterceptor() {
++ public GrpcServerBuilder configInterceptor(List<AccessValidator> accessValidators) {
+ // grpc interceptors, including acl, logging etc.
+- List<AccessValidator> accessValidators = ServiceProvider.load(AccessValidator.class);
+- if (accessValidators.isEmpty()) {
+- log.info("ServiceProvider loaded no AccessValidator, using default org.apache.rocketmq.acl.plain.PlainAccessValidator");
+- accessValidators.add(new PlainAccessValidator());
+- }
+-
+ this.serverBuilder.intercept(new AuthenticationInterceptor(accessValidators));
+
+ this.serverBuilder
+diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
+index f08094c16..bcc9edd09 100644
+--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
+@@ -19,7 +19,6 @@ package org.apache.rocketmq.proxy.remoting;
+
+ import com.google.common.util.concurrent.ThreadFactoryBuilder;
+ import io.netty.channel.Channel;
+-import java.util.ArrayList;
+ import java.util.List;
+ import java.util.concurrent.BlockingQueue;
+ import java.util.concurrent.CompletableFuture;
+@@ -28,15 +27,14 @@ import java.util.concurrent.ScheduledExecutorService;
+ import java.util.concurrent.ThreadPoolExecutor;
+ import java.util.concurrent.TimeUnit;
+ import org.apache.rocketmq.acl.AccessValidator;
+-import org.apache.rocketmq.acl.plain.PlainAccessValidator;
+ import org.apache.rocketmq.client.exception.MQClientException;
+ import org.apache.rocketmq.common.constant.LoggerName;
+ import org.apache.rocketmq.common.future.FutureTaskExt;
+ import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
+ import org.apache.rocketmq.common.thread.ThreadPoolStatusMonitor;
++import org.apache.rocketmq.common.utils.StartAndShutdown;
+ import org.apache.rocketmq.logging.org.slf4j.Logger;
+ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+-import org.apache.rocketmq.common.utils.StartAndShutdown;
+ import org.apache.rocketmq.proxy.config.ConfigurationManager;
+ import org.apache.rocketmq.proxy.config.ProxyConfig;
+ import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+@@ -86,11 +84,11 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu
+ protected final ThreadPoolExecutor defaultExecutor;
+ protected final ScheduledExecutorService timerExecutor;
+
+- public RemotingProtocolServer(MessagingProcessor messagingProcessor) {
++ public RemotingProtocolServer(MessagingProcessor messagingProcessor, List<AccessValidator> accessValidators) {
+ this.messagingProcessor = messagingProcessor;
+ this.remotingChannelManager = new RemotingChannelManager(this, messagingProcessor.getProxyRelayService());
+
+- RequestPipeline pipeline = createRequestPipeline();
++ RequestPipeline pipeline = createRequestPipeline(accessValidators);
+ this.getTopicRouteActivity = new GetTopicRouteActivity(pipeline, messagingProcessor);
+ this.clientManagerActivity = new ClientManagerActivity(pipeline, messagingProcessor, remotingChannelManager);
+ this.consumerManagerActivity = new ConsumerManagerActivity(pipeline, messagingProcessor);
+@@ -254,15 +252,12 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu
+ return future;
+ }
+
+- protected RequestPipeline createRequestPipeline() {
++ protected RequestPipeline createRequestPipeline(List<AccessValidator> accessValidators) {
+ RequestPipeline pipeline = (ctx, request, context) -> {
+ };
+-
+- List<AccessValidator> accessValidatorList = new ArrayList<>();
+- accessValidatorList.add(new PlainAccessValidator());
+ // add pipeline
+ // the last pipe add will execute at the first
+- return pipeline.pipe(new AuthenticationPipeline(accessValidatorList));
++ return pipeline.pipe(new AuthenticationPipeline(accessValidators));
+ }
+
+ protected class ThreadPoolHeadSlowTimeMillsMonitor implements ThreadPoolStatusMonitor {
+--
+2.32.0.windows.2
+
+
+From 440be1ed4ce2af0ab58af6c3019de7075c09c20f Mon Sep 17 00:00:00 2001
+From: fuyou001 <yubao.fyb@alibaba-inc.com>
+Date: Mon, 17 Jul 2023 19:23:23 +0800
+Subject: [PATCH 8/8] [ISSUE #7031] fix Pop caused broker memory leak bug
+ (#7032)
+
+---
+ .../broker/processor/PopBufferMergeService.java | 17 ++++++++++++++++-
+ .../broker/processor/PopMessageProcessor.java | 11 ++++++-----
+ 2 files changed, 22 insertions(+), 6 deletions(-)
+
+diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
+index d7bc7c694..b7ba8ad4a 100644
+--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
++++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
+@@ -429,9 +429,16 @@ public class PopBufferMergeService extends ServiceThread {
+ * @param nextBeginOffset
+ * @return
+ */
+- public void addCkJustOffset(PopCheckPoint point, int reviveQueueId, long reviveQueueOffset, long nextBeginOffset) {
++ public boolean addCkJustOffset(PopCheckPoint point, int reviveQueueId, long reviveQueueOffset, long nextBeginOffset) {
+ PopCheckPointWrapper pointWrapper = new PopCheckPointWrapper(reviveQueueId, reviveQueueOffset, point, nextBeginOffset, true);
+
++ if (this.buffer.containsKey(pointWrapper.getMergeKey())) {
++ // when mergeKey conflict
++ // will cause PopBufferMergeService.scanCommitOffset cannot poll PopCheckPointWrapper
++ POP_LOGGER.warn("[PopBuffer]mergeKey conflict when add ckJustOffset. ck:{}, mergeKey:{}", pointWrapper, pointWrapper.getMergeKey());
++ return false;
++ }
++
+ this.putCkToStore(pointWrapper, !checkQueueOk(pointWrapper));
+
+ putOffsetQueue(pointWrapper);
+@@ -440,6 +447,7 @@ public class PopBufferMergeService extends ServiceThread {
+ if (brokerController.getBrokerConfig().isEnablePopLog()) {
+ POP_LOGGER.info("[PopBuffer]add ck just offset, {}", pointWrapper);
+ }
++ return true;
+ }
+
+ public void addCkMock(String group, String topic, int queueId, long startOffset, long invisibleTime,
+@@ -492,6 +500,13 @@ public class PopBufferMergeService extends ServiceThread {
+ return false;
+ }
+
++ if (this.buffer.containsKey(pointWrapper.getMergeKey())) {
++ // when mergeKey conflict
++ // will cause PopBufferMergeService.scanCommitOffset cannot poll PopCheckPointWrapper
++ POP_LOGGER.warn("[PopBuffer]mergeKey conflict when add ck. ck:{}, mergeKey:{}", pointWrapper, pointWrapper.getMergeKey());
++ return false;
++ }
++
+ putOffsetQueue(pointWrapper);
+ this.buffer.put(pointWrapper.getMergeKey(), pointWrapper);
+ this.counter.incrementAndGet();
+diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+index 28549bfed..464f8f4fd 100644
+--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
++++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+@@ -570,7 +570,9 @@ public class PopMessageProcessor implements NettyRequestProcessor {
+ this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
+ requestHeader.getConsumerGroup(), topic, queueId, finalOffset);
+ } else {
+- appendCheckPoint(requestHeader, topic, reviveQid, queueId, finalOffset, result, popTime, this.brokerController.getBrokerConfig().getBrokerName());
++ if (!appendCheckPoint(requestHeader, topic, reviveQid, queueId, finalOffset, result, popTime, this.brokerController.getBrokerConfig().getBrokerName())) {
++ return atomicRestNum.get() + result.getMessageCount();
++ }
+ }
+ ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, isRetry, queueId, finalOffset);
+ ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, isRetry, queueId,
+@@ -685,7 +687,7 @@ public class PopMessageProcessor implements NettyRequestProcessor {
+ return msgInner;
+ }
+
+- private void appendCheckPoint(final PopMessageRequestHeader requestHeader,
++ private boolean appendCheckPoint(final PopMessageRequestHeader requestHeader,
+ final String topic, final int reviveQid, final int queueId, final long offset,
+ final GetMessageResult getMessageTmpResult, final long popTime, final String brokerName) {
+ // add check point msg to revive log
+@@ -708,10 +710,9 @@ public class PopMessageProcessor implements NettyRequestProcessor {
+ );
+
+ if (addBufferSuc) {
+- return;
++ return true;
+ }
+-
+- this.popBufferMergeService.addCkJustOffset(
++ return this.popBufferMergeService.addCkJustOffset(
+ ck, reviveQid, -1, getMessageTmpResult.getNextBeginOffset()
+ );
+ }
+--
+2.32.0.windows.2
+