Kaazing WebSocket Gateway

Orange guy with box of questions

How to Use the JavaScript AmqpClient Client Library

Technologies used: AMQP 0-9 and JavaScript

This document contains the following sections:

Overview of AMQP 0-9

Advanced Message Queuing Protocol (AMQP) is an open standard for messaging middleware that was originally designed by the financial services industry to provide an interoperable protocol for managing the flow of enterprise messages. To guarantee messaging interoperability, AMQP defines both a wire-level protocol and a model--the AMQP Model--of messaging capabilities.

The AMQP Model defines three main components:

  1. Exchange--clients publish messages to an exchange
  2. Queue--clients read messages from a queue
  3. Binding--a mapping from an exchange to a queue

An AMQP client connects to an AMQP broker and opens a channel. Once the channel is established, the client can send messages to an exchange and receive messages from a queue. To learn more about AMQP functionality, take a look at Kaazing's Real-Time Interactive Guide to AMQP, an interactive guide that takes you step-by-step through the main features of AMQP version 0-9.

For more information about AMQP, visit http://www.amqp.org.

WebSocket and AMQP

WebSocket enables direct communication from the browser to an AMQP broker. Kaazing Gateway radically simplifies Web application design by providing the AmqpClient client libraries for the JavaScript, Adobe Flex, and Microsoft Silverlight client technologies. Web developers can code directly against the back-end AMQP broker without the need for custom Servlets or server-side programming.

Using the AmqpClient client libraries, you can take advantage of the AMQP features, making the browser a first-class citizen in AMQP systems (similar to C, Java, Python, and other clients). This means that you can now run AMQP clients directly in a browser.

The AmqpClient libraries use Kaazing's ByteSocket client library, because AMQP messages use a binary format. Since the implementation is layered on top of ByteSocket, which uses WebSocket--and thus the entire stack of Kaazing HTML 5 Communications client libraries--applications developed using the AmqpClient libraries are provided with guaranteed persistence, reliability, and message-receipt acknowledgment all the way to the browser.

Taking a Look at the AMQP Demo

Before you start, take a look at a demonstration that was built with the JavaScript version of the AmqpClient client library. To see this demo in action, perform the following steps:

  1. Start the Kaazing WebSocket Gateway demo bundle as described in Getting Started.
  2. In a browser, navigate to http://localhost:8000/demo/demo.html#amqp.
    From this page, you can navigate to additional AMQP demos for different client technologies (Adobe Flex and Microsoft Silverlight).

The demo requires a connection to an AMQP broker. The next section describes how to set up a local AMQP broker.

Setting Up an AMQP Broker

There is a wide variety of AMQP brokers available that implement different AMQP versions. For example, OpenAMQ, Apache Qpid, Red Hat Enterprise MRG, RabbitMQ, ØMQ, and Zyre. If you do not have an AMQP broker installed yet, you can use Apache Qpid AMQP broker, which supports AMQP version 0-9. To set up the Apache Qpid broker on your system, perform the following steps:

  1. Download the Apache Qpid (the broker or the full release) from http://qpid.apache.org.
  2. Unpack the distribution into a directory of your choice (for example, C:\qpid). This directory contains the AMQP broker and its components and is referred to as AMQP_HOME in the Kaazing documentation.
  3. Follow the steps in the Apache Qpid Getting Started to start the broker.
    Note: You may have to run a script to create the example SSL stores before you can start the broker. This script creates the Qpid keystore and truststore files that must be present in the AMQP_HOME/etc directory before you can start the broker.
  4. Start Kaazing Gateway as described in Getting Started.
  5. Open a browser and navigate to http://localhost:8000/demo/demo.html#amqp and log in to the demo to verify that it is working.

Configuring Kaazing Gateway to Connect to an AMQP Broker

Note: If you have the Kaazing WebSocket Gateway demo bundle running on localhost and if you have an AMQP broker running on localhost at the default AMQP port 5672, you do not have to configure anything to see the AMQP demos and the interactive AMQP guide.

The following is an example of the default configuration element for the AMQP service in the Kaazing WebSocket Gateway demo bundle, as specified in the configuration file KAAZING_HOME/conf/gateway-config.xml:

<!-- Proxy service to AMQP server -->
<service>
  <accept>ws://localhost:8000/amqp</accept>
  <accept>wss://localhost:9000/amqp</accept>

  <type>proxy</type>
  <properties>
    <connect>tcp://localhost:5672</connect>
  </properties>

  <cross-site-constraint>
    <allow-origin>http://localhost:8000</allow-origin>
  </cross-site-constraint>
  <cross-site-constraint>
    <allow-origin>https://localhost:9000</allow-origin>
  </cross-site-constraint>
</service>

In this case, the service is configured to accept WebSocket AMQP requests from the browser at ws://localhost:8000/amqp and securely at wss://localhost:9000/amqp and proxy those requests to a locally installed AMQP broker (localhost) at port 5672.

To configure Kaazing Gateway to accept WebSocket requests at another URL or to connect to a different AMQP broker, you can edit KAAZING_HOME/conf/gateway-config.xml, update the values for the accept elements, change the connect property, and restart Kaazing Gateway. For example, the following configuration configures Kaazing Gateway to accept WebSocket AMQP requests at ws://www.example.com:80/amqp and proxy those requests to an AMQP broker (amqp.example.com) on port 5672.

<!-- Proxy service to AMQP server -->
<service>
  <accept>ws://www.example.com:80/amqp</accept>

  <type>proxy</type>
  <properties>
    <connect>tcp://amqp.example.com:5672</connect>
  </properties>
</service>

Using the JavaScript AmqpClient Library

This section contains the following subsections:

Create the AmqpClient Object

First, create an AmqpClient client object. Before you create the client object, open your Web application in your favorite editor and import the appropriate client libraries. In the <HEAD> section of your HTML Web application file, add the following script includes:

<script src="/html5/ByteSocket.js"></script>
<script src="/protocol/AmqpClient.js"></script>

Since AMQP messages are binary blobs, you must include a reference to the ByteSocket client library.

Next, create an instance of the AmqpClient object as shown in the following example.

var amqpClient = new AmqpClient();

Now that you have created an instance of the AmqpClient object, you can use the AMQP protocol commands to handle the interactions between the client and the AMQP broker. Refer to the AmqpClient JavaScript API documentation for the complete list of all the AMQP command and callback functions.

Connect to an AMQP Broker

Next, you must to connect and log in to an AMQP broker. The client generally manages all of its communication on a single connection to an AMQP broker. You establish a connection to an AMQP broker by passing in the broker address, a user name and password, the AMQP version you want to use, and, optionally, a virtual host name (the name of a collection of exchanges and queues hosted on independent server domains). These parameters are passed in when you call the connect method as shown in the following example.

amqpClient.connect(url, virtualHost, {username:user, password:pwd}, amqpVersion,
  openHandler);

Note: Version 0-9-1 is used if you do not specify an AMQP version number. If your AMQP broker does not support version 0-9-1, you must specify another version (for example, 0-9-0). Refer your AMQP broker documentation and to Release Notes for information about certified AMQP versions.

Create Channels

Once a connection to an AMQP broker has been established, the client must create a channel to communicate to the broker. A channel is a bi-directional connection between an AMQP client and an AMQP broker. AMQP is multi-channeled, which means that channels are multiplexed over a single network socket connection. Channels are light-weight and cheap, and therefore used in AMQP's exception handling mechanism--channels are closed when an exception occurs.The following example shows how you can create two channels (one for publishing to an exchange and one for consuming from a queue):

publishChannel = amqpClient.openChannel(publishChannelOpenHandler);
consumeChannel = amqpClient.openChannel(consumeChannelOpenHandler);

Declare an Exchange

AMQP messages are published to exchanges. Messages contain a routing key that contains the information about the message's destination. The exchange accepts messages and their routing keys and delivers them to a message queue. You can think of an exchange as an electronic mailman that delivers the messages to a mailbox (the queue) based on the address on the message's envelope (the routing key). Exchanges do not store messages.

AMQP defines different exchange types. Some of these exchange types (Direct, Fanout, and Topic) must be supported by all AMQP brokers while others (Headers and System) are optional. AMQP brokers can also support custom exchange types. The following are the different types of exchanges:

  • Direct--Messages are sent only to a queue that is bound with a binding key that matches the message's routing key
  • Fanout--Messages are sent to every queue that is bound to the exchange
  • Topic--Messages are sent to a queue based on categorical binding keys and wildcards
  • Headers--Messages are sent to a queue based on their header property values
  • System--Messages are sent to system services

Exchanges can be durable, meaning that the exchange survives broker shut-down and must be deleted manually or non-durable (temporary) meaning that the exchange lasts only until the broker is shut down. Finally, to check if an exchange exists on the AMQP broker (without actually creating it), you can create a passive exchange. The following example shows how you can create a direct exchange on the publish channel:

publishChannel.declareExchange(exchangeName, direct, passive, durable, noWait,
  declareExhangeHandler);

Note: In this example, the arguments passive, durable, and noWait represent boolean values.

After the exchange is created successfully, the declareExchangeHandler callback is called.

Declare a Queue

AMQP messages are consumed from queues. You can think of a queue as a mailbox; messages addressed to a particular address (the routing key) are placed in the mailbox for the consumer to pick up. If multiple consumers are bound to a single queue, only one of the consumers receives the message (the one that picked up the mail).

To check if a queue exists on the AMQP broker (without creating it), you can create a passive queue. Additionally, queues can be marked exclusive, which means that they are tied to a specific connection. If a queue is marked exclusive, it is deleted when the connection on which it was created is closed.

Queues can be durable, meaning that the queue survives broker shut-down and must be deleted manually or non-durable (temporary) meaning that the queue lasts only until the broker is shut down. Queues can also be marked auto delete, which means that the queue is automatically deleted when it is no longer in use. The following example shows how you can create a queue on the consume channel:

consumeChannel.declareQueue(myQueueName, passive, durable, exclusive, autoDelete,   noWait, declareQueueHandler);

Note: In this example, the arguments passive, durable, exclusive, autoDelete, and noWait represent boolean values.

After the queue is created successfully, the declareQueueHandler callback is called.

Bind an Exchange to a Queue

Once you have created an exchange and a queue in AMQP, you must bind--or map--one to the other so that messages published to a specific exchange are delivered to a particular queue. You bind a queue to an exchange with a routing key as shown in the following example.

publishChannel.bindQueue(myQueueName, exchangeName, routingKeyName, noWait,
  bindQueueHandler);

After the exchange is bound to the queue successfully, the bindQueueHandler callback is called.

Publish Messages

Messages are published to exchanges. The established binding rules (routing keys) then determine to which queue a message is delivered. Messages have content that consists of two parts:

  1. Content Header--A set of properties that describes the message
  2. Content Body--A blob of binary data

Additionally, messages can be marked mandatory to send a notification to the publisher in case a message cannot be delivered to a queue. You can also mark a message immediate so that it is returned to the sender if the message cannot be routed to a queue consumer immediately. The following example function shows how the content body of a message is added to a buffer (AMQP uses a binary message format) and published to an exchange using the publish channel:

var publishString = function(message, exchangeName, routingKey, mandatory,     immediate) {
  var body = new ByteBuffer();
  body.putString(message, Charset.UTF8);
  body.flip();
  var headers = {};

  publishChannel.publishBasic(body, headers, exchangeName, routingkey, mandatory,     immediate);
};

Note: The arguments mandatory and immediate use boolean values.

Consume Messages

Once messages are published, they can be consumed from a queue. A variety of options can be applied to messages in a queue. For example, publishers can choose to require acknowledgement (ack) of messages so that messages can be redelivered in the case of a delivery failure. If the queue is set to exclusive, it is scoped to just the current connection and deleted when the connection on which it was established is closed. Additionally, you can use the no local setting to notify the broker not to send messages to the connection on which the messages were published. The following example shows how you can consume messages from a queue on the consume channel:

consumeChannel.consumeBasic(myQueueName, myConsumerTag, noLocal, noAck, noWait,
  exclusive, consumeMessageHandler);

Note: In this example, the arguments noLocal, noAck, noWait, and exclusive represent boolean values.

After the consumeBasic method is successful, the onconsume event is raised, which calls the consumeMessageHandler callback. The AMQP broker can then start delivering messages to the client and these messages raise the message event, which calls a messageHandler callback. The following example shows how the messageHandler callback function retrieves information from the AmqpEvent object:

var messageHandler = function(m) {
  var body = m.body.getString(Charset.UTF8);
  log("MESSAGE: " + body);
};

Use Transactions

AMQP supports transactional messaging, through server local transactions. In a transaction the server only publishes a set of messages as one unit when the client commits the transaction. Transactions only apply to message publishing and not to the consumption of the messages.

Note: Once you commit or roll back a transaction on a channel, a new transaction is started automatically. For this reason you must commit all future messages you want to publish on that channel or create a new, non-transactional channel to publish messages on.

The following transaction-related methods can be used to work select (start), commit, and rollback a transaction.

publishChannel.selectTx(function() {log(output, "Select succeeded");});
publishChannel.commitTx(function() {log(output, "Commit succeeded");});
publishChannel.rollbackTx(function() {log(output, "Rolled back");});

After the transaction is successfully selected, committed, or rolled back, the corresponding events (selecttransaction, committransaction, and rollbacktransaction) are raised and the callback functions are called.

Control Message Flow

You can use flow control in AMQP to temporarily--or permanently--halt the flow of messages on a channel from a queue to a consumer. If you turn the message flow off, no messages is sent to the consumer. The following example shows how you can turn the flow of messages on a channel off and back on:

consumeChannel.flowChannel(false, function() {log(output, "Flow set to false");});
consumeChannel.flowChannel(true, function() {log(output, "Flow set to true");});

After the flow on a channel is halted or resumed successfully, the flow event is raised and the callback functions are called..

Handle Exceptions

Channels are light-weight and cheap, and therefore used in AMQP's exception handling mechanism--channels are closed when an exception occurs. The following example shows how you can handle exceptions that occur when the channel is closed:

consumeChannel.onclose = function(e) { log(e.args.replyText); }

When the channel is closed, the function that is called receives an AmqpEvent object, which contains a frame with detailed information about the exception.

Other Coding Styles

In this how-to, you have used a continuation-passing style of JavaScript programming. Additionally, you can use an event listener programming style or a callback properties programming style. The following example shows how you can declare an exchange using the event-listener programming style:

publishChannel.addEventListener("declareexchange", function(e) {log(output,   "Exchange declared");});

The following example shows how you can declare an exchange using the callback properties programming style:

publishChannel.ondeclareexchange = function(e) {log(output, "Exchange declared");};

You can also combine one or more of these programming styles.

Summary

As you have seen in this how-to, you can build Web applications that can communicate directly from the browser with an AMQP broker using the JavaScript AmqpClient client library. Thus, with the help of WebSocket the browser now enjoys the first-class citizenry of AMQP communications that has, until recently, been enjoyed only by desktop applications.

Refer to the AmqpClient JavaScript API documentation for the complete list of all the AMQP command and callback functions.