====== Java Message Service 1.1 ====== Tento paper pojednává o JMS 1.1 a jeho vztahu k produktu [[TIBCO Enterprise Message Service 4.x]]. TIBCO EMS je založen na JMS, což je specifikace (nikoli implementace) pro systémy zasílání zpráv od firmy Sun, která ve verzi 1.1 nabízí dvě monosti posílání zpráv: * **point-to-point** -- zpráva je přes server doručena od //jednoho// odesílatele //jednomu// adresátovi pomocí **queue** (fronty) -- často to přirovnáváme k poště (odběratel má na serveru "schránku" pro dopisy); * **publish-subscribe** -- zpráva je přes server uložena do tzv. **topicu** (téma), odkud je dostávají zaregistrovaní posluchači (nazývá se také broadcast). U **point-to-point** (PTP) se odesílá na konkrétního příjemce. Zpráva jde přes server, kde je zařazena do fronty, takže //i když není příjemce připojen//, o žádné zprávy //nepřijde//. Ty mu jsou doručeny ve chvíli, kdy se připojí. Fronta může mít i více posluchačů, ale je zajištěno, že jeden posluchač dostane každou z právu jen jednou. Do jedné fronty může zapisovat více odesílatelů (producer). Pokud je fronta nastavena na **exclusive**, zprávy může vyzvedávat jen jeden příjemce (consumer) -- ten, který je specifikován jako první. U **non-exclusive** režimu může vybírat z fronty více příjemců, což se používá pro load balancing příchozích zpráv. U point-to-point režimu lze také použít třídu **QueueBrowser** a stahovat zprávy z fronty, aniž by je klient odebíral. Pro výběr je k dispozici //MessageSelector//, který je popsán dále. U **publish-subscribe** (Pub/Sub) probíhá posílání zpráv od jednoho odesílatele více příjemcům, kteří se přihlašují k odběru jednotlivých témat (topic). Pokud consumer není dostupný, //není mu zpráva doručena//. Ale je možno konfigurovat, zda-li je přihlášení trvalé (**durable**). V tom případě se klientovi pošlou všechny zprávy, o které by přišel v odpojeném stavu. Vlastnosti pro queue nebo topic: * fail-safe režim -- zprávy jsou nejprve zapisovány na disk; * permissions -- access control pro zprávy; * limits -- limity na frontu nebo durable topic kvůli přeplnění; * routing -- přeposílání na jiné servery * exchanges -- výměna s jinými messagingovými systémy; * exkluzivita -- viz výše; * redelivery policy * tracing, logging ===== JMS API ===== JMS API je jednoduché, ConnectionFactory vytvoří Connection, to Session a pomocí instance tohoto objektu je možné vytvořit Message, MessageProducer a MessageConsumer, kteří mohou provádět operace (posílat zprávy), případně se zaregistrovat jako MessageListener. Dobré je, že toto API je společné jak pro point-to-point tak pro publish-subscribe. Ve starších verzích (před JMS 1.1) byla tato rozhraní dvojmo (QueueSession, TopicSession a podobně). Objekt **ConnectionFactory** zapouzřuje nastavení spojení, je uložen a spravován na TIBCO EMS serveru a přístup k němu získá klient pomocí JNDI. Třída Connection zapouzdřuje virtuální spojení, je s tím spojena režie, takže aplikace si obvykle vytváří jen jedno spojení. Před konzumací zpráv je nutno volat metodu //start()//, ukončit (např. dočasně) konzumaci je možné metodou //stop()//. Po skončení práce je nutno zavolat metodu //close()//, která ukončí také všechny Session objekty tohoto spojení. **Session** je single-threaded objekt, který vytváří zprávy, producery a konzumenty. Session může být také v transakci (první parametr konstruktoru), v tom případě se skupina zpráv posílá (a přijímá) atomicky. **MessageProducer** objekt (vytvářený objektem Session) se používá k zasílání správ. K tomu používáme metodu //send(message)//. Je možné vytvořit také MessageProducer aniž bychom specifikovali cíl -- ten se poté musí specifikovat v //send// nebo //publish// metodách. Objekt **MessageConsumer** vytváříme pomocí objektu Session a slouží k příjmu zpráv. Je nutné si uvědomit, že po přihlášení k frontě (queue) zprávy zůstávají ve frontě, dokue je nevyzvedneme. Naproti tomu po přihlášení k tématu (topic) je nutné být on-line, abychom mohli přijímat. Můžeme ale vytvořit konzumenta pomocí //Session.createDurableSubscriber()// metody -- tedy trvalého příjemce publish-subscribe komunikace. Konzumace zpráv může být //asynchronní// a //synchronní//. U synchroního MessageConsumer explicitně volá metodu //receive()// na objektech topic nebo queue. U asynchronního režimu se automaticky volá metoda //onMessage()// třídy **MessageListener**. Objekt MessageListener se registruje u konkrétního MessageConsumer objektu **před** voláním metody //start()//. Jeden a ten samý listener může zpracovávat zprávy z front nebo topiců. Specifikace J2EE definuje jako jeden typ EJB speciální typ MessageListener rozhraní -- MDB. Třídy **TopicRequestor** a **MessageRequestor** pomáhají při tvorbě request-response komunikace. Při použití těchto pomocných tříd stačí zavolat metodu //request()// a ta provede odeslání zprávy a počká na výsledek, který ihned vrátí (je blokující). A konečně **MessageSelector** je nepovinným filtrem na straně odesílatele, může se tak snížit provoz na síti. Tím se ale také //může zvýšit zatížení serveru// -- selektory se provádí vždy na serveru a mohou být časově náročné. Používá jednoduchou syntaxi postavenou na syntaxi SQL92. ===== Zprávy v JMS ===== Obsahuje **header**, **properties** a **body**. Pouze header je //povinný//, a jsou nastaveny následující hodnoty: |**název položky**|**popis**| |JMSDestionatioin|Cíl zprávy.| |JMSDeliveryMode|Režimy doručování, detaily níže.| |JMSExpiration|Expirace zprávy v milisekundách. Je nutno dbát na synchronizaci času napříč servery!| |JMSPriority|JMS zpráva může mít nastavenou prioritu, která se zadává číselně od 0 do 9 (nejvyšší).| |JMSMessageID|Jedinečný identifikátor.| |JMSTimestamp|Časové razítko, kdy byla zpráva předána k poslání (skutečný čas odeslání může být jiný).| |JMSCorrelationID|ID pro párování zpráv, například dotaz-odpověď. Nepovinné.| |JMSReplyTo|Kam má příjemce odpovídat. Nepovinné.| |JMSType|Identifikátor typu zprávy.| |JMSRedelivered|Pokud je nastaven, zpráva byla už doručena (ale nepřijata).| Properties jsou dodatečné (nepovinné) vlastnosti zprávy nebo vlastnosti //závislé na implementaci//. Vlastnosti rozšiřující mají prefix JMSX (například JMSXUserID, JMSXAppID, JMSXDeliveryCount, JMSXGroupID a jiné). Properties mohou být těchto **typů**: boolean, byte, short, int, long, float, double a String. Seznam vlastností pro systém TIBCO EMS shrnuje tabulka. |**vlastnost**|**možné hodnoty**|**popis**| |JMS_TIBCO_COMPRESS|Boolean|V případě //perzistentních zpráv ve frontě// nebo //topiců s durable subscribery// je při ukládání na disk použita komprese pro //těla zpráv//. Komprese se vůbec netýká přenosu, ale jen ukládání na disk.| |JMS_TIBCO_PRESERVE_UNDELIVERED|Boolean|Pokud nemohla být zpráva doručena, je přesunuta do undelivered fronty. Z té se obvykle zasílají někam jinam (správci a podobně).| |JMS_TIBCO_SENDER|String|Pokud je cíl nakonfigurován, TIBCO sem ukládá //jméno odesílatele//.| |JMS_TIBCO_DISABLE_SENDER|Boolean|Pokud odesílatel nechce, aby se do JMS_TIBCO_SENDER jeho jméno ukládalo, může nastavit tuto hodnotu na //true//, nicméně to mu nepomůže, pakliže bylo na cíli nastavena volba //sender_name_enforced//.| |JMS_TIBCO_MSG_EXT|Boolean|Tento flag je nastaven, pokud byly typy MapMessage nebo StreamMessage rozšířeny o věci, které JMS nepodporuje (zapouzdřená zpráva, pole z primitivních typů). Toto může být nastaveno, pokud zpráva přišla ze systémů TIBCO Rendezvous nebo ActiveEnterprise.| |JMS_TIBCO_MSG_TRACE|String|Zapne trasování zprávy (hodnota //null//) nebo kompletní trasování včetně těla (hodnota "body").| |JMS_TIBCO_IMPORTED|Boolean|Zpráva byla importována z jiných messagingových systémů.| Typy těla zpráv shrnuje následující tabulka. Maximální velikost zprávy je u TIBCO EMS 512 MB. |**typ zprávy**|**popis**| |Message|Zpráva nemá tělo.| |TextMessage|Typ String.| |MapMessage|Key/value dvojice, klíče jsou String, hodnoty jsou potomci třídy Object.| |BytesMessage|Proud bajtů.| |StreamMessage|Proud primitivních typů.| |ObjectMessage|Serializovaný Object.| Pokud se zpráva vytváří v prostředí Javy, mají řetězce kódování UTF-8. TIBCO EMS ale má také API pro komunikaci z jiných jazyků (např. C), v tomto případě lze nastavit kódování. Jakmile zpráva přijde do prostředí Javy, TIBCO řetězce automaticky překonvertuje do UTF-8. **Potvrzování příjmu** je implementováno v JMS na dvou úrovních. Při zasílání zprávy na JMS server: * 1) zpráva je odeslána * 2) server pošle zpět potvrzení o doručení (toto potvrzení lze vypnout nastavením na //reliable delivery//) Při zasílání zprávy příjemci jsou možné fáze tři: * 3) poslání zprávy ze serveru příjemci * 4) potvrzení příjmu * 5) potvrzení o potvrzení příjmu (předchází duplicitám) Rozlišujeme tři úrovně potvrzování: * **CLIENT_ACKNOWLEDGE** -- konzument sám pošle potvrzení všech zpráv zaslaných //v jednom sezení// voláním metody //acknowldege()//; * **AUTO_ACKNOWLEDGE** -- session automaticky pošle potvrzení o příjmu po opuštění metody onMessage(); * **DUPS_OK_ACKNOWLEDGE** -- konzumentům nevadí příjem duplicit, takže na server může odejít potvrzení později -- klient bude porvrzovat po skupinách (lazy potvrzování tedy může duplicity generovat). TIBCO EMS dále přidává několik dalších režimů: * **NO_ACKNOWLEDGE** -- žádné potvrzování, se zprávou se zachází, jako by byla ihned potvrzena (nelze však použít u durable subskribcí a subskripcí na routovaný server); * **EXPLICIT_CLIENT_ACKNOWLEDGE** -- jako //CLIENT_ACKNOWLEDGE// ale potvrzuje se **každá zpráva zvlášť**; * **EXPLICIT_CLIENT_DUPS_OK_ACKNOWLEDGE** -- jako //DUPS_OK_ACKNOWLEDGE//, ale také individuálně. **Režimy doručování** jsou definovány v JMS standardu: * **PERSISTENT** -- Je zajištěno, že v případě jednoho nebo více //trvalých přihlášení// dochází k ukládání zprávy na disk, dokud není bezpečně odeslána příjemcům. Nemělo by tedy dojít ka ztrátě při výpadku serveru. * **NON_PERSISTENT** -- V tomto režimu se zprávy vůbec na disk nemusejí ukládat TIBCO EMS pak ještě přidává ještě dvě možnosti: * **RELIABLE_DELIVERY**, kdy producent zprávy nedostává žádné potvrzení (číslo 2 neproběhne); * **NO_ACKNOWLEDGE**, kdy po odeslání zprávy konzumentovi už se server ničím nezabývá a klient už nic nepotvrzuje. Nastavení těchto dvou režimů se provádí nad producerem, resp. nad sezením: import com.tibco.tibjms.tibjms.Tibjms; messageProducer.setDeliveryMode(Tibjms.RELIABLE_DELIVERY); // resp. TopicSession session = conn.createTopicSession(false, Tibjms.NO_ACKNOWLEDGE); ===== Příklad ===== Nyní se podíváme na komplexní a vševysvětlující příklad jednoduchého chatu. /* * Copyright (C) 2000, O'Reilly & Associates, Inc. * The code in this file may be copied, distributed, and reused, * provided that you do not remove this copyright notice. * O'Reilly & Associates assumes no responsibility for damages * resulting from the use of this code. * * Modified by Lukas Zapletal. */ import javax.jms.*; import javax.naming.Context; import javax.naming.InitialContext; import java.io.BufferedReader; import java.io.InputStreamReader; import java.util.Properties; public class Chat implements javax.jms.MessageListener { private TopicSession pubSession; private TopicSession subSession; private TopicPublisher publisher; private TopicConnection connection; private String userName; String providerContextFactory = "com.tibco.tibjms.naming.TibjmsInitialContextFactory"; String defaultProtocol = "tibjmsnaming"; String defaultProviderURL = defaultProtocol + "://localhost:7222"; /* Run the Chat client */ public static void main(String[] args) { try { Chat chat = null; if (args.length == 3) { // args[0]=topicName; args[1]=username; args[2]=password chat = new Chat(args[0], args[1], args[2]); } else if (args.length == 1) { chat = new Chat(args[0], null, null); } else { System.out.println("Topic missing"); System.exit(1); } // read from command line BufferedReader commandLine = new java.io.BufferedReader(new InputStreamReader(System.in)); // loop until the word "exit" is typed while (true) { String s = commandLine.readLine(); if (s.equalsIgnoreCase("exit")) { chat.close(); // close down connection System.exit(0); // exit program } else { chat.writeMessage(s); } } } catch (Exception e) { e.printStackTrace(); } } /* Constructor. Establish JMS publisher and subscriber */ public Chat(String topicName, String username, String password) throws Exception { // Obtain a JNDI connection Properties env = new Properties(); // ... specify the JNDI properties specific to the vendor env.put(Context.INITIAL_CONTEXT_FACTORY, providerContextFactory); env.put(Context.PROVIDER_URL, defaultProviderURL); if (username != null) { env.put(Context.SECURITY_PRINCIPAL, username); env.put(Context.SECURITY_CREDENTIALS, password); } InitialContext jndi = new InitialContext(env); // Lookup a JMS connection factory TopicConnectionFactory conFactory = (TopicConnectionFactory) jndi.lookup("TopicConnectionFactory"); // Create a JMS connection TopicConnection connection = conFactory.createTopicConnection(username, password); // Create a JMS session object TopicSession pubSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicSession subSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); // Lookup a JMS topic Topic chatTopic = (Topic) jndi.lookup(topicName); // Create a JMS publisher and subscriber TopicPublisher publisher = pubSession.createPublisher(chatTopic); TopicSubscriber subscriber = subSession.createSubscriber(chatTopic); // Set a JMS message listener subscriber.setMessageListener(this); // Intialize the Chat application set(connection, pubSession, subSession, publisher, username); // Start the JMS connection; allows messages to be delivered connection.start(); } /* Initializes the instance variables */ public void set(TopicConnection con, TopicSession pubSess, TopicSession subSess, TopicPublisher pub, String username) { this.connection = con; this.pubSession = pubSess; this.subSession = subSess; this.publisher = pub; this.userName = username == null ? "anonymous" : username; } /* Receive message from topic subscriber */ public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; String text = textMessage.getText(); System.out.println(text); } catch (JMSException jmse) { jmse.printStackTrace(); } } /* Create and send message using topic publisher */ protected void writeMessage(String text) throws JMSException { TextMessage message = pubSession.createTextMessage(); message.setText(userName + " : " + text); publisher.publish(message); } /* Close the JMS connection */ public void close() throws JMSException { connection.close(); } } ===== Odkazy ===== * http://en.wikipedia.org/wiki/Java_Message_Service