`
awaitdeng
  • 浏览: 214073 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

redis发布与订阅实例

阅读更多
package com.redis.test;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;


public class JedisPubSubTest {

 public static JedisPool pool;
 static {
  JedisPoolConfig jedispool_config = new JedisPoolConfig();
  jedispool_config.maxActive = 20;
  jedispool_config.maxIdle = 0;
  jedispool_config.maxWait = 1000;
  jedispool_config.testOnBorrow = true;

  pool = new JedisPool(jedispool_config, "127.0.0.1", 6379);
 }

 public static void main(String[] args) throws InterruptedException {
  Jedis redisClient1 = pool.getResource();
  Jedis redisClient2 = pool.getResource();
  MyListener listener = new MyListener();

  Publisher pub = new Publisher();
  pub.publish(redisClient2); //发布一个频道

 

  Subscriber sub = new Subscriber();
  sub.psub(redisClient1, listener); // 订阅一个频道

  
 }

}

 

 

 

 


class Subscriber {
 
 public void psub(final Jedis redisClient, final MyListener listener) {
  

  new Thread(new Runnable() {
   @Override
   public void run() {

    System.out.println("订阅:news.share");
    // 订阅得到信息在lister的onMessage(...)方法中进行处理

    // 订阅多个频道
    // redisClient.subscribe(listener, "news.share", "news.log");

    //redisClient.subscribe(listener, new String[]{"news.share","news.log"});
      redisClient.psubscribe(listener, new String[] { "news.share" });// 使用模式匹配的方式设置频道
   }
  }).start();

 }

}

 

 


class Publisher {
 
 public void publish(final Jedis redisClient) {
  
  new Thread(new Runnable() {
   @Override
   public void run() {
    try {
     Thread.currentThread().sleep(2000);
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
    System.out.println("发布:news.share");
    redisClient.publish("news.share", "ok");
    redisClient.publish("news.share", "hello word");
   }
  }).start();

 }

}

 

 

消息监听:

 

package com.redis.test;

import redis.clients.jedis.JedisPubSub;

public class MyListener extends JedisPubSub {
 // 取得订阅的消息后的处理
 public void onMessage(String channel, String message) {
  System.out.println(channel + "=" + message);
 }

 // 初始化订阅时候的处理
 public void onSubscribe(String channel, int subscribedChannels) {
   System.out.println(channel + "=" + subscribedChannels+"&&&&&&&");
 }

 // 取消订阅时候的处理
 public void onUnsubscribe(String channel, int subscribedChannels) {
   System.out.println(channel + "=" + subscribedChannels+"#########");
 }

 // 初始化按表达式的方式订阅时候的处理
 public void onPSubscribe(String pattern, int subscribedChannels) {
   System.out.println(pattern + "=" + subscribedChannels+"!!!!!!!");
 }

 // 取消按表达式的方式订阅时候的处理
 public void onPUnsubscribe(String pattern, int subscribedChannels) {
   System.out.println(pattern + "=" + subscribedChannels+"@@@@@@@@@");
 }

 // 取得按表达式的方式订阅的消息后的处理
 public void onPMessage(String pattern, String channel, String message) {
  System.out.println(pattern + "=" + channel + "=" + message);
 }
}


结论: 经测试 sub 必须发pub 之前,否则收不到 message, 不要 pool.returnResource操作,会报异常。
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics