Spring+ActiveMQ+Mysql 配置JMS
一、准备一个可以运行的Spring环境
二、下载ActiveMQ (下载地址)
2.1 先确保ActiveMQ运行正常,直接运行 安装目录\bin\activemq.bat即可,
注意:如果要以服务方式运行的话,可以使用ActiveMQ 提供的工具 安装目录\bin\win32\InstallService.bat 确保以管理员方式运行
可以打开链接, (http://localhost:8161/admin)查看是否安装成功
三、试用
单独编写消息发送者和消息接受以测试相应
消息发送
public static void main(String[] args) throws JMSException{// ConnectionFactory :连接工厂,JMS 用它创建连接ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");// JMS 客户端到JMS Provider 的连接Connection connection = connectionFactory.createConnection();connection.start();// Session: 一个发送或接收消息的线程Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);// Destination :消息的目的地;消息发送给谁.// 获取session注意参数值Queue.Name是Query的名字Destination destination = session.createQueue("[color=red]Queue.Name[/color]");// MessageProducer:消息生产者MessageProducer producer = session.createProducer(destination);// 设置不持久化producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);// 发送一条消息sendMsg(session, producer);session.commit();connection.close();}/** * 在指定的会话上,通过指定的消息生产者发出一条消息 * * @param session 消息会话 * @param producer 消息生产者 */public static void sendMsg(Session session, MessageProducer producer) throws JMSException{// 创建一条文本消息TextMessage message = session.createTextMessage("Hello ActiveMQ!");// 通过消息生产者发出消息producer.send(message);System.out.println("");}
public static void main(String[] args) throws JMSException{// ConnectionFactory :连接工厂,JMS 用它创建连接ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");// JMS 客户端到JMS Provider 的连接Connection connection = connectionFactory.createConnection();connection.start();// Session: 一个发送或接收消息的线程Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);// Destination :消息的目的地;消息发送给谁.// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置Destination destination = session.createQueue("Queue.Name");// 消费者,消息接收者MessageConsumer consumer = session.createConsumer(destination);while(true){TextMessage message = (TextMessage) consumer.receive(1000);if(null != message)System.out.println("收到消息:" + message.getText());elsebreak;}session.close();connection.close();}
<!-- 配置JMS消息发送 --><bean id="jmsFactory" /></property></bean><bean id="destination" ref="myJmsTemplate"></property></bean><bean id="receive" ref="jmsFactory"></property><property name="messageListener" ref="receive"></property><property name="destination" ref="destination" /></bean><!-- 配置JMS消息发送完成 -->
@Componentpublic class JmsQueueSender{private JmsTemplate jmsTemplate;public void setConnectionFactory(ConnectionFactory cf){this.jmsTemplate = new JmsTemplate(cf);}public void simpleSend(){jmsTemplate.convertAndSend("Queue.Name", "test!!!");}public JmsTemplate getJmsTemplate(){return jmsTemplate;}public void setJmsTemplate(JmsTemplate jmsTemplate){this.jmsTemplate = jmsTemplate;}}
@Componentpublic class JmsQueueReceiver implements MessageListener{@Overridepublic void onMessage(Message message){if(message instanceof TextMessage){final TextMessage textMessage = (TextMessage) message;try{System.out.println(textMessage.getText());}catch(final JMSException e){e.printStackTrace();}}}}
<persistenceAdapter> <kahaDB directory="${activemq.base}/data/kahadb"/> </persistenceAdapter>
<persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds"/> </persistenceAdapter>
<bean id="mysql-ds" destroy-method="close"><property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/><property name="username" value="root"/><property name="password" value="root"/> <property name="poolPreparedStatements" value="true"/> </bean>