Message Operations

This section explains message operations using queues, topics, and different programming interfaces (SQL, Java, Spring JMS, and more). You’ll learn how to enqueue, dequeue, and manage messages effectively.

Enqueue and Dequeue, or Produce and Consume

Queues

When working with queues, the preferred terms for adding and retrieving messages from Transactional Event Queues are enqueue and dequeue.

Topics

When using topics, the preferred terms are produce and consume. A service that writes data to a topic is called a producer, and a service that reads data from a topic is called a consumer.

Enqueuing and Dequeuing with SQL

To write data to a queue, the queue must be both created and started. See Queue Management for creating and starting queues.

The following SQL script enqueues a message to a queue with the JSON payload type:

declare
    enqueue_options dbms_aq.enqueue_options_t;
    message_properties dbms_aq.message_properties_t;
    msg_id raw(16);
    message json;
    body varchar2(200) := '{"content": "my first message"}';
begin
    select json(body) into message;
    dbms_aq.enqueue(
        queue_name => 'json_queue',
        enqueue_options => enqueue_options,
        message_properties => message_properties,
        payload => message,
        msgid => msg_id
    );
    commit;
end;
/

Next, we’ll dequeue the message and print it to the console:

declare
    dequeue_options dbms_aq.dequeue_options_t;
    message_properties dbms_aq.message_properties_t;
    msg_id raw(16);
    message json;
    message_buffer varchar2(500);
begin
    dequeue_options.navigation := dbms_aq.first_message;
    dequeue_options.wait := dbms_aq.no_wait;
    
    dbms_aq.dequeue(
        queue_name => 'json_queue',
        dequeue_options => dequeue_options,
        message_properties => message_properties,
        payload => message,
        msgid => msg_id
    );
    select json_value(message, '$.content') into message_buffer;
    dbms_output.put_line('message: ' || message_buffer);
    -- Commit so the dequeue becomes permanent
    commit;
end;
/

You may also query a message’s content by ID from the underlying queue table:

select q.user_data from json_queue q
where msgid = '<msg id>'; -- Query using the message ID

-- {"content":"my first message"}

Kafka Producers and Consumers

To produce or consume topic data, the topic must be created. See Queue Management for a topic creation example.

Kafka Producer

The following Java snippet creates an org.oracle.okafka.clients.producer.KafkaProducer instance capable of producing data to Transactional Event Queue topics. Note the use of Oracle Database connection properties, and Kafka producer-specific properties like enable.idempotence and key.serializer.

The org.oracle.okafka.clients.producer.KafkaProducer class implements the org.apache.kafka.clients.producer.Producer interface, allowing it to be used in place of a Kafka Java client producer.

Properties props = new Properties();
// Use your database service name
props.put("oracle.service.name", "freepdb1");
// Choose PLAINTEXT or SSL as appropriate for your database connection
props.put("security.protocol", "SSL");
// Your database server 
props.put("bootstrap.servers", "my-db-server");
// Path to directory containing ojdbc.properties
// If using Oracle Wallet, this directory must contain the unzipped wallet (such as in sqlnet.ora)
props.put("oracle.net.tns_admin", "/my/path/");

props.put("enable.idempotence", "true");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> okafkaProducer = new KafkaProducer<>(props);

The following Java class produces a stream of messages to a topic, using the Kafka Java Client for Oracle Database Transactional Event Queues. Note that the implementation does not use any Oracle-specific classes, only Kafka interfaces. This allows developers to drop in an org.oracle.okafka.clients.producer.KafkaProducer instance without code changes.

import java.util.stream.Stream;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SampleProducer<T> implements Runnable, AutoCloseable {
    private final Producer<String, T> producer;
    private final String topic;
    private final Stream<T> inputs;

    public SampleProducer(Producer<String, T> producer, String topic, Stream<T> inputs) {
        this.producer = producer;
        this.topic = topic;
        this.inputs = inputs;
    }

    @Override
    public void run() {
        inputs.forEach(t -> {
            System.out.println("Produced record: " + t);
            producer.send(new ProducerRecord<>(topic, t));
        });
    }

    @Override
    public void close() throws Exception {
        if (this.producer != null) {
            producer.close();
        }
    }
}

Kafka Consumer

The following Java snippet creates an org.oracle.okafka.clients.consumer.KafkaConsumer instance capable of consuming records from Transactional Event Queue topics. Note the use of Oracle Database connection properties, and Kafka consumer-specific properties like group.id and max.poll.records.

The org.oracle.okafka.clients.consumer.KafkaConsumer class implements the org.apache.kafka.clients.consumer.Consumer interface, allowing it to be used in place of a Kafka Java client consumer.

Properties props = new Properties();
// Use your database service name
props.put("oracle.service.name", "freepdb1");
// Choose PLAINTEXT or SSL as appropriate for your database connection
props.put("security.protocol", "SSL");
// Your database server 
props.put("bootstrap.servers", "my-db-server");
// Path to directory containing ojdbc.properties
// If using Oracle Wallet, this directory must contain the unzipped wallet (such as in sqlnet.ora)
props.put("oracle.net.tns_admin", "/my/path/");

props.put("group.id" , "MY_CONSUMER_GROUP");
props.put("enable.auto.commit","false");
props.put("max.poll.records", 2000);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> okafkaConsumer = new KafkaConsumer<>(props);
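
The producer and consumer snippets repeat the same four connection properties. One way to keep them in a single place is sketched below, using only java.util.Properties. The class OkafkaPropertiesSketch and its method names are hypothetical and not part of the okafka library; the property names and values are the ones shown in the snippets above.

```java
import java.util.Properties;

// Hypothetical helper (not part of the okafka library): assembles the
// producer and consumer property sets from one shared connection block.
final class OkafkaPropertiesSketch {

    // Oracle Database connection properties common to producers and consumers.
    static Properties connection(String serviceName, String bootstrapServers,
                                 String securityProtocol, String tnsAdmin) {
        Properties props = new Properties();
        props.put("oracle.service.name", serviceName);
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", securityProtocol);
        props.put("oracle.net.tns_admin", tnsAdmin);
        return props;
    }

    // Adds the producer-specific properties from the producer snippet.
    static Properties producer(Properties connection) {
        Properties props = new Properties();
        props.putAll(connection);
        props.put("enable.idempotence", "true");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    // Adds the consumer-specific properties from the consumer snippet.
    static Properties consumer(Properties connection, String groupId) {
        Properties props = new Properties();
        props.putAll(connection);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "false");
        props.put("max.poll.records", "2000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

The resulting Properties objects can be passed to the KafkaProducer and KafkaConsumer constructors in place of the hand-built props shown earlier.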

The following Java class consumes messages from a topic, using the Kafka Java Client for Oracle Database Transactional Event Queues. Like the producer example, the consumer does not use any Oracle-specific classes, only Kafka interfaces.

import java.time.Duration;
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class SampleConsumer<T> implements Runnable, AutoCloseable {
    private final Consumer<String, T> consumer;
    private final String topic;

    public SampleConsumer(Consumer<String, T> consumer, String topic) {
        this.consumer = consumer;
        this.topic = topic;
    }

    @Override
    public void run() {
        consumer.subscribe(List.of(topic));
        while (true) {
            ConsumerRecords<String, T> records = consumer.poll(Duration.ofMillis(100));
            System.out.println("Consumed records: " + records.count());
            processRecords(records);
            // Commit records when done processing.
            consumer.commitAsync();
        }
    }

    private void processRecords(ConsumerRecords<String, T> records) {
        // Application implementation of record processing.
    }

    @Override
    public void close() throws Exception {
        if (consumer != null) {
            consumer.close();
        }
    }
}

Enqueuing and Dequeuing with JMS

JMS (Java Message Service) provides a standard way to enqueue and dequeue messages. This section shows how to use plain Java JMS APIs and Spring JMS integration using the oracle.jms Java package and the Oracle Spring Boot Starter for AqJms.

JMS APIs

The AQjmsFactory class is used to create a JMS ConnectionFactory from a Java DataSource or connection parameters. Once configured, the JMS ConnectionFactory instance can be used with standard JMS APIs to produce and consume messages.

The following Java snippet uses a JMS ConnectionFactory to produce a text message.

DataSource ds = // Configure the Oracle Database DataSource according to your database connection information
ConnectionFactory cf = AQjmsFactory.getConnectionFactory(ds);
try (Connection conn = cf.createConnection()) {
    Session session = conn.createSession();
    Queue myQueue = session.createQueue("my_queue");
    MessageProducer producer = session.createProducer(myQueue);
    producer.send(session.createTextMessage("Hello World"));
}

The following Java snippet uses a JMS ConnectionFactory to consume a text message.

DataSource ds = // Configure the Oracle Database DataSource according to your database connection information
ConnectionFactory cf = AQjmsFactory.getConnectionFactory(ds);
try (Connection conn = cf.createConnection()) {
    Session session = conn.createSession();
    Queue myQueue = session.createQueue("my_queue");
    MessageConsumer consumer = session.createConsumer(myQueue);
    conn.start();
    Message msg = consumer.receive(10000); // Wait for 10 seconds
    if (msg instanceof TextMessage) { // instanceof is false for null, so no separate null check is needed
        TextMessage textMsg = (TextMessage) msg;
        System.out.println("Received message: " + textMsg.getText());
    }
}

Message Operations in Other Languages and APIs

For Python, JavaScript, .NET, and ORDS, refer to the respective documentation for code samples.

Message Expiry and Exception Queues

When enqueuing a message, you can specify an expiration time using the expiration attribute of the message_properties object. This sets the number of seconds during which the message is available for dequeuing. Messages that exceed their expiration time are automatically moved to an exception queue for further processing or inspection (including queries or dequeue operations). The exception queue contains any expired or failed messages, and uses the same underlying table as the main queue.

The following SQL script creates a queue for JMS payloads, and an associated exception queue for failed or expired messages.

begin
    dbms_aqadm.create_transactional_event_queue(
        queue_name         => 'my_queue',
        multiple_consumers => false
    );
    dbms_aqadm.start_queue(
        queue_name => 'my_queue'
    );
    dbms_aqadm.create_eq_exception_queue(
        queue_name           => 'my_queue',
        exception_queue_name => 'my_queue_eq'
    );
end;
/

The following SQL script enqueues a JMS payload, configuring the expiry time so that after 60 seconds, the message is moved to the exception queue.

declare
    enqueue_options dbms_aq.enqueue_options_t;
    message_properties dbms_aq.message_properties_t;
    message_handle raw(16);
    message sys.aq$_jms_text_message;
begin
    message := sys.aq$_jms_text_message.construct();
    message.set_text('this is my message');

    message_properties.expiration := 60; -- message expires in 60 seconds
    dbms_aq.enqueue(
        queue_name => 'my_queue',
        enqueue_options => enqueue_options,
        message_properties => message_properties,
        payload => message,
        msgid => message_handle
    );
    commit;
end;
/
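
The expiration rule described above can be pictured with a small in-memory model. The Java sketch below only illustrates the behavior (a message that is not dequeued within its expiration window ends up in the exception queue); it is not how the database implements expiry. All class and method names are hypothetical, and the handling of the exact boundary instant is an assumption.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

// Hypothetical in-memory model of message expiration; not an Oracle API.
final class ExpirySketch {

    static final class Message {
        final String text;
        final Instant enqueuedAt;
        final Duration expiration;

        Message(String text, Instant enqueuedAt, Duration expiration) {
            this.text = text;
            this.enqueuedAt = enqueuedAt;
            this.expiration = expiration;
        }

        // Assumption: a message counts as expired once the full window has elapsed.
        boolean isExpiredAt(Instant now) {
            return !now.isBefore(enqueuedAt.plus(expiration));
        }
    }

    final Deque<Message> queue = new ArrayDeque<>();
    final Deque<Message> exceptionQueue = new ArrayDeque<>();

    void enqueue(String text, Instant now, Duration expiration) {
        queue.addLast(new Message(text, now, expiration));
    }

    // Moves expired messages to the exception queue, then returns the
    // next message that is still available, or null if there is none.
    Message dequeue(Instant now) {
        Iterator<Message> it = queue.iterator();
        while (it.hasNext()) {
            Message m = it.next();
            if (m.isExpiredAt(now)) {
                it.remove();
                exceptionQueue.addLast(m);
            }
        }
        return queue.pollFirst();
    }
}
```

In this model, a message enqueued with a 60 second expiration is returned by a dequeue 30 seconds later, while a dequeue 61 seconds later finds nothing and the message sits in exceptionQueue.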

Message Delay

When enqueuing a message, you can specify a delay (in seconds) before the message becomes available for dequeuing. Message delay allows you to schedule messages to be available for consumers after a specified time, and is configured using the delay attribute of the message_properties object.

When enqueuing delayed messages, the DELIVERY_TIME column is set to the date and time at which the message becomes available to consumers.

declare
    enqueue_options dbms_aq.enqueue_options_t;
    message_properties dbms_aq.message_properties_t;
    message_handle raw(16);
    message sys.aq$_jms_text_message;
begin
    message := sys.aq$_jms_text_message.construct();
    message.set_text('this is my message');

    message_properties.delay := 7*24*60*60; -- Delay for 7 days
    dbms_aq.enqueue(
        queue_name => 'my_queue',
        enqueue_options => enqueue_options,
        message_properties => message_properties,
        payload => message,
        msgid => message_handle
    );
    commit;
end;
/
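
Because the delay attribute is expressed in seconds, it can help to derive the value from a duration instead of writing the arithmetic by hand. The following Java sketch shows that conversion and the resulting availability time; the class and method names are hypothetical, and the code performs only the calculation without talking to the database.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper for working out delay values; not an Oracle API.
final class DelaySketch {

    // Converts a duration to the whole number of seconds used by the delay attribute.
    static long delaySeconds(Duration delay) {
        if (delay.isNegative()) {
            throw new IllegalArgumentException("delay must not be negative");
        }
        return delay.getSeconds();
    }

    // The instant at which a message enqueued at enqueueTime becomes available.
    static Instant availableAt(Instant enqueueTime, Duration delay) {
        return enqueueTime.plusSeconds(delaySeconds(delay));
    }
}
```

For the 7 day delay used above, DelaySketch.delaySeconds(Duration.ofDays(7)) yields 604800, the same value as 7*24*60*60.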

Message Priority

When enqueuing a message, you can specify its priority using the priority attribute of the message_properties object. This attribute allows you to control the order in which messages are dequeued. The lower a message’s priority number, the higher the message’s precedence for consumers.

When enqueuing prioritized messages, the PRIORITY column in the queue table will be populated with the priority number.

declare
    enqueue_options dbms_aq.enqueue_options_t;
    message_properties dbms_aq.message_properties_t;
    message_handle raw(16);
    message sys.aq$_jms_text_message;
begin
    message := sys.aq$_jms_text_message.construct();
    message.set_text('this is my message');

    message_properties.priority := 1; -- A lower number indicates higher priority
    dbms_aq.enqueue(
        queue_name => 'my_queue',
        enqueue_options => enqueue_options,
        message_properties => message_properties,
        payload => message,
        msgid => message_handle
    );
    commit;
end;
/
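
The ordering rule (a lower priority number is dequeued first) can be illustrated with an in-memory priority queue. The Java sketch below is only a model of that rule, not the database implementation. The class and method names are hypothetical, and breaking ties by enqueue order is an assumption made for the illustration.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical in-memory model of priority ordering; not an Oracle API.
final class PrioritySketch {

    static final class Message {
        final String text;
        final int priority;
        final long sequence;

        Message(String text, int priority, long sequence) {
            this.text = text;
            this.priority = priority;
            this.sequence = sequence;
        }
    }

    private long nextSequence = 0;

    // Lower priority number first; ties fall back to enqueue order (assumption).
    private final PriorityQueue<Message> queue = new PriorityQueue<>(
        Comparator.comparingInt((Message m) -> m.priority)
                  .thenComparingLong(m -> m.sequence));

    void enqueue(String text, int priority) {
        queue.add(new Message(text, priority, nextSequence++));
    }

    // Returns the text of the next message, or null when the queue is empty.
    String dequeue() {
        Message m = queue.poll();
        return m == null ? null : m.text;
    }
}
```

Enqueuing "low" with priority 5 and then "high" with priority 1 makes the first dequeue return "high", even though it was enqueued second.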

Transactional Messaging: Combine Messaging with Database Queries

Enqueue and dequeue operations occur within database transactions, allowing developers to combine database DML with messaging operations. This is particularly useful when the message contains data relevant to other tables or services within your schema.

In the following example, a DML operation (an INSERT statement) is combined with an enqueue operation in the same transaction. If either the INSERT or the enqueue fails, the exception handler rolls back the transaction, so neither change is persisted. The orders table serves as the example.

create table orders (
    id number generated always as identity primary key,
    product_id number not null,
    quantity number not null,
    order_date date default sysdate
);

declare
    enqueue_options dbms_aq.enqueue_options_t;
    message_properties dbms_aq.message_properties_t;
    msg_id raw(16);
    message json;
    body varchar2(200) := '{"product_id": 1, "quantity": 5}';
    product_id number;
    quantity number;
begin
    -- Convert the JSON string to a JSON object
    message := json(body);

    -- Extract product_id and quantity from the JSON object
    product_id := json_value(message, '$.product_id' returning number);
    quantity := json_value(message, '$.quantity' returning number);

    -- Insert data into the orders table
    insert into orders (product_id, quantity)
        values (product_id, quantity);

    -- Enqueue the message
    dbms_aq.enqueue(
        queue_name => 'json_queue',
        enqueue_options => enqueue_options,
        message_properties => message_properties,
        payload => message,
        msgid => msg_id
    );
    commit;
exception
    when others then
        -- Rollback the transaction on error
        rollback;
        dbms_output.put_line('error enqueuing message: ' || sqlerrm);
end;
/

Note: The same pattern applies to the dbms_aq.dequeue procedure, allowing developers to perform DML operations within dequeue transactions.
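
The same insert-and-enqueue transaction can be sketched from Java with plain JDBC. The example below is a minimal sketch, assuming the orders table and a started json_queue exist and that the caller supplies an open java.sql.Connection to the database. The class OrderEnqueueSketch and its methods are hypothetical names, and binding the JSON text as a string parameter to json(?) is an assumption about how the payload is passed.

```java
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Hypothetical class: combines DML and an enqueue in one JDBC transaction.
final class OrderEnqueueSketch {

    static final String INSERT_ORDER =
        "insert into orders (product_id, quantity) values (?, ?)";

    // Anonymous PL/SQL block mirroring the script above; the JSON text is a bind parameter.
    static final String ENQUEUE_BLOCK = String.join("\n",
        "declare",
        "    enqueue_options dbms_aq.enqueue_options_t;",
        "    message_properties dbms_aq.message_properties_t;",
        "    msg_id raw(16);",
        "    message json;",
        "begin",
        "    message := json(?);",
        "    dbms_aq.enqueue(",
        "        queue_name => 'json_queue',",
        "        enqueue_options => enqueue_options,",
        "        message_properties => message_properties,",
        "        payload => message,",
        "        msgid => msg_id",
        "    );",
        "end;");

    // Builds the JSON message body for an order.
    static String orderJson(int productId, int quantity) {
        return "{\"product_id\": " + productId + ", \"quantity\": " + quantity + "}";
    }

    // Inserts the order and enqueues the message; both commit or both roll back.
    static void placeOrder(Connection conn, int productId, int quantity) throws SQLException {
        boolean previousAutoCommit = conn.getAutoCommit();
        conn.setAutoCommit(false);
        try {
            try (PreparedStatement insert = conn.prepareStatement(INSERT_ORDER)) {
                insert.setInt(1, productId);
                insert.setInt(2, quantity);
                insert.executeUpdate();
            }
            try (CallableStatement enqueue = conn.prepareCall(ENQUEUE_BLOCK)) {
                enqueue.setString(1, orderJson(productId, quantity));
                enqueue.execute();
            }
            conn.commit();
        } catch (SQLException e) {
            // Undo the INSERT and the enqueue together
            conn.rollback();
            throw e;
        } finally {
            conn.setAutoCommit(previousAutoCommit);
        }
    }
}
```

Disabling auto-commit is what keeps the INSERT and the enqueue in one transaction; with auto-commit enabled, each statement would be committed separately and a failed enqueue could leave an order row without its message.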