Java Message Service 1.1

Tento paper pojednává o JMS 1.1 a jeho vztahu k produktu TIBCO Enterprise Message Service 4.x.

TIBCO EMS je založen na JMS, což je specifikace (nikoli implementace) pro systémy zasílání zpráv od firmy Sun, která ve verzi 1.1 nabízí dvě monosti posílání zpráv:

  • point-to-point – zpráva je přes server doručena od jednoho odesílatele jednomu adresátovi pomocí queue (fronty) – často to přirovnáváme k poště (odběratel má na serveru „schránku“ pro dopisy);
  • publish-subscribe – zpráva je přes server uložena do tzv. topicu (téma), odkud je dostávají zaregistrovaní posluchači (nazývá se také broadcast).

U point-to-point (PTP) se odesílá na konkrétního příjemce. Zpráva jde přes server, kde je zařazena do fronty, takže i když není příjemce připojen, o žádné zprávy nepřijde. Ty mu jsou doručeny ve chvíli, kdy se připojí. Fronta může mít i více posluchačů, ale je zajištěno, že jeden posluchač dostane každou z právu jen jednou. Do jedné fronty může zapisovat více odesílatelů (producer). Pokud je fronta nastavena na exclusive, zprávy může vyzvedávat jen jeden příjemce (consumer) – ten, který je specifikován jako první. U non-exclusive režimu může vybírat z fronty více příjemců, což se používá pro load balancing příchozích zpráv. U point-to-point režimu lze také použít třídu QueueBrowser a stahovat zprávy z fronty, aniž by je klient odebíral. Pro výběr je k dispozici MessageSelector, který je popsán dále.

U publish-subscribe (Pub/Sub) probíhá posílání zpráv od jednoho odesílatele více příjemcům, kteří se přihlašují k odběru jednotlivých témat (topic). Pokud consumer není dostupný, není mu zpráva doručena. Ale je možno konfigurovat, zda-li je přihlášení trvalé (durable). V tom případě se klientovi pošlou všechny zprávy, o které by přišel v odpojeném stavu.

Vlastnosti pro queue nebo topic:

  • fail-safe režim – zprávy jsou nejprve zapisovány na disk;
  • permissions – access control pro zprávy;
  • limits – limity na frontu nebo durable topic kvůli přeplnění;
  • routing – přeposílání na jiné servery
  • exchanges – výměna s jinými messagingovými systémy;
  • exkluzivita – viz výše;
  • redelivery policy
  • tracing, logging

JMS API

JMS API je jednoduché, ConnectionFactory vytvoří Connection, to Session a pomocí instance tohoto objektu je možné vytvořit Message, MessageProducer a MessageConsumer, kteří mohou provádět operace (posílat zprávy), případně se zaregistrovat jako MessageListener. Dobré je, že toto API je společné jak pro point-to-point tak pro publish-subscribe. Ve starších verzích (před JMS 1.1) byla tato rozhraní dvojmo (QueueSession, TopicSession a podobně).

Objekt ConnectionFactory zapouzřuje nastavení spojení, je uložen a spravován na TIBCO EMS serveru a přístup k němu získá klient pomocí JNDI. Třída Connection zapouzdřuje virtuální spojení, je s tím spojena režie, takže aplikace si obvykle vytváří jen jedno spojení. Před konzumací zpráv je nutno volat metodu start(), ukončit (např. dočasně) konzumaci je možné metodou stop(). Po skončení práce je nutno zavolat metodu close(), která ukončí také všechny Session objekty tohoto spojení.

Session je single-threaded objekt, který vytváří zprávy, producery a konzumenty. Session může být také v transakci (první parametr konstruktoru), v tom případě se skupina zpráv posílá (a přijímá) atomicky. MessageProducer objekt (vytvářený objektem Session) se používá k zasílání správ. K tomu používáme metodu send(message). Je možné vytvořit také MessageProducer aniž bychom specifikovali cíl – ten se poté musí specifikovat v send nebo publish metodách.

Objekt MessageConsumer vytváříme pomocí objektu Session a slouží k příjmu zpráv. Je nutné si uvědomit, že po přihlášení k frontě (queue) zprávy zůstávají ve frontě, dokue je nevyzvedneme. Naproti tomu po přihlášení k tématu (topic) je nutné být on-line, abychom mohli přijímat. Můžeme ale vytvořit konzumenta pomocí Session.createDurableSubscriber() metody – tedy trvalého příjemce publish-subscribe komunikace.

Konzumace zpráv může být asynchronní a synchronní. U synchroního MessageConsumer explicitně volá metodu receive() na objektech topic nebo queue. U asynchronního režimu se automaticky volá metoda onMessage() třídy MessageListener. Objekt MessageListener se registruje u konkrétního MessageConsumer objektu před voláním metody start(). Jeden a ten samý listener může zpracovávat zprávy z front nebo topiců. Specifikace J2EE definuje jako jeden typ EJB speciální typ MessageListener rozhraní – MDB.

Třídy TopicRequestor a MessageRequestor pomáhají při tvorbě request-response komunikace. Při použití těchto pomocných tříd stačí zavolat metodu request() a ta provede odeslání zprávy a počká na výsledek, který ihned vrátí (je blokující).

A konečně MessageSelector je nepovinným filtrem na straně odesílatele, může se tak snížit provoz na síti. Tím se ale také může zvýšit zatížení serveru – selektory se provádí vždy na serveru a mohou být časově náročné. Používá jednoduchou syntaxi postavenou na syntaxi SQL92.

Zprávy v JMS

Obsahuje header, properties a body. Pouze header je povinný, a jsou nastaveny následující hodnoty:

název položkypopis
JMSDestionatioinCíl zprávy.
JMSDeliveryModeRežimy doručování, detaily níže.
JMSExpirationExpirace zprávy v milisekundách. Je nutno dbát na synchronizaci času napříč servery!
JMSPriorityJMS zpráva může mít nastavenou prioritu, která se zadává číselně od 0 do 9 (nejvyšší).
JMSMessageIDJedinečný identifikátor.
JMSTimestampČasové razítko, kdy byla zpráva předána k poslání (skutečný čas odeslání může být jiný).
JMSCorrelationIDID pro párování zpráv, například dotaz-odpověď. Nepovinné.
JMSReplyToKam má příjemce odpovídat. Nepovinné.
JMSTypeIdentifikátor typu zprávy.
JMSRedeliveredPokud je nastaven, zpráva byla už doručena (ale nepřijata).

Properties jsou dodatečné (nepovinné) vlastnosti zprávy nebo vlastnosti závislé na implementaci. Vlastnosti rozšiřující mají prefix JMSX (například JMSXUserID, JMSXAppID, JMSXDeliveryCount, JMSXGroupID a jiné). Properties mohou být těchto typů: boolean, byte, short, int, long, float, double a String.

Seznam vlastností pro systém TIBCO EMS shrnuje tabulka.

vlastnostmožné hodnotypopis
JMS_TIBCO_COMPRESSBooleanV případě perzistentních zpráv ve frontě nebo topiců s durable subscribery je při ukládání na disk použita komprese pro těla zpráv. Komprese se vůbec netýká přenosu, ale jen ukládání na disk.
JMS_TIBCO_PRESERVE_UNDELIVEREDBooleanPokud nemohla být zpráva doručena, je přesunuta do undelivered fronty. Z té se obvykle zasílají někam jinam (správci a podobně).
JMS_TIBCO_SENDERStringPokud je cíl nakonfigurován, TIBCO sem ukládá jméno odesílatele.
JMS_TIBCO_DISABLE_SENDERBooleanPokud odesílatel nechce, aby se do JMS_TIBCO_SENDER jeho jméno ukládalo, může nastavit tuto hodnotu na true, nicméně to mu nepomůže, pakliže bylo na cíli nastavena volba sender_name_enforced.
JMS_TIBCO_MSG_EXTBooleanTento flag je nastaven, pokud byly typy MapMessage nebo StreamMessage rozšířeny o věci, které JMS nepodporuje (zapouzdřená zpráva, pole z primitivních typů). Toto může být nastaveno, pokud zpráva přišla ze systémů TIBCO Rendezvous nebo ActiveEnterprise.
JMS_TIBCO_MSG_TRACEStringZapne trasování zprávy (hodnota null) nebo kompletní trasování včetně těla (hodnota „body“).
JMS_TIBCO_IMPORTEDBooleanZpráva byla importována z jiných messagingových systémů.

Typy těla zpráv shrnuje následující tabulka. Maximální velikost zprávy je u TIBCO EMS 512 MB.

typ zprávypopis
MessageZpráva nemá tělo.
TextMessageTyp String.
MapMessageKey/value dvojice, klíče jsou String, hodnoty jsou potomci třídy Object.
BytesMessageProud bajtů.
StreamMessageProud primitivních typů.
ObjectMessageSerializovaný Object.

Pokud se zpráva vytváří v prostředí Javy, mají řetězce kódování UTF-8. TIBCO EMS ale má také API pro komunikaci z jiných jazyků (např. C), v tomto případě lze nastavit kódování. Jakmile zpráva přijde do prostředí Javy, TIBCO řetězce automaticky překonvertuje do UTF-8.

Potvrzování příjmu je implementováno v JMS na dvou úrovních. Při zasílání zprávy na JMS server:

  • 1) zpráva je odeslána
  • 2) server pošle zpět potvrzení o doručení (toto potvrzení lze vypnout nastavením na reliable delivery)

Při zasílání zprávy příjemci jsou možné fáze tři:

  • 3) poslání zprávy ze serveru příjemci
  • 4) potvrzení příjmu
  • 5) potvrzení o potvrzení příjmu (předchází duplicitám)

Rozlišujeme tři úrovně potvrzování:

  • CLIENT_ACKNOWLEDGE – konzument sám pošle potvrzení všech zpráv zaslaných v jednom sezení voláním metody acknowldege();
  • AUTO_ACKNOWLEDGE – session automaticky pošle potvrzení o příjmu po opuštění metody onMessage();
  • DUPS_OK_ACKNOWLEDGE – konzumentům nevadí příjem duplicit, takže na server může odejít potvrzení později – klient bude porvrzovat po skupinách (lazy potvrzování tedy může duplicity generovat).

TIBCO EMS dále přidává několik dalších režimů:

  • NO_ACKNOWLEDGE – žádné potvrzování, se zprávou se zachází, jako by byla ihned potvrzena (nelze však použít u durable subskribcí a subskripcí na routovaný server);
  • EXPLICIT_CLIENT_ACKNOWLEDGE – jako CLIENT_ACKNOWLEDGE ale potvrzuje se každá zpráva zvlášť;
  • EXPLICIT_CLIENT_DUPS_OK_ACKNOWLEDGE – jako DUPS_OK_ACKNOWLEDGE, ale také individuálně.

Režimy doručování jsou definovány v JMS standardu:

  • PERSISTENT – Je zajištěno, že v případě jednoho nebo více trvalých přihlášení dochází k ukládání zprávy na disk, dokud není bezpečně odeslána příjemcům. Nemělo by tedy dojít ka ztrátě při výpadku serveru.
  • NON_PERSISTENT – V tomto režimu se zprávy vůbec na disk nemusejí ukládat

TIBCO EMS pak ještě přidává ještě dvě možnosti:

  • RELIABLE_DELIVERY, kdy producent zprávy nedostává žádné potvrzení (číslo 2 neproběhne);
  • NO_ACKNOWLEDGE, kdy po odeslání zprávy konzumentovi už se server ničím nezabývá a klient už nic nepotvrzuje.

Nastavení těchto dvou režimů se provádí nad producerem, resp. nad sezením:

import com.tibco.tibjms.tibjms.Tibjms;
 
messageProducer.setDeliveryMode(Tibjms.RELIABLE_DELIVERY);
// resp.
TopicSession session = conn.createTopicSession(false, Tibjms.NO_ACKNOWLEDGE);

Příklad

Nyní se podíváme na komplexní a vševysvětlující příklad jednoduchého chatu.

/*
 * Copyright (C) 2000, O'Reilly & Associates, Inc.
 * The code in this file may be copied, distributed, and reused,
 * provided that you do not remove this copyright notice.
 * O'Reilly & Associates assumes no responsibility for damages
 * resulting from the use of this code.
 *
 * Modified by Lukas Zapletal.
 */
 
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Properties;
 
public class Chat implements javax.jms.MessageListener {
 
    private TopicSession pubSession;
    private TopicSession subSession;
    private TopicPublisher publisher;
    private TopicConnection connection;
    private String userName;
 
    String providerContextFactory = "com.tibco.tibjms.naming.TibjmsInitialContextFactory";
    String defaultProtocol = "tibjmsnaming";
    String defaultProviderURL = defaultProtocol + "://localhost:7222";
 
    /* Run the Chat client */
    public static void main(String[] args) {
        try {
            Chat chat = null;
            if (args.length == 3) {
                // args[0]=topicName; args[1]=username; args[2]=password
                chat = new Chat(args[0], args[1], args[2]);
            } else if (args.length == 1) {
                chat = new Chat(args[0], null, null);
            } else {
                System.out.println("Topic missing");
                System.exit(1);
            }
 
            // read from command line
            BufferedReader commandLine = new java.io.BufferedReader(new InputStreamReader(System.in));
 
            // loop until the word "exit" is typed
            while (true) {
                String s = commandLine.readLine();
                if (s.equalsIgnoreCase("exit")) {
                    chat.close(); // close down connection
                    System.exit(0); // exit program
                } else {
                    chat.writeMessage(s);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
 
    /* Constructor. Establish JMS publisher and subscriber */
    public Chat(String topicName, String username, String password) throws Exception {
 
        // Obtain a JNDI connection
        Properties env = new Properties();
        // ... specify the JNDI properties specific to the vendor
        env.put(Context.INITIAL_CONTEXT_FACTORY, providerContextFactory);
        env.put(Context.PROVIDER_URL, defaultProviderURL);
        if (username != null) {
            env.put(Context.SECURITY_PRINCIPAL, username);
            env.put(Context.SECURITY_CREDENTIALS, password);
        }
        InitialContext jndi = new InitialContext(env);
 
        // Lookup a JMS connection factory
        TopicConnectionFactory conFactory =
                (TopicConnectionFactory) jndi.lookup("TopicConnectionFactory");
 
        // Create a JMS connection
        TopicConnection connection = conFactory.createTopicConnection(username, password);
 
        // Create a JMS session object
        TopicSession pubSession = connection.createTopicSession(false,
                Session.AUTO_ACKNOWLEDGE);
        TopicSession subSession = connection.createTopicSession(false,
                Session.AUTO_ACKNOWLEDGE);
 
        // Lookup a JMS topic
        Topic chatTopic = (Topic) jndi.lookup(topicName);
 
        // Create a JMS publisher and subscriber
        TopicPublisher publisher = pubSession.createPublisher(chatTopic);
        TopicSubscriber subscriber = subSession.createSubscriber(chatTopic);
 
        // Set a JMS message listener
        subscriber.setMessageListener(this);
 
        // Intialize the Chat application
        set(connection, pubSession, subSession, publisher, username);
 
        // Start the JMS connection; allows messages to be delivered
        connection.start();
    }
 
    /* Initializes the instance variables */
    public void set(TopicConnection con, TopicSession pubSess, TopicSession subSess,
                    TopicPublisher pub, String username) {
        this.connection = con;
        this.pubSession = pubSess;
        this.subSession = subSess;
        this.publisher = pub;
        this.userName = username == null ? "anonymous" : username;
    }
 
    /* Receive message from topic subscriber */
    public void onMessage(Message message) {
        try {
            TextMessage textMessage = (TextMessage) message;
            String text = textMessage.getText();
            System.out.println(text);
        } catch (JMSException jmse) {
            jmse.printStackTrace();
        }
    }
 
    /* Create and send message using topic publisher */
    protected void writeMessage(String text) throws JMSException {
        TextMessage message = pubSession.createTextMessage();
        message.setText(userName + " : " + text);
        publisher.publish(message);
    }
 
    /* Close the JMS connection */
    public void close() throws JMSException {
        connection.close();
    }
}

Odkazy

pridej.cz
papers/java_message_service_1.1.txt · Poslední úprava: 29.05.2008 09:04 (external edit)
Creative Commons License Valid CSS Driven by DokuWiki Recent changes RSS feed Valid XHTML 1.0