聊聊 Python 中的同步原语,为什么有了 GIL 还需要同步原语

2024年 4月 3日 28.3k 0

前言

• 在前面的文章中我们介绍了 Python 中的全局解释器锁 GIL,我们知道 GIL 可以保证在多线程场景下同一时刻只有一个线程运行,但是并不能保证线程安全(所谓线程安全简单来说就是程序在多线程环境中运行时,线程在交替运行时能正常的访问共享资源,不会造成数据不一致或者死锁,最后都能达到预期的结果),比如我们看下面的两个例子:

对 counter 进行累加

import threading
import time

counter = 0
temp_count = 0


def increment():
    global counter, temp_count
    for _ in range(1000):
        counter += 1
        temp = temp_count
        time.sleep(0.0001)
        temp_count = temp + 1


start = time.time()
threads = []
for _ in range(10):
    t = threading.Thread(target=increment)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

end = time.time()

print("Final counter value:", counter)
print("Final temp_count value:", temp_count)
print(f"总共耗时:{end - start}")

# 运行结果
Final counter value: 10000
Final temp_count value: 1001
总共耗时:0.5465419292449951

• 上面我们对 counter 做多线程累积时,尽管 counter += 1 是非原子操作,但是由于 CPU 执行太快,因此我们很难复现线程不安全的情况,因此我们使用 temp_count 写法进行手动模拟。

账户取款

import threading


class BankAccount:
    def __init__(self, balance):
        self.balance = balance

    def withdraw(self, amount):
        if self.balance >= amount:
            # 发生线程切换
            self.balance -= amount
            print(f"Withdrawal successful. Balance: {self.balance}")
        else:
            print("Insufficient funds")

    def deposit(self, amount):
        self.balance += amount
        print(f"Deposit successful. Balance: {self.balance}")


if __name__ == "__main__":
    account = BankAccount(1000)

    # 创建多个线程进行取款存款操作
    threads = []
    for _ in range(5):
        t = threading.Thread(target=account.withdraw, args=(account, 200))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

• 上面的代码同样是线程不安全的,考虑这个场景,如果此时账户余额中剩余200,线程1执行完 self.balance >= amount 后切换到线程2,线程2正常取款200,然后切换回线程1,导致此时余额为-2200。

使用同步原语保证线程安全

• 从上面的两个案例中我们可以看出,GIL 并不能保证线程安全,我们需要使用同步原语来进行线程同步保证线程安全。

locked、release 显式获取锁和释放锁

• 在一些比较老的 python 代码中,我们可以看到很多使用 locked、release 显式获取锁和释放锁 的用法。

import threading


class BankAccount:
    def __init__(self, balance):
        self.balance = balance
        self.lock = threading.Lock()

    def withdraw(self, amount):
        self.lock.locked()
        if self.balance >= amount:
            self.balance -= amount
            print(f"Withdrawal successful. Balance: {self.balance}")
        else:
            print("Insufficient funds")
        self.lock.release()

    def deposit(self, amount):
        self.lock.locked()
        self.balance += amount
        print(f"Deposit successful. Balance: {self.balance}")
        self.lock.release()


if __name__ == "__main__":
    account = BankAccount(1000)

    # 创建多个线程进行取款存款操作
    threads = []
    for _ in range(5):
        t = threading.Thread(target=account.withdraw, args=(account, 200))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

使用 with 语句同步原理

• 相比于这种显式调用的方法,with 语句更加优雅,也更不容易出错,特别是程序员可能会忘记调用 release() 方法或者程序在获得锁之后产生异常这两种情况(使用 with 语句可以保证在这两种情况下仍能正确释放锁)。

import threading


class BankAccount:
    def __init__(self, balance):
        self.balance = balance
        self.lock = threading.Lock()

    def withdraw(self, amount):
        with self.lock:
            if self.balance >= amount:
                self.balance -= amount
                print(f"Withdrawal successful. Balance: {self.balance}")
            else:
                print("Insufficient funds")

    def deposit(self, amount):
        with self.lock:
            self.balance += amount
            print(f"Deposit successful. Balance: {self.balance}")


if __name__ == "__main__":
    account = BankAccount(1000)

    # 创建多个线程进行取款存款操作
    threads = []
    for _ in range(5):
        t = threading.Thread(target=account.withdraw, args=(account, 200))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

其它支持同步原语:RLock 和 Semaphore

RLock

• 一个 RLock (可重入锁)可以被同一个线程多次获取,主要用来实现基于监测对象模式的锁定和同步。在使用这种锁的情况下,当锁被持有时,只有一个线程可以使用完整的函数或者类中的方法。

import threading

class SharedCounter:
    '''
    A counter object that can be shared by multiple threads.
    '''
    _lock = threading.RLock()
    def __init__(self, initial_value = 0):
        self._value = initial_value

    def incr(self,delta=1):
        '''
        Increment the counter with locking
        '''
        with SharedCounter._lock:
            self._value += delta

    def decr(self,delta=1):
        '''
        Decrement the counter with locking
        '''
        with SharedCounter._lock:
             self.incr(-delta)

• 在上边这个例子中,没有对每一个实例中的可变对象加锁,取而代之的是一个被所有实例共享的类级锁。这个锁用来同步类方法,具体来说就是,这个锁可以保证一次只有一个线程可以调用这个类方法。不过,与一个标准的锁不同的是,已经持有这个锁的方法在调用同样使用这个锁的方法时,无需再次获取锁。比如 decr 方法。 这种实现方式的一个特点是,无论这个类有多少个实例都只用一个锁。因此在需要大量使用计数器的情况下内存效率更高。不过这样做也有缺点,就是在程序中使用大量线程并频繁更新计数器时会有争用锁的问题。

Semaphore

• 信号量对象是一个建立在共享计数器基础上的同步原语。如果计数器不为0,with 语句将计数器减1,线程被允许执行。with 语句执行结束后,计数器加1。如果计数器为0,线程将被阻塞,直到其他线程结束将计数器加1。

import urllib.request
from threading import Semaphore

# At most, five threads allowed to run at once
_fetch_url_sema = Semaphore(5)


def fetch_url(url):
    with _fetch_url_sema:
        return urllib.request.urlopen(url)

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论