summaryrefslogtreecommitdiff
path: root/patch006-backport-auto-batch-producer.patch
diff options
context:
space:
mode:
Diffstat (limited to 'patch006-backport-auto-batch-producer.patch')
-rw-r--r--patch006-backport-auto-batch-producer.patch2773
1 files changed, 2773 insertions, 0 deletions
diff --git a/patch006-backport-auto-batch-producer.patch b/patch006-backport-auto-batch-producer.patch
new file mode 100644
index 0000000..264fe95
--- /dev/null
+++ b/patch006-backport-auto-batch-producer.patch
@@ -0,0 +1,2773 @@
+From 737c1e53383350a5671fa207ee0e4ce932850bac Mon Sep 17 00:00:00 2001
+From: rongtong <jinrongtong5@163.com>
+Date: Tue, 18 Jul 2023 14:12:39 +0800
+Subject: [PATCH 1/7] [ISSUE #7029] Add a config to determine whether pop
+ response should return the actual retry topic or tamper with the original
+ topic (#7030)
+
+---
+ .../broker/processor/PopMessageProcessor.java | 4 ++--
+ .../org/apache/rocketmq/common/BrokerConfig.java | 13 +++++++++++++
+ 2 files changed, 15 insertions(+), 2 deletions(-)
+
+diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+index 464f8f4fd..53e172561 100644
+--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
++++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+@@ -591,8 +591,8 @@ public class PopMessageProcessor implements NettyRequestProcessor {
+ atomicRestNum.set(result.getMaxOffset() - result.getNextBeginOffset() + atomicRestNum.get());
+ String brokerName = brokerController.getBrokerConfig().getBrokerName();
+ for (SelectMappedBufferResult mapedBuffer : result.getMessageMapedList()) {
+- // We should not recode buffer for normal topic message
+- if (!isRetry) {
++ // We should not recode buffer when popResponseReturnActualRetryTopic is true or topic is not retry topic
++ if (brokerController.getBrokerConfig().isPopResponseReturnActualRetryTopic() || !isRetry) {
+ getMessageResult.addMessage(mapedBuffer);
+ } else {
+ List<MessageExt> messageExtList = MessageDecoder.decodesBatch(mapedBuffer.getByteBuffer(),
+diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+index f5f0db101..a4d82d1c5 100644
+--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
++++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+@@ -381,6 +381,11 @@ public class BrokerConfig extends BrokerIdentity {
+ */
+ private long fetchNamesrvAddrInterval = 10 * 1000;
+
++ /**
++ * Pop response returns the actual retry topic rather than tampering with the original topic
++ */
++ private boolean popResponseReturnActualRetryTopic = false;
++
+ public long getMaxPopPollingSize() {
+ return maxPopPollingSize;
+ }
+@@ -1676,4 +1681,12 @@ public class BrokerConfig extends BrokerIdentity {
+ public void setFetchNamesrvAddrInterval(final long fetchNamesrvAddrInterval) {
+ this.fetchNamesrvAddrInterval = fetchNamesrvAddrInterval;
+ }
++
++ public boolean isPopResponseReturnActualRetryTopic() {
++ return popResponseReturnActualRetryTopic;
++ }
++
++ public void setPopResponseReturnActualRetryTopic(boolean popResponseReturnActualRetryTopic) {
++ this.popResponseReturnActualRetryTopic = popResponseReturnActualRetryTopic;
++ }
+ }
+--
+2.32.0.windows.2
+
+
+From 7996ec3b3f7ccea01f66951ac639b48303bbf7a6 Mon Sep 17 00:00:00 2001
+From: leeyiyu <43566239+leeyiyu@users.noreply.github.com>
+Date: Tue, 18 Jul 2023 20:58:56 +0800
+Subject: [PATCH 2/7] [ISSUE #6879] ConcurrentHashMapUtils fails to solve the
+ loop bug in JDK8 (#6883)
+
+---
+ .../common/utils/ConcurrentHashMapUtils.java | 16 +++++++++++++++-
+ .../common/utils/ConcurrentHashMapUtilsTest.java | 2 ++
+ 2 files changed, 17 insertions(+), 1 deletion(-)
+
+diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtils.java b/common/src/main/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtils.java
+index 1f1b4dd89..6fd9c21c9 100644
+--- a/common/src/main/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtils.java
++++ b/common/src/main/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtils.java
+@@ -16,6 +16,7 @@
+ */
+ package org.apache.rocketmq.common.utils;
+
++import java.util.Objects;
+ import java.util.concurrent.ConcurrentMap;
+ import java.util.function.Function;
+
+@@ -40,10 +41,23 @@ public abstract class ConcurrentHashMapUtils {
+ * @see <a href="https://bugs.openjdk.java.net/browse/JDK-8161372">https://bugs.openjdk.java.net/browse/JDK-8161372</a>
+ */
+ public static <K, V> V computeIfAbsent(ConcurrentMap<K, V> map, K key, Function<? super K, ? extends V> func) {
++ Objects.requireNonNull(func);
+ if (isJdk8) {
+ V v = map.get(key);
+ if (null == v) {
+- v = map.computeIfAbsent(key, func);
++// v = map.computeIfAbsent(key, func);
++
++ // this bug fix methods maybe cause `func.apply` multiple calls.
++ v = func.apply(key);
++ if (null == v) {
++ return null;
++ }
++ final V res = map.putIfAbsent(key, v);
++ if (null != res) {
++ // if pre value present, means other thread put value already, and putIfAbsent not effect
++ // return exist value
++ return res;
++ }
+ }
+ return v;
+ } else {
+diff --git a/common/src/test/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtilsTest.java b/common/src/test/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtilsTest.java
+index 8e32fc93a..fa97ddb1c 100644
+--- a/common/src/test/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtilsTest.java
++++ b/common/src/test/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtilsTest.java
+@@ -35,5 +35,7 @@ public class ConcurrentHashMapUtilsTest {
+ assertEquals("2342", value1);
+ String value2 = ConcurrentHashMapUtils.computeIfAbsent(map, "123", k -> "2342");
+ assertEquals("1111", value2);
++// map.computeIfAbsent("AaAa", key->map.computeIfAbsent("BBBB",key2->"42"));
++ ConcurrentHashMapUtils.computeIfAbsent(map, "AaAa", key -> map.computeIfAbsent("BBBB", key2 -> "42"));
+ }
+ }
+\ No newline at end of file
+--
+2.32.0.windows.2
+
+
+From e0f5295fed8791d93bfa5b8420074c00b651ddfe Mon Sep 17 00:00:00 2001
+From: lk <xdkxlk@outlook.com>
+Date: Wed, 19 Jul 2023 15:55:11 +0800
+Subject: [PATCH 3/7] passing the renew event type to create the correct
+ context (#7045)
+
+---
+ .../apache/rocketmq/proxy/common/RenewEvent.java | 14 +++++++++++++-
+ .../proxy/processor/ReceiptHandleProcessor.java | 2 +-
+ .../receipt/DefaultReceiptHandleManager.java | 6 +++---
+ 3 files changed, 17 insertions(+), 5 deletions(-)
+
+diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java
+index fdf9833cc..0ff65c1cc 100644
+--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java
++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java
+@@ -23,11 +23,19 @@ import org.apache.rocketmq.client.consumer.AckResult;
+ public class RenewEvent {
+ protected MessageReceiptHandle messageReceiptHandle;
+ protected long renewTime;
++ protected EventType eventType;
+ protected CompletableFuture<AckResult> future;
+
+- public RenewEvent(MessageReceiptHandle messageReceiptHandle, long renewTime, CompletableFuture<AckResult> future) {
++ public enum EventType {
++ RENEW,
++ STOP_RENEW,
++ CLEAR_GROUP
++ }
++
++ public RenewEvent(MessageReceiptHandle messageReceiptHandle, long renewTime, EventType eventType, CompletableFuture<AckResult> future) {
+ this.messageReceiptHandle = messageReceiptHandle;
+ this.renewTime = renewTime;
++ this.eventType = eventType;
+ this.future = future;
+ }
+
+@@ -39,6 +47,10 @@ public class RenewEvent {
+ return renewTime;
+ }
+
++ public EventType getEventType() {
++ return eventType;
++ }
++
+ public CompletableFuture<AckResult> getFuture() {
+ return future;
+ }
+diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+index fc49e7622..460842a86 100644
+--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+@@ -38,7 +38,7 @@ public class ReceiptHandleProcessor extends AbstractProcessor {
+ public ReceiptHandleProcessor(MessagingProcessor messagingProcessor, ServiceManager serviceManager) {
+ super(messagingProcessor, serviceManager);
+ StateEventListener<RenewEvent> eventListener = event -> {
+- ProxyContext context = createContext("RenewMessage");
++ ProxyContext context = createContext(event.getEventType().name());
+ MessageReceiptHandle messageReceiptHandle = event.getMessageReceiptHandle();
+ ReceiptHandle handle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr());
+ messagingProcessor.changeInvisibleTime(context, handle, messageReceiptHandle.getMessageId(),
+diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
+index c7633d658..9f35435f0 100644
+--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
+@@ -188,7 +188,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
+ }
+ if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) {
+ CompletableFuture<AckResult> future = new CompletableFuture<>();
+- eventListener.fireEvent(new RenewEvent(messageReceiptHandle, RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), future));
++ eventListener.fireEvent(new RenewEvent(messageReceiptHandle, RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), RenewEvent.EventType.RENEW, future));
+ future.whenComplete((ackResult, throwable) -> {
+ if (throwable != null) {
+ log.error("error when renew. handle:{}", messageReceiptHandle, throwable);
+@@ -218,7 +218,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
+ }
+ RetryPolicy retryPolicy = subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy();
+ CompletableFuture<AckResult> future = new CompletableFuture<>();
+- eventListener.fireEvent(new RenewEvent(messageReceiptHandle, retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), future));
++ eventListener.fireEvent(new RenewEvent(messageReceiptHandle, retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), RenewEvent.EventType.STOP_RENEW, future));
+ future.whenComplete((ackResult, throwable) -> {
+ if (throwable != null) {
+ log.error("error when nack in renew. handle:{}", messageReceiptHandle, throwable);
+@@ -246,7 +246,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
+ try {
+ handleGroup.computeIfPresent(msgID, handle, messageReceiptHandle -> {
+ CompletableFuture<AckResult> future = new CompletableFuture<>();
+- eventListener.fireEvent(new RenewEvent(messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), future));
++ eventListener.fireEvent(new RenewEvent(messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), RenewEvent.EventType.CLEAR_GROUP, future));
+ return CompletableFuture.completedFuture(null);
+ });
+ } catch (Exception e) {
+--
+2.32.0.windows.2
+
+
+From 2c5808b9fdab8cae63318c89f34ad48a1ab6e962 Mon Sep 17 00:00:00 2001
+From: lizhimins <707364882@qq.com>
+Date: Wed, 19 Jul 2023 20:14:02 +0800
+Subject: [PATCH 4/7] [#ISSUE 7035] Fix correct min offset behavior in tiered
+ storage (#7038)
+
+---
+ .../tieredstore/TieredDispatcher.java | 2 +-
+ .../tieredstore/TieredMessageFetcher.java | 5 +
+ .../tieredstore/file/CompositeFlatFile.java | 12 +-
+ .../tieredstore/file/TieredCommitLog.java | 46 +++++++-
+ .../tieredstore/file/TieredFlatFile.java | 29 +++--
+ .../file/TieredFlatFileManager.java | 2 +-
+ .../metrics/TieredStoreMetricsConstant.java | 1 +
+ .../provider/posix/PosixFileSegment.java | 19 +--
+ .../tieredstore/file/TieredCommitLogTest.java | 108 ++++++++++++++++++
+ .../TieredStoreMetricsManagerTest.java | 4 +-
+ .../src/test/resources/rmq.logback-test.xml | 2 +-
+ 11 files changed, 201 insertions(+), 29 deletions(-)
+ create mode 100644 tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java
+
+diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
+index 6584b0e89..523b0c2cd 100644
+--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
+@@ -352,7 +352,7 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
+ case SUCCESS:
+ long offset = MessageBufferUtil.getQueueOffset(message);
+ if (queueOffset != offset) {
+- logger.error("Message cq offset in commitlog does not meet expectations, " +
++ logger.warn("Message cq offset in commitlog does not meet expectations, " +
+ "result={}, topic={}, queueId={}, cq offset={}, msg offset={}",
+ AppendResult.OFFSET_INCORRECT, topic, queueId, queueOffset, offset);
+ }
+diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
+index 8802a73a3..c4fed54bd 100644
+--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
+@@ -473,6 +473,11 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
+ return CompletableFuture.completedFuture(result);
+ }
+
++ // request range | result
++ // (0, min) | too small
++ // [min, max) | correct
++ // [max, max] | overflow one
++ // (max, +oo) | overflow badly
+ if (queueOffset < minQueueOffset) {
+ result.setStatus(GetMessageStatus.OFFSET_TOO_SMALL);
+ result.setNextBeginOffset(flatFile.getConsumeQueueMinOffset());
+diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
+index 8f8ba98b1..fa01382e1 100644
+--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
+@@ -120,17 +120,19 @@ public class CompositeFlatFile implements CompositeAccess {
+ return commitLog.getBeginTimestamp();
+ }
+
+- public long getConsumeQueueBaseOffset() {
+- return consumeQueue.getBaseOffset();
+- }
+-
+ @Override
+ public long getCommitLogDispatchCommitOffset() {
+ return commitLog.getDispatchCommitOffset();
+ }
+
++ public long getConsumeQueueBaseOffset() {
++ return consumeQueue.getBaseOffset();
++ }
++
+ public long getConsumeQueueMinOffset() {
+- return consumeQueue.getMinOffset() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE;
++ long cqOffset = consumeQueue.getMinOffset() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE;
++ long effectiveOffset = this.commitLog.getMinConsumeQueueOffset();
++ return Math.max(cqOffset, effectiveOffset);
+ }
+
+ public long getConsumeQueueCommitOffset() {
+diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java
+index 92aea58be..80e1bce50 100644
+--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java
++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java
+@@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting;
+ import java.nio.ByteBuffer;
+ import java.util.concurrent.CompletableFuture;
+ import java.util.concurrent.TimeUnit;
++import java.util.concurrent.atomic.AtomicLong;
+ import org.apache.rocketmq.logging.org.slf4j.Logger;
+ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+ import org.apache.rocketmq.tieredstore.common.AppendResult;
+@@ -31,6 +32,7 @@ import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+ public class TieredCommitLog {
+
+ private static final Logger log = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
++ private static final Long NOT_EXIST_MIN_OFFSET = -1L;
+
+ /**
+ * item size: int, 4 bytes
+@@ -42,10 +44,13 @@ public class TieredCommitLog {
+
+ private final TieredMessageStoreConfig storeConfig;
+ private final TieredFlatFile flatFile;
++ private final AtomicLong minConsumeQueueOffset;
+
+ public TieredCommitLog(TieredFileAllocator fileQueueFactory, String filePath) {
+ this.storeConfig = fileQueueFactory.getStoreConfig();
+ this.flatFile = fileQueueFactory.createFlatFileForCommitLog(filePath);
++ this.minConsumeQueueOffset = new AtomicLong(NOT_EXIST_MIN_OFFSET);
++ this.correctMinOffset();
+ }
+
+ @VisibleForTesting
+@@ -61,6 +66,10 @@ public class TieredCommitLog {
+ return flatFile.getCommitOffset();
+ }
+
++ public long getMinConsumeQueueOffset() {
++ return minConsumeQueueOffset.get() != NOT_EXIST_MIN_OFFSET ? minConsumeQueueOffset.get() : correctMinOffset();
++ }
++
+ public long getDispatchCommitOffset() {
+ return flatFile.getDispatchCommitOffset();
+ }
+@@ -82,6 +91,39 @@ public class TieredCommitLog {
+ return flatFile.getFileToWrite().getMaxTimestamp();
+ }
+
++ public synchronized long correctMinOffset() {
++ if (flatFile.getFileSegmentCount() == 0) {
++ this.minConsumeQueueOffset.set(NOT_EXIST_MIN_OFFSET);
++ return NOT_EXIST_MIN_OFFSET;
++ }
++
++ // queue offset field length is 8
++ int length = MessageBufferUtil.QUEUE_OFFSET_POSITION + 8;
++ if (flatFile.getCommitOffset() - flatFile.getMinOffset() < length) {
++ this.minConsumeQueueOffset.set(NOT_EXIST_MIN_OFFSET);
++ return NOT_EXIST_MIN_OFFSET;
++ }
++
++ try {
++ return this.flatFile.readAsync(this.flatFile.getMinOffset(), length)
++ .thenApply(buffer -> {
++ long offset = MessageBufferUtil.getQueueOffset(buffer);
++ minConsumeQueueOffset.set(offset);
++ log.info("Correct commitlog min cq offset success, filePath={}, min cq offset={}, range={}-{}",
++ flatFile.getFilePath(), offset, flatFile.getMinOffset(), flatFile.getCommitOffset());
++ return offset;
++ })
++ .exceptionally(throwable -> {
++ log.warn("Correct commitlog min cq offset error, filePath={}, range={}-{}",
++ flatFile.getFilePath(), flatFile.getMinOffset(), flatFile.getCommitOffset(), throwable);
++ return minConsumeQueueOffset.get();
++ }).get();
++ } catch (Exception e) {
++ log.error("Correct commitlog min cq offset error, filePath={}", flatFile.getFilePath(), e);
++ }
++ return minConsumeQueueOffset.get();
++ }
++
+ public AppendResult append(ByteBuffer byteBuf) {
+ return flatFile.append(byteBuf, MessageBufferUtil.getStoreTimeStamp(byteBuf));
+ }
+@@ -99,7 +141,9 @@ public class TieredCommitLog {
+ }
+
+ public void cleanExpiredFile(long expireTimestamp) {
+- flatFile.cleanExpiredFile(expireTimestamp);
++ if (flatFile.cleanExpiredFile(expireTimestamp) > 0) {
++ correctMinOffset();
++ }
+ }
+
+ public void destroyExpiredFile() {
+diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
+index a71323348..90ca843bf 100644
+--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
+@@ -133,6 +133,14 @@ public class TieredFlatFile {
+ }
+ }
+
++ public String getFilePath() {
++ return filePath;
++ }
++
++ public FileSegmentType getFileType() {
++ return fileType;
++ }
++
+ @VisibleForTesting
+ public List<TieredFileSegment> getFileSegmentList() {
+ return fileSegmentList;
+@@ -333,10 +341,9 @@ public class TieredFlatFile {
+ TieredFileSegment fileSegment = this.newSegment(fileType, offset, true);
+ fileSegmentList.add(fileSegment);
+ needCommitFileSegmentList.add(fileSegment);
+-
+ Collections.sort(fileSegmentList);
+-
+- logger.debug("Create a new file segment: baseOffset: {}, file: {}, file type: {}", baseOffset, fileSegment.getPath(), fileType);
++ logger.debug("Create a new file segment: baseOffset: {}, file: {}, file type: {}",
++ offset, fileSegment.getPath(), fileType);
+ return fileSegment;
+ } finally {
+ fileSegmentLock.writeLock().unlock();
+@@ -429,7 +436,7 @@ public class TieredFlatFile {
+ return result;
+ }
+
+- public void cleanExpiredFile(long expireTimestamp) {
++ public int cleanExpiredFile(long expireTimestamp) {
+ Set<Long> needToDeleteSet = new HashSet<>();
+ try {
+ tieredMetadataStore.iterateFileSegment(filePath, fileType, metadata -> {
+@@ -438,32 +445,32 @@ public class TieredFlatFile {
+ }
+ });
+ } catch (Exception e) {
+- logger.error("clean expired failed: filePath: {}, file type: {}, expire timestamp: {}",
++ logger.error("Clean expired file, filePath: {}, file type: {}, expire timestamp: {}",
+ filePath, fileType, expireTimestamp);
+ }
+
+ if (needToDeleteSet.isEmpty()) {
+- return;
++ return 0;
+ }
+
+ fileSegmentLock.writeLock().lock();
+ try {
+ for (int i = 0; i < fileSegmentList.size(); i++) {
++ TieredFileSegment fileSegment = fileSegmentList.get(i);
+ try {
+- TieredFileSegment fileSegment = fileSegmentList.get(i);
+ if (needToDeleteSet.contains(fileSegment.getBaseOffset())) {
+ fileSegment.close();
+ fileSegmentList.remove(fileSegment);
+ needCommitFileSegmentList.remove(fileSegment);
+ i--;
+ this.updateFileSegment(fileSegment);
+- logger.info("expired file {} is been cleaned", fileSegment.getPath());
++ logger.debug("Clean expired file, filePath: {}", fileSegment.getPath());
+ } else {
+ break;
+ }
+ } catch (Exception e) {
+- logger.error("clean expired file failed: filePath: {}, file type: {}, expire timestamp: {}",
+- filePath, fileType, expireTimestamp, e);
++ logger.error("Clean expired file failed: filePath: {}, file type: {}, expire timestamp: {}",
++ fileSegment.getPath(), fileSegment.getFileType(), expireTimestamp, e);
+ }
+ }
+ if (fileSegmentList.size() > 0) {
+@@ -476,6 +483,7 @@ public class TieredFlatFile {
+ } finally {
+ fileSegmentLock.writeLock().unlock();
+ }
++ return needToDeleteSet.size();
+ }
+
+ @VisibleForTesting
+@@ -493,7 +501,6 @@ public class TieredFlatFile {
+ fileSegment.destroyFile();
+ if (!fileSegment.exists()) {
+ tieredMetadataStore.deleteFileSegment(filePath, fileType, metadata.getBaseOffset());
+- logger.info("Destroyed expired file, file path: {}", fileSegment.getPath());
+ }
+ } catch (Exception e) {
+ logger.error("Destroyed expired file failed, file path: {}, file type: {}",
+diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
+index 5fe511f68..aeca44b8c 100644
+--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
+@@ -223,7 +223,7 @@ public class TieredFlatFileManager {
+ public CompositeQueueFlatFile getOrCreateFlatFileIfAbsent(MessageQueue messageQueue) {
+ return queueFlatFileMap.computeIfAbsent(messageQueue, mq -> {
+ try {
+- logger.info("TieredFlatFileManager#getOrCreateFlatFileIfAbsent: " +
++ logger.debug("TieredFlatFileManager#getOrCreateFlatFileIfAbsent: " +
+ "try to create new flat file: topic: {}, queueId: {}",
+ messageQueue.getTopic(), messageQueue.getQueueId());
+ return new CompositeQueueFlatFile(tieredFileAllocator, mq);
+diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsConstant.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsConstant.java
+index ad7281510..cb4674ea9 100644
+--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsConstant.java
++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsConstant.java
+@@ -38,6 +38,7 @@ public class TieredStoreMetricsConstant {
+ public static final String LABEL_OPERATION = "operation";
+ public static final String LABEL_SUCCESS = "success";
+
++ public static final String LABEL_PATH = "path";
+ public static final String LABEL_TOPIC = "topic";
+ public static final String LABEL_GROUP = "group";
+ public static final String LABEL_QUEUE_ID = "queue_id";
+diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
+index 8c0d1cbcd..52be90b1d 100644
+--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
+@@ -41,8 +41,8 @@ import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+
+ import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_FILE_TYPE;
+ import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_OPERATION;
++import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_PATH;
+ import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_SUCCESS;
+-import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_TOPIC;
+
+ /**
+ * this class is experimental and may change without notice.
+@@ -55,6 +55,7 @@ public class PosixFileSegment extends TieredFileSegment {
+ private static final String OPERATION_POSIX_READ = "read";
+ private static final String OPERATION_POSIX_WRITE = "write";
+
++ private final String fullPath;
+ private volatile File file;
+ private volatile FileChannel readFileChannel;
+ private volatile FileChannel writeFileChannel;
+@@ -71,7 +72,7 @@ public class PosixFileSegment extends TieredFileSegment {
+ // fullPath: basePath/hash_cluster/broker/topic/queueId/fileType/baseOffset
+ String brokerClusterName = storeConfig.getBrokerClusterName();
+ String clusterBasePath = TieredStoreUtil.getHash(brokerClusterName) + UNDERLINE + brokerClusterName;
+- String fullPath = Paths.get(basePath, clusterBasePath, filePath,
++ this.fullPath = Paths.get(basePath, clusterBasePath, filePath,
+ fileType.toString(), TieredStoreUtil.offset2FileName(baseOffset)).toString();
+ logger.info("Constructing Posix FileSegment, filePath: {}", fullPath);
+
+@@ -80,13 +81,13 @@ public class PosixFileSegment extends TieredFileSegment {
+
+ protected AttributesBuilder newAttributesBuilder() {
+ return TieredStoreMetricsManager.newAttributesBuilder()
+- .put(LABEL_TOPIC, filePath)
++ .put(LABEL_PATH, filePath)
+ .put(LABEL_FILE_TYPE, fileType.name().toLowerCase());
+ }
+
+ @Override
+ public String getPath() {
+- return filePath;
++ return fullPath;
+ }
+
+ @Override
+@@ -107,7 +108,7 @@ public class PosixFileSegment extends TieredFileSegment {
+ if (file == null) {
+ synchronized (this) {
+ if (file == null) {
+- File file = new File(filePath);
++ File file = new File(fullPath);
+ try {
+ File dir = file.getParentFile();
+ if (!dir.exists()) {
+@@ -136,8 +137,9 @@ public class PosixFileSegment extends TieredFileSegment {
+ if (writeFileChannel != null && writeFileChannel.isOpen()) {
+ writeFileChannel.close();
+ }
++ logger.info("Destroy Posix FileSegment, filePath: {}", fullPath);
+ } catch (IOException e) {
+- logger.error("PosixFileSegment#destroyFile: destroy file {} failed: ", filePath, e);
++ logger.error("Destroy Posix FileSegment failed, filePath: {}", fullPath, e);
+ }
+
+ if (file.exists()) {
+@@ -181,8 +183,9 @@ public class PosixFileSegment extends TieredFileSegment {
+ }
+
+ @Override
+- public CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream inputStream, long position, int length,
+- boolean append) {
++ public CompletableFuture<Boolean> commit0(
++ TieredFileSegmentInputStream inputStream, long position, int length, boolean append) {
++
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ AttributesBuilder attributesBuilder = newAttributesBuilder()
+ .put(LABEL_OPERATION, OPERATION_POSIX_WRITE);
+diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java
+new file mode 100644
+index 000000000..6693d3cb7
+--- /dev/null
++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java
+@@ -0,0 +1,108 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.rocketmq.tieredstore.file;
++
++import java.io.File;
++import java.io.IOException;
++import java.nio.ByteBuffer;
++import java.util.List;
++import org.apache.rocketmq.common.message.MessageQueue;
++import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
++import org.apache.rocketmq.tieredstore.common.AppendResult;
++import org.apache.rocketmq.tieredstore.common.FileSegmentType;
++import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
++import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
++import org.apache.rocketmq.tieredstore.metadata.FileSegmentMetadata;
++import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
++import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
++import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
++import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest;
++import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
++import org.junit.After;
++import org.junit.Assert;
++import org.junit.Before;
++import org.junit.Test;
++
++public class TieredCommitLogTest {
++
++ private final String storePath = TieredStoreTestUtil.getRandomStorePath();
++ private MessageQueue mq;
++ private TieredFileAllocator fileAllocator;
++ private TieredMetadataStore metadataStore;
++
++ @Before
++ public void setUp() throws ClassNotFoundException, NoSuchMethodException {
++ TieredMessageStoreConfig storeConfig = new TieredMessageStoreConfig();
++ storeConfig.setBrokerName("brokerName");
++ storeConfig.setStorePathRootDir(storePath);
++ storeConfig.setTieredStoreFilePath(storePath + File.separator);
++ storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment");
++ storeConfig.setCommitLogRollingInterval(0);
++ storeConfig.setTieredStoreCommitLogMaxSize(1000);
++
++ metadataStore = TieredStoreUtil.getMetadataStore(storeConfig);
++ fileAllocator = new TieredFileAllocator(storeConfig);
++ mq = new MessageQueue("CommitLogTest", storeConfig.getBrokerName(), 0);
++ TieredStoreExecutor.init();
++ }
++
++ @After
++ public void tearDown() throws IOException {
++ TieredStoreTestUtil.destroyCompositeFlatFileManager();
++ TieredStoreTestUtil.destroyMetadataStore();
++ TieredStoreTestUtil.destroyTempDir(storePath);
++ TieredStoreExecutor.shutdown();
++ }
++
++ @Test
++ public void correctMinOffsetTest() {
++ String filePath = TieredStoreUtil.toPath(mq);
++ TieredCommitLog tieredCommitLog = new TieredCommitLog(fileAllocator, filePath);
++ Assert.assertEquals(0L, tieredCommitLog.getMinOffset());
++ Assert.assertEquals(0L, tieredCommitLog.getCommitOffset());
++ Assert.assertEquals(0L, tieredCommitLog.getDispatchCommitOffset());
++
++ // append some messages
++ for (int i = 6; i < 50; i++) {
++ ByteBuffer byteBuffer = MessageBufferUtilTest.buildMockedMessageBuffer();
++ byteBuffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, i);
++ Assert.assertEquals(AppendResult.SUCCESS, tieredCommitLog.append(byteBuffer));
++ }
++
++ tieredCommitLog.commit(true);
++ tieredCommitLog.correctMinOffset();
++
++ // single file store: 1000 / 122 = 8, file count: 44 / 8 = 5
++ Assert.assertEquals(6, tieredCommitLog.getFlatFile().getFileSegmentCount());
++
++ metadataStore.iterateFileSegment(filePath, FileSegmentType.COMMIT_LOG, metadata -> {
++ if (metadata.getBaseOffset() < 1000) {
++ metadata.setStatus(FileSegmentMetadata.STATUS_DELETED);
++ metadataStore.updateFileSegment(metadata);
++ }
++ });
++
++ // manually delete file
++ List<TieredFileSegment> segmentList = tieredCommitLog.getFlatFile().getFileSegmentList();
++ segmentList.remove(0).destroyFile();
++ segmentList.remove(0).destroyFile();
++
++ tieredCommitLog.correctMinOffset();
++ Assert.assertEquals(4, tieredCommitLog.getFlatFile().getFileSegmentCount());
++ Assert.assertEquals(6 + 8 + 8, tieredCommitLog.getMinConsumeQueueOffset());
++ }
++}
+\ No newline at end of file
+diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java
+index a1dde0451..26b38b970 100644
+--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java
++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java
+@@ -21,6 +21,7 @@ import java.io.IOException;
+ import org.apache.rocketmq.tieredstore.TieredMessageFetcher;
+ import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
+ import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
++import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
+ import org.junit.After;
+ import org.junit.Test;
+
+@@ -30,9 +31,9 @@ public class TieredStoreMetricsManagerTest {
+ public void tearDown() throws IOException {
+ TieredStoreTestUtil.destroyCompositeFlatFileManager();
+ TieredStoreTestUtil.destroyMetadataStore();
++ TieredStoreExecutor.shutdown();
+ }
+
+-
+ @Test
+ public void getMetricsView() {
+ TieredStoreMetricsManager.getMetricsView();
+@@ -40,6 +41,7 @@ public class TieredStoreMetricsManagerTest {
+
+ @Test
+ public void init() {
++ TieredStoreExecutor.init();
+ TieredMessageStoreConfig storeConfig = new TieredMessageStoreConfig();
+ storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.memory.MemoryFileSegment");
+ TieredStoreMetricsManager.init(OpenTelemetrySdk.builder().build().getMeter(""),
+diff --git a/tieredstore/src/test/resources/rmq.logback-test.xml b/tieredstore/src/test/resources/rmq.logback-test.xml
+index b70b42046..a7933b5ef 100644
+--- a/tieredstore/src/test/resources/rmq.logback-test.xml
++++ b/tieredstore/src/test/resources/rmq.logback-test.xml
+@@ -23,7 +23,7 @@
+ </encoder>
+ </appender>
+
+- <root level="debug">
++ <root level="info">
+ <appender-ref ref="STDOUT" />
+ </root>
+ </configuration>
+--
+2.32.0.windows.2
+
+
+From ebad3c8a6b41915edb3db65fca593123b296042d Mon Sep 17 00:00:00 2001
+From: gaoyf <gaoyf@users.noreply.github.com>
+Date: Thu, 20 Jul 2023 10:59:40 +0800
+Subject: [PATCH 5/7] [ISSUE #7047] NettyRemotingClient#invokeOneway throw
+ Exception with address
+
+---
+ .../rocketmq/remoting/netty/NettyRemotingClient.java | 2 +-
+ .../remoting/netty/NettyRemotingClientTest.java | 11 +++++++++++
+ 2 files changed, 12 insertions(+), 1 deletion(-)
+
+diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+index afd779c83..9715b918a 100644
+--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+@@ -756,7 +756,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
+ }
+ } else {
+ this.closeChannel(addr, channel);
+- throw new RemotingConnectException(channelRemoteAddr);
++ throw new RemotingConnectException(addr);
+ }
+ }
+
+diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
+index efa3eb3d5..8fabbb21d 100644
+--- a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
++++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
+@@ -20,6 +20,7 @@ import java.util.concurrent.CompletableFuture;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;
+ import org.apache.rocketmq.remoting.InvokeCallback;
++import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+ import org.apache.rocketmq.remoting.exception.RemotingException;
+ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+@@ -123,4 +124,14 @@ public class NettyRemotingClientTest {
+ Throwable thrown = catchThrowable(future::get);
+ assertThat(thrown.getCause()).isInstanceOf(RemotingException.class);
+ }
++
++ @Test
++ public void testInvokeOnewayException() throws Exception {
++ String addr = "0.0.0.0";
++ try {
++ remotingClient.invokeOneway(addr, null, 1000);
++ } catch (RemotingConnectException e) {
++ assertThat(e.getMessage()).contains(addr);
++ }
++ }
+ }
+--
+2.32.0.windows.2
+
+
+From 804f2d85f22d9ee52573b9c6ee6abae248c9b387 Mon Sep 17 00:00:00 2001
+From: wenbin yao <ywb992134@163.com>
+Date: Thu, 20 Jul 2023 11:01:38 +0800
+Subject: [PATCH 6/7] [ISSUE ##7036] rename method: getWriteQueueIdByBroker to
+ getWriteQueueNumsByBroker(#7037)
+
+* [ISSUE ##7036] rename method: getWriteQueueIdByBroker to getWriteQueueNumsByBroker
+
+* [ISSUE #7036] rename method from getWriteQueueIdByBroker to getWriteQueueNumsByBroker
+---
+ .../apache/rocketmq/client/impl/producer/TopicPublishInfo.java | 2 +-
+ .../org/apache/rocketmq/client/latency/MQFaultStrategy.java | 2 +-
+ 2 files changed, 2 insertions(+), 2 deletions(-)
+
+diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
+index a5f840500..275ada7ac 100644
+--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
++++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
+@@ -89,7 +89,7 @@ public class TopicPublishInfo {
+ return this.messageQueueList.get(pos);
+ }
+
+- public int getWriteQueueIdByBroker(final String brokerName) {
++ public int getWriteQueueNumsByBroker(final String brokerName) {
+ for (int i = 0; i < topicRouteData.getQueueDatas().size(); i++) {
+ final QueueData queueData = this.topicRouteData.getQueueDatas().get(i);
+ if (queueData.getBrokerName().equals(brokerName)) {
+diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
+index e86238e55..1e1953fad 100644
+--- a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
++++ b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
+@@ -69,7 +69,7 @@ public class MQFaultStrategy {
+ }
+
+ final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
+- int writeQueueNums = tpInfo.getWriteQueueIdByBroker(notBestBroker);
++ int writeQueueNums = tpInfo.getWriteQueueNumsByBroker(notBestBroker);
+ if (writeQueueNums > 0) {
+ final MessageQueue mq = tpInfo.selectOneMessageQueue();
+ if (notBestBroker != null) {
+--
+2.32.0.windows.2
+
+
+From af993d28e20922d91862f0911e59f748dcb64e6a Mon Sep 17 00:00:00 2001
+From: guyinyou <36399867+guyinyou@users.noreply.github.com>
+Date: Fri, 21 Jul 2023 09:31:56 +0800
+Subject: [PATCH 7/7] [ISSUE #3717][RIP-27] Auto batching in producer
+
+Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
+---
+ .../rocketmq/client/impl/MQClientManager.java | 21 +-
+ .../impl/producer/DefaultMQProducerImpl.java | 36 ++
+ .../client/producer/DefaultMQProducer.java | 501 +++++++++++------
+ .../rocketmq/client/producer/MQProducer.java | 24 +-
+ .../client/producer/ProduceAccumulator.java | 510 ++++++++++++++++++
+ .../producer/DefaultMQProducerTest.java | 38 +-
+ .../producer/ProduceAccumulatorTest.java | 176 ++++++
+ .../rocketmq/common/message/MessageBatch.java | 2 +-
+ 8 files changed, 1133 insertions(+), 175 deletions(-)
+ create mode 100644 client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java
+ create mode 100644 client/src/test/java/org/apache/rocketmq/client/producer/ProduceAccumulatorTest.java
+
+diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
+index 49186633f..02eaa66e9 100644
+--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
++++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
+@@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.atomic.AtomicInteger;
+ import org.apache.rocketmq.client.ClientConfig;
+ import org.apache.rocketmq.client.impl.factory.MQClientInstance;
++import org.apache.rocketmq.client.producer.ProduceAccumulator;
+ import org.apache.rocketmq.remoting.RPCHook;
+ import org.apache.rocketmq.logging.org.slf4j.Logger;
+ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+@@ -31,6 +32,9 @@ public class MQClientManager {
+ private AtomicInteger factoryIndexGenerator = new AtomicInteger();
+ private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =
+ new ConcurrentHashMap<>();
++ private ConcurrentMap<String/* clientId */, ProduceAccumulator> accumulatorTable =
++ new ConcurrentHashMap<String, ProduceAccumulator>();
++
+
+ private MQClientManager() {
+
+@@ -43,7 +47,6 @@ public class MQClientManager {
+ public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig) {
+ return getOrCreateMQClientInstance(clientConfig, null);
+ }
+-
+ public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
+ String clientId = clientConfig.buildMQClientId();
+ MQClientInstance instance = this.factoryTable.get(clientId);
+@@ -62,6 +65,22 @@ public class MQClientManager {
+
+ return instance;
+ }
++ public ProduceAccumulator getOrCreateProduceAccumulator(final ClientConfig clientConfig) {
++ String clientId = clientConfig.buildMQClientId();
++ ProduceAccumulator accumulator = this.accumulatorTable.get(clientId);
++ if (null == accumulator) {
++ accumulator = new ProduceAccumulator(clientId);
++ ProduceAccumulator prev = this.accumulatorTable.putIfAbsent(clientId, accumulator);
++ if (prev != null) {
++ accumulator = prev;
++ log.warn("Returned Previous ProduceAccumulator for clientId:[{}]", clientId);
++ } else {
++ log.info("Created new ProduceAccumulator for clientId:[{}]", clientId);
++ }
++ }
++
++ return accumulator;
++ }
+
+ public void removeClientFactory(final String clientId) {
+ this.factoryTable.remove(clientId);
+diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+index 4eb0e6924..3f4c6e5f7 100644
+--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
++++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+@@ -573,6 +573,42 @@ public class DefaultMQProducerImpl implements MQProducerInner {
+
+ }
+
++ public MessageQueue invokeMessageQueueSelector(Message msg, MessageQueueSelector selector, Object arg,
++ final long timeout) throws MQClientException, RemotingTooMuchRequestException {
++ long beginStartTime = System.currentTimeMillis();
++ this.makeSureStateOK();
++ Validators.checkMessage(msg, this.defaultMQProducer);
++
++ TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
++ if (topicPublishInfo != null && topicPublishInfo.ok()) {
++ MessageQueue mq = null;
++ try {
++ List<MessageQueue> messageQueueList =
++ mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
++ Message userMessage = MessageAccessor.cloneMessage(msg);
++ String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
++ userMessage.setTopic(userTopic);
++
++ mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
++ } catch (Throwable e) {
++ throw new MQClientException("select message queue threw exception.", e);
++ }
++
++ long costTime = System.currentTimeMillis() - beginStartTime;
++ if (timeout < costTime) {
++ throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
++ }
++ if (mq != null) {
++ return mq;
++ } else {
++ throw new MQClientException("select message queue return null.", null);
++ }
++ }
++
++ validateNameServerSetting();
++ throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
++ }
++
+ public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
+ return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
+ }
+diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+index 6e9ffed8c..c5b1b5223 100644
+--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
++++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+@@ -29,6 +29,7 @@ import org.apache.rocketmq.client.Validators;
+ import org.apache.rocketmq.client.exception.MQBrokerException;
+ import org.apache.rocketmq.client.exception.MQClientException;
+ import org.apache.rocketmq.client.exception.RequestTimeoutException;
++import org.apache.rocketmq.client.impl.MQClientManager;
+ import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+ import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
+ import org.apache.rocketmq.client.trace.TraceDispatcher;
+@@ -38,6 +39,7 @@ import org.apache.rocketmq.common.MixAll;
+ import org.apache.rocketmq.common.message.Message;
+ import org.apache.rocketmq.common.message.MessageBatch;
+ import org.apache.rocketmq.common.message.MessageClientIDSetter;
++import org.apache.rocketmq.common.message.MessageConst;
+ import org.apache.rocketmq.common.message.MessageExt;
+ import org.apache.rocketmq.common.message.MessageQueue;
+ import org.apache.rocketmq.common.topic.TopicValidator;
+@@ -49,10 +51,10 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+
+ /**
+ * This class is the entry point for applications intending to send messages. </p>
+- *
++ * <p>
+ * It's fine to tune fields which exposes getter/setter methods, but keep in mind, all of them should work well out of
+ * box for most scenarios. </p>
+- *
++ * <p>
+ * This class aggregates various <code>send</code> methods to deliver messages to broker(s). Each of them has pros and
+ * cons; you'd better understand strengths and weakness of them before actually coding. </p>
+ *
+@@ -78,9 +80,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+ /**
+ * Producer group conceptually aggregates all producer instances of exactly same role, which is particularly
+ * important when transactional messages are involved. </p>
+- *
++ * <p>
+ * For non-transactional messages, it does not matter as long as it's unique per process. </p>
+- *
++ * <p>
+ * See <a href="https://rocketmq.apache.org/docs/introduction/02concepts">core concepts</a> for more discussion.
+ */
+ private String producerGroup;
+@@ -107,14 +109,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+
+ /**
+ * Maximum number of retry to perform internally before claiming sending failure in synchronous mode. </p>
+- *
++ * <p>
+ * This may potentially cause message duplication which is up to application developers to resolve.
+ */
+ private int retryTimesWhenSendFailed = 2;
+
+ /**
+ * Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>
+- *
++ * <p>
+ * This may potentially cause message duplication which is up to application developers to resolve.
+ */
+ private int retryTimesWhenSendAsyncFailed = 2;
+@@ -134,6 +136,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+ */
+ private TraceDispatcher traceDispatcher = null;
+
++ /**
++ * Switch flag instance for automatic batch message
++ */
++ private boolean autoBatch = false;
++ /**
++ * Instance for batching message automatically
++ */
++ private ProduceAccumulator produceAccumulator = null;
++
+ /**
+ * Indicate whether to block message when asynchronous sending traffic is too heavy.
+ */
+@@ -179,11 +190,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+ /**
+ * Constructor specifying producer group.
+ *
+- * @param producerGroup Producer group, see the name-sake field.
+- * @param rpcHook RPC hook to execute per each remoting command execution.
+- * @param enableMsgTrace Switch flag instance for message trace.
++ * @param producerGroup Producer group, see the name-sake field.
++ * @param rpcHook RPC hook to execute per each remoting command execution.
++ * @param enableMsgTrace Switch flag instance for message trace.
+ * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
+- * trace topic name.
++ * trace topic name.
+ */
+ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,
+ final String customizedTraceTopic) {
+@@ -193,7 +204,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+ /**
+ * Constructor specifying producer group.
+ *
+- * @param namespace Namespace for this MQ Producer instance.
++ * @param namespace Namespace for this MQ Producer instance.
+ * @param producerGroup Producer group, see the name-sake field.
+ */
+ public DefaultMQProducer(final String namespace, final String producerGroup) {
+@@ -204,7 +215,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+ * Constructor specifying both producer group and RPC hook.
+ *
+ * @param producerGroup Producer group, see the name-sake field.
+- * @param rpcHook RPC hook to execute per each remoting command execution.
++ * @param rpcHook RPC hook to execute per each remoting command execution.
+ */
+ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
+ this(null, producerGroup, rpcHook);
+@@ -213,20 +224,21 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+ /**
+ * Constructor specifying namespace, producer group and RPC hook.
+ *
+- * @param namespace Namespace for this MQ Producer instance.
++ * @param namespace Namespace for this MQ Producer instance.
+ * @param producerGroup Producer group, see the name-sake field.
+- * @param rpcHook RPC hook to execute per each remoting command execution.
++ * @param rpcHook RPC hook to execute per each remoting command execution.
+ */
+ public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
+ this.namespace = namespace;
+ this.producerGroup = producerGroup;
+ defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
++ produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
+ }
+
+ /**
+ * Constructor specifying producer group and enabled msg trace flag.
+ *
+- * @param producerGroup Producer group, see the name-sake field.
++ * @param producerGroup Producer group, see the name-sake field.
+ * @param enableMsgTrace Switch flag instance for message trace.
+ */
+ public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace) {
+@@ -236,10 +248,10 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+ /**
+ * Constructor specifying producer group, enabled msgTrace flag and customized trace topic name.
+ *
+- * @param producerGroup Producer group, see the name-sake field.
+- * @param enableMsgTrace Switch flag instance for message trace.
++ * @param producerGroup Producer group, see the name-sake field.
++ * @param enableMsgTrace Switch flag instance for message trace.
+ * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
+- * trace topic name.
++ * trace topic name.
+ */
+ public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic) {
+ this(null, producerGroup, null, enableMsgTrace, customizedTraceTopic);
+@@ -249,18 +261,19 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+ * Constructor specifying namespace, producer group, RPC hook, enabled msgTrace flag and customized trace topic
+ * name.
+ *
+- * @param namespace Namespace for this MQ Producer instance.
+- * @param producerGroup Producer group, see the name-sake field.
+- * @param rpcHook RPC hook to execute per each remoting command execution.
+- * @param enableMsgTrace Switch flag instance for message trace.
++ * @param namespace Namespace for this MQ Producer instance.
++ * @param producerGroup Producer group, see the name-sake field.
++ * @param rpcHook RPC hook to execute per each remoting command execution.
++ * @param enableMsgTrace Switch flag instance for message trace.
+ * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
+- * trace topic name.
++ * trace topic name.
+ */
+ public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
+ boolean enableMsgTrace, final String customizedTraceTopic) {
+ this.namespace = namespace;
+ this.producerGroup = producerGroup;
+ defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
++ produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
+ //if client open the message trace feature
+ if (enableMsgTrace) {
+ try {
+@@ -297,6 +310,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+ public void start() throws MQClientException {
+ this.setProducerGroup(withNamespace(this.producerGroup));
+ this.defaultMQProducerImpl.start();
++ if (this.produceAccumulator != null) {
++ this.produceAccumulator.start();
++ }
+ if (null != traceDispatcher) {
+ try {
+ traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
+@@ -312,6 +328,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+ @Override
+ public void shutdown() {
+ this.defaultMQProducerImpl.shutdown();
++ if (this.produceAccumulator != null) {
++ this.produceAccumulator.shutdown();
++ }
+ if (null != traceDispatcher) {
+ traceDispatcher.shutdown();
+ }
+@@ -329,6 +348,26 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+ return this.defaultMQProducerImpl.fetchPublishMessageQueues(withNamespace(topic));
+ }
+
++ private boolean canBatch(Message msg) {
++ // produceAccumulator is full
++ if (!produceAccumulator.tryAddMessage(msg)) {
++ return false;
++ }
++ // delay message do not support batch processing
++ if (msg.getDelayTimeLevel() > 0) {
++ return false;
++ }
++ // retry message do not support batch processing
++ if (msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
++ return false;
++ }
++ // message which have been assigned to producer group do not support batch processing
++ if (msg.getProperties().containsKey(MessageConst.PROPERTY_PRODUCER_GROUP)) {
++ return false;
++ }
++ return true;
++ }
++
+ /**
+ * Send message in synchronous mode. This method returns only when the sending procedure totally completes. </p>
+ *
+@@ -339,28 +378,32 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+ * @param msg Message to send.
+ * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
+ * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
+- * @throws MQClientException if there is any client error.
+- * @throws RemotingException if there is any network-tier error.
+- * @throws MQBrokerException if there is any error with broker.
++ * @throws MQClientException if there is any client error.
++ * @throws RemotingException if there is any network-tier error.
++ * @throws MQBrokerException if there is any error with broker.
+ * @throws InterruptedException if the sending thread is interrupted.
+ */
+ @Override
+ public SendResult send(
+ Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ msg.setTopic(withNamespace(msg.getTopic()));
+- return this.defaultMQProducerImpl.send(msg);
++ if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
++ return sendByAccumulator(msg, null, null);
++ } else {
++ return sendDirect(msg, null, null);
++ }
+ }
+
+ /**
+ * Same to {@link #send(Message)} with send timeout specified in addition.
+ *
+- * @param msg Message to send.
++ * @param msg Message to send.
+ * @param timeout send timeout.
+ * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
+ * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
+- * @throws MQClientException if there is any client error.
+- * @throws RemotingException if there is any network-tier error.
+- * @throws MQBrokerException if there is any error with broker.
++ * @throws MQClientException if there is any client error.
++ * @throws RemotingException if there is any network-tier error.
++ * @throws MQBrokerException if there is any error with broker.
+ * @throws InterruptedException if the sending thread is interrupted.
+ */
+ @Override
+@@ -372,34 +415,42 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+
+ /**
+ * Send message to broker asynchronously. </p>
+- *
++ * <p>
+ * This method returns immediately. On sending completion, <code>sendCallback</code> will be executed. </p>
+- *
++ * <p>
+ * Similar to {@link #send(Message)}, internal implementation would potentially retry up to {@link
+ * #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication and
+ * application developers are the one to resolve this potential issue.
+ *
+- * @param msg Message to send.
++ * @param msg Message to send.
+ * @param sendCallback Callback to execute on sending completed, either successful or unsuccessful.
+- * @throws MQClientException if there is any client error.
+- * @throws RemotingException if there is any network-tier error.
++ * @throws MQClientException if there is any client error.
++ * @throws RemotingException if there is any network-tier error.
+ * @throws InterruptedException if the sending thread is interrupted.
+ */
+ @Override
+ public void send(Message msg,
+ SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
+ msg.setTopic(withNamespace(msg.getTopic()));
+- this.defaultMQProducerImpl.send(msg, sendCallback);
++ try {
++ if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
++ sendByAccumulator(msg, null, sendCallback);
++ } else {
++ sendDirect(msg, null, sendCallback);
++ }
++ } catch (Throwable e) {
++ sendCallback.onException(e);
++ }
+ }
+
+ /**
+ * Same to {@link #send(Message, SendCallback)} with send timeout specified in addition.
+ *
+- * @param msg message to send.
++ * @param msg message to send.
+ * @param sendCallback Callback to execute.
+- * @param timeout send timeout.
+- * @throws MQClientException if there is any client error.
+- * @throws RemotingException if there is any network-tier error.
++ * @param timeout send timeout.
++ * @throws MQClientException if there is any client error.
++ * @throws RemotingException if there is any network-tier error.
+ * @throws InterruptedException if the sending thread is interrupted.
+ */
+ @Override
+@@ -414,8 +465,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+ * acknowledgement from broker before return. Obviously, it has maximums throughput yet potentials of message loss.
+ *
+ * @param msg Message to send.
+- * @throws MQClientException if there is any client error.
+- * @throws RemotingException if there is any network-tier error.
++ * @throws MQClientException if there is any client error.
++ * @throws RemotingException if there is any network-tier error.
+ * @throws InterruptedException if the sending thread is interrupted.
+ */
+ @Override
+@@ -428,32 +479,37 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+ * Same to {@link #send(Message)} with target message queue specified in addition.
+ *
+ * @param msg Message to send.
+- * @param mq Target message queue.
++ * @param mq Target message queue.
+ * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
+ * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
+- * @throws MQClientException if there is any client error.
+- * @throws RemotingException if there is any network-tier error.
+- * @throws MQBrokerException if there is any error with broker.
++ * @throws MQClientException if there is any client error.
++ * @throws RemotingException if there is any network-tier error.
++ * @throws MQBrokerException if there is any error with broker.
+ * @throws InterruptedException if the sending thread is interrupted.
+ */
+ @Override
+ public SendResult send(Message msg, MessageQueue mq)
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ msg.setTopic(withNamespace(msg.getTopic()));
+- return this.defaultMQProducerImpl.send(msg, queueWithNamespace(mq));
++ mq = queueWithNamespace(mq);
++ if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
++ return sendByAccumulator(msg, mq, null);
++ } else {
++ return sendDirect(msg, mq, null);
++ }
+ }
+
+ /**
+ * Same to {@link #send(Message)} with target message queue and send timeout specified.
+ *
+- * @param msg Message to send.
+- * @param mq Target message queue.
++ * @param msg Message to send.
++ * @param mq Target message queue.
+ * @param timeout send timeout.
+ * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
+ * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
+- * @throws MQClientException if there is any client error.
+- * @throws RemotingException if there is any network-tier error.
+- * @throws MQBrokerException if there is any error with broker.
++ * @throws MQClientException if there is any client error.
++ * @throws RemotingException if there is any network-tier error.
++ * @throws MQBrokerException if there is any error with broker.
+ * @throws InterruptedException if the sending thread is interrupted.
+ */
+ @Override
+@@ -466,29 +522,38 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+ /**
+ * Same to {@link #send(Message, SendCallback)} with target message queue specified.
+ *
+- * @param msg Message to send.
+- * @param mq Target message queue.
++ * @param msg Message to send.
++ * @param mq Target message queue.
+ * @param sendCallback Callback to execute on sending completed, either successful or unsuccessful.
+- * @throws MQClientException if there is any client error.
+- * @throws RemotingException if there is any network-tier error.
++ * @throws MQClientException if there is any client error.
++ * @throws RemotingException if there is any network-tier error.
+ * @throws InterruptedException if the sending thread is interrupted.
+ */
+ @Override
+ public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
+ throws MQClientException, RemotingException, InterruptedException {
+ msg.setTopic(withNamespace(msg.getTopic()));
+- this.defaultMQProducerImpl.send(msg, queueWithNamespace(mq), sendCallback);
++ mq = queueWithNamespace(mq);
++ try {
++ if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
++ sendByAccumulator(msg, mq, sendCallback);
++ } else {
++ sendDirect(msg, mq, sendCallback);
++ }
++ } catch (MQBrokerException e) {
++ // ignore
++ }
+ }
+
+ /**
+ * Same to {@link #send(Message, SendCallback)} with target message queue and send timeout specified.
+ *
+- * @param msg Message to send.
+- * @param mq Target message queue.
++ * @param msg Message to send.
++ * @param mq Target message queue.
+ * @param sendCallback Callback to execute on sending completed, either successful or unsuccessful.
+- * @param timeout Send timeout.
+- * @throws MQClientException if there is any client error.
+- * @throws RemotingException if there is any network-tier error.
++ * @param timeout Send timeout.
++ * @throws MQClientException if there is any client error.
++ * @throws RemotingException if there is any network-tier error.
+ * @throws InterruptedException if the sending thread is interrupted.
+ */
+ @Override
+@@ -502,9 +567,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+ * Same to {@link #sendOneway(Message)} with target message queue specified.
+ *
+ * @param msg Message to send.
+- * @param mq Target message queue.
+- * @throws MQClientException if there is any client error.
+- * @throws RemotingException if there is any network-tier error.
++ * @param mq Target message queue.
++ * @throws MQClientException if there is any client error.
++ * @throws RemotingException if there is any network-tier error.
+ * @throws InterruptedException if the sending thread is interrupted.
+ */
+ @Override
+@@ -517,35 +582,41 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+ /**
+ * Same to {@link #send(Message)} with message queue selector specified.
+ *
+- * @param msg Message to send.
++ * @param msg Message to send.
+ * @param selector Message queue selector, through which we get target message queue to deliver message to.
+- * @param arg Argument to work along with message queue selector.
++ * @param arg Argument to work along with message queue selector.
+ * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
+ * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
+- * @throws MQClientException if there is any client error.
+- * @throws RemotingException if there is any network-tier error.
+- * @throws MQBrokerException if there is any error with broker.
++ * @throws MQClientException if there is any client error.
++ * @throws RemotingException if there is any network-tier error.
++ * @throws MQBrokerException if there is any error with broker.
+ * @throws InterruptedException if the sending thread is interrupted.
+ */
+ @Override
+ public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ msg.setTopic(withNamespace(msg.getTopic()));
+- return this.defaultMQProducerImpl.send(msg, selector, arg);
++ MessageQueue mq = this.defaultMQProducerImpl.invokeMessageQueueSelector(msg, selector, arg, this.getSendMsgTimeout());
++ mq = queueWithNamespace(mq);
++ if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
++ return sendByAccumulator(msg, mq, null);
++ } else {
++ return sendDirect(msg, mq, null);
++ }
+ }
+
+ /**
+ * Same to {@link #send(Message, MessageQueueSelector, Object)} with send timeout specified.
+ *
+- * @param msg Message to send.
++ * @param msg Message to send.
+ * @param selector Message queue selector, through which we get target message queue to deliver message to.
+- * @param arg Argument to work along with message queue selector.
+- * @param timeout Send timeout.
++ * @param arg Argument to work along with message queue selector.
++ * @param timeout Send timeout.
+ * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
+ * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
+- * @throws MQClientException if there is any client error.
+- * @throws RemotingException if there is any network-tier error.
+- * @throws MQBrokerException if there is any error with broker.
++ * @throws MQClientException if there is any client error.
++ * @throws RemotingException if there is any network-tier error.
++ * @throws MQBrokerException if there is any error with broker.
+ * @throws InterruptedException if the sending thread is interrupted.
+ */
+ @Override
+@@ -558,31 +629,41 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+ /**
+ * Same to {@link #send(Message, SendCallback)} with message queue selector specified.
+ *
+- * @param msg Message to send.
+- * @param selector Message selector through which to get target message queue.
+- * @param arg Argument used along with message queue selector.
++ * @param msg Message to send.
++ * @param selector Message selector through which to get target message queue.
++ * @param arg Argument used along with message queue selector.
+ * @param sendCallback callback to execute on sending completion.
+- * @throws MQClientException if there is any client error.
+- * @throws RemotingException if there is any network-tier error.
++ * @throws MQClientException if there is any client error.
++ * @throws RemotingException if there is any network-tier error.
+ * @throws InterruptedException if the sending thread is interrupted.
+ */
+ @Override
+ public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
+ throws MQClientException, RemotingException, InterruptedException {
+ msg.setTopic(withNamespace(msg.getTopic()));
+- this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback);
++ try {
++ MessageQueue mq = this.defaultMQProducerImpl.invokeMessageQueueSelector(msg, selector, arg, this.getSendMsgTimeout());
++ mq = queueWithNamespace(mq);
++ if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
++ sendByAccumulator(msg, mq, sendCallback);
++ } else {
++ sendDirect(msg, mq, sendCallback);
++ }
++ } catch (Throwable e) {
++ sendCallback.onException(e);
++ }
+ }
+
+ /**
+ * Same to {@link #send(Message, MessageQueueSelector, Object, SendCallback)} with timeout specified.
+ *
+- * @param msg Message to send.
+- * @param selector Message selector through which to get target message queue.
+- * @param arg Argument used along with message queue selector.
++ * @param msg Message to send.
++ * @param selector Message selector through which to get target message queue.
++ * @param arg Argument used along with message queue selector.
+ * @param sendCallback callback to execute on sending completion.
+- * @param timeout Send timeout.
+- * @throws MQClientException if there is any client error.
+- * @throws RemotingException if there is any network-tier error.
++ * @param timeout Send timeout.
++ * @throws MQClientException if there is any client error.
++ * @throws RemotingException if there is any network-tier error.
+ * @throws InterruptedException if the sending thread is interrupted.
+ */
+ @Override
+@@ -592,6 +673,42 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+ this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout);
+ }
+
++ public SendResult sendDirect(Message msg, MessageQueue mq,
++ SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
++ // send in sync mode
++ if (sendCallback == null) {
++ if (mq == null) {
++ return this.defaultMQProducerImpl.send(msg);
++ } else {
++ return this.defaultMQProducerImpl.send(msg, mq);
++ }
++ } else {
++ if (mq == null) {
++ this.defaultMQProducerImpl.send(msg, sendCallback);
++ } else {
++ this.defaultMQProducerImpl.send(msg, mq, sendCallback);
++ }
++ return null;
++ }
++ }
++
++ public SendResult sendByAccumulator(Message msg, MessageQueue mq,
++ SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
++ // check whether it can batch
++ if (!canBatch(msg)) {
++ return sendDirect(msg, mq, sendCallback);
++ } else {
++ Validators.checkMessage(msg, this);
++ MessageClientIDSetter.setUniqID(msg);
++ if (sendCallback == null) {
++ return this.produceAccumulator.send(msg, mq, this);
++ } else {
++ this.produceAccumulator.send(msg, mq, sendCallback, this);
++ return null;
++ }
++ }
++ }
++
+ /**
+ * Send request message in synchronous mode. This method returns only when the consumer consume the request message and reply a message. </p>
+ *
+@@ -599,13 +716,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+ * {@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may be potentially
+ * delivered to broker(s). It's up to the application developers to resolve potential duplication issue.
+ *
+- * @param msg request message to send
++ * @param msg request message to send
+ * @param timeout request timeout
+ * @return reply message
+- * @throws MQClientException if there is any client error.
+- * @throws RemotingException if there is any network-tier error.
+- * @throws MQBrokerException if there is any broker error.
+- * @throws InterruptedException if the thread is interrupted.
++ * @throws MQClientException if there is any client error.
++ * @throws RemotingException if there is any network-tier error.
++ * @throws MQBrokerException if there is any broker error.
++ * @throws InterruptedException if the thread is interrupted.
+ * @throws RequestTimeoutException if request timeout.
+ */
+ @Override
+@@ -618,18 +735,18 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+ /**
+ * Request asynchronously. </p>
+ * This method returns immediately. On receiving reply message, <code>requestCallback</code> will be executed. </p>
+- *
++ * <p>
+ * Similar to {@link #request(Message, long)}, internal implementation would potentially retry up to {@link
+ * #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication and
+ * application developers are the one to resolve this potential issue.
+ *
+- * @param msg request message to send
++ * @param msg request message to send
+ * @param requestCallback callback to execute on request completion.
+- * @param timeout request timeout
+- * @throws MQClientException if there is any client error.
+- * @throws RemotingException if there is any network-tier error.
++ * @param timeout request timeout
++ * @throws MQClientException if there is any client error.
++ * @throws RemotingException if there is any network-tier error.
+ * @throws InterruptedException if the thread is interrupted.
+- * @throws MQBrokerException if there is any broker error.
++ * @throws MQBrokerException if there is any broker error.
+ */
+ @Override
+ public void request(final Message msg, final RequestCallback requestCallback, final long timeout)
+@@ -641,15 +758,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+ /**
+ * Same to {@link #request(Message, long)} with message queue selector specified.
+ *
+- * @param msg request message to send
++ * @param msg request message to send
+ * @param selector message queue selector, through which we get target message queue to deliver message to.
+- * @param arg argument to work along with message queue selector.
+- * @param timeout timeout of request.
++ * @param arg argument to work along with message queue selector.
++ * @param timeout timeout of request.
+ * @return reply message
+- * @throws MQClientException if there is any client error.
+- * @throws RemotingException if there is any network-tier error.
+- * @throws MQBrokerException if there is any broker error.
+- * @throws InterruptedException if the thread is interrupted.
++ * @throws MQClientException if there is any client error.
++ * @throws RemotingException if there is any network-tier error.
++ * @throws MQBrokerException if there is any broker error.
++ * @throws InterruptedException if the thread is interrupted.
+ * @throws RequestTimeoutException if request timeout.
+ */
+ @Override
+@@ -663,15 +780,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+ /**
+ * Same to {@link #request(Message, RequestCallback, long)} with target message selector specified.
+ *
+- * @param msg requst message to send
+- * @param selector message queue selector, through which we get target message queue to deliver message to.
+- * @param arg argument to work along with message queue selector.
++ * @param msg requst message to send
++ * @param selector message queue selector, through which we get target message queue to deliver message to.
++ * @param arg argument to work along with message queue selector.
+ * @param requestCallback callback to execute on request completion.
+- * @param timeout timeout of request.
+- * @throws MQClientException if there is any client error.
+- * @throws RemotingException if there is any network-tier error.
++ * @param timeout timeout of request.
++ * @throws MQClientException if there is any client error.
++ * @throws RemotingException if there is any network-tier error.
+ * @throws InterruptedException if the thread is interrupted.
+- * @throws MQBrokerException if there is any broker error.
++ * @throws MQBrokerException if there is any broker error.
+ */
+ @Override
+ public void request(final Message msg, final MessageQueueSelector selector, final Object arg,
+@@ -684,13 +801,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+ /**
+ * Same to {@link #request(Message, long)} with target message queue specified in addition.
+ *
+- * @param msg request message to send
+- * @param mq target message queue.
++ * @param msg request message to send
++ * @param mq target message queue.
+ * @param timeout request timeout
+- * @throws MQClientException if there is any client error.
+- * @throws RemotingException if there is any network-tier error.
+- * @throws MQBrokerException if there is any broker error.
+- * @throws InterruptedException if the thread is interrupted.
++ * @throws MQClientException if there is any client error.
++ * @throws RemotingException if there is any network-tier error.
++ * @throws MQBrokerException if there is any broker error.
++ * @throws InterruptedException if the thread is interrupted.
+ * @throws RequestTimeoutException if request timeout.
+ */
+ @Override
+@@ -703,14 +820,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+ /**
+ * Same to {@link #request(Message, RequestCallback, long)} with target message queue specified.
+ *
+- * @param msg request message to send
+- * @param mq target message queue.
++ * @param msg request message to send
++ * @param mq target message queue.
+ * @param requestCallback callback to execute on request completion.
+- * @param timeout timeout of request.
+- * @throws MQClientException if there is any client error.
+- * @throws RemotingException if there is any network-tier error.
++ * @param timeout timeout of request.
++ * @throws MQClientException if there is any client error.
++ * @throws RemotingException if there is any network-tier error.
+ * @throws InterruptedException if the thread is interrupted.
+- * @throws MQBrokerException if there is any broker error.
++ * @throws MQBrokerException if there is any broker error.
+ */
+ @Override
+ public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
+@@ -722,11 +839,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+ /**
+ * Same to {@link #sendOneway(Message)} with message queue selector specified.
+ *
+- * @param msg Message to send.
++ * @param msg Message to send.
+ * @param selector Message queue selector, through which to determine target message queue to deliver message
+- * @param arg Argument used along with message queue selector.
+- * @throws MQClientException if there is any client error.
+- * @throws RemotingException if there is any network-tier error.
++ * @param arg Argument used along with message queue selector.
++ * @throws MQClientException if there is any client error.
++ * @throws RemotingException if there is any network-tier error.
+ * @throws InterruptedException if the sending thread is interrupted.
+ */
+ @Override
+@@ -739,9 +856,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+ /**
+ * This method is to send transactional messages.
+ *
+- * @param msg Transactional message to send.
++ * @param msg Transactional message to send.
+ * @param tranExecuter local transaction executor.
+- * @param arg Argument used along with local transaction executor.
++ * @param arg Argument used along with local transaction executor.
+ * @return Transaction result.
+ * @throws MQClientException if there is any client error.
+ */
+@@ -769,15 +886,16 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+ /**
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ *
+- * @param key accessKey
+- * @param newTopic topic name
+- * @param queueNum topic's queue number
++ * @param key accessKey
++ * @param newTopic topic name
++ * @param queueNum topic's queue number
+ * @param attributes
+ * @throws MQClientException if there is any client error.
+ */
+ @Deprecated
+ @Override
+- public void createTopic(String key, String newTopic, int queueNum, Map<String, String> attributes) throws MQClientException {
++ public void createTopic(String key, String newTopic, int queueNum,
++ Map<String, String> attributes) throws MQClientException {
+ createTopic(key, withNamespace(newTopic), queueNum, 0, null);
+ }
+
+@@ -785,23 +903,24 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+ * Create a topic on broker. This method will be removed in a certain version after April 5, 2020, so please do not
+ * use this method.
+ *
+- * @param key accessKey
+- * @param newTopic topic name
+- * @param queueNum topic's queue number
++ * @param key accessKey
++ * @param newTopic topic name
++ * @param queueNum topic's queue number
+ * @param topicSysFlag topic system flag
+ * @param attributes
+ * @throws MQClientException if there is any client error.
+ */
+ @Deprecated
+ @Override
+- public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, Map<String, String> attributes) throws MQClientException {
++ public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag,
++ Map<String, String> attributes) throws MQClientException {
+ this.defaultMQProducerImpl.createTopic(key, withNamespace(newTopic), queueNum, topicSysFlag);
+ }
+
+ /**
+ * Search consume queue offset of the given time stamp.
+ *
+- * @param mq Instance of MessageQueue
++ * @param mq Instance of MessageQueue
+ * @param timestamp from when in milliseconds.
+ * @return Consume queue offset.
+ * @throws MQClientException if there is any client error.
+@@ -813,7 +932,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+
+ /**
+ * Query maximum offset of the given message queue.
+- *
++ * <p>
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ *
+ * @param mq Instance of MessageQueue
+@@ -828,7 +947,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+
+ /**
+ * Query minimum offset of the given message queue.
+- *
++ * <p>
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ *
+ * @param mq Instance of MessageQueue
+@@ -843,7 +962,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+
+ /**
+ * Query the earliest message store time.
+- *
++ * <p>
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ *
+ * @param mq Instance of MessageQueue
+@@ -858,14 +977,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+
+ /**
+ * Query message of the given offset message ID.
+- *
++ * <p>
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ *
+ * @param offsetMsgId message id
+ * @return Message specified.
+- * @throws MQBrokerException if there is any broker error.
+- * @throws MQClientException if there is any client error.
+- * @throws RemotingException if there is any network-tier error.
++ * @throws MQBrokerException if there is any broker error.
++ * @throws MQClientException if there is any client error.
++ * @throws RemotingException if there is any network-tier error.
+ * @throws InterruptedException if the sending thread is interrupted.
+ */
+ @Deprecated
+@@ -877,16 +996,16 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+
+ /**
+ * Query message by key.
+- *
++ * <p>
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ *
+- * @param topic message topic
+- * @param key message key index word
++ * @param topic message topic
++ * @param key message key index word
+ * @param maxNum max message number
+- * @param begin from when
+- * @param end to when
++ * @param begin from when
++ * @param end to when
+ * @return QueryResult instance contains matched messages.
+- * @throws MQClientException if there is any client error.
++ * @throws MQClientException if there is any client error.
+ * @throws InterruptedException if the thread is interrupted.
+ */
+ @Deprecated
+@@ -898,15 +1017,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+
+ /**
+ * Query message of the given message ID.
+- *
++ * <p>
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ *
+ * @param topic Topic
+ * @param msgId Message ID
+ * @return Message specified.
+- * @throws MQBrokerException if there is any broker error.
+- * @throws MQClientException if there is any client error.
+- * @throws RemotingException if there is any network-tier error.
++ * @throws MQBrokerException if there is any broker error.
++ * @throws MQClientException if there is any client error.
++ * @throws RemotingException if there is any network-tier error.
+ * @throws InterruptedException if the sending thread is interrupted.
+ */
+ @Deprecated
+@@ -945,7 +1064,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+ }
+
+ @Override
+- public void send(Collection<Message> msgs, SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
++ public void send(Collection<Message> msgs,
++ SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ this.defaultMQProducerImpl.send(batch(msgs), sendCallback);
+ }
+
+@@ -963,7 +1083,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+
+ @Override
+ public void send(Collection<Message> msgs, MessageQueue mq,
+- SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
++ SendCallback sendCallback,
++ long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ this.defaultMQProducerImpl.send(batch(msgs), queueWithNamespace(mq), sendCallback, timeout);
+ }
+
+@@ -1012,6 +1133,62 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+ return msgBatch;
+ }
+
++ public int getBatchMaxDelayMs() {
++ if (this.produceAccumulator == null) {
++ return 0;
++ }
++ return produceAccumulator.getBatchMaxDelayMs();
++ }
++
++ public void batchMaxDelayMs(int holdMs) {
++ if (this.produceAccumulator == null) {
++ throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch");
++ }
++ this.produceAccumulator.batchMaxDelayMs(holdMs);
++ }
++
++ public long getBatchMaxBytes() {
++ if (this.produceAccumulator == null) {
++ return 0;
++ }
++ return produceAccumulator.getBatchMaxBytes();
++ }
++
++ public void batchMaxBytes(long holdSize) {
++ if (this.produceAccumulator == null) {
++ throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch");
++ }
++ this.produceAccumulator.batchMaxBytes(holdSize);
++ }
++
++ public long getTotalBatchMaxBytes() {
++ if (this.produceAccumulator == null) {
++ return 0;
++ }
++ return produceAccumulator.getTotalBatchMaxBytes();
++ }
++
++ public void totalBatchMaxBytes(long totalHoldSize) {
++ if (this.produceAccumulator == null) {
++ throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch");
++ }
++ this.produceAccumulator.totalBatchMaxBytes(totalHoldSize);
++ }
++
++ public boolean getAutoBatch() {
++ if (this.produceAccumulator == null) {
++ return false;
++ }
++ return this.autoBatch;
++ }
++
++ public void setAutoBatch(boolean autoBatch) {
++ if (this.produceAccumulator == null) {
++ throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch");
++ }
++ this.autoBatch = autoBatch;
++ }
++
+ public String getProducerGroup() {
+ return producerGroup;
+ }
+@@ -1130,7 +1307,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
+ }
+
+ public boolean isEnableBackpressureForAsyncMode() {
+- return enableBackpressureForAsyncMode;
++ return enableBackpressureForAsyncMode;
+ }
+
+ public void setEnableBackpressureForAsyncMode(boolean enableBackpressureForAsyncMode) {
+diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
+index f70ddb283..78657e623 100644
+--- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
++++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
+@@ -40,7 +40,7 @@ public interface MQProducer extends MQAdmin {
+ RemotingException, MQBrokerException, InterruptedException;
+
+ void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
+- RemotingException, InterruptedException;
++ RemotingException, InterruptedException, MQBrokerException;
+
+ void send(final Message msg, final SendCallback sendCallback, final long timeout)
+ throws MQClientException, RemotingException, InterruptedException;
+@@ -99,19 +99,23 @@ public interface MQProducer extends MQAdmin {
+
+ SendResult send(final Collection<Message> msgs, final MessageQueue mq, final long timeout)
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
+-
+- void send(final Collection<Message> msgs, final SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException,
++
++ void send(final Collection<Message> msgs,
++ final SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException,
+ InterruptedException;
+-
+- void send(final Collection<Message> msgs, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException,
++
++ void send(final Collection<Message> msgs, final SendCallback sendCallback,
++ final long timeout) throws MQClientException, RemotingException,
+ MQBrokerException, InterruptedException;
+-
+- void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback) throws MQClientException, RemotingException,
++
++ void send(final Collection<Message> msgs, final MessageQueue mq,
++ final SendCallback sendCallback) throws MQClientException, RemotingException,
+ MQBrokerException, InterruptedException;
+-
+- void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback, final long timeout) throws MQClientException,
++
++ void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback,
++ final long timeout) throws MQClientException,
+ RemotingException, MQBrokerException, InterruptedException;
+-
++
+ //for rpc
+ Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException,
+ RemotingException, MQBrokerException, InterruptedException;
+diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java b/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java
+new file mode 100644
+index 000000000..46dfcf71d
+--- /dev/null
++++ b/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java
+@@ -0,0 +1,510 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.rocketmq.client.producer;
++
++import java.util.Arrays;
++import java.util.Collection;
++import java.util.HashSet;
++import java.util.Iterator;
++import java.util.LinkedList;
++import java.util.Map;
++import java.util.Objects;
++import java.util.Set;
++import java.util.concurrent.ConcurrentHashMap;
++import java.util.concurrent.atomic.AtomicBoolean;
++import java.util.concurrent.atomic.AtomicInteger;
++import java.util.concurrent.atomic.AtomicLong;
++import org.apache.rocketmq.client.exception.MQBrokerException;
++import org.apache.rocketmq.client.exception.MQClientException;
++import org.apache.rocketmq.common.ServiceThread;
++import org.apache.rocketmq.common.message.Message;
++import org.apache.rocketmq.common.message.MessageBatch;
++import org.apache.rocketmq.common.message.MessageClientIDSetter;
++import org.apache.rocketmq.common.message.MessageConst;
++import org.apache.rocketmq.common.message.MessageDecoder;
++import org.apache.rocketmq.common.message.MessageQueue;
++import org.apache.rocketmq.logging.org.slf4j.Logger;
++import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
++import org.apache.rocketmq.remoting.exception.RemotingException;
++
++public class ProduceAccumulator {
++ // totalHoldSize normal value
++ private long totalHoldSize = 32 * 1024 * 1024;
++ // holdSize normal value
++ private long holdSize = 32 * 1024;
++ // holdMs normal value
++ private int holdMs = 10;
++ private final Logger log = LoggerFactory.getLogger(DefaultMQProducer.class);
++ private final GuardForSyncSendService guardThreadForSyncSend;
++ private final GuardForAsyncSendService guardThreadForAsyncSend;
++ private Map<AggregateKey, MessageAccumulation> syncSendBatchs = new ConcurrentHashMap<AggregateKey, MessageAccumulation>();
++ private Map<AggregateKey, MessageAccumulation> asyncSendBatchs = new ConcurrentHashMap<AggregateKey, MessageAccumulation>();
++ private AtomicLong currentlyHoldSize = new AtomicLong(0);
++ private final String instanceName;
++
++ public ProduceAccumulator(String instanceName) {
++ this.instanceName = instanceName;
++ this.guardThreadForSyncSend = new GuardForSyncSendService(this.instanceName);
++ this.guardThreadForAsyncSend = new GuardForAsyncSendService(this.instanceName);
++ }
++
++ private class GuardForSyncSendService extends ServiceThread {
++ private final String serviceName;
++
++ public GuardForSyncSendService(String clientInstanceName) {
++ serviceName = String.format("Client_%s_GuardForSyncSend", clientInstanceName);
++ }
++
++ @Override public String getServiceName() {
++ return serviceName;
++ }
++
++ @Override public void run() {
++ log.info(this.getServiceName() + " service started");
++
++ while (!this.isStopped()) {
++ try {
++ this.doWork();
++ } catch (Exception e) {
++ log.warn(this.getServiceName() + " service has exception. ", e);
++ }
++ }
++
++ log.info(this.getServiceName() + " service end");
++ }
++
++ private void doWork() throws InterruptedException {
++ Collection<MessageAccumulation> values = syncSendBatchs.values();
++ final int sleepTime = Math.max(1, holdMs / 2);
++ for (MessageAccumulation v : values) {
++ v.wakeup();
++ synchronized (v) {
++ synchronized (v.closed) {
++ if (v.messagesSize.get() == 0) {
++ v.closed.set(true);
++ syncSendBatchs.remove(v.aggregateKey, v);
++ } else {
++ v.notify();
++ }
++ }
++ }
++ }
++ Thread.sleep(sleepTime);
++ }
++ }
++
++ private class GuardForAsyncSendService extends ServiceThread {
++ private final String serviceName;
++
++ public GuardForAsyncSendService(String clientInstanceName) {
++ serviceName = String.format("Client_%s_GuardForAsyncSend", clientInstanceName);
++ }
++
++ @Override public String getServiceName() {
++ return serviceName;
++ }
++
++ @Override public void run() {
++ log.info(this.getServiceName() + " service started");
++
++ while (!this.isStopped()) {
++ try {
++ this.doWork();
++ } catch (Exception e) {
++ log.warn(this.getServiceName() + " service has exception. ", e);
++ }
++ }
++
++ log.info(this.getServiceName() + " service end");
++ }
++
++ private void doWork() throws Exception {
++ Collection<MessageAccumulation> values = asyncSendBatchs.values();
++ final int sleepTime = Math.max(1, holdMs / 2);
++ for (MessageAccumulation v : values) {
++ if (v.readyToSend()) {
++ v.send(null);
++ }
++ synchronized (v.closed) {
++ if (v.messagesSize.get() == 0) {
++ v.closed.set(true);
++ asyncSendBatchs.remove(v.aggregateKey, v);
++ }
++ }
++ }
++ Thread.sleep(sleepTime);
++ }
++ }
++
++ void start() {
++ guardThreadForSyncSend.start();
++ guardThreadForAsyncSend.start();
++ }
++
++ void shutdown() {
++ guardThreadForSyncSend.shutdown();
++ guardThreadForAsyncSend.shutdown();
++ }
++
++ int getBatchMaxDelayMs() {
++ return holdMs;
++ }
++
++ void batchMaxDelayMs(int holdMs) {
++ if (holdMs <= 0 || holdMs > 30 * 1000) {
++ throw new IllegalArgumentException(String.format("batchMaxDelayMs expect between 1ms and 30s, but get %d!", holdMs));
++ }
++ this.holdMs = holdMs;
++ }
++
++ long getBatchMaxBytes() {
++ return holdSize;
++ }
++
++ void batchMaxBytes(long holdSize) {
++ if (holdSize <= 0 || holdSize > 2 * 1024 * 1024) {
++ throw new IllegalArgumentException(String.format("batchMaxBytes expect between 1B and 2MB, but get %d!", holdSize));
++ }
++ this.holdSize = holdSize;
++ }
++
++ long getTotalBatchMaxBytes() {
++ return holdSize;
++ }
++
++ void totalBatchMaxBytes(long totalHoldSize) {
++ if (totalHoldSize <= 0) {
++ throw new IllegalArgumentException(String.format("totalBatchMaxBytes must bigger then 0, but get %d!", totalHoldSize));
++ }
++ this.totalHoldSize = totalHoldSize;
++ }
++
++ private MessageAccumulation getOrCreateSyncSendBatch(AggregateKey aggregateKey,
++ DefaultMQProducer defaultMQProducer) {
++ MessageAccumulation batch = syncSendBatchs.get(aggregateKey);
++ if (batch != null) {
++ return batch;
++ }
++ batch = new MessageAccumulation(aggregateKey, defaultMQProducer);
++ MessageAccumulation previous = syncSendBatchs.putIfAbsent(aggregateKey, batch);
++
++ return previous == null ? batch : previous;
++ }
++
++ private MessageAccumulation getOrCreateAsyncSendBatch(AggregateKey aggregateKey,
++ DefaultMQProducer defaultMQProducer) {
++ MessageAccumulation batch = asyncSendBatchs.get(aggregateKey);
++ if (batch != null) {
++ return batch;
++ }
++ batch = new MessageAccumulation(aggregateKey, defaultMQProducer);
++ MessageAccumulation previous = asyncSendBatchs.putIfAbsent(aggregateKey, batch);
++
++ return previous == null ? batch : previous;
++ }
++
++ SendResult send(Message msg,
++ DefaultMQProducer defaultMQProducer) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
++ AggregateKey partitionKey = new AggregateKey(msg);
++ while (true) {
++ MessageAccumulation batch = getOrCreateSyncSendBatch(partitionKey, defaultMQProducer);
++ int index = batch.add(msg);
++ if (index == -1) {
++ syncSendBatchs.remove(partitionKey, batch);
++ } else {
++ return batch.sendResults[index];
++ }
++ }
++ }
++
++ SendResult send(Message msg, MessageQueue mq,
++ DefaultMQProducer defaultMQProducer) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
++ AggregateKey partitionKey = new AggregateKey(msg, mq);
++ while (true) {
++ MessageAccumulation batch = getOrCreateSyncSendBatch(partitionKey, defaultMQProducer);
++ int index = batch.add(msg);
++ if (index == -1) {
++ syncSendBatchs.remove(partitionKey, batch);
++ } else {
++ return batch.sendResults[index];
++ }
++ }
++ }
++
++ void send(Message msg, SendCallback sendCallback,
++ DefaultMQProducer defaultMQProducer) throws InterruptedException, RemotingException, MQClientException {
++ AggregateKey partitionKey = new AggregateKey(msg);
++ while (true) {
++ MessageAccumulation batch = getOrCreateAsyncSendBatch(partitionKey, defaultMQProducer);
++ if (!batch.add(msg, sendCallback)) {
++ asyncSendBatchs.remove(partitionKey, batch);
++ } else {
++ return;
++ }
++ }
++ }
++
++ void send(Message msg, MessageQueue mq,
++ SendCallback sendCallback,
++ DefaultMQProducer defaultMQProducer) throws InterruptedException, RemotingException, MQClientException {
++ AggregateKey partitionKey = new AggregateKey(msg, mq);
++ while (true) {
++ MessageAccumulation batch = getOrCreateAsyncSendBatch(partitionKey, defaultMQProducer);
++ if (!batch.add(msg, sendCallback)) {
++ asyncSendBatchs.remove(partitionKey, batch);
++ } else {
++ return;
++ }
++ }
++ }
++
++ boolean tryAddMessage(Message message) {
++ synchronized (currentlyHoldSize) {
++ if (currentlyHoldSize.get() < totalHoldSize) {
++ currentlyHoldSize.addAndGet(message.getBody().length);
++ return true;
++ } else {
++ return false;
++ }
++ }
++ }
++
++ private class AggregateKey {
++ public String topic = null;
++ public MessageQueue mq = null;
++ public boolean waitStoreMsgOK = false;
++ public String tag = null;
++
++ public AggregateKey(Message message) {
++ this(message.getTopic(), null, message.isWaitStoreMsgOK(), message.getTags());
++ }
++
++ public AggregateKey(Message message, MessageQueue mq) {
++ this(message.getTopic(), mq, message.isWaitStoreMsgOK(), message.getTags());
++ }
++
++ public AggregateKey(String topic, MessageQueue mq, boolean waitStoreMsgOK, String tag) {
++ this.topic = topic;
++ this.mq = mq;
++ this.waitStoreMsgOK = waitStoreMsgOK;
++ this.tag = tag;
++ }
++
++ @Override public boolean equals(Object o) {
++ if (this == o)
++ return true;
++ if (o == null || getClass() != o.getClass())
++ return false;
++ AggregateKey key = (AggregateKey) o;
++ return waitStoreMsgOK == key.waitStoreMsgOK && topic.equals(key.topic) && Objects.equals(mq, key.mq) && Objects.equals(tag, key.tag);
++ }
++
++ @Override public int hashCode() {
++ return Objects.hash(topic, mq, waitStoreMsgOK, tag);
++ }
++ }
++
++ private class MessageAccumulation {
++ private final DefaultMQProducer defaultMQProducer;
++ private LinkedList<Message> messages;
++ private LinkedList<SendCallback> sendCallbacks;
++ private Set<String> keys;
++ private AtomicBoolean closed;
++ private SendResult[] sendResults;
++ private AggregateKey aggregateKey;
++ private AtomicInteger messagesSize;
++ private int count;
++ private long createTime;
++
++ public MessageAccumulation(AggregateKey aggregateKey, DefaultMQProducer defaultMQProducer) {
++ this.defaultMQProducer = defaultMQProducer;
++ this.messages = new LinkedList<Message>();
++ this.sendCallbacks = new LinkedList<SendCallback>();
++ this.keys = new HashSet<String>();
++ this.closed = new AtomicBoolean(false);
++ this.messagesSize = new AtomicInteger(0);
++ this.aggregateKey = aggregateKey;
++ this.count = 0;
++ this.createTime = System.currentTimeMillis();
++ }
++
++ private boolean readyToSend() {
++ if (this.messagesSize.get() > holdSize
++ || System.currentTimeMillis() >= this.createTime + holdMs) {
++ return true;
++ }
++ return false;
++ }
++
++ public int add(
++ Message msg) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
++ int ret = -1;
++ synchronized (this.closed) {
++ if (this.closed.get()) {
++ return ret;
++ }
++ ret = this.count++;
++ this.messages.add(msg);
++ messagesSize.addAndGet(msg.getBody().length);
++ String msgKeys = msg.getKeys();
++ if (msgKeys != null) {
++ this.keys.addAll(Arrays.asList(msgKeys.split(MessageConst.KEY_SEPARATOR)));
++ }
++ }
++ synchronized (this) {
++ while (!this.closed.get()) {
++ if (readyToSend()) {
++ this.send();
++ break;
++ } else {
++ this.wait();
++ }
++ }
++ return ret;
++ }
++ }
++
++ public boolean add(Message msg,
++ SendCallback sendCallback) throws InterruptedException, RemotingException, MQClientException {
++ synchronized (this.closed) {
++ if (this.closed.get()) {
++ return false;
++ }
++ this.count++;
++ this.messages.add(msg);
++ this.sendCallbacks.add(sendCallback);
++ messagesSize.getAndAdd(msg.getBody().length);
++ }
++ if (readyToSend()) {
++ this.send(sendCallback);
++ }
++ return true;
++
++ }
++
++ public synchronized void wakeup() {
++ if (this.closed.get()) {
++ return;
++ }
++ this.notify();
++ }
++
++ private MessageBatch batch() {
++ MessageBatch messageBatch = new MessageBatch(this.messages);
++ messageBatch.setTopic(this.aggregateKey.topic);
++ messageBatch.setWaitStoreMsgOK(this.aggregateKey.waitStoreMsgOK);
++ messageBatch.setKeys(this.keys);
++ messageBatch.setTags(this.aggregateKey.tag);
++ MessageClientIDSetter.setUniqID(messageBatch);
++ messageBatch.setBody(MessageDecoder.encodeMessages(this.messages));
++ return messageBatch;
++ }
++
++ private void splitSendResults(SendResult sendResult) {
++ if (sendResult == null) {
++ throw new IllegalArgumentException("sendResult is null");
++ }
++ boolean isBatchConsumerQueue = !sendResult.getMsgId().contains(",");
++ this.sendResults = new SendResult[this.count];
++ if (!isBatchConsumerQueue) {
++ String[] msgIds = sendResult.getMsgId().split(",");
++ String[] offsetMsgIds = sendResult.getOffsetMsgId().split(",");
++ if (offsetMsgIds.length != this.count || msgIds.length != this.count) {
++ throw new IllegalArgumentException("sendResult is illegal");
++ }
++ for (int i = 0; i < this.count; i++) {
++ this.sendResults[i] = new SendResult(sendResult.getSendStatus(), msgIds[i],
++ sendResult.getMessageQueue(), sendResult.getQueueOffset() + i,
++ sendResult.getTransactionId(), offsetMsgIds[i], sendResult.getRegionId());
++ }
++ } else {
++ for (int i = 0; i < this.count; i++) {
++ this.sendResults[i] = sendResult;
++ }
++ }
++ }
++
++ private void send() throws InterruptedException, MQClientException, MQBrokerException, RemotingException {
++ synchronized (this.closed) {
++ if (this.closed.getAndSet(true)) {
++ return;
++ }
++ }
++ MessageBatch messageBatch = this.batch();
++ SendResult sendResult = null;
++ try {
++ if (defaultMQProducer != null) {
++ sendResult = defaultMQProducer.sendDirect(messageBatch, aggregateKey.mq, null);
++ this.splitSendResults(sendResult);
++ } else {
++ throw new IllegalArgumentException("defaultMQProducer is null, can not send message");
++ }
++ } finally {
++ currentlyHoldSize.addAndGet(-messagesSize.get());
++ this.notifyAll();
++ }
++ }
++
++ private void send(SendCallback sendCallback) {
++ synchronized (this.closed) {
++ if (this.closed.getAndSet(true)) {
++ return;
++ }
++ }
++ MessageBatch messageBatch = this.batch();
++ SendResult sendResult = null;
++ try {
++ if (defaultMQProducer != null) {
++ final int size = messagesSize.get();
++ defaultMQProducer.sendDirect(messageBatch, aggregateKey.mq, new SendCallback() {
++ @Override public void onSuccess(SendResult sendResult) {
++ try {
++ splitSendResults(sendResult);
++ int i = 0;
++ Iterator<SendCallback> it = sendCallbacks.iterator();
++ while (it.hasNext()) {
++ SendCallback v = it.next();
++ v.onSuccess(sendResults[i++]);
++ }
++ if (i != count) {
++ throw new IllegalArgumentException("sendResult is illegal");
++ }
++ currentlyHoldSize.addAndGet(-size);
++ } catch (Exception e) {
++ onException(e);
++ }
++ }
++
++ @Override public void onException(Throwable e) {
++ for (SendCallback v : sendCallbacks) {
++ v.onException(e);
++ }
++ currentlyHoldSize.addAndGet(-size);
++ }
++ });
++ } else {
++ throw new IllegalArgumentException("defaultMQProducer is null, can not send message");
++ }
++ } catch (Exception e) {
++ for (SendCallback v : sendCallbacks) {
++ v.onException(e);
++ }
++ }
++ }
++ }
++}
+diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+index 658f22ab0..d4153c7cd 100644
+--- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
++++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+@@ -250,7 +250,7 @@ public class DefaultMQProducerTest {
+
+ @Test
+ public void testBatchSendMessageAsync()
+- throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
++ throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+ final AtomicInteger cc = new AtomicInteger(0);
+ final CountDownLatch countDownLatch = new CountDownLatch(4);
+
+@@ -504,6 +504,42 @@ public class DefaultMQProducerTest {
+ assertThat(cc.get()).isEqualTo(1);
+ }
+
++ @Test
++ public void testBatchSendMessageAsync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
++ final CountDownLatch countDownLatch = new CountDownLatch(1);
++ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
++ producer.setAutoBatch(true);
++ producer.send(message, new SendCallback() {
++ @Override
++ public void onSuccess(SendResult sendResult) {
++ assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
++ assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
++ assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
++ countDownLatch.countDown();
++ }
++
++ @Override
++ public void onException(Throwable e) {
++ countDownLatch.countDown();
++ }
++ });
++
++ countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
++ producer.setAutoBatch(false);
++ }
++
++ @Test
++ public void testBatchSendMessageSync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
++ producer.setAutoBatch(true);
++ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
++ SendResult sendResult = producer.send(message);
++
++ assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
++ assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
++ assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
++ producer.setAutoBatch(false);
++ }
++
+ public static TopicRouteData createTopicRoute() {
+ TopicRouteData topicRouteData = new TopicRouteData();
+
+diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/ProduceAccumulatorTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/ProduceAccumulatorTest.java
+new file mode 100644
+index 000000000..7074fae24
+--- /dev/null
++++ b/client/src/test/java/org/apache/rocketmq/client/producer/ProduceAccumulatorTest.java
+@@ -0,0 +1,176 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.rocketmq.client.producer;
++
++import java.util.ArrayList;
++import java.util.Arrays;
++import java.util.List;
++import java.util.concurrent.CountDownLatch;
++import java.util.concurrent.TimeUnit;
++import org.apache.rocketmq.client.exception.MQBrokerException;
++import org.apache.rocketmq.client.exception.MQClientException;
++import org.apache.rocketmq.common.message.Message;
++import org.apache.rocketmq.common.message.MessageBatch;
++import org.apache.rocketmq.common.message.MessageQueue;
++import org.apache.rocketmq.remoting.exception.RemotingException;
++import org.junit.Test;
++
++import static org.assertj.core.api.Assertions.assertThat;
++
++public class ProduceAccumulatorTest {
++ private boolean compareMessageBatch(MessageBatch a, MessageBatch b) {
++ if (!a.getTopic().equals(b.getTopic())) {
++ return false;
++ }
++ if (!Arrays.equals(a.getBody(), b.getBody())) {
++ return false;
++ }
++ return true;
++ }
++
++ private class MockMQProducer extends DefaultMQProducer {
++ private Message beSendMessage = null;
++ private MessageQueue beSendMessageQueue = null;
++
++ @Override
++ public SendResult sendDirect(Message msg, MessageQueue mq,
++ SendCallback sendCallback) {
++ this.beSendMessage = msg;
++ this.beSendMessageQueue = mq;
++
++ SendResult sendResult = new SendResult();
++ sendResult.setMsgId("123");
++ if (sendCallback != null) {
++ sendCallback.onSuccess(sendResult);
++ }
++ return sendResult;
++ }
++ }
++
++ @Test
++ public void testProduceAccumulator_async() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
++ MockMQProducer mockMQProducer = new MockMQProducer();
++
++ ProduceAccumulator produceAccumulator = new ProduceAccumulator("test");
++ produceAccumulator.start();
++
++ final CountDownLatch countDownLatch = new CountDownLatch(1);
++ List<Message> messages = new ArrayList<Message>();
++ messages.add(new Message("testTopic", "1".getBytes()));
++ messages.add(new Message("testTopic", "22".getBytes()));
++ messages.add(new Message("testTopic", "333".getBytes()));
++ messages.add(new Message("testTopic", "4444".getBytes()));
++ messages.add(new Message("testTopic", "55555".getBytes()));
++ for (Message message : messages) {
++ produceAccumulator.send(message, new SendCallback() {
++ final CountDownLatch finalCountDownLatch = countDownLatch;
++
++ @Override
++ public void onSuccess(SendResult sendResult) {
++ finalCountDownLatch.countDown();
++ }
++
++ @Override
++ public void onException(Throwable e) {
++ finalCountDownLatch.countDown();
++ }
++ }, mockMQProducer);
++ }
++ assertThat(countDownLatch.await(3000L, TimeUnit.MILLISECONDS)).isTrue();
++ assertThat(mockMQProducer.beSendMessage instanceof MessageBatch).isTrue();
++
++ MessageBatch messageBatch1 = (MessageBatch) mockMQProducer.beSendMessage;
++ MessageBatch messageBatch2 = MessageBatch.generateFromList(messages);
++ messageBatch2.setBody(messageBatch2.encode());
++
++ assertThat(compareMessageBatch(messageBatch1, messageBatch2)).isTrue();
++ }
++
++ @Test
++ public void testProduceAccumulator_sync() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
++ final MockMQProducer mockMQProducer = new MockMQProducer();
++
++ final ProduceAccumulator produceAccumulator = new ProduceAccumulator("test");
++ produceAccumulator.start();
++
++ List<Message> messages = new ArrayList<Message>();
++ messages.add(new Message("testTopic", "1".getBytes()));
++ messages.add(new Message("testTopic", "22".getBytes()));
++ messages.add(new Message("testTopic", "333".getBytes()));
++ messages.add(new Message("testTopic", "4444".getBytes()));
++ messages.add(new Message("testTopic", "55555".getBytes()));
++ final CountDownLatch countDownLatch = new CountDownLatch(messages.size());
++
++ for (final Message message : messages) {
++ new Thread(new Runnable() {
++ final ProduceAccumulator finalProduceAccumulator = produceAccumulator;
++ final CountDownLatch finalCountDownLatch = countDownLatch;
++ final MockMQProducer finalMockMQProducer = mockMQProducer;
++ final Message finalMessage = message;
++
++ @Override
++ public void run() {
++ try {
++ finalProduceAccumulator.send(finalMessage, finalMockMQProducer);
++ finalCountDownLatch.countDown();
++ } catch (Exception e) {
++ e.printStackTrace();
++ }
++ }
++ }).start();
++ }
++ assertThat(countDownLatch.await(3000L, TimeUnit.MILLISECONDS)).isTrue();
++ assertThat(mockMQProducer.beSendMessage instanceof MessageBatch).isTrue();
++
++ MessageBatch messageBatch1 = (MessageBatch) mockMQProducer.beSendMessage;
++ MessageBatch messageBatch2 = MessageBatch.generateFromList(messages);
++ messageBatch2.setBody(messageBatch2.encode());
++
++ assertThat(messageBatch1.getTopic()).isEqualTo(messageBatch2.getTopic());
++ // The execution order is uncertain, just compare the length
++ assertThat(messageBatch1.getBody().length).isEqualTo(messageBatch2.getBody().length);
++ }
++
++ @Test
++ public void testProduceAccumulator_sendWithMessageQueue() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
++ MockMQProducer mockMQProducer = new MockMQProducer();
++
++ MessageQueue messageQueue = new MessageQueue("topicTest", "brokerTest", 0);
++ final ProduceAccumulator produceAccumulator = new ProduceAccumulator("test");
++ produceAccumulator.start();
++
++ Message message = new Message("testTopic", "1".getBytes());
++ produceAccumulator.send(message, messageQueue, mockMQProducer);
++ assertThat(mockMQProducer.beSendMessageQueue).isEqualTo(messageQueue);
++
++ final CountDownLatch countDownLatch = new CountDownLatch(1);
++ produceAccumulator.send(message, messageQueue, new SendCallback() {
++ @Override
++ public void onSuccess(SendResult sendResult) {
++ countDownLatch.countDown();
++ }
++
++ @Override
++ public void onException(Throwable e) {
++ countDownLatch.countDown();
++ }
++ }, mockMQProducer);
++ assertThat(countDownLatch.await(3000L, TimeUnit.MILLISECONDS)).isTrue();
++ assertThat(mockMQProducer.beSendMessageQueue).isEqualTo(messageQueue);
++ }
++}
+diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
+index a423048c5..30369b8f3 100644
+--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
++++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
+@@ -27,7 +27,7 @@ public class MessageBatch extends Message implements Iterable<Message> {
+ private static final long serialVersionUID = 621335151046335557L;
+ private final List<Message> messages;
+
+- private MessageBatch(List<Message> messages) {
++ public MessageBatch(List<Message> messages) {
+ this.messages = messages;
+ }
+
+--
+2.32.0.windows.2
+