summaryrefslogtreecommitdiff
path: root/patch004-backport-Support-Proxy-Protocol-for-gRPC-and-Remoting-Server.patch
diff options
context:
space:
mode:
Diffstat (limited to 'patch004-backport-Support-Proxy-Protocol-for-gRPC-and-Remoting-Server.patch')
-rw-r--r--patch004-backport-Support-Proxy-Protocol-for-gRPC-and-Remoting-Server.patch1939
1 files changed, 1939 insertions, 0 deletions
diff --git a/patch004-backport-Support-Proxy-Protocol-for-gRPC-and-Remoting-Server.patch b/patch004-backport-Support-Proxy-Protocol-for-gRPC-and-Remoting-Server.patch
new file mode 100644
index 0000000..356202b
--- /dev/null
+++ b/patch004-backport-Support-Proxy-Protocol-for-gRPC-and-Remoting-Server.patch
@@ -0,0 +1,1939 @@
+From 955428278ccd9bfa0f15e21a8d3040c5213358bd Mon Sep 17 00:00:00 2001
+From: Dongyuan Pan <dongyuanpan0@gmail.com>
+Date: Tue, 4 Jul 2023 18:01:48 +0800
+Subject: [PATCH 1/5] [ISSUE #6991] Delete rocketmq.client.logUseSlf4j=true in
+ JAVA_OPT
+
+---
+ distribution/bin/runbroker.cmd | 1 -
+ distribution/bin/runbroker.sh | 1 -
+ 2 files changed, 2 deletions(-)
+
+diff --git a/distribution/bin/runbroker.cmd b/distribution/bin/runbroker.cmd
+index 15f676aa8..77a0d1ff8 100644
+--- a/distribution/bin/runbroker.cmd
++++ b/distribution/bin/runbroker.cmd
+@@ -36,7 +36,6 @@ set "JAVA_OPT=%JAVA_OPT% -XX:-OmitStackTraceInFastThrow"
+ set "JAVA_OPT=%JAVA_OPT% -XX:+AlwaysPreTouch"
+ set "JAVA_OPT=%JAVA_OPT% -XX:MaxDirectMemorySize=15g"
+ set "JAVA_OPT=%JAVA_OPT% -XX:-UseLargePages -XX:-UseBiasedLocking"
+-set "JAVA_OPT=%JAVA_OPT% -Drocketmq.client.logUseSlf4j=true"
+ set "JAVA_OPT=%JAVA_OPT% %JAVA_OPT_EXT% -cp %CLASSPATH%"
+
+ "%JAVA%" %JAVA_OPT% %*
+\ No newline at end of file
+diff --git a/distribution/bin/runbroker.sh b/distribution/bin/runbroker.sh
+index a081df79e..e6e2132ab 100644
+--- a/distribution/bin/runbroker.sh
++++ b/distribution/bin/runbroker.sh
+@@ -106,7 +106,6 @@ JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
+ JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
+ JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g"
+ JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking"
+-JAVA_OPT="${JAVA_OPT} -Drocketmq.client.logUseSlf4j=true"
+ #JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
+ JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
+ JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
+--
+2.32.0.windows.2
+
+
+From 00fc42b8be848fc3f5c550cbab007b92f128dc38 Mon Sep 17 00:00:00 2001
+From: ShuangxiDing <dingshuangxi888@gmail.com>
+Date: Tue, 4 Jul 2023 18:02:16 +0800
+Subject: [PATCH 2/5] [ISSUE #6957] Support Proxy Protocol for gRPC and
+ Remoting Server (#6958)
+
+---
+ WORKSPACE | 1 +
+ .../common/constant/HAProxyConstants.java | 28 ++++
+ pom.xml | 5 +
+ proxy/BUILD.bazel | 2 +
+ proxy/pom.xml | 4 +
+ .../proxy/grpc/GrpcServerBuilder.java | 2 +-
+ ...ava => ProxyAndTlsProtocolNegotiator.java} | 139 ++++++++++++++++--
+ .../proxy/grpc/constant/AttributeKeys.java | 44 ++++++
+ .../grpc/interceptor/HeaderInterceptor.java | 32 +++-
+ .../remoting/MultiProtocolRemotingServer.java | 5 +-
+ .../remoting/common/RemotingHelper.java | 42 ++++--
+ .../remoting/netty/AttributeKeys.java | 45 ++++++
+ .../remoting/netty/NettyRemotingServer.java | 129 ++++++++++++++--
+ .../rocketmq/remoting/ProxyProtocolTest.java | 116 +++++++++++++++
+ .../org/apache/rocketmq/remoting/TlsTest.java | 28 ++--
+ 15 files changed, 563 insertions(+), 59 deletions(-)
+ create mode 100644 common/src/main/java/org/apache/rocketmq/common/constant/HAProxyConstants.java
+ rename proxy/src/main/java/org/apache/rocketmq/proxy/grpc/{OptionalSSLProtocolNegotiator.java => ProxyAndTlsProtocolNegotiator.java} (51%)
+ create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/grpc/constant/AttributeKeys.java
+ create mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java
+ create mode 100644 remoting/src/test/java/org/apache/rocketmq/remoting/ProxyProtocolTest.java
+
+diff --git a/WORKSPACE b/WORKSPACE
+index fbb694efe..e3a8f37dc 100644
+--- a/WORKSPACE
++++ b/WORKSPACE
+@@ -104,6 +104,7 @@ maven_install(
+ "software.amazon.awssdk:s3:2.20.29",
+ "com.fasterxml.jackson.core:jackson-databind:2.13.4.2",
+ "com.adobe.testing:s3mock-junit4:2.11.0",
++ "io.github.aliyunmq:rocketmq-grpc-netty-codec-haproxy:1.0.0",
+ ],
+ fetch_sources = True,
+ repositories = [
+diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/HAProxyConstants.java b/common/src/main/java/org/apache/rocketmq/common/constant/HAProxyConstants.java
+new file mode 100644
+index 000000000..c1ae0cca1
+--- /dev/null
++++ b/common/src/main/java/org/apache/rocketmq/common/constant/HAProxyConstants.java
+@@ -0,0 +1,28 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.rocketmq.common.constant;
++
++public class HAProxyConstants {
++
++ public static final String PROXY_PROTOCOL_PREFIX = "proxy_protocol_";
++ public static final String PROXY_PROTOCOL_ADDR = PROXY_PROTOCOL_PREFIX + "addr";
++ public static final String PROXY_PROTOCOL_PORT = PROXY_PROTOCOL_PREFIX + "port";
++ public static final String PROXY_PROTOCOL_SERVER_ADDR = PROXY_PROTOCOL_PREFIX + "server_addr";
++ public static final String PROXY_PROTOCOL_SERVER_PORT = PROXY_PROTOCOL_PREFIX + "server_port";
++ public static final String PROXY_PROTOCOL_TLV_PREFIX = PROXY_PROTOCOL_PREFIX + "tlv_0x";
++}
+diff --git a/pom.xml b/pom.xml
+index a3b474602..12bc2dbd5 100644
+--- a/pom.xml
++++ b/pom.xml
+@@ -888,6 +888,11 @@
+ </exclusion>
+ </exclusions>
+ </dependency>
++ <dependency>
++ <groupId>io.github.aliyunmq</groupId>
++ <artifactId>rocketmq-grpc-netty-codec-haproxy</artifactId>
++ <version>1.0.0</version>
++ </dependency>
+ <dependency>
+ <groupId>com.conversantmedia</groupId>
+ <artifactId>disruptor</artifactId>
+diff --git a/proxy/BUILD.bazel b/proxy/BUILD.bazel
+index fcb85e46f..b4f3c16e2 100644
+--- a/proxy/BUILD.bazel
++++ b/proxy/BUILD.bazel
+@@ -46,6 +46,7 @@ java_library(
+ "@maven//:io_grpc_grpc_services",
+ "@maven//:io_grpc_grpc_stub",
+ "@maven//:io_netty_netty_all",
++ "@maven//:io_github_aliyunmq_rocketmq_grpc_netty_codec_haproxy",
+ "@maven//:io_openmessaging_storage_dledger",
+ "@maven//:io_opentelemetry_opentelemetry_api",
+ "@maven//:io_opentelemetry_opentelemetry_exporter_otlp",
+@@ -94,6 +95,7 @@ java_library(
+ "@maven//:io_grpc_grpc_netty_shaded",
+ "@maven//:io_grpc_grpc_stub",
+ "@maven//:io_netty_netty_all",
++ "@maven//:io_github_aliyunmq_rocketmq_grpc_netty_codec_haproxy",
+ "@maven//:org_apache_commons_commons_lang3",
+ "@maven//:io_opentelemetry_opentelemetry_exporter_otlp",
+ "@maven//:io_opentelemetry_opentelemetry_exporter_prometheus",
+diff --git a/proxy/pom.xml b/proxy/pom.xml
+index f14155737..3fbea107a 100644
+--- a/proxy/pom.xml
++++ b/proxy/pom.xml
+@@ -75,6 +75,10 @@
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java-util</artifactId>
+ </dependency>
++ <dependency>
++ <groupId>io.github.aliyunmq</groupId>
++ <artifactId>rocketmq-grpc-netty-codec-haproxy</artifactId>
++ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
+index 0ca6a1fcb..437b9216b 100644
+--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
+@@ -50,7 +50,7 @@ public class GrpcServerBuilder {
+ protected GrpcServerBuilder(ThreadPoolExecutor executor, int port) {
+ serverBuilder = NettyServerBuilder.forPort(port);
+
+- serverBuilder.protocolNegotiator(new OptionalSSLProtocolNegotiator());
++ serverBuilder.protocolNegotiator(new ProxyAndTlsProtocolNegotiator());
+
+ // build server
+ int bossLoopNum = ConfigurationManager.getProxyConfig().getGrpcBossLoopNum();
+diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
+similarity index 51%
+rename from proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java
+rename to proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
+index 670e1c1a2..ceb9becc0 100644
+--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java
++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
+@@ -16,36 +16,53 @@
+ */
+ package org.apache.rocketmq.proxy.grpc;
+
++import io.grpc.Attributes;
+ import io.grpc.netty.shaded.io.grpc.netty.GrpcHttp2ConnectionHandler;
+ import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
+ import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiationEvent;
+ import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiator;
+ import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiators;
++import io.grpc.netty.shaded.io.grpc.netty.ProtocolNegotiationEvent;
+ import io.grpc.netty.shaded.io.netty.buffer.ByteBuf;
+ import io.grpc.netty.shaded.io.netty.channel.ChannelHandler;
+ import io.grpc.netty.shaded.io.netty.channel.ChannelHandlerContext;
++import io.grpc.netty.shaded.io.netty.channel.ChannelInboundHandlerAdapter;
+ import io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder;
++import io.grpc.netty.shaded.io.netty.handler.codec.ProtocolDetectionResult;
++import io.grpc.netty.shaded.io.netty.handler.codec.ProtocolDetectionState;
++import io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyMessage;
++import io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
++import io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
+ import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth;
+ import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
+ import io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler;
+ import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+ import io.grpc.netty.shaded.io.netty.handler.ssl.util.SelfSignedCertificate;
+ import io.grpc.netty.shaded.io.netty.util.AsciiString;
+-import java.io.InputStream;
+-import java.nio.file.Files;
+-import java.nio.file.Paths;
+-import java.util.List;
++import io.grpc.netty.shaded.io.netty.util.CharsetUtil;
++import org.apache.commons.collections.CollectionUtils;
++import org.apache.commons.lang3.StringUtils;
++import org.apache.rocketmq.common.constant.HAProxyConstants;
+ import org.apache.rocketmq.common.constant.LoggerName;
+ import org.apache.rocketmq.logging.org.slf4j.Logger;
+ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+ import org.apache.rocketmq.proxy.config.ConfigurationManager;
+ import org.apache.rocketmq.proxy.config.ProxyConfig;
++import org.apache.rocketmq.proxy.grpc.constant.AttributeKeys;
+ import org.apache.rocketmq.remoting.common.TlsMode;
+ import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
+
+-public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator.ProtocolNegotiator {
++import java.io.InputStream;
++import java.nio.file.Files;
++import java.nio.file.Paths;
++import java.util.List;
++
++public class ProxyAndTlsProtocolNegotiator implements InternalProtocolNegotiator.ProtocolNegotiator {
+ protected static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+
++ private static final String HA_PROXY_DECODER = "HAProxyDecoder";
++ private static final String HA_PROXY_HANDLER = "HAProxyHandler";
++ private static final String TLS_MODE_HANDLER = "TlsModeHandler";
+ /**
+ * the length of the ssl record header (in bytes)
+ */
+@@ -53,7 +70,7 @@ public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator
+
+ private static SslContext sslContext;
+
+- public OptionalSSLProtocolNegotiator() {
++ public ProxyAndTlsProtocolNegotiator() {
+ sslContext = loadSslContext();
+ }
+
+@@ -64,11 +81,12 @@ public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator
+
+ @Override
+ public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
+- return new PortUnificationServerHandler(grpcHandler);
++ return new ProxyAndTlsProtocolHandler(grpcHandler);
+ }
+
+ @Override
+- public void close() {}
++ public void close() {
++ }
+
+ private static SslContext loadSslContext() {
+ try {
+@@ -85,8 +103,8 @@ public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator
+ String tlsCertPath = ConfigurationManager.getProxyConfig().getTlsCertPath();
+ try (InputStream serverKeyInputStream = Files.newInputStream(
+ Paths.get(tlsKeyPath));
+- InputStream serverCertificateStream = Files.newInputStream(
+- Paths.get(tlsCertPath))) {
++ InputStream serverCertificateStream = Files.newInputStream(
++ Paths.get(tlsCertPath))) {
+ SslContext res = GrpcSslContexts.forServer(serverCertificateStream,
+ serverKeyInputStream)
+ .trustManager(InsecureTrustManagerFactory.INSTANCE)
+@@ -102,12 +120,95 @@ public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator
+ }
+ }
+
+- public static class PortUnificationServerHandler extends ByteToMessageDecoder {
++ private static class ProxyAndTlsProtocolHandler extends ByteToMessageDecoder {
++
++ private final GrpcHttp2ConnectionHandler grpcHandler;
++
++ public ProxyAndTlsProtocolHandler(GrpcHttp2ConnectionHandler grpcHandler) {
++ this.grpcHandler = grpcHandler;
++ }
++
++ @Override
++ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
++ try {
++ ProtocolDetectionResult<HAProxyProtocolVersion> ha = HAProxyMessageDecoder.detectProtocol(
++ in);
++ if (ha.state() == ProtocolDetectionState.NEEDS_MORE_DATA) {
++ return;
++ }
++ if (ha.state() == ProtocolDetectionState.DETECTED) {
++ ctx.pipeline().addAfter(ctx.name(), HA_PROXY_DECODER, new HAProxyMessageDecoder())
++ .addAfter(HA_PROXY_DECODER, HA_PROXY_HANDLER, new HAProxyMessageHandler())
++ .addAfter(HA_PROXY_HANDLER, TLS_MODE_HANDLER, new TlsModeHandler(grpcHandler));
++ } else {
++ ctx.pipeline().addAfter(ctx.name(), TLS_MODE_HANDLER, new TlsModeHandler(grpcHandler));
++ }
++
++ ctx.fireUserEventTriggered(InternalProtocolNegotiationEvent.getDefault());
++ ctx.pipeline().remove(this);
++ } catch (Exception e) {
++ log.error("process proxy protocol negotiator failed.", e);
++ throw e;
++ }
++ }
++ }
++
++ private static class HAProxyMessageHandler extends ChannelInboundHandlerAdapter {
++
++ private ProtocolNegotiationEvent pne = InternalProtocolNegotiationEvent.getDefault();
++
++ @Override
++ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
++ if (msg instanceof HAProxyMessage) {
++ replaceEventWithMessage((HAProxyMessage) msg);
++ ctx.fireUserEventTriggered(pne);
++ } else {
++ super.channelRead(ctx, msg);
++ }
++ ctx.pipeline().remove(this);
++ }
++
++ /**
++ * The definition of key refers to the implementation of nginx
++ * <a href="https://nginx.org/en/docs/http/ngx_http_core_module.html#var_proxy_protocol_addr">ngx_http_core_module</a>
++ *
++ * @param msg
++ */
++ private void replaceEventWithMessage(HAProxyMessage msg) {
++ Attributes.Builder builder = InternalProtocolNegotiationEvent.getAttributes(pne).toBuilder();
++ if (StringUtils.isNotBlank(msg.sourceAddress())) {
++ builder.set(AttributeKeys.PROXY_PROTOCOL_ADDR, msg.sourceAddress());
++ }
++ if (msg.sourcePort() > 0) {
++ builder.set(AttributeKeys.PROXY_PROTOCOL_PORT, String.valueOf(msg.sourcePort()));
++ }
++ if (StringUtils.isNotBlank(msg.destinationAddress())) {
++ builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR, msg.destinationAddress());
++ }
++ if (msg.destinationPort() > 0) {
++ builder.set(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT, String.valueOf(msg.destinationPort()));
++ }
++ if (CollectionUtils.isNotEmpty(msg.tlvs())) {
++ msg.tlvs().forEach(tlv -> {
++ Attributes.Key<String> key = AttributeKeys.valueOf(
++ HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue()));
++ String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
++ builder.set(key, value);
++ });
++ }
++ pne = InternalProtocolNegotiationEvent
++ .withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build());
++ }
++ }
++
++ private static class TlsModeHandler extends ByteToMessageDecoder {
++
++ private ProtocolNegotiationEvent pne = InternalProtocolNegotiationEvent.getDefault();
+
+ private final ChannelHandler ssl;
+ private final ChannelHandler plaintext;
+
+- public PortUnificationServerHandler(GrpcHttp2ConnectionHandler grpcHandler) {
++ public TlsModeHandler(GrpcHttp2ConnectionHandler grpcHandler) {
+ this.ssl = InternalProtocolNegotiators.serverTls(sslContext)
+ .newHandler(grpcHandler);
+ this.plaintext = InternalProtocolNegotiators.serverPlaintext()
+@@ -115,8 +216,7 @@ public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator
+ }
+
+ @Override
+- protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
+- throws Exception {
++ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
+ try {
+ TlsMode tlsMode = TlsSystemConfig.tlsMode;
+ if (TlsMode.ENFORCING.equals(tlsMode)) {
+@@ -134,12 +234,21 @@ public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator
+ ctx.pipeline().addAfter(ctx.name(), null, this.plaintext);
+ }
+ }
+- ctx.fireUserEventTriggered(InternalProtocolNegotiationEvent.getDefault());
++ ctx.fireUserEventTriggered(pne);
+ ctx.pipeline().remove(this);
+ } catch (Exception e) {
+ log.error("process ssl protocol negotiator failed.", e);
+ throw e;
+ }
+ }
++
++ @Override
++ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
++ if (evt instanceof ProtocolNegotiationEvent) {
++ pne = (ProtocolNegotiationEvent) evt;
++ } else {
++ super.userEventTriggered(ctx, evt);
++ }
++ }
+ }
+ }
+\ No newline at end of file
+diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/constant/AttributeKeys.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/constant/AttributeKeys.java
+new file mode 100644
+index 000000000..096a5ba3d
+--- /dev/null
++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/constant/AttributeKeys.java
+@@ -0,0 +1,44 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.rocketmq.proxy.grpc.constant;
++
++import io.grpc.Attributes;
++import org.apache.rocketmq.common.constant.HAProxyConstants;
++
++import java.util.Map;
++import java.util.concurrent.ConcurrentHashMap;
++
++public class AttributeKeys {
++
++ public static final Attributes.Key<String> PROXY_PROTOCOL_ADDR =
++ Attributes.Key.create(HAProxyConstants.PROXY_PROTOCOL_ADDR);
++
++ public static final Attributes.Key<String> PROXY_PROTOCOL_PORT =
++ Attributes.Key.create(HAProxyConstants.PROXY_PROTOCOL_PORT);
++
++ public static final Attributes.Key<String> PROXY_PROTOCOL_SERVER_ADDR =
++ Attributes.Key.create(HAProxyConstants.PROXY_PROTOCOL_SERVER_ADDR);
++
++ public static final Attributes.Key<String> PROXY_PROTOCOL_SERVER_PORT =
++ Attributes.Key.create(HAProxyConstants.PROXY_PROTOCOL_SERVER_PORT);
++
++ private static final Map<String, Attributes.Key<String>> ATTRIBUTES_KEY_MAP = new ConcurrentHashMap<>();
++
++ public static Attributes.Key<String> valueOf(String name) {
++ return ATTRIBUTES_KEY_MAP.computeIfAbsent(name, key -> Attributes.Key.create(name));
++ }
++}
+diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java
+index 1cbb00361..13893e5ed 100644
+--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java
++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java
+@@ -18,11 +18,16 @@
+ package org.apache.rocketmq.proxy.grpc.interceptor;
+
+ import com.google.common.net.HostAndPort;
++import io.grpc.Attributes;
+ import io.grpc.Grpc;
+ import io.grpc.Metadata;
+ import io.grpc.ServerCall;
+ import io.grpc.ServerCallHandler;
+ import io.grpc.ServerInterceptor;
++import org.apache.commons.lang3.StringUtils;
++import org.apache.rocketmq.common.constant.HAProxyConstants;
++import org.apache.rocketmq.proxy.grpc.constant.AttributeKeys;
++
+ import java.net.InetSocketAddress;
+ import java.net.SocketAddress;
+
+@@ -33,13 +38,27 @@ public class HeaderInterceptor implements ServerInterceptor {
+ Metadata headers,
+ ServerCallHandler<R, W> next
+ ) {
+- SocketAddress remoteSocketAddress = call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
+- String remoteAddress = parseSocketAddress(remoteSocketAddress);
++ String remoteAddress = getProxyProtocolAddress(call.getAttributes());
++ if (StringUtils.isBlank(remoteAddress)) {
++ SocketAddress remoteSocketAddress = call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
++ remoteAddress = parseSocketAddress(remoteSocketAddress);
++ }
+ headers.put(InterceptorConstants.REMOTE_ADDRESS, remoteAddress);
+
+ SocketAddress localSocketAddress = call.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
+ String localAddress = parseSocketAddress(localSocketAddress);
+ headers.put(InterceptorConstants.LOCAL_ADDRESS, localAddress);
++
++ for (Attributes.Key<?> key : call.getAttributes().keys()) {
++ if (!StringUtils.startsWith(key.toString(), HAProxyConstants.PROXY_PROTOCOL_PREFIX)) {
++ continue;
++ }
++ Metadata.Key<String> headerKey
++ = Metadata.Key.of(key.toString(), Metadata.ASCII_STRING_MARSHALLER);
++ String headerValue = String.valueOf(call.getAttributes().get(key));
++ headers.put(headerKey, headerValue);
++ }
++
+ return next.startCall(call, headers);
+ }
+
+@@ -55,4 +74,13 @@ public class HeaderInterceptor implements ServerInterceptor {
+
+ return "";
+ }
++
++ private String getProxyProtocolAddress(Attributes attributes) {
++ String proxyProtocolAddr = attributes.get(AttributeKeys.PROXY_PROTOCOL_ADDR);
++ String proxyProtocolPort = attributes.get(AttributeKeys.PROXY_PROTOCOL_PORT);
++ if (StringUtils.isBlank(proxyProtocolAddr) || StringUtils.isBlank(proxyProtocolPort)) {
++ return null;
++ }
++ return proxyProtocolAddr + ":" + proxyProtocolPort;
++ }
+ }
+diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
+index 1142132b7..858b1f022 100644
+--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
+@@ -20,8 +20,6 @@ package org.apache.rocketmq.proxy.remoting;
+ import io.netty.channel.ChannelPipeline;
+ import io.netty.channel.socket.SocketChannel;
+ import io.netty.handler.timeout.IdleStateHandler;
+-import java.io.IOException;
+-import java.security.cert.CertificateException;
+ import org.apache.rocketmq.common.constant.LoggerName;
+ import org.apache.rocketmq.logging.org.slf4j.Logger;
+ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+@@ -36,6 +34,9 @@ import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
+ import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+ import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
+
++import java.io.IOException;
++import java.security.cert.CertificateException;
++
+ /**
+ * support remoting and http2 protocol at one port
+ */
+diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
+index 75e25a83a..d0750b678 100644
+--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
+@@ -21,14 +21,8 @@ import io.netty.channel.ChannelFuture;
+ import io.netty.channel.ChannelFutureListener;
+ import io.netty.util.Attribute;
+ import io.netty.util.AttributeKey;
+-import java.io.IOException;
+-import java.lang.reflect.Field;
+-import java.net.InetSocketAddress;
+-import java.net.SocketAddress;
+-import java.nio.ByteBuffer;
+-import java.nio.channels.SocketChannel;
+-import java.util.HashMap;
+-import java.util.Map;
++import org.apache.commons.lang3.StringUtils;
++import org.apache.rocketmq.common.constant.HAProxyConstants;
+ import org.apache.rocketmq.common.constant.LoggerName;
+ import org.apache.rocketmq.common.utils.NetworkUtil;
+ import org.apache.rocketmq.logging.org.slf4j.Logger;
+@@ -43,6 +37,15 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+ import org.apache.rocketmq.remoting.protocol.RequestCode;
+ import org.apache.rocketmq.remoting.protocol.ResponseCode;
+
++import java.io.IOException;
++import java.lang.reflect.Field;
++import java.net.InetSocketAddress;
++import java.net.SocketAddress;
++import java.nio.ByteBuffer;
++import java.nio.channels.SocketChannel;
++import java.util.HashMap;
++import java.util.Map;
++
+ public class RemotingHelper {
+ public static final String DEFAULT_CHARSET = "UTF-8";
+ public static final String DEFAULT_CIDR_ALL = "0.0.0.0/0";
+@@ -50,6 +53,9 @@ public class RemotingHelper {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
+ private static final AttributeKey<String> REMOTE_ADDR_KEY = AttributeKey.valueOf("RemoteAddr");
+
++ private static final AttributeKey<String> PROXY_PROTOCOL_ADDR = AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_ADDR);
++ private static final AttributeKey<String> PROXY_PROTOCOL_PORT = AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_PORT);
++
+ public static final AttributeKey<String> CLIENT_ID_KEY = AttributeKey.valueOf("ClientId");
+
+ public static final AttributeKey<Integer> VERSION_KEY = AttributeKey.valueOf("Version");
+@@ -203,12 +209,16 @@ public class RemotingHelper {
+ if (null == channel) {
+ return "";
+ }
++ String addr = getProxyProtocolAddress(channel);
++ if (StringUtils.isNotBlank(addr)) {
++ return addr;
++ }
+ Attribute<String> att = channel.attr(REMOTE_ADDR_KEY);
+ if (att == null) {
+ // mocked in unit test
+ return parseChannelRemoteAddr0(channel);
+ }
+- String addr = att.get();
++ addr = att.get();
+ if (addr == null) {
+ addr = parseChannelRemoteAddr0(channel);
+ att.set(addr);
+@@ -216,6 +226,18 @@ public class RemotingHelper {
+ return addr;
+ }
+
++ private static String getProxyProtocolAddress(Channel channel) {
++ if (!channel.hasAttr(PROXY_PROTOCOL_ADDR)) {
++ return null;
++ }
++ String proxyProtocolAddr = getAttributeValue(PROXY_PROTOCOL_ADDR, channel);
++ String proxyProtocolPort = getAttributeValue(PROXY_PROTOCOL_PORT, channel);
++ if (StringUtils.isBlank(proxyProtocolAddr) || proxyProtocolPort == null) {
++ return null;
++ }
++ return proxyProtocolAddr + ":" + proxyProtocolPort;
++ }
++
+ private static String parseChannelRemoteAddr0(final Channel channel) {
+ SocketAddress remote = channel.remoteAddress();
+ final String addr = remote != null ? remote.toString() : "";
+@@ -255,7 +277,7 @@ public class RemotingHelper {
+ return "";
+ }
+
+- public static int parseSocketAddressPort(SocketAddress socketAddress) {
++ public static Integer parseSocketAddressPort(SocketAddress socketAddress) {
+ if (socketAddress instanceof InetSocketAddress) {
+ return ((InetSocketAddress) socketAddress).getPort();
+ }
+diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java
+new file mode 100644
+index 000000000..4e69ab82d
+--- /dev/null
++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AttributeKeys.java
+@@ -0,0 +1,45 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.rocketmq.remoting.netty;
++
++
++import io.netty.util.AttributeKey;
++import org.apache.rocketmq.common.constant.HAProxyConstants;
++
++import java.util.Map;
++import java.util.concurrent.ConcurrentHashMap;
++
++public class AttributeKeys {
++
++ public static final AttributeKey<String> PROXY_PROTOCOL_ADDR =
++ AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_ADDR);
++
++ public static final AttributeKey<String> PROXY_PROTOCOL_PORT =
++ AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_PORT);
++
++ public static final AttributeKey<String> PROXY_PROTOCOL_SERVER_ADDR =
++ AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_SERVER_ADDR);
++
++ public static final AttributeKey<String> PROXY_PROTOCOL_SERVER_PORT =
++ AttributeKey.valueOf(HAProxyConstants.PROXY_PROTOCOL_SERVER_PORT);
++
++ private static final Map<String, AttributeKey<String>> ATTRIBUTE_KEY_MAP = new ConcurrentHashMap<>();
++
++ public static AttributeKey<String> valueOf(String name) {
++ return ATTRIBUTE_KEY_MAP.computeIfAbsent(name, AttributeKeys::valueOf);
++ }
++}
+diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+index 9f39d672e..94ffd8d07 100644
+--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+@@ -24,6 +24,7 @@ import io.netty.channel.ChannelDuplexHandler;
+ import io.netty.channel.ChannelFuture;
+ import io.netty.channel.ChannelHandler;
+ import io.netty.channel.ChannelHandlerContext;
++import io.netty.channel.ChannelInboundHandlerAdapter;
+ import io.netty.channel.ChannelInitializer;
+ import io.netty.channel.ChannelOption;
+ import io.netty.channel.ChannelPipeline;
+@@ -36,27 +37,25 @@ import io.netty.channel.epoll.EpollServerSocketChannel;
+ import io.netty.channel.nio.NioEventLoopGroup;
+ import io.netty.channel.socket.SocketChannel;
+ import io.netty.channel.socket.nio.NioServerSocketChannel;
++import io.netty.handler.codec.ProtocolDetectionResult;
++import io.netty.handler.codec.ProtocolDetectionState;
++import io.netty.handler.codec.haproxy.HAProxyMessage;
++import io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
++import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
+ import io.netty.handler.timeout.IdleState;
+ import io.netty.handler.timeout.IdleStateEvent;
+ import io.netty.handler.timeout.IdleStateHandler;
++import io.netty.util.AttributeKey;
++import io.netty.util.CharsetUtil;
+ import io.netty.util.HashedWheelTimer;
+ import io.netty.util.Timeout;
+ import io.netty.util.TimerTask;
+ import io.netty.util.concurrent.DefaultEventExecutorGroup;
+-import java.io.IOException;
+-import java.net.InetSocketAddress;
+-import java.security.cert.CertificateException;
+-import java.util.NoSuchElementException;
+-import java.util.concurrent.ConcurrentHashMap;
+-import java.util.concurrent.ConcurrentMap;
+-import java.util.concurrent.ExecutorService;
+-import java.util.concurrent.Executors;
+-import java.util.concurrent.ScheduledExecutorService;
+-import java.util.concurrent.ScheduledThreadPoolExecutor;
+-import java.util.concurrent.ThreadPoolExecutor;
+-import java.util.concurrent.TimeUnit;
++import org.apache.commons.collections.CollectionUtils;
++import org.apache.commons.lang3.StringUtils;
+ import org.apache.rocketmq.common.Pair;
+ import org.apache.rocketmq.common.ThreadFactoryImpl;
++import org.apache.rocketmq.common.constant.HAProxyConstants;
+ import org.apache.rocketmq.common.constant.LoggerName;
+ import org.apache.rocketmq.common.utils.NetworkUtil;
+ import org.apache.rocketmq.logging.org.slf4j.Logger;
+@@ -71,6 +70,19 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+ import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
+ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
++import java.io.IOException;
++import java.net.InetSocketAddress;
++import java.security.cert.CertificateException;
++import java.util.NoSuchElementException;
++import java.util.concurrent.ConcurrentHashMap;
++import java.util.concurrent.ConcurrentMap;
++import java.util.concurrent.ExecutorService;
++import java.util.concurrent.Executors;
++import java.util.concurrent.ScheduledExecutorService;
++import java.util.concurrent.ScheduledThreadPoolExecutor;
++import java.util.concurrent.ThreadPoolExecutor;
++import java.util.concurrent.TimeUnit;
++
+ @SuppressWarnings("NullableProblems")
+ public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
+@@ -96,6 +108,9 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
+ private final ConcurrentMap<Integer/*Port*/, NettyRemotingAbstract> remotingServerTable = new ConcurrentHashMap<>();
+
+ public static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
++ public static final String HA_PROXY_DECODER = "HAProxyDecoder";
++ public static final String HA_PROXY_HANDLER = "HAProxyHandler";
++ public static final String TLS_MODE_HANDLER = "TlsModeHandler";
+ public static final String TLS_HANDLER_NAME = "sslHandler";
+ public static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
+
+@@ -387,7 +402,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
+ }
+
+ private void prepareSharableHandlers() {
+- handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode);
++ handshakeHandler = new HandshakeHandler();
+ encoder = new NettyEncoder();
+ connectionManageHandler = new NettyConnectManageHandler();
+ serverHandler = new NettyServerHandler();
+@@ -437,11 +452,51 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
+ @ChannelHandler.Sharable
+ public class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
++ private final TlsModeHandler tlsModeHandler;
++
++ public HandshakeHandler() {
++ tlsModeHandler = new TlsModeHandler(TlsSystemConfig.tlsMode);
++ }
++
++ @Override
++ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
++ try {
++ ProtocolDetectionResult<HAProxyProtocolVersion> ha = HAProxyMessageDecoder.detectProtocol(in);
++ if (ha.state() == ProtocolDetectionState.NEEDS_MORE_DATA) {
++ return;
++ }
++ if (ha.state() == ProtocolDetectionState.DETECTED) {
++ ctx.pipeline().addAfter(defaultEventExecutorGroup, ctx.name(), HA_PROXY_DECODER, new HAProxyMessageDecoder())
++ .addAfter(defaultEventExecutorGroup, HA_PROXY_DECODER, HA_PROXY_HANDLER, new HAProxyMessageHandler())
++ .addAfter(defaultEventExecutorGroup, HA_PROXY_HANDLER, TLS_MODE_HANDLER, tlsModeHandler);
++ } else {
++ ctx.pipeline().addAfter(defaultEventExecutorGroup, ctx.name(), TLS_MODE_HANDLER, tlsModeHandler);
++ }
++
++ try {
++ // Remove this handler
++ ctx.pipeline().remove(this);
++ } catch (NoSuchElementException e) {
++ log.error("Error while removing HandshakeHandler", e);
++ }
++
++ // Hand over this message to the next .
++ ctx.fireChannelRead(in.retain());
++ } catch (Exception e) {
++ log.error("process proxy protocol negotiator failed.", e);
++ throw e;
++ }
++ }
++ }
++
++ @ChannelHandler.Sharable
++ public class TlsModeHandler extends SimpleChannelInboundHandler<ByteBuf> {
++
+ private final TlsMode tlsMode;
+
+ private static final byte HANDSHAKE_MAGIC_CODE = 0x16;
+
+- HandshakeHandler(TlsMode tlsMode) {
++ TlsModeHandler(TlsMode tlsMode) {
+ this.tlsMode = tlsMode;
+ }
+
+@@ -461,7 +516,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
+ case ENFORCING:
+ if (null != sslContext) {
+ ctx.pipeline()
+- .addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc()))
++ .addAfter(defaultEventExecutorGroup, TLS_MODE_HANDLER, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc()))
+ .addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
+ log.info("Handlers prepended to channel pipeline to establish SSL connection");
+ } else {
+@@ -483,7 +538,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
+ // Remove this handler
+ ctx.pipeline().remove(this);
+ } catch (NoSuchElementException e) {
+- log.error("Error while removing HandshakeHandler", e);
++ log.error("Error while removing TlsModeHandler", e);
+ }
+
+ // Hand over this message to the next .
+@@ -706,4 +761,46 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
+ return NettyRemotingServer.this.getCallbackExecutor();
+ }
+ }
++
++ public static class HAProxyMessageHandler extends ChannelInboundHandlerAdapter {
++
++ @Override
++ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
++ if (msg instanceof HAProxyMessage) {
++ fillChannelWithMessage((HAProxyMessage) msg, ctx.channel());
++ } else {
++ super.channelRead(ctx, msg);
++ }
++ ctx.pipeline().remove(this);
++ }
++
++ /**
++ * The definition of key refers to the implementation of nginx
++ * <a href="https://nginx.org/en/docs/http/ngx_http_core_module.html#var_proxy_protocol_addr">ngx_http_core_module</a>
++ * @param msg
++ * @param channel
++ */
++ private void fillChannelWithMessage(HAProxyMessage msg, Channel channel) {
++ if (StringUtils.isNotBlank(msg.sourceAddress())) {
++ channel.attr(AttributeKeys.PROXY_PROTOCOL_ADDR).set(msg.sourceAddress());
++ }
++ if (msg.sourcePort() > 0) {
++ channel.attr(AttributeKeys.PROXY_PROTOCOL_PORT).set(String.valueOf(msg.sourcePort()));
++ }
++ if (StringUtils.isNotBlank(msg.destinationAddress())) {
++ channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_ADDR).set(msg.destinationAddress());
++ }
++ if (msg.destinationPort() > 0) {
++ channel.attr(AttributeKeys.PROXY_PROTOCOL_SERVER_PORT).set(String.valueOf(msg.destinationPort()));
++ }
++ if (CollectionUtils.isNotEmpty(msg.tlvs())) {
++ msg.tlvs().forEach(tlv -> {
++ AttributeKey<String> key = AttributeKeys.valueOf(
++ HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX + String.format("%02x", tlv.typeByteValue()));
++ String value = StringUtils.trim(tlv.content().toString(CharsetUtil.UTF_8));
++ channel.attr(key).set(value);
++ });
++ }
++ }
++ }
+ }
+diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/ProxyProtocolTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/ProxyProtocolTest.java
+new file mode 100644
+index 000000000..c39fd2132
+--- /dev/null
++++ b/remoting/src/test/java/org/apache/rocketmq/remoting/ProxyProtocolTest.java
+@@ -0,0 +1,116 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.rocketmq.remoting;
++
++import io.netty.buffer.ByteBuf;
++import io.netty.buffer.Unpooled;
++import io.netty.channel.Channel;
++import io.netty.handler.codec.haproxy.HAProxyCommand;
++import io.netty.handler.codec.haproxy.HAProxyMessage;
++import io.netty.handler.codec.haproxy.HAProxyMessageEncoder;
++import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
++import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
++import org.apache.rocketmq.common.utils.NetworkUtil;
++import org.apache.rocketmq.remoting.netty.NettyClientConfig;
++import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
++import org.apache.rocketmq.remoting.protocol.LanguageCode;
++import org.apache.rocketmq.remoting.protocol.RemotingCommand;
++import org.junit.Before;
++import org.junit.Test;
++import org.junit.runner.RunWith;
++import org.mockito.junit.MockitoJUnitRunner;
++
++import java.io.IOException;
++import java.lang.reflect.Method;
++import java.net.Socket;
++import java.time.Duration;
++import java.util.concurrent.TimeUnit;
++
++import static org.assertj.core.api.Assertions.assertThat;
++import static org.awaitility.Awaitility.await;
++import static org.junit.Assert.assertNotNull;
++
++@RunWith(MockitoJUnitRunner.class)
++public class ProxyProtocolTest {
++
++ private RemotingServer remotingServer;
++ private RemotingClient remotingClient;
++
++ @Before
++ public void setUp() throws Exception {
++ NettyClientConfig clientConfig = new NettyClientConfig();
++ clientConfig.setUseTLS(false);
++
++ remotingServer = RemotingServerTest.createRemotingServer();
++ remotingClient = RemotingServerTest.createRemotingClient(clientConfig);
++
++ await().pollDelay(Duration.ofMillis(10))
++ .pollInterval(Duration.ofMillis(10))
++ .atMost(20, TimeUnit.SECONDS).until(() -> isHostConnectable(getServerAddress()));
++ }
++
++ @Test
++ public void testProxyProtocol() throws Exception {
++ sendHAProxyMessage(remotingClient);
++ requestThenAssertResponse(remotingClient);
++ }
++
++ private void requestThenAssertResponse(RemotingClient remotingClient) throws Exception {
++ RemotingCommand response = remotingClient.invokeSync(getServerAddress(), createRequest(), 10000 * 3);
++ assertNotNull(response);
++ assertThat(response.getLanguage()).isEqualTo(LanguageCode.JAVA);
++ assertThat(response.getExtFields()).hasSize(2);
++ assertThat(response.getExtFields().get("messageTitle")).isEqualTo("Welcome");
++ }
++
++ private void sendHAProxyMessage(RemotingClient remotingClient) throws Exception {
++ Method getAndCreateChannel = NettyRemotingClient.class.getDeclaredMethod("getAndCreateChannel", String.class);
++ getAndCreateChannel.setAccessible(true);
++ NettyRemotingClient nettyRemotingClient = (NettyRemotingClient) remotingClient;
++ Channel channel = (Channel) getAndCreateChannel.invoke(nettyRemotingClient, getServerAddress());
++ HAProxyMessage message = new HAProxyMessage(HAProxyProtocolVersion.V2, HAProxyCommand.PROXY,
++ HAProxyProxiedProtocol.TCP4, "127.0.0.1", "127.0.0.2", 8000, 9000);
++
++ ByteBuf byteBuf = Unpooled.directBuffer();
++ Method encode = HAProxyMessageEncoder.class.getDeclaredMethod("encodeV2", HAProxyMessage.class, ByteBuf.class);
++ encode.setAccessible(true);
++ encode.invoke(HAProxyMessageEncoder.INSTANCE, message, byteBuf);
++ channel.writeAndFlush(byteBuf).sync();
++ }
++
++ private static RemotingCommand createRequest() {
++ RequestHeader requestHeader = new RequestHeader();
++ requestHeader.setCount(1);
++ requestHeader.setMessageTitle("Welcome");
++ return RemotingCommand.createRequestCommand(0, requestHeader);
++ }
++
++
++ private String getServerAddress() {
++ return "localhost:" + remotingServer.localListenPort();
++ }
++
++ private boolean isHostConnectable(String addr) {
++ try (Socket socket = new Socket()) {
++ socket.connect(NetworkUtil.string2SocketAddress(addr));
++ return true;
++ } catch (IOException ignored) {
++ }
++ return false;
++ }
++}
+diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
+index 3da7abf57..de7edbbfb 100644
+--- a/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
++++ b/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
+@@ -17,19 +17,6 @@
+
+ package org.apache.rocketmq.remoting;
+
+-import java.io.BufferedInputStream;
+-import java.io.BufferedOutputStream;
+-import java.io.BufferedWriter;
+-import java.io.File;
+-import java.io.FileOutputStream;
+-import java.io.FileWriter;
+-import java.io.IOException;
+-import java.io.InputStream;
+-import java.io.PrintWriter;
+-import java.net.Socket;
+-import java.time.Duration;
+-import java.util.UUID;
+-import java.util.concurrent.TimeUnit;
+ import org.apache.rocketmq.common.utils.NetworkUtil;
+ import org.apache.rocketmq.remoting.common.TlsMode;
+ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+@@ -47,6 +34,20 @@ import org.junit.rules.TestName;
+ import org.junit.runner.RunWith;
+ import org.mockito.junit.MockitoJUnitRunner;
+
++import java.io.BufferedInputStream;
++import java.io.BufferedOutputStream;
++import java.io.BufferedWriter;
++import java.io.File;
++import java.io.FileOutputStream;
++import java.io.FileWriter;
++import java.io.IOException;
++import java.io.InputStream;
++import java.io.PrintWriter;
++import java.net.Socket;
++import java.time.Duration;
++import java.util.UUID;
++import java.util.concurrent.TimeUnit;
++
+ import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_CLIENT_AUTHSERVER;
+ import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_CLIENT_CERTPATH;
+ import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_CLIENT_KEYPASSWORD;
+@@ -234,6 +235,7 @@ public class TlsTest {
+ @Test
+ public void serverAcceptsUntrustedClientCert() throws Exception {
+ requestThenAssertResponse();
++// Thread.sleep(1000000L);
+ }
+
+ /**
+--
+2.32.0.windows.2
+
+
+From 4f840afcb04f5cc328795896198c6fba96ff37ec Mon Sep 17 00:00:00 2001
+From: mxsm <ljbmxsm@gmail.com>
+Date: Wed, 5 Jul 2023 11:03:52 +0800
+Subject: [PATCH 3/5] [ISSUE #6960] Added Slot formatting sketch comments
+ (#6961)
+
+---
+ .../java/org/apache/rocketmq/store/timer/Slot.java | 10 +++++++++-
+ 1 file changed, 9 insertions(+), 1 deletion(-)
+
+diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/Slot.java b/store/src/main/java/org/apache/rocketmq/store/timer/Slot.java
+index b91193b94..2da846cee 100644
+--- a/store/src/main/java/org/apache/rocketmq/store/timer/Slot.java
++++ b/store/src/main/java/org/apache/rocketmq/store/timer/Slot.java
+@@ -16,9 +16,17 @@
+ */
+ package org.apache.rocketmq.store.timer;
+
++/**
++ * Represents a slot of timing wheel. Format:
++ * ┌────────────┬───────────┬───────────┬───────────┬───────────┐
++ * │delayed time│ first pos │ last pos │ num │ magic │
++ * ├────────────┼───────────┼───────────┼───────────┼───────────┤
++ * │ 8bytes │ 8bytes │ 8bytes │ 4bytes │ 4bytes │
++ * └────────────┴───────────┴───────────┴───────────┴───────────┘
++ */
+ public class Slot {
+ public static final short SIZE = 32;
+- public final long timeMs;
++ public final long timeMs; //delayed time
+ public final long firstPos;
+ public final long lastPos;
+ public final int num;
+--
+2.32.0.windows.2
+
+
+From 58550f074ec101c0a158ede0df1839950e08837a Mon Sep 17 00:00:00 2001
+From: rongtong <jinrongtong5@163.com>
+Date: Mon, 10 Jul 2023 14:13:18 +0800
+Subject: [PATCH 4/5] [ISSUE #7008] Fix the issue of protocol parsing failure
+ when using haproxy and tls together (#7009)
+
+---
+ .../remoting/netty/NettyRemotingServer.java | 14 +++++++-------
+ 1 file changed, 7 insertions(+), 7 deletions(-)
+
+diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+index 94ffd8d07..445f06cc6 100644
+--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+@@ -459,13 +459,13 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
+ }
+
+ @Override
+- protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
++ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) {
+ try {
+- ProtocolDetectionResult<HAProxyProtocolVersion> ha = HAProxyMessageDecoder.detectProtocol(in);
+- if (ha.state() == ProtocolDetectionState.NEEDS_MORE_DATA) {
++ ProtocolDetectionResult<HAProxyProtocolVersion> detectionResult = HAProxyMessageDecoder.detectProtocol(byteBuf);
++ if (detectionResult.state() == ProtocolDetectionState.NEEDS_MORE_DATA) {
+ return;
+ }
+- if (ha.state() == ProtocolDetectionState.DETECTED) {
++ if (detectionResult.state() == ProtocolDetectionState.DETECTED) {
+ ctx.pipeline().addAfter(defaultEventExecutorGroup, ctx.name(), HA_PROXY_DECODER, new HAProxyMessageDecoder())
+ .addAfter(defaultEventExecutorGroup, HA_PROXY_DECODER, HA_PROXY_HANDLER, new HAProxyMessageHandler())
+ .addAfter(defaultEventExecutorGroup, HA_PROXY_HANDLER, TLS_MODE_HANDLER, tlsModeHandler);
+@@ -481,7 +481,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
+ }
+
+ // Hand over this message to the next .
+- ctx.fireChannelRead(in.retain());
++ ctx.fireChannelRead(byteBuf.retain());
+ } catch (Exception e) {
+ log.error("process proxy protocol negotiator failed.", e);
+ throw e;
+@@ -503,8 +503,8 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
+
+- // Peek the first byte to determine if the content is starting with TLS handshake
+- byte b = msg.getByte(0);
++ // Peek the current read index byte to determine if the content is starting with TLS handshake
++ byte b = msg.getByte(msg.readerIndex());
+
+ if (b == HANDSHAKE_MAGIC_CODE) {
+ switch (tlsMode) {
+--
+2.32.0.windows.2
+
+
+From 8e6b5e62bd4da78c0a7d265891c52685fcffd08a Mon Sep 17 00:00:00 2001
+From: Zhouxiang Zhan <zhouxzhan@apache.org>
+Date: Mon, 10 Jul 2023 20:14:17 +0800
+Subject: [PATCH 5/5] [ISSUE #6999] Add interface ReceiptHandleManager (#7000)
+
+* Add interface ReceiptHandleManager
+
+* fix unit test
+
+* fix
+---
+ .../processor/ReceiptHandleProcessor.java | 10 +-
+ .../receipt/DefaultReceiptHandleManager.java | 282 ++++++++++++++++++
+ .../service/receipt/ReceiptHandleManager.java | 260 +---------------
+ ...a => DefaultReceiptHandleManagerTest.java} | 34 +--
+ 4 files changed, 307 insertions(+), 279 deletions(-)
+ create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
+ rename proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/{ReceiptHandleManagerTest.java => DefaultReceiptHandleManagerTest.java} (93%)
+
+diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+index 9c7e8dea9..fc49e7622 100644
+--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+@@ -28,12 +28,12 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+ import org.apache.rocketmq.proxy.common.RenewEvent;
+ import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
+ import org.apache.rocketmq.proxy.common.ProxyContext;
+-import org.apache.rocketmq.proxy.service.receipt.ReceiptHandleManager;
++import org.apache.rocketmq.proxy.service.receipt.DefaultReceiptHandleManager;
+ import org.apache.rocketmq.proxy.service.ServiceManager;
+
+ public class ReceiptHandleProcessor extends AbstractProcessor {
+ protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+- protected ReceiptHandleManager receiptHandleManager;
++ protected DefaultReceiptHandleManager receiptHandleManager;
+
+ public ReceiptHandleProcessor(MessagingProcessor messagingProcessor, ServiceManager serviceManager) {
+ super(messagingProcessor, serviceManager);
+@@ -51,7 +51,7 @@ public class ReceiptHandleProcessor extends AbstractProcessor {
+ event.getFuture().complete(v);
+ });
+ };
+- this.receiptHandleManager = new ReceiptHandleManager(serviceManager.getMetadataService(), serviceManager.getConsumerManager(), eventListener);
++ this.receiptHandleManager = new DefaultReceiptHandleManager(serviceManager.getMetadataService(), serviceManager.getConsumerManager(), eventListener);
+ }
+
+ protected ProxyContext createContext(String actionName) {
+@@ -59,11 +59,11 @@ public class ReceiptHandleProcessor extends AbstractProcessor {
+ }
+
+ public void addReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, MessageReceiptHandle messageReceiptHandle) {
+- receiptHandleManager.addReceiptHandle(channel, group, msgID, messageReceiptHandle);
++ receiptHandleManager.addReceiptHandle(ctx, channel, group, msgID, messageReceiptHandle);
+ }
+
+ public MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, String receiptHandle) {
+- return receiptHandleManager.removeReceiptHandle(channel, group, msgID, receiptHandle);
++ return receiptHandleManager.removeReceiptHandle(ctx, channel, group, msgID, receiptHandle);
+ }
+
+ public static class ReceiptHandleGroupKey {
+diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
+new file mode 100644
+index 000000000..c7633d658
+--- /dev/null
++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
+@@ -0,0 +1,282 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.rocketmq.proxy.service.receipt;
++
++import com.google.common.base.Stopwatch;
++import io.netty.channel.Channel;
++import java.util.Map;
++import java.util.Set;
++import java.util.concurrent.CompletableFuture;
++import java.util.concurrent.ConcurrentHashMap;
++import java.util.concurrent.ConcurrentMap;
++import java.util.concurrent.Executors;
++import java.util.concurrent.ScheduledExecutorService;
++import java.util.concurrent.ThreadPoolExecutor;
++import java.util.concurrent.TimeUnit;
++import org.apache.rocketmq.broker.client.ClientChannelInfo;
++import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
++import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
++import org.apache.rocketmq.broker.client.ConsumerManager;
++import org.apache.rocketmq.client.consumer.AckResult;
++import org.apache.rocketmq.client.consumer.AckStatus;
++import org.apache.rocketmq.common.ThreadFactoryImpl;
++import org.apache.rocketmq.common.constant.LoggerName;
++import org.apache.rocketmq.common.consumer.ReceiptHandle;
++import org.apache.rocketmq.common.state.StateEventListener;
++import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
++import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
++import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
++import org.apache.rocketmq.common.utils.StartAndShutdown;
++import org.apache.rocketmq.logging.org.slf4j.Logger;
++import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
++import org.apache.rocketmq.proxy.common.RenewEvent;
++import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
++import org.apache.rocketmq.proxy.common.ProxyContext;
++import org.apache.rocketmq.proxy.common.ProxyException;
++import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
++import org.apache.rocketmq.proxy.common.ReceiptHandleGroup;
++import org.apache.rocketmq.proxy.common.RenewStrategyPolicy;
++import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
++import org.apache.rocketmq.proxy.common.utils.ExceptionUtils;
++import org.apache.rocketmq.proxy.config.ConfigurationManager;
++import org.apache.rocketmq.proxy.config.ProxyConfig;
++import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor;
++import org.apache.rocketmq.proxy.service.metadata.MetadataService;
++import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
++import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
++
++public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implements ReceiptHandleManager {
++ protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
++ protected final MetadataService metadataService;
++ protected final ConsumerManager consumerManager;
++ protected final ConcurrentMap<ReceiptHandleProcessor.ReceiptHandleGroupKey, ReceiptHandleGroup> receiptHandleGroupMap;
++ protected final StateEventListener<RenewEvent> eventListener;
++ protected final static RetryPolicy RENEW_POLICY = new RenewStrategyPolicy();
++ protected final ScheduledExecutorService scheduledExecutorService =
++ Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RenewalScheduledThread_"));
++ protected final ThreadPoolExecutor renewalWorkerService;
++
++ public DefaultReceiptHandleManager(MetadataService metadataService, ConsumerManager consumerManager, StateEventListener<RenewEvent> eventListener) {
++ this.metadataService = metadataService;
++ this.consumerManager = consumerManager;
++ this.eventListener = eventListener;
++ ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
++ this.renewalWorkerService = ThreadPoolMonitor.createAndMonitor(
++ proxyConfig.getRenewThreadPoolNums(),
++ proxyConfig.getRenewMaxThreadPoolNums(),
++ 1, TimeUnit.MINUTES,
++ "RenewalWorkerThread",
++ proxyConfig.getRenewThreadPoolQueueCapacity()
++ );
++ consumerManager.appendConsumerIdsChangeListener(new ConsumerIdsChangeListener() {
++ @Override
++ public void handle(ConsumerGroupEvent event, String group, Object... args) {
++ if (ConsumerGroupEvent.CLIENT_UNREGISTER.equals(event)) {
++ if (args == null || args.length < 1) {
++ return;
++ }
++ if (args[0] instanceof ClientChannelInfo) {
++ ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
++ if (ChannelHelper.isRemote(clientChannelInfo.getChannel())) {
++ // if the channel sync from other proxy is expired, not to clear data of connect to current proxy
++ return;
++ }
++ clearGroup(new ReceiptHandleProcessor.ReceiptHandleGroupKey(clientChannelInfo.getChannel(), group));
++ log.info("clear handle of this client when client unregister. group:{}, clientChannelInfo:{}", group, clientChannelInfo);
++ }
++ }
++ }
++
++ @Override
++ public void shutdown() {
++
++ }
++ });
++ this.receiptHandleGroupMap = new ConcurrentHashMap<>();
++ this.renewalWorkerService.setRejectedExecutionHandler((r, executor) -> log.warn("add renew task failed. queueSize:{}", executor.getQueue().size()));
++ this.appendStartAndShutdown(new StartAndShutdown() {
++ @Override
++ public void start() throws Exception {
++ scheduledExecutorService.scheduleWithFixedDelay(() -> scheduleRenewTask(), 0,
++ ConfigurationManager.getProxyConfig().getRenewSchedulePeriodMillis(), TimeUnit.MILLISECONDS);
++ }
++
++ @Override
++ public void shutdown() throws Exception {
++ scheduledExecutorService.shutdown();
++ clearAllHandle();
++ }
++ });
++ }
++
++ public void addReceiptHandle(ProxyContext context, Channel channel, String group, String msgID, MessageReceiptHandle messageReceiptHandle) {
++ ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group),
++ k -> new ReceiptHandleGroup()).put(msgID, messageReceiptHandle);
++ }
++
++ public MessageReceiptHandle removeReceiptHandle(ProxyContext context, Channel channel, String group, String msgID, String receiptHandle) {
++ ReceiptHandleGroup handleGroup = receiptHandleGroupMap.get(new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group));
++ if (handleGroup == null) {
++ return null;
++ }
++ return handleGroup.remove(msgID, receiptHandle);
++ }
++
++ protected boolean clientIsOffline(ReceiptHandleProcessor.ReceiptHandleGroupKey groupKey) {
++ return this.consumerManager.findChannel(groupKey.getGroup(), groupKey.getChannel()) == null;
++ }
++
++ protected void scheduleRenewTask() {
++ Stopwatch stopwatch = Stopwatch.createStarted();
++ try {
++ ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
++ for (Map.Entry<ReceiptHandleProcessor.ReceiptHandleGroupKey, ReceiptHandleGroup> entry : receiptHandleGroupMap.entrySet()) {
++ ReceiptHandleProcessor.ReceiptHandleGroupKey key = entry.getKey();
++ if (clientIsOffline(key)) {
++ clearGroup(key);
++ continue;
++ }
++
++ ReceiptHandleGroup group = entry.getValue();
++ group.scan((msgID, handleStr, v) -> {
++ long current = System.currentTimeMillis();
++ ReceiptHandle handle = ReceiptHandle.decode(v.getReceiptHandleStr());
++ if (handle.getNextVisibleTime() - current > proxyConfig.getRenewAheadTimeMillis()) {
++ return;
++ }
++ renewalWorkerService.submit(() -> renewMessage(group, msgID, handleStr));
++ });
++ }
++ } catch (Exception e) {
++ log.error("unexpect error when schedule renew task", e);
++ }
++
++ log.debug("scan for renewal done. cost:{}ms", stopwatch.elapsed().toMillis());
++ }
++
++ protected void renewMessage(ReceiptHandleGroup group, String msgID, String handleStr) {
++ try {
++ group.computeIfPresent(msgID, handleStr, this::startRenewMessage);
++ } catch (Exception e) {
++ log.error("error when renew message. msgID:{}, handleStr:{}", msgID, handleStr, e);
++ }
++ }
++
++ protected CompletableFuture<MessageReceiptHandle> startRenewMessage(MessageReceiptHandle messageReceiptHandle) {
++ CompletableFuture<MessageReceiptHandle> resFuture = new CompletableFuture<>();
++ ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
++ long current = System.currentTimeMillis();
++ try {
++ if (messageReceiptHandle.getRenewRetryTimes() >= proxyConfig.getMaxRenewRetryTimes()) {
++ log.warn("handle has exceed max renewRetryTimes. handle:{}", messageReceiptHandle);
++ return CompletableFuture.completedFuture(null);
++ }
++ if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) {
++ CompletableFuture<AckResult> future = new CompletableFuture<>();
++ eventListener.fireEvent(new RenewEvent(messageReceiptHandle, RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), future));
++ future.whenComplete((ackResult, throwable) -> {
++ if (throwable != null) {
++ log.error("error when renew. handle:{}", messageReceiptHandle, throwable);
++ if (renewExceptionNeedRetry(throwable)) {
++ messageReceiptHandle.incrementAndGetRenewRetryTimes();
++ resFuture.complete(messageReceiptHandle);
++ } else {
++ resFuture.complete(null);
++ }
++ } else if (AckStatus.OK.equals(ackResult.getStatus())) {
++ messageReceiptHandle.updateReceiptHandle(ackResult.getExtraInfo());
++ messageReceiptHandle.resetRenewRetryTimes();
++ messageReceiptHandle.incrementRenewTimes();
++ resFuture.complete(messageReceiptHandle);
++ } else {
++ log.error("renew response is not ok. result:{}, handle:{}", ackResult, messageReceiptHandle);
++ resFuture.complete(null);
++ }
++ });
++ } else {
++ ProxyContext context = createContext("RenewMessage");
++ SubscriptionGroupConfig subscriptionGroupConfig =
++ metadataService.getSubscriptionGroupConfig(context, messageReceiptHandle.getGroup());
++ if (subscriptionGroupConfig == null) {
++ log.error("group's subscriptionGroupConfig is null when renew. handle: {}", messageReceiptHandle);
++ return CompletableFuture.completedFuture(null);
++ }
++ RetryPolicy retryPolicy = subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy();
++ CompletableFuture<AckResult> future = new CompletableFuture<>();
++ eventListener.fireEvent(new RenewEvent(messageReceiptHandle, retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), future));
++ future.whenComplete((ackResult, throwable) -> {
++ if (throwable != null) {
++ log.error("error when nack in renew. handle:{}", messageReceiptHandle, throwable);
++ }
++ resFuture.complete(null);
++ });
++ }
++ } catch (Throwable t) {
++ log.error("unexpect error when renew message, stop to renew it. handle:{}", messageReceiptHandle, t);
++ resFuture.complete(null);
++ }
++ return resFuture;
++ }
++
++ protected void clearGroup(ReceiptHandleProcessor.ReceiptHandleGroupKey key) {
++ if (key == null) {
++ return;
++ }
++ ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
++ ReceiptHandleGroup handleGroup = receiptHandleGroupMap.remove(key);
++ if (handleGroup == null) {
++ return;
++ }
++ handleGroup.scan((msgID, handle, v) -> {
++ try {
++ handleGroup.computeIfPresent(msgID, handle, messageReceiptHandle -> {
++ CompletableFuture<AckResult> future = new CompletableFuture<>();
++ eventListener.fireEvent(new RenewEvent(messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), future));
++ return CompletableFuture.completedFuture(null);
++ });
++ } catch (Exception e) {
++ log.error("error when clear handle for group. key:{}", key, e);
++ }
++ });
++ }
++
++ protected void clearAllHandle() {
++ log.info("start clear all handle in receiptHandleProcessor");
++ Set<ReceiptHandleProcessor.ReceiptHandleGroupKey> keySet = receiptHandleGroupMap.keySet();
++ for (ReceiptHandleProcessor.ReceiptHandleGroupKey key : keySet) {
++ clearGroup(key);
++ }
++ log.info("clear all handle in receiptHandleProcessor done");
++ }
++
++ protected boolean renewExceptionNeedRetry(Throwable t) {
++ t = ExceptionUtils.getRealException(t);
++ if (t instanceof ProxyException) {
++ ProxyException proxyException = (ProxyException) t;
++ if (ProxyExceptionCode.INVALID_BROKER_NAME.equals(proxyException.getCode()) ||
++ ProxyExceptionCode.INVALID_RECEIPT_HANDLE.equals(proxyException.getCode())) {
++ return false;
++ }
++ }
++ return true;
++ }
++
++ protected ProxyContext createContext(String actionName) {
++ return ProxyContext.createForInner(this.getClass().getSimpleName() + actionName);
++ }
++}
+diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java
+index f3b805624..6a8888e97 100644
+--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java
++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java
+@@ -17,266 +17,12 @@
+
+ package org.apache.rocketmq.proxy.service.receipt;
+
+-import com.google.common.base.Stopwatch;
+ import io.netty.channel.Channel;
+-import java.util.Map;
+-import java.util.Set;
+-import java.util.concurrent.CompletableFuture;
+-import java.util.concurrent.ConcurrentHashMap;
+-import java.util.concurrent.ConcurrentMap;
+-import java.util.concurrent.Executors;
+-import java.util.concurrent.ScheduledExecutorService;
+-import java.util.concurrent.ThreadPoolExecutor;
+-import java.util.concurrent.TimeUnit;
+-import org.apache.rocketmq.broker.client.ClientChannelInfo;
+-import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
+-import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
+-import org.apache.rocketmq.broker.client.ConsumerManager;
+-import org.apache.rocketmq.client.consumer.AckResult;
+-import org.apache.rocketmq.client.consumer.AckStatus;
+-import org.apache.rocketmq.common.ThreadFactoryImpl;
+-import org.apache.rocketmq.common.constant.LoggerName;
+-import org.apache.rocketmq.common.consumer.ReceiptHandle;
+-import org.apache.rocketmq.common.state.StateEventListener;
+-import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
+-import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
+-import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
+-import org.apache.rocketmq.common.utils.StartAndShutdown;
+-import org.apache.rocketmq.logging.org.slf4j.Logger;
+-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+-import org.apache.rocketmq.proxy.common.RenewEvent;
+ import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
+ import org.apache.rocketmq.proxy.common.ProxyContext;
+-import org.apache.rocketmq.proxy.common.ProxyException;
+-import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
+-import org.apache.rocketmq.proxy.common.ReceiptHandleGroup;
+-import org.apache.rocketmq.proxy.common.RenewStrategyPolicy;
+-import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
+-import org.apache.rocketmq.proxy.common.utils.ExceptionUtils;
+-import org.apache.rocketmq.proxy.config.ConfigurationManager;
+-import org.apache.rocketmq.proxy.config.ProxyConfig;
+-import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor;
+-import org.apache.rocketmq.proxy.service.metadata.MetadataService;
+-import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
+-import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+
+-public class ReceiptHandleManager extends AbstractStartAndShutdown {
+- protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+- protected final MetadataService metadataService;
+- protected final ConsumerManager consumerManager;
+- protected final ConcurrentMap<ReceiptHandleProcessor.ReceiptHandleGroupKey, ReceiptHandleGroup> receiptHandleGroupMap;
+- protected final StateEventListener<RenewEvent> eventListener;
+- protected final static RetryPolicy RENEW_POLICY = new RenewStrategyPolicy();
+- protected final ScheduledExecutorService scheduledExecutorService =
+- Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RenewalScheduledThread_"));
+- protected final ThreadPoolExecutor renewalWorkerService;
++public interface ReceiptHandleManager {
++ void addReceiptHandle(ProxyContext context, Channel channel, String group, String msgID, MessageReceiptHandle messageReceiptHandle);
+
+- public ReceiptHandleManager(MetadataService metadataService, ConsumerManager consumerManager, StateEventListener<RenewEvent> eventListener) {
+- this.metadataService = metadataService;
+- this.consumerManager = consumerManager;
+- this.eventListener = eventListener;
+- ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+- this.renewalWorkerService = ThreadPoolMonitor.createAndMonitor(
+- proxyConfig.getRenewThreadPoolNums(),
+- proxyConfig.getRenewMaxThreadPoolNums(),
+- 1, TimeUnit.MINUTES,
+- "RenewalWorkerThread",
+- proxyConfig.getRenewThreadPoolQueueCapacity()
+- );
+- consumerManager.appendConsumerIdsChangeListener(new ConsumerIdsChangeListener() {
+- @Override
+- public void handle(ConsumerGroupEvent event, String group, Object... args) {
+- if (ConsumerGroupEvent.CLIENT_UNREGISTER.equals(event)) {
+- if (args == null || args.length < 1) {
+- return;
+- }
+- if (args[0] instanceof ClientChannelInfo) {
+- ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
+- if (ChannelHelper.isRemote(clientChannelInfo.getChannel())) {
+- // if the channel sync from other proxy is expired, not to clear data of connect to current proxy
+- return;
+- }
+- clearGroup(new ReceiptHandleProcessor.ReceiptHandleGroupKey(clientChannelInfo.getChannel(), group));
+- log.info("clear handle of this client when client unregister. group:{}, clientChannelInfo:{}", group, clientChannelInfo);
+- }
+- }
+- }
+-
+- @Override
+- public void shutdown() {
+-
+- }
+- });
+- this.receiptHandleGroupMap = new ConcurrentHashMap<>();
+- this.renewalWorkerService.setRejectedExecutionHandler((r, executor) -> log.warn("add renew task failed. queueSize:{}", executor.getQueue().size()));
+- this.appendStartAndShutdown(new StartAndShutdown() {
+- @Override
+- public void start() throws Exception {
+- scheduledExecutorService.scheduleWithFixedDelay(() -> scheduleRenewTask(), 0,
+- ConfigurationManager.getProxyConfig().getRenewSchedulePeriodMillis(), TimeUnit.MILLISECONDS);
+- }
+-
+- @Override
+- public void shutdown() throws Exception {
+- scheduledExecutorService.shutdown();
+- clearAllHandle();
+- }
+- });
+- }
+-
+- public void addReceiptHandle(Channel channel, String group, String msgID, MessageReceiptHandle messageReceiptHandle) {
+- ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group),
+- k -> new ReceiptHandleGroup()).put(msgID, messageReceiptHandle);
+- }
+-
+- public MessageReceiptHandle removeReceiptHandle(Channel channel, String group, String msgID, String receiptHandle) {
+- ReceiptHandleGroup handleGroup = receiptHandleGroupMap.get(new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group));
+- if (handleGroup == null) {
+- return null;
+- }
+- return handleGroup.remove(msgID, receiptHandle);
+- }
+-
+- protected boolean clientIsOffline(ReceiptHandleProcessor.ReceiptHandleGroupKey groupKey) {
+- return this.consumerManager.findChannel(groupKey.getGroup(), groupKey.getChannel()) == null;
+- }
+-
+- public void scheduleRenewTask() {
+- Stopwatch stopwatch = Stopwatch.createStarted();
+- try {
+- ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+- for (Map.Entry<ReceiptHandleProcessor.ReceiptHandleGroupKey, ReceiptHandleGroup> entry : receiptHandleGroupMap.entrySet()) {
+- ReceiptHandleProcessor.ReceiptHandleGroupKey key = entry.getKey();
+- if (clientIsOffline(key)) {
+- clearGroup(key);
+- continue;
+- }
+-
+- ReceiptHandleGroup group = entry.getValue();
+- group.scan((msgID, handleStr, v) -> {
+- long current = System.currentTimeMillis();
+- ReceiptHandle handle = ReceiptHandle.decode(v.getReceiptHandleStr());
+- if (handle.getNextVisibleTime() - current > proxyConfig.getRenewAheadTimeMillis()) {
+- return;
+- }
+- renewalWorkerService.submit(() -> renewMessage(group, msgID, handleStr));
+- });
+- }
+- } catch (Exception e) {
+- log.error("unexpect error when schedule renew task", e);
+- }
+-
+- log.debug("scan for renewal done. cost:{}ms", stopwatch.elapsed().toMillis());
+- }
+-
+- protected void renewMessage(ReceiptHandleGroup group, String msgID, String handleStr) {
+- try {
+- group.computeIfPresent(msgID, handleStr, this::startRenewMessage);
+- } catch (Exception e) {
+- log.error("error when renew message. msgID:{}, handleStr:{}", msgID, handleStr, e);
+- }
+- }
+-
+- protected CompletableFuture<MessageReceiptHandle> startRenewMessage(MessageReceiptHandle messageReceiptHandle) {
+- CompletableFuture<MessageReceiptHandle> resFuture = new CompletableFuture<>();
+- ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+- long current = System.currentTimeMillis();
+- try {
+- if (messageReceiptHandle.getRenewRetryTimes() >= proxyConfig.getMaxRenewRetryTimes()) {
+- log.warn("handle has exceed max renewRetryTimes. handle:{}", messageReceiptHandle);
+- return CompletableFuture.completedFuture(null);
+- }
+- if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) {
+- CompletableFuture<AckResult> future = new CompletableFuture<>();
+- eventListener.fireEvent(new RenewEvent(messageReceiptHandle, RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), future));
+- future.whenComplete((ackResult, throwable) -> {
+- if (throwable != null) {
+- log.error("error when renew. handle:{}", messageReceiptHandle, throwable);
+- if (renewExceptionNeedRetry(throwable)) {
+- messageReceiptHandle.incrementAndGetRenewRetryTimes();
+- resFuture.complete(messageReceiptHandle);
+- } else {
+- resFuture.complete(null);
+- }
+- } else if (AckStatus.OK.equals(ackResult.getStatus())) {
+- messageReceiptHandle.updateReceiptHandle(ackResult.getExtraInfo());
+- messageReceiptHandle.resetRenewRetryTimes();
+- messageReceiptHandle.incrementRenewTimes();
+- resFuture.complete(messageReceiptHandle);
+- } else {
+- log.error("renew response is not ok. result:{}, handle:{}", ackResult, messageReceiptHandle);
+- resFuture.complete(null);
+- }
+- });
+- } else {
+- ProxyContext context = createContext("RenewMessage");
+- SubscriptionGroupConfig subscriptionGroupConfig =
+- metadataService.getSubscriptionGroupConfig(context, messageReceiptHandle.getGroup());
+- if (subscriptionGroupConfig == null) {
+- log.error("group's subscriptionGroupConfig is null when renew. handle: {}", messageReceiptHandle);
+- return CompletableFuture.completedFuture(null);
+- }
+- RetryPolicy retryPolicy = subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy();
+- CompletableFuture<AckResult> future = new CompletableFuture<>();
+- eventListener.fireEvent(new RenewEvent(messageReceiptHandle, retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), future));
+- future.whenComplete((ackResult, throwable) -> {
+- if (throwable != null) {
+- log.error("error when nack in renew. handle:{}", messageReceiptHandle, throwable);
+- }
+- resFuture.complete(null);
+- });
+- }
+- } catch (Throwable t) {
+- log.error("unexpect error when renew message, stop to renew it. handle:{}", messageReceiptHandle, t);
+- resFuture.complete(null);
+- }
+- return resFuture;
+- }
+-
+- protected void clearGroup(ReceiptHandleProcessor.ReceiptHandleGroupKey key) {
+- if (key == null) {
+- return;
+- }
+- ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+- ReceiptHandleGroup handleGroup = receiptHandleGroupMap.remove(key);
+- if (handleGroup == null) {
+- return;
+- }
+- handleGroup.scan((msgID, handle, v) -> {
+- try {
+- handleGroup.computeIfPresent(msgID, handle, messageReceiptHandle -> {
+- CompletableFuture<AckResult> future = new CompletableFuture<>();
+- eventListener.fireEvent(new RenewEvent(messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), future));
+- return CompletableFuture.completedFuture(null);
+- });
+- } catch (Exception e) {
+- log.error("error when clear handle for group. key:{}", key, e);
+- }
+- });
+- }
+-
+- public void clearAllHandle() {
+- log.info("start clear all handle in receiptHandleProcessor");
+- Set<ReceiptHandleProcessor.ReceiptHandleGroupKey> keySet = receiptHandleGroupMap.keySet();
+- for (ReceiptHandleProcessor.ReceiptHandleGroupKey key : keySet) {
+- clearGroup(key);
+- }
+- log.info("clear all handle in receiptHandleProcessor done");
+- }
+-
+- protected boolean renewExceptionNeedRetry(Throwable t) {
+- t = ExceptionUtils.getRealException(t);
+- if (t instanceof ProxyException) {
+- ProxyException proxyException = (ProxyException) t;
+- if (ProxyExceptionCode.INVALID_BROKER_NAME.equals(proxyException.getCode()) ||
+- ProxyExceptionCode.INVALID_RECEIPT_HANDLE.equals(proxyException.getCode())) {
+- return false;
+- }
+- }
+- return true;
+- }
+-
+- protected ProxyContext createContext(String actionName) {
+- return ProxyContext.createForInner(this.getClass().getSimpleName() + actionName);
+- }
++ MessageReceiptHandle removeReceiptHandle(ProxyContext context, Channel channel, String group, String msgID, String receiptHandle);
+ }
+diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManagerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
+similarity index 93%
+rename from proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManagerTest.java
+rename to proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
+index 877c9fd6f..7c6943e44 100644
+--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManagerTest.java
++++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
+@@ -62,8 +62,8 @@ import static org.awaitility.Awaitility.await;
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertTrue;
+
+-public class ReceiptHandleManagerTest extends BaseServiceTest {
+- private ReceiptHandleManager receiptHandleManager;
++public class DefaultReceiptHandleManagerTest extends BaseServiceTest {
++ private DefaultReceiptHandleManager receiptHandleManager;
+ @Mock
+ protected MessagingProcessor messagingProcessor;
+ @Mock
+@@ -87,7 +87,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
+
+ @Before
+ public void setup() {
+- receiptHandleManager = new ReceiptHandleManager(metadataService, consumerManager, new StateEventListener<RenewEvent>() {
++ receiptHandleManager = new DefaultReceiptHandleManager(metadataService, consumerManager, new StateEventListener<RenewEvent>() {
+ @Override
+ public void fireEvent(RenewEvent event) {
+ MessageReceiptHandle messageReceiptHandle = event.getMessageReceiptHandle();
+@@ -125,7 +125,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
+ @Test
+ public void testAddReceiptHandle() {
+ Channel channel = new LocalChannel();
+- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
++ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
+ Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig());
+ Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+ receiptHandleManager.scheduleRenewTask();
+@@ -152,9 +152,9 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
+ .build().encode();
+ MessageReceiptHandle messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, receiptHandle, MESSAGE_ID, OFFSET,
+ RECONSUME_TIMES);
+- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
++ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
+ }
+- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
++ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
+ Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig());
+ Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+ receiptHandleManager.scheduleRenewTask();
+@@ -170,7 +170,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
+ public void testRenewReceiptHandle() {
+ ProxyConfig config = ConfigurationManager.getProxyConfig();
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
++ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
+ SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
+ Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig);
+ Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+@@ -216,7 +216,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
+ public void testRenewExceedMaxRenewTimes() {
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+ Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
++ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
+
+ CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>();
+ ackResultFuture.completeExceptionally(new MQClientException(0, "error"));
+@@ -246,7 +246,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
+ public void testRenewWithInvalidHandle() {
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+ Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
++ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
+
+ CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>();
+ ackResultFuture.completeExceptionally(new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "error"));
+@@ -270,7 +270,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
+ ProxyConfig config = ConfigurationManager.getProxyConfig();
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+ Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
++ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
+
+ AtomicInteger count = new AtomicInteger(0);
+ List<CompletableFuture<AckResult>> futureList = new ArrayList<>();
+@@ -348,7 +348,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
+ messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
+ RECONSUME_TIMES);
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
++ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
+ Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+ SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
+ Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig);
+@@ -382,7 +382,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
+ messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
+ RECONSUME_TIMES);
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
++ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
+ Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+ Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(null);
+ Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyLong()))
+@@ -418,7 +418,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
+ messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
+ RECONSUME_TIMES);
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
++ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
+ SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
+ Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig);
+ Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+@@ -431,8 +431,8 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
+ @Test
+ public void testRemoveReceiptHandle() {
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
+- receiptHandleManager.removeReceiptHandle(channel, GROUP, MSG_ID, receiptHandle);
++ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
++ receiptHandleManager.removeReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, receiptHandle);
+ SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
+ Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig);
+ receiptHandleManager.scheduleRenewTask();
+@@ -444,7 +444,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
+ @Test
+ public void testClearGroup() {
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
++ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
+ receiptHandleManager.clearGroup(new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, GROUP));
+ SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
+ Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig);
+@@ -459,7 +459,7 @@ public class ReceiptHandleManagerTest extends BaseServiceTest {
+ ArgumentCaptor<ConsumerIdsChangeListener> listenerArgumentCaptor = ArgumentCaptor.forClass(ConsumerIdsChangeListener.class);
+ Mockito.verify(consumerManager, Mockito.times(1)).appendConsumerIdsChangeListener(listenerArgumentCaptor.capture());
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+- receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle);
++ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle);
+ listenerArgumentCaptor.getValue().handle(ConsumerGroupEvent.CLIENT_UNREGISTER, GROUP, new ClientChannelInfo(channel, "", LanguageCode.JAVA, 0));
+ assertTrue(receiptHandleManager.receiptHandleGroupMap.isEmpty());
+ }
+--
+2.32.0.windows.2
+