分布式事务之解决方案(最大努力通知)

6.1. 什么是最大努力通知

最大努力通知也是一种解决分布式事务的方案,下边是一个是充值的例子:

交互流程 :
1、账户系统调用充值系统接口
2、充值系统完成支付处理向账户系统发起充值结果通知
若通知失败,则充值系统按策略进行重复通知
3、账户系统接收到充值结果通知修改充值状态
4、账户系统未接收到通知会主动调用充值系统的接口查询充值结果
通过上边的例子我们总结最大努力通知方案的目标 :
目标 :发起通知方通过一定的机制最大努力将业务处理结果通知到接收方。
具体包括 :
1、有一定的消息重复通知机制。
因为接收通知方可能没有接收到通知,此时要有一定的机制对消息重复通知。
2、消息校对机制。
如果尽最大努力也没有通知到接收方,或者接收方消费消息后要再次消费,此时可由接收方主动向通知方查询消息信息来满足需求。
最大努力通知与可靠消息一致性有什么不同?
1、解决方案思想不同
可靠消息一致性,发起通知方需要保证将消息发出去,并且将消息发到接收通知方,消息的可靠性关键由发起通知方来保证。
最大努力通知,发起通知方尽最大的努力将业务处理结果通知为接收通知方,但是可能消息接收不到,此时需要接收通知方主动调用发起通知方的接口查询业务处理结果,通知的可靠性关键在接收通知方。
2、两者的业务应用场景不同
可靠消息一致性关注的是交易过程的事务一致,以异步的方式完成交易。
最大努力通知关注的是交易后的通知事务,即将交易结果可靠的通知出去。
3、技术解决方向不同
可靠消息一致性要解决消息从发出到接收的一致性,即消息发出并且被接收到。
最大努力通知无法保证消息从发出到接收的一致性,只提供消息接收的可靠性机制。可靠机制是,最大努力的将消息通知给接收方,当消息无法被接收方接收时,由接收方主动查询消费(业务处理结果)。

6.2. 解决方案

通过对最大努力通知的理解,采用MQ的ack机制就可以实现最大努力通知。
方案1 :

本方案是利用MQ的ack机制由MQ向接收通知方发送通知,流程如下 :
1、发起通知方将通知发给MQ。
使用普通消息机制将通知发给MQ。
注意 :如果消息没有发出去可由接收通知方主动请求发起通知方查询业务执行结果。
2、接收通知方监听MQ。
3、接收通知方接收消息,业务处理完成回应ack。
4、接收通知方若没有回应ack则MQ会重复通知。
MQ会按照间隔1min、5min、10min、30min、1h、2h、5h、10h的方式,逐步拉大通知间隔(如果MQ采用rocketMq,在broker中可进行配置),直到达到通知要求的时间窗口上限。
5、接收通知方可通过消息校对接口来校对消息的一致性。
方案2 :
本方案也是利用MQ的ack机制,与方案1不同的是应用程序向接收通知方发送通知,如下图 :

交互流程如下 :
1、发起通知方将通知发给MQ。
使用可靠消息一致方案中的事务消息保证本地事务与消息的原子性,最终将通知先发给MQ。
2、通知程序监听MQ,接收MQ的消息。
方案1中接收通知方直接监听MQ,方案2中由通知程序监听MQ。
通知程序若没有回应ack则MQ会重复通知。
3、通知程序通过互联网接口协议(如http、webservice)调用接收通知方案接口,完成通知。
通知程序调用接收通知方案接口成功就表示通知成功,即消费MQ消息成功,MQ将不再向通知程序投递通知消息。
4、接收通知方可通过消息校对接口来校对消息的一致性。
方案1和方案2的不同点 :
1、方案1中接收通知方与MQ接口,即接收通知方案监听MQ,此方案主要应用与内部应用之间的通知。
2、方案2中由通知程序与MQ接口,通知程序监听MQ,收到MQ的消息后由通知程序通过互联网接口协议调用接收通知方。此方案主要应用于外部应用之间的通知,例如支付宝、微信的支付结果通知。

6.3.RocketMQ实现最大努力通知型事务

6.3.1.业务说明

本实例通过RocketMq中间件实现最大努力通知型分布式事务,模拟充值过程。
本案例有账户系统和充值系统两个微服务,其中账户系统的数据库是bank1数据库,其中有张三账户。充值系统的数据库使用bank1_pay数据库,记录了账户的充值记录。
业务流程如下图 :

交互流程如下 :
1、用户请求充值系统进行充值。
2、充值系统完成充值将充值结果发给MQ。
3、账户系统监听MQ,接收充值结果通知,如果接收不到消息,MQ会重复发送通知。接收到充值结果通知账户系统增加充值金额。
4、账户系统也可以主动查询充值系统的充值结果查询接口,增加金额。

6.3.2.程序组成部分

本示例程序组成部分如下 :
数据库:MySQL-5.7.25
包括bank1和bank1_pay两个数据库。
JDK:64位 jdk1.8.0_201
rocketmq 服务端:RocketMQ-4.5.0
rocketmq 客户端:RocketMQ-Spring-Boot-starter.2.0.2-RELEASE 微服务框架:spring-boot-2.1.3、spring-cloud-Greenwich.RELEASE
微服务及数据库的关系 :
dtx/dtx-notifymsg-demo/dtx-notifymsg-demo-bank1 银行1,操作张三账户, 连接数据库bank1 dtx/dtx-notifymsg-demo/dtx-notifymsg-demo-pay 银行2,操作充值记录,连接数据库bank1_pay

交互流程如下 :
1、用户请求充值系统进行充值。
2、充值系统完成充值将充值结果发给MQ。
3、账户系统监听MQ,接收充值结果通知,如果接收不到消息,MQ会重复发送通知。接收到充值结果通知账户系统增加充值金额。
4、账户系统也可以主动查询充值系统的充值结果查询接口,增加金额。

6.3.3.创建数据库

创建bank1库,并导入以下表结构和数据(包含张三账户)

CREATE DATABASE `bank1` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';DROP TABLE IF EXISTS `account_info`; CREATE TABLE `account_info` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '户 主姓名',`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '银行卡号',`account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT'帐户密码',`account_balance` double NULL DEFAULT NULL COMMENT '帐户余额', PRIMARY KEY (`id`) USING BTREE) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;INSERT INTO `account_info` VALUES (2, '张三的账户', '1', '', 10000);DROP TABLE IF EXISTS `de_duplication`; CREATE TABLE `de_duplication` (`tx_no` varchar(64) COLLATE utf8_bin NOT NULL, `create_time` datetime(0) NULL DEFAULT NULL, PRIMARY KEY (`tx_no`) USING BTREE) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;

创建bank1_pay库,并导入以下表结构:

CREATE DATABASE `bank1_pay` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci'; CREATE TABLE `account_pay` (`id` varchar(64) COLLATE utf8_bin NOT NULL,`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '账号', `pay_amount` double NULL DEFAULT NULL COMMENT '充值余额',`result` varchar(20) COLLATE utf8_bin DEFAULT NULL COMMENT '充值结果:success,fail',PRIMARY KEY (`id`) USING BTREE) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;

6.3.4.启动RocketMQ

rocketMQ启动方式与RocketMQ实现可靠消息最终一致性事务中完全一致

6.3.5.discover-server

discover-server是服务注册中心,测试工程将自己注册至discover-server。

6.3.6.工程概述

(1)父工程maven依赖说明
在dtx父工程中指定了SpringBoot和SpringCloud版本

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring‐boot‐dependencies</artifactId>         <version>2.1.3.RELEASE</version><type>pom</type><scope>import</scope></dependency><dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring‐cloud‐dependencies</artifactId> <version>Greenwich.RELEASE</version>
 <type>pom</type><scope>import</scope></dependency>

在dtx-notifymsg-demo父工程中指定了rocketmq-spring-boot-starter的版本。

<dependency><groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq‐spring‐boot‐starter</artifactId> <version>2.0.2</version></dependency>

(2)配置rocketMQ
在application-local.properties中配置rocketMQ nameServer地址及生产组 :

rocketmq.producer.group = producer_bank2
rocketmq.name-server = 127.0.0.1:9876

6.3.7 dtx-notifydemo-pay

dtx-notifydemo-pay实现如下功能 :
1、充值接口;
2、充值完成要通知;
3、充值结果查询接口。
(2)Dao

@Mapper@Componentpublic interface AccountPayDao {@Insert("insert into account_pay(id,account_no,pay_amount,result) values(#{id},# {accountNo},#{payAmount},#{result})")int insertAccountPay(@Param("id") String id,@Param("accountNo") String accountNo, @Param("payAmount") Double pay_amount,@Param("result") String result);@Select("select id,account_no accountNo,pay_amount payAmount,result from account_pay where id=#{txNo}")AccountPay findByIdTxNo(@Param("txNo") String txNo); }

(3)Service

@Service@Slf4jpublic class AccountPayServiceImpl implements AccountPayService{@AutowiredRocketMQTemplate rocketMQTemplate;@AutowiredAccountPayDao accountPayDao;@Transactional@Overridepublic AccountPay insertAccountPay(AccountPay accountPay) {int result = accountPayDao.insertAccountPay(accountPay.getId(),accountPay.getAccountNo(), accountPay.getPayAmount(), "success");if(result>0){ //发送通知rocketMQTemplate.convertAndSend("topic_notifymsg",accountPay);return accountPay; }return null; }@Overridepublic AccountPay getAccountPay(String txNo) {AccountPay accountPay = accountPayDao.findByIdTxNo(txNo);
 return accountPay;
 }}

(4)Controller

@RestControllerpublic class AccountPayController {@AutowiredAccountPayService accountPayService;//充值@GetMapping(value = "/paydo")public AccountPay pay(AccountPay accountPay){//事务号String txNo = UUID.randomUUID().toString(); accountPay.setId(txNo);return accountPayService.insertAccountPay(accountPay);}//查询充值结果@GetMapping(value = "/payresult/{txNo}")public AccountPay payresult(@PathVariable("txNo") String txNo){return accountPayService.getAccountPay(txNo); }}

6.3.8 dtx-notifydemo-bank1

dtx-notifydemo-bank1实现如下功能 :
1、监听MQ,接收充值结果,根据充值结果完成账户金额修改。
2、主动查询充值系统,根据充值结果完成账户金额修改。
1)Dao

@Mapper@Componentpublic interface AccountInfoDao {//修改账户金额@Update("update account_info set account_balance=account_balance+#{amount} where account_no=#{accountNo}")int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);//查询幂等记录,用于幂等控制@Select("select count(1) from de_duplication where tx_no = #{txNo}") int isExistTx(String txNo);//添加事务记录,用于幂等控制@Insert("insert into de_duplication values(#{txNo},now());") int addTx(String txNo);}

2)AccountInfoService

@Service@Slf4jpublic class AccountInfoServiceImpl implements AccountInfoService {@AutowiredAccountInfoDao accountInfoDao;@AutowiredPayClient payClient; /**
* 更新账号余额,并发送消息 *
* @param accountChange */@Transactional@Overridepublic void updateAccountBalance(AccountChangeEvent accountChange) {//幂等校验int existTx = accountInfoDao.isExistTx(accountChange.getTxNo()); if(existTx >0){log.info("已处理消息:{}", JSONObject.toJSONString(accountChange));return ; }//添加事务记录 accountInfoDao.addTx(accountChange.getTxNo()); //更新账户金额accountInfoDao.updateAccountBalance(accountChange.getAccountNo(),accountChange.getAmount()); }/**
* 主动查询充值结果 *
* @param tx_no */@Overridepublic AccountPay queryPayResult(String tx_no) {//主动请求充值系统查询充值结果AccountPay accountPay = payClient.queryPayResult(tx_no); //充值结果String result = accountPay.getResult(); log.info("主动查询充值结果:{}", JSON.toJSONString(accountPay)); if("success".equals(result)){AccountChangeEvent accountChangeEvent = new AccountChangeEvent();accountChangeEvent.setAccountNo(accountPay.getAccountNo());accountChangeEvent.setAmount(accountPay.getPayAmount());accountChangeEvent.setTxNo(accountPay.getId());updateAccountBalance(accountChangeEvent);}return accountPay; }}
@FeignClient(value = "dtx‐notifymsg‐demo‐pay", fallback = PayFallback.class)public interface PayClient {@GetMapping("/pay/payresult/{txNo}")AccountPay queryPayResult(@PathVariable("txNo") String txNo); }@Componentpublic class PayFallback implements PayClient {@Overridepublic AccountPay queryPayResult(String txNo) {AccountPay accountPay = new AccountPay();accountPay.setResult("fail");return accountPay;} }

3)监听MQ

@Component@Slf4j@RocketMQMessageListener(topic="topic_notifymsg",consumerGroup="consumer_group_notifymsg_bank1") public class NotifyMsgListener implements RocketMQListener<AccountPay> {@AutowiredAccountInfoService accountInfoService;@Overridepublic void onMessage(AccountPay accountPay) {log.info("接收到消息:{}", JSON.toJSONString(accountPay)); AccountChangeEvent accountChangeEvent = new AccountChangeEvent();accountChangeEvent.setAmount(accountPay.getPayAmount());accountChangeEvent.setAccountNo(accountPay.getAccountNo());accountChangeEvent.setTxNo(accountPay.getId());accountInfoService.updateAccountBalance(accountChangeEvent); log.info("处理消息完成:{}", JSON.toJSONString(accountChangeEvent));} }

4)Controller

@RestController@Slf4jpublic class AccountInfoController {@Autowiredprivate AccountInfoService accountInfoService;//主动查询充值结果@GetMapping(value = "/payresult/{txNo}")public AccountPay result(@PathVariable("txNo") String txNo){AccountPay accountPay = accountInfoService.queryPayResult(txNo);return accountPay; }}

6.3.9 测试场景

  • 充值系统充值成功,账户系统主动查询充值结果,修改账户金额。

  • 充值系统充值成功,发送消息,账户系统接收消息,修改账户金额。

  • 账户系统修改账户金额幂等测试。

6.4. 小结

最大努力通知方案是分布式事务中对一致性要求最低的一种,适用于一些最终一致性时间敏感度低的业务;
最大努力通知方案需要实现如下功能 :
1、消息重复通知机制。
2、消息校对机制。

(0)

相关推荐

  • 优秀程序员必备技能之如何高效阅读源码

    "我能熟练使用这个框架/软件/技术就行了, 为什么要看源码?" "平时不用看源码, 看源码太费时间,还容易忘记,工作中出现问题再针对性地阅读,效率更高." &q ...

  • RocketMQ-入门

    RocketMQ是什么 RocketMQ是一个分布式消息和流数据平台,具有低延迟.高性能.高可靠性.万亿级容量和灵活的可扩展性.RocketMQ是2012年阿里巴巴开源的第三代分布式消息中间件,201 ...

  • 分布式事务之解决方案(可靠消息最终一致性)

    5. 分布式事务解决方案之可靠消息最终一致性 5.1. 什么是可靠消息最终一致性事务 可靠消息最终一致性方案是指当事务发起执行完全本地事务后并发出一条消息,事务参与方(消息消费者)一定能够接收消息并处 ...

  • 探索RocketMQ的重复消费和乱序问题

    前言 在之前的MQ专题中,我们已经解决了消息中间件的一大难题,消息丢失问题. 但MQ在实际应用中不是说保证消息不丢失就万无一失了,它还有两个令人头疼的问题:重复消费和乱序. 今天我们就来聊一聊这两个常 ...

  • 分布式事务之解决方案(XA和2PC)

    3. 分布式事务解决方案之2PC(两阶段提交) 针对不同的分布式场景业界常见的解决方案有2PC.TCC.可靠消息最终一致性.最大努力通知这几种. 3.1. 什么是2PC 2PC即两阶段提交协议,是将整 ...

  • 聊聊分布式事务,再说说解决方案

    来源:https://www.cnblogs.com/savorboard/p/7679902.html 作者:Savorboard 前言 最近很久没有写博客了,一方面是因为公司事情最近比较忙,另外一 ...

  • 分布式事务概述及大厂通用解决方案

    分布式事务概述及大厂通用解决方案

  • 分布式事务的四种解决方案

    例如在下单场景下,库存和订单如果不在同一个节点上,就涉及分布式事务. 解决方案 在分布式系统中,要实现分布式事务,无外乎那几种解决方案. 一.两阶段提交(2PC) 两阶段提交(Two-phase Co ...

  • 分布式事务最经典的七种解决方案!

    优质文章,第一时间送达 随着业务的快速发展.业务复杂度越来越高,几乎每个公司的系统都会从单体走向分布式,特别是转向微服务架构.随之而来就必然遇到分布式事务这个难题,这篇文章总结了分布式事务最经典的解决 ...

  • 分布式事务解决方案及实现

    一.事务的ACID原则 数据库事务的几个特性:原子性(Atomicity ).一致性( Consistency ).隔离性或独立性( Isolation)和持久性(Durabilily),简称就是AC ...

  • 分布式事务:两阶段提交与三阶段提交

    在分布式系统中著有 CAP 理论,该理论由加州大学伯克利分校的 Eric Brewer 教授提出,阐述了在一个分布式系统中不可能同时满足 一致性(Consistency).可用性(Availabili ...

  • 分布式事务+DDD+负载均衡+服务治理已撸!微服务不就这点事?

    Go语言中文网 今天 最近有看到"微服务,分久必合.合久必分"的言论,我同意,微服务不是架构演变的终点,细说还有Serverless.FaaS等方向.但纠结要不要拆分是没有必要的, ...