Seata Server 1.5.2 源码学习

Seata 包括 Server端和Client端。Seata中有三种角色:TC、TM、RM,其中,Server端就是TC,TM和RM属Client端。Client端的源码学习上一篇已讲过,详见 《Seata 1.5.2源码学习》,今天来学习Server端的源码。

源码下载地址:https://github.com/seata/seata

启动类 ServerApplication 没什么好说的,重点是ServerRunner

《Seata Server 1.5.2 源码学习》

ServerRunner 是一个 CommandLineRunner 实例,因此在Spring Boot启动完成后会回调其run()方法。而在ServerRunner的run()方法中调用了Server.start()方法。

《Seata Server 1.5.2 源码学习》

在Server#start()方法中,初始化了包括id生成器在内的很多组件,我们先不管这些,重点关注以下几行代码:

NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);
coordinator.init();
nettyRemotingServer.setHandler(coordinator);

DefaultCoordinator是一个单例Bean,在整个应用中只有一个DefaultCoordinator实例

《Seata Server 1.5.2 源码学习》

《Seata Server 1.5.2 源码学习》

《Seata Server 1.5.2 源码学习》

DefaultCoordinator 实现了 TransactionMessageHandler

NettyRemotingServer#setHandler()设置的正是TransactionMessageHandler

《Seata Server 1.5.2 源码学习》

DefaultCoordinator#onRequest()

《Seata Server 1.5.2 源码学习》

重点是这三行:

AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;
transactionRequest.setTCInboundHandler(this);
transactionRequest.handle(context);

DefaultCoordinator实现了TCInboundHandler接口,所以它不仅是一个TransactionMessageHandler,还是一个TCInboundHandler

这里transactionRequest.setTCInboundHandler(this),就是指定AbstractTransactionRequestToTC中的TCInboundHandler设置为DefaultCoordinator

《Seata Server 1.5.2 源码学习》

《Seata Server 1.5.2 源码学习》

AbstractTransactionRequest#handle()

《Seata Server 1.5.2 源码学习》

不同的请求分发给对应的处理器去处理

《Seata Server 1.5.2 源码学习》

现在请求和对应的处理器都有了,下面具体看一下每种请求都是如何被处理的

1. 开启全局事务

《Seata Server 1.5.2 源码学习》

开启事务直接调用子类DefaultCoordinator#doGlobalBegin(),同时放在一个处理模板中执行

《Seata Server 1.5.2 源码学习》

在doGlobalBean()中调用DefaultCore#begin()并返回全局事务ID(xid)

《Seata Server 1.5.2 源码学习》

new GlobalSession()

《Seata Server 1.5.2 源码学习》

添加一个SessionManager作为Session的监听器

《Seata Server 1.5.2 源码学习》

Core

《Seata Server 1.5.2 源码学习》

总结一下,开启事务:

  1. 创建一个GlobalSession
  2. 给GlobalSession添加一个监听器SessionManager
  3. session.begin()

《Seata Server 1.5.2 源码学习》

开启事务,创建一个全局事务,如果是seata.store.mode=db的话,向global_table表插入一条记录

2. 分支事务注册

《Seata Server 1.5.2 源码学习》

《Seata Server 1.5.2 源码学习》

DefaultCore#branchRegister()

《Seata Server 1.5.2 源码学习》

《Seata Server 1.5.2 源码学习》

《Seata Server 1.5.2 源码学习》

如果是AT模式,这里调用的就是ATCore#branchSessionLock()

ATCore#branchSessionLock()检查是否拿到锁了

《Seata Server 1.5.2 源码学习》

《Seata Server 1.5.2 源码学习》

《Seata Server 1.5.2 源码学习》

《Seata Server 1.5.2 源码学习》

《Seata Server 1.5.2 源码学习》

《Seata Server 1.5.2 源码学习》

具体每种锁的实现就不往下看了,挑其中一个看下,就RedisLocker吧

《Seata Server 1.5.2 源码学习》

总之,分支注册的时候需要检查锁,拿到本次事务中所涉及的所有需要加锁的行的锁才能注册成功

所有行都加锁成功,分支注册才算成功,才会返回true

再回到AbstractCore#branchRegister(),整个方法是放在SessionHolder#lockAndExecute()中执行的

《Seata Server 1.5.2 源码学习》

《Seata Server 1.5.2 源码学习》

总结一下,分支注册:

  1. 创建一个BranchSession
  2. 加锁,获取所有行的锁
  3. 将BranchSession加到GlobalSession中
  4. 返回branchId

《Seata Server 1.5.2 源码学习》

分支注册,创建BranchSession,获取全局锁成功后将branchSession加入globalSession

3. 提交全局事务

《Seata Server 1.5.2 源码学习》

《Seata Server 1.5.2 源码学习》

首先判断全局事务状态是否为begin,如果不是则不应该提交。如果是,则将事务active置为false,释放全局锁,判断是否可以异步提交。分支类型是AT的都可以异步提交,因此AT模式,默认是异步提交。如果不能异步提交,则采取同步提交。

3.1. 同步提交

《Seata Server 1.5.2 源码学习》

《Seata Server 1.5.2 源码学习》

《Seata Server 1.5.2 源码学习》

遍历所有已注册的分支事务,向分支发送同步请求,告诉它全局事务开始提交了,不出意外的情况下返回分支状态是二阶段提交成功。当所有分支都提交成功,则返回true,于是全局事务提交成功,返回全局事务状态为已提交。如果有分支提交失败,则返回false,全局事务提交失败,返回全局状态为提交失败。如果抛异常了,则会有定时任务稍后重试提交。

3.2. 异步提交

《Seata Server 1.5.2 源码学习》

异步提交只是将全局状态置为异步提交中,剩下的事情交给定时任务去执行

启动的时候调用了DefaultCoordinator#init()方法,启动定时任务

《Seata Server 1.5.2 源码学习》

《Seata Server 1.5.2 源码学习》

每次,定时任务执行前,要先获取一把分布式锁,这个锁是io.seata.core.store.DistributedLocker,不是分支注册时的那把锁io.seata.core.lock.Locker

《Seata Server 1.5.2 源码学习》

《Seata Server 1.5.2 源码学习》

异步提交首先将全局状态设置为AsyncCommitting,返回返回全局状态Committed。后台有定时任务扫描,找到所有状态为AsyncCommitting的全局事务,循环遍历。对于每个全局事务提交,调用DefaultCore#doGlobalCommit(),遍历所有已注册的分支事务,向分支事务发请求,通知其提交事务,分支事务返回二阶段提交成功,表示该分支事务提交成功,当所有分支事务都二阶段提交成功,则全局事务提交成功。

《Seata Server 1.5.2 源码学习》

4. 回滚全局事务

《Seata Server 1.5.2 源码学习》

《Seata Server 1.5.2 源码学习》

《Seata Server 1.5.2 源码学习》

《Seata Server 1.5.2 源码学习》

《Seata Server 1.5.2 源码学习》

《Seata Server 1.5.2 源码学习》

回滚,首先检查全局事务状态是否为Begin,不是的话直接结束。遍历当前全局事务中已注册的分支事务,依次给每个分支发请求,告诉分支事务需要回滚。如果所有分支返回回滚成功,则全局回滚成功。如果有分支回滚失败且不重试,则直接直接结束。如果失败且可重试,或者执行过程中抛异常,则稍后会有定时任务重试回滚操作。

《Seata Server 1.5.2 源码学习》

5. 分支上报

RM向TC报告分支事务状态

《Seata Server 1.5.2 源码学习》

《Seata Server 1.5.2 源码学习》

只是更新一下分支状态及相关数据

6. 查询全局事务状态

《Seata Server 1.5.2 源码学习》

7. 查询全局锁

《Seata Server 1.5.2 源码学习》

《Seata Server 1.5.2 源码学习》

《Seata Server 1.5.2 源码学习》

挑RedisLocker看一下吧

《Seata Server 1.5.2 源码学习》

关于Seata Server 的源码学习就先到这里,欢迎交流,多谢点赞 (^_^)

    原文作者:废物大师兄
    原文地址: https://www.cnblogs.com/cjsblog/p/16878067.html
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞