From 15c6889bb0abd014c06ef1452f791db9daa1ea08 Mon Sep 17 00:00:00 2001 From: lizhimins <707364882@qq.com> Date: Tue, 11 Jul 2023 17:04:00 +0800 Subject: [PATCH 1/8] fix receive message activity attempt id not correct (#7012) fix receive message activity attempt id not correct --- .../proxy/grpc/v2/consumer/ReceiveMessageActivity.java | 2 +- .../proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java index a504179a9..cf58bb87a 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java @@ -130,7 +130,7 @@ public class ReceiveMessageActivity extends AbstractMessingActivity { subscriptionData, fifo, new PopMessageResultFilterImpl(maxAttempts), - request.getAttemptId(), + request.hasAttemptId() ? request.getAttemptId() : null, timeRemaining ).thenAccept(popResult -> { if (proxyConfig.isEnableProxyAutoRenew() && request.getAutoRenew()) { diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java index 2e562504a..7fd9a9ffd 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java @@ -57,6 +57,7 @@ import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -89,7 +90,7 @@ public class ReceiveMessageActivityTest extends BaseActivityTest { .setRequestTimeout(Durations.fromSeconds(3)) .build()); when(this.messagingProcessor.popMessage(any(), any(), anyString(), anyString(), anyInt(), anyLong(), - pollTimeCaptor.capture(), anyInt(), any(), anyBoolean(), any(), anyString(), anyLong())) + pollTimeCaptor.capture(), anyInt(), any(), anyBoolean(), any(), isNull(), anyLong())) .thenReturn(CompletableFuture.completedFuture(new PopResult(PopStatus.NO_NEW_MSG, Collections.emptyList()))); @@ -223,7 +224,6 @@ public class ReceiveMessageActivityTest extends BaseActivityTest { assertEquals(Code.ILLEGAL_INVISIBLE_TIME, getResponseCodeFromReceiveMessageResponseList(responseArgumentCaptor.getAllValues())); } - @Test public void testReceiveMessage() { StreamObserver receiveStreamObserver = mock(ServerCallStreamObserver.class); @@ -245,7 +245,7 @@ public class ReceiveMessageActivityTest extends BaseActivityTest { any(), anyBoolean(), any(), - anyString(), + isNull(), anyLong())).thenReturn(CompletableFuture.completedFuture(popResult)); this.receiveMessageActivity.receiveMessage( -- 2.32.0.windows.2 From b4496be68705c1c0b282a07a1adeab4fffd670fe Mon Sep 17 00:00:00 2001 From: ShuangxiDing Date: Tue, 11 Jul 2023 19:09:09 +0800 Subject: [PATCH 2/8] [ISSUE #7010] Fix the HandshakeHandler returns when detect haproxy version need more data (#7011) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Support dynamic modification of grpc tls mode to improve the scalability of ProtocolNegotiator * Support dynamic modification of grpc tls mode to improve the scalability of ProtocolNegotiator * [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator * [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator * [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator * [ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator * Support proxy protocol for gRPC server. * Support proxy protocol for gRPC server. * Support proxy protocol for gRPC server. * Support proxy protocol for gRPC server. * Support proxy protocol for gRPC server. * Support proxy protocol for gRPC and Remoting server. * 回滚netty的升级 * Support proxy protocol for gRPC and Remoting server. * Support proxy protocol for gRPC and Remoting server. * Support proxy protocol for gRPC and Remoting server. * add grpc-netty-codec-haproxy in bazel * add grpc-netty-codec-haproxy in bazel * Support proxy protocol for gRPC and Remoting server. * Fix Test * add grpc-netty-codec-haproxy in bazel * add ProxyProtocolTest for Remoting * Move AttributeKey from RemotingHelper to AttributeKey. * Fix the needs more data for HandshakeHandler. * Fix the needs more data for HandshakeHandler. * Fix the needs more data for HandshakeHandler. * Fix the needs more data for HandshakeHandler. --------- Co-authored-by: 徒钟 --- .../remoting/MultiProtocolRemotingServer.java | 2 +- .../activity/AbstractRemotingActivity.java | 16 +++++++------ .../activity/ClientManagerActivity.java | 24 ++++++++++--------- .../AbstractRemotingActivityTest.java | 10 ++++---- .../remoting/common/RemotingHelper.java | 21 ++++------------ .../remoting/netty/AttributeKeys.java | 11 ++++++++- .../remoting/netty/NettyRemotingServer.java | 23 ++++++------------ 7 files changed, 51 insertions(+), 56 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java index 858b1f022..12d728fff 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java @@ -78,7 +78,7 @@ public class MultiProtocolRemotingServer extends NettyRemotingServer { @Override protected ChannelPipeline configChannel(SocketChannel ch) { return ch.pipeline() - .addLast(this.getDefaultEventExecutorGroup(), HANDSHAKE_HANDLER_NAME, this.getHandshakeHandler()) + .addLast(this.getDefaultEventExecutorGroup(), HANDSHAKE_HANDLER_NAME, new HandshakeHandler()) .addLast(this.getDefaultEventExecutorGroup(), new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), new ProtocolNegotiationHandler(this.remotingProtocolHandler) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java index 78cd203ec..ce4a63397 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java @@ -19,9 +19,6 @@ package org.apache.rocketmq.proxy.remoting.activity; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; import org.apache.rocketmq.acl.common.AclException; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -41,11 +38,16 @@ import org.apache.rocketmq.proxy.processor.MessagingProcessor; import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType; import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.netty.AttributeKeys; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RequestCode; import org.apache.rocketmq.remoting.protocol.ResponseCode; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + public abstract class AbstractRemotingActivity implements NettyRequestProcessor { protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); protected final MessagingProcessor messagingProcessor; @@ -126,13 +128,13 @@ public abstract class AbstractRemotingActivity implements NettyRequestProcessor .setProtocolType(ChannelProtocolType.REMOTING.getName()) .setChannel(channel) .setLocalAddress(NetworkUtil.socketAddress2String(ctx.channel().localAddress())) - .setRemoteAddress(NetworkUtil.socketAddress2String(ctx.channel().remoteAddress())); + .setRemoteAddress(RemotingHelper.parseChannelRemoteAddr(ctx.channel())); - Optional.ofNullable(RemotingHelper.getAttributeValue(RemotingHelper.LANGUAGE_CODE_KEY, channel)) + Optional.ofNullable(RemotingHelper.getAttributeValue(AttributeKeys.LANGUAGE_CODE_KEY, channel)) .ifPresent(language -> context.setLanguage(language.name())); - Optional.ofNullable(RemotingHelper.getAttributeValue(RemotingHelper.CLIENT_ID_KEY, channel)) + Optional.ofNullable(RemotingHelper.getAttributeValue(AttributeKeys.CLIENT_ID_KEY, channel)) .ifPresent(context::setClientID); - Optional.ofNullable(RemotingHelper.getAttributeValue(RemotingHelper.VERSION_KEY, channel)) + Optional.ofNullable(RemotingHelper.getAttributeValue(AttributeKeys.VERSION_KEY, channel)) .ifPresent(version -> context.setClientVersion(MQVersion.getVersionDesc(version))); return context; diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java index 1eb81ce92..c671593a3 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java @@ -19,13 +19,20 @@ package org.apache.rocketmq.proxy.remoting.activity; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import java.util.Set; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerGroupEvent; import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener; import org.apache.rocketmq.broker.client.ProducerChangeListener; import org.apache.rocketmq.broker.client.ProducerGroupEvent; +import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; +import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel; +import org.apache.rocketmq.proxy.remoting.channel.RemotingChannelManager; +import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.netty.AttributeKeys; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RequestCode; import org.apache.rocketmq.remoting.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.header.UnregisterClientRequestHeader; @@ -33,13 +40,8 @@ import org.apache.rocketmq.remoting.protocol.header.UnregisterClientResponseHead import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumerData; import org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData; import org.apache.rocketmq.remoting.protocol.heartbeat.ProducerData; -import org.apache.rocketmq.proxy.common.ProxyContext; -import org.apache.rocketmq.proxy.processor.MessagingProcessor; -import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel; -import org.apache.rocketmq.proxy.remoting.channel.RemotingChannelManager; -import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; -import org.apache.rocketmq.remoting.exception.RemotingCommandException; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +import java.util.Set; public class ClientManagerActivity extends AbstractRemotingActivity { @@ -108,9 +110,9 @@ public class ClientManagerActivity extends AbstractRemotingActivity { if (channel instanceof RemotingChannel) { RemotingChannel remotingChannel = (RemotingChannel) channel; Channel parent = remotingChannel.parent(); - RemotingHelper.setPropertyToAttr(parent, RemotingHelper.CLIENT_ID_KEY, clientChannelInfo.getClientId()); - RemotingHelper.setPropertyToAttr(parent, RemotingHelper.LANGUAGE_CODE_KEY, clientChannelInfo.getLanguage()); - RemotingHelper.setPropertyToAttr(parent, RemotingHelper.VERSION_KEY, clientChannelInfo.getVersion()); + RemotingHelper.setPropertyToAttr(parent, AttributeKeys.CLIENT_ID_KEY, clientChannelInfo.getClientId()); + RemotingHelper.setPropertyToAttr(parent, AttributeKeys.LANGUAGE_CODE_KEY, clientChannelInfo.getLanguage()); + RemotingHelper.setPropertyToAttr(parent, AttributeKeys.VERSION_KEY, clientChannelInfo.getVersion()); } } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java index 663a83e3c..b2bd3a35f 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java @@ -21,7 +21,6 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; -import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.acl.common.AclException; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -35,6 +34,7 @@ import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType; import org.apache.rocketmq.proxy.service.channel.SimpleChannel; import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext; import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.netty.AttributeKeys; import org.apache.rocketmq.remoting.protocol.LanguageCode; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RequestCode; @@ -48,6 +48,8 @@ import org.mockito.Mock; import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; +import java.util.concurrent.CompletableFuture; + import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; @@ -82,9 +84,9 @@ public class AbstractRemotingActivityTest extends InitConfigTest { } }; Channel channel = ctx.channel(); - RemotingHelper.setPropertyToAttr(channel, RemotingHelper.CLIENT_ID_KEY, CLIENT_ID); - RemotingHelper.setPropertyToAttr(channel, RemotingHelper.LANGUAGE_CODE_KEY, LanguageCode.JAVA); - RemotingHelper.setPropertyToAttr(channel, RemotingHelper.VERSION_KEY, MQVersion.CURRENT_VERSION); + RemotingHelper.setPropertyToAttr(channel, AttributeKeys.CLIENT_ID_KEY, CLIENT_ID); + RemotingHelper.setPropertyToAttr(channel, AttributeKeys.LANGUAGE_CODE_KEY, LanguageCode.JAVA); + RemotingHelper.setPropertyToAttr(channel, AttributeKeys.VERSION_KEY, MQVersion.CURRENT_VERSION); } @Test diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java index d0750b678..363b22eac 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java @@ -22,7 +22,6 @@ import io.netty.channel.ChannelFutureListener; import io.netty.util.Attribute; import io.netty.util.AttributeKey; import org.apache.commons.lang3.StringUtils; -import org.apache.rocketmq.common.constant.HAProxyConstants; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.utils.NetworkUtil; import org.apache.rocketmq.logging.org.slf4j.Logger; @@ -31,8 +30,8 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.netty.AttributeKeys; import org.apache.rocketmq.remoting.netty.NettySystemConfig; -import org.apache.rocketmq.remoting.protocol.LanguageCode; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RequestCode; import org.apache.rocketmq.remoting.protocol.ResponseCode; @@ -51,16 +50,6 @@ public class RemotingHelper { public static final String DEFAULT_CIDR_ALL = "0.0.0.0/0"; private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME); - private static final AttributeKey REMOTE_ADDR_KEY = AttributeKey.valueOf("RemoteAddr"); - - private static final AttributeKey PROXY_PROTOCOL_ADDR = AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_ADDR); - private static final AttributeKey PROXY_PROTOCOL_PORT = AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_PORT); - - public static final AttributeKey CLIENT_ID_KEY = AttributeKey.valueOf("ClientId"); - - public static final AttributeKey VERSION_KEY = AttributeKey.valueOf("Version"); - - public static final AttributeKey LANGUAGE_CODE_KEY = AttributeKey.valueOf("LanguageCode"); public static final Map REQUEST_CODE_MAP = new HashMap() { { @@ -213,7 +202,7 @@ public class RemotingHelper { if (StringUtils.isNotBlank(addr)) { return addr; } - Attribute att = channel.attr(REMOTE_ADDR_KEY); + Attribute att = channel.attr(AttributeKeys.REMOTE_ADDR_KEY); if (att == null) { // mocked in unit test return parseChannelRemoteAddr0(channel); @@ -227,11 +216,11 @@ public class RemotingHelper { } private static String getProxyProtocolAddress(Channel channel) { - if (!channel.hasAttr(PROXY_PROTOCOL_ADDR)) { + if (!channel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) { return null; } - String proxyProtocolAddr = getAttributeValue(PROXY_PROTOCOL_ADDR, channel); - String proxyProtocolPort = getAttributeValue(PROXY_PROTOCOL_PORT, channel); + String proxyProtocolAddr = getAttributeValue(AttributeKeys.PROXY_PROTOCOL_ADDR, channel); + String proxyProtocolPort = getAttributeValue(AttributeKeys.PROXY_PROTOCOL_PORT, channel); if (StringUtils.isBlank(proxyProtocolAddr) || proxyProtocolPort == null) { return null; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java index 4e69ab82d..ebdde31f4 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java @@ -19,12 +19,21 @@ package org.apache.rocketmq.remoting.netty; import io.netty.util.AttributeKey; import org.apache.rocketmq.common.constant.HAProxyConstants; +import org.apache.rocketmq.remoting.protocol.LanguageCode; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class AttributeKeys { + public static final AttributeKey REMOTE_ADDR_KEY = AttributeKey.valueOf("RemoteAddr"); + + public static final AttributeKey CLIENT_ID_KEY = AttributeKey.valueOf("ClientId"); + + public static final AttributeKey VERSION_KEY = AttributeKey.valueOf("Version"); + + public static final AttributeKey LANGUAGE_CODE_KEY = AttributeKey.valueOf("LanguageCode"); + public static final AttributeKey PROXY_PROTOCOL_ADDR = AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_ADDR); @@ -40,6 +49,6 @@ public class AttributeKeys { private static final Map> ATTRIBUTE_KEY_MAP = new ConcurrentHashMap<>(); public static AttributeKey valueOf(String name) { - return ATTRIBUTE_KEY_MAP.computeIfAbsent(name, AttributeKeys::valueOf); + return ATTRIBUTE_KEY_MAP.computeIfAbsent(name, AttributeKey::valueOf); } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java index 445f06cc6..8ae87a6fa 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java @@ -37,6 +37,7 @@ import io.netty.channel.epoll.EpollServerSocketChannel; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.ProtocolDetectionResult; import io.netty.handler.codec.ProtocolDetectionState; import io.netty.handler.codec.haproxy.HAProxyMessage; @@ -73,6 +74,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import java.io.IOException; import java.net.InetSocketAddress; import java.security.cert.CertificateException; +import java.util.List; import java.util.NoSuchElementException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -115,7 +117,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti public static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder"; // sharable handlers - private HandshakeHandler handshakeHandler; + private TlsModeHandler tlsModeHandler; private NettyEncoder encoder; private NettyConnectManageHandler connectionManageHandler; private NettyServerHandler serverHandler; @@ -265,7 +267,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti */ protected ChannelPipeline configChannel(SocketChannel ch) { return ch.pipeline() - .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler) + .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, new HandshakeHandler()) .addLast(defaultEventExecutorGroup, encoder, new NettyDecoder(), @@ -402,7 +404,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti } private void prepareSharableHandlers() { - handshakeHandler = new HandshakeHandler(); + tlsModeHandler = new TlsModeHandler(TlsSystemConfig.tlsMode); encoder = new NettyEncoder(); connectionManageHandler = new NettyConnectManageHandler(); serverHandler = new NettyServerHandler(); @@ -429,10 +431,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti return defaultEventExecutorGroup; } - public HandshakeHandler getHandshakeHandler() { - return handshakeHandler; - } - public NettyEncoder getEncoder() { return encoder; } @@ -449,17 +447,13 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti return distributionHandler; } - @ChannelHandler.Sharable - public class HandshakeHandler extends SimpleChannelInboundHandler { - - private final TlsModeHandler tlsModeHandler; + public class HandshakeHandler extends ByteToMessageDecoder { public HandshakeHandler() { - tlsModeHandler = new TlsModeHandler(TlsSystemConfig.tlsMode); } @Override - protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) { + protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List out) throws Exception { try { ProtocolDetectionResult detectionResult = HAProxyMessageDecoder.detectProtocol(byteBuf); if (detectionResult.state() == ProtocolDetectionState.NEEDS_MORE_DATA) { @@ -479,9 +473,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti } catch (NoSuchElementException e) { log.error("Error while removing HandshakeHandler", e); } - - // Hand over this message to the next . - ctx.fireChannelRead(byteBuf.retain()); } catch (Exception e) { log.error("process proxy protocol negotiator failed.", e); throw e; -- 2.32.0.windows.2 From 1f0f3b2d6d16de7b315c702d33f7d3557c0fc25c Mon Sep 17 00:00:00 2001 From: Ji Juntao Date: Tue, 11 Jul 2023 21:13:06 +0800 Subject: [PATCH 3/8] [ISSUE #7013] Polish ColdDataCheckService's logic (#7014) * polish coldCtrl * remove the catch. --- .../src/main/java/org/apache/rocketmq/store/CommitLog.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 5a5c90c5a..e6ee3bacc 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -2089,6 +2089,11 @@ public class CommitLog implements Swappable { } else { this.waitForRunning(defaultMessageStore.getMessageStoreConfig().getTimerColdDataCheckIntervalMs()); } + + if (pageSize < 0) { + initPageSize(); + } + long beginClockTimestamp = this.systemClock.now(); scanFilesInPageCache(); long costTime = this.systemClock.now() - beginClockTimestamp; @@ -2182,7 +2187,7 @@ public class CommitLog implements Swappable { } private void initPageSize() { - if (pageSize < 0) { + if (pageSize < 0 && defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable()) { try { if (!MixAll.isWindows()) { pageSize = LibC.INSTANCE.getpagesize(); -- 2.32.0.windows.2 From d206590692bfdffca6bc58327e9533bc4bb68122 Mon Sep 17 00:00:00 2001 From: Lei Zhiyuan Date: Thu, 13 Jul 2023 11:17:38 +0800 Subject: [PATCH 4/8] [ISSUE #6979] Fix opaque will be duplicate in multi client scene (#6985) --- .../proxy/processor/DefaultMessagingProcessor.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java index 1b3f0af4e..188cb7b9b 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java @@ -235,13 +235,23 @@ public class DefaultMessagingProcessor extends AbstractStartAndShutdown implemen @Override public CompletableFuture request(ProxyContext ctx, String brokerName, RemotingCommand request, long timeoutMillis) { - return this.requestBrokerProcessor.request(ctx, brokerName, request, timeoutMillis); + int originalRequestOpaque = request.getOpaque(); + request.setOpaque(RemotingCommand.createNewRequestId()); + return this.requestBrokerProcessor.request(ctx, brokerName, request, timeoutMillis).thenApply(r -> { + request.setOpaque(originalRequestOpaque); + return r; + }); } @Override public CompletableFuture requestOneway(ProxyContext ctx, String brokerName, RemotingCommand request, long timeoutMillis) { - return this.requestBrokerProcessor.requestOneway(ctx, brokerName, request, timeoutMillis); + int originalRequestOpaque = request.getOpaque(); + request.setOpaque(RemotingCommand.createNewRequestId()); + return this.requestBrokerProcessor.requestOneway(ctx, brokerName, request, timeoutMillis).thenApply(r -> { + request.setOpaque(originalRequestOpaque); + return r; + }); } @Override -- 2.32.0.windows.2 From 33cb22e1c0fa7ba980567117230fe443ff5dbd62 Mon Sep 17 00:00:00 2001 From: lizhimins <707364882@qq.com> Date: Thu, 13 Jul 2023 15:37:24 +0800 Subject: [PATCH 5/8] [ISSUE #7018] fix append in tiered storage when message offset incorrect (#7019) * fix append in tiered storage when message offset incorrect --- .../tieredstore/TieredDispatcher.java | 25 ++++++++++------ .../tieredstore/file/CompositeFlatFile.java | 30 +++++++++---------- .../file/CompositeQueueFlatFile.java | 2 +- .../file/CompositeQueueFlatFileTest.java | 4 +-- 4 files changed, 34 insertions(+), 27 deletions(-) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java index 2a8e2ed71..6584b0e89 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java @@ -308,9 +308,18 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch doRedispatchRequestToWriteMap( result, flatFile, dispatchOffset, newCommitLogOffset, size, tagCode, message.getByteBuffer()); message.release(); - if (result != AppendResult.SUCCESS) { - dispatchOffset--; - break; + + switch (result) { + case SUCCESS: + continue; + case FILE_CLOSED: + tieredFlatFileManager.destroyCompositeFile(flatFile.getMessageQueue()); + logger.info("TieredDispatcher#dispatchFlatFile: file has been close and destroy, " + + "topic: {}, queueId: {}", topic, queueId); + return; + default: + dispatchOffset--; + break; } } @@ -341,15 +350,13 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch switch (result) { case SUCCESS: - break; - case OFFSET_INCORRECT: long offset = MessageBufferUtil.getQueueOffset(message); if (queueOffset != offset) { - logger.error("[Bug] Commitlog offset incorrect, " + - "result={}, topic={}, queueId={}, offset={}, msg offset={}", - result, topic, queueId, queueOffset, offset); + logger.error("Message cq offset in commitlog does not meet expectations, " + + "result={}, topic={}, queueId={}, cq offset={}, msg offset={}", + AppendResult.OFFSET_INCORRECT, topic, queueId, queueOffset, offset); } - return; + break; case BUFFER_FULL: logger.debug("Commitlog buffer full, result={}, topic={}, queueId={}, offset={}", result, topic, queueId, queueOffset); diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java index 1243f7721..8f8ba98b1 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java @@ -29,6 +29,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; @@ -58,7 +59,7 @@ public class CompositeFlatFile implements CompositeAccess { * dispatched to the current chunk, indicating the progress of the message distribution. * It's consume queue current offset. */ - protected volatile long dispatchOffset; + protected final AtomicLong dispatchOffset; protected final ReentrantLock compositeFlatFileLock; protected final TieredMessageStoreConfig storeConfig; @@ -75,6 +76,7 @@ public class CompositeFlatFile implements CompositeAccess { this.storeConfig = fileQueueFactory.getStoreConfig(); this.readAheadFactor = this.storeConfig.getReadAheadMinFactor(); this.metadataStore = TieredStoreUtil.getMetadataStore(this.storeConfig); + this.dispatchOffset = new AtomicLong(); this.compositeFlatFileLock = new ReentrantLock(); this.inFlightRequestMap = new ConcurrentHashMap<>(); this.commitLog = new TieredCommitLog(fileQueueFactory, filePath); @@ -83,8 +85,8 @@ public class CompositeFlatFile implements CompositeAccess { } protected void recoverMetadata() { - if (!consumeQueue.isInitialized() && this.dispatchOffset != -1) { - consumeQueue.setBaseOffset(this.dispatchOffset * TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE); + if (!consumeQueue.isInitialized() && this.dispatchOffset.get() != -1) { + consumeQueue.setBaseOffset(this.dispatchOffset.get() * TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE); } } @@ -144,7 +146,7 @@ public class CompositeFlatFile implements CompositeAccess { } public long getDispatchOffset() { - return dispatchOffset; + return dispatchOffset.get(); } @Override @@ -309,7 +311,7 @@ public class CompositeFlatFile implements CompositeAccess { if (!consumeQueue.isInitialized()) { consumeQueue.setBaseOffset(offset * TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE); } - dispatchOffset = offset; + dispatchOffset.set(offset); } @Override @@ -323,14 +325,9 @@ public class CompositeFlatFile implements CompositeAccess { return AppendResult.FILE_CLOSED; } - long queueOffset = MessageBufferUtil.getQueueOffset(message); - if (dispatchOffset != queueOffset) { - return AppendResult.OFFSET_INCORRECT; - } - AppendResult result = commitLog.append(message, commit); if (result == AppendResult.SUCCESS) { - dispatchOffset = queueOffset + 1; + dispatchOffset.incrementAndGet(); } return result; } @@ -483,14 +480,17 @@ public class CompositeFlatFile implements CompositeAccess { } public void destroy() { - closed = true; - commitLog.destroy(); - consumeQueue.destroy(); try { + closed = true; + compositeFlatFileLock.lock(); + commitLog.destroy(); + consumeQueue.destroy(); metadataStore.deleteFileSegment(filePath, FileSegmentType.COMMIT_LOG); metadataStore.deleteFileSegment(filePath, FileSegmentType.CONSUME_QUEUE); } catch (Exception e) { - LOGGER.error("CompositeFlatFile#destroy: clean metadata failed: ", e); + LOGGER.error("CompositeFlatFile#destroy: delete file failed", e); + } finally { + compositeFlatFileLock.unlock(); } } } diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java index c0cf79069..f6c0afed0 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java @@ -64,7 +64,7 @@ public class CompositeQueueFlatFile extends CompositeFlatFile { if (queueMetadata.getMaxOffset() < queueMetadata.getMinOffset()) { queueMetadata.setMaxOffset(queueMetadata.getMinOffset()); } - this.dispatchOffset = queueMetadata.getMaxOffset(); + this.dispatchOffset.set(queueMetadata.getMaxOffset()); } public void persistMetadata() { diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java index 9735a535e..8322c72ed 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java @@ -73,8 +73,8 @@ public class CompositeQueueFlatFileTest { CompositeQueueFlatFile flatFile = new CompositeQueueFlatFile(tieredFileAllocator, mq); ByteBuffer message = MessageBufferUtilTest.buildMockedMessageBuffer(); AppendResult result = flatFile.appendCommitLog(message); - Assert.assertEquals(AppendResult.OFFSET_INCORRECT, result); - Assert.assertEquals(0L, flatFile.commitLog.getFlatFile().getFileToWrite().getAppendPosition()); + Assert.assertEquals(AppendResult.SUCCESS, result); + Assert.assertEquals(122L, flatFile.commitLog.getFlatFile().getFileToWrite().getAppendPosition()); Assert.assertEquals(0L, flatFile.commitLog.getFlatFile().getFileToWrite().getCommitPosition()); flatFile = new CompositeQueueFlatFile(tieredFileAllocator, mq); -- 2.32.0.windows.2 From 70a66eda2c08eb5fca38356659cb6de1ac75e25e Mon Sep 17 00:00:00 2001 From: ShuangxiDing Date: Fri, 14 Jul 2023 16:46:40 +0800 Subject: [PATCH 6/8] Fix LEAK: HAProxyMessage.release() was not called before it's garbage-collected (#7025) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Call HAProxyMessage.release() after reading it. --------- Co-authored-by: 徒钟 --- .../grpc/ProxyAndTlsProtocolNegotiator.java | 52 ++++++++++--------- .../remoting/netty/NettyRemotingServer.java | 46 ++++++++-------- 2 files changed, 53 insertions(+), 45 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java index ceb9becc0..ee167bd7b 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java @@ -160,7 +160,7 @@ public class ProxyAndTlsProtocolNegotiator implements InternalProtocolNegotiator @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof HAProxyMessage) { - replaceEventWithMessage((HAProxyMessage) msg); + handleWithMessage((HAProxyMessage) msg); ctx.fireUserEventTriggered(pne); } else { super.channelRead(ctx, msg); @@ -174,30 +174,34 @@ public class ProxyAndTlsProtocolNegotiator implements InternalProtocolNegotiator * * @param msg */ - private void replaceEventWithMessage(HAProxyMessage msg) { - Attributes.Builder builder = InternalProtocolNegotiationEvent.getAttributes(pne).toBuilder(); - if (StringUtils.isNotBlank(msg.sourceAddress())) { - builder.set(AttributeKeys.PROXY_PROTOCOL_ADDR, msg.sourceAddress()); - } - if (msg.sourcePort() > 0) { - builder.set(AttributeKeys.PROXY_PROTOCOL_PORT, String.valueOf(msg.sourcePort())); - } - if (StringUtils.isNotBlank(msg.destinationAddress())) { - builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR, msg.destinationAddress()); - } - if (msg.destinationPort() > 0) { - builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT, String.valueOf(msg.destinationPort())); - } - if (CollectionUtils.isNotEmpty(msg.tlvs())) { - msg.tlvs().forEach(tlv -> { - Attributes.Key key = AttributeKeys.valueOf( - HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue())); - String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8)); - builder.set(key, value); - }); + private void handleWithMessage(HAProxyMessage msg) { + try { + Attributes.Builder builder = InternalProtocolNegotiationEvent.getAttributes(pne).toBuilder(); + if (StringUtils.isNotBlank(msg.sourceAddress())) { + builder.set(AttributeKeys.PROXY_PROTOCOL_ADDR, msg.sourceAddress()); + } + if (msg.sourcePort() > 0) { + builder.set(AttributeKeys.PROXY_PROTOCOL_PORT, String.valueOf(msg.sourcePort())); + } + if (StringUtils.isNotBlank(msg.destinationAddress())) { + builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR, msg.destinationAddress()); + } + if (msg.destinationPort() > 0) { + builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT, String.valueOf(msg.destinationPort())); + } + if (CollectionUtils.isNotEmpty(msg.tlvs())) { + msg.tlvs().forEach(tlv -> { + Attributes.Key key = AttributeKeys.valueOf( + HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue())); + String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8)); + builder.set(key, value); + }); + } + pne = InternalProtocolNegotiationEvent + .withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build()); + } finally { + msg.release(); } - pne = InternalProtocolNegotiationEvent - .withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build()); } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java index 8ae87a6fa..90e358ce3 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java @@ -758,7 +758,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof HAProxyMessage) { - fillChannelWithMessage((HAProxyMessage) msg, ctx.channel()); + handleWithMessage((HAProxyMessage) msg, ctx.channel()); } else { super.channelRead(ctx, msg); } @@ -771,26 +771,30 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti * @param msg * @param channel */ - private void fillChannelWithMessage(HAProxyMessage msg, Channel channel) { - if (StringUtils.isNotBlank(msg.sourceAddress())) { - channel.attr(AttributeKeys.PROXY_PROTOCOL_ADDR).set(msg.sourceAddress()); - } - if (msg.sourcePort() > 0) { - channel.attr(AttributeKeys.PROXY_PROTOCOL_PORT).set(String.valueOf(msg.sourcePort())); - } - if (StringUtils.isNotBlank(msg.destinationAddress())) { - channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR).set(msg.destinationAddress()); - } - if (msg.destinationPort() > 0) { - channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT).set(String.valueOf(msg.destinationPort())); - } - if (CollectionUtils.isNotEmpty(msg.tlvs())) { - msg.tlvs().forEach(tlv -> { - AttributeKey key = AttributeKeys.valueOf( - HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue())); - String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8)); - channel.attr(key).set(value); - }); + private void handleWithMessage(HAProxyMessage msg, Channel channel) { + try { + if (StringUtils.isNotBlank(msg.sourceAddress())) { + channel.attr(AttributeKeys.PROXY_PROTOCOL_ADDR).set(msg.sourceAddress()); + } + if (msg.sourcePort() > 0) { + channel.attr(AttributeKeys.PROXY_PROTOCOL_PORT).set(String.valueOf(msg.sourcePort())); + } + if (StringUtils.isNotBlank(msg.destinationAddress())) { + channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR).set(msg.destinationAddress()); + } + if (msg.destinationPort() > 0) { + channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT).set(String.valueOf(msg.destinationPort())); + } + if (CollectionUtils.isNotEmpty(msg.tlvs())) { + msg.tlvs().forEach(tlv -> { + AttributeKey key = AttributeKeys.valueOf( + HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue())); + String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8)); + channel.attr(key).set(value); + }); + } + } finally { + msg.release(); } } } -- 2.32.0.windows.2 From 5914ff8dbb9d37e2cb48ef9c4f0256c6185b4659 Mon Sep 17 00:00:00 2001 From: lyx <56945247+lyx2000@users.noreply.github.com> Date: Sat, 15 Jul 2023 18:15:57 +0800 Subject: [PATCH 7/8] [ISSUE #6968] fix grpc acl bug (#6969) * feat(acl): fix acl bug Signed-off-by: lyx <1419360299@qq.com> # Conflicts: # proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java * add access test for two client Signed-off-by: lyx <1419360299@qq.com> * use specific acl config Signed-off-by: lyx <1419360299@qq.com> * Recovering unchange file Signed-off-by: lyx <1419360299@qq.com> * let test pass Signed-off-by: lyx <1419360299@qq.com> --------- Signed-off-by: lyx <1419360299@qq.com> --- .../acl/plain/PlainAccessResource.java | 21 +- .../acl/RemotingClientAccessTest.java | 189 ++++++++++++++++++ .../access_acl_conf/acl/plain_acl.yml | 31 +++ acl/src/test/resources/conf/acl/plain_acl.yml | 1 - pom.xml | 1 + .../apache/rocketmq/proxy/ProxyStartup.java | 17 +- .../proxy/grpc/GrpcServerBuilder.java | 10 +- .../remoting/RemotingProtocolServer.java | 15 +- 8 files changed, 255 insertions(+), 30 deletions(-) create mode 100644 acl/src/test/java/org/apache/rocketmq/acl/RemotingClientAccessTest.java create mode 100644 acl/src/test/resources/access_acl_conf/acl/plain_acl.yml diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java index cdbd9ea9b..72aa8ca71 100644 --- a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java +++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java @@ -223,7 +223,7 @@ public class PlainAccessResource implements AccessResource { if (!request.hasGroup()) { throw new AclException("Consumer heartbeat doesn't have group"); } else { - accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB); + accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB); } } } else if (SendMessageRequest.getDescriptor().getFullName().equals(rpcFullName)) { @@ -240,15 +240,15 @@ public class PlainAccessResource implements AccessResource { accessResource.addResourceAndPerm(topic, Permission.PUB); } else if (ReceiveMessageRequest.getDescriptor().getFullName().equals(rpcFullName)) { ReceiveMessageRequest request = (ReceiveMessageRequest) messageV3; - accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB); + accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB); accessResource.addResourceAndPerm(request.getMessageQueue().getTopic(), Permission.SUB); } else if (AckMessageRequest.getDescriptor().getFullName().equals(rpcFullName)) { AckMessageRequest request = (AckMessageRequest) messageV3; - accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB); + accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB); accessResource.addResourceAndPerm(request.getTopic(), Permission.SUB); } else if (ForwardMessageToDeadLetterQueueRequest.getDescriptor().getFullName().equals(rpcFullName)) { ForwardMessageToDeadLetterQueueRequest request = (ForwardMessageToDeadLetterQueueRequest) messageV3; - accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB); + accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB); accessResource.addResourceAndPerm(request.getTopic(), Permission.SUB); } else if (EndTransactionRequest.getDescriptor().getFullName().equals(rpcFullName)) { EndTransactionRequest request = (EndTransactionRequest) messageV3; @@ -264,7 +264,7 @@ public class PlainAccessResource implements AccessResource { } if (command.getSettings().hasSubscription()) { Subscription subscription = command.getSettings().getSubscription(); - accessResource.addResourceAndPerm(subscription.getGroup(), Permission.SUB); + accessResource.addGroupResourceAndPerm(subscription.getGroup(), Permission.SUB); for (SubscriptionEntry entry : subscription.getSubscriptionsList()) { accessResource.addResourceAndPerm(entry.getTopic(), Permission.SUB); } @@ -275,17 +275,17 @@ public class PlainAccessResource implements AccessResource { } } else if (NotifyClientTerminationRequest.getDescriptor().getFullName().equals(rpcFullName)) { NotifyClientTerminationRequest request = (NotifyClientTerminationRequest) messageV3; - accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB); + accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB); } else if (QueryRouteRequest.getDescriptor().getFullName().equals(rpcFullName)) { QueryRouteRequest request = (QueryRouteRequest) messageV3; accessResource.addResourceAndPerm(request.getTopic(), Permission.ANY); } else if (QueryAssignmentRequest.getDescriptor().getFullName().equals(rpcFullName)) { QueryAssignmentRequest request = (QueryAssignmentRequest) messageV3; - accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB); + accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB); accessResource.addResourceAndPerm(request.getTopic(), Permission.SUB); } else if (ChangeInvisibleDurationRequest.getDescriptor().getFullName().equals(rpcFullName)) { ChangeInvisibleDurationRequest request = (ChangeInvisibleDurationRequest) messageV3; - accessResource.addResourceAndPerm(request.getGroup(), Permission.SUB); + accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB); accessResource.addResourceAndPerm(request.getTopic(), Permission.SUB); } } catch (Throwable t) { @@ -299,6 +299,11 @@ public class PlainAccessResource implements AccessResource { addResourceAndPerm(resourceName, permission); } + private void addGroupResourceAndPerm(Resource resource, byte permission) { + String resourceName = NamespaceUtil.wrapNamespace(resource.getResourceNamespace(), resource.getName()); + addResourceAndPerm(getRetryTopic(resourceName), permission); + } + public static PlainAccessResource build(PlainAccessConfig plainAccessConfig, RemoteAddressStrategy remoteAddressStrategy) { PlainAccessResource plainAccessResource = new PlainAccessResource(); plainAccessResource.setAccessKey(plainAccessConfig.getAccessKey()); diff --git a/acl/src/test/java/org/apache/rocketmq/acl/RemotingClientAccessTest.java b/acl/src/test/java/org/apache/rocketmq/acl/RemotingClientAccessTest.java new file mode 100644 index 000000000..88c5e09a9 --- /dev/null +++ b/acl/src/test/java/org/apache/rocketmq/acl/RemotingClientAccessTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.acl; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.rocketmq.acl.common.AclClientRPCHook; +import org.apache.rocketmq.acl.common.AclException; +import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.acl.plain.AclTestHelper; +import org.apache.rocketmq.acl.plain.PlainAccessResource; +import org.apache.rocketmq.acl.plain.PlainAccessValidator; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class RemotingClientAccessTest { + + private PlainAccessValidator plainAccessValidator; + private AclClientRPCHook aclClient; + private SessionCredentials sessionCredentials; + + private File confHome; + + private String clientAddress = "10.7.1.3"; + + @Before + public void init() throws IOException { + String folder = "access_acl_conf"; + confHome = AclTestHelper.copyResources(folder, true); + System.setProperty("rocketmq.home.dir", confHome.getAbsolutePath()); + System.setProperty("rocketmq.acl.plain.file", "/access_acl_conf/acl/plain_acl.yml".replace("/", File.separator)); + + plainAccessValidator = new PlainAccessValidator(); + sessionCredentials = new SessionCredentials(); + sessionCredentials.setAccessKey("rocketmq3"); + sessionCredentials.setSecretKey("12345678"); + aclClient = new AclClientRPCHook(sessionCredentials); + } + + @After + public void cleanUp() { + AclTestHelper.recursiveDelete(confHome); + } + + @Test(expected = AclException.class) + public void testProduceDenyTopic() { + SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader(); + messageRequestHeader.setTopic("topicD"); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader); + aclClient.doBeforeRequest(clientAddress, remotingCommand); + + ByteBuffer buf = remotingCommand.encodeHeader(); + buf.getInt(); + buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); + buf.position(0); + try { + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), clientAddress); + plainAccessValidator.validate(accessResource); + } catch (RemotingCommandException e) { + e.printStackTrace(); + Assert.fail("Should not throw IOException"); + } + } + + @Test + public void testProduceAuthorizedTopic() { + SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader(); + messageRequestHeader.setTopic("topicA"); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader); + aclClient.doBeforeRequest(clientAddress, remotingCommand); + + ByteBuffer buf = remotingCommand.encodeHeader(); + buf.getInt(); + buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); + buf.position(0); + try { + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), clientAddress); + plainAccessValidator.validate(accessResource); + } catch (RemotingCommandException e) { + e.printStackTrace(); + Assert.fail("Should not throw IOException"); + } + } + + + @Test(expected = AclException.class) + public void testConsumeDenyTopic() { + PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader(); + pullMessageRequestHeader.setTopic("topicD"); + pullMessageRequestHeader.setConsumerGroup("groupB"); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader); + aclClient.doBeforeRequest("", remotingCommand); + ByteBuffer buf = remotingCommand.encodeHeader(); + buf.getInt(); + buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); + buf.position(0); + try { + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6"); + plainAccessValidator.validate(accessResource); + } catch (RemotingCommandException e) { + e.printStackTrace(); + Assert.fail("Should not throw IOException"); + } + + } + + @Test + public void testConsumeAuthorizedTopic() { + PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader(); + pullMessageRequestHeader.setTopic("topicB"); + pullMessageRequestHeader.setConsumerGroup("groupB"); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader); + aclClient.doBeforeRequest("", remotingCommand); + ByteBuffer buf = remotingCommand.encodeHeader(); + buf.getInt(); + buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); + buf.position(0); + try { + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6"); + plainAccessValidator.validate(accessResource); + } catch (RemotingCommandException e) { + e.printStackTrace(); + Assert.fail("Should not throw IOException"); + } + } + + @Test(expected = AclException.class) + public void testConsumeInDeniedGroup() { + PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader(); + pullMessageRequestHeader.setTopic("topicB"); + pullMessageRequestHeader.setConsumerGroup("groupD"); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader); + aclClient.doBeforeRequest("", remotingCommand); + ByteBuffer buf = remotingCommand.encodeHeader(); + buf.getInt(); + buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); + buf.position(0); + try { + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6"); + plainAccessValidator.validate(accessResource); + } catch (RemotingCommandException e) { + e.printStackTrace(); + Assert.fail("Should not throw IOException"); + } + } + + @Test + public void testConsumeInAuthorizedGroup() { + PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader(); + pullMessageRequestHeader.setTopic("topicB"); + pullMessageRequestHeader.setConsumerGroup("groupB"); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader); + aclClient.doBeforeRequest("", remotingCommand); + ByteBuffer buf = remotingCommand.encodeHeader(); + buf.getInt(); + buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); + buf.position(0); + try { + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6"); + plainAccessValidator.validate(accessResource); + } catch (RemotingCommandException e) { + e.printStackTrace(); + Assert.fail("Should not throw IOException"); + } + } + +} diff --git a/acl/src/test/resources/access_acl_conf/acl/plain_acl.yml b/acl/src/test/resources/access_acl_conf/acl/plain_acl.yml new file mode 100644 index 000000000..28a8c4888 --- /dev/null +++ b/acl/src/test/resources/access_acl_conf/acl/plain_acl.yml @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +accounts: + - accessKey: rocketmq3 + secretKey: 12345678 + admin: false + defaultTopicPerm: DENY + defaultGroupPerm: DENY + topicPerms: + - topicA=PUB + - topicB=SUB + - topicC=PUB|SUB + - topicD=DENY + groupPerms: + - groupB=SUB + - groupC=PUB|SUB + - groupD=DENY + diff --git a/acl/src/test/resources/conf/acl/plain_acl.yml b/acl/src/test/resources/conf/acl/plain_acl.yml index 5641a94bf..34e46696d 100644 --- a/acl/src/test/resources/conf/acl/plain_acl.yml +++ b/acl/src/test/resources/conf/acl/plain_acl.yml @@ -41,4 +41,3 @@ accounts: whiteRemoteAddress: 192.168.1.* # if it is admin, it could access all resources admin: true - diff --git a/pom.xml b/pom.xml index 12bc2dbd5..4d5dd1dec 100644 --- a/pom.xml +++ b/pom.xml @@ -147,6 +147,7 @@ 4.1.0 0.30 2.11.0 + 5.0.5 2.2 diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java index ea13bb808..06d5f4525 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java @@ -29,11 +29,14 @@ import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.acl.AccessValidator; +import org.apache.rocketmq.acl.plain.PlainAccessValidator; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerStartup; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.thread.ThreadPoolMonitor; +import org.apache.rocketmq.common.utils.ServiceProvider; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; @@ -75,16 +78,17 @@ public class ProxyStartup { MessagingProcessor messagingProcessor = createMessagingProcessor(); + List accessValidators = loadAccessValidators(); // create grpcServer GrpcServer grpcServer = GrpcServerBuilder.newBuilder(executor, ConfigurationManager.getProxyConfig().getGrpcServerPort()) .addService(createServiceProcessor(messagingProcessor)) .addService(ChannelzService.newInstance(100)) .addService(ProtoReflectionService.newInstance()) - .configInterceptor() + .configInterceptor(accessValidators) .build(); PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(grpcServer); - RemotingProtocolServer remotingServer = new RemotingProtocolServer(messagingProcessor); + RemotingProtocolServer remotingServer = new RemotingProtocolServer(messagingProcessor, accessValidators); PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(remotingServer); // start servers one by one. @@ -109,6 +113,15 @@ public class ProxyStartup { log.info(new Date() + " rocketmq-proxy startup successfully"); } + protected static List loadAccessValidators() { + List accessValidators = ServiceProvider.load(AccessValidator.class); + if (accessValidators.isEmpty()) { + log.info("ServiceProvider loaded no AccessValidator, using default org.apache.rocketmq.acl.plain.PlainAccessValidator"); + accessValidators.add(new PlainAccessValidator()); + } + return accessValidators; + } + protected static void initConfiguration(CommandLineArgument commandLineArgument) throws Exception { if (StringUtils.isNotBlank(commandLineArgument.getProxyConfigPath())) { System.setProperty(Configuration.CONFIG_PATH_PROPERTY, commandLineArgument.getProxyConfigPath()); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java index 437b9216b..9cddd3013 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java @@ -28,9 +28,7 @@ import java.util.List; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.acl.AccessValidator; -import org.apache.rocketmq.acl.plain.PlainAccessValidator; import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.common.utils.ServiceProvider; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.config.ConfigurationManager; @@ -98,14 +96,8 @@ public class GrpcServerBuilder { return new GrpcServer(this.serverBuilder.build()); } - public GrpcServerBuilder configInterceptor() { + public GrpcServerBuilder configInterceptor(List accessValidators) { // grpc interceptors, including acl, logging etc. - List accessValidators = ServiceProvider.load(AccessValidator.class); - if (accessValidators.isEmpty()) { - log.info("ServiceProvider loaded no AccessValidator, using default org.apache.rocketmq.acl.plain.PlainAccessValidator"); - accessValidators.add(new PlainAccessValidator()); - } - this.serverBuilder.intercept(new AuthenticationInterceptor(accessValidators)); this.serverBuilder diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java index f08094c16..bcc9edd09 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java @@ -19,7 +19,6 @@ package org.apache.rocketmq.proxy.remoting; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.channel.Channel; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; @@ -28,15 +27,14 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.acl.AccessValidator; -import org.apache.rocketmq.acl.plain.PlainAccessValidator; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.future.FutureTaskExt; import org.apache.rocketmq.common.thread.ThreadPoolMonitor; import org.apache.rocketmq.common.thread.ThreadPoolStatusMonitor; +import org.apache.rocketmq.common.utils.StartAndShutdown; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; -import org.apache.rocketmq.common.utils.StartAndShutdown; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.ProxyConfig; import org.apache.rocketmq.proxy.processor.MessagingProcessor; @@ -86,11 +84,11 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu protected final ThreadPoolExecutor defaultExecutor; protected final ScheduledExecutorService timerExecutor; - public RemotingProtocolServer(MessagingProcessor messagingProcessor) { + public RemotingProtocolServer(MessagingProcessor messagingProcessor, List accessValidators) { this.messagingProcessor = messagingProcessor; this.remotingChannelManager = new RemotingChannelManager(this, messagingProcessor.getProxyRelayService()); - RequestPipeline pipeline = createRequestPipeline(); + RequestPipeline pipeline = createRequestPipeline(accessValidators); this.getTopicRouteActivity = new GetTopicRouteActivity(pipeline, messagingProcessor); this.clientManagerActivity = new ClientManagerActivity(pipeline, messagingProcessor, remotingChannelManager); this.consumerManagerActivity = new ConsumerManagerActivity(pipeline, messagingProcessor); @@ -254,15 +252,12 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu return future; } - protected RequestPipeline createRequestPipeline() { + protected RequestPipeline createRequestPipeline(List accessValidators) { RequestPipeline pipeline = (ctx, request, context) -> { }; - - List accessValidatorList = new ArrayList<>(); - accessValidatorList.add(new PlainAccessValidator()); // add pipeline // the last pipe add will execute at the first - return pipeline.pipe(new AuthenticationPipeline(accessValidatorList)); + return pipeline.pipe(new AuthenticationPipeline(accessValidators)); } protected class ThreadPoolHeadSlowTimeMillsMonitor implements ThreadPoolStatusMonitor { -- 2.32.0.windows.2 From 440be1ed4ce2af0ab58af6c3019de7075c09c20f Mon Sep 17 00:00:00 2001 From: fuyou001 Date: Mon, 17 Jul 2023 19:23:23 +0800 Subject: [PATCH 8/8] [ISSUE #7031] fix Pop caused broker memory leak bug (#7032) --- .../broker/processor/PopBufferMergeService.java | 17 ++++++++++++++++- .../broker/processor/PopMessageProcessor.java | 11 ++++++----- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java index d7bc7c694..b7ba8ad4a 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java @@ -429,9 +429,16 @@ public class PopBufferMergeService extends ServiceThread { * @param nextBeginOffset * @return */ - public void addCkJustOffset(PopCheckPoint point, int reviveQueueId, long reviveQueueOffset, long nextBeginOffset) { + public boolean addCkJustOffset(PopCheckPoint point, int reviveQueueId, long reviveQueueOffset, long nextBeginOffset) { PopCheckPointWrapper pointWrapper = new PopCheckPointWrapper(reviveQueueId, reviveQueueOffset, point, nextBeginOffset, true); + if (this.buffer.containsKey(pointWrapper.getMergeKey())) { + // when mergeKey conflict + // will cause PopBufferMergeService.scanCommitOffset cannot poll PopCheckPointWrapper + POP_LOGGER.warn("[PopBuffer]mergeKey conflict when add ckJustOffset. ck:{}, mergeKey:{}", pointWrapper, pointWrapper.getMergeKey()); + return false; + } + this.putCkToStore(pointWrapper, !checkQueueOk(pointWrapper)); putOffsetQueue(pointWrapper); @@ -440,6 +447,7 @@ public class PopBufferMergeService extends ServiceThread { if (brokerController.getBrokerConfig().isEnablePopLog()) { POP_LOGGER.info("[PopBuffer]add ck just offset, {}", pointWrapper); } + return true; } public void addCkMock(String group, String topic, int queueId, long startOffset, long invisibleTime, @@ -492,6 +500,13 @@ public class PopBufferMergeService extends ServiceThread { return false; } + if (this.buffer.containsKey(pointWrapper.getMergeKey())) { + // when mergeKey conflict + // will cause PopBufferMergeService.scanCommitOffset cannot poll PopCheckPointWrapper + POP_LOGGER.warn("[PopBuffer]mergeKey conflict when add ck. ck:{}, mergeKey:{}", pointWrapper, pointWrapper.getMergeKey()); + return false; + } + putOffsetQueue(pointWrapper); this.buffer.put(pointWrapper.getMergeKey(), pointWrapper); this.counter.incrementAndGet(); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index 28549bfed..464f8f4fd 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -570,7 +570,9 @@ public class PopMessageProcessor implements NettyRequestProcessor { this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), requestHeader.getConsumerGroup(), topic, queueId, finalOffset); } else { - appendCheckPoint(requestHeader, topic, reviveQid, queueId, finalOffset, result, popTime, this.brokerController.getBrokerConfig().getBrokerName()); + if (!appendCheckPoint(requestHeader, topic, reviveQid, queueId, finalOffset, result, popTime, this.brokerController.getBrokerConfig().getBrokerName())) { + return atomicRestNum.get() + result.getMessageCount(); + } } ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, isRetry, queueId, finalOffset); ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, isRetry, queueId, @@ -685,7 +687,7 @@ public class PopMessageProcessor implements NettyRequestProcessor { return msgInner; } - private void appendCheckPoint(final PopMessageRequestHeader requestHeader, + private boolean appendCheckPoint(final PopMessageRequestHeader requestHeader, final String topic, final int reviveQid, final int queueId, final long offset, final GetMessageResult getMessageTmpResult, final long popTime, final String brokerName) { // add check point msg to revive log @@ -708,10 +710,9 @@ public class PopMessageProcessor implements NettyRequestProcessor { ); if (addBufferSuc) { - return; + return true; } - - this.popBufferMergeService.addCkJustOffset( + return this.popBufferMergeService.addCkJustOffset( ck, reviveQid, -1, getMessageTmpResult.getNextBeginOffset() ); } -- 2.32.0.windows.2