Как я могу асинхронно извлекать сообщения из Activemq

Я хочу написать код для извлечения сообщений из Activemq. Я не хочу извлекать все сообщения из Activemq за раз, потому что мое требование заключается в том, что всякий раз, когда мое Java-приложение получает 1 сообщение от Activemq, на основе тела сообщения я найду соответствующий HTTP Link и вперед по этой ссылке. Для всей этой логики я написал 2 .java имена файлов

MessageConsumer.java

MyListener.java

Файл MessageConsumer.java только для установления соединения. Соответствующий код приведен ниже.

 package PackageName;
 import java.io.IOException;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import javax.jms.*;
 import org.apache.activemq.ActiveMQConnectionFactory;
 public class MessageConsumer extends HttpServlet {
@Override
protected void service(HttpServletRequest arg0, HttpServletResponse arg1)
    throws ServletException, IOException {
    try {
      //creating connectionfactory object for way
      ConnectionFactory connectionFactory=new        
      ActiveMQConnectionFactory("admin","admin","tcp://localhost:61617");
     //establishing the connection b/w this Application and Activemq
     Connection connection=connectionFactory.createConnection();
     Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
     Queue queue=session.createQueue("MessageTesing");
     javax.jms.MessageConsumer consumer=session.createConsumer(queue);
     //fetching queues from Activemq
     MessageListener listener = new MyListener();
    consumer.setMessageListener(listener);
    connection.start();
    System.out.println("Press a key to terminate");
    }
  catch (Exception e) {
    // TODO: handle exception
}
 }
}

Файл MyListener.java предназначен для запуска соответствующих приложений. Код приведен ниже.

package PackageName;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class MyListener implements MessageListener {
public void onMessage(Message msg) {
    try {
        TextMessage msg1=(TextMessage)msg;
        //just for your understanding I mention dummy code
        System.out.println(msg1.getText());
        if (msg1.getText()=="Google") {
            System.out.println("Forwarding http link to Google");
        }
        else {
            System.out.println("Forwarding http link to Facebook");
        }
    } catch (JMSException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}
}

в моем посте я запускаю ссылки Google и Facebook. Но что касается моих требований, я буду называть свои собственные приложения. Каждое приложение занимает более 20 минут. Поэтому я хочу получать сообщения одно за другим. как только процесс предыдущего сообщения завершен, тогда только это получит другое сообщение от Activemq .

Но точно знаю, что я получаю все сообщения одновременно. Как я могу это исправить. Я видел программу Activemq-Hellowworld. Я не понял.

Извините, я новичок в Java технологии. Кто-нибудь может мне помочь.

Спасибо.


person Hanumath    schedule 09.08.2013    source источник


Ответы (2)


Если вы используете MessageListener, вы фактически получаете сообщения асинхронно (в другом потоке).

Вероятно, вы ищете синхронный прием сообщений, поэтому попробуйте это в своем основном потоке:

final QueueReceiver queueReceiver = queueSession.createReceiver(queue);
queueConnection.start();

while (true) {
  Message message = queueReceiver.receive();
  // Process your message: insert the code from MyListener.onMessage here

  // Possibly add an explit message.acknowledge() here, 
  // if you want to make sure that in case of an exception no message is lost
  // (requires Session.CLIENT_ACKNOWLEDGE, when you create the queue session)

  // Possibly terminate program, if a certain condition/situation arises
}

без MessageListener.

receive() блокирует до тех пор, пока сообщение не будет доступно, поэтому ваш основной поток (и, следовательно, ваша программа) ожидает в методе receive. Если приходит сообщение, он его получает и обрабатывает.

Обновить

Если вы используете

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

тогда вам следует позвонить

message.acknowledge()

после того, как сообщение было полностью обработано.

Тогда как в случае Session.AUTO_ACKNOWLEDGE сообщение немедленно удаляется из очереди (и, следовательно, теряется, если программа завершает работу во время обработки сообщения).

person Beryllium    schedule 09.08.2013
comment
Я интегрировал ваш код в свой код. Даже если я получаю сообщения от Activemq, количество ожидающих сообщений не уменьшается в моей консоли Activemq. Почему это происходит именно так. - person Hanumath; 09.08.2013
comment
@Beryllium Я полностью запутался из-за этой задачи. - person Hanumath; 09.08.2013
comment
@ user2642355 Это меняет вопрос. Я предлагаю вернуться к старой версии вопроса и опубликовать отдельный вопрос с темой Как спроектировать .... В этом вопросе, пожалуйста, опишите вариант использования вашего приложения (никаких технических вещей, только процесс) и реализация, которую вы сделали до сих пор (сервлет, использующий приемник JMS). Сочетание использования приемника JMS в сервлете, вероятно, не то, что вам нужно, поэтому нам нужно взглянуть на это с точки зрения варианта использования. - person Beryllium; 09.08.2013
comment
Я разместил другой вопрос. Вы можете проверить следующую ссылку. http://stackoverflow.com/questions/18147115/how-can-i-call-externalapplication-based-on-activemq-message-using-jms - person Hanumath; 09.08.2013

Вместо использования MessageListener вы можете использовать метод receive() в объекте MessageConsumer. Таким образом, вы получаете только одно сообщение каждый раз, когда вызываете метод receive().

MessageConsumer consumer = session.createConsumer(destination); 
Message message = consumer.receive(1000);
person jan    schedule 09.08.2013