国产成人精品久久免费动漫-国产成人精品天堂-国产成人精品区在线观看-国产成人精品日本-a级毛片无码免费真人-a级毛片毛片免费观看久潮喷

您的位置:首頁技術(shù)文章
文章詳情頁

springboot整合rocketmq實現(xiàn)分布式事務(wù)

瀏覽:5日期:2023-03-08 14:54:37
目錄1 執(zhí)行流程2 工程2.1 pom2.2 application.yml2.3 TransactionListenerImpl2.4 SpringTransactionProducer2.5 SpringTxConsumer2.6 ProducerController2.7 RocketApplication3 測試3.1 正常消費(fèi)測試3.2 回查代碼測試1 執(zhí)行流程

springboot整合rocketmq實現(xiàn)分布式事務(wù)

(1) 發(fā)送方向 MQ 服務(wù)端發(fā)送消息。(2) MQ Server 將消息持久化成功之后,向發(fā)送方 ACK 確認(rèn)消息已經(jīng)發(fā)送成功,此時消息為半消息。(3) 發(fā)送方開始執(zhí)行本地事務(wù)邏輯。(4) 發(fā)送方根據(jù)本地事務(wù)執(zhí)行結(jié)果向 MQ Server 提交二次確認(rèn)(Commit 或是 Rollback),MQ Server 收到Commit 狀態(tài)則將半消息標(biāo)記為可投遞,訂閱方最終將收到該消息;MQ Server 收到 Rollback 狀態(tài)則刪除半消息,訂閱方將不會接受該消息。(5) 在斷網(wǎng)或者是應(yīng)用重啟的特殊情況下,上述步驟4提交的二次確認(rèn)最終未到達(dá) MQ Server,經(jīng)過固定時間后MQ Server 將對該消息發(fā)起消息回查。(6) 發(fā)送方收到消息回查后,需要檢查對應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。(7) 發(fā)送方根據(jù)檢查得到的本地事務(wù)的最終狀態(tài)再次提交二次確認(rèn),MQ Server 仍按照步驟4對半消息進(jìn)行操作。

2 工程

springboot整合rocketmq實現(xiàn)分布式事務(wù)

2.1 pom

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.0.RELEASE</version><relativePath/> <!-- lookup parent from repository --> </parent> <properties><java.version>1.8</java.version> </properties> <dependencies><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope></dependency><dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId></dependency><dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.71</version></dependency><dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-collections4</artifactId> <version>4.2</version></dependency><dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId></dependency><!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter --><dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.1</version></dependency><dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.2</version></dependency> </dependencies> <build><plugins> <plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>2.3.0.RELEASE</version> </plugin> <plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration> <source>1.8</source> <target>1.8</target></configuration> </plugin></plugins> </build>2.2 application.yml

rocketmq: name-server: 192.168.38.50:9876 producer: group: transcation-group2.3 TransactionListenerImpl

@RocketMQTransactionListener(txProducerGroup = 'transaction-producer-group')@Slf4jpublic class TransactionListenerImpl implements RocketMQLocalTransactionListener { private static Map<String, RocketMQLocalTransactionState> STATE_MAP = new HashMap<>(); /** * 執(zhí)行業(yè)務(wù)邏輯 */ @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {String transId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);try { System.out.println('用戶A賬戶減500元.'); System.out.println('用戶B賬戶加500元.'); STATE_MAP.put(transId, RocketMQLocalTransactionState.COMMIT); return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) { e.printStackTrace();}STATE_MAP.put(transId, RocketMQLocalTransactionState.ROLLBACK);return RocketMQLocalTransactionState.UNKNOWN; } /** * 回查 */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) {String transId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);log.info('回查消息 -> transId ={} , state = {}', transId, STATE_MAP.get(transId));return STATE_MAP.get(transId); }}2.4 SpringTransactionProducer

@Component@Slf4jpublic class SpringTransactionProducer { @Autowired private RocketMQTemplate rocketMQTemplate; /** * 發(fā)送消息 * */ public void sendMsg(String topic, String msg) {Message<String> message = MessageBuilder.withPayload(msg).build();this.rocketMQTemplate.sendMessageInTransaction('transaction-producer-group', topic, message, null);log.info('發(fā)送成功'); }}2.5 SpringTxConsumer

@Component@RocketMQMessageListener(topic = 'pay_topic',consumerGroup = 'transaction-consumer-group',selectorExpression = '*')@Slf4jpublic class SpringTxConsumer implements RocketMQListener<String> { @Override public void onMessage(String msg) {log.info('接收到消息 -> {}', msg); }}2.6 ProducerController

@RestController@RequestMapping('/producer')public class ProducerController { @Autowired private SpringTransactionProducer springTransactionProducer; @GetMapping('/sendMsg') public String sendMsg() {springTransactionProducer.sendMsg('pay_topic', '用戶A賬戶減500元,用戶B賬戶加500元。');return '發(fā)送成功'; }}2.7 RocketApplication

@SpringBootApplicationpublic class RocketApplication { public static void main(String[] args) {SpringApplication.run(RocketApplication.class); }}3 測試3.1 正常消費(fèi)測試

描述: 正常啟動及可。

springboot整合rocketmq實現(xiàn)分布式事務(wù)

springboot整合rocketmq實現(xiàn)分布式事務(wù)

3.2 回查代碼測試

描述: 執(zhí)行本地事務(wù)時添加異常,重啟測試,發(fā)現(xiàn)消費(fèi)者沒有收到消息。

springboot整合rocketmq實現(xiàn)分布式事務(wù)

springboot整合rocketmq實現(xiàn)分布式事務(wù)

springboot整合rocketmq實現(xiàn)分布式事務(wù)

到此這篇關(guān)于springboot整合rocketmq實現(xiàn)分布式事務(wù)的文章就介紹到這了,更多相關(guān)springboot 分布式事務(wù)內(nèi)容請搜索好吧啦網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持好吧啦網(wǎng)!

標(biāo)簽: Spring
相關(guān)文章:
主站蜘蛛池模板: 国产免费一区不卡在线 | 免费一级a毛片 | 久久久久日韩精品无 | 波多野结衣一区二区在线 | 91在线永久 | 亚洲一区浅井舞香在线播放 | 九九在线免费视频 | 成人毛片在线观看 | 国产精品久久久久久影院 | 欧美亚洲网站 | 男女一级 | 毛片视频在线免费观看 | 欧美精品hdxxxxx| 精品免费国产一区二区三区 | a级片在线观看 | 97久久精品午夜一区二区 | 欧美精品国产一区二区三区 | 国产精品免费观在线 | 日韩在线视频免费 | 三级精品在线观看 | 国产91香蕉视频 | 午夜免费片在线观看不卡 | 国产成人在线免费观看 | 精品国产一区二区三区在线 | 国产精品天堂avav在线 | 欧美三级不卡在线观看视频 | a免费视频 | 91tv成人影院免费 | 亚洲美女精品视频 | 久久久久综合给合狠狠狠 | 日韩在线不卡一区在线观看 | 在线毛片免费 | 久草久| 真人一级毛片 | 欧美人牲囗毛片 | 两性午夜视频 | av国产精品 | 自拍成人 | 国产欧美亚洲三区久在线观看 | 九九视频精品全部免费播放 | 亚洲精品毛片久久久久久久 |