mysql-DuplicateUpdate和java的threadpool的"死锁"

《mysql-DuplicateUpdate和java的threadpool的

大家千万不要被文章的标题给迷惑了,他两在本篇文章是没有关系的, 今天给大家讲讲最近2个有意思的issue,分享一下我学到的

  • mysql DuplicateUpdate的用法要注意的点
  • java的threadpool使用不当会造成“死锁”问题

mysql DuplicateUpdate的用法要注意的点

有个issue说遇到了一个这样的问题, 《mysql-DuplicateUpdate和java的threadpool的

这个朋友使用我开源的job调度框架 https://github.com/yuzd/Hangfire.HttpJob

存储用的是mysql,采用的实现是 https://github.com/arnoldasgudas/Hangfire.MySqlStorage

set表的id是自增主键,正常理解 都是慢慢自增上去的,但是发现是大幅度跳跃式的自增, 真相是什么?

首先针对这个问题,首先我们搞清楚在hangfire中和storage相关的部分如下

《mysql-DuplicateUpdate和java的threadpool的 image

  • hangfire server调度依赖storage
  • storage抽象出来一层api(解耦)
  • 第三方扩展(不关心具体的storage实现)
  • 不同的storage具体实现(比如mysql,sqlserver等)

Hangfire.Httpjob其实只是依赖了storage api那一层,也没有能力去直接写sql去执行, 只能用api去操作hangfire的那几张表(比如set表)

那么问题肯定不是在扩展层,而是得去看看mysqlstorage的实现源码,针对set表的处理逻辑

https://github.com/arnoldasgudas/Hangfire.MySqlStorage/blob/0bd1016f715c8c6617ce22fb7b2ce5b6c328d2fb/Hangfire.MySql/MySqlWriteOnlyTransaction.cs#L155


  public override void AddToSet(string key, string value, double score)
    {
        Logger.TraceFormat("AddToSet key={0} value={1}", key, value);

        AcquireSetLock();
        QueueCommand(x => x.Execute(
            $"INSERT INTO `{_storageOptions.TablesPrefix}Set` (`Key`, `Value`, `Score`) " +
            "VALUES (@Key, @Value, @Score) " +
            "ON DUPLICATE KEY UPDATE `Score` = @Score",
            new { key, value, score }));
    }

这里是用了ON DUPLICATE KEY UPDATE 的语句

这个语法是在mysql 4.1(2005)引入的,意思是 insert的时候遇到主键已存在 就执行后面 的update

但是就是这个功能 会造成自增主键成跳跃式增长,增长跨度和SQL的执行次数成正比

根据朋友提供的截图

《mysql-DuplicateUpdate和java的threadpool的 image

虽说是会跳跃,但是这个增长也太夸张了

打上断点调试发现

是hangfire server 不断的在调用,目的是把下一次执行时间(秒级别的时间戳)写到set表中

《mysql-DuplicateUpdate和java的threadpool的 image
《mysql-DuplicateUpdate和java的threadpool的 image
《mysql-DuplicateUpdate和java的threadpool的 image

打上日志可以看到有非常多相同值的调用,这仅仅是一个job,这个自增速度得再乘以job的个数,难怪了

既然找到原因了,就提个PR 修改下


 public override void AddToSet(string key, string value, double score)
        {
            Logger.TraceFormat("AddToSet key={0} value={1}", key, value);
        
            AcquireSetLock();
            QueueCommand(x =>
            {
                var sql = "";
                if (key == "recurring-jobs") // 只发现这个key存在这个问题
                {
                     // key+value是uniq 改成先update 如果没有成功 再insert
                    sql = $"UPDATE `{_storageOptions.TablesPrefix}Set` set `Score` = @score where `Key` = @key and `Value` = @value";
                    var updateRt = x.Execute(sql, new { score = score, key = key, value = value });
                    if (updateRt < 1)
                    {
                        sql = $"INSERT INTO `{_storageOptions.TablesPrefix}Set` (`Key`, `Value`, `Score`) " +
                              "VALUES (@Key, @Value, @Score) ";
                        x.Execute(
                            sql,
                            new { key, value, score });
                    }
                }
                else
                {
                    sql = $"INSERT INTO `{_storageOptions.TablesPrefix}Set` (`Key`, `Value`, `Score`) " +
                          "VALUES (@Key, @Value, @Score) " +
                          "ON DUPLICATE KEY UPDATE `Score` = @Score";
                   x.Execute(
                       sql,
                       new { key, value, score });
                }
        
                //Console.WriteLine(sql + " ==> " + key + "@" + value + "@" + score);
            });
        }

改完之后测试,id自增一切正常:

《mysql-DuplicateUpdate和java的threadpool的 image

java的threadpool使用不当会造成“死锁”问题

《mysql-DuplicateUpdate和java的threadpool的 image

这个原因先说出来: threadpool的线程被占用完后,再来的task会往queue里面丢,如果这个时候在这个pool的线程里面 future.get()的话会导致task runner(执行器)被堵住,没人从队列里面取任务了~

(简单来说就是 线程在wait future返回,而这个future在queue里面苦苦等待新释放的线程去执行,就像死锁一样,我在等你的结果,而结果在等待着被执行)

好家伙,这个场景有点熟悉,因为我在项目中也用过Future.get()// 虽说有设置timeout

但是这个问题的重要一点是,这种花式“死锁” jvm是检测不出来的,下面有测试

模拟一下这个场景:

我搞了2个线程池,分别是nio线程池和业务线程池,模拟并发20个请求, 注意看process2方法里的注释,如果去掉那里的代码的话 就不会有这个死锁问题


/**
 * @author yuzd
 */
public class PoolTest {

    // 模拟nio线程池
    static ThreadPoolExecutor nioExecutor = new ThreadPoolExecutor(20, 20, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100),
            new CustomerNamedThreadFactory("nio", false),
            new ThreadPoolExecutor.AbortPolicy());

    // 业务线程池
    static ThreadPoolExecutor buExecutor = new ThreadPoolExecutor(20, 20, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100),
            new CustomerNamedThreadFactory("bu", true),
            new ThreadPoolExecutor.AbortPolicy());

    public static void main(String[] args) {

        // 模拟是http请求并发20个
        IntStream.rangeClosed(1, 20).parallel().forEach((index) -> {
            // 交给nio线程池处理            
            nioExecutor.execute(() -> {
                try {
                    httpHandler(index);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        });
    }


   
    static void httpHandler(int index) throws ExecutionException, InterruptedException {
        System.out.println(Thread.currentThread().getName() + " request index :" + index + " staring");
        // 交给业务线程池处理     
       
        Future<String> parentFuture = buExecutor.submit(() -> process1(index));
        String p1Rt = parentFuture.get();  // nio线程在wait
        System.out.println(Thread.currentThread().getName() + " request index :" + p1Rt + " ending");
    }

    // future1
    static String process1(int index) throws ExecutionException, InterruptedException {
        System.out.println( Thread.currentThread().getName() + " process1 index :" + index + " staring");
        Future<String> childFuture = buExecutor.submit(() -> process2(index));
        String p2Rt = childFuture.get();  // 这里是bu线程在wait   这里会发生死锁
        
        System.out.println(Thread.currentThread().getName() + " process1 index :" + index + " ending");
        return p2Rt;
    }

    // future2
    static String process2(int index) throws InterruptedException, ExecutionException {
        System.out.println(Thread.currentThread().getName() + " process2 index :" + index + " staring");
        // 加上就会死锁 
        // 只要不一下子产生足够数量的task(把core全部占掉)就不会死锁 加了这里就会把core全部占据 导致task进入到queue,core线程在wait future.get 无法被释放, 而queue的任务在等待它释放产生新的线程
        Future<String> submit = buExecutor.submit(() -> {
            try {
                Thread.sleep(1000);
                return String.valueOf(index);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        submit.get(); 
        System.out.println(Thread.currentThread().getName() + " process2 index :" + index + " ending");
        return String.valueOf(index);
    }
}

用visualvm分析线程dump,很难直接发现有异常,异步的很难检测,排查起来比较复杂,只看到是在wait

《mysql-DuplicateUpdate和java的threadpool的 image

用jstack没有发现deadlock

《mysql-DuplicateUpdate和java的threadpool的 image

在实际项目中我也看到过一个项目中共用一个线程池,线程池被封装成一个util方法,要执行异步的都用它,这个场景尤其要注意这个场景,也建议大家用带有超时的方式 Future.get(xxxx)

 

《mysql-DuplicateUpdate和java的threadpool的

 

《mysql-DuplicateUpdate和java的threadpool的

 

    原文作者:俞正东
    原文地址: https://www.cnblogs.com/yudongdong/p/16945021.html
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞