====== Java Message Service 1.1 ======
Tento paper pojednává o JMS 1.1 a jeho vztahu k produktu [[TIBCO Enterprise Message Service 4.x]].
TIBCO EMS je založen na JMS, což je specifikace (nikoli implementace) pro systémy zasílání zpráv od firmy Sun, která ve verzi 1.1 nabízí dvě monosti posílání zpráv:
* **point-to-point** -- zpráva je přes server doručena od //jednoho// odesílatele //jednomu// adresátovi pomocí **queue** (fronty) -- často to přirovnáváme k poště (odběratel má na serveru "schránku" pro dopisy);
* **publish-subscribe** -- zpráva je přes server uložena do tzv. **topicu** (téma), odkud je dostávají zaregistrovaní posluchači (nazývá se také broadcast).
U **point-to-point** (PTP) se odesílá na konkrétního příjemce. Zpráva jde přes server, kde je zařazena do fronty, takže //i když není příjemce připojen//, o žádné zprávy //nepřijde//. Ty mu jsou doručeny ve chvíli, kdy se připojí. Fronta může mít i více posluchačů, ale je zajištěno, že jeden posluchač dostane každou z právu jen jednou. Do jedné fronty může zapisovat více odesílatelů (producer). Pokud je fronta nastavena na **exclusive**, zprávy může vyzvedávat jen jeden příjemce (consumer) -- ten, který je specifikován jako první. U **non-exclusive** režimu může vybírat z fronty více příjemců, což se používá pro load balancing příchozích zpráv. U point-to-point režimu lze také použít třídu **QueueBrowser** a stahovat zprávy z fronty, aniž by je klient odebíral. Pro výběr je k dispozici //MessageSelector//, který je popsán dále.
U **publish-subscribe** (Pub/Sub) probíhá posílání zpráv od jednoho odesílatele více příjemcům, kteří se přihlašují k odběru jednotlivých témat (topic). Pokud consumer není dostupný, //není mu zpráva doručena//. Ale je možno konfigurovat, zda-li je přihlášení trvalé (**durable**). V tom případě se klientovi pošlou všechny zprávy, o které by přišel v odpojeném stavu.
Vlastnosti pro queue nebo topic:
* fail-safe režim -- zprávy jsou nejprve zapisovány na disk;
* permissions -- access control pro zprávy;
* limits -- limity na frontu nebo durable topic kvůli přeplnění;
* routing -- přeposílání na jiné servery
* exchanges -- výměna s jinými messagingovými systémy;
* exkluzivita -- viz výše;
* redelivery policy
* tracing, logging
===== JMS API =====
JMS API je jednoduché, ConnectionFactory vytvoří Connection, to Session a pomocí instance tohoto objektu je možné vytvořit Message, MessageProducer a MessageConsumer, kteří mohou provádět operace (posílat zprávy), případně se zaregistrovat jako MessageListener. Dobré je, že toto API je společné jak pro point-to-point tak pro publish-subscribe. Ve starších verzích (před JMS 1.1) byla tato rozhraní dvojmo (QueueSession, TopicSession a podobně).
Objekt **ConnectionFactory** zapouzřuje nastavení spojení, je uložen a spravován na TIBCO EMS serveru a přístup k němu získá klient pomocí JNDI. Třída Connection zapouzdřuje virtuální spojení, je s tím spojena režie, takže aplikace si obvykle vytváří jen jedno spojení. Před konzumací zpráv je nutno volat metodu //start()//, ukončit (např. dočasně) konzumaci je možné metodou //stop()//. Po skončení práce je nutno zavolat metodu //close()//, která ukončí také všechny Session objekty tohoto spojení.
**Session** je single-threaded objekt, který vytváří zprávy, producery a konzumenty. Session může být také v transakci (první parametr konstruktoru), v tom případě se skupina zpráv posílá (a přijímá) atomicky. **MessageProducer** objekt (vytvářený objektem Session) se používá k zasílání správ. K tomu používáme metodu //send(message)//. Je možné vytvořit také MessageProducer aniž bychom specifikovali cíl -- ten se poté musí specifikovat v //send// nebo //publish// metodách.
Objekt **MessageConsumer** vytváříme pomocí objektu Session a slouží k příjmu zpráv. Je nutné si uvědomit, že po přihlášení k frontě (queue) zprávy zůstávají ve frontě, dokue je nevyzvedneme. Naproti tomu po přihlášení k tématu (topic) je nutné být on-line, abychom mohli přijímat. Můžeme ale vytvořit konzumenta pomocí //Session.createDurableSubscriber()// metody -- tedy trvalého příjemce publish-subscribe komunikace.
Konzumace zpráv může být //asynchronní// a //synchronní//. U synchroního MessageConsumer explicitně volá metodu //receive()// na objektech topic nebo queue. U asynchronního režimu se automaticky volá metoda //onMessage()// třídy **MessageListener**. Objekt MessageListener se registruje u konkrétního MessageConsumer objektu **před** voláním metody //start()//. Jeden a ten samý listener může zpracovávat zprávy z front nebo topiců. Specifikace J2EE definuje jako jeden typ EJB speciální typ MessageListener rozhraní -- MDB.
Třídy **TopicRequestor** a **MessageRequestor** pomáhají při tvorbě request-response komunikace. Při použití těchto pomocných tříd stačí zavolat metodu //request()// a ta provede odeslání zprávy a počká na výsledek, který ihned vrátí (je blokující).
A konečně **MessageSelector** je nepovinným filtrem na straně odesílatele, může se tak snížit provoz na síti. Tím se ale také //může zvýšit zatížení serveru// -- selektory se provádí vždy na serveru a mohou být časově náročné. Používá jednoduchou syntaxi postavenou na syntaxi SQL92.
===== Zprávy v JMS =====
Obsahuje **header**, **properties** a **body**. Pouze header je //povinný//, a jsou nastaveny následující hodnoty:
|**název položky**|**popis**|
|JMSDestionatioin|Cíl zprávy.|
|JMSDeliveryMode|Režimy doručování, detaily níže.|
|JMSExpiration|Expirace zprávy v milisekundách. Je nutno dbát na synchronizaci času napříč servery!|
|JMSPriority|JMS zpráva může mít nastavenou prioritu, která se zadává číselně od 0 do 9 (nejvyšší).|
|JMSMessageID|Jedinečný identifikátor.|
|JMSTimestamp|Časové razítko, kdy byla zpráva předána k poslání (skutečný čas odeslání může být jiný).|
|JMSCorrelationID|ID pro párování zpráv, například dotaz-odpověď. Nepovinné.|
|JMSReplyTo|Kam má příjemce odpovídat. Nepovinné.|
|JMSType|Identifikátor typu zprávy.|
|JMSRedelivered|Pokud je nastaven, zpráva byla už doručena (ale nepřijata).|
Properties jsou dodatečné (nepovinné) vlastnosti zprávy nebo vlastnosti //závislé na implementaci//. Vlastnosti rozšiřující mají prefix JMSX (například JMSXUserID, JMSXAppID, JMSXDeliveryCount, JMSXGroupID a jiné). Properties mohou být těchto **typů**: boolean, byte, short, int, long, float, double a String.
Seznam vlastností pro systém TIBCO EMS shrnuje tabulka.
|**vlastnost**|**možné hodnoty**|**popis**|
|JMS_TIBCO_COMPRESS|Boolean|V případě //perzistentních zpráv ve frontě// nebo //topiců s durable subscribery// je při ukládání na disk použita komprese pro //těla zpráv//. Komprese se vůbec netýká přenosu, ale jen ukládání na disk.|
|JMS_TIBCO_PRESERVE_UNDELIVERED|Boolean|Pokud nemohla být zpráva doručena, je přesunuta do undelivered fronty. Z té se obvykle zasílají někam jinam (správci a podobně).|
|JMS_TIBCO_SENDER|String|Pokud je cíl nakonfigurován, TIBCO sem ukládá //jméno odesílatele//.|
|JMS_TIBCO_DISABLE_SENDER|Boolean|Pokud odesílatel nechce, aby se do JMS_TIBCO_SENDER jeho jméno ukládalo, může nastavit tuto hodnotu na //true//, nicméně to mu nepomůže, pakliže bylo na cíli nastavena volba //sender_name_enforced//.|
|JMS_TIBCO_MSG_EXT|Boolean|Tento flag je nastaven, pokud byly typy MapMessage nebo StreamMessage rozšířeny o věci, které JMS nepodporuje (zapouzdřená zpráva, pole z primitivních typů). Toto může být nastaveno, pokud zpráva přišla ze systémů TIBCO Rendezvous nebo ActiveEnterprise.|
|JMS_TIBCO_MSG_TRACE|String|Zapne trasování zprávy (hodnota //null//) nebo kompletní trasování včetně těla (hodnota "body").|
|JMS_TIBCO_IMPORTED|Boolean|Zpráva byla importována z jiných messagingových systémů.|
Typy těla zpráv shrnuje následující tabulka. Maximální velikost zprávy je u TIBCO EMS 512 MB.
|**typ zprávy**|**popis**|
|Message|Zpráva nemá tělo.|
|TextMessage|Typ String.|
|MapMessage|Key/value dvojice, klíče jsou String, hodnoty jsou potomci třídy Object.|
|BytesMessage|Proud bajtů.|
|StreamMessage|Proud primitivních typů.|
|ObjectMessage|Serializovaný Object.|
Pokud se zpráva vytváří v prostředí Javy, mají řetězce kódování UTF-8. TIBCO EMS ale má také API pro komunikaci z jiných jazyků (např. C), v tomto případě lze nastavit kódování. Jakmile zpráva přijde do prostředí Javy, TIBCO řetězce automaticky překonvertuje do UTF-8.
**Potvrzování příjmu** je implementováno v JMS na dvou úrovních. Při zasílání zprávy na JMS server:
* 1) zpráva je odeslána
* 2) server pošle zpět potvrzení o doručení (toto potvrzení lze vypnout nastavením na //reliable delivery//)
Při zasílání zprávy příjemci jsou možné fáze tři:
* 3) poslání zprávy ze serveru příjemci
* 4) potvrzení příjmu
* 5) potvrzení o potvrzení příjmu (předchází duplicitám)
Rozlišujeme tři úrovně potvrzování:
* **CLIENT_ACKNOWLEDGE** -- konzument sám pošle potvrzení všech zpráv zaslaných //v jednom sezení// voláním metody //acknowldege()//;
* **AUTO_ACKNOWLEDGE** -- session automaticky pošle potvrzení o příjmu po opuštění metody onMessage();
* **DUPS_OK_ACKNOWLEDGE** -- konzumentům nevadí příjem duplicit, takže na server může odejít potvrzení později -- klient bude porvrzovat po skupinách (lazy potvrzování tedy může duplicity generovat).
TIBCO EMS dále přidává několik dalších režimů:
* **NO_ACKNOWLEDGE** -- žádné potvrzování, se zprávou se zachází, jako by byla ihned potvrzena (nelze však použít u durable subskribcí a subskripcí na routovaný server);
* **EXPLICIT_CLIENT_ACKNOWLEDGE** -- jako //CLIENT_ACKNOWLEDGE// ale potvrzuje se **každá zpráva zvlášť**;
* **EXPLICIT_CLIENT_DUPS_OK_ACKNOWLEDGE** -- jako //DUPS_OK_ACKNOWLEDGE//, ale také individuálně.
**Režimy doručování** jsou definovány v JMS standardu:
* **PERSISTENT** -- Je zajištěno, že v případě jednoho nebo více //trvalých přihlášení// dochází k ukládání zprávy na disk, dokud není bezpečně odeslána příjemcům. Nemělo by tedy dojít ka ztrátě při výpadku serveru.
* **NON_PERSISTENT** -- V tomto režimu se zprávy vůbec na disk nemusejí ukládat
TIBCO EMS pak ještě přidává ještě dvě možnosti:
* **RELIABLE_DELIVERY**, kdy producent zprávy nedostává žádné potvrzení (číslo 2 neproběhne);
* **NO_ACKNOWLEDGE**, kdy po odeslání zprávy konzumentovi už se server ničím nezabývá a klient už nic nepotvrzuje.
Nastavení těchto dvou režimů se provádí nad producerem, resp. nad sezením:
import com.tibco.tibjms.tibjms.Tibjms;
messageProducer.setDeliveryMode(Tibjms.RELIABLE_DELIVERY);
// resp.
TopicSession session = conn.createTopicSession(false, Tibjms.NO_ACKNOWLEDGE);
===== Příklad =====
Nyní se podíváme na komplexní a vševysvětlující příklad jednoduchého chatu.
/*
* Copyright (C) 2000, O'Reilly & Associates, Inc.
* The code in this file may be copied, distributed, and reused,
* provided that you do not remove this copyright notice.
* O'Reilly & Associates assumes no responsibility for damages
* resulting from the use of this code.
*
* Modified by Lukas Zapletal.
*/
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Properties;
public class Chat implements javax.jms.MessageListener {
private TopicSession pubSession;
private TopicSession subSession;
private TopicPublisher publisher;
private TopicConnection connection;
private String userName;
String providerContextFactory = "com.tibco.tibjms.naming.TibjmsInitialContextFactory";
String defaultProtocol = "tibjmsnaming";
String defaultProviderURL = defaultProtocol + "://localhost:7222";
/* Run the Chat client */
public static void main(String[] args) {
try {
Chat chat = null;
if (args.length == 3) {
// args[0]=topicName; args[1]=username; args[2]=password
chat = new Chat(args[0], args[1], args[2]);
} else if (args.length == 1) {
chat = new Chat(args[0], null, null);
} else {
System.out.println("Topic missing");
System.exit(1);
}
// read from command line
BufferedReader commandLine = new java.io.BufferedReader(new InputStreamReader(System.in));
// loop until the word "exit" is typed
while (true) {
String s = commandLine.readLine();
if (s.equalsIgnoreCase("exit")) {
chat.close(); // close down connection
System.exit(0); // exit program
} else {
chat.writeMessage(s);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
/* Constructor. Establish JMS publisher and subscriber */
public Chat(String topicName, String username, String password) throws Exception {
// Obtain a JNDI connection
Properties env = new Properties();
// ... specify the JNDI properties specific to the vendor
env.put(Context.INITIAL_CONTEXT_FACTORY, providerContextFactory);
env.put(Context.PROVIDER_URL, defaultProviderURL);
if (username != null) {
env.put(Context.SECURITY_PRINCIPAL, username);
env.put(Context.SECURITY_CREDENTIALS, password);
}
InitialContext jndi = new InitialContext(env);
// Lookup a JMS connection factory
TopicConnectionFactory conFactory =
(TopicConnectionFactory) jndi.lookup("TopicConnectionFactory");
// Create a JMS connection
TopicConnection connection = conFactory.createTopicConnection(username, password);
// Create a JMS session object
TopicSession pubSession = connection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
TopicSession subSession = connection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
// Lookup a JMS topic
Topic chatTopic = (Topic) jndi.lookup(topicName);
// Create a JMS publisher and subscriber
TopicPublisher publisher = pubSession.createPublisher(chatTopic);
TopicSubscriber subscriber = subSession.createSubscriber(chatTopic);
// Set a JMS message listener
subscriber.setMessageListener(this);
// Intialize the Chat application
set(connection, pubSession, subSession, publisher, username);
// Start the JMS connection; allows messages to be delivered
connection.start();
}
/* Initializes the instance variables */
public void set(TopicConnection con, TopicSession pubSess, TopicSession subSess,
TopicPublisher pub, String username) {
this.connection = con;
this.pubSession = pubSess;
this.subSession = subSess;
this.publisher = pub;
this.userName = username == null ? "anonymous" : username;
}
/* Receive message from topic subscriber */
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println(text);
} catch (JMSException jmse) {
jmse.printStackTrace();
}
}
/* Create and send message using topic publisher */
protected void writeMessage(String text) throws JMSException {
TextMessage message = pubSession.createTextMessage();
message.setText(userName + " : " + text);
publisher.publish(message);
}
/* Close the JMS connection */
public void close() throws JMSException {
connection.close();
}
}
===== Odkazy =====
* http://en.wikipedia.org/wiki/Java_Message_Service