Oracle高级队列(Advanced Queue)简单实例
Oracle高级队列(Advanced Queue,简称AQ)是Oracle数据库提供的一种消息队列机制,它允许应用程序通过数据库实现异步通信,解耦生产者和消费者,提升系统的可扩展性和稳定性。本文将通过一个简单实例,演示如何创建队列、入队消息和出队消息。
一、准备工作:创建用户并授权
首先需要创建一个测试用户,并为其授予使用高级队列所需的权限。以下是创建用户和授权的SQL语句:
-- 创建测试用户 CREATE USER aq_test IDENTIFIED BY aq_test_password; -- 授予基本连接和资源权限 GRANT CONNECT, RESOURCE TO aq_test; -- 授予高级队列相关权限 GRANT AQ_ADMINISTRATOR_ROLE TO aq_test; GRANT EXECUTE ON DBMS_AQ TO aq_test; GRANT EXECUTE ON DBMS_AQADM TO aq_test;
执行以上语句后,使用新创建的
二、创建消息载荷类型
高级队列中的消息需要定义明确的载荷类型,通常我们使用对象类型来定义消息结构。以下是一个简单的订单消息类型示例:
-- 创建订单消息对象类型 CREATE TYPE order_msg_type AS OBJECT ( order_id NUMBER, order_name VARCHAR2(100), order_time DATE, amount NUMBER(10,2) ); / -- 创建该类型的队列载荷类型(用于队列存储) CREATE TYPE order_queue_payload AS TABLE OF order_msg_type; /
这里定义的order_msg_type包含订单ID、订单名称、下单时间和订单金额四个字段,后续队列中的消息都将遵循这个结构。
三、创建队列表和队列
队列表用于存储队列中的消息,队列则是操作消息的接口。我们需要先创建队列表,再基于队列表创建队列。
1. 创建队列表
BEGIN DBMS_AQADM.CREATE_QUEUE_TABLE( queue_table => 'order_queue_table', -- 队列表名称 queue_payload_type => 'order_msg_type', -- 载荷类型 multiple_consumers => FALSE -- 是否为多消费者队列,FALSE表示单消费者 ); END; /
2. 创建队列
BEGIN DBMS_AQADM.CREATE_QUEUE( queue_name => 'order_queue', -- 队列名称 queue_table => 'order_queue_table' -- 关联的队列表 ); END; /
3. 启动队列
队列创建后默认是未启动状态,需要手动启动才能使用:
BEGIN DBMS_AQADM.START_QUEUE( queue_name => 'order_queue' -- 要启动的队列名称 ); END; /
四、入队操作:发送消息
入队操作是将消息放入队列的过程,使用DBMS_AQ.ENQUEUE过程实现。以下是一个入队示例:
DECLARE
v_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T; -- 入队选项
v_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T; -- 消息属性
v_msgid RAW(16); -- 消息ID
v_order_msg order_msg_type; -- 订单消息对象
BEGIN
-- 初始化消息内容
v_order_msg := order_msg_type(
1001,
'笔记本电脑订单',
SYSDATE,
5999.00
);
-- 执行入队操作
DBMS_AQ.ENQUEUE(
queue_name => 'order_queue',
enqueue_options => v_enqueue_options,
message_properties => v_message_properties,
payload => v_order_msg,
msgid => v_msgid
);
-- 提交事务,确保消息持久化
COMMIT;
DBMS_OUTPUT.PUT_LINE('消息入队成功,消息ID:' || RAWTOHEX(v_msgid));
END;
/执行上述代码后,消息会被存入order_queue队列中,等待消费者处理。
五、出队操作:接收消息
出队操作是从队列中取出消息的过程,使用DBMS_AQ.DEQUEUE过程实现。以下是一个出队示例:
DECLARE
v_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T; -- 出队选项
v_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T; -- 消息属性
v_msgid RAW(16); -- 消息ID
v_order_msg order_msg_type; -- 订单消息对象
BEGIN
-- 设置出队选项:从队列头部取消息,等待时间10秒(如果没有消息则等待10秒后返回)
v_dequeue_options.wait := 10;
-- 执行出队操作
DBMS_AQ.DEQUEUE(
queue_name => 'order_queue',
dequeue_options => v_dequeue_options,
message_properties => v_message_properties,
payload => v_order_msg,
msgid => v_msgid
);
-- 输出消息内容
DBMS_OUTPUT.PUT_LINE('消息出队成功,消息ID:' || RAWTOHEX(v_msgid));
DBMS_OUTPUT.PUT_LINE('订单ID:' || v_order_msg.order_id);
DBMS_OUTPUT.PUT_LINE('订单名称:' || v_order_msg.order_name);
DBMS_OUTPUT.PUT_LINE('下单时间:' || TO_CHAR(v_order_msg.order_time, 'yyyy-mm-dd hh24:mi:ss'));
DBMS_OUTPUT.PUT_LINE('订单金额:' || v_order_msg.amount);
-- 提交事务,确认消息已被处理
COMMIT;
END;
/执行上述代码后,会从队列中取出之前入队的消息,并打印出消息的详细内容,同时该消息会从队列中移除。
六、清理环境(可选)
如果不再需要使用队列,可以按照以下步骤清理相关对象:
BEGIN -- 停止队列 DBMS_AQADM.STOP_QUEUE(queue_name => 'order_queue'); -- 删除队列 DBMS_AQADM.DROP_QUEUE(queue_name => 'order_queue'); -- 删除队列表 DBMS_AQADM.DROP_QUEUE_TABLE(queue_table => 'order_queue_table'); -- 删除消息类型(注意顺序,先删依赖的对象) DROP TYPE order_msg_type; END; /
七、注意事项
所有队列操作都需要在事务中执行,入队和出队后需要显式提交或回滚事务,否则消息可能不会持久化或出队状态无法确认。
多消费者队列需要额外配置订阅者,本实例为单消费者场景,实际使用中可根据需求调整
multiple_consumers参数。消息出队后默认会被移除,如果需要保留消息,可以设置消息属性中的
RETENTION参数。生产环境和消费者可以是不同的会话甚至不同的数据库用户,只要具备相应的队列操作权限即可。