diff options
Diffstat (limited to 'patch003-backport-feature-refactor-recipt-processor.patch')
-rw-r--r-- | patch003-backport-feature-refactor-recipt-processor.patch | 1653 |
1 files changed, 1653 insertions, 0 deletions
diff --git a/patch003-backport-feature-refactor-recipt-processor.patch b/patch003-backport-feature-refactor-recipt-processor.patch new file mode 100644 index 0000000..80ce7f3 --- /dev/null +++ b/patch003-backport-feature-refactor-recipt-processor.patch @@ -0,0 +1,1653 @@ +From d1bcda57b32f7ee033a3cb0067aef781dc12b7f1 Mon Sep 17 00:00:00 2001 +From: Zhouxiang Zhan <zhouxzhan@apache.org> +Date: Mon, 3 Jul 2023 14:09:21 +0800 +Subject: [PATCH] [ISSUE #6974] Feature/refector receipt processor (#6975) + +* Refector ReceiptHandleProcessor +--- + .../common/state/StateEventListener.java | 22 + + .../rocketmq/proxy/common/RenewEvent.java | 45 ++ + .../grpc/v2/DefaultGrpcMessingActivity.java | 12 +- + .../grpc/v2/consumer/AckMessageActivity.java | 8 +- + .../ChangeInvisibleDurationActivity.java | 6 +- + .../v2/consumer/ReceiveMessageActivity.java | 7 +- + .../producer/ForwardMessageToDLQActivity.java | 7 +- + .../processor/DefaultMessagingProcessor.java | 16 +- + .../proxy/processor/MessagingProcessor.java | 5 + + .../processor/ReceiptHandleProcessor.java | 292 ++----------- + .../service/receipt/ReceiptHandleManager.java | 282 +++++++++++++ + .../v2/consumer/AckMessageActivityTest.java | 2 +- + .../ChangeInvisibleDurationActivityTest.java | 4 +- + .../consumer/ReceiveMessageActivityTest.java | 2 +- + .../ForwardMessageToDLQActivityTest.java | 4 +- + .../processor/ConsumerProcessorTest.java | 1 - + .../receipt/ReceiptHandleManagerTest.java} | 389 ++++-------------- + 17 files changed, 499 insertions(+), 605 deletions(-) + create mode 100644 common/src/main/java/org/apache/rocketmq/common/state/StateEventListener.java + create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java + create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java + rename proxy/src/test/java/org/apache/rocketmq/proxy/{processor/ReceiptHandleProcessorTest.java => service/receipt/ReceiptHandleManagerTest.java} (63%) + +diff --git a/common/src/main/java/org/apache/rocketmq/common/state/StateEventListener.java b/common/src/main/java/org/apache/rocketmq/common/state/StateEventListener.java +new file mode 100644 +index 000000000..aed04dc31 +--- /dev/null ++++ b/common/src/main/java/org/apache/rocketmq/common/state/StateEventListener.java +@@ -0,0 +1,22 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.rocketmq.common.state; ++ ++public interface StateEventListener<T> { ++ void fireEvent(T event); ++} +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java +new file mode 100644 +index 000000000..fdf9833cc +--- /dev/null ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java +@@ -0,0 +1,45 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.rocketmq.proxy.common; ++ ++import java.util.concurrent.CompletableFuture; ++import org.apache.rocketmq.client.consumer.AckResult; ++ ++public class RenewEvent { ++ protected MessageReceiptHandle messageReceiptHandle; ++ protected long renewTime; ++ protected CompletableFuture<AckResult> future; ++ ++ public RenewEvent(MessageReceiptHandle messageReceiptHandle, long renewTime, CompletableFuture<AckResult> future) { ++ this.messageReceiptHandle = messageReceiptHandle; ++ this.renewTime = renewTime; ++ this.future = future; ++ } ++ ++ public MessageReceiptHandle getMessageReceiptHandle() { ++ return messageReceiptHandle; ++ } ++ ++ public long getRenewTime() { ++ return renewTime; ++ } ++ ++ public CompletableFuture<AckResult> getFuture() { ++ return future; ++ } ++} +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java +index 73b764bc4..091e9086e 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java +@@ -55,14 +55,12 @@ import org.apache.rocketmq.proxy.grpc.v2.producer.SendMessageActivity; + import org.apache.rocketmq.proxy.grpc.v2.route.RouteActivity; + import org.apache.rocketmq.proxy.grpc.v2.transaction.EndTransactionActivity; + import org.apache.rocketmq.proxy.processor.MessagingProcessor; +-import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor; + + public class DefaultGrpcMessingActivity extends AbstractStartAndShutdown implements GrpcMessingActivity { + private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); + + protected GrpcClientSettingsManager grpcClientSettingsManager; + protected GrpcChannelManager grpcChannelManager; +- protected ReceiptHandleProcessor receiptHandleProcessor; + protected ReceiveMessageActivity receiveMessageActivity; + protected AckMessageActivity ackMessageActivity; + protected ChangeInvisibleDurationActivity changeInvisibleDurationActivity; +@@ -79,18 +77,16 @@ public class DefaultGrpcMessingActivity extends AbstractStartAndShutdown impleme + protected void init(MessagingProcessor messagingProcessor) { + this.grpcClientSettingsManager = new GrpcClientSettingsManager(messagingProcessor); + this.grpcChannelManager = new GrpcChannelManager(messagingProcessor.getProxyRelayService(), this.grpcClientSettingsManager); +- this.receiptHandleProcessor = new ReceiptHandleProcessor(messagingProcessor); + +- this.receiveMessageActivity = new ReceiveMessageActivity(messagingProcessor, receiptHandleProcessor, grpcClientSettingsManager, grpcChannelManager); +- this.ackMessageActivity = new AckMessageActivity(messagingProcessor, receiptHandleProcessor, grpcClientSettingsManager, grpcChannelManager); +- this.changeInvisibleDurationActivity = new ChangeInvisibleDurationActivity(messagingProcessor, receiptHandleProcessor, grpcClientSettingsManager, grpcChannelManager); ++ this.receiveMessageActivity = new ReceiveMessageActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); ++ this.ackMessageActivity = new AckMessageActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); ++ this.changeInvisibleDurationActivity = new ChangeInvisibleDurationActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); + this.sendMessageActivity = new SendMessageActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); +- this.forwardMessageToDLQActivity = new ForwardMessageToDLQActivity(messagingProcessor, receiptHandleProcessor, grpcClientSettingsManager, grpcChannelManager); ++ this.forwardMessageToDLQActivity = new ForwardMessageToDLQActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); + this.endTransactionActivity = new EndTransactionActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); + this.routeActivity = new RouteActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); + this.clientActivity = new ClientActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); + +- this.appendStartAndShutdown(this.receiptHandleProcessor); + this.appendStartAndShutdown(this.grpcClientSettingsManager); + } + +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java +index 993f069b9..9a3a77201 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java +@@ -37,16 +37,12 @@ import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager; + import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter; + import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder; + import org.apache.rocketmq.proxy.processor.MessagingProcessor; +-import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor; + + public class AckMessageActivity extends AbstractMessingActivity { +- protected ReceiptHandleProcessor receiptHandleProcessor; + +- public AckMessageActivity(MessagingProcessor messagingProcessor, ReceiptHandleProcessor receiptHandleProcessor, +- GrpcClientSettingsManager grpcClientSettingsManager, ++ public AckMessageActivity(MessagingProcessor messagingProcessor, GrpcClientSettingsManager grpcClientSettingsManager, + GrpcChannelManager grpcChannelManager) { + super(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); +- this.receiptHandleProcessor = receiptHandleProcessor; + } + + public CompletableFuture<AckMessageResponse> ackMessage(ProxyContext ctx, AckMessageRequest request) { +@@ -98,7 +94,7 @@ public class AckMessageActivity extends AbstractMessingActivity { + String handleString = ackMessageEntry.getReceiptHandle(); + + String group = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup()); +- MessageReceiptHandle messageReceiptHandle = receiptHandleProcessor.removeReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, ackMessageEntry.getMessageId(), ackMessageEntry.getReceiptHandle()); ++ MessageReceiptHandle messageReceiptHandle = messagingProcessor.removeReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, ackMessageEntry.getMessageId(), ackMessageEntry.getReceiptHandle()); + if (messageReceiptHandle != null) { + handleString = messageReceiptHandle.getReceiptHandleStr(); + } +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java +index 9b7e947e0..02356c497 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java +@@ -32,16 +32,12 @@ import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager; + import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter; + import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder; + import org.apache.rocketmq.proxy.processor.MessagingProcessor; +-import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor; + + public class ChangeInvisibleDurationActivity extends AbstractMessingActivity { +- protected ReceiptHandleProcessor receiptHandleProcessor; + + public ChangeInvisibleDurationActivity(MessagingProcessor messagingProcessor, +- ReceiptHandleProcessor receiptHandleProcessor, + GrpcClientSettingsManager grpcClientSettingsManager, GrpcChannelManager grpcChannelManager) { + super(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); +- this.receiptHandleProcessor = receiptHandleProcessor; + } + + public CompletableFuture<ChangeInvisibleDurationResponse> changeInvisibleDuration(ProxyContext ctx, +@@ -55,7 +51,7 @@ public class ChangeInvisibleDurationActivity extends AbstractMessingActivity { + ReceiptHandle receiptHandle = ReceiptHandle.decode(request.getReceiptHandle()); + String group = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup()); + +- MessageReceiptHandle messageReceiptHandle = receiptHandleProcessor.removeReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, request.getMessageId(), receiptHandle.getReceiptHandle()); ++ MessageReceiptHandle messageReceiptHandle = messagingProcessor.removeReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, request.getMessageId(), receiptHandle.getReceiptHandle()); + if (messageReceiptHandle != null) { + receiptHandle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr()); + } +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java +index 9830e7dac..a504179a9 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java +@@ -40,7 +40,6 @@ import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager; + import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter; + import org.apache.rocketmq.proxy.processor.MessagingProcessor; + import org.apache.rocketmq.proxy.processor.QueueSelector; +-import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor; + import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue; + import org.apache.rocketmq.proxy.service.route.MessageQueueSelector; + import org.apache.rocketmq.proxy.service.route.MessageQueueView; +@@ -48,13 +47,11 @@ import org.apache.rocketmq.remoting.protocol.filter.FilterAPI; + import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; + + public class ReceiveMessageActivity extends AbstractMessingActivity { +- protected ReceiptHandleProcessor receiptHandleProcessor; + private static final String ILLEGAL_POLLING_TIME_INTRODUCED_CLIENT_VERSION = "5.0.3"; + +- public ReceiveMessageActivity(MessagingProcessor messagingProcessor, ReceiptHandleProcessor receiptHandleProcessor, ++ public ReceiveMessageActivity(MessagingProcessor messagingProcessor, + GrpcClientSettingsManager grpcClientSettingsManager, GrpcChannelManager grpcChannelManager) { + super(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); +- this.receiptHandleProcessor = receiptHandleProcessor; + } + + public void receiveMessage(ProxyContext ctx, ReceiveMessageRequest request, +@@ -145,7 +142,7 @@ public class ReceiveMessageActivity extends AbstractMessingActivity { + MessageReceiptHandle messageReceiptHandle = + new MessageReceiptHandle(group, topic, messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(), + messageExt.getQueueOffset(), messageExt.getReconsumeTimes()); +- receiptHandleProcessor.addReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, messageExt.getMsgId(), messageReceiptHandle); ++ messagingProcessor.addReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, messageExt.getMsgId(), messageReceiptHandle); + } + } + } +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java +index 6b5c5c7e0..f1fc5a143 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java +@@ -28,16 +28,13 @@ import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager; + import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter; + import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder; + import org.apache.rocketmq.proxy.processor.MessagingProcessor; +-import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor; + import org.apache.rocketmq.remoting.protocol.RemotingCommand; + + public class ForwardMessageToDLQActivity extends AbstractMessingActivity { +- protected ReceiptHandleProcessor receiptHandleProcessor; + +- public ForwardMessageToDLQActivity(MessagingProcessor messagingProcessor, ReceiptHandleProcessor receiptHandleProcessor, ++ public ForwardMessageToDLQActivity(MessagingProcessor messagingProcessor, + GrpcClientSettingsManager grpcClientSettingsManager, GrpcChannelManager grpcChannelManager) { + super(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); +- this.receiptHandleProcessor = receiptHandleProcessor; + } + + public CompletableFuture<ForwardMessageToDeadLetterQueueResponse> forwardMessageToDeadLetterQueue(ProxyContext ctx, +@@ -48,7 +45,7 @@ public class ForwardMessageToDLQActivity extends AbstractMessingActivity { + + String group = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup()); + String handleString = request.getReceiptHandle(); +- MessageReceiptHandle messageReceiptHandle = receiptHandleProcessor.removeReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, request.getMessageId(), request.getReceiptHandle()); ++ MessageReceiptHandle messageReceiptHandle = messagingProcessor.removeReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, request.getMessageId(), request.getReceiptHandle()); + if (messageReceiptHandle != null) { + handleString = messageReceiptHandle.getReceiptHandleStr(); + } +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java +index e663ae1ba..1b3f0af4e 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java +@@ -22,7 +22,6 @@ import java.util.Set; + import java.util.concurrent.CompletableFuture; + import java.util.concurrent.ThreadPoolExecutor; + import java.util.concurrent.TimeUnit; +- + import org.apache.rocketmq.acl.common.AclUtils; + import org.apache.rocketmq.broker.BrokerController; + import org.apache.rocketmq.broker.client.ClientChannelInfo; +@@ -41,6 +40,7 @@ import org.apache.rocketmq.common.message.MessageQueue; + import org.apache.rocketmq.common.thread.ThreadPoolMonitor; + import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; + import org.apache.rocketmq.proxy.common.Address; ++import org.apache.rocketmq.proxy.common.MessageReceiptHandle; + import org.apache.rocketmq.proxy.common.ProxyContext; + import org.apache.rocketmq.proxy.config.ConfigurationManager; + import org.apache.rocketmq.proxy.config.ProxyConfig; +@@ -64,6 +64,7 @@ public class DefaultMessagingProcessor extends AbstractStartAndShutdown implemen + protected TransactionProcessor transactionProcessor; + protected ClientProcessor clientProcessor; + protected RequestBrokerProcessor requestBrokerProcessor; ++ protected ReceiptHandleProcessor receiptHandleProcessor; + + protected ThreadPoolExecutor producerProcessorExecutor; + protected ThreadPoolExecutor consumerProcessorExecutor; +@@ -95,6 +96,7 @@ public class DefaultMessagingProcessor extends AbstractStartAndShutdown implemen + this.transactionProcessor = new TransactionProcessor(this, serviceManager); + this.clientProcessor = new ClientProcessor(this, serviceManager); + this.requestBrokerProcessor = new RequestBrokerProcessor(this, serviceManager); ++ this.receiptHandleProcessor = new ReceiptHandleProcessor(this, serviceManager); + + this.init(); + } +@@ -308,4 +310,16 @@ public class DefaultMessagingProcessor extends AbstractStartAndShutdown implemen + public MetadataService getMetadataService() { + return this.serviceManager.getMetadataService(); + } ++ ++ @Override ++ public void addReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, ++ MessageReceiptHandle messageReceiptHandle) { ++ receiptHandleProcessor.addReceiptHandle(ctx, channel, group, msgID, messageReceiptHandle); ++ } ++ ++ @Override ++ public MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, ++ String receiptHandle) { ++ return receiptHandleProcessor.removeReceiptHandle(ctx, channel, group, msgID, receiptHandle); ++ } + } +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java +index 263068965..d86be0bd8 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java +@@ -34,6 +34,7 @@ import org.apache.rocketmq.common.consumer.ReceiptHandle; + import org.apache.rocketmq.common.message.Message; + import org.apache.rocketmq.common.message.MessageQueue; + import org.apache.rocketmq.proxy.common.Address; ++import org.apache.rocketmq.proxy.common.MessageReceiptHandle; + import org.apache.rocketmq.proxy.common.ProxyContext; + import org.apache.rocketmq.common.utils.StartAndShutdown; + import org.apache.rocketmq.proxy.service.metadata.MetadataService; +@@ -299,4 +300,8 @@ public interface MessagingProcessor extends StartAndShutdown { + ProxyRelayService getProxyRelayService(); + + MetadataService getMetadataService(); ++ ++ void addReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, MessageReceiptHandle messageReceiptHandle); ++ ++ MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, String receiptHandle); + } +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java +index 88c597e99..9c7e8dea9 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java +@@ -19,291 +19,51 @@ package org.apache.rocketmq.proxy.processor; + + import com.google.common.base.MoreObjects; + import com.google.common.base.Objects; +-import com.google.common.base.Stopwatch; + import io.netty.channel.Channel; +-import java.util.Map; +-import java.util.Set; +-import java.util.concurrent.CompletableFuture; +-import java.util.concurrent.ConcurrentHashMap; +-import java.util.concurrent.ConcurrentMap; +-import java.util.concurrent.Executors; +-import java.util.concurrent.ScheduledExecutorService; +-import java.util.concurrent.ThreadPoolExecutor; +-import java.util.concurrent.TimeUnit; +-import org.apache.rocketmq.broker.client.ClientChannelInfo; +-import org.apache.rocketmq.broker.client.ConsumerGroupEvent; +-import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener; +-import org.apache.rocketmq.client.consumer.AckResult; +-import org.apache.rocketmq.client.consumer.AckStatus; +-import org.apache.rocketmq.common.ThreadFactoryImpl; + import org.apache.rocketmq.common.constant.LoggerName; + import org.apache.rocketmq.common.consumer.ReceiptHandle; +-import org.apache.rocketmq.common.thread.ThreadPoolMonitor; +-import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils; +-import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; +-import org.apache.rocketmq.proxy.common.MessageReceiptHandle; +-import org.apache.rocketmq.proxy.common.ProxyContext; +-import org.apache.rocketmq.proxy.common.ProxyException; +-import org.apache.rocketmq.proxy.common.ProxyExceptionCode; +-import org.apache.rocketmq.proxy.common.ReceiptHandleGroup; +-import org.apache.rocketmq.proxy.common.RenewStrategyPolicy; +-import org.apache.rocketmq.common.utils.StartAndShutdown; +-import org.apache.rocketmq.proxy.common.channel.ChannelHelper; +-import org.apache.rocketmq.proxy.common.utils.ExceptionUtils; +-import org.apache.rocketmq.proxy.config.ConfigurationManager; +-import org.apache.rocketmq.proxy.config.ProxyConfig; +-import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy; +-import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; ++import org.apache.rocketmq.common.state.StateEventListener; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; ++import org.apache.rocketmq.proxy.common.RenewEvent; ++import org.apache.rocketmq.proxy.common.MessageReceiptHandle; ++import org.apache.rocketmq.proxy.common.ProxyContext; ++import org.apache.rocketmq.proxy.service.receipt.ReceiptHandleManager; ++import org.apache.rocketmq.proxy.service.ServiceManager; + +-public class ReceiptHandleProcessor extends AbstractStartAndShutdown { ++public class ReceiptHandleProcessor extends AbstractProcessor { + protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); +- protected final ConcurrentMap<ReceiptHandleGroupKey, ReceiptHandleGroup> receiptHandleGroupMap; +- protected final ScheduledExecutorService scheduledExecutorService = +- Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RenewalScheduledThread_")); +- protected ThreadPoolExecutor renewalWorkerService; +- protected final MessagingProcessor messagingProcessor; +- protected final static RetryPolicy RENEW_POLICY = new RenewStrategyPolicy(); +- +- public ReceiptHandleProcessor(MessagingProcessor messagingProcessor) { +- this.messagingProcessor = messagingProcessor; +- this.receiptHandleGroupMap = new ConcurrentHashMap<>(); +- ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); +- this.renewalWorkerService = ThreadPoolMonitor.createAndMonitor( +- proxyConfig.getRenewThreadPoolNums(), +- proxyConfig.getRenewMaxThreadPoolNums(), +- 1, TimeUnit.MINUTES, +- "RenewalWorkerThread", +- proxyConfig.getRenewThreadPoolQueueCapacity() +- ); +- this.init(); +- } +- +- protected void init() { +- this.registerConsumerListener(); +- this.renewalWorkerService.setRejectedExecutionHandler((r, executor) -> log.warn("add renew task failed. queueSize:{}", executor.getQueue().size())); +- this.appendStartAndShutdown(new StartAndShutdown() { +- @Override +- public void start() throws Exception { +- scheduledExecutorService.scheduleWithFixedDelay(() -> scheduleRenewTask(), 0, +- ConfigurationManager.getProxyConfig().getRenewSchedulePeriodMillis(), TimeUnit.MILLISECONDS); +- } +- +- @Override +- public void shutdown() throws Exception { +- scheduledExecutorService.shutdown(); +- clearAllHandle(); +- } +- }); +- } +- +- protected void registerConsumerListener() { +- this.messagingProcessor.registerConsumerListener(new ConsumerIdsChangeListener() { +- @Override +- public void handle(ConsumerGroupEvent event, String group, Object... args) { +- if (ConsumerGroupEvent.CLIENT_UNREGISTER.equals(event)) { +- if (args == null || args.length < 1) { ++ protected ReceiptHandleManager receiptHandleManager; ++ ++ public ReceiptHandleProcessor(MessagingProcessor messagingProcessor, ServiceManager serviceManager) { ++ super(messagingProcessor, serviceManager); ++ StateEventListener<RenewEvent> eventListener = event -> { ++ ProxyContext context = createContext("RenewMessage"); ++ MessageReceiptHandle messageReceiptHandle = event.getMessageReceiptHandle(); ++ ReceiptHandle handle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr()); ++ messagingProcessor.changeInvisibleTime(context, handle, messageReceiptHandle.getMessageId(), ++ messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), event.getRenewTime()) ++ .whenComplete((v, t) -> { ++ if (t != null) { ++ event.getFuture().completeExceptionally(t); + return; + } +- if (args[0] instanceof ClientChannelInfo) { +- ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0]; +- if (ChannelHelper.isRemote(clientChannelInfo.getChannel())) { +- // if the channel sync from other proxy is expired, not to clear data of connect to current proxy +- return; +- } +- clearGroup(new ReceiptHandleGroupKey(clientChannelInfo.getChannel(), group)); +- log.info("clear handle of this client when client unregister. group:{}, clientChannelInfo:{}", group, clientChannelInfo); +- } +- } +- } +- +- @Override +- public void shutdown() { +- +- } +- }); ++ event.getFuture().complete(v); ++ }); ++ }; ++ this.receiptHandleManager = new ReceiptHandleManager(serviceManager.getMetadataService(), serviceManager.getConsumerManager(), eventListener); + } + + protected ProxyContext createContext(String actionName) { + return ProxyContext.createForInner(this.getClass().getSimpleName() + actionName); + } + +- protected void scheduleRenewTask() { +- Stopwatch stopwatch = Stopwatch.createStarted(); +- try { +- ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); +- for (Map.Entry<ReceiptHandleGroupKey, ReceiptHandleGroup> entry : receiptHandleGroupMap.entrySet()) { +- ReceiptHandleGroupKey key = entry.getKey(); +- if (clientIsOffline(key)) { +- clearGroup(key); +- continue; +- } +- +- ReceiptHandleGroup group = entry.getValue(); +- group.scan((msgID, handleStr, v) -> { +- long current = System.currentTimeMillis(); +- ReceiptHandle handle = ReceiptHandle.decode(v.getReceiptHandleStr()); +- if (handle.getNextVisibleTime() - current > proxyConfig.getRenewAheadTimeMillis()) { +- return; +- } +- renewalWorkerService.submit(() -> renewMessage(group, msgID, handleStr)); +- }); +- } +- } catch (Exception e) { +- log.error("unexpect error when schedule renew task", e); +- } +- +- log.debug("scan for renewal done. cost:{}ms", stopwatch.elapsed().toMillis()); +- } +- +- protected void renewMessage(ReceiptHandleGroup group, String msgID, String handleStr) { +- try { +- group.computeIfPresent(msgID, handleStr, this::startRenewMessage); +- } catch (Exception e) { +- log.error("error when renew message. msgID:{}, handleStr:{}", msgID, handleStr, e); +- } +- } +- +- protected CompletableFuture<MessageReceiptHandle> startRenewMessage(MessageReceiptHandle messageReceiptHandle) { +- CompletableFuture<MessageReceiptHandle> resFuture = new CompletableFuture<>(); +- ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); +- ProxyContext context = createContext("RenewMessage"); +- ReceiptHandle handle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr()); +- long current = System.currentTimeMillis(); +- try { +- if (messageReceiptHandle.getRenewRetryTimes() >= proxyConfig.getMaxRenewRetryTimes()) { +- log.warn("handle has exceed max renewRetryTimes. handle:{}", messageReceiptHandle); +- return CompletableFuture.completedFuture(null); +- } +- if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) { +- CompletableFuture<AckResult> future = +- messagingProcessor.changeInvisibleTime(context, handle, messageReceiptHandle.getMessageId(), +- messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes())); +- future.whenComplete((ackResult, throwable) -> { +- if (throwable != null) { +- log.error("error when renew. handle:{}", messageReceiptHandle, throwable); +- if (renewExceptionNeedRetry(throwable)) { +- messageReceiptHandle.incrementAndGetRenewRetryTimes(); +- resFuture.complete(messageReceiptHandle); +- } else { +- resFuture.complete(null); +- } +- } else if (AckStatus.OK.equals(ackResult.getStatus())) { +- messageReceiptHandle.updateReceiptHandle(ackResult.getExtraInfo()); +- messageReceiptHandle.resetRenewRetryTimes(); +- messageReceiptHandle.incrementRenewTimes(); +- resFuture.complete(messageReceiptHandle); +- } else { +- log.error("renew response is not ok. result:{}, handle:{}", ackResult, messageReceiptHandle); +- resFuture.complete(null); +- } +- }); +- } else { +- SubscriptionGroupConfig subscriptionGroupConfig = +- messagingProcessor.getMetadataService().getSubscriptionGroupConfig(context, messageReceiptHandle.getGroup()); +- if (subscriptionGroupConfig == null) { +- log.error("group's subscriptionGroupConfig is null when renew. handle: {}", messageReceiptHandle); +- return CompletableFuture.completedFuture(null); +- } +- RetryPolicy retryPolicy = subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy(); +- CompletableFuture<AckResult> future = messagingProcessor.changeInvisibleTime(context, +- handle, messageReceiptHandle.getMessageId(), messageReceiptHandle.getGroup(), +- messageReceiptHandle.getTopic(), retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes())); +- future.whenComplete((ackResult, throwable) -> { +- if (throwable != null) { +- log.error("error when nack in renew. handle:{}", messageReceiptHandle, throwable); +- } +- resFuture.complete(null); +- }); +- } +- } catch (Throwable t) { +- log.error("unexpect error when renew message, stop to renew it. handle:{}", messageReceiptHandle, t); +- resFuture.complete(null); +- } +- return resFuture; +- } +- +- protected boolean renewExceptionNeedRetry(Throwable t) { +- t = ExceptionUtils.getRealException(t); +- if (t instanceof ProxyException) { +- ProxyException proxyException = (ProxyException) t; +- if (ProxyExceptionCode.INVALID_BROKER_NAME.equals(proxyException.getCode()) || +- ProxyExceptionCode.INVALID_RECEIPT_HANDLE.equals(proxyException.getCode())) { +- return false; +- } +- } +- return true; +- } +- +- protected boolean clientIsOffline(ReceiptHandleGroupKey groupKey) { +- return this.messagingProcessor.findConsumerChannel(createContext("JudgeClientOnline"), groupKey.group, groupKey.channel) == null; +- } +- + public void addReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, MessageReceiptHandle messageReceiptHandle) { +- this.addReceiptHandle(ctx, new ReceiptHandleGroupKey(channel, group), msgID, messageReceiptHandle); +- } +- +- protected void addReceiptHandle(ProxyContext ctx, ReceiptHandleGroupKey key, String msgID, MessageReceiptHandle messageReceiptHandle) { +- if (key == null) { +- return; +- } +- ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, key, +- k -> new ReceiptHandleGroup()).put(msgID, messageReceiptHandle); ++ receiptHandleManager.addReceiptHandle(channel, group, msgID, messageReceiptHandle); + } + + public MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, String receiptHandle) { +- return this.removeReceiptHandle(ctx, new ReceiptHandleGroupKey(channel, group), msgID, receiptHandle); +- } +- +- protected MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, ReceiptHandleGroupKey key, String msgID, String receiptHandle) { +- if (key == null) { +- return null; +- } +- ReceiptHandleGroup handleGroup = receiptHandleGroupMap.get(key); +- if (handleGroup == null) { +- return null; +- } +- return handleGroup.remove(msgID, receiptHandle); +- } +- +- protected void clearGroup(ReceiptHandleGroupKey key) { +- if (key == null) { +- return; +- } +- ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); +- ProxyContext context = createContext("ClearGroup"); +- ReceiptHandleGroup handleGroup = receiptHandleGroupMap.remove(key); +- if (handleGroup == null) { +- return; +- } +- handleGroup.scan((msgID, handle, v) -> { +- try { +- handleGroup.computeIfPresent(msgID, handle, messageReceiptHandle -> { +- ReceiptHandle receiptHandle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr()); +- messagingProcessor.changeInvisibleTime( +- context, +- receiptHandle, +- messageReceiptHandle.getMessageId(), +- messageReceiptHandle.getGroup(), +- messageReceiptHandle.getTopic(), +- proxyConfig.getInvisibleTimeMillisWhenClear() +- ); +- return CompletableFuture.completedFuture(null); +- }); +- } catch (Exception e) { +- log.error("error when clear handle for group. key:{}", key, e); +- } +- }); +- } +- +- protected void clearAllHandle() { +- log.info("start clear all handle in receiptHandleProcessor"); +- Set<ReceiptHandleGroupKey> keySet = receiptHandleGroupMap.keySet(); +- for (ReceiptHandleGroupKey key : keySet) { +- clearGroup(key); +- } +- log.info("clear all handle in receiptHandleProcessor done"); ++ return receiptHandleManager.removeReceiptHandle(channel, group, msgID, receiptHandle); + } + + public static class ReceiptHandleGroupKey { +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java +new file mode 100644 +index 000000000..f3b805624 +--- /dev/null ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManager.java +@@ -0,0 +1,282 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.rocketmq.proxy.service.receipt; ++ ++import com.google.common.base.Stopwatch; ++import io.netty.channel.Channel; ++import java.util.Map; ++import java.util.Set; ++import java.util.concurrent.CompletableFuture; ++import java.util.concurrent.ConcurrentHashMap; ++import java.util.concurrent.ConcurrentMap; ++import java.util.concurrent.Executors; ++import java.util.concurrent.ScheduledExecutorService; ++import java.util.concurrent.ThreadPoolExecutor; ++import java.util.concurrent.TimeUnit; ++import org.apache.rocketmq.broker.client.ClientChannelInfo; ++import org.apache.rocketmq.broker.client.ConsumerGroupEvent; ++import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener; ++import org.apache.rocketmq.broker.client.ConsumerManager; ++import org.apache.rocketmq.client.consumer.AckResult; ++import org.apache.rocketmq.client.consumer.AckStatus; ++import org.apache.rocketmq.common.ThreadFactoryImpl; ++import org.apache.rocketmq.common.constant.LoggerName; ++import org.apache.rocketmq.common.consumer.ReceiptHandle; ++import org.apache.rocketmq.common.state.StateEventListener; ++import org.apache.rocketmq.common.thread.ThreadPoolMonitor; ++import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; ++import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils; ++import org.apache.rocketmq.common.utils.StartAndShutdown; ++import org.apache.rocketmq.logging.org.slf4j.Logger; ++import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; ++import org.apache.rocketmq.proxy.common.RenewEvent; ++import org.apache.rocketmq.proxy.common.MessageReceiptHandle; ++import org.apache.rocketmq.proxy.common.ProxyContext; ++import org.apache.rocketmq.proxy.common.ProxyException; ++import org.apache.rocketmq.proxy.common.ProxyExceptionCode; ++import org.apache.rocketmq.proxy.common.ReceiptHandleGroup; ++import org.apache.rocketmq.proxy.common.RenewStrategyPolicy; ++import org.apache.rocketmq.proxy.common.channel.ChannelHelper; ++import org.apache.rocketmq.proxy.common.utils.ExceptionUtils; ++import org.apache.rocketmq.proxy.config.ConfigurationManager; ++import org.apache.rocketmq.proxy.config.ProxyConfig; ++import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor; ++import org.apache.rocketmq.proxy.service.metadata.MetadataService; ++import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy; ++import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; ++ ++public class ReceiptHandleManager extends AbstractStartAndShutdown { ++ protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); ++ protected final MetadataService metadataService; ++ protected final ConsumerManager consumerManager; ++ protected final ConcurrentMap<ReceiptHandleProcessor.ReceiptHandleGroupKey, ReceiptHandleGroup> receiptHandleGroupMap; ++ protected final StateEventListener<RenewEvent> eventListener; ++ protected final static RetryPolicy RENEW_POLICY = new RenewStrategyPolicy(); ++ protected final ScheduledExecutorService scheduledExecutorService = ++ Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RenewalScheduledThread_")); ++ protected final ThreadPoolExecutor renewalWorkerService; ++ ++ public ReceiptHandleManager(MetadataService metadataService, ConsumerManager consumerManager, StateEventListener<RenewEvent> eventListener) { ++ this.metadataService = metadataService; ++ this.consumerManager = consumerManager; ++ this.eventListener = eventListener; ++ ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); ++ this.renewalWorkerService = ThreadPoolMonitor.createAndMonitor( ++ proxyConfig.getRenewThreadPoolNums(), ++ proxyConfig.getRenewMaxThreadPoolNums(), ++ 1, TimeUnit.MINUTES, ++ "RenewalWorkerThread", ++ proxyConfig.getRenewThreadPoolQueueCapacity() ++ ); ++ consumerManager.appendConsumerIdsChangeListener(new ConsumerIdsChangeListener() { ++ @Override ++ public void handle(ConsumerGroupEvent event, String group, Object... args) { ++ if (ConsumerGroupEvent.CLIENT_UNREGISTER.equals(event)) { ++ if (args == null || args.length < 1) { ++ return; ++ } ++ if (args[0] instanceof ClientChannelInfo) { ++ ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0]; ++ if (ChannelHelper.isRemote(clientChannelInfo.getChannel())) { ++ // if the channel sync from other proxy is expired, not to clear data of connect to current proxy ++ return; ++ } ++ clearGroup(new ReceiptHandleProcessor.ReceiptHandleGroupKey(clientChannelInfo.getChannel(), group)); ++ log.info("clear handle of this client when client unregister. group:{}, clientChannelInfo:{}", group, clientChannelInfo); ++ } ++ } ++ } ++ ++ @Override ++ public void shutdown() { ++ ++ } ++ }); ++ this.receiptHandleGroupMap = new ConcurrentHashMap<>(); ++ this.renewalWorkerService.setRejectedExecutionHandler((r, executor) -> log.warn("add renew task failed. queueSize:{}", executor.getQueue().size())); ++ this.appendStartAndShutdown(new StartAndShutdown() { ++ @Override ++ public void start() throws Exception { ++ scheduledExecutorService.scheduleWithFixedDelay(() -> scheduleRenewTask(), 0, ++ ConfigurationManager.getProxyConfig().getRenewSchedulePeriodMillis(), TimeUnit.MILLISECONDS); ++ } ++ ++ @Override ++ public void shutdown() throws Exception { ++ scheduledExecutorService.shutdown(); ++ clearAllHandle(); ++ } ++ }); ++ } ++ ++ public void addReceiptHandle(Channel channel, String group, String msgID, MessageReceiptHandle messageReceiptHandle) { ++ ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group), ++ k -> new ReceiptHandleGroup()).put(msgID, messageReceiptHandle); ++ } ++ ++ public MessageReceiptHandle removeReceiptHandle(Channel channel, String group, String msgID, String receiptHandle) { ++ ReceiptHandleGroup handleGroup = receiptHandleGroupMap.get(new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, group)); ++ if (handleGroup == null) { ++ return null; ++ } ++ return handleGroup.remove(msgID, receiptHandle); ++ } ++ ++ protected boolean clientIsOffline(ReceiptHandleProcessor.ReceiptHandleGroupKey groupKey) { ++ return this.consumerManager.findChannel(groupKey.getGroup(), groupKey.getChannel()) == null; ++ } ++ ++ public void scheduleRenewTask() { ++ Stopwatch stopwatch = Stopwatch.createStarted(); ++ try { ++ ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); ++ for (Map.Entry<ReceiptHandleProcessor.ReceiptHandleGroupKey, ReceiptHandleGroup> entry : receiptHandleGroupMap.entrySet()) { ++ ReceiptHandleProcessor.ReceiptHandleGroupKey key = entry.getKey(); ++ if (clientIsOffline(key)) { ++ clearGroup(key); ++ continue; ++ } ++ ++ ReceiptHandleGroup group = entry.getValue(); ++ group.scan((msgID, handleStr, v) -> { ++ long current = System.currentTimeMillis(); ++ ReceiptHandle handle = ReceiptHandle.decode(v.getReceiptHandleStr()); ++ if (handle.getNextVisibleTime() - current > proxyConfig.getRenewAheadTimeMillis()) { ++ return; ++ } ++ renewalWorkerService.submit(() -> renewMessage(group, msgID, handleStr)); ++ }); ++ } ++ } catch (Exception e) { ++ log.error("unexpect error when schedule renew task", e); ++ } ++ ++ log.debug("scan for renewal done. cost:{}ms", stopwatch.elapsed().toMillis()); ++ } ++ ++ protected void renewMessage(ReceiptHandleGroup group, String msgID, String handleStr) { ++ try { ++ group.computeIfPresent(msgID, handleStr, this::startRenewMessage); ++ } catch (Exception e) { ++ log.error("error when renew message. msgID:{}, handleStr:{}", msgID, handleStr, e); ++ } ++ } ++ ++ protected CompletableFuture<MessageReceiptHandle> startRenewMessage(MessageReceiptHandle messageReceiptHandle) { ++ CompletableFuture<MessageReceiptHandle> resFuture = new CompletableFuture<>(); ++ ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); ++ long current = System.currentTimeMillis(); ++ try { ++ if (messageReceiptHandle.getRenewRetryTimes() >= proxyConfig.getMaxRenewRetryTimes()) { ++ log.warn("handle has exceed max renewRetryTimes. handle:{}", messageReceiptHandle); ++ return CompletableFuture.completedFuture(null); ++ } ++ if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) { ++ CompletableFuture<AckResult> future = new CompletableFuture<>(); ++ eventListener.fireEvent(new RenewEvent(messageReceiptHandle, RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), future)); ++ future.whenComplete((ackResult, throwable) -> { ++ if (throwable != null) { ++ log.error("error when renew. handle:{}", messageReceiptHandle, throwable); ++ if (renewExceptionNeedRetry(throwable)) { ++ messageReceiptHandle.incrementAndGetRenewRetryTimes(); ++ resFuture.complete(messageReceiptHandle); ++ } else { ++ resFuture.complete(null); ++ } ++ } else if (AckStatus.OK.equals(ackResult.getStatus())) { ++ messageReceiptHandle.updateReceiptHandle(ackResult.getExtraInfo()); ++ messageReceiptHandle.resetRenewRetryTimes(); ++ messageReceiptHandle.incrementRenewTimes(); ++ resFuture.complete(messageReceiptHandle); ++ } else { ++ log.error("renew response is not ok. result:{}, handle:{}", ackResult, messageReceiptHandle); ++ resFuture.complete(null); ++ } ++ }); ++ } else { ++ ProxyContext context = createContext("RenewMessage"); ++ SubscriptionGroupConfig subscriptionGroupConfig = ++ metadataService.getSubscriptionGroupConfig(context, messageReceiptHandle.getGroup()); ++ if (subscriptionGroupConfig == null) { ++ log.error("group's subscriptionGroupConfig is null when renew. handle: {}", messageReceiptHandle); ++ return CompletableFuture.completedFuture(null); ++ } ++ RetryPolicy retryPolicy = subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy(); ++ CompletableFuture<AckResult> future = new CompletableFuture<>(); ++ eventListener.fireEvent(new RenewEvent(messageReceiptHandle, retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), future)); ++ future.whenComplete((ackResult, throwable) -> { ++ if (throwable != null) { ++ log.error("error when nack in renew. handle:{}", messageReceiptHandle, throwable); ++ } ++ resFuture.complete(null); ++ }); ++ } ++ } catch (Throwable t) { ++ log.error("unexpect error when renew message, stop to renew it. handle:{}", messageReceiptHandle, t); ++ resFuture.complete(null); ++ } ++ return resFuture; ++ } ++ ++ protected void clearGroup(ReceiptHandleProcessor.ReceiptHandleGroupKey key) { ++ if (key == null) { ++ return; ++ } ++ ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); ++ ReceiptHandleGroup handleGroup = receiptHandleGroupMap.remove(key); ++ if (handleGroup == null) { ++ return; ++ } ++ handleGroup.scan((msgID, handle, v) -> { ++ try { ++ handleGroup.computeIfPresent(msgID, handle, messageReceiptHandle -> { ++ CompletableFuture<AckResult> future = new CompletableFuture<>(); ++ eventListener.fireEvent(new RenewEvent(messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), future)); ++ return CompletableFuture.completedFuture(null); ++ }); ++ } catch (Exception e) { ++ log.error("error when clear handle for group. key:{}", key, e); ++ } ++ }); ++ } ++ ++ public void clearAllHandle() { ++ log.info("start clear all handle in receiptHandleProcessor"); ++ Set<ReceiptHandleProcessor.ReceiptHandleGroupKey> keySet = receiptHandleGroupMap.keySet(); ++ for (ReceiptHandleProcessor.ReceiptHandleGroupKey key : keySet) { ++ clearGroup(key); ++ } ++ log.info("clear all handle in receiptHandleProcessor done"); ++ } ++ ++ protected boolean renewExceptionNeedRetry(Throwable t) { ++ t = ExceptionUtils.getRealException(t); ++ if (t instanceof ProxyException) { ++ ProxyException proxyException = (ProxyException) t; ++ if (ProxyExceptionCode.INVALID_BROKER_NAME.equals(proxyException.getCode()) || ++ ProxyExceptionCode.INVALID_RECEIPT_HANDLE.equals(proxyException.getCode())) { ++ return false; ++ } ++ } ++ return true; ++ } ++ ++ protected ProxyContext createContext(String actionName) { ++ return ProxyContext.createForInner(this.getClass().getSimpleName() + actionName); ++ } ++} +diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivityTest.java +index 4df834bb6..49fdfc6a8 100644 +--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivityTest.java ++++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivityTest.java +@@ -47,7 +47,7 @@ public class AckMessageActivityTest extends BaseActivityTest { + @Before + public void before() throws Throwable { + super.before(); +- this.ackMessageActivity = new AckMessageActivity(messagingProcessor, receiptHandleProcessor, grpcClientSettingsManager, grpcChannelManager); ++ this.ackMessageActivity = new AckMessageActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); + } + + @Test +diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java +index fdd052da7..2de9a066b 100644 +--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java ++++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java +@@ -49,7 +49,7 @@ public class ChangeInvisibleDurationActivityTest extends BaseActivityTest { + @Before + public void before() throws Throwable { + super.before(); +- this.changeInvisibleDurationActivity = new ChangeInvisibleDurationActivity(messagingProcessor, receiptHandleProcessor, ++ this.changeInvisibleDurationActivity = new ChangeInvisibleDurationActivity(messagingProcessor, + grpcClientSettingsManager, grpcChannelManager); + } + +@@ -92,7 +92,7 @@ public class ChangeInvisibleDurationActivityTest extends BaseActivityTest { + when(this.messagingProcessor.changeInvisibleTime( + any(), receiptHandleCaptor.capture(), anyString(), anyString(), anyString(), invisibleTimeArgumentCaptor.capture() + )).thenReturn(CompletableFuture.completedFuture(ackResult)); +- when(receiptHandleProcessor.removeReceiptHandle(any(), any(), anyString(), anyString(), anyString())) ++ when(messagingProcessor.removeReceiptHandle(any(), any(), anyString(), anyString(), anyString())) + .thenReturn(new MessageReceiptHandle("group", "topic", 0, savedHandleStr, "msgId", 0, 0)); + + ChangeInvisibleDurationResponse response = this.changeInvisibleDurationActivity.changeInvisibleDuration( +diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java +index 535af838c..2e562504a 100644 +--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java ++++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java +@@ -74,7 +74,7 @@ public class ReceiveMessageActivityTest extends BaseActivityTest { + public void before() throws Throwable { + super.before(); + ConfigurationManager.getProxyConfig().setGrpcClientConsumerMinLongPollingTimeoutMillis(0); +- this.receiveMessageActivity = new ReceiveMessageActivity(messagingProcessor, receiptHandleProcessor, ++ this.receiveMessageActivity = new ReceiveMessageActivity(messagingProcessor, + grpcClientSettingsManager, grpcChannelManager); + } + +diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java +index ec620340c..87824e5b4 100644 +--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java ++++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java +@@ -44,7 +44,7 @@ public class ForwardMessageToDLQActivityTest extends BaseActivityTest { + @Before + public void before() throws Throwable { + super.before(); +- this.forwardMessageToDLQActivity = new ForwardMessageToDLQActivity(messagingProcessor,receiptHandleProcessor, grpcClientSettingsManager, grpcChannelManager); ++ this.forwardMessageToDLQActivity = new ForwardMessageToDLQActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); + } + + @Test +@@ -75,7 +75,7 @@ public class ForwardMessageToDLQActivityTest extends BaseActivityTest { + .thenReturn(CompletableFuture.completedFuture(RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, ""))); + + String savedHandleStr = buildReceiptHandle("topic", System.currentTimeMillis(),3000); +- when(receiptHandleProcessor.removeReceiptHandle(any(), any(), anyString(), anyString(), anyString())) ++ when(messagingProcessor.removeReceiptHandle(any(), any(), anyString(), anyString(), anyString())) + .thenReturn(new MessageReceiptHandle("group", "topic", 0, savedHandleStr, "msgId", 0, 0)); + + ForwardMessageToDeadLetterQueueResponse response = this.forwardMessageToDLQActivity.forwardMessageToDeadLetterQueue( +diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java +index bfa2cc3e6..717e86fc0 100644 +--- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java ++++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java +@@ -73,7 +73,6 @@ public class ConsumerProcessorTest extends BaseProcessorTest { + @Before + public void before() throws Throwable { + super.before(); +- ReceiptHandleProcessor receiptHandleProcessor = new ReceiptHandleProcessor(messagingProcessor); + this.consumerProcessor = new ConsumerProcessor(messagingProcessor, serviceManager, Executors.newCachedThreadPool()); + } + +diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManagerTest.java +similarity index 63% +rename from proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java +rename to proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManagerTest.java +index c76f40f92..877c9fd6f 100644 +--- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java ++++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/ReceiptHandleManagerTest.java +@@ -15,21 +15,10 @@ + * limitations under the License. + */ + +-package org.apache.rocketmq.proxy.processor; ++package org.apache.rocketmq.proxy.service.receipt; + +-import io.netty.buffer.ByteBufAllocator; + import io.netty.channel.Channel; +-import io.netty.channel.ChannelConfig; +-import io.netty.channel.ChannelFuture; +-import io.netty.channel.ChannelId; +-import io.netty.channel.ChannelMetadata; +-import io.netty.channel.ChannelPipeline; +-import io.netty.channel.ChannelProgressivePromise; +-import io.netty.channel.ChannelPromise; +-import io.netty.channel.EventLoop; +-import io.netty.util.Attribute; +-import io.netty.util.AttributeKey; +-import java.net.SocketAddress; ++import io.netty.channel.local.LocalChannel; + import java.time.Duration; + import java.util.ArrayList; + import java.util.List; +@@ -38,26 +27,34 @@ import java.util.concurrent.atomic.AtomicInteger; + import org.apache.rocketmq.broker.client.ClientChannelInfo; + import org.apache.rocketmq.broker.client.ConsumerGroupEvent; + import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener; ++import org.apache.rocketmq.broker.client.ConsumerManager; + import org.apache.rocketmq.client.consumer.AckResult; + import org.apache.rocketmq.client.consumer.AckStatus; + import org.apache.rocketmq.client.exception.MQClientException; + import org.apache.rocketmq.common.consumer.ReceiptHandle; + import org.apache.rocketmq.common.message.MessageClientIDSetter; ++import org.apache.rocketmq.common.state.StateEventListener; ++import org.apache.rocketmq.proxy.common.RenewEvent; + import org.apache.rocketmq.proxy.common.ContextVariable; + import org.apache.rocketmq.proxy.common.MessageReceiptHandle; + import org.apache.rocketmq.proxy.common.ProxyContext; + import org.apache.rocketmq.proxy.common.ProxyException; +-import org.apache.rocketmq.proxy.common.RenewStrategyPolicy; + import org.apache.rocketmq.proxy.common.ProxyExceptionCode; + import org.apache.rocketmq.proxy.common.ReceiptHandleGroup; ++import org.apache.rocketmq.proxy.common.RenewStrategyPolicy; + import org.apache.rocketmq.proxy.config.ConfigurationManager; + import org.apache.rocketmq.proxy.config.ProxyConfig; ++import org.apache.rocketmq.proxy.processor.MessagingProcessor; ++import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor; ++import org.apache.rocketmq.proxy.service.BaseServiceTest; ++import org.apache.rocketmq.proxy.service.metadata.MetadataService; + import org.apache.rocketmq.remoting.protocol.LanguageCode; + import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy; + import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; + import org.junit.Before; + import org.junit.Test; + import org.mockito.ArgumentCaptor; ++import org.mockito.Mock; + import org.mockito.Mockito; + import org.mockito.stubbing.Answer; + +@@ -65,8 +62,14 @@ import static org.awaitility.Awaitility.await; + import static org.junit.Assert.assertEquals; + import static org.junit.Assert.assertTrue; + +-public class ReceiptHandleProcessorTest extends BaseProcessorTest { +- private ReceiptHandleProcessor receiptHandleProcessor; ++public class ReceiptHandleManagerTest extends BaseServiceTest { ++ private ReceiptHandleManager receiptHandleManager; ++ @Mock ++ protected MessagingProcessor messagingProcessor; ++ @Mock ++ protected MetadataService metadataService; ++ @Mock ++ protected ConsumerManager consumerManager; + + private static final ProxyContext PROXY_CONTEXT = ProxyContext.create(); + private static final String GROUP = "group"; +@@ -84,6 +87,22 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { + + @Before + public void setup() { ++ receiptHandleManager = new ReceiptHandleManager(metadataService, consumerManager, new StateEventListener<RenewEvent>() { ++ @Override ++ public void fireEvent(RenewEvent event) { ++ MessageReceiptHandle messageReceiptHandle = event.getMessageReceiptHandle(); ++ ReceiptHandle handle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr()); ++ messagingProcessor.changeInvisibleTime(PROXY_CONTEXT, handle, messageReceiptHandle.getMessageId(), ++ messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), event.getRenewTime()) ++ .whenComplete((v, t) -> { ++ if (t != null) { ++ event.getFuture().completeExceptionally(t); ++ return; ++ } ++ event.getFuture().complete(v); ++ }); ++ } ++ }); + ProxyConfig config = ConfigurationManager.getProxyConfig(); + receiptHandle = ReceiptHandle.builder() + .startOffset(0L) +@@ -97,20 +116,19 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { + .commitLogOffset(0L) + .build().encode(); + PROXY_CONTEXT.withVal(ContextVariable.CLIENT_ID, "channel-id"); +- PROXY_CONTEXT.withVal(ContextVariable.CHANNEL, new MockChannel()); +- receiptHandleProcessor = new ReceiptHandleProcessor(messagingProcessor); +- Mockito.doNothing().when(messagingProcessor).registerConsumerListener(Mockito.any(ConsumerIdsChangeListener.class)); ++ PROXY_CONTEXT.withVal(ContextVariable.CHANNEL, new LocalChannel()); ++ Mockito.doNothing().when(consumerManager).appendConsumerIdsChangeListener(Mockito.any(ConsumerIdsChangeListener.class)); + messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, receiptHandle, MESSAGE_ID, OFFSET, + RECONSUME_TIMES); + } + + @Test + public void testAddReceiptHandle() { +- Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); +- receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); ++ Channel channel = new LocalChannel(); ++ receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); + Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig()); +- Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); +- receiptHandleProcessor.scheduleRenewTask(); ++ Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); ++ receiptHandleManager.scheduleRenewTask(); + Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1)) + .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), + Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills())); +@@ -134,12 +152,12 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { + .build().encode(); + MessageReceiptHandle messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, receiptHandle, MESSAGE_ID, OFFSET, + RECONSUME_TIMES); +- receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); ++ receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); + } +- receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); ++ receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); + Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig()); +- Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); +- receiptHandleProcessor.scheduleRenewTask(); ++ Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); ++ receiptHandleManager.scheduleRenewTask(); + ArgumentCaptor<ReceiptHandle> handleArgumentCaptor = ArgumentCaptor.forClass(ReceiptHandle.class); + Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1)) + .changeInvisibleTime(Mockito.any(ProxyContext.class), handleArgumentCaptor.capture(), Mockito.eq(MESSAGE_ID), +@@ -152,10 +170,10 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { + public void testRenewReceiptHandle() { + ProxyConfig config = ConfigurationManager.getProxyConfig(); + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); +- receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); ++ receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); + SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); + Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); +- Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); ++ Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); + long newInvisibleTime = 18000L; + + ReceiptHandle newReceiptHandleClass = ReceiptHandle.builder() +@@ -179,27 +197,26 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { + ackResult.setExtraInfo(newReceiptHandle); + + Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), +- Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.get())))) ++ Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.get())))) + .thenReturn(CompletableFuture.completedFuture(ackResult)); +- receiptHandleProcessor.scheduleRenewTask(); ++ receiptHandleManager.scheduleRenewTask(); + + Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1)) + .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.argThat(r -> r.getInvisibleTime() == INVISIBLE_TIME), Mockito.eq(MESSAGE_ID), + Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.get()))); +- receiptHandleProcessor.scheduleRenewTask(); ++ receiptHandleManager.scheduleRenewTask(); + + Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1)) + .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.argThat(r -> r.getInvisibleTime() == newInvisibleTime), Mockito.eq(MESSAGE_ID), + Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.incrementAndGet()))); +- receiptHandleProcessor.scheduleRenewTask(); ++ receiptHandleManager.scheduleRenewTask(); + } + + @Test + public void testRenewExceedMaxRenewTimes() { +- ProxyConfig config = ConfigurationManager.getProxyConfig(); + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); +- Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); +- receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); ++ Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); ++ receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); + + CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>(); + ackResultFuture.completeExceptionally(new MQClientException(0, "error")); +@@ -207,13 +224,13 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { + RetryPolicy retryPolicy = new RenewStrategyPolicy(); + + Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), +- Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(messageReceiptHandle.getRenewTimes())))) ++ Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(messageReceiptHandle.getRenewTimes())))) + .thenReturn(ackResultFuture); + + await().atMost(Duration.ofSeconds(1)).until(() -> { +- receiptHandleProcessor.scheduleRenewTask(); ++ receiptHandleManager.scheduleRenewTask(); + try { +- ReceiptHandleGroup receiptHandleGroup = receiptHandleProcessor.receiptHandleGroupMap.values().stream().findFirst().get(); ++ ReceiptHandleGroup receiptHandleGroup = receiptHandleManager.receiptHandleGroupMap.values().stream().findFirst().get(); + return receiptHandleGroup.isEmpty(); + } catch (Exception e) { + return false; +@@ -228,19 +245,19 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { + @Test + public void testRenewWithInvalidHandle() { + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); +- Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); +- receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); ++ Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); ++ receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); + + CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>(); + ackResultFuture.completeExceptionally(new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "error")); + Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), +- Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()))) ++ Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()))) + .thenReturn(ackResultFuture); + + await().atMost(Duration.ofSeconds(1)).until(() -> { +- receiptHandleProcessor.scheduleRenewTask(); ++ receiptHandleManager.scheduleRenewTask(); + try { +- ReceiptHandleGroup receiptHandleGroup = receiptHandleProcessor.receiptHandleGroupMap.values().stream().findFirst().get(); ++ ReceiptHandleGroup receiptHandleGroup = receiptHandleManager.receiptHandleGroupMap.values().stream().findFirst().get(); + return receiptHandleGroup.isEmpty(); + } catch (Exception e) { + return false; +@@ -252,8 +269,8 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { + public void testRenewWithErrorThenOK() { + ProxyConfig config = ConfigurationManager.getProxyConfig(); + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); +- Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); +- receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); ++ Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); ++ receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); + + AtomicInteger count = new AtomicInteger(0); + List<CompletableFuture<AckResult>> futureList = new ArrayList<>(); +@@ -297,13 +314,13 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { + Mockito.doAnswer((Answer<CompletableFuture<AckResult>>) mock -> { + return futureList.get(count.getAndIncrement()); + }).when(messagingProcessor).changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), +- Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement()))); ++ Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement()))); + } + + await().pollDelay(Duration.ZERO).pollInterval(Duration.ofMillis(10)).atMost(Duration.ofSeconds(10)).until(() -> { +- receiptHandleProcessor.scheduleRenewTask(); ++ receiptHandleManager.scheduleRenewTask(); + try { +- ReceiptHandleGroup receiptHandleGroup = receiptHandleProcessor.receiptHandleGroupMap.values().stream().findFirst().get(); ++ ReceiptHandleGroup receiptHandleGroup = receiptHandleManager.receiptHandleGroupMap.values().stream().findFirst().get(); + return receiptHandleGroup.isEmpty(); + } catch (Exception e) { + return false; +@@ -331,19 +348,19 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { + messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET, + RECONSUME_TIMES); + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); +- receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); +- Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); ++ receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); ++ Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); + SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); + Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); + Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyLong())) + .thenReturn(CompletableFuture.completedFuture(new AckResult())); +- receiptHandleProcessor.scheduleRenewTask(); ++ receiptHandleManager.scheduleRenewTask(); + Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1)) + .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), + Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(groupConfig.getGroupRetryPolicy().getRetryPolicy().nextDelayDuration(RECONSUME_TIMES))); + + await().atMost(Duration.ofSeconds(1)).untilAsserted(() -> { +- ReceiptHandleGroup receiptHandleGroup = receiptHandleProcessor.receiptHandleGroupMap.values().stream().findFirst().get(); ++ ReceiptHandleGroup receiptHandleGroup = receiptHandleManager.receiptHandleGroupMap.values().stream().findFirst().get(); + assertTrue(receiptHandleGroup.isEmpty()); + }); + } +@@ -365,15 +382,15 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { + messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET, + RECONSUME_TIMES); + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); +- receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); +- Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); ++ receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); ++ Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); + Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(null); + Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyLong())) + .thenReturn(CompletableFuture.completedFuture(new AckResult())); +- receiptHandleProcessor.scheduleRenewTask(); ++ receiptHandleManager.scheduleRenewTask(); + await().atMost(Duration.ofSeconds(1)).until(() -> { + try { +- ReceiptHandleGroup receiptHandleGroup = receiptHandleProcessor.receiptHandleGroupMap.values().stream().findFirst().get(); ++ ReceiptHandleGroup receiptHandleGroup = receiptHandleManager.receiptHandleGroupMap.values().stream().findFirst().get(); + return receiptHandleGroup.isEmpty(); + } catch (Exception e) { + return false; +@@ -401,11 +418,11 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { + messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET, + RECONSUME_TIMES); + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); +- receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); ++ receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); + SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); + Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); +- Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); +- receiptHandleProcessor.scheduleRenewTask(); ++ Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); ++ receiptHandleManager.scheduleRenewTask(); + Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(0)) + .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.anyString(), + Mockito.anyString(), Mockito.anyString(), Mockito.anyLong()); +@@ -414,11 +431,11 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { + @Test + public void testRemoveReceiptHandle() { + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); +- receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); +- receiptHandleProcessor.removeReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, receiptHandle); ++ receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); ++ receiptHandleManager.removeReceiptHandle(channel, GROUP, MSG_ID, receiptHandle); + SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); + Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); +- receiptHandleProcessor.scheduleRenewTask(); ++ receiptHandleManager.scheduleRenewTask(); + Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(0)) + .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.anyString(), + Mockito.anyString(), Mockito.anyString(), Mockito.anyLong()); +@@ -427,11 +444,11 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { + @Test + public void testClearGroup() { + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); +- receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); +- receiptHandleProcessor.clearGroup(new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, GROUP)); ++ receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); ++ receiptHandleManager.clearGroup(new ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, GROUP)); + SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); + Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); +- receiptHandleProcessor.scheduleRenewTask(); ++ receiptHandleManager.scheduleRenewTask(); + Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1)) + .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), + Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getInvisibleTimeMillisWhenClear())); +@@ -440,242 +457,10 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { + @Test + public void testClientOffline() { + ArgumentCaptor<ConsumerIdsChangeListener> listenerArgumentCaptor = ArgumentCaptor.forClass(ConsumerIdsChangeListener.class); +- Mockito.verify(messagingProcessor, Mockito.times(1)).registerConsumerListener(listenerArgumentCaptor.capture()); ++ Mockito.verify(consumerManager, Mockito.times(1)).appendConsumerIdsChangeListener(listenerArgumentCaptor.capture()); + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); +- receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); ++ receiptHandleManager.addReceiptHandle(channel, GROUP, MSG_ID, messageReceiptHandle); + listenerArgumentCaptor.getValue().handle(ConsumerGroupEvent.CLIENT_UNREGISTER, GROUP, new ClientChannelInfo(channel, "", LanguageCode.JAVA, 0)); +- assertTrue(receiptHandleProcessor.receiptHandleGroupMap.isEmpty()); +- } +- +- class MockChannel implements Channel { +- @Override +- public ChannelId id() { +- return new ChannelId() { +- @Override +- public String asShortText() { +- return "short"; +- } +- +- @Override +- public String asLongText() { +- return "long"; +- } +- +- @Override +- public int compareTo(ChannelId o) { +- return 1; +- } +- }; +- } +- +- @Override +- public EventLoop eventLoop() { +- return null; +- } +- +- @Override +- public Channel parent() { +- return null; +- } +- +- @Override +- public ChannelConfig config() { +- return null; +- } +- +- @Override +- public boolean isOpen() { +- return false; +- } +- +- @Override +- public boolean isRegistered() { +- return false; +- } +- +- @Override +- public boolean isActive() { +- return false; +- } +- +- @Override +- public ChannelMetadata metadata() { +- return null; +- } +- +- @Override +- public SocketAddress localAddress() { +- return null; +- } +- +- @Override +- public SocketAddress remoteAddress() { +- return null; +- } +- +- @Override +- public ChannelFuture closeFuture() { +- return null; +- } +- +- @Override +- public boolean isWritable() { +- return false; +- } +- +- @Override +- public long bytesBeforeUnwritable() { +- return 0; +- } +- +- @Override +- public long bytesBeforeWritable() { +- return 0; +- } +- +- @Override +- public Unsafe unsafe() { +- return null; +- } +- +- @Override +- public ChannelPipeline pipeline() { +- return null; +- } +- +- @Override +- public ByteBufAllocator alloc() { +- return null; +- } +- +- @Override +- public ChannelFuture bind(SocketAddress localAddress) { +- return null; +- } +- +- @Override +- public ChannelFuture connect(SocketAddress remoteAddress) { +- return null; +- } +- +- @Override +- public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { +- return null; +- } +- +- @Override +- public ChannelFuture disconnect() { +- return null; +- } +- +- @Override +- public ChannelFuture close() { +- return null; +- } +- +- @Override +- public ChannelFuture deregister() { +- return null; +- } +- +- @Override +- public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { +- return null; +- } +- +- @Override +- public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { +- return null; +- } +- +- @Override +- public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { +- return null; +- } +- +- @Override +- public ChannelFuture disconnect(ChannelPromise promise) { +- return null; +- } +- +- @Override +- public ChannelFuture close(ChannelPromise promise) { +- return null; +- } +- +- @Override +- public ChannelFuture deregister(ChannelPromise promise) { +- return null; +- } +- +- @Override +- public Channel read() { +- return null; +- } +- +- @Override +- public ChannelFuture write(Object msg) { +- return null; +- } +- +- @Override +- public ChannelFuture write(Object msg, ChannelPromise promise) { +- return null; +- } +- +- @Override +- public Channel flush() { +- return null; +- } +- +- @Override +- public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { +- return null; +- } +- +- @Override +- public ChannelFuture writeAndFlush(Object msg) { +- return null; +- } +- +- @Override +- public ChannelPromise newPromise() { +- return null; +- } +- +- @Override +- public ChannelProgressivePromise newProgressivePromise() { +- return null; +- } +- +- @Override +- public ChannelFuture newSucceededFuture() { +- return null; +- } +- +- @Override +- public ChannelFuture newFailedFuture(Throwable cause) { +- return null; +- } +- +- @Override +- public ChannelPromise voidPromise() { +- return null; +- } +- +- @Override +- public <T> Attribute<T> attr(AttributeKey<T> key) { +- return null; +- } +- +- @Override +- public <T> boolean hasAttr(AttributeKey<T> key) { +- return false; +- } +- +- @Override +- public int compareTo(Channel o) { +- return 1; +- } ++ assertTrue(receiptHandleManager.receiptHandleGroupMap.isEmpty()); + } +-} ++} +\ No newline at end of file +-- +2.32.0.windows.2 + |