6 Kafka Java Client Interface for Oracle Transactional Event Queues
This chapter includes the following topics:
Apache Kafka Overview
Apache Kafka is a community distributed event streaming platform that is horizontally-scalable and fault-tolerant.
Kafka runs as a cluster on one or more servers. Each Kafka cluster stores streams of records in categories called topics. Each record consists of a key, a value, and a timestamp. Kafka APIs allow applications to connect to a Kafka cluster and use the Kafka messaging platform.
Kafka Java Client for Transactional Event Queues
Oracle Database 20c introduces Kafka application compatibility with Oracle Database. This provides easy migration for Kafka Java applications to Transactional Event Queues (TEQ). The Kafka Java APIs can now connect to an Oracle Database server and use TEQ as a messaging platform.
Figure 6-1 Kafka Application Integration with Transactional Event Queue
The figure shows the OKafka library, which contains an Oracle-specific implementation of Kafka's Java APIs. This implementation internally invokes AQ-JMS APIs, which in turn use the JDBC driver to communicate with Oracle Database.
Developers can now migrate an existing Java application that uses Kafka to Oracle Database. Oracle Database 20c provides a client-side library that allows Kafka applications to connect to Oracle Database instead of a Kafka cluster and use TEQ's messaging platform transparently.
Configuring Kafka Java Client for Transactional Event Queues
Two levels of configuration are required to migrate a Kafka application to the TEQ messaging platform:
-
Database level configuration
-
Application level configuration
A Kafka application needs to set certain properties that allow the OKafka library to locate the Oracle Database. This is analogous to how a Kafka application provides ZooKeeper information. These connection properties can be set in the following two ways:
-
Using a database user name and password provided in plain text
-
Using a JDBC wallet
Prerequisites
The following are the prerequisites for configuring and running Kafka Java client for TEQ in an Oracle Database.
-
Create a database user.
-
Grant the following user privileges.
-
grant connect, resource to user.
-
grant execute on dbms_aq to user.
-
grant execute on dbms_aqadm to user.
-
grant execute on dbms_aqin to user.
-
grant execute on dbms_aqjms to user.
-
grant select_catalog_role to user.
-
Set the correct database configuration parameter to use TEQ.
set streams_pool_size=400M
-
Set the LOCAL_LISTENER database parameter.
set LOCAL_LISTENER= (ADDRESS=(PROTOCOL=TCP)(HOST=<HOST NAME/ IP> )(PORT=<PORT NUMBER>))
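The privilege list above can be generated for a given user rather than typed by hand. The following is a minimal sketch, not part of the OKafka library: the class name TeqGrants and the user name okafka_user are hypothetical, and the statements mirror only the grants listed in the prerequisites.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: builds the GRANT statements listed in the prerequisites for one database user. */
class TeqGrants {

    // Packages named in the prerequisites list.
    private static final String[] PACKAGES = {"dbms_aq", "dbms_aqadm", "dbms_aqin", "dbms_aqjms"};

    static List<String> grantsFor(String user) {
        // Accept only a simple identifier so the name cannot alter the SQL text.
        if (!user.matches("[A-Za-z][A-Za-z0-9_$#]*")) {
            throw new IllegalArgumentException("Not a simple identifier: " + user);
        }
        List<String> sql = new ArrayList<>();
        sql.add("GRANT connect, resource TO " + user);
        for (String pkg : PACKAGES) {
            sql.add("GRANT EXECUTE ON " + pkg + " TO " + user);
        }
        sql.add("GRANT select_catalog_role TO " + user);
        return sql;
    }

    public static void main(String[] args) {
        // "okafka_user" is a hypothetical user name chosen for illustration.
        for (String statement : grantsFor("okafka_user")) {
            System.out.println(statement + ";");
        }
    }
}
```

The printed statements can be reviewed and then run by a privileged user in a SQL client.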
Connection Configuration
The OKafka library connects to Oracle Database using the JDBC Thin Driver. To set up this connection, a Kafka application can provide the user name and password in plain text, or it can configure SSL. To run a Kafka application against an Oracle Autonomous Transaction Processing (ATP) Database on Cloud, only SSL configuration is supported. You can connect to Oracle Database using either PLAINTEXT or SSL.
-
PLAINTEXT: In this protocol, the JDBC connection uses a user name and password to connect to the Oracle instance.
To use the PLAINTEXT protocol, the application must provide the following properties:
-
oracle.service.name = <name of the service running on the instance>
-
oracle.instance.name = <name of the Oracle Database instance>
-
bootstrap.servers = <host:port>
-
security.protocol = "PLAINTEXT"
-
oracle.net.tns_admin = <location of tnsnames.ora file> (for parsing the JDBC connection string)
Set the following properties in the ojdbc.properties file, which must be located in the directory specified by oracle.net.tns_admin:
-
user = <nameofdatabaseuser>
-
password = <userpassword>
-
SSL: To use SSL-secured connections to connect to an ATP Database, perform the following steps.
-
JDBC Thin Driver connection prerequisites for SSL security:
-
JDK8u162 or higher.
-
oraclepki.jar, osdt_cert.jar, and osdt_core.jar
-
18.3 JDBC Thin driver or higher (recommended)
-
To leverage JDBC SSL security to connect to an Oracle Database instance, the user must provide the following properties. JDBC supports SSL-secured connections to Oracle Database in two ways.
-
Using wallets. To use wallets:
-
Add the required dependent JARs for using Oracle Wallets to the classpath.
Download the oraclepki.jar, osdt_cert.jar, and osdt_core.jar files along with the JDBC Thin driver, and add these JARs to the classpath.
-
Enable the Oracle PKI provider.
Add OraclePKIProvider at the end of the file java.security (located at $JRE_HOME/jre/lib/security/java.security) if an SSO wallet, that is, cwallet.sso, is used for providing SSL security. For example, java.security:
security.provider.1=sun.security.provider.Sun
security.provider.2=sun.security.rsa.SunRsaSign
security.provider.3=com.sun.net.ssl.internal.ssl.Provider
security.provider.4=com.sun.crypto.provider.SunJCE
security.provider.5=sun.security.jgss.SunProvider
security.provider.6=com.sun.security.sasl.Provider
security.provider.7=oracle.security.pki.OraclePKIProvider
To use ewallet.p12 for SSL security, place OraclePKIProvider before the Sun SSL provider (com.sun.net.ssl.internal.ssl.Provider) in the file java.security. For example, java.security:
security.provider.1=sun.security.provider.Sun
security.provider.2=sun.security.rsa.SunRsaSign
security.provider.3=oracle.security.pki.OraclePKIProvider
security.provider.4=com.sun.net.ssl.internal.ssl.Provider
security.provider.5=com.sun.crypto.provider.SunJCE
security.provider.6=sun.security.jgss.SunProvider
security.provider.7=com.sun.security.sasl.Provider
-
Provide the following properties through the application.
security.protocol = "SSL"
oracle.net.tns_admin = "location of tnsnames.ora file" (for parsing the JDBC connection string)
tns.alias = "alias of connection string in tnsnames.ora"
Set the following properties in the ojdbc.properties file, which must be located in the directory specified by oracle.net.tns_admin:
user (in lowercase) = nameofdatabaseuser
password (in lowercase) = userpassword
oracle.net.ssl_server_dn_match=true
oracle.net.wallet_location="(SOURCE=(METHOD=FILE) (METHOD_DATA=(DIRECTORY=/location../wallet_dbname)))"
-
Using Java key store. To provide JDBC SSL security with a Java key store, provide the following properties through the application:
security.protocol = "SSL"
oracle.net.tns_admin = "location of tnsnames.ora file"
tns.alias = "alias of connection string in tnsnames.ora"
Set the following properties in the ojdbc.properties file, which must be located in the directory specified by oracle.net.tns_admin:
user (in lowercase) = nameofdatabaseuser
password (in lowercase) = userpassword
oracle.net.ssl_server_dn_match=true
javax.net.ssl.trustStore=${TNS_ADMIN}/truststore.jks
javax.net.ssl.trustStorePassword=password
javax.net.ssl.keyStore=${TNS_ADMIN}/keystore.jks
javax.net.ssl.keyStorePassword=password
Note:
The tnsnames.ora file in the wallet downloaded from ATP contains the JDBC connection string that is used for establishing the JDBC connection.
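The application-level properties for the two protocols described in this section can be assembled in plain Java before they are handed to a producer, consumer, or admin client. The following is a minimal sketch that uses only java.util.Properties. The class name OkafkaConnectionConfig and all sample values (service name, instance name, host, directory, alias) are hypothetical, and the property keys are the ones listed in this section.

```java
import java.util.Properties;

/** Sketch: builds the application-level connection properties for PLAINTEXT and SSL. */
class OkafkaConnectionConfig {

    /** Properties for the PLAINTEXT protocol; user and password stay in ojdbc.properties. */
    static Properties plaintext(String serviceName, String instanceName,
                                String hostAndPort, String tnsAdminDir) {
        Properties props = new Properties();
        props.setProperty("oracle.service.name", serviceName);
        props.setProperty("oracle.instance.name", instanceName);
        props.setProperty("bootstrap.servers", hostAndPort);
        props.setProperty("security.protocol", "PLAINTEXT");
        props.setProperty("oracle.net.tns_admin", tnsAdminDir);
        return props;
    }

    /** Properties for the SSL protocol; wallet or key store settings stay in ojdbc.properties. */
    static Properties ssl(String tnsAdminDir, String tnsAlias) {
        Properties props = new Properties();
        props.setProperty("security.protocol", "SSL");
        props.setProperty("oracle.net.tns_admin", tnsAdminDir);
        props.setProperty("tns.alias", tnsAlias);
        return props;
    }

    public static void main(String[] args) {
        // All values below are placeholders for illustration only.
        Properties plain = plaintext("myservice", "myinstance", "localhost:1521", "/opt/oracle/tns");
        Properties secure = ssl("/opt/oracle/wallet", "mydb_tp");
        System.out.println("PLAINTEXT protocol: " + plain.getProperty("security.protocol"));
        System.out.println("SSL alias: " + secure.getProperty("tns.alias"));
    }
}
```

In both cases the directory named by oracle.net.tns_admin is expected to contain the ojdbc.properties file with the user and password entries described above.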
Kafka Client Interfaces
Kafka applications mainly use Producer, Consumer, and Admin APIs to communicate with a Kafka cluster. This version of the Kafka client for TEQ supports only a subset of Kafka 2.0's Producer, Consumer, and Admin APIs and properties.
Overview of Kafka Producer Implementation for TEQ
Producer APIs allow a Kafka application to publish messages into Oracle Transactional Event Queues. The Kafka application needs to provide Oracle-specific properties, which are oracle.host, oracle.port, oracle.servicename, and oracle.instancename. More details about these properties are given in the configuration section. These properties are used to set up the database connection and produce the message into TEQ. In the current release, Oracle's implementation of KafkaProducer supports only a subset of the APIs.
Internally, the Oracle Kafka Producer object encapsulates an AQ JMS producer object, which is used to publish messages into Oracle TEQ. Similar to the Kafka Producer, the Oracle Kafka Producer also stores messages in batches. Each send() call appends a Kafka record to a particular batch based on its topic and partition. A background thread publishes each entire batch, one batch at a time, into Oracle TEQ. Each batch publish is committed by the producer. In the current release, a topic can have only one partition, and hence all Kafka records are published into a single partition of TEQ.
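The batching behavior described above can be pictured with a small in-memory model. The following sketch is not the OKafka implementation: the class BatchingSketch and its methods are hypothetical, it runs on a single thread instead of a background sender thread, and "publishing" is simulated by moving a batch into a list, one entry per batch, to stand in for one commit per batch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch: groups records into one batch per topic and partition, then publishes batch by batch. */
class BatchingSketch {

    // Open batches keyed by "topic-partition", kept in insertion order.
    private final Map<String, List<String>> open = new LinkedHashMap<>();
    // Each element models one batch that was published and committed as a unit.
    private final List<List<String>> committed = new ArrayList<>();

    /** Models send(): appends the record to the batch for its topic and partition. */
    void send(String topic, int partition, String value) {
        open.computeIfAbsent(topic + "-" + partition, key -> new ArrayList<>()).add(value);
    }

    /** Models the sender thread: publishes every open batch, one at a time. */
    int flush() {
        int published = 0;
        for (List<String> batch : open.values()) {
            committed.add(new ArrayList<>(batch)); // one publish and one commit per batch
            published++;
        }
        open.clear();
        return published;
    }

    List<List<String>> committedBatches() {
        return committed;
    }

    public static void main(String[] args) {
        BatchingSketch producer = new BatchingSketch();
        // With a single partition per topic, every record of a topic lands in the same batch.
        producer.send("orders", 0, "first");
        producer.send("orders", 0, "second");
        producer.send("orders", 0, "third");
        System.out.println("Batches published: " + producer.flush());
        System.out.println("Records in first batch: " + producer.committedBatches().get(0).size());
    }
}
```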
The following KafkaProducer APIs are supported in Oracle Database 20c.
-
Constructor:
KafkaProducer: Creates a producer object and internal AQ JMS objects. The KafkaProducer class has four types of constructors defined, all of which take configuration parameters as input.
-
Methods:
-
send(ProducerRecord), send(ProducerRecord, Callback):
The send method asynchronously publishes a message into TEQ. This method returns immediately once a Kafka record has been stored in the buffer of records waiting to be sent. If the buffer memory is full, then the send call blocks for a maximum of max.block.ms. This allows sending many records in parallel without blocking to wait for the response after each one. Records are published into the topic using AQ JMS.
The result of the send is a Future<RecordMetadata> specifying the partition the record was sent to, the offset it was assigned, and the timestamp of the record. Both versions, send(ProducerRecord) and send(ProducerRecord, Callback), are supported.
-
close: Closes the producer and its sender thread, and frees the accumulator. It also closes internal AQ JMS objects such as the connection, session, and JMS producer.
-
Classes
-
ProducerRecord: A class that represents a message in the Kafka platform. It is translated into a message for the TEQ platform, namely, an AQ JMS message. Relevant fields such as Payload and Key can be directly translated into the TEQ payload and message key.
-
RecordMetadata: Contains metadata of the record, such as its topic, partition, offset, and timestamp in the Kafka platform. These fields are assigned values relevant for TEQs. A message id of TEQ is converted into an offset of RecordMetadata.
-
Callback Interface: A callback function that is executed once a record is successfully published into a Kafka topic.
-
Partitioner Interface: Defines methods that map a Key of the message to a partition number of the topic. A partition number is analogous to a stream id of TEQs.
-
Properties
-
Key Serializer and Value Serializer: Convert the key and payload into byte arrays, respectively. The Accumulator module stores the payloads in the form of byte arrays. The sender thread then forms an AQjmsBytes message and publishes the message using the AQ JMS Array Enqueue API.
-
acks: For okafka, the only value relevant for the acks property is all. Any other value set by the user is ignored.
-
linger.ms: Time in milliseconds for which the sender thread waits before publishing the records in TEQ.
-
batch.size: Total size, in bytes, of batched records that the sender thread waits for before publishing records in TEQ.
-
buffer.memory: Total memory in bytes that the accumulator can hold.
-
max.block.ms: If the buffer.memory of the accumulator is full, the send() method waits for max.block.ms before it can receive an out of memory error.
-
retries: Enables the producer to resend the record in case of transient errors. This value is an upper limit on the number of resends.
-
retry.backoff.ms: The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.
-
bootstrap.servers: IP address and port of the machine where the database instance is running.
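The producer properties listed above can be collected in one place so that their units are visible next to the values. The following is a minimal sketch using only java.util.Properties. The class name ProducerConfigSketch is hypothetical. The values for linger.ms, batch.size, buffer.memory, and the serializer class names are taken from Example 6-1, while the values for max.block.ms, retries, and retry.backoff.ms are illustrative assumptions rather than recommended settings. Values are written as strings here, which is an assumption about what the library accepts; Example 6-1 passes integers.

```java
import java.util.Properties;

/** Sketch: collects the producer properties described in this section. */
class ProducerConfigSketch {

    static Properties producerProps(String hostAndPort) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", hostAndPort);
        props.setProperty("acks", "all");              // only relevant value for okafka
        props.setProperty("linger.ms", "100");         // milliseconds the sender thread waits
        props.setProperty("batch.size", "200");        // bytes per batch
        props.setProperty("buffer.memory", "335544");  // bytes the accumulator can hold
        props.setProperty("max.block.ms", "5000");     // assumed value: max wait when the buffer is full
        props.setProperty("retries", "3");             // assumed value: upper limit on resends
        props.setProperty("retry.backoff.ms", "500");  // assumed value: wait between retries
        props.setProperty("key.serializer",
                "org.oracle.okafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.oracle.okafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:1521" matches the host and port used in Example 6-1.
        Properties props = producerProps("localhost:1521");
        System.out.println("acks = " + props.getProperty("acks"));
        System.out.println("linger.ms = " + props.getProperty("linger.ms"));
    }
}
```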
Overview of Kafka Consumer Implementation for TEQ
The Consumer API allows applications to read streams of data from a Transactional Event Queue. The Kafka consumer for TEQ uses AQ JMS APIs and the JDBC driver to consume messages from Oracle TEQ. For Oracle Kafka, consuming messages from a topic implies dequeuing messages from a Transactional Event Queue.
Similar to Kafka, in TEQ's implementation, a consumer group contains many consumer instances. Each consumer group has a unique group-id. Each consumer internally maintains a single connection/session to the Oracle Database instance provided by the bootstrap.servers property. For this release, since a topic can have only one partition, only one of the consumer instances is assigned this single partition. A partition, once assigned to a consumer of a consumer group, remains with that consumer until the session is closed. No two consumers from the same group are assigned the same partition of a topic.
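The assignment rule described above (one partition per consumer, no partition shared within a group) can be sketched as a simple model. This is an illustration only: the class AssignmentSketch is hypothetical, and handing partitions out in the order the consumers are listed is an assumption, since in practice the database decides which consumer receives the partition.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch: assigns at most one partition to each consumer of a group. */
class AssignmentSketch {

    /** Returns consumer -> partition; consumers beyond the partition count get no entry. */
    static Map<String, Integer> assign(List<String> consumers, int partitionCount) {
        Map<String, Integer> assignment = new LinkedHashMap<>();
        int nextPartition = 0;
        for (String consumer : consumers) {
            if (nextPartition >= partitionCount) {
                break; // no partitions left; remaining consumers stay unassigned
            }
            assignment.put(consumer, nextPartition++);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // A topic has a single partition in this release, so only one consumer is assigned.
        Map<String, Integer> assignment = assign(Arrays.asList("c1", "c2", "c3"), 1);
        System.out.println(assignment); // prints {c1=0}
    }
}
```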
The following KafkaConsumer APIs are supported in Oracle Database 20c.
-
Constructor:
KafkaConsumer: Creates a consumer that allows the application to consume messages from a key-based TEQ. Internal client-side TEQ objects that are created are not visible to the client application. All variations of the KafkaConsumer constructor are supported in Oracle Database 20c.
-
Methods:
-
Subscribe: This method takes a list of topics to subscribe to. In Oracle Database 20c, only the first topic of the list is subscribed to. An exception is thrown if the size of the list is greater than 1. This method creates a durable subscriber on the TEQ server side with the Group-Id as the subscriber name.
-
Poll: The poll method returns a batch of messages from the assigned partition of the TEQ. It attempts to dequeue messages from the key-based TEQ for the subscriber. TEQ uses the array dequeue API of AQ JMS to receive a batch of messages dequeued from the queue. The size of the batch depends on the max.poll.records parameter set by the Kafka client application. Poll takes a time in milliseconds as an argument. The AQ JMS array dequeue API can pass this timeout as a dequeue option to the TEQ server and make the dequeue call wait for messages until the timeout expires if the full array batch is not complete.
When poll is invoked for the first time, Oracle TEQ assigns a single available partition to this Kafka consumer. This assignment stays for the entire lifetime of the Kafka consumer. Messages returned belong to the partition assigned to the consumer. One queue partition is assigned to each Kafka consumer. It is the responsibility of the application developer to start as many consumers as the number of partitions of the queue. If the number of Kafka consumers is less than the number of partitions, then messages from unassigned partitions are never consumed. If the number of Kafka consumers is more than the number of partitions, then the extra consumers are not assigned any partition and hence cannot consume any messages. No two consumer applications consume from the same partition at the same time.
-
commitSync: Commits all consumed messages. Commit to an offset is not supported in Oracle Database 20c. This call directly calls commit on the database, which commits all consumed messages from TEQ.
-
commitAsync: This call is translated into commitSync. A callback function passed as an argument is executed once the commit is successful.
-
Unsubscribe: Unsubscribes from the topic that the consumer has subscribed to. A consumer can no longer consume messages from unsubscribed topics. This call does not remove a subscriber group from the TEQ metadata. Other consumer applications can continue to consume.
-
close: Closes the consumer and unsubscribes from the topic it has subscribed to.
-
Class:
ConsumerRecord: A class that represents a consumed record in the Kafka platform. In Oracle Database 20c, an AQ JMS message is converted into a ConsumerRecord.
-
Properties:
-
key.deserializer and value.deserializer: In Oracle TEQ's key-based partitioned queue, the key and value are stored as byte arrays in the user property and the payload of the JMS message, respectively. On consumption, the consumer internally deserializes these byte arrays into a key and value in the user-provided format, using key.deserializer and value.deserializer, respectively.
-
group.id: The consumer group name for which messages are consumed from the Kafka topic. This property is used as the durable subscriber name for key-based TEQs.
-
max.poll.records: Maximum number of records to fetch in a single array dequeue call from an Oracle TEQ server.
-
fetch.max.wait.ms: Maximum amount of time in milliseconds to wait for fetching messages if none are available.
-
enable.auto.commit: Enables auto commit of consumed messages at every specified interval.
-
auto.commit.interval.ms: Interval in milliseconds for auto commit of messages.
-
bootstrap.servers: IP address and port of the machine where the database instance is running.
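The interaction between the poll timeout and max.poll.records described above can be modeled with a standard blocking queue. The following sketch is not the AQ JMS array dequeue API: the class PollSketch is hypothetical, and a java.util.concurrent.BlockingQueue stands in for the assigned TEQ partition. It returns as soon as the batch is full, and otherwise waits until the timeout expires.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Sketch: returns up to maxPollRecords messages, waiting at most timeoutMs for the batch to fill. */
class PollSketch {

    static List<String> poll(BlockingQueue<String> queue, int maxPollRecords, long timeoutMs) {
        List<String> batch = new ArrayList<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        try {
            while (batch.size() < maxPollRecords) {
                // Wait only for the time that is left; a zero wait still returns queued messages.
                long remaining = Math.max(0L, deadline - System.nanoTime());
                String message = queue.poll(remaining, TimeUnit.NANOSECONDS);
                if (message == null) {
                    break; // timeout expired before the batch was complete
                }
                batch.add(message);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt and return what we have
        }
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<String> partition = new LinkedBlockingQueue<>();
        for (int i = 0; i < 4; i++) {
            partition.add("message-" + i);
        }
        // First call fills the batch immediately; second call returns the remainder after the timeout.
        System.out.println("First poll: " + poll(partition, 3, 50).size() + " records");
        System.out.println("Second poll: " + poll(partition, 3, 50).size() + " records");
    }
}
```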
Overview of Kafka Admin Implementation for TEQ
The Kafka Admin API allows applications to perform administrative tasks such as creating a topic, deleting a topic, adding a partition to a topic, and so on. Oracle Database 21c supports the following admin APIs:
-
Methods
-
create(props) and create(config): Creates an object of the KafkaAdmin class that uses the passed parameters. The user creates a database session, which is used for further operations. The client application has to provide oracle.host, oracle.port, oracle.servicename, oracle.instancename, oracle.user, and oracle.password. These Oracle Database properties are used to set up the database connection.
-
close(): Closes a database session and the Admin client.
-
deleteTopic: Stops and drops a TEQ.
-
Classes:
NewTopic: Class used for creating a new topic. This class contains parameters with which a transactional event queue is created.
-
Properties
-
bootstrap.servers: IP address and port of the machine where the database instance is running.
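Because the admin client needs several connection properties before it can open a database session, it can help to check for missing ones up front. The following is a minimal sketch of such a check and is not part of the OKafka library: the class AdminConfigCheck is hypothetical, and the property names are copied as written in the create description above. Other parts of this chapter spell some of them differently (for example, oracle.service.name), so the exact keys should be verified against the library in use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Sketch: reports which connection properties named in the create description are missing. */
class AdminConfigCheck {

    // Names as written in the create(props) description; verify them against your library version.
    static final String[] REQUIRED = {
        "oracle.host", "oracle.port", "oracle.servicename",
        "oracle.instancename", "oracle.user", "oracle.password"
    };

    static List<String> missing(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder values for illustration only.
        props.setProperty("oracle.host", "localhost");
        props.setProperty("oracle.port", "1521");
        System.out.println("Missing properties: " + missing(props));
    }
}
```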
Examples: How to Use
Example 6-1 Producer.java
import java.util.Properties;

import org.oracle.okafka.clients.producer.KafkaProducer;
import org.oracle.okafka.clients.producer.ProducerRecord;

public class Producer {

    public static void main(String[] args) {
        if (args.length != 1) {
            System.out.println("Please provide topic name to produce messages.");
            return;
        }
        String topic = args[0].trim();

        // Connection properties used by the OKafka library to locate the database.
        Properties props = new Properties();
        props.put("oracle.instance.name", "kafka");
        props.put("oracle.service.name", "kafka.regress.rdbms.dev.us.oracle.com");
        props.put("oracle.user.name", "aq");
        props.put("oracle.password", "aq");
        props.put("bootstrap.servers", "localhost:1521");

        // Batching and buffering settings for the producer.
        props.put("batch.size", 200);
        props.put("linger.ms", 100);
        props.put("buffer.memory", 335544);

        // Serializers that convert the key and value into byte arrays.
        props.put("key.serializer", "org.oracle.okafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.oracle.okafka.common.serialization.StringSerializer");

        System.out.println("Creating producer now");
        KafkaProducer<String, String> prod = new KafkaProducer<String, String>(props);
        System.out.println("Producer created.");

        try {
            int i;
            // Publish ten records to partition 0 of the topic.
            for (i = 0; i < 10; i++) {
                prod.send(new ProducerRecord<String, String>(topic, 0, i + "000", "This is new message" + i));
            }
            System.out.println("Sent " + i + " messages");
        } catch (Exception ex) {
            System.out.println("Failed to send messages:");
            ex.printStackTrace();
        } finally {
            // Close the producer and its internal AQ JMS objects.
            prod.close();
        }
    }
}