qmq-spring-boot-starter

Apache
Java
跨平台
2019-07-28
JimLynd
qmq-spring-boot-starter 正在参加 2019 年度最受欢迎开源中国软件评选,请投票支持!
qmq-spring-boot-starter 在 2019 年度最受欢迎开源中国软件评选 中已获得 {{ projectVoteCount }} 票,请投票支持!
投票赢奖品
已投票

使用方式

引入 Maven 依赖(已上传到中央仓库)

<dependency>
    <groupId>xin.wjtree.qmq</groupId>
    <artifactId>qmq-spring-boot-starter</artifactId>
    <version>1.0.0</version>
</dependency>

添加 Spring Boot 配置(YML)

spring:
  application:
    name: qmq-demo
  qmq:
    # 应用标识 appcode,必填
    app-code: qmq-demo
    # 服务器地址 metaserver,必填
    meta-server: http://127.0.0.1:8080/meta/address

    # 生产者配置,发送消息的线程池的设置,选填
    producer:
      # 发送线程数,默认 3
      send-threads: 3
      # 默认每次发送时最大批量大小,默认 30
      send-batch: 30
      # 如果消息发送失败,重试次数,默认 10
      send-try-count: 10
      # 异步发送队列大小,默认 10000
      max-queue-size: 10000

    # 使用 QmqTemplate 发送消息的默认主题,默认值 default_subject
    template:
      default-subject: default_subject

    # 消费者配置,消费消息的线程池的设置,选填
    consumer:
      # 线程名称前缀,默认 qmq-process
      thread-name-prefix: qmq-process
      # 线程池大小,默认 2
      core-pool-size: 2
      # 最大线程池大小,默认 2
      max-pool-size: 2
      # 线程池队列大小,默认 1000
      queue-capacity: 1000

    # 消息主题和分组配置,选填
    # 使用 QmqConsumer 注解时,可使用 SpEL 表达式引入以下主题和分组
    subject:
      sub1: sub1
      sub2: sub2
      sub3: sub3
      # more subject ...
    group:
      group1: group1
      group2: group2
      group3: group3
      # more group ...

logging:
  level:
    # 设置 qmq-spring-boot-starter 的日志级别
    xin.wjtree.qmq: trace

server:
  port: 8989

发送消息

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import qunar.tc.qmq.Message;
import qunar.tc.qmq.MessageSendStateListener;
import xin.wjtree.qmq.QmqTemplate;
import xin.wjtree.qmq.autoconfigure.QmqProperties;
import xin.wjtree.qmq.constant.QmqTimeUnit;
import xin.wjtree.qmq.internal.QmqAlias;
import xin.wjtree.qmq.internal.QmqIgnore;

import javax.annotation.Resource;
import java.math.BigDecimal;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;

@RunWith(SpringRunner.class)
@SpringBootTest
public class QmqTest {
    @Resource
    private QmqTemplate template;
    @Resource
    private QmqProperties properties;

    /**
     * 发送即时消息
     * @throws InterruptedException
     */
    @Test
    public void sendImmediate() throws InterruptedException {
        // 计数器,执行1次结束
        CountDownLatch latch = new CountDownLatch(1);

        // 一般使用 template.send(properties.getSubject().get("sub1"), getUser()) 即可
        template.withSendStateListener(new MessageSendStateListener() {
            @Override
            public void onSuccess(Message m) {
                latch.countDown();
            }

            @Override
            public void onFailed(Message m) {
                latch.countDown();
            }
        }).send(properties.getSubject().get("sub1"), getUser());

        // 计数器减1
        latch.await();
    }

    /**
     * 发送延时消息
     * @throws InterruptedException
     */
    @Test
    public void sendDelay() throws InterruptedException {
        // 计数器,执行1次结束
        CountDownLatch latch = new CountDownLatch(1);

        // 延时 10 秒发送消息
        // 一般使用 template.sendDelay(properties.getSubject().get("sub1"), getUser(), QmqTimeUnit.TEN_SECONDS) 即可
        template.withSendStateListener(new MessageSendStateListener() {
            @Override
            public void onSuccess(Message m) {
                latch.countDown();
            }

            @Override
            public void onFailed(Message m) {
                latch.countDown();
            }
        }).sendDelay(properties.getSubject().get("sub1"), getUser(), QmqTimeUnit.TEN_SECONDS);

        // 计数器减1
        latch.await();
    }

    /**
     * 发送定时消息
     * @throws InterruptedException
     */
    @Test
    public void sendSchedule() throws InterruptedException, ParseException {
        // 计数器,执行1次结束
        CountDownLatch latch = new CountDownLatch(1);

        // 定时发送的日期时间
        Date date = new SimpleDateFormat("yyyy-MM-dd HH🇲🇲ss").parse("2019-07-28 00:16:00");

        // 一般使用 template.sendSchedule(properties.getSubject().get("sub1"), getUser(), date) 即可
        template.withSendStateListener(new MessageSendStateListener() {
            @Override
            public void onSuccess(Message m) {
                latch.countDown();
            }

            @Override
            public void onFailed(Message m) {
                latch.countDown();
            }
        }).sendSchedule(properties.getSubject().get("sub1"), getUser(), date);

        // 计数器减1
        latch.await();
    }

    public User getUser() {
        User user = new User();
        user.setId(100000000001L);
        user.setName("张三");
        user.setAge(120);
        user.setSchool("北京大学");
        user.setCompany("中石油");
        user.setDuty("行政总裁");
        user.setSalary(new BigDecimal("1000000"));
        user.setEnable(true);
        return user;
    }

    public static class User {
        @QmqAlias("user_id")
        private Long id;

        private String name;

        private Integer age;

        @QmqAlias("school_name")
        private String school;

        private String company;

        @QmqIgnore
        private String duty;

        private BigDecimal salary;

        private Boolean enable;

        public Long getId() {
            return id;
        }

        public void setId(Long id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Integer getAge() {
            return age;
        }

        public void setAge(Integer age) {
            this.age = age;
        }

        public String getSchool() {
            return school;
        }

        public void setSchool(String school) {
            this.school = school;
        }

        public String getCompany() {
            return company;
        }

        public void setCompany(String company) {
            this.company = company;
        }

        public String getDuty() {
            return duty;
        }

        public void setDuty(String duty) {
            this.duty = duty;
        }

        public BigDecimal getSalary() {
            return salary;
        }

        public void setSalary(BigDecimal salary) {
            this.salary = salary;
        }

        public Boolean getEnable() {
            return enable;
        }

        public void setEnable(Boolean enable) {
            this.enable = enable;
        }
    }
}

消费消息

启用消费者模式

  • 在配置类上添加 EnableQmq 注解,包括 appCode 和 metaServer 属性
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import qunar.tc.qmq.consumer.annotation.EnableQmq;

@EnableQmq(appCode="${spring.qmq.app-code}", metaServer="${spring.qmq.meta-server}")
@SpringBootApplication
public class QmqApplication {

    public static void main(String[] args) {
        SpringApplication.run(QmqApplication.class, args);
    }

}

配置消费监听器

  • 在方法上添加 QmqConsumer 注解,包括 subject,consumerGroup,executor 等属性
  • executor = QmqConstant.EXECUTOR_NAME 表示消费线程池的 BeanName,该值固定为 qmqExecutor
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import qunar.tc.qmq.Message;
import qunar.tc.qmq.base.BaseMessage;
import qunar.tc.qmq.consumer.annotation.QmqConsumer;
import xin.wjtree.qmq.constant.QmqConstant;

@Slf4j
@Component
public class QmqLinstener {

    @QmqConsumer(subject = "${spring.qmq.subject.sub1}", consumerGroup = "${spring.qmq.group.group1}",
            executor = QmqConstant.EXECUTOR_NAME)
    public void onMessage(Message message) {
        log.info("qmq 消费主题:{},消费消息:{}", message.getSubject(), ((BaseMessage) message).getAttrs());
    }

}
的码云指数为
超过 的项目
加载中

评论(1)

君千殇520
君千殇520
大兄弟啊,你开源项目,最起码有个项目介绍吧,是主要做什么用的,看你东西的不一定都是开发人员,你让人家看代码实现来猜你这东西是来做啥的吗?

暂无资讯

暂无问答

Spring Boot的启动器Starter详解

Spring Boot的启动器Starter详解 Spring Boot应用启动器基本的一共有44种,具体如下: 1)spring-boot-starter 这是Spring Boot的核心启动器,包含了自动配置、日志和YAML。 2)spring-boot...

2018/02/22 13:12
225
0
Starter POMs详解

Starter POMs是可以包含到应用中的一个方便的依赖关系描述符集合。你可以获取所有Spring及相关技术的一站式服务,而不需要翻阅示例代码,拷贝粘贴大量的依赖描述符。例如,如果你想使用Sprin...

2017/09/25 16:38
35
0
springboot 系列教程二:常用starter介绍

springboot非常的流行,就是因为starter的存在,starter是springboot的核心,可以理解成可插拔的插件,你想要什么插件配置什么插件就可以,比如我想要使用mybatis,那么配置starter-mybatis...

04/13 11:22
110
5
Spring-Boot JAR 包说明

名称 描述 spring-boot-starter Spring Boot核心引导类,包括对auto-config,logging,YAML的支持 spring-boot-starter-actuator 一些有助于管理自己应用程序的生产环境特性 spring-boot-star...

2015/02/06 17:29
154
0
各Spring-Boot-Starters介绍

Spring-Boot-Starters 最通俗的理解— jar 包,引用了什么 starter,就获得了什么的依赖。用游戏类比: SpringBoot—>WeGame spring-boot-starter-web—>英雄联盟及其小工具的合集 SpringMV...

2018/10/17 09:55
105
0
Spring boot 启动器

启动器名称 启动器说明 spring-boot-starter 核心模块,包含自动配置支持、日志库和对YAML配置文件的支持 spring-boot-starter-amqp 支持AMQP,包含 spring-rabbit spring-boot-starter-aop ...

2018/05/02 15:18
31
0
Spring Boot 自定义 starter

文章首发于微信公众号《程序员果果》 地址:https://mp.weixin.qq.com/s/F_1j-ng49QNlbj04Q9bqFQ 一、简介 SpringBoot 最强大的功能就是把我们常用的场景抽取成了一个个starter(场景启动器)...

05/28 09:40
7
0

没有更多内容

加载失败,请刷新页面

返回顶部
顶部