Redis与MySQL的数据一致性

2024年 7月 20日 102.7k 0

一、问题背景

在系统开发中,Redis 常被用作数据缓存,以提升访问速度,因为它将数据存储在内存中,相比磁盘存储读取速度更快。然而,由于 Redis 仅在内存中维护缓存数据,而不是直接从数据库中读取,当数据库中的数据发生变化而缓存未能同步更新时,应用程序可能会从缓存中读取到过时的数据。这种情况会导致数据不一致,即应用程序展示的内容与实际数据库中的最新数据不符。

二、定时更新

import pymysql
import redis
import tornado.ioloop
import tornado.web

MYSQL_HOST = 'localhost'
MYSQL_USER = 'root'
MYSQL_PASSWORD = '123456'
MYSQL_DB = 'goods'

REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0
REDIS_PASSWORD = '123456'

class MainHandler(tornado.web.RequestHandler):
    def initialize(self, mysql_conn, redis_conn):
        self.mysql_conn = mysql_conn
        self.redis_conn = redis_conn
    
    async def get(self):
        hash_name = 'mysite'
        if self.redis_conn.exists(hash_name):
            redis_data = self.redis_conn.hget(hash_name, 'java')
            if redis_data:
                redis_data = redis_data
                self.write(redis_data)
                print("命中缓存")
                return
            else:
                print("缓存未命中")
        else:
            print("哈希表不存在")

        try:
            with self.mysql_conn.cursor() as cursor:
                cursor.execute("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ")  
                self.mysql_conn.begin()
                cursor.execute("SELECT name FROM items WHERE version = 1")
                mysql_data = cursor.fetchone()
                self.mysql_conn.commit()
        except pymysql.MySQLError as e:
            self.write(f"MySQL 查询失败: {e}")
            return
        
        if mysql_data:
            mysql_data = mysql_data['name']
        else:
            mysql_data = 'No data in MySQL'

        self.write(mysql_data)

def make_app():
    try:
        mysql_conn = pymysql.connect(
            host=MYSQL_HOST,
            user=MYSQL_USER,
            password=MYSQL_PASSWORD,
            db=MYSQL_DB,
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor
        )
        print("MySQL 连接成功!")
    except pymysql.err.OperationalError as e:
        print(f"MySQL连接失败: {e}")
        exit(1)

    try:
        redis_conn = redis.StrictRedis(
            host=REDIS_HOST,
            port=REDIS_PORT,
            db=REDIS_DB,
            decode_responses=True,
            password=REDIS_PASSWORD
        )
        print("Redis 连接成功!")
    except redis.exceptions.ConnectionError as e:
        print(f"Redis连接失败: {e}")
        exit(1)

    return tornado.web.Application([
        (r"/", MainHandler, dict(mysql_conn=mysql_conn, redis_conn=redis_conn)),
    ], debug=True)

if __name__ == "__main__":
    app = make_app()
    app.listen(8888)    
    tornado.ioloop.IOLoop.current().start()

这里使用Tornado写来一个简单的服务,通过GET请求获取数据,先检查Redis缓存,优先使用Redis缓存数据,这时候在通过另一个连接,修改数据库中的数据,这时候就会出现脏读的情况。

处理的方法之一就是定时更新,可以一定程度上避免脏读的情况。

async def update_cache(mysql_conn, redis_conn):
    """定期更新缓存"""
    try:
        with mysql_conn.cursor() as cursor:
            cursor.execute("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ")  
            self.mysql_conn.begin()        
            cursor.execute("SELECT name FROM items WHERE name = 'java'")
            mysql_data = cursor.fetchone()
            self.mysql_conn.commit() 
    except pymysql.MySQLError as e:
        print(f"MySQL 查询失败: {e}")
        return

if mysql_data:
    name = mysql_data['name']
    try:
        redis_conn.hdel('mysite', 'java')  
        redis_conn.hset('mysite', 'java', name)  
        print("缓存更新成功")
    except redis.exceptions.ConnectionError as e:
        print(f"Redis 更新失败: {e}")
else:
    print("缓存中没有可更新的数据")

定义一个更新Redis缓存的函数,通过查询MySQL中的数据来更新Redis中的数据。

 def schedule_cache_update():
     loop = tornado.ioloop.IOLoop.current()
     loop.add_callback(lambda: asyncio.ensure_future(update_cache(mysql_conn, redis_conn)))
     loop.call_later(3600, schedule_cache_update)  # 每小时更新一次缓存

 schedule_cache_update()  # 启动缓存更新调度    

定义一个异步函数,通过loop.call_later设置每小时调用一次更新Redis的函数。

三、主动更新

顾名思义,就是自己去主动更新,在修改数据之后,也修改Redis的数据,保证数据的一致性,当全部更新后提交更新操作。这样也可以避免数据的不一致性。

    async def post(self):
        name = self.get_argument('name')
        try:
            with self.mysql_conn.cursor() as cursor:
                self.mysql_conn.start_transaction()
                cursor.execute("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ")
                cursor.execute("UPDATE items SET name = %s WHERE version = 1", (name,))
                self.mysql_conn.commit()
            try:
                self.redis_conn.hdel('mysite', 'java')
                self.redis_conn.hset('mysite', 'java', name)
                print("Redis缓存更新成功")
            except redis.exceptions.ConnectionError as e:
                print(f"Redis 更新失败: {e}")
        except pymysql.MySQLError as e:
            self.mysql_conn.rollback()
            self.write(f"MySQL 更新失败: {e}")
            return
        self.write("更新成功")

四、缓存过期

给Redis缓存一个生命周期,也可以一定程度上避免数据不一致性的问题,这里将缓存的类型修改为String类型,便于设置过期时间。这里设置这个String类型的缓存生命周期为30分钟。

async def post(self):
    item_name = self.get_argument('name')
    item_value = self.get_argument('version')
    
    try:
        with self.mysql_conn.cursor() as cursor:
            self.mysql_conn.start_transaction()
            cursor.execute("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ")            
            cursor.execute("INSERT INTO items (name, value) VALUES (%s, %s)", (item_name, item_value))
            self.mysql_conn.commit()

        try:
            self.redis_conn.setex(item_name, 1800, item_value) 
            print("缓存成功")
        except redis.exceptions.ConnectionError as e:
            print(f"Redis缓存失败: {e}")
        self.write("数据添加成功")
    except pymysql.MySQLError as e:
        self.mysql_conn.rollback()
        self.write(f"MySQL 更新失败: {e}")
        return

五、MySQL触发器

在MySQL中有触发器的机制,在执行某个操作时,可以触发这个触发器,但是MySQL的触发器无法直接与Redis直接进行交互。所以创建一个对应的日志表,在修改数据表时触发触发器,向日志表中进行记录,在服务端不断去监听这个日志表,根据日志表来更新Redis缓存。但是这种方法等于脱裤子放屁,多此一举。可以直接监听操作的表。当是既然有这种方法,可定有存在的道理,例如可以很好的记录数据表的操作。便于故障分析和数据恢复以及数据追踪。

-- 创建日志表
CREATE TABLE item_changes (
    id INT AUTO_INCREMENT PRIMARY KEY,
    item_name VARCHAR(255),
    item_value VARCHAR(255),
    change_type ENUM('INSERT', 'UPDATE', 'DELETE'),
    change_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

--创建触发器
CREATE TRIGGER after_item_update
AFTER UPDATE ON items
FOR EACH ROW
BEGIN
    INSERT INTO item_changes (item_name, item_value, change_type)     
    VALUES (NEW.name, NEW.value, 'UPDATE');
END;

CREATE TRIGGER after_item_insert
AFTER INSERT ON items
FOR EACH ROW
BEGIN
    INSERT INTO log_table (action_type, item_id, item_name)
    VALUES ('INSERT', NEW.id, NEW.name);
END 

这里通过监听日志表中的数据,达到更新Redis缓存的目的。

async def update_cache_from_log(mysql_conn, redis_conn):
    while True:
        try:
            with mysql_conn.cursor() as cursor:
                cursor.execute("SELECT * FROM item_changes ORDER BY change_time DESC LIMIT 10")
                changes = cursor.fetchall()
                for change in changes:
                    redis_conn.hdel("mysite",change['item_name'])
                    redis_conn.hset('mysite', change['item_name'], change['item_value'])
                    
                cursor.execute("DELETE FROM item_changes WHERE id IN (SELECT id FROM item_changes ORDER BY change_time DESC LIMIT 10)")
                mysql_conn.commit()
        except pymysql.MySQLError as e:
            print(f"MySQL 查询失败: {e}")
        await asyncio.sleep(60)  

六、总结

在使用 Redis 缓存和 MySQL 数据库时,需要注意:

  • 优先删除缓存而非更新,这样可以简化操作并减少出错的概率。
  • 先更新数据库再删除缓存,这样能确保数据库中的数据是最新的,并且缓存会被及时更新。
  • 要注意事务的使用,以防止数据库操作不一致。
  • 可以利用分布式锁等技术来避免多个操作同时修改数据,确保系统的可靠性和一致性。
  • 相关文章

    Oracle如何使用授予和撤销权限的语法和示例
    Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
    下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
    社区版oceanbase安装
    Oracle 导出CSV工具-sqluldr2
    ETL数据集成丨快速将MySQL数据迁移至Doris数据库

    发布评论