业务逻辑
监听redis失效的key,在key过期的时刻,做一些列操作。
修改配置
将两个文件的notify-keyspace-events " "
改为 notify-keyspace-events “Ex”
重启redis-server
测试
两种方案
- 在client终端测试
- 服务器调用
client终端
打开两个client终端
第一个输入PSUBSCRIBE __keyevent@*__:expired
结果如下:
第二个创建key的时效,例如输入setex lbx 10 true
十秒后,key为lbx的过期,并且在第一个client终端看见如下信息:
证明已经监测到失效的key,成功!
服务器调用
创建消息监听容器
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
@Configuration
public class MyRedisConfig {
/**
* Redis 消息监听器容器.
*
* @param redisConnectionFactory the redis connection factory
* @return the redis message listener container
*/
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
return redisMessageListenerContainer;
}
/**
* Redis Key失效监听器注册为Bean.
*
* @param redisMessageListenerContainer the redis message listener container
* @return the redis event message listener
*/
@Bean
public RedisEventMessageListener redisEventMessageListener(RedisMessageListenerContainer redisMessageListenerContainer){
return new RedisEventMessageListener(redisMessageListenerContainer);
}
}
重写doHandleMessage
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import java.time.LocalDateTime;
public class RedisEventMessageListener extends KeyExpirationEventMessageListener {
/**
* Creates new {@link MessageListener} for {@code __keyevent@*__:expired} messages.
*
* @param listenerContainer must not be {@literal null}.
*/
public RedisEventMessageListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
@Override
protected void doHandleMessage(Message message) {
// 这个就是过期的key ,过期后,也就是事件触发后对应的value是拿不到的。
// 这里实现业务逻辑,如果是服务器集群的话需要使用分布式锁进行抢占执行。
String key = message.toString();
System.out.println("key = " + key);
System.out.println("end = " + LocalDateTime.now());
}
}
自定义错误处理类
自定义一个ErrorHandle并注入到RedisMessageListenerContainer中,redia源码中默认没有此处理类,不注入会导致每次监听时打印error级别日志:Execution of message listener failed, and no ErrorHandler has been set.
自定义错误处理类:
@Component
public class RedisListenerErrorHandle implements ErrorHandler {
@Override
public void handleError(Throwable throwable) {
System.out.println("正常监听");
}
}
注入错误处理类:
@Autowired
RedisListenerErrorHandle redisListenerErrorHandle;
/**
* Redis 消息监听器容器.
*
* @param redisConnectionFactory the redis connection factory
* @return the redis message listener container
*/
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
redisMessageListenerContainer.setErrorHandler(redisListenerErrorHandle);
return redisMessageListenerContainer;
}
key加入redis
//自启动
@Order(3)
public class GradedAlarmController implements ApplicationRunner {
Jedis jedis = new Jedis("127.0.0.1", 6379);
@Override
public void run(ApplicationArguments args) throws Exception {
List<String> allDevice = getDeviceId();
for(String id :allDevice){
jedis.setex(id, 10, "true");
System.out.println(id + "已加入了redis");
//每个延时半秒,防止一次性加入太多炸了
Thread.currentThread().sleep(500);
}
}
}
加入对象
如果加入的value是对象,需要序列化
Entity实体类继承序列化接口,否则实例化结果为null
加入jedis时,序列化
//加入时
jedis.set((alarmEntity.getMonitorValue()).getBytes(), SerializeUtil.serialize(alarmEntity));
//取value时
AlarmEntity alarmEntity = (AlarmEntity) SerializeUtil.deserialize(jedis.get((key).getBytes()));
//取之前最好先判断是否存在
if (jedis.exists( key ))
线程池
自定义线程池类
public class JedisConnectionFactory {
private final static JedisPool jedisPool;
static {
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(100);//最大空闲连接
jedisPoolConfig.setMaxIdle(100);//最小空闲连接
jedisPoolConfig.setMinIdle(0);//设置最长等待时间, ms
jedisPoolConfig.setMaxWaitMillis(200);
jedisPool = new JedisPool(jedisPoolConfig, "127.0.0.1", 6379,1000);
}
//获取 Jedis 对象
public static Jedis getJedis() {
Jedis jedis = null;
if (jedisPool != null) {
//从连接池中获取Jedis对象
jedis = jedisPool.getResource();
}
return jedis;
}
}
调用
注意:一定要嵌套在try里调用,这样会自动释放资源
try (Jedis jedis = JedisConnectionFactory.getJedis()) {
//相关操作
} catch (Exception e) {
log.error("redis加入失败!", e);
}