From 737c1e53383350a5671fa207ee0e4ce932850bac Mon Sep 17 00:00:00 2001
From: rongtong
Date: Tue, 18 Jul 2023 14:12:39 +0800
Subject: [PATCH 1/7] [ISSUE #7029] Add a config to determine whether pop
response should return the actual retry topic or tamper with the original
topic (#7030)
---
.../broker/processor/PopMessageProcessor.java | 4 ++--
.../org/apache/rocketmq/common/BrokerConfig.java | 13 +++++++++++++
2 files changed, 15 insertions(+), 2 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 464f8f4fd..53e172561 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -591,8 +591,8 @@ public class PopMessageProcessor implements NettyRequestProcessor {
atomicRestNum.set(result.getMaxOffset() - result.getNextBeginOffset() + atomicRestNum.get());
String brokerName = brokerController.getBrokerConfig().getBrokerName();
for (SelectMappedBufferResult mapedBuffer : result.getMessageMapedList()) {
- // We should not recode buffer for normal topic message
- if (!isRetry) {
+ // We should not recode buffer when popResponseReturnActualRetryTopic is true or topic is not retry topic
+ if (brokerController.getBrokerConfig().isPopResponseReturnActualRetryTopic() || !isRetry) {
getMessageResult.addMessage(mapedBuffer);
} else {
List messageExtList = MessageDecoder.decodesBatch(mapedBuffer.getByteBuffer(),
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index f5f0db101..a4d82d1c5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -381,6 +381,11 @@ public class BrokerConfig extends BrokerIdentity {
*/
private long fetchNamesrvAddrInterval = 10 * 1000;
+ /**
+ * Pop response returns the actual retry topic rather than tampering with the original topic
+ */
+ private boolean popResponseReturnActualRetryTopic = false;
+
public long getMaxPopPollingSize() {
return maxPopPollingSize;
}
@@ -1676,4 +1681,12 @@ public class BrokerConfig extends BrokerIdentity {
public void setFetchNamesrvAddrInterval(final long fetchNamesrvAddrInterval) {
this.fetchNamesrvAddrInterval = fetchNamesrvAddrInterval;
}
+
+ public boolean isPopResponseReturnActualRetryTopic() {
+ return popResponseReturnActualRetryTopic;
+ }
+
+ public void setPopResponseReturnActualRetryTopic(boolean popResponseReturnActualRetryTopic) {
+ this.popResponseReturnActualRetryTopic = popResponseReturnActualRetryTopic;
+ }
}
--
2.32.0.windows.2
From 7996ec3b3f7ccea01f66951ac639b48303bbf7a6 Mon Sep 17 00:00:00 2001
From: leeyiyu <43566239+leeyiyu@users.noreply.github.com>
Date: Tue, 18 Jul 2023 20:58:56 +0800
Subject: [PATCH 2/7] [ISSUE #6879] ConcurrentHashMapUtils fails to solve the
loop bug in JDK8 (#6883)
---
.../common/utils/ConcurrentHashMapUtils.java | 16 +++++++++++++++-
.../common/utils/ConcurrentHashMapUtilsTest.java | 2 ++
2 files changed, 17 insertions(+), 1 deletion(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtils.java b/common/src/main/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtils.java
index 1f1b4dd89..6fd9c21c9 100644
--- a/common/src/main/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtils.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.common.utils;
+import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
@@ -40,10 +41,23 @@ public abstract class ConcurrentHashMapUtils {
* @see https://bugs.openjdk.java.net/browse/JDK-8161372
*/
public static V computeIfAbsent(ConcurrentMap map, K key, Function super K, ? extends V> func) {
+ Objects.requireNonNull(func);
if (isJdk8) {
V v = map.get(key);
if (null == v) {
- v = map.computeIfAbsent(key, func);
+// v = map.computeIfAbsent(key, func);
+
+ // this bug fix methods maybe cause `func.apply` multiple calls.
+ v = func.apply(key);
+ if (null == v) {
+ return null;
+ }
+ final V res = map.putIfAbsent(key, v);
+ if (null != res) {
+ // if pre value present, means other thread put value already, and putIfAbsent not effect
+ // return exist value
+ return res;
+ }
}
return v;
} else {
diff --git a/common/src/test/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtilsTest.java b/common/src/test/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtilsTest.java
index 8e32fc93a..fa97ddb1c 100644
--- a/common/src/test/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtilsTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtilsTest.java
@@ -35,5 +35,7 @@ public class ConcurrentHashMapUtilsTest {
assertEquals("2342", value1);
String value2 = ConcurrentHashMapUtils.computeIfAbsent(map, "123", k -> "2342");
assertEquals("1111", value2);
+// map.computeIfAbsent("AaAa", key->map.computeIfAbsent("BBBB",key2->"42"));
+ ConcurrentHashMapUtils.computeIfAbsent(map, "AaAa", key -> map.computeIfAbsent("BBBB", key2 -> "42"));
}
}
\ No newline at end of file
--
2.32.0.windows.2
From e0f5295fed8791d93bfa5b8420074c00b651ddfe Mon Sep 17 00:00:00 2001
From: lk
Date: Wed, 19 Jul 2023 15:55:11 +0800
Subject: [PATCH 3/7] passing the renew event type to create the correct
context (#7045)
---
.../apache/rocketmq/proxy/common/RenewEvent.java | 14 +++++++++++++-
.../proxy/processor/ReceiptHandleProcessor.java | 2 +-
.../receipt/DefaultReceiptHandleManager.java | 6 +++---
3 files changed, 17 insertions(+), 5 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java
index fdf9833cc..0ff65c1cc 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/RenewEvent.java
@@ -23,11 +23,19 @@ import org.apache.rocketmq.client.consumer.AckResult;
public class RenewEvent {
protected MessageReceiptHandle messageReceiptHandle;
protected long renewTime;
+ protected EventType eventType;
protected CompletableFuture future;
- public RenewEvent(MessageReceiptHandle messageReceiptHandle, long renewTime, CompletableFuture future) {
+ public enum EventType {
+ RENEW,
+ STOP_RENEW,
+ CLEAR_GROUP
+ }
+
+ public RenewEvent(MessageReceiptHandle messageReceiptHandle, long renewTime, EventType eventType, CompletableFuture future) {
this.messageReceiptHandle = messageReceiptHandle;
this.renewTime = renewTime;
+ this.eventType = eventType;
this.future = future;
}
@@ -39,6 +47,10 @@ public class RenewEvent {
return renewTime;
}
+ public EventType getEventType() {
+ return eventType;
+ }
+
public CompletableFuture getFuture() {
return future;
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index fc49e7622..460842a86 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -38,7 +38,7 @@ public class ReceiptHandleProcessor extends AbstractProcessor {
public ReceiptHandleProcessor(MessagingProcessor messagingProcessor, ServiceManager serviceManager) {
super(messagingProcessor, serviceManager);
StateEventListener eventListener = event -> {
- ProxyContext context = createContext("RenewMessage");
+ ProxyContext context = createContext(event.getEventType().name());
MessageReceiptHandle messageReceiptHandle = event.getMessageReceiptHandle();
ReceiptHandle handle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr());
messagingProcessor.changeInvisibleTime(context, handle, messageReceiptHandle.getMessageId(),
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
index c7633d658..9f35435f0 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
@@ -188,7 +188,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
}
if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) {
CompletableFuture future = new CompletableFuture<>();
- eventListener.fireEvent(new RenewEvent(messageReceiptHandle, RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), future));
+ eventListener.fireEvent(new RenewEvent(messageReceiptHandle, RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()), RenewEvent.EventType.RENEW, future));
future.whenComplete((ackResult, throwable) -> {
if (throwable != null) {
log.error("error when renew. handle:{}", messageReceiptHandle, throwable);
@@ -218,7 +218,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
}
RetryPolicy retryPolicy = subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy();
CompletableFuture future = new CompletableFuture<>();
- eventListener.fireEvent(new RenewEvent(messageReceiptHandle, retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), future));
+ eventListener.fireEvent(new RenewEvent(messageReceiptHandle, retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()), RenewEvent.EventType.STOP_RENEW, future));
future.whenComplete((ackResult, throwable) -> {
if (throwable != null) {
log.error("error when nack in renew. handle:{}", messageReceiptHandle, throwable);
@@ -246,7 +246,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
try {
handleGroup.computeIfPresent(msgID, handle, messageReceiptHandle -> {
CompletableFuture future = new CompletableFuture<>();
- eventListener.fireEvent(new RenewEvent(messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), future));
+ eventListener.fireEvent(new RenewEvent(messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), RenewEvent.EventType.CLEAR_GROUP, future));
return CompletableFuture.completedFuture(null);
});
} catch (Exception e) {
--
2.32.0.windows.2
From 2c5808b9fdab8cae63318c89f34ad48a1ab6e962 Mon Sep 17 00:00:00 2001
From: lizhimins <707364882@qq.com>
Date: Wed, 19 Jul 2023 20:14:02 +0800
Subject: [PATCH 4/7] [#ISSUE 7035] Fix correct min offset behavior in tiered
storage (#7038)
---
.../tieredstore/TieredDispatcher.java | 2 +-
.../tieredstore/TieredMessageFetcher.java | 5 +
.../tieredstore/file/CompositeFlatFile.java | 12 +-
.../tieredstore/file/TieredCommitLog.java | 46 +++++++-
.../tieredstore/file/TieredFlatFile.java | 29 +++--
.../file/TieredFlatFileManager.java | 2 +-
.../metrics/TieredStoreMetricsConstant.java | 1 +
.../provider/posix/PosixFileSegment.java | 19 +--
.../tieredstore/file/TieredCommitLogTest.java | 108 ++++++++++++++++++
.../TieredStoreMetricsManagerTest.java | 4 +-
.../src/test/resources/rmq.logback-test.xml | 2 +-
11 files changed, 201 insertions(+), 29 deletions(-)
create mode 100644 tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
index 6584b0e89..523b0c2cd 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
@@ -352,7 +352,7 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
case SUCCESS:
long offset = MessageBufferUtil.getQueueOffset(message);
if (queueOffset != offset) {
- logger.error("Message cq offset in commitlog does not meet expectations, " +
+ logger.warn("Message cq offset in commitlog does not meet expectations, " +
"result={}, topic={}, queueId={}, cq offset={}, msg offset={}",
AppendResult.OFFSET_INCORRECT, topic, queueId, queueOffset, offset);
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
index 8802a73a3..c4fed54bd 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
@@ -473,6 +473,11 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
return CompletableFuture.completedFuture(result);
}
+ // request range | result
+ // (0, min) | too small
+ // [min, max) | correct
+ // [max, max] | overflow one
+ // (max, +oo) | overflow badly
if (queueOffset < minQueueOffset) {
result.setStatus(GetMessageStatus.OFFSET_TOO_SMALL);
result.setNextBeginOffset(flatFile.getConsumeQueueMinOffset());
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
index 8f8ba98b1..fa01382e1 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
@@ -120,17 +120,19 @@ public class CompositeFlatFile implements CompositeAccess {
return commitLog.getBeginTimestamp();
}
- public long getConsumeQueueBaseOffset() {
- return consumeQueue.getBaseOffset();
- }
-
@Override
public long getCommitLogDispatchCommitOffset() {
return commitLog.getDispatchCommitOffset();
}
+ public long getConsumeQueueBaseOffset() {
+ return consumeQueue.getBaseOffset();
+ }
+
public long getConsumeQueueMinOffset() {
- return consumeQueue.getMinOffset() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE;
+ long cqOffset = consumeQueue.getMinOffset() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE;
+ long effectiveOffset = this.commitLog.getMinConsumeQueueOffset();
+ return Math.max(cqOffset, effectiveOffset);
}
public long getConsumeQueueCommitOffset() {
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java
index 92aea58be..80e1bce50 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java
@@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.tieredstore.common.AppendResult;
@@ -31,6 +32,7 @@ import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
public class TieredCommitLog {
private static final Logger log = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
+ private static final Long NOT_EXIST_MIN_OFFSET = -1L;
/**
* item size: int, 4 bytes
@@ -42,10 +44,13 @@ public class TieredCommitLog {
private final TieredMessageStoreConfig storeConfig;
private final TieredFlatFile flatFile;
+ private final AtomicLong minConsumeQueueOffset;
public TieredCommitLog(TieredFileAllocator fileQueueFactory, String filePath) {
this.storeConfig = fileQueueFactory.getStoreConfig();
this.flatFile = fileQueueFactory.createFlatFileForCommitLog(filePath);
+ this.minConsumeQueueOffset = new AtomicLong(NOT_EXIST_MIN_OFFSET);
+ this.correctMinOffset();
}
@VisibleForTesting
@@ -61,6 +66,10 @@ public class TieredCommitLog {
return flatFile.getCommitOffset();
}
+ public long getMinConsumeQueueOffset() {
+ return minConsumeQueueOffset.get() != NOT_EXIST_MIN_OFFSET ? minConsumeQueueOffset.get() : correctMinOffset();
+ }
+
public long getDispatchCommitOffset() {
return flatFile.getDispatchCommitOffset();
}
@@ -82,6 +91,39 @@ public class TieredCommitLog {
return flatFile.getFileToWrite().getMaxTimestamp();
}
+ public synchronized long correctMinOffset() {
+ if (flatFile.getFileSegmentCount() == 0) {
+ this.minConsumeQueueOffset.set(NOT_EXIST_MIN_OFFSET);
+ return NOT_EXIST_MIN_OFFSET;
+ }
+
+ // queue offset field length is 8
+ int length = MessageBufferUtil.QUEUE_OFFSET_POSITION + 8;
+ if (flatFile.getCommitOffset() - flatFile.getMinOffset() < length) {
+ this.minConsumeQueueOffset.set(NOT_EXIST_MIN_OFFSET);
+ return NOT_EXIST_MIN_OFFSET;
+ }
+
+ try {
+ return this.flatFile.readAsync(this.flatFile.getMinOffset(), length)
+ .thenApply(buffer -> {
+ long offset = MessageBufferUtil.getQueueOffset(buffer);
+ minConsumeQueueOffset.set(offset);
+ log.info("Correct commitlog min cq offset success, filePath={}, min cq offset={}, range={}-{}",
+ flatFile.getFilePath(), offset, flatFile.getMinOffset(), flatFile.getCommitOffset());
+ return offset;
+ })
+ .exceptionally(throwable -> {
+ log.warn("Correct commitlog min cq offset error, filePath={}, range={}-{}",
+ flatFile.getFilePath(), flatFile.getMinOffset(), flatFile.getCommitOffset(), throwable);
+ return minConsumeQueueOffset.get();
+ }).get();
+ } catch (Exception e) {
+ log.error("Correct commitlog min cq offset error, filePath={}", flatFile.getFilePath(), e);
+ }
+ return minConsumeQueueOffset.get();
+ }
+
public AppendResult append(ByteBuffer byteBuf) {
return flatFile.append(byteBuf, MessageBufferUtil.getStoreTimeStamp(byteBuf));
}
@@ -99,7 +141,9 @@ public class TieredCommitLog {
}
public void cleanExpiredFile(long expireTimestamp) {
- flatFile.cleanExpiredFile(expireTimestamp);
+ if (flatFile.cleanExpiredFile(expireTimestamp) > 0) {
+ correctMinOffset();
+ }
}
public void destroyExpiredFile() {
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
index a71323348..90ca843bf 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
@@ -133,6 +133,14 @@ public class TieredFlatFile {
}
}
+ public String getFilePath() {
+ return filePath;
+ }
+
+ public FileSegmentType getFileType() {
+ return fileType;
+ }
+
@VisibleForTesting
public List getFileSegmentList() {
return fileSegmentList;
@@ -333,10 +341,9 @@ public class TieredFlatFile {
TieredFileSegment fileSegment = this.newSegment(fileType, offset, true);
fileSegmentList.add(fileSegment);
needCommitFileSegmentList.add(fileSegment);
-
Collections.sort(fileSegmentList);
-
- logger.debug("Create a new file segment: baseOffset: {}, file: {}, file type: {}", baseOffset, fileSegment.getPath(), fileType);
+ logger.debug("Create a new file segment: baseOffset: {}, file: {}, file type: {}",
+ offset, fileSegment.getPath(), fileType);
return fileSegment;
} finally {
fileSegmentLock.writeLock().unlock();
@@ -429,7 +436,7 @@ public class TieredFlatFile {
return result;
}
- public void cleanExpiredFile(long expireTimestamp) {
+ public int cleanExpiredFile(long expireTimestamp) {
Set needToDeleteSet = new HashSet<>();
try {
tieredMetadataStore.iterateFileSegment(filePath, fileType, metadata -> {
@@ -438,32 +445,32 @@ public class TieredFlatFile {
}
});
} catch (Exception e) {
- logger.error("clean expired failed: filePath: {}, file type: {}, expire timestamp: {}",
+ logger.error("Clean expired file, filePath: {}, file type: {}, expire timestamp: {}",
filePath, fileType, expireTimestamp);
}
if (needToDeleteSet.isEmpty()) {
- return;
+ return 0;
}
fileSegmentLock.writeLock().lock();
try {
for (int i = 0; i < fileSegmentList.size(); i++) {
+ TieredFileSegment fileSegment = fileSegmentList.get(i);
try {
- TieredFileSegment fileSegment = fileSegmentList.get(i);
if (needToDeleteSet.contains(fileSegment.getBaseOffset())) {
fileSegment.close();
fileSegmentList.remove(fileSegment);
needCommitFileSegmentList.remove(fileSegment);
i--;
this.updateFileSegment(fileSegment);
- logger.info("expired file {} is been cleaned", fileSegment.getPath());
+ logger.debug("Clean expired file, filePath: {}", fileSegment.getPath());
} else {
break;
}
} catch (Exception e) {
- logger.error("clean expired file failed: filePath: {}, file type: {}, expire timestamp: {}",
- filePath, fileType, expireTimestamp, e);
+ logger.error("Clean expired file failed: filePath: {}, file type: {}, expire timestamp: {}",
+ fileSegment.getPath(), fileSegment.getFileType(), expireTimestamp, e);
}
}
if (fileSegmentList.size() > 0) {
@@ -476,6 +483,7 @@ public class TieredFlatFile {
} finally {
fileSegmentLock.writeLock().unlock();
}
+ return needToDeleteSet.size();
}
@VisibleForTesting
@@ -493,7 +501,6 @@ public class TieredFlatFile {
fileSegment.destroyFile();
if (!fileSegment.exists()) {
tieredMetadataStore.deleteFileSegment(filePath, fileType, metadata.getBaseOffset());
- logger.info("Destroyed expired file, file path: {}", fileSegment.getPath());
}
} catch (Exception e) {
logger.error("Destroyed expired file failed, file path: {}, file type: {}",
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
index 5fe511f68..aeca44b8c 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
@@ -223,7 +223,7 @@ public class TieredFlatFileManager {
public CompositeQueueFlatFile getOrCreateFlatFileIfAbsent(MessageQueue messageQueue) {
return queueFlatFileMap.computeIfAbsent(messageQueue, mq -> {
try {
- logger.info("TieredFlatFileManager#getOrCreateFlatFileIfAbsent: " +
+ logger.debug("TieredFlatFileManager#getOrCreateFlatFileIfAbsent: " +
"try to create new flat file: topic: {}, queueId: {}",
messageQueue.getTopic(), messageQueue.getQueueId());
return new CompositeQueueFlatFile(tieredFileAllocator, mq);
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsConstant.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsConstant.java
index ad7281510..cb4674ea9 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsConstant.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsConstant.java
@@ -38,6 +38,7 @@ public class TieredStoreMetricsConstant {
public static final String LABEL_OPERATION = "operation";
public static final String LABEL_SUCCESS = "success";
+ public static final String LABEL_PATH = "path";
public static final String LABEL_TOPIC = "topic";
public static final String LABEL_GROUP = "group";
public static final String LABEL_QUEUE_ID = "queue_id";
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
index 8c0d1cbcd..52be90b1d 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
@@ -41,8 +41,8 @@ import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_FILE_TYPE;
import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_OPERATION;
+import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_PATH;
import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_SUCCESS;
-import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_TOPIC;
/**
* this class is experimental and may change without notice.
@@ -55,6 +55,7 @@ public class PosixFileSegment extends TieredFileSegment {
private static final String OPERATION_POSIX_READ = "read";
private static final String OPERATION_POSIX_WRITE = "write";
+ private final String fullPath;
private volatile File file;
private volatile FileChannel readFileChannel;
private volatile FileChannel writeFileChannel;
@@ -71,7 +72,7 @@ public class PosixFileSegment extends TieredFileSegment {
// fullPath: basePath/hash_cluster/broker/topic/queueId/fileType/baseOffset
String brokerClusterName = storeConfig.getBrokerClusterName();
String clusterBasePath = TieredStoreUtil.getHash(brokerClusterName) + UNDERLINE + brokerClusterName;
- String fullPath = Paths.get(basePath, clusterBasePath, filePath,
+ this.fullPath = Paths.get(basePath, clusterBasePath, filePath,
fileType.toString(), TieredStoreUtil.offset2FileName(baseOffset)).toString();
logger.info("Constructing Posix FileSegment, filePath: {}", fullPath);
@@ -80,13 +81,13 @@ public class PosixFileSegment extends TieredFileSegment {
protected AttributesBuilder newAttributesBuilder() {
return TieredStoreMetricsManager.newAttributesBuilder()
- .put(LABEL_TOPIC, filePath)
+ .put(LABEL_PATH, filePath)
.put(LABEL_FILE_TYPE, fileType.name().toLowerCase());
}
@Override
public String getPath() {
- return filePath;
+ return fullPath;
}
@Override
@@ -107,7 +108,7 @@ public class PosixFileSegment extends TieredFileSegment {
if (file == null) {
synchronized (this) {
if (file == null) {
- File file = new File(filePath);
+ File file = new File(fullPath);
try {
File dir = file.getParentFile();
if (!dir.exists()) {
@@ -136,8 +137,9 @@ public class PosixFileSegment extends TieredFileSegment {
if (writeFileChannel != null && writeFileChannel.isOpen()) {
writeFileChannel.close();
}
+ logger.info("Destroy Posix FileSegment, filePath: {}", fullPath);
} catch (IOException e) {
- logger.error("PosixFileSegment#destroyFile: destroy file {} failed: ", filePath, e);
+ logger.error("Destroy Posix FileSegment failed, filePath: {}", fullPath, e);
}
if (file.exists()) {
@@ -181,8 +183,9 @@ public class PosixFileSegment extends TieredFileSegment {
}
@Override
- public CompletableFuture commit0(TieredFileSegmentInputStream inputStream, long position, int length,
- boolean append) {
+ public CompletableFuture commit0(
+ TieredFileSegmentInputStream inputStream, long position, int length, boolean append) {
+
Stopwatch stopwatch = Stopwatch.createStarted();
AttributesBuilder attributesBuilder = newAttributesBuilder()
.put(LABEL_OPERATION, OPERATION_POSIX_WRITE);
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java
new file mode 100644
index 000000000..6693d3cb7
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredCommitLogTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tieredstore.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
+import org.apache.rocketmq.tieredstore.common.AppendResult;
+import org.apache.rocketmq.tieredstore.common.FileSegmentType;
+import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
+import org.apache.rocketmq.tieredstore.metadata.FileSegmentMetadata;
+import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
+import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
+import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest;
+import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TieredCommitLogTest {
+
+ private final String storePath = TieredStoreTestUtil.getRandomStorePath();
+ private MessageQueue mq;
+ private TieredFileAllocator fileAllocator;
+ private TieredMetadataStore metadataStore;
+
+ @Before
+ public void setUp() throws ClassNotFoundException, NoSuchMethodException {
+ TieredMessageStoreConfig storeConfig = new TieredMessageStoreConfig();
+ storeConfig.setBrokerName("brokerName");
+ storeConfig.setStorePathRootDir(storePath);
+ storeConfig.setTieredStoreFilePath(storePath + File.separator);
+ storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment");
+ storeConfig.setCommitLogRollingInterval(0);
+ storeConfig.setTieredStoreCommitLogMaxSize(1000);
+
+ metadataStore = TieredStoreUtil.getMetadataStore(storeConfig);
+ fileAllocator = new TieredFileAllocator(storeConfig);
+ mq = new MessageQueue("CommitLogTest", storeConfig.getBrokerName(), 0);
+ TieredStoreExecutor.init();
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ TieredStoreTestUtil.destroyCompositeFlatFileManager();
+ TieredStoreTestUtil.destroyMetadataStore();
+ TieredStoreTestUtil.destroyTempDir(storePath);
+ TieredStoreExecutor.shutdown();
+ }
+
+ @Test
+ public void correctMinOffsetTest() {
+ String filePath = TieredStoreUtil.toPath(mq);
+ TieredCommitLog tieredCommitLog = new TieredCommitLog(fileAllocator, filePath);
+ Assert.assertEquals(0L, tieredCommitLog.getMinOffset());
+ Assert.assertEquals(0L, tieredCommitLog.getCommitOffset());
+ Assert.assertEquals(0L, tieredCommitLog.getDispatchCommitOffset());
+
+ // append some messages
+ for (int i = 6; i < 50; i++) {
+ ByteBuffer byteBuffer = MessageBufferUtilTest.buildMockedMessageBuffer();
+ byteBuffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, i);
+ Assert.assertEquals(AppendResult.SUCCESS, tieredCommitLog.append(byteBuffer));
+ }
+
+ tieredCommitLog.commit(true);
+ tieredCommitLog.correctMinOffset();
+
+ // single file store: 1000 / 122 = 8, file count: 44 / 8 = 5
+ Assert.assertEquals(6, tieredCommitLog.getFlatFile().getFileSegmentCount());
+
+ metadataStore.iterateFileSegment(filePath, FileSegmentType.COMMIT_LOG, metadata -> {
+ if (metadata.getBaseOffset() < 1000) {
+ metadata.setStatus(FileSegmentMetadata.STATUS_DELETED);
+ metadataStore.updateFileSegment(metadata);
+ }
+ });
+
+ // manually delete file
+ List segmentList = tieredCommitLog.getFlatFile().getFileSegmentList();
+ segmentList.remove(0).destroyFile();
+ segmentList.remove(0).destroyFile();
+
+ tieredCommitLog.correctMinOffset();
+ Assert.assertEquals(4, tieredCommitLog.getFlatFile().getFileSegmentCount());
+ Assert.assertEquals(6 + 8 + 8, tieredCommitLog.getMinConsumeQueueOffset());
+ }
+}
\ No newline at end of file
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java
index a1dde0451..26b38b970 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.rocketmq.tieredstore.TieredMessageFetcher;
import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
import org.junit.After;
import org.junit.Test;
@@ -30,9 +31,9 @@ public class TieredStoreMetricsManagerTest {
public void tearDown() throws IOException {
TieredStoreTestUtil.destroyCompositeFlatFileManager();
TieredStoreTestUtil.destroyMetadataStore();
+ TieredStoreExecutor.shutdown();
}
-
@Test
public void getMetricsView() {
TieredStoreMetricsManager.getMetricsView();
@@ -40,6 +41,7 @@ public class TieredStoreMetricsManagerTest {
@Test
public void init() {
+ TieredStoreExecutor.init();
TieredMessageStoreConfig storeConfig = new TieredMessageStoreConfig();
storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.memory.MemoryFileSegment");
TieredStoreMetricsManager.init(OpenTelemetrySdk.builder().build().getMeter(""),
diff --git a/tieredstore/src/test/resources/rmq.logback-test.xml b/tieredstore/src/test/resources/rmq.logback-test.xml
index b70b42046..a7933b5ef 100644
--- a/tieredstore/src/test/resources/rmq.logback-test.xml
+++ b/tieredstore/src/test/resources/rmq.logback-test.xml
@@ -23,7 +23,7 @@
-
+
--
2.32.0.windows.2
From ebad3c8a6b41915edb3db65fca593123b296042d Mon Sep 17 00:00:00 2001
From: gaoyf
Date: Thu, 20 Jul 2023 10:59:40 +0800
Subject: [PATCH 5/7] [ISSUE #7047] NettyRemotingClient#invokeOneway throw
Exception with address
---
.../rocketmq/remoting/netty/NettyRemotingClient.java | 2 +-
.../remoting/netty/NettyRemotingClientTest.java | 11 +++++++++++
2 files changed, 12 insertions(+), 1 deletion(-)
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index afd779c83..9715b918a 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -756,7 +756,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}
} else {
this.closeChannel(addr, channel);
- throw new RemotingConnectException(channelRemoteAddr);
+ throw new RemotingConnectException(addr);
}
}
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
index efa3eb3d5..8fabbb21d 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
@@ -20,6 +20,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.rocketmq.remoting.InvokeCallback;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
@@ -123,4 +124,14 @@ public class NettyRemotingClientTest {
Throwable thrown = catchThrowable(future::get);
assertThat(thrown.getCause()).isInstanceOf(RemotingException.class);
}
+
+ @Test
+ public void testInvokeOnewayException() throws Exception {
+ String addr = "0.0.0.0";
+ try {
+ remotingClient.invokeOneway(addr, null, 1000);
+ } catch (RemotingConnectException e) {
+ assertThat(e.getMessage()).contains(addr);
+ }
+ }
}
--
2.32.0.windows.2
From 804f2d85f22d9ee52573b9c6ee6abae248c9b387 Mon Sep 17 00:00:00 2001
From: wenbin yao
Date: Thu, 20 Jul 2023 11:01:38 +0800
Subject: [PATCH 6/7] [ISSUE ##7036] rename method: getWriteQueueIdByBroker to
getWriteQueueNumsByBroker(#7037)
* [ISSUE ##7036] rename method: getWriteQueueIdByBroker to getWriteQueueNumsByBroker
* [ISSUE #7036] rename method from getWriteQueueIdByBroker to getWriteQueueNumsByBroker
---
.../apache/rocketmq/client/impl/producer/TopicPublishInfo.java | 2 +-
.../org/apache/rocketmq/client/latency/MQFaultStrategy.java | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
index a5f840500..275ada7ac 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
@@ -89,7 +89,7 @@ public class TopicPublishInfo {
return this.messageQueueList.get(pos);
}
- public int getWriteQueueIdByBroker(final String brokerName) {
+ public int getWriteQueueNumsByBroker(final String brokerName) {
for (int i = 0; i < topicRouteData.getQueueDatas().size(); i++) {
final QueueData queueData = this.topicRouteData.getQueueDatas().get(i);
if (queueData.getBrokerName().equals(brokerName)) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
index e86238e55..1e1953fad 100644
--- a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
+++ b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
@@ -69,7 +69,7 @@ public class MQFaultStrategy {
}
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
- int writeQueueNums = tpInfo.getWriteQueueIdByBroker(notBestBroker);
+ int writeQueueNums = tpInfo.getWriteQueueNumsByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
--
2.32.0.windows.2
From af993d28e20922d91862f0911e59f748dcb64e6a Mon Sep 17 00:00:00 2001
From: guyinyou <36399867+guyinyou@users.noreply.github.com>
Date: Fri, 21 Jul 2023 09:31:56 +0800
Subject: [PATCH 7/7] [ISSUE #3717][RIP-27] Auto batching in producer
Co-authored-by: guyinyou
---
.../rocketmq/client/impl/MQClientManager.java | 21 +-
.../impl/producer/DefaultMQProducerImpl.java | 36 ++
.../client/producer/DefaultMQProducer.java | 501 +++++++++++------
.../rocketmq/client/producer/MQProducer.java | 24 +-
.../client/producer/ProduceAccumulator.java | 510 ++++++++++++++++++
.../producer/DefaultMQProducerTest.java | 38 +-
.../producer/ProduceAccumulatorTest.java | 176 ++++++
.../rocketmq/common/message/MessageBatch.java | 2 +-
8 files changed, 1133 insertions(+), 175 deletions(-)
create mode 100644 client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java
create mode 100644 client/src/test/java/org/apache/rocketmq/client/producer/ProduceAccumulatorTest.java
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
index 49186633f..02eaa66e9 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.producer.ProduceAccumulator;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -31,6 +32,9 @@ public class MQClientManager {
private AtomicInteger factoryIndexGenerator = new AtomicInteger();
private ConcurrentMap factoryTable =
new ConcurrentHashMap<>();
+ private ConcurrentMap accumulatorTable =
+ new ConcurrentHashMap();
+
private MQClientManager() {
@@ -43,7 +47,6 @@ public class MQClientManager {
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig) {
return getOrCreateMQClientInstance(clientConfig, null);
}
-
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = this.factoryTable.get(clientId);
@@ -62,6 +65,22 @@ public class MQClientManager {
return instance;
}
+ public ProduceAccumulator getOrCreateProduceAccumulator(final ClientConfig clientConfig) {
+ String clientId = clientConfig.buildMQClientId();
+ ProduceAccumulator accumulator = this.accumulatorTable.get(clientId);
+ if (null == accumulator) {
+ accumulator = new ProduceAccumulator(clientId);
+ ProduceAccumulator prev = this.accumulatorTable.putIfAbsent(clientId, accumulator);
+ if (prev != null) {
+ accumulator = prev;
+ log.warn("Returned Previous ProduceAccumulator for clientId:[{}]", clientId);
+ } else {
+ log.info("Created new ProduceAccumulator for clientId:[{}]", clientId);
+ }
+ }
+
+ return accumulator;
+ }
public void removeClientFactory(final String clientId) {
this.factoryTable.remove(clientId);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 4eb0e6924..3f4c6e5f7 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -573,6 +573,42 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
+ public MessageQueue invokeMessageQueueSelector(Message msg, MessageQueueSelector selector, Object arg,
+ final long timeout) throws MQClientException, RemotingTooMuchRequestException {
+ long beginStartTime = System.currentTimeMillis();
+ this.makeSureStateOK();
+ Validators.checkMessage(msg, this.defaultMQProducer);
+
+ TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
+ if (topicPublishInfo != null && topicPublishInfo.ok()) {
+ MessageQueue mq = null;
+ try {
+ List messageQueueList =
+ mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
+ Message userMessage = MessageAccessor.cloneMessage(msg);
+ String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
+ userMessage.setTopic(userTopic);
+
+ mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
+ } catch (Throwable e) {
+ throw new MQClientException("select message queue threw exception.", e);
+ }
+
+ long costTime = System.currentTimeMillis() - beginStartTime;
+ if (timeout < costTime) {
+ throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
+ }
+ if (mq != null) {
+ return mq;
+ } else {
+ throw new MQClientException("select message queue return null.", null);
+ }
+ }
+
+ validateNameServerSetting();
+ throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
+ }
+
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 6e9ffed8c..c5b1b5223 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.exception.RequestTimeoutException;
+import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceDispatcher;
@@ -38,6 +39,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
+import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.topic.TopicValidator;
@@ -49,10 +51,10 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
/**
* This class is the entry point for applications intending to send messages.
- *
+ *
* It's fine to tune fields which exposes getter/setter methods, but keep in mind, all of them should work well out of
* box for most scenarios.
- *
+ *
* This class aggregates various send
methods to deliver messages to broker(s). Each of them has pros and
* cons; you'd better understand strengths and weakness of them before actually coding.
*
@@ -78,9 +80,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Producer group conceptually aggregates all producer instances of exactly same role, which is particularly
* important when transactional messages are involved.
- *
+ *
* For non-transactional messages, it does not matter as long as it's unique per process.
- *
+ *
* See core concepts for more discussion.
*/
private String producerGroup;
@@ -107,14 +109,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Maximum number of retry to perform internally before claiming sending failure in synchronous mode.
- *
+ *
* This may potentially cause message duplication which is up to application developers to resolve.
*/
private int retryTimesWhenSendFailed = 2;
/**
* Maximum number of retry to perform internally before claiming sending failure in asynchronous mode.
- *
+ *
* This may potentially cause message duplication which is up to application developers to resolve.
*/
private int retryTimesWhenSendAsyncFailed = 2;
@@ -134,6 +136,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
*/
private TraceDispatcher traceDispatcher = null;
+ /**
+ * Switch flag instance for automatic batch message
+ */
+ private boolean autoBatch = false;
+ /**
+ * Instance for batching message automatically
+ */
+ private ProduceAccumulator produceAccumulator = null;
+
/**
* Indicate whether to block message when asynchronous sending traffic is too heavy.
*/
@@ -179,11 +190,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Constructor specifying producer group.
*
- * @param producerGroup Producer group, see the name-sake field.
- * @param rpcHook RPC hook to execute per each remoting command execution.
- * @param enableMsgTrace Switch flag instance for message trace.
+ * @param producerGroup Producer group, see the name-sake field.
+ * @param rpcHook RPC hook to execute per each remoting command execution.
+ * @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
- * trace topic name.
+ * trace topic name.
*/
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,
final String customizedTraceTopic) {
@@ -193,7 +204,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Constructor specifying producer group.
*
- * @param namespace Namespace for this MQ Producer instance.
+ * @param namespace Namespace for this MQ Producer instance.
* @param producerGroup Producer group, see the name-sake field.
*/
public DefaultMQProducer(final String namespace, final String producerGroup) {
@@ -204,7 +215,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* Constructor specifying both producer group and RPC hook.
*
* @param producerGroup Producer group, see the name-sake field.
- * @param rpcHook RPC hook to execute per each remoting command execution.
+ * @param rpcHook RPC hook to execute per each remoting command execution.
*/
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
this(null, producerGroup, rpcHook);
@@ -213,20 +224,21 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Constructor specifying namespace, producer group and RPC hook.
*
- * @param namespace Namespace for this MQ Producer instance.
+ * @param namespace Namespace for this MQ Producer instance.
* @param producerGroup Producer group, see the name-sake field.
- * @param rpcHook RPC hook to execute per each remoting command execution.
+ * @param rpcHook RPC hook to execute per each remoting command execution.
*/
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
+ produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
}
/**
* Constructor specifying producer group and enabled msg trace flag.
*
- * @param producerGroup Producer group, see the name-sake field.
+ * @param producerGroup Producer group, see the name-sake field.
* @param enableMsgTrace Switch flag instance for message trace.
*/
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace) {
@@ -236,10 +248,10 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Constructor specifying producer group, enabled msgTrace flag and customized trace topic name.
*
- * @param producerGroup Producer group, see the name-sake field.
- * @param enableMsgTrace Switch flag instance for message trace.
+ * @param producerGroup Producer group, see the name-sake field.
+ * @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
- * trace topic name.
+ * trace topic name.
*/
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic) {
this(null, producerGroup, null, enableMsgTrace, customizedTraceTopic);
@@ -249,18 +261,19 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* Constructor specifying namespace, producer group, RPC hook, enabled msgTrace flag and customized trace topic
* name.
*
- * @param namespace Namespace for this MQ Producer instance.
- * @param producerGroup Producer group, see the name-sake field.
- * @param rpcHook RPC hook to execute per each remoting command execution.
- * @param enableMsgTrace Switch flag instance for message trace.
+ * @param namespace Namespace for this MQ Producer instance.
+ * @param producerGroup Producer group, see the name-sake field.
+ * @param rpcHook RPC hook to execute per each remoting command execution.
+ * @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
- * trace topic name.
+ * trace topic name.
*/
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
boolean enableMsgTrace, final String customizedTraceTopic) {
this.namespace = namespace;
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
+ produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
//if client open the message trace feature
if (enableMsgTrace) {
try {
@@ -297,6 +310,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
public void start() throws MQClientException {
this.setProducerGroup(withNamespace(this.producerGroup));
this.defaultMQProducerImpl.start();
+ if (this.produceAccumulator != null) {
+ this.produceAccumulator.start();
+ }
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
@@ -312,6 +328,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Override
public void shutdown() {
this.defaultMQProducerImpl.shutdown();
+ if (this.produceAccumulator != null) {
+ this.produceAccumulator.shutdown();
+ }
if (null != traceDispatcher) {
traceDispatcher.shutdown();
}
@@ -329,6 +348,26 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
return this.defaultMQProducerImpl.fetchPublishMessageQueues(withNamespace(topic));
}
+ private boolean canBatch(Message msg) {
+ // produceAccumulator is full
+ if (!produceAccumulator.tryAddMessage(msg)) {
+ return false;
+ }
+ // delay message do not support batch processing
+ if (msg.getDelayTimeLevel() > 0) {
+ return false;
+ }
+ // retry message do not support batch processing
+ if (msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+ return false;
+ }
+ // message which have been assigned to producer group do not support batch processing
+ if (msg.getProperties().containsKey(MessageConst.PROPERTY_PRODUCER_GROUP)) {
+ return false;
+ }
+ return true;
+ }
+
/**
* Send message in synchronous mode. This method returns only when the sending procedure totally completes.
*
@@ -339,28 +378,32 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* @param msg Message to send.
* @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
* {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
- * @throws MQBrokerException if there is any error with broker.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any error with broker.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
- return this.defaultMQProducerImpl.send(msg);
+ if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+ return sendByAccumulator(msg, null, null);
+ } else {
+ return sendDirect(msg, null, null);
+ }
}
/**
* Same to {@link #send(Message)} with send timeout specified in addition.
*
- * @param msg Message to send.
+ * @param msg Message to send.
* @param timeout send timeout.
* @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
* {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
- * @throws MQBrokerException if there is any error with broker.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any error with broker.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
@@ -372,34 +415,42 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Send message to broker asynchronously.
- *
+ *
* This method returns immediately. On sending completion, sendCallback
will be executed.
- *
+ *
* Similar to {@link #send(Message)}, internal implementation would potentially retry up to {@link
* #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication and
* application developers are the one to resolve this potential issue.
*
- * @param msg Message to send.
+ * @param msg Message to send.
* @param sendCallback Callback to execute on sending completed, either successful or unsuccessful.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
public void send(Message msg,
SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
- this.defaultMQProducerImpl.send(msg, sendCallback);
+ try {
+ if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+ sendByAccumulator(msg, null, sendCallback);
+ } else {
+ sendDirect(msg, null, sendCallback);
+ }
+ } catch (Throwable e) {
+ sendCallback.onException(e);
+ }
}
/**
* Same to {@link #send(Message, SendCallback)} with send timeout specified in addition.
*
- * @param msg message to send.
+ * @param msg message to send.
* @param sendCallback Callback to execute.
- * @param timeout send timeout.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @param timeout send timeout.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
@@ -414,8 +465,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* acknowledgement from broker before return. Obviously, it has maximums throughput yet potentials of message loss.
*
* @param msg Message to send.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
@@ -428,32 +479,37 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* Same to {@link #send(Message)} with target message queue specified in addition.
*
* @param msg Message to send.
- * @param mq Target message queue.
+ * @param mq Target message queue.
* @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
* {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
- * @throws MQBrokerException if there is any error with broker.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any error with broker.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
public SendResult send(Message msg, MessageQueue mq)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
- return this.defaultMQProducerImpl.send(msg, queueWithNamespace(mq));
+ mq = queueWithNamespace(mq);
+ if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+ return sendByAccumulator(msg, mq, null);
+ } else {
+ return sendDirect(msg, mq, null);
+ }
}
/**
* Same to {@link #send(Message)} with target message queue and send timeout specified.
*
- * @param msg Message to send.
- * @param mq Target message queue.
+ * @param msg Message to send.
+ * @param mq Target message queue.
* @param timeout send timeout.
* @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
* {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
- * @throws MQBrokerException if there is any error with broker.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any error with broker.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
@@ -466,29 +522,38 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Same to {@link #send(Message, SendCallback)} with target message queue specified.
*
- * @param msg Message to send.
- * @param mq Target message queue.
+ * @param msg Message to send.
+ * @param mq Target message queue.
* @param sendCallback Callback to execute on sending completed, either successful or unsuccessful.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
throws MQClientException, RemotingException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
- this.defaultMQProducerImpl.send(msg, queueWithNamespace(mq), sendCallback);
+ mq = queueWithNamespace(mq);
+ try {
+ if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+ sendByAccumulator(msg, mq, sendCallback);
+ } else {
+ sendDirect(msg, mq, sendCallback);
+ }
+ } catch (MQBrokerException e) {
+ // ignore
+ }
}
/**
* Same to {@link #send(Message, SendCallback)} with target message queue and send timeout specified.
*
- * @param msg Message to send.
- * @param mq Target message queue.
+ * @param msg Message to send.
+ * @param mq Target message queue.
* @param sendCallback Callback to execute on sending completed, either successful or unsuccessful.
- * @param timeout Send timeout.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @param timeout Send timeout.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
@@ -502,9 +567,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* Same to {@link #sendOneway(Message)} with target message queue specified.
*
* @param msg Message to send.
- * @param mq Target message queue.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @param mq Target message queue.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
@@ -517,35 +582,41 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Same to {@link #send(Message)} with message queue selector specified.
*
- * @param msg Message to send.
+ * @param msg Message to send.
* @param selector Message queue selector, through which we get target message queue to deliver message to.
- * @param arg Argument to work along with message queue selector.
+ * @param arg Argument to work along with message queue selector.
* @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
* {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
- * @throws MQBrokerException if there is any error with broker.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any error with broker.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
- return this.defaultMQProducerImpl.send(msg, selector, arg);
+ MessageQueue mq = this.defaultMQProducerImpl.invokeMessageQueueSelector(msg, selector, arg, this.getSendMsgTimeout());
+ mq = queueWithNamespace(mq);
+ if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+ return sendByAccumulator(msg, mq, null);
+ } else {
+ return sendDirect(msg, mq, null);
+ }
}
/**
* Same to {@link #send(Message, MessageQueueSelector, Object)} with send timeout specified.
*
- * @param msg Message to send.
+ * @param msg Message to send.
* @param selector Message queue selector, through which we get target message queue to deliver message to.
- * @param arg Argument to work along with message queue selector.
- * @param timeout Send timeout.
+ * @param arg Argument to work along with message queue selector.
+ * @param timeout Send timeout.
* @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
* {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
- * @throws MQBrokerException if there is any error with broker.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any error with broker.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
@@ -558,31 +629,41 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Same to {@link #send(Message, SendCallback)} with message queue selector specified.
*
- * @param msg Message to send.
- * @param selector Message selector through which to get target message queue.
- * @param arg Argument used along with message queue selector.
+ * @param msg Message to send.
+ * @param selector Message selector through which to get target message queue.
+ * @param arg Argument used along with message queue selector.
* @param sendCallback callback to execute on sending completion.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
throws MQClientException, RemotingException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
- this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback);
+ try {
+ MessageQueue mq = this.defaultMQProducerImpl.invokeMessageQueueSelector(msg, selector, arg, this.getSendMsgTimeout());
+ mq = queueWithNamespace(mq);
+ if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+ sendByAccumulator(msg, mq, sendCallback);
+ } else {
+ sendDirect(msg, mq, sendCallback);
+ }
+ } catch (Throwable e) {
+ sendCallback.onException(e);
+ }
}
/**
* Same to {@link #send(Message, MessageQueueSelector, Object, SendCallback)} with timeout specified.
*
- * @param msg Message to send.
- * @param selector Message selector through which to get target message queue.
- * @param arg Argument used along with message queue selector.
+ * @param msg Message to send.
+ * @param selector Message selector through which to get target message queue.
+ * @param arg Argument used along with message queue selector.
* @param sendCallback callback to execute on sending completion.
- * @param timeout Send timeout.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @param timeout Send timeout.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
@@ -592,6 +673,42 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout);
}
+ public SendResult sendDirect(Message msg, MessageQueue mq,
+ SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
+ // send in sync mode
+ if (sendCallback == null) {
+ if (mq == null) {
+ return this.defaultMQProducerImpl.send(msg);
+ } else {
+ return this.defaultMQProducerImpl.send(msg, mq);
+ }
+ } else {
+ if (mq == null) {
+ this.defaultMQProducerImpl.send(msg, sendCallback);
+ } else {
+ this.defaultMQProducerImpl.send(msg, mq, sendCallback);
+ }
+ return null;
+ }
+ }
+
+ public SendResult sendByAccumulator(Message msg, MessageQueue mq,
+ SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
+ // check whether it can batch
+ if (!canBatch(msg)) {
+ return sendDirect(msg, mq, sendCallback);
+ } else {
+ Validators.checkMessage(msg, this);
+ MessageClientIDSetter.setUniqID(msg);
+ if (sendCallback == null) {
+ return this.produceAccumulator.send(msg, mq, this);
+ } else {
+ this.produceAccumulator.send(msg, mq, sendCallback, this);
+ return null;
+ }
+ }
+ }
+
/**
* Send request message in synchronous mode. This method returns only when the consumer consume the request message and reply a message.
*
@@ -599,13 +716,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* {@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may be potentially
* delivered to broker(s). It's up to the application developers to resolve potential duplication issue.
*
- * @param msg request message to send
+ * @param msg request message to send
* @param timeout request timeout
* @return reply message
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
- * @throws MQBrokerException if there is any broker error.
- * @throws InterruptedException if the thread is interrupted.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any broker error.
+ * @throws InterruptedException if the thread is interrupted.
* @throws RequestTimeoutException if request timeout.
*/
@Override
@@ -618,18 +735,18 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Request asynchronously.
* This method returns immediately. On receiving reply message, requestCallback
will be executed.
- *
+ *
* Similar to {@link #request(Message, long)}, internal implementation would potentially retry up to {@link
* #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication and
* application developers are the one to resolve this potential issue.
*
- * @param msg request message to send
+ * @param msg request message to send
* @param requestCallback callback to execute on request completion.
- * @param timeout request timeout
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @param timeout request timeout
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the thread is interrupted.
- * @throws MQBrokerException if there is any broker error.
+ * @throws MQBrokerException if there is any broker error.
*/
@Override
public void request(final Message msg, final RequestCallback requestCallback, final long timeout)
@@ -641,15 +758,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Same to {@link #request(Message, long)} with message queue selector specified.
*
- * @param msg request message to send
+ * @param msg request message to send
* @param selector message queue selector, through which we get target message queue to deliver message to.
- * @param arg argument to work along with message queue selector.
- * @param timeout timeout of request.
+ * @param arg argument to work along with message queue selector.
+ * @param timeout timeout of request.
* @return reply message
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
- * @throws MQBrokerException if there is any broker error.
- * @throws InterruptedException if the thread is interrupted.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any broker error.
+ * @throws InterruptedException if the thread is interrupted.
* @throws RequestTimeoutException if request timeout.
*/
@Override
@@ -663,15 +780,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Same to {@link #request(Message, RequestCallback, long)} with target message selector specified.
*
- * @param msg requst message to send
- * @param selector message queue selector, through which we get target message queue to deliver message to.
- * @param arg argument to work along with message queue selector.
+ * @param msg requst message to send
+ * @param selector message queue selector, through which we get target message queue to deliver message to.
+ * @param arg argument to work along with message queue selector.
* @param requestCallback callback to execute on request completion.
- * @param timeout timeout of request.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @param timeout timeout of request.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the thread is interrupted.
- * @throws MQBrokerException if there is any broker error.
+ * @throws MQBrokerException if there is any broker error.
*/
@Override
public void request(final Message msg, final MessageQueueSelector selector, final Object arg,
@@ -684,13 +801,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Same to {@link #request(Message, long)} with target message queue specified in addition.
*
- * @param msg request message to send
- * @param mq target message queue.
+ * @param msg request message to send
+ * @param mq target message queue.
* @param timeout request timeout
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
- * @throws MQBrokerException if there is any broker error.
- * @throws InterruptedException if the thread is interrupted.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any broker error.
+ * @throws InterruptedException if the thread is interrupted.
* @throws RequestTimeoutException if request timeout.
*/
@Override
@@ -703,14 +820,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Same to {@link #request(Message, RequestCallback, long)} with target message queue specified.
*
- * @param msg request message to send
- * @param mq target message queue.
+ * @param msg request message to send
+ * @param mq target message queue.
* @param requestCallback callback to execute on request completion.
- * @param timeout timeout of request.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @param timeout timeout of request.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the thread is interrupted.
- * @throws MQBrokerException if there is any broker error.
+ * @throws MQBrokerException if there is any broker error.
*/
@Override
public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
@@ -722,11 +839,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Same to {@link #sendOneway(Message)} with message queue selector specified.
*
- * @param msg Message to send.
+ * @param msg Message to send.
* @param selector Message queue selector, through which to determine target message queue to deliver message
- * @param arg Argument used along with message queue selector.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @param arg Argument used along with message queue selector.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
@@ -739,9 +856,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* This method is to send transactional messages.
*
- * @param msg Transactional message to send.
+ * @param msg Transactional message to send.
* @param tranExecuter local transaction executor.
- * @param arg Argument used along with local transaction executor.
+ * @param arg Argument used along with local transaction executor.
* @return Transaction result.
* @throws MQClientException if there is any client error.
*/
@@ -769,15 +886,16 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*
- * @param key accessKey
- * @param newTopic topic name
- * @param queueNum topic's queue number
+ * @param key accessKey
+ * @param newTopic topic name
+ * @param queueNum topic's queue number
* @param attributes
* @throws MQClientException if there is any client error.
*/
@Deprecated
@Override
- public void createTopic(String key, String newTopic, int queueNum, Map attributes) throws MQClientException {
+ public void createTopic(String key, String newTopic, int queueNum,
+ Map attributes) throws MQClientException {
createTopic(key, withNamespace(newTopic), queueNum, 0, null);
}
@@ -785,23 +903,24 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* Create a topic on broker. This method will be removed in a certain version after April 5, 2020, so please do not
* use this method.
*
- * @param key accessKey
- * @param newTopic topic name
- * @param queueNum topic's queue number
+ * @param key accessKey
+ * @param newTopic topic name
+ * @param queueNum topic's queue number
* @param topicSysFlag topic system flag
* @param attributes
* @throws MQClientException if there is any client error.
*/
@Deprecated
@Override
- public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, Map attributes) throws MQClientException {
+ public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag,
+ Map attributes) throws MQClientException {
this.defaultMQProducerImpl.createTopic(key, withNamespace(newTopic), queueNum, topicSysFlag);
}
/**
* Search consume queue offset of the given time stamp.
*
- * @param mq Instance of MessageQueue
+ * @param mq Instance of MessageQueue
* @param timestamp from when in milliseconds.
* @return Consume queue offset.
* @throws MQClientException if there is any client error.
@@ -813,7 +932,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Query maximum offset of the given message queue.
- *
+ *
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*
* @param mq Instance of MessageQueue
@@ -828,7 +947,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Query minimum offset of the given message queue.
- *
+ *
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*
* @param mq Instance of MessageQueue
@@ -843,7 +962,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Query the earliest message store time.
- *
+ *
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*
* @param mq Instance of MessageQueue
@@ -858,14 +977,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Query message of the given offset message ID.
- *
+ *
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*
* @param offsetMsgId message id
* @return Message specified.
- * @throws MQBrokerException if there is any broker error.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any broker error.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Deprecated
@@ -877,16 +996,16 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Query message by key.
- *
+ *
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*
- * @param topic message topic
- * @param key message key index word
+ * @param topic message topic
+ * @param key message key index word
* @param maxNum max message number
- * @param begin from when
- * @param end to when
+ * @param begin from when
+ * @param end to when
* @return QueryResult instance contains matched messages.
- * @throws MQClientException if there is any client error.
+ * @throws MQClientException if there is any client error.
* @throws InterruptedException if the thread is interrupted.
*/
@Deprecated
@@ -898,15 +1017,15 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Query message of the given message ID.
- *
+ *
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*
* @param topic Topic
* @param msgId Message ID
* @return Message specified.
- * @throws MQBrokerException if there is any broker error.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any broker error.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Deprecated
@@ -945,7 +1064,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
}
@Override
- public void send(Collection msgs, SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ public void send(Collection msgs,
+ SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.defaultMQProducerImpl.send(batch(msgs), sendCallback);
}
@@ -963,7 +1083,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Override
public void send(Collection msgs, MessageQueue mq,
- SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ SendCallback sendCallback,
+ long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.defaultMQProducerImpl.send(batch(msgs), queueWithNamespace(mq), sendCallback, timeout);
}
@@ -1012,6 +1133,62 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
return msgBatch;
}
+ public int getBatchMaxDelayMs() {
+ if (this.produceAccumulator == null) {
+ return 0;
+ }
+ return produceAccumulator.getBatchMaxDelayMs();
+ }
+
+ public void batchMaxDelayMs(int holdMs) {
+ if (this.produceAccumulator == null) {
+ throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch");
+ }
+ this.produceAccumulator.batchMaxDelayMs(holdMs);
+ }
+
+ public long getBatchMaxBytes() {
+ if (this.produceAccumulator == null) {
+ return 0;
+ }
+ return produceAccumulator.getBatchMaxBytes();
+ }
+
+ public void batchMaxBytes(long holdSize) {
+ if (this.produceAccumulator == null) {
+ throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch");
+ }
+ this.produceAccumulator.batchMaxBytes(holdSize);
+ }
+
+ public long getTotalBatchMaxBytes() {
+ if (this.produceAccumulator == null) {
+ return 0;
+ }
+ return produceAccumulator.getTotalBatchMaxBytes();
+ }
+
+ public void totalBatchMaxBytes(long totalHoldSize) {
+ if (this.produceAccumulator == null) {
+ throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch");
+ }
+ this.produceAccumulator.totalBatchMaxBytes(totalHoldSize);
+ }
+
+ public boolean getAutoBatch() {
+ if (this.produceAccumulator == null) {
+ return false;
+ }
+ return this.autoBatch;
+ }
+
+ public void setAutoBatch(boolean autoBatch) {
+ if (this.produceAccumulator == null) {
+ throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch");
+ }
+ this.autoBatch = autoBatch;
+ }
+
public String getProducerGroup() {
return producerGroup;
}
@@ -1130,7 +1307,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
}
public boolean isEnableBackpressureForAsyncMode() {
- return enableBackpressureForAsyncMode;
+ return enableBackpressureForAsyncMode;
}
public void setEnableBackpressureForAsyncMode(boolean enableBackpressureForAsyncMode) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
index f70ddb283..78657e623 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
@@ -40,7 +40,7 @@ public interface MQProducer extends MQAdmin {
RemotingException, MQBrokerException, InterruptedException;
void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
- RemotingException, InterruptedException;
+ RemotingException, InterruptedException, MQBrokerException;
void send(final Message msg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException;
@@ -99,19 +99,23 @@ public interface MQProducer extends MQAdmin {
SendResult send(final Collection msgs, final MessageQueue mq, final long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
-
- void send(final Collection msgs, final SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException,
+
+ void send(final Collection msgs,
+ final SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
-
- void send(final Collection msgs, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException,
+
+ void send(final Collection msgs, final SendCallback sendCallback,
+ final long timeout) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException;
-
- void send(final Collection msgs, final MessageQueue mq, final SendCallback sendCallback) throws MQClientException, RemotingException,
+
+ void send(final Collection msgs, final MessageQueue mq,
+ final SendCallback sendCallback) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException;
-
- void send(final Collection msgs, final MessageQueue mq, final SendCallback sendCallback, final long timeout) throws MQClientException,
+
+ void send(final Collection msgs, final MessageQueue mq, final SendCallback sendCallback,
+ final long timeout) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
-
+
//for rpc
Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException,
RemotingException, MQBrokerException, InterruptedException;
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java b/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java
new file mode 100644
index 000000000..46dfcf71d
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java
@@ -0,0 +1,510 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.producer;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageBatch;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+public class ProduceAccumulator {
+ // totalHoldSize normal value
+ private long totalHoldSize = 32 * 1024 * 1024;
+ // holdSize normal value
+ private long holdSize = 32 * 1024;
+ // holdMs normal value
+ private int holdMs = 10;
+ private final Logger log = LoggerFactory.getLogger(DefaultMQProducer.class);
+ private final GuardForSyncSendService guardThreadForSyncSend;
+ private final GuardForAsyncSendService guardThreadForAsyncSend;
+ private Map syncSendBatchs = new ConcurrentHashMap();
+ private Map asyncSendBatchs = new ConcurrentHashMap();
+ private AtomicLong currentlyHoldSize = new AtomicLong(0);
+ private final String instanceName;
+
+ public ProduceAccumulator(String instanceName) {
+ this.instanceName = instanceName;
+ this.guardThreadForSyncSend = new GuardForSyncSendService(this.instanceName);
+ this.guardThreadForAsyncSend = new GuardForAsyncSendService(this.instanceName);
+ }
+
+ private class GuardForSyncSendService extends ServiceThread {
+ private final String serviceName;
+
+ public GuardForSyncSendService(String clientInstanceName) {
+ serviceName = String.format("Client_%s_GuardForSyncSend", clientInstanceName);
+ }
+
+ @Override public String getServiceName() {
+ return serviceName;
+ }
+
+ @Override public void run() {
+ log.info(this.getServiceName() + " service started");
+
+ while (!this.isStopped()) {
+ try {
+ this.doWork();
+ } catch (Exception e) {
+ log.warn(this.getServiceName() + " service has exception. ", e);
+ }
+ }
+
+ log.info(this.getServiceName() + " service end");
+ }
+
+ private void doWork() throws InterruptedException {
+ Collection values = syncSendBatchs.values();
+ final int sleepTime = Math.max(1, holdMs / 2);
+ for (MessageAccumulation v : values) {
+ v.wakeup();
+ synchronized (v) {
+ synchronized (v.closed) {
+ if (v.messagesSize.get() == 0) {
+ v.closed.set(true);
+ syncSendBatchs.remove(v.aggregateKey, v);
+ } else {
+ v.notify();
+ }
+ }
+ }
+ }
+ Thread.sleep(sleepTime);
+ }
+ }
+
+ private class GuardForAsyncSendService extends ServiceThread {
+ private final String serviceName;
+
+ public GuardForAsyncSendService(String clientInstanceName) {
+ serviceName = String.format("Client_%s_GuardForAsyncSend", clientInstanceName);
+ }
+
+ @Override public String getServiceName() {
+ return serviceName;
+ }
+
+ @Override public void run() {
+ log.info(this.getServiceName() + " service started");
+
+ while (!this.isStopped()) {
+ try {
+ this.doWork();
+ } catch (Exception e) {
+ log.warn(this.getServiceName() + " service has exception. ", e);
+ }
+ }
+
+ log.info(this.getServiceName() + " service end");
+ }
+
+ private void doWork() throws Exception {
+ Collection values = asyncSendBatchs.values();
+ final int sleepTime = Math.max(1, holdMs / 2);
+ for (MessageAccumulation v : values) {
+ if (v.readyToSend()) {
+ v.send(null);
+ }
+ synchronized (v.closed) {
+ if (v.messagesSize.get() == 0) {
+ v.closed.set(true);
+ asyncSendBatchs.remove(v.aggregateKey, v);
+ }
+ }
+ }
+ Thread.sleep(sleepTime);
+ }
+ }
+
+ void start() {
+ guardThreadForSyncSend.start();
+ guardThreadForAsyncSend.start();
+ }
+
+ void shutdown() {
+ guardThreadForSyncSend.shutdown();
+ guardThreadForAsyncSend.shutdown();
+ }
+
+ int getBatchMaxDelayMs() {
+ return holdMs;
+ }
+
+ void batchMaxDelayMs(int holdMs) {
+ if (holdMs <= 0 || holdMs > 30 * 1000) {
+ throw new IllegalArgumentException(String.format("batchMaxDelayMs expect between 1ms and 30s, but get %d!", holdMs));
+ }
+ this.holdMs = holdMs;
+ }
+
+ long getBatchMaxBytes() {
+ return holdSize;
+ }
+
+ void batchMaxBytes(long holdSize) {
+ if (holdSize <= 0 || holdSize > 2 * 1024 * 1024) {
+ throw new IllegalArgumentException(String.format("batchMaxBytes expect between 1B and 2MB, but get %d!", holdSize));
+ }
+ this.holdSize = holdSize;
+ }
+
+ long getTotalBatchMaxBytes() {
+ return holdSize;
+ }
+
+ void totalBatchMaxBytes(long totalHoldSize) {
+ if (totalHoldSize <= 0) {
+ throw new IllegalArgumentException(String.format("totalBatchMaxBytes must bigger then 0, but get %d!", totalHoldSize));
+ }
+ this.totalHoldSize = totalHoldSize;
+ }
+
+ private MessageAccumulation getOrCreateSyncSendBatch(AggregateKey aggregateKey,
+ DefaultMQProducer defaultMQProducer) {
+ MessageAccumulation batch = syncSendBatchs.get(aggregateKey);
+ if (batch != null) {
+ return batch;
+ }
+ batch = new MessageAccumulation(aggregateKey, defaultMQProducer);
+ MessageAccumulation previous = syncSendBatchs.putIfAbsent(aggregateKey, batch);
+
+ return previous == null ? batch : previous;
+ }
+
+ private MessageAccumulation getOrCreateAsyncSendBatch(AggregateKey aggregateKey,
+ DefaultMQProducer defaultMQProducer) {
+ MessageAccumulation batch = asyncSendBatchs.get(aggregateKey);
+ if (batch != null) {
+ return batch;
+ }
+ batch = new MessageAccumulation(aggregateKey, defaultMQProducer);
+ MessageAccumulation previous = asyncSendBatchs.putIfAbsent(aggregateKey, batch);
+
+ return previous == null ? batch : previous;
+ }
+
+ SendResult send(Message msg,
+ DefaultMQProducer defaultMQProducer) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
+ AggregateKey partitionKey = new AggregateKey(msg);
+ while (true) {
+ MessageAccumulation batch = getOrCreateSyncSendBatch(partitionKey, defaultMQProducer);
+ int index = batch.add(msg);
+ if (index == -1) {
+ syncSendBatchs.remove(partitionKey, batch);
+ } else {
+ return batch.sendResults[index];
+ }
+ }
+ }
+
+ SendResult send(Message msg, MessageQueue mq,
+ DefaultMQProducer defaultMQProducer) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
+ AggregateKey partitionKey = new AggregateKey(msg, mq);
+ while (true) {
+ MessageAccumulation batch = getOrCreateSyncSendBatch(partitionKey, defaultMQProducer);
+ int index = batch.add(msg);
+ if (index == -1) {
+ syncSendBatchs.remove(partitionKey, batch);
+ } else {
+ return batch.sendResults[index];
+ }
+ }
+ }
+
+ void send(Message msg, SendCallback sendCallback,
+ DefaultMQProducer defaultMQProducer) throws InterruptedException, RemotingException, MQClientException {
+ AggregateKey partitionKey = new AggregateKey(msg);
+ while (true) {
+ MessageAccumulation batch = getOrCreateAsyncSendBatch(partitionKey, defaultMQProducer);
+ if (!batch.add(msg, sendCallback)) {
+ asyncSendBatchs.remove(partitionKey, batch);
+ } else {
+ return;
+ }
+ }
+ }
+
+ void send(Message msg, MessageQueue mq,
+ SendCallback sendCallback,
+ DefaultMQProducer defaultMQProducer) throws InterruptedException, RemotingException, MQClientException {
+ AggregateKey partitionKey = new AggregateKey(msg, mq);
+ while (true) {
+ MessageAccumulation batch = getOrCreateAsyncSendBatch(partitionKey, defaultMQProducer);
+ if (!batch.add(msg, sendCallback)) {
+ asyncSendBatchs.remove(partitionKey, batch);
+ } else {
+ return;
+ }
+ }
+ }
+
+ boolean tryAddMessage(Message message) {
+ synchronized (currentlyHoldSize) {
+ if (currentlyHoldSize.get() < totalHoldSize) {
+ currentlyHoldSize.addAndGet(message.getBody().length);
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+
+ private class AggregateKey {
+ public String topic = null;
+ public MessageQueue mq = null;
+ public boolean waitStoreMsgOK = false;
+ public String tag = null;
+
+ public AggregateKey(Message message) {
+ this(message.getTopic(), null, message.isWaitStoreMsgOK(), message.getTags());
+ }
+
+ public AggregateKey(Message message, MessageQueue mq) {
+ this(message.getTopic(), mq, message.isWaitStoreMsgOK(), message.getTags());
+ }
+
+ public AggregateKey(String topic, MessageQueue mq, boolean waitStoreMsgOK, String tag) {
+ this.topic = topic;
+ this.mq = mq;
+ this.waitStoreMsgOK = waitStoreMsgOK;
+ this.tag = tag;
+ }
+
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ AggregateKey key = (AggregateKey) o;
+ return waitStoreMsgOK == key.waitStoreMsgOK && topic.equals(key.topic) && Objects.equals(mq, key.mq) && Objects.equals(tag, key.tag);
+ }
+
+ @Override public int hashCode() {
+ return Objects.hash(topic, mq, waitStoreMsgOK, tag);
+ }
+ }
+
+ private class MessageAccumulation {
+ private final DefaultMQProducer defaultMQProducer;
+ private LinkedList messages;
+ private LinkedList sendCallbacks;
+ private Set keys;
+ private AtomicBoolean closed;
+ private SendResult[] sendResults;
+ private AggregateKey aggregateKey;
+ private AtomicInteger messagesSize;
+ private int count;
+ private long createTime;
+
+ public MessageAccumulation(AggregateKey aggregateKey, DefaultMQProducer defaultMQProducer) {
+ this.defaultMQProducer = defaultMQProducer;
+ this.messages = new LinkedList();
+ this.sendCallbacks = new LinkedList();
+ this.keys = new HashSet();
+ this.closed = new AtomicBoolean(false);
+ this.messagesSize = new AtomicInteger(0);
+ this.aggregateKey = aggregateKey;
+ this.count = 0;
+ this.createTime = System.currentTimeMillis();
+ }
+
+ private boolean readyToSend() {
+ if (this.messagesSize.get() > holdSize
+ || System.currentTimeMillis() >= this.createTime + holdMs) {
+ return true;
+ }
+ return false;
+ }
+
+ public int add(
+ Message msg) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
+ int ret = -1;
+ synchronized (this.closed) {
+ if (this.closed.get()) {
+ return ret;
+ }
+ ret = this.count++;
+ this.messages.add(msg);
+ messagesSize.addAndGet(msg.getBody().length);
+ String msgKeys = msg.getKeys();
+ if (msgKeys != null) {
+ this.keys.addAll(Arrays.asList(msgKeys.split(MessageConst.KEY_SEPARATOR)));
+ }
+ }
+ synchronized (this) {
+ while (!this.closed.get()) {
+ if (readyToSend()) {
+ this.send();
+ break;
+ } else {
+ this.wait();
+ }
+ }
+ return ret;
+ }
+ }
+
+ public boolean add(Message msg,
+ SendCallback sendCallback) throws InterruptedException, RemotingException, MQClientException {
+ synchronized (this.closed) {
+ if (this.closed.get()) {
+ return false;
+ }
+ this.count++;
+ this.messages.add(msg);
+ this.sendCallbacks.add(sendCallback);
+ messagesSize.getAndAdd(msg.getBody().length);
+ }
+ if (readyToSend()) {
+ this.send(sendCallback);
+ }
+ return true;
+
+ }
+
+ public synchronized void wakeup() {
+ if (this.closed.get()) {
+ return;
+ }
+ this.notify();
+ }
+
+ private MessageBatch batch() {
+ MessageBatch messageBatch = new MessageBatch(this.messages);
+ messageBatch.setTopic(this.aggregateKey.topic);
+ messageBatch.setWaitStoreMsgOK(this.aggregateKey.waitStoreMsgOK);
+ messageBatch.setKeys(this.keys);
+ messageBatch.setTags(this.aggregateKey.tag);
+ MessageClientIDSetter.setUniqID(messageBatch);
+ messageBatch.setBody(MessageDecoder.encodeMessages(this.messages));
+ return messageBatch;
+ }
+
+ private void splitSendResults(SendResult sendResult) {
+ if (sendResult == null) {
+ throw new IllegalArgumentException("sendResult is null");
+ }
+ boolean isBatchConsumerQueue = !sendResult.getMsgId().contains(",");
+ this.sendResults = new SendResult[this.count];
+ if (!isBatchConsumerQueue) {
+ String[] msgIds = sendResult.getMsgId().split(",");
+ String[] offsetMsgIds = sendResult.getOffsetMsgId().split(",");
+ if (offsetMsgIds.length != this.count || msgIds.length != this.count) {
+ throw new IllegalArgumentException("sendResult is illegal");
+ }
+ for (int i = 0; i < this.count; i++) {
+ this.sendResults[i] = new SendResult(sendResult.getSendStatus(), msgIds[i],
+ sendResult.getMessageQueue(), sendResult.getQueueOffset() + i,
+ sendResult.getTransactionId(), offsetMsgIds[i], sendResult.getRegionId());
+ }
+ } else {
+ for (int i = 0; i < this.count; i++) {
+ this.sendResults[i] = sendResult;
+ }
+ }
+ }
+
+ private void send() throws InterruptedException, MQClientException, MQBrokerException, RemotingException {
+ synchronized (this.closed) {
+ if (this.closed.getAndSet(true)) {
+ return;
+ }
+ }
+ MessageBatch messageBatch = this.batch();
+ SendResult sendResult = null;
+ try {
+ if (defaultMQProducer != null) {
+ sendResult = defaultMQProducer.sendDirect(messageBatch, aggregateKey.mq, null);
+ this.splitSendResults(sendResult);
+ } else {
+ throw new IllegalArgumentException("defaultMQProducer is null, can not send message");
+ }
+ } finally {
+ currentlyHoldSize.addAndGet(-messagesSize.get());
+ this.notifyAll();
+ }
+ }
+
+ private void send(SendCallback sendCallback) {
+ synchronized (this.closed) {
+ if (this.closed.getAndSet(true)) {
+ return;
+ }
+ }
+ MessageBatch messageBatch = this.batch();
+ SendResult sendResult = null;
+ try {
+ if (defaultMQProducer != null) {
+ final int size = messagesSize.get();
+ defaultMQProducer.sendDirect(messageBatch, aggregateKey.mq, new SendCallback() {
+ @Override public void onSuccess(SendResult sendResult) {
+ try {
+ splitSendResults(sendResult);
+ int i = 0;
+ Iterator it = sendCallbacks.iterator();
+ while (it.hasNext()) {
+ SendCallback v = it.next();
+ v.onSuccess(sendResults[i++]);
+ }
+ if (i != count) {
+ throw new IllegalArgumentException("sendResult is illegal");
+ }
+ currentlyHoldSize.addAndGet(-size);
+ } catch (Exception e) {
+ onException(e);
+ }
+ }
+
+ @Override public void onException(Throwable e) {
+ for (SendCallback v : sendCallbacks) {
+ v.onException(e);
+ }
+ currentlyHoldSize.addAndGet(-size);
+ }
+ });
+ } else {
+ throw new IllegalArgumentException("defaultMQProducer is null, can not send message");
+ }
+ } catch (Exception e) {
+ for (SendCallback v : sendCallbacks) {
+ v.onException(e);
+ }
+ }
+ }
+ }
+}
diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index 658f22ab0..d4153c7cd 100644
--- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -250,7 +250,7 @@ public class DefaultMQProducerTest {
@Test
public void testBatchSendMessageAsync()
- throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+ throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
final AtomicInteger cc = new AtomicInteger(0);
final CountDownLatch countDownLatch = new CountDownLatch(4);
@@ -504,6 +504,42 @@ public class DefaultMQProducerTest {
assertThat(cc.get()).isEqualTo(1);
}
+ @Test
+ public void testBatchSendMessageAsync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+ producer.setAutoBatch(true);
+ producer.send(message, new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+ assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+ assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
+ countDownLatch.countDown();
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ countDownLatch.countDown();
+ }
+ });
+
+ countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+ producer.setAutoBatch(false);
+ }
+
+ @Test
+ public void testBatchSendMessageSync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
+ producer.setAutoBatch(true);
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+ SendResult sendResult = producer.send(message);
+
+ assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+ assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+ assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
+ producer.setAutoBatch(false);
+ }
+
public static TopicRouteData createTopicRoute() {
TopicRouteData topicRouteData = new TopicRouteData();
diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/ProduceAccumulatorTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/ProduceAccumulatorTest.java
new file mode 100644
index 000000000..7074fae24
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/producer/ProduceAccumulatorTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.producer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageBatch;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ProduceAccumulatorTest {
+ private boolean compareMessageBatch(MessageBatch a, MessageBatch b) {
+ if (!a.getTopic().equals(b.getTopic())) {
+ return false;
+ }
+ if (!Arrays.equals(a.getBody(), b.getBody())) {
+ return false;
+ }
+ return true;
+ }
+
+ private class MockMQProducer extends DefaultMQProducer {
+ private Message beSendMessage = null;
+ private MessageQueue beSendMessageQueue = null;
+
+ @Override
+ public SendResult sendDirect(Message msg, MessageQueue mq,
+ SendCallback sendCallback) {
+ this.beSendMessage = msg;
+ this.beSendMessageQueue = mq;
+
+ SendResult sendResult = new SendResult();
+ sendResult.setMsgId("123");
+ if (sendCallback != null) {
+ sendCallback.onSuccess(sendResult);
+ }
+ return sendResult;
+ }
+ }
+
+ @Test
+ public void testProduceAccumulator_async() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
+ MockMQProducer mockMQProducer = new MockMQProducer();
+
+ ProduceAccumulator produceAccumulator = new ProduceAccumulator("test");
+ produceAccumulator.start();
+
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+ List messages = new ArrayList();
+ messages.add(new Message("testTopic", "1".getBytes()));
+ messages.add(new Message("testTopic", "22".getBytes()));
+ messages.add(new Message("testTopic", "333".getBytes()));
+ messages.add(new Message("testTopic", "4444".getBytes()));
+ messages.add(new Message("testTopic", "55555".getBytes()));
+ for (Message message : messages) {
+ produceAccumulator.send(message, new SendCallback() {
+ final CountDownLatch finalCountDownLatch = countDownLatch;
+
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ finalCountDownLatch.countDown();
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ finalCountDownLatch.countDown();
+ }
+ }, mockMQProducer);
+ }
+ assertThat(countDownLatch.await(3000L, TimeUnit.MILLISECONDS)).isTrue();
+ assertThat(mockMQProducer.beSendMessage instanceof MessageBatch).isTrue();
+
+ MessageBatch messageBatch1 = (MessageBatch) mockMQProducer.beSendMessage;
+ MessageBatch messageBatch2 = MessageBatch.generateFromList(messages);
+ messageBatch2.setBody(messageBatch2.encode());
+
+ assertThat(compareMessageBatch(messageBatch1, messageBatch2)).isTrue();
+ }
+
+ @Test
+ public void testProduceAccumulator_sync() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
+ final MockMQProducer mockMQProducer = new MockMQProducer();
+
+ final ProduceAccumulator produceAccumulator = new ProduceAccumulator("test");
+ produceAccumulator.start();
+
+ List messages = new ArrayList();
+ messages.add(new Message("testTopic", "1".getBytes()));
+ messages.add(new Message("testTopic", "22".getBytes()));
+ messages.add(new Message("testTopic", "333".getBytes()));
+ messages.add(new Message("testTopic", "4444".getBytes()));
+ messages.add(new Message("testTopic", "55555".getBytes()));
+ final CountDownLatch countDownLatch = new CountDownLatch(messages.size());
+
+ for (final Message message : messages) {
+ new Thread(new Runnable() {
+ final ProduceAccumulator finalProduceAccumulator = produceAccumulator;
+ final CountDownLatch finalCountDownLatch = countDownLatch;
+ final MockMQProducer finalMockMQProducer = mockMQProducer;
+ final Message finalMessage = message;
+
+ @Override
+ public void run() {
+ try {
+ finalProduceAccumulator.send(finalMessage, finalMockMQProducer);
+ finalCountDownLatch.countDown();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }).start();
+ }
+ assertThat(countDownLatch.await(3000L, TimeUnit.MILLISECONDS)).isTrue();
+ assertThat(mockMQProducer.beSendMessage instanceof MessageBatch).isTrue();
+
+ MessageBatch messageBatch1 = (MessageBatch) mockMQProducer.beSendMessage;
+ MessageBatch messageBatch2 = MessageBatch.generateFromList(messages);
+ messageBatch2.setBody(messageBatch2.encode());
+
+ assertThat(messageBatch1.getTopic()).isEqualTo(messageBatch2.getTopic());
+ // The execution order is uncertain, just compare the length
+ assertThat(messageBatch1.getBody().length).isEqualTo(messageBatch2.getBody().length);
+ }
+
+ @Test
+ public void testProduceAccumulator_sendWithMessageQueue() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
+ MockMQProducer mockMQProducer = new MockMQProducer();
+
+ MessageQueue messageQueue = new MessageQueue("topicTest", "brokerTest", 0);
+ final ProduceAccumulator produceAccumulator = new ProduceAccumulator("test");
+ produceAccumulator.start();
+
+ Message message = new Message("testTopic", "1".getBytes());
+ produceAccumulator.send(message, messageQueue, mockMQProducer);
+ assertThat(mockMQProducer.beSendMessageQueue).isEqualTo(messageQueue);
+
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+ produceAccumulator.send(message, messageQueue, new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ countDownLatch.countDown();
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ countDownLatch.countDown();
+ }
+ }, mockMQProducer);
+ assertThat(countDownLatch.await(3000L, TimeUnit.MILLISECONDS)).isTrue();
+ assertThat(mockMQProducer.beSendMessageQueue).isEqualTo(messageQueue);
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
index a423048c5..30369b8f3 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
@@ -27,7 +27,7 @@ public class MessageBatch extends Message implements Iterable {
private static final long serialVersionUID = 621335151046335557L;
private final List messages;
- private MessageBatch(List messages) {
+ public MessageBatch(List messages) {
this.messages = messages;
}
--
2.32.0.windows.2