线程池满了:拒绝策略选型失误导致任务丢失
场景:凌晨批处理对账发现 2 万条数据丢失,系统无任何错误日志
路径:模拟提交 → 观察各策略行为 → 检查 handler 类型 → 修复为自定义策略
上篇讲了线程池队列积压导致接口全面超时的排查过程,最后提到「拒绝策略不要轻易用 AbortPolicy」。但今天要聊的,是比 AbortPolicy 更危险的两种策略——因为它们不报错。
一个线程池满了。没有抛异常、没有日志、没有告警。两万条数据在凌晨静默消失——不是网络丢包,不是业务代码 bug,是线程池的拒绝策略把它们吃了,连个饱嗝都没打。
现象:任务丢了,但没有任何报错
对账发现差异
某定时批处理服务负责每天凌晨从 MQ 拉取增量数据,经转换后写入数据库。平时每天处理 5000-8000 条记录,耗时约 10 分钟。
某天业务方反馈当天的报表少了约 2 万条记录。看时序:
- 凌晨 01:00 — 批处理任务启动,从 MQ 拉取增量数据
- 01:02 — 前 3 万条正常入库,处理耗时约 2 分钟
- 01:03 — 线程池队列打满,DiscardPolicy 开始静默丢弃
- 01:05 — MQ 消费完毕,lastOffset 已提交,应用日志无任何异常
- 早上 09:00 — 业务对账发现数据差异
排查路径是这样的:
- 上游 MQ 确认已下发全部 5 万条消息 ✅
- 下游数据库确认接收到的只有 3 万条 ❌
- 网络无丢包、MQ 无积压、DB 无死锁
- 应用日志无 error、无 warn、无 RejectedExecutionException
问题指向了处理链路内部——数据确实被消费了,但处理到一半消失了。
查看线程池配置
打开代码找到批处理使用的线程池:
// JDK 17
private final ThreadPoolExecutor batchExecutor = new ThreadPoolExecutor(
2, 4,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadPoolExecutor.DiscardPolicy() // ←
);
DiscardPolicy。RejectedExecutionHandler 的一个实现——rejectedExecution 方法体是空的。
// java.util.concurrent.ThreadPoolExecutor.DiscardPolicy
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 静默丢弃
}
}
凌晨高峰时 5 万条任务瞬时涌入。2 个核心线程 + 1000 队列 = 瞬间打满。后续任务触发拒绝——DiscardPolicy 照单全收,全部静默丢弃。2 万条数据就这样消失了,没有留下任何痕迹。
最致命的组合:submit() + DiscardPolicy
更严重的是,这个线程池用的是 submit():
Future<?> future = batchExecutor.submit(() -> process(data));
submit() 将任务包装为 FutureTask,然后调用 execute(futureTask)。当 DiscardPolicy 丢弃这个 FutureTask 时:
execute()没有抛异常FutureTask永远不会被执行future.get()永远阻塞——调用方线程被挂住
被丢弃的不只是任务数据,还有调用方线程。如果有 1000 个任务被丢弃,就有 1000 个线程卡在 get() 上。线程泄漏 + 数据丢失,一次选型失误两个后果。
还原:复现四种拒绝策略的行为
复现代码
直接运行四种策略下的线程池行为:
// JDK 17, 4 种拒绝策略对比
// 完整代码: demo/src/main/java/cn/opencao/concurrency/rejectionpolicy/RejectionPolicyDemo.java
static void testPolicy(String name, RejectedExecutionHandler handler)
throws InterruptedException {
var pool = new ThreadPoolExecutor(
1, 1,
1, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1),
handler
);
var submitted = new AtomicInteger(0);
var completed = new AtomicInteger(0);
var rejected = new AtomicInteger(0);
// 提交 10 个任务,每个 sleep 500ms
for (int i = 0; i < 10; i++) {
try {
pool.execute(() -> {
try { Thread.sleep(500); } catch (Exception ignored) { }
completed.incrementAndGet();
});
submitted.incrementAndGet();
} catch (RejectedExecutionException e) {
rejected.incrementAndGet();
}
}
pool.shutdown();
pool.awaitTermination(3, TimeUnit.SECONDS);
System.out.printf("%-20s 提交=%d 完成=%d 拒绝异常=%d%n",
name, submitted.get(), completed.get(), rejected.get());
}

结果差异非常明显:
| 策略 | 提交数 | 完成数 | 拒绝异常 | 任务丢失 |
|---|---|---|---|---|
| AbortPolicy | 2 | 2 | 8 | ❌(异常通知) |
| CallerRunsPolicy | 10 | 10 | 0 | ❌(调用方线程执行) |
| DiscardPolicy | 10 | 2 | 0 | ✅ 8 个静默丢失 |
| DiscardOldestPolicy | 10 | 3 | 0 | ✅ 7 个静默丢失 |
DiscardPolicy 和 DiscardOldestPolicy 的问题一目了然:调用方认为任务提交成功了(submit 返回 true,execute 没抛异常),但实际任务从未被执行。
submit() 的陷阱更深
将上面测试改为 submit() 后观察:
for (int i = 0; i < 10; i++) {
Future<?> future = pool.submit(() -> {
try { Thread.sleep(500); } catch (Exception ignored) { }
completed.incrementAndGet();
});
submitted.incrementAndGet();
// future.get() ← 如果在这调用,线程将被永久阻塞
}
submit() 内部调用 execute()。当 DiscardPolicy 拒绝任务时,execute() 静默返回。FutureTask 从未被执行,也永远不会被标记为完成。
// ThreadPoolExecutor.submit()
public Future<?> submit(Runnable task) {
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask); // ← DiscardPolicy 在这里丢弃了 ftask
return ftask; // ← 返回的 Future 永远不会 complete
}
submit() + DiscardPolicy = Future 永远不会完成。 调用 get() 的线程永久阻塞。这是 DiscardPolicy 最危险的用法——任务丢了你不知道,线程丢了你还不知道。

路径:排查"任务消失"的方法
如果怀疑任务被拒绝策略静默丢弃,三步确认。
第一步:检查 rejectedCount
Arthas 直接读取线程池的积压统计:
# Arthas: 查看 rejectedCount
vmtool --action getInstances \
--className java.util.concurrent.ThreadPoolExecutor \
--limit 10 -x 2 \
| grep -E 'completedTaskCount|rejected|queue'
如果没有运行时监控,直接在代码里给所有线程池的 RejectedExecutionHandler 加日志包装:
// 生产推荐:拒绝日志包装
public class LoggingRejectionHandler implements RejectedExecutionHandler {
private final RejectedExecutionHandler delegate;
private final String poolName;
public LoggingRejectionHandler(String poolName,
RejectedExecutionHandler delegate) {
this.poolName = poolName;
this.delegate = delegate;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
log.warn("[{}] 任务被拒绝: queue={}, active={}, pool={}",
poolName, e.getQueue().size(),
e.getActiveCount(), e.getPoolSize());
delegate.rejectedExecution(r, e);
}
}
第二步:确认谁在静默丢弃
# Arthas: 查看 Handler 类型
ognl --classLoaderClass org.springframework.boot.loader.LaunchedURLClassLoader \
'@beanFactory@getBean("batchExecutor").getRejectedExecutionHandler()' \
-x 4
如果输出是 DiscardPolicy 或 DiscardOldestPolicy——找到了问题。
为什么当初选了 DiscardPolicy?
回头看代码提交记录,原来的开发者写了这样一段注释:
.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy())
// 批处理偶尔超量,拒绝时不想抛异常影响主流程
动机很清楚:"不想让拒绝抛异常影响主流程"。但有一个隐含判断被忽略了——不抛异常不代表任务就能被处理。DiscardPolicy 只是把"拒绝"藏起来了,任务该丢还是丢。
这个场景需要的其实是两种方案之一:
- CallerRunsPolicy:调用方线程执行多余任务,自动形成背压,不影响主流程且不丢数据
- 自定义策略:记录拒绝信息 + 走降级逻辑
选 DiscardPolicy 的潜台词是"我接受了任务丢失是一个可接受的结果"。但实际业务需求证明——2 万条数据丢失是不可接受的。
"有时候,一个抛出来的异常比一个隐藏的 bug 更让人安心——至少你知道出了问题。"
第三步:检查调用方式是 submit 还是 execute
execute()直接抛RejectedExecutionException(可被AbortPolicy触发,DiscardPolicy不触发)submit()的异常被封装在Future中,不调get()永远不会感知submit()+DiscardPolicy的组合会导致Future.get()永久阻塞

解读:四种拒绝策略的设计哲学
JDK 提供了四种内置 RejectedExecutionHandler,行为差异核心在于如何处理"线程池和队列都满了"这一信号。
各策略的行为本质
| 策略 | 响应方式 | 信号传递 | 推荐场景 | 风险 |
|---|---|---|---|---|
| AbortPolicy(默认) | 抛异常 | 即时(调用方 catch) | 调用方能接受失败 | 上层没 catch 就 500 |
| CallerRunsPolicy | 调用方线程执行 | 背压(调用方变慢) | ✅ 大部分场景推荐 | 调用方线程占满 |
| DiscardPolicy | 什么都不做 | 无 | ❌ 几乎不适用 | 数据静默丢失 |
| DiscardOldestPolicy | 丢队列头任务 | 无 | 日志/监控指标 | 丢关键数据 |
为什么不推荐 DiscardPolicy
看看它的源码:
// java.util.concurrent.ThreadPoolExecutor.DiscardPolicy
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }
空方法。不记录、不告警、不回调。和藏尸有什么区别?
"并发编程的 fail-fast 原则:出问题要出声,不出声的问题才是最大的问题。"
DiscardPolicy 违反了这个原则。一个静默丢弃任务的服务,从外面看一切正常——线程数正常、CPU 正常、没有错误日志。只有对账的时候才发现数据少了,而此时已经过去了十几小时甚至几天。
推荐的模式:自定义策略
// 生产推荐:记录 + 告警 + 降级
public class AlertOnRejectPolicy implements RejectedExecutionHandler {
private final String poolName;
public AlertOnRejectPolicy(String poolName) {
this.poolName = poolName;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 1. 记录拒绝信息
log.error("[{}] 拒绝任务: queue={}/{}, active={}/{}, pool={}/{}",
poolName,
e.getQueue().size(),
e.getQueue().remainingCapacity() + e.getQueue().size(),
e.getActiveCount(), e.getMaximumPoolSize(),
e.getPoolSize(), e.getMaximumPoolSize());
// 2. 发告警
alertService.send("线程池拒绝告警", poolName, ...);
// 3. 执行降级逻辑
if (r instanceof RejectedTask task) {
task.fallback();
}
}
}
submit() vs execute():拒绝行为的差异根源
两者底层最终都走到 ThreadPoolExecutor.execute(Runnable),但拒绝后的行为完全不同:
// execute() 方式——异常直接抛出
executor.execute(task);
// 如果被拒绝 → throws RejectedExecutionException
// submit() 方式——异常被封装
Future<?> future = executor.submit(task);
// 如果被拒绝 → FutureTask 被丢弃,future 永远不会 complete
// future.get() ← 永久阻塞!
原因在于 submit() 将任务包装为 FutureTask,然后调用 execute(futureTask) 执行。DiscardPolicy 拒绝时丢弃的是 FutureTask,而调用方持有的 Future 引用指向这个永远不会被执行的 FutureTask。
"submit() 不是 execute() 的安全包装——它们对拒绝的语义完全不同。"

标记:代码审查 grep 清单
在你的代码里搜这三个模式:
# 搜 DiscardPolicy / DiscardOldestPolicy——看到就得确认
grep -rn "DiscardPolicy\|DiscardOldestPolicy" src/main/java/
# 搜 new ThreadPoolExecutor——检查每个线程池的 handler
grep -rn "new ThreadPoolExecutor" src/main/java/
# 搜 Executors.newXxx——默认策略检查
grep -rn "Executors\." src/main/java/ | grep -E "ThreadPool|Fixed|Cached"
每个线程池创建都问三个问题: 1. 拒绝策略合适吗?——不要用 DiscardPolicy,除非真的不在乎丢数据 2. 用的是 submit 还是 execute?——submit + DiscardPolicy 的组合是线程池版的"已读不回" 3. 拒绝时有日志和告警吗?——没有监控的线程池等于没有消防警报的房子
金句:DiscardPolicy 不是并发策略,它是数据丢失的沉默许可。线程池满了不可怕,可怕的是它满了之后连吱一声都不会。
下篇我们聊核心线程数设错了——过大引发资源竞争、过小导致队列积压,这个看似简单的参数其实是最容易被低估的。
🔗 个人博客:https://opencao.cn 📺 公众号/知识星球:Ai拆代码的曹操