Initial Release
Ensemble adapter for RabbitMQ
Install java 1.8.
Get the JAR: build RabbitMQ-Ensemble-javaapi or download latest jar.
Create (or use any existing one) Java gateway. To create one go to SMP > System Administration > Configuration > Connectivity > Object Gateways. Remember the Port value.
Start Java Gateway.
In a terminal in a target namespace execute:
Set class = "isc.rabbitmq.API"
Set classPath = ##class(%ListOfDataTypes).%New()
Do classPath.Insert(PATH-TO-JAR)
Set gateway = ##class(%Net.Remote.Gateway).%New()
Set sc = gateway.%Connect("localhost", PORT, $Namespace, 2, classPath)
Set sc = gateway.%Import(class)
Write ##class(%Dictionary.CompiledClass).%ExistsId(class)
In the namespace isc.rabbitmq.API class should be present and compiled.
Import this project and compile.
For samples refer to the test production isc.rabbitMQ.Production.
Development is done on Cache-Tort-Git UDL fork
isc.rabbitmq package provides all traditional Interoperability components: Inbound and Outbound Adapters and Operation/Service.
For RabbitMQ to function Java Gateway is required. It can be run in three different ways:
For Interoperability production we recommend running Java Gateway as Interoperability Service. To do that add new EnsLib.JavaGateway.Service service to your production. In RabbitMQ production elements select Java Gateway Service name as a value for JGService setting.
ClassPath for all these elements must contain the paths to amqp jar and RabbitMQ-Ensemble-javaapi jar.
Check isc.rabbitmq.Utils for sample code. The main class is isc.rabbitmq.API. It has the following methods.
| Method | Arguments | Returns | Description |
|---|---|---|---|
| %OnNew | gateway, host, port, user, pass, virtualHost, queue, durable, exchange | api | Creates new connection to RabbitMQ |
| sendMessage | msg, correlationId, messageId | null | Sends message to default queue (as specified in %OnNew) |
| sendMessageToQueue | queue, msg, correlationId, messageId | null | Sends message to the specified queue |
| readMessageString | - | props | Reads message from default queue. Returns list of message properties (including message text) |
| readMessageStream | props | msg | Reads message from default queue. Returns message text as a stream and props is populated with a list of message properties |
| close | - | - | Closes the connection |
| Argument | Java type | InterSystems type | Value | Required | Description |
|---|---|---|---|---|---|
| gateway | - | %Net.Remote.Gateway | - | Yes | Connection to Java Gateway |
| host | String | %String | localhost | Yes | Address of RabbitMQ server or amqp:\\ connection string |
| port | int | %Integer | -1 | Yes | RabbitMQ listener port |
| user | String | %String | guest | Yes | Username |
| pass | String | %String | guest | Yes | Password |
| virtualHost | String | %String | / | Yes | Virtual host |
| queue | String | %String | Test | Yes | Queue name |
| durable | int | %Integer | 1 | Required only if you want to create new queue | The queue will survive a server restart |
| exchange | String | %String | Test | No | Exchange name. Provide only for sending messages. Modifies the meaning of queue property (if exchange is present, then queue means routing key) |
| msg | byte[] | %GlobalBinaryStream | Text | Yes | Message body |
| correlationId | String | %String | CorrelationId | Required only with messageId | Correlation identifier |
| messageId | String | %String | MessageId | Required only with correlationId | Message identifier |
| props | String[] | %ListOfDataTypes | - | Yes | List of message properties |
| api | isc.rabbitmq.API | isc.rabbitmq.API | - | - | API object |
First you need a Java Gateway object (hereinafter: gateway). For example you can get it using this method:
/// Get JGW object
ClassMethod Connect(gatewayName As %String, path As %String, Output sc As %Status) As %Net.Remote.Gateway
{
Set gateway = ""
Set sc = ##class(%Net.Remote.Service).OpenGateway(gatewayName, .gatewayConfig)
Quit:$$$ISERR(sc) gateway
Set sc = ##class(%Net.Remote.Service).ConnectGateway(gatewayConfig, .gateway, path, $$$YES)
Quit gateway
}
Where:
Now that Java Gateway connection is established we can connect to RabbitMQ:
ClassMethod GetAPI(gateway As %Net.Remote.Gateway) As isc.rabbitmq.API { Set host = "localhost" Set port = -1 Set user = "guest" Set pass = "guest" Set virtualHost = "/" Set queue = "Test" Set durable = $$$YESSet api = ##class(isc.rabbitmq.API).%New(gateway, host, port, user, pass, virtualHost, queue, durable) Quit api
}
All parameters are described in the table above. Note that durable argument is used only if you’re creating a new queue. If the queue alreay exists you should still provide it (0 or 1) but it won’t be used.
Assuming you already have api object, sending messages can be done by one of two ways:
Default queue is a queue specified during creation of the api object. To send a message just call sendMessage or sendMessageId:
#Dim api As isc.rabbitmq.API
#Dim msg As %GlobalBinaryStream
Do api.sendMessage(msg)
Do api.sendMessageId(msg, "correlationId", "messageId " _ $zdt($zts,3,1,3))
Where stream is a message body. You can either provide both messageId and correlationId or non of them.
Everything is the same as above, except you call sendMessageToQueue \ sendMessageToQueueId method and the first argument is the name of the queue.
Messages are always read from the default queue (specified at creation of the api object). Message body can be received as text or as a stream.
First you need to prepare props to pass by reference into Java and then call readMessageStream:
#Dim api As isc.rabbitmq.API
#Dim msg As %GlobalBinaryStream
#Dim props As %ListOfDataTypes
Set props = ##class(%ListOfDataTypes).%New()
For i=1:1:15 Do props.Insert("")
Set msg = api.readMessageStream(.props)
props would be filled with message metainformation, and msg is a stream containig message body.
#Dim api As isc.rabbitmq.API
#Dim props As %ListOfDataTypes
Set props = api.readMessageString()
props would be filled with message metainformation and message body, note that you don’t need to initialize it on the InterSystems side before calling RabbitMQ.
Elements appearing in props. All of them besides first and second are optional. Conversion from list into RabbitMQ.Message is available in the ListToMessage method of RabbitMQ.InboundAdapter class.
| Position | Name | Description |
|---|---|---|
| 1 | Message Length | Length of a current message |
| 2 | Message Count | Number of remaining messages |
| 3 | ContentType | |
| 4 | ContentEncoding | |
| 5 | CorrelationId | |
| 6 | ReplyTo | |
| 7 | Expiration | |
| 8 | MessageId | |
| 9 | Type | |
| 10 | UserId | |
| 11 | AppId | |
| 12 | ClusterId | |
| 13 | DeliveryMode | |
| 14 | Priority | |
| 15 | Timestamp | |
| 16 | Message body | If message is read as a string, this element would contain it |