Using the Event Hub Java SDK

The Event Hub Java SDK allows you to configure the publish/subscribe client to connect your application to Event Hub.

Including the Event Hub SDK in Your Project

Procedure

  1. Update your Maven settings to use the Predix platform Artifactory by copying the following into the <servers> block of your settings.xml file, located in your ~/.m2 folder:
    <server>
        <id>artifactory.external</id>
        <username><predix-user-name></username>
        <password><predix-password></password>
    </server>

    where <predix-user-name> and <predix-password> are your Predix.io user name and password.

  2. Add the repository to the POM file located in your project's root directory:
    
    <repositories>
    ...
      <repository>
        <id>artifactory.snapshots</id>
        <name>artifactory.snapshots</name>
        <url>https://artifactory.predix.io/artifactory/PREDIX-EXT</url>
      </repository>
    ...
    </repositories>
  3. If you use SSL over HTTP/2, use BoringSSL. This avoids a common issue that occurs when Jetty ALPN/NPN is not properly configured.
    Add the following dependency as the first entry in the dependencies in the pom.xml of the project that uses the Java SDK:
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-tcnative-boringssl-static</artifactId>
        <version>1.1.33.Fork23</version>
    </dependency>
    <repositories>
      <repository>
        <id>netty-repo-2</id>
        <url>http://repo1.maven.org/maven2/</url>
      </repository>
    </repositories>
  4. Add the Event Hub SDK as a dependency in your POM file:
    <dependencies>
    ...
      <dependency>
        <groupId>com.ge.predix.eventhub</groupId>
        <artifactId>predix-event-hub-sdk</artifactId>
        <version>1.2.11</version>
      </dependency>
    ...
    </dependencies>

Event Hub Configuration

To create a client that connects to Event Hub, you need a configuration object (EventHubConfiguration). The configuration object contains the Event Hub authentication and option details. Event Hub details can be set through environment variables or manually, using the Builder.

The following tables describe the configuration requirements:

Table 1. Required Configuration Fields
Required Field | Type | Description | Default Value
host | String | Event Hub host |
port | int | Event Hub port | 443
zoneID | String | Predix zone ID for Event Hub and the OAuth2 client |
clientID* | String | OAuth2 client ID |
clientSecret* | String | OAuth2 client secret |
authURL* | String | URL of the OAuth2 provider, for example: https://<oauth provider>/oauth/token |

Note: * The clientID, clientSecret, and authURL are not required if the automaticTokenRenew option is set to false.

Table 2. Optional Configuration Fields
Optional Field | Type | Description | Default
automaticTokenRenew | boolean | Token is automatically retrieved and cached for connections | True
publishConfiguration | PublishConfiguration | Defines the publish type and other publish parameters | PublishAsyncConfiguration with default configuration
subscribeConfiguration | SubscribeConfiguration | Defines subscription parameters | SubscribeConfiguration with subscriber name "default-subscriber-name", subscriber instance "default-subscriber-id", and a retry interval of 30 seconds
authScopes | String | Used to request specific scopes | null
reconnectRetryLimit | int | Specifies the number of retries to attempt | Depends on the connection error

Configuring Manually with the Builder

You can manually set each Event Hub and authentication parameter if the Event Hub and UAA instances are not bound to the application. The methods used to set the parameters can also be used in conjunction with fromEnvironmentVariables().

EventHubConfiguration configuration = new EventHubConfiguration.Builder()
          .host("<Event Hub Service Host>")
          .port(443) // int; replace with your Event Hub service port (443 is the default)
          .zoneID("<Event Hub Zone ID>")
          .clientID("<Client_ID>")
          .clientSecret("<Client_Secret>")
          .authURL("<UAA URI>")
          .build();

Configuring Event Hub–Using Environment Variables or Setting Manually

Event Hub configuration details can be set as environment variables or hard coded.

Configuring Environment Variables and VCAP_SERVICES

If the application is running on Predix, the most flexible way to connect to Event Hub is to specify the following details in environment variables. The fromEnvironmentVariables() method then sets the required configuration fields (host, port, and zone ID) automatically. Note that VCAP_SERVICES is already populated by Cloud Foundry as long as the Event Hub and UAA instances are bound to your application.

Environment Variable Name | Description
EVENTHUB_INSTANCE_NAME | Event Hub service instance name
UAA_INSTANCE_NAME | UAA service instance name
CLIENT_ID | OAuth2 client ID
CLIENT_SECRET | OAuth2 client secret
VCAP_SERVICES | Defined by Cloud Foundry if the services are bound

When the above environment variables are set, fromEnvironmentVariables() automatically sets all the Event Hub and OAuth2 information. This allows the Event Hub service instance to change without requiring a change in the code:
EventHubConfiguration eventHubConfiguration = new EventHubConfiguration.Builder()
    .fromEnvironmentVariables().build();
If EVENTHUB_INSTANCE_NAME and UAA_INSTANCE_NAME are not set as environment variables, they can be defined in the code using the Builder. Note that CLIENT_ID and CLIENT_SECRET are still required:
EventHubConfiguration eventHubConfiguration = new EventHubConfiguration.Builder()
    .fromEnvironmentVariables("<event-hub-instance-service-name>", "<uaa-instance-service-name>")
    .build();

The CLIENT_ID and CLIENT_SECRET can also be set in the Builder, in tandem with fromEnvironmentVariables(…):

// If the clientID and clientSecret are not in environment variables
EventHubConfiguration eventHubConfiguration = new EventHubConfiguration.Builder()
    .clientID("id")
    .clientSecret("secret")
    .fromEnvironmentVariables("event-hub-instance-service-name", "uaa-instance-service-name")
    .build();
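
Before calling fromEnvironmentVariables() with no arguments, it can help to fail fast when one of the variables listed in this section is missing. The following is a minimal sketch in plain Java that does not use the SDK. The class name EnvCheck and the method missingVariables are hypothetical helpers introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the Event Hub SDK.
final class EnvCheck {
    // Variable names taken from the environment variable table in this section.
    static final String[] REQUIRED = {
        "EVENTHUB_INSTANCE_NAME", "UAA_INSTANCE_NAME",
        "CLIENT_ID", "CLIENT_SECRET", "VCAP_SERVICES"
    };

    // Returns the names of required variables that are absent or blank in env.
    static List<String> missingVariables(Map<String, String> env) {
        List<String> missing = new ArrayList<>();
        for (String name : REQUIRED) {
            String value = env.get(name);
            if (value == null || value.trim().isEmpty()) {
                missing.add(name);
            }
        }
        return missing;
    }
}
```

Calling EnvCheck.missingVariables(System.getenv()) returns an empty list when every variable is present. If the client ID and secret are set in the Builder instead, those two names can be ignored in the result.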

Using Event Hub Without the Predix UAA Instance

The following example does not require a UAA instance to be bound. It pulls only the Event Hub information from VCAP_SERVICES, leaving all OAuth2 details as null. This is useful if the OAuth2 or authentication provider is not in Predix. However, automaticTokenRenew must be set to false, and setAuthToken() must be called to set a token to connect to Event Hub.

EventHubConfiguration eventHubConfiguration = new EventHubConfiguration.Builder()
    .fromEnvironmentVariables("event-hub-instance-service-name")
    .automaticTokenRenew(false)
    .build();

Connecting to the Event Hub

After the configuration object is created, an Event Hub client can be created. This client handles both publishing and subscribing.
Client eventHubClient = new Client(configuration);

Publish Configuration

There are publish and subscribe details you can configure after setting the Event Hub and authentication details.

The EventHubConfiguration includes a default publish configuration. You can use the following configuration options to change from the default parameter settings.

There are two types of acknowledgement:
  • acks – positive acknowledgements for successful message ingestion
  • nacks – negative acknowledgements in which the STATUS field describes the error
There are two publishing modes:
  • async – The default mode. Acks are received asynchronously through a callback, at some point after the message is sent.
  • sync – Blocks and waits for acks and nacks to be received within a given timeout before continuing.
Note: The total number of acks and nacks should equal the number of messages sent. However, if the messages never reach Event Hub, no acknowledgements are returned. The acks and nacks are not guaranteed to arrive in order, so use the id field to match acks to messages.
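
Because acks can arrive out of order, one way to match them to messages is to track the ids of outstanding messages. The following is a minimal sketch that works on plain String ids rather than SDK types. It assumes that the id of the acknowledged message can be read from each ack. The AckTracker class is a hypothetical helper, not part of the SDK.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of the Event Hub SDK.
final class AckTracker {
    // Ids of messages sent but not yet acknowledged; safe to use from a callback thread.
    private final Set<String> pending = ConcurrentHashMap.newKeySet();

    // Call when a message with this id is sent.
    void sent(String id) {
        pending.add(id);
    }

    // Call for each ack or nack received; returns false if the id was not pending.
    boolean acknowledged(String id) {
        return pending.remove(id);
    }

    // Number of messages still waiting for an ack or nack.
    int outstanding() {
        return pending.size();
    }
}
```

A non-zero outstanding() count after a reasonable wait suggests that some messages never reached Event Hub, since no acknowledgement is returned in that case.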
There are some shared configurations that you can use in both the sync and async publish modes:
Shared Option | Type | Description | Default
maxAddedMessages | int | Defines the maximum number of messages that can be added before sending, to prevent too many messages from accumulating. | 999

Example

EventHubConfiguration configuration = new EventHubConfiguration.Builder()
          ...
          .publishConfiguration(new PublishAsyncConfiguration.Builder()
            .maxAddedMessages(500)
            .build())
          ...
          .build();

Async Configuration

The async publishing mode uses the PublishAsyncConfiguration configuration. The async parameters determine the type and rate of acknowledgements sent from Event Hub.

Async Configuration Option | Type | Description | Default
ackType | AcknowledgementOptions | Defines the type of acknowledgements that are returned. Options: ACKS_AND_NACKS (receives both acks and nacks), NACKS_ONLY (receives only nacks), NONE (no acknowledgements are returned; errors continue to be returned). | ACKS_AND_NACKS
cacheAckIntervalMillis | long | Defines how long Event Hub caches the acks before sending them. Values range from 100 ms to 1000 ms. | 500

Example

EventHubConfiguration configuration = new EventHubConfiguration.Builder()
          ...
          .publishConfiguration(new PublishAsyncConfiguration.Builder()
            .ackType(PublishAsyncConfiguration.AcknowledgementOptions.NONE)
            .cacheAckIntervalMillis(200)
            .build())
          ...
          .build();

Sync Configuration

The sync publishing mode uses the PublishSyncConfiguration configuration. When publishing, the publisher blocks and waits for the acks and nacks, so the total throughput may be lower than in async mode.

Sync Configuration Option | Type | Description | Default
timeout | long | Sets the maximum time, in milliseconds, to wait for acks and nacks. | 1000

Example

EventHubConfiguration configuration = new EventHubConfiguration.Builder()
          ...
          .publishConfiguration(new PublishSyncConfiguration.Builder()
              .timeout(2000)
              .build())
          ...
          .build();

Shared Configuration

The field maxAddedMessages can be used in both sync and async publish modes:

Shared Publish Config Option | Type | Description | Default
maxAddedMessages | int | Defines the maximum number of messages that can be added before sending, to prevent too many messages from accumulating. | 999

EventHubConfiguration configuration = new EventHubConfiguration.Builder()
          ...
          .publishConfiguration(new PublishAsyncConfiguration.Builder()
            .maxAddedMessages(500)
            .build())
          ...
          .build();

EventHubConfiguration configuration = new EventHubConfiguration.Builder()
          ...
          .publishConfiguration(new PublishSyncConfiguration.Builder()
            .maxAddedMessages(500)
            .build())
          ...
          .build();

Sending Messages

Messages are added to the client before sending. A message contains the following three fields:

Message Field | Type | Description
id | String | A user-configurable field to identify the returned acks.
body* | String | A string to store and send information.
tag* | Map<String,String> | A map to store and send information.

Note: Tag is an optional field.
A message can be built as follows:
Message message = Message.newBuilder().setId("id").setZoneId("zoneId")
        .setBody(ByteString.copyFromUtf8("body"))
        .build();
There are three methods you can use to add a message:
eventHubClient.addMessage(String id, String body, Map<String, String> tags);
eventHubClient.addMessage(Message newMessage);
eventHubClient.addMessages(Messages newMessages);
These methods can also be chained together:
eventHubClient.addMessages(newMessages).addMessage("1","body",null);
Once the messages have been added, two methods can be used to send them:
  • flush() sends all added messages
  • send(Message message) sends only the passed-in message and ignores added messages

Receiving Acknowledgements

In sync mode, the send methods return a List<Ack> that contains the acks for the messages sent. In async mode, they return nothing because the acks are received in a callback.

eventHubClient.flush(); // if async mode
List<Ack> flushAcks = eventHubClient.flush(); // if sync mode
eventHubClient.send(message); // if async mode
List<Ack> sendAcks = eventHubClient.send(message); // if sync mode

When using async mode, a callback is used to receive acks, nacks, and errors. The callback can be any class that implements the following interface:
public interface PublishCallback {
    void onAck(List<Ack> acks);
    void onFailure(Throwable throwable);
  }

Acks and nacks are returned through the callback's onAck() method. Connection closures and errors are returned through the callback's onFailure() method.

Register your callback as follows:
eventHubClient.registerPublishCallback(Client.PublishCallback callback);

To reopen the stream after a failure, call either forceRenewToken() or setAuthToken(), which uses the new token and reconnects the stream.

Subscribe Configuration

Only one subscribe configuration is allowed per client. To create additional subscriptions, or to use a different subscriber name or ID, create additional clients. Event Hub sends all the messages to each unique subscriberName. If there are multiple subscriberInstance values per subscriberName, the instances of that subscriberName collectively receive all the messages.

Subscribe Configuration Option | Type | Description | Default
subscriberName | String | Sets the subscriber name. | default-subscriber-name
subscriberInstance | String | Sets the instance of the subscriber. | default-subscriber-id
retryInterval | int | Sets the time between retry attempts when resending messages. | 30 seconds
durationBeforeFirstRetry | String | Sets the time to wait before attempting to resend a message. |
Note: retryInterval and durationBeforeFirstRetry are used only when subscribing with acks.

Example

EventHubConfiguration configuration = new EventHubConfiguration.Builder()
          ...
          .subscribeConfiguration(new SubscribeConfiguration.Builder()
            .subscriberName("name")
            .retryIntervalSeconds(30)
            .subscribeRecency(EventHubConfiguration.SubscribeRecency.NEWEST)
            .build())
          ...
          .build();

Client eventHubClient = new Client(configuration);

Subscribing to Events

Single Subscription

After a Client is created, subscribe or subscribeWithAck can be called with a callback that receives the messages. The callback must implement the following interface:

 public interface SubscribeCallback {
    void onMessage(Message message); //Called when new message received
    void onFailure(Throwable throwable); //Called with stream or connection issue
  }

  eventHubClient.subscribe(Client.SubscribeCallback callback);

A received message triggers the onMessage() method. Similarly to the publish callback, onFailure() is called if there is a stream or connection issue.

The subscribe and subscribeWithAck methods are used to obtain a stream of messages from Event Hub.
  • subscribe creates a stream of messages directed to the provided callback.
    eventHubClient.subscribe(Client.SubscribeCallback callback);
  • subscribeWithAck also creates a stream of messages for the provided callback. The difference is that the received messages must be explicitly acknowledged. If the messages are not acknowledged (acked), the service continually redelivers them.
    eventHubClient.subscribeWithAck(Client.SubscribeCallback callback);

Messages can be acknowledged as follows:

eventHubClient.sendAck(Message message);
eventHubClient.sendAcks(Messages messages);

Call unsubscribe() to close the subscription stream:

eventHubClient.unsubscribe();

After unsubscribing, either subscribe or subscribeWithAck can be used to recreate the subscription. Note that creating both subscribe and subscribeWithAck subscriptions is not supported.

Multiple Subscriptions

Only one subscription per subscriberName is allowed. Event Hub sends each unique subscriberName, or subscription, all stored messages. If there are multiple clients per subscriberName (five maximum), those clients collectively receive all of the messages. That is, the stored messages are divided and dispersed among the clients for that subscriberName. Note that this can be used to increase the speed at which messages are retrieved from Event Hub for a subscription.

Closing Connections

As a best practice, close all connections to Event Hub when the application has finished sending data:
eventHubClient.shutdown();

Automatic Reconnection

By default, the SDK attempts to reconnect to Event Hub upon disconnection. It uses a back-off with increasing delays, which recycles based on the error. The following are some of the errors and how they are handled.

Error Code | Possible Reason | Reconnect Behavior
UNAVAILABLE | Event Hub not available | SDK tries to reconnect until a connection is established.
UNAUTHENTICATED | Could not retrieve OAuth token | SDK tries to retrieve the token repeatedly and, if successful, reconnects to Event Hub.
UNKNOWN | Permission denied for OAuth user | SDK makes three attempts to reconnect before failing. After that, there are no more attempts to reconnect and a new Client must be created.
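
The exact delays that the SDK uses are not stated here. Purely as an illustration of a back-off with increasing delays, the delay before each attempt could be computed as in the following sketch. The doubling factor, the base delay, and the cap are all assumptions, and the Backoff class is hypothetical rather than part of the SDK.

```java
// Illustrative only: this is not the SDK's actual reconnect logic.
final class Backoff {
    private final long baseMillis;
    private final long maxMillis;

    Backoff(long baseMillis, long maxMillis) {
        this.baseMillis = baseMillis;
        this.maxMillis = maxMillis;
    }

    // Delay before the given attempt (0-based): base * 2^attempt, capped at maxMillis.
    long delayMillis(int attempt) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        // Limit the shift so large attempt numbers do not overflow for typical base delays.
        int shift = Math.min(attempt, 30);
        long delay = baseMillis * (1L << shift);
        return Math.min(delay, maxMillis);
    }
}
```

With a base of 500 ms and a cap of 8000 ms, the delays grow as 500, 1000, 2000, 4000, and then stay at 8000 ms.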

OAuth2 Token Management

Renewing the Token

If the OAuth2 parameters are set, the default configuration renews and caches the token as needed. However, there are methods for finer control of the token. automaticTokenRenew is an optional configuration that defaults to True. Setting it to False prevents the automatic retrieval of tokens and the use of the SDK to get a token.

EventHubConfiguration configuration = new EventHubConfiguration.Builder()
    ...
    .automaticTokenRenew(false)
    .build();

Use the following method to update the tokens manually.

eventHubClient.setAuthToken(String token);

Both forceRenewToken() and setAuthToken() restart the connection for the client to ensure the new tokens are active.

Requesting Specific Scopes

By default, the SDK tries to request all Event Hub scopes associated with publish and subscribe. If that fails, it falls back to requesting only the publish scopes or only the subscribe scopes. However, depending on the number of scopes the auth user has, the token can become long. The token passed to Event Hub must not exceed 60,000 characters, or the client may not be able to authenticate with Event Hub. Specific scopes can also be passed into an EventHubConfiguration via authScopes(String scopes), where the string contains all the scopes, comma separated. The following are the Event Hub scopes:

Scope | Use
predix-event-hub.zones.<zoneID>.user | Required for any token
predix-event-hub.zones.<zoneID>.grpc.publish | Required to publish
predix-event-hub.zones.<zoneID>.grpc.subscribe | Required to subscribe
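
The comma-separated scope string can be assembled from the zone ID. The following is a minimal sketch in plain Java, using the scope names listed above. The EventHubScopes class and its forZone method are hypothetical helpers, not part of the SDK.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the Event Hub SDK.
final class EventHubScopes {
    // Builds the comma-separated scope string for a zone from the documented scope names.
    static String forZone(String zoneId, boolean publish, boolean subscribe) {
        String prefix = "predix-event-hub.zones." + zoneId;
        List<String> scopes = new ArrayList<>();
        scopes.add(prefix + ".user"); // required for any token
        if (publish) {
            scopes.add(prefix + ".grpc.publish");
        }
        if (subscribe) {
            scopes.add(prefix + ".grpc.subscribe");
        }
        return String.join(",", scopes);
    }
}
```

The resulting string could then be passed to authScopes(String scopes) on the configuration Builder. Requesting only the scopes a client needs also helps keep the token short.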

Debugging the Event Hub SDK

Debugging can be enabled by setting the following environment variable.
Note: Logging includes sensitive information such as the OAuth2 client secret and tokens. Use caution.
Environment Variable Name | Description
EVENTHUB_ENABLE_DEBUG | If "true", debug statements are logged.

Building the SDK from the Repository

Automatically create Protobuf classes from proto file

After pulling the project, run mvn clean compile, which automatically generates the Protobuf classes.

Manually create Protobuf classes from proto file

This SDK uses Protobuf v3, which must also be used when compiling the proto file manually. Link to the Protobuf compiler protoc (if generating manually):

https://github.com/google/protobuf#protocol-compiler-installation

Common Issues

Jetty ALPN/NPN has not been properly configured

In order to use SSL over HTTP2, BoringSSL needs to be used. The following dependency needs to be listed first in the pom.xml of the project using the Java SDK.

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-tcnative-boringssl-static</artifactId>
    <version>1.1.33.Fork23</version>
</dependency>
<repositories>
  <repository>
    <id>netty-repo-2</id>
    <url>http://repo1.maven.org/maven2/</url>
  </repository>
</repositories>

Debugging Event Hub

Procedure

Set the following environment variable to enable debugging:
Environment Variable Name | Description
EVENTHUB_ENABLE_DEBUG | If set to true, debug statements will be logged.
Note: Logging will include sensitive information like the OAuth2 client secret and tokens.

Building the Event Hub SDK

Build the Event Hub SDK from the repository.

Procedure

To compile the SDK, the Protobuf classes must be generated in one of the two following ways:
  • Automatically create Protobuf classes from the proto file.

    After pulling the project, run mvn clean compile, which automatically generates the Protobuf classes.

  • Manually create Protobuf classes from the proto file.

    The SDK uses Protobuf v3, so be sure to use version 3 when compiling the proto file if you build the Protobuf classes manually. See https://github.com/google/protobuf#protocol-compiler-installation for information about the Protobuf compiler protoc.

Best Practices

Consider the following best practices when you work with the Event Hub service.

  • Read acks after you publish messages, unless acks are not explicitly enabled.
  • Limit the number of subscriptions to not more than seven per tenant or zone ID.
  • Subscriptions stay in effect until you delete them. Do not create a subscription client in a loop to read data. Instead, consider using one subscription to read data.
  • You can create a publisher to both publish and stream messages. There is no need to open and close such a publisher for every message.
  • Check credentials if authentication fails. Refrain from dialing the connection in a loop.
  • Do not publish messages larger than 1 MB. Keep the total size of publish requests, including arrays of messages, to not more than 1 MB. Tags data is included in the 1 MB size limit.
  • For web sockets, send data as binary JSON.
  • Use tag fields to store metadata, which allows for easy access to the specific types of data instead of data ingestion to multiple zone IDs.
  • If a subscriber does not receive messages, check for another running instance of the subscriber with an identical name. Messages are distributed for subscriber connections with the same name for the same topic.

Maximum Message Size for Messages Published to the Event Hub

The Event Hub service supports message batching. When you call addMessage multiple times, the messages are added to a publish request. When the application calls flush(), the request is sent to the Event Hub service. The Event Hub service serializes the messages and then adds them to stable storage.

The size of the serialized messages in a publish request cannot exceed 900*1024 (921,600) bytes. Call msg.getSerializedSize() before adding messages to the publish request to ensure that this limit is not exceeded. This ensures that oversized messages are rejected on the client side instead of being sent to the server and rejected there. Any message that exceeds 921,600 bytes immediately throws an AddMessageException with an error message indicating that the message size has been exceeded.

After you serialize the messages, call client.publishClient.messages to get the previously added messages and flush them. This step ensures that the data added so far is flushed.

Note: If your app sends only one message in each publish request, the maximum size of a single serialized message cannot exceed 921,600 bytes.
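
One way to stay under the limit is to plan batches from the serialized sizes before adding messages. The following is a minimal sketch in plain Java that takes the sizes (for example, the values returned by getSerializedSize()) and groups message indexes into publish requests. It assumes that the request size is approximately the sum of the message sizes, which ignores any framing overhead. The BatchPlanner class is a hypothetical helper, not part of the SDK.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the Event Hub SDK.
final class BatchPlanner {
    // Limit from the text: 900 * 1024 = 921,600 bytes per publish request.
    static final int MAX_REQUEST_BYTES = 900 * 1024;

    // Groups serialized message sizes into batches whose totals stay within the limit.
    // Each inner list holds the indexes of the messages that go into one publish request.
    static List<List<Integer>> plan(int[] serializedSizes) {
        List<List<Integer>> batches = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        int currentBytes = 0;
        for (int i = 0; i < serializedSizes.length; i++) {
            int size = serializedSizes[i];
            if (size > MAX_REQUEST_BYTES) {
                throw new IllegalArgumentException("message " + i + " exceeds the request limit");
            }
            if (currentBytes + size > MAX_REQUEST_BYTES && !current.isEmpty()) {
                batches.add(current); // flush point: start a new request
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(i);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

Each inner list corresponds to one group of addMessage calls followed by a flush().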

From a network I/O perspective, sending multiple small-size messages in a batch is optimal and is the recommended practice.