Using the Event Hub Service
Publish - Binary JSON Formats
Event Hub supports multiple binary formats for publishing streaming data: binary JSON over WebSocket and binary protobuf over gRPC streams.
Event Hub retains published messages in a durable store for 24 hours.
Sending a Publish Request
Before You Begin
If you are using gRPC to publish, you must set up the gRPC client before sending a publish request.
About This Task
For all publish requests, include the token you received from UAA in the Authorization section of the HTTP header, in the form Bearer <token from trusted issuer>.
Once a publisher or subscriber client is successfully authenticated and authorized, messages can be published to Event Hub.
Procedure
A PublishRequest carries one or more messages. When creating messages to send to Event Hub, follow the structure defined by the Message type, as shown in the following table:
Field | Type | Description |
---|---|---|
id | string | ID of the message being sent. This field will match the "id" field in the acknowledgements sent back to the client, which allows the client to determine which messages failed to publish. |
body | bytes | Contains the data to send. |
zone_id | string | Predix Zone ID used to authenticate and authorize a client. |
tags | key/value | Key/value mapping of additional metadata you can add to help identify and group messages. |
The following shows an example of sending a single-message publish request, in Go, using the gRPC protocol:
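// Requires the context, log, and google.golang.org/grpc/metadata packages, plus
// pb, the package generated from the Event Hub .proto file. headers is a
// metadata.MD holding the authorization and predix-zone-id values, and conn is
// the *grpc.ClientConn you set up earlier.
// Create gRPC context
ctx := metadata.NewOutgoingContext(context.Background(), headers)
cancelCtx, cancel := context.WithCancel(ctx)
defer cancel() // closes the stream when the enclosing function returns
md, _ := metadata.FromOutgoingContext(cancelCtx)
client := pb.NewPublisherClient(conn)
stream, err := client.Send(cancelCtx)
if err != nil {
	log.Fatalf("could not open publish stream: %v", err)
}
// Compose messages
tags := map[string]string{"my_tag": "my_tag"}
messages := []*pb.Message{{
	Id: "001",
	Body: []byte(`{
	"attribute1": "value1",
	"attribute2": "value2"
}`),
	ZoneId: md["predix-zone-id"][0],
	Tags:   tags,
}}
// Send messages to Event Hub
if err := stream.Send(&pb.PublishRequest{
	Messages: &pb.Messages{Msg: messages},
}); err != nil {
	log.Fatalf("publish failed: %v", err)
}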
In this example, a client created with NewPublisherClient opens a stream with the Send method and sends a PublishRequest to Event Hub on that stream.
The following sample shows a publish request sent over WebSocket:
URL: wss://publish_url
Headers:
Authorization: Bearer <token from trusted issuer>
Predix-Zone-Id: <Predix-Zone-Id>
Origin: http://<origin-hostname>/
Request Payload:
[{
"id": "my_message_id_001",
"body": {
"data_value_here": "value1",
"more_text": "value2"
},
"zoneId": "my_zone_id",
"tags": {
"tag1": "tag_value_1"
}
}, {
"id": "my_message_id_002",
"body": {
"attribute1": "value1",
"attribute2": "value2"
},
"zoneId": "my_zone_id",
"tags": {
"tag1": "tag_value_1"
}
}]
To find the publish URL and zone ID for your application, run cf env <application_name> on the Cloud Foundry CLI.
Sending a Publish Request With C++
The following example shows how you can send publish requests using C++.
#include <iostream>
#include <memory>
#include <grpc++/grpc++.h>
#include "EventHub.grpc.pb.h"

int main() {
    // Create a secure channel to the Event Hub gRPC endpoint
    std::shared_ptr<grpc::Channel> channel = grpc::CreateChannel("<event hub uri>", grpc::SslCredentials(grpc::SslCredentialsOptions()));
    // Create a stub
    std::unique_ptr<predix::eventhub::Publisher::Stub> stub = predix::eventhub::Publisher::NewStub(channel);
    // Create a client context with the necessary headers
    grpc::ClientContext context;
    context.AddMetadata("authorization", "<oauth token>");
    context.AddMetadata("predix-zone-id", "<predix zone id>");
    context.AddMetadata("content-type", "application/grpc");
    context.AddMetadata("topic", "<topic>");
    // Open a bidirectional stream from the stub
    std::unique_ptr<grpc::ClientReaderWriter<predix::eventhub::PublishRequest, predix::eventhub::PublishResponse>> stream(stub->send(&context));
    // Create a new set of messages and add a message to the set
    predix::eventhub::Messages* messages = new predix::eventhub::Messages();
    predix::eventhub::Message* message = messages->add_msg();
    // Set message details
    message->set_body("<message payload>");
    message->set_zone_id("<predix zone id>");
    message->set_id("<id>");
    message->set_key("<key>");
    message->set_topic("<topic>");
    message->set_offset(1);
    predix::eventhub::Timestamp* timestamp = new predix::eventhub::Timestamp();
    timestamp->set_nanos(1);
    timestamp->set_seconds(1);
    message->set_allocated_timestamp(timestamp);  // message takes ownership of timestamp
    // Create a request and add the set of messages (containing one message) to it
    predix::eventhub::PublishRequest request;
    request.set_allocated_messages(messages);  // request takes ownership of messages
    // Send the publish request (Write() blocks until success or failure)
    bool write_successful = stream->Write(request);
    if (write_successful) {
        std::cout << "Write() was successful" << std::endl;
    } else {
        std::cout << "Write() failed" << std::endl;
    }
    // Read the response (Read() blocks until a response arrives or the stream ends)
    predix::eventhub::PublishResponse response;
    bool read_successful = stream->Read(&response);
    if (read_successful && response.ack_size() > 0) {
        std::cout << "Read() was successful" << std::endl;
        // Check the acknowledgement for the first (and only) message
        const predix::eventhub::Ack& ack = response.ack(0);
        predix::eventhub::AckStatus ack_status = ack.status_code();
        if (ack_status == 0) {  // 0 indicates that the message was accepted
            std::cout << "Message published successfully" << std::endl;
        } else {
            std::cout << "Message publish failed with status code " << ack_status << std::endl;
        }
    } else {
        std::cout << "Read() failed or returned no acknowledgements" << std::endl;
    }
    // Signal that no more requests will be sent, then close the stream
    stream->WritesDone();
    grpc::Status status = stream->Finish();
    if (status.ok()) {
        std::cout << "The call was successful." << std::endl;
    } else {
        std::cout << "There was an error during the call." << std::endl;
        std::cout << "Error code " << status.error_code() << std::endl;
        std::cout << status.error_details() << std::endl;
        std::cout << status.error_message() << std::endl;
    }
    return 0;
}
Acknowledgement Messages
The Event Hub service sends an acknowledgment message back to the client for each message packet when messages are published.
Acknowledgements are returned in the PublishResponse type. The following shows an example of an acknowledgement message:
{
"id": "my_message_id_001",
"status_code": 202,
"desc": "Message successfully sent"
}
Acknowledgement messages use the same format for both WebSocket and gRPC protocols.
Code | Message |
---|---|
202 | Accepted successfully. |
400 | Bad request. |
503 | Failed to publish data. |
Go Example for Receiving Acknowledgements
In Go, you can call the Recv method on the stream returned by a PublisherClient instance's Send method to receive the acknowledgements.
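The following is a minimal sketch of reading acknowledgements in Go. Because the generated Event Hub types are not reproduced in this document, the sketch uses local stand-ins (Ack, PublishResponse, and a one-method ackStream interface) whose field names are assumptions modeled on the acknowledgement JSON above; with the real generated client, you would pass the stream returned by Send. The sketch treats 202 as success, following the status code table above. The C++ example instead checks for 0, so adjust statusAccepted to match the status values your generated code uses.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// Ack and PublishResponse are local stand-ins for the generated types;
// the real generated field names may differ.
type Ack struct {
	Id         string
	StatusCode int32
	Desc       string
}

type PublishResponse struct {
	Ack []*Ack
}

// ackStream is the one method this sketch needs from the publish stream.
type ackStream interface {
	Recv() (*PublishResponse, error)
}

// statusAccepted follows the acknowledgement table (202 = accepted).
const statusAccepted = 202

// failedIDs reads acknowledgements until the stream ends (io.EOF) and
// returns the IDs of messages that were not accepted.
func failedIDs(s ackStream) ([]string, error) {
	var failed []string
	for {
		resp, err := s.Recv()
		if errors.Is(err, io.EOF) {
			return failed, nil
		}
		if err != nil {
			return failed, err
		}
		for _, a := range resp.Ack {
			if a.StatusCode != statusAccepted {
				failed = append(failed, a.Id)
			}
		}
	}
}

// fakeStream replays canned responses so the sketch runs without a server.
type fakeStream struct{ resps []*PublishResponse }

func (f *fakeStream) Recv() (*PublishResponse, error) {
	if len(f.resps) == 0 {
		return nil, io.EOF
	}
	r := f.resps[0]
	f.resps = f.resps[1:]
	return r, nil
}

func main() {
	s := &fakeStream{resps: []*PublishResponse{
		{Ack: []*Ack{{Id: "001", StatusCode: 202}, {Id: "002", StatusCode: 400}}},
		{Ack: []*Ack{{Id: "003", StatusCode: 503}}},
	}}
	ids, err := failedIDs(s)
	fmt.Println(ids, err) // [002 003] <nil>
}
```

Collecting failed IDs this way lets a publisher resend only the messages that were not accepted, which is what the id field in each acknowledgement is for.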
Rate Limiting
Rate limiting is applied when publishers use more capacity than is available for the active tenants at a given time. This can happen when the number of active tenants exceeds the provisioned capacity. Rate limiting supports the shared service model of Event Hub by preventing a few tenants from consuming so much capacity that other tenants experience degraded performance.
When tenants are rate-limited, publishing continues, with each active tenant restricted to a maximum of 1 MB of published data every 30 seconds. This is the minimum publishing rate that you can expect per tenant, and throughput rates can increase during intervals where fewer tenants are active.
When a publisher is rate-limited, PublishResponse includes the following:
status: FAILED, description: Too Many Requests.
If this status message appears, throttle your publish rate. To identify which publisher application is rate-limited, search the logs and PublishResponse output of your publisher applications for the text of this status message.
Subscribe
Event Hub supports a unified gRPC API for subscribing to a topic, which helps you build high-velocity consumer applications and ensures efficient network usage. Each topic can have one or more subscribers (uniquely named users). When multiple subscribers that share the same name are added to a topic, message processing is distributed among the subscriber connections. There is a limit of seven concurrent subscriber connections per tenant.
Subscriptions are processed using the following workflows:
- New subscribers – Start receiving data in Event Hub that was pushed in the last 24 hours.
- New client to a subscriber – Event Hub partitions the data and distributes it to all clients. Event Hub maintains five partitions, allowing subscribers to define up to seven clients. Note: Only five of the seven clients can be active and receiving messages at any given time. The additional two clients enable active/passive subscribers.
- Existing subscriber re-opening a connection – Event Hub resumes delivering messages from the point where the connection was previously closed.
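The client limits in the workflow above can be checked before you start additional clients. This small Go sketch uses hypothetical names and encodes only the documented numbers (seven clients per subscriber, five active at a time); it does not model how Event Hub actually assigns partitions to clients.

```go
package main

import (
	"errors"
	"fmt"
)

// Limits from the subscription workflow.
const (
	maxActiveClients = 5 // Event Hub maintains five partitions
	maxClients       = 7 // clients allowed per subscriber
)

var errClientCount = errors.New("client count must be between 0 and 7")

// clientRoles reports how many of n clients sharing one subscriber name can be
// active (receiving messages) and how many stay passive as standbys.
func clientRoles(n int) (active, standby int, err error) {
	if n < 0 || n > maxClients {
		return 0, 0, errClientCount
	}
	active = n
	if active > maxActiveClients {
		active = maxActiveClients
	}
	return active, n - active, nil
}

func main() {
	for _, n := range []int{3, 7, 8} {
		active, standby, err := clientRoles(n)
		fmt.Println(n, active, standby, err)
	}
}
```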
Creating a Subscription Request
Before You Begin
Set up your client as outlined in ehs-getting-started.html#task_79b3d83c-cc6d-4e11-bbb0-ee5c6b82afa1.
About This Task
Once a subscriber client has been successfully authenticated and authorized, it can subscribe to messages in Event Hub.
Procedure
Create a SubscriptionRequest for a client to subscribe to a topic. A SubscriptionRequest contains the following fields:
Field | Type | Description |
---|---|---|
zone_id | string | Predix Zone ID used to authenticate and authorize a client. |
subscriber | string | Name to identify the subscriber used for parallel processing. |
instance_id | string | ID to identify a subscriber instance. |
Creating a Subscription Request With C++
You can also subscribe to messages by using C++.
Procedure
Receiving Messages
Every tenant can have multiple connections running under the same subscriber name at the same time.
About This Task
Each tenant can have up to seven connections, with up to five running at the same time under the same subscriber name. A subscriber can have different clients, or multiple subscribers can each have multiple clients. The process for subscribing varies by language, so review the source code generated from the .proto file to learn how to subscribe to Event Hub using the SubscriptionRequest types.
For example, in Go, you can use the Recv method on a SubscribeClient instance to receive messages from a topic as a subscriber. To initialize the subscriber client, provide a SubscriptionRequest that contains the topic (<Predix-Zone-Id>), the subscriber name, and the client instance ID. Once the subscriber client is initialized, use the Recv method to listen for messages.
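A subscriber typically loops on Recv until the stream ends. The following Go sketch shows that loop with a local Message stand-in and a one-method messageStream interface in place of the generated subscriber stream, and it routes messages by a hypothetical "type" tag, in line with the advice to store metadata in tags. With a real gRPC stream, the loop usually ends with an error when the context is canceled rather than with io.EOF.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// Message is a local stand-in for the generated Message type; only the
// fields this sketch uses are included.
type Message struct {
	Id   string
	Body []byte
	Tags map[string]string
}

// messageStream is the single method this sketch needs from a subscription.
type messageStream interface {
	Recv() (*Message, error)
}

// receiveAll calls Recv until the stream ends, passing each message to the
// handler registered for its "type" tag (a hypothetical tag key). It returns
// the number of messages that had no matching handler.
func receiveAll(s messageStream, handlers map[string]func(*Message)) (int, error) {
	unhandled := 0
	for {
		msg, err := s.Recv()
		if errors.Is(err, io.EOF) {
			return unhandled, nil
		}
		if err != nil {
			return unhandled, err
		}
		if h, ok := handlers[msg.Tags["type"]]; ok {
			h(msg)
		} else {
			unhandled++
		}
	}
}

// fakeMsgStream replays canned messages so the sketch runs without a server.
type fakeMsgStream struct{ msgs []*Message }

func (f *fakeMsgStream) Recv() (*Message, error) {
	if len(f.msgs) == 0 {
		return nil, io.EOF
	}
	m := f.msgs[0]
	f.msgs = f.msgs[1:]
	return m, nil
}

func main() {
	s := &fakeMsgStream{msgs: []*Message{
		{Id: "1", Body: []byte(`{"t":21.5}`), Tags: map[string]string{"type": "temperature"}},
		{Id: "2", Body: []byte(`{}`)},
	}}
	unhandled, err := receiveAll(s, map[string]func(*Message){
		"temperature": func(m *Message) { fmt.Println("temperature reading", m.Id, string(m.Body)) },
	})
	fmt.Println("unhandled:", unhandled, "err:", err)
}
```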
Subscriptions with Acknowledgements
Some types of subscriptions require acknowledgements, and others do not.
About This Task
You can create two types of Event Hub subscriptions. One type does not require the client to acknowledge received messages. The other type of subscription requires the client to send an acknowledgement message back to the Event Hub service for each message packet in the form of a subscription response type. If these acknowledgment messages are not received, the Event Hub service resends the message.
The following example shows an acknowledgement message:
{ "id": "my_message_id_001", "status_code": 0, "desc": "Message received successfully" }
Code | Message |
---|---|
0 | Accepted |
3 | Failed |
Unsubscribing from Topics
Procedure
Cancel the Context used to create the subscriber client.
Multiple Topics
You can now create and use multiple topics within a single Event Hub service instance.
Each Event Hub service instance can contain multiple topics, and each topic functions independently during publishing and subscribing. You no longer need to manage multiple Event Hub service instances to work with multiple topics. When you use multiple topics, an Event Hub service instance functions as a topic broker. The number of topics per instance is limited, and using this functionality may incur additional costs.
Using Multiple Topics
Multiple topics are an additional feature of Event Hub and do not affect existing usage of the service.
To start using multiple topics, you must first create an Event Hub service instance. For details, see Creating an Event Hub Service Instance.
After you create your Event Hub instance, you add the standard scopes for WSS and gRPC publishing and gRPC subscribing. For more details, see ehs-getting-started.html#task_9e08677f-d894-49bf-b34d-e376b44ca015. The default topic is automatically created as part of a new instance, and is used by Event Hub for publish and subscribe operations. You can then create more topics to use with your instance.
To create additional topics, users must have appropriate UAA user permissions to manage topics, including create, delete and get rights. To assign these rights, add the following scope to UAA users:
predix-event-hub.zones.<zone-id>.admin
You use the admin API to create topics. The admin API URL appears in the admin block of the VCAP_SERVICES environment variable after you bind an application to the Event Hub service, for example:
{
"VCAP_SERVICES": {
"predix-event-hub": [{
"credentials": {
"admin": {
"protocol_details": [{
"protocol": "https",
"uri": "predix-event-hub-admin.run.example.predix.io",
"zone-token-scope": [
"predix-event-hub.zones.88ec4bd5-e149-4c98-8e8e-952e86ba5fae.user",
"predix-event-hub.zones.88ec4bd5-e149-4c98-8e8e-952e86ba5fae.admin"
]
}],
"zone-http-header-name": "Predix-Zone-Id",
"zone-http-header-value": "88ec4bd5-e149-4c98-8e8e-952e86ba5fae"
},
"publish": {...},
"subscribe": {...}
},
"label": "predix-event-hub",
"name": "event-hub-admin-demo",
"plan": "Tiered",
"provider": null,
"syslog_drain_url": null,
"tags": [
"eventhub",
"event-hub",
"event hub"
],
"volume_mounts": []
}]
}
}
In this example, the admin API URL is predix-event-hub-admin.run.example.predix.io.
After you add the admin scope and generate a new token for the user, use the following topic management endpoints to work with multiple topics. Include these headers in every request.
Header Key | Header Value |
---|---|
Authorization | bearer <uaa-token> |
grant_type | client_credentials |
When you send requests, you use the following URL patterns and receive the following responses.
Method | URL | Response |
---|---|---|
PUT | https://<admin-url>.com/v2/admin/service_instances/<zone-id>/topics/<topic-name> | |
DELETE | https://<admin-url>.com/v2/admin/service_instances/<zone-id>/topics/<topic-name> | 200 : OK (newly deleted or already deleted) |
GET | https://<admin-url>.com/v2/admin/service_instances/<zone-id>/topics | |
Creating Topics
About This Task
Topic names must meet the following requirements:
- Only alphanumerics, dashes, underscores, and periods can be used.
- Names cannot duplicate the names of existing topics within an instance.
- Maximum length is 100 characters.
Procedure
Using Topics
After you create topics, you must add UAA user permissions before the topics can be used.
Procedure
Deleting Topics
When an instance has already reached the maximum limit for multiple topics, you can delete unused topics to make room for new topics.
Procedure
Send a DELETE request to the topic endpoint, for example:
https://predix-event-hub-service-broker.run.aws-usw02-dev.ice.predix.io/v2/admin/service_instances/<zone-id>/topics/<topic-name>
The service returns 200.
Getting All Topics for an Instance
Procedure
Send a GET request, for example:
https://predix-event-hub-service-broker.run.aws-usw02-dev.ice.predix.io/v2/admin/service_instances/<zone-id>/topics/
The response lists the topics in the instance. The default topic is named topic and is always in this list.
Using Default and Multiple Topics Together
About This Task
Procedure
Add the following scopes to UAA users (topic is the topic name):
predix-event-hub.zones.<zone-id>.topic.wss.publish
predix-event-hub.zones.<zone-id>.topic.grpc.publish
predix-event-hub.zones.<zone-id>.topic.grpc.subscribe
Results
Granting UAA Users Access to All Topics
You can grant UAA users access to all topics without specifying individual topic names by using a wildcard (*) in place of the topic name when you add scopes.
Procedure
predix-event-hub.zones.<zone-id>.*.wss.publish
predix-event-hub.zones.<zone-id>.*.grpc.publish
predix-event-hub.zones.<zone-id>.*.grpc.subscribe
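Because the per-topic and wildcard scopes differ only in the topic segment, they can be generated from one pattern. The following Go sketch is a hypothetical helper that builds the three scopes shown in the two procedures above, for a named topic or for "*"; the topic name orders is only an example.

```go
package main

import "fmt"

// topicScopes returns the UAA scopes for WSS publishing, gRPC publishing, and
// gRPC subscribing on one topic. Passing "*" as the topic produces the
// wildcard scopes that cover all topics in the zone.
func topicScopes(zoneID, topic string) []string {
	prefix := "predix-event-hub.zones." + zoneID + "." + topic + "."
	return []string{
		prefix + "wss.publish",
		prefix + "grpc.publish",
		prefix + "grpc.subscribe",
	}
}

func main() {
	zone := "88ec4bd5-e149-4c98-8e8e-952e86ba5fae"
	for _, s := range topicScopes(zone, "orders") {
		fmt.Println(s)
	}
	for _, s := range topicScopes(zone, "*") {
		fmt.Println(s)
	}
}
```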
Best Practices
Consider the following best practices when you work with the Event Hub service.
- Read acks after you publish messages, if acks are enabled.
- Limit the number of subscriptions to no more than seven per tenant or zone ID.
- Subscriptions stay in effect until you delete them. Do not create a subscription client in a loop to read data. Instead, use a single subscription to read data.
- You can use one publisher to both publish and stream messages. There is no need to open and close the publisher for every message.
- If authentication fails, check your credentials. Do not dial the connection repeatedly in a loop.
- Do not publish messages larger than 1 MB. Keep the total size of each publish request, including arrays of messages, to no more than 1 MB. Tags data counts toward the 1 MB limit.
- For WebSocket connections, send data as binary JSON.
- Use tag fields to store metadata. Tags make it easy to access specific types of data, instead of ingesting data into multiple zone IDs.
- If a subscriber does not receive messages, check for another running instance of the subscriber with an identical name. Messages are distributed for subscriber connections with the same name for the same topic.
Maximum Message Size for Message Published to the Event Hub
The Event Hub service supports message batching. When you call addMessage multiple times, the messages are added to a publish request. When the application calls flush(), the request is sent to the Event Hub service. The Event Hub service serializes the messages and then adds them to stable storage.
The size of serialized messages in a publish request cannot exceed 900*1024 (921,600) bytes. Call msg.getSerializedSize() before adding messages to the publish request to ensure that this limit is not exceeded, so that oversized messages are rejected on the client side instead of being sent to the server and rejected there. Adding a message that exceeds 921,600 bytes immediately throws an AddMessageException, with an error message indicating that the maximum message size was exceeded.
After you serialize the messages, call client.publishClient.messages to get the previously added messages, and then flush them. This step ensures that all data added so far is flushed.
From a network I/O perspective, sending multiple small-size messages in a batch is optimal and is the recommended practice.