一、问题背景
在系统开发中,Redis 常被用作数据缓存,以提升访问速度,因为它将数据存储在内存中,相比磁盘存储读取速度更快。然而,由于 Redis 仅在内存中维护缓存数据,而不是直接从数据库中读取,当数据库中的数据发生变化而缓存未能同步更新时,应用程序可能会从缓存中读取到过时的数据。这种情况会导致数据不一致,即应用程序展示的内容与实际数据库中的最新数据不符。
二、定时更新
import pymysql
import redis
import tornado.ioloop
import tornado.web
MYSQL_HOST = 'localhost'
MYSQL_USER = 'root'
MYSQL_PASSWORD = '123456'
MYSQL_DB = 'goods'
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0
REDIS_PASSWORD = '123456'
class MainHandler(tornado.web.RequestHandler):
def initialize(self, mysql_conn, redis_conn):
self.mysql_conn = mysql_conn
self.redis_conn = redis_conn
async def get(self):
hash_name = 'mysite'
if self.redis_conn.exists(hash_name):
redis_data = self.redis_conn.hget(hash_name, 'java')
if redis_data:
redis_data = redis_data
self.write(redis_data)
print("命中缓存")
return
else:
print("缓存未命中")
else:
print("哈希表不存在")
try:
with self.mysql_conn.cursor() as cursor:
cursor.execute("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ")
self.mysql_conn.begin()
cursor.execute("SELECT name FROM items WHERE version = 1")
mysql_data = cursor.fetchone()
self.mysql_conn.commit()
except pymysql.MySQLError as e:
self.write(f"MySQL 查询失败: {e}")
return
if mysql_data:
mysql_data = mysql_data['name']
else:
mysql_data = 'No data in MySQL'
self.write(mysql_data)
def make_app():
try:
mysql_conn = pymysql.connect(
host=MYSQL_HOST,
user=MYSQL_USER,
password=MYSQL_PASSWORD,
db=MYSQL_DB,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
print("MySQL 连接成功!")
except pymysql.err.OperationalError as e:
print(f"MySQL连接失败: {e}")
exit(1)
try:
redis_conn = redis.StrictRedis(
host=REDIS_HOST,
port=REDIS_PORT,
db=REDIS_DB,
decode_responses=True,
password=REDIS_PASSWORD
)
print("Redis 连接成功!")
except redis.exceptions.ConnectionError as e:
print(f"Redis连接失败: {e}")
exit(1)
return tornado.web.Application([
(r"/", MainHandler, dict(mysql_conn=mysql_conn, redis_conn=redis_conn)),
], debug=True)
if __name__ == "__main__":
app = make_app()
app.listen(8888)
tornado.ioloop.IOLoop.current().start()
这里使用Tornado写来一个简单的服务,通过GET请求获取数据,先检查Redis缓存,优先使用Redis缓存数据,这时候在通过另一个连接,修改数据库中的数据,这时候就会出现脏读的情况。
处理的方法之一就是定时更新,可以一定程度上避免脏读的情况。
async def update_cache(mysql_conn, redis_conn):
"""定期更新缓存"""
try:
with mysql_conn.cursor() as cursor:
cursor.execute("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ")
self.mysql_conn.begin()
cursor.execute("SELECT name FROM items WHERE name = 'java'")
mysql_data = cursor.fetchone()
self.mysql_conn.commit()
except pymysql.MySQLError as e:
print(f"MySQL 查询失败: {e}")
return
if mysql_data:
name = mysql_data['name']
try:
redis_conn.hdel('mysite', 'java')
redis_conn.hset('mysite', 'java', name)
print("缓存更新成功")
except redis.exceptions.ConnectionError as e:
print(f"Redis 更新失败: {e}")
else:
print("缓存中没有可更新的数据")
定义一个更新Redis缓存的函数,通过查询MySQL中的数据来更新Redis中的数据。
def schedule_cache_update():
loop = tornado.ioloop.IOLoop.current()
loop.add_callback(lambda: asyncio.ensure_future(update_cache(mysql_conn, redis_conn)))
loop.call_later(3600, schedule_cache_update) # 每小时更新一次缓存
schedule_cache_update() # 启动缓存更新调度
定义一个异步函数,通过loop.call_later设置每小时调用一次更新Redis的函数。
三、主动更新
顾名思义,就是自己去主动更新,在修改数据之后,也修改Redis的数据,保证数据的一致性,当全部更新后提交更新操作。这样也可以避免数据的不一致性。
async def post(self):
name = self.get_argument('name')
try:
with self.mysql_conn.cursor() as cursor:
self.mysql_conn.start_transaction()
cursor.execute("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ")
cursor.execute("UPDATE items SET name = %s WHERE version = 1", (name,))
self.mysql_conn.commit()
try:
self.redis_conn.hdel('mysite', 'java')
self.redis_conn.hset('mysite', 'java', name)
print("Redis缓存更新成功")
except redis.exceptions.ConnectionError as e:
print(f"Redis 更新失败: {e}")
except pymysql.MySQLError as e:
self.mysql_conn.rollback()
self.write(f"MySQL 更新失败: {e}")
return
self.write("更新成功")
四、缓存过期
给Redis缓存一个生命周期,也可以一定程度上避免数据不一致性的问题,这里将缓存的类型修改为String类型,便于设置过期时间。这里设置这个String类型的缓存生命周期为30分钟。
async def post(self):
item_name = self.get_argument('name')
item_value = self.get_argument('version')
try:
with self.mysql_conn.cursor() as cursor:
self.mysql_conn.start_transaction()
cursor.execute("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ")
cursor.execute("INSERT INTO items (name, value) VALUES (%s, %s)", (item_name, item_value))
self.mysql_conn.commit()
try:
self.redis_conn.setex(item_name, 1800, item_value)
print("缓存成功")
except redis.exceptions.ConnectionError as e:
print(f"Redis缓存失败: {e}")
self.write("数据添加成功")
except pymysql.MySQLError as e:
self.mysql_conn.rollback()
self.write(f"MySQL 更新失败: {e}")
return
五、MySQL触发器
在MySQL中有触发器的机制,在执行某个操作时,可以触发这个触发器,但是MySQL的触发器无法直接与Redis直接进行交互。所以创建一个对应的日志表,在修改数据表时触发触发器,向日志表中进行记录,在服务端不断去监听这个日志表,根据日志表来更新Redis缓存。但是这种方法等于脱裤子放屁,多此一举。可以直接监听操作的表。当是既然有这种方法,可定有存在的道理,例如可以很好的记录数据表的操作。便于故障分析和数据恢复以及数据追踪。
-- 创建日志表
CREATE TABLE item_changes (
id INT AUTO_INCREMENT PRIMARY KEY,
item_name VARCHAR(255),
item_value VARCHAR(255),
change_type ENUM('INSERT', 'UPDATE', 'DELETE'),
change_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
--创建触发器
CREATE TRIGGER after_item_update
AFTER UPDATE ON items
FOR EACH ROW
BEGIN
INSERT INTO item_changes (item_name, item_value, change_type)
VALUES (NEW.name, NEW.value, 'UPDATE');
END;
CREATE TRIGGER after_item_insert
AFTER INSERT ON items
FOR EACH ROW
BEGIN
INSERT INTO log_table (action_type, item_id, item_name)
VALUES ('INSERT', NEW.id, NEW.name);
END
这里通过监听日志表中的数据,达到更新Redis缓存的目的。
async def update_cache_from_log(mysql_conn, redis_conn):
while True:
try:
with mysql_conn.cursor() as cursor:
cursor.execute("SELECT * FROM item_changes ORDER BY change_time DESC LIMIT 10")
changes = cursor.fetchall()
for change in changes:
redis_conn.hdel("mysite",change['item_name'])
redis_conn.hset('mysite', change['item_name'], change['item_value'])
cursor.execute("DELETE FROM item_changes WHERE id IN (SELECT id FROM item_changes ORDER BY change_time DESC LIMIT 10)")
mysql_conn.commit()
except pymysql.MySQLError as e:
print(f"MySQL 查询失败: {e}")
await asyncio.sleep(60)
六、总结
在使用 Redis 缓存和 MySQL 数据库时,需要注意: