MyException - 我的异常网
当前位置:我的异常网» 软件架构设计 » (java)简略实现原生RabbitMQ中的广播订阅(fanout

(java)简略实现原生RabbitMQ中的广播订阅(fanout)模式

www.MyException.Cn  网友分享于:2018-06-06  浏览:0次
(java)简单实现原生RabbitMQ中的广播订阅(fanout)模式

fanout 发送端

 

  1. import com.rabbitmq.client.Channel;  
  2. import com.rabbitmq.client.Connection;  
  3. import com.rabbitmq.client.ConnectionFactory;  
  4.   
  5. public class MyFanoutSender {  
  6.       
  7.      
  8.     private static final String MESSAGE = "my name is";  
  9.       
  10.     public static void main(String[] args){  
  11.         Connection conn = null;  
  12.         Channel channel = null;  
  13.         try {  
  14.             //初始化连接,主机,端口,用户名,密码可以自己定义  
  15.             ConnectionFactory factory = new ConnectionFactory();  
  16.             factory.setHost(HOST_PARAMETER.LOCAL_HOST);  
  17.             factory.setPort(HOST_PARAMETER.LOCAL_PORT);  
  18.             factory.setUsername(HOST_PARAMETER.LOCAL_USER_NAME);  
  19.             factory.setPassword(HOST_PARAMETER.LOCAL_PASSWORD);  
  20.             //创建连接  
  21.             conn = factory.newConnection();  
  22.             //创建通道  
  23.             channel = conn.createChannel();  
  24.             //定义为fanout类型的交换机  
  25.             channel.exchangeDeclare(HOST_PARAMETER.EXCHANGE_NAME, "fanout");  
  26.             //发送,指定routingkey为""  
  27.             channel.basicPublish(HOST_PARAMETER.EXCHANGE_NAME, ""null, MESSAGE.getBytes());  
  28.             System.out.println("I send a fanout massage!");  
  29.         } catch (Exception e) {  
  30.             e.printStackTrace();  
  31.         } finally{  
  32.             try {  
  33.                 if(channel != null){  
  34.                     channel.close();  
  35.                 }  
  36.                 if(conn != null){  
  37.                     conn.close();  
  38.                 }  
  39.             } catch (Exception e) {  
  40.                 e.printStackTrace();  
  41.             }  
  42.         }  
  43.     }  
  44. }  


fanout订阅者(接收端),可以自己多定义几个,才能看出效果

 

  1. import java.io.IOException;  
  2.   
  3. import com.rabbitmq.client.AMQP.BasicProperties;  
  4. import com.rabbitmq.client.Channel;  
  5. import com.rabbitmq.client.Connection;  
  6. import com.rabbitmq.client.ConnectionFactory;  
  7. import com.rabbitmq.client.Consumer;  
  8. import com.rabbitmq.client.DefaultConsumer;  
  9. import com.rabbitmq.client.Envelope;  
  10.   
  11. public class MyFirstFanoutReceiver {  
  12.       
  13.   public static void main(String[] args){  
  14.       Connection conn = null;  
  15.       Channel channel = null;  
  16.       try {  
  17.         //初始化连接  
  18.         ConnectionFactory factory = new ConnectionFactory();  
  19.         factory.setHost(HOST_PARAMETER.LOCAL_HOST);  
  20.         factory.setPort(HOST_PARAMETER.LOCAL_PORT);  
  21.         factory.setUsername(HOST_PARAMETER.LOCAL_USER_NAME);  
  22.         factory.setPassword(HOST_PARAMETER.LOCAL_PASSWORD);  
  23.         //创建连接  
  24.         conn = factory.newConnection();  
  25.         //创建通道  
  26.         channel = conn.createChannel();  
  27.         //声明交换机类型  
  28.         channel.exchangeDeclare(HOST_PARAMETER.EXCHANGE_NAME, "fanout");  
  29.         //声明默认的队列  
  30.         String queue = channel.queueDeclare().getQueue();  
  31.         //将队列与交换机绑定,最后一个参数为routingKey,与发送者指定的一样""  
  32.         channel.queueBind(queue, HOST_PARAMETER.EXCHANGE_NAME, "");  
  33.         //消费者  
  34.         Consumer consumer = new DefaultConsumer(channel){  
  35.             @Override  
  36.             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)  
  37.                     throws IOException {  
  38.                 System.out.println(new String(body,"utf-8")+" Tom");  
  39.             }  
  40.               
  41.         };  
  42.         channel.basicConsume(queue, true, consumer);  
  43.         System.out.println("i am the first fanout receiver!");  
  44.       }catch (Exception e) {  
  45.             e.printStackTrace();  
  46.       }   
  47.   }  
  48. }  

文章评论

软件开发程序错误异常ExceptionCopyright © 2009-2015 MyException 版权所有