1. 一个小需求
经常遇到一些小的需求,但是实现起来并不简单。这里就有一个文件上传的简单需求,分为下面几个步骤:
用户在页面上传一个大文件
大文件会被暂存在内网的 Ceph
后台任务,将 Ceph 中的大文件,下载到 Docker 内
后台任务,将 Docker 中的大文件,上传到外网的 COS
后台使用的是 Django,采用 Docker 多实例部署。多实例方便扩容,提高服务的并发能力,但是要求实例无状态,有状态的部分需要存储在第三方服务,Ceph 就是其中之一。直接将文件从本地上传到 COS ,会导致正在上传的文件因为发布而被丢失。
2. 一个大文件引发的 Bug
在测试的过程中,发现小文件可以正常上传;但是上传大于 300MB 的文件时,总是失败。日志如下:
1
2
3
4
5
|
[2018-10-23 10:11:18] celery: [2018-10-23 10:11:18,114: ERROR/MainProcess] Process 'Worker-20' pid:45 exited with 'signal 9 (SIGKILL)' [time_stamp=2018-10-23 10:11:18,114, worker=MainProcess, levelname=ERROR]
[2018-10-23 10:11:18] celery: [2018-10-23 10:11:18,206: ERROR/MainProcess] Pool callback raised exception: OperationalError(2006, "MySQL server has gone away (error(32, 'Broken pipe'))") [time_stamp=2018-10-23 10:11:18,206, worker=MainProcess, levelname=ERROR]
[2018-10-23 10:11:18] celery: self._execute_command(COMMAND.COM_QUERY, sql)
[2018-10-23 10:11:18] celery: File "/app/.heroku/python/lib/python2.7/site-packages/pymysql/connections.py", line 970, in _execute_command
[2018-10-23 10:11:18] celery: Traceback (most recent call last):
|
使用的包版本:
1
2
3
|
Django==1.8.3
celery==3.1.18
django-celery==3.1.16
|
日志中有两个错误,一个是进程被杀掉,一个是数据库失去连接。最开始将问题定位在 MySQL server has gone away
,但一直没有解决问题。最后,在 Grafana 中查看 Celery Worker 的内存使用率时,发现每次上传大文件,内存使用率就会急剧增加,然后又急剧下降。原来是,内存使用超了,进程被强制 kill。最后,通过优化内存使用,解决了问题。
3. 一段优化代码
优化之前:
1
2
3
4
5
|
r = requests.get(self.local_file.url,
allow_redirects=True,
stream=True,
timeout=300)
open(self.local_path, 'wb+').write(r.content)
|
优化之后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
if not os.path.exists(os.path.dirname(self.local_path)):
try:
os.makedirs(os.path.dirname(self.local_path))
except OSError as exc:
if exc.errno != errno.EEXIST:
raise
r = requests.get(self.local_file.url,
allow_redirects=True,
stream=True,
timeout=300)
with open(self.local_path, 'wb') as f:
for chunk in r.iter_content(chunk_size=1024 * 512):
if chunk:
f.write(chunk)
|
优化之前,Celery 将整个文件放在内存,内存使用率暴增。优化之后,先请求响应头,通过迭代器一点一点读取响应体中的文件内容。这大大节省了内存使用。
4. 一个数据库连接异常问题
找到问题,解决之后,并没有结束。这里有一个奇怪的问题,为什么会出现 MySQL server has gone away
。同事之前也遇到一个类似的问题,Celery 多进程任务抛出各种数据库异常。原因分析如下:
Celery Worker 在启动时,djcelery 进行了 DB 操作,数据库连接被初始化。
子进程被 fork 出来后,由于完全复制了父进程的内存数据,导致所有 Worker 共享了同一个 MySQL 连接(同一个 socket file)。由于 persistent connections 特性,数据库连接没有被关闭。这个是 djcelery 库与多线程部署的坑。
解决方案是:
不关闭 persistent connections 功能,监听子进程初始化完成和任务开始的信号。在收到信号时,手动强制关闭当前进程中 Django ORM 的连接。
相关实现的代码如下:
1
2
3
4
5
6
7
8
9
10
11
|
from django.db import connections
@signals.task_prerun.connect
def task_prerun(**kwargs):
for conn in connections.all():
conn.close()
@signals.worker_process_init.connect
def worker_init(**kwargs):
for conn in connections.all():
conn.close()
|
实际上,这里的 signal 9 (SIGKILL)
和 MySQL server has gone away
并不是同一个 Celery Worker 抛出。由于继承自同一个父进程和连接池,当其中一个子进程被 kill 之后,另外一个正在处理任务的进程也会出问题。
5. 一段相关的 Django 代码
在高并发情况下,频繁新建/关闭数据库连接是低效的。Django 的 persistent connections(长连接) 就是为了解决这个问题。Django 数据库长连接的原理是,在每次创建数据库连接之后,把连接实例放到一个 Theard.local 的实例中维护。每次进行数据库请求时,Django 会去 local 中查找可用的连接,有则复用。当连接,发生异常或者存在时间超过 CONN_MAX_AGE 时,才会被关闭。CONN_MAX_AGE
参数,在 settings.py 文件中可以配置:
1
2
3
4
5
6
7
8
9
10
11
|
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.mysql',
'NAME': 'mydb', # 数据库名称
'USER': 'root', # 数据库用户名
'PASSWORD': '', # 数据库密码
'HOST': 'localhost', # 数据库主机,默认为 localhost
'PORT': '3306', # 数据库端口
'CONN_MAX_AGE': 60, # 0 表示使用完马上关闭,None 表示不关闭
}
}
|
再来看看,Django 中如何管理长连接:django/db/__init_.py
1
2
3
|
def close_old_connections(**kwargs):
for conn in connections.all():
conn.close_if_unusable_or_obsolete()
|
django/db/backends/base/base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
def close_if_unusable_or_obsolete(self):
if self.connection is not None:
# If the application didn't restore the original autocommit setting,
# don't take chances, drop the connection.
if self.get_autocommit() != self.settings_dict['AUTOCOMMIT']:
self.close()
return
# If an exception other than DataError or IntegrityError occurred
# since the last commit / rollback, check if the connection works.
if self.errors_occurred:
if self.is_usable():
self.errors_occurred = False
else:
self.close()
return
if self.close_at is not None and time.time() >= self.close_at:
self.close()
return
|
django/db/backends/mysql/base.py
1
2
3
4
5
6
7
|
def is_usable(self):
try:
self.connection.ping()
except Database.Error:
return False
else:
return True
|
6. 参考
- https://github.com/celery/django-celery/issues/359