diff options
Diffstat (limited to 'patch004-backport-Support-Proxy-Protocol-for-gRPC-and-Remoting-Server.patch')
-rw-r--r-- | patch004-backport-Support-Proxy-Protocol-for-gRPC-and-Remoting-Server.patch | 1939 |
1 files changed, 1939 insertions, 0 deletions
diff --git a/patch004-backport-Support-Proxy-Protocol-for-gRPC-and-Remoting-Server.patch b/patch004-backport-Support-Proxy-Protocol-for-gRPC-and-Remoting-Server.patch new file mode 100644 index 0000000..356202b --- /dev/null +++ b/patch004-backport-Support-Proxy-Protocol-for-gRPC-and-Remoting-Server.patch @@ -0,0 +1,1939 @@ +From 955428278ccd9bfa0f15e21a8d3040c5213358bd Mon Sep 17 00:00:00 2001 +From: Dongyuan Pan <dongyuanpan0@gmail.com> +Date: Tue, 4 Jul 2023 18:01:48 +0800 +Subject: [PATCH 1/5] [ISSUE #6991] Delete rocketmq.client.logUseSlf4j=true in + JAVA_OPT + +--- + distribution/bin/runbroker.cmd | 1 - + distribution/bin/runbroker.sh | 1 - + 2 files changed, 2 deletions(-) + +diff --git a/distribution/bin/runbroker.cmd b/distribution/bin/runbroker.cmd +index 15f676aa8..77a0d1ff8 100644 +--- a/distribution/bin/runbroker.cmd ++++ b/distribution/bin/runbroker.cmd +@@ -36,7 +36,6 @@ set "JAVA_OPT=%JAVA_OPT% -XX:-OmitStackTraceInFastThrow" + set "JAVA_OPT=%JAVA_OPT% -XX:+AlwaysPreTouch" + set "JAVA_OPT=%JAVA_OPT% -XX:MaxDirectMemorySize=15g" + set "JAVA_OPT=%JAVA_OPT% -XX:-UseLargePages -XX:-UseBiasedLocking" +-set "JAVA_OPT=%JAVA_OPT% -Drocketmq.client.logUseSlf4j=true" + set "JAVA_OPT=%JAVA_OPT% %JAVA_OPT_EXT% -cp %CLASSPATH%" + + "%JAVA%" %JAVA_OPT% %* +\ No newline at end of file +diff --git a/distribution/bin/runbroker.sh b/distribution/bin/runbroker.sh +index a081df79e..e6e2132ab 100644 +--- a/distribution/bin/runbroker.sh ++++ b/distribution/bin/runbroker.sh +@@ -106,7 +106,6 @@ JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow" + JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch" + JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g" + JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking" +-JAVA_OPT="${JAVA_OPT} -Drocketmq.client.logUseSlf4j=true" + #JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n" + JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}" + JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}" +-- +2.32.0.windows.2 + + +From 00fc42b8be848fc3f5c550cbab007b92f128dc38 Mon Sep 17 00:00:00 2001 +From: ShuangxiDing <dingshuangxi888@gmail.com> +Date: Tue, 4 Jul 2023 18:02:16 +0800 +Subject: [PATCH 2/5] [ISSUE #6957] Support Proxy Protocol for gRPC and + Remoting Server (#6958) + +--- + WORKSPACE | 1 + + .../common/constant/HAProxyConstants.java | 28 ++++ + pom.xml | 5 + + proxy/BUILD.bazel | 2 + + proxy/pom.xml | 4 + + .../proxy/grpc/GrpcServerBuilder.java | 2 +- + ...ava => ProxyAndTlsProtocolNegotiator.java} | 139 ++++++++++++++++-- + .../proxy/grpc/constant/AttributeKeys.java | 44 ++++++ + .../grpc/interceptor/HeaderInterceptor.java | 32 +++- + .../remoting/MultiProtocolRemotingServer.java | 5 +- + .../remoting/common/RemotingHelper.java | 42 ++++-- + .../remoting/netty/AttributeKeys.java | 45 ++++++ + .../remoting/netty/NettyRemotingServer.java | 129 ++++++++++++++-- + .../rocketmq/remoting/ProxyProtocolTest.java | 116 +++++++++++++++ + .../org/apache/rocketmq/remoting/TlsTest.java | 28 ++-- + 15 files changed, 563 insertions(+), 59 deletions(-) + create mode 100644 common/src/main/java/org/apache/rocketmq/common/constant/HAProxyConstants.java + rename proxy/src/main/java/org/apache/rocketmq/proxy/grpc/{OptionalSSLProtocolNegotiator.java => ProxyAndTlsProtocolNegotiator.java} (51%) + create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/grpc/constant/AttributeKeys.java + create mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java + create mode 100644 remoting/src/test/java/org/apache/rocketmq/remoting/ProxyProtocolTest.java + +diff --git a/WORKSPACE b/WORKSPACE +index fbb694efe..e3a8f37dc 100644 +--- a/WORKSPACE ++++ b/WORKSPACE +@@ -104,6 +104,7 @@ maven_install( + "software.amazon.awssdk:s3:2.20.29", + "com.fasterxml.jackson.core:jackson-databind:2.13.4.2", + "com.adobe.testing:s3mock-junit4:2.11.0", ++ "io.github.aliyunmq:rocketmq-grpc-netty-codec-haproxy:1.0.0", + ], + fetch_sources = True, + repositories = [ +diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/HAProxyConstants.java b/common/src/main/java/org/apache/rocketmq/common/constant/HAProxyConstants.java +new file mode 100644 +index 000000000..c1ae0cca1 +--- /dev/null ++++ b/common/src/main/java/org/apache/rocketmq/common/constant/HAProxyConstants.java +@@ -0,0 +1,28 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.rocketmq.common.constant; ++ ++public class HAProxyConstants { ++ ++ public static final String PROXY_PROTOCOL_PREFIX = "proxy_protocol_"; ++ public static final String PROXY_PROTOCOL_ADDR = PROXY_PROTOCOL_PREFIX + "addr"; ++ public static final String PROXY_PROTOCOL_PORT = PROXY_PROTOCOL_PREFIX + "port"; ++ public static final String PROXY_PROTOCOL_SERVER_ADDR = PROXY_PROTOCOL_PREFIX + "server_addr"; ++ public static final String PROXY_PROTOCOL_SERVER_PORT = PROXY_PROTOCOL_PREFIX + "server_port"; ++ public static final String PROXY_PROTOCOL_TLV_PREFIX = PROXY_PROTOCOL_PREFIX + "tlv_0x"; ++} +diff --git a/pom.xml b/pom.xml +index a3b474602..12bc2dbd5 100644 +--- a/pom.xml ++++ b/pom.xml +@@ -888,6 +888,11 @@ + </exclusion> + </exclusions> + </dependency> ++ <dependency> ++ <groupId>io.github.aliyunmq</groupId> ++ <artifactId>rocketmq-grpc-netty-codec-haproxy</artifactId> ++ <version>1.0.0</version> ++ </dependency> + <dependency> + <groupId>com.conversantmedia</groupId> + <artifactId>disruptor</artifactId> +diff --git a/proxy/BUILD.bazel b/proxy/BUILD.bazel +index fcb85e46f..b4f3c16e2 100644 +--- a/proxy/BUILD.bazel ++++ b/proxy/BUILD.bazel +@@ -46,6 +46,7 @@ java_library( + "@maven//:io_grpc_grpc_services", + "@maven//:io_grpc_grpc_stub", + "@maven//:io_netty_netty_all", ++ "@maven//:io_github_aliyunmq_rocketmq_grpc_netty_codec_haproxy", + "@maven//:io_openmessaging_storage_dledger", + "@maven//:io_opentelemetry_opentelemetry_api", + "@maven//:io_opentelemetry_opentelemetry_exporter_otlp", +@@ -94,6 +95,7 @@ java_library( + "@maven//:io_grpc_grpc_netty_shaded", + "@maven//:io_grpc_grpc_stub", + "@maven//:io_netty_netty_all", ++ "@maven//:io_github_aliyunmq_rocketmq_grpc_netty_codec_haproxy", + "@maven//:org_apache_commons_commons_lang3", + "@maven//:io_opentelemetry_opentelemetry_exporter_otlp", + "@maven//:io_opentelemetry_opentelemetry_exporter_prometheus", +diff --git a/proxy/pom.xml b/proxy/pom.xml +index f14155737..3fbea107a 100644 +--- a/proxy/pom.xml ++++ b/proxy/pom.xml +@@ -75,6 +75,10 @@ + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java-util</artifactId> + </dependency> ++ <dependency> ++ <groupId>io.github.aliyunmq</groupId> ++ <artifactId>rocketmq-grpc-netty-codec-haproxy</artifactId> ++ </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java +index 0ca6a1fcb..437b9216b 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java +@@ -50,7 +50,7 @@ public class GrpcServerBuilder { + protected GrpcServerBuilder(ThreadPoolExecutor executor, int port) { + serverBuilder = NettyServerBuilder.forPort(port); + +- serverBuilder.protocolNegotiator(new OptionalSSLProtocolNegotiator()); ++ serverBuilder.protocolNegotiator(new ProxyAndTlsProtocolNegotiator()); + + // build server + int bossLoopNum = ConfigurationManager.getProxyConfig().getGrpcBossLoopNum(); +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java +similarity index 51% +rename from proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java +rename to proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java +index 670e1c1a2..ceb9becc0 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java +@@ -16,36 +16,53 @@ + */ + package org.apache.rocketmq.proxy.grpc; + ++import io.grpc.Attributes; + import io.grpc.netty.shaded.io.grpc.netty.GrpcHttp2ConnectionHandler; + import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts; + import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiationEvent; + import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiator; + import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiators; ++import io.grpc.netty.shaded.io.grpc.netty.ProtocolNegotiationEvent; + import io.grpc.netty.shaded.io.netty.buffer.ByteBuf; + import io.grpc.netty.shaded.io.netty.channel.ChannelHandler; + import io.grpc.netty.shaded.io.netty.channel.ChannelHandlerContext; ++import io.grpc.netty.shaded.io.netty.channel.ChannelInboundHandlerAdapter; + import io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder; ++import io.grpc.netty.shaded.io.netty.handler.codec.ProtocolDetectionResult; ++import io.grpc.netty.shaded.io.netty.handler.codec.ProtocolDetectionState; ++import io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyMessage; ++import io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyMessageDecoder; ++import io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyProtocolVersion; + import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth; + import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; + import io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler; + import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory; + import io.grpc.netty.shaded.io.netty.handler.ssl.util.SelfSignedCertificate; + import io.grpc.netty.shaded.io.netty.util.AsciiString; +-import java.io.InputStream; +-import java.nio.file.Files; +-import java.nio.file.Paths; +-import java.util.List; ++import io.grpc.netty.shaded.io.netty.util.CharsetUtil; ++import org.apache.commons.collections.CollectionUtils; ++import org.apache.commons.lang3.StringUtils; ++import org.apache.rocketmq.common.constant.HAProxyConstants; + import org.apache.rocketmq.common.constant.LoggerName; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.proxy.config.ConfigurationManager; + import org.apache.rocketmq.proxy.config.ProxyConfig; ++import org.apache.rocketmq.proxy.grpc.constant.AttributeKeys; + import org.apache.rocketmq.remoting.common.TlsMode; + import org.apache.rocketmq.remoting.netty.TlsSystemConfig; + +-public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator.ProtocolNegotiator { ++import java.io.InputStream; ++import java.nio.file.Files; ++import java.nio.file.Paths; ++import java.util.List; ++ ++public class ProxyAndTlsProtocolNegotiator implements InternalProtocolNegotiator.ProtocolNegotiator { + protected static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); + ++ private static final String HA_PROXY_DECODER = "HAProxyDecoder"; ++ private static final String HA_PROXY_HANDLER = "HAProxyHandler"; ++ private static final String TLS_MODE_HANDLER = "TlsModeHandler"; + /** + * the length of the ssl record header (in bytes) + */ +@@ -53,7 +70,7 @@ public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator + + private static SslContext sslContext; + +- public OptionalSSLProtocolNegotiator() { ++ public ProxyAndTlsProtocolNegotiator() { + sslContext = loadSslContext(); + } + +@@ -64,11 +81,12 @@ public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator + + @Override + public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) { +- return new PortUnificationServerHandler(grpcHandler); ++ return new ProxyAndTlsProtocolHandler(grpcHandler); + } + + @Override +- public void close() {} ++ public void close() { ++ } + + private static SslContext loadSslContext() { + try { +@@ -85,8 +103,8 @@ public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator + String tlsCertPath = ConfigurationManager.getProxyConfig().getTlsCertPath(); + try (InputStream serverKeyInputStream = Files.newInputStream( + Paths.get(tlsKeyPath)); +- InputStream serverCertificateStream = Files.newInputStream( +- Paths.get(tlsCertPath))) { ++ InputStream serverCertificateStream = Files.newInputStream( ++ Paths.get(tlsCertPath))) { + SslContext res = GrpcSslContexts.forServer(serverCertificateStream, + serverKeyInputStream) + .trustManager(InsecureTrustManagerFactory.INSTANCE) +@@ -102,12 +120,95 @@ public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator + } + } + +- public static class PortUnificationServerHandler extends ByteToMessageDecoder { ++ private static class ProxyAndTlsProtocolHandler extends ByteToMessageDecoder { ++ ++ private final GrpcHttp2ConnectionHandler grpcHandler; ++ ++ public ProxyAndTlsProtocolHandler(GrpcHttp2ConnectionHandler grpcHandler) { ++ this.grpcHandler = grpcHandler; ++ } ++ ++ @Override ++ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { ++ try { ++ ProtocolDetectionResult<HAProxyProtocolVersion> ha = HAProxyMessageDecoder.detectProtocol( ++ in); ++ if (ha.state() == ProtocolDetectionState.NEEDS_MORE_DATA) { ++ return; ++ } ++ if (ha.state() == ProtocolDetectionState.DETECTED) { ++ ctx.pipeline().addAfter(ctx.name(), HA_PROXY_DECODER, new HAProxyMessageDecoder()) ++ .addAfter(HA_PROXY_DECODER, HA_PROXY_HANDLER, new HAProxyMessageHandler()) ++ .addAfter(HA_PROXY_HANDLER, TLS_MODE_HANDLER, new TlsModeHandler(grpcHandler)); ++ } else { ++ ctx.pipeline().addAfter(ctx.name(), TLS_MODE_HANDLER, new TlsModeHandler(grpcHandler)); ++ } ++ ++ ctx.fireUserEventTriggered(InternalProtocolNegotiationEvent.getDefault()); ++ ctx.pipeline().remove(this); ++ } catch (Exception e) { ++ log.error("process proxy protocol negotiator failed.", e); ++ throw e; ++ } ++ } ++ } ++ ++ private static class HAProxyMessageHandler extends ChannelInboundHandlerAdapter { ++ ++ private ProtocolNegotiationEvent pne = InternalProtocolNegotiationEvent.getDefault(); ++ ++ @Override ++ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ++ if (msg instanceof HAProxyMessage) { ++ replaceEventWithMessage((HAProxyMessage) msg); ++ ctx.fireUserEventTriggered(pne); ++ } else { ++ super.channelRead(ctx, msg); ++ } ++ ctx.pipeline().remove(this); ++ } ++ ++ /** ++ * The definition of key refers to the implementation of nginx ++ * <a href="https://nginx.org/en/docs/http/ngx_http_core_module.html#var_proxy_protocol_addr">ngx_http_core_module</a> ++ * ++ * @param msg ++ */ ++ private void replaceEventWithMessage(HAProxyMessage msg) { ++ Attributes.Builder builder = InternalProtocolNegotiationEvent.getAttributes(pne).toBuilder(); ++ if (StringUtils.isNotBlank(msg.sourceAddress())) { ++ builder.set(AttributeKeys.PROXY_PROTOCOL_ADDR, msg.sourceAddress()); ++ } ++ if (msg.sourcePort() > 0) { ++ builder.set(AttributeKeys.PROXY_PROTOCOL_PORT, String.valueOf(msg.sourcePort())); ++ } ++ if (StringUtils.isNotBlank(msg.destinationAddress())) { ++ builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR, msg.destinationAddress()); ++ } ++ if (msg.destinationPort() > 0) { ++ builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT, String.valueOf(msg.destinationPort())); ++ } ++ if (CollectionUtils.isNotEmpty(msg.tlvs())) { ++ msg.tlvs().forEach(tlv -> { ++ Attributes.Key<String> key = AttributeKeys.valueOf( ++ HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue())); ++ String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8)); ++ builder.set(key, value); ++ }); ++ } ++ pne = InternalProtocolNegotiationEvent ++ .withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build()); ++ } ++ } ++ ++ private static class TlsModeHandler extends ByteToMessageDecoder { ++ ++ private ProtocolNegotiationEvent pne = InternalProtocolNegotiationEvent.getDefault(); + + private final ChannelHandler ssl; + private final ChannelHandler plaintext; + +- public PortUnificationServerHandler(GrpcHttp2ConnectionHandler grpcHandler) { ++ public TlsModeHandler(GrpcHttp2ConnectionHandler grpcHandler) { + this.ssl = InternalProtocolNegotiators.serverTls(sslContext) + .newHandler(grpcHandler); + this.plaintext = InternalProtocolNegotiators.serverPlaintext() +@@ -115,8 +216,7 @@ public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator + } + + @Override +- protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) +- throws Exception { ++ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { + try { + TlsMode tlsMode = TlsSystemConfig.tlsMode; + if (TlsMode.ENFORCING.equals(tlsMode)) { +@@ -134,12 +234,21 @@ public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator + ctx.pipeline().addAfter(ctx.name(), null, this.plaintext); + } + } +- ctx.fireUserEventTriggered(InternalProtocolNegotiationEvent.getDefault()); ++ ctx.fireUserEventTriggered(pne); + ctx.pipeline().remove(this); + } catch (Exception e) { + log.error("process ssl protocol negotiator failed.", e); + throw e; + } + } ++ ++ @Override ++ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { ++ if (evt instanceof ProtocolNegotiationEvent) { ++ pne = (ProtocolNegotiationEvent) evt; ++ } else { ++ super.userEventTriggered(ctx, evt); ++ } ++ } + } + } +\ No newline at end of file +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/constant/AttributeKeys.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/constant/AttributeKeys.java +new file mode 100644 +index 000000000..096a5ba3d +--- /dev/null ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/constant/AttributeKeys.java +@@ -0,0 +1,44 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.rocketmq.proxy.grpc.constant; ++ ++import io.grpc.Attributes; ++import org.apache.rocketmq.common.constant.HAProxyConstants; ++ ++import java.util.Map; ++import java.util.concurrent.ConcurrentHashMap; ++ ++public class AttributeKeys { ++ ++ public static final Attributes.Key<String> PROXY_PROTOCOL_ADDR = ++ Attributes.Key.create(HAProxyConstants.PROXY_PROTOCOL_ADDR); ++ ++ public static final Attributes.Key<String> PROXY_PROTOCOL_PORT = ++ Attributes.Key.create(HAProxyConstants.PROXY_PROTOCOL_PORT); ++ ++ public static final Attributes.Key<String> PROXY_PROTOCOL_SERVER_ADDR = ++ Attributes.Key.create(HAProxyConstants.PROXY_PROTOCOL_SERVER_ADDR); ++ ++ public static final Attributes.Key<String> PROXY_PROTOCOL_SERVER_PORT = ++ Attributes.Key.create(HAProxyConstants.PROXY_PROTOCOL_SERVER_PORT); ++ ++ private static final Map<String, Attributes.Key<String>> ATTRIBUTES_KEY_MAP = new ConcurrentHashMap<>(); ++ ++ public static Attributes.Key<String> valueOf(String name) { ++ return ATTRIBUTES_KEY_MAP.computeIfAbsent(name, key -> Attributes.Key.create(name)); ++ } ++} +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java +index 1cbb00361..13893e5ed 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java +@@ -18,11 +18,16 @@ + package org.apache.rocketmq.proxy.grpc.interceptor; + + import com.google.common.net.HostAndPort; ++import io.grpc.Attributes; + import io.grpc.Grpc; + import io.grpc.Metadata; + import io.grpc.ServerCall; + import io.grpc.ServerCallHandler; + import io.grpc.ServerInterceptor; ++import org.apache.commons.lang3.StringUtils; ++import org.apache.rocketmq.common.constant.HAProxyConstants; ++import org.apache.rocketmq.proxy.grpc.constant.AttributeKeys; ++ + import java.net.InetSocketAddress; + import java.net.SocketAddress; + +@@ -33,13 +38,27 @@ public class HeaderInterceptor implements ServerInterceptor { + Metadata headers, + ServerCallHandler<R, W> next + ) { +- SocketAddress remoteSocketAddress = call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); +- String remoteAddress = parseSocketAddress(remoteSocketAddress); ++ String remoteAddress = getProxyProtocolAddress(call.getAttributes()); ++ if (StringUtils.isBlank(remoteAddress)) { ++ SocketAddress remoteSocketAddress = call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); ++ remoteAddress = parseSocketAddress(remoteSocketAddress); ++ } + headers.put(InterceptorConstants.REMOTE_ADDRESS, remoteAddress); + + SocketAddress localSocketAddress = call.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR); + String localAddress = parseSocketAddress(localSocketAddress); + headers.put(InterceptorConstants.LOCAL_ADDRESS, localAddress); ++ ++ for (Attributes.Key<?> key : call.getAttributes().keys()) { ++ if (!StringUtils.startsWith(key.toString(), HAProxyConstants.PROXY_PROTOCOL_PREFIX)) { ++ continue; ++ } ++ Metadata.Key<String> headerKey ++ = Metadata.Key.of(key.toString(), Metadata.ASCII_STRING_MARSHALLER); ++ String headerValue = String.valueOf(call.getAttributes().get(key)); ++ headers.put(headerKey, headerValue); ++ } ++ + return next.startCall(call, headers); + } + +@@ -55,4 +74,13 @@ public class HeaderInterceptor implements ServerInterceptor { + + return ""; + } ++ ++ private String getProxyProtocolAddress(Attributes attributes) { ++ String proxyProtocolAddr = attributes.get(AttributeKeys.PROXY_PROTOCOL_ADDR); ++ String proxyProtocolPort = attributes.get(AttributeKeys.PROXY_PROTOCOL_PORT); ++ if (StringUtils.isBlank(proxyProtocolAddr) || StringUtils.isBlank(proxyProtocolPort)) { ++ return null; ++ } ++ return proxyProtocolAddr + ":" + proxyProtocolPort; ++ } + } +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java +index 1142132b7..858b1f022 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java +@@ -20,8 +20,6 @@ package org.apache.rocketmq.proxy.remoting; + import io.netty.channel.ChannelPipeline; + import io.netty.channel.socket.SocketChannel; + import io.netty.handler.timeout.IdleStateHandler; +-import java.io.IOException; +-import java.security.cert.CertificateException; + import org.apache.rocketmq.common.constant.LoggerName; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +@@ -36,6 +34,9 @@ import org.apache.rocketmq.remoting.netty.NettyRemotingServer; + import org.apache.rocketmq.remoting.netty.NettyServerConfig; + import org.apache.rocketmq.remoting.netty.TlsSystemConfig; + ++import java.io.IOException; ++import java.security.cert.CertificateException; ++ + /** + * support remoting and http2 protocol at one port + */ +diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java +index 75e25a83a..d0750b678 100644 +--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java ++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java +@@ -21,14 +21,8 @@ import io.netty.channel.ChannelFuture; + import io.netty.channel.ChannelFutureListener; + import io.netty.util.Attribute; + import io.netty.util.AttributeKey; +-import java.io.IOException; +-import java.lang.reflect.Field; +-import java.net.InetSocketAddress; +-import java.net.SocketAddress; +-import java.nio.ByteBuffer; +-import java.nio.channels.SocketChannel; +-import java.util.HashMap; +-import java.util.Map; ++import org.apache.commons.lang3.StringUtils; ++import org.apache.rocketmq.common.constant.HAProxyConstants; + import org.apache.rocketmq.common.constant.LoggerName; + import org.apache.rocketmq.common.utils.NetworkUtil; + import org.apache.rocketmq.logging.org.slf4j.Logger; +@@ -43,6 +37,15 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; + import org.apache.rocketmq.remoting.protocol.RequestCode; + import org.apache.rocketmq.remoting.protocol.ResponseCode; + ++import java.io.IOException; ++import java.lang.reflect.Field; ++import java.net.InetSocketAddress; ++import java.net.SocketAddress; ++import java.nio.ByteBuffer; ++import java.nio.channels.SocketChannel; ++import java.util.HashMap; ++import java.util.Map; ++ + public class RemotingHelper { + public static final String DEFAULT_CHARSET = "UTF-8"; + public static final String DEFAULT_CIDR_ALL = "0.0.0.0/0"; +@@ -50,6 +53,9 @@ public class RemotingHelper { + private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME); + private static final AttributeKey<String> REMOTE_ADDR_KEY = AttributeKey.valueOf("RemoteAddr"); + ++ private static final AttributeKey<String> PROXY_PROTOCOL_ADDR = AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_ADDR); ++ private static final AttributeKey<String> PROXY_PROTOCOL_PORT = AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_PORT); ++ + public static final AttributeKey<String> CLIENT_ID_KEY = AttributeKey.valueOf("ClientId"); + + public static final AttributeKey<Integer> VERSION_KEY = AttributeKey.valueOf("Version"); +@@ -203,12 +209,16 @@ public class RemotingHelper { + if (null == channel) { + return ""; + } ++ String addr = getProxyProtocolAddress(channel); ++ if (StringUtils.isNotBlank(addr)) { ++ return addr; ++ } + Attribute<String> att = channel.attr(REMOTE_ADDR_KEY); + if (att == null) { + // mocked in unit test + return parseChannelRemoteAddr0(channel); + } +- String addr = att.get(); ++ addr = att.get(); + if (addr == null) { + addr = parseChannelRemoteAddr0(channel); + att.set(addr); +@@ -216,6 +226,18 @@ public class RemotingHelper { + return addr; + } + ++ private static String getProxyProtocolAddress(Channel channel) { ++ if (!channel.hasAttr(PROXY_PROTOCOL_ADDR)) { ++ return null; ++ } ++ String proxyProtocolAddr = getAttributeValue(PROXY_PROTOCOL_ADDR, channel); ++ String proxyProtocolPort = getAttributeValue(PROXY_PROTOCOL_PORT, channel); ++ if (StringUtils.isBlank(proxyProtocolAddr) || proxyProtocolPort == null) { ++ return null; ++ } ++ return proxyProtocolAddr + ":" + proxyProtocolPort; ++ } ++ + private static String parseChannelRemoteAddr0(final Channel channel) { + SocketAddress remote = channel.remoteAddress(); + final String addr = remote != null ? remote.toString() : ""; +@@ -255,7 +277,7 @@ public class RemotingHelper { + return ""; + } + +- public static int parseSocketAddressPort(SocketAddress socketAddress) { ++ public static Integer parseSocketAddressPort(SocketAddress socketAddress) { + if (socketAddress instanceof InetSocketAddress) { + return ((InetSocketAddress) socketAddress).getPort(); + } +diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java +new file mode 100644 +index 000000000..4e69ab82d +--- /dev/null ++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java +@@ -0,0 +1,45 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.rocketmq.remoting.netty; ++ ++ ++import io.netty.util.AttributeKey; ++import org.apache.rocketmq.common.constant.HAProxyConstants; ++ ++import java.util.Map; ++import java.util.concurrent.ConcurrentHashMap; ++ ++public class AttributeKeys { ++ ++ public static final AttributeKey<String> PROXY_PROTOCOL_ADDR = ++ AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_ADDR); ++ ++ public static final AttributeKey<String> PROXY_PROTOCOL_PORT = ++ AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_PORT); ++ ++ public static final AttributeKey<String> PROXY_PROTOCOL_SERVER_ADDR = ++ AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_SERVER_ADDR); ++ ++ public static final AttributeKey<String> PROXY_PROTOCOL_SERVER_PORT = ++ AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_SERVER_PORT); ++ ++ private static final Map<String, AttributeKey<String>> ATTRIBUTE_KEY_MAP = new ConcurrentHashMap<>(); ++ ++ public static AttributeKey<String> valueOf(String name) { ++ return ATTRIBUTE_KEY_MAP.computeIfAbsent(name, AttributeKeys::valueOf); ++ } ++} +diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +index 9f39d672e..94ffd8d07 100644 +--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java ++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +@@ -24,6 +24,7 @@ import io.netty.channel.ChannelDuplexHandler; + import io.netty.channel.ChannelFuture; + import io.netty.channel.ChannelHandler; + import io.netty.channel.ChannelHandlerContext; ++import io.netty.channel.ChannelInboundHandlerAdapter; + import io.netty.channel.ChannelInitializer; + import io.netty.channel.ChannelOption; + import io.netty.channel.ChannelPipeline; +@@ -36,27 +37,25 @@ import io.netty.channel.epoll.EpollServerSocketChannel; + import io.netty.channel.nio.NioEventLoopGroup; + import io.netty.channel.socket.SocketChannel; + import io.netty.channel.socket.nio.NioServerSocketChannel; ++import io.netty.handler.codec.ProtocolDetectionResult; ++import io.netty.handler.codec.ProtocolDetectionState; ++import io.netty.handler.codec.haproxy.HAProxyMessage; ++import io.netty.handler.codec.haproxy.HAProxyMessageDecoder; ++import io.netty.handler.codec.haproxy.HAProxyProtocolVersion; + import io.netty.handler.timeout.IdleState; + import io.netty.handler.timeout.IdleStateEvent; + import io.netty.handler.timeout.IdleStateHandler; ++import io.netty.util.AttributeKey; ++import io.netty.util.CharsetUtil; + import io.netty.util.HashedWheelTimer; + import io.netty.util.Timeout; + import io.netty.util.TimerTask; + import io.netty.util.concurrent.DefaultEventExecutorGroup; +-import java.io.IOException; +-import java.net.InetSocketAddress; +-import java.security.cert.CertificateException; +-import java.util.NoSuchElementException; +-import java.util.concurrent.ConcurrentHashMap; +-import java.util.concurrent.ConcurrentMap; +-import java.util.concurrent.ExecutorService; +-import java.util.concurrent.Executors; +-import java.util.concurrent.ScheduledExecutorService; +-import java.util.concurrent.ScheduledThreadPoolExecutor; +-import java.util.concurrent.ThreadPoolExecutor; +-import java.util.concurrent.TimeUnit; ++import org.apache.commons.collections.CollectionUtils; ++import org.apache.commons.lang3.StringUtils; + import org.apache.rocketmq.common.Pair; + import org.apache.rocketmq.common.ThreadFactoryImpl; ++import org.apache.rocketmq.common.constant.HAProxyConstants; + import org.apache.rocketmq.common.constant.LoggerName; + import org.apache.rocketmq.common.utils.NetworkUtil; + import org.apache.rocketmq.logging.org.slf4j.Logger; +@@ -71,6 +70,19 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; + import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; + import org.apache.rocketmq.remoting.protocol.RemotingCommand; + ++import java.io.IOException; ++import java.net.InetSocketAddress; ++import java.security.cert.CertificateException; ++import java.util.NoSuchElementException; ++import java.util.concurrent.ConcurrentHashMap; ++import java.util.concurrent.ConcurrentMap; ++import java.util.concurrent.ExecutorService; ++import java.util.concurrent.Executors; ++import java.util.concurrent.ScheduledExecutorService; ++import java.util.concurrent.ScheduledThreadPoolExecutor; ++import java.util.concurrent.ThreadPoolExecutor; ++import java.util.concurrent.TimeUnit; ++ + @SuppressWarnings("NullableProblems") + public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer { + private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME); +@@ -96,6 +108,9 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti + private final ConcurrentMap<Integer/*Port*/, NettyRemotingAbstract> remotingServerTable = new ConcurrentHashMap<>(); + + public static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler"; ++ public static final String HA_PROXY_DECODER = "HAProxyDecoder"; ++ public static final String HA_PROXY_HANDLER = "HAProxyHandler"; ++ public static final String TLS_MODE_HANDLER = "TlsModeHandler"; + public static final String TLS_HANDLER_NAME = "sslHandler"; + public static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder"; + +@@ -387,7 +402,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti + } + + private void prepareSharableHandlers() { +- handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode); ++ handshakeHandler = new HandshakeHandler(); + encoder = new NettyEncoder(); + connectionManageHandler = new NettyConnectManageHandler(); + serverHandler = new NettyServerHandler(); +@@ -437,11 +452,51 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti + @ChannelHandler.Sharable + public class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf> { + ++ private final TlsModeHandler tlsModeHandler; ++ ++ public HandshakeHandler() { ++ tlsModeHandler = new TlsModeHandler(TlsSystemConfig.tlsMode); ++ } ++ ++ @Override ++ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) { ++ try { ++ ProtocolDetectionResult<HAProxyProtocolVersion> ha = HAProxyMessageDecoder.detectProtocol(in); ++ if (ha.state() == ProtocolDetectionState.NEEDS_MORE_DATA) { ++ return; ++ } ++ if (ha.state() == ProtocolDetectionState.DETECTED) { ++ ctx.pipeline().addAfter(defaultEventExecutorGroup, ctx.name(), HA_PROXY_DECODER, new HAProxyMessageDecoder()) ++ .addAfter(defaultEventExecutorGroup, HA_PROXY_DECODER, HA_PROXY_HANDLER, new HAProxyMessageHandler()) ++ .addAfter(defaultEventExecutorGroup, HA_PROXY_HANDLER, TLS_MODE_HANDLER, tlsModeHandler); ++ } else { ++ ctx.pipeline().addAfter(defaultEventExecutorGroup, ctx.name(), TLS_MODE_HANDLER, tlsModeHandler); ++ } ++ ++ try { ++ // Remove this handler ++ ctx.pipeline().remove(this); ++ } catch (NoSuchElementException e) { ++ log.error("Error while removing HandshakeHandler", e); ++ } ++ ++ // Hand over this message to the next . ++ ctx.fireChannelRead(in.retain()); ++ } catch (Exception e) { ++ log.error("process proxy protocol negotiator failed.", e); ++ throw e; ++ } ++ } ++ } ++ ++ @ChannelHandler.Sharable ++ public class TlsModeHandler extends SimpleChannelInboundHandler<ByteBuf> { ++ + private final TlsMode tlsMode; + + private static final byte HANDSHAKE_MAGIC_CODE = 0x16; + +- HandshakeHandler(TlsMode tlsMode) { ++ TlsModeHandler(TlsMode tlsMode) { + this.tlsMode = tlsMode; + } + +@@ -461,7 +516,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti + case ENFORCING: + if (null != sslContext) { + ctx.pipeline() +- .addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc())) ++ .addAfter(defaultEventExecutorGroup, TLS_MODE_HANDLER, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc())) + .addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder()); + log.info("Handlers prepended to channel pipeline to establish SSL connection"); + } else { +@@ -483,7 +538,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti + // Remove this handler + ctx.pipeline().remove(this); + } catch (NoSuchElementException e) { +- log.error("Error while removing HandshakeHandler", e); ++ log.error("Error while removing TlsModeHandler", e); + } + + // Hand over this message to the next . +@@ -706,4 +761,46 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti + return NettyRemotingServer.this.getCallbackExecutor(); + } + } ++ ++ public static class HAProxyMessageHandler extends ChannelInboundHandlerAdapter { ++ ++ @Override ++ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ++ if (msg instanceof HAProxyMessage) { ++ fillChannelWithMessage((HAProxyMessage) msg, ctx.channel()); ++ } else { ++ super.channelRead(ctx, msg); ++ } ++ ctx.pipeline().remove(this); ++ } ++ ++ /** ++ * The definition of key refers to the implementation of nginx ++ * <a href="https://nginx.org/en/docs/http/ngx_http_core_module.html#var_proxy_protocol_addr">ngx_http_core_module</a> ++ * @param msg ++ * @param channel ++ */ ++ private void fillChannelWithMessage(HAProxyMessage msg, Channel channel) { ++ if (StringUtils.isNotBlank(msg.sourceAddress())) { ++ channel.attr(AttributeKeys.PROXY_PROTOCOL_ADDR).set(msg.sourceAddress()); ++ } ++ if (msg.sourcePort() > 0) { ++ channel.attr(AttributeKeys.PROXY_PROTOCOL_PORT).set(String.valueOf(msg.sourcePort())); ++ } ++ if (StringUtils.isNotBlank(msg.destinationAddress())) { ++ channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR).set(msg.destinationAddress()); ++ } ++ if (msg.destinationPort() > 0) { ++ channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT).set(String.valueOf(msg.destinationPort())); ++ } ++ if (CollectionUtils.isNotEmpty(msg.tlvs())) { ++ msg.tlvs().forEach(tlv -> { ++ AttributeKey<String> key = AttributeKeys.valueOf( ++ HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue())); ++ String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8)); ++ channel.attr(key).set(value); ++ }); ++ } ++ } ++ } + } +diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/ProxyProtocolTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/ProxyProtocolTest.java +new file mode 100644 +index 000000000..c39fd2132 +--- /dev/null ++++ b/remoting/src/test/java/org/apache/rocketmq/remoting/ProxyProtocolTest.java +@@ -0,0 +1,116 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.rocketmq.remoting; ++ ++import io.netty.buffer.ByteBuf; ++import io.netty.buffer.Unpooled; ++import io.netty.channel.Channel; ++import io.netty.handler.codec.haproxy.HAProxyCommand; ++import io.netty.handler.codec.haproxy.HAProxyMessage; ++import io.netty.handler.codec.haproxy.HAProxyMessageEncoder; ++import io.netty.handler.codec.haproxy.HAProxyProtocolVersion; ++import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol; ++import org.apache.rocketmq.common.utils.NetworkUtil; ++import org.apache.rocketmq.remoting.netty.NettyClientConfig; ++import org.apache.rocketmq.remoting.netty.NettyRemotingClient; ++import org.apache.rocketmq.remoting.protocol.LanguageCode; ++import org.apache.rocketmq.remoting.protocol.RemotingCommand; ++import org.junit.Before; ++import org.junit.Test; ++import org.junit.runner.RunWith; ++import org.mockito.junit.MockitoJUnitRunner; ++ ++import java.io.IOException; ++import java.lang.reflect.Method; ++import java.net.Socket; ++import java.time.Duration; ++import java.util.concurrent.TimeUnit; ++ ++import static org.assertj.core.api.Assertions.assertThat; ++import static org.awaitility.Awaitility.await; ++import static org.junit.Assert.assertNotNull; ++ ++@RunWith(MockitoJUnitRunner.class) ++public class ProxyProtocolTest { ++ ++ private RemotingServer remotingServer; ++ private RemotingClient remotingClient; ++ ++ @Before ++ public void setUp() throws Exception { ++ NettyClientConfig clientConfig = new NettyClientConfig(); ++ clientConfig.setUseTLS(false); ++ ++ remotingServer = RemotingServerTest.createRemotingServer(); ++ remotingClient = RemotingServerTest.createRemotingClient(clientConfig); ++ ++ await().pollDelay(Duration.ofMillis(10)) ++ .pollInterval(Duration.ofMillis(10)) ++ .atMost(20, TimeUnit.SECONDS).until(() -> isHostConnectable(getServerAddress())); ++ } ++ ++ @Test ++ public void testProxyProtocol() throws Exception { ++ sendHAProxyMessage(remotingClient); ++ requestThenAssertResponse(remotingClient); ++ } ++ ++ private void requestThenAssertResponse(RemotingClient remotingClient) throws Exception { ++ RemotingCommand response = remotingClient.invokeSync(getServerAddress(), createRequest(), 10000 * 3); ++ assertNotNull(response); ++ assertThat(response.getLanguage()).isEqualTo(LanguageCode.JAVA); ++ assertThat(response.getExtFields()).hasSize(2); ++ assertThat(response.getExtFields().get("messageTitle")).isEqualTo("Welcome"); ++ } ++ ++ private void sendHAProxyMessage(RemotingClient remotingClient) throws Exception { ++ Method getAndCreateChannel = NettyRemotingClient.class.getDeclaredMethod("getAndCreateChannel", String.class); ++ getAndCreateChannel.setAccessible(true); ++ NettyRemotingClient nettyRemotingClient = (NettyRemotingClient) remotingClient; ++ Channel channel = (Channel) getAndCreateChannel.invoke(nettyRemotingClient, getServerAddress()); ++ HAProxyMessage message = new HAProxyMessage(HAProxyProtocolVersion.V2, HAProxyCommand.PROXY, ++ HAProxyProxiedProtocol.TCP4, "127.0.0.1", "127.0.0.2", 8000, 9000); ++ ++ ByteBuf byteBuf = Unpooled.directBuffer(); ++ Method encode = HAProxyMessageEncoder.class.getDeclaredMethod("encodeV2", HAProxyMessage.class, ByteBuf.class); ++ encode.setAccessible(true); ++ encode.invoke(HAProxyMessageEncoder.INSTANCE, message, byteBuf); ++ channel.writeAndFlush(byteBuf).sync(); ++ } ++ ++ private static RemotingCommand createRequest() { ++ RequestHeader requestHeader = new RequestHeader(); ++ requestHeader.setCount(1); ++ requestHeader.setMessageTitle("Welcome"); ++ return RemotingCommand.createRequestCommand(0, requestHeader); ++ } ++ ++ ++ private String getServerAddress() { ++ return "localhost:" + remotingServer.localListenPort(); ++ } ++ ++ private boolean isHostConnectable(String addr) { ++ try (Socket socket = new Socket()) { ++ socket.connect(NetworkUtil.string2SocketAddress(addr)); ++ return true; ++ } catch (IOException ignored) { ++ } ++ return false; ++ } ++} +diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java +index 3da7abf57..de7edbbfb 100644 +--- a/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java ++++ b/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java +@@ -17,19 +17,6 @@ + + package org.apache.rocketmq.remoting; + +-import java.io.BufferedInputStream; +-import java.io.BufferedOutputStream; +-import java.io.BufferedWriter; +-import java.io.File; +-import java.io.FileOutputStream; +-import java.io.FileWriter; +-import java.io.IOException; +-import java.io.InputStream; +-import java.io.PrintWriter; +-import java.net.Socket; +-import java.time.Duration; +-import java.util.UUID; +-import java.util.concurrent.TimeUnit; + import org.apache.rocketmq.common.utils.NetworkUtil; + import org.apache.rocketmq.remoting.common.TlsMode; + import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +@@ -47,6 +34,20 @@ import org.junit.rules.TestName; + import org.junit.runner.RunWith; + import org.mockito.junit.MockitoJUnitRunner; + ++import java.io.BufferedInputStream; ++import java.io.BufferedOutputStream; ++import java.io.BufferedWriter; ++import java.io.File; ++import java.io.FileOutputStream; ++import java.io.FileWriter; ++import java.io.IOException; ++import java.io.InputStream; ++import java.io.PrintWriter; ++import java.net.Socket; ++import java.time.Duration; ++import java.util.UUID; ++import java.util.concurrent.TimeUnit; ++ + import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_CLIENT_AUTHSERVER; + import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_CLIENT_CERTPATH; + import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_CLIENT_KEYPASSWORD; +@@ -234,6 +235,7 @@ public class TlsTest { + @Test + public void serverAcceptsUntrustedClientCert() throws Exception { + requestThenAssertResponse(); ++// Thread.sleep(1000000L); + } + + /** +-- +2.32.0.windows.2 + + +From 4f840afcb04f5cc328795896198c6fba96ff37ec Mon Sep 17 00:00:00 2001 +From: mxsm <ljbmxsm@gmail.com> +Date: Wed, 5 Jul 2023 11:03:52 +0800 +Subject: [PATCH 3/5] [ISSUE #6960] Added Slot formatting sketch comments + (#6961) + +--- + .../java/org/apache/rocketmq/store/timer/Slot.java | 10 +++++++++- + 1 file changed, 9 insertions(+), 1 deletion(-) + +diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/Slot.java b/store/src/main/java/org/apache/rocketmq/store/timer/Slot.java +index b91193b94..2da846cee 100644 +--- a/store/src/main/java/org/apache/rocketmq/store/timer/Slot.java ++++ b/store/src/main/java/org/apache/rocketmq/store/timer/Slot.java +@@ -16,9 +16,17 @@ + */ + package org.apache.rocketmq.store.timer; + ++/** ++ * Represents a slot of timing wheel. Format: ++ * ┌────────────┬───────────┬───────────┬───────────┬───────────┐ ++ * │delayed time│ first pos │ last pos │ num │ magic │ ++ * ├────────────┼───────────┼───────────┼───────────┼───────────┤ ++ * │ 8bytes │ 8bytes │ 8bytes │ 4bytes │ 4bytes │ ++ * └────────────┴───────────┴───────────┴───────────┴───────────┘ ++ */ + public class Slot { + public static final short SIZE = 32; +- public final long timeMs; ++ public final long timeMs; //delayed time + public final long firstPos; + public final long lastPos; + public final int num; +-- +2.32.0.windows.2 + + +From 58550f074ec101c0a158ede0df1839950e08837a Mon Sep 17 00:00:00 2001 +From: rongtong <jinrongtong5@163.com> +Date: Mon, 10 Jul 2023 14:13:18 +0800 +Subject: [PATCH 4/5] [ISSUE #7008] Fix the issue of protocol parsing failure + when using haproxy and tls together (#7009) + +--- + .../remoting/netty/NettyRemotingServer.java | 14 +++++++------- + 1 file changed, 7 insertions(+), 7 deletions(-) + +diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +index 94ffd8d07..445f06cc6 100644 +--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java ++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +@@ -459,13 +459,13 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti + } + + @Override +- protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) { ++ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) { + try { +- ProtocolDetectionResult<HAProxyProtocolVersion> ha = HAProxyMessageDecoder.detectProtocol(in); +- if (ha.state() == ProtocolDetectionState.NEEDS_MORE_DATA) { ++ ProtocolDetectionResult<HAProxyProtocolVersion> detectionResult = HAProxyMessageDecoder.detectProtocol(byteBuf); ++ if (detectionResult.state() == ProtocolDetectionState.NEEDS_MORE_DATA) { + return; + } +- if (ha.state() == ProtocolDetectionState.DETECTED) { ++ if (detectionResult.state() == ProtocolDetectionState.DETECTED) { + ctx.pipeline().addAfter(defaultEventExecutorGroup, ctx.name(), HA_PROXY_DECODER, new HAProxyMessageDecoder()) + .addAfter(defaultEventExecutorGroup, HA_PROXY_DECODER, HA_PROXY_HANDLER, new HAProxyMessageHandler()) + .addAfter(defaultEventExecutorGroup, HA_PROXY_HANDLER, TLS_MODE_HANDLER, tlsModeHandler); +@@ -481,7 +481,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti + } + + // Hand over this message to the next . +- ctx.fireChannelRead(in.retain()); ++ ctx.fireChannelRead(byteBuf.retain()); + } catch (Exception e) { + log.error("process proxy protocol negotiator failed.", e); + throw e; +@@ -503,8 +503,8 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) { + +- // Peek the first byte to determine if the content is starting with TLS handshake +- byte b = msg.getByte(0); ++ // Peek the current read index byte to determine if the content is starting with TLS handshake ++ byte b = msg.getByte(msg.readerIndex()); + + if (b == HANDSHAKE_MAGIC_CODE) { + switch (tlsMode) { +-- +2.32.0.windows.2 + + +From 8e6b5e62bd4da78c0a7d265891c52685fcffd08a Mon Sep 17 00:00:00 2001 +From: Zhouxiang Zhan <zhouxzhan@apache.org> +Date: Mon, 10 Jul 2023 20:14:17 +0800 +Subject: [PATCH 5/5] [ISSUE #6999] Add interface ReceiptHandleManager (#7000) + +* Add interface ReceiptHandleManager + +* fix unit test + +* fix +--- + .../processor/ReceiptHandleProcessor.java | 10 +- + .../receipt/DefaultReceiptHandleManager.java | 282 ++++++++++++++++++ + .../service/receipt/ReceiptHandleManager.java | 260 +--------------- + ...a => DefaultReceiptHandleManagerTest.java} | 34 +-- + 4 files changed, 307 insertions(+), 279 deletions(-) + create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java + rename proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/{ReceiptHandleManagerTest.java => DefaultReceiptHandleManagerTest.java} (93%) + +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java +index 9c7e8dea9..fc49e7622 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java +@@ -28,12 +28,12 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.proxy.common.RenewEvent; + import org.apache.rocketmq.proxy.common.MessageReceiptHandle; + import org.apache.rocketmq.proxy.common.ProxyContext; +-import org.apache.rocketmq.proxy.service.receipt.ReceiptHandleManager; ++import org.apache.rocketmq.proxy.service.receipt.DefaultReceiptHandleManager; + import org.apache.rocketmq.proxy.service.ServiceManager; + + public class ReceiptHandleProcessor extends AbstractProcessor { + protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); +- protected ReceiptHandleManager receiptHandleManager; ++ protected DefaultReceiptHandleManager receiptHandleManager; + + public ReceiptHandleProcessor(MessagingProcessor messagingProcessor, ServiceManager serviceManager) { + super(messagingProcessor, serviceManager); +@@ -51,7 +51,7 @@ public class ReceiptHandleProcessor extends AbstractProcessor { + event.getFuture().complete(v); + }); + }; +- this.receiptHandleManager = new ReceiptHandleManager(serviceManager.getMetadataService(), serviceManager.getConsumerManager(), eventListener); ++ this.receiptHandleManager = new DefaultReceiptHandleManager(serviceManager.getMetadataService(), serviceManager.getConsumerManager(), eventListener); + } + + protected ProxyContext createContext(String actionName) { +@@ -59,11 +59,11 @@ public class ReceiptHandleProcessor extends AbstractProcessor { + } + + public void addReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, MessageReceiptHandle messageReceiptHandle) { +- receiptHandleManager.addReceiptHandle(channel, group, msgID, messageReceiptHandle); ++ receiptHandleManager.addReceiptHandle(ctx, channel, group, msgID, messageReceiptHandle); + } + + public MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, String receiptHandle) { +- return receiptHandleManager.removeReceiptHandle(channel, group, msgID, receiptHandle); ++ return receiptHandleManager.removeReceiptHandle(ctx, channel, group, msgID, receiptHandle); + } + + public static class ReceiptHandleGroupKey { +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java +new file mode 100644 +index 000000000..c7633d658 +--- /dev/null ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java +@@ -0,0 +1,282 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.rocketmq.proxy.service.receipt; ++ ++import com.google.common.base.Stopwatch; ++import io.netty.channel.Channel; ++import java.util.Map; ++import java.util.Set; ++import java.util.concurrent.CompletableFuture; ++import java.util.concurrent.ConcurrentHashMap; ++import java.util.concurrent.ConcurrentMap; ++import java.util.concurrent.Executors; ++import java.util.concurrent.ScheduledExecutorService; ++import java.util.concurrent.ThreadPoolExecutor; ++import java.util.concurrent.TimeUnit; ++import org.apache.rocketmq.broker.client.ClientChannelInfo; ++import org.apache.rocketmq.broker.client.ConsumerGroupEvent; ++import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener; ++import org.apache.rocketmq.broker.client.ConsumerManager; ++import org.apache.rocketmq.client.consumer.AckResult; ++import org.apache.rocketmq.client.consumer.AckStatus; ++import org.apache.rocketmq.common.ThreadFactoryImpl; ++import org.apache.rocketmq.common.constant.LoggerName; ++import org.apache.rocketmq.common.consumer.ReceiptHandle; ++import org.apache.rocketmq.common.state.StateEventListener; ++import org.apache.rocketmq.common.thread.ThreadPoolMonitor; ++import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; ++import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils; ++import org.apache.rocketmq.common.utils.StartAndShutdown; ++import org.apache.rocketmq.logging.org.slf4j.Logger; ++import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; ++import org.apache.rocketmq.proxy.common.RenewEvent; ++import org.apache.rocketmq.proxy.common.MessageReceiptHandle; ++import org.apache.rocketmq.proxy.common.ProxyContext; ++import org.apache.rocketmq.proxy.common.ProxyException; ++import org.apache.rocketmq.proxy.common.ProxyExceptionCode; ++import org.apache.rocketmq.proxy.common.ReceiptHandleGroup; ++import org.apache.rocketmq.proxy.common.RenewStrategyPolicy; ++import org.apache.rocketmq.proxy.common.channel.ChannelHelper; ++import org.apache.rocketmq.proxy.common.utils.ExceptionUtils; ++import org.apache.rocketmq.proxy.config.ConfigurationManager; ++import org.apache.rocketmq.proxy.config.ProxyConfig; ++import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor; ++import org.apache.rocketmq.proxy.service.metadata.MetadataService; ++import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy; ++import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; ++ ++public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implements ReceiptHandleManager { ++ protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); ++ protected final MetadataService metadataService; ++ protected final ConsumerManager consumerManager; ++ protected final ConcurrentMap<ReceiptHandleProcessor.ReceiptHandleGroupKey, ReceiptHandleGroup> receiptHandleGroupMap; ++ protected final StateEventListener<RenewEvent> eventListener; ++ protected final static RetryPolicy RENEW_POLICY = new RenewStrategyPolicy(); ++ protected final ScheduledExecutorService scheduledExecutorService = ++ Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RenewalScheduledThread_")); ++ protected final ThreadPoolExecutor renewalWorkerService; ++ ++ public DefaultReceiptHandleManager(MetadataService metadataService, ConsumerManager consumerManager, StateEventListener<RenewEvent> eventListener) { ++ this.metadataService = metadataService; ++ this.consumerManager = consumerManager; ++ this.eventListener = eventListener; ++ ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); ++ this.renewalWorkerService = ThreadPoolMonitor.createAndMonitor( ++ proxyConfig.getRenewThreadPoolNums(), ++ proxyConfig.getRenewMaxThreadPoolNums(), ++ 1, TimeUnit.MINUTES, ++ "RenewalWorkerThread", ++ proxyConfig.getRenewThreadPoolQueueCapacity() ++ ); ++ consumerManager.appendConsumerIdsChangeListener(new ConsumerIdsChangeListener() { ++ @Override ++ public void handle(ConsumerGroupEvent event, String group, Object... args) { ++ if (ConsumerGroupEvent.CLIENT_UNREGISTER.equals(event)) { ++ if (args == null || args.length < 1) { ++ return; ++ } ++ if (args[0] instanceof ClientChannelInfo) { ++ ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0]; ++ if (ChannelHelper.isRemote(clientChannelInfo.getChannel())) { ++ // if the channel sync from other proxy is expired, not to clear data of connect to current proxy ++ return; ++ } ++ clearGroup(new ReceiptHandleProcessor.ReceiptHandleGroupKey(clientChannelInfo.getChannel(), group)); ++ log.info("clear handle of this client when client unregister. group:{}, clientChannelInfo:{}", group, clientChannelInfo); ++ } ++ } ++ } ++ ++ @Override ++ public void shutdown() { ++ ++ } ++ }); ++ this.receiptHandleGroupMap = new ConcurrentHashMap<>(); ++ this.renewalWorkerService.setRejectedExecutionHandler((r, executor) -> log.warn("add renew task failed. queueSize:{}", executor.getQueue().size())); ++ this.appendStartAndShutdown(new StartAndShutdown() { ++ @Override ++ public void start() throws Exception { ++ scheduledExecutorService.scheduleWithFixedDelay(() -> scheduleRenewTask(), 0, ++ ConfigurationManager.getProxyConfig().getRenewSchedulePeriodMillis(), TimeUnit.MILLISECONDS); ++ } ++ ++ @Override ++ public void shutdown() throws Exception { ++ scheduledExecutorService.shutdown(); ++ clearAllHandle(); ++ } ++ }); ++ } ++ ++ public void addReceiptHandle(ProxyContext context, Channel channel, String group, String msgID, MessageReceiptHandle messageReceiptHandle) { ++ ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group), ++ k -> new ReceiptHandleGroup()).put(msgID, messageReceiptHandle); ++ } ++ ++ public MessageReceiptHandle removeReceiptHandle(ProxyContext context, Channel channel, String group, String msgID, String receiptHandle) { ++ ReceiptHandleGroup handleGroup = receiptHandleGroupMap.get(new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group)); ++ if (handleGroup == null) { ++ return null; ++ } ++ return handleGroup.remove(msgID, receiptHandle); ++ } ++ ++ protected boolean clientIsOffline(ReceiptHandleProcessor.ReceiptHandleGroupKey groupKey) { ++ return this.consumerManager.findChannel(groupKey.getGroup(), groupKey.getChannel()) == null; ++ } ++ ++ protected void scheduleRenewTask() { ++ Stopwatch stopwatch = Stopwatch.createStarted(); ++ try { ++ ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); ++ for (Map.Entry<ReceiptHandleProcessor.ReceiptHandleGroupKey, ReceiptHandleGroup> entry : receiptHandleGroupMap.entrySet()) { ++ ReceiptHandleProcessor.ReceiptHandleGroupKey key = entry.getKey(); ++ if (clientIsOffline(key)) { ++ clearGroup(key); ++ continue; ++ } ++ ++ ReceiptHandleGroup group = entry.getValue(); ++ group.scan((msgID, handleStr, v) -> { ++ long current = System.currentTimeMillis(); ++ ReceiptHandle handle = ReceiptHandle.decode(v.getReceiptHandleStr()); ++ if (handle.getNextVisibleTime() - current > proxyConfig.getRenewAheadTimeMillis()) { ++ return; ++ } ++ renewalWorkerService.submit(() -> renewMessage(group, msgID, handleStr)); ++ }); ++ } ++ } catch (Exception e) { ++ log.error("unexpect error when schedule renew task", e); ++ } ++ ++ log.debug("scan for renewal done. cost:{}ms", stopwatch.elapsed().toMillis()); ++ } ++ ++ protected void renewMessage(ReceiptHandleGroup group, String msgID, String handleStr) { ++ try { ++ group.computeIfPresent(msgID, handleStr, this::startRenewMessage); ++ } catch (Exception e) { ++ log.error("error when renew message. msgID:{}, handleStr:{}", msgID, handleStr, e); ++ } ++ } ++ ++ protected CompletableFuture<MessageReceiptHandle> startRenewMessage(MessageReceiptHandle messageReceiptHandle) { ++ CompletableFuture<MessageReceiptHandle> resFuture = new CompletableFuture<>(); ++ ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); ++ long current = System.currentTimeMillis(); ++ try { ++ if (messageReceiptHandle.getRenewRetryTimes() >= proxyConfig.getMaxRenewRetryTimes()) { ++ log.warn("handle has exceed max renewRetryTimes. handle:{}", messageReceiptHandle); ++ return CompletableFuture.completedFuture(null); ++ } ++ if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) { ++ CompletableFuture<AckResult> future = new CompletableFuture<>(); ++ eventListener.fireEvent(new RenewEvent(messageReceiptHandle, RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), future)); ++ future.whenComplete((ackResult, throwable) -> { ++ if (throwable != null) { ++ log.error("error when renew. handle:{}", messageReceiptHandle, throwable); ++ if (renewExceptionNeedRetry(throwable)) { ++ messageReceiptHandle.incrementAndGetRenewRetryTimes(); ++ resFuture.complete(messageReceiptHandle); ++ } else { ++ resFuture.complete(null); ++ } ++ } else if (AckStatus.OK.equals(ackResult.getStatus())) { ++ messageReceiptHandle.updateReceiptHandle(ackResult.getExtraInfo()); ++ messageReceiptHandle.resetRenewRetryTimes(); ++ messageReceiptHandle.incrementRenewTimes(); ++ resFuture.complete(messageReceiptHandle); ++ } else { ++ log.error("renew response is not ok. result:{}, handle:{}", ackResult, messageReceiptHandle); ++ resFuture.complete(null); ++ } ++ }); ++ } else { ++ ProxyContext context = createContext("RenewMessage"); ++ SubscriptionGroupConfig subscriptionGroupConfig = ++ metadataService.getSubscriptionGroupConfig(context, messageReceiptHandle.getGroup()); ++ if (subscriptionGroupConfig == null) { ++ log.error("group's subscriptionGroupConfig is null when renew. handle: {}", messageReceiptHandle); ++ return CompletableFuture.completedFuture(null); ++ } ++ RetryPolicy retryPolicy = subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy(); ++ CompletableFuture<AckResult> future = new CompletableFuture<>(); ++ eventListener.fireEvent(new RenewEvent(messageReceiptHandle, retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), future)); ++ future.whenComplete((ackResult, throwable) -> { ++ if (throwable != null) { ++ log.error("error when nack in renew. handle:{}", messageReceiptHandle, throwable); ++ } ++ resFuture.complete(null); ++ }); ++ } ++ } catch (Throwable t) { ++ log.error("unexpect error when renew message, stop to renew it. handle:{}", messageReceiptHandle, t); ++ resFuture.complete(null); ++ } ++ return resFuture; ++ } ++ ++ protected void clearGroup(ReceiptHandleProcessor.ReceiptHandleGroupKey key) { ++ if (key == null) { ++ return; ++ } ++ ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); ++ ReceiptHandleGroup handleGroup = receiptHandleGroupMap.remove(key); ++ if (handleGroup == null) { ++ return; ++ } ++ handleGroup.scan((msgID, handle, v) -> { ++ try { ++ handleGroup.computeIfPresent(msgID, handle, messageReceiptHandle -> { ++ CompletableFuture<AckResult> future = new CompletableFuture<>(); ++ eventListener.fireEvent(new RenewEvent(messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), future)); ++ return CompletableFuture.completedFuture(null); ++ }); ++ } catch (Exception e) { ++ log.error("error when clear handle for group. key:{}", key, e); ++ } ++ }); ++ } ++ ++ protected void clearAllHandle() { ++ log.info("start clear all handle in receiptHandleProcessor"); ++ Set<ReceiptHandleProcessor.ReceiptHandleGroupKey> keySet = receiptHandleGroupMap.keySet(); ++ for (ReceiptHandleProcessor.ReceiptHandleGroupKey key : keySet) { ++ clearGroup(key); ++ } ++ log.info("clear all handle in receiptHandleProcessor done"); ++ } ++ ++ protected boolean renewExceptionNeedRetry(Throwable t) { ++ t = ExceptionUtils.getRealException(t); ++ if (t instanceof ProxyException) { ++ ProxyException proxyException = (ProxyException) t; ++ if (ProxyExceptionCode.INVALID_BROKER_NAME.equals(proxyException.getCode()) || ++ ProxyExceptionCode.INVALID_RECEIPT_HANDLE.equals(proxyException.getCode())) { ++ return false; ++ } ++ } ++ return true; ++ } ++ ++ protected ProxyContext createContext(String actionName) { ++ return ProxyContext.createForInner(this.getClass().getSimpleName() + actionName); ++ } ++} +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java +index f3b805624..6a8888e97 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java +@@ -17,266 +17,12 @@ + + package org.apache.rocketmq.proxy.service.receipt; + +-import com.google.common.base.Stopwatch; + import io.netty.channel.Channel; +-import java.util.Map; +-import java.util.Set; +-import java.util.concurrent.CompletableFuture; +-import java.util.concurrent.ConcurrentHashMap; +-import java.util.concurrent.ConcurrentMap; +-import java.util.concurrent.Executors; +-import java.util.concurrent.ScheduledExecutorService; +-import java.util.concurrent.ThreadPoolExecutor; +-import java.util.concurrent.TimeUnit; +-import org.apache.rocketmq.broker.client.ClientChannelInfo; +-import org.apache.rocketmq.broker.client.ConsumerGroupEvent; +-import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener; +-import org.apache.rocketmq.broker.client.ConsumerManager; +-import org.apache.rocketmq.client.consumer.AckResult; +-import org.apache.rocketmq.client.consumer.AckStatus; +-import org.apache.rocketmq.common.ThreadFactoryImpl; +-import org.apache.rocketmq.common.constant.LoggerName; +-import org.apache.rocketmq.common.consumer.ReceiptHandle; +-import org.apache.rocketmq.common.state.StateEventListener; +-import org.apache.rocketmq.common.thread.ThreadPoolMonitor; +-import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; +-import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils; +-import org.apache.rocketmq.common.utils.StartAndShutdown; +-import org.apache.rocketmq.logging.org.slf4j.Logger; +-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +-import org.apache.rocketmq.proxy.common.RenewEvent; + import org.apache.rocketmq.proxy.common.MessageReceiptHandle; + import org.apache.rocketmq.proxy.common.ProxyContext; +-import org.apache.rocketmq.proxy.common.ProxyException; +-import org.apache.rocketmq.proxy.common.ProxyExceptionCode; +-import org.apache.rocketmq.proxy.common.ReceiptHandleGroup; +-import org.apache.rocketmq.proxy.common.RenewStrategyPolicy; +-import org.apache.rocketmq.proxy.common.channel.ChannelHelper; +-import org.apache.rocketmq.proxy.common.utils.ExceptionUtils; +-import org.apache.rocketmq.proxy.config.ConfigurationManager; +-import org.apache.rocketmq.proxy.config.ProxyConfig; +-import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor; +-import org.apache.rocketmq.proxy.service.metadata.MetadataService; +-import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy; +-import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; + +-public class ReceiptHandleManager extends AbstractStartAndShutdown { +- protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); +- protected final MetadataService metadataService; +- protected final ConsumerManager consumerManager; +- protected final ConcurrentMap<ReceiptHandleProcessor.ReceiptHandleGroupKey, ReceiptHandleGroup> receiptHandleGroupMap; +- protected final StateEventListener<RenewEvent> eventListener; +- protected final static RetryPolicy RENEW_POLICY = new RenewStrategyPolicy(); +- protected final ScheduledExecutorService scheduledExecutorService = +- Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RenewalScheduledThread_")); +- protected final ThreadPoolExecutor renewalWorkerService; ++public interface ReceiptHandleManager { ++ void addReceiptHandle(ProxyContext context, Channel channel, String group, String msgID, MessageReceiptHandle messageReceiptHandle); + +- public ReceiptHandleManager(MetadataService metadataService, ConsumerManager consumerManager, StateEventListener<RenewEvent> eventListener) { +- this.metadataService = metadataService; +- this.consumerManager = consumerManager; +- this.eventListener = eventListener; +- ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); +- this.renewalWorkerService = ThreadPoolMonitor.createAndMonitor( +- proxyConfig.getRenewThreadPoolNums(), +- proxyConfig.getRenewMaxThreadPoolNums(), +- 1, TimeUnit.MINUTES, +- "RenewalWorkerThread", +- proxyConfig.getRenewThreadPoolQueueCapacity() +- ); +- consumerManager.appendConsumerIdsChangeListener(new ConsumerIdsChangeListener() { +- @Override +- public void handle(ConsumerGroupEvent event, String group, Object... args) { +- if (ConsumerGroupEvent.CLIENT_UNREGISTER.equals(event)) { +- if (args == null || args.length < 1) { +- return; +- } +- if (args[0] instanceof ClientChannelInfo) { +- ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0]; +- if (ChannelHelper.isRemote(clientChannelInfo.getChannel())) { +- // if the channel sync from other proxy is expired, not to clear data of connect to current proxy +- return; +- } +- clearGroup(new ReceiptHandleProcessor.ReceiptHandleGroupKey(clientChannelInfo.getChannel(), group)); +- log.info("clear handle of this client when client unregister. group:{}, clientChannelInfo:{}", group, clientChannelInfo); +- } +- } +- } +- +- @Override +- public void shutdown() { +- +- } +- }); +- this.receiptHandleGroupMap = new ConcurrentHashMap<>(); +- this.renewalWorkerService.setRejectedExecutionHandler((r, executor) -> log.warn("add renew task failed. queueSize:{}", executor.getQueue().size())); +- this.appendStartAndShutdown(new StartAndShutdown() { +- @Override +- public void start() throws Exception { +- scheduledExecutorService.scheduleWithFixedDelay(() -> scheduleRenewTask(), 0, +- ConfigurationManager.getProxyConfig().getRenewSchedulePeriodMillis(), TimeUnit.MILLISECONDS); +- } +- +- @Override +- public void shutdown() throws Exception { +- scheduledExecutorService.shutdown(); +- clearAllHandle(); +- } +- }); +- } +- +- public void addReceiptHandle(Channel channel, String group, String msgID, MessageReceiptHandle messageReceiptHandle) { +- ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group), +- k -> new ReceiptHandleGroup()).put(msgID, messageReceiptHandle); +- } +- +- public MessageReceiptHandle removeReceiptHandle(Channel channel, String group, String msgID, String receiptHandle) { +- ReceiptHandleGroup handleGroup = receiptHandleGroupMap.get(new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group)); +- if (handleGroup == null) { +- return null; +- } +- return handleGroup.remove(msgID, receiptHandle); +- } +- +- protected boolean clientIsOffline(ReceiptHandleProcessor.ReceiptHandleGroupKey groupKey) { +- return this.consumerManager.findChannel(groupKey.getGroup(), groupKey.getChannel()) == null; +- } +- +- public void scheduleRenewTask() { +- Stopwatch stopwatch = Stopwatch.createStarted(); +- try { +- ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); +- for (Map.Entry<ReceiptHandleProcessor.ReceiptHandleGroupKey, ReceiptHandleGroup> entry : receiptHandleGroupMap.entrySet()) { +- ReceiptHandleProcessor.ReceiptHandleGroupKey key = entry.getKey(); +- if (clientIsOffline(key)) { +- clearGroup(key); +- continue; +- } +- +- ReceiptHandleGroup group = entry.getValue(); +- group.scan((msgID, handleStr, v) -> { +- long current = System.currentTimeMillis(); +- ReceiptHandle handle = ReceiptHandle.decode(v.getReceiptHandleStr()); +- if (handle.getNextVisibleTime() - current > proxyConfig.getRenewAheadTimeMillis()) { +- return; +- } +- renewalWorkerService.submit(() -> renewMessage(group, msgID, handleStr)); +- }); +- } +- } catch (Exception e) { +- log.error("unexpect error when schedule renew task", e); +- } +- +- log.debug("scan for renewal done. cost:{}ms", stopwatch.elapsed().toMillis()); +- } +- +- protected void renewMessage(ReceiptHandleGroup group, String msgID, String handleStr) { +- try { +- group.computeIfPresent(msgID, handleStr, this::startRenewMessage); +- } catch (Exception e) { +- log.error("error when renew message. msgID:{}, handleStr:{}", msgID, handleStr, e); +- } +- } +- +- protected CompletableFuture<MessageReceiptHandle> startRenewMessage(MessageReceiptHandle messageReceiptHandle) { +- CompletableFuture<MessageReceiptHandle> resFuture = new CompletableFuture<>(); +- ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); +- long current = System.currentTimeMillis(); +- try { +- if (messageReceiptHandle.getRenewRetryTimes() >= proxyConfig.getMaxRenewRetryTimes()) { +- log.warn("handle has exceed max renewRetryTimes. handle:{}", messageReceiptHandle); +- return CompletableFuture.completedFuture(null); +- } +- if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) { +- CompletableFuture<AckResult> future = new CompletableFuture<>(); +- eventListener.fireEvent(new RenewEvent(messageReceiptHandle, RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), future)); +- future.whenComplete((ackResult, throwable) -> { +- if (throwable != null) { +- log.error("error when renew. handle:{}", messageReceiptHandle, throwable); +- if (renewExceptionNeedRetry(throwable)) { +- messageReceiptHandle.incrementAndGetRenewRetryTimes(); +- resFuture.complete(messageReceiptHandle); +- } else { +- resFuture.complete(null); +- } +- } else if (AckStatus.OK.equals(ackResult.getStatus())) { +- messageReceiptHandle.updateReceiptHandle(ackResult.getExtraInfo()); +- messageReceiptHandle.resetRenewRetryTimes(); +- messageReceiptHandle.incrementRenewTimes(); +- resFuture.complete(messageReceiptHandle); +- } else { +- log.error("renew response is not ok. result:{}, handle:{}", ackResult, messageReceiptHandle); +- resFuture.complete(null); +- } +- }); +- } else { +- ProxyContext context = createContext("RenewMessage"); +- SubscriptionGroupConfig subscriptionGroupConfig = +- metadataService.getSubscriptionGroupConfig(context, messageReceiptHandle.getGroup()); +- if (subscriptionGroupConfig == null) { +- log.error("group's subscriptionGroupConfig is null when renew. handle: {}", messageReceiptHandle); +- return CompletableFuture.completedFuture(null); +- } +- RetryPolicy retryPolicy = subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy(); +- CompletableFuture<AckResult> future = new CompletableFuture<>(); +- eventListener.fireEvent(new RenewEvent(messageReceiptHandle, retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), future)); +- future.whenComplete((ackResult, throwable) -> { +- if (throwable != null) { +- log.error("error when nack in renew. handle:{}", messageReceiptHandle, throwable); +- } +- resFuture.complete(null); +- }); +- } +- } catch (Throwable t) { +- log.error("unexpect error when renew message, stop to renew it. handle:{}", messageReceiptHandle, t); +- resFuture.complete(null); +- } +- return resFuture; +- } +- +- protected void clearGroup(ReceiptHandleProcessor.ReceiptHandleGroupKey key) { +- if (key == null) { +- return; +- } +- ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); +- ReceiptHandleGroup handleGroup = receiptHandleGroupMap.remove(key); +- if (handleGroup == null) { +- return; +- } +- handleGroup.scan((msgID, handle, v) -> { +- try { +- handleGroup.computeIfPresent(msgID, handle, messageReceiptHandle -> { +- CompletableFuture<AckResult> future = new CompletableFuture<>(); +- eventListener.fireEvent(new RenewEvent(messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), future)); +- return CompletableFuture.completedFuture(null); +- }); +- } catch (Exception e) { +- log.error("error when clear handle for group. key:{}", key, e); +- } +- }); +- } +- +- public void clearAllHandle() { +- log.info("start clear all handle in receiptHandleProcessor"); +- Set<ReceiptHandleProcessor.ReceiptHandleGroupKey> keySet = receiptHandleGroupMap.keySet(); +- for (ReceiptHandleProcessor.ReceiptHandleGroupKey key : keySet) { +- clearGroup(key); +- } +- log.info("clear all handle in receiptHandleProcessor done"); +- } +- +- protected boolean renewExceptionNeedRetry(Throwable t) { +- t = ExceptionUtils.getRealException(t); +- if (t instanceof ProxyException) { +- ProxyException proxyException = (ProxyException) t; +- if (ProxyExceptionCode.INVALID_BROKER_NAME.equals(proxyException.getCode()) || +- ProxyExceptionCode.INVALID_RECEIPT_HANDLE.equals(proxyException.getCode())) { +- return false; +- } +- } +- return true; +- } +- +- protected ProxyContext createContext(String actionName) { +- return ProxyContext.createForInner(this.getClass().getSimpleName() + actionName); +- } ++ MessageReceiptHandle removeReceiptHandle(ProxyContext context, Channel channel, String group, String msgID, String receiptHandle); + } +diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManagerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java +similarity index 93% +rename from proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManagerTest.java +rename to proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java +index 877c9fd6f..7c6943e44 100644 +--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManagerTest.java ++++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java +@@ -62,8 +62,8 @@ import static org.awaitility.Awaitility.await; + import static org.junit.Assert.assertEquals; + import static org.junit.Assert.assertTrue; + +-public class ReceiptHandleManagerTest extends BaseServiceTest { +- private ReceiptHandleManager receiptHandleManager; ++public class DefaultReceiptHandleManagerTest extends BaseServiceTest { ++ private DefaultReceiptHandleManager receiptHandleManager; + @Mock + protected MessagingProcessor messagingProcessor; + @Mock +@@ -87,7 +87,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest { + + @Before + public void setup() { +- receiptHandleManager = new ReceiptHandleManager(metadataService, consumerManager, new StateEventListener<RenewEvent>() { ++ receiptHandleManager = new DefaultReceiptHandleManager(metadataService, consumerManager, new StateEventListener<RenewEvent>() { + @Override + public void fireEvent(RenewEvent event) { + MessageReceiptHandle messageReceiptHandle = event.getMessageReceiptHandle(); +@@ -125,7 +125,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest { + @Test + public void testAddReceiptHandle() { + Channel channel = new LocalChannel(); +- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); ++ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig()); + Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); + receiptHandleManager.scheduleRenewTask(); +@@ -152,9 +152,9 @@ public class ReceiptHandleManagerTest extends BaseServiceTest { + .build().encode(); + MessageReceiptHandle messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, receiptHandle, MESSAGE_ID, OFFSET, + RECONSUME_TIMES); +- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); ++ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + } +- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); ++ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig()); + Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); + receiptHandleManager.scheduleRenewTask(); +@@ -170,7 +170,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest { + public void testRenewReceiptHandle() { + ProxyConfig config = ConfigurationManager.getProxyConfig(); + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); +- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); ++ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); + Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); + Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); +@@ -216,7 +216,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest { + public void testRenewExceedMaxRenewTimes() { + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); + Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); +- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); ++ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + + CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>(); + ackResultFuture.completeExceptionally(new MQClientException(0, "error")); +@@ -246,7 +246,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest { + public void testRenewWithInvalidHandle() { + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); + Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); +- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); ++ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + + CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>(); + ackResultFuture.completeExceptionally(new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "error")); +@@ -270,7 +270,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest { + ProxyConfig config = ConfigurationManager.getProxyConfig(); + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); + Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); +- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); ++ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + + AtomicInteger count = new AtomicInteger(0); + List<CompletableFuture<AckResult>> futureList = new ArrayList<>(); +@@ -348,7 +348,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest { + messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET, + RECONSUME_TIMES); + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); +- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); ++ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); + SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); + Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); +@@ -382,7 +382,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest { + messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET, + RECONSUME_TIMES); + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); +- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); ++ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); + Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(null); + Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyLong())) +@@ -418,7 +418,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest { + messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET, + RECONSUME_TIMES); + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); +- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); ++ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); + Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); + Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); +@@ -431,8 +431,8 @@ public class ReceiptHandleManagerTest extends BaseServiceTest { + @Test + public void testRemoveReceiptHandle() { + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); +- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); +- receiptHandleManager.removeReceiptHandle(channel, GROUP, MSG_ID, receiptHandle); ++ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); ++ receiptHandleManager.removeReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, receiptHandle); + SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); + Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); + receiptHandleManager.scheduleRenewTask(); +@@ -444,7 +444,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest { + @Test + public void testClearGroup() { + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); +- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); ++ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.clearGroup(new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, GROUP)); + SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); + Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); +@@ -459,7 +459,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest { + ArgumentCaptor<ConsumerIdsChangeListener> listenerArgumentCaptor = ArgumentCaptor.forClass(ConsumerIdsChangeListener.class); + Mockito.verify(consumerManager, Mockito.times(1)).appendConsumerIdsChangeListener(listenerArgumentCaptor.capture()); + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); +- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); ++ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + listenerArgumentCaptor.getValue().handle(ConsumerGroupEvent.CLIENT_UNREGISTER, GROUP, new ClientChannelInfo(channel, "", LanguageCode.JAVA, 0)); + assertTrue(receiptHandleManager.receiptHandleGroupMap.isEmpty()); + } +-- +2.32.0.windows.2 + |