August Rush

一个还在努力成长的小火汁!

游龙当归海,海不迎我自来也。

We create our own demons.

You can reach me at augustrush0923@gmail.com
Python实现数据缓存
发布:2021年04月09日 | 作者:augustrush | 阅读量: 401

什么是缓存

缓存(cache),其作用是缓和较慢存储的高频次请求。简单来说,就是加速慢存储的访问效率。

下面通过一个例子来说明缓存的作用:

import time


def query1(sql):
    time.sleep(1)  # 假设需要1s才能取出数据
    result = f'execute {sql}'
    return result

假设函数query1用来执行给定的SQL语句,但是每次请求的执行都要耗时1s以上,因此我们需要通过缓存来加速访问效率。

缓存的逻辑是,如果这个SQL被执行过了,那么在短时间内就没必要再次执行,应该直接复用上次的结果。

根据这个思路,我们来实现一版缓存的逻辑。

import time


CACHE = {}


def query2(sql):
    try:
        result = CACHE[sql]
    except KeyError:
        time.sleep(1)  # 假设需要1s才能取出数据
        result = f'execute {sql}'
        CACHE[sql] = result
    return result


if __name__ == '__main__':
    start = time.time()
    query2('SELECT * FROM database')
    print(time.time() - start)

    start = time.time()
    query2('SELECT * FROM database')
    print(time.time() - start)

先解释一下代码的逻辑。在第一次执行query2函数时,需要去执行SQL,而当执行完SQL拿到结果后,会把结果放到CACHE中,其中会以sql参数为key。这样如果后面的请求执行了相同的SQL,就可以直接通过CACHE获取到上次保存的结果,这样就能极大地提高执行效率。

想象一下,这个接口每天被调用上亿次的话,如果不加缓存,需要花费多少时间。

这里需要提到的一点是,这样的缓存方式属于被动缓存,也就是当有请求处理完之后才会缓存数据,即第一次请求还需要去实际执行。这一点很重要,尤其是当线上系统使用的是进程级的内存缓存时。

此外,还存在另一种方案:主动缓存。它有两种做法:其一是系统启动时,会自动把所有接口刷一遍,这样用户在访问时缓存就已经存在;其二就是在数据写入时同步更新或写入缓存。

query2的缓存获取中,我们通过KeyError异常的方式进行了处理。但try-except在异常发生时,开销会比较大。另一种方案是:

import time


CACHE = {}


def query3(sql):
    result = CACHE.get(sql)
    if not result:
        time.sleep(1)
        result = f'execute {sql}'
        CACHE[sql] = result
    return result


缓存装饰器

上面是一个简单的缓存,我们需要针对每个函数都写一遍缓存获取的逻辑,但这种方式的易用性比较差,并且会侵入到业务代码中。

因此,根据Python中装饰器的特性,可以将其改编为装饰器模式,通过装饰对应函数给其增加缓存逻辑。

import time
import functools


CACHE = {}


def cache_func(func):
    @functools.wraps(func)
    def inner(*args, **kwargs):
        key = repr(*args, **kwargs)
        result = CACHE.get(key)
        if not result:
            result = func(*args, **kwargs)
            CACHE[key] = result
        return result
    return inner


@cache_func
def query1(sql):
    time.sleep(1)  # 假设需要1s才能取出数据
    result = f'execute {sql}'
    return result


if __name__ == '__main__':
    start = time.time()
    query1('SELECT * FROM database')
    print(time.time() - start)

    start = time.time()
    query1('SELECT * FROM database')
    print(time.time() - start)


增强缓存装饰器

通过装饰器,我们可以更容易地使用缓存逻辑,但这还远远不够,毕竟数据不是死的,还会更新。如果缓存的数据始终不更新,那也是个问题。另外一个需要考虑的问题是CACHE的容量,不能让数据无限量地进入这个字典中,毕竟有些数据可能只用一次。

基于上面的两个问题,我们需要标记被缓存的数据以及设置CACHE的容量上限。这样我们就可以通过实时间或者空间的限制淘汰那些不经常使用的数据。

这个时候使用基础的Python内置的字典以及无法满足需求了,因此需要考虑自己来实现一个dict。这里我们需要用到dict内置的方法,也称magic method。

除了需要实现一个dict外,还需要设定好一套淘汰算法。当容量超过我们的设定值,删掉不需要的内容。关于缓存淘汰算法,你可以通过搜索引擎查找,这里我们使用LRU(Least Recently Used,近期最少使用)算法,其大体逻辑就是淘汰掉最长时间没使用的那个。在实现上,我们只需要一个能够顺序记录某个缓存key的访问时间的数据结构就行。

下面来具体实现以下

import time
from collections import OrderedDict


class LRUCacheDict:
    def __init__(self, max_size=1024, expiration=60):
        """ 最大容量为1024个key,每个key的有效期为60s """
        self.max_size = max_size  # 最大容量
        self.expiration = expiration  # 有效期

        self._cache = {}  # 创建存储容器
        self._access_records = OrderedDict()  # 记录访问时间
        self._expire_records = OrderedDict()  # 记录失效时间

    def __setitem__(self, key, value):
        now = self.get_now()  # 获取当前时间
        self.__delete__(key)  # 执行__delete__方法

        self._cache[key] = value  # 设置key-value到_cache容器内
        self._expire_records[key] = now + self.expiration  # 设置当前key的过期时间到_expire_records容器内
        self._access_records[key] = now  # 设置当前key的访问时间到_access_records容器内

        self.cleanup()  # 执行cleanup方法

    def __getitem__(self, key):
        now = self.get_now()  # 获取当前时间
        del self._access_records[key]  # 删除_access_records容器内当前key的value
        self._access_records[key] = now  # 设置当前key的访问时间到_access_records容器内
        self.cleanup()  # 执行cleanup方法

        return self._cache[key]  # 返回_cache容器内当前key的value

    def __contains__(self, key):
        self.cleanup()  # 执行cleanup方法
        return key in self._cache  # 判断当前key是否在_cache容器内,并返回True/False

    def __delete__(self, key):
        if key in self._cache:  # 判断当前key是否在_cache容器内
            del self._cache[key]  # 删除_cache容器内当前key的key-value
            del self._expire_records[key]  # 删除_expire_records容器内当前key的key-value
            del self._access_records[key]  # 删除_access_records容器内当前key的key-value

    def cleanup(self):
        """去掉无效(过期或者超出存储大小)的缓存"""
        if self.expiration is None:  # 如果expiration为None。则代表不过期
            return None

        pending_delete_keys = []  # 初始化一个列表
        now = self.get_now()  # 获取当前时间

        # 删除已经过期的缓存
        for k, v in self._expire_records.items():  # 循环获取_expire_records容器内各个key/value
            if v < now:  # 如果value值小于now
                pending_delete_keys.append(k)  # 往pending_delete_keys内添加当前key值

        for del_k in pending_delete_keys:  # 循环获取pending_delete_keys容器内key值
            self.__delete__(del_k)  # 执行__delete__方法

        # 如果数据量大于max_size,则删掉最旧的缓存
        while len(self._cache) > self.max_size:  # 循环判断当前_cache容器大小是否大于max_size
            for k in self._access_records:  # 循环获取_access_records容器内的key值
                self.__delete__(k)  # 执行__delete__方法
                break  # 结束当前循环

    @staticmethod
    def get_now():
        return int(time.time())


if __name__ == '__main__':
    cache_dict = LRUCacheDict(max_size=2, expiration=10)
    cache_dict['name'] = 'augustrush'
    cache_dict['age'] = 22
    cache_dict['addr'] = 'zhengzhou'

    print('name' in cache_dict)  # 输出False,因为容量为2,第一个Key会被删掉
    print('age' in cache_dict)  # 输出True
    print(cache_dict['age'])
    time.sleep(11)
    print('age' in cache_dict)  # 输出False,因为缓存失效了

至此我们重新定义了dict后,重新实现缓存装饰器。

缓存装饰器需要增加容量和有效期配置,这样我们就可以在给不同的函数装饰时增加不同的配置:对于数据变化频繁的函数,我们就把有效期设置短一些;对于变化不那么频繁的函数,我们就把有效期设置长一些。

def cache_func(max_size=1024, expiration=60):
    CACHE = LURCacheDict(max_size, expiration)

    def wrapper(func):
        @functools.wraps(func)
        def inner(*args, **kwargs):
             key = repr(*args, **kwargs)
             try:
                result = CACHE[key]
             except KeyError:
                result = func(*args, **kwargs)
                CACHE[key] = result
                return result

        return inner

    return wrapper


@cache_func(max_size=10, expiration=3)
def query(sql):
    time.sleep(1)
    result = f'execute %s' % sql
    return result


  • 标签云

  • 支付宝扫码支持一下

  • 微信扫码支持一下



基于Nginx+Supervisord+uWSGI+Django1.11.1+Python3.6.5构建

版权所有 © 2020-2021 August Rush

京ICP备20007446号-1 & 豫公网安备 41100202000460号

网站地图 & RSS | Feed