JMS监听Oracle AQ

  • 该文档中,jdk版本1.8,java项目为maven构建的springboot项目,并使用了定时任务来做AQ监听的重连功能,解决由于外部原因导致连接断裂之后,需要手动重启项目才能恢复连接的问题

  • github源码位置

  • gitee源码位置

一、创建队列

1.1.管理员登录执行

  • 管理员登录,执行授权操作,oracle使用队列需要单独的授权,默认未开启,须手动开启,授权命令如下,username使用自己的用户名即可
GRANT EXECUTE ON SYS.DBMS_AQ to 'username';GRANT EXECUTE ON SYS.DBMS_AQADM to 'username';GRANT EXECUTE ON SYS.DBMS_AQ_BQVIEW to 'username';GRANT EXECUTE ON SYS.DBMS_AQIN to 'username';GRANT EXECUTE ON SYS.DBMS_JOB to 'username';

1.2.用户登录执行执行

1.2.1. 创建消息负荷payload

  • 创建的此type用来封装队列所带的,根据实际需求进行创建
CREATE OR REPLACE TYPE TYPE_QUEUE_INFO AS OBJECT(  param_1             VARCHAR2(100),  param_2             VARCHAR2(100))

1.2.2. 创建队列表

  • 创建对列表,并指定队列数据的类型,队列表名自定义即可,数据类型使用上面刚创建的type
begin  sys.dbms_aqadm.create_queue_table(    queue_table => 'QUEUE_TABLE',    queue_payload_type => 'TYPE_QUEUE_INFO',    sort_list => 'ENQ_TIME',    compatible => '10.0.0',    primary_instance => 0,    secondary_instance => 0);end;

1.2.3. 创建队列并启动

  • 创建名称为QUEUE_TEST的队列,并指定对列表名【同一个oracle用户下,可以有多个对列表,同一个对列表中,可以有多个队列】
begin  sys.dbms_aqadm.create_queue(    queue_name => 'QUEUE_TEST',    queue_table => 'QUEUE_TABLE',    queue_type => sys.dbms_aqadm.normal_queue,    max_retries => 5,    retry_delay => 0,    retention_time => 0);end;
  • 刚创建的队列的状态默认是未开启的,需要手动开启一下,同理,存在删除、停止等操作
begin  -- 启动队列  sys.dbms_aqadm.start_queue(      queue_name => 'QUEUE_TEST'  );    -- 暂停队列  --sys.dbms_aqadm.STOP_QUEUE(  --    queue_name => 'QUEUE_TEST'  --);    -- 删除队列  --sys.dbms_aqadm.DROP_QUEUE(  --    queue_name => 'QUEUE_TEST'  --);    -- 删除对列表  --sys.dbms_aqadm.DROP_QUEUE_TABLE(  --    queue_table => 'QUEUE_TABLE'  --);end;

1.2.4. 创建存储过程

  • 储存过程的作用为把数据加载到队列中,生成的新的队列会自动添加进绑定的对列表中,等待消费者进行消费
CREATE OR REPLACE PROCEDURE pro_queue(param_1 VARCHAR2, param_2 VARCHAR2) as  r_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;  r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;  v_message_handle     RAW(16);  o_payload            TYPE_QUEUE_INFO;begin  -- 封装最终消息  o_payload := TYPE_QUEUE_INFO(param_1, param_2);  -- 入队操作,指定队列  dbms_aq.enqueue(queue_name         => 'QUEUE_TEST',                  enqueue_options    => r_enqueue_options,                  message_properties => r_message_properties,                  payload            => o_payload,                  msgid              => v_message_handle);  -- 出队操作  --dbms_aq.enqueue(queue_name => 'QUEUE_TEST',  --                dequeue_options => r_dequeue_options,  --                message_properties => r_message_properties,  --                payload => o_payload,  --                msgid => v_message_handle);end pro_queue;

二、Java中JMS的使用

2.1. 项目配置

2.1.1. maven

<dependency>      <groupId>com.oracle</groupId>      <artifactId>jmscommon</artifactId>      <version>1.2</version></dependency><dependency><groupId>com.oracle</groupId><artifactId>orai18n</artifactId><version>1.2</version></dependency><dependency><groupId>com.oracle</groupId><artifactId>jta</artifactId><version>1.2</version></dependency><dependency><groupId>com.oracle</groupId><artifactId>aqapi_g</artifactId><version>1.2</version></dependency>

2.1.2. yml

spring:  datasource:    url: jdbc:oracle:thin:@ip:port/sid    username: **    password: **    queue:  aq:    # 该队列是否可用,用来控制队列的加载和重连,不可省略    enable: true    # 队列名称,不可省略    name: QUEUE_TEST    # 队列重连的定时任务对应的时间表达式,不可省略    cron: 0 */1 * * * ?

2.2. AQ初始化

  • 在项目启动结束后立即运行此类,会根据所配置的队列名称监听对应的队列
package com.wangqq.jms;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.CommandLineRunner;import org.springframework.stereotype.Component;/** * @Title: MessageAQInit.java * @Description: AQ 初始化 * @author wangqq * @date 2020年6月28日 下午3:45:23 * @version 1.0 */@Componentpublic class MessageAQInit implements CommandLineRunner {    @Autowired    private MessageAQConfig aqConfig;    @Autowired    private MessageAQListener listener;    @Override    public void run(String... args) throws RuntimeException {        // 检查消息队列是否启用        if (aqConfig.enable) {            // 设置AQ的消息监听器            MessageAQConnection.setListener(listener);            // 初始化AQ连接            if (!MessageAQConnection.initFactory(aqConfig)) {                throw new RuntimeException("Message Oracle AQ initialization failed!");            }            // 建立连接            if (!MessageAQConnection.establishConnection(aqConfig)) {                throw new RuntimeException("Message Oracle AQ connection failed!");            }        }    }}

2.3. 配置信息类

  • 配置类,将yml的配置文件转为java对象【时间表达式在代码中不会以对象属性的方式被使用,因此在该类中没有设置】
package com.wangqq.jms;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;/** * @Title: MessageAQConfig.java * @Description: ORACLE 消息队列配置 * @author wangqq * @date 2020年6月28日 下午3:36:08 * @version 1.0 */@Componentpublic class MessageAQConfig {        /** 是否开启MessageAq功能 */    @Value("${queue.aq.enable}")    public Boolean enable;        /** 数据库用户名 */@Value("${spring.datasource.username}")public String userName;/** 数据库密码 */@Value("${spring.datasource.password}")public String password;/** 数据库地址url */@Value("${spring.datasource.url}")public String url;/** 队列名称 */@Value("${queue.aq.name}")public String queue;}

2.4. AQ 连接工厂类

  • AQ 链接的核心类,根据配置对象以及注入的监听对象,动态监听AQ队列
package com.wangqq.jms;import javax.jms.Queue;import javax.jms.Session;import lombok.extern.slf4j.Slf4j;import oracle.jms.AQjmsConnection;import oracle.jms.AQjmsConnectionFactory;import oracle.jms.AQjmsConsumer;import oracle.jms.AQjmsSession;/** * @Title: MessageAQConnection.java * @Description: AQ 连接 * @author wangqq * @date 2020年6月28日 下午3:50:32 * @version 1.0 */@Slf4jpublic class MessageAQConnection {    private static AQjmsConnectionFactory aQjmsConnectionFactory;    private static AQjmsConsumer aQjmsConsumer;    private static AQjmsSession aQjmsSession;    private static AQjmsConnection aQjmsConnection;    private static MessageAQListener listener;    /**     * 设置JMS监听器     *      * @param messageAqJmsListener     * @author wangqq     * @date 2020年7月6日 上午8:33:57     */    public static void setListener(MessageAQListener messageAqJmsListener) {        listener = messageAqJmsListener;    }    /**     * 初始化 AQ 连接 Factory     *     * @param aqConfig 消息队列配置     * @return 是否成功     */    public static boolean initFactory(MessageAQConfig aqConfig) {        try {            aQjmsConnectionFactory = new AQjmsConnectionFactory();            aQjmsConnectionFactory.setJdbcURL(aqConfig.url);            aQjmsConnectionFactory.setUsername(aqConfig.userName);            aQjmsConnectionFactory.setPassword(aqConfig.password);            return true;        } catch (Exception e) {            log.error(e.getMessage(), e);            return false;        }    }    /**     * 连接消息队列     *     * @param aqConfig 消息队列配置     * @return 是否成功     */    public static boolean establishConnection(MessageAQConfig aqConfig) {        try {            aQjmsConnection = (AQjmsConnection) aQjmsConnectionFactory.createConnection();            aQjmsSession = (AQjmsSession) aQjmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);            aQjmsConnection.start();            Queue queue = aQjmsSession.getQueue(aqConfig.userName, aqConfig.queue);            aQjmsConsumer = (AQjmsConsumer) aQjmsSession.createConsumer(queue, null, MessageORAData.getFactory(), null,                    false);            aQjmsConsumer.setMessageListener(listener);            return true;        } catch (Exception e) {            log.error(e.getMessage(), e);            return false;        }    }    /**     * 关闭消息队列连接     *     * @return 是否成功     */    public static boolean closeConnection() {        try {            aQjmsConsumer.close();            aQjmsSession.close();            aQjmsConnection.close();            return true;        } catch (Exception e) {            log.error(e.getMessage(), e);            return false;        }    }}

2.5. 创建AQ 数据承载类

  • 用来接收oracle队列中所带的参数,基本保证与数据库中的type格式相同即可
package com.wangqq.bean;import lombok.Builder;import lombok.Data;/** * @Title: Test.java * @Description: AQ 数据承载类 * @author wangqq * @date 2021-01-20 16:19:16 * @version 1.0 */@Data@Builderpublic class Test {        private String param_1;        private String param_2;    }

2.6. 数据类型转换

  • 将oracleAq所承载的数据,转化为我们自己需要的实例对象,及上述中的Test对象
package com.wangqq.jms;import java.sql.Connection;import java.sql.SQLException;import java.sql.Struct;import com.wangqq.bean.Test;import lombok.NoArgsConstructor;import lombok.extern.slf4j.Slf4j;import oracle.sql.Datum;import oracle.sql.ORAData;import oracle.sql.ORADataFactory;/** * @Title: MessageORAData.java * @Description: 数据类型转换类 * @author synjones * @date 2018年12月3日 上午11:29:50 * @version 1.0 */@Slf4j@NoArgsConstructorpublic class MessageORAData implements ORAData, ORADataFactory {    private Object[] rawData = new Object[8];        private static final MessageORAData MESSAGE_FACTORY = new MessageORAData();    public static ORADataFactory getFactory() {        return MESSAGE_FACTORY;    }    @Override    public ORAData create(Datum datum, int sqlType) throws SQLException {        if (datum == null) {            return null;        } else {            try {                MessageORAData payOraData = new MessageORAData();                Struct aStruct = (Struct) datum;                payOraData.rawData = aStruct.getAttributes();                return payOraData;            } catch (Exception e) {                log.error(e.getMessage(), e);                return null;            }        }    }    @Override    public Datum toDatum(Connection arg0) throws SQLException {        return null;    }    /**     * 消息内容解析并封装     *      * @return     * @author wangqq     * @date 2020年7月6日 上午8:38:01     */    public Test getContent() {        try {            return Test.builder()                .param_1(rawData[0] == null ? null : rawData[0].toString())                .param_2(rawData[0] == null ? null : rawData[0].toString())                .build();        } catch (Exception e) {            log.error(e.getMessage(), e);            return null;        }    }}

2.7. AQ 监听

package com.wangqq.jms;import javax.jms.Message;import javax.jms.MessageListener;import org.springframework.stereotype.Component;import com.wangqq.bean.Test;import lombok.extern.slf4j.Slf4j;import oracle.jms.AQjmsAdtMessage;/** * @Title: JMSListener.java * @Description: JMS监听ORACLEAQ的队列消息 * @author wangqq * @date 2020年6月28日 上午11:23:42 * @version 1.0 */@Slf4j@Componentpublic class MessageAQListener implements MessageListener {        @Override    public void onMessage(Message message1) {        AQjmsAdtMessage adtMessage = (AQjmsAdtMessage)message1;        try {            MessageORAData payload = (MessageORAData)adtMessage.getAdtPayload();            // 获取消息内容            Test test = payload.getContent();                        System.out.println(test.toString());                    } catch (Exception e) {            log.error(e.getMessage(), e);        }    }}

2.8. AQ 监控任务, 在AQ断开后重连

  • 通过定时任务,定时查询是否有入队时间在5分钟之内的队列未被消费【队列入队后,会在对列表中产生一条数据,消费之后该数据会被清除掉】,若存在,则说明监听异常,需要重新创建连接监听队列
  • 数据库对列表中的入队时间在本次测试中为0时区的时间,故而在代码中转换了一下时区,否则无法根据入队时间查询数据
package com.wangqq.jms;import java.util.Date;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;import com.wangqq.mapper.MessageAqMapper;import com.wangqq.util.DateUtil;import lombok.extern.slf4j.Slf4j;/** * @Title: MessageAQMonitor.java * @Description: AQ 监控任务, 在AQ断开后重连 * @author wangqq * @date 2020年6月28日 下午4:35:31 * @version 1.0 */@Slf4j@Componentpublic class MessageAQMonitor {    @Autowired    private MessageAQConfig aqConfig;    @Autowired    private MessageAqMapper aqMapper;    @Scheduled(cron = "${queue.aq.cron}")    private void monitorJob() {        // 检查消息队列是否启用        if (!aqConfig.enable) {            return;        }        // 获取当前时间,并向前推5分钟        String formatDateTime = DateUtil.formatDate(new Date(System.currentTimeMillis() - 300000));        // 将该时间转为0时区的时间【数据库中存储的队列时间为0时区的时间】        String zeroZoneTime = DateUtil.timeConvert(formatDateTime, "+08:00", "+00:00", "yyyy-MM-dd HH:mm:ss");        // 查询是否存在5分钟以前的队列未被消费        int selectCount = aqMapper.selectCount(aqConfig.queue, zeroZoneTime);        if (selectCount != 0) {            // 若存在,则重新启动监听            if (MessageAQConnection.closeConnection()) {                log.info("--> AQ connection has been closed.");                if (MessageAQConnection.establishConnection(aqConfig)) {                    log.info("--> AQ connection has been re-established.");                }            }        }    }}

2.9. 队列表中队列数量的查询

  • 根据队列名称和入队时间,查询在入队时间之后入对的队列数量
package com.wangqq.mapper;import org.apache.ibatis.annotations.Mapper;import org.apache.ibatis.annotations.Select;/** * @Title: MessageAqMapper.java * @Description: oracleAQ的查询 * @author wangqq * @date 2020年6月28日 下午4:04:50 * @version 1.0 */@Mapperpublic interface MessageAqMapper {    /**     *      * 查询数据库中的队列表中符合条件的队列的条数     *     * @param qName         队列名称     * @param minDatetime   队列入队的最小时间     * @return     * @author wangqq     * @date 2020-07-10 15:44:43     */    @Select("select count(msgid) from T_QUEUE_TABLE t where t.q_name = #{qName,jdbcType=VARCHAR} "            + "and to_char(cast(t.enq_time AS DATE), 'yyyy-MM-dd HH24:mi:ss') < #{minDatetime,jdbcType=VARCHAR}")    int selectCount(String qName, String minDatetime);}

2.10. 日期工具类

package com.wangqq.util;import java.text.ParseException;import java.text.SimpleDateFormat;import java.util.Date;import java.util.TimeZone;/** * @Title: DateUtil.java * @Description: 日期工具类 * @author wangqq * @date 2018年10月29日 下午5:27:21 * @version 1.0 */public class DateUtil {/** * 字符串转date,默认格式yyyy-MM-dd HH:mm:ss *  * @param source * @return */public static Date parseDate(String source) {return parseDate(source, "yyyy-MM-dd HH:mm:ss");}/** * 字符串转date *  * @param source * @param pattern *            格式 * @return */public static Date parseDate(String source, String pattern) {if (source == null || source.equals("")) {return null;}SimpleDateFormat sdf = new SimpleDateFormat(pattern);try {return sdf.parse(source);} catch (ParseException e) {e.printStackTrace();return null;}}/** * 格式化日期,默认格式yyyy-MM-dd HH:mm:ss *  * @param date * @return */public static String formatDate(Date date) {return formatDate(date, "yyyy-MM-dd HH:mm:ss");}/** * 格式化日期 *  * @param date * @param pattern *            格式 * @return */public static String formatDate(Date date, String pattern) {if (date == null) {return null;}SimpleDateFormat sdf = new SimpleDateFormat(pattern);return sdf.format(date);}/**     * 时区 时间转换方法:将传入的时间(可能为其他时区)转化成目标时区对应的时间     * @param sourceTime 时间格式必须为:yyyy-MM-dd HH:mm:ss     * @param sourceId 入参的时间的时区id 比如:+08:00     * @param targetId 要转换成目标时区id 比如:+09:00     * @param reFormat 返回格式 默认:yyyy-MM-dd HH:mm:ss     * @return string 转化时区后的时间     */    public static String timeConvert(String sourceTime, String sourceId,            String targetId,String reFormat){        //校验入参是否合法        if (null == sourceId || "".equals(sourceId) || null == targetId                || "".equals(targetId) || null == sourceTime                || "".equals(sourceTime)){            return null;        }                if(reFormat == null || "".equals(reFormat)){            reFormat = "yyyy-MM-dd HH:mm:ss";        }                //校验 时间格式必须为:yyyy-MM-dd HH:mm:ss        String reg = "^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}$";        if (!sourceTime.matches(reg)){            return null;        }                try{            //时间格式            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");            //根据入参原时区id,获取对应的timezone对象            TimeZone sourceTimeZone = TimeZone.getTimeZone("GMT"+sourceId);            //设置SimpleDateFormat时区为原时区(否则是本地默认时区),目的:用来将字符串sourceTime转化成原时区对应的date对象            df.setTimeZone(sourceTimeZone);            //将字符串sourceTime转化成原时区对应的date对象            java.util.Date sourceDate = df.parse(sourceTime);                        //开始转化时区:根据目标时区id设置目标TimeZone            TimeZone targetTimeZone = TimeZone.getTimeZone("GMT"+targetId);            //设置SimpleDateFormat时区为目标时区(否则是本地默认时区),目的:用来将字符串sourceTime转化成目标时区对应的date对象            df.setTimeZone(targetTimeZone);            //得到目标时间字符串            String targetTime = df.format(sourceDate);                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");            java.util.Date date = sdf.parse(targetTime);            sdf = new SimpleDateFormat(reFormat);                        return sdf.format(date);        }        catch (ParseException e){            e.printStackTrace();        }        return null;    }}
(0)

相关推荐