diff options
Diffstat (limited to 'patch006-backport-auto-batch-producer.patch')
-rw-r--r-- | patch006-backport-auto-batch-producer.patch | 2773 |
1 files changed, 2773 insertions, 0 deletions
diff --git a/patch006-backport-auto-batch-producer.patch b/patch006-backport-auto-batch-producer.patch new file mode 100644 index 0000000..264fe95 --- /dev/null +++ b/patch006-backport-auto-batch-producer.patch @@ -0,0 +1,2773 @@ +From 737c1e53383350a5671fa207ee0e4ce932850bac Mon Sep 17 00:00:00 2001 +From: rongtong <jinrongtong5@163.com> +Date: Tue, 18 Jul 2023 14:12:39 +0800 +Subject: [PATCH 1/7] [ISSUE #7029] Add a config to determine whether pop + response should return the actual retry topic or tamper with the original + topic (#7030) + +--- + .../broker/processor/PopMessageProcessor.java | 4 ++-- + .../org/apache/rocketmq/common/BrokerConfig.java | 13 +++++++++++++ + 2 files changed, 15 insertions(+), 2 deletions(-) + +diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +index 464f8f4fd..53e172561 100644 +--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java ++++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +@@ -591,8 +591,8 @@ public class PopMessageProcessor implements NettyRequestProcessor { + atomicRestNum.set(result.getMaxOffset() - result.getNextBeginOffset() + atomicRestNum.get()); + String brokerName = brokerController.getBrokerConfig().getBrokerName(); + for (SelectMappedBufferResult mapedBuffer : result.getMessageMapedList()) { +- // We should not recode buffer for normal topic message +- if (!isRetry) { ++ // We should not recode buffer when popResponseReturnActualRetryTopic is true or topic is not retry topic ++ if (brokerController.getBrokerConfig().isPopResponseReturnActualRetryTopic() || !isRetry) { + getMessageResult.addMessage(mapedBuffer); + } else { + List<MessageExt> messageExtList = MessageDecoder.decodesBatch(mapedBuffer.getByteBuffer(), +diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +index f5f0db101..a4d82d1c5 100644 +--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java ++++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +@@ -381,6 +381,11 @@ public class BrokerConfig extends BrokerIdentity { + */ + private long fetchNamesrvAddrInterval = 10 * 1000; + ++ /** ++ * Pop response returns the actual retry topic rather than tampering with the original topic ++ */ ++ private boolean popResponseReturnActualRetryTopic = false; ++ + public long getMaxPopPollingSize() { + return maxPopPollingSize; + } +@@ -1676,4 +1681,12 @@ public class BrokerConfig extends BrokerIdentity { + public void setFetchNamesrvAddrInterval(final long fetchNamesrvAddrInterval) { + this.fetchNamesrvAddrInterval = fetchNamesrvAddrInterval; + } ++ ++ public boolean isPopResponseReturnActualRetryTopic() { ++ return popResponseReturnActualRetryTopic; ++ } ++ ++ public void setPopResponseReturnActualRetryTopic(boolean popResponseReturnActualRetryTopic) { ++ this.popResponseReturnActualRetryTopic = popResponseReturnActualRetryTopic; ++ } + } +-- +2.32.0.windows.2 + + +From 7996ec3b3f7ccea01f66951ac639b48303bbf7a6 Mon Sep 17 00:00:00 2001 +From: leeyiyu <43566239+leeyiyu@users.noreply.github.com> +Date: Tue, 18 Jul 2023 20:58:56 +0800 +Subject: [PATCH 2/7] [ISSUE #6879] ConcurrentHashMapUtils fails to solve the + loop bug in JDK8 (#6883) + +--- + .../common/utils/ConcurrentHashMapUtils.java | 16 +++++++++++++++- + .../common/utils/ConcurrentHashMapUtilsTest.java | 2 ++ + 2 files changed, 17 insertions(+), 1 deletion(-) + +diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtils.java b/common/src/main/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtils.java +index 1f1b4dd89..6fd9c21c9 100644 +--- a/common/src/main/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtils.java ++++ b/common/src/main/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtils.java +@@ -16,6 +16,7 @@ + */ + package org.apache.rocketmq.common.utils; + ++import java.util.Objects; + import java.util.concurrent.ConcurrentMap; + import java.util.function.Function; + +@@ -40,10 +41,23 @@ public abstract class ConcurrentHashMapUtils { + * @see <a href="https://bugs.openjdk.java.net/browse/JDK-8161372">https://bugs.openjdk.java.net/browse/JDK-8161372</a> + */ + public static <K, V> V computeIfAbsent(ConcurrentMap<K, V> map, K key, Function<? super K, ? extends V> func) { ++ Objects.requireNonNull(func); + if (isJdk8) { + V v = map.get(key); + if (null == v) { +- v = map.computeIfAbsent(key, func); ++// v = map.computeIfAbsent(key, func); ++ ++ // this bug fix methods maybe cause `func.apply` multiple calls. ++ v = func.apply(key); ++ if (null == v) { ++ return null; ++ } ++ final V res = map.putIfAbsent(key, v); ++ if (null != res) { ++ // if pre value present, means other thread put value already, and putIfAbsent not effect ++ // return exist value ++ return res; ++ } + } + return v; + } else { +diff --git a/common/src/test/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtilsTest.java b/common/src/test/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtilsTest.java +index 8e32fc93a..fa97ddb1c 100644 +--- a/common/src/test/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtilsTest.java ++++ b/common/src/test/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtilsTest.java +@@ -35,5 +35,7 @@ public class ConcurrentHashMapUtilsTest { + assertEquals("2342", value1); + String value2 = ConcurrentHashMapUtils.computeIfAbsent(map, "123", k -> "2342"); + assertEquals("1111", value2); ++// map.computeIfAbsent("AaAa", key->map.computeIfAbsent("BBBB",key2->"42")); ++ ConcurrentHashMapUtils.computeIfAbsent(map, "AaAa", key -> map.computeIfAbsent("BBBB", key2 -> "42")); + } + } +\ No newline at end of file +-- +2.32.0.windows.2 + + +From e0f5295fed8791d93bfa5b8420074c00b651ddfe Mon Sep 17 00:00:00 2001 +From: lk <xdkxlk@outlook.com> +Date: Wed, 19 Jul 2023 15:55:11 +0800 +Subject: [PATCH 3/7] passing the renew event type to create the correct + context (#7045) + +--- + .../apache/rocketmq/proxy/common/RenewEvent.java | 14 +++++++++++++- + .../proxy/processor/ReceiptHandleProcessor.java | 2 +- + .../receipt/DefaultReceiptHandleManager.java | 6 +++--- + 3 files changed, 17 insertions(+), 5 deletions(-) + +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java +index fdf9833cc..0ff65c1cc 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java +@@ -23,11 +23,19 @@ import org.apache.rocketmq.client.consumer.AckResult; + public class RenewEvent { + protected MessageReceiptHandle messageReceiptHandle; + protected long renewTime; ++ protected EventType eventType; + protected CompletableFuture<AckResult> future; + +- public RenewEvent(MessageReceiptHandle messageReceiptHandle, long renewTime, CompletableFuture<AckResult> future) { ++ public enum EventType { ++ RENEW, ++ STOP_RENEW, ++ CLEAR_GROUP ++ } ++ ++ public RenewEvent(MessageReceiptHandle messageReceiptHandle, long renewTime, EventType eventType, CompletableFuture<AckResult> future) { + this.messageReceiptHandle = messageReceiptHandle; + this.renewTime = renewTime; ++ this.eventType = eventType; + this.future = future; + } + +@@ -39,6 +47,10 @@ public class RenewEvent { + return renewTime; + } + ++ public EventType getEventType() { ++ return eventType; ++ } ++ + public CompletableFuture<AckResult> getFuture() { + return future; + } +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java +index fc49e7622..460842a86 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java +@@ -38,7 +38,7 @@ public class ReceiptHandleProcessor extends AbstractProcessor { + public ReceiptHandleProcessor(MessagingProcessor messagingProcessor, ServiceManager serviceManager) { + super(messagingProcessor, serviceManager); + StateEventListener<RenewEvent> eventListener = event -> { +- ProxyContext context = createContext("RenewMessage"); ++ ProxyContext context = createContext(event.getEventType().name()); + MessageReceiptHandle messageReceiptHandle = event.getMessageReceiptHandle(); + ReceiptHandle handle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr()); + messagingProcessor.changeInvisibleTime(context, handle, messageReceiptHandle.getMessageId(), +diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java +index c7633d658..9f35435f0 100644 +--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java ++++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java +@@ -188,7 +188,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem + } + if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) { + CompletableFuture<AckResult> future = new CompletableFuture<>(); +- eventListener.fireEvent(new RenewEvent(messageReceiptHandle, RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), future)); ++ eventListener.fireEvent(new RenewEvent(messageReceiptHandle, RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), RenewEvent.EventType.RENEW, future)); + future.whenComplete((ackResult, throwable) -> { + if (throwable != null) { + log.error("error when renew. handle:{}", messageReceiptHandle, throwable); +@@ -218,7 +218,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem + } + RetryPolicy retryPolicy = subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy(); + CompletableFuture<AckResult> future = new CompletableFuture<>(); +- eventListener.fireEvent(new RenewEvent(messageReceiptHandle, retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), future)); ++ eventListener.fireEvent(new RenewEvent(messageReceiptHandle, retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), RenewEvent.EventType.STOP_RENEW, future)); + future.whenComplete((ackResult, throwable) -> { + if (throwable != null) { + log.error("error when nack in renew. handle:{}", messageReceiptHandle, throwable); +@@ -246,7 +246,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem + try { + handleGroup.computeIfPresent(msgID, handle, messageReceiptHandle -> { + CompletableFuture<AckResult> future = new CompletableFuture<>(); +- eventListener.fireEvent(new RenewEvent(messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), future)); ++ eventListener.fireEvent(new RenewEvent(messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), RenewEvent.EventType.CLEAR_GROUP, future)); + return CompletableFuture.completedFuture(null); + }); + } catch (Exception e) { +-- +2.32.0.windows.2 + + +From 2c5808b9fdab8cae63318c89f34ad48a1ab6e962 Mon Sep 17 00:00:00 2001 +From: lizhimins <707364882@qq.com> +Date: Wed, 19 Jul 2023 20:14:02 +0800 +Subject: [PATCH 4/7] [#ISSUE 7035] Fix correct min offset behavior in tiered + storage (#7038) + +--- + .../tieredstore/TieredDispatcher.java | 2 +- + .../tieredstore/TieredMessageFetcher.java | 5 + + .../tieredstore/file/CompositeFlatFile.java | 12 +- + .../tieredstore/file/TieredCommitLog.java | 46 +++++++- + .../tieredstore/file/TieredFlatFile.java | 29 +++-- + .../file/TieredFlatFileManager.java | 2 +- + .../metrics/TieredStoreMetricsConstant.java | 1 + + .../provider/posix/PosixFileSegment.java | 19 +-- + .../tieredstore/file/TieredCommitLogTest.java | 108 ++++++++++++++++++ + .../TieredStoreMetricsManagerTest.java | 4 +- + .../src/test/resources/rmq.logback-test.xml | 2 +- + 11 files changed, 201 insertions(+), 29 deletions(-) + create mode 100644 tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java + +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java +index 6584b0e89..523b0c2cd 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java +@@ -352,7 +352,7 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch + case SUCCESS: + long offset = MessageBufferUtil.getQueueOffset(message); + if (queueOffset != offset) { +- logger.error("Message cq offset in commitlog does not meet expectations, " + ++ logger.warn("Message cq offset in commitlog does not meet expectations, " + + "result={}, topic={}, queueId={}, cq offset={}, msg offset={}", + AppendResult.OFFSET_INCORRECT, topic, queueId, queueOffset, offset); + } +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java +index 8802a73a3..c4fed54bd 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java +@@ -473,6 +473,11 @@ public class TieredMessageFetcher implements MessageStoreFetcher { + return CompletableFuture.completedFuture(result); + } + ++ // request range | result ++ // (0, min) | too small ++ // [min, max) | correct ++ // [max, max] | overflow one ++ // (max, +oo) | overflow badly + if (queueOffset < minQueueOffset) { + result.setStatus(GetMessageStatus.OFFSET_TOO_SMALL); + result.setNextBeginOffset(flatFile.getConsumeQueueMinOffset()); +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java +index 8f8ba98b1..fa01382e1 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java +@@ -120,17 +120,19 @@ public class CompositeFlatFile implements CompositeAccess { + return commitLog.getBeginTimestamp(); + } + +- public long getConsumeQueueBaseOffset() { +- return consumeQueue.getBaseOffset(); +- } +- + @Override + public long getCommitLogDispatchCommitOffset() { + return commitLog.getDispatchCommitOffset(); + } + ++ public long getConsumeQueueBaseOffset() { ++ return consumeQueue.getBaseOffset(); ++ } ++ + public long getConsumeQueueMinOffset() { +- return consumeQueue.getMinOffset() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE; ++ long cqOffset = consumeQueue.getMinOffset() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE; ++ long effectiveOffset = this.commitLog.getMinConsumeQueueOffset(); ++ return Math.max(cqOffset, effectiveOffset); + } + + public long getConsumeQueueCommitOffset() { +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java +index 92aea58be..80e1bce50 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java +@@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; + import java.nio.ByteBuffer; + import java.util.concurrent.CompletableFuture; + import java.util.concurrent.TimeUnit; ++import java.util.concurrent.atomic.AtomicLong; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + import org.apache.rocketmq.tieredstore.common.AppendResult; +@@ -31,6 +32,7 @@ import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; + public class TieredCommitLog { + + private static final Logger log = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME); ++ private static final Long NOT_EXIST_MIN_OFFSET = -1L; + + /** + * item size: int, 4 bytes +@@ -42,10 +44,13 @@ public class TieredCommitLog { + + private final TieredMessageStoreConfig storeConfig; + private final TieredFlatFile flatFile; ++ private final AtomicLong minConsumeQueueOffset; + + public TieredCommitLog(TieredFileAllocator fileQueueFactory, String filePath) { + this.storeConfig = fileQueueFactory.getStoreConfig(); + this.flatFile = fileQueueFactory.createFlatFileForCommitLog(filePath); ++ this.minConsumeQueueOffset = new AtomicLong(NOT_EXIST_MIN_OFFSET); ++ this.correctMinOffset(); + } + + @VisibleForTesting +@@ -61,6 +66,10 @@ public class TieredCommitLog { + return flatFile.getCommitOffset(); + } + ++ public long getMinConsumeQueueOffset() { ++ return minConsumeQueueOffset.get() != NOT_EXIST_MIN_OFFSET ? minConsumeQueueOffset.get() : correctMinOffset(); ++ } ++ + public long getDispatchCommitOffset() { + return flatFile.getDispatchCommitOffset(); + } +@@ -82,6 +91,39 @@ public class TieredCommitLog { + return flatFile.getFileToWrite().getMaxTimestamp(); + } + ++ public synchronized long correctMinOffset() { ++ if (flatFile.getFileSegmentCount() == 0) { ++ this.minConsumeQueueOffset.set(NOT_EXIST_MIN_OFFSET); ++ return NOT_EXIST_MIN_OFFSET; ++ } ++ ++ // queue offset field length is 8 ++ int length = MessageBufferUtil.QUEUE_OFFSET_POSITION + 8; ++ if (flatFile.getCommitOffset() - flatFile.getMinOffset() < length) { ++ this.minConsumeQueueOffset.set(NOT_EXIST_MIN_OFFSET); ++ return NOT_EXIST_MIN_OFFSET; ++ } ++ ++ try { ++ return this.flatFile.readAsync(this.flatFile.getMinOffset(), length) ++ .thenApply(buffer -> { ++ long offset = MessageBufferUtil.getQueueOffset(buffer); ++ minConsumeQueueOffset.set(offset); ++ log.info("Correct commitlog min cq offset success, filePath={}, min cq offset={}, range={}-{}", ++ flatFile.getFilePath(), offset, flatFile.getMinOffset(), flatFile.getCommitOffset()); ++ return offset; ++ }) ++ .exceptionally(throwable -> { ++ log.warn("Correct commitlog min cq offset error, filePath={}, range={}-{}", ++ flatFile.getFilePath(), flatFile.getMinOffset(), flatFile.getCommitOffset(), throwable); ++ return minConsumeQueueOffset.get(); ++ }).get(); ++ } catch (Exception e) { ++ log.error("Correct commitlog min cq offset error, filePath={}", flatFile.getFilePath(), e); ++ } ++ return minConsumeQueueOffset.get(); ++ } ++ + public AppendResult append(ByteBuffer byteBuf) { + return flatFile.append(byteBuf, MessageBufferUtil.getStoreTimeStamp(byteBuf)); + } +@@ -99,7 +141,9 @@ public class TieredCommitLog { + } + + public void cleanExpiredFile(long expireTimestamp) { +- flatFile.cleanExpiredFile(expireTimestamp); ++ if (flatFile.cleanExpiredFile(expireTimestamp) > 0) { ++ correctMinOffset(); ++ } + } + + public void destroyExpiredFile() { +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java +index a71323348..90ca843bf 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java +@@ -133,6 +133,14 @@ public class TieredFlatFile { + } + } + ++ public String getFilePath() { ++ return filePath; ++ } ++ ++ public FileSegmentType getFileType() { ++ return fileType; ++ } ++ + @VisibleForTesting + public List<TieredFileSegment> getFileSegmentList() { + return fileSegmentList; +@@ -333,10 +341,9 @@ public class TieredFlatFile { + TieredFileSegment fileSegment = this.newSegment(fileType, offset, true); + fileSegmentList.add(fileSegment); + needCommitFileSegmentList.add(fileSegment); +- + Collections.sort(fileSegmentList); +- +- logger.debug("Create a new file segment: baseOffset: {}, file: {}, file type: {}", baseOffset, fileSegment.getPath(), fileType); ++ logger.debug("Create a new file segment: baseOffset: {}, file: {}, file type: {}", ++ offset, fileSegment.getPath(), fileType); + return fileSegment; + } finally { + fileSegmentLock.writeLock().unlock(); +@@ -429,7 +436,7 @@ public class TieredFlatFile { + return result; + } + +- public void cleanExpiredFile(long expireTimestamp) { ++ public int cleanExpiredFile(long expireTimestamp) { + Set<Long> needToDeleteSet = new HashSet<>(); + try { + tieredMetadataStore.iterateFileSegment(filePath, fileType, metadata -> { +@@ -438,32 +445,32 @@ public class TieredFlatFile { + } + }); + } catch (Exception e) { +- logger.error("clean expired failed: filePath: {}, file type: {}, expire timestamp: {}", ++ logger.error("Clean expired file, filePath: {}, file type: {}, expire timestamp: {}", + filePath, fileType, expireTimestamp); + } + + if (needToDeleteSet.isEmpty()) { +- return; ++ return 0; + } + + fileSegmentLock.writeLock().lock(); + try { + for (int i = 0; i < fileSegmentList.size(); i++) { ++ TieredFileSegment fileSegment = fileSegmentList.get(i); + try { +- TieredFileSegment fileSegment = fileSegmentList.get(i); + if (needToDeleteSet.contains(fileSegment.getBaseOffset())) { + fileSegment.close(); + fileSegmentList.remove(fileSegment); + needCommitFileSegmentList.remove(fileSegment); + i--; + this.updateFileSegment(fileSegment); +- logger.info("expired file {} is been cleaned", fileSegment.getPath()); ++ logger.debug("Clean expired file, filePath: {}", fileSegment.getPath()); + } else { + break; + } + } catch (Exception e) { +- logger.error("clean expired file failed: filePath: {}, file type: {}, expire timestamp: {}", +- filePath, fileType, expireTimestamp, e); ++ logger.error("Clean expired file failed: filePath: {}, file type: {}, expire timestamp: {}", ++ fileSegment.getPath(), fileSegment.getFileType(), expireTimestamp, e); + } + } + if (fileSegmentList.size() > 0) { +@@ -476,6 +483,7 @@ public class TieredFlatFile { + } finally { + fileSegmentLock.writeLock().unlock(); + } ++ return needToDeleteSet.size(); + } + + @VisibleForTesting +@@ -493,7 +501,6 @@ public class TieredFlatFile { + fileSegment.destroyFile(); + if (!fileSegment.exists()) { + tieredMetadataStore.deleteFileSegment(filePath, fileType, metadata.getBaseOffset()); +- logger.info("Destroyed expired file, file path: {}", fileSegment.getPath()); + } + } catch (Exception e) { + logger.error("Destroyed expired file failed, file path: {}, file type: {}", +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java +index 5fe511f68..aeca44b8c 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java +@@ -223,7 +223,7 @@ public class TieredFlatFileManager { + public CompositeQueueFlatFile getOrCreateFlatFileIfAbsent(MessageQueue messageQueue) { + return queueFlatFileMap.computeIfAbsent(messageQueue, mq -> { + try { +- logger.info("TieredFlatFileManager#getOrCreateFlatFileIfAbsent: " + ++ logger.debug("TieredFlatFileManager#getOrCreateFlatFileIfAbsent: " + + "try to create new flat file: topic: {}, queueId: {}", + messageQueue.getTopic(), messageQueue.getQueueId()); + return new CompositeQueueFlatFile(tieredFileAllocator, mq); +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsConstant.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsConstant.java +index ad7281510..cb4674ea9 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsConstant.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsConstant.java +@@ -38,6 +38,7 @@ public class TieredStoreMetricsConstant { + public static final String LABEL_OPERATION = "operation"; + public static final String LABEL_SUCCESS = "success"; + ++ public static final String LABEL_PATH = "path"; + public static final String LABEL_TOPIC = "topic"; + public static final String LABEL_GROUP = "group"; + public static final String LABEL_QUEUE_ID = "queue_id"; +diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java +index 8c0d1cbcd..52be90b1d 100644 +--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java ++++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java +@@ -41,8 +41,8 @@ import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; + + import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_FILE_TYPE; + import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_OPERATION; ++import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_PATH; + import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_SUCCESS; +-import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_TOPIC; + + /** + * this class is experimental and may change without notice. +@@ -55,6 +55,7 @@ public class PosixFileSegment extends TieredFileSegment { + private static final String OPERATION_POSIX_READ = "read"; + private static final String OPERATION_POSIX_WRITE = "write"; + ++ private final String fullPath; + private volatile File file; + private volatile FileChannel readFileChannel; + private volatile FileChannel writeFileChannel; +@@ -71,7 +72,7 @@ public class PosixFileSegment extends TieredFileSegment { + // fullPath: basePath/hash_cluster/broker/topic/queueId/fileType/baseOffset + String brokerClusterName = storeConfig.getBrokerClusterName(); + String clusterBasePath = TieredStoreUtil.getHash(brokerClusterName) + UNDERLINE + brokerClusterName; +- String fullPath = Paths.get(basePath, clusterBasePath, filePath, ++ this.fullPath = Paths.get(basePath, clusterBasePath, filePath, + fileType.toString(), TieredStoreUtil.offset2FileName(baseOffset)).toString(); + logger.info("Constructing Posix FileSegment, filePath: {}", fullPath); + +@@ -80,13 +81,13 @@ public class PosixFileSegment extends TieredFileSegment { + + protected AttributesBuilder newAttributesBuilder() { + return TieredStoreMetricsManager.newAttributesBuilder() +- .put(LABEL_TOPIC, filePath) ++ .put(LABEL_PATH, filePath) + .put(LABEL_FILE_TYPE, fileType.name().toLowerCase()); + } + + @Override + public String getPath() { +- return filePath; ++ return fullPath; + } + + @Override +@@ -107,7 +108,7 @@ public class PosixFileSegment extends TieredFileSegment { + if (file == null) { + synchronized (this) { + if (file == null) { +- File file = new File(filePath); ++ File file = new File(fullPath); + try { + File dir = file.getParentFile(); + if (!dir.exists()) { +@@ -136,8 +137,9 @@ public class PosixFileSegment extends TieredFileSegment { + if (writeFileChannel != null && writeFileChannel.isOpen()) { + writeFileChannel.close(); + } ++ logger.info("Destroy Posix FileSegment, filePath: {}", fullPath); + } catch (IOException e) { +- logger.error("PosixFileSegment#destroyFile: destroy file {} failed: ", filePath, e); ++ logger.error("Destroy Posix FileSegment failed, filePath: {}", fullPath, e); + } + + if (file.exists()) { +@@ -181,8 +183,9 @@ public class PosixFileSegment extends TieredFileSegment { + } + + @Override +- public CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream inputStream, long position, int length, +- boolean append) { ++ public CompletableFuture<Boolean> commit0( ++ TieredFileSegmentInputStream inputStream, long position, int length, boolean append) { ++ + Stopwatch stopwatch = Stopwatch.createStarted(); + AttributesBuilder attributesBuilder = newAttributesBuilder() + .put(LABEL_OPERATION, OPERATION_POSIX_WRITE); +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java +new file mode 100644 +index 000000000..6693d3cb7 +--- /dev/null ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java +@@ -0,0 +1,108 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.rocketmq.tieredstore.file; ++ ++import java.io.File; ++import java.io.IOException; ++import java.nio.ByteBuffer; ++import java.util.List; ++import org.apache.rocketmq.common.message.MessageQueue; ++import org.apache.rocketmq.tieredstore.TieredStoreTestUtil; ++import org.apache.rocketmq.tieredstore.common.AppendResult; ++import org.apache.rocketmq.tieredstore.common.FileSegmentType; ++import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; ++import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor; ++import org.apache.rocketmq.tieredstore.metadata.FileSegmentMetadata; ++import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore; ++import org.apache.rocketmq.tieredstore.provider.TieredFileSegment; ++import org.apache.rocketmq.tieredstore.util.MessageBufferUtil; ++import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest; ++import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; ++import org.junit.After; ++import org.junit.Assert; ++import org.junit.Before; ++import org.junit.Test; ++ ++public class TieredCommitLogTest { ++ ++ private final String storePath = TieredStoreTestUtil.getRandomStorePath(); ++ private MessageQueue mq; ++ private TieredFileAllocator fileAllocator; ++ private TieredMetadataStore metadataStore; ++ ++ @Before ++ public void setUp() throws ClassNotFoundException, NoSuchMethodException { ++ TieredMessageStoreConfig storeConfig = new TieredMessageStoreConfig(); ++ storeConfig.setBrokerName("brokerName"); ++ storeConfig.setStorePathRootDir(storePath); ++ storeConfig.setTieredStoreFilePath(storePath + File.separator); ++ storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment"); ++ storeConfig.setCommitLogRollingInterval(0); ++ storeConfig.setTieredStoreCommitLogMaxSize(1000); ++ ++ metadataStore = TieredStoreUtil.getMetadataStore(storeConfig); ++ fileAllocator = new TieredFileAllocator(storeConfig); ++ mq = new MessageQueue("CommitLogTest", storeConfig.getBrokerName(), 0); ++ TieredStoreExecutor.init(); ++ } ++ ++ @After ++ public void tearDown() throws IOException { ++ TieredStoreTestUtil.destroyCompositeFlatFileManager(); ++ TieredStoreTestUtil.destroyMetadataStore(); ++ TieredStoreTestUtil.destroyTempDir(storePath); ++ TieredStoreExecutor.shutdown(); ++ } ++ ++ @Test ++ public void correctMinOffsetTest() { ++ String filePath = TieredStoreUtil.toPath(mq); ++ TieredCommitLog tieredCommitLog = new TieredCommitLog(fileAllocator, filePath); ++ Assert.assertEquals(0L, tieredCommitLog.getMinOffset()); ++ Assert.assertEquals(0L, tieredCommitLog.getCommitOffset()); ++ Assert.assertEquals(0L, tieredCommitLog.getDispatchCommitOffset()); ++ ++ // append some messages ++ for (int i = 6; i < 50; i++) { ++ ByteBuffer byteBuffer = MessageBufferUtilTest.buildMockedMessageBuffer(); ++ byteBuffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, i); ++ Assert.assertEquals(AppendResult.SUCCESS, tieredCommitLog.append(byteBuffer)); ++ } ++ ++ tieredCommitLog.commit(true); ++ tieredCommitLog.correctMinOffset(); ++ ++ // single file store: 1000 / 122 = 8, file count: 44 / 8 = 5 ++ Assert.assertEquals(6, tieredCommitLog.getFlatFile().getFileSegmentCount()); ++ ++ metadataStore.iterateFileSegment(filePath, FileSegmentType.COMMIT_LOG, metadata -> { ++ if (metadata.getBaseOffset() < 1000) { ++ metadata.setStatus(FileSegmentMetadata.STATUS_DELETED); ++ metadataStore.updateFileSegment(metadata); ++ } ++ }); ++ ++ // manually delete file ++ List<TieredFileSegment> segmentList = tieredCommitLog.getFlatFile().getFileSegmentList(); ++ segmentList.remove(0).destroyFile(); ++ segmentList.remove(0).destroyFile(); ++ ++ tieredCommitLog.correctMinOffset(); ++ Assert.assertEquals(4, tieredCommitLog.getFlatFile().getFileSegmentCount()); ++ Assert.assertEquals(6 + 8 + 8, tieredCommitLog.getMinConsumeQueueOffset()); ++ } ++} +\ No newline at end of file +diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java +index a1dde0451..26b38b970 100644 +--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java ++++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java +@@ -21,6 +21,7 @@ import java.io.IOException; + import org.apache.rocketmq.tieredstore.TieredMessageFetcher; + import org.apache.rocketmq.tieredstore.TieredStoreTestUtil; + import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; ++import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor; + import org.junit.After; + import org.junit.Test; + +@@ -30,9 +31,9 @@ public class TieredStoreMetricsManagerTest { + public void tearDown() throws IOException { + TieredStoreTestUtil.destroyCompositeFlatFileManager(); + TieredStoreTestUtil.destroyMetadataStore(); ++ TieredStoreExecutor.shutdown(); + } + +- + @Test + public void getMetricsView() { + TieredStoreMetricsManager.getMetricsView(); +@@ -40,6 +41,7 @@ public class TieredStoreMetricsManagerTest { + + @Test + public void init() { ++ TieredStoreExecutor.init(); + TieredMessageStoreConfig storeConfig = new TieredMessageStoreConfig(); + storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.memory.MemoryFileSegment"); + TieredStoreMetricsManager.init(OpenTelemetrySdk.builder().build().getMeter(""), +diff --git a/tieredstore/src/test/resources/rmq.logback-test.xml b/tieredstore/src/test/resources/rmq.logback-test.xml +index b70b42046..a7933b5ef 100644 +--- a/tieredstore/src/test/resources/rmq.logback-test.xml ++++ b/tieredstore/src/test/resources/rmq.logback-test.xml +@@ -23,7 +23,7 @@ + </encoder> + </appender> + +- <root level="debug"> ++ <root level="info"> + <appender-ref ref="STDOUT" /> + </root> + </configuration> +-- +2.32.0.windows.2 + + +From ebad3c8a6b41915edb3db65fca593123b296042d Mon Sep 17 00:00:00 2001 +From: gaoyf <gaoyf@users.noreply.github.com> +Date: Thu, 20 Jul 2023 10:59:40 +0800 +Subject: [PATCH 5/7] [ISSUE #7047] NettyRemotingClient#invokeOneway throw + Exception with address + +--- + .../rocketmq/remoting/netty/NettyRemotingClient.java | 2 +- + .../remoting/netty/NettyRemotingClientTest.java | 11 +++++++++++ + 2 files changed, 12 insertions(+), 1 deletion(-) + +diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +index afd779c83..9715b918a 100644 +--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java ++++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +@@ -756,7 +756,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti + } + } else { + this.closeChannel(addr, channel); +- throw new RemotingConnectException(channelRemoteAddr); ++ throw new RemotingConnectException(addr); + } + } + +diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java +index efa3eb3d5..8fabbb21d 100644 +--- a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java ++++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java +@@ -20,6 +20,7 @@ import java.util.concurrent.CompletableFuture; + import java.util.concurrent.ExecutorService; + import java.util.concurrent.Executors; + import org.apache.rocketmq.remoting.InvokeCallback; ++import org.apache.rocketmq.remoting.exception.RemotingConnectException; + import org.apache.rocketmq.remoting.exception.RemotingException; + import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; + import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +@@ -123,4 +124,14 @@ public class NettyRemotingClientTest { + Throwable thrown = catchThrowable(future::get); + assertThat(thrown.getCause()).isInstanceOf(RemotingException.class); + } ++ ++ @Test ++ public void testInvokeOnewayException() throws Exception { ++ String addr = "0.0.0.0"; ++ try { ++ remotingClient.invokeOneway(addr, null, 1000); ++ } catch (RemotingConnectException e) { ++ assertThat(e.getMessage()).contains(addr); ++ } ++ } + } +-- +2.32.0.windows.2 + + +From 804f2d85f22d9ee52573b9c6ee6abae248c9b387 Mon Sep 17 00:00:00 2001 +From: wenbin yao <ywb992134@163.com> +Date: Thu, 20 Jul 2023 11:01:38 +0800 +Subject: [PATCH 6/7] [ISSUE ##7036] rename method: getWriteQueueIdByBroker to + getWriteQueueNumsByBroker(#7037) + +* [ISSUE ##7036] rename method: getWriteQueueIdByBroker to getWriteQueueNumsByBroker + +* [ISSUE #7036] rename method from getWriteQueueIdByBroker to getWriteQueueNumsByBroker +--- + .../apache/rocketmq/client/impl/producer/TopicPublishInfo.java | 2 +- + .../org/apache/rocketmq/client/latency/MQFaultStrategy.java | 2 +- + 2 files changed, 2 insertions(+), 2 deletions(-) + +diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java +index a5f840500..275ada7ac 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java ++++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java +@@ -89,7 +89,7 @@ public class TopicPublishInfo { + return this.messageQueueList.get(pos); + } + +- public int getWriteQueueIdByBroker(final String brokerName) { ++ public int getWriteQueueNumsByBroker(final String brokerName) { + for (int i = 0; i < topicRouteData.getQueueDatas().size(); i++) { + final QueueData queueData = this.topicRouteData.getQueueDatas().get(i); + if (queueData.getBrokerName().equals(brokerName)) { +diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java +index e86238e55..1e1953fad 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java ++++ b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java +@@ -69,7 +69,7 @@ public class MQFaultStrategy { + } + + final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); +- int writeQueueNums = tpInfo.getWriteQueueIdByBroker(notBestBroker); ++ int writeQueueNums = tpInfo.getWriteQueueNumsByBroker(notBestBroker); + if (writeQueueNums > 0) { + final MessageQueue mq = tpInfo.selectOneMessageQueue(); + if (notBestBroker != null) { +-- +2.32.0.windows.2 + + +From af993d28e20922d91862f0911e59f748dcb64e6a Mon Sep 17 00:00:00 2001 +From: guyinyou <36399867+guyinyou@users.noreply.github.com> +Date: Fri, 21 Jul 2023 09:31:56 +0800 +Subject: [PATCH 7/7] [ISSUE #3717][RIP-27] Auto batching in producer + +Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com> +--- + .../rocketmq/client/impl/MQClientManager.java | 21 +- + .../impl/producer/DefaultMQProducerImpl.java | 36 ++ + .../client/producer/DefaultMQProducer.java | 501 +++++++++++------ + .../rocketmq/client/producer/MQProducer.java | 24 +- + .../client/producer/ProduceAccumulator.java | 510 ++++++++++++++++++ + .../producer/DefaultMQProducerTest.java | 38 +- + .../producer/ProduceAccumulatorTest.java | 176 ++++++ + .../rocketmq/common/message/MessageBatch.java | 2 +- + 8 files changed, 1133 insertions(+), 175 deletions(-) + create mode 100644 client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java + create mode 100644 client/src/test/java/org/apache/rocketmq/client/producer/ProduceAccumulatorTest.java + +diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java +index 49186633f..02eaa66e9 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java ++++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java +@@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentMap; + import java.util.concurrent.atomic.AtomicInteger; + import org.apache.rocketmq.client.ClientConfig; + import org.apache.rocketmq.client.impl.factory.MQClientInstance; ++import org.apache.rocketmq.client.producer.ProduceAccumulator; + import org.apache.rocketmq.remoting.RPCHook; + import org.apache.rocketmq.logging.org.slf4j.Logger; + import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +@@ -31,6 +32,9 @@ public class MQClientManager { + private AtomicInteger factoryIndexGenerator = new AtomicInteger(); + private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable = + new ConcurrentHashMap<>(); ++ private ConcurrentMap<String/* clientId */, ProduceAccumulator> accumulatorTable = ++ new ConcurrentHashMap<String, ProduceAccumulator>(); ++ + + private MQClientManager() { + +@@ -43,7 +47,6 @@ public class MQClientManager { + public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig) { + return getOrCreateMQClientInstance(clientConfig, null); + } +- + public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) { + String clientId = clientConfig.buildMQClientId(); + MQClientInstance instance = this.factoryTable.get(clientId); +@@ -62,6 +65,22 @@ public class MQClientManager { + + return instance; + } ++ public ProduceAccumulator getOrCreateProduceAccumulator(final ClientConfig clientConfig) { ++ String clientId = clientConfig.buildMQClientId(); ++ ProduceAccumulator accumulator = this.accumulatorTable.get(clientId); ++ if (null == accumulator) { ++ accumulator = new ProduceAccumulator(clientId); ++ ProduceAccumulator prev = this.accumulatorTable.putIfAbsent(clientId, accumulator); ++ if (prev != null) { ++ accumulator = prev; ++ log.warn("Returned Previous ProduceAccumulator for clientId:[{}]", clientId); ++ } else { ++ log.info("Created new ProduceAccumulator for clientId:[{}]", clientId); ++ } ++ } ++ ++ return accumulator; ++ } + + public void removeClientFactory(final String clientId) { + this.factoryTable.remove(clientId); +diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +index 4eb0e6924..3f4c6e5f7 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java ++++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +@@ -573,6 +573,42 @@ public class DefaultMQProducerImpl implements MQProducerInner { + + } + ++ public MessageQueue invokeMessageQueueSelector(Message msg, MessageQueueSelector selector, Object arg, ++ final long timeout) throws MQClientException, RemotingTooMuchRequestException { ++ long beginStartTime = System.currentTimeMillis(); ++ this.makeSureStateOK(); ++ Validators.checkMessage(msg, this.defaultMQProducer); ++ ++ TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); ++ if (topicPublishInfo != null && topicPublishInfo.ok()) { ++ MessageQueue mq = null; ++ try { ++ List<MessageQueue> messageQueueList = ++ mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList()); ++ Message userMessage = MessageAccessor.cloneMessage(msg); ++ String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace()); ++ userMessage.setTopic(userTopic); ++ ++ mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg)); ++ } catch (Throwable e) { ++ throw new MQClientException("select message queue threw exception.", e); ++ } ++ ++ long costTime = System.currentTimeMillis() - beginStartTime; ++ if (timeout < costTime) { ++ throw new RemotingTooMuchRequestException("sendSelectImpl call timeout"); ++ } ++ if (mq != null) { ++ return mq; ++ } else { ++ throw new MQClientException("select message queue return null.", null); ++ } ++ } ++ ++ validateNameServerSetting(); ++ throw new MQClientException("No route info for this topic, " + msg.getTopic(), null); ++ } ++ + public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { + return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName); + } +diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +index 6e9ffed8c..c5b1b5223 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java ++++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +@@ -29,6 +29,7 @@ import org.apache.rocketmq.client.Validators; + import org.apache.rocketmq.client.exception.MQBrokerException; + import org.apache.rocketmq.client.exception.MQClientException; + import org.apache.rocketmq.client.exception.RequestTimeoutException; ++import org.apache.rocketmq.client.impl.MQClientManager; + import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; + import org.apache.rocketmq.client.trace.AsyncTraceDispatcher; + import org.apache.rocketmq.client.trace.TraceDispatcher; +@@ -38,6 +39,7 @@ import org.apache.rocketmq.common.MixAll; + import org.apache.rocketmq.common.message.Message; + import org.apache.rocketmq.common.message.MessageBatch; + import org.apache.rocketmq.common.message.MessageClientIDSetter; ++import org.apache.rocketmq.common.message.MessageConst; + import org.apache.rocketmq.common.message.MessageExt; + import org.apache.rocketmq.common.message.MessageQueue; + import org.apache.rocketmq.common.topic.TopicValidator; +@@ -49,10 +51,10 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + + /** + * This class is the entry point for applications intending to send messages. </p> +- * ++ * <p> + * It's fine to tune fields which exposes getter/setter methods, but keep in mind, all of them should work well out of + * box for most scenarios. </p> +- * ++ * <p> + * This class aggregates various <code>send</code> methods to deliver messages to broker(s). Each of them has pros and + * cons; you'd better understand strengths and weakness of them before actually coding. </p> + * +@@ -78,9 +80,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + /** + * Producer group conceptually aggregates all producer instances of exactly same role, which is particularly + * important when transactional messages are involved. </p> +- * ++ * <p> + * For non-transactional messages, it does not matter as long as it's unique per process. </p> +- * ++ * <p> + * See <a href="https://rocketmq.apache.org/docs/introduction/02concepts">core concepts</a> for more discussion. + */ + private String producerGroup; +@@ -107,14 +109,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + + /** + * Maximum number of retry to perform internally before claiming sending failure in synchronous mode. </p> +- * ++ * <p> + * This may potentially cause message duplication which is up to application developers to resolve. + */ + private int retryTimesWhenSendFailed = 2; + + /** + * Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p> +- * ++ * <p> + * This may potentially cause message duplication which is up to application developers to resolve. + */ + private int retryTimesWhenSendAsyncFailed = 2; +@@ -134,6 +136,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + */ + private TraceDispatcher traceDispatcher = null; + ++ /** ++ * Switch flag instance for automatic batch message ++ */ ++ private boolean autoBatch = false; ++ /** ++ * Instance for batching message automatically ++ */ ++ private ProduceAccumulator produceAccumulator = null; ++ + /** + * Indicate whether to block message when asynchronous sending traffic is too heavy. + */ +@@ -179,11 +190,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + /** + * Constructor specifying producer group. + * +- * @param producerGroup Producer group, see the name-sake field. +- * @param rpcHook RPC hook to execute per each remoting command execution. +- * @param enableMsgTrace Switch flag instance for message trace. ++ * @param producerGroup Producer group, see the name-sake field. ++ * @param rpcHook RPC hook to execute per each remoting command execution. ++ * @param enableMsgTrace Switch flag instance for message trace. + * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default +- * trace topic name. ++ * trace topic name. + */ + public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, + final String customizedTraceTopic) { +@@ -193,7 +204,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + /** + * Constructor specifying producer group. + * +- * @param namespace Namespace for this MQ Producer instance. ++ * @param namespace Namespace for this MQ Producer instance. + * @param producerGroup Producer group, see the name-sake field. + */ + public DefaultMQProducer(final String namespace, final String producerGroup) { +@@ -204,7 +215,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + * Constructor specifying both producer group and RPC hook. + * + * @param producerGroup Producer group, see the name-sake field. +- * @param rpcHook RPC hook to execute per each remoting command execution. ++ * @param rpcHook RPC hook to execute per each remoting command execution. + */ + public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) { + this(null, producerGroup, rpcHook); +@@ -213,20 +224,21 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + /** + * Constructor specifying namespace, producer group and RPC hook. + * +- * @param namespace Namespace for this MQ Producer instance. ++ * @param namespace Namespace for this MQ Producer instance. + * @param producerGroup Producer group, see the name-sake field. +- * @param rpcHook RPC hook to execute per each remoting command execution. ++ * @param rpcHook RPC hook to execute per each remoting command execution. + */ + public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) { + this.namespace = namespace; + this.producerGroup = producerGroup; + defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); ++ produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this); + } + + /** + * Constructor specifying producer group and enabled msg trace flag. + * +- * @param producerGroup Producer group, see the name-sake field. ++ * @param producerGroup Producer group, see the name-sake field. + * @param enableMsgTrace Switch flag instance for message trace. + */ + public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace) { +@@ -236,10 +248,10 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + /** + * Constructor specifying producer group, enabled msgTrace flag and customized trace topic name. + * +- * @param producerGroup Producer group, see the name-sake field. +- * @param enableMsgTrace Switch flag instance for message trace. ++ * @param producerGroup Producer group, see the name-sake field. ++ * @param enableMsgTrace Switch flag instance for message trace. + * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default +- * trace topic name. ++ * trace topic name. + */ + public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic) { + this(null, producerGroup, null, enableMsgTrace, customizedTraceTopic); +@@ -249,18 +261,19 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + * Constructor specifying namespace, producer group, RPC hook, enabled msgTrace flag and customized trace topic + * name. + * +- * @param namespace Namespace for this MQ Producer instance. +- * @param producerGroup Producer group, see the name-sake field. +- * @param rpcHook RPC hook to execute per each remoting command execution. +- * @param enableMsgTrace Switch flag instance for message trace. ++ * @param namespace Namespace for this MQ Producer instance. ++ * @param producerGroup Producer group, see the name-sake field. ++ * @param rpcHook RPC hook to execute per each remoting command execution. ++ * @param enableMsgTrace Switch flag instance for message trace. + * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default +- * trace topic name. ++ * trace topic name. + */ + public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, + boolean enableMsgTrace, final String customizedTraceTopic) { + this.namespace = namespace; + this.producerGroup = producerGroup; + defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); ++ produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this); + //if client open the message trace feature + if (enableMsgTrace) { + try { +@@ -297,6 +310,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + public void start() throws MQClientException { + this.setProducerGroup(withNamespace(this.producerGroup)); + this.defaultMQProducerImpl.start(); ++ if (this.produceAccumulator != null) { ++ this.produceAccumulator.start(); ++ } + if (null != traceDispatcher) { + try { + traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()); +@@ -312,6 +328,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + @Override + public void shutdown() { + this.defaultMQProducerImpl.shutdown(); ++ if (this.produceAccumulator != null) { ++ this.produceAccumulator.shutdown(); ++ } + if (null != traceDispatcher) { + traceDispatcher.shutdown(); + } +@@ -329,6 +348,26 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + return this.defaultMQProducerImpl.fetchPublishMessageQueues(withNamespace(topic)); + } + ++ private boolean canBatch(Message msg) { ++ // produceAccumulator is full ++ if (!produceAccumulator.tryAddMessage(msg)) { ++ return false; ++ } ++ // delay message do not support batch processing ++ if (msg.getDelayTimeLevel() > 0) { ++ return false; ++ } ++ // retry message do not support batch processing ++ if (msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { ++ return false; ++ } ++ // message which have been assigned to producer group do not support batch processing ++ if (msg.getProperties().containsKey(MessageConst.PROPERTY_PRODUCER_GROUP)) { ++ return false; ++ } ++ return true; ++ } ++ + /** + * Send message in synchronous mode. This method returns only when the sending procedure totally completes. </p> + * +@@ -339,28 +378,32 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + * @param msg Message to send. + * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message, + * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc. +- * @throws MQClientException if there is any client error. +- * @throws RemotingException if there is any network-tier error. +- * @throws MQBrokerException if there is any error with broker. ++ * @throws MQClientException if there is any client error. ++ * @throws RemotingException if there is any network-tier error. ++ * @throws MQBrokerException if there is any error with broker. + * @throws InterruptedException if the sending thread is interrupted. + */ + @Override + public SendResult send( + Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + msg.setTopic(withNamespace(msg.getTopic())); +- return this.defaultMQProducerImpl.send(msg); ++ if (this.getAutoBatch() && !(msg instanceof MessageBatch)) { ++ return sendByAccumulator(msg, null, null); ++ } else { ++ return sendDirect(msg, null, null); ++ } + } + + /** + * Same to {@link #send(Message)} with send timeout specified in addition. + * +- * @param msg Message to send. ++ * @param msg Message to send. + * @param timeout send timeout. + * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message, + * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc. +- * @throws MQClientException if there is any client error. +- * @throws RemotingException if there is any network-tier error. +- * @throws MQBrokerException if there is any error with broker. ++ * @throws MQClientException if there is any client error. ++ * @throws RemotingException if there is any network-tier error. ++ * @throws MQBrokerException if there is any error with broker. + * @throws InterruptedException if the sending thread is interrupted. + */ + @Override +@@ -372,34 +415,42 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + + /** + * Send message to broker asynchronously. </p> +- * ++ * <p> + * This method returns immediately. On sending completion, <code>sendCallback</code> will be executed. </p> +- * ++ * <p> + * Similar to {@link #send(Message)}, internal implementation would potentially retry up to {@link + * #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication and + * application developers are the one to resolve this potential issue. + * +- * @param msg Message to send. ++ * @param msg Message to send. + * @param sendCallback Callback to execute on sending completed, either successful or unsuccessful. +- * @throws MQClientException if there is any client error. +- * @throws RemotingException if there is any network-tier error. ++ * @throws MQClientException if there is any client error. ++ * @throws RemotingException if there is any network-tier error. + * @throws InterruptedException if the sending thread is interrupted. + */ + @Override + public void send(Message msg, + SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { + msg.setTopic(withNamespace(msg.getTopic())); +- this.defaultMQProducerImpl.send(msg, sendCallback); ++ try { ++ if (this.getAutoBatch() && !(msg instanceof MessageBatch)) { ++ sendByAccumulator(msg, null, sendCallback); ++ } else { ++ sendDirect(msg, null, sendCallback); ++ } ++ } catch (Throwable e) { ++ sendCallback.onException(e); ++ } + } + + /** + * Same to {@link #send(Message, SendCallback)} with send timeout specified in addition. + * +- * @param msg message to send. ++ * @param msg message to send. + * @param sendCallback Callback to execute. +- * @param timeout send timeout. +- * @throws MQClientException if there is any client error. +- * @throws RemotingException if there is any network-tier error. ++ * @param timeout send timeout. ++ * @throws MQClientException if there is any client error. ++ * @throws RemotingException if there is any network-tier error. + * @throws InterruptedException if the sending thread is interrupted. + */ + @Override +@@ -414,8 +465,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + * acknowledgement from broker before return. Obviously, it has maximums throughput yet potentials of message loss. + * + * @param msg Message to send. +- * @throws MQClientException if there is any client error. +- * @throws RemotingException if there is any network-tier error. ++ * @throws MQClientException if there is any client error. ++ * @throws RemotingException if there is any network-tier error. + * @throws InterruptedException if the sending thread is interrupted. + */ + @Override +@@ -428,32 +479,37 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + * Same to {@link #send(Message)} with target message queue specified in addition. + * + * @param msg Message to send. +- * @param mq Target message queue. ++ * @param mq Target message queue. + * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message, + * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc. +- * @throws MQClientException if there is any client error. +- * @throws RemotingException if there is any network-tier error. +- * @throws MQBrokerException if there is any error with broker. ++ * @throws MQClientException if there is any client error. ++ * @throws RemotingException if there is any network-tier error. ++ * @throws MQBrokerException if there is any error with broker. + * @throws InterruptedException if the sending thread is interrupted. + */ + @Override + public SendResult send(Message msg, MessageQueue mq) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + msg.setTopic(withNamespace(msg.getTopic())); +- return this.defaultMQProducerImpl.send(msg, queueWithNamespace(mq)); ++ mq = queueWithNamespace(mq); ++ if (this.getAutoBatch() && !(msg instanceof MessageBatch)) { ++ return sendByAccumulator(msg, mq, null); ++ } else { ++ return sendDirect(msg, mq, null); ++ } + } + + /** + * Same to {@link #send(Message)} with target message queue and send timeout specified. + * +- * @param msg Message to send. +- * @param mq Target message queue. ++ * @param msg Message to send. ++ * @param mq Target message queue. + * @param timeout send timeout. + * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message, + * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc. +- * @throws MQClientException if there is any client error. +- * @throws RemotingException if there is any network-tier error. +- * @throws MQBrokerException if there is any error with broker. ++ * @throws MQClientException if there is any client error. ++ * @throws RemotingException if there is any network-tier error. ++ * @throws MQBrokerException if there is any error with broker. + * @throws InterruptedException if the sending thread is interrupted. + */ + @Override +@@ -466,29 +522,38 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + /** + * Same to {@link #send(Message, SendCallback)} with target message queue specified. + * +- * @param msg Message to send. +- * @param mq Target message queue. ++ * @param msg Message to send. ++ * @param mq Target message queue. + * @param sendCallback Callback to execute on sending completed, either successful or unsuccessful. +- * @throws MQClientException if there is any client error. +- * @throws RemotingException if there is any network-tier error. ++ * @throws MQClientException if there is any client error. ++ * @throws RemotingException if there is any network-tier error. + * @throws InterruptedException if the sending thread is interrupted. + */ + @Override + public void send(Message msg, MessageQueue mq, SendCallback sendCallback) + throws MQClientException, RemotingException, InterruptedException { + msg.setTopic(withNamespace(msg.getTopic())); +- this.defaultMQProducerImpl.send(msg, queueWithNamespace(mq), sendCallback); ++ mq = queueWithNamespace(mq); ++ try { ++ if (this.getAutoBatch() && !(msg instanceof MessageBatch)) { ++ sendByAccumulator(msg, mq, sendCallback); ++ } else { ++ sendDirect(msg, mq, sendCallback); ++ } ++ } catch (MQBrokerException e) { ++ // ignore ++ } + } + + /** + * Same to {@link #send(Message, SendCallback)} with target message queue and send timeout specified. + * +- * @param msg Message to send. +- * @param mq Target message queue. ++ * @param msg Message to send. ++ * @param mq Target message queue. + * @param sendCallback Callback to execute on sending completed, either successful or unsuccessful. +- * @param timeout Send timeout. +- * @throws MQClientException if there is any client error. +- * @throws RemotingException if there is any network-tier error. ++ * @param timeout Send timeout. ++ * @throws MQClientException if there is any client error. ++ * @throws RemotingException if there is any network-tier error. + * @throws InterruptedException if the sending thread is interrupted. + */ + @Override +@@ -502,9 +567,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + * Same to {@link #sendOneway(Message)} with target message queue specified. + * + * @param msg Message to send. +- * @param mq Target message queue. +- * @throws MQClientException if there is any client error. +- * @throws RemotingException if there is any network-tier error. ++ * @param mq Target message queue. ++ * @throws MQClientException if there is any client error. ++ * @throws RemotingException if there is any network-tier error. + * @throws InterruptedException if the sending thread is interrupted. + */ + @Override +@@ -517,35 +582,41 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + /** + * Same to {@link #send(Message)} with message queue selector specified. + * +- * @param msg Message to send. ++ * @param msg Message to send. + * @param selector Message queue selector, through which we get target message queue to deliver message to. +- * @param arg Argument to work along with message queue selector. ++ * @param arg Argument to work along with message queue selector. + * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message, + * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc. +- * @throws MQClientException if there is any client error. +- * @throws RemotingException if there is any network-tier error. +- * @throws MQBrokerException if there is any error with broker. ++ * @throws MQClientException if there is any client error. ++ * @throws RemotingException if there is any network-tier error. ++ * @throws MQBrokerException if there is any error with broker. + * @throws InterruptedException if the sending thread is interrupted. + */ + @Override + public SendResult send(Message msg, MessageQueueSelector selector, Object arg) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + msg.setTopic(withNamespace(msg.getTopic())); +- return this.defaultMQProducerImpl.send(msg, selector, arg); ++ MessageQueue mq = this.defaultMQProducerImpl.invokeMessageQueueSelector(msg, selector, arg, this.getSendMsgTimeout()); ++ mq = queueWithNamespace(mq); ++ if (this.getAutoBatch() && !(msg instanceof MessageBatch)) { ++ return sendByAccumulator(msg, mq, null); ++ } else { ++ return sendDirect(msg, mq, null); ++ } + } + + /** + * Same to {@link #send(Message, MessageQueueSelector, Object)} with send timeout specified. + * +- * @param msg Message to send. ++ * @param msg Message to send. + * @param selector Message queue selector, through which we get target message queue to deliver message to. +- * @param arg Argument to work along with message queue selector. +- * @param timeout Send timeout. ++ * @param arg Argument to work along with message queue selector. ++ * @param timeout Send timeout. + * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message, + * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc. +- * @throws MQClientException if there is any client error. +- * @throws RemotingException if there is any network-tier error. +- * @throws MQBrokerException if there is any error with broker. ++ * @throws MQClientException if there is any client error. ++ * @throws RemotingException if there is any network-tier error. ++ * @throws MQBrokerException if there is any error with broker. + * @throws InterruptedException if the sending thread is interrupted. + */ + @Override +@@ -558,31 +629,41 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + /** + * Same to {@link #send(Message, SendCallback)} with message queue selector specified. + * +- * @param msg Message to send. +- * @param selector Message selector through which to get target message queue. +- * @param arg Argument used along with message queue selector. ++ * @param msg Message to send. ++ * @param selector Message selector through which to get target message queue. ++ * @param arg Argument used along with message queue selector. + * @param sendCallback callback to execute on sending completion. +- * @throws MQClientException if there is any client error. +- * @throws RemotingException if there is any network-tier error. ++ * @throws MQClientException if there is any client error. ++ * @throws RemotingException if there is any network-tier error. + * @throws InterruptedException if the sending thread is interrupted. + */ + @Override + public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback) + throws MQClientException, RemotingException, InterruptedException { + msg.setTopic(withNamespace(msg.getTopic())); +- this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback); ++ try { ++ MessageQueue mq = this.defaultMQProducerImpl.invokeMessageQueueSelector(msg, selector, arg, this.getSendMsgTimeout()); ++ mq = queueWithNamespace(mq); ++ if (this.getAutoBatch() && !(msg instanceof MessageBatch)) { ++ sendByAccumulator(msg, mq, sendCallback); ++ } else { ++ sendDirect(msg, mq, sendCallback); ++ } ++ } catch (Throwable e) { ++ sendCallback.onException(e); ++ } + } + + /** + * Same to {@link #send(Message, MessageQueueSelector, Object, SendCallback)} with timeout specified. + * +- * @param msg Message to send. +- * @param selector Message selector through which to get target message queue. +- * @param arg Argument used along with message queue selector. ++ * @param msg Message to send. ++ * @param selector Message selector through which to get target message queue. ++ * @param arg Argument used along with message queue selector. + * @param sendCallback callback to execute on sending completion. +- * @param timeout Send timeout. +- * @throws MQClientException if there is any client error. +- * @throws RemotingException if there is any network-tier error. ++ * @param timeout Send timeout. ++ * @throws MQClientException if there is any client error. ++ * @throws RemotingException if there is any network-tier error. + * @throws InterruptedException if the sending thread is interrupted. + */ + @Override +@@ -592,6 +673,42 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout); + } + ++ public SendResult sendDirect(Message msg, MessageQueue mq, ++ SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { ++ // send in sync mode ++ if (sendCallback == null) { ++ if (mq == null) { ++ return this.defaultMQProducerImpl.send(msg); ++ } else { ++ return this.defaultMQProducerImpl.send(msg, mq); ++ } ++ } else { ++ if (mq == null) { ++ this.defaultMQProducerImpl.send(msg, sendCallback); ++ } else { ++ this.defaultMQProducerImpl.send(msg, mq, sendCallback); ++ } ++ return null; ++ } ++ } ++ ++ public SendResult sendByAccumulator(Message msg, MessageQueue mq, ++ SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { ++ // check whether it can batch ++ if (!canBatch(msg)) { ++ return sendDirect(msg, mq, sendCallback); ++ } else { ++ Validators.checkMessage(msg, this); ++ MessageClientIDSetter.setUniqID(msg); ++ if (sendCallback == null) { ++ return this.produceAccumulator.send(msg, mq, this); ++ } else { ++ this.produceAccumulator.send(msg, mq, sendCallback, this); ++ return null; ++ } ++ } ++ } ++ + /** + * Send request message in synchronous mode. This method returns only when the consumer consume the request message and reply a message. </p> + * +@@ -599,13 +716,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + * {@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may be potentially + * delivered to broker(s). It's up to the application developers to resolve potential duplication issue. + * +- * @param msg request message to send ++ * @param msg request message to send + * @param timeout request timeout + * @return reply message +- * @throws MQClientException if there is any client error. +- * @throws RemotingException if there is any network-tier error. +- * @throws MQBrokerException if there is any broker error. +- * @throws InterruptedException if the thread is interrupted. ++ * @throws MQClientException if there is any client error. ++ * @throws RemotingException if there is any network-tier error. ++ * @throws MQBrokerException if there is any broker error. ++ * @throws InterruptedException if the thread is interrupted. + * @throws RequestTimeoutException if request timeout. + */ + @Override +@@ -618,18 +735,18 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + /** + * Request asynchronously. </p> + * This method returns immediately. On receiving reply message, <code>requestCallback</code> will be executed. </p> +- * ++ * <p> + * Similar to {@link #request(Message, long)}, internal implementation would potentially retry up to {@link + * #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication and + * application developers are the one to resolve this potential issue. + * +- * @param msg request message to send ++ * @param msg request message to send + * @param requestCallback callback to execute on request completion. +- * @param timeout request timeout +- * @throws MQClientException if there is any client error. +- * @throws RemotingException if there is any network-tier error. ++ * @param timeout request timeout ++ * @throws MQClientException if there is any client error. ++ * @throws RemotingException if there is any network-tier error. + * @throws InterruptedException if the thread is interrupted. +- * @throws MQBrokerException if there is any broker error. ++ * @throws MQBrokerException if there is any broker error. + */ + @Override + public void request(final Message msg, final RequestCallback requestCallback, final long timeout) +@@ -641,15 +758,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + /** + * Same to {@link #request(Message, long)} with message queue selector specified. + * +- * @param msg request message to send ++ * @param msg request message to send + * @param selector message queue selector, through which we get target message queue to deliver message to. +- * @param arg argument to work along with message queue selector. +- * @param timeout timeout of request. ++ * @param arg argument to work along with message queue selector. ++ * @param timeout timeout of request. + * @return reply message +- * @throws MQClientException if there is any client error. +- * @throws RemotingException if there is any network-tier error. +- * @throws MQBrokerException if there is any broker error. +- * @throws InterruptedException if the thread is interrupted. ++ * @throws MQClientException if there is any client error. ++ * @throws RemotingException if there is any network-tier error. ++ * @throws MQBrokerException if there is any broker error. ++ * @throws InterruptedException if the thread is interrupted. + * @throws RequestTimeoutException if request timeout. + */ + @Override +@@ -663,15 +780,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + /** + * Same to {@link #request(Message, RequestCallback, long)} with target message selector specified. + * +- * @param msg requst message to send +- * @param selector message queue selector, through which we get target message queue to deliver message to. +- * @param arg argument to work along with message queue selector. ++ * @param msg requst message to send ++ * @param selector message queue selector, through which we get target message queue to deliver message to. ++ * @param arg argument to work along with message queue selector. + * @param requestCallback callback to execute on request completion. +- * @param timeout timeout of request. +- * @throws MQClientException if there is any client error. +- * @throws RemotingException if there is any network-tier error. ++ * @param timeout timeout of request. ++ * @throws MQClientException if there is any client error. ++ * @throws RemotingException if there is any network-tier error. + * @throws InterruptedException if the thread is interrupted. +- * @throws MQBrokerException if there is any broker error. ++ * @throws MQBrokerException if there is any broker error. + */ + @Override + public void request(final Message msg, final MessageQueueSelector selector, final Object arg, +@@ -684,13 +801,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + /** + * Same to {@link #request(Message, long)} with target message queue specified in addition. + * +- * @param msg request message to send +- * @param mq target message queue. ++ * @param msg request message to send ++ * @param mq target message queue. + * @param timeout request timeout +- * @throws MQClientException if there is any client error. +- * @throws RemotingException if there is any network-tier error. +- * @throws MQBrokerException if there is any broker error. +- * @throws InterruptedException if the thread is interrupted. ++ * @throws MQClientException if there is any client error. ++ * @throws RemotingException if there is any network-tier error. ++ * @throws MQBrokerException if there is any broker error. ++ * @throws InterruptedException if the thread is interrupted. + * @throws RequestTimeoutException if request timeout. + */ + @Override +@@ -703,14 +820,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + /** + * Same to {@link #request(Message, RequestCallback, long)} with target message queue specified. + * +- * @param msg request message to send +- * @param mq target message queue. ++ * @param msg request message to send ++ * @param mq target message queue. + * @param requestCallback callback to execute on request completion. +- * @param timeout timeout of request. +- * @throws MQClientException if there is any client error. +- * @throws RemotingException if there is any network-tier error. ++ * @param timeout timeout of request. ++ * @throws MQClientException if there is any client error. ++ * @throws RemotingException if there is any network-tier error. + * @throws InterruptedException if the thread is interrupted. +- * @throws MQBrokerException if there is any broker error. ++ * @throws MQBrokerException if there is any broker error. + */ + @Override + public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout) +@@ -722,11 +839,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + /** + * Same to {@link #sendOneway(Message)} with message queue selector specified. + * +- * @param msg Message to send. ++ * @param msg Message to send. + * @param selector Message queue selector, through which to determine target message queue to deliver message +- * @param arg Argument used along with message queue selector. +- * @throws MQClientException if there is any client error. +- * @throws RemotingException if there is any network-tier error. ++ * @param arg Argument used along with message queue selector. ++ * @throws MQClientException if there is any client error. ++ * @throws RemotingException if there is any network-tier error. + * @throws InterruptedException if the sending thread is interrupted. + */ + @Override +@@ -739,9 +856,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + /** + * This method is to send transactional messages. + * +- * @param msg Transactional message to send. ++ * @param msg Transactional message to send. + * @param tranExecuter local transaction executor. +- * @param arg Argument used along with local transaction executor. ++ * @param arg Argument used along with local transaction executor. + * @return Transaction result. + * @throws MQClientException if there is any client error. + */ +@@ -769,15 +886,16 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + /** + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + * +- * @param key accessKey +- * @param newTopic topic name +- * @param queueNum topic's queue number ++ * @param key accessKey ++ * @param newTopic topic name ++ * @param queueNum topic's queue number + * @param attributes + * @throws MQClientException if there is any client error. + */ + @Deprecated + @Override +- public void createTopic(String key, String newTopic, int queueNum, Map<String, String> attributes) throws MQClientException { ++ public void createTopic(String key, String newTopic, int queueNum, ++ Map<String, String> attributes) throws MQClientException { + createTopic(key, withNamespace(newTopic), queueNum, 0, null); + } + +@@ -785,23 +903,24 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + * Create a topic on broker. This method will be removed in a certain version after April 5, 2020, so please do not + * use this method. + * +- * @param key accessKey +- * @param newTopic topic name +- * @param queueNum topic's queue number ++ * @param key accessKey ++ * @param newTopic topic name ++ * @param queueNum topic's queue number + * @param topicSysFlag topic system flag + * @param attributes + * @throws MQClientException if there is any client error. + */ + @Deprecated + @Override +- public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, Map<String, String> attributes) throws MQClientException { ++ public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, ++ Map<String, String> attributes) throws MQClientException { + this.defaultMQProducerImpl.createTopic(key, withNamespace(newTopic), queueNum, topicSysFlag); + } + + /** + * Search consume queue offset of the given time stamp. + * +- * @param mq Instance of MessageQueue ++ * @param mq Instance of MessageQueue + * @param timestamp from when in milliseconds. + * @return Consume queue offset. + * @throws MQClientException if there is any client error. +@@ -813,7 +932,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + + /** + * Query maximum offset of the given message queue. +- * ++ * <p> + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + * + * @param mq Instance of MessageQueue +@@ -828,7 +947,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + + /** + * Query minimum offset of the given message queue. +- * ++ * <p> + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + * + * @param mq Instance of MessageQueue +@@ -843,7 +962,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + + /** + * Query the earliest message store time. +- * ++ * <p> + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + * + * @param mq Instance of MessageQueue +@@ -858,14 +977,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + + /** + * Query message of the given offset message ID. +- * ++ * <p> + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + * + * @param offsetMsgId message id + * @return Message specified. +- * @throws MQBrokerException if there is any broker error. +- * @throws MQClientException if there is any client error. +- * @throws RemotingException if there is any network-tier error. ++ * @throws MQBrokerException if there is any broker error. ++ * @throws MQClientException if there is any client error. ++ * @throws RemotingException if there is any network-tier error. + * @throws InterruptedException if the sending thread is interrupted. + */ + @Deprecated +@@ -877,16 +996,16 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + + /** + * Query message by key. +- * ++ * <p> + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + * +- * @param topic message topic +- * @param key message key index word ++ * @param topic message topic ++ * @param key message key index word + * @param maxNum max message number +- * @param begin from when +- * @param end to when ++ * @param begin from when ++ * @param end to when + * @return QueryResult instance contains matched messages. +- * @throws MQClientException if there is any client error. ++ * @throws MQClientException if there is any client error. + * @throws InterruptedException if the thread is interrupted. + */ + @Deprecated +@@ -898,15 +1017,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + + /** + * Query message of the given message ID. +- * ++ * <p> + * This method will be removed in a certain version after April 5, 2020, so please do not use this method. + * + * @param topic Topic + * @param msgId Message ID + * @return Message specified. +- * @throws MQBrokerException if there is any broker error. +- * @throws MQClientException if there is any client error. +- * @throws RemotingException if there is any network-tier error. ++ * @throws MQBrokerException if there is any broker error. ++ * @throws MQClientException if there is any client error. ++ * @throws RemotingException if there is any network-tier error. + * @throws InterruptedException if the sending thread is interrupted. + */ + @Deprecated +@@ -945,7 +1064,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + } + + @Override +- public void send(Collection<Message> msgs, SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { ++ public void send(Collection<Message> msgs, ++ SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + this.defaultMQProducerImpl.send(batch(msgs), sendCallback); + } + +@@ -963,7 +1083,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + + @Override + public void send(Collection<Message> msgs, MessageQueue mq, +- SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { ++ SendCallback sendCallback, ++ long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + this.defaultMQProducerImpl.send(batch(msgs), queueWithNamespace(mq), sendCallback, timeout); + } + +@@ -1012,6 +1133,62 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + return msgBatch; + } + ++ public int getBatchMaxDelayMs() { ++ if (this.produceAccumulator == null) { ++ return 0; ++ } ++ return produceAccumulator.getBatchMaxDelayMs(); ++ } ++ ++ public void batchMaxDelayMs(int holdMs) { ++ if (this.produceAccumulator == null) { ++ throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch"); ++ } ++ this.produceAccumulator.batchMaxDelayMs(holdMs); ++ } ++ ++ public long getBatchMaxBytes() { ++ if (this.produceAccumulator == null) { ++ return 0; ++ } ++ return produceAccumulator.getBatchMaxBytes(); ++ } ++ ++ public void batchMaxBytes(long holdSize) { ++ if (this.produceAccumulator == null) { ++ throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch"); ++ } ++ this.produceAccumulator.batchMaxBytes(holdSize); ++ } ++ ++ public long getTotalBatchMaxBytes() { ++ if (this.produceAccumulator == null) { ++ return 0; ++ } ++ return produceAccumulator.getTotalBatchMaxBytes(); ++ } ++ ++ public void totalBatchMaxBytes(long totalHoldSize) { ++ if (this.produceAccumulator == null) { ++ throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch"); ++ } ++ this.produceAccumulator.totalBatchMaxBytes(totalHoldSize); ++ } ++ ++ public boolean getAutoBatch() { ++ if (this.produceAccumulator == null) { ++ return false; ++ } ++ return this.autoBatch; ++ } ++ ++ public void setAutoBatch(boolean autoBatch) { ++ if (this.produceAccumulator == null) { ++ throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch"); ++ } ++ this.autoBatch = autoBatch; ++ } ++ + public String getProducerGroup() { + return producerGroup; + } +@@ -1130,7 +1307,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { + } + + public boolean isEnableBackpressureForAsyncMode() { +- return enableBackpressureForAsyncMode; ++ return enableBackpressureForAsyncMode; + } + + public void setEnableBackpressureForAsyncMode(boolean enableBackpressureForAsyncMode) { +diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java +index f70ddb283..78657e623 100644 +--- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java ++++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java +@@ -40,7 +40,7 @@ public interface MQProducer extends MQAdmin { + RemotingException, MQBrokerException, InterruptedException; + + void send(final Message msg, final SendCallback sendCallback) throws MQClientException, +- RemotingException, InterruptedException; ++ RemotingException, InterruptedException, MQBrokerException; + + void send(final Message msg, final SendCallback sendCallback, final long timeout) + throws MQClientException, RemotingException, InterruptedException; +@@ -99,19 +99,23 @@ public interface MQProducer extends MQAdmin { + + SendResult send(final Collection<Message> msgs, final MessageQueue mq, final long timeout) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException; +- +- void send(final Collection<Message> msgs, final SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, ++ ++ void send(final Collection<Message> msgs, ++ final SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, + InterruptedException; +- +- void send(final Collection<Message> msgs, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, ++ ++ void send(final Collection<Message> msgs, final SendCallback sendCallback, ++ final long timeout) throws MQClientException, RemotingException, + MQBrokerException, InterruptedException; +- +- void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback) throws MQClientException, RemotingException, ++ ++ void send(final Collection<Message> msgs, final MessageQueue mq, ++ final SendCallback sendCallback) throws MQClientException, RemotingException, + MQBrokerException, InterruptedException; +- +- void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback, final long timeout) throws MQClientException, ++ ++ void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback, ++ final long timeout) throws MQClientException, + RemotingException, MQBrokerException, InterruptedException; +- ++ + //for rpc + Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException, + RemotingException, MQBrokerException, InterruptedException; +diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java b/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java +new file mode 100644 +index 000000000..46dfcf71d +--- /dev/null ++++ b/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java +@@ -0,0 +1,510 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.rocketmq.client.producer; ++ ++import java.util.Arrays; ++import java.util.Collection; ++import java.util.HashSet; ++import java.util.Iterator; ++import java.util.LinkedList; ++import java.util.Map; ++import java.util.Objects; ++import java.util.Set; ++import java.util.concurrent.ConcurrentHashMap; ++import java.util.concurrent.atomic.AtomicBoolean; ++import java.util.concurrent.atomic.AtomicInteger; ++import java.util.concurrent.atomic.AtomicLong; ++import org.apache.rocketmq.client.exception.MQBrokerException; ++import org.apache.rocketmq.client.exception.MQClientException; ++import org.apache.rocketmq.common.ServiceThread; ++import org.apache.rocketmq.common.message.Message; ++import org.apache.rocketmq.common.message.MessageBatch; ++import org.apache.rocketmq.common.message.MessageClientIDSetter; ++import org.apache.rocketmq.common.message.MessageConst; ++import org.apache.rocketmq.common.message.MessageDecoder; ++import org.apache.rocketmq.common.message.MessageQueue; ++import org.apache.rocketmq.logging.org.slf4j.Logger; ++import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; ++import org.apache.rocketmq.remoting.exception.RemotingException; ++ ++public class ProduceAccumulator { ++ // totalHoldSize normal value ++ private long totalHoldSize = 32 * 1024 * 1024; ++ // holdSize normal value ++ private long holdSize = 32 * 1024; ++ // holdMs normal value ++ private int holdMs = 10; ++ private final Logger log = LoggerFactory.getLogger(DefaultMQProducer.class); ++ private final GuardForSyncSendService guardThreadForSyncSend; ++ private final GuardForAsyncSendService guardThreadForAsyncSend; ++ private Map<AggregateKey, MessageAccumulation> syncSendBatchs = new ConcurrentHashMap<AggregateKey, MessageAccumulation>(); ++ private Map<AggregateKey, MessageAccumulation> asyncSendBatchs = new ConcurrentHashMap<AggregateKey, MessageAccumulation>(); ++ private AtomicLong currentlyHoldSize = new AtomicLong(0); ++ private final String instanceName; ++ ++ public ProduceAccumulator(String instanceName) { ++ this.instanceName = instanceName; ++ this.guardThreadForSyncSend = new GuardForSyncSendService(this.instanceName); ++ this.guardThreadForAsyncSend = new GuardForAsyncSendService(this.instanceName); ++ } ++ ++ private class GuardForSyncSendService extends ServiceThread { ++ private final String serviceName; ++ ++ public GuardForSyncSendService(String clientInstanceName) { ++ serviceName = String.format("Client_%s_GuardForSyncSend", clientInstanceName); ++ } ++ ++ @Override public String getServiceName() { ++ return serviceName; ++ } ++ ++ @Override public void run() { ++ log.info(this.getServiceName() + " service started"); ++ ++ while (!this.isStopped()) { ++ try { ++ this.doWork(); ++ } catch (Exception e) { ++ log.warn(this.getServiceName() + " service has exception. ", e); ++ } ++ } ++ ++ log.info(this.getServiceName() + " service end"); ++ } ++ ++ private void doWork() throws InterruptedException { ++ Collection<MessageAccumulation> values = syncSendBatchs.values(); ++ final int sleepTime = Math.max(1, holdMs / 2); ++ for (MessageAccumulation v : values) { ++ v.wakeup(); ++ synchronized (v) { ++ synchronized (v.closed) { ++ if (v.messagesSize.get() == 0) { ++ v.closed.set(true); ++ syncSendBatchs.remove(v.aggregateKey, v); ++ } else { ++ v.notify(); ++ } ++ } ++ } ++ } ++ Thread.sleep(sleepTime); ++ } ++ } ++ ++ private class GuardForAsyncSendService extends ServiceThread { ++ private final String serviceName; ++ ++ public GuardForAsyncSendService(String clientInstanceName) { ++ serviceName = String.format("Client_%s_GuardForAsyncSend", clientInstanceName); ++ } ++ ++ @Override public String getServiceName() { ++ return serviceName; ++ } ++ ++ @Override public void run() { ++ log.info(this.getServiceName() + " service started"); ++ ++ while (!this.isStopped()) { ++ try { ++ this.doWork(); ++ } catch (Exception e) { ++ log.warn(this.getServiceName() + " service has exception. ", e); ++ } ++ } ++ ++ log.info(this.getServiceName() + " service end"); ++ } ++ ++ private void doWork() throws Exception { ++ Collection<MessageAccumulation> values = asyncSendBatchs.values(); ++ final int sleepTime = Math.max(1, holdMs / 2); ++ for (MessageAccumulation v : values) { ++ if (v.readyToSend()) { ++ v.send(null); ++ } ++ synchronized (v.closed) { ++ if (v.messagesSize.get() == 0) { ++ v.closed.set(true); ++ asyncSendBatchs.remove(v.aggregateKey, v); ++ } ++ } ++ } ++ Thread.sleep(sleepTime); ++ } ++ } ++ ++ void start() { ++ guardThreadForSyncSend.start(); ++ guardThreadForAsyncSend.start(); ++ } ++ ++ void shutdown() { ++ guardThreadForSyncSend.shutdown(); ++ guardThreadForAsyncSend.shutdown(); ++ } ++ ++ int getBatchMaxDelayMs() { ++ return holdMs; ++ } ++ ++ void batchMaxDelayMs(int holdMs) { ++ if (holdMs <= 0 || holdMs > 30 * 1000) { ++ throw new IllegalArgumentException(String.format("batchMaxDelayMs expect between 1ms and 30s, but get %d!", holdMs)); ++ } ++ this.holdMs = holdMs; ++ } ++ ++ long getBatchMaxBytes() { ++ return holdSize; ++ } ++ ++ void batchMaxBytes(long holdSize) { ++ if (holdSize <= 0 || holdSize > 2 * 1024 * 1024) { ++ throw new IllegalArgumentException(String.format("batchMaxBytes expect between 1B and 2MB, but get %d!", holdSize)); ++ } ++ this.holdSize = holdSize; ++ } ++ ++ long getTotalBatchMaxBytes() { ++ return holdSize; ++ } ++ ++ void totalBatchMaxBytes(long totalHoldSize) { ++ if (totalHoldSize <= 0) { ++ throw new IllegalArgumentException(String.format("totalBatchMaxBytes must bigger then 0, but get %d!", totalHoldSize)); ++ } ++ this.totalHoldSize = totalHoldSize; ++ } ++ ++ private MessageAccumulation getOrCreateSyncSendBatch(AggregateKey aggregateKey, ++ DefaultMQProducer defaultMQProducer) { ++ MessageAccumulation batch = syncSendBatchs.get(aggregateKey); ++ if (batch != null) { ++ return batch; ++ } ++ batch = new MessageAccumulation(aggregateKey, defaultMQProducer); ++ MessageAccumulation previous = syncSendBatchs.putIfAbsent(aggregateKey, batch); ++ ++ return previous == null ? batch : previous; ++ } ++ ++ private MessageAccumulation getOrCreateAsyncSendBatch(AggregateKey aggregateKey, ++ DefaultMQProducer defaultMQProducer) { ++ MessageAccumulation batch = asyncSendBatchs.get(aggregateKey); ++ if (batch != null) { ++ return batch; ++ } ++ batch = new MessageAccumulation(aggregateKey, defaultMQProducer); ++ MessageAccumulation previous = asyncSendBatchs.putIfAbsent(aggregateKey, batch); ++ ++ return previous == null ? batch : previous; ++ } ++ ++ SendResult send(Message msg, ++ DefaultMQProducer defaultMQProducer) throws InterruptedException, MQBrokerException, RemotingException, MQClientException { ++ AggregateKey partitionKey = new AggregateKey(msg); ++ while (true) { ++ MessageAccumulation batch = getOrCreateSyncSendBatch(partitionKey, defaultMQProducer); ++ int index = batch.add(msg); ++ if (index == -1) { ++ syncSendBatchs.remove(partitionKey, batch); ++ } else { ++ return batch.sendResults[index]; ++ } ++ } ++ } ++ ++ SendResult send(Message msg, MessageQueue mq, ++ DefaultMQProducer defaultMQProducer) throws InterruptedException, MQBrokerException, RemotingException, MQClientException { ++ AggregateKey partitionKey = new AggregateKey(msg, mq); ++ while (true) { ++ MessageAccumulation batch = getOrCreateSyncSendBatch(partitionKey, defaultMQProducer); ++ int index = batch.add(msg); ++ if (index == -1) { ++ syncSendBatchs.remove(partitionKey, batch); ++ } else { ++ return batch.sendResults[index]; ++ } ++ } ++ } ++ ++ void send(Message msg, SendCallback sendCallback, ++ DefaultMQProducer defaultMQProducer) throws InterruptedException, RemotingException, MQClientException { ++ AggregateKey partitionKey = new AggregateKey(msg); ++ while (true) { ++ MessageAccumulation batch = getOrCreateAsyncSendBatch(partitionKey, defaultMQProducer); ++ if (!batch.add(msg, sendCallback)) { ++ asyncSendBatchs.remove(partitionKey, batch); ++ } else { ++ return; ++ } ++ } ++ } ++ ++ void send(Message msg, MessageQueue mq, ++ SendCallback sendCallback, ++ DefaultMQProducer defaultMQProducer) throws InterruptedException, RemotingException, MQClientException { ++ AggregateKey partitionKey = new AggregateKey(msg, mq); ++ while (true) { ++ MessageAccumulation batch = getOrCreateAsyncSendBatch(partitionKey, defaultMQProducer); ++ if (!batch.add(msg, sendCallback)) { ++ asyncSendBatchs.remove(partitionKey, batch); ++ } else { ++ return; ++ } ++ } ++ } ++ ++ boolean tryAddMessage(Message message) { ++ synchronized (currentlyHoldSize) { ++ if (currentlyHoldSize.get() < totalHoldSize) { ++ currentlyHoldSize.addAndGet(message.getBody().length); ++ return true; ++ } else { ++ return false; ++ } ++ } ++ } ++ ++ private class AggregateKey { ++ public String topic = null; ++ public MessageQueue mq = null; ++ public boolean waitStoreMsgOK = false; ++ public String tag = null; ++ ++ public AggregateKey(Message message) { ++ this(message.getTopic(), null, message.isWaitStoreMsgOK(), message.getTags()); ++ } ++ ++ public AggregateKey(Message message, MessageQueue mq) { ++ this(message.getTopic(), mq, message.isWaitStoreMsgOK(), message.getTags()); ++ } ++ ++ public AggregateKey(String topic, MessageQueue mq, boolean waitStoreMsgOK, String tag) { ++ this.topic = topic; ++ this.mq = mq; ++ this.waitStoreMsgOK = waitStoreMsgOK; ++ this.tag = tag; ++ } ++ ++ @Override public boolean equals(Object o) { ++ if (this == o) ++ return true; ++ if (o == null || getClass() != o.getClass()) ++ return false; ++ AggregateKey key = (AggregateKey) o; ++ return waitStoreMsgOK == key.waitStoreMsgOK && topic.equals(key.topic) && Objects.equals(mq, key.mq) && Objects.equals(tag, key.tag); ++ } ++ ++ @Override public int hashCode() { ++ return Objects.hash(topic, mq, waitStoreMsgOK, tag); ++ } ++ } ++ ++ private class MessageAccumulation { ++ private final DefaultMQProducer defaultMQProducer; ++ private LinkedList<Message> messages; ++ private LinkedList<SendCallback> sendCallbacks; ++ private Set<String> keys; ++ private AtomicBoolean closed; ++ private SendResult[] sendResults; ++ private AggregateKey aggregateKey; ++ private AtomicInteger messagesSize; ++ private int count; ++ private long createTime; ++ ++ public MessageAccumulation(AggregateKey aggregateKey, DefaultMQProducer defaultMQProducer) { ++ this.defaultMQProducer = defaultMQProducer; ++ this.messages = new LinkedList<Message>(); ++ this.sendCallbacks = new LinkedList<SendCallback>(); ++ this.keys = new HashSet<String>(); ++ this.closed = new AtomicBoolean(false); ++ this.messagesSize = new AtomicInteger(0); ++ this.aggregateKey = aggregateKey; ++ this.count = 0; ++ this.createTime = System.currentTimeMillis(); ++ } ++ ++ private boolean readyToSend() { ++ if (this.messagesSize.get() > holdSize ++ || System.currentTimeMillis() >= this.createTime + holdMs) { ++ return true; ++ } ++ return false; ++ } ++ ++ public int add( ++ Message msg) throws InterruptedException, MQBrokerException, RemotingException, MQClientException { ++ int ret = -1; ++ synchronized (this.closed) { ++ if (this.closed.get()) { ++ return ret; ++ } ++ ret = this.count++; ++ this.messages.add(msg); ++ messagesSize.addAndGet(msg.getBody().length); ++ String msgKeys = msg.getKeys(); ++ if (msgKeys != null) { ++ this.keys.addAll(Arrays.asList(msgKeys.split(MessageConst.KEY_SEPARATOR))); ++ } ++ } ++ synchronized (this) { ++ while (!this.closed.get()) { ++ if (readyToSend()) { ++ this.send(); ++ break; ++ } else { ++ this.wait(); ++ } ++ } ++ return ret; ++ } ++ } ++ ++ public boolean add(Message msg, ++ SendCallback sendCallback) throws InterruptedException, RemotingException, MQClientException { ++ synchronized (this.closed) { ++ if (this.closed.get()) { ++ return false; ++ } ++ this.count++; ++ this.messages.add(msg); ++ this.sendCallbacks.add(sendCallback); ++ messagesSize.getAndAdd(msg.getBody().length); ++ } ++ if (readyToSend()) { ++ this.send(sendCallback); ++ } ++ return true; ++ ++ } ++ ++ public synchronized void wakeup() { ++ if (this.closed.get()) { ++ return; ++ } ++ this.notify(); ++ } ++ ++ private MessageBatch batch() { ++ MessageBatch messageBatch = new MessageBatch(this.messages); ++ messageBatch.setTopic(this.aggregateKey.topic); ++ messageBatch.setWaitStoreMsgOK(this.aggregateKey.waitStoreMsgOK); ++ messageBatch.setKeys(this.keys); ++ messageBatch.setTags(this.aggregateKey.tag); ++ MessageClientIDSetter.setUniqID(messageBatch); ++ messageBatch.setBody(MessageDecoder.encodeMessages(this.messages)); ++ return messageBatch; ++ } ++ ++ private void splitSendResults(SendResult sendResult) { ++ if (sendResult == null) { ++ throw new IllegalArgumentException("sendResult is null"); ++ } ++ boolean isBatchConsumerQueue = !sendResult.getMsgId().contains(","); ++ this.sendResults = new SendResult[this.count]; ++ if (!isBatchConsumerQueue) { ++ String[] msgIds = sendResult.getMsgId().split(","); ++ String[] offsetMsgIds = sendResult.getOffsetMsgId().split(","); ++ if (offsetMsgIds.length != this.count || msgIds.length != this.count) { ++ throw new IllegalArgumentException("sendResult is illegal"); ++ } ++ for (int i = 0; i < this.count; i++) { ++ this.sendResults[i] = new SendResult(sendResult.getSendStatus(), msgIds[i], ++ sendResult.getMessageQueue(), sendResult.getQueueOffset() + i, ++ sendResult.getTransactionId(), offsetMsgIds[i], sendResult.getRegionId()); ++ } ++ } else { ++ for (int i = 0; i < this.count; i++) { ++ this.sendResults[i] = sendResult; ++ } ++ } ++ } ++ ++ private void send() throws InterruptedException, MQClientException, MQBrokerException, RemotingException { ++ synchronized (this.closed) { ++ if (this.closed.getAndSet(true)) { ++ return; ++ } ++ } ++ MessageBatch messageBatch = this.batch(); ++ SendResult sendResult = null; ++ try { ++ if (defaultMQProducer != null) { ++ sendResult = defaultMQProducer.sendDirect(messageBatch, aggregateKey.mq, null); ++ this.splitSendResults(sendResult); ++ } else { ++ throw new IllegalArgumentException("defaultMQProducer is null, can not send message"); ++ } ++ } finally { ++ currentlyHoldSize.addAndGet(-messagesSize.get()); ++ this.notifyAll(); ++ } ++ } ++ ++ private void send(SendCallback sendCallback) { ++ synchronized (this.closed) { ++ if (this.closed.getAndSet(true)) { ++ return; ++ } ++ } ++ MessageBatch messageBatch = this.batch(); ++ SendResult sendResult = null; ++ try { ++ if (defaultMQProducer != null) { ++ final int size = messagesSize.get(); ++ defaultMQProducer.sendDirect(messageBatch, aggregateKey.mq, new SendCallback() { ++ @Override public void onSuccess(SendResult sendResult) { ++ try { ++ splitSendResults(sendResult); ++ int i = 0; ++ Iterator<SendCallback> it = sendCallbacks.iterator(); ++ while (it.hasNext()) { ++ SendCallback v = it.next(); ++ v.onSuccess(sendResults[i++]); ++ } ++ if (i != count) { ++ throw new IllegalArgumentException("sendResult is illegal"); ++ } ++ currentlyHoldSize.addAndGet(-size); ++ } catch (Exception e) { ++ onException(e); ++ } ++ } ++ ++ @Override public void onException(Throwable e) { ++ for (SendCallback v : sendCallbacks) { ++ v.onException(e); ++ } ++ currentlyHoldSize.addAndGet(-size); ++ } ++ }); ++ } else { ++ throw new IllegalArgumentException("defaultMQProducer is null, can not send message"); ++ } ++ } catch (Exception e) { ++ for (SendCallback v : sendCallbacks) { ++ v.onException(e); ++ } ++ } ++ } ++ } ++} +diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +index 658f22ab0..d4153c7cd 100644 +--- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java ++++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +@@ -250,7 +250,7 @@ public class DefaultMQProducerTest { + + @Test + public void testBatchSendMessageAsync() +- throws RemotingException, MQClientException, InterruptedException, MQBrokerException { ++ throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + final AtomicInteger cc = new AtomicInteger(0); + final CountDownLatch countDownLatch = new CountDownLatch(4); + +@@ -504,6 +504,42 @@ public class DefaultMQProducerTest { + assertThat(cc.get()).isEqualTo(1); + } + ++ @Test ++ public void testBatchSendMessageAsync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { ++ final CountDownLatch countDownLatch = new CountDownLatch(1); ++ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); ++ producer.setAutoBatch(true); ++ producer.send(message, new SendCallback() { ++ @Override ++ public void onSuccess(SendResult sendResult) { ++ assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); ++ assertThat(sendResult.getOffsetMsgId()).isEqualTo("123"); ++ assertThat(sendResult.getQueueOffset()).isEqualTo(456L); ++ countDownLatch.countDown(); ++ } ++ ++ @Override ++ public void onException(Throwable e) { ++ countDownLatch.countDown(); ++ } ++ }); ++ ++ countDownLatch.await(3000L, TimeUnit.MILLISECONDS); ++ producer.setAutoBatch(false); ++ } ++ ++ @Test ++ public void testBatchSendMessageSync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { ++ producer.setAutoBatch(true); ++ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); ++ SendResult sendResult = producer.send(message); ++ ++ assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); ++ assertThat(sendResult.getOffsetMsgId()).isEqualTo("123"); ++ assertThat(sendResult.getQueueOffset()).isEqualTo(456L); ++ producer.setAutoBatch(false); ++ } ++ + public static TopicRouteData createTopicRoute() { + TopicRouteData topicRouteData = new TopicRouteData(); + +diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/ProduceAccumulatorTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/ProduceAccumulatorTest.java +new file mode 100644 +index 000000000..7074fae24 +--- /dev/null ++++ b/client/src/test/java/org/apache/rocketmq/client/producer/ProduceAccumulatorTest.java +@@ -0,0 +1,176 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.rocketmq.client.producer; ++ ++import java.util.ArrayList; ++import java.util.Arrays; ++import java.util.List; ++import java.util.concurrent.CountDownLatch; ++import java.util.concurrent.TimeUnit; ++import org.apache.rocketmq.client.exception.MQBrokerException; ++import org.apache.rocketmq.client.exception.MQClientException; ++import org.apache.rocketmq.common.message.Message; ++import org.apache.rocketmq.common.message.MessageBatch; ++import org.apache.rocketmq.common.message.MessageQueue; ++import org.apache.rocketmq.remoting.exception.RemotingException; ++import org.junit.Test; ++ ++import static org.assertj.core.api.Assertions.assertThat; ++ ++public class ProduceAccumulatorTest { ++ private boolean compareMessageBatch(MessageBatch a, MessageBatch b) { ++ if (!a.getTopic().equals(b.getTopic())) { ++ return false; ++ } ++ if (!Arrays.equals(a.getBody(), b.getBody())) { ++ return false; ++ } ++ return true; ++ } ++ ++ private class MockMQProducer extends DefaultMQProducer { ++ private Message beSendMessage = null; ++ private MessageQueue beSendMessageQueue = null; ++ ++ @Override ++ public SendResult sendDirect(Message msg, MessageQueue mq, ++ SendCallback sendCallback) { ++ this.beSendMessage = msg; ++ this.beSendMessageQueue = mq; ++ ++ SendResult sendResult = new SendResult(); ++ sendResult.setMsgId("123"); ++ if (sendCallback != null) { ++ sendCallback.onSuccess(sendResult); ++ } ++ return sendResult; ++ } ++ } ++ ++ @Test ++ public void testProduceAccumulator_async() throws MQBrokerException, RemotingException, InterruptedException, MQClientException { ++ MockMQProducer mockMQProducer = new MockMQProducer(); ++ ++ ProduceAccumulator produceAccumulator = new ProduceAccumulator("test"); ++ produceAccumulator.start(); ++ ++ final CountDownLatch countDownLatch = new CountDownLatch(1); ++ List<Message> messages = new ArrayList<Message>(); ++ messages.add(new Message("testTopic", "1".getBytes())); ++ messages.add(new Message("testTopic", "22".getBytes())); ++ messages.add(new Message("testTopic", "333".getBytes())); ++ messages.add(new Message("testTopic", "4444".getBytes())); ++ messages.add(new Message("testTopic", "55555".getBytes())); ++ for (Message message : messages) { ++ produceAccumulator.send(message, new SendCallback() { ++ final CountDownLatch finalCountDownLatch = countDownLatch; ++ ++ @Override ++ public void onSuccess(SendResult sendResult) { ++ finalCountDownLatch.countDown(); ++ } ++ ++ @Override ++ public void onException(Throwable e) { ++ finalCountDownLatch.countDown(); ++ } ++ }, mockMQProducer); ++ } ++ assertThat(countDownLatch.await(3000L, TimeUnit.MILLISECONDS)).isTrue(); ++ assertThat(mockMQProducer.beSendMessage instanceof MessageBatch).isTrue(); ++ ++ MessageBatch messageBatch1 = (MessageBatch) mockMQProducer.beSendMessage; ++ MessageBatch messageBatch2 = MessageBatch.generateFromList(messages); ++ messageBatch2.setBody(messageBatch2.encode()); ++ ++ assertThat(compareMessageBatch(messageBatch1, messageBatch2)).isTrue(); ++ } ++ ++ @Test ++ public void testProduceAccumulator_sync() throws MQBrokerException, RemotingException, InterruptedException, MQClientException { ++ final MockMQProducer mockMQProducer = new MockMQProducer(); ++ ++ final ProduceAccumulator produceAccumulator = new ProduceAccumulator("test"); ++ produceAccumulator.start(); ++ ++ List<Message> messages = new ArrayList<Message>(); ++ messages.add(new Message("testTopic", "1".getBytes())); ++ messages.add(new Message("testTopic", "22".getBytes())); ++ messages.add(new Message("testTopic", "333".getBytes())); ++ messages.add(new Message("testTopic", "4444".getBytes())); ++ messages.add(new Message("testTopic", "55555".getBytes())); ++ final CountDownLatch countDownLatch = new CountDownLatch(messages.size()); ++ ++ for (final Message message : messages) { ++ new Thread(new Runnable() { ++ final ProduceAccumulator finalProduceAccumulator = produceAccumulator; ++ final CountDownLatch finalCountDownLatch = countDownLatch; ++ final MockMQProducer finalMockMQProducer = mockMQProducer; ++ final Message finalMessage = message; ++ ++ @Override ++ public void run() { ++ try { ++ finalProduceAccumulator.send(finalMessage, finalMockMQProducer); ++ finalCountDownLatch.countDown(); ++ } catch (Exception e) { ++ e.printStackTrace(); ++ } ++ } ++ }).start(); ++ } ++ assertThat(countDownLatch.await(3000L, TimeUnit.MILLISECONDS)).isTrue(); ++ assertThat(mockMQProducer.beSendMessage instanceof MessageBatch).isTrue(); ++ ++ MessageBatch messageBatch1 = (MessageBatch) mockMQProducer.beSendMessage; ++ MessageBatch messageBatch2 = MessageBatch.generateFromList(messages); ++ messageBatch2.setBody(messageBatch2.encode()); ++ ++ assertThat(messageBatch1.getTopic()).isEqualTo(messageBatch2.getTopic()); ++ // The execution order is uncertain, just compare the length ++ assertThat(messageBatch1.getBody().length).isEqualTo(messageBatch2.getBody().length); ++ } ++ ++ @Test ++ public void testProduceAccumulator_sendWithMessageQueue() throws MQBrokerException, RemotingException, InterruptedException, MQClientException { ++ MockMQProducer mockMQProducer = new MockMQProducer(); ++ ++ MessageQueue messageQueue = new MessageQueue("topicTest", "brokerTest", 0); ++ final ProduceAccumulator produceAccumulator = new ProduceAccumulator("test"); ++ produceAccumulator.start(); ++ ++ Message message = new Message("testTopic", "1".getBytes()); ++ produceAccumulator.send(message, messageQueue, mockMQProducer); ++ assertThat(mockMQProducer.beSendMessageQueue).isEqualTo(messageQueue); ++ ++ final CountDownLatch countDownLatch = new CountDownLatch(1); ++ produceAccumulator.send(message, messageQueue, new SendCallback() { ++ @Override ++ public void onSuccess(SendResult sendResult) { ++ countDownLatch.countDown(); ++ } ++ ++ @Override ++ public void onException(Throwable e) { ++ countDownLatch.countDown(); ++ } ++ }, mockMQProducer); ++ assertThat(countDownLatch.await(3000L, TimeUnit.MILLISECONDS)).isTrue(); ++ assertThat(mockMQProducer.beSendMessageQueue).isEqualTo(messageQueue); ++ } ++} +diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java +index a423048c5..30369b8f3 100644 +--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java ++++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java +@@ -27,7 +27,7 @@ public class MessageBatch extends Message implements Iterable<Message> { + private static final long serialVersionUID = 621335151046335557L; + private final List<Message> messages; + +- private MessageBatch(List<Message> messages) { ++ public MessageBatch(List<Message> messages) { + this.messages = messages; + } + +-- +2.32.0.windows.2 + |