前言
一个业务型的服务,被open接口后,遭遇并发扫数据,于是要做限流操作。一直固执的认为,业务API和OpenAPI要分开处理,或许因为起初接入其他企业ERP系统都是走较为规范的OpenAPI,始终对于这种开发系统业务API的做法感觉不好。
窗口限流
需求是要在Django的一个工程里做限流,倘若是rest_framework的View也好办,直接就提供了限流 rest_framework throttling
可参照文档设置。不能直接使用设置的原因是,面对是Django做的一个服务,然后proxy至别的服务,工程仅仅承担一个转发的职责。如果在LB上限流,无法区分来源IP,只能是总量限流,很可能导致一旦被限流,正常平台访问被拒绝。所以我需要的限流需求非常清晰,首先限流的粒度是需要先知道访问来源的真实IP,在一定窗口时间内的访问次数,诸如 100/min。
rest_framework 提供了比错的实现思路,类似实现一套打点记录的,片段存储,打点记录为需要限制的实时条件。就以上述 100/min为例,首先一分钟之内,IP1没有任何访问,则没有任何限制数据,redis的过期时间,满足了此数据设置,再有,1分钟之内,要满足次数不超过100次,维护一个数组,长度超过100则意味超过访问限制,数组中记录请求每次访问的时刻值,窗口滑动就是淘汰掉连续访问中,以当前时刻后置一分钟之前的访问打点,保证了数组窗口永远都是以当前最近请求进入1min之内的记录点。
# throttle setting
THROTTLE_RATES
=
{
'resource1'
:
'100/min'
,
'resource2'
:
'20/second'
}
# throttle class
class
WindowAccessThrottle
:
cache
=
Cache
(
)
timer
=
time
.
time
def
__init__
(
self
,
request
,
view
,
scope
)
:
self
.
rate
=
settings
.
THROTTLE_RATES
[
scope
]
self
.
request
=
request
self
.
view
=
view
self
.
key
=
self
.
get_cache_key
(
)
def
parse_rate
(
self
)
:
num
,
period
=
self
.
rate
.
split
(
'/'
)
num_requests
=
int
(
num
)
duration
=
{
's'
:
1
,
'm'
:
60
,
'h'
:
3600
,
'd'
:
86400
}
[
period
[
0
]
]
return
num_requests
,
duration
def
get_cache_key
(
self
)
:
host
=
self
.
request
.
META
[
'HTTP_X_FORWARDED_FOR'
]
\
if
self
.
request
.
META
.
get
(
'HTTP_X_FORWARDED_FOR'
,
None
)
else
\
self
.
request
.
META
[
'REMOTE_ADDR'
]
return
'throttle:{}:{}'
.
format
(
host
,
self
.
view
.
__name__
)
def
allow_request
(
self
)
:
history
=
self
.
cache
.
get_value
(
self
.
key
,
[
]
)
now
=
self
.
timer
(
)
num_requests
,
duration
=
self
.
parse_rate
(
)
while
history
and
history
[
-
1
]
<=
now
-
duration
:
history
.
pop
(
)
if
len
(
history
)
>=
num_requests
:
return
False
history
.
insert
(
0
,
now
)
self
.
cache
.
set
(
self
.
key
,
history
,
duration
)
return
True
注意
1,上述示例可根据实际需求修改
2,在做IP级别限定是,如果直接调用request.META[‘REMOTE_ADDR’]获取的是请求直接过来的IP,实际部署服务多数是经过LB,或者nginx反向代理的,REMOTE_ADDR多数就是前置LB的IP,所以取用HTTP_X_FORWARDED_FOR获取发起请求的远端IP。
3,
cache = Cache()
就是一个redis的封装,稍微实现下
cache.get_value(self.key, [])
对获取支持默认值
4,使用时类似原生的throttle,在view函数中设置 scope
4,配合Django的中间件,调用判定,大致如下:
from
django
.
urls
import
resolve
'''
实际下面中间件需要根据需求自定义调试,如果只是rest_framework的View可以直接用原生的设定,因为笔者是自己封装的转发View,
相当于重新自定义一个完全新的通用视图,需要重新实现限流
'''
class
ThrottleMiddleware
(
MiddlewareMixin
)
:
def
process_request
(
self
,
request
)
:
resolver
=
resolve
(
request
.
path
)
throttle_scope
=
getattr
(
resolver
.
func
,
'throttle_scope'
,
None
)
throttle
=
WindowAccessThrottle
(
request
,
resolver
.
func
,
throttle_scope
)
if
throttle
.
allow_request
(
)
:
return
else
:
return
HttpResponse
(
)
漏斗限流
上面窗口限流,一定程度上解决了流量猛增的问题,但是以上面 120/min的限流为例,用户在1分钟的某一瞬间,120的并发,此种场景,上面的限流器基本没有作用了,设想能够在短时间内,既限制访问的总量,也能限制访问的频率至于过高,漏斗限流就非常理想,基本抽象模型:
1,漏斗参数:
- capacity:容量,漏斗大小
- rate:漏斗流出速率,可以用 total和duration计算,一段时间duration内允许通过的总量total
2,当漏斗为空漏斗时:
- 访问进入的速率 < rate,此时漏斗无积压,请求一律通过
- 访问进入的速率 >= rate,此时漏斗中逐渐积压,且漏斗以rate值不断流出
3,当漏斗不为空时:
- 出水口以最大速率流出
- 漏斗未满,会继续纳入
- 漏斗已满,则会直接溢出,拒绝请求
用漏斗限流实现上述IP限流,示例如下:
THROTTLE_RATES
=
{
'funnel'
:
{
'capacity'
:
15
,
'duration'
:
60
,
# seconds
'total'
:
30
,
}
,
}
class
FunnelThrottle
:
cache
=
CusCache
(
)
timer
=
time
.
time
def
__init__
(
self
,
request
,
view
,
scope
)
:
config
=
settings
.
THROTTLE_RATES
[
scope
]
self
.
rate
=
config
[
'total'
]
/
config
[
'duration'
]
self
.
capacity
=
config
[
'capacity'
]
self
.
duration
=
config
[
'duration'
]
self
.
request
=
request
self
.
view
=
view
self
.
key
=
self
.
get_cache_key
(
)
def
get_cache_key
(
self
)
:
"""
same as WindowAccessThrottle
"""
pass
def
allow_request
(
self
)
:
history
=
self
.
cache
.
get_value
(
self
.
key
,
[
]
)
now
=
self
.
timer
(
)
if
not
history
:
# 空漏斗直接放行
history
.
insert
(
0
,
now
)
self
.
cache
.
set
(
self
.
key
,
history
,
self
.
duration
)
return
True
latest_duration
=
now
-
history
[
0
]
# 距离最近的一次放行时间间隔
leak_count
=
int
(
latest_duration
*
self
.
rate
)
# 由间隔时间和漏斗流速计算此段时间漏斗腾出空间
for
i
in
range
(
leak_count
)
:
if
history
:
history
.
pop
(
)
else
:
break
# 在上述漏斗清理流出空间后,漏斗仍旧满量,直接判定不可访问
if
len
(
history
)
>=
self
.
capacity
:
return
False
# 如果可访问,请求进入漏斗计量
history
.
insert
(
0
,
now
)
self
.
cache
.
set
(
self
.
key
,
history
,
self
.
duration
)
return
True
Note:
1,漏斗限流方式和之前窗口限流所用的数据结构在cache中基本一致,只因判定算法不同,所达到的限流效果,完全不同
2,漏斗限流,进入漏斗计量的点,表示一律放行通过了,只是,在漏斗中会根据下一次访问进入时间判定该点是否由漏斗的rate失效,而达到容量合理,限制流速的效果
Redis 漏斗限流 (redis-cell)
上述的漏斗限流算法,在Redis的模块中已经内置实现了一个,具体介绍请参见Github redis-cell详细介绍 笔者安装在MacOS上,基本没有问题:
# 下载mac版本安装包
https://github.com/brandur/redis-cell/releases
# 解压
tar
-zxf redis-cell-*.tar.gz
# 复制可执行文件
cp
libredis_cell.dylib /your_redis_server_localtion
# 重启redis-server,把libredis_cell.dylib加载上
redis-server --loadmodule /path/to/modules/libredis_cell.dylib
安装重启后,可以在redis中执行 CL.THROTTLE 命令:
# CL.THROTTLE user123 15 30 60 1和实现算法中的配置类似,user123表示限流key,15: capacity,30: total,60: duration,
127.0.0.1:6379> CL.THROTTLE user123 15 30 60 1
1) (integer) 0 # 0表示允许,1表示拒绝
2) (integer) 16 # 漏斗容量 max_burst + 1 = 15 +1 =16
3) (integer) 15 # 漏斗剩余容量
4) (integer) -1 # 如果被拒绝,多少秒后重试
5) (integer) 2 # 多长时间后漏斗完全漏空
但是redis-cell没有找到对应的sdk
Python Bound method
# python 3.x
def
func
(
)
:
pass
class
A
:
@
classmethod
def
method_cls
(
cls
)
:
pass
def
method_a
(
self
)
:
pass
class
B
(
A
)
:
pass
a
,
b
=
A
(
)
,
B
(
)
print
(
func
)
#
print
(
a
.
method_a
)
#
<__main__.A object at 0x10ef11978>>
print
(
b
.
method_cls
)
#
>
对于上文中
func
就是一个函数对象,而
method_a
和
method_cls
是归属类A的所以,是一个
bound method
,那么如何查看一个
bound method
的归属呢?
Python 2.x中提供了 im_func,im_class,im_self三个属性:
-
im_func
is the function object. -
im_class
is the class the method comes from. -
im_self
is the self object the method is bound to.
Python3.x中
-
__func__
replaceim_func
-
__self__
replaceim_self
2.x中的im_class
取消
# python 3.x
print
(
a
.
method_a
.
__self__
)
print
(
b
.
method_cls
.
__self__
)
# print(func.__self__) error func 无 __self__
print
(
b
.
method_cls
.
__self__
.
__name__
)
# print(b.method_cls.__self__.__name__) error b.method_cls.__self__是一个实例,无__name__属性
关于
__name__
和
__qualname__
请参见 PEP 3155