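I. Concepts
- Name Server is a nearly stateless node. It can be deployed as a cluster, and the nodes do not synchronize any information with one another.
- Broker deployment is somewhat more involved. Brokers are divided into Masters and Slaves: one Master can have multiple Slaves, but a Slave belongs to exactly one Master. A Master and its Slaves are tied together by sharing the same BrokerName while using different BrokerIds; a BrokerId of 0 denotes the Master and any non-zero value denotes a Slave. Multiple Masters can also be deployed. Every Broker keeps a long-lived connection to every node in the Name Server cluster and periodically registers its Topic information with all of them.
- A Producer keeps a long-lived connection to one randomly chosen node in the Name Server cluster and periodically fetches Topic routing information from it. It also keeps long-lived connections to the Masters that serve the Topic and sends them heartbeats at regular intervals. Producers are completely stateless and can be deployed as a cluster.
- A Consumer keeps a long-lived connection to one randomly chosen node in the Name Server cluster and periodically fetches Topic routing information from it. It also keeps long-lived connections to the Masters and Slaves that serve the Topic and sends them heartbeats at regular intervals. A Consumer can subscribe to messages from either a Master or a Slave; which one is used is determined by Broker configuration.
II. Integration
1. Add the dependency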
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.4</version>
</dependency>
2. The auto-configuration class
The auto-configuration class declared in spring.factories is:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
Spring Boot automatically loads the configuration class RocketMQAutoConfiguration:
@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
@ConditionalOnClass({MQAdmin.class})
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server", matchIfMissing = true)
@Import({MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class, RocketMQTransactionConfiguration.class})
@AutoConfigureAfter({MessageConverterConfiguration.class})
@AutoConfigureBefore({RocketMQTransactionConfiguration.class})
public class RocketMQAutoConfiguration {
...
}
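As the configuration class shows, all RocketMQ settings use the rocketmq prefix. The key setting is rocketmq.name-server: because matchIfMissing = true, the configuration class itself still loads when the property is absent, but in practice the clients need it to locate a Name Server. The remaining settings are defined in RocketMQProperties: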
@ConfigurationProperties(prefix = "rocketmq")
public class RocketMQProperties {
    /**
     * The name server for rocketMQ, formats: `host:port;host:port`.
     */
    private String nameServer;
    /**
     * Enum type for accessChannel, values: LOCAL, CLOUD
     */
    private String accessChannel;
    private Producer producer;
    /**
     * Configure enable listener or not.
     * In some particular cases, if you don't want the the listener is enabled when container startup,
     * the configuration pattern is like this :
     * rocketmq.consumer.listeners.<group-name>.<topic-name>.enabled=<boolean value, true or false>
     * <p>
     * the listener is enabled by default.
     */
    private Consumer consumer = new Consumer();
    // ...
}
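To make the `host:port;host:port` format of nameServer concrete, here is a minimal, standard-library-only sketch that splits such a string into individual addresses. The class name NameServerAddresses, its Address type, and the sample addresses are made up for illustration; this is not the parser the RocketMQ client itself uses.

```java
import java.util.ArrayList;
import java.util.List;

class NameServerAddresses {
    /** Hypothetical holder for one host:port pair. */
    static final class Address {
        final String host;
        final int port;

        Address(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    /** Splits "host:port;host:port" into its individual addresses. */
    static List<Address> parse(String nameServer) {
        List<Address> result = new ArrayList<>();
        for (String part : nameServer.split(";")) {
            String trimmed = part.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate empty segments such as a trailing ';'
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            result.add(new Address(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        // Sample addresses are invented for the example.
        System.out.println(parse("10.0.0.1:9876;10.0.0.2:9876"));
    }
}
```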
RocketMQAutoConfiguration registers the following beans:
- DefaultMQProducer: the default producer;
- RocketMQTemplate: a template class that wraps the producer's send methods.
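3. Other auto-configuration classes
The classes above register a producer but no consumer. Now look at the other configuration classes pulled in by @Import:
- MessageConverterConfiguration: serialization-related configuration;
- ListenerContainerConfiguration: registers consumers;
- ExtProducerResetConfiguration: registers producers;
- RocketMQTransactionConfiguration: transaction-related configuration.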
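3.1 ListenerContainerConfiguration
First, a look at how consumers are written with RocketMQ in Spring Boot. There are two main styles:
- implement the RocketMQListener interface and mark the class with @RocketMQMessageListener to declare the topic it listens to;
- use the DefaultMQPushConsumer class directly, specify the listener and other settings in code, and start it manually.
To customize the underlying consumer in the first style, implement RocketMQConsumerLifecycleListener (for a push consumer, its RocketMQPushConsumerLifecycleListener specialization, which is what the container checks for) and configure the DefaultMQPushConsumer in prepareStart: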
public interface RocketMQConsumerLifecycleListener<T> {
    void prepareStart(final T consumer);
}
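The hook pattern behind prepareStart can be sketched with plain Java. Everything here (LifecycleHookSketch, FakeConsumer, OrderListener, the consumeThreadMax field) is a hypothetical stand-in rather than the RocketMQ API; the point is only the order of events: the container builds the consumer, offers it to the listener if the listener implements the hook, and then starts it.

```java
class LifecycleHookSketch {
    /** Hypothetical stand-in for a consumer with a tunable setting. */
    static class FakeConsumer {
        int consumeThreadMax = 20;
        boolean started;

        void start() {
            started = true;
        }
    }

    /** Same shape as the interface above: a hook invoked before start. */
    interface ConsumerLifecycleListener<T> {
        void prepareStart(T consumer);
    }

    /** A listener that also wants to tune its consumer. */
    static class OrderListener implements ConsumerLifecycleListener<FakeConsumer> {
        @Override
        public void prepareStart(FakeConsumer consumer) {
            consumer.consumeThreadMax = 4;
        }
    }

    /** What a container might do: create the consumer, run the hook, then start. */
    static FakeConsumer startFor(Object listener) {
        FakeConsumer consumer = new FakeConsumer();
        if (listener instanceof ConsumerLifecycleListener) {
            @SuppressWarnings("unchecked")
            ConsumerLifecycleListener<FakeConsumer> hook =
                    (ConsumerLifecycleListener<FakeConsumer>) listener;
            hook.prepareStart(consumer); // settings applied before the consumer starts
        }
        consumer.start();
        return consumer;
    }

    public static void main(String[] args) {
        FakeConsumer tuned = startFor(new OrderListener());
        System.out.println(tuned.consumeThreadMax + " " + tuned.started); // 4 true
    }
}
```

A listener that does not implement the hook simply gets a consumer with default settings.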
Now for what ListenerContainerConfiguration mainly does:
- Collect all beans annotated with @RocketMQMessageListener:
  public void afterSingletonsInstantiated() {
      Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class)
          .entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
          .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
      beans.forEach(this::registerContainer);
  }
- Wrap each bean found in the previous step in a DefaultRocketMQListenerContainer, register the container with Spring, and start it:
  private void registerContainer(String beanName, Object bean) {
      Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
      if (RocketMQListener.class.isAssignableFrom(bean.getClass()) && RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
          throw new IllegalStateException(clazz + " cannot be both instance of " + RocketMQListener.class.getName() + " and " + RocketMQReplyListener.class.getName());
      }
      if (!RocketMQListener.class.isAssignableFrom(bean.getClass()) && !RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
          throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName() + " or " + RocketMQReplyListener.class.getName());
      }
      RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
      String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
      String topic = this.environment.resolvePlaceholders(annotation.topic());
      boolean listenerEnabled =
          (boolean) rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
              .getOrDefault(topic, true);
      if (!listenerEnabled) {
          log.debug("Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
              consumerGroup, topic);
          return;
      }
      validate(annotation);
      String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
          counter.incrementAndGet());
      GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
      genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
          () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
      DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
          DefaultRocketMQListenerContainer.class);
      if (!container.isRunning()) {
          try {
              container.start();
          } catch (Exception e) {
              log.error("Started container failed. {}", container, e);
              throw new RuntimeException(e);
          }
      }
  }
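The listenerEnabled lookup in registerContainer is a two-level map lookup with a default of true at both levels. The sketch below isolates that logic using only the standard library; the class name, the group and topic names, and the assumption that the listeners map has the shape Map<String, Map<String, Boolean>> are illustrative.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class ListenerSwitchSketch {
    /** Returns whether the listener for (group, topic) is enabled; missing entries mean enabled. */
    static boolean isEnabled(Map<String, Map<String, Boolean>> listeners, String group, String topic) {
        return listeners
                .getOrDefault(group, Collections.emptyMap())
                .getOrDefault(topic, true);
    }

    public static void main(String[] args) {
        // Mirrors a property such as
        // rocketmq.consumer.listeners.order-group.order-topic.enabled=false
        Map<String, Map<String, Boolean>> listeners = new HashMap<>();
        listeners.put("order-group", Collections.singletonMap("order-topic", false));

        System.out.println(isEnabled(listeners, "order-group", "order-topic")); // false
        System.out.println(isEnabled(listeners, "order-group", "other-topic")); // true
        System.out.println(isEnabled(listeners, "pay-group", "order-topic"));   // true
    }
}
```

Only an explicit false for that exact group and topic disables a listener; anything left unconfigured stays enabled.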
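3.1.1 DefaultRocketMQListenerContainer
Using the @RocketMQMessageListener annotation is the more Spring-friendly style: based on the annotation's attributes, Spring wraps the consumer in a DefaultRocketMQListenerContainer and configures it by choosing the matching strategy for each option: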
switch (messageModel) {
    case BROADCASTING:
        consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
        break;
    case CLUSTERING:
        consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
        break;
    default:
        throw new IllegalArgumentException("Property 'messageModel' was wrong.");
}
switch (selectorType) {
    case TAG:
        consumer.subscribe(topic, selectorExpression);
        break;
    case SQL92:
        consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
        break;
    default:
        throw new IllegalArgumentException("Property 'selectorType' was wrong.");
}
switch (consumeMode) {
    case ORDERLY:
        consumer.setMessageListener(new DefaultMessageListenerOrderly());
        break;
    case CONCURRENTLY:
        // this is the branch examined in the next section
        consumer.setMessageListener(new DefaultMessageListenerConcurrently());
        break;
    default:
        throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
}
if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
    ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
} else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {
    ((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);
}
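The idea of reading options off an annotation and dispatching on them can be tried out without RocketMQ. In the sketch below, the annotation MessageListenerConfig, the enum ConsumeMode, and both listener classes are hypothetical stand-ins defined for the example; only the reflection calls come from the JDK.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

class AnnotationDispatchSketch {
    enum ConsumeMode { CONCURRENTLY, ORDERLY }

    /** Hypothetical annotation carrying the listener's options. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface MessageListenerConfig {
        String topic();
        ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
    }

    @MessageListenerConfig(topic = "order-topic", consumeMode = ConsumeMode.ORDERLY)
    static class OrderListener { }

    @MessageListenerConfig(topic = "log-topic")
    static class LogListener { }

    /** Reads the annotation via reflection and picks a strategy per option. */
    static String describe(Class<?> listenerClass) {
        MessageListenerConfig config = listenerClass.getAnnotation(MessageListenerConfig.class);
        if (config == null) {
            throw new IllegalArgumentException(listenerClass + " is not annotated");
        }
        switch (config.consumeMode()) {
            case ORDERLY:
                return config.topic() + " -> orderly listener";
            case CONCURRENTLY:
                return config.topic() + " -> concurrent listener";
            default:
                throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(OrderListener.class)); // order-topic -> orderly listener
        System.out.println(describe(LogListener.class));   // log-topic -> concurrent listener
    }
}
```

The annotation needs RUNTIME retention; otherwise getAnnotation returns null and nothing can be dispatched.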
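3.1.2 MessageListenerConcurrently
The CONCURRENTLY consume mode above uses DefaultMessageListenerConcurrently. The interface it implements is:
public interface MessageListenerConcurrently extends MessageListener {
    ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> var1, ConsumeConcurrentlyContext var2);
}
The callback receives a List<MessageExt>, so messages can be consumed in batches. How many messages arrive per call is governed by the consumer's consumeMessageBatchMaxSize setting, which defaults to 1.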
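As a rough illustration of how a batch-style callback can sit in front of a single-message listener, here is a standard-library sketch. The interfaces BatchListener and SingleListener and the Status enum are simplified stand-ins invented for the example (messages are plain strings), so treat it as a model of the idea rather than the library's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BatchConsumeSketch {
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    /** Batch-style callback, same shape as the interface above. */
    interface BatchListener {
        Status consume(List<String> messages);
    }

    /** Single-message callback, the style an annotated listener would implement. */
    interface SingleListener {
        void onMessage(String message);
    }

    /** Adapts a single-message listener to the batch callback. */
    static BatchListener adapt(SingleListener delegate) {
        return messages -> {
            for (String message : messages) {
                try {
                    delegate.onMessage(message);
                } catch (RuntimeException e) {
                    // Any failure asks for the batch to be delivered again later.
                    return Status.RECONSUME_LATER;
                }
            }
            return Status.CONSUME_SUCCESS;
        };
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        BatchListener listener = adapt(seen::add);
        System.out.println(listener.consume(Arrays.asList("m1", "m2"))); // CONSUME_SUCCESS
        System.out.println(seen); // [m1, m2]
    }
}
```

One consequence of this design is that a failure on any message causes the whole batch to be reported for redelivery, which is worth keeping in mind when raising the batch size.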
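3.2 ExtProducerResetConfiguration
RocketMQAutoConfiguration registers one default producer automatically. Beyond that, users can define their own producers by extending RocketMQTemplate and annotating the subclass with @ExtRocketMQTemplateConfiguration.
ExtProducerResetConfiguration works much like ListenerContainerConfiguration: it finds every custom template bean, creates and starts a DefaultMQProducer for it, and sets that producer on the template: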
public void afterSingletonsInstantiated() {
    Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(ExtRocketMQTemplateConfiguration.class)
        .entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    beans.forEach(this::registerTemplate);
}
private void registerTemplate(String beanName, Object bean) {
    Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
    if (!RocketMQTemplate.class.isAssignableFrom(bean.getClass())) {
        throw new IllegalStateException(clazz + " is not instance of " + RocketMQTemplate.class.getName());
    }
    ExtRocketMQTemplateConfiguration annotation = clazz.getAnnotation(ExtRocketMQTemplateConfiguration.class);
    GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
    validate(annotation, genericApplicationContext);
    DefaultMQProducer mqProducer = createProducer(annotation);
    // Set instanceName same as the beanName
    mqProducer.setInstanceName(beanName);
    try {
        mqProducer.start();
    } catch (MQClientException e) {
        throw new BeanDefinitionValidationException(String.format("Failed to startup MQProducer for RocketMQTemplate {}",
            beanName), e);
    }
    RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) bean;
    rocketMQTemplate.setProducer(mqProducer);
    rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
    log.info("Set real producer to :{} {}", beanName, annotation.value());
}
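The scan-then-wire flow of registerTemplate can be modelled in a few lines of plain Java. All types below (ExtTemplateConfig, FakeProducer, Template, AuditTemplate) and the bean names are hypothetical stand-ins; the sketch assumes the beans are handed over as a name-to-instance map, much like the result of getBeansWithAnnotation.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.LinkedHashMap;
import java.util.Map;

class ExtTemplateWiringSketch {
    /** Hypothetical marker annotation for custom templates. */
    @Retention(RetentionPolicy.RUNTIME)
    @interface ExtTemplateConfig {
        String group();
    }

    /** Hypothetical stand-in for a producer. */
    static class FakeProducer {
        final String group;
        String instanceName;
        boolean started;

        FakeProducer(String group) {
            this.group = group;
        }

        void start() {
            started = true;
        }
    }

    /** Hypothetical stand-in for the template base class. */
    static class Template {
        FakeProducer producer;
    }

    @ExtTemplateConfig(group = "audit-producer-group")
    static class AuditTemplate extends Template { }

    /** Finds annotated beans, creates and starts one producer per bean, and injects it. */
    static int wire(Map<String, Object> beans) {
        int wired = 0;
        for (Map.Entry<String, Object> entry : beans.entrySet()) {
            Object bean = entry.getValue();
            ExtTemplateConfig config = bean.getClass().getAnnotation(ExtTemplateConfig.class);
            if (config == null) {
                continue; // not a custom template
            }
            if (!(bean instanceof Template)) {
                throw new IllegalStateException(bean.getClass() + " is not instance of " + Template.class.getName());
            }
            FakeProducer producer = new FakeProducer(config.group());
            producer.instanceName = entry.getKey(); // instance name follows the bean name
            producer.start();
            ((Template) bean).producer = producer;
            wired++;
        }
        return wired;
    }

    public static void main(String[] args) {
        Map<String, Object> beans = new LinkedHashMap<>();
        AuditTemplate audit = new AuditTemplate();
        beans.put("auditTemplate", audit);
        beans.put("plainBean", new Object());

        System.out.println(wire(beans));                 // 1
        System.out.println(audit.producer.instanceName); // auditTemplate
    }
}
```

Because each template gets its own producer instance, different templates can use different producer groups without interfering with the default one.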