5.1 创建支付订单
/**
 * Creates a payment record for an order.
 *
 * <p>Counts the payment rows of the given order that are already flagged as paid. When at least
 * one exists, {@code CastException.cast(SHOP_ORDER_PAY_STATUS_IS_PAY)} is invoked (presumably it
 * throws - confirm against {@code CastException}). Otherwise a fresh pay id is generated, the
 * record is flagged as unpaid and inserted.
 *
 * @param tradePay payment data; its order id is read, its pay id and paid flag are overwritten
 * @return a {@code SHOP_SUCCESS} result when the row was inserted, or a {@code SHOP_FAIL} result
 *     when any exception was raised on the way (the exception is logged, not propagated)
 */
public Result createPayment(TradePay tradePay) {
    try {
        // Look for an already-paid payment record of this order.
        TradePayExample payExample = new TradePayExample();
        TradePayExample.Criteria criteria = payExample.createCriteria();
        criteria.andOrderIdEqualTo(tradePay.getOrderId());
        criteria.andIsPaidEqualTo(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
        int count = tradePayMapper.countByExample(payExample);
        if (count > 0) {
            CastException.cast(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY);
        }

        // No paid record yet: persist a new, unpaid payment.
        long payId = idWorker.nextId();
        tradePay.setPayId(payId);
        tradePay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY.getCode());
        tradePayMapper.insert(tradePay);
        log.info("创建支付订单成功:" + payId);
    } catch (Exception e) {
        // The caller only gets a generic failure result, so keep the cause in the log.
        log.error("创建支付订单失败", e);
        return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());
    }
    return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
}
5.2 支付回调
5.2.1 流程分析
5.2.2 代码实现
/**
 * Handles the payment callback.
 *
 * <p>Only acts when the callback's paid flag equals {@code SHOP_ORDER_PAY_STATUS_IS_PAY}:
 * <ol>
 *   <li>loads the stored payment by pay id; {@code CastException.cast(SHOP_PAYMENT_NOT_FOUND)}
 *       is invoked when it does not exist;</li>
 *   <li>flags it as paid and updates it selectively by primary key;</li>
 *   <li>when exactly one row was updated, stores the outgoing message in the producer temp
 *       table and submits an asynchronous task that sends the message and deletes the temp row
 *       once the broker reports {@code SEND_OK}; otherwise
 *       {@code CastException.cast(SHOP_PAYMENT_IS_PAID)} is invoked.</li>
 * </ol>
 *
 * @param tradePay callback data; its paid flag and pay id are read
 * @return a {@code SHOP_SUCCESS} result on every path that does not throw, including the case
 *     where the callback's paid flag is not "paid"
 */
public Result callbackPayment(TradePay tradePay) {
    if (tradePay.getIsPaid().equals(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode())) {
        tradePay = tradePayMapper.selectByPrimaryKey(tradePay.getPayId());
        if (tradePay == null) {
            CastException.cast(ShopCode.SHOP_PAYMENT_NOT_FOUND);
        }
        tradePay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
        int i = tradePayMapper.updateByPrimaryKeySelective(tradePay);
        // Exactly one updated row is treated as "payment succeeded".
        if (i == 1) {
            // Persist the message before sending it; the row is removed only after SEND_OK.
            TradeMqProducerTemp mqProducerTemp = new TradeMqProducerTemp();
            mqProducerTemp.setId(String.valueOf(idWorker.nextId()));
            mqProducerTemp.setGroupName("payProducerGroup");
            mqProducerTemp.setMsgKey(String.valueOf(tradePay.getPayId()));
            // Record the tag the message is actually sent with (was: topic).
            mqProducerTemp.setMsgTag(tag);
            mqProducerTemp.setMsgBody(JSON.toJSONString(tradePay));
            mqProducerTemp.setCreateTime(new Date());
            mqProducerTempMapper.insert(mqProducerTemp);

            // tradePay was reassigned above, so an effectively-final copy is needed for the task.
            TradePay finalTradePay = tradePay;
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        SendResult sendResult = sendMessage(topic,
                                tag,
                                finalTradePay.getPayId(),
                                JSON.toJSONString(finalTradePay));
                        log.info(JSON.toJSONString(sendResult));
                        if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
                            mqProducerTempMapper.deleteByPrimaryKey(mqProducerTemp.getId());
                            log.info("删除消息表成功");
                        }
                    } catch (Exception e) {
                        // The temp row stays in place when sending fails.
                        // NOTE(review): presumably a separate job re-sends leftover rows - confirm.
                        log.error("发送支付成功消息失败", e);
                    }
                }
            });
        } else {
            CastException.cast(ShopCode.SHOP_PAYMENT_IS_PAID);
        }
    }
    return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
}
线程池优化消息发送逻辑
创建线程池对象
/**
 * Declares the thread pool bean.
 *
 * @return an initialized {@code ThreadPoolTaskExecutor} with 4 core threads, at most 8 threads,
 *     a queue capacity of 100, a 60 second keep-alive and the thread name prefix "Pool-A"
 */
@Bean
public ThreadPoolTaskExecutor getThreadPool() {
    final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();

    // Identification of the worker threads.
    pool.setThreadNamePrefix("Pool-A");

    // Sizing: core threads, upper bound, idle timeout and pending-task queue.
    pool.setCorePoolSize(4);
    pool.setMaxPoolSize(8);
    pool.setKeepAliveSeconds(60);
    pool.setQueueCapacity(100);

    // Rejected tasks are handed to the JDK CallerRunsPolicy.
    pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

    pool.initialize();
    return pool;
}
使用线程池
// Thread pool injected by Spring; the message-sending task is submitted to it.
@Autowired
private ThreadPoolTaskExecutor executorService;
// Send the message on the pool; the temp-table row is deleted only after SEND_OK.
executorService.submit(new Runnable() {
    @Override
    public void run() {
        try {
            SendResult sendResult = sendMessage(topic, tag, finalTradePay.getPayId(), JSON.toJSONString(finalTradePay));
            log.info(JSON.toJSONString(sendResult));
            if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
                mqProducerTempMapper.deleteByPrimaryKey(mqProducerTemp.getId());
                log.info("删除消息表成功");
            }
        } catch (Exception e) {
            // Report through the logger (with stack trace) instead of stderr.
            log.error("发送支付成功消息失败", e);
        }
    }
});
5.2.3 处理消息
支付成功后,支付服务payService发送MQ消息,订单服务、用户服务、日志服务需要订阅消息进行处理
订单服务修改订单状态为已支付
日志服务记录支付日志
用户服务负责给用户增加积分
以下用订单服务为例说明消息的处理情况
配置RocketMQ属性值
mq.pay.topic=payTopic
mq.pay.consumer.group.name=pay_payTopic_group
消费消息
在订单服务中,配置公共的消息处理类
/**
 * Shared message-handling logic for the MQ consumers of the order service.
 */
public class BaseConsumer {

    /**
     * Parses the message body as an {@code OrderMQ}, loads the referenced order, applies the
     * status change selected by {@code code} and saves it through
     * {@code orderService.changeOrderStatus}.
     *
     * <ul>
     *   <li>{@code SHOP_ORDER_MESSAGE_STATUS_CANCEL}: order status becomes
     *       {@code SHOP_ORDER_CANCEL};</li>
     *   <li>{@code SHOP_ORDER_MESSAGE_STATUS_ISPAID}: pay status becomes
     *       {@code SHOP_ORDER_PAY_STATUS_IS_PAY};</li>
     *   <li>any other code: the order is passed to {@code changeOrderStatus} unmodified.</li>
     * </ul>
     *
     * @param orderService service used to look up and update the order
     * @param messageExt   received message; its body is decoded as UTF-8 JSON
     * @param code         message-status code selecting which field is changed
     * @return the order that was passed to {@code changeOrderStatus}
     * @throws IllegalArgumentException if the body does not yield an {@code OrderMQ} object
     * @throws IllegalStateException    if no order exists for the id carried by the message
     * @throws Exception                anything raised by parsing or by {@code orderService}
     */
    public TradeOrder handleMessage(IOrderService orderService,
                                    MessageExt messageExt,
                                    Integer code) throws Exception {
        // Decode and parse the message payload.
        String body = new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        OrderMQ orderMq = JSON.parseObject(body, OrderMQ.class);
        if (orderMq == null) {
            throw new IllegalArgumentException(
                    "Message " + messageExt.getMsgId() + " has no parsable order payload");
        }

        // Look up the order; fail with a clear message instead of a later NullPointerException.
        TradeOrder order = orderService.findOne(orderMq.getOrderId());
        if (order == null) {
            throw new IllegalStateException("Order not found: " + orderMq.getOrderId());
        }

        if (ShopCode.SHOP_ORDER_MESSAGE_STATUS_CANCEL.getCode().equals(code)) {
            order.setOrderStatus(ShopCode.SHOP_ORDER_CANCEL.getCode());
        }
        if (ShopCode.SHOP_ORDER_MESSAGE_STATUS_ISPAID.getCode().equals(code)) {
            order.setPayStatus(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
        }
        orderService.changeOrderStatus(order);
        return order;
    }
}
接收订单支付成功消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.pay.topic}",
consumerGroup = "${mq.pay.consumer.group.name}")
public class PayConsumer extends BaseConsumer implements RocketMQListener<MessageExt> {
@Autowired
private IOrderService orderService;
@Override
public void onMessage(MessageExt messageExt) {
try {
log.info("CancelOrderProcessor receive message:"+messageExt);
TradeOrder order = handleMessage(orderService,
messageExt,
ShopCode.SHOP_ORDER_MESSAGE_STATUS_ISPAID.getCode());
log.info("订单:["+order.getOrderId()+"]支付成功");
} catch (Exception e) {
e.printStackTrace();
log.error("订单支付失败");
}
}
}