00 拼团交易平台系统

拼团交易平台系统

拼团需求分析

项目背景

拼团系统可以用于,这类带有支付场景购买商品的系统。我们这里以这样两个系统作为使用场景作为举例。也可以作用于任何其他的交易类系统

针对目前的小型支付商城系统,商品购买交易同比增速放缓,需要引入新的营销策略促进商品交易量。在交易数据统计分析中得到,市场存在同类竞品,商品价格设定低于目前我们的商品定价,所以用户购买意愿偏低

所以为了盘活沉睡用户,需要适当降低商品价格。但为了达到传播的效果,所以需要引入拼团方式,以客带客,靠用户自身传播的方式进行交易拉新。这样的处理方式对比于 KOL,会让利商品价值到用户自身。【KOL 等同于抖音大主播直播卖货】

产品方案

因为我们所实现的是一个平台类系统,可以满足各类交易场景的拼团需求接入。所以在实现这套系统时候,不要与其他系统耦合。并提供相关的研发侧对接标准。

一般早期运营,先把商品价格促销大家,提高拼团的稀缺性。让大量用户知道拼团商品,之后调整为到期即可成团,只要参与就能拿到商品。

1.1 商品前端界面

进入商品页后,查询是否配置了拼团活动。并进行优惠试算,拼团成团价,最低优惠展示。

参与首次拼团、参与拼团中拼团。拼团完成则不在展示此条拼团。

所有参与中的拼团统计拼团人员。

1.2 运营后台管理

提供配置需要提供如图所示信息。

人群标签为提前设定标签类型,产生对应的人群数据。

1.3 功能流程

首先,由运营配置商品拼团活动,增加折扣方式。因为有人群标签的过滤,所以可以控制哪些人可参与拼团。

之后,用户可见拼团商品并参与拼团。用户可自主分享拼团或者等待拼团。因为拼团有非常大的折扣刺激用户自主分享,以此可以节省营销推广费用。

最后,拼团完成,触达商品发货。这里有两种,一种运营手段是拼团成团稀有性,必须打成拼团才可以。另外一种是虚拟拼团,无论是否打成,到时都完成拼团。

拼团库表设计

进入公司接触新的项目时,可以先从库表进行了解。知道它们的流转关系,之后在看系统设计和代码实现会更加清晰

库表关系

首先,站在运营的角度,要为这次拼团配置对应的拼团活动。那么就会涉及到:给哪个渠道的什么商品ID配置拼团,这样用户在进入商品页就可以看到带有拼团商品的信息了。之后要考虑,这个拼团的商品所提供的规则信息,包括:折扣、时间、人数等。还要拿到折扣的一个试算金额。这个试算出来的金额,就是告诉用户,通过拼团可以拿到的最低价格。

之后,站在用户的角度,是参与拼团。首次发起一个拼团与参与已存在的拼团进行数据的记录,达成拼团约定拼团人数后,开始进行通知。这个通知的设计站在平台角度可以提供回调,那么任何的系统也就都可以接入了。

另外,为了支撑这套库表,也会有人群的设计。人群是互联网公司中非常常用的手段,比如:要把所有符合某个条件的用户ID全部写入到一个特定的 Redis 记录中,之后就可以专门为这些人做特定的拼团活动了。

那么,拼团活动表,为什么会把折扣拆分出来呢?因为这里的折扣可能有多种迭代到一个拼团上。比如:给一个商品添加了直减10元的优惠,又对符合的人群ID的用户,额外打9折,这样就有了2个折扣迭代。所以拆分出来会更好维护。这是对常变的元素和稳定的元素进行设计的思考。

库表设计

1.1 拼团配置表

如图,详细设计表:拼团活动表、折扣配置表、人群标签表、人群标签任务表、人群标签明细表。

拼团活动表:设定了拼团的成团规则,人群标签的使用可以限定哪些人可见,哪些人可参与。

折扣配置表:拆分出拼团优惠到一个新的表进行多条配置。如果折扣还有更多的复杂规则,则可以配置新的折扣规则表进行处理。

人群标签表:专门来做人群设计记录的,这3张表就是为了把符合规则的人群ID,也就是用户ID,全部跑任务到一个记录下进行使用。比如:黑玫瑰人群、高净值人群、拼团履约率90%以上的人群等。

1.2 参与拼团表

如图,详细设计表:拼团账户表、用户拼单表、用户拼单明细、回调任务表

拼团账户表:记录用户的拼团参与数据,一个是为了限制用户的参与拼团次数,另外是为了人群标签任务统计数据。

用户拼单表:当有用户发起首次拼单的时候,产生拼单ID,并记录所需成团的拼单记录,另外是写上拼团的状态、唯一索引、回调接口等。这样拼团完成就可以回调对接的平台,通知完成了。【微信支付也是这样的设计,回调支付结果,这样的设计可以方便平台化对接】。当再有用户参与后,则写入用户拼单明细表。直至达成拼团。

回调任务表:当拼团完成后,要做回调处理。但可能会有失败,所以加入任务的方式进行补偿。如果仍然失败,则需要对接的平台,自己查询拼团结果。

研发系统设计

需求:研发不能只是为了功能而直接开发。还要遵守一系列的流程,确保开发迭代的需求,都能平稳的交付。所以要有研发设计、要有评审、要有测试、要有预发、要有黑白名单验证和功能切量

1. 用户用例图

用例图(英语:use case diagram)是用户与系统交互的最简表示形式,展现了用户和与他相关的用例之间的关系。通过用例图,人们可以获知系统不同种类的用户和用例。用例图也经常和其他图表配合使用。

这样的用例图,它有点项目目标结果驱动,让你知道最终的产品形态要提供什么功能。所以有这样的一个指导是可以很好的完成后续的建模设计的。

站在用户视角,会有:查看拼团商品、发起拼团、参与拼团、查看拼团结果、查看商品记录。

站在运营视角,会有:配置拼团活动、配置人群标签、审核拼团配置、查看拼团流水。

2. 四色建模图

在 MVC 的系统开发中,是很少做建模的事情的。因为它都是面向过程开发,过程就是一个流程需要什么就编写什么,但随着系统的复杂,会逐步发现,A 流程、B 流程、C 流程,越来越多的流程加入,但这些流程中又存在了交叉、复用。也就是大家看到的 MVC 里的代码,总是一个功能,被复制了好多次,好多地方编写。所以使用 DDD 思想的情况,会对这些内容提前思考,进行建模设计,划分出领域。让一个个单元变得容易管理。

MVC 有点像睡大车店子(大通铺的一个大炕),DDD 划分了每家每户,每个床睡着该睡的人。

2.1 建模方式

DDD 的建模过程,是以一个用户为起点,通过行为命令,发起行为动作,串联整个业务。而这个用户的起点最初来自于用例图的分析。用例图是用户与系统交互的最简表示形式,展现了用户和与他相关的用例之间的关系。通过用例图,我们可以分析出所有的行为动作。

在 DDD 中用于完成用户的行为命令和动作分析的过程,是一个四色建模的过程,也称作风暴模型。在使用 DDD 的标准对系统建模前,一堆人要先了解 DDD 的操作手段,这样才能让产品、研发、测试、运营等了解业务的伙伴,都能在同一个语言下完成系统建模。

此图是整个四色建模的指导图,通过寻找领域事件,发起事件命令,完成领域事件的过程,完成 DDD 工程建模,左下角的示意图为:是一个用户,通过一个策略命令,使用领域对象,通过业务流程,完成 2 个领域事件,调用 1 次外部接口个过程。我们在整个 DDD 建模过程中,就是在寻找这些节点。

蓝色:决策命令,是用户发起的行为动作,如:开始签到、开始抽奖、查看额度等。

黄色:领域事件,过去时态描述。如:签到完成、抽奖完成、奖品发放完成。它所阐述的都是这个领域要完成的终态。

粉色:外部系统,如:你的系统需要调用外部的接口完成流程。

红色:业务流程,用于串联决策命令到领域事件,所实现的业务流程。一些简单的场景则直接有决策命令到领域事件就可以了。

绿色:只读模型,做一些读取数据的动作,没有写库的操作。

棕色:领域对象,每个决策命令的发起,都是含有一个对应的领域对象。

2.2 寻找事件

寻找领域事件的过程,就是寻找系统中流程节点的结果态。什么结束了、什么完成了、什么终止。其实这样的手段,也是一种目标结果驱动的手段,当有人为你指明了,你要去哪里,之后你会有更目标感的前进。

目前我们可以根据需求找到,如图中黄色部分的领域事件。包括:发起拼团完成、参与拼团完成、拼团目标达成、回调通知完成、拼团发货完成、人群标签生成完成、拼团活动创建完成。

如果后续我们在迭代新的功能的话,还会引入其他领域事件。你也可以思考,还可能存在什么领域事件。

2.3 划分领域

领域的划分是基于寻找领域事件之后,所有的领域事件都应该有一个对应的发起方,我们管这个叫决策命令。比如:你去超时购物完成,是你媳妇发起的决策命令让你去买胖东来的0添加酱油。那么,我们这里就要找到完成这些领域事件的决策命令

蓝色为以用户维度的决策命令,你发出的所有命令,都会有一个对应的结果态承接。

如果是复杂的业务,要做一些列的流程,则会出现一个 Business Policy 业务流程。这个业务流程里可以通过设计模式来完成功能的设计。经过这样的处理,你的代码也会变得非常清晰。

3. 业务流程

研发功能流程设计有一粗一细,粗的是流程图为了让大家快速的了解这些功能节点的串联关系,方便讲解的时候,可以让多方了解。细的是 UML 时序图,这个会把流程中的个细节体现出来,指导研发做功能实现。

3.1 流程图

分别以用户和运营视角,了解一套拼团的配置到用户的参与。

3.2 时序图

时序图,展示了整个拼团过程所涉及的系统模块和流转关系。这部分直接看图即可。

4. 系统架构

DDD 是软件开发设计的一套指导思想,但不是直接决定了你如何编码,而是提供了对需求设计为研发提供编码指引的手段。所以,DDD 建模后,你可以使用 3 层架构 MVC 开发,也可以使用六边形架构、整洁架构、菱形架构开发。这也就是为什么有些人的简历上,专业技能栏里会有相关架构内容的体现。

在这方面,阿里最早推出的 cola 考拉🐨架构,就是整洁架构的代表。张毅老师推出的菱形架构,南向网关、北向网关,属于六边形这一类的架构。这些都是为了更好的落地 DDD 这套指导思想而出现的架构分层设计。

所以,DDD 是指导思想,帮助你建模设计,划分领域。六边形架构、整洁架构、菱形架构,是为了帮你落地知道思想到具体编码实现上。

那么,MVC 分层架构不也可以开发 DDD 知道思想的编码吗?可以,但相对来说不会那么优雅。MVC 的出现,并不是在有了分布式微服务之后,所以一些分布式资源模块的承载和领域思想的处理,在 MVC 中都是比较别扭的。所以,我们选择新的架构形态可以更好的承载 DDD 指导思想。

如图,展示了分布式微服务架构项目,各项开发过程中所需的资源在两套分层架构中的体现。

MVC 基本是把所有的内容都用服务来承载,无论是自身的服务,还是外部的服务。这样会导致维护的混乱,长期迭代成本的增加。

六边形架构,对于MVC的劣势做了针对性的架构处理,合理的划分了各项资源在不同层的承载。domain 也更专注于领域功能的实现。也就是把以前的 service 内容,划分到 domain 中处理。

初始工程搭建

接口定义 - api :因为微服务中引用的 RPC 需要对外提供接口的描述信息,也就是调用方在使用的时候,需要引入 Jar 包,让调用方好能依赖接口的定义做代理。

应用封装 - app :这是应用启动和配置的一层,如:一些 AOP 切面或者 config 配置,以及打包镜像都是在这一层处理。你可以把它理解为专门为了启动服务而存在的。

领域封装 - domain :领域模型服务,是一个非常重要的模块。无论怎么做 DDD 的分层架构,domain 都是肯定存在的。在一层中会有一个个细分的领域服务,在每个服务包中会有【模型、仓库、服务】这样3部分。

仓储服务 - infrastructure :基础层依赖于 domain 领域层,因为在 domain 层定义了仓储接口需要在基础层实现。这是依赖倒置的一种设计方式。

领域封装 - trigger :触发器层,用于提供接口实现、消息接收、任务执行等。包括:http接口调用、job接口调用、listener接口调用。

类型定义 - types :通用类型定义层,在我们的系统开发中,会有很多类型的定义,包括:基本的 Response、Constants 和枚举。它会被其他的层进行引用使用。

试算模型抽象模板设计

1. 模型设计

这是一种链式的多分支规则树模型结构,由功能节点自行决定后续流程的执行链路。它的设计比责任链的扩展性更好,自由度也更高。

首先,定义抽象的通用的规则树模型结构。涵盖:StrategyMapper - 策略映射器、StrategyHandler - 策略处理器、AbstractStrategyRouter - 策略路由抽象类。通过泛型设计允许使用方可以自定义出入参和动态上下文,让抽象模板模型具有通用性。

之后,由使用方自定义出工厂、功能抽象类和一个个流程流转的节点。这些节点可以自由组装进行流转,相比于责任链它的实现方式更具有灵活性。

2. 编码实现

2.1 模板定义

在项目工程的 types 模块中,添加通用设计模式模板。如果是公司里的项目工程,还可以单独提供一个通用的设计模式模板工程,这样每个项目都可以引入使用了。

目前,先添加 tree 规则树模型结构。后续随着开发需要在添加其他通用设计模式

策略映射器:

12345678910111213public interface StrategyMapper { /** * 获取待执行策略 * * @param requestParameter 入参 * @param dynamicContext 上下文 * @return 返参 * @throws Exception 异常 */ StrategyHandler get(T requestParameter, D dynamicContext) throws Exception;}

StrategyMapper 的 get 方法用于获取每一个要执行的节点,相当于一个流程走完进入到下一个流程的过程。这在我们处理复杂的业务代码时是非常重要的,避免把所有逻辑都写到一个类的方法中。

泛型 T - 入参、D - 上下文、R - 返参。

策略受理器:

12345678910111213/** * @description 受理策略处理 * T 入参类型 * D 上下文参数 * R 返参类型 */public interface StrategyHandler { StrategyHandler DEFAULT = (T, D) -> null; R apply(T requestParameter, D dynamicContext) throws Exception;}

StrategyHandler 的 apply 受理执行的业务流程。每个业务流程执行时,如果有数据是从前面节点到后面节点要使用的,那么可以填充到 dynamicContext 上下文中。

策略路由器:

12345678910111213141516/** * @description 策略路由抽象类 */public abstract class AbstractStrategyRouter implements StrategyMapper, StrategyHandler { @Getter @Setter protected StrategyHandler defaultStrategyHandler = StrategyHandler.DEFAULT; public R router(T requestParameter, D dynamicContext) throws Exception { StrategyHandler strategyHandler = get(requestParameter, dynamicContext); if(null != strategyHandler) return strategyHandler.apply(requestParameter, dynamicContext); return defaultStrategyHandler.apply(requestParameter, dynamicContext); }}

通过调用策略映射器 get 方法,控制节点流程的走向。

3. 首页试算

当一个用户进入到购物首页查看商品的优惠信息,我们可以把这个过程定义为试算过程。试算:试试算一下,这个用户进入到首页看这个商品的时候,商品的营销优惠信息。包括:原始价格、折扣价格、拼团目标、拼团时效、是否可见优惠、是否可参与拼团等,这些东西都是试算拿到的结果。

类似的,当你进入到一个信贷贷款的页面,也会告诉你分期多少,还款多少期,每期多少钱。这些东西也都是试算。

在 domain 领域层,实现拼团活动服务功能。model 是对象的定义,这个对象就类似于你在 MVC 中定义的 vo、req、res 这样的对象。只不过这里一般叫 xxxEntity 实体,它是一种面向对象的思维。

trial 试算模块,把整个首页商品展示的信息都通过试算完成。后续会重点实现,本节先定义框架分层结构。

目前这里的代码还没有具体的实现代码,只有分层结构。可以直接参考工程的代码。

3.1 营销商品实体信息

12345678910111213141516171819/** * @description 营销商品实体信息,通过这样一个信息获取商品优惠信息 */@Data@Builder@AllArgsConstructor@NoArgsConstructorpublic class MarketProductEntity { /** 用户ID */ private String userId; /** 商品ID */ private String goodsId; /** 渠道 */ private String source; /** 来源 */ private String channel;}

3.2 试算结果实体对象

12345678910111213141516171819202122232425262728293031323334353637383940/** * @description 试算结果实体对象(给用户展示拼团可获得的优惠信息) */@Data@Builder@AllArgsConstructor@NoArgsConstructorpublic class TrialBalanceEntity { /** 商品ID */ private String goodsId; /** 商品名称 */ private String goodsName; /** 原始价格 */ private BigDecimal originalPrice; // 折扣金额 private BigDecimal deductionPrice; // 支付金额 private BigDecimal payPrice; /** 拼团目标数量 */ private Integer targetCount; /** 拼团开始时间 */ private Date startTime; /** 拼团结束时间 */ private Date endTime; /** 是否可见拼团 */ private Boolean isVisible; /** 是否可参与进团 */ private Boolean isEnable;}

暂定一些试算的所需的结果,后续随着功能迭代再补充

3.3 抽象的拼团营销支撑类

123456/** * @description 抽象的拼团营销支撑类 */public abstract class AbstractGroupBuyMarketSupport extends AbstractStrategyRouter {}

3.4 活动策略工厂定义

1234567891011121314151617181920212223@Servicepublic class DefaultActivityStrategyFactory { // 获取根节点 private final RootNode rootNode; public DefaultActivityStrategyFactory(RootNode rootNode) { this.rootNode = rootNode; } // 获取根节点的方法的 public StrategyHandler strategyHandler() { return rootNode; } @Data @Builder @AllArgsConstructor @NoArgsConstructor public static class DynamicContext { }}

3.5 构建一个节点(根节点举例)

12345678910111213141516171819202122/** * @description 根节点 */@Slf4j@Servicepublic class RootNode extends AbstractGroupBuyMarketSupport { // 将开关节点注入 @Resource private SwitchNode switchNode; @Override public TrialBalanceEntity apply(MarketProductEntity requestParameter, DefaultActivityStrategyFactory.DynamicContext dynamicContext) throws Exception { return null; } @Override public StrategyHandler get(MarketProductEntity requestParameter, DefaultActivityStrategyFactory.DynamicContext dynamicContext) throws Exception { return null; }}

根节点,实现 AbstractGroupBuyMarketSupport 抽象类。这个抽象类目前是空实现,后续一些通用的方法会放入这个抽象类里。

3.6 创建一个开关节点

123456789101112131415161718/** * @description 开关节点,用于消费的降级 */@Slf4j@Servicepublic class SwitchNode extends AbstractGroupBuyMarketSupport { @Override public TrialBalanceEntity apply(MarketProductEntity requestParameter, DefaultActivityStrategyFactory.DynamicContext dynamicContext) throws Exception { return null; } @Override public StrategyHandler get(MarketProductEntity requestParameter, DefaultActivityStrategyFactory.DynamicContext dynamicContext) throws Exception { return null; }}

3.7 添加一个首页的方法

12345678/** * @description 首页营销服务接口 */public interface IIndexGroupBuyMarketService { TrialBalanceEntity indexMarketTrial(MarketProductEntity marketProductEntity) throws Exception;}

123456789101112131415161718192021/** * @description 首页营销服务 */@Servicepublic class IndexGroupBuyMarketServiceImpl implements IIndexGroupBuyMarketService { @Resource private DefaultActivityStrategyFactory defaultActivityStrategyFactory; @Override public TrialBalanceEntity indexMarketTrial(MarketProductEntity marketProductEntity) throws Exception { // 获取执行策略 StrategyHandler strategyHandler = defaultActivityStrategyFactory.strategyHandler(); // 受理试算操作 TrialBalanceEntity trialBalanceEntity = TastrategyHandler.apply(marketProductEntity, new DefaultActivityStrategyFactory.DynamicContext()); return trialBalanceEntity; }}

多线程异步数据加载

扩展规则树模型结构,增加异步数据加载区。将用于试算营销优惠的接口使用异步线程进行加载,之后写入上下文,用于后续的逻辑处理。这部分的模型设计是非常巧妙的,通过解耦逻辑和划分功能区,让代码具有了文档属性,看到对应的类和类下的方法区,就可以轻松的理解代码实现方式。这样的处理非常有利于后续功能的迭代。

首先,对通用设计模式树结构扩展出异步数据加载区,这样可以把接口实现中所需的数据前置到异步数据加载区完成加载操作。以此提高接口的响应效率。

之后,串联功能节点,并在 MarketNode 节点,添加数据加载操作。

另外,注意本节需要新增加一个表 sku,也就是商品信息表,通过商品信息表获得当前商品的价格配置,以此来做商品的折扣计算。这块在实际生产中有两种实现方式,一种是每次都调用外部接口获取商品,另外一种是有商品统一同步库可以查询。我们这里先通过一个统一的商品库进行处理。那么后续谁要对接这个系统,就调用 sku 商品库,同步好商品即可。

1. 增加多线程模块

在 types 模块的 design 模型定义中,新增加抽象类设计多线程数据加载区

12345678910111213141516171819202122232425262728293031323334353637/** * @description 异步资源加载策略 */public abstract class AbstractMultiThreadStrategyRouter implements StrategyMapper, StrategyHandler { @Getter @Setter protected StrategyHandler defaultStrategyHandler = StrategyHandler.DEFAULT; public R router(T requestParameter, D dynamicContext) throws Exception { StrategyHandler strategyHandler = get(requestParameter, dynamicContext); if(null != strategyHandler) return strategyHandler.apply(requestParameter, dynamicContext); return defaultStrategyHandler.apply(requestParameter, dynamicContext); } // 前面的和AbstractStrategyRouter这个是一样的,这里的需要重写方法 @Override public R apply(T requestParameter, D dynamicContext) throws Exception { // 异步加载数据 multiThread(requestParameter, dynamicContext); // 业务流程受理 return doApply(requestParameter, dynamicContext); } /** * 异步加载数据 */ protected abstract void multiThread(T requestParameter, D dynamicContext) throws ExecutionException, InterruptedException, TimeoutException; /** * 业务流程受理 */ protected abstract R doApply(T requestParameter, D dynamicContext) throws Exception;}

主要是添加了一个 multiThread 方法,这个方法在执行 apply 受理业务功能逻辑实现前完成对数据的加载调用。

之后在提供一个 doApply 受理的抽象方法,让使用方完成这个方法的实现即可。

这块的逻辑对新人可能有点绕,你可以在本节最后的代码测试调试时进行验证。debug 断点调试代码对编码来说非常重要

2. 节点逻辑

2.1 RootNode

1234567891011121314151617181920212223242526@Slf4j@Servicepublic class RootNode extends AbstractGroupBuyMarketSupport { @Resource private SwitchNode switchNode; @Override protected TrialBalanceEntity doApply(MarketProductEntity requestParameter, DefaultActivityStrategyFactory.DynamicContext dynamicContext) throws Exception { log.info("拼团商品查询试算服务-RootNode userId:{} requestParameter:{}", requestParameter.getUserId(), JSON.toJSONString(requestParameter)); // 参数判断 // userId、goodsId、source、channel if (StringUtils.isBlank(requestParameter.getUserId()) || StringUtils.isBlank(requestParameter.getGoodsId()) || StringUtils.isBlank(requestParameter.getSource()) || StringUtils.isBlank(requestParameter.getChannel())) { throw new AppException(ResponseCode.ILLEGAL_PARAMETER.getCode(), ResponseCode.ILLEGAL_PARAMETER.getInfo()); } return router(requestParameter, dynamicContext); } @Override public StrategyHandler get(MarketProductEntity requestParameter, DefaultActivityStrategyFactory.DynamicContext dynamicContext) throws Exception { return switchNode; }}

根节点一般会做数据的初始操作、信息判断、缓存处理等功能。

2.2 SwitchNode

123456789101112131415161718@Slf4j@Servicepublic class SwitchNode extends AbstractGroupBuyMarketSupport { @Resource private MarketNode marketNode; @Override public TrialBalanceEntity doApply(MarketProductEntity requestParameter, DefaultActivityStrategyFactory.DynamicContext dynamicContext) throws Exception { return router(requestParameter, dynamicContext); } @Override public StrategyHandler get(MarketProductEntity requestParameter, DefaultActivityStrategyFactory.DynamicContext dynamicContext) throws Exception { return marketNode; }}

开关功能的节点在后续在做实现。

2.3 MarketNode

这个节点是这个部分最重要实现的一个节点,让大家理解多线程在实际场景中的使用。

当你一个节点要执行业务逻辑时候,需要为这些业务逻辑准备接口、数据库、Redis、配置中心等各样的数据,但有些时候外部的接口依赖的很多,那么就不能一个个顺序的查询,这样会导致接口的时长很大。所以要多线程并行处理,那么所有的时长也就只是那个耗时最长的接口时长了。

首先,新增加异步 FutureTask 任务,完成对活动配置的查询,商品的查询。

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657package cn.bugstack.domain.activity.service.trial.thread;/** * @description 查询营销配置任务 */public class QueryGroupBuyActivityDiscountVOThreadTask implements Callable { /** * 来源 */ private final String source; /** * 渠道 */ private final String channel; /** * 活动仓储 */ private final IActivityRepository activityRepository; public QueryGroupBuyActivityDiscountVOThreadTask(String source, String channel, IActivityRepository activityRepository) { this.source = source; this.channel = channel; this.activityRepository = activityRepository; } @Override public GroupBuyActivityDiscountVO call() throws Exception { return activityRepository.queryGroupBuyActivityDiscountVO(source, channel); }}/** * @description 查询商品信息任务 */public class QuerySkuVOFromDBThreadTask implements Callable { private final String goodsId; private final IActivityRepository activityRepository; public QuerySkuVOFromDBThreadTask(String goodsId, IActivityRepository activityRepository) { this.goodsId = goodsId; this.activityRepository = activityRepository; } @Override public SkuVO call() throws Exception { return activityRepository.querySkuByGoodsId(goodsId); }}

这里是两个实现了 Callable 接口的任务,分别查询不同的数据。

在实际的工作中,查询的不只是数据,还有很多其他方提供的 RPC、HTTP 接口。

接下来,我们就可以在 MarketNode 节点,来处理这些数据任务的调用操作。

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950@Slf4j@Servicepublic class MarketNode extends AbstractGroupBuyMarketSupport { // 线程池 @Resource private ThreadPoolExecutor threadPoolExecutor; // endNode节点 @Resource private EndNode endNode; @Override protected void multiThread(MarketProductEntity requestParameter, DefaultActivityStrategyFactory.DynamicContext dynamicContext) throws ExecutionException, InterruptedException, TimeoutException { // 异步查询活动配置 QueryGroupBuyActivityDiscountVOThreadTask queryGroupBuyActivityDiscountVOThreadTask = new QueryGroupBuyActivityDiscountVOThreadTask(requestParameter.getSource(), requestParameter.getChannel(), repository); // FutureTask 是 Java 并发编程中的一个重要类,它结合了 Runnable 和 Future 的特性,主要用于异步计算和任务管理,FutureTask 可以包装一个 Callable 或 Runnable 任务,将任务提交给线程池或单独线程执行,实现异步计算 FutureTask groupBuyActivityDiscountVOFutureTask = new FutureTask<>(queryGroupBuyActivityDiscountVOThreadTask); threadPoolExecutor.execute(groupBuyActivityDiscountVOFutureTask); // 异步查询商品信息 - 在实际生产中,商品有同步库或者调用接口查询。这里暂时使用DB方式查询。 QuerySkuVOFromDBThreadTask querySkuVOFromDBThreadTask = new QuerySkuVOFromDBThreadTask(requestParameter.getGoodsId(), repository); FutureTask skuVOFutureTask = new FutureTask<>(querySkuVOFromDBThreadTask); threadPoolExecutor.execute(skuVOFutureTask); // 写入上下文 - 对于一些复杂场景,获取数据的操作,有时候会在下N个节点获取,这样前置查询数据,可以提高接口响应效率 // 这块是将上面创建的两个数据写到上下文环境中 // 设置一个超时时间 dynamicContext.setGroupBuyActivityDiscountVO(groupBuyActivityDiscountVOFutureTask.get(timeout, TimeUnit.MINUTES)); dynamicContext.setSkuVO(skuVOFutureTask.get(timeout, TimeUnit.MINUTES)); log.info("拼团商品查询试算服务-MarketNode userId:{} 异步线程加载数据「GroupBuyActivityDiscountVO、SkuVO」完成", requestParameter.getUserId()); } @Override public TrialBalanceEntity doApply(MarketProductEntity requestParameter, DefaultActivityStrategyFactory.DynamicContext dynamicContext) throws Exception { return null; } @Override public StrategyHandler get(MarketProductEntity requestParameter, DefaultActivityStrategyFactory.DynamicContext dynamicContext) throws Exception { return endNode; }}

multiThread 方法中,启动对异步数据的查询处理,之后在使用动态上下文承接数据。

doApply 受理业务流程的实现放在后续在处理。

threadPoolExecutor 线程池配置的是 CallerRunsPolicy 策略。当线程池中的任务队列已满,并且没有空闲线程可以执行新任务时,CallerRunsPolicy 会将任务回退到调用者线程中运行。这种策略适用于不希望丢失任务且可以接受调用者线程被阻塞的场景。

新增一个活动仓储接口方法类

123456789package cn.bugstack.domain.activity.adapter.repository;/** * @description 活动仓储 */public interface IActivityRepository {}

新增里面的传值VO对象,这其实是属于一种防腐的设计,不把数据库的对象暴露出来,因为在开发的过程中,传入的对象是不断变化的

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124package cn.bugstack.domain.activity.model.valobj;/** * @description 拼团活动营销配置值对象 */@Getter@Builder@AllArgsConstructor@NoArgsConstructorpublic class GroupBuyActivityDiscountVO { /** * 活动ID */ private Long activityId; /** * 活动名称 */ private String activityName; /** * 来源 */ private String source; /** * 渠道 */ private String channel; /** * 商品ID */ private String goodsId; /** * 折扣配置,就是下面那个内部类 */ private GroupBuyDiscount groupBuyDiscount; /** * 拼团方式(0自动成团、1达成目标拼团) */ private Integer groupType; /** * 拼团次数限制 */ private Integer takeLimitCount; /** * 拼团目标 */ private Integer target; /** * 拼团时长(分钟) */ private Integer validTime; /** * 活动状态(0创建、1生效、2过期、3废弃) */ private Integer status; /** * 活动开始时间 */ private Date startTime; /** * 活动结束时间 */ private Date endTime; /** * 人群标签规则标识 */ private String tagId; /** * 人群标签规则范围 */ private String tagScope; // 折扣类 @Getter @Builder @AllArgsConstructor @NoArgsConstructor public static class GroupBuyDiscount { /** * 折扣标题 */ private String discountName; /** * 折扣描述 */ private String discountDesc; /** * 折扣类型(0:base、1:tag) */ private DiscountTypeEnum discountType; /** * 营销优惠计划(ZJ:直减、MJ:满减、N元购) */ private String marketPlan; /** * 营销优惠表达式 */ private String marketExpr; /** * 人群标签,特定优惠限定 */ private String tagId; }}

添加 Sku 库表信息,就是商品库的信息,然后写一个查询的方法接口

123456789101112131415161718192021222324252627282930313233343536package cn.bugstack.infrastructure.dao.po;/** * @description 商品信息 */@Data@Builder@AllArgsConstructor@NoArgsConstructorpublic class Sku { /** 自增 */ private Long id; /** 来源 */ private String source; /** 渠道 */ private String channel; /** 商品ID */ private String goodsId; /** 商品名称 */ private String goodsName; /** 原始价格 */ private BigDecimal originalPrice; /** 创建时间 */ private Date createTime; /** 更新时间 */ private Date updateTime;}

1234567891011package cn.bugstack.infrastructure.dao;/** * @description 商品查询 */@Mapperpublic interface ISkuDao { Sku querySkuByGoodsId(String goodsId);}

2.4 EndNode

用于封装最终返回的结果数据。

1234567891011121314151617181920212223242526272829303132@Slf4j@Servicepublic class EndNode extends AbstractGroupBuyMarketSupport { // 这里属于结尾的节点,就是封装一下最终的数据 @Override public TrialBalanceEntity doApply(MarketProductEntity requestParameter, DefaultActivityStrategyFactory.DynamicContext dynamicContext) throws Exception { log.info("拼团商品查询试算服务-EndNode userId:{} requestParameter:{}", requestParameter.getUserId(), JSON.toJSONString(requestParameter)); GroupBuyActivityDiscountVO groupBuyActivityDiscountVO = dynamicContext.getGroupBuyActivityDiscountVO(); SkuVO skuVO = dynamicContext.getSkuVO(); // 返回空结果 return TrialBalanceEntity.builder() .goodsId(skuVO.getGoodsId()) .goodsName(skuVO.getGoodsName()) .originalPrice(skuVO.getOriginalPrice()) .deductionPrice(new BigDecimal("0.00")) .targetCount(groupBuyActivityDiscountVO.getTarget()) .startTime(groupBuyActivityDiscountVO.getStartTime()) .endTime(groupBuyActivityDiscountVO.getEndTime()) .isVisible(false) .isEnable(false) .build(); } @Override public StrategyHandler get(MarketProductEntity requestParameter, DefaultActivityStrategyFactory.DynamicContext dynamicContext) throws Exception { return defaultStrategyHandler; }}

返回的数据可以由动态上下文中的结果进行封装。

暂时我们这里只把可以获取到的数据做一个处理。关于优惠金额以及可见性等参数mock写入即可

策略模式优惠折扣计算

通过策略模式处理多类型折扣方式的逻辑计算,同时设定抽象模板,用于扩展后续人群标签的过滤。

1. 模型设计

首先,MarketNode 节点的数据异步加载工作已经完成,这一部分开始使用这里的数据做折扣计算。

之后,折扣是在数据库中配置的,按照类型包括:ZJ - 直减、MJ - 满减、ZK - 折扣、N - n元购。那么这些不同的类型就可以用策略模型进行包装,每个实现类专门负责自己的逻辑计算。

2. 编码实现

2.1 工程结构

service 包下,增加 discount 折扣计算包,提供不同类型的折扣计算服务。

之后再 MarketNode 节点做逻辑调用。计算完结果后在写上下文,便于最后在 EndNode 节点填充。

2.2 折扣策略 - MJ、N、ZJ、ZK

折扣的方式可以有:ZJ、MJ、ZK、N,以及多种方式。这里的 market_expr 是计算的公式,比如:MJ - 满减这样的,就是满100元减10元。

概念:像是这种营销折扣方式,是一种商家配置无券营销方式,用户是无感知券但可以使用优惠的。另外一种就是有券,用户主动拿到手里的券,自己选择消费方式。

2.3 折扣优惠类型枚举值编写

1234567891011121314151617181920212223242526272829package cn.bugstack.domain.activity.model.valobj;/** * @description 折扣优惠类型 */@Getter@AllArgsConstructor@NoArgsConstructorpublic enum DiscountTypeEnum { BASE(0, "基础优惠"), TAG(1, "人群标签"), ; private Integer code; private String info; public static DiscountTypeEnum get(Integer code) { switch (code) { case 0: return BASE; case 1: return TAG; default: throw new RuntimeException("err code!"); } }}

2.4 折扣服务的接口

123456789101112131415/** * @description 折扣计算服务 */public interface IDiscountCalculateService { /** * 折扣计算 * * @param userId 用户ID * @param originalPrice 商品原始价格 * @param groupBuyDiscount 折扣计划配置 * @return 商品优惠价格 */ BigDecimal calculate(String userId, BigDecimal originalPrice, GroupBuyActivityDiscountVO.GroupBuyDiscount groupBuyDiscount);}

定义折扣计算的接口,包括:用户ID、商品原始价格、折扣计划配置。

用户ID,主要用于后续做人群标签的过滤使用。

2.5 抽象模板类

1234567891011121314151617181920212223242526272829303132333435package cn.bugstack.domain.activity.service.discount;/** * @description 折扣计算服务抽象类 */@Slf4jpublic abstract class AbstractDiscountCalculateService implements IDiscountCalculateService { @Resource protected IActivityRepository repository; @Override public BigDecimal calculate(String userId, BigDecimal originalPrice, GroupBuyActivityDiscountVO.GroupBuyDiscount groupBuyDiscount) { // 1. 人群标签过滤 if (DiscountTypeEnum.TAG.equals(groupBuyDiscount.getDiscountType())){ boolean isCrowdRange = filterTagId(userId, groupBuyDiscount.getTagId()); if (!isCrowdRange) { log.info("折扣优惠计算拦截,用户不再优惠人群标签范围内 userId:{}", userId); return originalPrice; } } // 2. 折扣优惠计算 return doCalculate(originalPrice, groupBuyDiscount); } // 人群过滤 - 限定人群优惠 private boolean filterTagId(String userId, String tagId) { return repository.isTagCrowdRange(tagId, userId); } protected abstract BigDecimal doCalculate(BigDecimal originalPrice, GroupBuyActivityDiscountVO.GroupBuyDiscount groupBuyDiscount);}

2.6 增加一个满减(MJ)的计算

123456789101112131415161718192021222324252627282930313233343536/** * @description 满减优惠计算 */@Slf4j@Service("MJ")public class MJCalculateService extends AbstractDiscountCalculateService { @Override public BigDecimal doCalculate(BigDecimal originalPrice, GroupBuyActivityDiscountVO.GroupBuyDiscount groupBuyDiscount) { log.info("优惠策略折扣计算:{}", groupBuyDiscount.getDiscountType().getCode()); // 折扣表达式 - 100,10 满100减10元 // 这个marketExpr就是 100,10 String marketExpr = groupBuyDiscount.getMarketExpr(); String[] split = marketExpr.split(Constants.SPLIT); BigDecimal x = new BigDecimal(split[0].trim()); BigDecimal y = new BigDecimal(split[1].trim()); // 不满足最低满减约束,则按照原价 if (originalPrice.compareTo(x) < 0) { return originalPrice; } // 折扣价格 BigDecimal deductionPrice = originalPrice.subtract(y); // 判断折扣后金额,最低支付1分钱 // 如果是0元,最低支付0.01元 if (deductionPrice.compareTo(BigDecimal.ZERO) <= 0) { return new BigDecimal("0.01"); } return deductionPrice; }}

这里举例其中一个,MJ 的折扣计算。其他的可以参考代码中的实现。

满减,拿到折扣的配置表达式,拆分 100,10,之后判断商品金额是否满足100元,满足100元则可以减去10元。不过这里要知道,如果商品最终折扣价格不足1分钱,要按照1分钱计算。

2.7 MarketNode营销调用

1234567891011121314151617181920212223242526272829303132333435363738394041424344@Slf4j@Servicepublic class MarketNode extends AbstractGroupBuyMarketSupport { @Resource private ThreadPoolExecutor threadPoolExecutor; @Resource private EndNode endNode; /** * Spring 注入详细说明 */ // 只要是这个接口下面的实现,都会依次注入进来 // 这个名字就是MJCalculateService、NCCalculateService、ZJCalculateService、ZKCalculateService @Resource private Map discountCalculateServiceMap; @Override public TrialBalanceEntity doApply(MarketProductEntity requestParameter, DefaultActivityStrategyFactory.DynamicContext dynamicContext) throws Exception { log.info("拼团商品查询试算服务-MarketNode userId:{} requestParameter:{}", requestParameter.getUserId(), JSON.toJSONString(requestParameter)); // 获取上下文数据 GroupBuyActivityDiscountVO groupBuyActivityDiscountVO = dynamicContext.getGroupBuyActivityDiscountVO(); GroupBuyActivityDiscountVO.GroupBuyDiscount groupBuyDiscount = groupBuyActivityDiscountVO.getGroupBuyDiscount(); SkuVO skuVO = dynamicContext.getSkuVO(); // 优惠试算 IDiscountCalculateService discountCalculateService = discountCalculateServiceMap.get(groupBuyDiscount.getMarketPlan()); if (null == discountCalculateService) { log.info("不存在{}类型的折扣计算服务,支持类型为:{}", groupBuyDiscount.getMarketPlan(), JSON.toJSONString(discountCalculateServiceMap.keySet())); throw new AppException(ResponseCode.E0001.getCode(), ResponseCode.E0001.getInfo()); } // 折扣后的价格 BigDecimal deductionPrice = discountCalculateService.calculate(requestParameter.getUserId(), skuVO.getOriginalPrice(), groupBuyDiscount); dynamicContext.setDeductionPrice(deductionPrice); return router(requestParameter, dynamicContext); } // ... 省略部分代码}

private Map discountCalculateServiceMap 是一种 Spring 的 Map 注入方式,会根据一个接口把所有的实现类都注入上,之后 Key 是 Bean 的名字

这样我们就可以根据数据库配置的折扣的类型来调用对应的策略,也就把 if...else 这样的判断代码给去掉了

人群标签数据采集

需求:

以轻量化的方式构建人群标签数据,将人群数据写入到 Redis BitMap 用于后续使用

在公司中,所有部门产生的业务数据都会回流到数仓,它有一个非常庞大的数据集市系统。之后这些数据会被量化分析师使用,通过 R 语言建模,执行模型任务,把符合模型所需的标签数据跑到一个新的指定表文件中,这些文件在通过加工存放到 Redis BitMap 进行使用。一般一个标签可能会有 50万、100万、500万的数据规模

有了这些标签数据,运营人员就可以精准的对这些用户做定向活动投放,比如;特定的券、特定的通知等。以此达到更加精准的运营效果

1. 业务流程

首先,人群标签是通过创建的采集任务所产生的数据。任务里包含了要采集业务中什么类型的数据规则。本项目中会采集拼团交易数据,不过本节还没有这类数据,所以先来模拟这部分数据。

之后,把采集的数据除了放数据库,还需要写入到 Redis 的 BitMap 中,这个数据结构比较适合高并发场景判断用户是否存在。

2. 编码实现

需要新创建出 crowd_tags、crowd_tags_detail、crowd_tags_job 3张库表,用于统计人群标签

引入 Redisson 配置,pom引入、app/config 增加配置、infrastructure 增加 Redisson 服务接口和实现类

12345 org.redisson redisson-spring-boot-starter 3.26.0

增加三张库表:

12345678910111213141516# 转储表 crowd_tags# ------------------------------------------------------------DROP TABLE IF EXISTS `crowd_tags`;CREATE TABLE `crowd_tags` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增ID', `tag_id` varchar(32) NOT NULL COMMENT '人群ID', `tag_name` varchar(64) NOT NULL COMMENT '人群名称', `tag_desc` varchar(256) NOT NULL COMMENT '人群描述', `statistics` int(8) NOT NULL COMMENT '人群标签统计量', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', PRIMARY KEY (`id`), UNIQUE KEY `uq_tag_id` (`tag_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='人群标签';

1234567891011121314# 转储表 crowd_tags_detail# ------------------------------------------------------------DROP TABLE IF EXISTS `crowd_tags_detail`;CREATE TABLE `crowd_tags_detail` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增ID', `tag_id` varchar(32) NOT NULL COMMENT '人群ID', `user_id` varchar(16) NOT NULL COMMENT '用户ID', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', PRIMARY KEY (`id`), UNIQUE KEY `uq_tag_user` (`tag_id`,`user_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='人群标签明细';

1234567891011121314151617181920# 转储表 crowd_tags_job# ------------------------------------------------------------DROP TABLE IF EXISTS `crowd_tags_job`;CREATE TABLE `crowd_tags_job` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增ID', `tag_id` varchar(32) NOT NULL COMMENT '标签ID', `batch_id` varchar(8) NOT NULL COMMENT '批次ID', `tag_type` tinyint(1) NOT NULL DEFAULT '1' COMMENT '标签类型(参与量、消费金额)', `tag_rule` varchar(8) NOT NULL COMMENT '标签规则(限定类型 N次)', `stat_start_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '统计数据,开始时间', `stat_end_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '统计数据,结束时间', `status` tinyint(1) NOT NULL DEFAULT '0' COMMENT '状态;0初始、1计划(进入执行阶段)、2重置、3完成', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', PRIMARY KEY (`id`), UNIQUE KEY `uq_batch_id` (`batch_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='人群标签任务';

增加redis的配置文件:

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364package cn.bugstack.config;/** * Redis 客户端,使用 Redisson Redisson * */@Configuration@EnableConfigurationProperties(RedisClientConfigProperties.class)public class RedisClientConfig { @Bean("redissonClient") public RedissonClient redissonClient(ConfigurableApplicationContext applicationContext, RedisClientConfigProperties properties) { Config config = new Config(); // 根据需要可以设定编解码器;https://github.com/redisson/redisson/wiki/4.-%E6%95%B0%E6%8D%AE%E5%BA%8F%E5%88%97%E5%8C%96 config.setCodec(JsonJacksonCodec.INSTANCE); config.useSingleServer() .setAddress("redis://" + properties.getHost() + ":" + properties.getPort())// .setPassword(properties.getPassword()) .setConnectionPoolSize(properties.getPoolSize()) .setConnectionMinimumIdleSize(properties.getMinIdleSize()) .setIdleConnectionTimeout(properties.getIdleTimeout()) .setConnectTimeout(properties.getConnectTimeout()) .setRetryAttempts(properties.getRetryAttempts()) .setRetryInterval(properties.getRetryInterval()) .setPingConnectionInterval(properties.getPingInterval()) .setKeepAlive(properties.isKeepAlive()) ; return Redisson.create(config); } static class RedisCodec extends BaseCodec { private final Encoder encoder = in -> { ByteBuf out = ByteBufAllocator.DEFAULT.buffer(); try { ByteBufOutputStream os = new ByteBufOutputStream(out); JSON.writeJSONString(os, in, SerializerFeature.WriteClassName); return os.buffer(); } catch (IOException e) { out.release(); throw e; } catch (Exception e) { out.release(); throw new IOException(e); } }; private final Decoder decoder = (buf, state) -> JSON.parseObject(new ByteBufInputStream(buf), Object.class); @Override public Decoder getValueDecoder() { return decoder; } @Override public Encoder getValueEncoder() { return encoder; } }}

123456789101112131415161718192021222324252627282930313233package cn.bugstack.config;/** * @description Redis 连接配置 redisson-spring-boot-starter */@Data@ConfigurationProperties(prefix = "redis.sdk.config", ignoreInvalidFields = true)public class RedisClientConfigProperties { /** host:ip */ private String host; /** 端口 */ private int port; /** 账密 */ private String password; /** 设置连接池的大小,默认为64 */ private int poolSize = 64; /** 设置连接池的最小空闲连接数,默认为10 */ private int minIdleSize = 10; /** 设置连接的最大空闲时间(单位:毫秒),超过该时间的空闲连接将被关闭,默认为10000 */ private int idleTimeout = 10000; /** 设置连接超时时间(单位:毫秒),默认为10000 */ private int connectTimeout = 10000; /** 设置连接重试次数,默认为3 */ private int retryAttempts = 3; /** 设置连接重试的间隔时间(单位:毫秒),默认为1000 */ private int retryInterval = 1000; /** 设置定期检查连接是否可用的时间间隔(单位:毫秒),默认为0,表示不进行定期检查 */ private int pingInterval = 0; /** 设置是否保持长连接,默认为true */ private boolean keepAlive = true;}

1234567891011121314# Redisredis: sdk: config: host: 192.168.1.109 port: 16379 pool-size: 10 min-idle-size: 5 idle-timeout: 30000 connect-timeout: 5000 retry-attempts: 3 retry-interval: 1000 ping-interval: 60000 keep-alive: true

2.1 工程结构

infrastructure 基础设施层:增加人群标签仓储服务和3个操作人群标签的DAO操作,同时在增加一个 Redisson 操作接口和实现类。

domain 领域层:增加 tag 人群标签领域,完成人群标签功能实现。

app 应用启动层:配置 mapper、application-dev.yml、以及 config 下的 Redis 链接配置类。

2.2 人群任务

增加人群标签服务接口,并实现功能流程

12345678910111213141516171819202122232425262728293031323334353637383940414243/** * @author Fuzhengwei bugstack.cn @小傅哥 * @description 人群标签服务 * @create 2024-12-28 12:51 */@Slf4j@Servicepublic class TagService implements ITagService { @Resource private ITagRepository repository; @Override public void execTagBatchJob(String tagId, String batchId) { log.info("人群标签批次任务 tagId:{} batchId:{}", tagId, batchId); // 1. 查询批次任务 CrowdTagsJobEntity crowdTagsJobEntity = repository.queryCrowdTagsJobEntity(tagId, batchId); // 2. 采集用户数据 - 这部分需要采集用户的消费类数据,后续有用户发起拼单后再处理。 // 3. 数据写入记录 List userIdList = new ArrayList() {{ add("xxx1"); add("xxx2"); add("xxx3"); add("xxx4"); add("xxx5"); add("xxx6"); add("xxx7"); add("xxx8"); }}; // 4. 一般人群标签的处理在公司中,会有专门的数据数仓团队通过脚本方式写入到数据库,就不用这样一个个或者批次来写。 for (String userId : userIdList) { repository.addCrowdTagsUserId(tagId, userId); } // 5. 更新人群标签统计量 repository.updateCrowdTagsStatistics(tagId, userIdList.size()); }}

通过采集人群标签任务获取人群数据,暂时没有这类业务数据,所以先模拟一个用户数据,你也可以调整这里的数据为你需要的。

采集数据后,repository.addCrowdTagsUserId(tagId, userId); 写入到数据库表。注意 addCrowdTagsUserId 方法,写入后还会做 BitMap 存储。

这些操作完成后,会更新统计量。注意,目前的统计量更新是不准的,因为执行 addCrowdTagsUserId 操作,会有主键冲突,主键冲突直接拦截不会抛异常。那么更新人群标签的统计量会继续增加。

2.3 数据存储 - bitmap

123456789101112131415package cn.bugstack.infrastructure.redis;// 这个是redisService的内容default int getIndexFromUserId(String userId) { try { MessageDigest md = MessageDigest.getInstance("MD5"); byte[] hashBytes = md.digest(userId.getBytes(StandardCharsets.UTF_8)); // 将哈希字节数组转换为正整数 BigInteger bigInt = new BigInteger(1, hashBytes); // 取模以确保索引在合理范围内 return bigInt.mod(BigInteger.valueOf(Integer.MAX_VALUE)).intValue(); } catch (NoSuchAlgorithmException e) { throw new RuntimeException("MD5 algorithm not found", e); } }

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566package cn.bugstack.infrastructure.adapter.repository;/** * @description 人群标签仓储 */@Repositorypublic class TagRepository implements ITagRepository { @Resource private ICrowdTagsDao crowdTagsDao; @Resource private ICrowdTagsDetailDao crowdTagsDetailDao; @Resource private ICrowdTagsJobDao crowdTagsJobDao; @Resource private IRedisService redisService; // 查询人群标签任务实体 @Override public CrowdTagsJobEntity queryCrowdTagsJobEntity(String tagId, String batchId) { CrowdTagsJob crowdTagsJobReq = new CrowdTagsJob(); crowdTagsJobReq.setTagId(tagId); crowdTagsJobReq.setBatchId(batchId); CrowdTagsJob crowdTagsJobRes = crowdTagsJobDao.queryCrowdTagsJob(crowdTagsJobReq); if (null == crowdTagsJobRes) return null; return CrowdTagsJobEntity.builder() .tagType(crowdTagsJobRes.getTagType()) .tagRule(crowdTagsJobRes.getTagRule()) .statStartTime(crowdTagsJobRes.getStatStartTime()) .statEndTime(crowdTagsJobRes.getStatEndTime()) .build(); } // 增加人群用户ID @Override public void addCrowdTagsUserId(String tagId, String userId) { CrowdTagsDetail crowdTagsDetailReq = new CrowdTagsDetail(); crowdTagsDetailReq.setTagId(tagId); crowdTagsDetailReq.setUserId(userId); try { crowdTagsDetailDao.addCrowdTagsUserId(crowdTagsDetailReq); } catch (DuplicateKeyException ignore) { // 忽略唯一索引冲突 } // 获取BitSet RBitSet bitSet = redisService.getBitSet(tagId); // 更新BitMap bitSet.set(redisService.getIndexFromUserId(userId), true); } // 更新人群标签标准值 @Override public void updateCrowdTagsStatistics(String tagId, int count) { CrowdTags crowdTagsReq = new CrowdTags(); crowdTagsReq.setTagId(tagId); crowdTagsReq.setStatistics(count); crowdTagsDao.updateCrowdTagsStatistics(crowdTagsReq); }}

执行完写库后,开始把数据写入到人群标签。

不过注意人群标签的存储不是字符串,所以要转行为长整型进行存放。

拆分库表关联关系

在系统开始之初,简化功能设计,让拼团活动配置表直接耦合商品ID。也就是一个拼团活动,只关联一个商品ID。那么如果现在需要给10个商品,全配置一个相同的拼团活动,就没法配置了。总不能一个个全配置一遍。所以要对这块的内容做拆分解耦。

group_buy_activity 拼团活动配置表中,融合了渠道和商品ID,属于和活动配置绑定了。

sku 商品信息,已经包含了渠道SC值和商品ID。而商品表咱们前面提到过,它是由接入方同步的商品信息,也可以不走这里,直接用 RPC/HTTP 接口查询数据,所以 sku 表不适合绑定互动 ID。

那么,就需要一个新的表来关联活动和商品信息配置,表名为 sc_sku_activity 必备字段:SC渠道、活动ID、商品ID。(自己创建库表后可以和课程的库表对比)

这样可以根据这样的信息来创建你的库表,同时移除 group_buy_activity 表中 SC渠道值和商品ID。

1. 业务流程

整个改动流程的核心为让 MarketNode 节点的查询由原来方式改为先查询 SC 商品活动配置关联表,获得到活动 ID,再查询活动信息。这期间如果有效的活动配置信息无,那么则走到 ErrorNode 节点,返回一个指定的错误码。

首先,可以先通过如图的调用过程,理解要完成的编程动作。包括解耦后的新的库表关联关系。

之后,编码的时候异步多线程查询商品关联配置,如果配置的信息为空,或者不存在有效的活动,那么可以返回一个 null。当拿到 null 以后,可以做判断进行路由,走到 ErrorNode 节点。

2. 编码实现

2.1 工程结构

功能主要以这3个节点进行开发,会涉及到关于数据库映射 PO、Mapper 的字段调整和新的映射关系添加。

MarketNode 营销节点查询数据方式调整处理,在 QueryGroupBuyActivityDiscountVOThreadTask 中需要有一个 querySCSkuActivityBySCGoodsId 方法,来查询商品对应的活动配置。

ErrorNode 节点,主要处理一些异常兜底的逻辑,比如:查询不到对应的拼团活动配置,那么就要返回对应的错误码。

2.2 创建渠道商品活动配置关联表

12345678910111213141516171819202122232425262728package cn.bugstack.infrastructure.dao.po;/** * @description 渠道商品活动配置关联表 */@Data@Builder@AllArgsConstructor@NoArgsConstructorpublic class SCSkuActivity { /** 自增ID */ private Long id; /** 渠道 */ private String source; /** 来源 */ private String channel; /** 活动ID */ private Long activityId; /** 商品ID */ private String goodsId; /** 创建时间 */ private Date createTime; /** 更新时间 */ private Date updateTime;}

2.3 修改SC商品渠道DAO

123456789101112131415161718192021

需要配置新的 DAO 方法,查询渠道商品活动 ID 配置信息。

2.4 修改查询营销配置

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859package cn.bugstack.domain.activity.service.trial.thread;/** * @description 查询营销配置任务 */public class QueryGroupBuyActivityDiscountVOThreadTask implements Callable { /** * 活动ID */ private final Long activityId; /** * 来源 */ private final String source; /** * 渠道 */ private final String channel; /** * 商品ID */ private final String goodsId; /** * 活动仓储 */ private final IActivityRepository activityRepository; public QueryGroupBuyActivityDiscountVOThreadTask(Long activityId, String source, String channel, String goodsId, IActivityRepository activityRepository) { this.activityId = activityId; this.source = source; this.channel = channel; this.goodsId = goodsId; this.activityRepository = activityRepository; } @Override public GroupBuyActivityDiscountVO call() throws Exception { // 判断是否存在可用的活动ID Long availableActivityId = activityId; // 通过新建的关联表scSkuActivity进行查询 if (null == activityId){ // 查询渠道商品活动配置关联配置 SCSkuActivityVO scSkuActivityVO = activityRepository.querySCSkuActivityBySCGoodsId(source, channel, goodsId); if (null == scSkuActivityVO) return null; availableActivityId = scSkuActivityVO.getActivityId(); } // 查询活动配置 return activityRepository.queryGroupBuyActivityDiscountVO(availableActivityId); }}

需要多增加一个查询 querySCSkuActivityBySCGoodsId 的操作,把对应的这个商品配置的活动ID查询出来。

2.5 配置营销节点路由

12345678910111213141516171819202122232425262728293031323334353637383940package cn.bugstack.domain.activity.service.trial.node;public TrialBalanceEntity doApply(MarketProductEntity requestParameter, DefaultActivityStrategyFactory.DynamicContext dynamicContext) throws Exception { log.info("拼团商品查询试算服务-MarketNode userId:{} requestParameter:{}", requestParameter.getUserId(), JSON.toJSONString(requestParameter)); // 获取上下文数据 GroupBuyActivityDiscountVO groupBuyActivityDiscountVO = dynamicContext.getGroupBuyActivityDiscountVO(); if (null == groupBuyActivityDiscountVO) { return router(requestParameter, dynamicContext); } GroupBuyActivityDiscountVO.GroupBuyDiscount groupBuyDiscount = groupBuyActivityDiscountVO.getGroupBuyDiscount(); SkuVO skuVO = dynamicContext.getSkuVO(); if (null == groupBuyDiscount || null == skuVO) { return router(requestParameter, dynamicContext); } // 优惠试算 IDiscountCalculateService discountCalculateService = discountCalculateServiceMap.get(groupBuyDiscount.getMarketPlan()); if (null == discountCalculateService) { log.info("不存在{}类型的折扣计算服务,支持类型为:{}", groupBuyDiscount.getMarketPlan(), JSON.toJSONString(discountCalculateServiceMap.keySet())); throw new AppException(ResponseCode.E0001.getCode(), ResponseCode.E0001.getInfo()); } // 折扣价格 BigDecimal deductionPrice = discountCalculateService.calculate(requestParameter.getUserId(), skuVO.getOriginalPrice(), groupBuyDiscount); dynamicContext.setDeductionPrice(deductionPrice); return router(requestParameter, dynamicContext);}@Overridepublic StrategyHandler get(MarketProductEntity requestParameter, DefaultActivityStrategyFactory.DynamicContext dynamicContext) throws Exception { // 不存在配置的拼团活动,走异常节点 if (null == dynamicContext.getGroupBuyActivityDiscountVO() || null == dynamicContext.getSkuVO()) { return errorNode; } return endNode;}

判断 groupBuyActivityDiscountVO、groupBuyDiscount、skuVO 是否为空。如果为空则直接路由走到兜底节点。

这个兜底节点就是本节新增加的 ErrorNode 节点。

2.6 异常兜底节点

123456789101112131415161718192021222324252627package cn.bugstack.domain.activity.service.trial.node;@Slf4j@Servicepublic class ErrorNode extends AbstractGroupBuyMarketSupport { @Override protected TrialBalanceEntity doApply(MarketProductEntity requestParameter, DefaultActivityStrategyFactory.DynamicContext dynamicContext) throws Exception { log.info("拼团商品查询试算服务-NoMarketNode userId:{} requestParameter:{}", requestParameter.getUserId(), JSON.toJSONString(requestParameter)); // 无营销配置 if (null == dynamicContext.getGroupBuyActivityDiscountVO() || null == dynamicContext.getSkuVO()) { log.info("商品无拼团营销配置 {}", requestParameter.getGoodsId()); throw new AppException(ResponseCode.E0002.getCode(), ResponseCode.E0002.getInfo()); } return TrialBalanceEntity.builder().build(); } @Override public StrategyHandler get(MarketProductEntity requestParameter, DefaultActivityStrategyFactory.DynamicContext dynamicContext) throws Exception { return defaultStrategyHandler; }}

异常兜底节点可以处理很多异常流程,包括:流程讲解、接口超时等。以及公司中运营配置的活动信息有误,也会走到一个兜底的流程。

人群标签节点过滤

在整个首页营销试算流程中,需要添加一个新的人群标签节点 TagNode,来处理人群过滤的操作。

在这里会看到目前的模型结构设计是非常容易添加出一个新的流程节点,同时对原有的功能不会有破坏性。这样既可以让我们更好的维护代码,也能方便持续的需求迭代。

1. 业务流程

如图,增加 TagNode 节点,调整节点调用关系

首先,添加一个新的 TagNode 节点,调整营销 MarketNode 节点完成业务功能后,流转到新的 TagNode 节点。

之后,在从个 TagNode 节点流转到 EndNode 结束节点。

2. 编码实现

2.1 工程结构

domain 领域层的 activity 活动领域下,trial 服务下 node 节点中添加 TagNode,之后再从 MarketNode 流转到 TagNode

TagNode 节点重点处理人群标签的过滤操作。

2.2 人群过滤

group_buy_activity 表中 tag_scope 中配置1,2 代表需要过滤人群,限制可见性和参与性。比如:这里的配置表示,一个参加活动的用户,如果不再人群范围内,既不允许看见活动,也不允许参与活动。如果只配置2,那么表示通过人群的用户,能看见,但不能参与。

那么我们这里就要做人群过滤限制,就要拿到这个1,2值,之后判断处理。

2.3 聚合方法

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104public class GroupBuyActivityDiscountVO { /** * 活动ID */ private Long activityId; /** * 活动名称 */ private String activityName; /** * 来源 */ private String source; /** * 渠道 */ private String channel; /** * 商品ID */ private String goodsId; /** * 折扣配置 */ private GroupBuyDiscount groupBuyDiscount; /** * 拼团方式(0自动成团、1达成目标拼团) */ private Integer groupType; /** * 拼团次数限制 */ private Integer takeLimitCount; /** * 拼团目标 */ private Integer target; /** * 拼团时长(分钟) */ private Integer validTime; /** * 活动状态(0创建、1生效、2过期、3废弃) */ private Integer status; /** * 活动开始时间 */ private Date startTime; /** * 活动结束时间 */ private Date endTime; /** * 人群标签规则标识 */ private String tagId; /** * 人群标签规则范围 */ private String tagScope; /** * 可见限制 * 只要存在这样一个值,那么首次获得的默认值就是 false */ public boolean isVisible() { String[] split = this.tagScope.split(Constants.SPLIT); if (split.length > 0 && Objects.equals(split[0], "1") && StringUtils.isNotBlank(split[0])) { return false; } return true; } /** * 参与限制 * 只要存在这样一个值,那么首次获得的默认值就是 false */ public boolean isEnable() { String[] split = this.tagScope.split(Constants.SPLIT); if (split.length == 2 && Objects.equals(split[1], "2") && StringUtils.isNotBlank(split[1])) { return false; } return true; } //... 省略部分代码}

可见限制:方法聚合到到类中,判断是否配置了1。如果配置了,那么默认这个对应的值的结果就是 false,之后在判断是否在人群范围内,如果在人群范围内则为 true。

参与限制:方法聚合到到类中,判断是否配置了2。如果配置了,那么默认这个对应的值的结果就是 false,之后在判断是否在人群范围内,如果在人群范围内则为 true。

2.4 TagNode实现

12345678910111213141516171819202122232425262728293031323334353637@Slf4j@Servicepublic class TagNode extends AbstractGroupBuyMarketSupport { @Resource private EndNode endNode; @Override protected TrialBalanceEntity doApply(MarketProductEntity requestParameter, DefaultActivityStrategyFactory.DynamicContext dynamicContext) throws Exception { // 获取拼团活动配置 GroupBuyActivityDiscountVO groupBuyActivityDiscountVO = dynamicContext.getGroupBuyActivityDiscountVO(); String tagId = groupBuyActivityDiscountVO.getTagId(); boolean visible = groupBuyActivityDiscountVO.isVisible(); boolean enable = groupBuyActivityDiscountVO.isEnable(); // 人群标签配置为空,则走默认值 if (StringUtils.isBlank(tagId)) { dynamicContext.setVisible(true); dynamicContext.setEnable(true); return router(requestParameter, dynamicContext); } // 是否在人群范围内:visible、enable 如果值为 ture 则表示没有配置拼团限制,那么就直接保证为 true 即可 boolean isWithin = repository.isTagCrowdRange(tagId, requestParameter.getUserId()); dynamicContext.setVisible(visible || isWithin); dynamicContext.setEnable(enable || isWithin); return router(requestParameter, dynamicContext); } @Override public StrategyHandler get(MarketProductEntity requestParameter, DefaultActivityStrategyFactory.DynamicContext dynamicContext) throws Exception { return endNode; }}

首先,判断 tagId 是否为空,如果为空则访问的可见性和参与性都为 true 即可。

之后,过滤人群标签,isWithin 为用户是否在人群范围内。visible || isWithin、enable || isWithin 只要有一个 true 则可以通过。

动态配置开关操作

在程序运行过程中,直接动态变更某些属性配置。这些动态变更的配置包括:降级和切量的开关,也包括一些功能程序的白名单用户测试。

那么对于配置中心,有 SpringCloud Config + Event Bus,也有 Nacos,还有各个大厂中会基于各类组件做的自研实现。那么我们先来做一个基于 Redis 发布/订阅处理动态配置的自研的实现,之后对于 SpringCloud 的动态配置变更已经有案例

1. 业务流程

基于 Redis 实现一套动态配置中心 DCC 服务:Dynamic Config Control

注意,这部分会涉及到 Spring源码、Java 动态配置的一些编码操作,属于组件类开发是思想。例如:本部分实现的功能也可以被独立出一个工程组件开发后被业务系统引入使用。

方案,动态配置的处理可以使用 Zookeeper 的节点监听,也可以基于 Redis 的发布/订阅。本部分咱们使用 Redis 这套方案。

2. 编码实现

2.1 工程结构

步骤1:添加一个自定义注解,用于 Spring 扫描 Bean 对象的时候,可以直接管理这些配置了自定义注解的类的属性。

步骤2:给服务类的属性添加自定义注解。

步骤3:由 app 模块下的 config,添加一个动态配置管理的工厂,会自动的完成属性信息的填充和动态变更操作。

步骤4:业务使用,会调用步骤2中的属性服务。当有配置操作变动的时候,则可以把配置信息直接刷新到内存属性上。

步骤5:配置的变更来自于这里,当调用 DCCController 时,会触发 Redis 的发布/订阅,动态值的变更,以此把类上的属性的值做变更。

2.2 创建自定义注解

12345678910package cn.bugstack.types.annotations;@Retention(RetentionPolicy.RUNTIME)@Target({ElementType.FIELD})@Documentedpublic @interface DCCValue { String value() default "";}

2.3 自定义注解的使用

12345678910111213141516171819202122232425262728293031323334353637383940package cn.bugstack.infrastructure.dcc;/** * @description 动态配置服务 */@Servicepublic class DCCService { /** * 降级开关 0关闭、1开启 */ @DCCValue("downgradeSwitch:0") private String downgradeSwitch; @DCCValue("cutRange:100") private String cutRange; public boolean isDowngradeSwitch() { return "1".equals(downgradeSwitch); } public boolean isCutRange(String userId) { // 计算哈希码的绝对值 int hashCode = Math.abs(userId.hashCode()); // 获取最后两位 int lastTwoDigits = hashCode % 100; // 判断是否在切量范围内 if (lastTwoDigits <= Integer.parseInt(cutRange)) { return true; } return false; }}

首先,我们知道在类中的属性配置的值固定的,配置后一直在程序运行中都是一个值。

那么,我们为了可以让程序随着功能的验证,可以动态的调整这些属性的值,就需要引入一些手段,这些手段的目的就是可以动态调整属性值。问:Java 中怎么通过反射来处理属性值的变更。

2.4 动态变更属性值

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130package cn.bugstack.config;@Slf4j@Configurationpublic class DCCValueBeanFactory implements BeanPostProcessor { private static final String BASE_CONFIG_PATH = "group_buy_market_dcc_"; private final RedissonClient redissonClient; // 存储bean对象 private final Map dccObjGroup = new HashMap<>(); public DCCValueBeanFactory(RedissonClient redissonClient) { this.redissonClient = redissonClient; } // Redis中发布订阅的一个处理 // RTopic // 增加一个bean的信息 @Bean("dccTopic") public RTopic testRedisTopicListener(RedissonClient redissonClient) { RTopic topic = redissonClient.getTopic("group_buy_market_dcc"); topic.addListener(String.class, (charSequence, s) -> { String[] split = s.split(Constants.SPLIT); // 获取值 String attribute = split[0]; String key = BASE_CONFIG_PATH + attribute; String value = split[1]; // 设置值 RBucket bucket = redissonClient.getBucket(key); boolean exists = bucket.isExists(); if (!exists) return; bucket.set(value); Object objBean = dccObjGroup.get(key); if (null == objBean) return; Class objBeanClass = objBean.getClass(); // 检查 objBean 是否是代理对象 if (AopUtils.isAopProxy(objBean)) { // 获取代理对象的目标对象 objBeanClass = AopUtils.getTargetClass(objBean); } try { // 1. getDeclaredField 方法用于获取指定类中声明的所有字段,包括私有字段、受保护字段和公共字段。 // 2. getField 方法用于获取指定类中的公共字段,即只能获取到公共访问修饰符(public)的字段。 Field field = objBeanClass.getDeclaredField(attribute); field.setAccessible(true); field.set(objBean, value); field.setAccessible(false); log.info("DCC 节点监听,动态设置值 {} {}", key, value); } catch (Exception e) { throw new RuntimeException(e); } }); return topic; } @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { // 注意;增加 AOP 代理后,获得类的方式要通过 AopProxyUtils.getTargetClass(bean); 不能直接 bean.class 因为代理后类的结构发生变化,这样不能获得到自己的自定义注解了。 Class targetBeanClass = bean.getClass(); Object targetBeanObject = bean; // 判断这个bean对象是AOP的切面对象 if (AopUtils.isAopProxy(bean)) { targetBeanClass = AopUtils.getTargetClass(bean); targetBeanObject = AopProxyUtils.getSingletonTarget(bean); } Field[] fields = targetBeanClass.getDeclaredFields(); for (Field field : fields) { if (!field.isAnnotationPresent(DCCValue.class)) { continue; } // 获取DCC注解对象 DCCValue dccValue = field.getAnnotation(DCCValue.class); String value = dccValue.value(); if (StringUtils.isBlank(value)) { throw new RuntimeException(field.getName() + " @DCCValue is not config value config case 「isSwitch/isSwitch:1」"); } String[] splits = value.split(":"); // 进行"group_buy_market_dcc_"的拼接 String key = BASE_CONFIG_PATH.concat(splits[0]); String defaultValue = splits.length == 2 ? splits[1] : null; // 设置值 String setValue = defaultValue; try { // 如果为空则抛出异常 if (StringUtils.isBlank(defaultValue)) { throw new RuntimeException("dcc config error " + key + " is not null - 请配置默认值!"); } // Redis 操作,判断配置Key是否存在,不存在则创建,存在则获取最新值 RBucket bucket = redissonClient.getBucket(key); boolean exists = bucket.isExists(); if (!exists) { bucket.set(defaultValue); } else { setValue = bucket.get(); } // field.setAccessible(true):是 Java 反射(Reflection)中的一个操作,用于绕过访问权限检查,允许程序访问或修改原本不可直接访问的字段(如 private、protected 或包级私有的字段) field.setAccessible(true); field.set(targetBeanObject, setValue); field.setAccessible(false); } catch (Exception e) { throw new RuntimeException(e); } dccObjGroup.put(key, targetBeanObject); } return bean; }}

DCCValueBeanFactory 实现了 BeanPostProcessor,这样我们就可以拿到 Spring 所有实例化后的 Bean 对象。Spring 是扫描你所工程下所有的 Bean 对象,加载到它的容器里管理。

postProcessAfterInitialization 扫描所有的 Bean 对象,之后检查哪个类的属性加有 @DCCValue 注解,检测到后进行管理操作。

@DCCValue("downgradeSwitch:0") 解析自定义注解配置值,一个是key,一个是默认的值。之后会从 redis 获取值,如果没有则走默认值。

testRedisTopicListener 是一个监听 redis 发布/订阅消息的处理,之后动态设置 Redis 值,完事后更新类中 Redis 的属性值。

2.5 DCC 动态配置中心接口

1234567891011package cn.bugstack.api;/** * @description DCC 动态配置中心,就是controller实现的接口类 */public interface IDCCService { Response updateConfig(String key, String value);}

2.5 实现对应的接口

1234567891011121314151617181920212223242526272829303132333435363738394041package cn.bugstack.trigger.http;/** * @description 动态配置管理 */// @CrossOrigin("*"):跨域的注解@Slf4j@RestController()@CrossOrigin("*")@RequestMapping("/api/v1/gbm/dcc/")public class DCCController implements IDCCService { @Resource private RTopic dccTopic; /** * 动态值变更 *

* curl http://127.0.0.1:8091/api/v1/gbm/dcc/update_config?key=downgradeSwitch&value=1 * curl http://127.0.0.1:8091/api/v1/gbm/dcc/update_config?key=cutRange&value=0 */ @RequestMapping(value = "update_config", method = RequestMethod.GET) @Override public Response updateConfig(@RequestParam String key, @RequestParam String value) { try { log.info("DCC 动态配置值变更 key:{} value:{}", key, value); dccTopic.publish(key + "," + value); return Response.builder() .code(ResponseCode.SUCCESS.getCode()) .info(ResponseCode.SUCCESS.getInfo()) .build(); } catch (Exception e) { log.error("DCC 动态配置值变更失败 key:{} value:{}", key, value, e); return Response.builder() .code(ResponseCode.UN_ERROR.getCode()) .info(ResponseCode.UN_ERROR.getInfo()) .build(); } }}

2.6 开关节点使用

123456789101112131415161718192021222324252627282930313233343536package cn.bugstack.domain.activity.service.trial.node;@Servicepublic class SwitchNode extends AbstractGroupBuyMarketSupport { @Resource private MarketNode marketNode; @Override public TrialBalanceEntity doApply(MarketProductEntity requestParameter, DefaultActivityStrategyFactory.DynamicContext dynamicContext) throws Exception { log.info("拼团商品查询试算服务-SwitchNode userId:{} requestParameter:{}", requestParameter.getUserId(), JSON.toJSONString(requestParameter)); // 根据用户ID切量 String userId = requestParameter.getUserId(); // 判断是否降级 if (repository.downgradeSwitch()) { log.info("拼团活动降级拦截 {}", userId); throw new AppException(ResponseCode.E0003.getCode(), ResponseCode.E0003.getInfo()); } // 切量范围判断 if (!repository.cutRange(userId)) { log.info("拼团活动切量拦截 {}", userId); throw new AppException(ResponseCode.E0004.getCode(), ResponseCode.E0004.getInfo()); } return router(requestParameter, dynamicContext); } @Override public StrategyHandler get(MarketProductEntity requestParameter, DefaultActivityStrategyFactory.DynamicContext dynamicContext) throws Exception { return marketNode; }}

进入开关节点,通过仓储 repository 获取对应的值。

repository.downgradeSwitch() 判断是否降级。

repository.cutRange(userId) 通过用户ID的哈希值最后2位,判断是否在100范围内,以此做切量处理。

拼团交易营销锁单

当商城类系统接入拼团时,则需要在下单过程中使用一笔营销优惠。这里的营销优惠可以为:无券平台营销、有券消费营销、拼团折扣营销、积分抵扣营销等。

那么,当商城类系统接入使用下单时,则需要到拼团系统锁定一笔优惠,也就是占用一个名额。完事后,商城类系统继续操作支付交易的过程。

1. 业务流程

首先,团购的商品下单。下单过程分为创建流水单、锁定营销优惠(拼团、积分、券)、创建支付订单、唤起收银台支付、用户扫码支付、支付完成核销优惠等。

那么,这里用户以拼团方式下单,创建流水单完成后,需要与拼团系统交互,锁定营销优惠。更新流水单优惠金额和支付金额。接下来就可以创建支付单了(支付单需要最终的支付金额)。

注意:拼团表 group_buy_order 除了有目标量(target_count)、完成量(complete),还要有一个锁单量(lock_count),当锁单量达到目标量后,用户在此组织下,不能在参与拼团。直至这些用户支付完成达成拼团或者锁单超时回退支付营销,空出可参与锁单量,这样其他用户可以继续参与。

1.1 工程结构

步骤1:添加数据库 DAO 配置,拼团订单、拼团明细操作。之后交给 repository 仓储使用。

步骤2:domain 领域层实现交易订单服务,创建拼团锁单操作,以及查询。

步骤3:trigger 层提供拼团交易接口服务,这里会串联 activity、trade 两个领域服务。目前的 trigger 替代了一部分 case 层的编排操作,如果是较大规模的系统,则可以单独提供一个 case 层。

1.2 库表更新

首先,新增加两张表:group_buy_order、group_buy_order_list。

注意:库表设计中,添加了 team_id、lock_count(锁单量),以及做了细化的处理。

12345678910111213141516171819202122CREATE TABLE `group_buy_order` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增ID', `team_id` varchar(8) NOT NULL COMMENT '拼单组队ID', `activity_id` bigint(8) NOT NULL COMMENT '活动ID', `source` varchar(8) NOT NULL COMMENT '渠道', `channel` varchar(8) NOT NULL COMMENT '来源', `original_price` decimal(8,2) NOT NULL COMMENT '原始价格', `deduction_price` decimal(8,2) NOT NULL COMMENT '折扣金额', `pay_price` decimal(8,2) NOT NULL COMMENT '支付价格', `target_count` int(5) NOT NULL COMMENT '目标数量', `complete_count` int(5) NOT NULL COMMENT '完成数量', `lock_count` int(5) NOT NULL COMMENT '锁单数量', `status` tinyint(1) NOT NULL DEFAULT '0' COMMENT '状态(0-拼单中、1-完成、2-失败)', `valid_start_time` datetime NOT NULL COMMENT '拼团开始时间', `valid_end_time` datetime NOT NULL COMMENT '拼团结束时间', `notify_type` varchar(8) NOT NULL DEFAULT 'HTTP' COMMENT '回调类型(HTTP、MQ)', `notify_url` varchar(512) DEFAULT NULL COMMENT '回调地址(HTTP 回调不可为空)', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', PRIMARY KEY (`id`), UNIQUE KEY `uq_team_id` (`team_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

123456789101112131415161718192021222324CREATE TABLE `group_buy_order_list` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增ID', `user_id` varchar(64) NOT NULL COMMENT '用户ID', `team_id` varchar(8) NOT NULL COMMENT '拼单组队ID', `order_id` varchar(12) NOT NULL COMMENT '订单ID', `activity_id` bigint(8) NOT NULL COMMENT '活动ID', `start_time` datetime NOT NULL COMMENT '活动开始时间', `end_time` datetime NOT NULL COMMENT '活动结束时间', `goods_id` varchar(16) NOT NULL COMMENT '商品ID', `source` varchar(8) NOT NULL COMMENT '渠道', `channel` varchar(8) NOT NULL COMMENT '来源', `original_price` decimal(8,2) NOT NULL COMMENT '原始价格', `deduction_price` decimal(8,2) NOT NULL COMMENT '折扣金额', `pay_price` decimal(8,2) NOT NULL COMMENT '支付金额', `status` tinyint(1) NOT NULL DEFAULT '0' COMMENT '状态;0初始锁定、1消费完成、2用户退单', `out_trade_no` varchar(12) NOT NULL COMMENT '外部交易单号-确保外部调用唯一幂等', `out_trade_time` datetime DEFAULT NULL COMMENT '外部交易时间', `biz_id` varchar(64) NOT NULL COMMENT '业务唯一ID', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', PRIMARY KEY (`id`), UNIQUE KEY `uq_order_id` (`order_id`), KEY `idx_user_id_activity_id` (`user_id`,`activity_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

2. 编码实现

2.1 基础实现

增加数据库的对象:

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849package cn.bugstack.infrastructure.dao.po;/** * @description 用户拼单 */@Data@Builder@AllArgsConstructor@NoArgsConstructorpublic class GroupBuyOrder { /** 自增ID */ private Long id; /** 拼单组队ID */ private String teamId; /** 活动ID */ private Long activityId; /** 渠道 */ private String source; /** 来源 */ private String channel; /** 原始价格 */ private BigDecimal originalPrice; /** 折扣金额 */ private BigDecimal deductionPrice; /** 支付价格 */ private BigDecimal payPrice; /** 目标数量 */ private Integer targetCount; /** 完成数量 */ private Integer completeCount; /** 锁单数量 */ private Integer lockCount; /** 状态(0-拼单中、1-完成、2-失败) */ private Integer status; /** 拼团开始时间 - 参与拼团时间 */ private Date validStartTime; /** 拼团结束时间 - 拼团有效时长 */ private Date validEndTime; /** 回调类型 HTTP、MQ */ private String notifyType; /** 回调通知(HTTP 方式回调,地址不可为空) */ private String notifyUrl; /** 创建时间 */ private Date createTime; /** 更新时间 */ private Date updateTime;}

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152package cn.bugstack.infrastructure.dao.po;/** * @description 用户拼单明细 */@EqualsAndHashCode(callSuper = true)@Data@Builder@AllArgsConstructor@NoArgsConstructorpublic class GroupBuyOrderList extends Page { /** 自增ID */ private Long id; /** 用户ID */ private String userId; /** 拼单组队ID */ private String teamId; /** 订单ID */ private String orderId; /** 活动ID */ private Long activityId; /** 活动开始时间 */ private Date startTime; /** 活动结束时间 */ private Date endTime; /** 商品ID */ private String goodsId; /** 渠道 */ private String source; /** 来源 */ private String channel; /** 原始价格 */ private BigDecimal originalPrice; /** 折扣金额 */ private BigDecimal deductionPrice; /** 支付金额 */ private BigDecimal payPrice; /** 状态;0初始锁定、1消费完成 */ private Integer status; /** 外部交易单号-确保外部调用唯一幂等 */ private String outTradeNo; /** 外部交易时间 */ private Date outTradeTime; /** 唯一业务ID */ private String bizId; /** 创建时间 */ private Date createTime; /** 更新时间 */ private Date updateTime;}

增加这两个对象的方法:

1234567891011121314package cn.bugstack.infrastructure.dao;@Mapperpublic interface IGroupBuyOrderDao { void insert(GroupBuyOrder groupBuyOrder); int updateAddLockCount(String teamId); int updateSubtractionLockCount(String teamId); GroupBuyOrder queryGroupBuyProgress(String teamId);}

12345678910package cn.bugstack.infrastructure.dao;@Mapperpublic interface IGroupBuyOrderListDao { void insert(GroupBuyOrderList groupBuyOrderListReq); GroupBuyOrderList queryGroupBuyOrderRecordByOutTradeNo(GroupBuyOrderList groupBuyOrderListReq);}

给2个新表提供新的 DAO 操作:

IGroupBuyOrderDao:插入数据、更新锁单量、查询进度

IGroupBuyOrderListDao:插入拼单明细、查询交易记录

2.2 交易订单

1234567891011121314151617181920212223242526272829303132333435363738@Slf4j@Servicepublic class TradeOrderService implements ITradeOrderService { @Resource private ITradeRepository repository; @Override public MarketPayOrderEntity queryNoPayMarketPayOrderByOutTradeNo(String userId, String outTradeNo) { log.info("拼团交易-查询未支付营销订单:{} outTradeNo:{}", userId, outTradeNo); // 查询未支付营销订单 return repository.queryMarketPayOrderEntityByOutTradeNo(userId, outTradeNo); } @Override public GroupBuyProgressVO queryGroupBuyProgress(String teamId) { log.info("拼团交易-查询拼单进度:{}", teamId); // 查询拼单进度 return repository.queryGroupBuyProgress(teamId); } @Override public MarketPayOrderEntity lockMarketPayOrder(UserEntity userEntity, PayActivityEntity payActivityEntity, PayDiscountEntity payDiscountEntity) { log.info("拼团交易-锁定营销优惠支付订单:{} activityId:{} goodsId:{}", userEntity.getUserId(), payActivityEntity.getActivityId(), payDiscountEntity.getGoodsId()); // 构建聚合对象 GroupBuyOrderAggregate groupBuyOrderAggregate = GroupBuyOrderAggregate.builder() .userEntity(userEntity) .payActivityEntity(payActivityEntity) .payDiscountEntity(payDiscountEntity) .build(); // 锁定聚合订单 - 这会用户只是下单还没有支付。后续会有2个流程;支付成功、超时未支付(回退) // 锁定营销优惠支付订单 return repository.lockMarketPayOrder(groupBuyOrderAggregate); }}

在交易订单中提供查询,包括:未支付营销订单、查询拼单进度和锁定营销优惠支付订单。

在 lockMarketPayOrder 有数据库事务操作。

2.3 实现ITradeRepository的接口方法

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140package cn.bugstack.infrastructure.adapter.repository;/** * @description 交易仓储服务 */@Slf4j@Repositorypublic class TradeRepository implements ITradeRepository { @Resource private IGroupBuyOrderDao groupBuyOrderDao; @Resource private IGroupBuyOrderListDao groupBuyOrderListDao; // 查询未支付营销订单 @Override public MarketPayOrderEntity queryMarketPayOrderEntityByOutTradeNo(String userId, String outTradeNo) { GroupBuyOrderList groupBuyOrderListReq = new GroupBuyOrderList(); groupBuyOrderListReq.setUserId(userId); groupBuyOrderListReq.setOutTradeNo(outTradeNo); GroupBuyOrderList groupBuyOrderListRes = groupBuyOrderListDao.queryGroupBuyOrderRecordByOutTradeNo(groupBuyOrderListReq); if (null == groupBuyOrderListRes) return null; return MarketPayOrderEntity.builder() .teamId(groupBuyOrderListRes.getTeamId()) .orderId(groupBuyOrderListRes.getOrderId()) .originalPrice(groupBuyOrderListRes.getOriginalPrice()) .deductionPrice(groupBuyOrderListRes.getDeductionPrice()) .payPrice(groupBuyOrderListRes.getPayPrice()) .tradeOrderStatusEnumVO(TradeOrderStatusEnumVO.valueOf(groupBuyOrderListRes.getStatus())) .build(); } // 锁定营销优惠支付订单 @Transactional(timeout = 500) @Override public MarketPayOrderEntity lockMarketPayOrder(GroupBuyOrderAggregate groupBuyOrderAggregate) { // 聚合对象信息 UserEntity userEntity = groupBuyOrderAggregate.getUserEntity(); PayActivityEntity payActivityEntity = groupBuyOrderAggregate.getPayActivityEntity(); PayDiscountEntity payDiscountEntity = groupBuyOrderAggregate.getPayDiscountEntity(); NotifyConfigVO notifyConfigVO = payDiscountEntity.getNotifyConfigVO(); Integer userTakeOrderCount = groupBuyOrderAggregate.getUserTakeOrderCount(); // 判断是否有团 - teamId 为空 - 新团、为不空 - 老团 String teamId = payActivityEntity.getTeamId(); if (StringUtils.isBlank(teamId)) { // 使用 RandomStringUtils.randomNumeric 替代公司里使用的雪花算法UUID teamId = RandomStringUtils.randomNumeric(8); // 日期处理 Date currentDate = new Date(); Calendar calendar = Calendar.getInstance(); calendar.setTime(currentDate); calendar.add(Calendar.MINUTE, payActivityEntity.getValidTime()); // 构建拼团订单 GroupBuyOrder groupBuyOrder = GroupBuyOrder.builder() .teamId(teamId) .activityId(payActivityEntity.getActivityId()) .source(payDiscountEntity.getSource()) .channel(payDiscountEntity.getChannel()) .originalPrice(payDiscountEntity.getOriginalPrice()) .deductionPrice(payDiscountEntity.getDeductionPrice()) .payPrice(payDiscountEntity.getPayPrice()) .targetCount(payActivityEntity.getTargetCount()) // 完成数量为0 .completeCount(0) // 锁单数量为1 .lockCount(1) .validStartTime(currentDate) .validEndTime(calendar.getTime()) .notifyType(notifyConfigVO.getNotifyType().getCode()) .notifyUrl(notifyConfigVO.getNotifyUrl()) .build(); // 写入记录 groupBuyOrderDao.insert(groupBuyOrder); } else { // 更新记录 - 如果更新记录不等于1,则表示拼团已满,抛出异常 int updateAddTargetCount = groupBuyOrderDao.updateAddLockCount(teamId); if (1 != updateAddTargetCount) { throw new AppException(ResponseCode.E0005); } } // 使用 RandomStringUtils.randomNumeric 替代公司里使用的雪花算法UUID String orderId = RandomStringUtils.randomNumeric(12); GroupBuyOrderList groupBuyOrderListReq = GroupBuyOrderList.builder() .userId(userEntity.getUserId()) .teamId(teamId) .orderId(orderId) .activityId(payActivityEntity.getActivityId()) .startTime(payActivityEntity.getStartTime()) .endTime(payActivityEntity.getEndTime()) .goodsId(payDiscountEntity.getGoodsId()) .source(payDiscountEntity.getSource()) .channel(payDiscountEntity.getChannel()) .originalPrice(payDiscountEntity.getOriginalPrice()) .deductionPrice(payDiscountEntity.getDeductionPrice()) .payPrice(payDiscountEntity.getPayPrice()) .status(TradeOrderStatusEnumVO.CREATE.getCode()) .outTradeNo(payDiscountEntity.getOutTradeNo()) .build(); try { // 写入拼团记录 groupBuyOrderListDao.insert(groupBuyOrderListReq); } catch (DuplicateKeyException e) { throw new AppException(ResponseCode.INDEX_EXCEPTION); } return MarketPayOrderEntity.builder() .orderId(orderId) .originalPrice(payDiscountEntity.getOriginalPrice()) .deductionPrice(payDiscountEntity.getDeductionPrice()) .payPrice(payDiscountEntity.getPayPrice()) .tradeOrderStatusEnumVO(TradeOrderStatusEnumVO.CREATE) .build(); } // 查询拼单进度 @Override public GroupBuyProgressVO queryGroupBuyProgress(String teamId) { GroupBuyOrder groupBuyOrder = groupBuyOrderDao.queryGroupBuyProgress(teamId); if (null == groupBuyOrder) return null; return GroupBuyProgressVO.builder() .completeCount(groupBuyOrder.getCompleteCount()) .targetCount(groupBuyOrder.getTargetCount()) .lockCount(groupBuyOrder.getLockCount()) .build(); }}

group_buy_order、group_buy_order_list,两个库表一个写入记录,一个更新记录。需要在一个事务中完成操作。

2.4 服务接口的实现

12345678910111213141516171819202122232425package cn.bugstack.api;/** * @description 营销交易服务接口 */public interface IMarketTradeService { /** * 营销锁单 * * @param requestDTO 锁单商品信息 * @return 锁单结果信息 */ Response lockMarketPayOrder(LockMarketPayOrderRequestDTO requestDTO); /** * 营销结算 * * @param requestDTO 结算商品信息 * @return 结算结果信息 */ Response settlementMarketPayOrder(SettlementMarketPayOrderRequestDTO requestDTO);}

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132package cn.bugstack.trigger.http;/** * @description 营销交易服务 */@Slf4j@RestController()@CrossOrigin("*")@RequestMapping("/api/v1/gbm/trade/")public class MarketTradeController implements IMarketTradeService { @Resource private IIndexGroupBuyMarketService indexGroupBuyMarketService; @Resource private ITradeOrderService tradeOrderService; /** * 拼团营销锁单 */ @RequestMapping(value = "lock_market_pay_order", method = RequestMethod.POST) @Override public Response lockMarketPayOrder(LockMarketPayOrderRequestDTO lockMarketPayOrderRequestDTO) { try { // 参数 String userId = lockMarketPayOrderRequestDTO.getUserId(); String source = lockMarketPayOrderRequestDTO.getSource(); String channel = lockMarketPayOrderRequestDTO.getChannel(); String goodsId = lockMarketPayOrderRequestDTO.getGoodsId(); Long activityId = lockMarketPayOrderRequestDTO.getActivityId(); String outTradeNo = lockMarketPayOrderRequestDTO.getOutTradeNo(); String teamId = lockMarketPayOrderRequestDTO.getTeamId(); log.info("营销交易锁单:{} LockMarketPayOrderRequestDTO:{}", userId, JSON.toJSONString(lockMarketPayOrderRequestDTO)); if (StringUtils.isBlank(userId) || StringUtils.isBlank(source) || StringUtils.isBlank(channel) || StringUtils.isBlank(goodsId) || StringUtils.isBlank(goodsId) || null == activityId) { return Response.builder() .code(ResponseCode.ILLEGAL_PARAMETER.getCode()) .info(ResponseCode.ILLEGAL_PARAMETER.getInfo()) .build(); } // 查询 outTradeNo 是否已经存在交易记录 MarketPayOrderEntity marketPayOrderEntity = tradeOrderService.queryNoPayMarketPayOrderByOutTradeNo(userId, outTradeNo); if (null != marketPayOrderEntity) { LockMarketPayOrderResponseDTO lockMarketPayOrderResponseDTO = LockMarketPayOrderResponseDTO.builder() .orderId(marketPayOrderEntity.getOrderId()) .deductionPrice(marketPayOrderEntity.getDeductionPrice()) .tradeOrderStatus(marketPayOrderEntity.getTradeOrderStatusEnumVO().getCode()) .build(); log.info("交易锁单记录(存在):{} marketPayOrderEntity:{}", userId, JSON.toJSONString(marketPayOrderEntity)); return Response.builder() .code(ResponseCode.SUCCESS.getCode()) .info(ResponseCode.SUCCESS.getInfo()) .data(lockMarketPayOrderResponseDTO) .build(); } // 判断拼团是否完成了目标 if (null != teamId) { GroupBuyProgressVO groupBuyProgressVO = tradeOrderService.queryGroupBuyProgress(teamId); if (null != groupBuyProgressVO && Objects.equals(groupBuyProgressVO.getTargetCount(), groupBuyProgressVO.getLockCount())) { log.info("交易锁单拦截-拼单目标已达成:{} {}", userId, teamId); return Response.builder() .code(ResponseCode.E0006.getCode()) .info(ResponseCode.E0006.getInfo()) .build(); } } // 营销优惠试算 TrialBalanceEntity trialBalanceEntity = indexGroupBuyMarketService.indexMarketTrial(MarketProductEntity.builder() .userId(userId) .source(source) .channel(channel) .goodsId(goodsId) .activityId(activityId) .build()); GroupBuyActivityDiscountVO groupBuyActivityDiscountVO = trialBalanceEntity.getGroupBuyActivityDiscountVO(); // 锁单 marketPayOrderEntity = tradeOrderService.lockMarketPayOrder( UserEntity.builder().userId(userId).build(), PayActivityEntity.builder() .teamId(teamId) .activityId(activityId) .activityName(groupBuyActivityDiscountVO.getActivityName()) .startTime(groupBuyActivityDiscountVO.getStartTime()) .endTime(groupBuyActivityDiscountVO.getEndTime()) .targetCount(groupBuyActivityDiscountVO.getTarget()) .build(), PayDiscountEntity.builder() .source(source) .channel(channel) .goodsId(goodsId) .goodsName(trialBalanceEntity.getGoodsName()) .originalPrice(trialBalanceEntity.getOriginalPrice()) .deductionPrice(trialBalanceEntity.getDeductionPrice()) .outTradeNo(outTradeNo) .build()); log.info("交易锁单记录(新):{} marketPayOrderEntity:{}", userId, JSON.toJSONString(marketPayOrderEntity)); // 返回结果 return Response.builder() .code(ResponseCode.SUCCESS.getCode()) .info(ResponseCode.SUCCESS.getInfo()) .data(LockMarketPayOrderResponseDTO.builder() .orderId(marketPayOrderEntity.getOrderId()) .deductionPrice(marketPayOrderEntity.getDeductionPrice()) .tradeOrderStatus(marketPayOrderEntity.getTradeOrderStatusEnumVO().getCode()) .build()) .build(); } catch (AppException e) { log.error("营销交易锁单业务异常:{} LockMarketPayOrderRequestDTO:{}", lockMarketPayOrderRequestDTO.getUserId(), JSON.toJSONString(lockMarketPayOrderRequestDTO), e); return Response.builder() .code(e.getCode()) .info(e.getInfo()) .build(); } catch (Exception e) { log.error("营销交易锁单服务失败:{} LockMarketPayOrderRequestDTO:{}", lockMarketPayOrderRequestDTO.getUserId(), JSON.toJSONString(lockMarketPayOrderRequestDTO), e); return Response.builder() .code(ResponseCode.UN_ERROR.getCode()) .info(ResponseCode.UN_ERROR.getInfo()) .build(); } }}

首先,需要查询外部交易 outTradeNo 是否存在交易记录。如果存在未完成的订单,直接返回结果即可。这个是幂等的一个防护。如果不查询,最终也是会有数据库唯一索引拦截。

之后,判断拼团锁单是否完成目标量,如果已经完成了目标量则直接直接返回,让用户不能参与当前拼团。一般在并发情况下,如果多人选择一个拼团,那么查询拼团量可以有效拦截。如果没有拦截,最终访问数据,也会有数量判断拦截。注意这里会有数据库表的行级锁,如果 TPS 量大,那么则需要加入 redis 操作库存量。

之后,开始做营销优惠试算。判断当前商品在拼团下应该优惠多少。确认完优惠后,开始锁单。最终返回给用户到界面展示,用户确认了支付所用到的营销优惠,那么在点击确认支付跳转收银台扫码支付即可。

责任链抽象模板设计

1. 模型设计

责任链是一种简单的单链路结构,在工程中会有多个这样的单链,为了可以让不同的场景都能创建出自己的链,则需要解耦责任链的链路和执行,再有执行器处理。设计如图:

如图,这是一种多实例对象责任链的设计结构,会使用到如 Java JDK 源码中 Link 的方式填写链路,之后再有业务链路处理链路执行。而每一个链路都会被填充一个逻辑处理器的实现类(ILogicHandler)来处理具体的业务。

那么,这样就很好的扩展了各种链路的使用诉求。

2. 编码实现

2.1 工程结构

在 types → design,提供 link 责任链。model1、model2

model1:是通用的单例链,本身抽象类既可添加链路节点,又能做链路过程执行处理。

model2:拆分了责任链和执行处理器,并由 LinkArmory 进行装配。这样就可以填充多例链。

2.2 单例链路

2.2.1 ILogicChainArmory

123456789101112package cn.bugstack.types.design.framework.link.model1;/** * @description 责任链装配 */public interface ILogicChainArmory { ILogicLink next(); ILogicLink appendNext(ILogicLink next);}

2.2.2 ILogicLink

12345678910package cn.bugstack.types.design.framework.link.model1;/** * @description 略规则责任链接口 */public interface ILogicLink extends ILogicChainArmory { R apply(T requestParameter, D dynamicContext) throws Exception;}

2.2.3 AbstractLogicLink

1234567891011121314151617181920212223242526package cn.bugstack.types.design.framework.link.model1;/** * @description 抽象类 */public abstract class AbstractLogicLink implements ILogicLink { private ILogicLink next; @Override public ILogicLink next() { return next; } @Override public ILogicLink appendNext(ILogicLink next) { this.next = next; return next; } protected R next(T requestParameter, D dynamicContext) throws Exception { return next.apply(requestParameter, dynamicContext); }}

ILogicChainArmory 装配链,提供添加节点方法和获取节点。

ILogicLink 继承 ILogicChainArmory,并提供一个受理业务逻辑的方法。

AbstractLogicLink 的过程是封装添加节点和执行 next 下一个节点的方法。

那么,这里是由统一的类维护责任链,也就是一份责任链。如果有2个独立的链要处理,就需要使用到非单例的类进行填充,否则会修改同一份链。也就是 ILogicLink next 被几份链反复调整,也就不是一个单独的链了。

2.2.4 测试举例

123456789101112131415package cn.bugstack.test.types.rule01.logic;@Slf4j@Servicepublic class RuleLogic101 extends AbstractLogicLink{ @Override public String apply(String requestParameter, Rule02TradeRuleFactory.DynamicContext dynamicContext) throws Exception { log.info("link model01 RuleLogic101"); return next(requestParameter, dynamicContext); }}

123456789101112131415package cn.bugstack.test.types.rule01.logic;@Slf4j@Servicepublic class RuleLogic102 extends AbstractLogicLink{ @Override public String apply(String requestParameter, Rule02TradeRuleFactory.DynamicContext dynamicContext) throws Exception { log.info("link model01 RuleLogic102"); return "link model01 单实例链"; }}

1234567891011121314151617181920212223242526// 这个工厂类负责连接这两个节点:RuleLogic101和RuleLogic102package cn.bugstack.test.types.rule01.factory;@Servicepublic class Rule01TradeRuleFactory { @Resource private RuleLogic101 ruleLogic101; @Resource private RuleLogic102 ruleLogic102; public ILogicLink openLogicLink() { // ruleLogic101要连接ruleLogic102 ruleLogic101.appendNext(ruleLogic102); return ruleLogic101; } @Data @Builder @AllArgsConstructor @NoArgsConstructor public static class DynamicContext { private String age; }}

123456789101112131415161718// 主测试方法@Slf4j@RunWith(SpringRunner.class)@SpringBootTestpublic class Link01Test { @Resource public Rule01TradeRuleFactory rule01TradeRuleFactory; @Test public void test_model01_01() throws Exception { ILogicLink logicLink = rule01TradeRuleFactory.openLogicLink(); String logic = logicLink.apply("123", new Rule02TradeRuleFactory.DynamicContext()); log.info("测试结果:{}", JSON.toJSONString(logic)); }}

2.3 多例链路

多例链的设计要解耦链路和执行,把链路当做一个 LinkedList 列表处理,之后执行当做是单独的 for 循环

2.3.1 链表接口

1234567891011121314151617181920package cn.bugstack.types.design.framework.link.model2.chain;/** * @description 链接口 */public interface ILink { boolean add(E e); boolean addFirst(E e); boolean addLast(E e); boolean remove(Object o); E get(int index); void printLinkList();}

2.3.2 功能链表

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159package cn.bugstack.types.design.framework.link.model2.chain;/** * @description 功能链路 */public class LinkedList implements ILink { /** * 责任链名称 */ private final String name; transient int size = 0; transient Node first; transient Node last; public LinkedList(String name) { this.name = name; } void linkFirst(E e) { final Node f = first; final Node newNode = new Node<>(null, e, f); first = newNode; if (f == null) last = newNode; else f.prev = newNode; size++; } void linkLast(E e) { final Node l = last; final Node newNode = new Node<>(l, e, null); last = newNode; if (l == null) { first = newNode; } else { l.next = newNode; } size++; } @Override public boolean add(E e) { linkLast(e); return true; } @Override public boolean addFirst(E e) { linkFirst(e); return true; } @Override public boolean addLast(E e) { linkLast(e); return true; } @Override public boolean remove(Object o) { if (o == null) { for (Node x = first; x != null; x = x.next) { if (x.item == null) { unlink(x); return true; } } } else { for (Node x = first; x != null; x = x.next) { if (o.equals(x.item)) { unlink(x); return true; } } } return false; } E unlink(Node x) { final E element = x.item; final Node next = x.next; final Node prev = x.prev; if (prev == null) { first = next; } else { prev.next = next; x.prev = null; } if (next == null) { last = prev; } else { next.prev = prev; x.next = null; } x.item = null; size--; return element; } @Override public E get(int index) { return node(index).item; } Node node(int index) { if (index < (size >> 1)) { Node x = first; for (int i = 0; i < index; i++) x = x.next; return x; } else { Node x = last; for (int i = size - 1; i > index; i--) x = x.prev; return x; } } public void printLinkList() { if (this.size == 0) { System.out.println("链表为空"); } else { Node temp = first; System.out.print("目前的列表,头节点:" + first.item + " 尾节点:" + last.item + " 整体:"); while (temp != null) { System.out.print(temp.item + ","); temp = temp.next; } System.out.println(); } } protected static class Node { E item; Node next; Node prev; public Node(Node prev, E element, Node next) { this.item = element; this.next = next; this.prev = prev; } } public String getName() { return name; }}

2.3.3 逻辑处理器

1234567891011121314package cn.bugstack.types.design.framework.link.model2.handler;/** * @description 逻辑处理器 */public interface ILogicHandler { default R next(T requestParameter, D dynamicContext) { return null; } R apply(T requestParameter, D dynamicContext) throws Exception;}

2.3.4 业务链路

1234567891011121314151617181920212223242526package cn.bugstack.types.design.framework.link.model2.chain;/** * @description 业务链路 */public class BusinessLinkedList extends LinkedList> implements ILogicHandler{ public BusinessLinkedList(String name) { super(name); } @Override public R apply(T requestParameter, D dynamicContext) throws Exception { Node> current = this.first; do { ILogicHandler item = current.item; R apply = item.apply(requestParameter, dynamicContext); if (null != apply) return apply; current = current.next; } while (null != current); return null; }}

BusinessLinkedList 存在的意义是为了受理业务流程,循环遍历责任链。遍历的过程不只是这样,也可以实现 Iterable 来处理。

遍历的过程会以 apply 是否为空和链路是否走到最后来判断。apply 为空则表示当前 ILogicHandler 实现的节点的业务流程为空,放行到下一个节点。

2.3.5 链路装配

1234567891011121314151617181920212223package cn.bugstack.types.design.framework.link.model2;/** * @description 链路装配 */public class LinkArmory { private final BusinessLinkedList logicLink; @SafeVarargs public LinkArmory(String linkName, ILogicHandler... logicHandlers) { logicLink = new BusinessLinkedList<>(linkName); for (ILogicHandler logicHandler: logicHandlers){ logicLink.add(logicHandler); } } public BusinessLinkedList getLogicLink() { return logicLink; }}

提供一个链路装配器,让调用方自行装配处理。

2.3.6 测试举例

123456789101112131415161718package cn.bugstack.test.types.rule02.logic;/** * @description */@Slf4j@Servicepublic class RuleLogic201 implements ILogicHandler { public XxxResponse apply(String requestParameter, Rule02TradeRuleFactory.DynamicContext dynamicContext) throws Exception{ log.info("link model02 RuleLogic201"); return next(requestParameter, dynamicContext); }}

123456789101112131415161718package cn.bugstack.test.types.rule02.logic;/** * @description */@Slf4j@Servicepublic class RuleLogic202 implements ILogicHandler { public XxxResponse apply(String requestParameter, Rule02TradeRuleFactory.DynamicContext dynamicContext) throws Exception{ log.info("link model02 RuleLogic202"); return new XxxResponse("hi 小傅哥!"); }}

123456789101112131415161718package cn.bugstack.test.types.rule02.logic;/** * @description */public class XxxResponse { private final String age; public XxxResponse(String age) { this.age = age; } public String getAge() { return age; }}

12345678910111213141516171819202122232425262728293031323334package cn.bugstack.test.types.rule02.factory;/** * @description */@Servicepublic class Rule02TradeRuleFactory { @Bean("demo01") public BusinessLinkedList demo01(RuleLogic201 ruleLogic201, RuleLogic202 ruleLogic202) { LinkArmory linkArmory = new LinkArmory<>("demo01", ruleLogic201, ruleLogic202); return linkArmory.getLogicLink(); } @Bean("demo02") public BusinessLinkedList demo02(RuleLogic202 ruleLogic202) { LinkArmory linkArmory = new LinkArmory<>("demo02", ruleLogic202); return linkArmory.getLogicLink(); } @Data @Builder @AllArgsConstructor @NoArgsConstructor public static class DynamicContext { private String age; }}

多例链与单例链最大的区别在于,工厂里直接配置了两个 Bean 对象,组装不同的链路。每个链路只维护自己的流程。

之后在单测类里分别测试两套链路。测试结果符合预期。

交易规则责任链过滤

完善拼团交易营销锁单的流程,增加锁单流程中的规则处理。

这部分的规则过滤,会使用到前面设计的统一的设计模式框架中的责任链模式。对这类轻量的场景,一般只需要选择单链的执行模型即可,而与之对比的规则树,是适合于那种节点间的复杂分支流转。

1. 业务流程

在前面的部分,我们实现了拼团锁单中,参数校验、幂等校验、达成校验,之后做了营销试算和营销锁单。

那么在这部分,还需要对营销锁单继续完善,过滤拼团活动配置的规则。包括:活动的有效期、状态,以及个人参与拼团的次数。在实际公司中的项目里,还会有更多的规则要被处理。

2. 编码实现

2.1 数据库修改

增加biz_id,即:业务id,构建唯一的索引标识,例如:100123_xiaofuge_1表示xiaofuge这个用户最多只能参与一次

2.2 工程结构

这部分会围绕 trade 领域内 service 服务下的规则进行实现。实现的过程会使用到前面部分定义的责任链模板。

2.3 功能修复

2.3.1 营销优惠记录支付金额

123456789// 在marketNode的第87行package cn.bugstack.domain.activity.service.trial.node;// 折扣价格,因为新增了payprice(支付价格)、deductionPrice(优惠价格)的字段BigDecimal payPrice = discountCalculateService.calculate(requestParameter.getUserId(), skuVO.getOriginalPrice(), groupBuyDiscount);dynamicContext.setDeductionPrice(skuVO.getOriginalPrice().subtract(payPrice));dynamicContext.setPayPrice(payPrice);

为了更好的使用拼团营销试算,明确试算返回的优惠折扣和支付金额,为此 在拼团试算 MarketNode 节点,增加 payPrice 属性填写支付金额。

2.3.2 营销优惠计算使用人群ID

12345678910111213141516171819202122232425262728public abstract class AbstractDiscountCalculateService implements IDiscountCalculateService { @Resource protected IActivityRepository repository; @Override public BigDecimal calculate(String userId, BigDecimal originalPrice, GroupBuyActivityDiscountVO.GroupBuyDiscount groupBuyDiscount) { // 1. 人群标签过滤 if (DiscountTypeEnum.TAG.equals(groupBuyDiscount.getDiscountType())){ boolean isCrowdRange = filterTagId(userId, groupBuyDiscount.getTagId()); if (!isCrowdRange) { log.info("折扣优惠计算拦截,用户不再优惠人群标签范围内 userId:{}", userId); return originalPrice; } } // 2. 折扣优惠计算 return doCalculate(originalPrice, groupBuyDiscount); } // 人群过滤 - 限定人群优惠 private boolean filterTagId(String userId, String tagId) { // 判断用户是否存在人群中 return repository.isTagCrowdRange(tagId, userId); } protected abstract BigDecimal doCalculate(BigDecimal originalPrice, GroupBuyActivityDiscountVO.GroupBuyDiscount groupBuyDiscount);}

完善 AbstractDiscountCalculateService 的 filterTagId 中对人群标签的使用

2.4 交易规则模型

2.4.1 构建交易结算实体

1234567891011121314151617181920212223242526272829303132package cn.bugstack.domain.trade.model.entity;/** * @description 拼团交易结算规则反馈 */@Data@Builder@AllArgsConstructor@NoArgsConstructorpublic class TradeSettlementRuleFilterBackEntity { /** 拼单组队ID */ private String teamId; /** 活动ID */ private Long activityId; /** 目标数量 */ private Integer targetCount; /** 完成数量 */ private Integer completeCount; /** 锁单数量 */ private Integer lockCount; /** 状态(0-拼单中、1-完成、2-失败) */ private GroupBuyOrderEnumVO status; /** 拼团开始时间 - 参与拼团时间 */ private Date validStartTime; /** 拼团结束时间 - 拼团有效时长 */ private Date validEndTime; /** 回调配置 */ private NotifyConfigVO notifyConfigVO;}

123456789101112131415161718192021222324package cn.bugstack.domain.trade.model.entity;/** * @description 拼团交易结算规则命令 */@Data@Builder@AllArgsConstructor@NoArgsConstructorpublic class TradeSettlementRuleCommandEntity { /** 渠道 */ private String source; /** 来源 */ private String channel; /** 用户ID */ private String userId; /** 外部交易单号 */ private String outTradeNo; /** 外部交易时间 */ private Date outTradeTime;}

2.4.2 判断活动的可用性

123456789101112131415161718192021222324252627282930313233343536373839package cn.bugstack.domain.trade.service.settlement.filter;@Slf4j@Servicepublic class ActivityUsabilityRuleFilter implements ILogicHandler { @Resource private ITradeRepository repository; @Override public TradeRuleFilterBackEntity apply(TradeRuleCommandEntity requestParameter, TradeRuleFilterFactory.DynamicContext dynamicContext) throws Exception { log.info("交易规则过滤-活动的可用性校验{} activityId:{}", requestParameter.getUserId(), requestParameter.getActivityId()); // 查询拼团活动 GroupBuyActivityEntity groupBuyActivity = repository.queryGroupBuyActivityEntityByActivityId(requestParameter.getActivityId()); // 校验:活动状态 - 可以抛业务异常code,或者把code写入到动态上下文dynamicContext中,最后获取。 // ActivityStatusEnumVO 是一个枚举类,包括:活动状态(0创建、1生效、2过期、3废弃) if (!ActivityStatusEnumVO.EFFECTIVE.equals(groupBuyActivity.getStatus())) { log.info("活动的可用性校验,非生效状态 activityId:{}", requestParameter.getActivityId()); throw new AppException(ResponseCode.E0101); } // 校验:活动时间 Date currentTime = new Date(); if (currentTime.before(groupBuyActivity.getStartTime()) || currentTime.after(groupBuyActivity.getEndTime())) { log.info("活动的可用性校验,非可参与时间范围 activityId:{}", requestParameter.getActivityId()); throw new AppException(ResponseCode.E0102); } // 写入动态上下文 dynamicContext.setGroupBuyActivity(groupBuyActivity); // 走到下一个责任链节点 return next(requestParameter, dynamicContext); }}

活动可用性过滤活动的状态和时间。之后把活动数据写入到上下文中。

2.4.3 判断用户参与限制

123456789101112131415161718192021222324252627282930package cn.bugstack.domain.trade.service.settlement.filter;@Slf4j@Servicepublic class UserTakeLimitRuleFilter implements ILogicHandler { @Resource private ITradeRepository repository; @Override public TradeRuleFilterBackEntity apply(TradeRuleCommandEntity requestParameter, TradeRuleFilterFactory.DynamicContext dynamicContext) throws Exception { log.info("交易规则过滤-用户参与次数校验{} activityId:{}", requestParameter.getUserId(), requestParameter.getActivityId()); GroupBuyActivityEntity groupBuyActivity = dynamicContext.getGroupBuyActivity(); // 查询用户在一个拼团活动上参与的次数 Integer count = repository.queryOrderCountByActivityId(requestParameter.getActivityId(), requestParameter.getUserId()); // 用户在一个拼团活动上参与的次数超过限定的次数 if (null != groupBuyActivity.getTakeLimitCount() && count >= groupBuyActivity.getTakeLimitCount()) { log.info("用户参与次数校验,已达可参与上限 activityId:{}", requestParameter.getActivityId()); throw new AppException(ResponseCode.E0103); } return TradeRuleFilterBackEntity.builder() .userTakeOrderCount(count) .build(); }}

如果拼团活动配置了对用户的参与次数限制,那么需要在用户参与活动前,做好数量的校验拦截。

比如:用户可以参与3次,那么库表里会记录3条数据:活动ID_用户ID_1、活动ID_用户ID_2、活动ID_用户ID_3,这样可以在库表层面做最强的防护拦截,不会让一个用户无限的参与活动。

2.4.4 构建交易规则工厂

123456789101112131415161718192021222324252627282930package cn.bugstack.domain.trade.service.settlement.factory;@Slf4j@Servicepublic class TradeRuleFilterFactory { // 这里的BusinessLinkedList是用的上面的部分定义的节点 @Bean("tradeRuleFilter") public BusinessLinkedList tradeRuleFilter(ActivityUsabilityRuleFilter activityUsabilityRuleFilter, UserTakeLimitRuleFilter userTakeLimitRuleFilter) { // 组装链 LinkArmory linkArmory = new LinkArmory<>("交易规则过滤链", activityUsabilityRuleFilter, userTakeLimitRuleFilter); // 链对象 return linkArmory.getLogicLink(); } @Data @Builder @AllArgsConstructor @NoArgsConstructor public static class DynamicContext { private GroupBuyActivityEntity groupBuyActivity; }}

责任链的规则创建完成后,就是在工厂类中构建责任链。

例如:在实际的公司业务开发中,一个责任链会有很多这样的规则,因为要过滤用户的开户状态、授信状态、渠道状态、风控状态、额度状态等。那么这样的设计分层结构就非常好扩展和维护了。

此外,也不只是一个责任链,而是很多,他们根据不同的业务模型,比如:乡村交易、政企交易、校园交易、普户交易等,他们的规则流程也是不同的,那么为不同的流程,直接配置出对应的责任链,就可以快速的搭建符合的业务流程了,开发效率、维护效率都很高,技术认可度也有很大的提高!

拼团组队结算统计

首先,在之前的拼团流程中,拼团的过程是用户在商城下单,锁定拼团优惠(也就是拼团系统里锁单的过程)。之后就是用户给这笔商品完成支付交易,交易后不会直接发货,直至拼团组队完成后才会发货。

那么,这里有一个流程,就是支付完成后,需要做拼团数量的统计结算。例如:拼团需要3个用户一起下单,那么每完成一笔支付,就要给拼团的组队加上一笔记录。

1. 业务流程

首先,交易订单的营销结算,核心就是更新拼团队伍的参与人数数量。每完成一笔支付,就有一笔拼团进度数量+1。

之后,这里要知道,更新拼团订单的明细状态(交易完成)和更新拼团进度数量要在一个事务下完成。

另外,更新拼团的进度要判断,当前是否为最后一次拼团完结状态。比如:计算剩余1个,即可完成拼团目标量,那么这最后一笔更新完成后,既是整个拼团队伍的进度完成了。

2. 编码实现

2.1 新增一张数据库表

123456789101112131415161718# 转储表 notify_task# ------------------------------------------------------------CREATE TABLE `notify_task` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增ID', `activity_id` bigint(8) NOT NULL COMMENT '活动ID', `team_id` varchar(8) NOT NULL COMMENT '拼单组队ID', `notify_type` varchar(8) NOT NULL DEFAULT 'HTTP' COMMENT '回调类型(HTTP、MQ)', `notify_mq` varchar(32) DEFAULT NULL COMMENT '回调消息', `notify_url` varchar(128) DEFAULT NULL COMMENT '回调接口', `notify_count` int(8) NOT NULL COMMENT '回调次数', `notify_status` tinyint(1) NOT NULL COMMENT '回调状态【0初始、1完成、2重试、3失败】', `parameter_json` varchar(256) NOT NULL COMMENT '参数对象', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', PRIMARY KEY (`id`), UNIQUE KEY `uq_team_id` (`team_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

2.2 工程结构

首先,重构交易分层,用类的名称,拆分交易中的业务场景。以此完成单一职责的划分。

之后,trade 领域下目前包括2部分服务,一个是营销交易锁单服务,另外一个是本节新增加的,营销交易结算服务。

重点,在于本部分实现的营销交易结算服务,对每一笔支付记录的记账处理,直至完成拼团进度。

2.3 重构锁单

单一职责的特点在于一个类,主要关心一类服务的设计。单一职责的好处是:降低复杂性 、提高可读性和可维护性 、提高复用性 、降低变更风险 。

那么,我们使用单一职责来设计接口,它的特点在于,接口的命名即可看出做的业务属性,如:ITradePay、ITradeRefund、ITradeSettlement 等。如果说没有单一职责,那么这些 pay、refund、settlement,就会被写到一个接口类里,这个类会随着业务的迭代,被充斥大量的业务逻辑,越往后越难维护。所以对于这类场景,我们要做单一职责设计。

2.4 拼团结算

2.4.1 定义拼团结算的接口

12345678910111213package cn.bugstack.domain.trade.service;public interface ITradeSettlementOrderService { /** * 营销结算 * @param tradePaySuccessEntity 交易支付订单实体对象 * @return 交易结算订单实体 */ TradePaySettlementEntity settlementMarketPayOrder(TradePaySuccessEntity tradePaySuccessEntity);}

首先,依照于单一职责,设计出一个交易结算的服务。

之后,定义营销结算的方法。入参为交易支付的成功实体信息,出参为交易结算的实体。

2.4.2 功能实现

12345678910111213141516171819202122232425262728293031323334353637383940414243444546package cn.bugstack.domain.trade.service.settlement;@Slf4j@Servicepublic class TradeSettlementOrderService implements ITradeSettlementOrderService { @Resource private ITradeRepository repository; @Override public TradePaySettlementEntity settlementMarketPayOrder(TradePaySuccessEntity tradePaySuccessEntity) { log.info("拼团交易-支付订单结算:{} outTradeNo:{}", tradePaySuccessEntity.getUserId(), tradePaySuccessEntity.getOutTradeNo()); // 1. 查询拼团信息 MarketPayOrderEntity marketPayOrderEntity = repository.queryMarketPayOrderEntityByOutTradeNo(tradePaySuccessEntity.getUserId(), tradePaySuccessEntity.getOutTradeNo()); if (null == marketPayOrderEntity) { log.info("不存在的外部交易单号或用户已退单,不需要做支付订单结算:{} outTradeNo:{}", tradePaySuccessEntity.getUserId(), tradePaySuccessEntity.getOutTradeNo()); return null; } // 2. 查询组团信息 GroupBuyTeamEntity groupBuyTeamEntity = repository.queryGroupBuyTeamByTeamId(marketPayOrderEntity.getTeamId()); // 3. 构建聚合对象 GroupBuyTeamSettlementAggregate groupBuyTeamSettlementAggregate = GroupBuyTeamSettlementAggregate.builder() .userEntity(UserEntity.builder().userId(tradePaySuccessEntity.getUserId()).build()) .groupBuyTeamEntity(groupBuyTeamEntity) .tradePaySuccessEntity(tradePaySuccessEntity) .build(); // 4. 拼团交易结算 repository.settlementMarketPayOrder(groupBuyTeamSettlementAggregate); // 5. 返回结算信息 - 公司中开发这样的流程时候,会根据外部需要进行值的设置 return TradePaySettlementEntity.builder() .source(tradePaySuccessEntity.getSource()) .channel(tradePaySuccessEntity.getChannel()) .userId(tradePaySuccessEntity.getUserId()) .teamId(marketPayOrderEntity.getTeamId()) .activityId(groupBuyTeamEntity.getActivityId()) .outTradeNo(tradePaySuccessEntity.getOutTradeNo()) .build(); }}

首先,结算的过程分为查询外部的交易单号是否为拼团锁单订单,也就是说,之前这笔交易单号,参与过有效的锁单。

另外,商城类系统调用营销的要过滤是否有营销类信息,如果没有则不调用,这样会减轻对拼团类系统接口的压力。

最后,构建聚合对象,调用仓储层的 settlementMarketPayOrder 完成拼团类数据落库。

2.4.3 settlementMarketPayOrder事务操作

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869package cn.bugstack.infrastructure.adapter.repository;@Slf4j@Repositorypublic class TradeRepository implements ITradeRepository { @Resource private IGroupBuyActivityDao groupBuyActivityDao; @Resource private IGroupBuyOrderDao groupBuyOrderDao; @Resource private IGroupBuyOrderListDao groupBuyOrderListDao; @Resource private INotifyTaskDao notifyTaskDao; // ... 省略部分代码 @Transactional(timeout = 500) @Override public void settlementMarketPayOrder(GroupBuyTeamSettlementAggregate groupBuyTeamSettlementAggregate) { UserEntity userEntity = groupBuyTeamSettlementAggregate.getUserEntity(); GroupBuyTeamEntity groupBuyTeamEntity = groupBuyTeamSettlementAggregate.getGroupBuyTeamEntity(); TradePaySuccessEntity tradePaySuccessEntity = groupBuyTeamSettlementAggregate.getTradePaySuccessEntity(); // 1. 更新拼团订单明细状态 GroupBuyOrderList groupBuyOrderListReq = new GroupBuyOrderList(); groupBuyOrderListReq.setUserId(userEntity.getUserId()); groupBuyOrderListReq.setOutTradeNo(tradePaySuccessEntity.getOutTradeNo()); int updateOrderListStatusCount = groupBuyOrderListDao.updateOrderStatus2COMPLETE(groupBuyOrderListReq); if (1 != updateOrderListStatusCount) { throw new AppException(ResponseCode.E0005); } // 2. 更新拼团达成数量 int updateAddCount = groupBuyOrderDao.updateAddCompleteCount(groupBuyTeamEntity.getTeamId()); if (1 != updateAddCount) { throw new AppException(ResponseCode.E0005); } // 3. 更新拼团完成状态 if (groupBuyTeamEntity.getTargetCount() - groupBuyTeamEntity.getCompleteCount() == 1) { int updateOrderStatusCount = groupBuyOrderDao.updateOrderStatus2COMPLETE(groupBuyTeamEntity.getTeamId()); if (1 != updateOrderStatusCount) { throw new AppException(ResponseCode.E0005); } // 查询拼团交易完成外部单号列表 List outTradeNoList = groupBuyOrderListDao.queryGroupBuyCompleteOrderOutTradeNoListByTeamId(groupBuyTeamEntity.getTeamId()); // 拼团完成写入回调任务记录 // 回调任务供外部接口访问和使用 NotifyTask notifyTask = new NotifyTask(); notifyTask.setActivityId(groupBuyTeamEntity.getActivityId()); notifyTask.setTeamId(groupBuyTeamEntity.getTeamId()); notifyTask.setNotifyUrl("暂无"); notifyTask.setNotifyCount(0); notifyTask.setNotifyStatus(0); notifyTask.setParameterJson(JSON.toJSONString(new HashMap() {{ put("teamId", groupBuyTeamEntity.getTeamId()); put("outTradeNoList", outTradeNoList); }})); notifyTaskDao.insert(notifyTask); } }}

交易结算的过程分为:更新拼团订单明细状态、更新拼团达成数量,之后要判断当前这笔结算是否为最后的结算,如果是,还需要写入回调任务。

这里的回调任务,是在拼团结束后,回调商城系统,通知拼团完成。之后商城系统要对已经支付完成的订单进行发货。

关于回调的流程,后续在做处理。现在也可以先尝试考虑下这个流程。

交易结算责任链过滤

拼团交易结算的过程,需要一些列的规则过滤。包括:我们上一部分提到的校验外部交易单的时间是否在拼团有效时间内,同时还有关于这笔外部交易单是否为有效的拼团锁单订单。另外像是 SC 渠道的有效性也需要在结算时进行校验。

所以,这部分我们需要实现一套规则链,来处理这些业务规则。因为规则链已经被抽取为通用的模板了

1. 业务流程

首先,这部分的重点在于新增加结算规则过滤的责任链,处理:SC渠道管控、有效的外部交易单号、结算实现是否为拼团时效内。

那么这里会有一些功能改造点:

拼团表:group_buy_order 增加 valid_start_time(有效开始时间)、valid_end_time(有效结束时间) 字段。用于每笔交易结算时候,用结算时间判断是否匹配到拼团有效时间范围内。

拼团明细:group_buy_order_list 增加 out_trade_time(交易时间) 字段,记录每笔结算的订单结算的时间。随着状态更新的时候更新。

trade 领域下,lock 锁单。实体对象,修改名称。TradeRuleCommandEntity → TradeLockRuleCommandEntity,TradeRuleFilterBackEntity → TradeLockRuleFilterBackEntity 增加了 Lock 标识。便于在添加 TradeSettlementRuleCommandEntity、TradeSettlementRuleFilterBackEntity 时更好理解。

PayActivityEntity 添加 validTime,GroupBuyTeamEntity 添加 validStartTime、validEndTime

trade 领域下,settlement 结算服务中,使用责任链模板,实现营销交易规则的过滤。SCRuleFilter(SC 黑名单管控过滤 DCCService 配置新的属性 scBlacklist)、OutTradeNoRuleFilter(外部交易单号有效性过滤)、SettableRuleFilter(交易时间是否在拼团有效时间内过滤)、EndRuleFilter(结束节点封装返回数据)

交易服务,TradePaySettlementEntity 调用 tradeSettlementRuleFilter 责任链方法,并返回相关的数据信息。

settlementMarketPayOrder 结算一个事务下操作,增加 updateOrderStatus2COMPLETE 更新时候添加 outTradeTime 时间。

2. 编码实现

2.1 工程结构

首先,你在实现功能编码的时候,要先考虑清楚你的模型结构如何处理。例如:本部分要做一个结算规则的处理,那么这些规则你是想在一个类里写大量的方法用 if...else 还是怎么实现。

之后,你要开始思考。如果不使用 if...else 一层层判断,那么就要先思考这块的业务流程,可以被分为几块处理。把他们的边界思考好。

然后,从一个个类下手,用类实现边界。也就是用类来区分你原来在一个大方法中写的一片流程代码,而每个类的衔接,可以使用规则树模型、责任链模型等,衔接中间流转的调用过程。

2.2 调整数据库表

2.2.1 group_buy_order

每一个拼团,都是从拼团活动表 group_buy_activity 获得的参与数据。也就是在 group_buy_activity 活动有效时间范围内,在加上拼团时间。就是 group_buy_order 拼团的有效开始和结束时间。

把这样的数据写到 group_buy_order 表里,而不是每次都从活动表计算。是为了把即时性数据存库,避免以后调整活动时间,造成客诉不好排查。

2.2.2 group_buy_order_list

group_buy_order_list 添加一个拼团交易完成的时间,这样可以更好的知道用户是多久锁单,多久支付的。支付时候有等待的。便于以后做数据量化计算。

2.3 结算规则

2.3.1 SCRuleFilter

12345678910111213141516171819202122232425262728package cn.bugstack.domain.trade.service.settlement.filter;/** * @description SC 渠道来源过滤 - 当某个签约渠道下架后,则不会记账 */@Slf4j@Servicepublic class SCRuleFilter implements ILogicHandler { @Resource private ITradeRepository repository; @Override public TradeSettlementRuleFilterBackEntity apply(TradeSettlementRuleCommandEntity requestParameter, TradeSettlementRuleFilterFactory.DynamicContext dynamicContext) throws Exception { log.info("结算规则过滤-渠道黑名单校验{} outTradeNo:{}", requestParameter.getUserId(), requestParameter.getOutTradeNo()); // sc 渠道黑名单拦截 boolean intercept = repository.isSCBlackIntercept(requestParameter.getSource(), requestParameter.getChannel()); if (intercept) { log.error("{}{} 渠道黑名单拦截", requestParameter.getSource(), requestParameter.getChannel()); throw new AppException(ResponseCode.E0105); } return next(requestParameter, dynamicContext); }}

2.3.2 OutTradeNoRuleFilter

1234567891011121314151617181920212223242526272829303132package cn.bugstack.domain.trade.service.settlement.filter;/** * @description 外部交易单号过滤;外部交易单号是否为退单 */@Slf4j@Servicepublic class OutTradeNoRuleFilter implements ILogicHandler { @Resource private ITradeRepository repository; @Override public TradeSettlementRuleFilterBackEntity apply(TradeSettlementRuleCommandEntity requestParameter, TradeSettlementRuleFilterFactory.DynamicContext dynamicContext) throws Exception { log.info("结算规则过滤-外部单号校验{} outTradeNo:{}", requestParameter.getUserId(), requestParameter.getOutTradeNo()); // 查询拼团信息 MarketPayOrderEntity marketPayOrderEntity = repository.queryMarketPayOrderEntityByOutTradeNo(requestParameter.getUserId(), requestParameter.getOutTradeNo()); if (null == marketPayOrderEntity) { log.error("不存在的外部交易单号或用户已退单,不需要做支付订单结算:{} outTradeNo:{}", requestParameter.getUserId(), requestParameter.getOutTradeNo()); throw new AppException(ResponseCode.E0104); } dynamicContext.setMarketPayOrderEntity(marketPayOrderEntity); return next(requestParameter, dynamicContext); }}

2.3.3 SettableRuleFilter

123456789101112131415161718192021222324252627282930313233343536373839package cn.bugstack.domain.trade.service.settlement.filter;/** * @description 可结算规则过滤;交易时间 */@Slf4j@Servicepublic class SettableRuleFilter implements ILogicHandler { @Resource private ITradeRepository repository; @Override public TradeSettlementRuleFilterBackEntity apply(TradeSettlementRuleCommandEntity requestParameter, TradeSettlementRuleFilterFactory.DynamicContext dynamicContext) throws Exception { log.info("结算规则过滤-有效时间校验{} outTradeNo:{}", requestParameter.getUserId(), requestParameter.getOutTradeNo()); // 上下文:获取数据 MarketPayOrderEntity marketPayOrderEntity = dynamicContext.getMarketPayOrderEntity(); // 查询拼团对象 GroupBuyTeamEntity groupBuyTeamEntity = repository.queryGroupBuyTeamByTeamId(marketPayOrderEntity.getTeamId()); // 外部交易时间 - 也就是用户支付完成的时间,这个时间要在拼团有效时间范围内 Date outTradeTime = requestParameter.getOutTradeTime(); // 判断,外部交易时间,要小于拼团结束时间。否则抛异常。 if (!outTradeTime.before(groupBuyTeamEntity.getValidEndTime())) { log.error("订单交易时间不在拼团有效时间范围内"); throw new AppException(ResponseCode.E0106); } // 设置上下文 dynamicContext.setGroupBuyTeamEntity(groupBuyTeamEntity); return next(requestParameter, dynamicContext); }}

2.3.4 EndRuleFilter

1234567891011121314151617181920212223242526272829303132package cn.bugstack.domain.trade.service.settlement.filter;/** * @description 结束节点 */@Slf4j@Servicepublic class EndRuleFilter implements ILogicHandler { @Override public TradeSettlementRuleFilterBackEntity apply(TradeSettlementRuleCommandEntity requestParameter, TradeSettlementRuleFilterFactory.DynamicContext dynamicContext) throws Exception { log.info("结算规则过滤-结束节点{} outTradeNo:{}", requestParameter.getUserId(), requestParameter.getOutTradeNo()); // 获取上下文对象 GroupBuyTeamEntity groupBuyTeamEntity = dynamicContext.getGroupBuyTeamEntity(); // 返回封装数据 return TradeSettlementRuleFilterBackEntity.builder() .teamId(groupBuyTeamEntity.getTeamId()) .activityId(groupBuyTeamEntity.getActivityId()) .targetCount(groupBuyTeamEntity.getTargetCount()) .completeCount(groupBuyTeamEntity.getCompleteCount()) .lockCount(groupBuyTeamEntity.getLockCount()) .status(groupBuyTeamEntity.getStatus()) .validStartTime(groupBuyTeamEntity.getValidStartTime()) .validEndTime(groupBuyTeamEntity.getValidEndTime()) .notifyConfigVO(groupBuyTeamEntity.getNotifyConfigVO()) .build(); }}

2.4 规则工厂

12345678910111213141516171819202122232425262728293031323334353637383940package cn.bugstack.domain.trade.service.settlement.factory;/** * @description 交易结算规则过滤工厂 */@Slf4j@Servicepublic class TradeSettlementRuleFilterFactory { @Bean("tradeSettlementRuleFilter") public BusinessLinkedList tradeSettlementRuleFilter( SCRuleFilter scRuleFilter, OutTradeNoRuleFilter outTradeNoRuleFilter, SettableRuleFilter settableRuleFilter, EndRuleFilter endRuleFilter) { // 组装链 LinkArmory linkArmory = new LinkArmory<>("交易结算规则过滤链", scRuleFilter, outTradeNoRuleFilter, settableRuleFilter, endRuleFilter); // 链对象 return linkArmory.getLogicLink(); } // 构建上下文 @Data @Builder @AllArgsConstructor @NoArgsConstructor public static class DynamicContext { // 订单营销实体对象 private MarketPayOrderEntity marketPayOrderEntity; // 拼团组队实体对象 private GroupBuyTeamEntity groupBuyTeamEntity; }}

规则工厂:负责把责任链进行串联。之后提供一个指定名称的规则 Bean 对象。

2.5 使用规则

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667package cn.bugstack.domain.trade.service.settlement;/** * @description 拼团交易结算服务 */@Slf4j@Servicepublic class TradeSettlementOrderService implements ITradeSettlementOrderService { @Resource private ITradeRepository repository; @Resource private BusinessLinkedList tradeSettlementRuleFilter; @Override public TradePaySettlementEntity settlementMarketPayOrder(TradePaySuccessEntity tradePaySuccessEntity) throws Exception { log.info("拼团交易-支付订单结算:{} outTradeNo:{}", tradePaySuccessEntity.getUserId(), tradePaySuccessEntity.getOutTradeNo()); // 1. 结算规则过滤 // 使用责任链实现 TradeSettlementRuleFilterBackEntity tradeSettlementRuleFilterBackEntity = tradeSettlementRuleFilter.apply( TradeSettlementRuleCommandEntity.builder() .source(tradePaySuccessEntity.getSource()) .channel(tradePaySuccessEntity.getChannel()) .userId(tradePaySuccessEntity.getUserId()) .outTradeNo(tradePaySuccessEntity.getOutTradeNo()) .outTradeTime(tradePaySuccessEntity.getOutTradeTime()) .build(), new TradeSettlementRuleFilterFactory.DynamicContext()); String teamId = tradeSettlementRuleFilterBackEntity.getTeamId(); // 2. 查询组团信息 GroupBuyTeamEntity groupBuyTeamEntity = GroupBuyTeamEntity.builder() .teamId(tradeSettlementRuleFilterBackEntity.getTeamId()) .activityId(tradeSettlementRuleFilterBackEntity.getActivityId()) .targetCount(tradeSettlementRuleFilterBackEntity.getTargetCount()) .completeCount(tradeSettlementRuleFilterBackEntity.getCompleteCount()) .lockCount(tradeSettlementRuleFilterBackEntity.getLockCount()) .status(tradeSettlementRuleFilterBackEntity.getStatus()) .validStartTime(tradeSettlementRuleFilterBackEntity.getValidStartTime()) .validEndTime(tradeSettlementRuleFilterBackEntity.getValidEndTime()) .build(); // 3. 构建聚合对象 GroupBuyTeamSettlementAggregate groupBuyTeamSettlementAggregate = GroupBuyTeamSettlementAggregate.builder() .userEntity(UserEntity.builder().userId(tradePaySuccessEntity.getUserId()).build()) .groupBuyTeamEntity(groupBuyTeamEntity) .tradePaySuccessEntity(tradePaySuccessEntity) .build(); // 4. 拼团交易结算 repository.settlementMarketPayOrder(groupBuyTeamSettlementAggregate); // 5. 返回结算信息 - 公司中开发这样的流程时候,会根据外部需要进行值的设置 return TradePaySettlementEntity.builder() .source(tradePaySuccessEntity.getSource()) .channel(tradePaySuccessEntity.getChannel()) .userId(tradePaySuccessEntity.getUserId()) .teamId(teamId) .activityId(groupBuyTeamEntity.getActivityId()) .outTradeNo(tradePaySuccessEntity.getOutTradeNo()) .build(); }}

营销结算服务 TradeSettlementOrderService,调整在于:不在这里判断外部交易单号是否有效了,而是统一收到责任链中实现。

另外规则链会返回所需的数据,以及返回了 tradePaySuccessEntity.getOutTradeTime() 交易时间。这个时间会在操作数据时候做更新。

拼团回调通知任务

在微服务设计中,当一个微服务系统的流程结束后,要通知下一个微服务系统。这个通知的过程,可以是 RPC、MQ,也可以是 HTTP 方式。

RPC、MQ,这一类的都是需要有一个公用的注册中心,它的技术架构比较适合于公司内部的统一系统使用。如果是有和外部其他系统的对接,通常我们会使用 HTTP 这样统一标准协议的接口进行使用。

那么,本部分要为拼团组队交易结算完结后,实现一个回调通知的任务处理。告知另外的微服务系统可以进行后续的流程了。

注意:微信支付,支付宝支付,也是在完成支付后,做的这样的回调处理。

1. 业务流程

如图,拼团结算组队完成,回调通知

首先,本部分的重点在拼团成团后,实现回调通知流程。回调的过程,需要在用户锁单时需要增加一个回调的地址,并在拼团完结后发起回调。

那么,这里的一些功能改造点:

group_buy_order 在设计的时候有一个 notify_url 回调地址,这部分我们修改库表添加上这个字段。并对工程中的 dao&po&mapper 操作,增加 notify_url 字段。

MarketTradeController 营销交易服务,lockMarketPayOrder 锁单接口入参对象,增加 notifyUrl 回调地址。并有 PayDiscountEntity 对象透传到 TradeRepository#lockMarketPayOrder 仓储操作。这样写到 group_buy_order 表就有回调地址了,等做回调操作的时候,就可以把这个地址写入到回调任务表中。

TradeSettlementOrderService的 settlementMarketPayOrder 结算服务,需要把锁单记录中的 notify_url 拿到,放到 GroupBuyTeamEntity 中,这样在写入 notify_task 表记录的时候就可以把 notify_url 一起写入进去了。

基于 okhttp 框架,封装对 http 接口的调用。用于处理调用外部其他微服务,实现回调通知的处理。因为外部的接口是随着每个服务调用拼团写入进来的 http 请求地址,所以在封装这部分调用的时候,要允许动态透传请求地址。实现类写到 infrastructure 基础设置层的 gateway 调用外部网关层。实现类;GroupBuyNotifyService 提供方法;groupBuyNotify

在交易结算服务类 ITradeSettlementOrderService,定义执行结算回调通知接口,包括:execSettlementNotifyJob()、execSettlementNotifyJob(String teamId) 一个是有入参的,一个无入参。这样可以指定给某个拼团队伍做结算。结算的过程就是调用 GroupBuyNotifyService的groupBuyNotify 完成回调通知,并根据返回的结果更新 notify_task 表状态记录(成功、失败、重试),并记录回调次数,小于5次的时候都可以继续回调。

回调通知,可以分为两个阶段处理。一个是拼团完成后立即执行,另外一个任务补偿。立即执行是为了提供时效性,但因为远程的 http 调用受网络和服务的影响可能会失败,所以要增加一个任务补偿来做定时检查。其中立即执行在 TradeSettlementOrderService#settlementMarketPayOrder → settlementMarketPayOrder 处理。另外定时任务在 GroupBuyNotifyJob 处理。

测试接口,trigger/http 下,增加 TestApiClientController 接口实现类,提供回调接口服务。这个是模拟的其他的微服务,将来要提供的接口。

2. 编码实现

2.1 工程结构

首先,这是涉及本次改动的一些类。我们要实现的功能就是串联出 notify_url 并做回调通知处理。

2.2 库表变更

在 group_buy_order 表增加 notify_url 字段,写入回调地址。

2.3 锁单写入 - notify_url

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107@Slf4j@Repositorypublic class TradeRepository implements ITradeRepository { @Resource private IGroupBuyActivityDao groupBuyActivityDao; @Resource private IGroupBuyOrderDao groupBuyOrderDao; @Resource private IGroupBuyOrderListDao groupBuyOrderListDao; @Resource private INotifyTaskDao notifyTaskDao; @Resource private DCCService dccService; // ... 省略部分代码 @Transactional(timeout = 500) @Override public MarketPayOrderEntity lockMarketPayOrder(GroupBuyOrderAggregate groupBuyOrderAggregate) { // 聚合对象信息 UserEntity userEntity = groupBuyOrderAggregate.getUserEntity(); PayActivityEntity payActivityEntity = groupBuyOrderAggregate.getPayActivityEntity(); PayDiscountEntity payDiscountEntity = groupBuyOrderAggregate.getPayDiscountEntity(); Integer userTakeOrderCount = groupBuyOrderAggregate.getUserTakeOrderCount(); // 判断是否有团 - teamId 为空 - 新团、为不空 - 老团 String teamId = payActivityEntity.getTeamId(); if (StringUtils.isBlank(teamId)) { // 使用 RandomStringUtils.randomNumeric 替代公司里使用的雪花算法UUID teamId = RandomStringUtils.randomNumeric(8); // 日期处理 Date currentDate = new Date(); Calendar calendar = Calendar.getInstance(); calendar.setTime(currentDate); calendar.add(Calendar.MINUTE, payActivityEntity.getValidTime()); // 构建拼团订单 GroupBuyOrder groupBuyOrder = GroupBuyOrder.builder() .teamId(teamId) .activityId(payActivityEntity.getActivityId()) .source(payDiscountEntity.getSource()) .channel(payDiscountEntity.getChannel()) .originalPrice(payDiscountEntity.getOriginalPrice()) .deductionPrice(payDiscountEntity.getDeductionPrice()) .payPrice(payDiscountEntity.getPayPrice()) .targetCount(payActivityEntity.getTargetCount()) .completeCount(0) .lockCount(1) .validStartTime(currentDate) .validEndTime(calendar.getTime()) // 这里需要加上url的信息 .notifyUrl(payDiscountEntity.getNotifyUrl()) .build(); // 写入记录 groupBuyOrderDao.insert(groupBuyOrder); } else { // 更新记录 - 如果更新记录不等于1,则表示拼团已满,抛出异常 int updateAddTargetCount = groupBuyOrderDao.updateAddLockCount(teamId); if (1 != updateAddTargetCount) { throw new AppException(ResponseCode.E0005); } } // 使用 RandomStringUtils.randomNumeric 替代公司里使用的雪花算法UUID String orderId = RandomStringUtils.randomNumeric(12); GroupBuyOrderList groupBuyOrderListReq = GroupBuyOrderList.builder() .userId(userEntity.getUserId()) .teamId(teamId) .orderId(orderId) .activityId(payActivityEntity.getActivityId()) .startTime(payActivityEntity.getStartTime()) .endTime(payActivityEntity.getEndTime()) .goodsId(payDiscountEntity.getGoodsId()) .source(payDiscountEntity.getSource()) .channel(payDiscountEntity.getChannel()) .originalPrice(payDiscountEntity.getOriginalPrice()) .deductionPrice(payDiscountEntity.getDeductionPrice()) .status(TradeOrderStatusEnumVO.CREATE.getCode()) .outTradeNo(payDiscountEntity.getOutTradeNo()) // 构建 bizId 唯一值;活动id_用户id_参与次数累加 .bizId(payActivityEntity.getActivityId() + Constants.UNDERLINE + userEntity.getUserId() + Constants.UNDERLINE + (userTakeOrderCount + 1)) .build(); try { // 写入拼团记录 groupBuyOrderListDao.insert(groupBuyOrderListReq); } catch (DuplicateKeyException e) { throw new AppException(ResponseCode.INDEX_EXCEPTION); } return MarketPayOrderEntity.builder() .orderId(orderId) .deductionPrice(payDiscountEntity.getDeductionPrice()) .tradeOrderStatusEnumVO(TradeOrderStatusEnumVO.CREATE) .build(); } // ... 省略部分代码}

lockMarketPayOrder 方法中,从 PayDiscountEntity 获取 NotifyUrl 写入到拼团组队记录中。groupBuyOrderDao.insert(groupBuyOrder);

2.4 结算处理 - notify_url

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273@Slf4j@Repositorypublic class TradeRepository implements ITradeRepository { @Resource private IGroupBuyActivityDao groupBuyActivityDao; @Resource private IGroupBuyOrderDao groupBuyOrderDao; @Resource private IGroupBuyOrderListDao groupBuyOrderListDao; @Resource private INotifyTaskDao notifyTaskDao; @Resource private DCCService dccService; // ... 省略部分代码 @Transactional(timeout = 500) @Override public void settlementMarketPayOrder(GroupBuyTeamSettlementAggregate groupBuyTeamSettlementAggregate) { UserEntity userEntity = groupBuyTeamSettlementAggregate.getUserEntity(); GroupBuyTeamEntity groupBuyTeamEntity = groupBuyTeamSettlementAggregate.getGroupBuyTeamEntity(); TradePaySuccessEntity tradePaySuccessEntity = groupBuyTeamSettlementAggregate.getTradePaySuccessEntity(); // 1. 更新拼团订单明细状态 GroupBuyOrderList groupBuyOrderListReq = new GroupBuyOrderList(); groupBuyOrderListReq.setUserId(userEntity.getUserId()); groupBuyOrderListReq.setOutTradeNo(tradePaySuccessEntity.getOutTradeNo()); groupBuyOrderListReq.setOutTradeTime(tradePaySuccessEntity.getOutTradeTime()); int updateOrderListStatusCount = groupBuyOrderListDao.updateOrderStatus2COMPLETE(groupBuyOrderListReq); if (1 != updateOrderListStatusCount) { throw new AppException(ResponseCode.UPDATE_ZERO); } // 2. 更新拼团达成数量 int updateAddCount = groupBuyOrderDao.updateAddCompleteCount(groupBuyTeamEntity.getTeamId()); if (1 != updateAddCount) { throw new AppException(ResponseCode.UPDATE_ZERO); } // 3. 更新拼团完成状态 if (groupBuyTeamEntity.getTargetCount() - groupBuyTeamEntity.getCompleteCount() == 1) { int updateOrderStatusCount = groupBuyOrderDao.updateOrderStatus2COMPLETE(groupBuyTeamEntity.getTeamId()); if (1 != updateOrderStatusCount) { throw new AppException(ResponseCode.UPDATE_ZERO); } // 查询拼团交易完成外部单号列表 List outTradeNoList = groupBuyOrderListDao.queryGroupBuyCompleteOrderOutTradeNoListByTeamId(groupBuyTeamEntity.getTeamId()); // 拼团完成写入回调任务记录 NotifyTask notifyTask = new NotifyTask(); notifyTask.setActivityId(groupBuyTeamEntity.getActivityId()); notifyTask.setTeamId(groupBuyTeamEntity.getTeamId()); notifyTask.setNotifyUrl(groupBuyTeamEntity.getNotifyUrl()); notifyTask.setNotifyCount(0); notifyTask.setNotifyStatus(0); notifyTask.setParameterJson(JSON.toJSONString(new HashMap() {{ put("teamId", groupBuyTeamEntity.getTeamId()); put("outTradeNoList", outTradeNoList); }})); notifyTaskDao.insert(notifyTask); } } // ... 省略部分代码}

settlementMarketPayOrder 方法,从 GroupBuyTeamEntity 拼团记录获取到 notify_url 写入到 NotifyTask 表记录中。

2.5 回调网关 - okhttp

2.5.1 添加pom文件

1234567891011 com.squareup.okhttp3 okhttp-sse 3.14.9 com.squareup.okhttp3 logging-interceptor 3.14.9

2.5.2 添加okhttp客户端

1234567891011121314package cn.bugstack.config;/** * @description http 框架 */@Configurationpublic class OKHttpClientConfig { @Bean public OkHttpClient httpClient() { return new OkHttpClient(); }}

2.5.3 添加okhttp回调网关

1234567891011121314151617181920212223242526272829303132333435package cn.bugstack.infrastructure.gateway;/** * @description 拼团回调服务 */@Slf4j@Servicepublic class GroupBuyNotifyService { @Resource private OkHttpClient okHttpClient; public String groupBuyNotify(String apiUrl, String notifyRequestDTOJSON) throws Exception { try { // 1. 构建参数 MediaType mediaType = MediaType.parse("application/json"); RequestBody body = RequestBody.create(mediaType, notifyRequestDTOJSON); Request request = new Request.Builder() .url(apiUrl) .post(body) .addHeader("content-type", "application/json") .build(); // 2. 调用接口 Response response = okHttpClient.newCall(request).execute(); // 3. 返回结果 return response.body().string(); } catch (Exception e) { log.error("拼团回调 HTTP 接口服务异常 {}", apiUrl, e); throw new AppException(ResponseCode.HTTP_EXCEPTION); } }}

回调服务,在基础设置层的 gateway 中实现。用于动态调用外部的接口,进行回调通知。

2.6 回调封装

2.6.1 定义回调接口

1234567891011package cn.bugstack.domain.trade.adapter.port;/** * @description 交易接口服务接口 */public interface ITradePort { String groupBuyNotify(NotifyTaskEntity notifyTask) throws Exception;}

2.6.2 实现回调接口

12345678910111213141516171819package cn.bugstack.types.enums;/** * @description 回调任务状态 */@Getter@AllArgsConstructor@NoArgsConstructorpublic enum NotifyTaskHTTPEnumVO { SUCCESS("success", "成功"), ERROR("error", "失败"), NULL(null, "空执行"), ; private String code; private String info;}

123456789101112131415161718192021222324252627282930313233343536373839404142434445package cn.bugstack.infrastructure.adapter.port;/** * @description */@Servicepublic class TradePort implements ITradePort { @Resource private GroupBuyNotifyService groupBuyNotifyService; @Resource private IRedisService redisService; @Override public String groupBuyNotify(NotifyTaskEntity notifyTask) throws Exception { RLock lock = redisService.getLock(notifyTask.lockKey()); try { // group-buy-market 拼团服务端会被部署到多台应用服务器上,那么就会有很多任务一起执行。这个时候要进行抢占,避免被多次执行 if (lock.tryLock(3, 0, TimeUnit.SECONDS)) { try { // 回调方式 HTTP if (NotifyTypeEnumVO.HTTP.getCode().equals(notifyTask.getNotifyType())) { // 无效的 notifyUrl 则直接返回成功 if (StringUtils.isBlank(notifyTask.getNotifyUrl()) || "暂无".equals(notifyTask.getNotifyUrl())) { return NotifyTaskHTTPEnumVO.SUCCESS.getCode(); } return groupBuyNotifyService.groupBuyNotify(notifyTask.getNotifyUrl(), notifyTask.getParameterJson()); } } finally { if (lock.isLocked() && lock.isHeldByCurrentThread()) { lock.unlock(); } } } return NotifyTaskHTTPEnumVO.NULL.getCode(); } catch (Exception e) { Thread.currentThread().interrupt(); return NotifyTaskHTTPEnumVO.NULL.getCode(); } }}

2.6.3 实现回调封装

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869@Slf4j@Servicepublic class TradeSettlementOrderService implements ITradeSettlementOrderService { @Resource private ITradeRepository repository; @Resource private ITradePort port; @Resource private BusinessLinkedList tradeSettlementRuleFilter; // ... 省略部分方法 @Override public Map execSettlementNotifyJob() throws Exception { log.info("拼团交易-执行结算通知任务"); // 查询未执行任务 List notifyTaskEntityList = repository.queryUnExecutedNotifyTaskList(); return execSettlementNotifyJob(notifyTaskEntityList); } // 指定teamId进行回调 @Override public Map execSettlementNotifyJob(String teamId) throws Exception { log.info("拼团交易-执行结算通知回调,指定 teamId:{}", teamId); List notifyTaskEntityList = repository.queryUnExecutedNotifyTaskList(teamId); return execSettlementNotifyJob(notifyTaskEntityList); } private Map execSettlementNotifyJob(List notifyTaskEntityList) throws Exception { int successCount = 0, errorCount = 0, retryCount = 0; for (NotifyTaskEntity notifyTask : notifyTaskEntityList) { // 回调处理 success 成功,error 失败 String response = port.groupBuyNotify(notifyTask); // 更新状态判断 & 变更数据库表回调任务状态 if (NotifyTaskHTTPEnumVO.SUCCESS.getCode().equals(response)) { int updateCount = repository.updateNotifyTaskStatusSuccess(notifyTask.getTeamId()); if (1 == updateCount) { successCount += 1; } } else if (NotifyTaskHTTPEnumVO.ERROR.getCode().equals(response)) { if (notifyTask.getNotifyCount() < 5) { int updateCount = repository.updateNotifyTaskStatusError(notifyTask.getTeamId()); if (1 == updateCount) { errorCount += 1; } } else { int updateCount = repository.updateNotifyTaskStatusRetry(notifyTask.getTeamId()); if (1 == updateCount) { retryCount += 1; } } } } Map resultMap = new HashMap<>(); resultMap.put("waitCount", notifyTaskEntityList.size()); resultMap.put("successCount", successCount); resultMap.put("errorCount", errorCount); resultMap.put("retryCount", retryCount); return resultMap; }}

execSettlementNotifyJob(List notifyTaskEntityList) 回调任务,调用 port.groupBuyNotify(notifyTask) 回调服务。

根据回调服务返回的结果,更新回调任务表记录。包括:调用了几次、成功的状态。

2.7 调用处理

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071package cn.bugstack.domain.trade.service.settlement;@Slf4j@Servicepublic class TradeSettlementOrderService implements ITradeSettlementOrderService { @Resource private ITradeRepository repository; @Resource private ITradePort port; @Resource private BusinessLinkedList tradeSettlementRuleFilter; @Override public TradePaySettlementEntity settlementMarketPayOrder(TradePaySuccessEntity tradePaySuccessEntity) throws Exception { log.info("拼团交易-支付订单结算:{} outTradeNo:{}", tradePaySuccessEntity.getUserId(), tradePaySuccessEntity.getOutTradeNo()); // 1. 结算规则过滤 TradeSettlementRuleFilterBackEntity tradeSettlementRuleFilterBackEntity = tradeSettlementRuleFilter.apply( TradeSettlementRuleCommandEntity.builder() .source(tradePaySuccessEntity.getSource()) .channel(tradePaySuccessEntity.getChannel()) .userId(tradePaySuccessEntity.getUserId()) .outTradeNo(tradePaySuccessEntity.getOutTradeNo()) .outTradeTime(tradePaySuccessEntity.getOutTradeTime()) .build(), new TradeSettlementRuleFilterFactory.DynamicContext()); String teamId = tradeSettlementRuleFilterBackEntity.getTeamId(); // 2. 查询组团信息 GroupBuyTeamEntity groupBuyTeamEntity = GroupBuyTeamEntity.builder() .teamId(tradeSettlementRuleFilterBackEntity.getTeamId()) .activityId(tradeSettlementRuleFilterBackEntity.getActivityId()) .targetCount(tradeSettlementRuleFilterBackEntity.getTargetCount()) .completeCount(tradeSettlementRuleFilterBackEntity.getCompleteCount()) .lockCount(tradeSettlementRuleFilterBackEntity.getLockCount()) .status(tradeSettlementRuleFilterBackEntity.getStatus()) .validStartTime(tradeSettlementRuleFilterBackEntity.getValidStartTime()) .validEndTime(tradeSettlementRuleFilterBackEntity.getValidEndTime()) .notifyUrl(tradeSettlementRuleFilterBackEntity.getNotifyUrl()) .build(); // 3. 构建聚合对象 GroupBuyTeamSettlementAggregate groupBuyTeamSettlementAggregate = GroupBuyTeamSettlementAggregate.builder() .userEntity(UserEntity.builder().userId(tradePaySuccessEntity.getUserId()).build()) .groupBuyTeamEntity(groupBuyTeamEntity) .tradePaySuccessEntity(tradePaySuccessEntity) .build(); // 4. 拼团交易结算 repository.settlementMarketPayOrder(groupBuyTeamSettlementAggregate); // 5. 组队回调处理 - 处理失败也会有定时任务补偿,通过这样的方式,可以减轻任务调度,提高时效性 Map notifyResultMap = execSettlementNotifyJob(teamId); log.info("回调通知拼团完结 result:{}", JSON.toJSONString(notifyResultMap)); // 6. 返回结算信息 - 公司中开发这样的流程时候,会根据外部需要进行值的设置 return TradePaySettlementEntity.builder() .source(tradePaySuccessEntity.getSource()) .channel(tradePaySuccessEntity.getChannel()) .userId(tradePaySuccessEntity.getUserId()) .teamId(teamId) .activityId(groupBuyTeamEntity.getActivityId()) .outTradeNo(tradePaySuccessEntity.getOutTradeNo()) .build(); } // ... 省略部分代码}

Map notifyResultMap = execSettlementNotifyJob(teamId); 回调通知处理。这步失败也没关系,后续会有定时任务处理。

2.8 任务补偿(定时任务实现)

123456789101112package cn.bugstack;@SpringBootApplication@Configurable@EnableSchedulingpublic class Application { public static void main(String[] args){ SpringApplication.run(Application.class); }}

123456789101112131415161718192021222324package cn.bugstack.trigger.job;/** * @description 拼团完结回调通知任务;拼团回调任务表,实际公司场景会定时清理数据结转,不会有太多数据挤压 */@Slf4j@Servicepublic class GroupBuyNotifyJob { @Resource private ITradeSettlementOrderService tradeSettlementOrderService; @Scheduled(cron = "0/15 * * * * ?") public void exec() { try { Map result = tradeSettlementOrderService.execSettlementNotifyJob(); log.info("定时任务,回调通知拼团完结任务 result:{}", JSON.toJSONString(result)); } catch (Exception e) { log.error("定时任务,回调通知拼团完结任务失败", e); } }}

首先,app 下的 Application 类,要添加一个 @EnableScheduling 注解,这样才能使用任务。

之后,trigger 下的 job 包下,创建 GroupBuyNotifyJob 回调任务,调用回调通知。

根据UI展示封装接口

根据在上一部分使用 DeepSeek 实现的拼团 UI,设计并实现所需的服务端接口。

在互联网公司里的开发过程也是这样,产品在评审期间,会提供 UI 工程师做好的设计图,研发拿到设计图后,提供所需的接口提供相应的字段。

1. 接口分析

紫色圈:10人再抢,是拼团的统计数据。类似的还有总共开多少团、成功的拼团等,如果有展示需求,都可以在拼团统计中给出。

灰色圈:商品信息,商品金额、优惠金额、支付金额等。

绿色圈:参与拼团,UI 调用的操作是锁单的处理。在完整的流程中是调用商城类系统,发起交易,之后由商城类系统进行营销锁单。我们这里模拟,所以从前端开始锁单。

黄色券:这里不是真实对接扫码支付,所以要点支付完成,才能触发拼团结算。所以这里需要调用拼团里的结算接口。

2. 编码实现

2.1 工程结构

2.2 首页 - 拼团营销配置接口

2.2.1 定义商品营销应答对象

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104package cn.bugstack.api.dto;/** * @description 商品营销应答对象 */@Data@Builder@AllArgsConstructor@NoArgsConstructorpublic class GoodsMarketResponseDTO { // 商品信息 private Goods goods; // 组队信息(1个个人的置顶、2个随机的「获取10个,随机取2个」) private List teamList; // 组队统计 private TeamStatistic teamStatistic; /** * 商品信息 */ @Data @Builder @AllArgsConstructor @NoArgsConstructor public static class Goods { // 商品ID private String goodsId; // 原始价格 private BigDecimal originalPrice; // 折扣金额 private BigDecimal deductionPrice; // 支付价格 private BigDecimal payPrice; } /** * 组队信息 */ @Data @Builder @AllArgsConstructor @NoArgsConstructor public static class Team { // 用户ID private String userId; // 拼单组队ID private String teamId; // 活动ID private Long activityId; // 目标数量 private Integer targetCount; // 完成数量 private Integer completeCount; // 锁单数量 private Integer lockCount; // 拼团开始时间 - 参与拼团时间 private Date validStartTime; // 拼团结束时间 - 拼团有效时长 private Date validEndTime; // 倒计时(字符串) validEndTime - validStartTime private String validTimeCountdown; /** 外部交易单号-确保外部调用唯一幂等 */ private String outTradeNo; public static String differenceDateTime2Str(Date validStartTime, Date validEndTime) { if (validStartTime == null || validEndTime == null) { return "无效的时间"; } long diffInMilliseconds = validEndTime.getTime() - validStartTime.getTime(); if (diffInMilliseconds < 0) { return "已结束"; } long seconds = TimeUnit.MILLISECONDS.toSeconds(diffInMilliseconds) % 60; long minutes = TimeUnit.MILLISECONDS.toMinutes(diffInMilliseconds) % 60; long hours = TimeUnit.MILLISECONDS.toHours(diffInMilliseconds) % 24; long days = TimeUnit.MILLISECONDS.toDays(diffInMilliseconds); return String.format("%02d:%02d:%02d", hours, minutes, seconds); } } /** * 组队统计 */ @Data @Builder @AllArgsConstructor @NoArgsConstructor public static class TeamStatistic { // 开团队伍数量 private Integer allTeamCount; // 成团队伍数量 private Integer allTeamCompleteCount; // 参团人数总量 - 一个商品的总参团人数 private Integer allTeamUserCount; }}

首先,这里要注意,不要把页面的所有要的属性,都用一个个属性字段平铺到类中,那样以后维护会非常复杂。要思考这些属性的归属问题,定义不同的对象承载。

之后,根据页面诉求,定义三个对象:Goods - 商品、Team - 拼团、TeamStatistic - 统计。

2.2.2 定义服务接口

12345678910111213package cn.bugstack.api;public interface IMarketIndexService { /** * 查询拼团营销配置 * * @param goodsMarketRequestDTO 营销商品信息 * @return 营销配置信息 */ Response queryGroupBuyMarketConfig(GoodsMarketRequestDTO goodsMarketRequestDTO)}

2.2.3 实现服务接口

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102package cn.bugstack.trigger.http;/** * @description 营销首页服务 */@Slf4j@RestController()@CrossOrigin("*")@RequestMapping("/api/v1/gbm/index/")public class MarketIndexController implements IMarketIndexService { @Resource private IIndexGroupBuyMarketService indexGroupBuyMarketService; @RequestMapping(value = "query_group_buy_market_config", method = RequestMethod.POST) @Override public Response queryGroupBuyMarketConfig(@RequestBody GoodsMarketRequestDTO goodsMarketRequestDTO) { try { log.info("查询拼团营销配置开始:{} goodsId:{}", goodsMarketRequestDTO.getUserId(), goodsMarketRequestDTO.getGoodsId()); // 判断参数信息 if (StringUtils.isBlank(goodsMarketRequestDTO.getUserId()) || StringUtils.isBlank(goodsMarketRequestDTO.getSource()) || StringUtils.isBlank(goodsMarketRequestDTO.getChannel()) || StringUtils.isBlank(goodsMarketRequestDTO.getGoodsId())) { return Response.builder() .code(ResponseCode.ILLEGAL_PARAMETER.getCode()) .info(ResponseCode.ILLEGAL_PARAMETER.getInfo()) .build(); } // 1. 营销优惠试算 TrialBalanceEntity trialBalanceEntity = indexGroupBuyMarketService.indexMarketTrial(MarketProductEntity.builder() .userId(goodsMarketRequestDTO.getUserId()) .source(goodsMarketRequestDTO.getSource()) .channel(goodsMarketRequestDTO.getChannel()) .goodsId(goodsMarketRequestDTO.getGoodsId()) .build()); GroupBuyActivityDiscountVO groupBuyActivityDiscountVO = trialBalanceEntity.getGroupBuyActivityDiscountVO(); Long activityId = groupBuyActivityDiscountVO.getActivityId(); // 2. 查询拼团组队 List userGroupBuyOrderDetailEntities = indexGroupBuyMarketService.queryInProgressUserGroupBuyOrderDetailList(activityId, goodsMarketRequestDTO.getUserId(), 1, 2); // 3. 统计拼团数据 TeamStatisticVO teamStatisticVO = indexGroupBuyMarketService.queryTeamStatisticByActivityId(activityId); GoodsMarketResponseDTO.Goods goods = GoodsMarketResponseDTO.Goods.builder() .goodsId(trialBalanceEntity.getGoodsId()) .originalPrice(trialBalanceEntity.getOriginalPrice()) .deductionPrice(trialBalanceEntity.getDeductionPrice()) .payPrice(trialBalanceEntity.getPayPrice()) .build(); List teams = new ArrayList<>(); if (null != userGroupBuyOrderDetailEntities && !userGroupBuyOrderDetailEntities.isEmpty()) { for (UserGroupBuyOrderDetailEntity userGroupBuyOrderDetailEntity : userGroupBuyOrderDetailEntities) { GoodsMarketResponseDTO.Team team = GoodsMarketResponseDTO.Team.builder() .userId(userGroupBuyOrderDetailEntity.getUserId()) .teamId(userGroupBuyOrderDetailEntity.getTeamId()) .activityId(userGroupBuyOrderDetailEntity.getActivityId()) .targetCount(userGroupBuyOrderDetailEntity.getTargetCount()) .completeCount(userGroupBuyOrderDetailEntity.getCompleteCount()) .lockCount(userGroupBuyOrderDetailEntity.getLockCount()) .validStartTime(userGroupBuyOrderDetailEntity.getValidStartTime()) .validEndTime(userGroupBuyOrderDetailEntity.getValidEndTime()) .validTimeCountdown(GoodsMarketResponseDTO.Team.differenceDateTime2Str(new Date(), userGroupBuyOrderDetailEntity.getValidEndTime())) .outTradeNo(userGroupBuyOrderDetailEntity.getOutTradeNo()) .build(); teams.add(team); } } GoodsMarketResponseDTO.TeamStatistic teamStatistic = GoodsMarketResponseDTO.TeamStatistic.builder() .allTeamCount(teamStatisticVO.getAllTeamCount()) .allTeamCompleteCount(teamStatisticVO.getAllTeamCompleteCount()) .allTeamUserCount(teamStatisticVO.getAllTeamUserCount()) .build(); Response response = Response.builder() .code(ResponseCode.SUCCESS.getCode()) .info(ResponseCode.SUCCESS.getInfo()) .data(GoodsMarketResponseDTO.builder() .goods(goods) .teamList(teams) .teamStatistic(teamStatistic) .build()) .build(); log.info("查询拼团营销配置完成:{} goodsId:{} response:{}", goodsMarketRequestDTO.getUserId(), goodsMarketRequestDTO.getGoodsId(), JSON.toJSONString(response)); return response; } catch (Exception e) { log.error("查询拼团营销配置失败:{} goodsId:{}", goodsMarketRequestDTO.getUserId(), goodsMarketRequestDTO.getGoodsId(), e); return Response.builder() .code(ResponseCode.UN_ERROR.getCode()) .info(ResponseCode.UN_ERROR.getInfo()) .build(); } }}

接口的查询组装数据主要分为3部分:营销优惠试算、查询拼团组队、统计拼团数据。

其中,查询拼团组队 分为优先查询一个,个人的一条拼团数据显示在最上面。之后在随机查询2条其他人的拼团数据。查询的过程要先查询个人的明细数据,之后从明细数据获得拼团队伍 teamId,在查询拼团队伍的信息,最后组装数据。随机查询可以查询2倍量的数据,之后在随机获取需要的量的数据。

2.2.4 查询拼团组队数据组装

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158package cn.bugstack.infrastructure.adapter.repository;@Repositorypublic class ActivityRepository implements IActivityRepository { @Resource private IGroupBuyActivityDao groupBuyActivityDao; @Resource private IGroupBuyDiscountDao groupBuyDiscountDao; @Resource private ISkuDao skuDao; @Resource private ISCSkuActivityDao skuActivityDao; @Resource private IRedisService redisService; @Resource private DCCService dccService; @Resource private IGroupBuyOrderDao groupBuyOrderDao; @Resource private IGroupBuyOrderListDao groupBuyOrderListDao; // ... 省略部分代码 // 组装个人的数据 @Override public List queryInProgressUserGroupBuyOrderDetailListByOwner(Long activityId, String userId, Integer ownerCount) { // 1. 根据用户ID、活动ID,查询用户参与的拼团队伍 GroupBuyOrderList groupBuyOrderListReq = new GroupBuyOrderList(); groupBuyOrderListReq.setActivityId(activityId); groupBuyOrderListReq.setUserId(userId); groupBuyOrderListReq.setCount(ownerCount); List groupBuyOrderLists = groupBuyOrderListDao.queryInProgressUserGroupBuyOrderDetailListByUserId(groupBuyOrderListReq); if (null == groupBuyOrderLists || groupBuyOrderLists.isEmpty()) return null; // 2. 过滤队伍获取 TeamId Set teamIds = groupBuyOrderLists.stream() .map(GroupBuyOrderList::getTeamId) .filter(teamId -> teamId != null && !teamId.isEmpty()) // 过滤非空和非空字符串 .collect(Collectors.toSet()); // 3. 查询队伍明细,组装Map结构 List groupBuyOrders = groupBuyOrderDao.queryGroupBuyProgressByTeamIds(teamIds); if (null == groupBuyOrders || groupBuyOrders.isEmpty()) return null; Map groupBuyOrderMap = groupBuyOrders.stream() .collect(Collectors.toMap(GroupBuyOrder::getTeamId, order -> order)); // 4. 转换数据 List userGroupBuyOrderDetailEntities = new ArrayList<>(); for (GroupBuyOrderList groupBuyOrderList : groupBuyOrderLists) { String teamId = groupBuyOrderList.getTeamId(); GroupBuyOrder groupBuyOrder = groupBuyOrderMap.get(teamId); if (null == groupBuyOrder) continue; UserGroupBuyOrderDetailEntity userGroupBuyOrderDetailEntity = UserGroupBuyOrderDetailEntity.builder() .userId(groupBuyOrderList.getUserId()) .teamId(groupBuyOrder.getTeamId()) .activityId(groupBuyOrder.getActivityId()) .targetCount(groupBuyOrder.getTargetCount()) .completeCount(groupBuyOrder.getCompleteCount()) .lockCount(groupBuyOrder.getLockCount()) .validStartTime(groupBuyOrder.getValidStartTime()) .validEndTime(groupBuyOrder.getValidEndTime()) .outTradeNo(groupBuyOrderList.getOutTradeNo()) .build(); userGroupBuyOrderDetailEntities.add(userGroupBuyOrderDetailEntity); } return userGroupBuyOrderDetailEntities; } // 随机组装非个人用户的数据 @Override public List queryInProgressUserGroupBuyOrderDetailListByRandom(Long activityId, String userId, Integer randomCount) { // 1. 根据用户ID、活动ID,查询用户参与的拼团队伍 GroupBuyOrderList groupBuyOrderListReq = new GroupBuyOrderList(); groupBuyOrderListReq.setActivityId(activityId); groupBuyOrderListReq.setUserId(userId); groupBuyOrderListReq.setCount(randomCount * 2); // 查询2倍的量,之后其中 randomCount 数量 List groupBuyOrderLists = groupBuyOrderListDao.queryInProgressUserGroupBuyOrderDetailListByRandom(groupBuyOrderListReq); if (null == groupBuyOrderLists || groupBuyOrderLists.isEmpty()) return null; // 判断总量是否大于 randomCount if (groupBuyOrderLists.size() > randomCount) { // 随机打乱列表 Collections.shuffle(groupBuyOrderLists); // 获取前 randomCount 个元素 groupBuyOrderLists = groupBuyOrderLists.subList(0, randomCount); } // 2. 过滤队伍获取 TeamId Set teamIds = groupBuyOrderLists.stream() .map(GroupBuyOrderList::getTeamId) .filter(teamId -> teamId != null && !teamId.isEmpty()) // 过滤非空和非空字符串 .collect(Collectors.toSet()); // 3. 查询队伍明细,组装Map结构 List groupBuyOrders = groupBuyOrderDao.queryGroupBuyProgressByTeamIds(teamIds); if (null == groupBuyOrders || groupBuyOrders.isEmpty()) return null; Map groupBuyOrderMap = groupBuyOrders.stream() .collect(Collectors.toMap(GroupBuyOrder::getTeamId, order -> order)); // 4. 转换数据 List userGroupBuyOrderDetailEntities = new ArrayList<>(); for (GroupBuyOrderList groupBuyOrderList : groupBuyOrderLists) { String teamId = groupBuyOrderList.getTeamId(); GroupBuyOrder groupBuyOrder = groupBuyOrderMap.get(teamId); if (null == groupBuyOrder) continue; UserGroupBuyOrderDetailEntity userGroupBuyOrderDetailEntity = UserGroupBuyOrderDetailEntity.builder() .userId(groupBuyOrderList.getUserId()) .teamId(groupBuyOrder.getTeamId()) .activityId(groupBuyOrder.getActivityId()) .targetCount(groupBuyOrder.getTargetCount()) .completeCount(groupBuyOrder.getCompleteCount()) .lockCount(groupBuyOrder.getLockCount()) .validStartTime(groupBuyOrder.getValidStartTime()) .validEndTime(groupBuyOrder.getValidEndTime()) .build(); userGroupBuyOrderDetailEntities.add(userGroupBuyOrderDetailEntity); } return userGroupBuyOrderDetailEntities; } // 拼团队伍统计 @Override public TeamStatisticVO queryTeamStatisticByActivityId(Long activityId) { // 1. 根据活动ID查询拼团队伍 List groupBuyOrderLists = groupBuyOrderListDao.queryInProgressUserGroupBuyOrderDetailListByActivityId(activityId); // 2. 过滤队伍获取 TeamId Set teamIds = groupBuyOrderLists.stream() .map(GroupBuyOrderList::getTeamId) .filter(teamId -> teamId != null && !teamId.isEmpty()) // 过滤非空和非空字符串 .collect(Collectors.toSet()); // 3. 统计数据 Integer allTeamCount = groupBuyOrderDao.queryAllTeamCount(teamIds); Integer allTeamCompleteCount = groupBuyOrderDao.queryAllTeamCompleteCount(teamIds); Integer allTeamUserCount = groupBuyOrderDao.queryAllUserCount(teamIds); // 4. 构建对象 return TeamStatisticVO.builder() .allTeamCount(allTeamCount) .allTeamCompleteCount(allTeamCompleteCount) .allTeamUserCount(allTeamUserCount) .build(); }}

queryInProgressUserGroupBuyOrderDetailListByOwner:组装个人用户数据。

queryInProgressUserGroupBuyOrderDetailListByRandom:随机组装非个人用户数据。

queryTeamStatisticByActivityId:拼团队伍统计。

这些数据的查询,也可以用 Redis 缓存优化以及使用定时任务 + 异步消息统计好,查询的时候直接使用。

2.2.5 交易 - 结算接口

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576package cn.bugstack.trigger.http;@Slf4j@RestController()@CrossOrigin("*")@RequestMapping("/api/v1/gbm/trade/")public class MarketTradeController implements IMarketTradeService { @Resource private IIndexGroupBuyMarketService indexGroupBuyMarketService; @Resource private ITradeLockOrderService tradeOrderService; @Resource private ITradeSettlementOrderService tradeSettlementOrderService; // ... 省略部分代码 @RequestMapping(value = "settlement_market_pay_order", method = RequestMethod.POST) @Override public Response settlementMarketPayOrder(@RequestBody SettlementMarketPayOrderRequestDTO requestDTO) { try { log.info("营销交易组队结算开始:{} outTradeNo:{}", requestDTO.getUserId(), requestDTO.getOutTradeNo()); if (StringUtils.isBlank(requestDTO.getUserId()) || StringUtils.isBlank(requestDTO.getSource()) || StringUtils.isBlank(requestDTO.getChannel()) || StringUtils.isBlank(requestDTO.getOutTradeNo()) || null == requestDTO.getOutTradeTime()) { return Response.builder() .code(ResponseCode.ILLEGAL_PARAMETER.getCode()) .info(ResponseCode.ILLEGAL_PARAMETER.getInfo()) .build(); } // 1. 结算服务 TradePaySettlementEntity tradePaySettlementEntity = tradeSettlementOrderService.settlementMarketPayOrder(TradePaySuccessEntity.builder() .source(requestDTO.getSource()) .channel(requestDTO.getChannel()) .userId(requestDTO.getUserId()) .outTradeNo(requestDTO.getOutTradeNo()) .outTradeTime(requestDTO.getOutTradeTime()) .build()); SettlementMarketPayOrderResponseDTO responseDTO = SettlementMarketPayOrderResponseDTO.builder() .userId(tradePaySettlementEntity.getUserId()) .teamId(tradePaySettlementEntity.getTeamId()) .activityId(tradePaySettlementEntity.getActivityId()) .outTradeNo(tradePaySettlementEntity.getOutTradeNo()) .build(); // 返回结果 Response response = Response.builder() .code(ResponseCode.SUCCESS.getCode()) .info(ResponseCode.SUCCESS.getInfo()) .data(SettlementMarketPayOrderResponseDTO.builder() .userId(tradePaySettlementEntity.getUserId()) .teamId(tradePaySettlementEntity.getTeamId()) .activityId(tradePaySettlementEntity.getActivityId()) .outTradeNo(tradePaySettlementEntity.getOutTradeNo()) .build()) .build(); log.info("营销交易组队结算完成:{} outTradeNo:{} response:{}", requestDTO.getUserId(), requestDTO.getOutTradeNo(), JSON.toJSONString(response)); return response; } catch (AppException e) { log.error("营销交易组队结算异常:{} LockMarketPayOrderRequestDTO:{}", requestDTO.getUserId(), JSON.toJSONString(requestDTO), e); return Response.builder() .code(e.getCode()) .info(e.getInfo()) .build(); } catch (Exception e) { log.error("营销交易组队结算失败:{} LockMarketPayOrderRequestDTO:{}", requestDTO.getUserId(), JSON.toJSONString(requestDTO), e); return Response.builder() .code(ResponseCode.UN_ERROR.getCode()) .info(ResponseCode.UN_ERROR.getInfo()) .build(); } }}

settlementMarketPayOrder 为结算接口,这个接口就是对领域服务 settlementMarketPayOrder 包装即可。

引入RabbitMQ分布式多端消费

引入 RabbitMQ 分布式技术框架,实现分布式消息消费和多服务消费的能力。

消息,是一种解耦服务间直接(http / rpc)调用的手段,以发送消息和接收消息的模式,完成业务流程的异步化处理。

1. 业务流程

在互联网公司中,往往一个微服务发送出来的 MQ,除了自己接收消费处理自己的业务流程,也会有很多其他微服务进行消费。那么这里就会有一个 Topic,被多个应用消费的配置。如图:

以拼团发送结算完成消息举例,拼团是负载均衡部署了2套服务,发送的 MQ 消息,由小型支付对接。

那么,负载均衡的拼团服务,发送 MQ 后,自己的2套微服务,会分别接收到消息。另外一套小型支付,假设只部署了一套,那么这里会消费5个 MQ 消息。

注意,以 RabbitMQ 举例,这里会需要使用到同一套交换机,同一个路由 Key,但队列要分别每个服务配置不同的。同时消息支持持久化,也就是拼团发送的 MQ 消息,即使小型支付服务暂时没有启动,也可以在启动后消费队列里的 MQ 消息。

这里的 team_success 就是 Topic 主题

2. 编码实现

2.1 引入 POM 文件

123456 org.springframework.boot spring-boot-starter-amqp 3.2.0

要在使用 MQ 的 module 模块下,增加 rabbitmq 配置。这样才能使用对应的方法。group-buy-market-infrastructure、group-buy-market-trigger、group-buy-market-app 都要引入。

2.2 生产者 - 拼团

group-buy-market 拼团营销项目:

12345678910111213141516171819202122232425# 数据库配置;启动时配置数据库资源信息spring: rabbitmq: addresses: 192.168.1.109 port: 5672 username: admin password: admin listener: simple: prefetch: 1 # 每次投递n个消息,消费完在投递n个 template: delivery-mode: persistent # 确保全局默认设置为持久化(可选) # 消息配置 config: # 生产者 producer: # 绑定交换机,统一一套交换机 exchange: group_buy_market_exchange # 消息主题配置;路由key、队列 topic_team_success: # 消息主题 routing_key: topic.team_success # 消费队列 queue: group_buy_market_queue_2_topic_team_success

首先,spring.rabbitmq 下的 config 为自定义配置,我们在这里配置 MQ 消息生产者信息。

之后,一整套服务,可以使用一个交换机。group_buy_market_exchange

然后,一个消息使用一个队列。topic_team_success 是 MQ 消息,下面配置了队列和路由key。

2.3 消费者 - 支付

market小型支付项目 :

1234567891011121314151617181920212223spring: # RabbitMQ rabbitmq: addresses: 192.168.1.109 port: 5672 username: admin password: admin listener: simple: prefetch: 1 # 每次投递n个消息,消费完在投递n个 template: delivery-mode: persistent # 确保全局默认设置为持久化(可选) # 消息配置 config: consumer: # 消费 topic 主题,team_success topic_team_success: # 绑定交换机 - 消息提供者的交换机 exchange: group_buy_market_exchange # 消息主题 routing_key: topic.team_success # 消费队列 - 每个系统有自己的消费队列 queue: s_pay_mall_queue_2_topic_team_success

首先,spring.rabbitmq 下的 config 为自定义配置,我们在这里配置 MQ 消息消费者信息。

之后,consumer 下,topic_team_success 这个消息,要指定一套交换机,一套消息主题的路由 key,但要使用自己的队列。这个队列就像水桶,分别装 MQ 消息。

2.4 代码配置

生产者 - 拼团:

配置 EventPublisher 发送消息的工具类。

配置 TeamSuccessTopicListener 监听消息的工具类。

消费者 - 支付:

配置 TeamSuccessTopicListener 监听消息的工具类。

发送MQ结算消息

增加拼团结算完成 MQ 触达方式,HTTP、MQ 触达,由调用方通过入参类型决定。

MQ 一般用在企业内的微服务系统间通信,因为企业内的微服务,共用了一套的 MQ 注册中心,MQ 可以更加高效的触达和分布式部署。而对于企业外的调用,与我们完全不是一个公司的系统,那么不再同一个微服务环境内,则需要通过 HTTP 方式这样标准的协议调用。如:支付宝支付完成回调、微信公众号发送消息后的回调,都是基于 HTTP 的方式实现。

1. 业务流程

如图,HTTP、MQ,由调用方配置使用那种方式进行处理。

用户创建营销锁单时,选择MQ、HTTP 回调方式。这个类型会被写入到对应的拼团订单记录里。

拼团完成结算后,在根据写入到拼团订单的记录,回调的方式,来回调通知结算。

2. 编码实现

2.1 库表修改

拼团组队订单 group_buy_order:

拼团回调任务 notify_task:

2.2 锁单链路修改

2.2.1 研发设计流程

MarketTradeController 的 lockMarketPayOrder 营销锁单方法入口,LockMarketPayOrderRequestDTO 入参对象增加 NotifyConfigVO 回调配置。也就是让调用方自己设置入参的回调类型。在 LockMarketPayOrderRequestDTO 类中会内聚一些方法,方便参数设置。

ITradeLockOrderService 的 lockMarketPayOrder 领域方法,PayDiscountEntity 也需要添加 NotifyConfigVO 对象。只不过这个对象要放在 trade 领域下的值对象里。

数据库操作:GroupBuyOrder 添加回调类型,group_buy_order_mapper.xml 映射回调字段以及插入、查询时,都要增加 notify_type 字段。

TradeRepository 的 lockMarketPayOrder 锁单仓储存储时,设置字段值 .notifyType(notifyConfigVO.getNotifyType().getCode()) 写入到库里 groupBuyOrderDao.insert(groupBuyOrder); 这样用户在锁单的时候,就把要回调的类型写入进来了。

2.2.2 核心代码

锁单入参对象:

1234567891011121314151617181920212223242526272829303132333435363738package cn.bugstack.trigger.http;@RequestMapping(value = "lock_market_pay_order", method = RequestMethod.POST)@Overridepublic Response lockMarketPayOrder(@RequestBody LockMarketPayOrderRequestDTO requestDTO) {//... 省略部分代码marketPayOrderEntity = tradeOrderService.lockMarketPayOrder( UserEntity.builder().userId(userId).build(), PayActivityEntity.builder() .teamId(teamId) .activityId(activityId) .activityName(groupBuyActivityDiscountVO.getActivityName()) .startTime(groupBuyActivityDiscountVO.getStartTime()) .endTime(groupBuyActivityDiscountVO.getEndTime()) .validTime(groupBuyActivityDiscountVO.getValidTime()) .targetCount(groupBuyActivityDiscountVO.getTarget()) .build(), PayDiscountEntity.builder() .source(source) .channel(channel) .goodsId(goodsId) .goodsName(trialBalanceEntity.getGoodsName()) .originalPrice(trialBalanceEntity.getOriginalPrice()) .deductionPrice(trialBalanceEntity.getDeductionPrice()) .payPrice(trialBalanceEntity.getPayPrice()) .outTradeNo(outTradeNo) .notifyConfigVO( // 构建回调通知对象 NotifyConfigVO.builder() .notifyType(NotifyTypeEnumVO.valueOf(notifyConfigVO.getNotifyType())) .notifyMQ(notifyConfigVO.getNotifyMQ()) .notifyUrl(notifyConfigVO.getNotifyUrl()) .build()) .build()); // ... 省略部分代码 }

注意入参值的构建 NotifyConfigVO,透传到数据库操作。

写入库表:

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899package cn.bugstack.infrastructure.adapter.repository;@Slf4j@Repositorypublic class TradeRepository implements ITradeRepository { // 省略部分代码 @Transactional(timeout = 500) @Override public MarketPayOrderEntity lockMarketPayOrder(GroupBuyOrderAggregate groupBuyOrderAggregate) { // 聚合对象信息 UserEntity userEntity = groupBuyOrderAggregate.getUserEntity(); PayActivityEntity payActivityEntity = groupBuyOrderAggregate.getPayActivityEntity(); PayDiscountEntity payDiscountEntity = groupBuyOrderAggregate.getPayDiscountEntity(); // 取出notifyConfigVO对象 NotifyConfigVO notifyConfigVO = payDiscountEntity.getNotifyConfigVO(); Integer userTakeOrderCount = groupBuyOrderAggregate.getUserTakeOrderCount(); // 判断是否有团 - teamId 为空 - 新团、为不空 - 老团 String teamId = payActivityEntity.getTeamId(); if (StringUtils.isBlank(teamId)) { // 使用 RandomStringUtils.randomNumeric 替代公司里使用的雪花算法UUID teamId = RandomStringUtils.randomNumeric(8); // 日期处理 Date currentDate = new Date(); Calendar calendar = Calendar.getInstance(); calendar.setTime(currentDate); calendar.add(Calendar.MINUTE, payActivityEntity.getValidTime()); // 构建拼团订单 GroupBuyOrder groupBuyOrder = GroupBuyOrder.builder() .teamId(teamId) .activityId(payActivityEntity.getActivityId()) .source(payDiscountEntity.getSource()) .channel(payDiscountEntity.getChannel()) .originalPrice(payDiscountEntity.getOriginalPrice()) .deductionPrice(payDiscountEntity.getDeductionPrice()) .payPrice(payDiscountEntity.getPayPrice()) .targetCount(payActivityEntity.getTargetCount()) .completeCount(0) .lockCount(1) .validStartTime(currentDate) .validEndTime(calendar.getTime()) .notifyType(notifyConfigVO.getNotifyType().getCode()) .notifyUrl(notifyConfigVO.getNotifyUrl()) .build(); // 写入记录 groupBuyOrderDao.insert(groupBuyOrder); } else { // 更新记录 - 如果更新记录不等于1,则表示拼团已满,抛出异常 int updateAddTargetCount = groupBuyOrderDao.updateAddLockCount(teamId); if (1 != updateAddTargetCount) { throw new AppException(ResponseCode.E0005); } } // 使用 RandomStringUtils.randomNumeric 替代公司里使用的雪花算法UUID String orderId = RandomStringUtils.randomNumeric(12); GroupBuyOrderList groupBuyOrderListReq = GroupBuyOrderList.builder() .userId(userEntity.getUserId()) .teamId(teamId) .orderId(orderId) .activityId(payActivityEntity.getActivityId()) .startTime(payActivityEntity.getStartTime()) .endTime(payActivityEntity.getEndTime()) .goodsId(payDiscountEntity.getGoodsId()) .source(payDiscountEntity.getSource()) .channel(payDiscountEntity.getChannel()) .originalPrice(payDiscountEntity.getOriginalPrice()) .deductionPrice(payDiscountEntity.getDeductionPrice()) .payPrice(payDiscountEntity.getPayPrice()) .status(TradeOrderStatusEnumVO.CREATE.getCode()) .outTradeNo(payDiscountEntity.getOutTradeNo()) // 构建 bizId 唯一值;活动id_用户id_参与次数累加 .bizId(payActivityEntity.getActivityId() + Constants.UNDERLINE + userEntity.getUserId() + Constants.UNDERLINE + (userTakeOrderCount + 1)) .build(); try { // 写入拼团记录 groupBuyOrderListDao.insert(groupBuyOrderListReq); } catch (DuplicateKeyException e) { throw new AppException(ResponseCode.INDEX_EXCEPTION); } return MarketPayOrderEntity.builder() .orderId(orderId) .originalPrice(payDiscountEntity.getOriginalPrice()) .deductionPrice(payDiscountEntity.getDeductionPrice()) .payPrice(payDiscountEntity.getPayPrice()) .tradeOrderStatusEnumVO(TradeOrderStatusEnumVO.CREATE) .build(); } }

.notifyType(notifyConfigVO.getNotifyType().getCode()) 注意,写入拼团组队订单透传回调类型。

2.3 结算链路

2.3.1 研发设计流程

拼团结算的过程,会先过滤 filter,在 SettableRuleFilter 过滤器中,执行拼团对象查询时候,在仓储层组装出拼团回调配置数据。

TradeRepository 的 settlementMarketPayOrder 结算入库数据,按照不同的回调类型,写入进 notify_task 表中。

根据拼团交易结算是否完成,使用多线程异步执行发送 MQ 或者 HTTP 回调。这部分异步操作就好,即时触达。即时失败了,也会有任务兜底操作。

TradePort 执行任务分为 HTTP、MQ,来触达回调。

2.3.2 核心代码

拼团查询:

12345678910111213141516171819202122232425262728package cn.bugstack.infrastructure.adapter.repository;@Slf4j@Repositorypublic class TradeRepository implements ITradeRepository { @Override public GroupBuyTeamEntity queryGroupBuyTeamByTeamId(String teamId) { GroupBuyOrder groupBuyOrder = groupBuyOrderDao.queryGroupBuyTeamByTeamId(teamId); return GroupBuyTeamEntity.builder() .teamId(groupBuyOrder.getTeamId()) .activityId(groupBuyOrder.getActivityId()) .targetCount(groupBuyOrder.getTargetCount()) .completeCount(groupBuyOrder.getCompleteCount()) .lockCount(groupBuyOrder.getLockCount()) .status(GroupBuyOrderEnumVO.valueOf(groupBuyOrder.getStatus())) .validStartTime(groupBuyOrder.getValidStartTime()) .validEndTime(groupBuyOrder.getValidEndTime()) .notifyConfigVO(NotifyConfigVO.builder() .notifyType(NotifyTypeEnumVO.valueOf(groupBuyOrder.getNotifyType())) .notifyUrl(groupBuyOrder.getNotifyUrl()) // MQ 是固定的 .notifyMQ(topic_team_success) .build()) .build(); } }

查询出拼团数据,这里要返回拼团的回调类型。

回调 MQ 是固定的,不需要外部透彻。

回调任务:

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475package cn.bugstack.infrastructure.adapter.repository;@Slf4j@Repositorypublic class TradeRepository implements ITradeRepository { // ... 省略部分代码 @Transactional(timeout = 500) @Override public NotifyTaskEntity settlementMarketPayOrder(GroupBuyTeamSettlementAggregate groupBuyTeamSettlementAggregate) { UserEntity userEntity = groupBuyTeamSettlementAggregate.getUserEntity(); GroupBuyTeamEntity groupBuyTeamEntity = groupBuyTeamSettlementAggregate.getGroupBuyTeamEntity(); NotifyConfigVO notifyConfigVO = groupBuyTeamEntity.getNotifyConfigVO(); TradePaySuccessEntity tradePaySuccessEntity = groupBuyTeamSettlementAggregate.getTradePaySuccessEntity(); // 1. 更新拼团订单明细状态 GroupBuyOrderList groupBuyOrderListReq = new GroupBuyOrderList(); groupBuyOrderListReq.setUserId(userEntity.getUserId()); groupBuyOrderListReq.setOutTradeNo(tradePaySuccessEntity.getOutTradeNo()); groupBuyOrderListReq.setOutTradeTime(tradePaySuccessEntity.getOutTradeTime()); int updateOrderListStatusCount = groupBuyOrderListDao.updateOrderStatus2COMPLETE(groupBuyOrderListReq); if (1 != updateOrderListStatusCount) { throw new AppException(ResponseCode.UPDATE_ZERO); } // 2. 更新拼团达成数量 int updateAddCount = groupBuyOrderDao.updateAddCompleteCount(groupBuyTeamEntity.getTeamId()); if (1 != updateAddCount) { throw new AppException(ResponseCode.UPDATE_ZERO); } // 3. 更新拼团完成状态 if (groupBuyTeamEntity.getTargetCount() - groupBuyTeamEntity.getCompleteCount() == 1) { int updateOrderStatusCount = groupBuyOrderDao.updateOrderStatus2COMPLETE(groupBuyTeamEntity.getTeamId()); if (1 != updateOrderStatusCount) { throw new AppException(ResponseCode.UPDATE_ZERO); } // 查询拼团交易完成外部单号列表 List outTradeNoList = groupBuyOrderListDao.queryGroupBuyCompleteOrderOutTradeNoListByTeamId(groupBuyTeamEntity.getTeamId()); // 拼团完成写入回调任务记录 NotifyTask notifyTask = new NotifyTask(); notifyTask.setActivityId(groupBuyTeamEntity.getActivityId()); notifyTask.setTeamId(groupBuyTeamEntity.getTeamId()); notifyTask.setNotifyType(notifyConfigVO.getNotifyType().getCode()); notifyTask.setNotifyMQ(NotifyTypeEnumVO.MQ.equals(notifyConfigVO.getNotifyType()) ? notifyConfigVO.getNotifyMQ() : null); notifyTask.setNotifyUrl(NotifyTypeEnumVO.HTTP.equals(notifyConfigVO.getNotifyType()) ? notifyConfigVO.getNotifyUrl() : null); notifyTask.setNotifyCount(0); notifyTask.setNotifyStatus(0); notifyTask.setParameterJson(JSON.toJSONString(new HashMap() {{ put("teamId", groupBuyTeamEntity.getTeamId()); put("outTradeNoList", outTradeNoList); }})); notifyTaskDao.insert(notifyTask); return NotifyTaskEntity.builder() .teamId(notifyTask.getTeamId()) .notifyType(notifyTask.getNotifyType()) .notifyMQ(notifyTask.getNotifyMQ()) .notifyUrl(notifyTask.getNotifyUrl()) .notifyCount(notifyTask.getNotifyCount()) .parameterJson(notifyTask.getParameterJson()) .build(); } return null; } // 省略部分代码}

这部分主要就是往 notifyTaskDao.insert(notifyTask); 写入数据的时候,要判断好写入的回调类型。因为任务还要被执行选择不同的类型来发 MQ 或者调用 HTTP 接口。

异步任务:

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071package cn.bugstack.domain.trade.service.settlement;@Slf4j@Servicepublic class TradeSettlementOrderService implements ITradeSettlementOrderService { @Override public TradePaySettlementEntity settlementMarketPayOrder(TradePaySuccessEntity tradePaySuccessEntity) throws Exception { log.info("拼团交易-支付订单结算:{} outTradeNo:{}", tradePaySuccessEntity.getUserId(), tradePaySuccessEntity.getOutTradeNo()); // 1. 结算规则过滤 TradeSettlementRuleFilterBackEntity tradeSettlementRuleFilterBackEntity = tradeSettlementRuleFilter.apply( TradeSettlementRuleCommandEntity.builder() .source(tradePaySuccessEntity.getSource()) .channel(tradePaySuccessEntity.getChannel()) .userId(tradePaySuccessEntity.getUserId()) .outTradeNo(tradePaySuccessEntity.getOutTradeNo()) .outTradeTime(tradePaySuccessEntity.getOutTradeTime()) .build(), new TradeSettlementRuleFilterFactory.DynamicContext()); String teamId = tradeSettlementRuleFilterBackEntity.getTeamId(); // 2. 查询组团信息 GroupBuyTeamEntity groupBuyTeamEntity = GroupBuyTeamEntity.builder() .teamId(tradeSettlementRuleFilterBackEntity.getTeamId()) .activityId(tradeSettlementRuleFilterBackEntity.getActivityId()) .targetCount(tradeSettlementRuleFilterBackEntity.getTargetCount()) .completeCount(tradeSettlementRuleFilterBackEntity.getCompleteCount()) .lockCount(tradeSettlementRuleFilterBackEntity.getLockCount()) .status(tradeSettlementRuleFilterBackEntity.getStatus()) .validStartTime(tradeSettlementRuleFilterBackEntity.getValidStartTime()) .validEndTime(tradeSettlementRuleFilterBackEntity.getValidEndTime()) .notifyConfigVO(tradeSettlementRuleFilterBackEntity.getNotifyConfigVO()) .build(); // 3. 构建聚合对象 GroupBuyTeamSettlementAggregate groupBuyTeamSettlementAggregate = GroupBuyTeamSettlementAggregate.builder() .userEntity(UserEntity.builder().userId(tradePaySuccessEntity.getUserId()).build()) .groupBuyTeamEntity(groupBuyTeamEntity) .tradePaySuccessEntity(tradePaySuccessEntity) .build(); // 4. 拼团交易结算 NotifyTaskEntity notifyTaskEntity = repository.settlementMarketPayOrder(groupBuyTeamSettlementAggregate); // 5. 组队回调处理 - 处理失败也会有定时任务补偿,通过这样的方式,可以减轻任务调度,提高时效性 if (null != notifyTaskEntity) { threadPoolExecutor.execute(() -> { Map notifyResultMap = null; try { notifyResultMap = execSettlementNotifyJob(notifyTaskEntity); log.info("回调通知拼团完结 result:{}", JSON.toJSONString(notifyResultMap)); } catch (Exception e) { log.error("回调通知拼团完结失败 result:{}", JSON.toJSONString(notifyResultMap), e); throw new AppException(e.getMessage()); } }); } // 6. 返回结算信息 - 公司中开发这样的流程时候,会根据外部需要进行值的设置 return TradePaySettlementEntity.builder() .source(tradePaySuccessEntity.getSource()) .channel(tradePaySuccessEntity.getChannel()) .userId(tradePaySuccessEntity.getUserId()) .teamId(teamId) .activityId(groupBuyTeamEntity.getActivityId()) .outTradeNo(tradePaySuccessEntity.getOutTradeNo()) .build(); }}

引入多线程,异步执行结算任务。

触达回调:

123456789101112131415161718192021222324252627282930313233343536373839404142434445@Servicepublic class TradePort implements ITradePort { @Resource private GroupBuyNotifyService groupBuyNotifyService; @Resource private IRedisService redisService; @Resource private EventPublisher publisher; @Override public String groupBuyNotify(NotifyTaskEntity notifyTask) throws Exception { RLock lock = redisService.getLock(notifyTask.lockKey()); try { // group-buy-market 拼团服务端会被部署到多台应用服务器上,那么就会有很多任务一起执行。这个时候要进行抢占,避免被多次执行 if (lock.tryLock(3, 0, TimeUnit.SECONDS)) { try { // 回调方式 HTTP if (NotifyTypeEnumVO.HTTP.getCode().equals(notifyTask.getNotifyType())) { // 无效的 notifyUrl 则直接返回成功 if (StringUtils.isBlank(notifyTask.getNotifyUrl()) || "暂无".equals(notifyTask.getNotifyUrl())) { return NotifyTaskHTTPEnumVO.SUCCESS.getCode(); } return groupBuyNotifyService.groupBuyNotify(notifyTask.getNotifyUrl(), notifyTask.getParameterJson()); } // 回调方式 MQ if (NotifyTypeEnumVO.MQ.getCode().equals(notifyTask.getNotifyType())) { publisher.publish(notifyTask.getNotifyMQ(), notifyTask.getParameterJson()); return NotifyTaskHTTPEnumVO.SUCCESS.getCode(); } } finally { if (lock.isLocked() && lock.isHeldByCurrentThread()) { lock.unlock(); } } } return NotifyTaskHTTPEnumVO.NULL.getCode(); } catch (Exception e) { Thread.currentThread().interrupt(); return NotifyTaskHTTPEnumVO.NULL.getCode(); } }}

这部分会根据 HTTP、MQ,分别以不同的方式进行触达。如果还有其他回调方式,可以考虑使用策略模式封装。

消费MQ结算消息

完善支付商城对拼团组队结算消息的处理,同时完成小型支付商城中支付交易结算消息的处理。这样我们整个系统就都具备分布式部署的能力了。也就是多个应用实例同时部署,一个MQ在一个应用实例消费宕机,可以被其他应用实例继续拉取消费。

1. 业务流程

如图,MQ 在小型支付和拼团的执行流程。

MQ 具有解耦、消峰,最终一致性的特性。所以很多的分布式设计中,都会引入 MQ 来解耦复杂的业务流程,除了数据库事务处理外的流程节点,则由 MQ 进行驱动。

从拼团下单到锁单结算,完成后触达 MQ 消费。之后由支付商城消费 MQ 结算消息,变更订单状态,之后触达下一个支付结算的动作。在接收支付结算完成模拟发货。第二个 MQ 的发送不用写数据库任务来补偿,如果发 MQ 失败了,就直接抛异常重试继续发就可以。

2. 编码实现

2.1 RabbitMQ 后台

http://192.168.1.109:15672/#/queues - 修改为你的地址进入。

这里的交换机(Exchanges)、队列会随着系统启动使用创建。你也可以手动创建。

2.2 消费结算消息

123456789101112131415161718192021222324252627282930@Slf4j@Componentpublic class TeamSuccessTopicListener { @Resource private IOrderService orderService; /** * 指定消费队列 */ @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "${spring.rabbitmq.config.consumer.topic_team_success.queue}"), exchange = @Exchange(value = "${spring.rabbitmq.config.consumer.topic_team_success.exchange}", type = ExchangeTypes.TOPIC), key = "${spring.rabbitmq.config.consumer.topic_team_success.routing_key}" ) ) public void listener(String message) { try { NotifyRequestDTO requestDTO = JSON.parseObject(message, NotifyRequestDTO.class); log.info("拼团回调,组队完成,结算开始 {}", JSON.toJSONString(requestDTO)); // 营销结算 orderService.changeOrderMarketSettlement(requestDTO.getOutTradeNoList()); } catch (Exception e) { log.error("拼团回调,组队完成,结算失败 {}", message, e); throw e; } }}

在 listener 监听方法内,调用营销结算。这个营销结算其实就是以前 http 回调里的操作。现在迁移过来。

2.3 增加支付结算MQ

这一部分,我们要在小型支付系统自己发送支付完成结算的 MQ,再消费这个 MQ。通过这个方式替换掉原来依赖于 Guava 的发布订阅模型。因为 Guava 没法让不同实例间负载消费(也就是A1应用发送的消息,A2应用不能消费。而分布式架构技术栈 MQ 是可以的)。

生产者,topic_order_pay_success,是订单结算消息。分别在使用拼团和没有使用拼团结算发送消息。

消费者,topic_order_pay_success,消费触达结算动作。另外一个拼团组队成功的消息则是前面已经对接的。

2.4 发送,支付成功结算消息

添加事件消息发送方法:

123456789101112131415161718192021222324@Slf4j@Componentpublic class EventPublisher { @Autowired private RabbitTemplate rabbitTemplate; @Value("${spring.rabbitmq.config.producer.topic_order_pay_success.exchange}") private String exchangeName; public void publish(String routingKey, String message) { try { rabbitTemplate.convertAndSend(exchangeName, routingKey, message, m -> { // 持久化消息配置 m.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); return m; }); } catch (Exception e) { log.error("发送MQ消息失败 routingKey:{} message:{}", routingKey, message, e); throw e; } }}

这个类放到 s-pay-mall-ddd-infrastructure 下的 event 中。

发送MQ消息:

1234567891011121314151617181920212223242526272829303132333435@Overridepublic void changeOrderPaySuccess(String orderId, Date payTime) { PayOrder payOrderReq = new PayOrder(); payOrderReq.setOrderId(orderId); payOrderReq.setStatus(OrderStatusVO.PAY_SUCCESS.getCode()); payOrderReq.setPayTime(payTime); orderDao.changeOrderPaySuccess(payOrderReq); // 不走拼团营销的直接结算发货 BaseEvent.EventMessage paySuccessMessageEventMessage = paySuccessMessageEvent.buildEventMessage( PaySuccessMessageEvent.PaySuccessMessage.builder() .tradeNo(orderId) .build()); PaySuccessMessageEvent.PaySuccessMessage paySuccessMessage = paySuccessMessageEventMessage.getData(); // 旧版发送消息方式 // eventBus.post(JSON.toJSONString(paySuccessMessage)); eventPublisher.publish(paySuccessMessageEvent.topic(), JSON.toJSONString(paySuccessMessage));}@Overridepublic void changeOrderMarketSettlement(List outTradeNoList) { // 更新拼团结算状态 orderDao.changeOrderMarketSettlement(outTradeNoList); // 循环成功发送消息 - 一般在公司的场景里,还会有job任务扫描超时没有结算的订单,查询订单状态。查询对方服务端的接口,会被限制一次查询多少,频次多少。 outTradeNoList.forEach(outTradeNo -> { BaseEvent.EventMessage paySuccessMessageEventMessage = paySuccessMessageEvent.buildEventMessage( PaySuccessMessageEvent.PaySuccessMessage.builder() .tradeNo(outTradeNo) .build()); PaySuccessMessageEvent.PaySuccessMessage paySuccessMessage = paySuccessMessageEventMessage.getData(); // 旧版发送消息方式 // eventBus.post(JSON.toJSONString(paySuccessMessage)); eventPublisher.publish(paySuccessMessageEvent.topic(), JSON.toJSONString(paySuccessMessage)); });}

两处发送 MQ 的操作。

消费,支付成功结算消息:

1234567891011121314151617181920212223242526272829303132333435@Slf4j@Componentpublic class OrderPaySuccessListener { @Resource private IGoodsService goodsService; // @Subscribe - 旧版发布订阅方式 @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "${spring.rabbitmq.config.consumer.topic_order_pay_success.queue}"), exchange = @Exchange(value = "${spring.rabbitmq.config.consumer.topic_order_pay_success.exchange}", type = ExchangeTypes.TOPIC), key = "${spring.rabbitmq.config.consumer.topic_order_pay_success.routing_key}" ) ) public void listener(String paySuccessMessageJson) { try { log.info("收到支付成功消息 {}", paySuccessMessageJson); PaySuccessMessageEvent.PaySuccessMessage paySuccessMessage = JSON.parseObject(paySuccessMessageJson, PaySuccessMessageEvent.PaySuccessMessage.class); log.info("模拟发货(如;发货、充值、开户员、返利),单号:{}", paySuccessMessage.getTradeNo()); // 变更订单状态 - 发货完成&结算 goodsService.changeOrderDealDone(paySuccessMessage.getTradeNo()); // 可以打开测试,MQ 消费失败,会抛异常,之后重试消费。这个也是最终执行的重要手段。 // throw new RuntimeException("重试消费"); } catch (Exception e) { log.error("收到支付成功消息失败 {}", paySuccessMessageJson,e); throw e; } }}

2.5 锁单API对接

拼团的锁单增加了使用 MQ、HTTP 不同方式进行处理。所以这部分要处理下对接的接口,增加 MQ 的配置。

2.5.1 配置请求对象

需要在原有的 LockMarketPayOrderRequestDTO 请求对象,新增加回调配置。

这个配置类也就是拼团 API 模块下的配置类。

2.5.2 使用请求对象

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758@Slf4j@Componentpublic class ProductPort implements IProductPort { @Value("${app.config.group-buy-market.source}") private String source; @Value("${app.config.group-buy-market.chanel}") private String chanel; @Value("${app.config.group-buy-market.notify-url}") private String notifyUrl; private final ProductRPC productRPC; private final IGroupBuyMarketService groupBuyMarketService; @Override public MarketPayDiscountEntity lockMarketPayOrder(String userId, String teamId, Long activityId, String productId, String orderId) { // 请求参数 LockMarketPayOrderRequestDTO requestDTO = new LockMarketPayOrderRequestDTO(); requestDTO.setUserId(userId); requestDTO.setTeamId(teamId); requestDTO.setGoodsId(productId); requestDTO.setActivityId(activityId); requestDTO.setSource(source); requestDTO.setChannel(chanel); requestDTO.setOutTradeNo(orderId);// requestDTO.setNotifyUrl(notifyUrl); requestDTO.setNotifyMQ(); try { // 营销锁单 Call> call = groupBuyMarketService.lockMarketPayOrder(requestDTO); // 获取结果 Response response = call.execute().body(); log.info("营销锁单{} requestDTO:{} responseDTO:{}", userId, JSON.toJSONString(requestDTO), JSON.toJSONString(response)); if (null == response) return null; // 异常判断 if (!"0000".equals(response.getCode())) { throw new AppException(response.getCode(), response.getInfo()); } LockMarketPayOrderResponseDTO responseDTO = response.getData(); // 获取拼团优惠 return MarketPayDiscountEntity.builder() .originalPrice(responseDTO.getOriginalPrice()) .deductionPrice(responseDTO.getDeductionPrice()) .payPrice(responseDTO.getPayPrice()) .build(); } catch (Exception e) { log.error("营销锁单失败{}", userId, e); return null; } }}

requestDTO.setNotifyMQ(); 增加使用 MQ 方式进行对接。

独占锁和无锁化场景运用

以独占锁抢占方式,迭代拼团结算通知互备执行任务。再以无锁化设计,处理用户拼团锁单,库存抢占处理,降低对数据库的行锁压力,提高整体吞吐量。

1. 业务流程

如图,两种锁应对的场景:

分段锁,颗粒度缩小到库存维度。先加(incr)后锁的操作,是一种无锁化设计。锁的目的只是作为兜底。这类似于我们操作账户,操作完写一条流水。incr 操作是原子的,基本不会产生一样的值。但在实际生产中,遇到过集群的运维配置问题,以及业务运营配置数据问题,导致 incr 得到的值相同。

独占锁,在分布式架构系统设计中,会有多个实例部署。这些实例都会做 job 任务的执行,为了保障既能让任务互备,同时不要重复执行。这里要加独占锁,谁抢占到谁执行。执行完成后,释放锁,下一轮继续抢占。

2. 编码实现

2.1 独占锁

12345678910111213141516171819202122232425262728293031@Slf4j@Servicepublic class GroupBuyNotifyJob { @Resource private ITradeSettlementOrderService tradeSettlementOrderService; @Resource private RedissonClient redissonClient; @Scheduled(cron = "0 0 0 * * ?") public void exec() { // 为什么加锁?分布式应用N台机器部署互备(一个应用实例挂了,还有另外可用的),任务调度会有N个同时执行,那么这里需要增加抢占机制,谁抢占到谁就执行。完毕后,下一轮继续抢占。 RLock lock = redissonClient.getLock("group_buy_market_notify_job_exec"); try { boolean isLocked = lock.tryLock(3, 0, TimeUnit.SECONDS); if (!isLocked) return; Map result = tradeSettlementOrderService.execSettlementNotifyJob(); log.info("定时任务,回调通知拼团完结任务 result:{}", JSON.toJSONString(result)); } catch (Exception e) { log.error("定时任务,回调通知拼团完结任务失败", e); } finally { if (lock.isLocked() && lock.isHeldByCurrentThread()) { lock.unlock(); } } }}

在执行任务时,要抢占一个锁,谁抢占到谁就执行。

这种独占锁的设计,也是大家最为常用的。但有一些场景是不适合使用的,比如:集中式库存的抢占,这类场景如果使用独占锁,就会出现排队现象,所有的竞争用户都要等待上一个释放锁才能继续执行。这样会大大的降低系统的吞吐量。

2.2 无锁化(乐观锁)

2.2.1 更改部分

在 trade 交易下,锁单 filter 过滤中,添加一个组队库存占用规则过滤。再有了设计的使用后,你会发现,想扩展新的功能既不会污染织染的逻辑,也不会让加入就得大量做 if···else 判断。而是非常优雅的加入,之后在 TradeLockRuleFilterFactory 配置上即可。

2.2.2 责任链 - 组队库存规则

12345678910111213141516171819202122232425262728293031323334353637383940414243package cn.bugstack.domain.trade.service.lock.filter;@Slf4j@Servicepublic class TeamStockOccupyRuleFilter implements ILogicHandler { @Resource private ITradeRepository repository; @Override public TradeLockRuleFilterBackEntity apply(TradeLockRuleCommandEntity requestParameter, TradeLockRuleFilterFactory.DynamicContext dynamicContext) throws Exception { log.info("交易规则过滤-组队库存校验{} activityId:{}", requestParameter.getUserId(), requestParameter.getActivityId()); // 1. teamId 为空,则为首次开团,不做拼团组队目标量库存限制 String teamId = requestParameter.getTeamId(); if (StringUtils.isBlank(teamId)) { return TradeLockRuleFilterBackEntity.builder() .userTakeOrderCount(dynamicContext.getUserTakeOrderCount()) .build(); } // 2. 抢占库存;通过抢占 Redis 缓存库存,来降低对数据库的操作压力。 GroupBuyActivityEntity groupBuyActivity = dynamicContext.getGroupBuyActivity(); Integer target = groupBuyActivity.getTarget(); Integer validTime = groupBuyActivity.getValidTime(); String teamStockKey = dynamicContext.generateTeamStockKey(teamId); String recoveryTeamStockKey = dynamicContext.generateRecoveryTeamStockKey(teamId); boolean status = repository.occupyTeamStock(teamStockKey, recoveryTeamStockKey, target, validTime); if (!status) { log.warn("交易规则过滤-组队库存校验{} activityId:{} 抢占失败:{}", requestParameter.getUserId(), requestParameter.getActivityId(), teamStockKey); throw new AppException(ResponseCode.E0008); } return TradeLockRuleFilterBackEntity.builder() .userTakeOrderCount(dynamicContext.getUserTakeOrderCount()) .recoveryTeamStockKey(recoveryTeamStockKey) .build(); }}

首先,如果 teamId 为空,则不需要做库存的抢占。抢占库存是在一个 team 已经创建完成后,再有用户开始参与抢占时候,这个时候要做库存的缓存扣减处理。

之后,从上下文中获取到组队的目标量,有效期时间,以及组队的key和恢复量key。这个恢复量的用途是我们扣了 redis 中组队的任务缓存,但这个时候,发生异常了。那么我们要记录一个这样的数据。等做库存使用对比量的时候,可以用 target 目标量 + 恢复量一起来比。

2.2.3 占用库存

123456789101112131415161718192021222324252627282930313233@Slf4j@Repositorypublic class TradeRepository implements ITradeRepository { @Override public boolean occupyTeamStock(String teamStockKey, String recoveryTeamStockKey, Integer target, Integer validTime) { // 失败恢复量 Long recoveryCount = redisService.getAtomicLong(recoveryTeamStockKey); recoveryCount = null == recoveryCount ? 0 : recoveryCount; // 1. incr 得到值,与总量和恢复量做对比。恢复量为系统失败时候记录的量。 // 2. 从有组队量开始,相当于已经有了一个占用量,所以要 +1 long occupy = redisService.incr(teamStockKey) + 1; if (occupy >= target + recoveryCount) { redisService.setAtomicLong(teamStockKey, target); return false; } // 1. 给每个产生的值加锁为兜底设计,虽然incr操作是原子的,基本不会产生一样的值。但在实际生产中,遇到过集群的运维配置问题,以及业务运营配置数据问题,导致incr得到的值相同。 // 2. validTime + 60分钟,是一个延后时间的设计,让数据保留时间稍微长一些,便于排查问题。 String lockKey = teamStockKey + Constants.UNDERLINE + occupy; Boolean lock = redisService.setNx(lockKey, validTime + 60, TimeUnit.MINUTES); if (!lock) { log.info("组队库存加锁失败 {}", lockKey); } return lock; } }

occupyTeamStock 占用库存操作,先获取恢复量。之后和 target 目标量 + recoveryCount 恢复量做对比。

库存扣减后,添加一个锁,这个锁不会影响整体效率。只是一个兜底操作。如注释说明。

2.2.4 恢复记录

123456@Overridepublic void recoveryTeamStock(String recoveryTeamStockKey, Integer validTime) { // 首次组队拼团,是没有 teamId 的,所以不需要这个做处理。 if (StringUtils.isBlank(recoveryTeamStockKey)) return; redisService.incr(recoveryTeamStockKey);}

如果 recoveryTeamStockKey 为空,则表示没有组队信息,这个时候即使发生异常,也不用处理库存恢复记录。

之后做 incr 记录库存恢复量。

2.2.5 责任链配置

123456789101112131415161718192021@Slf4j@Servicepublic class TradeLockRuleFilterFactory { @Bean("tradeRuleFilter") public BusinessLinkedList tradeRuleFilter( ActivityUsabilityRuleFilter activityUsabilityRuleFilter, UserTakeLimitRuleFilter userTakeLimitRuleFilter, TeamStockOccupyRuleFilter teamStockOccupyRuleFilter) { // 组装链 LinkArmory linkArmory = new LinkArmory<>("交易规则过滤链", activityUsabilityRuleFilter, userTakeLimitRuleFilter, teamStockOccupyRuleFilter); // 链对象 return linkArmory.getLogicLink(); }}

我们实现的一个新的规则(teamStockOccupyRuleFilter),只要配置到工程中即可。

所以,即使你用AI编码,但不告诉它要在什么框架结构下编码,实现出什么编码。他可能给你的大概率也是CRUD。

函数式数据缓存和降级到DB处理

以查询活动配置为场景,增加缓存处理。同时使用降级服务,控制走缓存还是走 DB 数据库。并把这部分功能统一抽象成函数式编程。

1. 业务流程

如图,缓存和降级的使用:

首先,在日常的业务场景中,很多高频使用的数据,都是从 Redis 缓存获取。如果缓存不存在,才会从数据库读取。

之后,也会给缓存配置降级,如果缓存有问题,或者要做一些验证,必须从库里读取,则会动态的配置,让当时的操作从数据库获取。

注意,整个操作过程,缓存、降级、数据库,是一整条代码编程。如果在每个方法里都加这样的内容,就会显得很臃肿,所以一般会抽象一个方法,使用函数式的方式进行编程,降低使用者的编码量。这个技巧很重要

2. 编码实现

2.1 实现目标

首先,以 ActivityRepository 场景举例,两个 dao 操作从数据库查询,改为从 Redis 缓存获取。如果缓存不存在,则从数据库查询后写入缓存。

之后,要添加上降级逻辑的处理,如果此时不需要从缓存获取,则直接从数据库获取。

另外,要抽象操作缓存、降级的模块到共用的抽象类。

2.2 缓存处理

123456789101112131415@Servicepublic class DCCService { @DCCValue("cacheSwitch:0") private String cacheOpenSwitch; /** * 缓存开启开关,0为开启,1为关闭 */ public boolean isCacheOpenSwitch(){ return "0".equals(cacheOpenSwitch); }}

降级开关,缓存开启。

12345678910111213GroupBuyActivity groupBuyActivityRes = null;if (dccService.isCacheOpenSwitch()) { String groupBuyActivityCacheKey = GroupBuyActivity.cacheRedisKey(activityId); groupBuyActivityRes = redisService.getValue(groupBuyActivityCacheKey); if (null == groupBuyActivityRes) { groupBuyActivityRes = groupBuyActivityDao.queryValidGroupBuyActivityId(activityId); if (null == groupBuyActivityRes) return null; redisService.setValue(groupBuyActivityCacheKey, groupBuyActivityRes); }} else { groupBuyActivityRes = groupBuyActivityDao.queryValidGroupBuyActivityId(activityId);}

判断缓存降级开关。如果是降级的,则直接走DB查询操作。非降级,则走 Redis 缓存,缓存为空则从数据库查询后写入缓存。

这样的代码看着并没有什么问题,但如果大量的这样在程序中编写,就会变得很恶心,以后修改也麻烦。所以要做抽象的设计处理。

2.3 函数编程

12345678910111213141516@Testpublic void test_Supplier(){ // 创建一个 Supplier 实例,返回一个字符串 Supplier stringSupplier = () -> "Hello, XFG!"; // 使用 get() 方法获取 Supplier 提供的值 String result = stringSupplier.get(); // 输出结果 System.out.println(result); // 另一个示例,使用 Supplier 提供当前时间 Supplier currentTimeSupplier = System::currentTimeMillis; // 获取当前时间 Long currentTime = currentTimeSupplier.get(); // 输出当前时间 System.out.println("Current time in milliseconds: " + currentTime);}

这里要介绍一个函数 Supplier,它的作用是帮你延迟执行代码块。如:想调用一个 dao.query() 如果这样写那么就直接调用了,现在通过 Supplier.get 可以在你需要的时候调用。

好,那么这么一个东西有啥用途呢。如:我们使用缓存、查询DB、降级判断,这个代码如果被抽象出来,那么必然会有一个问题,就是所有使用方都有自己的 dao.query() 怎么抽象呢,那么就要让大家的 dao.query 作为参数使用 Supplier 传递到抽象类的方法里,方法里来执行操作。

2.4 抽象方法

123456789101112131415161718192021222324252627282930313233343536373839404142434445public abstract class AbstractRepository { private final Logger logger = LoggerFactory.getLogger(AbstractRepository.class); @Resource protected IRedisService redisService; @Resource protected DCCService dccService; /** * 通用缓存处理方法 * 优先从缓存获取,缓存不存在则从数据库获取并写入缓存 * * @param cacheKey 缓存键 * @param dbFallback 数据库查询函数 * @param 返回类型 * @return 查询结果 */ protected T getFromCacheOrDb(String cacheKey, Supplier dbFallback) { // 判断是否开启缓存 if (dccService.isCacheOpenSwitch()) { // 从缓存获取 T cacheResult = redisService.getValue(cacheKey); // 缓存存在则直接返回 if (null != cacheResult) { return cacheResult; } // 缓存不存在则从数据库获取 T dbResult = dbFallback.get(); // 数据库查询结果为空则直接返回 if (null == dbResult) { return null; } // 写入缓存 redisService.setValue(cacheKey, dbResult); return dbResult; } else { // 缓存未开启,直接从数据库获取 logger.warn("缓存降级 {}", cacheKey); return dbFallback.get(); } } }

首先,增加 AbstractRepository 抽象类,让所有需要使用缓存、降级一套判断的,都继承这个抽象类。

之后,提供 getFromCacheOrDb 方法,在这个方法中判断走降级、走缓存的处理。Supplier dbFallback 就可以帮助我们在需要的地方,完成 get 操作执行传入进来的代码块。

2.5 方法使用

1234567891011121314151617@Overridepublic GroupBuyActivityDiscountVO queryGroupBuyActivityDiscountVO(Long activityId) { // 优先从缓存获取&写缓存,注意如果实现了后台配置,在更新时要更库,删缓存。 String groupBuyActivityCacheKey = GroupBuyActivity.cacheRedisKey(activityId); GroupBuyActivity groupBuyActivityRes = getFromCacheOrDb(groupBuyActivityCacheKey, () -> groupBuyActivityDao.queryValidGroupBuyActivityId(activityId)); if (null == groupBuyActivityRes) return null; String discountId = groupBuyActivityRes.getDiscountId(); // 优先从缓存获取&写缓存 String groupBuyDiscountCacheKey = GroupBuyDiscount.cacheRedisKey(discountId); GroupBuyDiscount groupBuyDiscountRes = getFromCacheOrDb(groupBuyDiscountCacheKey, () -> groupBuyDiscountDao.queryGroupBuyActivityDiscountByDiscountId(discountId)); if (null == groupBuyDiscountRes) return null; // ... 省略部分代码}

现在我们使用缓存的方式则直接从 getFromCacheOrDb 抽象父类的方法获取。并把我们要执行的代码库函数的方式透传过去。