-
Notifications
You must be signed in to change notification settings - Fork 12k
Description
Before Creating the Bug Report
-
I found a bug, not just asking a question, which should be created in GitHub Discussions.
-
I have searched the GitHub Issues and GitHub Discussions of this repository and believe that this is not a duplicate.
-
I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.
Runtime platform environment
OS: Cross-platform (Issue found via static code analysis)
RocketMQ version
Branch: master
JDK Version
JDK 1.8
Describe the Bug
I conducted a static code analysis on
org.apache.rocketmq.remoting.netty.NettyRemotingServer.
I noticed that the publicExecutor is initialized using Executors.newFixedThreadPool.
According to Java best practices (and explicitly mentioned in Alibaba Java Coding Guidelines), Executors.newFixedThreadPool uses an unbounded LinkedBlockingQueue (capacity is Integer.MAX_VALUE) by default.
If the consumption rate of the thread pool is lower than the production rate (e.g., under high concurrency or network jitter), tasks will accumulate in the queue indefinitely. Since the queue is effectively unbounded, this creates a high risk of OutOfMemoryError (OOM).
Steps to Reproduce
- Start the RocketMQ NameServer or Broker.
2.Simulate a scenario where the publicExecutor handles a massive amount of requests, or the processing threads are blocked/slow.
3.Observe the heap memory usage.
4.The queue size of the executor will grow without limit, eventually leading to OOM.
What Did You Expect to See?
The publicExecutor should be initialized using ThreadPoolExecutor with a bounded queue
(e.g., LinkedBlockingQueue with a configurable capacity) and a proper RejectedExecutionHandler to prevent system crashes under load.
What Did You See Instead?
The publicExecutor uses an unbounded queue (Integer.MAX_VALUE), which offers no back-pressure mechanism or memory protection.
Additional Context
Code Location:
https://github.com/apache/rocketmq/blob/master/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
Snippet:
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@OverRide
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
I would be happy to submit a Pull Request to fix this (by replacing it with ThreadPoolExecutor) if the community confirms this issue.