社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  DATABASE

Redis缓存与Mysql如何保证双写一致

马哥Linux运维 • 5 月前 • 122 次点击  


前言

    缓存和数据库如何保证数据的一致是个很经典的问题,关于先更新缓存,还是先更新数据库,或者先删除缓存,还是先删除数据的先后问题,再读写并发的场景下很难做到数据一致,我认为比较好的两种方案:一种是我们经常说的延迟双删机制,但是这个延迟的时间是无法很准确的把握的,还有如果缓存删除失败了应该如何处理,总体来说还是不保险的;另外一种我认为是比较可行的方法,要引入阿里的canal,通过拉取binlog日志解析推送的MQ实现异步更新缓存,达到最终缓存和数据库的一致性;

延迟双删策略

基本流程就是客户端A请求,先去删除缓存,然后将数据写入数据库,此时客户端B查询先去查询缓存,缓存没有返回,去查数据库,此时还没有完成主从同步,拿到是从库的旧数据,然后将旧数据进行缓存,在客户端A完成主从同步后,再次删除缓存,这时数据才是一致的,但是重点就是在休眠的几秒钟,会造成数据的不一致性;

⚠️注意点:第二次删除缓存如果失败,那么缓存里面大概率还是旧数据;所以第二次缓存删除重试的方法比较关键:

  • 一种:失败记录写表,起定时任务去扫描表进行重试,显然这种方式并不会很好,会对数据库造成很大的压力;

  • 另外一种:异步处理,利用消息队列,将消息放在队列中,缓解数据库压力,但是要增加对消息队列的维护;


简单写个延迟双删的demo

@RestController@RequestMappingpublic class RedisController {

@Autowired private RedisTemplate redisTemplate; @Autowired private SysUserMapper sysUserMapper;

@GetMapping public void duobleCancle() throws InterruptedException {
redisTemplate.delete("1");
SysUser sysUser = sysUserMapper.selectUserById(Long.valueOf(1));
SysUser updateSysUser =new SysUser(); updateSysUser.setUserName("Lxlxxx"); updateSysUser.setEmail("@163.com");
UpdateWrapper updateWrapper = new UpdateWrapper(); updateWrapper.eq("userId",1);
sysUserMapper.update(updateSysUser,updateWrapper);
Thread.sleep(3000);
redisTemplate.opsForValue().append(sysUser.getUserId(), JSON.toJSONString(sysUser));

redisTemplate.delete("1"); }


由此可见问题还是比较多的,如果这么在项目中使用这种写法,那最终还是会读取到脏数据;

基于订阅binlog异步更新缓存

大致的流程是这样的:

具体binlog订阅实现

步骤:先安装canal、然后安装rabbitmq、然后就是mysql

Canal配置,因为canal支持 tcp, kafka, rocketMQ, rabbitMQ这四种异步的方式,这里我们使用 rabbitMQ,所以将serverMode配置成rabbitMQ



canal.ip = 1 canal.serverMode = rabbitmq canal.mq.servers = 127.0.0.1 canal.mq.vhost=canal  canal.mq.exchange=exchange.trade canal.mq.username=guest canal.mq.password=guest ---------------------------------------------------------------------------------    
canal.instance.dbUsername=rootcanal.instance.dbPassword=123456canal.instance.mysql.slaveId=1234 canal.instance.master.address=127.0.0.1:3306 canal.instance.defaultDatabaseName=test canal.mq.topic=example


mysql的my.cnf配置


log-bin=mysql-bin binlog-format=ROW server_id=1 


引入依赖,我分别引入的是redis、rabbitmq、mybatis-plus、fastsjon的包

        <dependency>            <groupId>org.springframework.bootgroupId>            <artifactId>spring-boot-starter-webartifactId>        dependency>
<dependency> <groupId>org.springframework.bootgroupId> <artifactId>spring-boot-starter-data-redisartifactId> dependency>
<dependency> <groupId>org.springframework.bootgroupId> <artifactId>spring-boot-starter-amqpartifactId> dependency>
<dependency> <groupId>com.alibabagroupId> <artifactId>fastjsonartifactId> <version>1.2.3version> dependency> <dependency> <groupId>com.baomidougroupId> <artifactId>mybatis-plus-boot-starterartifactId> <version>${mybatis.plus.version}version> dependency>


application.yml配置文件

spring:  rabbitmq:    virtual-host: canal    host: 127.0.0.1    publisher-confirms: true    datasource:    url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=UTC    username: root    password: root    driver-class-name: com.mysql.jdbc.Driver  redis:    host: 127.0.0.1


RabbitmqConfig配置

@Configurationpublic class RabbitMqConfig {
@Bean public Queue TestDirectQueue() { return new Queue("exchange.canal.queue",true); }
@Bean DirectExchange TestDirectExchange() { return new DirectExchange("exchange.canal"); }
@Bean Binding bindingDirect() { return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("example"); } }


RabbitMqListener监听消息异步处理 canal拉取的binlog日志

@Component@Slf4jpublic class RabbitMqListener {

@Autowired private StringRedisTemplate redisTemplate;
@RabbitListener(queues = "exchange.canal.queue") public void process(Message message) {
log.info("canal queue消费的消息" + message.getBody()); Map map = JSON.parseObject(message.getBody(), Map.class); JSONArray array = null; String sqlType = (String) map.get("type"); if (StringUtils.endsWithIgnoreCase("SELECT", sqlType)) { array = JSONArray.parseArray((String) map.get("data")); } if (null == array) { return; } JSONObject jsonObject = array.getJSONObject(0); if (StringUtils.endsWithIgnoreCase("UPDATE", sqlType) || StringUtils.endsWithIgnoreCase("INSERT", sqlType)) { redisTemplate.boundValueOps(jsonObject.get("code").toString()).set(jsonObject.toString()); } else if (StringUtils.endsWithIgnoreCase("DELETE", sqlType)) { redisTemplate.delete(jsonObject.get("code").toString()); } if (StringUtils.endsWithIgnoreCase("SELECT", sqlType)) { redisTemplate.boundValueOps(jsonObject.get("code").toString()).set(jsonObject.toString()); } else { redisTemplate.delete(jsonObject.get("code").toString()); } }}


总结

在高并发的场景下缓存和数据库的一致性的问题,永远是个比较大的问题,在请求量很大的情况下,我们必须使用缓存来减少数据库的压力,但是我们需要对数据库进行频繁更新,其实基本保证不了瞬间的一致性,只能在最终保证一致性,通过消息异步的方式可以有效的控制缓存更新、删除的可靠性。


Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/165098
 
122 次点击