Using the Event Hub Service

Publish - Binary JSON Formats

The Event Hub supports multiple binary JSON formats for publishing streaming data.

Event Hub supports binary JSON over WebSocket and binary protobuf over gRPC streams for publishing,

Event Hub retains published messages in durable store for 24 hours.

Sending a Publish Request

Before You Begin

If you are using gRPC to publish, you must set up the gRPC client before sending a publish request.

About This Task

For all publish requests, use the token you received from UAA in the Authorization: section of the HTTP Header in the form of Bearer <token from trusted issuer>.

Once a publisher or subscriber client is successfully authenticated and authorized, messages can be published to Event Hub.

Procedure

Compose the message to be sent in the publish request.
As shown in the example .proto file in ehs-getting-started.html#task_311fa789-ab21-4fb3-a2ca-3fa92eef586a, data is published to Event Hub using PublishRequest types. When creating messages to send to Event Hub, you need to follow the standards outlined in the Message type, as shown in the table below:
Table 1. Message Form
FieldTypeDescription
idstringID of the message being sent. This field will match the "id" field in the acknowledgements sent back to the client, which allows the client to determine which messages failed to publish.
bodybytesContains the data to send.
zone_idstringPredix Zone ID used to authenticate and authorize a client.
tagskey/valueKey/value mapping of additional metadata you can add to help identify and group messages.
Note: Messages must be under 1 MB in size.
See gRPC client to see how to send the required metadata from the client to Event Hub.

The following shows an example of sending a single-message publish request, in Go, using the gRPC protocol:

// Create gRPC context
ctx := metadata.NewContext(context.Background(), headers)
cancelCtx, _ := context.WithCancel(ctx)
md, _ := metadata.FromContext(cancelCtx)
client := pb.NewPublisherClient(conn)

stream, err := client.Send(ctx)

// Compose messages
var messages []*pb.Message
var tags map[string]string
tags = make(map[string]string)
tags["my_tag"] = "my_tag"
messages = append(messages, &pb.Message{
	Id: "001",
	Body: []byte(`{
			"attribute1": "value1",
			"attribute2": "value2",
		}`),
	ZoneId: md["predix-zone-id"][0],
	Tags:   tags,
})

// Send messages to Event Hub
stream.Send(&pb.PublishRequest{
	Messages: &pb.Messages{
		Msg: messages,
	},
})
The preceding example displays a NewPublisherClient instance sending a PublishRequest using the Send method.
For gRPC, once your messages are formed, you must reference the source code in the generated .proto file in order to push a PublishRequest type to Event Hub.
Note: The following shows an example of the JSON payload publish request, in JSON array format, containing two messages, using the WebSocket protocol, before it is converted to binary format:
Sample code for websocket:
URL: wss://publish_url
Headers:
   Authorization: Bearer <token from trusted issuer>
   Predix-Zone-Id: <Predix-Zone-Id>
   Origin: http://<origin-hostname>/

Request Payload:
{
	"id": "my_message_id_001",
	"body": {
		"data_value_here": "value1",
		"more_text": "value2",
	},
	"zoneId": "my_zone_id",
	"tags": {
		"tag1": "tag_value_1"
	}
}, {
	"id": "my_message_id_002",
	"body": {
		"attribute1": "value1",
		"attribute2": "value2",
	},
	"zoneId": "my_zone_id",
	"tags": {
		"tag1": "tag_value_1"
	}
}
Note: When you dial a WebSocket connection, you need to pass the Authorization, Predix-Zone-Id, and Origin headers. The wss://publish_url is contained in the VCAP environment variables for your application. To see the VCAP environment variables, enter cf env <application_name> on the Cloud Foundry CLI.

Sending a Publish Request With C++

The following example shows how you can send publish requests using C++.

#include <grpc++/grpc++.h>
#include "EventHub.grpc.pb.h"
 
int main() {
  // Create a channel
  std::shared_ptr<grpc::Channel> channel = grpc::CreateChannel("<event hub uri>", grpc::SslCredentials(grpc::SslCredentialsOptions()));
 
  // Create a stub
  std::unique_ptr<predix::eventhub::Publisher::Stub> stub = predix::eventhub::Publisher::NewStub(channel);
 
  // Create a client context with necessary headers
  grpc::ClientContext context;
  context.AddMetadata("authorization", "<oauth token>");
  context.AddMetadata("predix-zone-id", "<predix zone id>");
  context.AddMetadata("content-type", "application/grpc");
  context.AddMetadata("topic", "<topic>");
 
  // Create a stream from the stub
  std::shared_ptr<grpc::ClientReaderWriter<predix::eventhub::PublishRequest, predix::eventhub::PublishResponse>> stream(stub->send(&context));
 
  // Create a new set of messages and add a message to the set
  predix::eventhub::Messages* messages = new predix::eventhub::Messages();
  predix::eventhub::Message* message = messages->add_msg();
 
  // Set message details
  message->set_body("<message payload>");
  message->set_zone_id("<predix zone id>");
  message->set_id("<id>");
  message->set_key("<key>");
  message->set_topic("<topic>");
  message->set_offset(1);
  predix::eventhub::Timestamp* timestamp = new predix::eventhub::Timestamp();
  timestamp->set_nanos(1);
  timestamp->set_seconds(1);
  message->set_allocated_timestamp(timestamp);
 
  // Create a request and add the set of messages (containing one message) to it
  predix::eventhub::PublishRequest request;
  request.set_allocated_messages(messages);
 
  // Send the publish request (Write() blocks until success or failure)
  bool write_successful = stream->Write(request);
  if (write_successful) {
    std::cout << "Write() was succesful" << std::endl;
 } else {
    std::cout << "Write() failed" << std::endl;
  }
 
  // Create response
  predix::eventhub::PublishResponse response;
 
  // Obtain the response (Read() blocks until success or failure)
  bool read_successful = stream->Read(&response);
  if (read_successful) {
    std::cout << "Read() was succesful" << std::endl;
 
    // Obtain the acknowledgement
    google::protobuf::RepeatedPtrField<predix::eventhub::Ack> acks = response.ack();
    predix::eventhub::Ack ack = acks.Get(0);
 
    // Check the acknowledgement
    predix::eventhub::AckStatus status = ack.status_code();
    if (status == 0) {
      std::cout << "Message published successfully" << std::endl;;
    } else {
      std::cout << "Message published failed with status code " << status << std::endl;;
    }
  } else {
    std::cout << "Read() failed" << std::endl;
  }
 
  // Close the stream
  stream->WritesDone();
  grpc::Status status = stream->Finish();
 
  if (status.ok()) {
    std::cout << "The call was successful." << std::endl;
  } else {
    std::cout << "There was an error during the call." << std::endl;
    std::cout << "Error code " << status.error_code() << std::endl;
    std::cout << status.error_details() << std::endl;
    std::cout << status.error_message() << std::endl;
  }
 
  return 0;
}

Acknowledgement Messages

The Event Hub service sends an acknowledgment message back to the client for each message packet when messages are published.

An acknowledgement message is sent back to the client for each message packet in the form of the PublishResponse type. The following shows an example of an acknowledgement message:
{
	"id": "my_message_id_001",
	"status_code": 202,
	"desc": "Message successfully sent"
}

Acknowledgement messages use the same format for both WebSocket and gRPC protocols.

Acknowledgement messages contain the following status codes:
CodeMessage
202Accepted successfully.
400Bad request.
503Failed to publish data.
Note: Messages must be under 1 MB in size.

Go Example for Receiving Acknowledgements

In Go, you can use the Recv method with gRPC on a publisherClient instance to receive the acknowledgments back.

Rate Limiting

Rate limiting is applied in cases where a publisher uses excessive capacity for active tenants at a specific point in time. This scenario can arise when the number of active tenants exceeds the provisioned capacity. Therefore, this new functionality is implemented to support the shared service model of Event Hub and prevent just a few tenants from consuming so much capacity that other tenants experience degraded performance.

When tenants are rate-limited, publishing continues, with each active tenant restricted to a maximum of 1 MB of published data every 30 seconds. This is the minimum publishing rate that you can expect per tenant, and throughput rates can increase during intervals where fewer tenants are active.

When a publisher is rate-limited, PublishResponse includes the following:

status: FAILED, description: Too Many Requests.

If this status message appears, throttle your publish rate. To identify which publisher application is rate-limited, search the logs and the PublishResponse code for the text of the status message shown above.

Subscribe

Event Hub supports a unified API using gRPC for subscribing to a topic. This helps build a high-velocity consumer application and ensures efficient network usage. Each topic can have single or multiple subscribers (unique named users). When multiple subscribers that share the same name are added to a topic, message processing is distributed among subscriber connections. There is a limit of seven concurrent subscriber connections per tenant.

Subscriptions are processed using the following workflows:

  • New subscribers – Start receiving data in Event Hub that was pushed in the last 24 hours.
  • New client to a subscriber – Event Hub partitions and distributes the data to all clients. Event Hub maintains five partitions, allowing subscribers to define up to seven clients.
    Note: Only five of the seven clients can be active and receiving messages at any given time. The additional two clients enable active/passive subscribers.
  • Existing subscriber re-opening a connection – Event Hub starts receiving messages from the point where the connection was previously closed.

Creating a Subscription Request

Before You Begin

Set up your client as outlined in ehs-getting-started.html#task_79b3d83c-cc6d-4e11-bbb0-ee5c6b82afa1.

About This Task

Once a subscriber client has been successfully authenticated and authorized, messages can be subscribed to in Event Hub.

Procedure

Event Hub provides a SubscriptionRequest for a client to subscribe to a topic.
As shown in the example .proto file in ehs-getting-started.html#task_311fa789-ab21-4fb3-a2ca-3fa92eef586a, a SubscriptionRequest contains the following:
FieldTypeDescription
zone_idstringPredix Zone ID used to authenticate and authorize a client.
subscriberstringName to identify the subscriber used for parallel processing.
instance_idstringID to identify a subscriber instance.

Creating a Subscription Request With C++

You can also subscribe to messages by using C++.

Procedure
  1. To create a subscription request, use the following code:
    SubscriptionRequest request;
    request.set_zone_id(predix_zone_id);
    request.set_subscriber("<subscriber-name2>");
    request.set_instance_id("<subscriber-instance-id2>");
    
  2. To create a reader and read in the published data, use the following code:
    Message message;
    std::unique_ptr<grpc::ClientReader<Message>> reader(stub->receive(&context, request));
    
    while (reader->Read(&message)) {
      std::cout << "Message received. '" + message.body() + "'";
    }
  3. To perform cleanup, use the following code:
    Status status = reader->Finish();
    if (status.ok()) {
      std::cout << "receive rpc succeeded." << std::endl;
    } else {
      std::cout << "receive rpc failed." << std::endl;
      std::cout << status.error_code() << std::endl;
      std::cout << status.error_details() << std::endl;
      std::cout << status.error_message() << std::endl;
    }

Receiving Messages

Every tenant can have multiple connections running under the same subscriber name at the same time.

About This Task

Each tenant can contain up to seven connections, up to five running at the same time with the same subscriber name. Each subscriber can contain different clients, or multiple subscribers can contain multiple clients. Review the source code generated by the .proto file in order to subscribe to the Event Hub using the Subscription Request types, as the process varies according to the language you are using.

For example, in Go, on a SubscribeClient instance, you can use the Recv method to receive messages from a topic as a subscriber by providing a SubscriptionRequest that contains the topic (<Predix-Zone-Id>), subscriber name, and client instance. Once a subscriber client is initialized, you can use the Recv method to listen for messages.

Subscriptions with Acknowledgements

Some types of subscriptions require acknowledgments, and other do not.

About This Task

You can create two types of Event Hub subscriptions. One type does not require the client to acknowledge received messages. The other type of subscription requires the client to send an acknowledgement message back to the Event Hub service for each message packet in the form of a subscription response type. If these acknowledgment messages are not received, the Event Hub service resends the message.

The following example shows an acknowledgement message:

{ "id": "my_message_id_001", "status_code": 0, "desc": "Message received successfully" } 
CodeMessage
0Accepted
3Failed
Note: All messages must be under 1 MB in size. Publish requests that include an array of messages should be no larger than 1 MB. Tags data is included in the 1 MB size limit.

Unsubscribing from Topics

Procedure

To unsubscribe from one or more topics, the client can cancel the gRPC context.

For example, in Go, a subscriber can unsubscribe from a topic by calling the cancel Context function used to create the subscriber client.

Multiple Topics

You can now create and use multiple topics within a single Event Hub service instance.

Multiple topic support has been implemented in the Event Hub service. You can create multiple topics per instance. Each topic functions independently during publishing and subscribing. There is no longer any need for you to to manage multiple Event Hub service instances to work with multiple topics. When you use multiple topics, an Event Hub service instance functions as a topic broker. You can create a limited number of topics per instance, and additional costs may also be incurred when you use this functionality.

Using Multiple Topics

Multiple topics is an additional feature of Event Hub, and does not affect the previous usage of the service.

To start using multiple topics, you must first create an Event Hub service instance. For details, see Creating an Event Hub Service Instance.

After you create your Event Hub instance, you add the standard scopes for WSS and gRPC publishing and gRPC subscribing. For more details, see ehs-getting-started.html#task_9e08677f-d894-49bf-b34d-e376b44ca015. The default topic is automatically created as part of a new instance, and is used by Event Hub for publish and subscribe operations. You can then create more topics to use with your instance.

To create additional topics, users must have appropriate UAA user permissions to manage topics, including create, delete and get rights. To assign these rights, add the following scope to UAA users:

predix-event-hub.zones.<zone-id>.admin

You use the admin API to create topics. The URL appears in the admin block of the VCAP_SERVICES environment variables after you bind an application to the Event Hub service, for example:

{
    "VCAP_SERVICES": {
        "predix-event-hub": [{
            "credentials": {
 
                "admin": {
                    "protocol_details": [{
                        "protocol": "https",
                        "uri": ":predix-event-hub-admin.run.example.predix.io",
                        "zone-token-scope": [
                            "predix-event-hub.zones.88ec4bd5-e149-4c98-8e8e-952e86ba5fae.user",
                            "predix-event-hub.zones.88ec4bd5-e149-4c98-8e8e-952e86ba5fae.admin"
                        ]
                    }],
                    "zone-http-header-name": "Predix-Zone-Id",
                    "zone-http-header-value": "88ec4bd5-e149-4c98-8e8e-952e86ba5fae"
                },
 
                "publish": {...},
                "subscribe": {...},
                "label": "predix-event-hub",
                "name": "event-hub-admin-demo",
                "plan": "Tiered",
                "provider": null,
                "syslog_drain_url": null,
                "tags": [
                    "eventhub",
                    "event-hub",
                    "event hub"
                ],
                "volume_mounts": []
            }
        }]
    }
}

In this example, the admin API URL is predix-event-hub-admin.run.example.predix.io.

After you add the admin scope and generate a net token for the user, use the following topic management endpoints to work with multiple topics. These headers must be in every request.

Table 2. Admin Topic Management Endpoints
Header KeyHeader Value
Authorizationbearer <uaa-token>
grant_typeclient_credentials

When you send requests, you use the following URL patterns and receive the following responses.

Table 3. Request Syntax and Responses
MethodURLResponse
PUThttps://<admin-url>.com/v2/admin/service_instances/<zone-id>/topics/<topic-name>
  • 201: Created successfully.
  • 200: Topic already created.
  • 400: Maximum topics limit has been reached.
DELETEhttps://<admin-url>.com/v2/admin/service_instances/<zone-id>/topics/<topic-name>200: OK (newly deleted or already deleted)
GEThttps://<admin-url>.com/v2/admin/service_instances/<zone-id>/topics
  • 200:

    { "topics": ["topic1","topic2"], "topicLimit": 10 }

  • 401: Invalid token or missing user permissions

Creating Topics

About This Task
Follow these steps to create new topics. Topic names must adhere to the following requirements:
  • Only alphanumerics, dashes, underscores, and periods can be used.
  • Names cannot duplicate the names of existing topics within an instance.
  • Maximum length is 100 characters.
Procedure
  1. Obtain a UAA token for a user with appropriate admin permissions and set that value in the header.
  2. Send the PUT request.
    For example, the following URL pattern is appropriate for CF3:
    https://predix-event-hub-service-broker.run.aws-usw02-dev.ice.predix.io/v2/admin/service_instances/<zone-id>/topics/<topic-name>
    A 201 response indicates that the new topic is created and is ready to be used. If the topic name in the PUT request matches the name of a previously created topic, a 200 response is returned. If the maximum multiple topic count for the instance has been reached, a 400 response is returned, and you must delete previously created topics before you can create new ones.

Using Topics

After you create topics, you must add UAA user permissions before the topics can be used.

Procedure
  1. Create a new topic.
  2. Add the following scopes:
    • predix-event-hub.zones.<zone-id>.<topic-name>.wss.publish
    • predix-event-hub.zones.<zone-id>.<topic-name>.grpc.publish
    • predix-event-hub.zones.<zone-id>.<topic-name>.grpc.subscribe
  3. To publish or subscribe to the new topic, set <topic-name> as the value of the topic key in the request headers.
    • If you use an SDK, add <topic-name> to the topics used.

Deleting Topics

When an instance has already reached the maximum limit for multiple topics, you can delete unused topics to make room for new topics.

Procedure
Send a DELETE request.
For example, the following URL pattern is appropriate for CF3:
https://predix-event-hub-service-broker.run.aws-usw02-dev.ice.predix.io/v2/admin/service_instances/<zone-id>/topics/<topic-name>
The topic is deleted and topic allocation space is cleared. The response for both successful and failed topic deletion is 200.
Note: You cannot delete the default topic.

Getting All Topics for an Instance

Procedure
Send a GET request.
For example, the following URL pattern is appropriate for CF3:
https://predix-event-hub-service-broker.run.aws-usw02-dev.ice.predix.io/v2/admin/service_instances/<zone-id>/topics/
A list of existing topics is returned, along with the maximum number of additional topics that can be created.
Note: The title of the default topic is topic, and is always in this list.

Using Default and Multiple Topics Together

About This Task
Before you can use default and multiple topics together, you must explicitly specify the scopes for the default topic. The default scopes lack the appropriate permissions for the default topic.
Procedure
Add the following scopes for the default topic (whose title is topic):
  • predix-event-hub.zones.<zone-id>.topic.wss.publish
  • predix-event-hub.zones.<zone-id>.topic.grpc.publish
  • predix-event-hub.zones.<zone-id>.topic.grpc.subscribe
Results
Adding scopes for the default topic affects most subscribe operations, and has little effect on publish operations.

Granting UAA Users Access to All Topics

You can grant UAA users access to all multiple topics without specifying individual topic names by using a wildcard when you add scopes.

Procedure
Add the following scopes:
  • predix-event-hub.zones.<zone-id>.*.wss.publish
  • predix-event-hub.zones.<zone-id>.*.grpc.publish
  • predix-event-hub.zones.<zone-id>.*.grpc.subscribe

Best Practices

Consider the following best practices when you work with the Event Hub service.

  • Read acks after you publish messages, unless acks are not explicitly enabled.
  • Limit the number of subscriptions to not more than seven per tenant or zone ID.
  • Subscriptions stay in effect until you delete them. Do not create a subscription client in a loop to read data. Instead, consider using one subscription to read data.
  • You can create a publisher to both publish and stream messages. There is no need to open and close such a publisher for every message.
  • Check credentials for bad authentication. Refrain from connection dialing in a loop.
  • Do not publish messages larger than 1 MB. Keep the total size of publish requests, including arrays of messages, to not more than 1 MB. Tags data is included in the 1 MB size limit.
  • For web sockets, send data as binary JSON.
  • Use tag fields to store metadata, which allows for easy access to the specific types of data instead of data ingestion to multiple zone IDs.
  • If a subscriber does not receive messages, check for another running instance of the subscriber with an identical name. Messages are distributed for subscriber connections with the same name for the same topic.

Maximum Message Size for Message Published to the Event Hub

The Event Hub service supports message batching. When you call addMessage multiple times, the messages are added to a publish request. When the application calls flush(), the request is sent to the Event Hub service. The Event Hub service serializes the messages and then adds them to stable storage.

The size of serialized messages in a publish request cannot exceed 900*1024 bytes. Call msg.getSerializedSize() before adding messages to the publish request to ensure that this limit size is not exceeded. This is to ensure that the messages are rejected at the client side instead of messages being sent to the server and then being rejected. Any messages that exceed 921,600 bytes will immediately throw an exception, AddMessageException with an error message indicating that the message size has exceeded.

After you serialize the messages, call the client.publishClient.messages to get the previous messages and flush them. This step ensures that the data that has been added until now is flushed.

Note: If your app sends only one message in each publish request, the maximum size of a single serialized message cannot exceed 921,600 bytes.

From a network I/O perspective, sending multiple small-size messages in a batch is optimal and is the recommended practice.