redis监听Key失效


业务逻辑

监听redis失效的key,在key过期的时刻,做一些列操作。

修改配置

将两个文件的notify-keyspace-events " " 改为 notify-keyspace-events “Ex”

重启redis-server

测试

两种方案

  1. 在client终端测试
  2. 服务器调用

client终端

打开两个client终端

第一个输入PSUBSCRIBE __keyevent@*__:expired

结果如下:

第二个创建key的时效,例如输入setex lbx 10 true

十秒后,key为lbx的过期,并且在第一个client终端看见如下信息:

证明已经监测到失效的key,成功!

服务器调用

创建消息监听容器

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

@Configuration
public class MyRedisConfig {

    /**
     * Redis 消息监听器容器.
     *
     * @param redisConnectionFactory the redis connection factory
     * @return the redis message listener container
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
        return redisMessageListenerContainer;
    }


    /**
     * Redis Key失效监听器注册为Bean.
     *
     * @param redisMessageListenerContainer the redis message listener container
     * @return the redis event message listener
     */
    @Bean
    public RedisEventMessageListener redisEventMessageListener(RedisMessageListenerContainer redisMessageListenerContainer){
        return new RedisEventMessageListener(redisMessageListenerContainer);
    }

}

重写doHandleMessage

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

import java.time.LocalDateTime;

public class RedisEventMessageListener extends KeyExpirationEventMessageListener {

    /**
     * Creates new {@link MessageListener} for {@code __keyevent@*__:expired} messages.
     *
     * @param listenerContainer must not be {@literal null}.
     */
    public RedisEventMessageListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    @Override
    protected void doHandleMessage(Message message) {
        // 这个就是过期的key ,过期后,也就是事件触发后对应的value是拿不到的。
        // 这里实现业务逻辑,如果是服务器集群的话需要使用分布式锁进行抢占执行。
        String key = message.toString();
        System.out.println("key = " + key);
        System.out.println("end = " + LocalDateTime.now());
    }
}

自定义错误处理类

自定义一个ErrorHandle并注入到RedisMessageListenerContainer中,redia源码中默认没有此处理类,不注入会导致每次监听时打印error级别日志:Execution of message listener failed, and no ErrorHandler has been set.

自定义错误处理类:

@Component

public class RedisListenerErrorHandle implements ErrorHandler {

    @Override
    public void handleError(Throwable throwable) {

        System.out.println("正常监听");

    }

}

注入错误处理类:


@Autowired
RedisListenerErrorHandle redisListenerErrorHandle;
/**
 * Redis 消息监听器容器.
 *
 * @param redisConnectionFactory the redis connection factory
 * @return the redis message listener container
 */
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
    RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
    redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
    redisMessageListenerContainer.setErrorHandler(redisListenerErrorHandle);
    return redisMessageListenerContainer;
}

key加入redis

//自启动
@Order(3)
public class GradedAlarmController implements ApplicationRunner {

    Jedis jedis = new Jedis("127.0.0.1", 6379);

    @Override
    public void run(ApplicationArguments args) throws Exception {
        List<String> allDevice = getDeviceId();
        for(String id :allDevice){
            jedis.setex(id, 10, "true");
            System.out.println(id + "已加入了redis");
            //每个延时半秒,防止一次性加入太多炸了
            Thread.currentThread().sleep(500);
        }
    }
}

加入对象

如果加入的value是对象,需要序列化

Entity实体类继承序列化接口,否则实例化结果为null

加入jedis时,序列化

//加入时
 jedis.set((alarmEntity.getMonitorValue()).getBytes(), SerializeUtil.serialize(alarmEntity));

//取value时
 AlarmEntity alarmEntity = (AlarmEntity) SerializeUtil.deserialize(jedis.get((key).getBytes()));

//取之前最好先判断是否存在
 if (jedis.exists( key ))

线程池

自定义线程池类


public class JedisConnectionFactory {
    private final static JedisPool jedisPool;

    static {
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();

        jedisPoolConfig.setMaxTotal(100);//最大空闲连接
        jedisPoolConfig.setMaxIdle(100);//最小空闲连接
        jedisPoolConfig.setMinIdle(0);//设置最长等待时间, ms
        jedisPoolConfig.setMaxWaitMillis(200);
        jedisPool = new JedisPool(jedisPoolConfig, "127.0.0.1", 6379,1000);
    }

    //获取 Jedis 对象
    public static Jedis getJedis() {

        Jedis jedis = null;
        if (jedisPool != null) {
            //从连接池中获取Jedis对象
            jedis = jedisPool.getResource();
        }
        return jedis;
    }
}

调用

注意:一定要嵌套在try里调用,这样会自动释放资源

try (Jedis jedis = JedisConnectionFactory.getJedis()) {

    //相关操作

} catch (Exception e) {

    log.error("redis加入失败!", e);
}

文章作者: Luan-bx
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Luan-bx !
  目录