Using Cloud Message Queue Service

About Using Cloud Message Queue Service

Using the Cloud Message Queue service, you can connect your application with the ActiveMQ broker and create a message queue to send or receive messages. The subsequent topics provide code examples for working with Cloud Message Queue in programming languages that include Java, Go, and Ruby.
Tip:
  • The transport endpoint for ActiveMQ Advanced Message Queuing Protocol (AMQP) starts with amqp+ssl. When creating an AMQP client, you must replace amqp+ssl with amqps in your application code.
  • If you choose to subscribe to a High Availability (HA) plan, you must use the Failover Transport protocol. For more information on the Failover Transport protocol, refer to the ActiveMQ documentation.
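
The two tips above can be sketched as follows in Ruby, one of the languages covered later in this document. This is a minimal illustration only: the helper names to_amqps and failover_uri and the example host names are assumptions, not part of the service. The failover:(uri1,uri2) form follows the ActiveMQ Failover Transport syntax, and whether a given client library accepts it depends on that library.

```ruby
# Hypothetical helpers for illustration; names and hosts are assumptions.

# Replace the amqp+ssl scheme reported by the service with amqps.
def to_amqps(endpoint)
  endpoint.sub(/\Aamqp\+ssl:/, "amqps:")
end

# Wrap one or more broker URIs in the ActiveMQ failover transport syntax.
def failover_uri(endpoints)
  "failover:(#{endpoints.join(',')})"
end

# Example endpoints for an HA plan (placeholder host names).
endpoints = [
  "amqp+ssl://broker-1.example.com:5671",
  "amqp+ssl://broker-2.example.com:5671"
].map { |endpoint| to_amqps(endpoint) }

puts failover_uri(endpoints)
```

An endpoint that already uses the amqps scheme passes through to_amqps unchanged, so the helper can be applied to every endpoint without checking it first.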

Connect to ActiveMQ Broker using Java Message Service (JMS)

Before You Begin

  • Ensure that Java is installed and configured on your system.
  • In the project directory, modify the pom.xml file as follows to add the activemq-client.jar and activemq-pool.jar packages to your Java class path:
    <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.15.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.15.8</version>
        </dependency>
    </dependencies>

About This Task

Cloud Message Queue supports JMS, which allows you to create message queues, and send or receive messages within a distributed communication system that is loosely coupled, reliable, and asynchronous. This topic describes how to connect your application with the message broker using JMS.

Procedure

  1. Create a message producer to send messages.
  2. Create a message consumer to receive messages.

Create Message Producer using Java Message Service (JMS)

Before You Begin

  • Ensure that Java is installed and configured on your system.
  • In the project directory, ensure that the pom.xml file is modified as follows to add the activemq-client.jar and activemq-pool.jar packages to your Java class path:
    <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.15.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.15.8</version>
        </dependency>
    </dependencies>

About This Task

This topic describes how to create a message producer using JMS. It also shows how to create a connection to the message broker, create a queue, and send messages to the queue for a message consumer to receive.

Note: This topic provides code examples to demonstrate how to create a message producer using JMS.

Procedure

  1. Add the following import statements to your Java source file:
    import java.util.Hashtable;
    import java.util.UUID;
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageProducer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.naming.Context;
    import org.apache.activemq.jms.pool.PooledConnectionFactory;
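  2. Create a connection factory for the message producer using the ActiveMQ endpoint. The JNDI context factory in this example, org.apache.qpid.jms.jndi.JmsInitialContextFactory, belongs to the Apache Qpid JMS client library, so that library must also be on your class path.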
    Hashtable<Object, Object> env = new Hashtable<Object, Object>();
    env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
    env.put("connectionfactory.factoryLookup", endpoint);
    javax.naming.Context context = new javax.naming.InitialContext(env);
    
    // Create a connection factory.
    ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("factoryLookup");
    
    // Create a pooled connection factory.
    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
    pooledConnectionFactory.setConnectionFactory(connectionFactory);
    pooledConnectionFactory.setMaxConnections(10);
    
    // Establish a connection for the producer.
    Connection producerConnection = pooledConnectionFactory.createConnection(username, password);
    producerConnection.start();
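  3. Create a session, a message queue, and a message producer to send messages to the destination application.
    The following code example creates a message queue whose name is held in the QUEUE constant. ACKNOWLEDGE_MODE and DELIVERY_MODE are constants that you define in your application, for example Session.AUTO_ACKNOWLEDGE and DeliveryMode.NON_PERSISTENT: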
    // Create a session.
    Session producerSession = producerConnection.createSession(false, ACKNOWLEDGE_MODE);
    
    // Create a queue named "QUEUE".
    Destination producerDestination = producerSession.createQueue(QUEUE);
    
    // Create a producer from the session to the queue.
    MessageProducer producer = producerSession.createProducer(producerDestination);
    producer.setDeliveryMode(DELIVERY_MODE);
  4. Use the message producer to send messages to the queue. The following example sends a random UUID every INTERVAL milliseconds until RUN_MILLISECONDS have elapsed. Both values are constants that you define in your application:
    long totalMillis = 0;
    while (totalMillis < RUN_MILLISECONDS) {
        // Generate a random UUID to use as the message body.
        String message = UUID.randomUUID().toString();
        System.out.println(String.format("Publishing message to queue %s: %s", QUEUE, message));
        TextMessage producerMessage = producerSession.createTextMessage(message);
        // Send the message.
        producer.send(producerMessage);

        // Wait before sending the next message.
        Thread.sleep(INTERVAL);
        totalMillis += INTERVAL;
    }
  5. Close the producer, session, and connection.
    producer.close();
    producerSession.close();
    producerConnection.close();
    pooledConnectionFactory.stop();

Create Message Consumer using Java Message Service (JMS)

Before You Begin

  • Ensure that Java is installed and configured on your system.
  • In the project directory, ensure that the pom.xml file is modified as follows to add the activemq-client.jar and activemq-pool.jar packages to your Java class path:
    <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.15.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.15.8</version>
        </dependency>
    </dependencies>

About This Task

This topic describes how to create a message consumer using JMS. It also shows how to create a connection to the message broker, create a queue, and receive messages from the ActiveMQ message queue.

Note: This topic provides code examples to demonstrate how to create a message consumer using JMS.

Procedure

  1. Add the following import statements to your Java source file:
    import java.util.Hashtable;
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageProducer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.naming.Context;
    import javax.naming.NamingException;
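  2. Create a message listener to receive the messages asynchronously.
    class ConsumerMessageListener implements MessageListener {
        public void onMessage(Message message) {
            // Only text messages carry a body that getText() can read.
            if (!(message instanceof TextMessage)) {
                System.out.println("Ignoring non-text message: " + message);
                return;
            }
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("Message received : " + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }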
  3. Create a connection factory for the message consumer using the endpoint of the ActiveMQ message broker.
    Hashtable<Object, Object> env = new Hashtable<Object, Object>();
    env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
    env.put("connectionfactory.factoryLookup", endpoint);
    javax.naming.Context context = new javax.naming.InitialContext(env);

    // Create a connection factory.
    ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("factoryLookup");

    // Establish a connection for the consumer.
    Connection consumerConnection = connectionFactory.createConnection(username, password);
    consumerConnection.start();
  4. Create a session, a message queue, and a message consumer to receive messages from the ActiveMQ message queue.
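    The following code example creates a message queue whose name is held in the QUEUE constant. ACKNOWLEDGE_MODE is a constant that you define in your application, for example Session.AUTO_ACKNOWLEDGE: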
    // Create a session.
    Session consumerSession = consumerConnection.createSession(false, ACKNOWLEDGE_MODE);
    
    // Create a queue named "QUEUE".
    Destination consumerDestination = consumerSession.createQueue(QUEUE);
    
    // Create a message consumer from the session to the queue.
    MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);
    consumer.setMessageListener(new ConsumerMessageListener());
    // Keep the main thread alive for 100 seconds so that the listener can receive messages.
    Thread.sleep(100000);
  5. Close the consumer, session, and connection.
    consumer.close();
    consumerSession.close();
    consumerConnection.close();

Connect to ActiveMQ Broker with Golang

Before You Begin

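The examples in this topic use the following library package for AMQP 1.0, along with standard library packages such as context, fmt, and time, and a UUID package that provides uuid.NewV4(). Import the AMQP package into your application as follows: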
import "pack.ag/amqp"

About This Task

The Cloud Message Queue service supports AMQP 1.0, which allows you to connect your application with the message queue broker, create message queues, and send or receive messages.

Code Examples for Connecting to ActiveMQ Broker using AMQP 1.0

  • The following code example demonstrates how to create a message producer, and then send messages to the destination application using the message queue:
    // Create a client. Errors are reported with the standard log package.
    client, err := amqp.Dial("amqps://"+host+":"+port,
    	amqp.ConnSASLPlain(username, password),
    )
    if err != nil {
    	log.Fatal("Dialing AMQP server: ", err)
    }
    defer client.Close()

    // Open a session
    session, err := client.NewSession()
    if err != nil {
    	log.Fatal("Cannot create session, error: ", err)
    }
    ctx := context.Background()

    queue := "test-queue"

    // Create a sender link to the queue
    sender, err := session.NewSender(
    	amqp.LinkTargetAddress(queue),
    )
    if err != nil {
    	log.Fatal("Cannot create sender, error: ", err)
    }

    // Allow slightly longer than the 30-second send loop so that Close still has a live context
    ctx, cancel := context.WithTimeout(ctx, 35*time.Second)

    // Send messages to the queue
    totalSeconds := 0
    maxSeconds := 30
    for totalSeconds < maxSeconds {
    	// Generate random UUID for message body. Convert to []byte
    	randomUUID := uuid.NewV4()
    	body := []byte(randomUUID.String())

    	// Publish a message
    	fmt.Printf("Publishing message to %s: %s\n", queue, body)
    	err = sender.Send(ctx, amqp.NewMessage(body))
    	if err != nil {
    		fmt.Println("Failed to publish message, error:", err)
    	}
    	time.Sleep(5 * time.Second)
    	totalSeconds += 5
    }

    // Close the sender link
    sender.Close(ctx)
    cancel()
  • The following code example demonstrates how to create a message receiver, and then receive messages using the message queue:
    // Create a client. Errors are reported with the standard log package.
    client, err := amqp.Dial("amqps://"+host+":"+port,
    	amqp.ConnSASLPlain(username, password),
    )
    if err != nil {
    	log.Fatal("Dialing AMQP server: ", err)
    }
    defer client.Close()

    // Open a session
    session, err := client.NewSession()
    if err != nil {
    	log.Fatal("Cannot create session, error: ", err)
    }
    ctx := context.Background()

    queue := "test-queue"

    // Create a receiver and receive messages
    receiver, err := session.NewReceiver(
    	amqp.LinkSourceAddress(queue),
    	amqp.LinkCredit(10),
    )
    if err != nil {
    	log.Fatal("Creating receiver link, error: ", err)
    }

    defer func() {
    	ctx, cancel := context.WithTimeout(ctx, 35*time.Second)
    	receiver.Close(ctx)
    	cancel()
    }()

    for {
    	// Receive next message
    	msg, err := receiver.Receive(ctx)
    	if err != nil {
    		// Stop here: msg is nil when Receive fails
    		log.Fatal("Reading message from AMQP error: ", err)
    	}

    	// Accept message
    	msg.Accept()

    	fmt.Printf("Message received: %s\n", msg.GetData())
    }

About Connecting to the ActiveMQ Broker with Ruby

The Cloud Message Queue service supports several communication protocols, including the ones covered in this document: AMQP 1.0, STOMP, and MQTT.

If your application is built on Ruby, you can use one of these protocols to connect to the ActiveMQ broker and send or receive messages.

Note: With Ruby, all of these transport protocols are supported except AMQP 1.0. Currently, no official Ruby library release supports AMQP 1.0.

Connect to ActiveMQ Broker using Simple (or Streaming) Text Oriented Messaging Protocol (STOMP)

Before You Begin

Add the following statement to your application to load the STOMP library:
require 'stomp'

About This Task

The Cloud Message Queue service supports STOMP, which allows you to connect your application with the message queue broker, and send or receive messages.
Note: This topic provides code examples to demonstrate how to connect your application with the message queue broker using STOMP.

Procedure

  1. Run the following command to generate the SSL client certificate components (that is, the private key and the public certificate) in Privacy Enhanced Mail (PEM) format:
    openssl req -newkey rsa:2048 -nodes -keyout privateKey.key -x509 -days 365 -out certificate.pem
    Then define the SSL parameters for the STOMP client. The ciphers_list variable used here must be assigned before this code runs:
    ssl_opts = Stomp::SSLParams.new(
      :key_file => "#{File.expand_path(File.dirname(File.dirname(__FILE__)))}/certificate/privateKey.key",
      :cert_file => "#{File.expand_path(File.dirname(File.dirname(__FILE__)))}/certificate/certificate.pem",
      :fsck => true,
      :ciphers => ciphers_list
    )
    Note: The following list of ciphers is supported with the STOMP client:
    ciphers_list = [
      ["DHE-RSA-AES256-SHA", "TLSv1/SSLv3", 256, 256],
      ["DHE-DSS-AES256-SHA", "TLSv1/SSLv3", 256, 256],
      ["AES256-SHA", "TLSv1/SSLv3", 256, 256],
      ["EDH-RSA-DES-CBC3-SHA", "TLSv1/SSLv3", 168, 168],
      ["EDH-DSS-DES-CBC3-SHA", "TLSv1/SSLv3", 168, 168],
      ["DES-CBC3-SHA", "TLSv1/SSLv3", 168, 168],
      ["DHE-RSA-AES128-SHA", "TLSv1/SSLv3", 128, 128],
      ["DHE-DSS-AES128-SHA", "TLSv1/SSLv3", 128, 128],
      ["AES128-SHA", "TLSv1/SSLv3", 128, 128],
      ["RC4-SHA", "TLSv1/SSLv3", 128, 128],
      ["RC4-MD5", "TLSv1/SSLv3", 128, 128],
      ["EDH-RSA-DES-CBC-SHA", "TLSv1/SSLv3", 56, 56],
      ["EDH-DSS-DES-CBC-SHA", "TLSv1/SSLv3", 56, 56],
      ["DES-CBC-SHA", "TLSv1/SSLv3", 56, 56],
      ["EXP-EDH-RSA-DES-CBC-SHA", "TLSv1/SSLv3", 40, 56],
      ["EXP-EDH-DSS-DES-CBC-SHA", "TLSv1/SSLv3", 40, 56],
      ["EXP-DES-CBC-SHA", "TLSv1/SSLv3", 40, 56],
      ["EXP-RC2-CBC-MD5", "TLSv1/SSLv3", 40, 128],
      ["EXP-RC4-MD5", "TLSv1/SSLv3", 40, 128]
    ]
  2. Create the STOMP client and connect it to the message broker.
    hash = {
      :hosts => [
        {
          :login => stomp_credentials['username'],
          :passcode => stomp_credentials['password'],
          :host => stomp_credentials['host'],
          :port => stomp_credentials['port'],
          :ssl => ssl_opts
        }
      ],
      :reliable => false,
      :start_timeout => 0,
      :connect_headers => {"accept-version" => "1.1,1.2", "host" => "vhost", "heart-beat" => "5000,10000" }
    }

    STDOUT.puts "Creating client"

    client = Stomp::Client.new(hash)
    conn_frame = client.connection_frame()
  3. Verify that the STOMP client is connected to the message broker, and print the connection details.
    if conn_frame.command == Stomp::CMD_ERROR
      STDOUT.puts "Unable to connect...\n"
      raise conn_frame.body
    end

    puts "-" * 50
    puts "Connect version - #{conn_frame.headers['version']}"
    puts "Connect server   - #{conn_frame.headers['server']}"
    puts "Session ID       - #{conn_frame.headers['session']}"
    puts "Heartbeats       - #{conn_frame.headers['heart-beat']}"
    puts "SSL Verify       - #{ssl_opts.verify_result}"
    puts "-" * 50 , "\n\n"
  4. Perform the tests as specified in the following examples to ensure that messages are sent to and received from the message queue.
    The following code example demonstrates how to send the message text, Hello World!, to the message queue:
    client.publish("/queue/myqueue", "Hello World!")
    The following code example demonstrates how to subscribe to the queue to receive messages. Because the subscription uses client acknowledgment mode, each message is acknowledged after it is processed:
    begin
      client.subscribe("/queue/myqueue", {:ack => "client", "activemq.prefetchSize" => 10000, :browser => "false"}) do |message|
        STDOUT.puts "Received message from the queue: #{message.body}"
        # Acknowledge the message so that the broker does not redeliver it.
        client.acknowledge(message)
      end
      # Wait on the listener thread so that the subscription keeps running until interrupted.
      client.join
    rescue Interrupt => _
      client.close
    end
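
The cipher list in step 1 includes export-grade (EXP-), single-DES, and RC4 suites, which are widely considered weak. The following is a minimal sketch, not part of the service documentation, of how such a list could be narrowed before it is passed to the SSL parameters. It assumes that the third column of each entry is the key length in bits and that your broker accepts the remaining suites; the strong_ciphers helper and the 128-bit threshold are hypothetical choices for illustration, and the list is shortened here.

```ruby
# Shortened copy of the list from step 1. Assumed column layout:
# [name, protocol, key bits, algorithm bits].
ciphers_list = [
  ["DHE-RSA-AES256-SHA", "TLSv1/SSLv3", 256, 256],
  ["DES-CBC3-SHA", "TLSv1/SSLv3", 168, 168],
  ["AES128-SHA", "TLSv1/SSLv3", 128, 128],
  ["RC4-SHA", "TLSv1/SSLv3", 128, 128],
  ["DES-CBC-SHA", "TLSv1/SSLv3", 56, 56],
  ["EXP-RC4-MD5", "TLSv1/SSLv3", 40, 128]
]

# Hypothetical helper: keep entries with at least min_bits of key length
# and drop RC4 and export-grade suites by name.
def strong_ciphers(list, min_bits = 128)
  list.select do |name, _protocol, bits, _alg_bits|
    bits >= min_bits && !name.include?("RC4") && !name.start_with?("EXP-")
  end
end

strong = strong_ciphers(ciphers_list)
strong.each { |name, *_rest| puts name }
```

The filtered array has the same shape as the original, so it could be supplied as the :ciphers value in place of the full list. The threshold only checks key length and names, so you may want to tighten it further for your environment.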

Connect to ActiveMQ Broker using Message Queuing Telemetry Transport (MQTT)

Before You Begin

Add the following statement to your application to load the MQTT library:
require 'mqtt'

About This Task

The Cloud Message Queue service supports the MQTT protocol, which allows you to connect your application with the message queue broker, and send or receive messages.
Note: This topic provides code examples to demonstrate how to connect your application with the message queue broker using the MQTT protocol.

Procedure

  1. Run the following command to generate the SSL client certificate components (that is, the private key and the public certificate) in Privacy Enhanced Mail (PEM) format:
    openssl req -newkey rsa:2048 -nodes -keyout privateKey.key -x509 -days 365 -out certificate.pem
  2. Create the MQTT client and configure it with the broker credentials and the SSL certificate files.
    client = MQTT::Client.new
    client.host = mqtt_credentials['host']
    client.username = mqtt_credentials['username']
    client.password = mqtt_credentials['password']
    client.port = mqtt_credentials['port']
    client.ssl = true
    client.cert_file = "#{File.expand_path(File.dirname(File.dirname(__FILE__)))}/certificate/certificate.pem"
    client.key_file = "#{File.expand_path(File.dirname(File.dirname(__FILE__)))}/certificate/privateKey.key"
  3. Perform the tests as specified in the following examples to ensure that messages are sent to and received from the message queue.
    The following code example demonstrates how to connect to the broker and send a message to the message queue:
    client.connect do
      client.publish(queue, message)
    end
    The following code example demonstrates how to connect to the broker and subscribe to the queue to receive messages:
    client.connect do
      client.subscribe(queue)
      # Wait for 10 messages
      10.times do
        _topic, message = client.get
        STDOUT.puts "Received message from the queue: #{message}"
      end
    end
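
The client setup in step 2 reads four values from mqtt_credentials. As a hedged sketch, the check below reports any value that is missing or empty before the client is configured, so that a misconfiguration surfaces early rather than as a connection failure. The REQUIRED_KEYS constant, the missing_credentials helper, and the example values are assumptions for illustration only.

```ruby
# Keys that step 2 reads from the credentials hash.
REQUIRED_KEYS = %w[host port username password].freeze

# Hypothetical helper: return the required keys whose values are nil or empty.
def missing_credentials(credentials)
  REQUIRED_KEYS.reject do |key|
    credentials[key] && !credentials[key].to_s.empty?
  end
end

# Example credentials with the password left out (placeholder values).
mqtt_credentials = {
  "host" => "broker.example.com",
  "port" => 8883,
  "username" => "user"
}

missing = missing_credentials(mqtt_credentials)
puts "Missing credentials: #{missing.join(', ')}" unless missing.empty?
```

The same check could be applied to stomp_credentials in the STOMP topic, because both hashes are read with the same four keys.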