|
|
@@ -1,7 +1,12 @@
|
|
|
package com.middle.platform.rabbitmq.config;
|
|
|
|
|
|
import com.middle.platform.common.core.constant.RabbitConstant;
|
|
|
+import org.springframework.amqp.core.Binding;
|
|
|
+import org.springframework.amqp.core.BindingBuilder;
|
|
|
+import org.springframework.amqp.core.Queue;
|
|
|
import org.springframework.amqp.core.TopicExchange;
|
|
|
+import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
|
|
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
|
|
import org.springframework.context.annotation.Bean;
|
|
|
import org.springframework.context.annotation.Configuration;
|
|
|
|
|
|
@@ -20,4 +25,57 @@ public class RabbitConfig {
|
|
|
public TopicExchange dataExchange() {
|
|
|
return new TopicExchange(RabbitConstant.TOPIC_EXCHANGE_DATA, true, false);
|
|
|
}
|
|
|
+ @Bean
|
|
|
+ public SimpleRabbitListenerContainerFactory rabbitFactory(ConnectionFactory connectionFactory) {
|
|
|
+ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
|
|
|
+ factory.setConnectionFactory(connectionFactory);
|
|
|
+ //设置批量
|
|
|
+ factory.setBatchListener(true);
|
|
|
+ factory.setConsumerBatchEnabled(true);//设置BatchMessageListener生效
|
|
|
+ factory.setBatchSize(500);//设置监听器一次批量处理的消息数量
|
|
|
+ return factory;
|
|
|
+ }
|
|
|
+ @Bean
|
|
|
+ public Queue rawQueue() {
|
|
|
+ return new Queue(RabbitConstant.RAW_QUEUE);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Bean
|
|
|
+ public Queue dealQueue() {
|
|
|
+ return new Queue(RabbitConstant.DEAL_QUEUE);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Bean
|
|
|
+ public Queue dbQueue() {
|
|
|
+ return new Queue(RabbitConstant.DB_QUEUE);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ @Bean
|
|
|
+ public Binding dbBinding(TopicExchange dataExchange, Queue dbQueue) {
|
|
|
+ Binding binding = BindingBuilder
|
|
|
+ .bind(dbQueue)
|
|
|
+ .to(dataExchange)
|
|
|
+ .with(RabbitConstant.ROUTING_POINT_DATA_PREFIX + dbQueue.getName());
|
|
|
+ binding.addArgument(RabbitConstant.AUTO_DELETE, true);
|
|
|
+ return binding;
|
|
|
+ }
|
|
|
+ @Bean
|
|
|
+ public Binding dealBinding(TopicExchange dataExchange, Queue dealQueue) {
|
|
|
+ Binding binding = BindingBuilder
|
|
|
+ .bind(dealQueue)
|
|
|
+ .to(dataExchange)
|
|
|
+ .with(RabbitConstant.ROUTING_POINT_DATA_PREFIX + dealQueue.getName());
|
|
|
+ binding.addArgument(RabbitConstant.AUTO_DELETE, true);
|
|
|
+ return binding;
|
|
|
+ }
|
|
|
+ @Bean
|
|
|
+ public Binding rawBinding(TopicExchange dataExchange, Queue rawQueue) {
|
|
|
+ Binding binding = BindingBuilder
|
|
|
+ .bind(rawQueue)
|
|
|
+ .to(dataExchange)
|
|
|
+ .with(RabbitConstant.ROUTING_POINT_DATA_PREFIX + rawQueue.getName());
|
|
|
+ binding.addArgument(RabbitConstant.AUTO_DELETE, true);
|
|
|
+ return binding;
|
|
|
+ }
|
|
|
}
|