Integrating RocketMQ with Spring Boot


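I. Concepts

[Figure: rocketmq.png]

  • Name Server is an almost stateless node. It can be deployed as a cluster, and the nodes do not synchronize any information with each other.
  • Broker deployment is more involved. A Broker is either a Master or a Slave: one Master can have multiple Slaves, but a Slave belongs to exactly one Master. The pairing is defined by giving both the same BrokerName and different BrokerIds; a BrokerId of 0 marks the Master and any non-zero value marks a Slave. Multiple Masters can be deployed. Every Broker keeps a long-lived connection to all nodes in the Name Server cluster and periodically registers its Topic information with every Name Server.
  • A Producer keeps a long-lived connection to one randomly chosen node in the Name Server cluster and periodically fetches Topic routing information from it. It also opens long-lived connections to the Masters that serve the Topic and sends them periodic heartbeats. Producers are completely stateless and can be deployed as a cluster.
  • A Consumer keeps a long-lived connection to one randomly chosen node in the Name Server cluster and periodically fetches Topic routing information from it. It also opens long-lived connections to the Masters and Slaves that serve the Topic and sends them periodic heartbeats. A Consumer can subscribe to messages from either a Master or a Slave; the rule is determined by the Broker configuration.

II. Integration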

1. Add the dependency

Add the rocketmq-spring starter:

<dependency>
			<groupId>org.apache.rocketmq</groupId>
			<artifactId>rocketmq-spring-boot-starter</artifactId>
			<version>2.0.4</version>
</dependency>

2. Auto-configuration class

The auto-configuration class declared in spring.factories is:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration

Spring Boot automatically loads the configuration class RocketMQAutoConfiguration:

@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
@ConditionalOnClass({MQAdmin.class})
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server", matchIfMissing = true)
@Import({MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class, RocketMQTransactionConfiguration.class})
@AutoConfigureAfter({MessageConverterConfiguration.class})
@AutoConfigureBefore({RocketMQTransactionConfiguration.class})
public class RocketMQAutoConfiguration {
	...
}

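As the configuration class shows, RocketMQ settings use the rocketmq prefix, and name-server is the key item. Because matchIfMissing = true, the class still loads when name-server is absent, but clients need it to reach the cluster. The other settings are defined in RocketMQProperties: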

@ConfigurationProperties(prefix = "rocketmq")
public class RocketMQProperties {
    /**
     * The name server for rocketMQ, formats: `host:port;host:port`.
     */
    private String nameServer;

    /**
     * Enum type for accessChannel, values: LOCAL, CLOUD
     */
    private String accessChannel;

    private Producer producer;

    /**
     * Configure enable listener or not.
     * In some particular cases, if you don't want the the listener is enabled when container startup,
     * the configuration pattern is like this :
     * rocketmq.consumer.listeners.<group-name>.<topic-name>.enabled=<boolean value, true or false>
     * <p>
     * the listener is enabled by default.
     */
    private Consumer consumer = new Consumer();
    
  // ...     
}
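
As an illustration of how these properties are set, a minimal application.properties might look like the fragment below. The addresses and the group name are placeholders; the two keys are assumed to map to the nameServer field and the producer's group field through Spring Boot's usual property binding.

```properties
# Name server list; separate multiple addresses with ';' (placeholder addresses)
rocketmq.name-server=127.0.0.1:9876;127.0.0.2:9876
# Group used by the default producer (hypothetical name)
rocketmq.producer.group=demo-producer-group
```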

RocketMQAutoConfiguration registers several beans, including:

  • DefaultMQProducer: the default producer;
  • RocketMQTemplate: a template class that wraps the producer's send methods.

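3. Other auto-configuration classes

The classes above register a producer but no consumer. The remaining configuration classes pulled in by @Import are:

  • MessageConverterConfiguration: serialization settings;
  • ListenerContainerConfiguration: registers consumers;
  • ExtProducerResetConfiguration: registers custom producers;
  • RocketMQTransactionConfiguration: transaction settings.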

3.1 ListenerContainerConfiguration

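First, a look at how consumers are written for RocketMQ in Spring Boot. There are two main styles:

  • Implement the RocketMQListener interface and annotate the class with @RocketMQMessageListener to declare the topic it listens to;
  • Create a DefaultMQPushConsumer directly, set the message listener and other options in code, and start it manually.

To tune the consumer itself in the annotation style, implement RocketMQPushConsumerLifecycleListener (which specializes the RocketMQConsumerLifecycleListener interface shown here for DefaultMQPushConsumer) and configure the DefaultMQPushConsumer in prepareStart.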

public interface RocketMQConsumerLifecycleListener<T> {
    void prepareStart(final T consumer);
}
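
The hook can be sketched with plain Java. This is only a model under stated assumptions: StubPushConsumer, OrderListener, and startContainer are hypothetical stand-ins written for this example (the real types are DefaultMQPushConsumer and the listener container), and the thread count of 20 is an arbitrary starting value. It shows the ordering that matters: the container builds the consumer, lets the listener adjust it, and only then starts it.

```java
import java.util.ArrayList;
import java.util.List;

class LifecycleHookSketch {
    // Mimics the container: build the consumer, let the listener adjust it, then start it.
    static StubPushConsumer startContainer(Object listener) {
        StubPushConsumer consumer = new StubPushConsumer();
        if (listener instanceof ConsumerLifecycleListener) {
            @SuppressWarnings("unchecked")
            ConsumerLifecycleListener<StubPushConsumer> hook =
                (ConsumerLifecycleListener<StubPushConsumer>) listener;
            hook.prepareStart(consumer);
        }
        consumer.started = true;
        return consumer;
    }

    public static void main(String[] args) {
        StubPushConsumer consumer = startContainer(new OrderListener());
        System.out.println(consumer.consumeThreadMax + " " + consumer.started); // 4 true
    }
}

// Hypothetical stand-in for DefaultMQPushConsumer; 20 is an arbitrary starting value.
class StubPushConsumer {
    int consumeThreadMax = 20;
    boolean started = false;
}

// Same shape as the interface quoted above.
interface ConsumerLifecycleListener<T> {
    void prepareStart(T consumer);
}

// Hypothetical listener that handles messages and also customizes its consumer.
class OrderListener implements ConsumerLifecycleListener<StubPushConsumer> {
    final List<String> received = new ArrayList<>();

    void onMessage(String message) {
        received.add(message);
    }

    @Override
    public void prepareStart(StubPushConsumer consumer) {
        consumer.consumeThreadMax = 4; // applied before the consumer starts
    }
}
```

In a real project the listener would implement both RocketMQListener and RocketMQPushConsumerLifecycleListener, and prepareStart would call setters on the actual DefaultMQPushConsumer.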

Now look at what ListenerContainerConfiguration does:

  • It collects every bean annotated with @RocketMQMessageListener:

    public void afterSingletonsInstantiated() {
            Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class)
                .entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
      
            beans.forEach(this::registerContainer);
        }
    
  • It wraps each bean from the previous step in a DefaultRocketMQListenerContainer, registers the container with Spring, and starts it:

    private void registerContainer(String beanName, Object bean) {
            Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
      
            if (RocketMQListener.class.isAssignableFrom(bean.getClass()) && RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
                throw new IllegalStateException(clazz + " cannot be both instance of " + RocketMQListener.class.getName() + " and " + RocketMQReplyListener.class.getName());
            }
      
            if (!RocketMQListener.class.isAssignableFrom(bean.getClass()) && !RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
                throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName() + " or " + RocketMQReplyListener.class.getName());
            }
      
            RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
      
            String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
            String topic = this.environment.resolvePlaceholders(annotation.topic());
      
            boolean listenerEnabled =
                (boolean) rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
                    .getOrDefault(topic, true);
      
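            if (!listenerEnabled) {
                log.debug("Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
                    consumerGroup, topic);
                return;
            }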
            validate(annotation);
      
            String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
                counter.incrementAndGet());
            GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
      
            genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
                () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
            DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
                DefaultRocketMQListenerContainer.class);
            if (!container.isRunning()) {
                try {
                    container.start();
                } catch (Exception e) {
                    log.error("Started container failed. {}", container, e);
                    throw new RuntimeException(e);
                }
            }
        }
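
The listenerEnabled lookup in registerContainer is easy to misread, so here is a small standalone sketch of the same two-level getOrDefault chain. It assumes the listeners setting is a map of consumer group to (topic to enabled flag); the class, method, and group/topic names are made up for the example.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class ListenerSwitchSketch {
    // group -> (topic -> enabled); anything not configured counts as enabled.
    static boolean isListenerEnabled(Map<String, Map<String, Boolean>> listeners,
                                     String group, String topic) {
        return listeners
            .getOrDefault(group, Collections.emptyMap())
            .getOrDefault(topic, true);
    }

    public static void main(String[] args) {
        Map<String, Map<String, Boolean>> listeners = new HashMap<>();
        listeners.put("order-group", Collections.singletonMap("order-topic", false));

        System.out.println(isListenerEnabled(listeners, "order-group", "order-topic")); // false
        System.out.println(isListenerEnabled(listeners, "order-group", "other-topic")); // true
        System.out.println(isListenerEnabled(listeners, "other-group", "order-topic")); // true
    }
}
```

A listener is therefore skipped only when its exact group and topic pair is explicitly set to false; every other combination falls through to the default of true.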
    

3.1.1 DefaultRocketMQListenerContainer

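The @RocketMQMessageListener annotation is the more Spring-friendly approach: Spring wraps the consumer in a DefaultRocketMQListenerContainer and configures it from the annotation attributes, choosing the message model, selector type, and consume mode in turn: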

        switch (messageModel) {
            case BROADCASTING:
                consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
                break;
            case CLUSTERING:
                consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
                break;
            default:
                throw new IllegalArgumentException("Property 'messageModel' was wrong.");
        }

        switch (selectorType) {
            case TAG:
                consumer.subscribe(topic, selectorExpression);
                break;
            case SQL92:
                consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
                break;
            default:
                throw new IllegalArgumentException("Property 'selectorType' was wrong.");
        }

        switch (consumeMode) {
            case ORDERLY:
                consumer.setMessageListener(new DefaultMessageListenerOrderly());
                break;
            case CONCURRENTLY:
                // this listener is examined in 3.1.2
                consumer.setMessageListener(new DefaultMessageListenerConcurrently());
                break;
            default:
                throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
        }

        if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
            ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
        } else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {
            ((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);
        }

3.1.2 MessageListenerConcurrently

The CONCURRENTLY consume mode above uses DefaultMessageListenerConcurrently, which implements this interface:

public interface MessageListenerConcurrently extends MessageListener {
    ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> var1, ConsumeConcurrentlyContext var2);
}

The method receives a List<MessageExt>, so a single call can consume a batch of messages.
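
To make the batch idea concrete, the sketch below models how a client could slice pulled messages into batches before calling a list-based listener. It is not the RocketMQ implementation: BatchListener and dispatch are hypothetical names, and messages are plain strings. In the real client the batch size is governed by the consumer's consumeMessageBatchMaxSize setting, which defaults to 1, so batches larger than one message have to be opted into.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BatchConsumeSketch {
    // Hypothetical stand-in for MessageListenerConcurrently: one call receives a list.
    interface BatchListener {
        boolean consume(List<String> messages);
    }

    // Cuts the pulled messages into slices of at most batchMaxSize and
    // hands each slice to the listener; returns the number of listener calls.
    static int dispatch(List<String> pulled, int batchMaxSize, BatchListener listener) {
        if (batchMaxSize < 1) {
            throw new IllegalArgumentException("batchMaxSize must be >= 1");
        }
        int calls = 0;
        for (int from = 0; from < pulled.size(); from += batchMaxSize) {
            int to = Math.min(from + batchMaxSize, pulled.size());
            listener.consume(new ArrayList<>(pulled.subList(from, to)));
            calls++;
        }
        return calls;
    }

    public static void main(String[] args) {
        List<String> pulled = Arrays.asList("m1", "m2", "m3", "m4", "m5");
        List<Integer> sizes = new ArrayList<>();
        int calls = dispatch(pulled, 2, batch -> {
            sizes.add(batch.size());
            return true;
        });
        System.out.println(calls + " calls, batch sizes " + sizes); // 3 calls, batch sizes [2, 2, 1]
    }
}
```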

3.2 ExtProducerResetConfiguration

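RocketMQAutoConfiguration registers a default producer. Beyond that, users can define their own: extend RocketMQTemplate and annotate the subclass with @ExtRocketMQTemplateConfiguration.

ExtProducerResetConfiguration works much like ListenerContainerConfiguration: it finds every custom template, creates and starts a DefaultMQProducer for it, and sets that producer on the template, which Spring already manages: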

public void afterSingletonsInstantiated() {
        Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(ExtRocketMQTemplateConfiguration.class)
            .entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        beans.forEach(this::registerTemplate);
    }

    private void registerTemplate(String beanName, Object bean) {
        Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);

        if (!RocketMQTemplate.class.isAssignableFrom(bean.getClass())) {
            throw new IllegalStateException(clazz + " is not instance of " + RocketMQTemplate.class.getName());
        }

        ExtRocketMQTemplateConfiguration annotation = clazz.getAnnotation(ExtRocketMQTemplateConfiguration.class);
        GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
        validate(annotation, genericApplicationContext);

        DefaultMQProducer mqProducer = createProducer(annotation);
        // Set instanceName same as the beanName
        mqProducer.setInstanceName(beanName);
        try {
            mqProducer.start();
        } catch (MQClientException e) {
            throw new BeanDefinitionValidationException(String.format("Failed to startup MQProducer for RocketMQTemplate {}",
                beanName), e);
        }
        RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) bean;
        rocketMQTemplate.setProducer(mqProducer);
        rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
        log.info("Set real producer to :{} {}", beanName, annotation.value());
    }
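
The same flow can be reproduced without Spring or RocketMQ on the classpath. The sketch below is a simplified model, not the library code: ExtTemplate, StubProducer, StubTemplate, and AuditTemplate are hypothetical stand-ins for @ExtRocketMQTemplateConfiguration, DefaultMQProducer, RocketMQTemplate, and a user-defined template. It shows the essential steps: check the bean type, read the annotation, build a dedicated producer, and inject it into the template.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.LinkedHashMap;
import java.util.Map;

class ExtTemplateSketch {
    // Hypothetical stand-in for @ExtRocketMQTemplateConfiguration.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface ExtTemplate {
        String group();
    }

    // Hypothetical stand-in for DefaultMQProducer.
    static class StubProducer {
        final String group;
        String instanceName;
        boolean started;

        StubProducer(String group) {
            this.group = group;
        }
    }

    // Hypothetical stand-in for RocketMQTemplate.
    static class StubTemplate {
        StubProducer producer;
    }

    // A user-defined template with its own producer group.
    @ExtTemplate(group = "audit-producer-group")
    static class AuditTemplate extends StubTemplate { }

    // Mirrors registerTemplate: type check, read annotation, build and start producer, inject.
    static void registerTemplate(String beanName, Object bean) {
        if (!(bean instanceof StubTemplate)) {
            throw new IllegalStateException(bean.getClass() + " is not instance of " + StubTemplate.class.getName());
        }
        ExtTemplate annotation = bean.getClass().getAnnotation(ExtTemplate.class);
        if (annotation == null) {
            throw new IllegalStateException(bean.getClass() + " is not annotated with @ExtTemplate");
        }
        StubProducer producer = new StubProducer(annotation.group());
        producer.instanceName = beanName; // instance name follows the bean name
        producer.started = true;          // the real code calls mqProducer.start() here
        ((StubTemplate) bean).producer = producer;
    }

    public static void main(String[] args) {
        Map<String, Object> beans = new LinkedHashMap<>();
        AuditTemplate template = new AuditTemplate();
        beans.put("auditTemplate", template);
        beans.forEach(ExtTemplateSketch::registerTemplate);
        System.out.println(template.producer.group + " / " + template.producer.instanceName);
    }
}
```

Each custom template thus ends up with its own producer, so different templates can target different producer groups or name servers.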
