Using the Event Hub Java SDK
The Event Hub Java SDK allows you to configure the publish/subscribe client to connect your application to Event Hub.
Including the Event Hub SDK in Your Project
Procedure
Event Hub Configuration
To create a client that connects to Event Hub, you need a configuration object (EventHubConfiguration). The configuration object contains the Event Hub authentication and option details, which can be set from environment variables or manually using the Builder.
The following tables describe the configuration requirements:
Required Fields | Type | Description | Default Value |
---|---|---|---|
host | String | Event Hub host | |
port | int | Event Hub port | 443 |
zoneID | String | Predix zone ID for Event Hub and the OAuth2 client | |
clientID* | String | OAuth2 client ID | |
clientSecret* | String | OAuth2 client secret | |
authURL* | String | URL of the OAuth2 provider, for example: https://<oauth provider>/oauth/token | |
* clientID, clientSecret, and authURL are not required if the automaticTokenRenew option is set to false.
Optional Fields | Type | Description | Default |
---|---|---|---|
automaticTokenRenew | boolean | Token is automatically retrieved and cached for connections | True |
publishConfiguration | PublishConfiguration | Defines the publish type and other publish parameters | PublishAsyncConfiguration with default configuration |
subscribeConfiguration | SubscribeConfiguration | Defines subscription parameters | SubscribeConfiguration with default configuration |
authScopes | String | Used to request specific OAuth2 scopes, as a comma-separated list | null |
reconnetRetryLimit | int | Specifies the number of reconnection retries to attempt | Depends on the connection error |
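The footnote rule for the required fields can be checked before the configuration is built. The following is a minimal sketch of such a pre-flight check; RequiredFieldCheck and missingFields are hypothetical names, not part of the Event Hub SDK, and the check covers only the rules stated in the tables above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical pre-flight check that mirrors the configuration tables.
// It does not call the Event Hub SDK; it only reports which required values are missing.
final class RequiredFieldCheck {

    static List<String> missingFields(String host, String zoneID, String clientID,
                                      String clientSecret, String authURL,
                                      boolean automaticTokenRenew) {
        List<String> missing = new ArrayList<>();
        if (isBlank(host)) missing.add("host");
        if (isBlank(zoneID)) missing.add("zoneID");
        // The OAuth2 fields are only needed when the SDK retrieves tokens itself.
        if (automaticTokenRenew) {
            if (isBlank(clientID)) missing.add("clientID");
            if (isBlank(clientSecret)) missing.add("clientSecret");
            if (isBlank(authURL)) missing.add("authURL");
        }
        return missing;
    }

    private static boolean isBlank(String value) {
        return value == null || value.isBlank();
    }

    public static void main(String[] args) {
        // With automaticTokenRenew=false the OAuth2 fields may be omitted.
        System.out.println(missingFields("eh.example.com", "zone-1", null, null, null, false));
        System.out.println(missingFields("eh.example.com", "zone-1", null, null, null, true));
    }
}
```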
Configuring Manually with the Builder
You can manually set each Event Hub and authentication parameter if the Event Hub and UAA instances are not bound to the application. These setter methods can also be combined with fromEnvironmentVariables().
EventHubConfiguration configuration = new EventHubConfiguration.Builder()
.host("<Event Hub Service Host>")
.port(443) // Event Hub service port as an int; 443 is the default
.zoneID("<Event Hub Zone ID>")
.clientID("<Client_ID>")
.clientSecret("<Client_Secret>")
.authURL("<UAA URI>")
.build();
Configuring Event Hub Using Environment Variables or Setting Manually
Event Hub configuration details can be set as environment variables or hard-coded.
Configuring Environment Variables and VCAP_SERVICES
fromEnvironmentVariables() reads the environment variables below and takes the Event Hub details (host, port, zone ID) from VCAP_SERVICES. Note that Cloud Foundry populates VCAP_SERVICES as long as the Event Hub and UAA instances are bound to your application.
Environment Variable Name | Description |
---|---|
EVENTHUB_INSTANCE_NAME | Event Hub service instance name |
UAA_INSTANCE_NAME | UAA service instance name |
CLIENT_ID | OAuth2 client ID |
CLIENT_SECRET | OAuth2 client secret |
VCAP_SERVICES | Defined by Cloud Foundry if the services are bound |
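Because fromEnvironmentVariables() depends on these variables, it can help to report any that are unset before building the configuration. The sketch below is a hypothetical helper (EnvCheck is not part of the SDK) that checks the variables listed in the table; if the instance names are passed to fromEnvironmentVariables(…) in code, EVENTHUB_INSTANCE_NAME and UAA_INSTANCE_NAME can be dropped from the list.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper: reports which of the documented environment variables are unset
// before EventHubConfiguration.Builder().fromEnvironmentVariables() is called.
final class EnvCheck {

    static final List<String> EXPECTED = List.of(
            "EVENTHUB_INSTANCE_NAME", "UAA_INSTANCE_NAME",
            "CLIENT_ID", "CLIENT_SECRET", "VCAP_SERVICES");

    // Takes the environment as a Map so it can be tested without real variables.
    static List<String> missing(Map<String, String> env) {
        return EXPECTED.stream()
                .filter(name -> {
                    String value = env.get(name);
                    return value == null || value.isEmpty();
                })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> unset = missing(System.getenv());
        if (!unset.isEmpty()) {
            System.err.println("Unset environment variables: " + unset);
        }
    }
}
```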
fromEnvironmentVariables() automatically sets all the Event Hub and OAuth2 information. This allows the Event Hub service instance to change without requiring a change in the code.
EventHubConfiguration eventHubConfiguration = new EventHubConfiguration.Builder()
.fromEnvironmentVariables().build();
If EVENTHUB_INSTANCE_NAME and UAA_INSTANCE_NAME are not set through environment variables, the service instance names can be passed in code using the Builder. Note that CLIENT_ID and CLIENT_SECRET are still required.
EventHubConfiguration eventHubConfiguration = new EventHubConfiguration.Builder()
.fromEnvironmentVariables("<event-hub-instance-service-name>", "<uaa-instance-service-name>")
.build();
The client ID and secret can also be set in code, in tandem with fromEnvironmentVariables(…):
// If the clientID and clientSecret are not in environment variables
EventHubConfiguration eventHubConfiguration = new EventHubConfiguration.Builder()
.clientID("id")
.clientSecret("secret")
.fromEnvironmentVariables("event-hub-instance-service-name", "uaa-instance-service-name")
.build();
Using Event Hub Without the Predix UAA Instance
The following example does not require a UAA instance to be bound. It pulls only the Event Hub information from VCAP_SERVICES, leaving all OAuth2 details as null. This is useful if the OAuth2 or authentication provider is not in Predix. However, setAuthToken() must be called to set a token to connect to Event Hub, and automaticTokenRenew must be set to false.
EventHubConfiguration eventHubConfiguration = new EventHubConfiguration.Builder()
.fromEnvironmentVariables("event-hub-instance-service-name")
.automaticTokenRenew(false)
.build();
Connecting to the Event Hub
Client eventHubClient = new Client(configuration); // configuration is the EventHubConfiguration built above
Publish Configuration
After setting the Event Hub and authentication details, you can configure publish and subscribe details.
The EventHubConfiguration includes a default publish configuration. You can use the following configuration options to change the default settings.
- acks – positive acknowledgements for successful message ingestion
- nacks – negative acknowledgements, in which the STATUS field describes the error
- async – The default mode. Acks are received asynchronously through a callback, so the publisher does not wait between sending a message and receiving its ack.
- sync – Blocks and waits for acks and nacks to be received within a given timeout before continuing.
The total number of acks and nacks should equal the number of messages sent. However, if messages never reach the Event Hub, no acknowledgements are returned for them. Acks and nacks are not guaranteed to arrive in order, so use the message id to match each ack to its message.
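Because acks can arrive out of order and lost messages produce no ack at all, one approach is to track sent message ids and match each ack against them. The sketch below assumes each message has a unique id; AckEvent is a hypothetical stand-in for the SDK's Ack type, whose actual accessors are not described here. Ids still pending after a reasonable wait indicate messages that may never have reached Event Hub.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical tracker that matches acks/nacks to sent messages by id.
// AckEvent stands in for the SDK's Ack type; its fields are assumptions.
final class AckTracker {

    record AckEvent(String id, boolean success) {}

    private final Set<String> pending = ConcurrentHashMap.newKeySet();
    private final Set<String> acked = ConcurrentHashMap.newKeySet();
    private final Set<String> nacked = ConcurrentHashMap.newKeySet();

    // Call when a message with this id is added or sent.
    void sent(String id) {
        pending.add(id);
    }

    // Call from the publish callback (async) or with the returned list (sync).
    // Arrival order does not matter because each ack is matched by its id.
    void received(List<AckEvent> acks) {
        for (AckEvent ack : acks) {
            if (pending.remove(ack.id())) {
                (ack.success() ? acked : nacked).add(ack.id());
            }
        }
    }

    Set<String> pending() { return Set.copyOf(pending); }
    Set<String> acked() { return Set.copyOf(acked); }
    Set<String> nacked() { return Set.copyOf(nacked); }

    public static void main(String[] args) {
        AckTracker tracker = new AckTracker();
        tracker.sent("1");
        tracker.sent("2");
        tracker.sent("3");
        // Acks arrive out of order; "2" has not been acknowledged yet.
        tracker.received(List.of(new AckEvent("3", true), new AckEvent("1", false)));
        System.out.println("pending=" + tracker.pending() + " nacked=" + tracker.nacked());
    }
}
```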
Async Configuration
The async publishing mode uses the PublishAsyncConfiguration configuration. The async parameters determine the type and rate of acknowledgements sent from Event Hub.
Async Configuration Options | Type | Description | Default |
---|---|---|---|
ackType | AcknowledgementOptions | Defines the type of acknowledgements that are returned. Options include ACKS AND NACKS and NONE. | ACKS AND NACKS |
cacheAckIntervalMillis | long | Defines how long, in milliseconds, Event Hub caches the acks before sending them. Values range from 100 to 1000. | 500 |
Example
EventHubConfiguration configuration = new EventHubConfiguration.Builder()
...
.publishConfiguration(new PublishAsyncConfiguration.Builder()
.ackType(PublishAsyncConfiguration.AcknowledgementOptions.NONE)
.cacheAckIntervalMillis(200)
.build())
...
.build();
Sync Configuration
The sync publishing mode uses the PublishSyncConfiguration configuration. When publishing, the publisher blocks and waits for the acks and nacks, so the total throughput may be lower than in async mode.
Sync Configuration Options | Type | Description | Default |
---|---|---|---|
timeout | long | Sets the maximum time, in milliseconds, to wait for acks and nacks. | 1000 |
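The blocking behavior of sync mode (wait for the acks and nacks, but no longer than timeout milliseconds) can be pictured with a CountDownLatch. This is a generic java.util.concurrent sketch under that reading, not the SDK's implementation; AckWaiter is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of "block until all acks/nacks arrive or the timeout expires".
final class AckWaiter {
    private final CountDownLatch remaining;

    AckWaiter(int expectedAcks) {
        this.remaining = new CountDownLatch(expectedAcks);
    }

    // Called by whichever thread receives an ack or nack.
    void onAckOrNack() {
        remaining.countDown();
    }

    // Returns true if every expected ack/nack arrived within timeoutMillis.
    boolean await(long timeoutMillis) throws InterruptedException {
        return remaining.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        AckWaiter waiter = new AckWaiter(2);
        // Simulate two acknowledgements arriving on another thread.
        new Thread(() -> {
            waiter.onAckOrNack();
            waiter.onAckOrNack();
        }).start();
        // 1000 ms matches the documented default timeout.
        System.out.println("all acks received: " + waiter.await(1000));
    }
}
```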
Example
EventHubConfiguration configuration = new EventHubConfiguration.Builder()
...
.publishConfiguration(new PublishSyncConfiguration.Builder()
.timeout(2000)
.build())
...
.build();
Shared
The field maxAddedMessages can be used in both sync and async publish modes:
Shared Publish Config Options | Type | Description | Default |
---|---|---|---|
maxAddedMessages | int | Defines the maximum number of messages that can be added before sending to prevent too many messages from accumulating. | 999 |
EventHubConfiguration configuration = new EventHubConfiguration.Builder()
...
.publishConfiguration(new PublishAsyncConfiguration.Builder()
.maxAddedMessages(500)
.build())
...
.build();
EventHubConfiguration configuration = new EventHubConfiguration.Builder()
...
.publishConfiguration(new PublishSyncConfiguration.Builder()
.maxAddedMessages(500)
.build())
...
.build();
Sending Messages
Messages are added to the client before they are sent. A message contains the following three fields:
Message Field | Type | Description |
---|---|---|
id | String | A user-configurable field used to identify the returned acks. |
body* | String | A string to store and send information. |
tag* | Map<String,String> | A map to store and send information. |
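import com.google.protobuf.ByteString;
// Build a Message with the generated protobuf builder
Message message = Message.newBuilder().setId("id").setZoneId("zoneId")
.setBody(ByteString.copyFromUtf8("body")).build();
// Add messages to the client; they are sent by flush()
eventHubClient.addMessage("id", "body", tags); // tags is a Map<String, String>
eventHubClient.addMessage(message);
eventHubClient.addMessages(messages); // messages is a Messages batch
eventHubClient.addMessages(messages).addMessage("1", "body", null); // add calls can be chained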
- flush() sends all added messages
- send(Message message) sends only the passed-in message and ignores added messages
Receiving Acknowledgements
In sync mode, the send methods return a List<Ack> that contains the acks for the messages sent. Async mode does not return anything because the acks are received in a callback.
eventHubClient.flush(); // if async mode
List<Ack> flushedAcks = eventHubClient.flush(); // if sync mode
eventHubClient.send(message); // if async mode
List<Ack> sentAcks = eventHubClient.send(message); // if sync mode
public interface PublishCallback {
void onAck(List<Ack> acks);
void onFailure(Throwable throwable);
}
Acks and nacks are returned through the callback's onAck() method. Connection closure or errors are returned through the callback's onFailure() method. Register the callback with:
eventHubClient.registerPublishCallback(callback); // callback implements Client.PublishCallback
To reopen the stream after it is closed, call either forceRenewToken() or setAuthToken(), which use the new token and reconnect the stream.
Subscribe Configuration
Only one configuration is allowed per client. To create additional subscriptions, or to use a different subscriber name or ID, create additional clients. Event Hub sends all messages to each unique subscriberName. If there are multiple subscriberInstance values for one subscriberName, all instances of that subscriberName collectively receive all the messages.
Subscribe Configuration Options | Type | Description | Default |
---|---|---|---|
subscriberName | String | Sets the subscriber name. | default-subscriber-name |
subscriberInstance | String | Sets the instance of the subscriber. | default-subscriber-id |
retryInterval | int | Sets the time between retry attempts when resending messages. | 30 seconds |
durationBeforeFirstRetry | String | Sets the time to wait before attempting to resend a message. | |
Example
EventHubConfiguration configuration = new EventHubConfiguration.Builder()
...
.subscribeConfiguration(new SubscribeConfiguration.Builder()
.subscriberName("name")
.retryIntervalSeconds(30)
.subscribeRecency(EventHubConfiguration.SubscribeRecency.NEWEST)
.build())
...
.build();
Client eventHubClient = new Client(configuration);
Subscribing to Events
Single Subscription
After a Client is created, call subscribe() or subscribeWithAck(), providing a callback to receive the messages. The callback must implement the following interface:
public interface SubscribeCallback {
void onMessage(Message message); //Called when new message received
void onFailure(Throwable throwable); //Called with stream or connection issue
}
A received message triggers the onMessage() method. As with the publish callback, onFailure() is called if there is a stream or connection issue.
- subscribe creates a stream of messages directed to the provided callback.
eventHubClient.subscribe(callback);
- subscribeWithAck also creates a stream of messages for the provided callback. The difference is that the received messages must be explicitly acknowledged. If the messages are not acknowledged (acked), the service continually redelivers them.
eventHubClient.subscribeWithAck(callback);
Messages can be acknowledged as follows:
eventHubClient.sendAck(message);
eventHubClient.sendAcks(messages); // messages is a Messages batch
Call unsubscribe() to close the subscription stream: eventHubClient.unsubscribe();
Either subscribe or subscribeWithAck can be used to recreate the subscription. Note that creating both a subscribe and a subscribeWithAck subscription on the same client is not supported.
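Because unacknowledged messages are redelivered, a subscriber using subscribeWithAck may receive the same message more than once, for example if it processes a message but fails before calling sendAck(). A minimal sketch of a guard against duplicate processing, assuming publishers give every message a unique id; RedeliveryGuard is hypothetical and keeps a bounded, least-recently-seen cache of ids.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical guard against processing a redelivered message twice.
// Assumes every message carries a unique id.
final class RedeliveryGuard {
    private final Set<String> seen;

    RedeliveryGuard(int capacity) {
        // Access-ordered map that evicts the least recently seen id once full.
        Map<String, Boolean> lru = new LinkedHashMap<>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
        this.seen = Collections.synchronizedSet(Collections.newSetFromMap(lru));
    }

    // Returns true the first time an id is seen, false for a repeat delivery.
    boolean firstDelivery(String id) {
        return seen.add(id);
    }

    public static void main(String[] args) {
        RedeliveryGuard guard = new RedeliveryGuard(10_000);
        for (String id : new String[] {"m1", "m2", "m1"}) {
            if (guard.firstDelivery(id)) {
                System.out.println("process " + id);
            } else {
                System.out.println("skip redelivered " + id);
            }
            // In a subscribeWithAck callback, ack in both cases so the message is not redelivered again.
        }
    }
}
```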
Multiple Subscriptions
Only one subscription per subscriberName is allowed. Event Hub sends all stored messages to each unique subscriberName, or subscription. If there are multiple clients per subscriberName (five maximum), those clients collectively receive all of the messages. That is, the stored messages are divided and dispersed among the clients for that subscriberName. This can be used to increase the speed at which messages are retrieved from Event Hub for a subscription.
Closing Connections
eventHubClient.shutdown();
Automatic Reconnection
By default, the SDK attempts to reconnect to the Event Hub after a disconnection. It uses a backoff with increasing delays, and the retry pattern depends on the error. The following are some of the errors and how they are handled.
Error Code | Possible Reason | Reconnect Behavior |
---|---|---|
| Event Hub not available | SDK tries to reconnect until connection is established. |
| Could not retrieve OAuth token | SDK tries to retrieve token repeatedly and if successful, reconnects to the Event Hub. |
| Permission denied for OAuth user | SDK makes three attempts to reconnect before failing. After that, no further reconnection attempts are made and a new connection must be created. |
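When the SDK stops retrying (for example, after the three attempts for a permission error), the application has to create a new connection itself. The sketch below shows a generic capped exponential backoff that such application code might use; it is not the SDK's internal reconnect algorithm, and Backoff, delayMillis, and retry are hypothetical names.

```java
import java.util.concurrent.Callable;

// Hypothetical capped exponential backoff for application-level reconnects.
// Not the SDK's internal algorithm.
final class Backoff {

    // Delay before retrying after attempt n (0-based): base * 2^n, capped at maxMillis.
    static long delayMillis(int attempt, long baseMillis, long maxMillis) {
        long delay = baseMillis << Math.min(attempt, 30); // bounded shift avoids overflow
        return Math.min(delay, maxMillis);
    }

    // Runs task until it succeeds or maxAttempts is reached, sleeping between attempts.
    static <T> T retry(Callable<T> task, int maxAttempts, long baseMillis, long maxMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                if (attempt + 1 < maxAttempts) {
                    Thread.sleep(delayMillis(attempt, baseMillis, maxMillis));
                }
            }
        }
        throw last; // every attempt failed; surface the most recent error
    }

    public static void main(String[] args) throws Exception {
        // The lambda stands in for creating a new Client and subscribing again.
        String result = retry(() -> "connected", 3, 500, 30_000);
        System.out.println(result);
    }
}
```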
OAuth2 Token Management
Renewing the Token
If the OAuth2 parameters are set, the default configuration renews and caches the token as needed. However, there are methods for finer control of the token. automaticTokenRenew is an optional configuration with a default of True. Setting it to False prevents the SDK from retrieving tokens automatically, so the application must supply them.
EventHubConfiguration configuration = new EventHubConfiguration.Builder()
...
.automaticTokenRenew(false) // default is true
.build();
Use the following method to update the token manually:
eventHubClient.setAuthToken(token); // token is the OAuth2 access token String
Both forceRenewToken() and setAuthToken() restart the client's connection to ensure the new token is active.
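When automaticTokenRenew is false, the application obtains the token itself and passes it to setAuthToken(). A minimal sketch, assuming the authURL endpoint (for example, https://<oauth provider>/oauth/token) accepts a standard OAuth2 client credentials grant; TokenRequest is hypothetical, it uses only the JDK's java.net.http client (Java 11 or later), and parsing the access_token field from the JSON response is left to the application.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper that builds an OAuth2 client_credentials token request (RFC 6749, section 4.4).
final class TokenRequest {

    static HttpRequest build(String authURL, String clientID, String clientSecret) {
        // RFC 6749 (section 2.3.1): form-encode id and secret, then Base64-encode "id:secret".
        String credentials = URLEncoder.encode(clientID, StandardCharsets.UTF_8) + ":"
                + URLEncoder.encode(clientSecret, StandardCharsets.UTF_8);
        String basic = Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
        return HttpRequest.newBuilder(URI.create(authURL))
                .header("Authorization", "Basic " + basic)
                .header("Content-Type", "application/x-www-form-urlencoded")
                .POST(HttpRequest.BodyPublishers.ofString("grant_type=client_credentials"))
                .build();
    }

    // Usage: java TokenRequest <authURL> <clientID> <clientSecret>
    public static void main(String[] args) throws Exception {
        HttpRequest request = build(args[0], args[1], args[2]);
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // The JSON body is expected to carry access_token; do not log it in production.
        System.out.println("status " + response.statusCode());
    }
}
```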
Requesting Specific Scopes
By default, the SDK requests all Event Hub scopes associated with publishing and subscribing. If that fails, it falls back to requesting only the publish or only the subscribe scopes. Depending on the number of scopes the OAuth2 user has, the token can become long: the token passed to Event Hub must not exceed 60,000 characters, or the client may not be able to authenticate with Event Hub. Specific scopes can also be passed into an EventHubConfiguration via authScopes(String scopes), where scopes is a comma-separated list of all the scopes. The following are the Event Hub scopes:
Scopes | Use |
---|---|
predix-event-hub.zones.<zoneID>.user | Required for any token |
predix-event-hub.zones.<zoneID>.grpc.publish | Required to publish |
predix-event-hub.zones.<zoneID>.grpc.subscribe | Required to subscribe |
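The authScopes value is a comma-separated list built from these scopes. The sketch below assumes the scope format predix-event-hub.zones.<zoneID>.<suffix>, where the suffix is user, grpc.publish, or grpc.subscribe; EventHubScopes is a hypothetical helper whose result could be passed to authScopes(String scopes), for example to request only the publish scopes for a publish-only client.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that builds the comma-separated authScopes value for one zone.
final class EventHubScopes {

    static String forZone(String zoneID, boolean publish, boolean subscribe) {
        String prefix = "predix-event-hub.zones." + zoneID;
        List<String> scopes = new ArrayList<>();
        scopes.add(prefix + ".user"); // required for any token
        if (publish) scopes.add(prefix + ".grpc.publish");
        if (subscribe) scopes.add(prefix + ".grpc.subscribe");
        return String.join(",", scopes);
    }

    // Tokens longer than 60,000 characters may fail to authenticate with Event Hub.
    static boolean tokenLengthOk(String token) {
        return token.length() <= 60_000;
    }

    public static void main(String[] args) {
        System.out.println(forZone("my-zone-id", true, false));
    }
}
```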
Debugging the Event Hub SDK
Environment Variable Name | Description |
---|---|
EVENTHUB_ENABLE_DEBUG | If "true", debug statements are logged. |
Building the SDK from the Repository
After pulling the project, run mvn clean compile, which automatically generates the protobuf classes.
This SDK uses Protobuf v3, so use version 3 of the Protobuf compiler, protoc, if you compile the proto file manually. For protoc installation instructions, see:
https://github.com/google/protobuf#protocol-compiler-installation
Common Issues
Jetty ALPN/NPN has not been properly configured
To use SSL over HTTP/2, BoringSSL must be used. List the following dependency first in the pom.xml of the project that uses the Java SDK:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
<version>1.1.33.Fork23</version>
</dependency>
<repositories>
<repository>
<id>netty-repo-2</id>
<url>https://repo1.maven.org/maven2/</url>
</repository>
</repositories>
Best Practices
Consider the following best practices when you work with the Event Hub service.
- Read acks after you publish messages, unless acks are disabled.
- Limit the number of subscriptions to no more than seven per tenant or zone ID.
- Subscriptions stay in effect until you delete them. Do not create a subscription client in a loop to read data. Instead, consider using one subscription to read data.
- A single publisher can both publish and stream messages. There is no need to open and close a publisher for every message.
- If authentication fails, check the credentials. Do not redial the connection in a loop.
- Do not publish messages larger than 1 MB. Keep the total size of a publish request, including arrays of messages, to no more than 1 MB. Tag data counts toward the 1 MB limit.
- For WebSockets, send data as binary JSON.
- Use tag fields to store metadata. Tags provide easy access to specific types of data without ingesting data into multiple zone IDs.
- If a subscriber does not receive messages, check for another running instance of the subscriber with an identical name. Messages are distributed among subscriber connections with the same name for the same topic.
Maximum Message Size for Messages Published to the Event Hub
The Event Hub service supports message batching. When you call addMessage multiple times, the messages are added to a publish request. When the application calls flush(), the request is sent to the Event Hub service, which serializes the messages and then adds them to stable storage.
The size of the serialized messages in a publish request cannot exceed 900*1024 (921,600) bytes. Call msg.getSerializedSize() before adding a message to the publish request to ensure that this limit is not exceeded, so that oversized messages are rejected on the client side instead of being sent to the server and rejected there. Adding a message that exceeds 921,600 bytes immediately throws an AddMessageException with an error message indicating that the size limit has been exceeded.
After you serialize the messages, call client.publishClient.messages to get the previously added messages and flush them. This ensures that all data added so far is flushed.
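The batching rule above can be sketched as a helper that groups messages by their serialized sizes, starting a new publish request before the 921,600-byte limit would be crossed. The sizes stand in for values from getSerializedSize(); SizeBatcher is hypothetical. If the messages travel in a repeated protobuf field, each one adds a few bytes of framing, so the summed sizes are an approximation and a safety margin may be prudent.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical batching helper. Each int is a message's serialized size in bytes.
final class SizeBatcher {
    static final int MAX_REQUEST_BYTES = 900 * 1024; // 921,600 bytes

    // Groups message indexes into batches whose summed size stays within the limit.
    // A single message over the limit is rejected, mirroring AddMessageException.
    static List<List<Integer>> batches(int[] serializedSizes) {
        List<List<Integer>> result = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        long currentBytes = 0;
        for (int i = 0; i < serializedSizes.length; i++) {
            int size = serializedSizes[i];
            if (size > MAX_REQUEST_BYTES) {
                throw new IllegalArgumentException("message " + i + " is " + size + " bytes");
            }
            if (currentBytes + size > MAX_REQUEST_BYTES) {
                result.add(current); // the application would call flush() here
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(i);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            result.add(current); // final partial batch
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(batches(new int[] {500_000, 400_000, 100_000, 921_600}));
    }
}
```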
From a network I/O perspective, sending multiple small-size messages in a batch is optimal and is the recommended practice.