diff options
Diffstat (limited to 'patch005-backport-fix-some-bugs.patch')
-rw-r--r-- | patch005-backport-fix-some-bugs.patch | 1538 |
1 files changed, 1538 insertions, 0 deletions
diff --git a/patch005-backport-fix-some-bugs.patch b/patch005-backport-fix-some-bugs.patch new file mode 100644 index 0000000..a75556f --- /dev/null +++ b/patch005-backport-fix-some-bugs.patch @@ -0,0 +1,1538 @@ +From 15c6889bb0abd014c06ef1452f791db9daa1ea08 Mon Sep 17 00:00:00 2001 +From: lizhimins <707364882@qq.com> +Date: Tue, 11 Jul 2023 17:04:00 +0800 +Subject: [PATCH 1/8] fix receive message activity attempt id not correct + (#7012) + +fix receive message activity attempt id not correct +--- + .../proxy/grpc/v2/consumer/ReceiveMessageActivity.java | 2 +- + .../proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java | 6 +++--- + 2 files changed, 4 insertions(+), 4 deletions(-) + +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java +index a504179a9..cf58bb87a 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java +@@ -130,7 +130,7 @@ public class ReceiveMessageActivity extends AbstractMessingActivity { + subscriptionData, + fifo, + new PopMessageResultFilterImpl(maxAttempts), +- request.getAttemptId(), ++ request.hasAttemptId() ? request.getAttemptId() : null, + timeRemaining + ).thenAccept(popResult -> { + if (proxyConfig.isEnableProxyAutoRenew() && request.getAutoRenew()) { +diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java +index 2e562504a..7fd9a9ffd 100644 +--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java ++++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java +@@ -57,6 +57,7 @@ import static org.mockito.ArgumentMatchers.anyBoolean; + import static org.mockito.ArgumentMatchers.anyInt; + import static org.mockito.ArgumentMatchers.anyLong; + import static org.mockito.ArgumentMatchers.anyString; ++import static org.mockito.ArgumentMatchers.isNull; + import static org.mockito.Mockito.doNothing; + import static org.mockito.Mockito.mock; + import static org.mockito.Mockito.when; +@@ -89,7 +90,7 @@ public class ReceiveMessageActivityTest extends BaseActivityTest { + .setRequestTimeout(Durations.fromSeconds(3)) + .build()); + when(this.messagingProcessor.popMessage(any(), any(), anyString(), anyString(), anyInt(), anyLong(), +- pollTimeCaptor.capture(), anyInt(), any(), anyBoolean(), any(), anyString(), anyLong())) ++ pollTimeCaptor.capture(), anyInt(), any(), anyBoolean(), any(), isNull(), anyLong())) + .thenReturn(CompletableFuture.completedFuture(new PopResult(PopStatus.NO_NEW_MSG, Collections.emptyList()))); + + +@@ -223,7 +224,6 @@ public class ReceiveMessageActivityTest extends BaseActivityTest { + assertEquals(Code.ILLEGAL_INVISIBLE_TIME, getResponseCodeFromReceiveMessageResponseList(responseArgumentCaptor.getAllValues())); + } + +- + @Test + public void testReceiveMessage() { + StreamObserver<ReceiveMessageResponse> receiveStreamObserver = mock(ServerCallStreamObserver.class); +@@ -245,7 +245,7 @@ public class ReceiveMessageActivityTest extends BaseActivityTest { + any(), + anyBoolean(), + any(), +- anyString(), ++ isNull(), + anyLong())).thenReturn(CompletableFuture.completedFuture(popResult)); + + this.receiveMessageActivity.receiveMessage( +-- +2.32.0.windows.2 + + +From b4496be68705c1c0b282a07a1adeab4fffd670fe Mon Sep 17 00:00:00 2001 +From: ShuangxiDing <dingshuangxi888@gmail.com> +Date: Tue, 11 Jul 2023 19:09:09 +0800 +Subject: [PATCH 2/8] [ISSUE #7010] Fix the HandshakeHandler returns when + detect haproxy version need more data (#7011) +MIME-Version: 1.0 +Content-Type: text/plain; charset=UTF-8 +Content-Transfer-Encoding: 8bit + +* Support dynamic modification of grpc tls mode to improve the scalability of ProtocolNegotiator + +* Support dynamic modification of grpc tls mode to improve the scalability of ProtocolNegotiator + +* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator + +* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator + +* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator + +* [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator + +* Support proxy protocol for gRPC server. + +* Support proxy protocol for gRPC server. + +* Support proxy protocol for gRPC server. + +* Support proxy protocol for gRPC server. + +* Support proxy protocol for gRPC server. + +* Support proxy protocol for gRPC and Remoting server. + +* 回滚netty的升级 + +* Support proxy protocol for gRPC and Remoting server. + +* Support proxy protocol for gRPC and Remoting server. + +* Support proxy protocol for gRPC and Remoting server. + +* add grpc-netty-codec-haproxy in bazel + +* add grpc-netty-codec-haproxy in bazel + +* Support proxy protocol for gRPC and Remoting server. + +* Fix Test + +* add grpc-netty-codec-haproxy in bazel + +* add ProxyProtocolTest for Remoting + +* Move AttributeKey from RemotingHelper to AttributeKey. + +* Fix the needs more data for HandshakeHandler. + +* Fix the needs more data for HandshakeHandler. + +* Fix the needs more data for HandshakeHandler. + +* Fix the needs more data for HandshakeHandler. + +--------- + +Co-authored-by: 徒钟 <shuangxi.dsx@alibaba-inc.com> +--- + .../remoting/MultiProtocolRemotingServer.java | 2 +- + .../activity/AbstractRemotingActivity.java | 16 +++++++------ + .../activity/ClientManagerActivity.java | 24 ++++++++++--------- + .../AbstractRemotingActivityTest.java | 10 ++++---- + .../remoting/common/RemotingHelper.java | 21 ++++------------ + .../remoting/netty/AttributeKeys.java | 11 ++++++++- + .../remoting/netty/NettyRemotingServer.java | 23 ++++++------------ + 7 files changed, 51 insertions(+), 56 deletions(-) + +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java +index 858b1f022..12d728fff 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java +@@ -78,7 +78,7 @@ public class MultiProtocolRemotingServer extends NettyRemotingServer { + @Override + protected ChannelPipeline configChannel(SocketChannel ch) { + return ch.pipeline() +- .addLast(this.getDefaultEventExecutorGroup(), HANDSHAKE_HANDLER_NAME, this.getHandshakeHandler()) ++ .addLast(this.getDefaultEventExecutorGroup(), HANDSHAKE_HANDLER_NAME, new HandshakeHandler()) + .addLast(this.getDefaultEventExecutorGroup(), + new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), + new ProtocolNegotiationHandler(this.remotingProtocolHandler) +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java +index 78cd203ec..ce4a63397 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java +@@ -19,9 +19,6 @@ package org.apache.rocketmq.proxy.remoting.activity; + + import io.netty.channel.Channel; + import io.netty.channel.ChannelHandlerContext; +-import java.util.HashMap; +-import java.util.Map; +-import java.util.Optional; + import org.apache.rocketmq.acl.common.AclException; + import org.apache.rocketmq.client.exception.MQBrokerException; + import org.apache.rocketmq.client.exception.MQClientException; +@@ -41,11 +38,16 @@ import org.apache.rocketmq.proxy.processor.MessagingProcessor; + import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType; + import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; + import org.apache.rocketmq.remoting.common.RemotingHelper; ++import org.apache.rocketmq.remoting.netty.AttributeKeys; + import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; + import org.apache.rocketmq.remoting.protocol.RemotingCommand; + import org.apache.rocketmq.remoting.protocol.RequestCode; + import org.apache.rocketmq.remoting.protocol.ResponseCode; + ++import java.util.HashMap; ++import java.util.Map; ++import java.util.Optional; ++ + public abstract class AbstractRemotingActivity implements NettyRequestProcessor { + protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); + protected final MessagingProcessor messagingProcessor; +@@ -126,13 +128,13 @@ public abstract class AbstractRemotingActivity implements NettyRequestProcessor + .setProtocolType(ChannelProtocolType.REMOTING.getName()) + .setChannel(channel) + .setLocalAddress(NetworkUtil.socketAddress2String(ctx.channel().localAddress())) +- .setRemoteAddress(NetworkUtil.socketAddress2String(ctx.channel().remoteAddress())); ++ .setRemoteAddress(RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + +- Optional.ofNullable(RemotingHelper.getAttributeValue(RemotingHelper.LANGUAGE_CODE_KEY, channel)) ++ Optional.ofNullable(RemotingHelper.getAttributeValue(AttributeKeys.LANGUAGE_CODE_KEY, channel)) + .ifPresent(language -> context.setLanguage(language.name())); +- Optional.ofNullable(RemotingHelper.getAttributeValue(RemotingHelper.CLIENT_ID_KEY, channel)) ++ Optional.ofNullable(RemotingHelper.getAttributeValue(AttributeKeys.CLIENT_ID_KEY, channel)) + .ifPresent(context::setClientID); +- Optional.ofNullable(RemotingHelper.getAttributeValue(RemotingHelper.VERSION_KEY, channel)) ++ Optional.ofNullable(RemotingHelper.getAttributeValue(AttributeKeys.VERSION_KEY, channel)) + .ifPresent(version -> context.setClientVersion(MQVersion.getVersionDesc(version))); + + return context; +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java +index 1eb81ce92..c671593a3 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java +@@ -19,13 +19,20 @@ package org.apache.rocketmq.proxy.remoting.activity; + + import io.netty.channel.Channel; + import io.netty.channel.ChannelHandlerContext; +-import java.util.Set; + import org.apache.rocketmq.broker.client.ClientChannelInfo; + import org.apache.rocketmq.broker.client.ConsumerGroupEvent; + import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener; + import org.apache.rocketmq.broker.client.ProducerChangeListener; + import org.apache.rocketmq.broker.client.ProducerGroupEvent; ++import org.apache.rocketmq.proxy.common.ProxyContext; ++import org.apache.rocketmq.proxy.processor.MessagingProcessor; ++import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel; ++import org.apache.rocketmq.proxy.remoting.channel.RemotingChannelManager; ++import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; + import org.apache.rocketmq.remoting.common.RemotingHelper; ++import org.apache.rocketmq.remoting.exception.RemotingCommandException; ++import org.apache.rocketmq.remoting.netty.AttributeKeys; ++import org.apache.rocketmq.remoting.protocol.RemotingCommand; + import org.apache.rocketmq.remoting.protocol.RequestCode; + import org.apache.rocketmq.remoting.protocol.ResponseCode; + import org.apache.rocketmq.remoting.protocol.header.UnregisterClientRequestHeader; +@@ -33,13 +40,8 @@ import org.apache.rocketmq.remoting.protocol.header.UnregisterClientResponseHead + import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumerData; + import org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData; + import org.apache.rocketmq.remoting.protocol.heartbeat.ProducerData; +-import org.apache.rocketmq.proxy.common.ProxyContext; +-import org.apache.rocketmq.proxy.processor.MessagingProcessor; +-import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel; +-import org.apache.rocketmq.proxy.remoting.channel.RemotingChannelManager; +-import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; +-import org.apache.rocketmq.remoting.exception.RemotingCommandException; +-import org.apache.rocketmq.remoting.protocol.RemotingCommand; ++ ++import java.util.Set; + + public class ClientManagerActivity extends AbstractRemotingActivity { + +@@ -108,9 +110,9 @@ public class ClientManagerActivity extends AbstractRemotingActivity { + if (channel instanceof RemotingChannel) { + RemotingChannel remotingChannel = (RemotingChannel) channel; + Channel parent = remotingChannel.parent(); +- RemotingHelper.setPropertyToAttr(parent, RemotingHelper.CLIENT_ID_KEY, clientChannelInfo.getClientId()); +- RemotingHelper.setPropertyToAttr(parent, RemotingHelper.LANGUAGE_CODE_KEY, clientChannelInfo.getLanguage()); +- RemotingHelper.setPropertyToAttr(parent, RemotingHelper.VERSION_KEY, clientChannelInfo.getVersion()); ++ RemotingHelper.setPropertyToAttr(parent, AttributeKeys.CLIENT_ID_KEY, clientChannelInfo.getClientId()); ++ RemotingHelper.setPropertyToAttr(parent, AttributeKeys.LANGUAGE_CODE_KEY, clientChannelInfo.getLanguage()); ++ RemotingHelper.setPropertyToAttr(parent, AttributeKeys.VERSION_KEY, clientChannelInfo.getVersion()); + } + + } +diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java +index 663a83e3c..b2bd3a35f 100644 +--- a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java ++++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java +@@ -21,7 +21,6 @@ import io.netty.channel.Channel; + import io.netty.channel.ChannelFuture; + import io.netty.channel.ChannelHandlerContext; + import io.netty.channel.ChannelPromise; +-import java.util.concurrent.CompletableFuture; + import org.apache.rocketmq.acl.common.AclException; + import org.apache.rocketmq.client.exception.MQBrokerException; + import org.apache.rocketmq.client.exception.MQClientException; +@@ -35,6 +34,7 @@ import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType; + import org.apache.rocketmq.proxy.service.channel.SimpleChannel; + import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext; + import org.apache.rocketmq.remoting.common.RemotingHelper; ++import org.apache.rocketmq.remoting.netty.AttributeKeys; + import org.apache.rocketmq.remoting.protocol.LanguageCode; + import org.apache.rocketmq.remoting.protocol.RemotingCommand; + import org.apache.rocketmq.remoting.protocol.RequestCode; +@@ -48,6 +48,8 @@ import org.mockito.Mock; + import org.mockito.Spy; + import org.mockito.junit.MockitoJUnitRunner; + ++import java.util.concurrent.CompletableFuture; ++ + import static org.assertj.core.api.Assertions.assertThat; + import static org.mockito.ArgumentMatchers.any; + import static org.mockito.ArgumentMatchers.anyLong; +@@ -82,9 +84,9 @@ public class AbstractRemotingActivityTest extends InitConfigTest { + } + }; + Channel channel = ctx.channel(); +- RemotingHelper.setPropertyToAttr(channel, RemotingHelper.CLIENT_ID_KEY, CLIENT_ID); +- RemotingHelper.setPropertyToAttr(channel, RemotingHelper.LANGUAGE_CODE_KEY, LanguageCode.JAVA); +- RemotingHelper.setPropertyToAttr(channel, RemotingHelper.VERSION_KEY, MQVersion.CURRENT_VERSION); ++ RemotingHelper.setPropertyToAttr(channel, AttributeKeys.CLIENT_ID_KEY, CLIENT_ID); ++ RemotingHelper.setPropertyToAttr(channel, AttributeKeys.LANGUAGE_CODE_KEY, LanguageCode.JAVA); ++ RemotingHelper.setPropertyToAttr(channel, AttributeKeys.VERSION_KEY, MQVersion.CURRENT_VERSION); + } + + @Test +diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java +index d0750b678..363b22eac 100644 +--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java ++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java +@@ -22,7 +22,6 @@ import io.netty.channel.ChannelFutureListener; + import io.netty.util.Attribute; + import io.netty.util.AttributeKey; + import org.apache.commons.lang3.StringUtils; +-import org.apache.rocketmq.common.constant.HAProxyConstants; + import org.apache.rocketmq.common.constant.LoggerName; + import org.apache.rocketmq.common.utils.NetworkUtil; + import org.apache.rocketmq.logging.org.slf4j.Logger; +@@ -31,8 +30,8 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException; + import org.apache.rocketmq.remoting.exception.RemotingConnectException; + import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; + import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; ++import org.apache.rocketmq.remoting.netty.AttributeKeys; + import org.apache.rocketmq.remoting.netty.NettySystemConfig; +-import org.apache.rocketmq.remoting.protocol.LanguageCode; + import org.apache.rocketmq.remoting.protocol.RemotingCommand; + import org.apache.rocketmq.remoting.protocol.RequestCode; + import org.apache.rocketmq.remoting.protocol.ResponseCode; +@@ -51,16 +50,6 @@ public class RemotingHelper { + public static final String DEFAULT_CIDR_ALL = "0.0.0.0/0"; + + private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME); +- private static final AttributeKey<String> REMOTE_ADDR_KEY = AttributeKey.valueOf("RemoteAddr"); +- +- private static final AttributeKey<String> PROXY_PROTOCOL_ADDR = AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_ADDR); +- private static final AttributeKey<String> PROXY_PROTOCOL_PORT = AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_PORT); +- +- public static final AttributeKey<String> CLIENT_ID_KEY = AttributeKey.valueOf("ClientId"); +- +- public static final AttributeKey<Integer> VERSION_KEY = AttributeKey.valueOf("Version"); +- +- public static final AttributeKey<LanguageCode> LANGUAGE_CODE_KEY = AttributeKey.valueOf("LanguageCode"); + + public static final Map<Integer, String> REQUEST_CODE_MAP = new HashMap<Integer, String>() { + { +@@ -213,7 +202,7 @@ public class RemotingHelper { + if (StringUtils.isNotBlank(addr)) { + return addr; + } +- Attribute<String> att = channel.attr(REMOTE_ADDR_KEY); ++ Attribute<String> att = channel.attr(AttributeKeys.REMOTE_ADDR_KEY); + if (att == null) { + // mocked in unit test + return parseChannelRemoteAddr0(channel); +@@ -227,11 +216,11 @@ public class RemotingHelper { + } + + private static String getProxyProtocolAddress(Channel channel) { +- if (!channel.hasAttr(PROXY_PROTOCOL_ADDR)) { ++ if (!channel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) { + return null; + } +- String proxyProtocolAddr = getAttributeValue(PROXY_PROTOCOL_ADDR, channel); +- String proxyProtocolPort = getAttributeValue(PROXY_PROTOCOL_PORT, channel); ++ String proxyProtocolAddr = getAttributeValue(AttributeKeys.PROXY_PROTOCOL_ADDR, channel); ++ String proxyProtocolPort = getAttributeValue(AttributeKeys.PROXY_PROTOCOL_PORT, channel); + if (StringUtils.isBlank(proxyProtocolAddr) || proxyProtocolPort == null) { + return null; + } +diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java +index 4e69ab82d..ebdde31f4 100644 +--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java ++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java +@@ -19,12 +19,21 @@ package org.apache.rocketmq.remoting.netty; + + import io.netty.util.AttributeKey; + import org.apache.rocketmq.common.constant.HAProxyConstants; ++import org.apache.rocketmq.remoting.protocol.LanguageCode; + + import java.util.Map; + import java.util.concurrent.ConcurrentHashMap; + + public class AttributeKeys { + ++ public static final AttributeKey<String> REMOTE_ADDR_KEY = AttributeKey.valueOf("RemoteAddr"); ++ ++ public static final AttributeKey<String> CLIENT_ID_KEY = AttributeKey.valueOf("ClientId"); ++ ++ public static final AttributeKey<Integer> VERSION_KEY = AttributeKey.valueOf("Version"); ++ ++ public static final AttributeKey<LanguageCode> LANGUAGE_CODE_KEY = AttributeKey.valueOf("LanguageCode"); ++ + public static final AttributeKey<String> PROXY_PROTOCOL_ADDR = + AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_ADDR); + +@@ -40,6 +49,6 @@ public class AttributeKeys { + private static final Map<String, AttributeKey<String>> ATTRIBUTE_KEY_MAP = new ConcurrentHashMap<>(); + + public static AttributeKey<String> valueOf(String name) { +- return ATTRIBUTE_KEY_MAP.computeIfAbsent(name, AttributeKeys::valueOf); ++ return ATTRIBUTE_KEY_MAP.computeIfAbsent(name, AttributeKey::valueOf); + } + } +diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +index 445f06cc6..8ae87a6fa 100644 +--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java ++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +@@ -37,6 +37,7 @@ import io.netty.channel.epoll.EpollServerSocketChannel; + import io.netty.channel.nio.NioEventLoopGroup; + import io.netty.channel.socket.SocketChannel; + import io.netty.channel.socket.nio.NioServerSocketChannel; ++import io.netty.handler.codec.ByteToMessageDecoder; + import io.netty.handler.codec.ProtocolDetectionResult; + import io.netty.handler.codec.ProtocolDetectionState; + import io.netty.handler.codec.haproxy.HAProxyMessage; +@@ -73,6 +74,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; + import java.io.IOException; + import java.net.InetSocketAddress; + import java.security.cert.CertificateException; ++import java.util.List; + import java.util.NoSuchElementException; + import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.ConcurrentMap; +@@ -115,7 +117,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti + public static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder"; + + // sharable handlers +- private HandshakeHandler handshakeHandler; ++ private TlsModeHandler tlsModeHandler; + private NettyEncoder encoder; + private NettyConnectManageHandler connectionManageHandler; + private NettyServerHandler serverHandler; +@@ -265,7 +267,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti + */ + protected ChannelPipeline configChannel(SocketChannel ch) { + return ch.pipeline() +- .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler) ++ .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, new HandshakeHandler()) + .addLast(defaultEventExecutorGroup, + encoder, + new NettyDecoder(), +@@ -402,7 +404,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti + } + + private void prepareSharableHandlers() { +- handshakeHandler = new HandshakeHandler(); ++ tlsModeHandler = new TlsModeHandler(TlsSystemConfig.tlsMode); + encoder = new NettyEncoder(); + connectionManageHandler = new NettyConnectManageHandler(); + serverHandler = new NettyServerHandler(); +@@ -429,10 +431,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti + return defaultEventExecutorGroup; + } + +- public HandshakeHandler getHandshakeHandler() { +- return handshakeHandler; +- } +- + public NettyEncoder getEncoder() { + return encoder; + } +@@ -449,17 +447,13 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti + return distributionHandler; + } + +- @ChannelHandler.Sharable +- public class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf> { +- +- private final TlsModeHandler tlsModeHandler; ++ public class HandshakeHandler extends ByteToMessageDecoder { + + public HandshakeHandler() { +- tlsModeHandler = new TlsModeHandler(TlsSystemConfig.tlsMode); + } + + @Override +- protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) { ++ protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception { + try { + ProtocolDetectionResult<HAProxyProtocolVersion> detectionResult = HAProxyMessageDecoder.detectProtocol(byteBuf); + if (detectionResult.state() == ProtocolDetectionState.NEEDS_MORE_DATA) { +@@ -479,9 +473,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti + } catch (NoSuchElementException e) { + log.error("Error while removing HandshakeHandler", e); + } +- +- // Hand over this message to the next . +- ctx.fireChannelRead(byteBuf.retain()); + } catch (Exception e) { + log.error("process proxy protocol negotiator failed.", e); + throw e; +-- +2.32.0.windows.2 + + +From 1f0f3b2d6d16de7b315c702d33f7d3557c0fc25c Mon Sep 17 00:00:00 2001 +From: Ji Juntao <juntao.jjt@alibaba-inc.com> +Date: Tue, 11 Jul 2023 21:13:06 +0800 +Subject: [PATCH 3/8] [ISSUE #7013] Polish ColdDataCheckService's logic (#7014) + +* polish coldCtrl + +* remove the catch. +--- + .../src/main/java/org/apache/rocketmq/store/CommitLog.java | 7 ++++++- + 1 file changed, 6 insertions(+), 1 deletion(-) + +diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +index 5a5c90c5a..e6ee3bacc 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java ++++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +@@ -2089,6 +2089,11 @@ public class CommitLog implements Swappable { + } else { + this.waitForRunning(defaultMessageStore.getMessageStoreConfig().getTimerColdDataCheckIntervalMs()); + } ++ ++ if (pageSize < 0) { ++ initPageSize(); ++ } ++ + long beginClockTimestamp = this.systemClock.now(); + scanFilesInPageCache(); + long costTime = this.systemClock.now() - beginClockTimestamp; +@@ -2182,7 +2187,7 @@ public class CommitLog implements Swappable { + } + + private void initPageSize() { +- if (pageSize < 0) { ++ if (pageSize < 0 && defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable()) { + try { + if (!MixAll.isWindows()) { + pageSize = LibC.INSTANCE.getpagesize(); +-- +2.32.0.windows.2 + + +From d206590692bfdffca6bc58327e9533bc4bb68122 Mon Sep 17 00:00:00 2001 +From: Lei Zhiyuan <leizhiyuan@gmail.com> +Date: Thu, 13 Jul 2023 11:17:38 +0800 +Subject: [PATCH 4/8] [ISSUE #6979] Fix opaque will be duplicate in multi + client scene (#6985) + +--- + .../proxy/processor/DefaultMessagingProcessor.java | 14 ++++++++++++-- + 1 file changed, 12 insertions(+), 2 deletions(-) + +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java +index 1b3f0af4e..188cb7b9b 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java +@@ -235,13 +235,23 @@ public class DefaultMessagingProcessor extends AbstractStartAndShutdown implemen + @Override + public CompletableFuture<RemotingCommand> request(ProxyContext ctx, String brokerName, RemotingCommand request, + long timeoutMillis) { +- return this.requestBrokerProcessor.request(ctx, brokerName, request, timeoutMillis); ++ int originalRequestOpaque = request.getOpaque(); ++ request.setOpaque(RemotingCommand.createNewRequestId()); ++ return this.requestBrokerProcessor.request(ctx, brokerName, request, timeoutMillis).thenApply(r -> { ++ request.setOpaque(originalRequestOpaque); ++ return r; ++ }); + } + + @Override + public CompletableFuture<Void> requestOneway(ProxyContext ctx, String brokerName, RemotingCommand request, + long timeoutMillis) { +- return this.requestBrokerProcessor.requestOneway(ctx, brokerName, request, timeoutMillis); ++ int originalRequestOpaque = request.getOpaque(); ++ request.setOpaque(RemotingCommand.createNewRequestId()); ++ return this.requestBrokerProcessor.requestOneway(ctx, brokerName, request, timeoutMillis).thenApply(r -> { ++ request.setOpaque(originalRequestOpaque); ++ return r; ++ }); + } + + @Override +-- +2.32.0.windows.2 + + +From 33cb22e1c0fa7ba980567117230fe443ff5dbd62 Mon Sep 17 00:00:00 2001 +From: lizhimins <707364882@qq.com> +Date: Thu, 13 Jul 2023 15:37:24 +0800 +Subject: [PATCH 5/8] [ISSUE #7018] fix append in tiered storage when message + offset incorrect (#7019) + +* fix append in tiered storage when message offset incorrect +--- + .../tieredstore/TieredDispatcher.java | 25 ++++++++++------ + .../tieredstore/file/CompositeFlatFile.java | 30 +++++++++---------- + .../file/CompositeQueueFlatFile.java | 2 +- + .../file/CompositeQueueFlatFileTest.java | 4 +-- + 4 files changed, 34 insertions(+), 27 deletions(-) + +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java +index 2a8e2ed71..6584b0e89 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java +@@ -308,9 +308,18 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch + doRedispatchRequestToWriteMap( + result, flatFile, dispatchOffset, newCommitLogOffset, size, tagCode, message.getByteBuffer()); + message.release(); +- if (result != AppendResult.SUCCESS) { +- dispatchOffset--; +- break; ++ ++ switch (result) { ++ case SUCCESS: ++ continue; ++ case FILE_CLOSED: ++ tieredFlatFileManager.destroyCompositeFile(flatFile.getMessageQueue()); ++ logger.info("TieredDispatcher#dispatchFlatFile: file has been close and destroy, " + ++ "topic: {}, queueId: {}", topic, queueId); ++ return; ++ default: ++ dispatchOffset--; ++ break; + } + } + +@@ -341,15 +350,13 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch + + switch (result) { + case SUCCESS: +- break; +- case OFFSET_INCORRECT: + long offset = MessageBufferUtil.getQueueOffset(message); + if (queueOffset != offset) { +- logger.error("[Bug] Commitlog offset incorrect, " + +- "result={}, topic={}, queueId={}, offset={}, msg offset={}", +- result, topic, queueId, queueOffset, offset); ++ logger.error("Message cq offset in commitlog does not meet expectations, " + ++ "result={}, topic={}, queueId={}, cq offset={}, msg offset={}", ++ AppendResult.OFFSET_INCORRECT, topic, queueId, queueOffset, offset); + } +- return; ++ break; + case BUFFER_FULL: + logger.debug("Commitlog buffer full, result={}, topic={}, queueId={}, offset={}", + result, topic, queueId, queueOffset); +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java +index 1243f7721..8f8ba98b1 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java +@@ -29,6 +29,7 @@ import java.util.concurrent.CompletableFuture; + import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.ConcurrentMap; + import java.util.concurrent.TimeUnit; ++import java.util.concurrent.atomic.AtomicLong; + import java.util.concurrent.locks.ReentrantLock; + import org.apache.commons.lang3.StringUtils; + import org.apache.commons.lang3.tuple.Pair; +@@ -58,7 +59,7 @@ public class CompositeFlatFile implements CompositeAccess { + * dispatched to the current chunk, indicating the progress of the message distribution. + * It's consume queue current offset. + */ +- protected volatile long dispatchOffset; ++ protected final AtomicLong dispatchOffset; + + protected final ReentrantLock compositeFlatFileLock; + protected final TieredMessageStoreConfig storeConfig; +@@ -75,6 +76,7 @@ public class CompositeFlatFile implements CompositeAccess { + this.storeConfig = fileQueueFactory.getStoreConfig(); + this.readAheadFactor = this.storeConfig.getReadAheadMinFactor(); + this.metadataStore = TieredStoreUtil.getMetadataStore(this.storeConfig); ++ this.dispatchOffset = new AtomicLong(); + this.compositeFlatFileLock = new ReentrantLock(); + this.inFlightRequestMap = new ConcurrentHashMap<>(); + this.commitLog = new TieredCommitLog(fileQueueFactory, filePath); +@@ -83,8 +85,8 @@ public class CompositeFlatFile implements CompositeAccess { + } + + protected void recoverMetadata() { +- if (!consumeQueue.isInitialized() && this.dispatchOffset != -1) { +- consumeQueue.setBaseOffset(this.dispatchOffset * TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE); ++ if (!consumeQueue.isInitialized() && this.dispatchOffset.get() != -1) { ++ consumeQueue.setBaseOffset(this.dispatchOffset.get() * TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE); + } + } + +@@ -144,7 +146,7 @@ public class CompositeFlatFile implements CompositeAccess { + } + + public long getDispatchOffset() { +- return dispatchOffset; ++ return dispatchOffset.get(); + } + + @Override +@@ -309,7 +311,7 @@ public class CompositeFlatFile implements CompositeAccess { + if (!consumeQueue.isInitialized()) { + consumeQueue.setBaseOffset(offset * TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE); + } +- dispatchOffset = offset; ++ dispatchOffset.set(offset); + } + + @Override +@@ -323,14 +325,9 @@ public class CompositeFlatFile implements CompositeAccess { + return AppendResult.FILE_CLOSED; + } + +- long queueOffset = MessageBufferUtil.getQueueOffset(message); +- if (dispatchOffset != queueOffset) { +- return AppendResult.OFFSET_INCORRECT; +- } +- + AppendResult result = commitLog.append(message, commit); + if (result == AppendResult.SUCCESS) { +- dispatchOffset = queueOffset + 1; ++ dispatchOffset.incrementAndGet(); + } + return result; + } +@@ -483,14 +480,17 @@ public class CompositeFlatFile implements CompositeAccess { + } + + public void destroy() { +- closed = true; +- commitLog.destroy(); +- consumeQueue.destroy(); + try { ++ closed = true; ++ compositeFlatFileLock.lock(); ++ commitLog.destroy(); ++ consumeQueue.destroy(); + metadataStore.deleteFileSegment(filePath, FileSegmentType.COMMIT_LOG); + metadataStore.deleteFileSegment(filePath, FileSegmentType.CONSUME_QUEUE); + } catch (Exception e) { +- LOGGER.error("CompositeFlatFile#destroy: clean metadata failed: ", e); ++ LOGGER.error("CompositeFlatFile#destroy: delete file failed", e); ++ } finally { ++ compositeFlatFileLock.unlock(); + } + } + } +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java +index c0cf79069..f6c0afed0 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java +@@ -64,7 +64,7 @@ public class CompositeQueueFlatFile extends CompositeFlatFile { + if (queueMetadata.getMaxOffset() < queueMetadata.getMinOffset()) { + queueMetadata.setMaxOffset(queueMetadata.getMinOffset()); + } +- this.dispatchOffset = queueMetadata.getMaxOffset(); ++ this.dispatchOffset.set(queueMetadata.getMaxOffset()); + } + + public void persistMetadata() { +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java +index 9735a535e..8322c72ed 100644 +--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java +@@ -73,8 +73,8 @@ public class CompositeQueueFlatFileTest { + CompositeQueueFlatFile flatFile = new CompositeQueueFlatFile(tieredFileAllocator, mq); + ByteBuffer message = MessageBufferUtilTest.buildMockedMessageBuffer(); + AppendResult result = flatFile.appendCommitLog(message); +- Assert.assertEquals(AppendResult.OFFSET_INCORRECT, result); +- Assert.assertEquals(0L, flatFile.commitLog.getFlatFile().getFileToWrite().getAppendPosition()); ++ Assert.assertEquals(AppendResult.SUCCESS, result); ++ Assert.assertEquals(122L, flatFile.commitLog.getFlatFile().getFileToWrite().getAppendPosition()); + Assert.assertEquals(0L, flatFile.commitLog.getFlatFile().getFileToWrite().getCommitPosition()); + + flatFile = new CompositeQueueFlatFile(tieredFileAllocator, mq); +-- +2.32.0.windows.2 + + +From 70a66eda2c08eb5fca38356659cb6de1ac75e25e Mon Sep 17 00:00:00 2001 +From: ShuangxiDing <dingshuangxi888@gmail.com> +Date: Fri, 14 Jul 2023 16:46:40 +0800 +Subject: [PATCH 6/8] Fix LEAK: HAProxyMessage.release() was not called before + it's garbage-collected (#7025) +MIME-Version: 1.0 +Content-Type: text/plain; charset=UTF-8 +Content-Transfer-Encoding: 8bit + +Call HAProxyMessage.release() after reading it. + +--------- + +Co-authored-by: 徒钟 <shuangxi.dsx@alibaba-inc.com> +--- + .../grpc/ProxyAndTlsProtocolNegotiator.java | 52 ++++++++++--------- + .../remoting/netty/NettyRemotingServer.java | 46 ++++++++-------- + 2 files changed, 53 insertions(+), 45 deletions(-) + +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java +index ceb9becc0..ee167bd7b 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java +@@ -160,7 +160,7 @@ public class ProxyAndTlsProtocolNegotiator implements InternalProtocolNegotiator + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof HAProxyMessage) { +- replaceEventWithMessage((HAProxyMessage) msg); ++ handleWithMessage((HAProxyMessage) msg); + ctx.fireUserEventTriggered(pne); + } else { + super.channelRead(ctx, msg); +@@ -174,30 +174,34 @@ public class ProxyAndTlsProtocolNegotiator implements InternalProtocolNegotiator + * + * @param msg + */ +- private void replaceEventWithMessage(HAProxyMessage msg) { +- Attributes.Builder builder = InternalProtocolNegotiationEvent.getAttributes(pne).toBuilder(); +- if (StringUtils.isNotBlank(msg.sourceAddress())) { +- builder.set(AttributeKeys.PROXY_PROTOCOL_ADDR, msg.sourceAddress()); +- } +- if (msg.sourcePort() > 0) { +- builder.set(AttributeKeys.PROXY_PROTOCOL_PORT, String.valueOf(msg.sourcePort())); +- } +- if (StringUtils.isNotBlank(msg.destinationAddress())) { +- builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR, msg.destinationAddress()); +- } +- if (msg.destinationPort() > 0) { +- builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT, String.valueOf(msg.destinationPort())); +- } +- if (CollectionUtils.isNotEmpty(msg.tlvs())) { +- msg.tlvs().forEach(tlv -> { +- Attributes.Key<String> key = AttributeKeys.valueOf( +- HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue())); +- String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8)); +- builder.set(key, value); +- }); ++ private void handleWithMessage(HAProxyMessage msg) { ++ try { ++ Attributes.Builder builder = InternalProtocolNegotiationEvent.getAttributes(pne).toBuilder(); ++ if (StringUtils.isNotBlank(msg.sourceAddress())) { ++ builder.set(AttributeKeys.PROXY_PROTOCOL_ADDR, msg.sourceAddress()); ++ } ++ if (msg.sourcePort() > 0) { ++ builder.set(AttributeKeys.PROXY_PROTOCOL_PORT, String.valueOf(msg.sourcePort())); ++ } ++ if (StringUtils.isNotBlank(msg.destinationAddress())) { ++ builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR, msg.destinationAddress()); ++ } ++ if (msg.destinationPort() > 0) { ++ builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT, String.valueOf(msg.destinationPort())); ++ } ++ if (CollectionUtils.isNotEmpty(msg.tlvs())) { ++ msg.tlvs().forEach(tlv -> { ++ Attributes.Key<String> key = AttributeKeys.valueOf( ++ HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue())); ++ String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8)); ++ builder.set(key, value); ++ }); ++ } ++ pne = InternalProtocolNegotiationEvent ++ .withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build()); ++ } finally { ++ msg.release(); + } +- pne = InternalProtocolNegotiationEvent +- .withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build()); + } + } + +diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +index 8ae87a6fa..90e358ce3 100644 +--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java ++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +@@ -758,7 +758,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof HAProxyMessage) { +- fillChannelWithMessage((HAProxyMessage) msg, ctx.channel()); ++ handleWithMessage((HAProxyMessage) msg, ctx.channel()); + } else { + super.channelRead(ctx, msg); + } +@@ -771,26 +771,30 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti + * @param msg + * @param channel + */ +- private void fillChannelWithMessage(HAProxyMessage msg, Channel channel) { +- if (StringUtils.isNotBlank(msg.sourceAddress())) { +- channel.attr(AttributeKeys.PROXY_PROTOCOL_ADDR).set(msg.sourceAddress()); +- } +- if (msg.sourcePort() > 0) { +- channel.attr(AttributeKeys.PROXY_PROTOCOL_PORT).set(String.valueOf(msg.sourcePort())); +- } +- if (StringUtils.isNotBlank(msg.destinationAddress())) { +- channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR).set(msg.destinationAddress()); +- } +- if (msg.destinationPort() > 0) { +- channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT).set(String.valueOf(msg.destinationPort())); +- } +- if (CollectionUtils.isNotEmpty(msg.tlvs())) { +- msg.tlvs().forEach(tlv -> { +- AttributeKey<String> key = AttributeKeys.valueOf( +- HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue())); +- String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8)); +- channel.attr(key).set(value); +- }); ++ private void handleWithMessage(HAProxyMessage msg, Channel channel) { ++ try { ++ if (StringUtils.isNotBlank(msg.sourceAddress())) { ++ channel.attr(AttributeKeys.PROXY_PROTOCOL_ADDR).set(msg.sourceAddress()); ++ } ++ if (msg.sourcePort() > 0) { ++ channel.attr(AttributeKeys.PROXY_PROTOCOL_PORT).set(String.valueOf(msg.sourcePort())); ++ } ++ if (StringUtils.isNotBlank(msg.destinationAddress())) { ++ channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR).set(msg.destinationAddress()); ++ } ++ if (msg.destinationPort() > 0) { ++ channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT).set(String.valueOf(msg.destinationPort())); ++ } ++ if (CollectionUtils.isNotEmpty(msg.tlvs())) { ++ msg.tlvs().forEach(tlv -> { ++ AttributeKey<String> key = AttributeKeys.valueOf( ++ HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue())); ++ String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8)); ++ channel.attr(key).set(value); ++ }); ++ } ++ } finally { ++ msg.release(); + } + } + } +-- +2.32.0.windows.2 + + +From 5914ff8dbb9d37e2cb48ef9c4f0256c6185b4659 Mon Sep 17 00:00:00 2001 +From: lyx <56945247+lyx2000@users.noreply.github.com> +Date: Sat, 15 Jul 2023 18:15:57 +0800 +Subject: [PATCH 7/8] [ISSUE #6968] fix grpc acl bug (#6969) + +* feat(acl): fix acl bug + +Signed-off-by: lyx <1419360299@qq.com> + +# Conflicts: +# proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java + +* add access test for two client + +Signed-off-by: lyx <1419360299@qq.com> + +* use specific acl config + +Signed-off-by: lyx <1419360299@qq.com> + +* Recovering unchange file + +Signed-off-by: lyx <1419360299@qq.com> + +* let test pass + +Signed-off-by: lyx <1419360299@qq.com> + +--------- + +Signed-off-by: lyx <1419360299@qq.com> +--- + .../acl/plain/PlainAccessResource.java | 21 +- + .../acl/RemotingClientAccessTest.java | 189 ++++++++++++++++++ + .../access_acl_conf/acl/plain_acl.yml | 31 +++ + acl/src/test/resources/conf/acl/plain_acl.yml | 1 - + pom.xml | 1 + + .../apache/rocketmq/proxy/ProxyStartup.java | 17 +- + .../proxy/grpc/GrpcServerBuilder.java | 10 +- + .../remoting/RemotingProtocolServer.java | 15 +- + 8 files changed, 255 insertions(+), 30 deletions(-) + create mode 100644 acl/src/test/java/org/apache/rocketmq/acl/RemotingClientAccessTest.java + create mode 100644 acl/src/test/resources/access_acl_conf/acl/plain_acl.yml + +diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java +index cdbd9ea9b..72aa8ca71 100644 +--- a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java ++++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java +@@ -223,7 +223,7 @@ public class PlainAccessResource implements AccessResource { + if (!request.hasGroup()) { + throw new AclException("Consumer heartbeat doesn't have group"); + } else { +- accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB); ++ accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB); + } + } + } else if (SendMessageRequest.getDescriptor().getFullName().equals(rpcFullName)) { +@@ -240,15 +240,15 @@ public class PlainAccessResource implements AccessResource { + accessResource.addResourceAndPerm(topic, Permission.PUB); + } else if (ReceiveMessageRequest.getDescriptor().getFullName().equals(rpcFullName)) { + ReceiveMessageRequest request = (ReceiveMessageRequest) messageV3; +- accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB); ++ accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB); + accessResource.addResourceAndPerm(request.getMessageQueue().getTopic(), Permission.SUB); + } else if (AckMessageRequest.getDescriptor().getFullName().equals(rpcFullName)) { + AckMessageRequest request = (AckMessageRequest) messageV3; +- accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB); ++ accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB); + accessResource.addResourceAndPerm(request.getTopic(), Permission.SUB); + } else if (ForwardMessageToDeadLetterQueueRequest.getDescriptor().getFullName().equals(rpcFullName)) { + ForwardMessageToDeadLetterQueueRequest request = (ForwardMessageToDeadLetterQueueRequest) messageV3; +- accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB); ++ accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB); + accessResource.addResourceAndPerm(request.getTopic(), Permission.SUB); + } else if (EndTransactionRequest.getDescriptor().getFullName().equals(rpcFullName)) { + EndTransactionRequest request = (EndTransactionRequest) messageV3; +@@ -264,7 +264,7 @@ public class PlainAccessResource implements AccessResource { + } + if (command.getSettings().hasSubscription()) { + Subscription subscription = command.getSettings().getSubscription(); +- accessResource.addResourceAndPerm(subscription.getGroup(), Permission.SUB); ++ accessResource.addGroupResourceAndPerm(subscription.getGroup(), Permission.SUB); + for (SubscriptionEntry entry : subscription.getSubscriptionsList()) { + accessResource.addResourceAndPerm(entry.getTopic(), Permission.SUB); + } +@@ -275,17 +275,17 @@ public class PlainAccessResource implements AccessResource { + } + } else if (NotifyClientTerminationRequest.getDescriptor().getFullName().equals(rpcFullName)) { + NotifyClientTerminationRequest request = (NotifyClientTerminationRequest) messageV3; +- accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB); ++ accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB); + } else if (QueryRouteRequest.getDescriptor().getFullName().equals(rpcFullName)) { + QueryRouteRequest request = (QueryRouteRequest) messageV3; + accessResource.addResourceAndPerm(request.getTopic(), Permission.ANY); + } else if (QueryAssignmentRequest.getDescriptor().getFullName().equals(rpcFullName)) { + QueryAssignmentRequest request = (QueryAssignmentRequest) messageV3; +- accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB); ++ accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB); + accessResource.addResourceAndPerm(request.getTopic(), Permission.SUB); + } else if (ChangeInvisibleDurationRequest.getDescriptor().getFullName().equals(rpcFullName)) { + ChangeInvisibleDurationRequest request = (ChangeInvisibleDurationRequest) messageV3; +- accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB); ++ accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB); + accessResource.addResourceAndPerm(request.getTopic(), Permission.SUB); + } + } catch (Throwable t) { +@@ -299,6 +299,11 @@ public class PlainAccessResource implements AccessResource { + addResourceAndPerm(resourceName, permission); + } + ++ private void addGroupResourceAndPerm(Resource resource, byte permission) { ++ String resourceName = NamespaceUtil.wrapNamespace(resource.getResourceNamespace(), resource.getName()); ++ addResourceAndPerm(getRetryTopic(resourceName), permission); ++ } ++ + public static PlainAccessResource build(PlainAccessConfig plainAccessConfig, RemoteAddressStrategy remoteAddressStrategy) { + PlainAccessResource plainAccessResource = new PlainAccessResource(); + plainAccessResource.setAccessKey(plainAccessConfig.getAccessKey()); +diff --git a/acl/src/test/java/org/apache/rocketmq/acl/RemotingClientAccessTest.java b/acl/src/test/java/org/apache/rocketmq/acl/RemotingClientAccessTest.java +new file mode 100644 +index 000000000..88c5e09a9 +--- /dev/null ++++ b/acl/src/test/java/org/apache/rocketmq/acl/RemotingClientAccessTest.java +@@ -0,0 +1,189 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.rocketmq.acl; ++ ++import java.io.File; ++import java.io.IOException; ++import java.nio.ByteBuffer; ++import org.apache.rocketmq.acl.common.AclClientRPCHook; ++import org.apache.rocketmq.acl.common.AclException; ++import org.apache.rocketmq.acl.common.SessionCredentials; ++import org.apache.rocketmq.acl.plain.AclTestHelper; ++import org.apache.rocketmq.acl.plain.PlainAccessResource; ++import org.apache.rocketmq.acl.plain.PlainAccessValidator; ++import org.apache.rocketmq.remoting.exception.RemotingCommandException; ++import org.apache.rocketmq.remoting.protocol.RemotingCommand; ++import org.apache.rocketmq.remoting.protocol.RequestCode; ++import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader; ++import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; ++import org.junit.After; ++import org.junit.Assert; ++import org.junit.Before; ++import org.junit.Test; ++ ++public class RemotingClientAccessTest { ++ ++ private PlainAccessValidator plainAccessValidator; ++ private AclClientRPCHook aclClient; ++ private SessionCredentials sessionCredentials; ++ ++ private File confHome; ++ ++ private String clientAddress = "10.7.1.3"; ++ ++ @Before ++ public void init() throws IOException { ++ String folder = "access_acl_conf"; ++ confHome = AclTestHelper.copyResources(folder, true); ++ System.setProperty("rocketmq.home.dir", confHome.getAbsolutePath()); ++ System.setProperty("rocketmq.acl.plain.file", "/access_acl_conf/acl/plain_acl.yml".replace("/", File.separator)); ++ ++ plainAccessValidator = new PlainAccessValidator(); ++ sessionCredentials = new SessionCredentials(); ++ sessionCredentials.setAccessKey("rocketmq3"); ++ sessionCredentials.setSecretKey("12345678"); ++ aclClient = new AclClientRPCHook(sessionCredentials); ++ } ++ ++ @After ++ public void cleanUp() { ++ AclTestHelper.recursiveDelete(confHome); ++ } ++ ++ @Test(expected = AclException.class) ++ public void testProduceDenyTopic() { ++ SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader(); ++ messageRequestHeader.setTopic("topicD"); ++ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader); ++ aclClient.doBeforeRequest(clientAddress, remotingCommand); ++ ++ ByteBuffer buf = remotingCommand.encodeHeader(); ++ buf.getInt(); ++ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); ++ buf.position(0); ++ try { ++ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), clientAddress); ++ plainAccessValidator.validate(accessResource); ++ } catch (RemotingCommandException e) { ++ e.printStackTrace(); ++ Assert.fail("Should not throw IOException"); ++ } ++ } ++ ++ @Test ++ public void testProduceAuthorizedTopic() { ++ SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader(); ++ messageRequestHeader.setTopic("topicA"); ++ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader); ++ aclClient.doBeforeRequest(clientAddress, remotingCommand); ++ ++ ByteBuffer buf = remotingCommand.encodeHeader(); ++ buf.getInt(); ++ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); ++ buf.position(0); ++ try { ++ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), clientAddress); ++ plainAccessValidator.validate(accessResource); ++ } catch (RemotingCommandException e) { ++ e.printStackTrace(); ++ Assert.fail("Should not throw IOException"); ++ } ++ } ++ ++ ++ @Test(expected = AclException.class) ++ public void testConsumeDenyTopic() { ++ PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader(); ++ pullMessageRequestHeader.setTopic("topicD"); ++ pullMessageRequestHeader.setConsumerGroup("groupB"); ++ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader); ++ aclClient.doBeforeRequest("", remotingCommand); ++ ByteBuffer buf = remotingCommand.encodeHeader(); ++ buf.getInt(); ++ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); ++ buf.position(0); ++ try { ++ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6"); ++ plainAccessValidator.validate(accessResource); ++ } catch (RemotingCommandException e) { ++ e.printStackTrace(); ++ Assert.fail("Should not throw IOException"); ++ } ++ ++ } ++ ++ @Test ++ public void testConsumeAuthorizedTopic() { ++ PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader(); ++ pullMessageRequestHeader.setTopic("topicB"); ++ pullMessageRequestHeader.setConsumerGroup("groupB"); ++ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader); ++ aclClient.doBeforeRequest("", remotingCommand); ++ ByteBuffer buf = remotingCommand.encodeHeader(); ++ buf.getInt(); ++ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); ++ buf.position(0); ++ try { ++ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6"); ++ plainAccessValidator.validate(accessResource); ++ } catch (RemotingCommandException e) { ++ e.printStackTrace(); ++ Assert.fail("Should not throw IOException"); ++ } ++ } ++ ++ @Test(expected = AclException.class) ++ public void testConsumeInDeniedGroup() { ++ PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader(); ++ pullMessageRequestHeader.setTopic("topicB"); ++ pullMessageRequestHeader.setConsumerGroup("groupD"); ++ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader); ++ aclClient.doBeforeRequest("", remotingCommand); ++ ByteBuffer buf = remotingCommand.encodeHeader(); ++ buf.getInt(); ++ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); ++ buf.position(0); ++ try { ++ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6"); ++ plainAccessValidator.validate(accessResource); ++ } catch (RemotingCommandException e) { ++ e.printStackTrace(); ++ Assert.fail("Should not throw IOException"); ++ } ++ } ++ ++ @Test ++ public void testConsumeInAuthorizedGroup() { ++ PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader(); ++ pullMessageRequestHeader.setTopic("topicB"); ++ pullMessageRequestHeader.setConsumerGroup("groupB"); ++ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader); ++ aclClient.doBeforeRequest("", remotingCommand); ++ ByteBuffer buf = remotingCommand.encodeHeader(); ++ buf.getInt(); ++ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); ++ buf.position(0); ++ try { ++ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6"); ++ plainAccessValidator.validate(accessResource); ++ } catch (RemotingCommandException e) { ++ e.printStackTrace(); ++ Assert.fail("Should not throw IOException"); ++ } ++ } ++ ++} +diff --git a/acl/src/test/resources/access_acl_conf/acl/plain_acl.yml b/acl/src/test/resources/access_acl_conf/acl/plain_acl.yml +new file mode 100644 +index 000000000..28a8c4888 +--- /dev/null ++++ b/acl/src/test/resources/access_acl_conf/acl/plain_acl.yml +@@ -0,0 +1,31 @@ ++# Licensed to the Apache Software Foundation (ASF) under one or more ++# contributor license agreements. See the NOTICE file distributed with ++# this work for additional information regarding copyright ownership. ++# The ASF licenses this file to You under the Apache License, Version 2.0 ++# (the "License"); you may not use this file except in compliance with ++# the License. You may obtain a copy of the License at ++# ++# http://www.apache.org/licenses/LICENSE-2.0 ++# ++# Unless required by applicable law or agreed to in writing, software ++# distributed under the License is distributed on an "AS IS" BASIS, ++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++# See the License for the specific language governing permissions and ++# limitations under the License. ++ ++accounts: ++ - accessKey: rocketmq3 ++ secretKey: 12345678 ++ admin: false ++ defaultTopicPerm: DENY ++ defaultGroupPerm: DENY ++ topicPerms: ++ - topicA=PUB ++ - topicB=SUB ++ - topicC=PUB|SUB ++ - topicD=DENY ++ groupPerms: ++ - groupB=SUB ++ - groupC=PUB|SUB ++ - groupD=DENY ++ +diff --git a/acl/src/test/resources/conf/acl/plain_acl.yml b/acl/src/test/resources/conf/acl/plain_acl.yml +index 5641a94bf..34e46696d 100644 +--- a/acl/src/test/resources/conf/acl/plain_acl.yml ++++ b/acl/src/test/resources/conf/acl/plain_acl.yml +@@ -41,4 +41,3 @@ accounts: + whiteRemoteAddress: 192.168.1.* + # if it is admin, it could access all resources + admin: true +- +diff --git a/pom.xml b/pom.xml +index 12bc2dbd5..4d5dd1dec 100644 +--- a/pom.xml ++++ b/pom.xml +@@ -147,6 +147,7 @@ + <awaitility.version>4.1.0</awaitility.version> + <truth.version>0.30</truth.version> + <s3mock-junit4.version>2.11.0</s3mock-junit4.version> ++ <rocketmq-client-java.version>5.0.5</rocketmq-client-java.version> + + <!-- Build plugin dependencies --> + <versions-maven-plugin.version>2.2</versions-maven-plugin.version> +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java +index ea13bb808..06d5f4525 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java +@@ -29,11 +29,14 @@ import org.apache.commons.cli.DefaultParser; + import org.apache.commons.cli.Option; + import org.apache.commons.cli.Options; + import org.apache.commons.lang3.StringUtils; ++import org.apache.rocketmq.acl.AccessValidator; ++import org.apache.rocketmq.acl.plain.PlainAccessValidator; + import org.apache.rocketmq.broker.BrokerController; + import org.apache.rocketmq.broker.BrokerStartup; + import org.apache.rocketmq.common.MixAll; + import org.apache.rocketmq.common.constant.LoggerName; + import org.apache.rocketmq.common.thread.ThreadPoolMonitor; ++import org.apache.rocketmq.common.utils.ServiceProvider; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; +@@ -75,16 +78,17 @@ public class ProxyStartup { + + MessagingProcessor messagingProcessor = createMessagingProcessor(); + ++ List<AccessValidator> accessValidators = loadAccessValidators(); + // create grpcServer + GrpcServer grpcServer = GrpcServerBuilder.newBuilder(executor, ConfigurationManager.getProxyConfig().getGrpcServerPort()) + .addService(createServiceProcessor(messagingProcessor)) + .addService(ChannelzService.newInstance(100)) + .addService(ProtoReflectionService.newInstance()) +- .configInterceptor() ++ .configInterceptor(accessValidators) + .build(); + PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(grpcServer); + +- RemotingProtocolServer remotingServer = new RemotingProtocolServer(messagingProcessor); ++ RemotingProtocolServer remotingServer = new RemotingProtocolServer(messagingProcessor, accessValidators); + PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(remotingServer); + + // start servers one by one. +@@ -109,6 +113,15 @@ public class ProxyStartup { + log.info(new Date() + " rocketmq-proxy startup successfully"); + } + ++ protected static List<AccessValidator> loadAccessValidators() { ++ List<AccessValidator> accessValidators = ServiceProvider.load(AccessValidator.class); ++ if (accessValidators.isEmpty()) { ++ log.info("ServiceProvider loaded no AccessValidator, using default org.apache.rocketmq.acl.plain.PlainAccessValidator"); ++ accessValidators.add(new PlainAccessValidator()); ++ } ++ return accessValidators; ++ } ++ + protected static void initConfiguration(CommandLineArgument commandLineArgument) throws Exception { + if (StringUtils.isNotBlank(commandLineArgument.getProxyConfigPath())) { + System.setProperty(Configuration.CONFIG_PATH_PROPERTY, commandLineArgument.getProxyConfigPath()); +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java +index 437b9216b..9cddd3013 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java +@@ -28,9 +28,7 @@ import java.util.List; + import java.util.concurrent.ThreadPoolExecutor; + import java.util.concurrent.TimeUnit; + import org.apache.rocketmq.acl.AccessValidator; +-import org.apache.rocketmq.acl.plain.PlainAccessValidator; + import org.apache.rocketmq.common.constant.LoggerName; +-import org.apache.rocketmq.common.utils.ServiceProvider; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.proxy.config.ConfigurationManager; +@@ -98,14 +96,8 @@ public class GrpcServerBuilder { + return new GrpcServer(this.serverBuilder.build()); + } + +- public GrpcServerBuilder configInterceptor() { ++ public GrpcServerBuilder configInterceptor(List<AccessValidator> accessValidators) { + // grpc interceptors, including acl, logging etc. +- List<AccessValidator> accessValidators = ServiceProvider.load(AccessValidator.class); +- if (accessValidators.isEmpty()) { +- log.info("ServiceProvider loaded no AccessValidator, using default org.apache.rocketmq.acl.plain.PlainAccessValidator"); +- accessValidators.add(new PlainAccessValidator()); +- } +- + this.serverBuilder.intercept(new AuthenticationInterceptor(accessValidators)); + + this.serverBuilder +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java +index f08094c16..bcc9edd09 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java +@@ -19,7 +19,6 @@ package org.apache.rocketmq.proxy.remoting; + + import com.google.common.util.concurrent.ThreadFactoryBuilder; + import io.netty.channel.Channel; +-import java.util.ArrayList; + import java.util.List; + import java.util.concurrent.BlockingQueue; + import java.util.concurrent.CompletableFuture; +@@ -28,15 +27,14 @@ import java.util.concurrent.ScheduledExecutorService; + import java.util.concurrent.ThreadPoolExecutor; + import java.util.concurrent.TimeUnit; + import org.apache.rocketmq.acl.AccessValidator; +-import org.apache.rocketmq.acl.plain.PlainAccessValidator; + import org.apache.rocketmq.client.exception.MQClientException; + import org.apache.rocketmq.common.constant.LoggerName; + import org.apache.rocketmq.common.future.FutureTaskExt; + import org.apache.rocketmq.common.thread.ThreadPoolMonitor; + import org.apache.rocketmq.common.thread.ThreadPoolStatusMonitor; ++import org.apache.rocketmq.common.utils.StartAndShutdown; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +-import org.apache.rocketmq.common.utils.StartAndShutdown; + import org.apache.rocketmq.proxy.config.ConfigurationManager; + import org.apache.rocketmq.proxy.config.ProxyConfig; + import org.apache.rocketmq.proxy.processor.MessagingProcessor; +@@ -86,11 +84,11 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu + protected final ThreadPoolExecutor defaultExecutor; + protected final ScheduledExecutorService timerExecutor; + +- public RemotingProtocolServer(MessagingProcessor messagingProcessor) { ++ public RemotingProtocolServer(MessagingProcessor messagingProcessor, List<AccessValidator> accessValidators) { + this.messagingProcessor = messagingProcessor; + this.remotingChannelManager = new RemotingChannelManager(this, messagingProcessor.getProxyRelayService()); + +- RequestPipeline pipeline = createRequestPipeline(); ++ RequestPipeline pipeline = createRequestPipeline(accessValidators); + this.getTopicRouteActivity = new GetTopicRouteActivity(pipeline, messagingProcessor); + this.clientManagerActivity = new ClientManagerActivity(pipeline, messagingProcessor, remotingChannelManager); + this.consumerManagerActivity = new ConsumerManagerActivity(pipeline, messagingProcessor); +@@ -254,15 +252,12 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu + return future; + } + +- protected RequestPipeline createRequestPipeline() { ++ protected RequestPipeline createRequestPipeline(List<AccessValidator> accessValidators) { + RequestPipeline pipeline = (ctx, request, context) -> { + }; +- +- List<AccessValidator> accessValidatorList = new ArrayList<>(); +- accessValidatorList.add(new PlainAccessValidator()); + // add pipeline + // the last pipe add will execute at the first +- return pipeline.pipe(new AuthenticationPipeline(accessValidatorList)); ++ return pipeline.pipe(new AuthenticationPipeline(accessValidators)); + } + + protected class ThreadPoolHeadSlowTimeMillsMonitor implements ThreadPoolStatusMonitor { +-- +2.32.0.windows.2 + + +From 440be1ed4ce2af0ab58af6c3019de7075c09c20f Mon Sep 17 00:00:00 2001 +From: fuyou001 <yubao.fyb@alibaba-inc.com> +Date: Mon, 17 Jul 2023 19:23:23 +0800 +Subject: [PATCH 8/8] [ISSUE #7031] fix Pop caused broker memory leak bug + (#7032) + +--- + .../broker/processor/PopBufferMergeService.java | 17 ++++++++++++++++- + .../broker/processor/PopMessageProcessor.java | 11 ++++++----- + 2 files changed, 22 insertions(+), 6 deletions(-) + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java +index d7bc7c694..b7ba8ad4a 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java +@@ -429,9 +429,16 @@ public class PopBufferMergeService extends ServiceThread { + * @param nextBeginOffset + * @return + */ +- public void addCkJustOffset(PopCheckPoint point, int reviveQueueId, long reviveQueueOffset, long nextBeginOffset) { ++ public boolean addCkJustOffset(PopCheckPoint point, int reviveQueueId, long reviveQueueOffset, long nextBeginOffset) { + PopCheckPointWrapper pointWrapper = new PopCheckPointWrapper(reviveQueueId, reviveQueueOffset, point, nextBeginOffset, true); + ++ if (this.buffer.containsKey(pointWrapper.getMergeKey())) { ++ // when mergeKey conflict ++ // will cause PopBufferMergeService.scanCommitOffset cannot poll PopCheckPointWrapper ++ POP_LOGGER.warn("[PopBuffer]mergeKey conflict when add ckJustOffset. ck:{}, mergeKey:{}", pointWrapper, pointWrapper.getMergeKey()); ++ return false; ++ } ++ + this.putCkToStore(pointWrapper, !checkQueueOk(pointWrapper)); + + putOffsetQueue(pointWrapper); +@@ -440,6 +447,7 @@ public class PopBufferMergeService extends ServiceThread { + if (brokerController.getBrokerConfig().isEnablePopLog()) { + POP_LOGGER.info("[PopBuffer]add ck just offset, {}", pointWrapper); + } ++ return true; + } + + public void addCkMock(String group, String topic, int queueId, long startOffset, long invisibleTime, +@@ -492,6 +500,13 @@ public class PopBufferMergeService extends ServiceThread { + return false; + } + ++ if (this.buffer.containsKey(pointWrapper.getMergeKey())) { ++ // when mergeKey conflict ++ // will cause PopBufferMergeService.scanCommitOffset cannot poll PopCheckPointWrapper ++ POP_LOGGER.warn("[PopBuffer]mergeKey conflict when add ck. ck:{}, mergeKey:{}", pointWrapper, pointWrapper.getMergeKey()); ++ return false; ++ } ++ + putOffsetQueue(pointWrapper); + this.buffer.put(pointWrapper.getMergeKey(), pointWrapper); + this.counter.incrementAndGet(); +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +index 28549bfed..464f8f4fd 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +@@ -570,7 +570,9 @@ public class PopMessageProcessor implements NettyRequestProcessor { + this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), + requestHeader.getConsumerGroup(), topic, queueId, finalOffset); + } else { +- appendCheckPoint(requestHeader, topic, reviveQid, queueId, finalOffset, result, popTime, this.brokerController.getBrokerConfig().getBrokerName()); ++ if (!appendCheckPoint(requestHeader, topic, reviveQid, queueId, finalOffset, result, popTime, this.brokerController.getBrokerConfig().getBrokerName())) { ++ return atomicRestNum.get() + result.getMessageCount(); ++ } + } + ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, isRetry, queueId, finalOffset); + ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, isRetry, queueId, +@@ -685,7 +687,7 @@ public class PopMessageProcessor implements NettyRequestProcessor { + return msgInner; + } + +- private void appendCheckPoint(final PopMessageRequestHeader requestHeader, ++ private boolean appendCheckPoint(final PopMessageRequestHeader requestHeader, + final String topic, final int reviveQid, final int queueId, final long offset, + final GetMessageResult getMessageTmpResult, final long popTime, final String brokerName) { + // add check point msg to revive log +@@ -708,10 +710,9 @@ public class PopMessageProcessor implements NettyRequestProcessor { + ); + + if (addBufferSuc) { +- return; ++ return true; + } +- +- this.popBufferMergeService.addCkJustOffset( ++ return this.popBufferMergeService.addCkJustOffset( + ck, reviveQid, -1, getMessageTmpResult.getNextBeginOffset() + ); + } +-- +2.32.0.windows.2 + |