高性能python-选择合适的数据结构

这里是对 «Python高性能分析» 的文章解析。
将会引用来自书中的代码, 添加一些个人的理解备注, 作为学习笔记使用。

引用代码

这个章节综合了第三章 «列表与元组» 和第四章 «字典与集合» 、第五章 «生成器与迭代器» 三个章节,说明在性能优化方面选择合适的数据结构有多么重要,以及他们之所以拖慢代码运行效率的原因。

数组

首先我们都了解,一个数组是数据在某种内在次序下的扁平列表。如果知道数据在数组中的位置,那我们就可以以 O(1) 的复杂度得到它。
这里边,需要值得注意的是,数组可以有多种实现方式,其内存结构的不同,也决定了它们在查找数据时的效率不同。

注意:在下文中提到的数组都指 list

在python中,列表是动态的数组,而元组则是静态的数组。这就有点像c++中的 vector。
它是一块连续内存, 存放的是一个整形大小的指向实际数据的指针。
区别在于动态的数组会比实际分配的空间占据更多的内存(这一部分内存用于记录它们自身状态的信息以便进行高效的resize)。

当我们知道数据在数组中的下标之后我们就可以通过 数组的指针(即数组第一个元素的指针)+ 下标 * 指针大小 从而以 O(1) 的复杂度定位到数据所在。

数组的搜索

在实际使用中,搜索某个元素在数组中的位置是最常见的问题。在不知道元素在数组中的位置的情况下,我们需要对数组进行遍历。这个算法的复杂度为 O(n) (即最差情况下,我们要对长度为n的数据遍历n次)。

唯一提升速度的方法是了解数据如何存放在内存中,或者说了解存放我们数据的桶的组织方式。

比如散列表,这可以通过另一种数据的组织方式实现 O(1) 的复杂度,下文讲述。
再比如说,对数据进行排序之后再搜索。

高效搜索必需的两大要素是排序算法和搜索算法。

数据的排序

如果我们先对我们的数据进行排序,使得所有位于某个元素左边的其他元素都小于(或大于)它,那么就可以获得更好的搜索性能。
在python中,我们可以通过定义对象的 __eq____lt__ 方法来实现对象的比较,如果没有实现,则对象之间就只能在同类型,并且比较其内存地址。

值得一提的是,python列表使用了一个内建的排序算法使用了Tim排序。
Tim排序可以在最佳情况下以 O(n)(最差情况下则是 O(n log n))的复杂度排序。 它运用了多种排序算法。对于给定的数据,它使用探索法猜测哪个算法的性能最优(更确切地说,它混用了插入排序和合并排序算法)来达到这样的性能。

在我们后续的排序算法中,我们也可以通过这种策略,针对数据的不同混合多种排序,最常见的条件就是根据数据的长度。

更有效的搜索

在对数据排序之后。我们就可以使用二分搜索找到目标值。这个算法的复杂度是 O(log n) 。 它首先查询位于列表中点的值并和目标值比较。如果中点值小于目标值,我们就继续考察右半边列表,我们不断将列表二分,直至找到目标值或发现该值不存在于列表。结果就是我们不需要像线性搜索那样读取列表中所有的元素,而仅仅读取了一个子集。

为什么不使用字典以获取更高的效率。这时因为尽管字典的查询复杂度为 O(1), 但是在数组转为字典的过程中,其复杂度为 o(n), 所以在一个长度较长的数组中,这样的转换反而不如正常的二分搜索。当然如果字典的结果会被多次使用的话,那又值得商榷。

这里值得一提的是,在python中,我们可以在数据插入的过程中简化 排序-搜索 这一流程。 利用bisect模块可以在保持排序的同时往列表中添加元素,通过通过一个高度优化过的二分搜索算法函数来查找元素。

In [1]:
import bisect
import random

def find_closest(haystack, needle):
    ''' 找到离needle最近的一个元素下标 '''
    # bisect.bisect_left 将会返回 比 needle 小的第一个元素的下标
    i = bisect.bisect_left(haystack, needle)
    if i == len(haystack):
        return i - 1
    elif haystack[i] == needle:
        return i
    elif i > 0:
        j = i - 1
        if haystack[i] - needle > needle - haystack[j]:
            return j
    return i

important_numbers = []
for i in range(10):
    new_number = random.randint(0, 1000)
    bisect.insort(important_numbers, new_number)
In [2]:
# 将是一个排序之后的数组
important_numbers
Out[2]:
[95, 329, 353, 499, 576, 607, 637, 772, 955, 990]
In [3]:
closest_index = find_closest(important_numbers, 1100)
"Closest value to 1100: {}".format(important_numbers[closest_index])
Out[3]:
'Closest value to 1100: 990'

数组的内存分配

动态数组支持resize操作,可以增加数组的容量。为了避免在每次添加元素的时候进行内存复制(当数据在达到容量时,会创建一个新的更大容量的列表,并将旧列表的数据复制到新列表中,旧列表销毁),在达到容量上限再次添加时,会按一定公式实际分配更多的内存空间,通过预留空间的做法,我们就可以减少这一分配空间的操作的次数以及内存复制的次数。

元祖

既然元祖和数组都是一段连续的内存结构,那么它们之间有什么不同。

其实在上文已经提到:

  • 列表是动态数组,它们可变且可以重设长度(改变其内部元素的个数)
  • 元组是静态数组,它们不可变,且其内部数据一旦创建便无法改变。

这里额外提一点,元组缓存于Python运行时环境,这意味着我们每次使用元组时无须访问内核去分配内存,这就是为什么元祖要比数组效率更好的原因。
同样的,尽管数组和元祖都支持存放不同的数据类型,但是这会带来额外的开销并减少一些可能的优化。

元祖的内存分配

元祖不支持改变大小,所以一个元祖不会保留额外空间,和同样长度的列表相比,元祖占据更少的内存。
同样的。元祖支持合并操作,但是和数组的append不同,这个操作会遍历元祖进行复制操作,任意两个元组相加始终返回一个新分配的元组。

元祖的效率

另外一个元祖比数组效率更高的地方在于,对于长度为1-20的元祖而言,即使它们已经被销毁,但是其内存也不会立即返回给操作系统,而是留待未来使用。
这意味着当未来需要一个同样大小的新元组时,我们不再需要向操作系统申请一块内存来存放数据,因为我们已经有了预留的内存。

字典

如果你有一些无序数据但它们可以被唯一的索引对象来引用(任何可以被散列的类型都可以成为索引对象,索引对象通常会是一个字符串),那么字典就是理想的数据结构。

但是,由于字典的特殊结构,它们通常会占用更多的内存;并且,虽然插入/查询操作的复杂度为 O(1), 但实际的速度极大取决于其使用的散列函数。如果散列函数的运行速度较慢,那么在字典上进行的任何操作也会相应变慢。在下文我们将看到这一点。

字典的散列实现

我们都知道,字典是按一定散列方式,将一个任意的键(一个可hash对象)转变为一个列表的索引,这样我们就可以通过计算出键的hash值来找到我们的元素,实现 O(1) 的复杂度。

对于对象,我们可以通过自己实现 __hash__ 函数,使得对象成为一个可hash对象(hashable)

字典的插入

对于散列表的插入,我们必须首先弄清楚以下两点:

  • 数据在这个连续内存块中的位置
  • 该值如何与其他对象进行比较

当我们插入数据时,首先需要计算键的散列值并掩码来得到一个有效的数组索引。掩码是为了保证一个可能是任意数字的散列值最终能落入分配的空间。

其实这里书中说的有点复杂,简单的说就是,在长度为N的空间中,保证散列值能够落在小于N的空间。可以通过 散列值 & bin(N-1) 来计算最终落入的下标,这个实际上和 模取N取余数 时一个道理的。

如果两个值相等,说明该键值对已经存在在该字典中,无所再次插入。如果不等。则可以通过一个简单的线性函数计算出一个新的索引(这一方法称之为嗅探)。

这里展示了一个python2.7 中使用的基本的嗅探的伪代码。

def index_sequence(key, mask=0b111, PERTURB_SHIFT=5):
    perturb = hash(key) 
    i = perturb & mask
    yield i
    while True:
        i = ((i << 2) + i + perturb + 1)
        perturb >>= PERTURB_SHIFT
        yield i & mask

重要的是记住线性嗅探仅使用了散列值的最后几个字节而没有考虑其余字节(比如,对于一个8元素字典,我们只查看了最后3个比特,因为当前的掩码是0x111)。这意味着如果两个键的散列值最后3个比特相等,那么不仅产生了一个碰撞,同时其后的嗅探索引序列也都是一样的。Python使用的扰动方案会考虑散列值中更多的比特位来解决这一问题。

以上是一段书中的原文,我的理解是,对于一个长度为8的字典(或小于8),我们只需比较 bin(8-1) = 0x111 即最后3位的掩码,就可以计算出实际元素落在字典空间的位置。反过来说,如果两个键之间的散列值的最后3位相等,那么在第二次查找中将会产生一个散列碰撞,由于其之后的嗅探序列(按一定规则进行计算新索引)也是一样的,所以可以通过计算一个新的复合嗅探序列的新索引,存放第二个元素。

以下是一段执行代码

class City(str):
    def __hash__(self):
        return ord(self[0])

# We create a dictionary where we assign arbitrary values to cities
data = {
    City("Rome"): 4,
    City("San Francisco"): 3,
    City("New York"): 5,
    City("Barcelona"): 2,
}

该hash返回字符串的首字符的ascii码,对于一个长度为4的字典, 则其掩码为 bin(8-1) = 0b111。

对于一个长度为4的字典,其实际占据空间为8,这时因为字典和数组一样预留一定的空间。对于一个字典的性能的较优解来说,实际数据的容量应占据实际空间的3/5左右,并使用大于实际空间最接近的一个容量大小 8, 32, 158, 512, 2048 等。当然你也可以理解为字典的最小空间为8 :)

对于 Rome 其在字典空间中的下标为 2 ord('R') & 0b111 = 82 & 0b111;
San Francisco 其在字典空间的下标为 3 ord('S') & 0b111 = 83 & 0b111;
New York 其在字典空间的下标为 6 ord('N') & 0b111 = 78 & 0b111;
但对于 Barcelona 来说。其下标 2 ord('B') & 0b111 = 66 & 0b111 与 Rome 的下标相同,所以 Barcelona 的插入需要经过再次计算。
通过上文的嗅探代码计算新的下标 5 ((i << 2) + i + perturb + 1) & 0b111 = ((2 << 2) + 2 + 66 + 1) & 0b111 = 77 & 0b111

字典的大小

和数组类似,字典在达到一定大小之后,将会增长。为了做到这一点,需要分配一个更大的表(也就是在内存中预留更多的桶),将掩码调整为适合新的表,旧表中的所有元素被重新插入新表。这需要重新计算索引,因为改变后的掩码会改变索引计算结果。结果就是,改大散列表的代价非常之大。

研究显示一个不超过三分之二满的表在具有最佳空间节约的同时依然具有不错的散列碰撞避免率。

字典或集合默认的最小长度是8(也就是说,即使你只保存3个值,Python仍然会分配8个元素)。每次改变大小时,容器的长度增加到原来的4倍,直至达到50000个元素,之后每次增加到原来的2倍。这导致了下面可能的大小:

8, 32, 128, 512, 2048, 8192, 32768, 131072, 262144, ...

值得注意的是当一个散列表变大或变小时都可能发生改变大小。也就是说,如果散列表中足够多的元素被删除,表可能会被改小。但是,改变大小仅发生在插入时。

字典的获取与删除

通过计算出字典的下标之后,需要比较两个值之间是否相等,如果相等则计算下一个嗅探序列的下标,获取其值,直到其值为 NULL 作为终止嗅探序列的标志。我们就可以认为字典中不存在该数据。
同样的,当一个值从散列表中被删除时,我们需要一个特殊的值来表示该空间虽空,但其后可能还有别的因散列碰撞而插入的值。

这里我的理解是,实际在字典中,用 NULL 表示一个嗅探序列的中止,表示从这之后的嗅探序列都没有值。而当一个值从散列表中删除,其实应该是类似 dict[key] = None 这样的效果,此时,该键仍存在但其值为空,嗅探序列仍能够往下执行,直至遇到 NULL 为止 。如果是 del dict[key] 的话,则应该该段空间都被置为 NULL。

散列函数

从上文中可以知道,选择一个合适的散列函数有多么重要,这关系到删除与查询的性能。
散列函数需要让散列值均匀分布以避免散列碰撞。碰撞太频繁会降低散列表的性能:如果大多数键都有碰撞,那么我们需要经常“嗅探”其他索引值,有可能需要在字典中访问很大一片区域来寻找需要的键。最坏情况下,字典中所有的键都碰撞,字典查询的性能变成O(n),就跟搜索一个列表一样。

理论上来说,当所有的散列值都具有同样的被选中概率时熵最大。一个令熵最大的散列函数被称为完美散列函数,因为它保证了最低的碰撞次数。 对于无限大的字典来说,其常数每个值作为散列值都不相等,并且其被选中的概率也都一样。

但对于有限长度字典来说,存在碰撞将不可避免。不过我们可以通过一些手段来优化。

例如对于一个字典,我们已知会放入5000个元素,则我们可以通过预先分配 32768 的空间的字典,以避免字典的复制和索引重新计算。

这时因为为了找到大小为N的字典的掩码,我们首先找到能令该字典保持三分之二满的最低桶数(N 5 / 3)。 即 5000 5 / 3 = 8333。 然后找到能满足这个数字的最小字典大小,并找到足以保存这一数字的bit的位数。即我们需要 32768 长度的空间,其掩码为 bin(32768 - 1) = 0b111111111111111

所以。在构建自定义类的时候,散列函数对键的使用方式将极大地影响数据结构的最终性能。了解这一块内容的实现方便我们实现更好的 __hash__ 函数。

集合

集合有一个重要的特性,就是其键的唯一性,你可以把它当作时一个特殊的字典,只不过它丢弃了所有的值。

如果你尝试添加一个已有的项,该项不会被添加进集合。另外,这一操作的代价是O(1)。

python中的命名空间

当Python访问一个变量、函数或模块时,都有一个体系来决定它去哪里查找这些对象。首先,Python查找locals()数组,其内保存了所有本地变量的条目。Python花了很多精力优化本地变量查询的速度,而这也是整条链上唯一一个不需要字典查询的部分。如果它不在本地变量里,那么会搜索globals()字典。最后,如果对象也不在那里,则搜索builtin对象。

以下代码展示了三种不同的方法:

In [4]:
import math
from math import sin

def test1(x):
    return math.sin(x)

def test2(x):
    return sin(x)

def test3(x, sin=math.sin):
    return sin(x)

通过dis模块的dis可以查看其字节码调用

In [8]:
import dis

dis.dis(test1)
  5           0 LOAD_GLOBAL              0 (math)
              2 LOAD_ATTR                1 (sin)
              4 LOAD_FAST                0 (x)
              6 CALL_FUNCTION            1
              8 RETURN_VALUE
In [9]:
dis.dis(test2)
  8           0 LOAD_GLOBAL              0 (sin)
              2 LOAD_FAST                0 (x)
              4 CALL_FUNCTION            1
              6 RETURN_VALUE
In [10]:
dis.dis(test3)
 11           0 LOAD_FAST                1 (sin)
              2 LOAD_FAST                0 (x)
              4 CALL_FUNCTION            1
              6 RETURN_VALUE

这里通过三种不同的代码方式,将sin函数保存在不同的命名空间内。
在test1中先寻找math模块,再寻找sin函数经过了两次字典查询。 在test2中预先导入了sin函数,所以只需在全局空间寻找一次。
在test3中虽然也需要寻找一次sim函数,但是将其保存在本地变量之后,之后的寻找将会在本地变量中执行,这将会带来性能提升。当然,在普通函数调用中,这种写法并不常见,并且由于python3的优化,甚至于在普通调用 test2 和 test3的比较中, test2的效率要更高。 :)

In [11]:
%timeit test2(10000000)
205 ns ± 2.96 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
In [12]:
%timeit test3(10000000)
210 ns ± 2.57 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

生成器与迭代器

生成器最常见的一个用法就是如下这种:

In [1]:
def get_list(number):
    for i in range(number):
        yield i

这段代码本质上和range并没有区别,都是返回一个数组列表,和普通的循环返回列表相比,这样的操作会更加节省内存。这时因为通过在迭代的过程中保存了上下文的环境,我们只读取我们需要的值,而无需花费更多的空间去存储我们最终只会用到一次而不得不存储的一个列表。

在python2中,range是普通的循环,则xrange是生成器。这一点在python3中得到整合。

另外,for循环要求被循环的对象支持迭代。我们可以通过对一个自定义对象定义__iter__方法来支持迭代。
同样的,我们可以使用 iter 函数来获得一个对象的 __iter 属性。

迭代器的调用

我们可以使用以下两种方式来调用我们的迭代器。

In [3]:
for i in get_list(3):
    print(i)
0
1
2
In [11]:
f = get_list(3)
while True:
    try:
        i = next(f) # python3.6 之前可以使用 f.next()
        print(i)
    except StopIteration:
        break
0
1
2

同样的我们可以用列表推导式,像列表一样获取值。这将会返回一个generator对象,这对于我们后面的迭代器延时估值有关(书中原称,我不是很能理解这个翻译。姑且就认为是对迭代器的返回值进行分组分片等操作)。

In [14]:
(x for i in get_list(3))
Out[14]:
<generator object <genexpr> at 0x10dd84620>

迭代器的延时估值

正常的迭代器调用都是单向的,我们只能够拿到当前的值,而无法获取到其他值。
利用标准库 itertools 可以做到这一点。

In [16]:
import itertools
# 对一个生成器切片
tuple(itertools.islice(get_list(10), 3, 6)) # 此处直接切片会返回一个迭代对象,然后我们通过tuple去获取其所有值
Out[16]:
(3, 4, 5)
In [20]:
# 连接多个生成器
tuple(itertools.chain(get_list(3), get_list(3), get_list(3)))
Out[20]:
(0, 1, 2, 0, 1, 2, 0, 1, 2)
In [23]:
# 对生成器进行分组
def even_number(iterable):
    key = lambda x: x % 2 == 0
    return itertools.groupby(iterable, key)

for is_even, data in even_number(get_list(5)):
    print(is_even, tuple(data)) # 注意该groupby必须要求 序列是连续的
True (0,)
False (1,)
True (2,)
False (3,)
True (4,)
In [31]:
# 对生成器过滤
even_list = itertools.filterfalse(lambda x: not x[0], even_number(get_list(5)))
is_even, data = next(even_list)
print(is_even, tuple(data))
True (0,)

最后再说明一下这样做的意义,和普通的代码流程不同,这样的操作只是对代码逻辑设置了一条流水线,在最终的 tuplenext 没有调用之前,实际上是没有执行的。

这样,只有明确条件中目标元素的计算才会被执行,并且如果出现了一个提前终止的条件,那么就降低整体的运行时间。
我们通过修改以上代码来佐证这一点。

In [32]:
def get_list(number):
    print("number generator start ...")
    for i in range(number):
        yield i
        
iterable = even_number(get_list(5))
print("gen iterable. but it not run...")
even_list = itertools.filterfalse(lambda x: not x[0], iterable)
for is_even, data in even_list:
    print(is_even, tuple(data))
gen iterable. but it not run...
number generator start ...
True (0,)
True (2,)
True (4,)

Comments !