线程池满了:拒绝策略选型失误导致任务丢失

场景:凌晨批处理对账发现 2 万条数据丢失,系统无任何错误日志
路径:模拟提交 → 观察各策略行为 → 检查 handler 类型 → 修复为自定义策略

上篇讲了线程池队列积压导致接口全面超时的排查过程,最后提到「拒绝策略不要轻易用 AbortPolicy」。但今天要聊的,是比 AbortPolicy 更危险的两种策略——因为它们不报错。

一个线程池满了。没有抛异常、没有日志、没有告警。两万条数据在凌晨静默消失——不是网络丢包,不是业务代码 bug,是线程池的拒绝策略把它们吃了,连个饱嗝都没打。

现象:任务丢了,但没有任何报错

对账发现差异

某定时批处理服务负责每天凌晨从 MQ 拉取增量数据,经转换后写入数据库。平时每天处理 5000-8000 条记录,耗时约 10 分钟。

某天业务方反馈当天的报表少了约 2 万条记录。看时序:

  • 凌晨 01:00 — 批处理任务启动,从 MQ 拉取增量数据
  • 01:02 — 前 3 万条正常入库,处理耗时约 2 分钟
  • 01:03 — 线程池队列打满,DiscardPolicy 开始静默丢弃
  • 01:05 — MQ 消费完毕,lastOffset 已提交,应用日志无任何异常
  • 早上 09:00 — 业务对账发现数据差异

排查路径是这样的:

  1. 上游 MQ 确认已下发全部 5 万条消息 ✅
  2. 下游数据库确认接收到的只有 3 万条 ❌
  3. 网络无丢包、MQ 无积压、DB 无死锁
  4. 应用日志无 error、无 warn、无 RejectedExecutionException

问题指向了处理链路内部——数据确实被消费了,但处理到一半消失了。

查看线程池配置

打开代码找到批处理使用的线程池:

// JDK 17
private final ThreadPoolExecutor batchExecutor = new ThreadPoolExecutor(
    2, 4,
    60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(1000),
    new ThreadPoolExecutor.DiscardPolicy()    // ←
);

DiscardPolicyRejectedExecutionHandler 的一个实现——rejectedExecution 方法体是空的。

// java.util.concurrent.ThreadPoolExecutor.DiscardPolicy
public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // 静默丢弃
    }
}

凌晨高峰时 5 万条任务瞬时涌入。2 个核心线程 + 1000 队列 = 瞬间打满。后续任务触发拒绝——DiscardPolicy 照单全收,全部静默丢弃。2 万条数据就这样消失了,没有留下任何痕迹。

最致命的组合:submit() + DiscardPolicy

更严重的是,这个线程池用的是 submit()

Future<?> future = batchExecutor.submit(() -> process(data));

submit() 将任务包装为 FutureTask,然后调用 execute(futureTask)。当 DiscardPolicy 丢弃这个 FutureTask 时:

  • execute() 没有抛异常
  • FutureTask 永远不会被执行
  • future.get() 永远阻塞——调用方线程被挂住

被丢弃的不只是任务数据,还有调用方线程。如果有 1000 个任务被丢弃,就有 1000 个线程卡在 get() 上。线程泄漏 + 数据丢失,一次选型失误两个后果。

还原:复现四种拒绝策略的行为

复现代码

直接运行四种策略下的线程池行为:

// JDK 17, 4 种拒绝策略对比
// 完整代码: demo/src/main/java/cn/opencao/concurrency/rejectionpolicy/RejectionPolicyDemo.java

static void testPolicy(String name, RejectedExecutionHandler handler)
        throws InterruptedException {
    var pool = new ThreadPoolExecutor(
        1, 1,
        1, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(1),
        handler
    );
    var submitted = new AtomicInteger(0);
    var completed = new AtomicInteger(0);
    var rejected  = new AtomicInteger(0);

    // 提交 10 个任务,每个 sleep 500ms
    for (int i = 0; i < 10; i++) {
        try {
            pool.execute(() -> {
                try { Thread.sleep(500); } catch (Exception ignored) { }
                completed.incrementAndGet();
            });
            submitted.incrementAndGet();
        } catch (RejectedExecutionException e) {
            rejected.incrementAndGet();
        }
    }
    pool.shutdown();
    pool.awaitTermination(3, TimeUnit.SECONDS);
    System.out.printf("%-20s 提交=%d 完成=%d 拒绝异常=%d%n",
        name, submitted.get(), completed.get(), rejected.get());
}

四种策略运行结果对比

结果差异非常明显:

策略 提交数 完成数 拒绝异常 任务丢失
AbortPolicy 2 2 8 ❌(异常通知)
CallerRunsPolicy 10 10 0 ❌(调用方线程执行)
DiscardPolicy 10 2 0 ✅ 8 个静默丢失
DiscardOldestPolicy 10 3 0 ✅ 7 个静默丢失

DiscardPolicyDiscardOldestPolicy 的问题一目了然:调用方认为任务提交成功了(submit 返回 true,execute 没抛异常),但实际任务从未被执行

submit() 的陷阱更深

将上面测试改为 submit() 后观察:

for (int i = 0; i < 10; i++) {
    Future<?> future = pool.submit(() -> {
        try { Thread.sleep(500); } catch (Exception ignored) { }
        completed.incrementAndGet();
    });
    submitted.incrementAndGet();
    // future.get() ← 如果在这调用,线程将被永久阻塞
}

submit() 内部调用 execute()。当 DiscardPolicy 拒绝任务时,execute() 静默返回。FutureTask 从未被执行,也永远不会被标记为完成。

// ThreadPoolExecutor.submit()
public Future<?> submit(Runnable task) {
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);  // ← DiscardPolicy 在这里丢弃了 ftask
    return ftask;    // ← 返回的 Future 永远不会 complete
}

submit() + DiscardPolicy = Future 永远不会完成。 调用 get() 的线程永久阻塞。这是 DiscardPolicy 最危险的用法——任务丢了你不知道,线程丢了你还不知道。

submit vs execute 对比图

路径:排查"任务消失"的方法

如果怀疑任务被拒绝策略静默丢弃,三步确认。

第一步:检查 rejectedCount

Arthas 直接读取线程池的积压统计:

# Arthas: 查看 rejectedCount
vmtool --action getInstances \
  --className java.util.concurrent.ThreadPoolExecutor \
  --limit 10 -x 2 \
  | grep -E 'completedTaskCount|rejected|queue'

如果没有运行时监控,直接在代码里给所有线程池的 RejectedExecutionHandler 加日志包装:

// 生产推荐:拒绝日志包装
public class LoggingRejectionHandler implements RejectedExecutionHandler {
    private final RejectedExecutionHandler delegate;
    private final String poolName;

    public LoggingRejectionHandler(String poolName,
            RejectedExecutionHandler delegate) {
        this.poolName = poolName;
        this.delegate = delegate;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        log.warn("[{}] 任务被拒绝: queue={}, active={}, pool={}",
            poolName, e.getQueue().size(),
            e.getActiveCount(), e.getPoolSize());
        delegate.rejectedExecution(r, e);
    }
}

第二步:确认谁在静默丢弃

# Arthas: 查看 Handler 类型
ognl --classLoaderClass org.springframework.boot.loader.LaunchedURLClassLoader \
  '@beanFactory@getBean("batchExecutor").getRejectedExecutionHandler()' \
  -x 4

如果输出是 DiscardPolicyDiscardOldestPolicy——找到了问题。

为什么当初选了 DiscardPolicy?

回头看代码提交记录,原来的开发者写了这样一段注释:

.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy())
// 批处理偶尔超量,拒绝时不想抛异常影响主流程

动机很清楚:"不想让拒绝抛异常影响主流程"。但有一个隐含判断被忽略了——不抛异常不代表任务就能被处理。DiscardPolicy 只是把"拒绝"藏起来了,任务该丢还是丢。

这个场景需要的其实是两种方案之一:

  • CallerRunsPolicy:调用方线程执行多余任务,自动形成背压,不影响主流程且不丢数据
  • 自定义策略:记录拒绝信息 + 走降级逻辑

选 DiscardPolicy 的潜台词是"我接受了任务丢失是一个可接受的结果"。但实际业务需求证明——2 万条数据丢失是不可接受的。

"有时候,一个抛出来的异常比一个隐藏的 bug 更让人安心——至少你知道出了问题。"

第三步:检查调用方式是 submit 还是 execute

  • execute() 直接抛 RejectedExecutionException(可被 AbortPolicy 触发,DiscardPolicy 不触发)
  • submit() 的异常被封装在 Future 中,不调 get() 永远不会感知
  • submit() + DiscardPolicy 的组合会导致 Future.get() 永久阻塞

排查流程图

解读:四种拒绝策略的设计哲学

JDK 提供了四种内置 RejectedExecutionHandler,行为差异核心在于如何处理"线程池和队列都满了"这一信号。

各策略的行为本质

策略 响应方式 信号传递 推荐场景 风险
AbortPolicy(默认) 抛异常 即时(调用方 catch) 调用方能接受失败 上层没 catch 就 500
CallerRunsPolicy 调用方线程执行 背压(调用方变慢) 大部分场景推荐 调用方线程占满
DiscardPolicy 什么都不做 ❌ 几乎不适用 数据静默丢失
DiscardOldestPolicy 丢队列头任务 日志/监控指标 丢关键数据

为什么不推荐 DiscardPolicy

看看它的源码:

// java.util.concurrent.ThreadPoolExecutor.DiscardPolicy
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }

空方法。不记录、不告警、不回调。和藏尸有什么区别?

"并发编程的 fail-fast 原则:出问题要出声,不出声的问题才是最大的问题。"

DiscardPolicy 违反了这个原则。一个静默丢弃任务的服务,从外面看一切正常——线程数正常、CPU 正常、没有错误日志。只有对账的时候才发现数据少了,而此时已经过去了十几小时甚至几天。

推荐的模式:自定义策略

// 生产推荐:记录 + 告警 + 降级
public class AlertOnRejectPolicy implements RejectedExecutionHandler {
    private final String poolName;

    public AlertOnRejectPolicy(String poolName) {
        this.poolName = poolName;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // 1. 记录拒绝信息
        log.error("[{}] 拒绝任务: queue={}/{}, active={}/{}, pool={}/{}",
            poolName,
            e.getQueue().size(), 
            e.getQueue().remainingCapacity() + e.getQueue().size(),
            e.getActiveCount(), e.getMaximumPoolSize(),
            e.getPoolSize(), e.getMaximumPoolSize());

        // 2. 发告警
        alertService.send("线程池拒绝告警", poolName, ...);

        // 3. 执行降级逻辑
        if (r instanceof RejectedTask task) {
            task.fallback();
        }
    }
}

submit() vs execute():拒绝行为的差异根源

两者底层最终都走到 ThreadPoolExecutor.execute(Runnable),但拒绝后的行为完全不同:

// execute() 方式——异常直接抛出
executor.execute(task);
// 如果被拒绝 → throws RejectedExecutionException

// submit() 方式——异常被封装
Future<?> future = executor.submit(task);
// 如果被拒绝 → FutureTask 被丢弃,future 永远不会 complete
// future.get() ← 永久阻塞!

原因在于 submit() 将任务包装为 FutureTask,然后调用 execute(futureTask) 执行。DiscardPolicy 拒绝时丢弃的是 FutureTask,而调用方持有的 Future 引用指向这个永远不会被执行的 FutureTask

"submit() 不是 execute() 的安全包装——它们对拒绝的语义完全不同。"

submit vs execute 内部流程对比

标记:代码审查 grep 清单

在你的代码里搜这三个模式:

# 搜 DiscardPolicy / DiscardOldestPolicy——看到就得确认
grep -rn "DiscardPolicy\|DiscardOldestPolicy" src/main/java/

# 搜 new ThreadPoolExecutor——检查每个线程池的 handler
grep -rn "new ThreadPoolExecutor" src/main/java/

# 搜 Executors.newXxx——默认策略检查
grep -rn "Executors\." src/main/java/ | grep -E "ThreadPool|Fixed|Cached"

每个线程池创建都问三个问题: 1. 拒绝策略合适吗?——不要用 DiscardPolicy,除非真的不在乎丢数据 2. 用的是 submit 还是 execute?——submit + DiscardPolicy 的组合是线程池版的"已读不回" 3. 拒绝时有日志和告警吗?——没有监控的线程池等于没有消防警报的房子


金句:DiscardPolicy 不是并发策略,它是数据丢失的沉默许可。线程池满了不可怕,可怕的是它满了之后连吱一声都不会。

下篇我们聊核心线程数设错了——过大引发资源竞争、过小导致队列积压,这个看似简单的参数其实是最容易被低估的。

🔗 个人博客:https://opencao.cn 📺 公众号/知识星球:Ai拆代码的曹操