Spring Cloud Stream Binder
Spring Cloud Stream is a Java framework for building event-driven microservices backed by a scalable, fault-tolerant messaging system. The Oracle Database Transactional Event Queues Stream Binder allows developers to leverage Oracle’s database messaging platform within the Spring Cloud Stream ecosystem.
This section covers the key features of the Spring Cloud Stream Binder for Oracle Database Transactional Event Queues and getting started examples for developers.
Key Features of the Transactional Event Queues Stream Binder
The Spring Cloud Stream Binder for Oracle Database Transactional Event Queues provides a high-throughput, reliable messaging platform built directly into the database.
- Real-time messaging with multiple publishers, consumers, and topics — all with a simple functional interface.
- Convergence of data: Your messaging infrastructure integrates directly with the database, eliminating the need for external brokers.
- Integration with Spring Cloud Stream provides an interface that’s easy-to-use and quick to get started.
Configuring the Transactional Event Queues Stream Binder
In this section, we’ll cover how to configure the Transactional Event Queues Stream Binder for a Spring Boot project.
Project Dependencies
To start developing with the Stream Binder, add the spring-cloud-stream-binder-oracle-txeventq dependency to your Maven project:
<dependency>
<groupId>com.oracle.database.spring.cloud-stream-binder</groupId>
<artifactId>spring-cloud-stream-binder-oracle-txeventq</artifactId>
<version>${txeventq-stream-binder.version}</version>
</dependency>
For Gradle users, add the following dependency to your project:
implementation "com.oracle.database.spring.cloud-stream-binder:spring-cloud-stream-binder-oracle-txeventq:${txeventqStreamBinderVersion}"
Stream Binder Permissions
The database user producing/consuming events with the Stream Binder requires the following database permissions. Modify the username and tablespace grant as appropriate for your application:
-- Grant tablespace as appropriate to your TxEventQ user
grant select_catalog_role to testuser;
grant execute on dbms_aq to testuser;
grant execute on dbms_aqadm to testuser;
grant execute on dbms_aqin to testuser;
grant execute on dbms_aqjms_internal to testuser;
grant execute on dbms_teqk to testuser;
grant execute on DBMS_RESOURCE_MANAGER to testuser;
grant select on sys.aq$_queue_shards to testuser;
grant select on user_queue_partition_assignment_table to testuser;
Stream Binder Database Connection
The Stream Binder uses a standard JDBC connection to produce and consume messages. With YAML-style Spring application properties, it’ll look something like this:
spring:
datasource:
username: ${USERNAME}
password: ${PASSWORD}
url: ${JDBC_URL}
driver-class-name: oracle.jdbc.OracleDriver
type: oracle.ucp.jdbc.PoolDataSourceImpl
oracleucp:
initial-pool-size: 1
min-pool-size: 1
max-pool-size: 30
connection-pool-name: StreamBinderSample
connection-factory-class-name: oracle.jdbc.pool.OracleDataSource
Suppliers, Functions and Consumers
Spring Cloud Stream uses Java suppliers, functions, and consumers to abstract references to the underlying messaging system (in this case, Transactional Event Queues). To illustrate how this works, we’ll create a basic Supplier, Function, and Consumer with Spring Cloud Stream.
Our example workflow will use three functional interfaces:
- A Supplier that streams a phrase word-by-word
- A Function that processes each word from supplier and capitalizes it
- A Consumer that receives each capitalized word from the function and prints it to
stdout
Once we’ve implemented the Java interfaces, we’ll wire them together with Spring Cloud Stream.
Supplier Implementation
The following Supplier implementation supplies a phrase word-by-word, indicating when it has processed the whole phrase.
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
public class WordSupplier implements Supplier<String> {
private final String[] words;
private final AtomicInteger idx = new AtomicInteger(0);
private final AtomicBoolean done = new AtomicBoolean(false);
public WordSupplier(String phrase) {
this.words = phrase.split(" ");
}
@Override
public String get() {
int i = idx.getAndAccumulate(words.length, (x, y) -> {
if (x < words.length - 1) {
return x + 1;
}
done.set(true);
return 0;
});
return words[i];
}
public boolean done() {
return done.get();
}
}
Next, let’s add Spring beans for our Supplier, a Function, and a Consumer. The toUpperCase
Function takes a string and capitalizes it, and the stdoutConsumer
Consumer prints each string it receives to stdout
.
import java.util.function.Consumer;
import java.util.function.Function;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class StreamConfiguration {
// Input phrase for the producer
@Value("${phrase}")
private String phrase;
// Function, Capitalizes each input
@Bean
public Function<String, String> toUpperCase() {
return String::toUpperCase;
}
// Consumer, Prints each input
@Bean
public Consumer<String> stdoutConsumer() {
return s -> System.out.println("Consumed: " + s);
}
// Supplier, WordSupplier
@Bean
public WordSupplier wordSupplier() {
return new WordSupplier(phrase);
}
}
Configure Beans with Spring Cloud Stream
Returning to the application properties, let’s configure the Spring Cloud Stream bindings for each bean defined previously.
In our binding configuration, the wordSupplier
Supplier has the toUpperCase
Function as a destination, and the stdoutConsumer
Consumer reads from toUpperCase. The result of this acyclic configuration is that each word from the phrase is converted to uppercase and sent to stdout.
# Input phrase for the wordSupplier
phrase: "Spring Cloud Stream simplifies event-driven microservices with powerful messaging capabilities."
spring:
cloud:
stream:
bindings:
wordSupplier-out-0:
# wordSupplier output
destination: toUpperCase-in-0
group: t1
producer:
required-groups:
- t1
stdoutConsumer-in-0:
# stdoutConsumer input
destination: toUpperCase-out-0
group: t1
function:
# defines the stream flow, toUppercase bridges
# wordSupplier and stdoutConsumer
definition: wordSupplier;toUpperCase;stdoutConsumer
If you’ve added the prior code to a Spring Boot application, you should see the following messages sent to stdout
when it is run:
Consumed: SPRING
Consumed: CLOUD
Consumed: STREAM
Consumed: SIMPLIFIES
Consumed: EVENT-DRIVEN
Consumed: MICROSERVICES
Consumed: WITH
Consumed: POWERFUL
Consumed: MESSAGING
Consumed: CAPABILITIES.