在BEA WebLogic中使用Java消息服務

發表于:2007-05-25來源:作者:點擊數: 標簽:javaWebLogicBEA消息服務
在最近兩年里,Sun引入了很多針對企業應用程序 開發 的API。其中最令人興奮的是Java消息服務或者JMS。JMS API是為了在企業中進行消息傳遞而設計的,其中,JNDI的任務是為目錄服務命名和提供目錄服務,而JNBC的任務是提供 數據庫 訪問。JMS是為了給企業消息傳

  在最近兩年里,Sun引入了很多針對企業應用程序開發的API。其中最令人興奮的是Java消息服務或者JMS。JMS API是為了在企業中進行消息傳遞而設計的,其中,JNDI的任務是為目錄服務命名和提供目錄服務,而JNBC的任務是提供數據庫訪問。JMS是為了給企業消息傳遞提供一種通用工具而設計的API,它沒有考慮將消息發送到任何一臺應用服務器的底層實現,也沒有考慮您希望使用的其他企業消息傳遞軟件技術。對于那些創建或者使用面向消息的中間件(MOM)的開發人員,尤其是那些需要在自己的產品中使用這些工具的Java開發人員,這是一個很大的進步。通過JMS,您應該能夠編寫一組針對JMS API進行消息傳遞的代碼,然后在任何提供JMS支持的消息傳遞系統中使用這些代碼。
  在本文中,我將向您展示如何創建一系列的消息生產者和消費者,他們利用了JMS API的大部分函數來使用持久存儲的主題和隊列、創建臨時目的地、通過消息選擇器過濾消息,等等。我將在JMS的第一個供應商的實現中實現這些例子,該實現是隨BEA的WebLogic應用服務器一起提供的(關于該服務器以及配置JMS的詳細信息,可以從www.JavaDevelopersJournal.com的“install.txt”在線文件中獲得)。在這一過程中,我將帶您一起經歷實現這些應用程序的必要步驟,其中包括修改應用服務器來支持您將創建的示例代碼。
  在繼續創建下面的示例代碼之前,建議您先獲得并安裝最新版本的BEA/WebLogic應用服務器??梢詮腂EA的網站http://ecommerce.bea.com/index.jsp下載免費使用試用版。

代碼
  包括在線的源文件在內,總共有4個源文件展示了如何在JMS中使用兩種不同的消息傳遞類型:發布/訂閱和點對點。Sender.java和Receiver.java展示了如何在JMS中使用Queue進行點對點消息傳遞,而Publisher.java和Subscriber.java展示了如何使用Topic進行發布/訂閱消息傳遞。“readme.txt”文件(也在網上)包含運行各種應用程序的命令以及有關代碼的最新信息。在嘗試運行這些應用程序之前,一定要先閱讀“install.txt”文件,并根據指示對WebLogic服務器進行必要的修改。因為示例代碼中還包含一些注釋和JMS的更多細節,本文無法完全介紹這些細節,因此,我們鼓勵讀者檢查源代碼并運行這些應用程序。

Java消息服務
  Java消息服務被設計為一組接口,這些接口將由MOM供應商和其他希望支持消息傳遞的供應商(應用服務器供應商、數據庫服務器供應商,等等)實現。這些接口為希望實現代碼的客戶應用提供了一個公共API,針對可能的、給定數量的底層消息傳遞系統進行消息傳遞。
  同時,JMS的設計非常靈活,它可以在保持可移植性的同時提供對現有消息傳遞系統的廣泛支持。因此,它不支持每個消息傳遞系統中每種可能的消息傳遞選擇。盡管您可能熟悉任何給定的MOM產品,但JMS不會自動支持該產品的每一個方面。
  JMS中的主要概念是Destination。Destination僅僅是消息生產者與消息消費者之間的一種關聯。Destination分為兩種類型:Topic和Queue。為每種Destination類型定義的單獨接口允許供應商支持其中一種類型或者兩種類型都支持,這取決于供應商的消息傳遞工具。Topic是為發布/訂閱消息傳遞而設計的,而Queue是為客戶對客戶(點對點)的消息傳遞設計的。這兩種Destination類型都支持持久性。JMS還提供了對事務的支持。事務使您能夠支持消息傳遞操作的提交和回滾。因此,如果事務中的操作失敗了,那么您可以對發生在該事務中的所用消息傳遞操作執行回滾。同樣地,如果每件事都很順利,那么您可以提交這些消息傳遞操作,使它們持久可用。因為JMS事務超出了本文的討論范圍,所以我將讓您自己去研究JMS的這些方面。

初始化JMS
連接工廠

  要初始化JMS,則需要使用ConnectionFactory,它是一個標記接口,其中沒有任何方法,并且可以通過TopicConnectionFactory或者QueueConnectionFactory得以擴展。供應商實現這些工廠接口中的一個或者兩個接口均實現,以提供對其特定消息傳遞服務實現的訪問。WebLogic提供了每種Factory類型的一般實現。Factory,以及Topic和Queue,都被認為是“托管”對象,因此它們都是WebLogic服務器在開機時創建的(在我們的例子中,我們將使用WebLogic的JMS實現)。然后,可以通過JNDI檢索這些托管對象。還可以使用JNDI命名上下文來檢索這些托管對象。
  如果這看起來有點讓您感到迷惑,不要擔心,示例代碼會使您很容易理解它們。您還可以在WebLogic中創建自己的Factory對象,而不是使用默認的Factory,這項操作是在weblogic.properties文件中完成的。用戶定義的Factory將在后面進行討論,這一節主要關注Topic,定義自己的Factory的詳細信息包含在“install.txt”文件中,該文件以及本文的源代碼都可以在網上找到。
  以下代碼來自Sender.java示例,它將向您展示如何初始化JMS,以及如何從JNDI中獲得Queue的ConnectionFactory:

public final String JMS_FACTORY = "javax.jms.QueueConnectionFactory";
...
queueFx = (QueueConnectionFactory) initCtx.lookup(JMS_FACTORY);

  Queue的默認ConnectionFactory的名稱被傳遞到初始JNDI命名上下文的lookup()方法中。然后會把對QueueConnectionFactory實現的引用從WebLogic應用服務器返回給我們(Sender.java 示例代碼中的getInitialContext()方法展示了初始化JNDI和從WebLogic中獲得初始命名上下文的細節。您可以從Sun的網站獲得關于API的詳細信息。網址是:http://java.sun.com/products/jndi/1.2/javadoc/index.html)。
  因為ConnectionFactories 是托管對象,所以由WebLogic應用服務器負責為Queue、Topic以及所有用戶定義的Factory創建默認ConnectionFactories,并將它們與WebLogic JNDI實現中的名稱綁定在一起,以便通過客戶端代碼進行查找。因此,在JNDI中,Queue的默認ConnectionFactory的名稱是“javax.jms.QueueConnectionFactory”,在通過JNDI查找該名稱的時候,將返回一個對這個接口的默認實現的引用,該引用是由WebLogic提供的。Topic也有一個默認的ConnectionFactory,它遵循相同的格式,即“javax.jms.TopicConnectionFactory”。通過JNDI進行查找將返回一個對TopicConnectionFactory默認實現的引用,它也是WebLogic提供的。

連接
  在成功創建正確的ConnectionFactory后,下一步將是創建一個“連接”,它是JMS定義的一個接口,表示連接到底層消息傳遞供應商的客戶機連接。ConnectionFactory負責返回可以與底層消息傳遞系統進行通信的Connection實現。通??蛻魴C只使用單一連接。根據JMS文檔,Connection的目的是“利用JMS提供者封裝開放的連接”,以及表示“客戶機與提供者服務例程之間的開放TCP/IP套接字。該文檔還指出Connection應該是進行客戶機身份驗證的地方,除了其他一些事項外,客戶機還可以指定惟一標志符。像ConnectionFactories一樣,Connections也有兩種類型,這取決于您將使用的Destination類型。QueueConnection和TopicConnection 都擴展了基本的Connection接口,通常,您只能使用其中的一個,使用哪一個則取決于客戶將使用的消息傳遞方式。Connection的創建是通過ConnectionFactory完成的。Connection包含兩種重要的方法:start和stop,開始和停止在連接過程中發送和接收消息。請參見清單1中的完整代碼塊。

會話
  一旦從ConnectionFactory中獲得一個Connection,就必須從Connection中創建一個或者多個會話。Session也是基本接口,同時,有兩種擴展基本接口的特定于Destination的接口:QueueSession和TopicSession,它們可以提供特定于Queue或者Topic的Session方法。Session是為Destination類型生產消息的消費者或生產者的工廠的一種類型(如果Destination類型是QueueSession,那么它將創建面向Queue的生產者和消費者;如果Destination類型是TopicSession,那么它將創建面向Topic的生產者和消費者)。
  Session可以被事務化,也可以不被事務化,通常,您可以通過向Connection上的適當創建方法傳遞一個布爾參數對此進行設置。同樣,您也可以向Connection的Session創建方法傳遞一個參數,該方法可以設置將創建的Session的消息確認模式,而該模式將指定是由客戶機還是由消費者來確認它所檢索的任何信息(如果使用事務機制,則忽略該參數)。Session提供的其他方法是各種消息創建方法,這些方法允許您創建包含文本、字節、屬性甚至串行化Java對象的特定類型的JMS消息(有關的更多信息在下面的Message接口上)。圖1是實際JMS接口的繼承和創建關系的圖表,請參見清單1中的代碼。

(暫缺圖)

目的地(Destination)
  在創建任何消息的生產者或消費者之前,必須要有特定的Destination,通過它您可以將任何生產者與消費者聯系起來。記住,Destination是托管對象,類似ConnectionFactories。這意味著Destination是由WebLogic維護的,必須通過JNDI查找來檢索它。這還意味著,在這種情況下,Destination必須是事先定義好的。但這并不是說您總是要提前創建Destination。JMS API提供了創建臨時Destination的能力,但這個Destination只能在創建它的Session的生存周期中使用,JMS API也能在運行時創建永久性Destination。然而,在JMS當前的WebLogic實現中,應該注意的是,如果通過Session創建Destination,那么只要WebLogic服務器在運行,這些Destination就會存在。如果服務器出故障或者死機了,那么Destination也會不存在。創建真正的永久性Destination的方法是在weblogic.properties文件中創建它(請參閱“install.txt”文件,以獲得關于如何做到這一點的詳細信息)。
  鑒于本文的目的,我們的Destination是通過weblogic.properties文件提前創建的。該文件中定義的Destination是WebLogic應用服務器在開機時創建的,并且可以通過JNDI用于客戶代碼。清單1顯示了Send.java文件中的代碼,展示了如何創建QueueConnection和QueueSession,以及如何像檢索Sender應用程序的包層次結構那樣檢索Destination,我們在weblogic.properties文件中定義的隊列的名稱為“jdj.article.queue.sender”。
消息的消費者和消息的生產者
  JMS初始化過程的最后一個階段是創建MessageConsumers和MessageProducers。像 ConnectionFactory一樣,它們也有Connection 和Session兩個基本接口,為了使用Topic或者Queue,還有一些擴展這兩個基本接口的特定于Destination的基本接口。(我在使用MessageProducer時使用Producer這個術語,在使用MessageConsumer時使用Consumer這個術語)。MessageConsumers用于檢索發送到Destination的消息,MessageProducers用于將消息發送到Destination。二者均由Session實例創建。MessageProducer由特定于Destination的接口QueueSender和TopicPublisher擴展。MessageConsumer則由QueueReceiver和TopicSubscriber接口擴展。
  一旦已經創建了自己的MessageConsumer和/或MessageProducer,也就做好了接收和/或發送消息的所有準備。因為生產者和消費者的創建是特定于Queue或Topic的,所以我將在下面的相關小節中討論特定于Destination類型的這兩種類型的處理。

消息
  在進入下一步并開始深入探討Topic和Queue中的消息發送和接收之前,我們需要討論另一個接口,即Message,它表示JMS消息本身。這個對象包含將發送到Destination的信息,以及正在Destination上進行監聽的消費者接收的信息。Message是由會話實例創建的,它由三個部分組成:消息頭、屬性和消息體。消息頭是用來識別和路由消息的,客戶端開發人員通常不會看到或者處理消息頭信息。屬性支持通過消息傳遞的特定于應用程序的值。這些屬性字段是預先定義的,對它們的完整描述可以在JMS文檔中找到。我將在示例代碼中使用一些屬性。消息的主體部分是消息的實際“有效負載”,它有5種類型,可以通過5個特定的接口來表示它們:StreamMessage、MapMessage、ObjectMessage、TextMessage 和BytesMessage(參見圖1)。

持久性
  JMS還有一個重要方面值得討論:它支持消息的持久傳遞。很簡單,這意味著當消息發送到Destination時,如果Consumer沒有運行或不可用,那么這個消息將被保存起來,直至下次Consumer連接到Destination為止。例如,如果您有5個應用程序在某一個Topic上進行監聽,其中一個崩潰了,那么下一次該應用程序啟動并連接到這個特定的Topic時,盡管該應用程序崩潰了,但發送到這個Topic的所有消息都將在應用程序開始再次進行監聽時發送到這個應用程序。如果這看起來難以理解,那么在您運行示例代碼時就會更有意義。

隊列
  Queue是為“點對點”或“一對一”的消息傳遞而設計的。這意味著應該只有一臺客戶機發送消息給Queue,并且只有一個應用程序可以處理這個Queue中的消息。
事實上,您可以有多臺向某一個Queue發送消息的客戶機,但只能有一個應用程序處理該Queue中的消息。如果在Queue上有多個Consumer,那么哪個Consumer接收消息將無法保證,但是只能有一個Consumer接收消息。如果Destination上需要多個Consumer,并且想讓它們都收到相同的消息,那么應該使用Topic(參見本文后面內容)。
  Sender.java和Receiver.java文件中包含展示如何使用Queue的代碼。這些代碼展示了JMS的初始化過程,并展示了如何檢索預定義的Queue,以及為了在Queue上發送和接收消息,應該如何創建MessageProducer和MessageConsumer。
  為了在Queue中消費和產生消息,系統中有兩個特定接口。其中一個接口是QueueSender,它是通過調用QueueSession的一個createQueueSender()方法,從QueueSession返回的,用于向Queue發送消息。另一個接口是QueueReceiver,它是通過調用QueueSession的一個createQueueReceiver()方法,從QueueSession返回的,用于接收來自Queue的消息。
  清單2是來自Sender.java的sendMsg方法的代碼,它展示了如何從Session中如何創建MessageProducer,然后構造一個TextMessage并發送它。在這段代碼中,我們將創建一個QueueSender,然后創建一個TextMessage,它包含從應用程序用戶界面的TextField中檢索到的文本。隨后,我們使用QueueSender方法“發送”消息。
  在MessageConsumer上,有兩種處理傳入消息接收的方法:同步的和異步的。第一步是創建MessageConsumer;下一步是判斷您想在Consumer上同步發送信息還是異步發送消息。
  在從Session中創建了MessageConsumer之后,如果想同步接收為Destination生成的下一條消息,那么可以調用MessageConsumer上的檢索方法。該方法沒有采用任何參數,在接收下一個Message之前,它將一直受阻,并且會將這個Message返回給調用者。為了接收異步消息,可以向MessageConsumer注冊一個MessageListener接口的實現。這種方法對Topic 和 Queue都適用。MessageListener只有一個您必須實現的方法 —— onMessage,它只接收一個參數,即Message。在為每個發送到Destination的消息實現onMessage時,將調用該方法。Sender.java和Receiver.java中的onMessage實現中示范了這個過程。在Receiver.java中,我們將下面的代碼放入initializeJMS方法中,這段代碼將創建MessageConsumer(一個QueueReceiver)并設置MessageListener的實現:

// Create a Receiver for the Queue...
receiver = session.createReceiver(queue);
// Set the listener (this class)
receiver.setMessageListener(this);

  一旦調用了Connection的start方法,消息就會在到達Destination之時開始進入Consumer。

ReplyTo——使用臨時隊列
  您還會注意到,Sender.java和Receiver.java都實現了MessageConsumers和MessageProducers。二者都實現了MessageListener。這說明了JMS的一個有趣特征,也就是說,它使用了臨時Destination。希望接收它所產生信息的響應的應用程序可以創建一個臨時的Queue或者Topic,并在它發送的Message中傳遞這個Destination。
Message的屬性之一是JMSReplyTo屬性。這個屬性就是用于這個目的的。您可以創建一個臨時的Queue或者Topic,并把它放入Message的JMSReplyTo屬性中。收到該消息的消費者有一個私有的臨時Destination,可以用它來響應發送者??梢酝ㄟ^兩種方法對這一點進行說明,這兩種方法分別在兩個文件中。Sender.java中包含如下所示代碼段,它將創建臨時的Queue,并將它放置在TextMessage的JMSReplyTo屬性中:

// Create a temporary queue for replies...
tempQueue = (Queue) session.createTemporaryQueue();

  以上這行代碼可以在Sender.java的initializeJMS方法中找到。這段代碼將在啟動應用程序時創建一個臨時Queue,而這個Queue將存在于應用程序的整個生命周期中。下面這行代碼可以在Sender.java的sendMsg方法中找到,它展示了如何設置JMSReplyTo屬性,以包含臨時的Queue。

// Set ReplyTo to temporary queue...
msg.setJMSReplyTo(tempQueue);

  在Receiver.java的QueueReceiver接收這條消息時,會從JMSReplyTo字段中提取臨時Queue,并且會通過應用程序構造一個QueueSender,以便將響應消息發送回Sender.java。這展示了如何使用JMS Message的屬性,并顯示了私有的臨時Destination的有用之處。它還展示了客戶機可以如何既是消息的Producer,有是消息的Consumer。下面的代碼來自Receiver.java,它展示了如何從JMS Message中提取臨時Queue;可以在onMessage方法中找到這些代碼:

// Get the temporary queue from the JMSReplyTo
// property of the message...
tempQueue = (Queue) msg.getJMSReplyTo();

  以下代碼塊來自sendReplyToMsg方法,它展示了如何創建QueueSender和如何發送應答:

// create a Sender for the temporary queue
if (sender == null)
sender = session.createSender(tempQueue);
TextMessage msg = session.createTextMessage();
msg.setText(REPLYTO_TEXT);
...
// Send the message to the temporary queue...
sender.send(msg);

主題
  Topic是為了實現“發布/訂閱”消息傳遞機制而設計的。而Queue是為了擁有一個Producer和一個Consumer而設計的,Topic的設計目標是允許擁有多個能發送消息給它的Producer,同時擁有多個能接收來自Topic的同一消息的Consumer。
  Topic的MessageProducers和MessageConsumers的創建過程與Queue的類似。您可以使用Session來創建TopicPublishers和TopicSubscribers。之所以存在QueueSender和QueueReceiver鏡像,是因為它們都提供了特定于Topic的功能,并且都實現了基本的MessageProducer和MessageConsumer接口。
  TopicPublisher的創建過程幾乎與QueueSender的創建過程完全相同。以下代碼來自Publisher.java的sendMsg方法,它展示了TopicPublisher的創建過程,以及如何向Topic發布消息:

// create a Publisher if there isn't one...
if (publisher == null)
publisher = session.createPublisher(topic);
TextMessage msg = session.createTextMessage();
msg.setText(text);
...
// Publish it to the topic...
publisher.publish(msg);

持久訂閱者
  TopicSubscribers一個有趣的方面是它的持久訂閱者,即用戶提供的惟一名稱,可以在Session中識別這個名稱。Session擁有一個與之相關的Client ID,這個ID要么是通過Connection上的一個方法調用在運行時定義的,要么是作為托管ConnectionFactory的一部分(在我們的例子中,它是在weblogic.properties文件中用這種方法定義的)。因此,一個Connection可以提供一個帶有Client ID的Session,并且持久訂閱者的名稱是Session中的惟一標識符,它與特定的Client ID相關聯。持久訂閱者的目的是為給定的Topic創建一個惟一的、持久的Consumer。
  通過調用TopicSession的createDurableSubscriber方法創建TopicSubscriber的應用程序必須傳遞一個持久訂閱者的名稱(一個字符串)作為其參數之一;例如,您可以將持久訂閱者的名稱設置為目前已登錄的用戶的名稱,等等。這個名稱惟一地標識出了某一Topic的特殊訂閱者(連同Connection/Session的惟一Client ID)。一旦已經向Topic注冊這個持久訂閱者,該Topic就會持久性地確保消息被發送到這個訂閱者那里。這意味著如果某一特殊的持久訂閱者是不可用的,那么將會一直保存消息,直到下次這個持久訂閱者(有相同的、惟一的持久訂閱者名稱和Client ID)注冊為一個Consumer為止。Subscriber.java 文件展示了持久訂閱者的創建過程,并允許您使用默認的訂閱者名稱,或者從命令行設置這個標識符(要得到關于運行這個應用程序和展示通過擁有持久訂閱者獲得消息永久性的更多細節,請參閱“readme.txt”)。下列代碼片段來自Subscriber.java的initializeJMS方法,它展示了如何從TopicSession中創建一個持久訂閱者:

subscriber = session.createDurableSubscriber(
topic,
subscriberID,
SELECTOR,
false);
// Set the listener (this class)
subscriber.setMessageListener(this);

  TopicSubscribers并沒有被創建為持久訂閱者,因此它不會持久性地接收消息,而是只在運行期間接收消息。
  要想獲得關于持久訂閱者和Topic的更進一步的信息,請參閱Sun的JMS文檔。關于上述代碼的另外一點是:注意傳遞給該方法的第三個參數,這個參數是SELECTOR。這是消息選擇器與Consumer有關聯的地方(請參閱下面內容,以獲得關于消息選擇器的更多信息)。

過濾——使用消息選擇器
  我們將討論的有關JMS的最后一個方面是消息選擇器,消息選擇器是用于MessageConsumers的過濾器,可以用來過濾傳入消息的屬性和頭文件部分(但不過濾消息體),并確定是否將實際消費該消息。按照JMS文檔的說法,消息選擇器是一些字符串,它們基于某種語法,而這種語法是SQL-92的子集。您可以將消息選擇器作為MessageConsumer創建的一部分。根據MessageConsumer是QueueReceiver還是TopicSubscriber,這種將消息選擇器應用于傳入信息的方法也會稍有不同。我建議您通過檢查Subscriber.java文件以及運行Publisher和Subscriber應用程序,檢查消息收集器的語法,并查看如何應用它們。以下代碼段在Subscriber.java中定義了消息選擇器,而應用程序本身允許您從文本域中改變這個選擇器:

public final String SELECTOR = "JMSType = 'TOPIC_PUBLISHER'";

  該選擇器檢查了來自Topic的傳入消息的JMSType屬性,并確定了這個屬性的值是否等于TOPIC_PUBLISHER。如果相等,則將消息傳遞到MessageListener實現;如果不相等,那么消息會被忽略。請參閱“readme.txt”文件,以獲得關于運行這些應用程序和展示其行為的更多細節。此外還建議您參閱Sun的JMS文檔。

結束語
  JMS是一項在應用程序中創建可移植消息傳遞代碼的很有吸引力的、功能強大的技術。它允許進行“點對點”和“發布/訂閱”消息傳遞,而且還支持事務和持久性。BEA的WebLogic應用服務器提供了一種健壯并完整的JMS實現,它能與應用服務器提供的其他技術一同工作,比如EJB和servlet。這為通過事務支持在不同企業對象和服務間進行永久的,異步的消息傳遞創造了極大的可能。我希望本文可以鼓勵您開發在線示例代碼,并且可以鼓勵您檢查WebLogic(尤其是JMS)提供的可能性。

原文轉自:http://www.anti-gravitydesign.com

国产97人人超碰caoprom_尤物国产在线一区手机播放_精品国产一区二区三_色天使久久综合给合久久97