Advanced Features
This section explains advanced features of Transactional Event Queues, including message propagation between queues and the database, and error handling.
Message Propagation
Messages can be propagated within the same database or across a database link to different queues or topics. Message propagation is useful for workflows that require processing by different consumers or for event-driven actions that need to trigger subsequent processes.
Queue to Queue Message Propagation
Create and start two queues. source
will be the source queue, and dest
will be the propagated destination queue.
begin
dbms_aqadm.create_transactional_event_queue(
queue_name => 'source',
queue_payload_type => 'JSON',
multiple_consumers => true
);
dbms_aqadm.create_transactional_event_queue(
queue_name => 'dest',
queue_payload_type => 'JSON',
multiple_consumers => true
);
dbms_aqadm.start_queue(
queue_name => 'source'
);
dbms_aqadm.start_queue(
queue_name => 'dest'
);
end;
/
Schedule message propagation so messages from source
are propagated to dest
, using DBMS_AQADM.SCHEDULE_PROPAGATION
procedure.
begin
dbms_aqadm.schedule_propagation(
queue_name => 'source',
destination_queue => 'dest'
);
end;
/
Let’s enqueue a message into source
. We expect this message to be propagated to dest
:
declare
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
msg_id raw(16);
message json;
body varchar2(200) := '{"content": "this message is propagated!"}';
begin
select json(body) into message;
dbms_aq.enqueue(
queue_name => 'source',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => msg_id
);
commit;
end;
/
If propagation does not occur, check the JOB_QUEUE_PROCESSES
parameter and ensure its value is high enough. If the value is very low, you may need to update it with a larger value:
alter system set job_queue_processes=10;
Stopping Queue Propagation
You can stop propagation using the DBMS_AQADM.UNSCHEDULE_PROPAGATION procedure:
begin
dbms_aqadm.unschedule_propagation(
queue_name => 'source',
destination_queue => 'dest'
);
end;
/
Your can view queue subscribers and propagation schedules from the respective DBA_QUEUE_SCHEDULES
and DBA_QUEUE_SUBSCRIBERS
system views. These views are helpful for debugging propagation issues, including error messages and schedule status.
Using Database Links
To propagate messages between databases, a database link from the local database to the remote database must be created. The subscribe and propagation commands must be altered to use the database link.
begin
dbms_aqadm.schedule_propagation(
queue_name => 'source',
-- replace with your database link
destination => 'database_link',
-- replace with your remote schema and queue name
destination_queue => 'schema.queue_name'
);
end;
/
Error Handling
Error handling is a critical component of message processing, ensuring malformed or otherwise unprocessable messages are handled correctly. Depending on the message payload and exception, an appropriate action should be taken to either replay, discard, or otherwise process the failed message. If a message cannot be dequeued due to errors, it may be moved to the exception queue, if one exists.
For errors on procedures like enqueue you may also use the standard SQL exception mechanisms:
declare
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_t;
msg_id raw(16);
message json;
message_buffer varchar2(500);
begin
dequeue_options.navigation := dbms_aq.first_message;
dequeue_options.wait := dbms_aq.no_wait;
dbms_aq.dequeue(
queue_name => 'json_queue',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => message,
msgid => msg_id
);
select json_value(message, '$.content') into message_buffer;
dbms_output.put_line('message: ' || message_buffer);
exception
when others then
rollback;
dbms_output.put_line('error dequeuing message: ' || sqlerrm);
end;
/