Kaazing WebSocket Gateway |
|
How to Use the Adobe Flex AmqpClient Client Library |
|
Technologies used: AMQP 0-9 and Adobe Flex |
This document contains the following sections:
- Overview of AMQP 0-9
- Overview of Adobe Flex
- WebSocket and AMQP
- Taking a Look at the AMQP Demo
- Setting Up an AMQP Broker
- Configuring Kaazing Gateway to Connect to an AMQP Broker
- Setting Up Your Flex Development Environment
- Using the Flex AmqpClient Library
- Other Coding Styles
- Summary
Overview of AMQP 0-9
Advanced Message Queuing Protocol (AMQP) is an open standard for messaging middleware that was originally designed by the financial services industry to provide an interoperable protocol for managing the flow of enterprise messages. To guarantee messaging interoperability, AMQP defines both a wire-level protocol and a model--the AMQP Model--of messaging capabilities.
The AMQP Model defines three main components:
- Exchange--clients publish messages to an exchange
- Queue--clients read messages from a queue
- Binding--a mapping from an exchange to a queue
An AMQP client connects to an AMQP broker and opens a channel. Once the channel is established, the client can send messages to an exchange and receive messages from a queue. To learn more about AMQP functionality, take a look at Kaazing's Real-Time Interactive Guide to AMQP, an interactive guide that takes you step-by-step through the main features of AMQP version 0-9.
For more information about AMQP, visit http://www.amqp.org.
Overview of Adobe Flex
Adobe Flex (Flex) is a software development kit (SDK) from Adobe Systems. You use Flex to create Web applications that use the Adobe Flash browser plugin. Flex applications can be created in Adobe Flex Builder or with Adobe's free Flex compiler toolkit.
For more information about Adobe Flex, visit http://www.adobe.com/products/flex/.
WebSocket and AMQP
WebSocket enables direct communication from the browser to an AMQP broker. Kaazing Gateway radically simplifies Web application design by providing the AmqpClient client libraries for the JavaScript, Microsoft Silverlight, and Flex client technologies. Web developers can code directly against the back-end AMQP broker without the need for custom Servlets or server-side programming.
Using the AmqpClient client libraries, you can take advantage of the AMQP features, making the browser a first-class citizen in AMQP systems (similar to C, Java, Python, and other clients). This means that you can now run AMQP clients directly in a browser.
The AmqpClient libraries use Kaazing's ByteSocket client library, because AMQP messages use a binary format. Since the implementation is layered on top of ByteSocket, which uses WebSocket--and thus the entire stack of Kaazing HTML 5 Communications client libraries--applications developed using the AmqpClient libraries are provided with guaranteed persistence, reliability, and message-receipt acknowledgment all the way to the browser.
Taking a Look at the AMQP Demo
Let's take a look at a demonstration that was built with the Flex version of the AmqpClient client library. To see this demo in action, perform the following steps:
- Start the Kaazing WebSocket Gateway demo bundle as described in Getting Started.
- In a browser, navigate to http://localhost:8000/demo/demo.html#amqp-flash.
From this page, you can navigate to additional AMQP demos for different client technologies (JavaScript and Microsoft Silverlight).
The demo requires a connection to an AMQP broker. The next section describes how to set up a local AMQP broker.
Setting Up an AMQP Broker
There is a wide variety of AMQP brokers available that implement different AMQP versions. For example, OpenAMQ, Apache Qpid, Red Hat Enterprise MRG, RabbitMQ, ØMQ, and Zyre. If you do not have an AMQP broker installed yet, you can use Apache Qpid AMQP broker, which supports AMQP version 0-9. To set up the Apache Qpid broker on your system, perform the following steps:
- Download the Apache Qpid (the broker or the full release) from http://qpid.apache.org.
- Unpack the distribution into a directory of your choice (for example, C:\qpid). This directory contains the AMQP broker and its components and is referred to as AMQP_HOME in the Kaazing documentation.
- Follow the steps in the Apache Qpid Getting Started to start the broker.
Note: You may have to run a script to create the example SSL stores before you can start the broker. This script creates the Qpid keystore and truststore files that must be present in the AMQP_HOME/etc directory before you can start the broker. - Start Kaazing Gateway as described in Getting Started.
- Open a browser and navigate to http://localhost:8000/demo/demo.html#amqp and log in to the demo to verify that it is working.
Configuring Kaazing Gateway to Connect to an AMQP Broker
Note: If you have the Kaazing WebSocket Gateway demo bundle running on localhost and if you have an AMQP broker running on localhost at the default AMQP port 5672, you do not have to configure anything to see the AMQP demos and the interactive AMQP guide.
The following is an example of the default configuration element for the AMQP service in the Kaazing WebSocket Gateway demo bundle, as specified in the configuration file KAAZING_HOME/conf/gateway-config.xml:
<!-- Proxy service to AMQP server -->
<service>
<accept>ws://localhost:8000/amqp</accept>
<accept>wss://localhost:9000/amqp</accept>
<type>proxy</type>
<properties>
<connect>tcp://localhost:5672</connect>
</properties>
<cross-site-constraint>
<allow-origin>http://localhost:8000</allow-origin>
</cross-site-constraint>
<cross-site-constraint>
<allow-origin>https://localhost:9000</allow-origin>
</cross-site-constraint>
</service>
In this case, the service is configured to accept WebSocket AMQP requests from the browser at ws://localhost:8000/amqp and securely at wss://localhost:9000/amqp and proxy those requests to a locally installed AMQP broker (localhost) at port 5672.
To configure Kaazing Gateway to accept WebSocket requests at another URL or to connect to a different AMQP broker, you can edit KAAZING_HOME/conf/gateway-config.xml, update the values for the accept elements, change the connect property, and restart Kaazing Gateway. For example, the following configuration configures Kaazing Gateway to accept WebSocket AMQP requests at ws://www.example.com:80/amqp and proxy those requests to an AMQP broker (amqp.example.com) on port 5672.
<!-- Proxy service to AMQP server -->
<service>
<accept>ws://www.example.com:80/amqp</accept>
<type>proxy</type>
<properties>
<connect>tcp://amqp.example.com:5672</connect>
</properties>
</service>
Setting Up Your Flex Development Environment
To develop an Flex application, you must download the Adobe Flex 3 SDK. The Adobe Flex 3 SDK can be used to compile ActionScript source files into a SWF file--which is the target application. You can download the Adobe Flex 3 SDK from the Adobe download site and install it into your local drive.
Using the Flex AmqpClient Library
This section contains the following subsections:
- Create the AmqpClient Object
- Connect to an AMQP Broker
- Create Channels
- Declare an Exchange
- Declare a Queue
- Bind an Exchange to a Queue
- Publish Messages
- Consume Messages
- Use Transactions
- Control Message Flow
- Handle Exceptions
- Compile and Run the XMPP-Based Flex Application
Create the AmqpClient Object
First, let's create an AmqpClient client object. Before you create the client object, add the following import statements in your application's .MXML file:
import com.kaazing.gateway.client.protocol.amqp.AmqpBuffer;
import com.kaazing.gateway.client.html5.Charset;
import com.kaazing.gateway.client.html5.ByteBuffer;
import com.kaazing.gateway.client.protocol.amqp.AmqpArguments;
import com.kaazing.gateway.client.protocol.amqp.AmqpEvent;
import com.kaazing.gateway.client.protocol.amqp.AmqpChannel;
import com.kaazing.gateway.client.protocol.amqp.AmqpClient;
Next, declare a variable and create an instance of the AmqpClient object as shown in the following example.
private var client:AmqpClient = new AmqpClient();
Now that you have created an instance of the AmqpClient object, you can use the AMQP protocol commands. To handle open and close events, add two event handlers--one for the OPEN event and one for the CLOSE event, as shown in the following example.
client.addEventListener(AmqpEvent.OPEN, openHandler);
client.addEventListener(AmqpEvent.CLOSE, closeHandler);
Refer to the AmqpClient ActionScript API documentation for the complete list of all the AMQP command and callback functions.
Connect to an AMQP Broker
Next, you must to connect and log in to an AMQP broker. The client generally manages all of its communication on a single connection to an AMQP broker. You establish a connection to an AMQP broker by passing in the broker address, a user name and password, the AMQP version you want to use, and, optionally, a virtual host name (the name of a collection of exchanges and queues hosted on independent server domains). These parameters are passed in when you call the connect method as shown in the following example.
client.connect(location, virtualHost, {username:user, password:pwd}, amqpVersion);
Note: Version 0-9-1 is used if you do not specify an AMQP version number. If your AMQP broker does not support version 0-9-1, you must specify another version (for example, 0-9-0). Refer your AMQP broker documentation and to Release Notes for information about certified AMQP versions.
Create Channels
Once a connection to an AMQP broker has been established, the client must create a channel to communicate to the broker. A channel is a bi-directional connection between an AMQP client and an AMQP broker. AMQP is multi-channeled, which means that channels are multiplexed over a single network socket connection. Channels are light-weight and cheap, and therefore used in AMQP's exception handling mechanism--channels are closed when an exception occurs. First, declare variables for two channels (one for publishing to an exchange and one for consuming from a queue) as shown in the following example.
private var publishChannel:AmqpChannel;
private var consumeChannel:AmqpChannel;
Then, create the channels as shown in the following example.
private function openHandler(e:AmqpEvent):void {
publishChannel = client.openChannel();
consumeChannel = client.openChannel();
}
Once you have created the channels, you can add event handlers for various channel events as shown in the following example.
publishChannel.addEventListener(AmqpEvent.DECLAREEXCHANGE,
function(e:AmqpEvent):void {log("Exchange declared ");});
publishChannel.addEventListener(AmqpEvent.BINDQUEUE,
function(e:AmqpEvent):void {log("Bound queue and exchange");});
publishChannel.addEventListener(AmqpEvent.SELECTTRASACTION,
function(e:AmqpEvent):void {log("Selected Transaction");});
publishChannel.addEventListener(AmqpEvent.COMMITTRASACTION,
function(e:AmqpEvent):void {log("Committed Transaction");});
publishChannel.addEventListener(AmqpEvent.ROLLBACKTRASACTION,
function(e:AmqpEvent):void {log("Rolled back transaction");});
publishChannel.addEventListener(AmqpEvent.CLOSE, closePublishChannelHandler);
consumeChannel.addEventListener(AmqpEvent.DECLAREQUEUE,
function(e:AmqpEvent):void {log("Queue declared ");});
consumeChannel.addEventListener(AmqpEvent.SUBSCRIBE,
function(e:AmqpEvent):void {log("Subscribed to the queue ");});
consumeChannel.addEventListener(AmqpEvent.CLOSE,
function(e:AmqpEvent):void {log("Channel closed ");});
consumeChannel.addEventListener(AmqpEvent.FLOW,
function(e:AmqpEvent):void {log("Flow event");});
consumeChannel.addEventListener(AmqpEvent.MESSAGE, messageHandler);
In this example, each of the event handlers has an associated function (either anonymous or named) that processes the AMQP event. Refer to the AmqpEvent ActionScript API documentation for more information about channel events.
Declare an Exchange
AMQP messages are published to exchanges. Messages contain a routing key that contains the information about the message's destination. The exchange accepts messages and their routing keys and delivers them to a message queue. You can think of an exchange as an electronic mailman that delivers the messages to a mailbox (the queue) based on the address on the message's envelope (the routing key). Exchanges do not store messages.
AMQP defines different exchange types. Some of these exchange types (Direct, Fanout, and Topic) must be supported by all AMQP brokers while others (Headers and System) are optional. AMQP brokers can also support custom exchange types. The following are the different types of exchanges:
- Direct--Messages are sent only to a queue that is bound with a binding key that matches the message's routing key.
- Fanout--Messages are sent to every queue that is bound to the exchange.
- Topic--Messages are sent to a queue based on categorical binding keys and wildcards.
- Headers--Messages are sent to a queue based on their header property values.
- System--Messages are sent to system services.
Exchanges can be durable, meaning that the exchange survives broker shut-down and must be deleted manually or non-durable (temporary) meaning that the exchange lasts only until the broker is shut down. Finally, to check if an exchange exists on the AMQP broker (without actually creating it), you can create a passive exchange. The following example shows how you can create a direct exchange on the publish channel:
publishChannel.declareExchange(exchangeName, "direct", passive, durable, noWait, null);
Note: In this example, the arguments passive, durable, and noWait represent boolean values. Note also that no custom parameters are passed in.
After the exchange is created successfully, a declareexchange event is raised, which calls the previously registered event handler.
Declare a Queue
AMQP messages are consumed from queues. You can think of a queue as a mailbox; messages addressed to a particular address (the routing key) are placed in the mailbox for the consumer to pick up. If multiple consumers are bound to a single queue, only one of the consumers receives the message (the one that picked up the mail).
To check if a queue exists on the AMQP broker (without creating it), you can create a passive queue. Additionally, queues can be marked exclusive, which means that they are tied to a specific connection. If a queue is marked exclusive, it is deleted when the connection on which it was created is closed.
Queues can be durable, meaning that the queue survives broker shut-down and must be deleted manually or non-durable (temporary) meaning that the queue lasts only until the broker is shut down. Queues can also be marked auto delete, which means that the queue is automatically deleted when it is no longer in use. The following example shows how you can create a queue on the consume channel:
consumeChannel.declareQueue(queueName, passive, durable, exclusive, autoDelete, noWait, null);
Note: In this example, the arguments passive, durable, exclusive, autoDelete, and noWait represent boolean values. Note also that no custom parameters are passed in.
After the queue is created successfully, a declarequeue event is raised, which calls the previously registered event handler.
Bind an Exchange to a Queue
Once you have created an exchange and a queue in AMQP, you must bind--or map--one to the other so that messages published to a specific exchange are delivered to a particular queue. You bind a queue to an exchange with a routing key as shown in the following example.
publishChannel.bindQueue(queueName, exchangeName, routingKey, noWait, null);
After the exchange is bound to the queue successfully, a binddqueue event is raised, which calls the previously registered event handler.
Publish Messages
Messages are published to exchanges. The established binding rules (routing keys) then determine to which queue a message is delivered. Messages have content that consists of two parts:
- Content Header--A set of properties that describes the message
- Content Body--A blob of binary data
Additionally, messages can be marked mandatory to send a notification to the publisher in case a message cannot be delivered to a queue. You can also mark a message immediate so that it is returned to the sender if the message cannot be routed to a queue consumer immediately. The following example shows how the content body of a message is added to a buffer (AMQP uses a binary message format) and published to an exchange using the publish channel:
public function dispatchMessage(msg:String):void {
var body:ByteBuffer = new ByteBuffer();
body.putString(msg, Charset.UTF8);
body.flip();
var headers:Object = {};
publishChannel.publishBasic(body, headers, exchange, routingkey, mandatory, immediate);
}
Note: The arguments mandatory and immediate use boolean values. Note also that no custom parameters are passed in.
Consume Messages
Once messages are published, they can be consumed from a queue. A variety of options can be applied to messages in a queue. For example, publishers can choose to require acknowledgement (ack) of messages so that messages can be redelivered in the case of a delivery failure. If the queue is set to exclusive, it is scoped to just the current connection and deleted when the connection on which it was established is closed. Additionally, you can use the no local setting to notify the broker not to send messages to the connection on which the messages were published. The following example shows how you can consume messages from a queue on the consume channel:
consumeChannel.consumeBasic(myQueueName, myConsumerTag, noLocal, noAck, exclusive, noWait, null);
Note: In this example, the arguments noLocal, noAck, noWait, and exclusive represent boolean values.
After the consumeBasic method is successful, a subscribe event is raised, which calls the previously registered event handler. The AMQP broker can then start delivering messages to the client and these messages raise the message event, which calls the corresponding event handler. The following example shows how the messageHandler function retrieves information from the AmqpEvent object:
private function messageHandler(e:AmqpEvent):void {
var body:String = e.body.getString(Charset.UTF8);
log("MESSAGE: " + body);
}
Use Transactions
AMQP supports transactional messaging, through server local transactions. In a transaction the server only publishes a set of messages as one unit when the client commits the transaction. Transactions only apply to message publishing and not to the consumption of the messages.
Note: Once you commit or roll back a transaction on a channel, a new transaction is started automatically. For this reason you must commit all future messages you want to publish on that channel or create a new, non-transactional channel to publish messages on.
The following transaction-related methods can be used to work select (start), commit, and rollback a transaction:
publishChannel.selectTx();
publishChannel.commitTx();
publishChannel.collbackTx();
After the transaction is successfully selected, committed, or rolled back, the corresponding events (selecttransaction, committransaction, rollbacktransaction) are raised. These events call the previously registered event handlers.
Control Message Flow
You can use flow control in AMQP to temporarily--or permanently--halt the flow of messages on a channel from a queue to a consumer. If you turn the message flow off, no messages are sent to the consumer. The following example shows how you can turn the flow of messages on a channel off and back on:
consumeChannel.flowChannel(false);
consumeChannel.flowChannel(true);
After the flow on a channel is halted or resumed successfully, a flow event is raised, which calls the previously registered event handler.
Handle Exceptions
Channels are light-weight and cheap, and therefore used in AMQP's exception handling mechanism--channels are closed when an exception occurs. In this example, the previously registered event handler for the CLOSE event calls the closePublishChannelHandler function when the channel closes and the function receives an AmqpEvent object, which contains detailed information about the exception. The following shows an example of how this information can be used.
private function closePublishChannelHandler(e:AmqpEvent):void {
var body:String = e.frame.body.getString(Charset.UTF8);
log("Channel Closed: " + body);
}
Compile and Run the Flex Application
Once you are finished coding the client (typically, an .MXML file), you must use the Adobe Flex 3 SDK from Adobe to compile your Flex client. Ensure that your library-path points to the Kaazing Flex library .SWC file kaazing-enterprise-gateway-client-release-version.swc. This .SWC file can be found in the Kaazing Gateway demo bundle in the directory KAAZING_HOME/lib/client/flex. Once you have compiled a SWF file, you can execute this SWF file from the flash player or use a Web page to invoke it. For example, you can host it in the web directory KAAZING_HOME/web.
Other Coding Styles
In this how-to, you have used an event programming style. You can also use a continuation-passing programming style. The following example shows how you can declare an exchange using the continuation-passing programming style:
publishChannel.declareExchange(txtExchange, "direct", passive, durable, noWait, null, declareExchangeHandlerContinuation, errorHandlerContinuation);
You can also combine the two programming styles.
Summary
As you have seen in this how-to, you can build Web applications that can communicate directly from the browser with an AMQP broker using the Flex AmqpClient client library. Thus, with the help of WebSocket the browser now enjoys the first-class citizenry of AMQP communications that has, until recently, been enjoyed only by desktop applications.