RocketMQ专题02


1. 案例介绍

B站视频:黑马程序员RocketMQ系统精讲 P35 - P88

手敲shop源码: 代码地址

1.1 业务分析

模拟电商网站购物场景中的【下单】和【支付】业务。

1.1.1 下单

在这里插入图片描述

  • 用户请求订单系统下单.
  • 订单系统通过RPC调用订单服务下单.
  • 订单服务调用优惠券服务,扣减优惠券
  • 订单服务调用库存服务,校验并扣减库存
  • 订单服务调用用户服务,扣减用户余额
  • 订单服务完成确认订单

1.1.2 支付

在这里插入图片描述

  • 用户发起支付请求
  • 支付系统调用第三方支付平台API进行发起支付流程
  • 用户通过第三方支付平台支付成功后, 第三方支付平台回调通知支付系统
  • 支付系统调用订单服务修改订单状态
  • 支付系统调用积分服务添加积分
  • 支付系统调用日志服务记录日志

1.2 问题分析

1.2.1 问题1

用户提交订单后,扣减库存成功,扣减优惠券成功,使用余额成功; 但是在确认订单操作失败, 需要对库存,优惠券,余额进行回退。如果保证数据的完整性?
在这里插入图片描述
UML图, 使用MQ保证在下单失败后系统数据的完整性。
在这里插入图片描述

1.2.2 问题2

用户通过第三方支付平台(支付宝,微信)支付成功后,第三方支付平台要通过回调API异步通知商家支付系统用户的支付结果, 支付系统根据支付结果修改订单状态、记录支付日志和给用户增加积分。

商家支付系统如何保证在收到第三方支付平台的异步通知时,如何快速给第三方支付平台做出回应?
在这里插入图片描述
通过MQ进行数据分发,提高系统处理性能。
在这里插入图片描述

2. 技术分析

2.1 技术选型

  • SpringBoot
  • Dubbo
  • Zookeeper
  • RocketMQ
  • MySql

在这里插入图片描述

2.2 SpringBoot整合RocketMQ

下载 rocketmq-spring 项目

rocketmq-spring安装到本地仓库, 对应的就是依赖 rocketmq-spring-boot-starter

mvn install -Dmaven.skip.test=true

2.2.1 消息生产者

1)添加依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.7</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.crysw</groupId>
    <artifactId>boot-rocketmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>boot-rocketmq</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!--springboot整合rocketMQ-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.2</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
       <!--单独引入rocketMQ-->
       <!-- <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.9.3</version>
        </dependency>-->
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

2) 配置文件

application.properties文件添加如下配置内容:

# nameserver
rocketmq.name-server=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 生产者组名
rocketmq.producer.group=my-group

3)启动类

@SpringBootApplication
public class BootRocketmqApplication {
    public static void main(String[] args) {
        SpringApplication.run(BootRocketmqApplication.class, args);
    }
}

4)测试类

@SpringBootTest
public class ProducerTest {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Test
    public void testSendMsg() {
        rocketMQTemplate.convertAndSend("springboot-rocketmq", "hello springboot-rocketmq");
    }
}

查看rocketmq-console控制台
在这里插入图片描述

2.2.2 消息消费者

1)添加依赖

同消息生产者。

2)配置文件

# nameserver
rocketmq.name-server=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 消费者组名
rocketmq.consumer.group=my-group

3)启动类

同消息生产者。

4)消息监听器

@Component // 将消息监听器注入中IOC容器中
@RocketMQMessageListener(topic = "springboot-rocketmq", consumerGroup = "${rocketmq.consumer.group}")
@Slf4j
public class Consumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("接收到消息: " + s);
    }
}

5)测试

启动应用后,查看消费者打印的日志

2022-06-22 22:31:58.228  INFO 17644 --- [           main] a.r.s.s.DefaultRocketMQListenerContainer : running container: DefaultRocketMQListenerContainer{consumerGroup='my-group', namespace='', nameServer='rocketmq-nameserver1:9876;rocketmq-nameserver2:9876', topic='springboot-rocketmq', consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression='*', messageModel=CLUSTERING', tlsEnable=false}
2022-06-22 22:31:58.230  INFO 17644 --- [           main] o.a.r.s.a.ListenerContainerConfiguration : Register the listener to container, listenerBeanName:consumer, containerBeanName:org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1
2022-06-22 22:31:58.270  INFO 17644 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2022-06-22 22:31:58.286  INFO 17644 --- [           main] c.c.b.BootRocketmqApplication            : Started BootRocketmqApplication in 7.152 seconds (JVM running for 9.301)
# 消费的消息内容
接收到消息: hello springboot
接收到消息: hello springboot-rocketmq

2.3 SpringBoot整合Dubbo

下载 dubbo-spring-boot-starter 依赖包。

dubbo-spring-boot-starter安装到本地仓库,其实下面引入依赖也是一样的。

mvn install -Dmaven.skip.test=true

dubbo调用的流程图
在这里插入图片描述

  • start:
  • register:注册服务提供方到注册中心zookeeper
  • subscribe:服务消费者到注册中心查找服务提供方信息
  • notify:服务提供方信息发生变更,注册中心通知到服务消费者
  • invoke:服务消费者发起远程调用
  • count:上报服务提供方和消费方信息进行监控,从而实现服务治理

2.3.1 搭建Zookeeper集群

1)准备工作

  • 安装JDK

  • 将Zookeeper上传到服务器

  • 解压Zookeeper到服务器指定目录, 并创建data目录,将conf下的zoo_sample.cfg文件改为zoo.cfg

    tar -zxvf apache-zookeeper-3.6.3-bin.tar.gz
    mv zoo_sample.cfg zoo.cfg
    
  • 建立/usr/local/zookeeper-cluster, 将解压后的Zookeeper复制到以下三个目录

    # 复制三份
    cp -rf apache-zookeeper-3.6.3-bin /usr/local/zookeeper-cluster/zookeeper-1
    cp -rf apache-zookeeper-3.6.3-bin /usr/local/zookeeper-cluster/zookeeper-2
    cp -rf apache-zookeeper-3.6.3-bin /usr/local/zookeeper-cluster/zookeeper-3
    
  • 在zoo.cfg中配置每一个Zookeeper的dataDir, clientPort分别为2181、2182、2183;

    修改/usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg

    clientPort=2181
    dataDir=/usr/local/zookeeper-cluster/zookeeper-1/data
    

    修改/usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg

    clientPort=2182
    dataDir=/usr/local/zookeeper-cluster/zookeeper-2/data
    

    修改/usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg

    clientPort=2183
    dataDir=/usr/local/zookeeper-cluster/zookeeper-3/data
    

2)配置集群

  • 在每个zookeeper的data目录下创建一个myid文件,内容分别是1,2,3。这个文件就是记录每个服务器的ID。

    touch zookeeper-1/data/myidmv my
    touch zookeeper-2/data/myid
    touch zookeeper-3/data/myid
    
  • 在每一个zookeeper的zoo.cfg配置客户端访问端口(clientPort)和集群服务器IP列表。集群服务器IP列表如下:

    server.1=192.168.65.129:2881:3881
    server.2=192.168.65.129:2882:3882
    server.3=192.168.65.129:2883:3883
    

    server.服务器ID=服务器IP地址:服务器之间通信端口:服务器之间投票选举端口

3) 启动集群

启动集群, 即分别启动每个zk实例:

[root@centos7-01 zookeeper-cluster]# ./zookeeper-1/bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-cluster/zookeeper-1/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@centos7-01 zookeeper-cluster]# ./zookeeper-2/bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-cluster/zookeeper-2/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@centos7-01 zookeeper-cluster]# ./zookeeper-3/bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-cluster/zookeeper-3/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

2.3.2 RPC服务接口

编写服务提供方的接口。

public interface IUserService {

    String sayHello(String name);
}

2.3.3 服务提供方

1)添加依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.crysw.shop</groupId>
    <artifactId>springboot-dubbo-provider</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-dubbo-provider</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>

        <!--dubbo-->
        <dependency>
            <groupId>com.alibaba.spring.boot</groupId>
            <artifactId>dubbo-spring-boot-starter</artifactId>
            <version>2.0.0</version>
        </dependency>

        <!--interface接口依赖-->
        <dependency>
            <groupId>com.crysw.shop</groupId>
            <artifactId>springboot-dubbo-interface</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

        <!--zookeeper-->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.10</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.9</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

2)配置文件内容

# application.properties
server.port=8080
spring.application.name=dubbo-demo-provider
spring.dubbo.application.id=dubbo-demo-provider
spring.dubbo.application.name=dubbo-demo-provider
spring.dubbo.registry.address=zookeeper://192.168.65.129:2181;zookeeper://192.168.65.129:2182;zookeeper://192.168.65.129:2183
spring.dubbo.server=true
spring.dubbo.protocol.name=dubbo
spring.dubbo.protocol.port=20880

3)启动类

package com.crysw.shop.provider;

import com.alibaba.dubbo.spring.boot.annotation.EnableDubboConfiguration;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author crysw
 */
@SpringBootApplication
@EnableDubboConfiguration // 开启dubbo配置
public class SpringbootDubboProviderApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringbootDubboProviderApplication.class, args);
    }
}

4)提供接口服务的实现

package com.crysw.shop.provider.service.impl;

import com.alibaba.dubbo.config.annotation.Service;
import com.crysw.shop.infc.service.IUserService;
import org.springframework.stereotype.Component;

/**
 * 描述:服务提供方的接口实现
 * @author crysw
 * @date 2022/6/29 22:17
 * @version 1.0
 */
// 为了不与dubbo包下的@service冲突,这里使用@Component声明Bean
@Component
// 注意应该为dubbo包下的@Service, 与接口进行绑定
@Service(interfaceClass = IUserService.class)
public class UserServiceImpl implements IUserService {
    @Override
    public String sayHello(String name) {
        return "hello, dubbo, i'm " + name;
    }
}

2.3.4 服务消费方

1)添加依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.crysw.shop</groupId>
    <artifactId>springboot-dubbo-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-dubbo-consumer</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <!--dubbo-->
        <dependency>
            <groupId>com.alibaba.spring.boot</groupId>
            <artifactId>dubbo-spring-boot-starter</artifactId>
            <version>2.0.0</version>
        </dependency>

        <!--interface接口依赖-->
        <dependency>
            <groupId>com.crysw.shop</groupId>
            <artifactId>springboot-dubbo-interface</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

        <!--zookeeper-->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.10</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.9</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

2)配置文件

server.port=8081
spring.application.name=dubbo-demo-consumer
spring.dubbo.application.name=dubbo-demo-consumer
spring.dubbo.application.id=dubbo-demo-consumer
spring.dubbo.registry.address=zookeeper://192.168.65.129:2181;zookeeper://192.168.65.129:2182;zookeeper://192.168.65.129:2183

3)启动类

package com.crysw.shop.consumer;

import com.alibaba.dubbo.spring.boot.annotation.EnableDubboConfiguration;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author crysw
 */
@SpringBootApplication
@EnableDubboConfiguration
public class SpringbootDubboConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringbootDubboConsumerApplication.class, args);
    }
}

4)编写Controller

package com.crysw.shop.consumer.controller;

import com.alibaba.dubbo.config.annotation.Reference;
import com.crysw.shop.infc.service.IUserService;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 描述:
 * @author crysw
 * @date 2022/7/2 14:25
 * @version 1.0
 */
@RestController
@RequestMapping("/user")
public class UserController {

    @Reference // dubbo的注解
    private IUserService userService;

    @RequestMapping("/sayHello")
    public String sayHello(String name) {
        return userService.sayHello(name);
    }
}

2.3.5 测试dubbo接口调用

1)启动服务提供方

查看启动日志,连接zookeeper成功。
在这里插入图片描述
启动服务消费方,一样连接上zookeeper,并从zk注册中心获取服务提供方的主机,端口等信息。在浏览器访问http://localhost:8081/user/sayHello?name=crysw
在这里插入图片描述

3. 环境搭建

3.1 数据库

1)优惠券表

表名:trade_coupon

FieldTypeComment
coupon_idbigint(50) not null优惠券ID
coupon_pricedecimal(10,2) null优惠券金额
user_idbigint(50) null用户ID
order_idbigint(32) null订单ID
is_usedint(1) null是否使用 0-未使用 1-已使用
used_timetimestamp null使用时间

2)商品表

表名:trade_goods

FieldTypeComment
goods_idbigint(50) not null主键
goods_namevarchar(255) null商品名称
goods_numberint(11) null商品库存
goods_pricedecimal(10,2) null商品价格
goods_descvarchar(255) null商品描述
add_timetimestamp null添加时间

3)订单表

表名:trade_order

FieldTypeComment
order_idbigint(50) not null订单ID
user_idbigint(50) null用户ID
order_statusint(1) null订单状态 0-未确认 1-已确认 2-已取消 3-无效 4-退款
pay_statusint(1) null支付状态 0-未支付 1-支付中 2-已支付
shipping_statusint(1) null发货状态 0-未发货 1-已发货 2-已发货
addressvarchar(255) null收获地址
consigneevarchar(255) null收货人
goods_idbigint(50) null商品ID
goods_numberint(11) null商品数量
goods_pricedecimal(10,2) null商品价格
goods_amountdecimal(10,2) null商品总价
shipping_feedecimal(10,2) null运费
order_amountdecimal(10,2) null订单金额 (商品数量*商品价格)
coupon_idbigint(50) null优惠券ID
coupon_paiddecimal(10,2) null优惠券
money_paiddecimal(10,2) null已支付金额
pay_amountdecimal(10,2) null支付金额
add_timetimestamp null创建时间
confirm_timetimestamp null订单确认时间
pay_timetimestamp Null支付时间

4)订单商品日志表

表名:trade_goods_number_log

FieldTypeComment
goods_idint(50) not null商品ID
oder_idvarchar(50) not null订单ID
goods_numberint(11) null商品数量
log_timedatetime null记录时间

5) 用户表

表名:trade_user

FieldTypeComment
user_idbigint(50) not null用户ID
user_namevarchar(255) null用户姓名
user_passwordvarchar(255) null用户密码
user_mobilevarchar(255) null手机号
user_scoreint(11) null积分
user_reg_timetimestamp null注册时间
user_moneydecimal(10,2) null用户余额

6)用户余额日志表

表名:trade_user_money_log

FieldTypeComment
user_idbigint(50) not null用户ID
order_idbigint(50) not null订单ID
money_log_typeint(1) not null日志类型 1-订单付款 2-订单退款
use_moneydecimal(10,2) null操作金额
create_timetimestamp null日志时间

7)订单支付表

表名:trade_pay

FieldTypeComment
pay_idbigint(50) not null支付编号
order_idbigint(50) null订单编号
pay_amountdecimal(10,2) null支付金额
is_paidint(1) null是否已支付 1-否 2-是

8)MQ消息生产表

表名:trade_mq_producer_temp

FieldTypeComment
idvarchar(100) not null主键
group_namevarchar(100) null生产者组名
msg_topicvarchar(100) null消息主题
msg_tagvarchar(100) nullTag
msg_keyvarchar(100) nullKey
msg_bodyvarchar(500) null消息内容
msg_statusint(1) null0-未处理 1-已处理
create_timetimestamp not null记录时间

9)MQ消息消费表

表名:trade_mq_consumer_log

FieldTypeComment
msg_idvarchar(100) null消息ID
group_namevarchar(100) not null消费者组名 (primary key)
msg_tagvarchar(100) not nullTag (primary key)
msg_keyvarchar(100) not nullKey (primary key)
msg_bodyvarchar(500) null消息体
consumer_statusint(1) null0-正在处理;1-处理成功;2-处理失败
consumer_timesint(1) null消费次数
consumer_timetimestamp null消费时间
remarkvarchar(500) null

10)SQL脚本

/*
SQLyog Ultimate v8.32 
MySQL - 5.5.49 : Database - trade
*********************************************************************
*/


/*!40101 SET NAMES utf8 */;

/*!40101 SET SQL_MODE=''*/;

/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
CREATE DATABASE /*!32312 IF NOT EXISTS*/`trade` /*!40100 DEFAULT CHARACTER SET utf8 */;

USE `trade`;

/*Table structure for table `trade_coupon` */

DROP TABLE IF EXISTS `trade_coupon`;

CREATE TABLE `trade_coupon` (
  `coupon_id` BIGINT(50) NOT NULL COMMENT '优惠券ID',
  `coupon_price` DECIMAL(10,2) DEFAULT NULL COMMENT '优惠券金额',
  `user_id` BIGINT(50) DEFAULT NULL COMMENT '用户ID',
  `order_id` BIGINT(32) DEFAULT NULL COMMENT '订单ID',
  `is_used` INT(1) DEFAULT NULL COMMENT '是否使用 0未使用 1已使用',
  `used_time` TIMESTAMP NULL DEFAULT NULL COMMENT '使用时间',
  PRIMARY KEY (`coupon_id`),
  KEY `FK_trade_coupon` (`user_id`),
  KEY `FK_trade_coupon2` (`order_id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

/*Data for the table `trade_coupon` */

/*Table structure for table `trade_goods` */

DROP TABLE IF EXISTS `trade_goods`;

CREATE TABLE `trade_goods` (
  `goods_id` BIGINT(50) NOT NULL AUTO_INCREMENT,
  `goods_name` VARCHAR(255) DEFAULT NULL COMMENT '商品名称',
  `goods_number` INT(11) DEFAULT NULL COMMENT '商品库存',
  `goods_price` DECIMAL(10,2) DEFAULT NULL COMMENT '商品价格',
  `goods_desc` VARCHAR(255) DEFAULT NULL COMMENT '商品描述',
  `add_time` TIMESTAMP NULL DEFAULT NULL COMMENT '添加时间',
  PRIMARY KEY (`goods_id`)
) ENGINE=INNODB AUTO_INCREMENT=345959443973935105 DEFAULT CHARSET=utf8;

/*Data for the table `trade_goods` */

INSERT  INTO `trade_goods`(`goods_id`,`goods_name`,`goods_number`,`goods_price`,`goods_desc`,`add_time`) VALUES (345959443973935104,'华为P30',999,'5000.00','夜间拍照更美','2019-07-09 20:38:00');

/*Table structure for table `trade_goods_number_log` */

DROP TABLE IF EXISTS `trade_goods_number_log`;

CREATE TABLE `trade_goods_number_log` (
  `goods_id` BIGINT(50) NOT NULL COMMENT '商品ID',
  `order_id` BIGINT(50) NOT NULL COMMENT '订单ID',
  `goods_number` INT(11) DEFAULT NULL COMMENT '库存数量',
  `log_time` TIMESTAMP NULL DEFAULT NULL,
  PRIMARY KEY (`goods_id`,`order_id`, `goods_number`), -- 如果不设置三个字段的联合主键,创建订单和取消订单登记的日志会存在主键冲突. 创建订单 goods_numer为负数表示扣减库存, 取消订单goods_number为正数表示回退库存.
  KEY `FK_trade_goods_number_log2` (`order_id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

/*Data for the table `trade_goods_number_log` */

/*Table structure for table `trade_mq_consumer_log` */

DROP TABLE IF EXISTS `trade_mq_consumer_log`;

CREATE TABLE `trade_mq_consumer_log` (
  `msg_id` VARCHAR(50) DEFAULT NULL,
  `group_name` VARCHAR(100) NOT NULL,
  `msg_tag` VARCHAR(100) NOT NULL,
  `msg_key` VARCHAR(100) NOT NULL,
  `msg_body` VARCHAR(500) DEFAULT NULL,
  `consumer_status` INT(1) DEFAULT NULL COMMENT '0:正在处理;1:处理成功;2:处理失败',
  `consumer_times` INT(1) DEFAULT NULL,
  `consumer_timestamp` TIMESTAMP NULL DEFAULT NULL,
  `remark` VARCHAR(500) DEFAULT NULL,
  PRIMARY KEY (`group_name`,`msg_tag`,`msg_key`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

/*Data for the table `trade_mq_consumer_log` */

/*Table structure for table `trade_mq_producer_temp` */

DROP TABLE IF EXISTS `trade_mq_producer_temp`;

CREATE TABLE `trade_mq_producer_temp` (
  `id` VARCHAR(100) NOT NULL,
  `group_name` VARCHAR(100) DEFAULT NULL,
  `msg_topic` VARCHAR(100) DEFAULT NULL,
  `msg_tag` VARCHAR(100) DEFAULT NULL,
  `msg_key` VARCHAR(100) DEFAULT NULL,
  `msg_body` VARCHAR(500) DEFAULT NULL,
  `msg_status` INT(1) DEFAULT NULL COMMENT '0:未处理;1:已经处理',
  `create_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

/*Data for the table `trade_mq_producer_temp` */

/*Table structure for table `trade_order` */

DROP TABLE IF EXISTS `trade_order`;

CREATE TABLE `trade_order` (
  `order_id` BIGINT(50) NOT NULL COMMENT '订单ID',
  `user_id` BIGINT(50) DEFAULT NULL COMMENT '用户ID',
  `order_status` INT(1) DEFAULT NULL COMMENT '订单状态 0未确认 1已确认 2已取消 3无效 4退款',
  `pay_status` INT(1) DEFAULT NULL COMMENT '支付状态 0未支付 1支付中 2已支付',
  `shipping_status` INT(1) DEFAULT NULL COMMENT '发货状态 0未发货 1已发货 2已收货',
  `address` VARCHAR(255) DEFAULT NULL COMMENT '收货地址',
  `consignee` VARCHAR(255) DEFAULT NULL COMMENT '收货人',
  `goods_id` BIGINT(50) DEFAULT NULL COMMENT '商品ID',
  `goods_number` INT(11) DEFAULT NULL COMMENT '商品数量',
  `goods_price` DECIMAL(10,2) DEFAULT NULL COMMENT '商品价格',
  `goods_amount` DECIMAL(10,0) DEFAULT NULL COMMENT '商品总价',
  `shipping_fee` DECIMAL(10,2) DEFAULT NULL COMMENT '运费',
  `order_amount` DECIMAL(10,2) DEFAULT NULL COMMENT '订单价格',
  `coupon_id` BIGINT(50) DEFAULT NULL COMMENT '优惠券ID',
  `coupon_paid` DECIMAL(10,2) DEFAULT NULL COMMENT '优惠券',
  `money_paid` DECIMAL(10,2) DEFAULT NULL COMMENT '已付金额',
  `pay_amount` DECIMAL(10,2) DEFAULT NULL COMMENT '支付金额',
  `add_time` TIMESTAMP NULL DEFAULT NULL COMMENT '创建时间',
  `confirm_time` TIMESTAMP NULL DEFAULT NULL COMMENT '订单确认时间',
  `pay_time` TIMESTAMP NULL DEFAULT NULL COMMENT '支付时间',
  PRIMARY KEY (`order_id`),
  KEY `FK_trade_order` (`user_id`),
  KEY `FK_trade_order2` (`goods_id`),
  KEY `FK_trade_order3` (`coupon_id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

/*Data for the table `trade_order` */

/*Table structure for table `trade_pay` */

DROP TABLE IF EXISTS `trade_pay`;

CREATE TABLE `trade_pay` (
  `pay_id` BIGINT(50) NOT NULL COMMENT '支付编号',
  `order_id` BIGINT(50) DEFAULT NULL COMMENT '订单编号',
  `pay_amount` DECIMAL(10,2) DEFAULT NULL COMMENT '支付金额',
  `is_paid` INT(1) DEFAULT NULL COMMENT '是否已支付 1否 2是',
  PRIMARY KEY (`pay_id`),
  KEY `FK_trade_pay` (`order_id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

/*Data for the table `trade_pay` */

/*Table structure for table `trade_user` */

DROP TABLE IF EXISTS `trade_user`;

CREATE TABLE `trade_user` (
  `user_id` BIGINT(50) NOT NULL AUTO_INCREMENT COMMENT '用户ID',
  `user_name` VARCHAR(255) DEFAULT NULL COMMENT '用户姓名',
  `user_password` VARCHAR(255) DEFAULT NULL COMMENT '用户密码',
  `user_mobile` VARCHAR(255) DEFAULT NULL COMMENT '手机号',
  `user_score` INT(11) DEFAULT NULL COMMENT '积分',
  `user_reg_time` TIMESTAMP NULL DEFAULT NULL COMMENT '注册时间',
  `user_money` DECIMAL(10,0) DEFAULT NULL COMMENT '用户余额',
  PRIMARY KEY (`user_id`)
) ENGINE=INNODB AUTO_INCREMENT=345963634385633281 DEFAULT CHARSET=utf8;

/*Data for the table `trade_user` */

INSERT  INTO `trade_user`(`user_id`,`user_name`,`user_password`,`user_mobile`,`user_score`,`user_reg_time`,`user_money`) VALUES (345963634385633280,'刘备','123L','18888888888L',100,'2019-07-09 13:37:03','900');

/*Table structure for table `trade_user_money_log` */

DROP TABLE IF EXISTS `trade_user_money_log`;

CREATE TABLE `trade_user_money_log` (
  `user_id` BIGINT(50) NOT NULL COMMENT '用户ID',
  `order_id` BIGINT(50) NOT NULL COMMENT '订单ID',
  `money_log_type` INT(1) NOT NULL COMMENT '日志类型 1订单付款 2 订单退款',
  `use_money` DECIMAL(10,2) DEFAULT NULL,
  `create_time` TIMESTAMP NULL DEFAULT NULL COMMENT '日志时间',
  PRIMARY KEY (`user_id`,`order_id`,`money_log_type`),
  KEY `FK_trade_user_money_log2` (`order_id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

3.2 项目初始化

shop系统基于maven进行项目管理。

3.2.1 工程浏览

共12个系统

  • 父工程:shop-parent
  • 订单系统:shop-order-web
  • 支付系统:shop-pay-web
  • 优惠券服务:shop-coupon-service
  • 订单服务:shop-order-service
  • 支付服务:shop-pay-service
  • 商品服务:shop-goods-service
  • 用户服务:shop-user-service
  • 实体类:shop-pojo
  • 持久层:shop-dao
  • 接口层:shop-api
  • 工具工程:shop-common

3.2.2 工程关系

在这里插入图片描述

3.3 Mybatis逆向工程使用

1)代码生成

使用Mybatis逆向工程针对数据表生成CURD持久层代码

2)代码导入

  • 将实体类导入到shop-pojo工程
  • 在服务层工程中导入对应的Mapper类和对应配置文件

3.4 公共类介绍

  • ID生成器

    IDWorker:Twitter雪花算法

  • 异常处理类

    CustomerException:自定义异常类

    CastException:异常抛出类

  • 常量类

    ShopCode:系统状态类

  • 响应实体类

    Result:封装响应状态和响应信息

4. 下单业务

在这里插入图片描述

4.1 下单基本流程

4.1.1 接口定义

shop-api中添加订单接口 IOrderService

package com.crysw.api;

import com.crysw.shop.entity.Result;
import com.crysw.shop.pojo.TradeOrder;

/**
 * 描述:订单接口
 * @author crysw
 * @date 2022/7/5 21:55
 * @version 1.0
 */
public interface IOrderService {
    /**
     * 确认订单
     * @param order
     * @return
     */
    Result confirmOrder(TradeOrder order);
}

shop-api中添加商品接口 IGoodsService

package com.crysw.api;

import com.crysw.shop.pojo.TradeGoods;

/**
 * 描述:商品接口
 * @author crysw
 * @date 2022/7/5 22:14
 * @version 1.0
 */
public interface IGoodsService {
    /**
     * 根据商品ID查询商品对象
     * @param goodsId
     * @return
     */
    TradeGoods findOne(Long goodsId);
   
   /**
     * 扣减库存
     * @param goodsNumberLog
     * @return
     */
    Result reduceGoodsNum(TradeGoodsNumberLog goodsNumberLog);
}

shop-api中添加用户接口 IUserService

package com.crysw.api;

import com.crysw.shop.pojo.TradeUser;

/**
 * 描述:用户接口
 * @author crysw
 * @date 2022/7/5 22:32
 * @version 1.0
 */
public interface IUserService {
    /**
     * 根据用户ID查询用户对象
     * @param userId
     * @return
     */
    TradeUser findOne(Long userId);
}

shop-api中添加用户接口 ICouponService

package com.crysw.api;

import com.crysw.shop.pojo.TradeCoupon;

/**
 * 描述:优惠券接口
 * @author crysw
 * @date 2022/7/11 21:24
 * @version 1.0
 */
public interface ICouponService {

    /**
     * 根据优惠券ID查询优惠券信息
     * @param couponId
     * @return
     */
    TradeCoupon findOne(Long couponId);
}

4.1.2 业务类实现

在模块shop-order-service中添加IOrderService接口的实现类, 进行订单的创建。

package com.crysw.shop.service.impl;

import com.crysw.api.IOrderService;
import com.crysw.shop.entity.Result;
import com.crysw.shop.pojo.TradeOrder;

/**
 * 描述:
 * @author crysw
 * @date 2022/7/5 21:58
 * @version 1.0
 */
@Component
// dubbo服务接口
@Service(interfaceClass = IOrderService.class)
@Slf4j
public class OrderServiceImpl implements IOrderService {

    @Reference
    private IGoodsService goodsService;
    @Reference
    private IUserService userService;
        @Override
     public Result confirmOrder(TradeOrder order) {
        //1.校验订单
        checkOrder(order);
        //2.生成预订单
        savePreOrder(order);
        try {
            //3.扣减库存
            reduceGoodsNum(order);
            //4.扣减优惠券
            updateCouponStatus(order);
            //5.扣减用户余额
            reduceMoneyPaid(order);
            //6.确认订单
            updateOrderStatus(order);
            //7.返回成功状态
            return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
        } catch (Exception e) {
            //1.确认订单失败,发送消息
            
            //2.返回失败状态
          return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());
        }
    }
}

4.1.3 校验订单

在这里插入图片描述

OrderServiceImpl#checkOrder

/**
 * 校验订单
 * @param order
 */
private void checkOrder(TradeOrder order) {
    //1.校验订单是否存在
    if (order == null) {
        CastException.cast(ShopCode.SHOP_ORDER_INVALID);
    }

    //2.校验订单中的商品是否存在
    TradeGoods goods = goodsService.findOne(order.getGoodsId());
    if (goods == null) {
        CastException.cast(ShopCode.SHOP_GOODS_NO_EXIST);
    }

    //3.校验下单用户是否存在
    TradeUser user = userService.findOne(order.getUserId());
    if (user == null) {
        CastException.cast(ShopCode.SHOP_USER_NO_EXIST);
    }

    //4.校验商品单价是否合法
    if (order.getGoodsPrice().compareTo(goods.getGoodsPrice()) != 0) {
        CastException.cast(ShopCode.SHOP_GOODS_PRICE_INVALID);
    }

    //5.校验订单商品数量是否合法
    if (order.getGoodsNumber() >= goods.getGoodsNumber()) {
        CastException.cast(ShopCode.SHOP_GOODS_NUM_NOT_ENOUGH);
    }

    log.info("校验订单通过");
}

4.1.4 生成预订单

在这里插入图片描述

OrderServiceImpl#savePreOrder

/**
 * 生成预订单
 * @param order
 */
private long savePreOrder(TradeOrder order) {
    /**1.设置订单状态为不可见**/
    order.setOrderStatus(ShopCode.SHOP_ORDER_NO_CONFIRM.getCode());
    /**2.设置订单ID**/
    long orderId = idWorker.nextId();
    order.setOrderId(orderId);
    //3.核算订单运费**/
    // 计算运费
    BigDecimal shippingFee = calculateShippingFee(order.getOrderAmount());
    // 校验运费是否正确
    if (order.getShippingFee().compareTo(shippingFee) != 0) {
        CastException.cast(ShopCode.SHOP_ORDER_SHIPPINGFEE_INVALID);
    }
    /**4. 核算订单总金额是否合法**/
    // 计算订单总金额=订单的商品价格*订单的商品数量
    BigDecimal orderAmounts = order.getGoodsPrice().multiply(new BigDecimal(order.getGoodsNumber()));
    // 加上运费
    orderAmounts.add(shippingFee);
    if (order.getOrderAmount().compareTo(orderAmounts) != 0) {
        CastException.cast(ShopCode.SHOP_ORDERAMOUNT_INVALID);
    }

    /**5.判断用户是否使用余额**/
    BigDecimal moneyPaid = order.getMoneyPaid();
    if (moneyPaid != null) {
        //5.1 订单中余额是否合法
        int r = moneyPaid.compareTo(BigDecimal.ZERO);
        // 余额小于0
        if (r == -1) {
            CastException.cast(ShopCode.SHOP_MONEY_PAID_INVALID);
        }
        //余额大于0
        if (r == 1) {
            // 查询用户信息
            TradeUser user = userService.findOne(order.getUserId());
            if (user == null) {
                CastException.cast(ShopCode.SHOP_USER_NO_EXIST);
            }
            //比较余额是否大于用户账户余额
            if (user.getUserMoney().compareTo(moneyPaid.longValue()) == -1) {
                CastException.cast(ShopCode.SHOP_MONEY_PAID_INVALID);
            }
            order.setMoneyPaid(moneyPaid);
        }

    } else {
        order.setMoneyPaid(BigDecimal.ZERO);
    }
    /**6.判断用户是否使用优惠券**/
    Long couponId = order.getCouponId();
    if (couponId != null) {
        // 查询优惠券信息
        TradeCoupon coupon = couponService.findOne(couponId);
        //6.1 判断优惠券是否存在
        if (coupon == null) {
            CastException.cast(ShopCode.SHOP_COUPON_NO_EXIST);
        }
        //6.2 判断优惠券是否已经被使用
        if (Objects.equals(ShopCode.SHOP_COUPON_ISUSED.getCode().toString(), coupon.getIsUsed().toString())) {
            CastException.cast(ShopCode.SHOP_COUPON_INVALIED);
        }
        order.setCouponPaid(coupon.getCouponPrice());
    } else {
        // 优惠券不存在
        order.setCouponPaid(BigDecimal.ZERO);
    }

    /**7.核算订单支付金额    订单总金额-已付金额-优惠券金额**/
    BigDecimal payAmount = order.getOrderAmount().subtract(order.getMoneyPaid()).subtract(order.getCouponPaid());
    order.setPayAmount(payAmount);

    /**8.设置下单时间**/
    order.setAddTime(new Date());
    /**9.保存订单到数据库**/
    orderMapper.insert(order);
    /**10.返回订单ID**/
    return orderId;
}

/**
 * 核算运费
 * @param orderAmount 订单金额
 * @return
 */
private BigDecimal calculateShippingFee(BigDecimal orderAmount) {
    // 如果订单价格大于100, 免运费
    if (orderAmount.compareTo(new BigDecimal(100)) == 1) {
        return BigDecimal.ZERO;
    } else {
        // 否则, 需要收取10元运费
        return new BigDecimal(10);
    }
}

4.1.5 扣减库存

通过dubbo调用商品服务完成扣减库存. OrderServiceImpl#reduceGoodsNum

/**
 * 扣减库存
 * @param order
 */
private void reduceGoodsNum(TradeOrder order) {
    TradeGoodsNumberLog goodsNumberLog = new TradeGoodsNumberLog();
    goodsNumberLog.setOrderId(order.getOrderId());
    goodsNumberLog.setGoodsId(order.getGoodsId());
    goodsNumberLog.setGoodsNumber(order.getGoodsNumber());
    Result result = goodsService.reduceGoodsNum(goodsNumberLog);
    if (result.getSuccess().equals(ShopCode.SHOP_FAIL.getSuccess())) {
        CastException.cast(ShopCode.SHOP_REDUCE_GOODS_NUM_FAIL);
    }
    log.info("订单: " + order.getOrderId() + "扣减库存成功");
}

在商品服务接口中新增扣减库存的方法

/**
 * 描述:商品接口
 * @author crysw
 * @date 2022/7/5 22:14
 * @version 1.0
 */
public interface IGoodsService {
    /**
     * 根据商品ID查询商品对象
     * @param goodsId
     * @return
     */
    TradeGoods findOne(Long goodsId);

    /**
     * 扣减库存
     * @param goodsNumberLog
     * @return
     */
    Result reduceGoodsNum(TradeGoodsNumberLog goodsNumberLog);
}

商品服务GoodsService扣减库存的实现. GoodsServiceImpl#reduceGoodsNum

@Override
public Result reduceGoodsNum(TradeGoodsNumberLog goodsNumberLog) {
    if (goodsNumberLog == null 
       || goodsNumberLog.getGoodsNumber() == null 
       || goodsNumberLog.getOrderId() == null 
       || goodsNumberLog.getGoodsNumber().intValue() <= 0) {
        CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
    }
    TradeGoods goods = goodsMapper.selectByPrimaryKey(goodsNumberLog.getGoodsId());
    if (goods.getGoodsNumber() < goodsNumberLog.getGoodsNumber()) {
        // 库存不足
        CastException.cast(ShopCode.SHOP_GOODS_NUM_NOT_ENOUGH);
    }
    //减库存
    goods.setGoodsNumber(goods.getGoodsNumber() - goodsNumberLog.getGoodsNumber());
    goodsMapper.updateByPrimaryKey(goods);
    //记录库存操作日志
    goodsNumberLog.setGoodsNumber(-goodsNumberLog.getGoodsNumber());
    goodsNumberLog.setLogTime(new Date());
    goodsNumberLogMapper.insert(goodsNumberLog);
    return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
}

4.1.6 扣减优惠券

通过dubbo完成扣减优惠券. OrderServiceImpl#updateCouponStatus

/**
 * 扣减优惠券
 * @param order
 */
private void updateCouponStatus(TradeOrder order) {
    if (order.getCouponId() != null) {
        TradeCoupon coupon = couponService.findOne(order.getCouponId());
        coupon.setOrderId(order.getOrderId());
        coupon.setIsUsed(ShopCode.SHOP_COUPON_ISUSED.getCode());
        coupon.setUsedTime(new Date());
        //更新优惠券状态
        Result result = couponService.updateCouponStatus(coupon);
        if (result.getSuccess().equals(ShopCode.SHOP_FAIL.getSuccess())) {
            CastException.cast(ShopCode.SHOP_COUPON_USE_FAIL);
        }
        log.info("订单:" + order.getOrderId() + ",使用优惠券");
    }
}

优惠券服务接口新增更新优惠券状态接口. ICouponService#updateCouponStatus

/**
 * 描述:优惠券接口
 * @author crysw
 * @date 2022/7/11 21:24
 * @version 1.0
 */
public interface ICouponService {
    /**
     * 根据优惠券ID查询优惠券信息
     * @param couponId
     * @return
     */
    TradeCoupon findOne(Long couponId);

    /**
     * 更新优惠券
     * @param coupon
     * @return
     */
    Result updateCouponStatus(TradeCoupon coupon);
}

优惠券服务CouponService更改优惠券状态 CouponServiceImpl#updateCouponStatus

@Override
public Result updateCouponStatus(TradeCoupon coupon) {
    if (coupon == null || coupon.getCouponId() == null) {
        CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
    }
    tradeCouponMapper.updateByPrimaryKey(coupon);
    return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
}

4.1.7 扣减用户余额

扣减用户余额的流程图
在这里插入图片描述
通过用户服务完成扣减余额 OrderServiceImpl#reduceMoneyPaid

/**
 * 扣减余额
 * @param order
 */
private void reduceMoneyPaid(TradeOrder order) {
    if (order.getMoneyPaid() != null && order.getMoneyPaid().compareTo(BigDecimal.ZERO) == 1) {
        TradeUserMoneyLog userMoneyLog = new TradeUserMoneyLog();
        userMoneyLog.setOrderId(order.getOrderId());
        userMoneyLog.setUserId(order.getUserId());
        userMoneyLog.setUseMoney(order.getMoneyPaid());
        userMoneyLog.setMoneyLogType(ShopCode.SHOP_USER_MONEY_PAID.getCode());
        Result result = userService.updateMoneyPaid(userMoneyLog);
        if (result.getSuccess().equals(ShopCode.SHOP_FAIL.getSuccess())) {
            CastException.cast(ShopCode.SHOP_USER_MONEY_REDUCE_FAIL);
        }
        log.info("订单:" + order.getOrderId() + ",扣减余额成功");
    }
}

用户服务接口新增更新余额的接口方法 IUserService#updateMoneyPaid

/**
 * 描述:用户接口
 * @author crysw
 * @date 2022/7/5 22:32
 * @version 1.0
 */
public interface IUserService {
    /**
     * 根据用户ID查询用户对象
     * @param userId
     * @return
     */
    TradeUser findOne(Long userId);

    /**
     * 扣减余额
     * @param userMoneyLog
     * @return
     */
    Result updateMoneyPaid(TradeUserMoneyLog userMoneyLog);
}

用户服务UserService,更新余额的实现 UserServiceImpl#updateMoneyPaid

@Override
public Result updateMoneyPaid(TradeUserMoneyLog userMoneyLog) {
    /**1.校验参数是否合法**/
    if (userMoneyLog == null ||
            userMoneyLog.getUserId() == null ||
            userMoneyLog.getOrderId() == null ||
            userMoneyLog.getUseMoney().compareTo(BigDecimal.ZERO) <= 0) {
        CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
    }
    /**2.查询订单余额使用日志**/
    TradeUserMoneyLogExample userMoneyLogExample = new TradeUserMoneyLogExample();
    TradeUserMoneyLogExample.Criteria criteria = userMoneyLogExample.createCriteria();
    criteria.andOrderIdEqualTo(userMoneyLog.getOrderId());
    criteria.andUserIdEqualTo(userMoneyLog.getUserId());
    int r = userMoneyLogMapper.countByExample(userMoneyLogExample);
    TradeUser tradeUser = tradeUserMapper.selectByPrimaryKey(userMoneyLog.getUserId());
    /**3.扣减余额...**/
    if (userMoneyLog.getMoneyLogType().intValue() == ShopCode.SHOP_USER_MONEY_PAID.getCode().intValue()) {
        if (r > 0) {
            // 已付款
            CastException.cast(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY);
        }

        // 扣减余额操作
        tradeUser.setUserMoney(new BigDecimal(tradeUser.getUserMoney()).subtract(userMoneyLog.getUseMoney()).longValue());
        tradeUserMapper.updateByPrimaryKey(tradeUser);
    }
    /**4.回退余额...**/
    if (userMoneyLog.getMoneyLogType().intValue() == ShopCode.SHOP_USER_MONEY_REFUND.getCode().intValue()) {
        if (r < 0) {
            // 没有付款记录, 不能退款
            CastException.cast(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY);
        }
        // 防止多次退款
        TradeUserMoneyLogExample userMoneyLogExample2 = new TradeUserMoneyLogExample();
        userMoneyLogExample2.createCriteria()
                .andOrderIdEqualTo(userMoneyLog.getOrderId())
                .andUserIdEqualTo(userMoneyLog.getUserId())
                .andMoneyLogTypeEqualTo(ShopCode.SHOP_USER_MONEY_REFUND.getCode());
        int r2 = userMoneyLogMapper.countByExample(userMoneyLogExample2);
        if (r2 > 0) {
            // 已经退款了
            CastException.cast(ShopCode.SHOP_USER_MONEY_REFUND_ALREADY);
        }
        // 没有退款, 进行退款操作
        tradeUser.setUserMoney(new BigDecimal(tradeUser.getUserMoney()).add(userMoneyLog.getUseMoney()).longValue());
        tradeUserMapper.updateByPrimaryKey(tradeUser);
    }

    /**5.记录订单余额使用日志**/
    userMoneyLog.setCreateTime(new Date());
    userMoneyLogMapper.insert(userMoneyLog);

    return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
}

4.1.8 确认订单

OrderServiceImpl#updateOrderStatus

/**
 * 确认订单
 * @param order
 */
private void updateOrderStatus(TradeOrder order) {
    order.setOrderStatus(ShopCode.SHOP_ORDER_CONFIRM.getCode());
    order.setPayStatus(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY.getCode());
    order.setConfirmTime(new Date());
    int r = orderMapper.updateByPrimaryKey(order);
    if (r <= 0) {
        // 确认订单失败
        CastException.cast(ShopCode.SHOP_ORDER_CONFIRM_FAIL);
    }
    log.info("订单:[" + order.getOrderId() + "]状态修改(确认)成功");
}

4.1.9 测试创建订单

编写测试api: OrderServiceTest#confirmOrder

package com.crysw.test;

import com.crysw.api.IOrderService;
import com.crysw.shop.OrderApplication;
import com.crysw.shop.pojo.TradeOrder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.io.IOException;
import java.math.BigDecimal;

/**
 * 描述:
 * @author crysw
 * @date 2022/7/18 22:27
 * @version 1.0
 */
@SpringBootTest(classes = {OrderApplication.class})
@RunWith(SpringRunner.class)
public class OrderServiceTest {

    @Autowired
    private IOrderService orderService;

    @Test
    public void confirmOrder() throws IOException {
        Long coupouId = 345988230098857984L;
        Long goodsId = 345959443973935104L;
        Long userId = 345963634385633280L;

        // 创建订单
        TradeOrder order = new TradeOrder();
        order.setGoodsId(goodsId);
        order.setUserId(userId);
        order.setCouponId(coupouId);
        order.setAddress("北京");
        order.setGoodsNumber(1);
        // 商品价格
        order.setGoodsPrice(new BigDecimal(1000));
        // 运费
        order.setShippingFee(BigDecimal.ZERO);
        // 订单金额 = 商品价格*数量 + 运费
        order.setOrderAmount(new BigDecimal(1000));
        // 已支付金额
        order.setMoneyPaid(new BigDecimal(100));
        orderService.confirmOrder(order);

        System.in.read();
    }
}

测试日志:

# 校验订单
2022-07-25 13:07:17.821  INFO 24040 --- [           main] c.c.shop.service.impl.OrderServiceImpl   : 校验订单通过
2022-07-25 13:07:18.143  INFO 24040 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
2022-07-25 13:07:18.147  WARN 24040 --- [           main] com.zaxxer.hikari.util.DriverDataSource  : Registered driver with driverClassName=com.mysql.jdbc.Driver was not found, trying direct instantiation.
2022-07-25 13:07:18.361  INFO 24040 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.
# 扣减库存
2022-07-25 13:07:18.567  INFO 24040 --- [           main] c.c.shop.service.impl.OrderServiceImpl   : 订单: 748931450161729536扣减库存成功
# 使用优惠券
2022-07-25 13:07:18.612  INFO 24040 --- [           main] c.c.shop.service.impl.OrderServiceImpl   : 订单:748931450161729536,使用优惠券
# 扣减余额
2022-07-25 13:07:18.942  INFO 24040 --- [           main] c.c.shop.service.impl.OrderServiceImpl   : 订单:748931450161729536,扣减余额成功
# 修改订单状态
2022-07-25 13:07:18.947  INFO 24040 --- [           main] c.c.shop.service.impl.OrderServiceImpl   : 订单:[748931450161729536]状态修改(确认)成功

查看库表记录:

trade_order订单表生成了一条订单, 还没有支付.

<?xml version="1.0" encoding="utf-8"?>
<data>
  <row>
    <order_id>748931450161729536</order_id>
    <user_id>345963634385633280</user_id>
    <order_status>1</order_status> <!--订单状态 0未确认 1已确认 2已取消 3无效 4退款-->
    <pay_status>0</pay_status> <!--支付状态 0未支付 1支付中 2已支付-->
    <shipping_status>(NULL)</shipping_status>
    <address>北京</address>
    <consignee>(NULL)</consignee>
    <goods_id>345959443973935104</goods_id>
    <goods_number>1</goods_number> <!--商品数量-->
    <goods_price>1000.00</goods_price> <!--商品单价1000.00-->
    <goods_amount>(NULL)</goods_amount>
    <shipping_fee>0.00</shipping_fee>
    <order_amount>1000.00</order_amount> <!--订单金额1000.00-->
    <coupon_id>345988230098857984</coupon_id>
    <coupon_paid>20.00</coupon_paid> <!--使用了20.00的优惠券-->
    <money_paid>100.00</money_paid> <!--已支付金额 100.00-->
    <pay_amount>880.00</pay_amount> <!--待支付订单金额 880.00-->
    <add_time>2022-07-25 13:07:18</add_time>
    <confirm_time>2022-07-25 13:07:19</confirm_time>
    <pay_time>(NULL)</pay_time>
  </row>
</data>

trade_goods_number_log 订单商品日志表, 新增一条订单日志记录.

<?xml version="1.0" encoding="utf-8"?>
<data>
  <row>
    <goods_id>345959443973935104</goods_id>
    <order_id>748931450161729536</order_id>
    <goods_number>-1</goods_number>
    <log_time>2022-07-25 13:07:19</log_time>
  </row>
</data>

trade_goods 商品表, 库存减少.

<?xml version="1.0" encoding="utf-8"?>
<data>
  <row>
    <goods_id>345959443973935104</goods_id>
    <goods_name>JavaSE课程</goods_name>
    <goods_number>991</goods_number>
    <goods_price>1000.00</goods_price>
    <goods_desc>传智播客出品Java视频课程</goods_desc>
    <add_time>2019-07-09 20:38:00</add_time>
  </row>
  <row>
    <goods_id>345959443973935105</goods_id>
    <goods_name>华为P30</goods_name>
    <goods_number>999</goods_number>
    <goods_price>5000.00</goods_price>
    <goods_desc>夜间拍照更美</goods_desc>
    <add_time>2019-07-09 20:38:00</add_time>
  </row>
</data>

trade_coupon 优惠券表的优惠券已使用.

<?xml version="1.0" encoding="utf-8"?>
<data>
  <row>
    <coupon_id>345988230098857984</coupon_id>
    <coupon_price>20.00</coupon_price>
    <user_id>345963634385633280</user_id>
    <order_id>748931450161729536</order_id>
    <is_used>1</is_used>  <!--是否使用 0未使用 1已使用-->
    <used_time>2022-07-25 13:07:19</used_time> <!--优惠券使用时间-->
  </row>
</data>

trade_user用户表的用户余额扣减了100, 剩余900.

<?xml version="1.0" encoding="utf-8"?>
<data>
  <row>
    <user_id>345963634385633280</user_id>
    <user_name>刘备</user_name>
    <user_password>123L</user_password>
    <user_mobile>18888888888L</user_mobile>
    <user_score>100</user_score>
    <user_reg_time>2019-07-09 13:37:03</user_reg_time>
    <user_money>900</user_money>
  </row>
</data>

trade_user_money_log 用户余额日志表, 新增一条扣减用户余额的日志记录.

<?xml version="1.0" encoding="utf-8"?>
<data>
  <row>
    <user_id>345963634385633280</user_id>
    <order_id>748931450161729536</order_id>
    <money_log_type>1</money_log_type> <!--日志类型 1订单付款 2 订单退款-->
    <use_money>100.00</use_money> <!-- 操作金额-->
    <create_time>2022-07-25 13:07:19</create_time>
  </row>
</data>

4.2 失败补偿机制

使用MQ异步解耦的方式推送下单失败的消息, MQ集群搭建参考博客RocketMQ专题01

4.2.1 消息发送方

  1. 配置RocketMQ属性值
# RocketMQ
rocketmq.name-server=192.168.65.129:9876;192.168.65.130:9876
# 创建订单失败的生者者组
rocketmq.producer.group=orderProducerGroup
# 创建订单失败的消费者组,topic,tag
mq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic
mq.order.tag.cancel=order_cancel
  1. 注入模板类和属性值信息 (com.crysw.shop.service.impl.OrderServiceImpl)
@Autowired
 private RocketMQTemplate rocketMQTemplate;

 @Value("${mq.order.topic}")
 private String topic;

 @Value("${mq.order.tag.cancel}")
 private String cancelTag;
  1. 发送下单失败消息
@Override
public Result confirmOrder(TradeOrder order) {
    //1.校验订单
    checkOrder(order);
    //2.生成预订单
    savePreOrder(order);
    try {
        //3.扣减库存
        reduceGoodsNum(order);
        //4.扣减优惠券
        updateCouponStatus(order);
        //5.扣减用户余额
        reduceMoneyPaid(order);
        // 模拟异常
        CastException.cast(ShopCode.SHOP_FAIL);
        //6.确认订单
        updateOrderStatus(order);
        //7.返回成功状态
        return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
    } catch (Exception e) {
        //1.确认订单失败,发送消息
        MQEntity mqEntity = new MQEntity();
        mqEntity.setOrderId(order.getOrderId());
        mqEntity.setUserId(order.getUserId());
        mqEntity.setGoodsId(order.getGoodsId());
        mqEntity.setCouponId(order.getCouponId());
        mqEntity.setGoodsNum(order.getGoodsNumber());
        mqEntity.setUserMoney(order.getMoneyPaid());
        try {
            sendCancelOrder(topic, tag, order.getOrderId().toString(), JSON.toJSONString(mqEntity));
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        //2.返回失败状态
        return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());
    }
}

/**
 * 发送订单确认失败消息
 * @param topic
 * @param tag
 * @param keys
 * @param body
 */
private void sendCancelOrder(String topic, String tag, String keys, String body) throws Exception {
    Message message = new Message(topic, tag, keys, body.getBytes());
    rocketMQTemplate.getProducer().send(message);
}
  1. 测试下单失败, 发送消息.

将优惠券345988230098857984的状态修改为0-未使用, 在上面的下单流程中的扣减余额后面加入模拟异常抛出的代码 ,重新执行单元测试案例.

//5.扣减用户余额
reduceMoneyPaid(order);
// 模拟异常
CastException.cast(ShopCode.SHOP_FAIL);
//6.确认订单
updateOrderStatus(order);

创建订单的测试日志

# 校验订单
2022-07-25 16:35:16.767  INFO 23960 --- [           main] c.c.shop.service.impl.OrderServiceImpl   : 校验订单通过
2022-07-25 16:35:16.956  INFO 23960 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
2022-07-25 16:35:16.969  WARN 23960 --- [           main] com.zaxxer.hikari.util.DriverDataSource  : Registered driver with driverClassName=com.mysql.jdbc.Driver was not found, trying direct instantiation.
2022-07-25 16:35:17.249  INFO 23960 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.
# 减库存
2022-07-25 16:35:17.340  INFO 23960 --- [           main] c.c.shop.service.impl.OrderServiceImpl   : 订单: 748983790659047424扣减库存成功
# 使用优惠券
2022-07-25 16:35:17.369  INFO 23960 --- [           main] c.c.shop.service.impl.OrderServiceImpl   : 订单:748983790659047424,使用优惠券
# 扣减余额
2022-07-25 16:35:17.423  INFO 23960 --- [           main] c.c.shop.service.impl.OrderServiceImpl   : 订单:748983790659047424,扣减余额成功
# 下单失败,抛出异常
2022-07-25 16:35:17.443 ERROR 23960 --- [           main] com.crysw.exception.CastException        : ShopCode{success=false, code=0, message='错误'}

程序捕获到异常后, 下单失败会推送mq消息.
在这里插入图片描述

4.2.2 消费接收方

0) 分析

订单表, 订单商品日志表, 优惠券表, 用户表, 用户余额日志表都已经插入记录或更新记录成功, 创建订单却是失败的, 正常来说是需要回滚之前的记录.

这里可以通过MQ消费者消费上面推送的订单失败的消息进行数据回滚, 与主流程解耦, 提高处理性能.

配置RocketMQ属性值

# RocketMQ
rocketmq.name-server=192.168.65.129:9876;192.168.65.130:9876
mq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic

创建监听类,消费消息

@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}", consumerGroup = "${mq.order.consumer.group.name}", 
                         messageModel = MessageModel.BROADCASTING)
public class CancelOrderConsumer implements RocketMQListener<MessageExt>{
    @Override
    public void onMessage(MessageExt messageExt) {
        // 消息处理
    }
}
1) 回退库存

创建订单失败后, 通过MQ推送下单失败的消息, 商品服务[shop-goods-service]订阅下单失败的主题, 监听到下单失败的消息时, 进行商品库存的回退及消息日志记录.

流程分析
在这里插入图片描述

消息消费者的回退库存操作

package com.crysw.shop.mq;

import com.alibaba.fastjson.JSON;
import com.crysw.shop.entity.MQEntity;
import com.crysw.shop.mapper.TradeGoodsMapper;
import com.crysw.shop.mapper.TradeGoodsNumberLogMapper;
import com.crysw.shop.mapper.TradeMqConsumerLogMapper;
import com.crysw.shop.pojo.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.util.Date;

import static com.crysw.constant.ShopCode.*;

/**
 * 描述:创建订单失败的消息消费者-回退库存
 * @author crysw
 * @date 2022/7/25 19:13
 * @version 1.0
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}", consumerGroup = "${mq.order.consumer.group.name}", messageModel = MessageModel.BROADCASTING)
public class CancelGoodsConsumer implements RocketMQListener<MessageExt> {

    @Value("${mq.order.consumer.group.name}")
    private String groupName;

    @Autowired
    private TradeMqConsumerLogMapper mqConsumerLogMapper;

    @Autowired
    private TradeGoodsMapper goodsMapper;

    @Autowired
    private TradeGoodsNumberLogMapper goodsNumberLogMapper;

    @Override
    public void onMessage(MessageExt messageExt) {

        String msgId = null;
        String tags = null;
        String keys = null;
        String body = null;

        try {
            // 1. 解析消息内容
            msgId = messageExt.getMsgId();
            tags = messageExt.getTags();
            keys = messageExt.getKeys();
            body = new String(messageExt.getBody(), "utf-8");
            log.info("接受消息成功");
            // 2. 查询消息消费记录
            TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey();
            primaryKey.setMsgKey(keys);
            primaryKey.setMsgTag(tags);
            primaryKey.setGroupName(groupName);
            TradeMqConsumerLog mqConsumerLog = mqConsumerLogMapper.selectByPrimaryKey(primaryKey);
            if (mqConsumerLog != null) {
                // 3. 判断, 如果消费过, 获取消息处理状态  0:正在处理;1:处理成功;2:处理失败
                Integer status = mqConsumerLog.getConsumerStatus();
                // 处理过...返回
                if (SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode().intValue() == status.intValue()) {
                    log.info("消息:" + msgId + ",已经处理过");
                    return;
                }
                //正在处理...返回
                if (SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode().intValue() == status.intValue()) {
                    log.info("消息:" + msgId + ",正在处理");
                    return;
                }

                //处理失败, 判断消息处理次数是否超过重试次数 3次
                if (SHOP_MQ_MESSAGE_STATUS_FAIL.getCode().intValue() == status.intValue()) {
                    // 获取消息已处理的次数
                    Integer times = mqConsumerLog.getConsumerTimes();
                    if (times.intValue() > 3) {
                        log.info("消息:" + msgId + ",消息处理超过3次,不能再进行处理了");
                        return;
                    }
                    // 将消息状态修改为正在处理中
                    mqConsumerLog.setConsumerStatus(SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode());
                    //使用数据库乐观锁更新
                    TradeMqConsumerLogExample example = new TradeMqConsumerLogExample();
                    TradeMqConsumerLogExample.Criteria criteria = example.createCriteria();
                    criteria.andMsgTagEqualTo(mqConsumerLog.getMsgTag())
                            .andMsgKeyEqualTo(mqConsumerLog.getMsgKey())
                            .andGroupNameEqualTo(mqConsumerLog.getGroupName())
                            .andConsumerTimesEqualTo(mqConsumerLog.getConsumerTimes());
                    int r = mqConsumerLogMapper.updateByExampleSelective(mqConsumerLog, example);
                    if (r <= 0) {
                        //未修改成功,其他线程并发修改
                        log.info("并发修改,稍后处理");
                    }
                }
            } else {
                // 4. 判断, 如果没有消费过
                mqConsumerLog = new TradeMqConsumerLog();
                mqConsumerLog.setMsgId(msgId);
                mqConsumerLog.setMsgTag(tags);
                mqConsumerLog.setMsgKey(keys);
                mqConsumerLog.setMsgBody(body);
                mqConsumerLog.setGroupName(groupName);
                mqConsumerLog.setConsumerStatus(SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode());
                mqConsumerLog.setConsumerTimes(0);
                //将消息处理信息添加到数据库
                mqConsumerLogMapper.insert(mqConsumerLog);
            }

            // 5. 回退库存
            MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);
            Long goodsId = mqEntity.getGoodsId();
            TradeGoods goods = goodsMapper.selectByPrimaryKey(goodsId);
            goods.setGoodsNumber(goods.getGoodsNumber() + mqEntity.getGoodsNum());
            goodsMapper.updateByPrimaryKey(goods);

            // 记录库存操作日志
            TradeGoodsNumberLog goodsNumberLog = new TradeGoodsNumberLog();
            goodsNumberLog.setOrderId(mqEntity.getOrderId());
            goodsNumberLog.setGoodsId(goodsId);
            goodsNumberLog.setGoodsNumber(mqEntity.getGoodsNum());
            goodsNumberLog.setLogTime(new Date());
            goodsNumberLogMapper.insert(goodsNumberLog);

            // 6. 将消息的处理状态改为处理成功
            mqConsumerLog.setConsumerStatus(SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode());
            mqConsumerLog.setConsumerTimestamp(new Date());
            mqConsumerLogMapper.updateByPrimaryKey(mqConsumerLog);
            log.info("回退库存成功");

        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            TradeMqConsumerLogKey primarykey = new TradeMqConsumerLogKey();
            primarykey.setMsgTag(tags);
            primarykey.setMsgKey(keys);
            primarykey.setGroupName(groupName);
            TradeMqConsumerLog mqConsumerLog = mqConsumerLogMapper.selectByPrimaryKey(primarykey);
            if (mqConsumerLog == null) {
                //数据库没有有记录, 消息没有消费过
                mqConsumerLog.setMsgId(msgId);
                mqConsumerLog.setMsgKey(keys);
                mqConsumerLog.setMsgTag(tags);
                mqConsumerLog.setMsgBody(body);
                mqConsumerLog.setGroupName(groupName);
                mqConsumerLog.setConsumerStatus(SHOP_MQ_MESSAGE_STATUS_FAIL.getCode());
                mqConsumerLog.setConsumerTimes(1);
                mqConsumerLogMapper.insert(mqConsumerLog);
            } else {
                // 消费失败, 消费次数+1, 可以继续重试
                mqConsumerLog.setConsumerTimes(mqConsumerLog.getConsumerTimes() + 1);
                mqConsumerLogMapper.updateByPrimaryKeySelective(mqConsumerLog);
            }
        }
    }
}
2) 回退优惠券

创建订单失败后, 通过MQ推送下单失败的消息, 优惠券服务[shop-coupon-service]订阅下单失败的主题, 监听到下单失败的消息时, 进行优惠券的回退, 更新优惠券的状态等信息.

package com.crysw.shop.mq;

import com.alibaba.fastjson.JSON;
import com.crysw.constant.ShopCode;
import com.crysw.shop.entity.MQEntity;
import com.crysw.shop.mapper.TradeCouponMapper;
import com.crysw.shop.pojo.TradeCoupon;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;

/**
 * 描述:创建订单失败的消息消费者-回退优惠券
 * @author crysw
 * @date 2022/7/27 20:55
 * @version 1.0
 */
@Component
@Slf4j
@RocketMQMessageListener(topic = "${mq.order.topic}", consumerGroup = "${mq.order.consumer.group.name}", messageModel = MessageModel.BROADCASTING)
public class CancelCouponConsumer implements RocketMQListener<MessageExt> {

    @Autowired
    private TradeCouponMapper couponMapper;

    @Override
    public void onMessage(MessageExt messageExt) {

        try {
            // 1. 解析消息内容
            String body = new String(messageExt.getBody(), "utf-8");
            MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);
            if (mqEntity.getCouponId() != null) {
                // 2. 查询优惠券信息
                TradeCoupon coupon = couponMapper.selectByPrimaryKey(mqEntity.getCouponId());
                // 3. 更改优惠券状态
                coupon.setUsedTime(null);
                coupon.setIsUsed(ShopCode.SHOP_COUPON_UNUSED.getCode());
                coupon.setOrderId(null);
                couponMapper.updateByPrimaryKey(coupon);
                log.info("回退优惠券成功");
            }
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            log.error("回退优惠券失败");
        }
    }
}
3) 回退余额

创建订单失败后, 通过MQ推送下单失败的消息, 用户服务[shop-user-service]订阅下单失败的主题, 监听到下单失败的消息时, 进行用户余额的回退.

package com.crysw.shop.mq;

import com.alibaba.fastjson.JSON;
import com.crysw.api.IUserService;
import com.crysw.constant.ShopCode;
import com.crysw.shop.entity.MQEntity;
import com.crysw.shop.pojo.TradeUserMoneyLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;

/**
 * 描述:创建订单失败的消息消费者-回退用户余额
 * @author crysw
 * @date 2022/7/27 21:53
 * @version 1.0
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}", consumerGroup = "${mq.order.consumer.group.name}", messageModel = MessageModel.BROADCASTING)
public class CancelUserMoneyConsumer implements RocketMQListener<MessageExt> {

    @Autowired
    private IUserService userService;

    @Override
    public void onMessage(MessageExt messageExt) {
        // 1. 解析消息
        try {
            String body = new String(messageExt.getBody(), "utf-8");
            MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);
            log.info("接收到消息");
            if (mqEntity.getUserMoney() != null && mqEntity.getUserMoney().compareTo(BigDecimal.ZERO) > 0) {
                // 2. 调用业务层, 进行余额修改
                TradeUserMoneyLog userMoneyLog = new TradeUserMoneyLog();
                userMoneyLog.setUseMoney(mqEntity.getUserMoney());
                userMoneyLog.setMoneyLogType(ShopCode.SHOP_USER_MONEY_REFUND.getCode());
                userMoneyLog.setUserId(mqEntity.getUserId());
                userMoneyLog.setOrderId(mqEntity.getOrderId());
                userService.updateMoneyPaid(userMoneyLog);
                log.info("余额回退成功");
            }
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            log.error("余额回退失败");
        }
    }
}

userService.updateMoneyPaid→

4) 取消订单

创建订单失败后, 通过MQ推送下单失败的消息, 订单服务[shop-order-service]订阅下单失败的主题, 监听到下单失败的消息时, 取消订单.

package com.crysw.shop.mq;

import com.alibaba.fastjson.JSON;
import com.crysw.constant.ShopCode;
import com.crysw.shop.entity.MQEntity;
import com.crysw.shop.mapper.TradeOrderMapper;
import com.crysw.shop.pojo.TradeOrder;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;

/**
 * 描述:创建订单失败的消息消费者-取消订单
 * @author crysw
 * @date 2022/7/25 19:13
 * @version 1.0
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}", consumerGroup = "${mq.order.consumer.group.name}", messageModel = MessageModel.BROADCASTING)
public class CancelOrderConsumer implements RocketMQListener<MessageExt> {
    @Autowired
    private TradeOrderMapper orderMapper;

    @Override
    public void onMessage(MessageExt messageExt) {
        try {
            // 1. 解析消息内容
            String body = new String(messageExt.getBody(), "utf-8");
            MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);
            log.info("接受消息成功");
            // 2. 查询订单
            if (mqEntity.getOrderId() != null) {
                TradeOrder order = orderMapper.selectByPrimaryKey(mqEntity.getOrderId());
                //3.更新订单状态为取消
                order.setOrderStatus(ShopCode.SHOP_ORDER_CANCEL.getCode());
                orderMapper.updateByPrimaryKey(order);
                log.info("订单状态设置为取消");
            }

        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            log.error("取消订单失败");
        }
    }
}

4.3 测试

4.3.1 准备测试环境

编写测试类.

package com.crysw.test;

import com.crysw.api.IOrderService;
import com.crysw.shop.OrderApplication;
import com.crysw.shop.pojo.TradeOrder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.io.IOException;
import java.math.BigDecimal;

/**
 * 描述:
 * @author crysw
 * @date 2022/7/18 22:27
 * @version 1.0
 */
@SpringBootTest(classes = {OrderApplication.class})
@RunWith(SpringRunner.class)
public class OrderServiceTest {

    @Autowired
    private IOrderService orderService;

    @Test
    public void confirmOrder() throws IOException {
        Long coupouId = 345988230098857984L;
        Long goodsId = 345959443973935104L;
        Long userId = 345963634385633280L;

        // 创建订单
        TradeOrder order = new TradeOrder();
        order.setGoodsId(goodsId);
        order.setUserId(userId);
        order.setCouponId(coupouId);
        order.setAddress("北京");
        order.setGoodsNumber(1);
        // 商品价格
        order.setGoodsPrice(new BigDecimal(1000));
        // 运费
        order.setShippingFee(BigDecimal.ZERO);
        // 订单金额 = 商品价格*数量 + 运费
        order.setOrderAmount(new BigDecimal(1000));
        // 已支付金额
        order.setMoneyPaid(new BigDecimal(100));
        orderService.confirmOrder(order);
            // 订单创建出现异常后会中断程序,导致无法正常取消订单; 这里等待录入可以让当前订单服务不中断,继续启用状态
        System.in.read();
    }
}

4.3.2 准备测试数据

先还原优惠券状态, 用户余额, 商品库存等数据. 然后启动用户服务, 商品服务, 优惠券服务, 执行上面的单元测试案例进行下单操作(下单操作已模拟异常), 执行完毕后查看用户的余额、优惠券数据是否发生更改,订单的状态是否为取消。

4.3.3 查看测试结果

trade_order: 订单表新增一条订单, 因为下单失败, 订单状态已更新为”取消”.

<data>
  <row>
    <order_id>750137168931983360</order_id>
    <user_id>345963634385633280</user_id>
    <order_status>2</order_status> <!--订单状态 0未确认 1已确认 2已取消 3无效 4退款-->
    <pay_status>(NULL)</pay_status>
    <shipping_status>(NULL)</shipping_status>
    <address>北京</address>
    <consignee>(NULL)</consignee>
    <goods_id>345959443973935104</goods_id>
    <goods_number>1</goods_number> 
    <goods_price>1000.00</goods_price>
    <goods_amount>(NULL)</goods_amount>
    <shipping_fee>0.00</shipping_fee>
    <order_amount>1000.00</order_amount>
    <coupon_id>345988230098857984</coupon_id>
    <coupon_paid>20.00</coupon_paid>
    <money_paid>100.00</money_paid>
    <pay_amount>880.00</pay_amount>
    <add_time>2022-07-28 20:58:24</add_time>
    <confirm_time>(NULL)</confirm_time>
    <pay_time>(NULL)</pay_time>
  </row>
</data>

trade_mq_consumer_log: 下单失败, 向mq推送了一条下单失败的消息, 消费者消费消息后进行数据回退.

<?xml version="1.0" encoding="utf-8"?>
<data>
  <row>
    <msg_id>C0A8000472F818B4AAC28F8BD1D70000</msg_id>
    <group_name>order_orderTopic_cancel_group</group_name>
    <msg_tag>order_cancel</msg_tag>
    <msg_key>750137168931983360</msg_key>
    <msg_body>{"couponId":345988230098857984,"goodsId":345959443973935104,"goodsNum":1,"orderId":750137168931983360,"userId":345963634385633280,"userMoney":100}</msg_body>
    <consumer_status>1</consumer_status> <!--消费状态 0:正在处理;1:处理成功;2:处理失败-->
    <consumer_times>0</consumer_times>
    <consumer_timestamp>(NULL)</consumer_timestamp>
    <remark>(NULL)</remark>
  </row>
</data>

trade_coupon: 优惠券状态还是未使用状态.

<?xml version="1.0" encoding="utf-8"?>
<data>
  <row>
    <coupon_id>345988230098857984</coupon_id>
    <coupon_price>20.00</coupon_price>
    <user_id>345963634385633280</user_id>
    <order_id>(NULL)</order_id>
    <is_used>0</is_used>  <!--是否使用 0未使用 1已使用-->
    <used_time>(NULL)</used_time>
  </row>
</data>

trade_user: 用户余额也没有回退成功, 没有减少.

<?xml version="1.0" encoding="utf-8"?>
<data>
  <row>
    <user_id>345963634385633280</user_id>
    <user_name>刘备</user_name>
    <user_password>123L</user_password>
    <user_mobile>18888888888L</user_mobile>
    <user_score>100</user_score>
    <user_reg_time>2019-07-09 13:37:03</user_reg_time>
    <user_money>1000</user_money>
  </row>
</data>

trade_user_money_log: 用户余额日志表新增一条退款记录.

<?xml version="1.0" encoding="utf-8"?>
<data>
  <row>
    <user_id>345963634385633280</user_id>
    <order_id>750137168931983360</order_id>
    <money_log_type>1</money_log_type>
    <use_money>100.00</use_money>
    <create_time>2022-07-28 20:58:25</create_time>
  </row>
  <row>
    <user_id>345963634385633280</user_id>
    <order_id>750137168931983360</order_id>
    <money_log_type>2</money_log_type> <!--日志类型 1订单付款 2 订单退款-->
    <use_money>100.00</use_money>
    <create_time>2022-07-28 20:58:27</create_time>
  </row>
</data>

trade_goods: 商品库存已恢复, 没有减少.

<?xml version="1.0" encoding="utf-8"?>
<data>
  <row>
    <goods_id>345959443973935104</goods_id>
    <goods_name>JavaSE课程</goods_name>
    <goods_number>1000</goods_number>
    <goods_price>1000.00</goods_price>
    <goods_desc>传智播客出品Java视频课程</goods_desc>
    <add_time>2019-07-09 20:38:00</add_time>
  </row>
  <row>
    <goods_id>345959443973935105</goods_id>
    <goods_name>华为P30</goods_name>
    <goods_number>1000</goods_number> <!--商品库存还是1000-->
    <goods_price>5000.00</goods_price>
    <goods_desc>夜间拍照更美</goods_desc>
    <add_time>2019-07-09 20:38:00</add_time>
  </row>
</data>

5. 支付业务

5.1 创建支付订单

支付业务流程图
在这里插入图片描述

代码实现

支付服务接口新增创建支付信息方法 com.crysw.api.IPayService#createPayment

package com.crysw.api;

import com.crysw.shop.entity.Result;
import com.crysw.shop.pojo.TradePay;

/**
 * 描述:支付服务接口
 * @author crysw
 * @date 2022/7/28 21:56
 * @version 1.0
 */
public interface IPayService {

    /**
     * 创建支付信息
     * @param tradePay 订单支付信息
     */
    Result createPayment(TradePay tradePay);
}

创建支付信息的实现

@Override
public Result createPayment(TradePay tradePay) {
   if (tradePay == null || tradePay.getOrderId() == null) {
      CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
   }
   // 查询订单支付状态
   TradePayExample payExample = new TradePayExample();
   TradePayExample.Criteria criteria = payExample.createCriteria();
   criteria.andOrderIdEqualTo(tradePay.getOrderId())
      .andIsPaidEqualTo(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
   int count = tradePayMapper.countByExample(payExample);
   if (count > 0) {
      CastException.cast(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY);
   }

   // 设置订单的未支付信息
   tradePay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY.getCode());
   tradePay.setPayId(idWorker.nextId());
   // 保存支付订单
   tradePayMapper.insert(tradePay);

   return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
}

5.2 支付回调

5.2.1 流程分析

在这里插入图片描述

5.2.2 代码实现

支付服务接口新增支付回调方法 com.crysw.api.IPayService#callbackPayment

package com.crysw.api;

import com.crysw.shop.entity.Result;
import com.crysw.shop.pojo.TradePay;

/**
 * 描述:支付服务接口
 * @author crysw
 * @date 2022/7/28 21:56
 * @version 1.0
 */
public interface IPayService {

    // ..... 省略部分代码

    /**
     * 支付接口的回调
     * @param tradePay
     * @return
     */
    Result callbackPayment(TradePay tradePay);
}

支付回调的实现 com.crysw.shop.service.impl.PayServiceImpl#callbackPayment

@Override
public Result callbackPayment(TradePay tradePay) {
    log.info(">>>支付回调");
    if (tradePay == null || tradePay.getOrderId() == null) {
        CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
    }
    //1. 判断用户支付状态
    if (tradePay.getIsPaid().intValue() == ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode().intValue()) {
        //2. 更新支付订单状态为已支付
        Long payId = tradePay.getPayId();
        TradePay pay = tradePayMapper.selectByPrimaryKey(payId);
        //判断支付订单是否存在
        if (pay == null) {
            CastException.cast(ShopCode.SHOP_PAYMENT_NOT_FOUND);
        }
        pay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
        int r = tradePayMapper.updateByPrimaryKeySelective(pay);
        if (r == 1) {
            //3. 创建支付成功的消息
            TradeMqProducerTemp tradeMqProducerTemp = new TradeMqProducerTemp();
            tradeMqProducerTemp.setId(String.valueOf(idWorker.nextId()));
            tradeMqProducerTemp.setGroupName(groupName);
            tradeMqProducerTemp.setMsgTopic(topic);
            tradeMqProducerTemp.setMsgTag(tag);
            tradeMqProducerTemp.setMsgKey(String.valueOf(tradePay.getPayId()));
            tradeMqProducerTemp.setMsgBody(JSON.toJSONString(tradePay));
            tradeMqProducerTemp.setCreateTime(new Date());
            //4. 将消息持久化数据库
            mqProducerTempMapper.insert(tradeMqProducerTemp);
            log.info(">>>将支付成功消息持久化到数据库");

            //在线程池中进行处理(异步处理)
            threadPoolTaskExecutor.submit(() -> {
                //5. 发送消息到MQ
                SendResult result = null;
                try {
                    result = sendMessage(topic, tag, String.valueOf(tradePay.getPayId()), JSON.toJSONString(tradePay));
                } catch (Exception e) {
                    e.printStackTrace();
                }

                if (result.getSendStatus().equals(SendStatus.SEND_OK)) {
                    log.info(">>>消息发送成功");
                    //6. 等待发送结果,如果MQ接受到消息,删除发送成功的消息
                    mqProducerTempMapper.deleteByPrimaryKey(tradeMqProducerTemp.getId());
                    log.info(">>>持久化到数据库的消息删除");
                }
            });
        }
        return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
    } else {
        // 状态为未支付
        CastException.cast(ShopCode.SHOP_PAYMENT_PAY_ERROR);
        return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());
    }
}

/**
 * 发送支付成功消息
 * @param topic
 * @param tag
 * @param key
 * @param body
 * @return
 */
private SendResult sendMessage(String topic, String tag, String key, String body) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
    if (StringUtils.isEmpty(topic)) {
        CastException.cast(ShopCode.SHOP_MQ_TOPIC_IS_EMPTY);
    }
    if (StringUtils.isEmpty(body)) {
        CastException.cast(ShopCode.SHOP_MQ_MESSAGE_BODY_IS_EMPTY);
    }

    Message message = new Message(topic, tag, key, body.getBytes());
    return rocketMQTemplate.getProducer().send(message);
}

上面推送MQ消息, 以及等待响应后删除支付成功的数据信息是比较耗时的操作, 如果在高并发场景下影响性能,所以使用了线程池进行异步处理。需要在主启动类或配置类上增加线程池的实例对象。

@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(4);
    executor.setMaxPoolSize(8);
    executor.setQueueCapacity(100);
    executor.setKeepAliveSeconds(60);
    executor.setThreadNamePrefix("Pool-A");
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.initialize();
    return executor;
}

5.2.3 处理消息

支付成功后,支付服务payService发送MQ消息,订单服务、用户服务、日志服务需要订阅消息进行后续处理。比如:

  • 订单服务修改订单状态为已支付

  • 日志服务记录支付日志

  • 用户服务负责给用户增加积分

在这里插入图片描述

以下用订单服务为例说明消息的处理情况:

1)配置RocketMQ属性值

mq.pay.topic=payTopic
mq.pay.consumer.group.name=pay_payTopic_group

2)消费消息

在订单服务中新增支付成功的消息监听, 收到MQ发送的支付成功消息后进行订单支付状态的修改操作.

package com.crysw.shop.mq;

import com.alibaba.fastjson.JSON;
import com.crysw.constant.ShopCode;
import com.crysw.shop.entity.MQEntity;
import com.crysw.shop.mapper.TradeOrderMapper;
import com.crysw.shop.pojo.TradeOrder;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;

/**
 * 描述:MQ消息-支付成功的监听消费
 * @author crysw
 * @date 2022/8/11 22:45
 * @version 1.0
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.pay.topic}", consumerGroup = "${mq.pay.consumer.group.name}", messageModel = MessageModel.BROADCASTING)
public class PaymentListener implements RocketMQListener<MessageExt> {

    @Autowired
    private TradeOrderMapper orderMapper;

    @Override
    public void onMessage(MessageExt messageExt) {
        log.info(">>>接收到支付成功消息");
        try {
            //1. 解析消息内容
            String body = new String(messageExt.getBody(), "utf-8");
            MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);
            //2. 查询订单
            TradeOrder order = orderMapper.selectByPrimaryKey(mqEntity.getOrderId());
            //3.更新订单状态为已支付
             order.setPayStatus(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
            orderMapper.updateByPrimaryKey(order);
            log.info(">>>订单状态设置为已支付");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            log.info(">>>订单状态修改失败");
        }
    }
}

5.3 单元测试

编写支付服务接口的单元测试

package com.crysw.test;

import com.crysw.api.IPayService;
import com.crysw.constant.ShopCode;
import com.crysw.shop.PayApplication;
import com.crysw.shop.pojo.TradePay;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.io.IOException;
import java.math.BigDecimal;

/**
 * 描述:支付服务测试
 0 * @author crysw
 * @date 2022/8/12 21:13
 * @version 1.0
 */
@SpringBootTest(classes = {PayApplication.class})
@RunWith(SpringRunner.class)
public class PayServiceTest {

    @Autowired
    private IPayService payService;

    // 创建订单的待支付信息
    @Test
    public void createPayment() {
        long orderId = 750137168931983360L;
        TradePay tradePay = new TradePay();
        tradePay.setOrderId(orderId);
        tradePay.setPayAmount(new BigDecimal(880));
        payService.createPayment(tradePay);
    }

    @Test
    public void callbackPayment() throws IOException {
        long payId = 4;
        long orderId = 2;
        TradePay tradePay = new TradePay();
        tradePay.setPayId(payId);
        tradePay.setOrderId(orderId);
        tradePay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
        payService.callbackPayment(tradePay);
        System.in.read();
    }
}

执行com.crysw.test.PayServiceTest#createPayment, 创建订单的待支付信息. 执行完成后支付表trade_pay会新增一条待支付的记录.

<?xml version="1.0" encoding="utf-8"?>
<data>
  <row>
    <pay_id>755868116554227712</pay_id>  <!--支付ID-->
    <order_id>750137168931983360</order_id> <!--订单ID-->
    <pay_amount>880.00</pay_amount> <!--支付金额-->
    <is_paid>0</is_paid> <!--支付状态:0-未付款-->
  </row>
</data>

然后执行单元测试模拟支付成功的回调, 向MQ发送支付成功的消息.

2022-08-13 17:50:34.271  INFO 15136 --- [main] c.c.shop.service.impl.PayServiceImpl     : >>>支付回调
2022-08-13 17:50:34.301  INFO 15136 --- [main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
2022-08-13 17:50:34.306  WARN 15136 --- [ main] com.zaxxer.hikari.util.DriverDataSource  : Registered driver with driverClassName=com.mysql.jdbc.Driver was not found, trying direct instantiation.
2022-08-13 17:50:34.543  INFO 15136 --- [main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.
2022-08-13 17:50:34.688  INFO 15136 --- [main] c.c.shop.service.impl.PayServiceImpl     : >>>将支付成功消息持久化到数据库
2022-08-13 17:52:50.145  INFO 15136 --- [lientSelector_1] RocketmqRemoting  : closeChannel: close the connection to remote address[192.168.65.129:10911] result: true

数据库trade_mq_producer_temp消息生产临时表新增一条支付成功的消息记录. 从rocektMQ-console控制台也可以查询到发送的支付成功的消息.

<?xml version="1.0" encoding="utf-8"?>
<data>
  <row>
    <id>755888109555687424</id>
    <group_name>payProducerGroup</group_name>
    <msg_topic>payTopic</msg_topic>
    <msg_tag>paid</msg_tag>
    <msg_key>755868116554227712</msg_key>
    <msg_body>{"isPaid":2,"orderId":750137168931983360,"payId":755868116554227712}</msg_body>
    <msg_status>(NULL)</msg_status>
    <create_time>2022-08-13 17:50:35</create_time>
  </row>
</data>

在这里插入图片描述

启动优惠券服务, 商品服务, 用户服务, 订单服务 . 订单服务中的消费者监听到支付成功的消息后, 会进行更新订单支付状态的操作.

从debug断点可以看到消费消息的过程.
在这里插入图片描述
消费日志

2022-08-13 18:12:33.637  INFO 6280 --- [MessageThread_1] com.crysw.shop.mq.PaymentListener        : >>>接收到支付成功消息
2022-08-13 18:14:36.533  WARN 6280 --- [l-1 housekeeper] com.zaxxer.hikari.pool.HikariPool        : HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=2m6s521ms847µs400ns).
2022-08-13 18:14:36.547  INFO 6280 --- [MessageThread_1] com.crysw.shop.mq.PaymentListener        : >>>订单状态设置为已支付

订单表trade_order的订单支付状态已更新

<?xml version="1.0" encoding="utf-8"?>
<data>
  <row>
    <order_id>750137168931983360</order_id>
    <user_id>345963634385633280</user_id>
    <order_status>1</order_status>
    <pay_status>2</pay_status> <!--支付状态: 0未支付 1支付中 2已支付 -->
    <address>北京</address>
    <goods_id>345959443973935104</goods_id>
    <goods_number>1</goods_number>
    <goods_price>1000.00</goods_price>
    <shipping_fee>0.00</shipping_fee>
    <order_amount>1000.00</order_amount>
    <coupon_id>345988230098857984</coupon_id>
    <coupon_paid>20.00</coupon_paid>
    <money_paid>100.00</money_paid>
    <pay_amount>880.00</pay_amount>
    <add_time>2022-07-28 20:58:24</add_time>
  </row>
</data>

6. 整体联调

通过Rest客户端请求shop-order-web和shop-pay-web完成下单和支付操作. 首先创建shop-order-web和shop-pay-web模块.

6.1 准备工作

6.1.1 编写配置

编写配置类, 提供RestTemplate实例, 用来模拟web端的http请求调用.

package com.crysw.shop.config;


import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.web.client.RestOperations;
import org.springframework.web.client.RestTemplate;

import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.List;

/**
 * 描述:RestTemplate配置类
 * @author crysw
 * @date 2022/8/13 22:04
 * @version 1.0
 */
@Configuration
public class RestTemplateConfig {
    @Bean
    @ConditionalOnMissingBean({RestOperations.class, RestTemplate.class})
    public RestTemplate restTemplate(ClientHttpRequestFactory factory) {

        RestTemplate restTemplate = new RestTemplate(factory);

        // 使用 utf-8 编码集的 conver 替换默认的 conver(默认的 string conver 的编码集为"ISO-8859-1")
        List<HttpMessageConverter<?>> messageConverters = restTemplate.getMessageConverters();
        Iterator<HttpMessageConverter<?>> iterator = messageConverters.iterator();
        while (iterator.hasNext()) {
            HttpMessageConverter<?> converter = iterator.next();
            if (converter instanceof StringHttpMessageConverter) {
                iterator.remove();
            }
        }
        messageConverters.add(new StringHttpMessageConverter(Charset.forName("UTF-8")));
        return restTemplate;
    }

    @Bean
    @ConditionalOnMissingBean({ClientHttpRequestFactory.class})
    public ClientHttpRequestFactory simpleClientHttpRequestFactory() {
        SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
        // ms
        factory.setReadTimeout(15000);
        // ms
        factory.setConnectTimeout(15000);
        return factory;
    }
}

shop-order-web配置请求地址

server.host=http://localhost
server.servlet.path=/order-web
server.port=8086
# dubbo
spring.application.name=dubbo-order-consumer
spring.dubbo.application.id=dubbo-order-consumer
spring.dubbo.application.name=dubbo-order-consumer
spring.dubbo.registry.address=zookeeper://192.168.65.129:2181;zookeeper://192.168.65.129:2182;zookeeper://192.168.65.129:2183
###  使用自定义方式获取端口,是因为使用@Value注解获取server.port会获取到系统默认的-1,还没有解析到端口就返回了,导致url不对
order.port=8086
shop.order.baseURI=${server.host}:${order.port}${server.servlet.path}
shop.order.confirm=/order/confirm

shop-pay-web配置请求地址

server.host=http://localhost
server.servlet.path=/pay-web
server.port=8087
# dubbo
spring.application.name=dubbo-pay-consumer
spring.dubbo.application.id=dubbo-pay-consumer
spring.dubbo.application.name=dubbo-pay-consumer
spring.dubbo.registry.address=zookeeper://192.168.65.129:2181;zookeeper://192.168.65.129:2182;zookeeper://192.168.65.129:2183
# url
pay.port=8087
shop.pay.baseURI=${server.host}:${pay.port}${server.servlet.path}
shop.pay.createPayment=/pay/createPayment
shop.pay.callbackPayment=/pay/callBackPayment

6.1.2 编写controller

编写订单web服务

package com.crysw.shop.controller;

import com.alibaba.dubbo.config.annotation.Reference;
import com.crysw.api.IOrderService;
import com.crysw.shop.entity.Result;
import com.crysw.shop.pojo.TradeOrder;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 描述:订单web服务
 * @author crysw
 * @date 2022/8/13 22:05
 * @version 1.0
 */
@RestController
@RequestMapping("/order")
public class OrderController {

    @Reference
    private IOrderService orderService;

    /**
     * 确认订单
     * @param order
     * @return
     */
    @RequestMapping("/confirm")
    public Result confirmOrder(@RequestBody TradeOrder order) {
        return orderService.confirmOrder(order);
    }
}

编写支付web服务

package com.crysw.shop.controller;

import com.alibaba.dubbo.config.annotation.Reference;
import com.crysw.api.IPayService;
import com.crysw.shop.entity.Result;
import com.crysw.shop.pojo.TradePay;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 描述:支付web服务
 * @author crysw
 * @date 2022/8/14 21:58
 * @version 1.0
 */
@RestController
@RequestMapping(value = "/pay")
public class PayController {

    @Reference
    private IPayService payService;

    /**
     * 创建订单支付信息
     * @param pay
     * @return
     */
    @RequestMapping("/createPayment")
    public Result createPayment(@RequestBody TradePay pay) {
        return payService.createPayment(pay);
    }

    /**
     * 支付成功的回调操作
     * @param pay
     * @return
     * @throws Exception
     */
    @RequestMapping("/callBackPayment")
    public Result callBackPayment(@RequestBody TradePay pay) throws Exception {
        return payService.callbackPayment(pay);
    }
}

6.2 下单测试

shop-order-web编写测试代码, 模拟web界面发送http请求进行下单操作.

package com.crysw.test;

import com.crysw.shop.OrderWebApplication;
import com.crysw.shop.entity.Result;
import com.crysw.shop.pojo.TradeOrder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.web.client.RestTemplate;

import java.math.BigDecimal;

/**
 * 描述:订单web服务测试
 * @author crysw
 * @date 2022/8/14 17:53
 * @version 1.0
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {OrderWebApplication.class})
public class OrderWebTest {

    @Autowired
    private RestTemplate restTemplate;

    @Value("${shop.order.baseURI}")
    private String baseURI;

    @Value("${shop.order.confirm}")
    private String confirmOrderPath;

    // 模拟界面发送http请求, 访问controller
    @Test
    public void confirmOrder() {
        Long coupouId = 345988230098857984L;
        Long goodsId = 345959443973935104L;
        Long userId = 345963634385633280L;

        // 创建订单
        TradeOrder order = new TradeOrder();
        order.setGoodsId(goodsId);
        order.setUserId(userId);
        order.setCouponId(coupouId);
        order.setAddress("北京");
        order.setGoodsNumber(1);
        // 商品价格
        order.setGoodsPrice(new BigDecimal(1000));
        // 运费
        order.setShippingFee(BigDecimal.ZERO);
        // 订单金额 = 商品价格*数量 + 运费
        order.setOrderAmount(new BigDecimal(1000));
        // 已支付金额
        order.setMoneyPaid(new BigDecimal(100));

        Result result = restTemplate.postForEntity(baseURI + confirmOrderPath, order, Result.class).getBody();
        System.out.println(result);
    }
}

订单服务的测试日志

2022-08-14 18:24:09.179  INFO 15388 --- [:20884-thread-5] c.c.shop.service.impl.OrderServiceImpl   : 校验订单通过
2022-08-14 18:24:09.210  INFO 15388 --- [:20884-thread-5] c.c.shop.service.impl.OrderServiceImpl   : 订单: 756258946880245760扣减库存成功
2022-08-14 18:24:09.224  INFO 15388 --- [:20884-thread-5] c.c.shop.service.impl.OrderServiceImpl   : 订单:756258946880245760,使用优惠券
2022-08-14 18:24:09.237  INFO 15388 --- [:20884-thread-5] c.c.shop.service.impl.OrderServiceImpl   : 订单:756258946880245760,扣减余额成功
2022-08-14 18:24:09.243  INFO 15388 --- [:20884-thread-5] c.c.shop.service.impl.OrderServiceImpl   : 订单:[756258946880245760]状态修改(确认)成功

查看库表trade_order也生成了一笔已确认的订单.

<?xml version="1.0" encoding="utf-8"?>
<data>
  <row>
    <order_id>756258946880245760</order_id>
    <user_id>345963634385633280</user_id>
    <order_status>1</order_status> <!--订单状态 0未确认 1已确认 2已取消 3无效 4退款-->
    <pay_status>0</pay_status> <!--支付状态 0未支付 1支付中 2已支付-->
    <shipping_status>(NULL)</shipping_status>
    <address>北京</address>
    <consignee>(NULL)</consignee>
    <goods_id>345959443973935104</goods_id>
    <goods_number>1</goods_number>  <!--订单的商品数量 1-->
    <goods_price>1000.00</goods_price>
    <goods_amount>(NULL)</goods_amount>
    <shipping_fee>0.00</shipping_fee>
    <order_amount>1000.00</order_amount>
    <coupon_id>345988230098857984</coupon_id>
    <coupon_paid>20.00</coupon_paid>
    <money_paid>100.00</money_paid> <!-- 已使用用户余额支付100-->
    <pay_amount>880.00</pay_amount> <!--减去20优惠券, 订单金额880-->
    <add_time>2022-08-14 18:24:09</add_time>
    <confirm_time>2022-08-14 18:24:09</confirm_time>
    <pay_time>(NULL)</pay_time>
  </row>
</data>

查看trade_coupon表, 优惠券已被使用.

<?xml version="1.0" encoding="utf-8"?>
<data>
  <row>
    <coupon_id>345988230098857984</coupon_id>
    <coupon_price>20.00</coupon_price>
    <user_id>345963634385633280</user_id>
    <order_id>756258946880245760</order_id>
    <is_used>1</is_used> <!--是否使用 0未使用 1已使用-->
    <used_time>2022-08-14 18:24:09</used_time>
  </row>
</data>

查看trade_goods表, 库存扣减1.

<?xml version="1.0" encoding="utf-8"?>
<data>
  <row>
    <goods_id>345959443973935104</goods_id>
    <goods_name>JavaSE课程</goods_name>
    <goods_number>999</goods_number> <!--库存减少了1,剩余999-->
    <goods_price>1000.00</goods_price>
    <goods_desc>传智播客出品Java视频课程</goods_desc>
    <add_time>2019-07-09 20:38:00</add_time>
  </row>
</data>

查看trade_user表, 用户余额扣减100.

<?xml version="1.0" encoding="utf-8"?>
<data>
  <row>
    <user_id>345963634385633280</user_id>
    <user_name>刘备</user_name>
    <user_password>123L</user_password>
    <user_mobile>18888888888L</user_mobile>
    <user_score>100</user_score>
    <user_reg_time>2019-07-09 13:37:03</user_reg_time>
    <user_money>900</user_money> <!--用户余额扣减100, 剩余900-->
  </row>
</data>

查看trade_goods_number_log表, 新增一条商品订单操作记录.

<?xml version="1.0" encoding="utf-8"?>
<data>
  <row>
    <goods_id>345959443973935104</goods_id> <!--商品ID-->
    <order_id>756258946880245760</order_id> <!--订单ID-->
    <goods_number>-1</goods_number> <!--商品数量, 负数表示扣减-->
    <log_time>2022-08-14 18:24:09</log_time>
  </row>
</data>

6.3 支付测试

shop-pay-web编写测试代码, 模拟web界面发送http请求进行下单操作.

package com.crysw.test;

import com.crysw.constant.ShopCode;
import com.crysw.shop.PayWebApplication;
import com.crysw.shop.entity.Result;
import com.crysw.shop.pojo.TradePay;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.web.client.RestTemplate;

import java.math.BigDecimal;

/**
 * 描述:支付web服务测试
 * @author crysw
 * @date 2022/8/14 22:12
 * @version 1.0
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {PayWebApplication.class})
public class PayWebTest {

    @Autowired
    private RestTemplate restTemplate;

    @Value("${shop.pay.baseURI}")
    private String baseURI;

    @Value("${shop.pay.createPayment}")
    private String createPaymentPath;

    @Value("${shop.pay.callbackPayment}")
    private String callBackPaymentPath;

    /**
     * 创建订单支付信息测试
     */
    @Test
    public void createPayment() {
        long orderId = 756258946880245760L;
        TradePay tradePay = new TradePay();
        tradePay.setOrderId(orderId);
        tradePay.setPayAmount(new BigDecimal(880));
        Result result = restTemplate.postForEntity(baseURI + createPaymentPath, tradePay, Result.class).getBody();
        System.out.println(result);
    }

    /**
     * 支付成功的回调操作测试
     */
    @Test
    public void callbackPayment() {
        long payId = 756324893167067136L;
        long orderId = 756258946880245760L;
        TradePay tradePay = new TradePay();
        tradePay.setPayId(payId);
        tradePay.setOrderId(orderId);
        tradePay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
        Result result = restTemplate.postForEntity(baseURI + callBackPaymentPath, tradePay, Result.class).getBody();
        System.out.println(result);
    }
}

启动shop-pay-service, shop-pay-web服务, 执行createPayment单元测试, 查看库表trade_pay新增一条订单待支付的记录.

<?xml version="1.0" encoding="utf-8"?>
<data>
  <row>
    <pay_id>756324893167067136</pay_id> <!--支付ID-->
    <order_id>756258946880245760</order_id> <!--订单ID-->
    <pay_amount>880.00</pay_amount> <!--支付金额-->
    <is_paid>0</is_paid> <!--支付状态: 0-未付款 1-正在付款 2-已付款-->
  </row>
</data>

执行callbackPayment单元测试, 查看库表trade_pay的支付状态已经更新为已付款.

<?xml version="1.0" encoding="utf-8"?>
<data>
  <row>
    <pay_id>756324893167067136</pay_id> <!--支付ID-->
    <order_id>756258946880245760</order_id> <!--订单ID-->
    <pay_amount>880.00</pay_amount> <!--支付金额-->
    <is_paid>2</is_paid> <!--支付状态: 0-未付款 1-正在付款 2-已付款-->
  </row>
</data>

查看库表trade_order, 订单的支付状态也更新为已支付.

<?xml version="1.0" encoding="utf-8"?>
<data>
  <row>
    <order_id>756258946880245760</order_id>
    <user_id>345963634385633280</user_id>
    <order_status>1</order_status>
    <pay_status>2</pay_status>  <!--支付状态 0未支付 1支付中 2已支付-->
    <address>北京</address>
    <goods_id>345959443973935104</goods_id>
    <goods_number>1</goods_number>
    <goods_price>1000.00</goods_price>
    <shipping_fee>0.00</shipping_fee>
    <order_amount>1000.00</order_amount>
    <coupon_id>345988230098857984</coupon_id>
    <coupon_paid>20.00</coupon_paid>
    <money_paid>100.00</money_paid>
    <pay_amount>880.00</pay_amount>
    <add_time>2022-08-14 18:24:09</add_time>
    <confirm_time>2022-08-14 18:24:09</confirm_time>
  </row>
</data> 

文章作者: 王子
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 王子 !
评论
  目录