Using Predix Message Queue
Code Examples for Use with Predix Message Queue
Guide to Using Code Examples with Predix Message Queue
The examples included here demonstrate how to use the Predix Message Queue service with each of our supported build pack languages.
These examples are based on the official RabbitMQ Tutorials. You can find more information about working with the RabbitMQ service there. These examples all demonstrate using direct routing with a routing key as shown in tutorial 4, Routing in the official tutorials.
- Java
- Ruby
- Node.js
- Python
- PHP
- GO
Java Code Examples for Predix Message Queue
Recommended Library
Alternative Libraries
Java Code Example using RabbitMQ Java Client
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.2.0</version>
</dependency>
VCAP_SERVICES
environment variable. In this example we're using the JSON-java
library to parse the JSON data inVCAP_SERVICES
.// Parse the RabbitMQ config from the VCAP_SERVICES environment. Assumes there's only one RabbitMQ instance bound.
String vcapServices = System.getenv("VCAP_SERVICES");
JSONObject vcapJson = new JSONObject(vcapServices);
JSONObject amqpCredentials = vcapJson.getJSONArray("predix-message-queue") //
.getJSONObject(0) //
.getJSONObject("credentials")
.getJSONObject("protocols")
.getJSONObject("amqp");
/* Set up the RabbitMQ connection. Alternatively you can also use the URI:
*
* factory.setUri(amqpCredentials.getString("uri"));
*/
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(amqpCredentials.getString("host"));
factory.setUsername(amqpCredentials.getString("username"));
factory.setPassword(amqpCredentials.getString("password"));
factory.setPort(amqpCredentials.getInt("port"));
factory.setVirtualHost(amqpCredentials.getString("vhost"));
Connection connection = factory.newConnection();
Channel
to declare a Queue
. You can bind the Queue
to an Exchange
with a routing key as needed for your specific use case (See the RabbitMQ Tutorials for more details):// Define Queue name, Exchange name, and routing key as strings.
String QUEUE_NAME = "PredixRabbitMQSample";
String EXCHANGE_NAME = "sample-exchange";
String ROUTING_KEY = "test";
// Create the channel, declare the queue and exchange, and bind them.
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
basicPublish
method of the Channel
. Convert the message data to bytes before publishing. In this example the message body is a randomly generated UUID string.// Generate a random UUID to use for the message
String message = UUID.randomUUID().toString();
byte[] messageBodyBytes = message.getBytes();
// Publish the message using basicPublish
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, messageBodyBytes);
Consumer
class. First, declare the Queue
and create the Channel
as described above, then configure the Consumer
:Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
String output = String.format("Received message from exchange %s: %s", envelope.getExchange(), message);
System.out.println(output);
}
};
channel.basicConsume(queueName, true, consumer);
Spring AMQP Client Code Sample for Predix Message Queue
Spring Boot Framework Examples
The Spring Boot framework offers some libraries that simplify the process of integrating with services in CloudFoundry. Services will be auto-discovered without the need to manually parse the VCAP_SERVICES
environment variable. Here's how to achieve results similar to the above example using Spring AMQP.
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Dalston.SR1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>1.5.4.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
<version>1.3.1.RELEASE</version>
</dependency>
</dependencies>
Configuration
class which injects the following Beans:@Configuration
public class RabbitMQSpringSampleConsumerConfig {
@Bean
public Queue queue() {
return new Queue("PredixRabbitMQSpringSample");
}
@Bean
public DirectExchange exchange() {
return new DirectExchange("sample-exchange");
}
@Bean
public Binding binding(DirectExchange exchange, Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("test");
}
}
Beans
in your producing app:@Autowired
private DirectExchange exchange;
@Autowired
private RabbitTemplate template;
Configuration
class when your app starts. You can now use them to publish messages to the queue:// Generate a random UUID to use for the message
String message = UUID.randomUUID().toString();
// Publish the message to the configured exchange, using 'test' as the routing key.
template.convertAndSend(exchange.getName(), "test", message);
RabbitListener
class. Use a similar Configuration class as above to configure the queue and bind it to an exchange, then set up the RabbitListener
for that queue and define a callback method to handle incoming messages:@RabbitListener(queues="#{queue.name}")
public void receiveMessage(String data) throws InterruptedException {
String exchangeName = exchange.getName();
System.out.println(String.format("Received message from exchange %s: %s", exchangeName, data));
}
Ruby Code Sample for Predix Message Queue
Recommended Library
Alternative Libraries
Code Sample using Bunny
Install the Library
Bunny
library in your Gemfile
to ensure that it will be installed by bundler in CloudFoundry:source 'https://rubygems.org'
ruby '>=2.3.1'
gem 'bunny', '>=2.6.4'
Discover the Bound Predix Message Queue Service in Ruby
VCAP_SERVICES
environment variable to get the predix-message-queue
service credentials using the JSON
library. This example assumes only one predix-message-queue
service is bound to the app.vcap = JSON.parse ENV['VCAP_SERVICES']
amqp_credentials = vcap['predix-message-queue'][0]['credentials']['protocols']['amqp']
cf-app-utils
gem:credentials = CF::App::Credentials.find_by_service_label('predix-message-queue')
amqp_credentials = credentials['protocols']['amqp']
# Create the RabbitMQ connection. You can also use the URI:
#
# conn = Bunny.new(amqp_credentials['uri'])
#
conn = Bunny.new(host: amqp_credentials['host'],
port: amqp_credentials['port'],
username: amqp_credentials['username'],
password: amqp_credentials['password'],
vhost: amqp_credentials['vhost'] )
conn.start
Publishing and Consuming Messages
Create the channel, exchange, and queue. Bind the queue to the exchange. In this example we use a direct exchange with a routing key. See the RabbitMQ Tutorials for more details on the different types of message routing available.
channel = conn.create_channel
exchange = channel.direct('sample-exchange')
queue = channel.queue('predix-message-queue-ruby-sample')
queue.bind(exchange, routing_key: 'test')
In your publishing application you can now use the Queue
object to publish messages. In this case we generate a random string and publish it to the queue:
message = SecureRandom.uuid
STDOUT.puts "Publishing message to exchange #{exchange.name}: #{message}"
queue.publish(message)
subscribe
method of the Queue
to define a callback message to use for handling the messages:queue.subscribe(block: true, exclusive: true) do |delivery_info, properties, message|
STDOUT.puts "Received message from the queue: #{message}"
end
channel.close
conn.close
Node.js Code Sample for using Predix Message Queue
Recommended Library
Alternative Libraries
Code Sample using amqp.node
- Install the Liberary
- Install the
amplib
using npm:npm init npm install amqplib --save
- Discover the Bound Predix Message Queue Service in Ruby
-
Parse the credentials from the app's
VCAP_SERVICES
environment variable:var vcap = process.env.VCAP_SERVICES; var vcapJSON = JSON.parse(vcap); var credentials = vcapJSON['predix-message-queue'][0]['credentials']['protocols']['amqp'];
- Sample App for Publishing Messages using amqplib
- This example uses the
amqplib
callback API to publish a message to the Predix Message Queue service every 5 seconds using an exchange and routing key for direct routing. It will stop and close the connection after five minutes.var vcap_services = require('vcap_services'); var amqp = require('amqplib/callback_api'); var uuid = require("uuid"); var credentials = vcap_services.getCredentials('predix-message-queue', 'Dedicated-3HA-Q20'); // Error handler function logs the message and closes the connection if there is one. function fail(err, conn) { console.log(err); if (conn) conn.close(function() { process.exit(1); }); } // Generate a random UUID and publish to the exchange with the given routing key function publishMessage(ch, exchange, routingKey) { var message = uuid.v4(); ch.publish(exchange, routingKey, Buffer.from(message)); console.log("Publishing message to exchange %s: %s", exchange, message); } // Callback function to run when the amqp connection is created. function on_connect(err, conn) { if (err !== null) return fail(err); // Callback function to run when channel is opened function on_channel_open(err, ch) { if (err !== null) return fail(err, conn); // Configure the exchange, then use setInterval to publish a message once every 5 seconds. var exchange = 'sample-exchange'; ch.assertExchange(exchange, 'direct', {durable: false}); var intervalObj = setInterval(publishMessage, 5000, ch, exchange, 'test'); // After five minutes clear the Interval and close down the channel and connection setTimeout(function() { clearInterval(intervalObj); ch.close(function() { conn.close(); }); }, 300000); } // Open a channel conn.createChannel(on_channel_open); } // Connect to the predix-message-queue service using the URI amqp.connect(credentials.uri, on_connect);
- Sample App for Consuming Messages using amqplib
- This example uses the
amqplib
callback API to consume messages from the publisher app created in . It creates a queue and binds it to the same exchange used by the publisher. It will log each message as it is received.var amqp = require('amqplib/callback_api'); // Parse the VCAP_SERVICES environment variable to JSON to get the credentals. var vcap = process.env.VCAP_SERVICES; var vcapJSON = JSON.parse(vcap); var credentials = vcapJSON['predix-message-queue'][0]['credentials']['protocols']['amqp']; // Define a method for error handling. function fail(err, conn) { console.log(err); if (conn) conn.close(function() { process.exit(1); }); } function on_connect(err, conn) { if (err !== null) return fail(err); process.once('SIGINT', function() { conn.close(); }); conn.createChannel(function(err, ch) { if (err !== null) return fail(err, conn); var exchange = 'sample-exchange'; var queue = 'nodejs-example-queue'; ch.assertExchange(exchange, 'direct', {durable: false}); ch.assertQueue(queue, {exclusive: false}, function(err, ok) { if (err !== null) return fail(err, conn); ch.bindQueue(queue, exchange, 'test', {}); console.log("Listening for messages on %s", exchange); ch.consume(queue, function(message) { var routingKey = message.fields.routingKey; var msg = message.content.toString(); console.log("Received message with routing key %s: %s", routingKey, msg); }, {noAck: true}); }); }); } amqp.connect(credentials['uri']+ "?heartbeat=30", on_connect);
Python Code Sample for Predix Message Queue
Recommended Library
Alternative Libraries
Code Sample using pika
- Install the Library
- To ensure that CloudFoundry will install the pika module add a
requirements.txt
to your app directory that contains the following line:pika>=0.10.0
- Discover the Bound Predix Message Queue Service in Python
- You can use os.environ and the
json
library to parse theVCAP_SERVICES
environment variable. If the variable isn't defined, or if the service isn't found, aKeyError
will be thrown.import sys import json from os import environ try: vcap_services = json.loads(environ['VCAP_SERVICES']) amqp_credentials = vcap_services['predix-message-queue'][0]['credentials']['protocols']['amqp'] except KeyError as error: print("Could not parse credentials from the environment. Key not found: {0}".format(error)) sys.exit(1)
- Set Up the Ra bbitMQ Connection
- Use the
PlainCredentials
andConnectionParameters
classes to set up the connection configuration using the service credentials, then use them to open the connection and create a channel.# Configure the authentication credentials rabbit_creds = pika.PlainCredentials(amqp_credentials['username'], amqp_credentials['password']) # Set up connection parameters params = pika.ConnectionParameters(host=amqp_credentials['host'], port=amqp_credentials['port'], virtual_host=amqp_credentials['vhost'], ssl=amqp_credentials['ssl'], credentials=rabbit_creds) # Open the connection and create a channel connection = pika.BlockingConnection(params) channel = connection.channel()
- Publishing Messages
- Use the
basic_publish
method of theChannel
to publish a message. This example uses direct routing with a routing key:channel.basic_publish(exchange='sample-exchange', routing_key='test', body='Hello World!')
- Consuming Messages
- To consume messages first declare a queue and an exchange, and then bind them with a routing key:
To consume messages first declare a queue and an exchange, and then bind them with a routing key:
- Full Sample Producer App
- Here's the full example publisher which will publish a random UUID to the exchange once every five seconds for five minutes:
- requirements.txt
-
pika>=0.10.0
- manifest.yml
-
applications: - name: python-sample-producer command: python sample-producer.py no-route: true mem: 256M disk: 1024M instances: 1
- Procfile
-
web: python sample-producer.py
- sample-producer.py
-
import sys import json import pika from os import environ from uuid import uuid4 from time import sleep # Parse the credentials from the VCAP_SERVICES environment variable try: vcap_services = json.loads(environ['VCAP_SERVICES']) amqp_credentials = vcap_services['predix-message-queue'][0]['credentials']['protocols']['amqp'] # Configure the authentication credentials rabbit_creds = pika.PlainCredentials(amqp_credentials['username'], amqp_credentials['password']) # Set up connection parameters params = pika.ConnectionParameters(host=amqp_credentials['host'], port=amqp_credentials['port'], virtual_host=amqp_credentials['vhost'], ssl=amqp_credentials['ssl'], credentials=rabbit_creds) except KeyError as error: print("Could not parse credentials from the environment. Key not found: {0}".format(error)) sys.exit(1) # Open the connection and create a channel connection = pika.BlockingConnection(params) channel = connection.channel() # Publish a message to the queue every 5 seconds for 5 minutes. max_seconds = 300 total_seconds = 0 while total_seconds < max_seconds: # Generate a random UUID string to use as the message. message = str(uuid4()) # Publish the message to the channel using basic_publish with an exchange and routing key channel.basic_publish(exchange='sample-exchange', routing_key='test', body=message) print("Published message to exchange sample-exchange: {0}".format(message)) sleep(1) total_seconds += 1 connection.close() sys.exit(0)
- Sample log Output:
-
2017-08-04T10:48:09.28-0700 [DEA/5] OUT Starting app instance (index 0) with guid 0b803338-8c3a-4620-b6c7-73bdc2de9efa 2017-08-04T10:48:11.83-0700 [App/0] OUT Published message to exchange sample-exchange: 5769f6a3-2587-4202-b5c4-050aaf8ea16f 2017-08-04T10:48:12.83-0700 [App/0] OUT Published message to exchange sample-exchange: e83743d8-9d04-4c44-8f88-b3216b890b8d 2017-08-04T10:48:13.83-0700 [App/0] OUT Published message to exchange sample-exchange: 655b684a-8aab-41c3-8370-5dfe9ea8b64a 2017-08-04T10:48:14.83-0700 [App/0] OUT Published message to exchange sample-exchange: 19bbd3b9-dfc9-4c68-b854-ef68f2f82440
- Full Sample Consumer App
- This sample consumer app will retrieve the messages published by the above producer app and print them to the app's logs. It also demonstrates using the cfenv library to parse the service details from the environment.
- requirements.txt
-
pika>=0.10.0
- manifest.yml
-
applications: - name: python-sample-consumer command: python sample-consumer.py no-route: true mem: 256M disk: 1024M instances: 1
- Procfile
-
web: python sample-consumer.py
- sample-producer.py
-
import sys import json import pika from cfenv import AppEnv app_env = AppEnv() rabbit_service = app_env.get_service(label='predix-rabbitmq') amqp_credentials = rabbit_service.credentials['protocols']['amqp'] # Configure the authentication credentials rabbit_creds = pika.PlainCredentials(amqp_credentials['username'], amqp_credentials['password']) # Set up connection parameters params = pika.ConnectionParameters(host=amqp_credentials['host'], port=amqp_credentials['port'], virtual_host=amqp_credentials['vhost'], ssl=amqp_credentials['ssl'], credentials=rabbit_creds, heartbeat_interval=30) # Open the connection and create a channel connection = pika.BlockingConnection(params) channel = connection.channel() # Declare the exchange and the queue. Bind the queue to the exchange with a routing key. queue_name = 'predix-message-queue-python-sample' channel.exchange_declare(exchange='sample-exchange', type='direct') channel.queue_declare(queue=queue_name, exclusive=False) channel.queue_bind(exchange='sample-exchange', queue=queue_name, routing_key='test') # Define a callback method to use for handling messages def callback(ch, method, properties, body): print("Received message with routing key {0}: {1}".format(method.routing_key, body)) print("Listening for messages on queue {0}".format(queue_name)) # Set up the consumer and begin listening for messages. channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
- Sample Log Output
-
2017-08-04T10:47:10.97-0700 [DEA/10] OUT Starting app instance (index 0) with guid f37adf39-15f0-45cb-9028-b9ebe9981cd3 2017-08-04T10:47:13.46-0700 [App/0] OUT Listening for messages on queue predix-messagae-queue-python-sample 2017-08-04T10:48:11.58-0700 [App/0] OUT Received message with routing key test: 5769f6a3-2587-4202-b5c4-050aaf8ea16f 2017-08-04T10:48:12.59-0700 [App/0] OUT Received message with routing key test: e83743d8-9d04-4c44-8f88-b3216b890b8d 2017-08-04T10:48:13.59-0700 [App/0] OUT Received message with routing key test: 655b684a-8aab-41c3-8370-5dfe9ea8b64a 2017-08-04T10:48:14.59-0700 [App/0] OUT Received message with routing key test: 19bbd3b9-dfc9-4c68-b854-ef68f2f82440
PHP Code Sample for Predix Message Queue
Recommended Library
Alternative Libraries
Code Sample using amqplib
- Install the Library
- To ensure that CloudFoundry will install
php-amqplib
when the app is uploaded add the following entry to thecomposer.json
file:{ "require": { "php-amqplib/php-amqplib": ">=2.6.1" } }
- Discover the Bound Predix RabbitMQ Service in PHP
GO Code Sample for Predix Message Queue
Recommended Library
Alternative Libraries
Rabbit Hole (RabbitMQ HTTP API client for GO)
Code Sample using GO RabbitMQ
- Install the Library
- Install the library using
go get
:go get github.com/streadway/amqp
- Discover the Bound Predix Message Queue Service in GO
- You can use
os.Getenv
and theencoding/json
library to parse the credentials from theVCAP_SERVICES
environment variable:package main import ( "os" "encoding/json" ) func failOnError(err error, message string) { if err != nil { fmt.Println(fmt.Sprintf("%s: %s\n", message, err)) os.Exit(1) } } func main() { // Parse the VCAP_SERVICES variable to map[string]interface{} with json vcapServicesString := os.Getenv("VCAP_SERVICES") var services interface{} err := json.Unmarshal([]byte(vcapServicesString), &services) failOnError(err, "Failed to parse VCAP_SERVICES to JSON") // Get the amqp protocol credentials amqpCreds, ok := services. (map[string]interface{})["predix-rabbitmq"]. ([]interface{})[0]. (map[string]interface{})["credentials"]. (map[string]interface{})["protocols"]. (map[string]interface{})["amqp"]. (map[string]interface{}) if !ok { fmt.Println("Failed to parse credentials from the VCAP_SERVICES environment.") os.Exit(1) } }
- Set Up the RabbitMQ Connection
- Create the connection and channel, and declare an exchange:
// Open a connection to RabbitMQ using the URI conn, err := amqp.Dial(amqpCreds["uri"].(string)) failOnError(err, fmt.Sprintf("Failed to connect to RabbitMQ with uri %v", amqpCreds["uri"])) defer conn.Close() // Declare a channel channel, err := conn.Channel() failOnError(err, "Failed to open a channel") defer channel.Close() // Declare a direct exchange err = channel.ExchangeDeclare( "sample-exchange", // name "direct", // type false, // durable false, // autoDelete false, // internal false, // noWait nil, // args ) failOnError(err, "Failed to declare an exchange")
- Publishing Messages
-
// Convert the message string to []byte message := "Hello!" body := []byte(message) // Publish a message err := channel.Publish( "sample-exchange", // exchange "test", // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: body, }) failOnError(err, "Failed to publish message")
- Consuming Messages
- To consume messages first declare a queue, then bind it to the exchange. This example creates a goroutine to handle incoming messages, and uses an unbuffered channel to block the thread while waiting to receive them.
queue, err := channel.QueueDeclare( "predix-rabbitmq-go-sample", // name false, // durable false, // autoDelete true, // exclusive false, // noWait nil, // args ) failOnError(err, "Failed to declare a queue") // Bind the queue to the exchange with a routing key err = channel.QueueBind(queue.Name, "test", "sample-exchange", false, nil) failOnError(err, "Failed to bind the queue") messages, err := channel.Consume( queue.Name, // queue name "", // consumer true, // autoAck false, // exclusive false, // noLocal false, // noWait nil, // args ) failOnError(err, "Failed to register the consumer") // Declare an empty channel to use to block the thread while waiting for messages forever := make(chan bool) // Use a goroutine to handle incoming messages go func() { for m := range messages { out := fmt.Sprintf("Received message from exchange %v with routing key '%v': %s", m.Exchange, m.RoutingKey, string(m.Body)) fmt.Println(out) } }() fmt.Println(fmt.Sprintf("Listening for messages on %v", queue.Name)) <-forever // Listen on the channel to block the thread
Backups for Predix Message Queue
Backup Overview
The Predix Message Queue service instance configuration is exported and saved on an hourly basis.
What is Backed Up
- Current RabbitMQ Configuration Data
- The backup process exports the current server state using data returned by calls made to the
/api/definitions/<vhost>
Management API REST interface. Data retuned from his API call will include the following information:
What is Not Backed Up
RabbitMQ doesn't provide an interface to backup or export data stored in 'Durable Queues'.
- Failure Scenarios in which Data Loss is Possible (Durable Queues)
-
Single Node plans, when a VM's data partition is destroyed, RabbitMQ server configuration will be restored, however, the data partition will be empty and can't be restored. Durable Queue Data will be lost!
Recovery Time Objective (RTO)_
The Recovery Time Objective for the Predix Message Queue is four hours. In the event of a catastrophic failure, your instance should be restored to service within 4 hours.
Recovery Point Objective (RPO)
The Recovery Point Objective for the Predix Message Queue is one hour. In the event of a catastrophic failure in which data recovery is necessary, you should expect no more than 60 minutes of RabbitMQ Server configuration data loss. See failure scenarios which detail events which could contribute to data loss.
Migrating from an Existing Service or Plan to a New Plan
Migrate your messaging to a new service plan with Predix Message Queue.
Migrating Data to the New Service
Currently there is no supported method for migrating any existing state or messages from another RabbitMQ service. You will need to recreate any Exchanges, Queues, Bindings, or Policies you wish to use with the new service.
Changing from an Existing Plan to a New Plan
To migrate from a previous RabbitMQ plan perform the steps provided here. A brief downtime for your application will be required, so plan the migration accordingly.
Procedure
- Create a new Predix Message Queue service instance. See pmq-get-started.html#task_v2r_ty4_sbb for details.
- Stop the app.
- Unbind the existing RabbitMQ service instance that you want to replace with Predix Message Queue.
- Bind the new Predix Message Queue service instance to the app.
- Restart the application.