One key Eventuate.io design principle is to leverage existing infrastructure technologies as much as possible. Specifically, rather than reinventing storage and messaging technologies, it uses relational databases, such as MySQL and Postgres, and message brokers, such as Apache Kafka and RabbitMQ. Eventuate glues together the database and message broker using the Transactional Outbox pattern. The Eventuate CDC service uses either the Transaction log tailing pattern or the Polling publisher pattern to pull messages from the outbox table and publish them to the message broker.
I must confess that I’ve mostly ignored the Oracle database for longer than I remember. But while implementing some enhancements for an Eventuate customer that uses Oracle, I was reminded of an intriguing alternative to the Transactional Outbox pattern: Oracle’s transactional event queues (formerly known as Advanced Queuing). The new Oracle 23c Free - developer release provides a great opportunity to explore this alternative. It’s freely available - no license to sign! And, it supports the Oracle Kafka API, which claims to allow Kafka applications to use Oracle’s transactional event queues with minimal code changes.
This article describes what I’ve learned so far in my investigation of Oracle 23c Free.
Running Oracle 23c Free is remarkably easy:
docker run -it container-registry.oracle.com/database/free:latest
Or at least it is easy if you have an Intel-based Mac. The container image only supports Intel and doesn’t run on an M1-based Mac. As a result, I had to develop using a Gitpod environment. Gitpod worked reasonably well, although downloading the large database image was really slow.
Once the database container was running, I dove into the Oracle documentation to learn how to create a queue, send a message to it using JDBC, and consume messages from it using the Kafka Java client.
Let’s look at each step.
Creating a queue using PL/SQL is straightforward:
call DBMS_AQADM.CREATE_TRANSACTIONAL_EVENT_QUEUE(queue_name, queue_payload_type => 'json')
Since my goal was to send JSON messages, I specified the queue_payload_type to be json.
My code invoked the stored procedure using jdbcTemplate.call().
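A minimal sketch of this step using plain JDBC instead of jdbcTemplate is shown here. The class name QueueCreator and the method createJsonQueue are hypothetical, and wrapping the call in an anonymous PL/SQL block with named parameters is an assumption rather than the code used in this investigation.

```java
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.SQLException;

// Hypothetical helper: the names are illustrative and not part of any Oracle or Eventuate API.
final class QueueCreator {

    // Anonymous PL/SQL block; the only bind variable is the queue name.
    static final String CREATE_QUEUE_SQL =
        "BEGIN "
        + "DBMS_AQADM.CREATE_TRANSACTIONAL_EVENT_QUEUE("
        + "queue_name => ?, queue_payload_type => 'json'); "
        + "END;";

    // Creates a transactional event queue whose payload type is JSON.
    static void createJsonQueue(Connection connection, String queueName) throws SQLException {
        try (CallableStatement statement = connection.prepareCall(CREATE_QUEUE_SQL)) {
            statement.setString(1, queueName);
            statement.execute();
        }
    }
}
```

Calling QueueCreator.createJsonQueue(connection, "MY_QUEUE") would issue the call shown above with the queue name supplied as a bind variable. The queue name is only a placeholder.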
The Kafka Producer can be used to send messages.
However, since the goal is to send a message in the same transaction that updates business entities, the sending must be done using JDBC.
Fortunately, sending a message using JDBC also appeared to be straightforward using the DBMS_AQ.ENQUEUE() stored procedure:
DBMS_AQ.ENQUEUE(queue_name, enqueue_options, message_properties, json(message), msgid);
The json() function converts the string message to a JSON value.
Since the enqueue_options and message_properties arguments are PL/SQL records, I wrote a wrapper stored procedure that was easier to invoke using jdbcTemplate.call().
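The wrapper procedure itself is not shown. As an alternative illustration of the same idea, the sketch below declares the two PL/SQL records inside an anonymous block and invokes it over plain JDBC, so the Java side only binds two strings. This is not the wrapper stored procedure described above: the class name JsonMessageSender, the method send, and the use of an anonymous block are all assumptions, and the code has not been run against Oracle 23c Free.

```java
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.SQLException;

// Hypothetical helper: the names are illustrative and not part of any Oracle or Eventuate API.
final class JsonMessageSender {

    // The records and msgid are declared in PL/SQL with their default values,
    // so JDBC only has to bind the queue name and the message text.
    static final String ENQUEUE_SQL =
        "DECLARE "
        + "  enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T; "
        + "  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T; "
        + "  msgid              RAW(16); "
        + "BEGIN "
        + "  DBMS_AQ.ENQUEUE(?, enqueue_options, message_properties, json(?), msgid); "
        + "END;";

    // Enqueues on the caller's connection and does not commit, leaving the
    // caller to commit the enqueue together with its business entity updates.
    static void send(Connection connection, String queueName, String jsonMessage) throws SQLException {
        try (CallableStatement statement = connection.prepareCall(ENQUEUE_SQL)) {
            statement.setString(1, queueName);
            statement.setString(2, jsonMessage);
            statement.execute();
        }
    }
}
```

The design point is that send() neither opens nor commits a transaction. With auto-commit disabled, the enqueue and the business update share the caller's transaction, which is the property the Transactional Outbox pattern is meant to provide.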
So far so good.
Let’s now look at using the Kafka Java client to consume messages from the queue.
The first thing I discovered is that you don’t use the regular Kafka client.
Instead, you must use an Oracle-specific replacement called okafka.
The API is very similar to the Kafka client but the package name is different: org.oracle.okafka.clients.consumer.KafkaConsumer.
There are also various limitations.
Most notably you cannot commit specific offsets.
Once I’d added the right dependencies to the project and overridden the Maven BOM to downgrade to Kafka Client 2.8.1, I had a consumer running.
Sadly, however, it didn’t receive any messages.
Moreover, the ConsumerRebalanceListener was never invoked.
I carefully studied the docs to see what I was doing wrong.
dbms_teqk.aq$_create_kafka_topic()
While there wasn’t anything obviously wrong, I noticed that the example wasn’t creating queues using DBMS_AQADM.CREATE_TRANSACTIONAL_EVENT_QUEUE().
Instead, it was using dbms_teqk.aq$_create_kafka_topic().
I then changed the queue creation code to use this apparently undocumented stored procedure.
After I changed the queue creation code, the DBMS_AQ.ENQUEUE() call started failing.
Apparently, the queue’s payload type was now JMS message instead of JSON.
This makes sense since the Oracle Kafka client actually uses Oracle JMS.
I then changed the message sending code to create a JMS binary message.
I then got this error:
ORA-25600: Invalid shard: Input shard does not match with shard in the queue
ORA-06512: at "SYS.DBMS_AQ", line 240
Fixing this error took a remarkably long time. I could not find any relevant documentation on message/shard keys. ChatGPT was equally confused, especially since it denied that Oracle 23c existed.
Somehow I eventually discovered that I needed to call this stored procedure after creating the queue:
DBMS_AQADM.SET_QUEUE_PARAMETER(?,'KEY_BASED_ENQUEUE', 1)
According to the Oracle documentation:
When set, the shard to which a message gets enqueued is determined by the key value specified in the message. Refer to key-based sharding (link) for more details. This parameter cannot be unset once set.
Sadly, ‘link’ isn’t a link and so I couldn’t find out more information about key-based sharding.
But after making this change, DBMS_AQ.ENQUEUE() started working.
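For illustration, the SET_QUEUE_PARAMETER call above could be issued over plain JDBC right after the queue is created, roughly as follows. The class name KeyBasedEnqueue and the method enable are hypothetical, and the anonymous block wrapper is an assumption. Since the quoted documentation says the parameter cannot be unset, it is probably worth trying this on a throwaway queue first.

```java
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.SQLException;

// Hypothetical helper: the names are illustrative and not part of any Oracle or Eventuate API.
final class KeyBasedEnqueue {

    // Same call as in the text, wrapped in an anonymous PL/SQL block.
    static final String ENABLE_SQL =
        "BEGIN DBMS_AQADM.SET_QUEUE_PARAMETER(?, 'KEY_BASED_ENQUEUE', 1); END;";

    // Turns on key-based enqueue for an existing queue.
    // According to the quoted documentation this cannot be undone.
    static void enable(Connection connection, String queueName) throws SQLException {
        try (CallableStatement statement = connection.prepareCall(ENABLE_SQL)) {
            statement.setString(1, queueName);
            statement.execute();
        }
    }
}
```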
I also followed ChatGPT’s suggestion to specify the message/shard key using the correlation property of message_properties:
message_properties.correlation := shard_key;
No idea, however, if this is correct.
Once I’d made the above changes, the consumer started to work.
The ConsumerRebalanceListener was invoked.
And, the consumer received messages.
Yay!
However, even though the consumer appeared to work, a background thread repeatedly generated the following error:
java.sql.SQLException: ORA-06533: Subscript beyond count
ORA-06512: at "SYS.DBMS_TEQK", line 825
ORA-06512: at "SYS.DBMS_TEQK", line 790
ORA-06512: at line 1
at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:630)
at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:564)
at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1231)
at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:772)
at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:299)
at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:512)
at oracle.jdbc.driver.T4CCallableStatement.doOall8(T4CCallableStatement.java:159)
at oracle.jdbc.driver.T4CCallableStatement.executeForRows(T4CCallableStatement.java:1237)
at oracle.jdbc.driver.OracleStatement.executeSQLStatement(OracleStatement.java:1820)
at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1472)
at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3761)
at oracle.jdbc.driver.OraclePreparedStatement.execute(OraclePreparedStatement.java:4136)
at oracle.jdbc.driver.OracleCallableStatement.execute(OracleCallableStatement.java:4279)
at oracle.jdbc.driver.OraclePreparedStatementWrapper.execute(OraclePreparedStatementWrapper.java:1014)
at org.oracle.okafka.clients.consumer.internals.AQKafkaConsumer.syncGroup(AQKafkaConsumer.java:948)
My code appears to work, at least partially.
But I don’t know why.
And I don’t know how to fix ORA-06533: Subscript beyond count.
Yet another day of 21st century software development - a maze of twisty little passages, all alike.
Suggestions welcome!
The next step is to investigate the ORA-06533: Subscript beyond count error.
I’ll also write a simple application that updates a business entity and sends a message to a queue within the same transaction.
I’m assuming that this will work as expected but …
Microservices.io is brought to you by Chris Richardson. Experienced software architect, author of POJOs in Action, the creator of the original CloudFoundry.com, and the author of Microservices patterns.