这两天在和一小伙伴研究解决RabbitMQ集群重启慢导致Consumer自动重连超时的问题,已经有了解决方案。接下来需要做个整理。由于同时涉及到springboot自动配置、springboot-amqp、spring-rabbit等诸多技术,先往后拖一下。
本文说什么呢?通过一个程序案例来聊聊程序里随处可见的interface。
先来个四连问:什么情况下定义interface?为什么要定义interface?定义interface是为了什么?你用对interface了吗?
接下来看这个案例吧。
程序里使用了RabbitMQ,下面 MQSender 是个interface,定义了生产者往mq放消息的两种方式:
package com.yft.rabbitmq.service; import com.yft.rabbitmq.constant.BindingEnum; /** * 延迟发送服务类 * * @author liuhongjie hongjie.liu@serviceshare.com * @date 2022年06月12日 */ public interface MQSender { /** * 发送消息 * * @param bindingEnum 声明binding的enum * @param msg 推送的消息 * @param delaySeconds 延迟的时间,秒 */ void sendDelayMsg(BindingEnum bindingEnum, Object msg, int delaySeconds); /** * 发送消息 * * @param bindingEnum 声明binding的enum * @param msg 推送的消息 */ void sendMsg(BindingEnum bindingEnum, Object msg); }
其中 BindingEnum 是个枚举,封装定义了exchange和queue及两者的binding关系
package com.yft.rabbitmq.constant; public enum BindingEnum { SYNC_REVIEW_RECORD("sync-review-record", "sync-review-record", "sync-review-record"), PAY_SETTLE("pay-settle","pay-settle","pay-settle"), PAY_SETTLE_QUERY_DELAY("pay-settle-query-delay", "pay-settle-query-delay", "pay-settle-query-delay"), ; String exchangeName; String queueName; String routingKey; private static final String EXCHANGE_NAME_PREFIX = "exchange.levy-platform."; private static final String QUEUE_NAME_PREFIX = "queue.levy-platform."; private static final String ROUTING_KEY_PREFIX = "bindingKey.levy-platform."; BindingEnum(String exchangeName, String queueName, String routingKey) { this.exchangeName = exchangeName; this.queueName = queueName; this.routingKey = routingKey; } public String getExchangeName() { return EXCHANGE_NAME_PREFIX + exchangeName; } public String getQueueName() { return QUEUE_NAME_PREFIX + queueName; } public String getRoutingKey() { return ROUTING_KEY_PREFIX + routingKey; } }
View Code
MQSender只有一个实现类 DefaultMQSender 。我们同样贴出来它的代码
package com.yft.rabbitmq.service; import com.yft.rabbitmq.constant.BindingEnum; import lombok.RequiredArgsConstructor; import org.springframework.amqp.core.AmqpTemplate; /** * 延迟发送的默认实现 * * @author liuhongjie hongjie.liu@serviceshare.com * @date 2022年06月12日 */ @RequiredArgsConstructor public class DefaultMQSender implements MQSender { private final AmqpTemplate amqpTemplate; @Override public void sendDelayMsg(BindingEnum bindingEnum, Object msg, int delaySeconds) { amqpTemplate.convertAndSend(bindingEnum.getExchangeName(), bindingEnum.getRoutingKey(), msg, message -> { message.getMessageProperties().setDelay(delaySeconds * 1000); return message; }); } @Override public void sendMsg(BindingEnum bindingEnum, Object msg) { amqpTemplate.convertAndSend(bindingEnum.getExchangeName(), bindingEnum.getRoutingKey(), msg); } }
View Code
使用的话,见下面的 MQSenderConfig,它定义了相关的bean
package com.cn.yft.config; import com.yft.rabbitmq.service.DefaultMQSender; import com.yft.rabbitmq.service.MQSender; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Author wjx * @Date 2022/7/6 */ @Configuration public class MQSenderConfig { @Autowired private AmqpTemplate rabbitTemplate; @Bean public MQSender mqSender() { return new DefaultMQSender(rabbitTemplate); } }
View Code
案例介绍完毕。
那么,MQSender 这个interface的作用是什么?
当事人回答:作用是我能很方便的看到这个接口的能力。
当事人回答:如果以后不用RabbitMQ,新的消息中间件直接实现这个interface就行了。
如此几次对答后,几分钟后,小伙子开始觉得这个interface好像意义并不明显。
以这个程序实现场景来看,去掉这个interface是可以的,反而还会增强程序易读性。
那么,以这个场景来说,怎么定义一个合理的interface呢?
我画了下面的草图,图样图森破。可爱的小伙立即提出了新的疑惑,我当然明白他的疑惑。秉承我的风格,我并没有继续阐开,而是让这小伙后续琢磨琢磨。
好,在这里,我揭晓我的想法。
MQSender 摇身一变成:
package com.yft.rabbitmq.service; import com.yft.dto.MQMsgModel; /** * 延迟发送服务类 */ public interface MQSender { /** * 发送消息 * @param mqMsg mq消息对象 */ void sendMsg(MQMsgModel mqMsg); }
它的两个实现类:DefaultMQSender 是实时发送消息,DelayMQSender 是延迟发送消息
package com.yft.rabbitmq.service; /** * 即时发送消息的实现 */ @RequiredArgsConstructor public class DefaultMQSender implements MQSender { private final AmqpTemplate amqpTemplate; @Override public void sendMsg(MQMsgModel mqMsgModel) { BindingEnum bindingEnum = mqMsgModel.getBindingEnum(); amqpTemplate.convertAndSend(bindingEnum.getExchangeName(), bindingEnum.getRoutingKey(), mqMsgModel.getMsg()); } }
|
package com.yft.rabbitmq.service; /** * 延迟发送消息的实现 */ @RequiredArgsConstructor public class DelayMQSender implements MQSender { private final AmqpTemplate amqpTemplate; @Override public void sendMsg(MQMsgModel mqMsgModel) { BindingEnum bindingEnum = mqMsgModel.getBindingEnum(); amqpTemplate.convertAndSend(bindingEnum.getExchangeName(), bindingEnum.getRoutingKey(), mqMsgModel.getMsg(), message -> { message.getMessageProperties().setDelay(mqMsgModel.getDelaySeconds() * 1000); return message; }); } }
|
注意到多了一个 MQMsgModel, 好,我们来看这个 MQMsgModel,它是一个数据传输对象,定义了mq消息的属性
package com.yft.dto; import com.yft.rabbitmq.constant.BindingEnum; import lombok.Data; /** * mq消息对象 */ @Data public class MQMsgModel{ private BindingEnum bindingEnum; private Object msg; /** * 指定消息的延迟时间,单位:秒 →→→→(非延迟消息,不用指定) */ private Integer delaySeconds; }
使用的话, MQSenderConfig 定义两个bean就OK了
package com.cn.yft.config; @Configuration public class MQSenderConfig { @Autowired private AmqpTemplate rabbitTemplate; @Bean public MQSender mqSender() { return new DefaultMQSender(rabbitTemplate); } @Bean public MQSender delayMqSender() { return new DelayMQSender(rabbitTemplate); } }
(完毕)
再贴一下上面的四连问:什么情况下定义interface?为什么要定义interface?定义interface是为了什么?你用对interface了吗?
不知你是否有了一些答案?