来自于 SegmentFault 上的开发者 @二十一 总结的 Python 重点。由于总结了太多的东西,所以篇幅有点长,这也是作者"缝缝补补"总结了好久的东西,强烈建议收藏再慢慢看~
Py2 VS Py3
Py2 和 Py3 的差别
-
- print 成为了函数,python2 是关键字
- 不再有 unicode 对象,默认 str 就是 unicode
- python3 除号返回浮点数
- 没有了long类型
- xrange 不存在,range 替代了 xrange
- 可以使用中文定义函数名变量名
- 高级解包和*解包
- 限定关键字参数 *后的变量必须加入名字=值
- raise from
- iteritems 移除变成 items()
- yield from 链接子生成器
- asyncio,async/await 原生协程支持异步编程
- 新增 enum, mock, ipaddress, concurrent.futures, asyncio urllib, selector
- 不同枚举类间不能进行比较
- 同一枚举类间只能进行相等的比较
- 枚举类的使用(编号默认从1开始)
- 为了避免枚举类中相同枚举值的出现,可以使用@unique装饰枚举类
| |
| from enum import Enum |
| |
| class COLOR(Enum): |
| YELLOW=1 |
| |
| GREEN=1 |
| BLACK=3 |
| RED=4 |
| print(COLOR.GREEN) |
| for i in COLOR: |
| print(i) |
| |
| for i in COLOR.__members__.items(): |
| print(i) |
| |
| for i in COLOR.__members__: |
| print(i) |
| |
| |
| |
| |
| |
| a=1 |
| print(COLOR(a)) |
py2/3 转换工具
- six 模块:兼容 pyton2 和 pyton3 的模块
- 2to3 工具:改变代码语法版本
- __future__:使用下一版本的功能
类库相关
常用库
- 必须知道的 collections https://segmentfault.com/a/1190000017385799
- python排 序操作及 heapq 模块 https://segmentfault.com/a/1190000017383322
- itertools 模块超实用方法 https://segmentfault.com/a/1190000017416590
不常用但很重要的库
- dis(代码字节码分析)
- inspect(生成器状态)
- cProfile(性能分析)
- bisect(维护有序列表)
- fnmatch
- fnmatch(string,"*.txt") # win下不区分大小写
- fnmatch 根据系统决定
- fnmatchcase 完全区分大小写
- timeit(代码执行时间)
| def isLen(strString): |
| |
| return True if len(strString)>6 else False |
| |
| def isLen1(strString): |
| |
| return [False,True][len(strString)>6] |
| import timeit |
| print(timeit.timeit('isLen1("5fsdfsdfsaf")',setup="from __main__ import isLen1")) |
| |
| print(timeit.timeit('isLen("5fsdfsdfsaf")',setup="from __main__ import isLen")) |
- @contextlib.contextmanager 使生成器函数变成一个上下文管理器
- types(包含了标准解释器定义的所有类型的类型对象,可以将生成器函数修饰为异步模式)
| import types |
| types.coroutine |
| import html |
| html.escape("I'm Jim") |
| html.unescape('<h1>I'm Jim</h1>') # I'm Jim |
- mock(解决测试依赖)
- concurrent(创建进程池和线程池)
| from concurrent.futures import ThreadPoolExecutor |
| |
| pool = ThreadPoolExecutor() |
| task = pool.submit(函数名,(参数)) |
| task.done() |
| task.result() |
| task.cancel() |
| task.add_done_callback() |
| task.running() |
| |
| for data in pool.map(函数,参数列表): |
| print(返回任务完成得执行结果data) |
| |
| from concurrent.futures import as_completed |
| as_completed(任务列表) |
| |
| wait(任务列表,return_when=条件) |
- selector(封装select,用户多路复用io编程)
- asyncio
| future=asyncio.ensure_future(协程) 等于后面的方式 future=loop.create_task(协程) |
| future.add_done_callback()添加一个完成后的回调函数 |
| loop.run_until_complete(future) |
| future.result()查看写成返回结果 |
| |
| asyncio.wait()接受一个可迭代的协程对象 |
| asynicio.gather(*可迭代对象,*可迭代对象) 两者结果相同,但gather可以批量取消,gather对象.cancel() |
| |
| 一个线程中只有一个loop |
| |
| 在loop.stop时一定要loop.run_forever()否则会报错 |
| loop.run_forever()可以执行非协程 |
| 最后执行finally模块中 loop.close() |
| |
| asyncio.Task.all_tasks()拿到所有任务 然后依次迭代并使用任务.cancel()取消 |
| |
| 偏函数partial(函数,参数)把函数包装成另一个函数名 其参数必须放在定义函数的前面 |
| |
| loop.call_soon(函数,参数) |
| call_soon_threadsafe()线程安全 |
| loop.call_later(时间,函数,参数) |
| 在同一代码块中call_soon优先执行,然后多个later根据时间的升序进行执行 |
| |
| 如果非要运行有阻塞的代码 |
| 使用loop.run_in_executor(executor,函数,参数)包装成一个多线程,然后放入到一个task列表中,通过wait(task列表)来运行 |
| |
| 通过asyncio实现http |
| reader,writer=await asyncio.open_connection(host,port) |
| writer.writer()发送请求 |
| async for data in reader: |
| data=data.decode("utf-8") |
| list.append(data) |
| 然后list中存储的就是html |
| |
| as_completed(tasks)完成一个返回一个,返回的是一个可迭代对象 |
| |
| 协程锁 |
| async with Lock(): |
Python 进阶
- Manager(内置了好多数据结构,可以实现多进程间内存共享)
| from multiprocessing import Manager,Process |
| def add_data(p_dict, key, value): |
| p_dict[key] = value |
| |
| if __name__ == "__main__": |
| progress_dict = Manager().dict() |
| from queue import PriorityQueue |
| |
| first_progress = Process(target=add_data, args=(progress_dict, "bobby1", 22)) |
| second_progress = Process(target=add_data, args=(progress_dict, "bobby2", 23)) |
| |
| first_progress.start() |
| second_progress.start() |
| first_progress.join() |
| second_progress.join() |
| |
| print(progress_dict) |
| from multiprocessing import Pipe,Process |
| #pipe的性能高于queue |
| def producer(pipe): |
| pipe.send("bobby") |
| |
| def consumer(pipe): |
| print(pipe.recv()) |
| |
| if __name__ == "__main__": |
| recevie_pipe, send_pipe = Pipe() |
| #pipe只能适用于两个进程 |
| my_producer= Process(target=producer, args=(send_pipe, )) |
| my_consumer = Process(target=consumer, args=(recevie_pipe,)) |
| |
| my_producer.start() |
| my_consumer.start() |
| my_producer.join() |
| my_consumer.join() |
- Queue(不能用于进程池,进程池间通信需要使用Manager().Queue())
| from multiprocessing import Queue,Process |
| def producer(queue): |
| queue.put("a") |
| time.sleep(2) |
| |
| def consumer(queue): |
| time.sleep(2) |
| data = queue.get() |
| print(data) |
| |
| if __name__ == "__main__": |
| queue = Queue(10) |
| my_producer = Process(target=producer, args=(queue,)) |
| my_consumer = Process(target=consumer, args=(queue,)) |
| my_producer.start() |
| my_consumer.start() |
| my_producer.join() |
| my_consumer.join() |
| def producer(queue): |
| queue.put("a") |
| time.sleep(2) |
| |
| def consumer(queue): |
| time.sleep(2) |
| data = queue.get() |
| print(data) |
| |
| if __name__ == "__main__": |
| queue = Manager().Queue(10) |
| pool = Pool(2) |
| |
| pool.apply_async(producer, args=(queue,)) |
| pool.apply_async(consumer, args=(queue,)) |
| |
| pool.close() |
| pool.join() |
- argv 命令行参数list,第一个是程序本身的路径
- path 返回模块的搜索路径
- modules.keys() 返回已经导入的所有模块的列表
- exit(0) 退出程序
-
- a in s or b in s or c in s简写
- 采用any方式:all() 对于任何可迭代对象为空都会返回 True
| |
| True in [i in s for i in [a,b,c]] |
| |
| any(i in s for i in [a,b,c]) |
| |
| list(filter(lambda x:x in s,[a,b,c])) |
- {1,2}.issubset({1,2,3})#判断是否是其子集
- {1,2,3}.issuperset({1,2})
- {}.isdisjoint({})#判断两个set交集是否为空,是空集则为True
- [u4E00-u9FA5]匹配中文文字区间[一到龥]
| import sys |
| sys.getdefaultencoding() |
| class A(dict): |
| def __getattr__(self,value): |
| return 2 |
| def __getattribute__(self,item): |
| return item |
-
- 类变量是不会存入实例__dict__中的,只会存在于类的__dict__中
- globals/locals(可以变相操作代码)
- globals中保存了当前模块中所有的变量属性与值
- locals中保存了当前环境中的所有变量属性与值
- 本地作用域(Local)
- 当前作用域被嵌入的本地作用域(Enclosing locals)
- 全局/模块作用域(Global)
- 内置作用域(Built-in)
print([[x for x in range(1,101)][i:i+3] for i in range(0,100,3)])
- 即创建类的类,创建类的时候只需要将metaclass=元类,元类需要继承type而不是object,因为type就是元类
| type.__bases__ |
| object.__bases__ |
| type(object) |
| class Yuan(type): |
| def __new__(cls,name,base,attr,*args,**kwargs): |
| return type(name,base,attr,*args,**kwargs) |
| class MyClass(metaclass=Yuan): |
| pass |
- Python在使用传入参数的过程中不会默认判断参数类型,只要参数具备执行条件就可以执行
- 深拷贝拷贝内容,浅拷贝拷贝地址(增加引用计数)
- copy模块实现神拷贝
- 一般测试类继承模块unittest下的TestCase
- pytest模块快捷测试(方法以test_开头/测试文件以test_开头/测试类以Test开头,并且不能带有 init 方法)
- coverage统计测试覆盖率
| class MyTest(unittest.TestCase): |
| def tearDown(self): |
| print('本方法开始测试了') |
| |
| def setUp(self): |
| print('本方法测试结束') |
| |
| @classmethod |
| def tearDownClass(self): |
| print('开始测试') |
| @classmethod |
| def setUpClass(self): |
| print('结束测试') |
| |
| def test_a_run(self): |
| self.assertEqual(1, 1) |
-
- gil 会根据执行的字节码行数以及时间片释放 gil,gil 在遇到 io 的操作时候主动释放
- 什么是 monkey patch?
- 猴子补丁,在运行的时候替换掉会阻塞的语法修改为非阻塞的方法
- 运行时判断一个对象的类型的能力,id,type,isinstance
- 都不是,python是共享传参,默认参数在执行时只会执行一次
-
- try-except-else-finally中 else 和 finally 的区别
- else在不发生异常的时候执行,finally无论是否发生异常都会执行
- except一次可以捕获多个异常,但一般为了对不同异常进行不同处理,我们分次捕获处理
- 同一时间只能有一个线程执行,CPython(IPython)的特点,其他解释器不存在
- cpu 密集型:多进程+进程池
- io 密集型:多线程/协程
- 实现__next__和__iter__方法的对象就是迭代器
- 可迭代对象只需要实现__iter__方法
- 使用生成器表达式或者yield的生成器函数(生成器是一种特殊的迭代器)
- 比线程更轻量的多任务方式
- 实现方式
- yield
- async-awiat
- 为了支持快速查找使用了哈希表作为底层结构
- 哈希表平均查找时间复杂度为o(1)
- CPython 解释器使用二次探查解决哈希冲突问题
- 循环复制到新空间实现扩容
- 冲突解决:
- 链接法
- 二次探查(开放寻址法):python使用
| for gevent import monkey |
| monkey.patch_all() |
| co_flags = func.__code__.co_flags |
| |
| |
| if co_flags & 0x180: |
| return func |
| |
| |
| if co_flags & 0x20: |
| return func |
| |
| |
| |
| fib = lambda n: n if n 二分查找->hash->二叉查找树->平衡二叉树->多路查找树->多路平衡查找树(B-Tree)Mysql面试总结基础篇https://segmentfault.com/a/1190000018371218Mysql面试总结进阶篇https://segmentfault.com/a/1190000018380324深入浅出Mysqlhttp://ningning.today/2017/02/13/database/深入浅出mysql/清空整个表时,InnoDB是一行一行的删除,而MyISAM则会从新删除建表text/blob数据类型不能有默认值,查询时不存在大小写转换什么时候索引失效应尽量避免在 where 子句中使用 != 或 操作符,否则引擎将放弃使用索引而进行全表扫描尽量避免在 where 子句中使用 or 来连接条件,否则将导致引擎放弃使用索引而进行全表扫描,即使其中有条件带索引也不会使用,这也是为什么尽量少用 or 的原因如果列类型是字符串,那一定要在条件中将数据使用引号引用起来,否则不会使用索引应尽量避免在 where 子句中对字段进行函数操作,这将导致引擎放弃使用索引而进行全表扫描对于多列索引,不是使用的第一部分,则不会使用索引以%开头的like模糊查询出现隐式类型转换没有满足最左前缀原则失效场景:例如: |
| select id from t where substring(name,1,3) = 'abc' – name; |
| 以abc开头的,应改成: |
| select id from t where name like 'abc%' |
| 例如: |
| select id from t where datediff(day, createdate, '2005-11-30') = 0 – '2005-11-30'; |
| 应改为: |
| |
| 不要在 where 子句中的 “=” 左边进行函数、算术运算或其他表达式运算,否则系统将可能无法正确使用索引 |
| 应尽量避免在 where 子句中对字段进行表达式操作,这将导致引擎放弃使用索引而进行全表扫描 |
| 如: |
| select id from t where num/2 = 100 |
| 应改为: |
| select id from t where num = 100*2; |
| |
| 不适合键值较少的列(重复数据较多的列)比如:set enum列就不适合(枚举类型(enum)可以添加null,并且默认的值会自动过滤空格集合(set)和枚举类似,但只可以添加64个值) |
| 如果MySQL估计使用全表扫描要比使用索引快,则不使用索引 |
| |
| |
| |
| 什么是聚集索引 |
| |
| |
| |
| |
| B+Tree叶子节点保存的是数据还是指针 |
| MyISAM索引和数据分离,使用非聚集 |
| InnoDB数据文件就是索引文件,主键索引就是聚集索引 |
| |
| Redis 命令总结 |
| |
| |
| |
| 为什么这么快? |
| |
| |
| |
| |
| 因为Redis是基于内存的操作,CPU不是Redis的瓶颈,Redis的瓶颈最有可能是机器内存的大小或者网络带宽。既然单线程容易实现,而且CPU不会成为瓶颈,那就顺理成章地采用单线程的方案了(毕竟采用多线程会有很多麻烦!)。 |
| 基于内存,由 C 语言编写 |
| 使用多路I/O复用模型,非阻塞 IO |
| 使用单线程减少线程间切换 |
| 数据结构简单 |
| 自己构建了 VM 机制,减少调用系统函数的时间 |
| |
| |
| |
| |
| 优势 |
| |
| |
| |
| |
| 性能高 – Redis 能读的速度是110000次/s,写的速度是81000次/s |
| 丰富的数据类型 |
| 原子 – Redis 的所有操作都是原子性的,同时 Redis 还支持对几个操作全并后的原子性执行 |
| 丰富的特性 – Redis 还支持 publish/subscribe(发布/订阅), 通知, key 过期等等特性 |
| |
| |
| |
| |
| 什么是 redis 事务? |
| |
| |
| |
| |
| 将多个请求打包,一次性、按序执行多个命令的机制 |
| 通过 multi,exec,watch 等命令实现事务功能 |
| Python redis-py pipeline=conn.pipeline(transaction=True) |
| |
| |
| |
| |
| 持久化方式 |
| |
| |
| |
| |
| save(同步,可以保证数据一致性) |
| bgsave(异步,shutdown时,无AOF则默认使用) |
| RDB(快照) |
| AOF(追加日志) |
| |
| |
| |
| |
| 怎么实现队列 |
| |
| |
| |
| |
| push |
| rpop |
| |
| |
| |
| |
| 常用的数据类型(Bitmaps,Hyperloglogs,范围查询等不常用) |
| |
| |
| |
| |
| skiplist(跳跃表) |
| intset或hashtable |
| ziplist(连续内存块,每个entry节点头部保存前后节点长度信息实现双向链表功能)或double linked list |
| 整数或sds(Simple Dynamic String) |
| String(字符串):计数器 |
| List(列表):用户的关注,粉丝列表 |
| Hash(哈希): |
| Set(集合):用户的关注者 |
| Zset(有序集合):实时信息排行榜 |
| |
| |
| |
| |
| 与 Memcached 区别 |
| |
| |
| |
| |
| Memcached只能存储字符串键 |
| Memcached用户只能通过APPEND的方式将数据添加到已有的字符串的末尾,并将这个字符串当做列表来使用。但是在删除这些元素的时候,Memcached采用的是通过黑名单的方式来隐藏列表里的元素,从而避免了对元素的读取、更新、删除等操作 |
| Redis和Memcached都是将数据存放在内存中,都是内存数据库。不过Memcached还可用于缓存其他东西,例如图片、视频等等 |
| 虚拟内存–Redis当物理内存用完时,可以将一些很久没用到的Value 交换到磁盘 |
| 存储数据安全–Memcached挂掉后,数据没了;Redis可以定期保存到磁盘(持久化) |
| 应用场景不一样:Redis出来作为NoSQL数据库使用外,还能用做消息队列、数据堆栈和数据缓存等;Memcached适合于缓存SQL语句、数据集、用户临时性数据、延迟查询数据和Session等 |
| |
| |
| |
| |
| Redis实现分布式锁 |
| |
| |
| |
| |
| 使用setnx实现加锁,可以同时通过expire添加超时时间 |
| 锁的value值可以是一个随机的uuid或者特定的命名 |
| 释放锁的时候,通过uuid判断是否是该锁,是则执行delete释放锁 |
| |
| |
| |
| |
| 常见问题 |
| |
| |
| |
| |
| 当访问量剧增、服务出现问题(如响应时间慢或不响应)或非核心服务影响到核心流程的性能时,仍然需要保证服务还是可用的,即使是有损服务。系统可以根据一些关键数据进行自动降级,也可以配置开关实现人工降级 |
| 数据过期,进行更新缓存数据 |
| 初始化项目,将部分常用数据加入缓存 |
| 请求访问数据时,查询缓存中不存在,数据库中也不存在 |
| 短时间内缓存数据过期,大量请求访问数据库 |
| 缓存雪崩 |
| 缓存穿透 |
| 缓存预热 |
| 缓存更新 |
| 缓存降级 |
| |
| |
| |
| |
| 一致性Hash算法 |
| |
| |
| |
| |
| 使用集群的时候保证数据的一致性 |
| |
| |
| |
| |
| 基于redis实现一个分布式锁,要求一个超时的参数 |
| |
| |
| |
| |
| setnx |
| |
| |
| 虚拟内存 |
| 内存抖动 |
| |
| Linux |
| |
| |
| |
| Unix五种i/o模型 |
| |
| |
| |
| |
| select |
| poll |
| epoll |
| 并发不高,连接数很活跃的情况下 |
| 比select提高的并不多 |
| 适用于连接数量较多,但活动链接数少的情况 |
| 阻塞io |
| 非阻塞io |
| 多路复用io(Python下使用selectot实现io多路复用) |
| 信号驱动io |
| 异步io(Gevent/Asyncio实现异步) |
| |
| |
| |
| |
| 比 man 更好使用的命令手册 |
| |
| |
| |
| |
| tldr:一个有命令示例的手册 |
| |
| |
| |
| |
| kill -9和-15的区别 |
| |
| |
| |
| |
| -15:程序立刻停止/当程序释放相应资源后再停止/程序可能仍然继续运行 |
| -9:由于-15的不确定性,所以直接使用-9立即杀死进程 |
| |
| |
| |
| |
| 分页机制(逻辑地址和物理地址分离的内存分配管理方案): |
| |
| |
| |
| |
| 操作系统为了高效管理内存,减少碎片 |
| 程序的逻辑地址划分为固定大小的页 |
| 物理地址划分为同样大小的帧 |
| 通过页表对应逻辑地址和物理地址 |
| |
| |
| |
| |
| 分段机制 |
| |
| |
| |
| |
| 为了满足代码的一些逻辑需求 |
| 数据共享/数据保护/动态链接 |
| 每个段内部连续内存分配,段和段之间是离散分配的 |
| |
| |
| |
| |
| 查看 cpu 内存使用情况? |
| |
| |
| |
| |
| top |
| free 查看可用内存,排查内存泄漏问题 |
| |
| 设计模式 |
| 单例模式 |
| |
| def Single(cls,*args,**kwargs): |
| instances = {} |
| def get_instance (*args, **kwargs): |
| if cls not in instances: |
| instances[cls] = cls(*args, **kwargs) |
| return instances[cls] |
| return get_instance |
| @Single |
| class B: |
| pass |
| |
| class Single: |
| def __init__(self): |
| print("单例模式实现方式二。。。") |
| |
| single = Single() |
| del Single |
| |
| class Single: |
| def __new__(cls,*args,**kwargs): |
| if not hasattr(cls,'_instance'): |
| cls._instance = super().__new__(cls,*args,**kwargs) |
| return cls._instance |
| |
| 工厂模式 |
| class Dog: |
| def __init__(self): |
| print("Wang Wang Wang") |
| class Cat: |
| def __init__(self): |
| print("Miao Miao Miao") |
| |
| |
| def fac(animal): |
| if animal.lower() == "dog": |
| return Dog() |
| if animal.lower() == "cat": |
| return Cat() |
| print("对不起,必须是:dog,cat") |
| |
| 构造模式 |
| class Computer: |
| def __init__(self,serial_number): |
| self.serial_number = serial_number |
| self.memory = None |
| self.hadd = None |
| self.gpu = None |
| def __str__(self): |
| info = (f'Memory:{self.memoryGB}', |
| 'Hard Disk:{self.hadd}GB', |
| 'Graphics Card:{self.gpu}') |
| return ''.join(info) |
| class ComputerBuilder: |
| def __init__(self): |
| self.computer = Computer('Jim1996') |
| def configure_memory(self,amount): |
| self.computer.memory = amount |
| return self |
| def configure_hdd(self,amount): |
| pass |
| def configure_gpu(self,gpu_model): |
| pass |
| class HardwareEngineer: |
| def __init__(self): |
| self.builder = None |
| def construct_computer(self,memory,hdd,gpu) |
| self.builder = ComputerBuilder() |
| self.builder.configure_memory(memory).configure_hdd(hdd).configure_gpu(gpu) |
| @property |
| def computer(self): |
| return self.builder.computer |
| |
| 数据结构和算法 |
| python实现各种数据结构 |
| 快速排序 |
| def quick_sort(_list): |
| if len(_list) b: |
| if _list1[a] > _list2[b]: |
| sort.append(_list2[b]) |
| b += 1 |
| else: |
| sort.append(_list1[a]) |
| a += 1 |
| if len_a > a: |
| sort.append(_list1[a:]) |
| if len_b > b: |
| sort.append(_list2[b:]) |
| return sort |
| def merge_sort(_list): |
| if len(list1) |