Python -- 限流 throttle

系统 1456 0

前言

一个业务型的服务,被open接口后,遭遇并发扫数据,于是要做限流操作。一直固执的认为,业务API和OpenAPI要分开处理,或许因为起初接入其他企业ERP系统都是走较为规范的OpenAPI,始终对于这种开发系统业务API的做法感觉不好。

窗口限流

需求是要在Django的一个工程里做限流,倘若是rest_framework的View也好办,直接就提供了限流 rest_framework throttling
可参照文档设置。不能直接使用设置的原因是,面对是Django做的一个服务,然后proxy至别的服务,工程仅仅承担一个转发的职责。如果在LB上限流,无法区分来源IP,只能是总量限流,很可能导致一旦被限流,正常平台访问被拒绝。所以我需要的限流需求非常清晰,首先限流的粒度是需要先知道访问来源的真实IP,在一定窗口时间内的访问次数,诸如 100/min。

rest_framework 提供了比错的实现思路,类似实现一套打点记录的,片段存储,打点记录为需要限制的实时条件。就以上述 100/min为例,首先一分钟之内,IP1没有任何访问,则没有任何限制数据,redis的过期时间,满足了此数据设置,再有,1分钟之内,要满足次数不超过100次,维护一个数组,长度超过100则意味超过访问限制,数组中记录请求每次访问的时刻值,窗口滑动就是淘汰掉连续访问中,以当前时刻后置一分钟之前的访问打点,保证了数组窗口永远都是以当前最近请求进入1min之内的记录点。

            
              
                # throttle setting
              
              
THROTTLE_RATES 
              
                =
              
              
                {
              
              
                'resource1'
              
              
                :
              
              
                '100/min'
              
              
                ,
              
              
                'resource2'
              
              
                :
              
              
                '20/second'
              
              
                }
              
              
                # throttle class
              
              
                class
              
              
                WindowAccessThrottle
              
              
                :
              
              

    cache 
              
                =
              
               Cache
              
                (
              
              
                )
              
              
    timer 
              
                =
              
               time
              
                .
              
              time

    
              
                def
              
              
                __init__
              
              
                (
              
              self
              
                ,
              
               request
              
                ,
              
               view
              
                ,
              
               scope
              
                )
              
              
                :
              
              
        self
              
                .
              
              rate 
              
                =
              
               settings
              
                .
              
              THROTTLE_RATES
              
                [
              
              scope
              
                ]
              
              
        self
              
                .
              
              request 
              
                =
              
               request
        self
              
                .
              
              view 
              
                =
              
               view
        self
              
                .
              
              key 
              
                =
              
               self
              
                .
              
              get_cache_key
              
                (
              
              
                )
              
              
                def
              
              
                parse_rate
              
              
                (
              
              self
              
                )
              
              
                :
              
              
        num
              
                ,
              
               period 
              
                =
              
               self
              
                .
              
              rate
              
                .
              
              split
              
                (
              
              
                '/'
              
              
                )
              
              
        num_requests 
              
                =
              
              
                int
              
              
                (
              
              num
              
                )
              
              
        duration 
              
                =
              
              
                {
              
              
                's'
              
              
                :
              
              
                1
              
              
                ,
              
              
                'm'
              
              
                :
              
              
                60
              
              
                ,
              
              
                'h'
              
              
                :
              
              
                3600
              
              
                ,
              
              
                'd'
              
              
                :
              
              
                86400
              
              
                }
              
              
                [
              
              period
              
                [
              
              
                0
              
              
                ]
              
              
                ]
              
              
                return
              
               num_requests
              
                ,
              
               duration

    
              
                def
              
              
                get_cache_key
              
              
                (
              
              self
              
                )
              
              
                :
              
              
        host 
              
                =
              
               self
              
                .
              
              request
              
                .
              
              META
              
                [
              
              
                'HTTP_X_FORWARDED_FOR'
              
              
                ]
              
               \
            
              
                if
              
               self
              
                .
              
              request
              
                .
              
              META
              
                .
              
              get
              
                (
              
              
                'HTTP_X_FORWARDED_FOR'
              
              
                ,
              
              
                None
              
              
                )
              
              
                else
              
               \
            self
              
                .
              
              request
              
                .
              
              META
              
                [
              
              
                'REMOTE_ADDR'
              
              
                ]
              
              
                return
              
              
                'throttle:{}:{}'
              
              
                .
              
              
                format
              
              
                (
              
              host
              
                ,
              
               self
              
                .
              
              view
              
                .
              
              __name__
              
                )
              
              
                def
              
              
                allow_request
              
              
                (
              
              self
              
                )
              
              
                :
              
              
        history 
              
                =
              
               self
              
                .
              
              cache
              
                .
              
              get_value
              
                (
              
              self
              
                .
              
              key
              
                ,
              
              
                [
              
              
                ]
              
              
                )
              
              
        now 
              
                =
              
               self
              
                .
              
              timer
              
                (
              
              
                )
              
              
        num_requests
              
                ,
              
               duration 
              
                =
              
               self
              
                .
              
              parse_rate
              
                (
              
              
                )
              
              
                while
              
               history 
              
                and
              
               history
              
                [
              
              
                -
              
              
                1
              
              
                ]
              
              
                <=
              
               now 
              
                -
              
               duration
              
                :
              
              
            history
              
                .
              
              pop
              
                (
              
              
                )
              
              
                if
              
              
                len
              
              
                (
              
              history
              
                )
              
              
                >=
              
               num_requests
              
                :
              
              
                return
              
              
                False
              
              

        history
              
                .
              
              insert
              
                (
              
              
                0
              
              
                ,
              
               now
              
                )
              
              
        self
              
                .
              
              cache
              
                .
              
              
                set
              
              
                (
              
              self
              
                .
              
              key
              
                ,
              
               history
              
                ,
              
               duration
              
                )
              
              
                return
              
              
                True
              
            
          

注意
1,上述示例可根据实际需求修改
2,在做IP级别限定是,如果直接调用request.META[‘REMOTE_ADDR’]获取的是请求直接过来的IP,实际部署服务多数是经过LB,或者nginx反向代理的,REMOTE_ADDR多数就是前置LB的IP,所以取用HTTP_X_FORWARDED_FOR获取发起请求的远端IP。
3, cache = Cache() 就是一个redis的封装,稍微实现下 cache.get_value(self.key, []) 对获取支持默认值
4,使用时类似原生的throttle,在view函数中设置 scope
4,配合Django的中间件,调用判定,大致如下:

            
              
                from
              
               django
              
                .
              
              urls 
              
                import
              
               resolve


              
                '''
实际下面中间件需要根据需求自定义调试,如果只是rest_framework的View可以直接用原生的设定,因为笔者是自己封装的转发View,
相当于重新自定义一个完全新的通用视图,需要重新实现限流
'''
              
              
                class
              
              
                ThrottleMiddleware
              
              
                (
              
              MiddlewareMixin
              
                )
              
              
                :
              
              
                def
              
              
                process_request
              
              
                (
              
              self
              
                ,
              
               request
              
                )
              
              
                :
              
              
		resolver 
              
                =
              
               resolve
              
                (
              
              request
              
                .
              
              path
              
                )
              
              
		throttle_scope 
              
                =
              
              
                getattr
              
              
                (
              
              resolver
              
                .
              
              func
              
                ,
              
              
                'throttle_scope'
              
              
                ,
              
              
                None
              
              
                )
              
              
		throttle 
              
                =
              
               WindowAccessThrottle
              
                (
              
              request
              
                ,
              
               resolver
              
                .
              
              func
              
                ,
              
               throttle_scope
              
                )
              
              
                if
              
               throttle
              
                .
              
              allow_request
              
                (
              
              
                )
              
              
                :
              
              
                return
              
              
                else
              
              
                :
              
              
                return
              
               HttpResponse
              
                (
              
              
                )
              
            
          

漏斗限流

上面窗口限流,一定程度上解决了流量猛增的问题,但是以上面 120/min的限流为例,用户在1分钟的某一瞬间,120的并发,此种场景,上面的限流器基本没有作用了,设想能够在短时间内,既限制访问的总量,也能限制访问的频率至于过高,漏斗限流就非常理想,基本抽象模型:
1,漏斗参数:
- capacity:容量,漏斗大小
- rate:漏斗流出速率,可以用 total和duration计算,一段时间duration内允许通过的总量total
2,当漏斗为空漏斗时:
- 访问进入的速率 < rate,此时漏斗无积压,请求一律通过
- 访问进入的速率 >= rate,此时漏斗中逐渐积压,且漏斗以rate值不断流出
3,当漏斗不为空时:
- 出水口以最大速率流出
- 漏斗未满,会继续纳入
- 漏斗已满,则会直接溢出,拒绝请求
用漏斗限流实现上述IP限流,示例如下:

            
              THROTTLE_RATES 
              
                =
              
              
                {
              
              
                'funnel'
              
              
                :
              
              
                {
              
              
                'capacity'
              
              
                :
              
              
                15
              
              
                ,
              
              
                'duration'
              
              
                :
              
              
                60
              
              
                ,
              
              
                # seconds
              
              
                'total'
              
              
                :
              
              
                30
              
              
                ,
              
              
                }
              
              
                ,
              
              
                }
              
              
                class
              
              
                FunnelThrottle
              
              
                :
              
              

    cache 
              
                =
              
               CusCache
              
                (
              
              
                )
              
              
    timer 
              
                =
              
               time
              
                .
              
              time

    
              
                def
              
              
                __init__
              
              
                (
              
              self
              
                ,
              
               request
              
                ,
              
               view
              
                ,
              
               scope
              
                )
              
              
                :
              
              
        config 
              
                =
              
               settings
              
                .
              
              THROTTLE_RATES
              
                [
              
              scope
              
                ]
              
              
        self
              
                .
              
              rate 
              
                =
              
               config
              
                [
              
              
                'total'
              
              
                ]
              
              
                /
              
               config
              
                [
              
              
                'duration'
              
              
                ]
              
              
        self
              
                .
              
              capacity 
              
                =
              
               config
              
                [
              
              
                'capacity'
              
              
                ]
              
              
        self
              
                .
              
              duration 
              
                =
              
               config
              
                [
              
              
                'duration'
              
              
                ]
              
              
        self
              
                .
              
              request 
              
                =
              
               request
        self
              
                .
              
              view 
              
                =
              
               view
        self
              
                .
              
              key 
              
                =
              
               self
              
                .
              
              get_cache_key
              
                (
              
              
                )
              
              
                def
              
              
                get_cache_key
              
              
                (
              
              self
              
                )
              
              
                :
              
              
                """
       	same as WindowAccessThrottle
       	"""
              
              
                pass
              
              
                def
              
              
                allow_request
              
              
                (
              
              self
              
                )
              
              
                :
              
              
        history 
              
                =
              
               self
              
                .
              
              cache
              
                .
              
              get_value
              
                (
              
              self
              
                .
              
              key
              
                ,
              
              
                [
              
              
                ]
              
              
                )
              
              
        now 
              
                =
              
               self
              
                .
              
              timer
              
                (
              
              
                )
              
              
                if
              
              
                not
              
               history
              
                :
              
              
                # 空漏斗直接放行
              
              
            history
              
                .
              
              insert
              
                (
              
              
                0
              
              
                ,
              
               now
              
                )
              
              
            self
              
                .
              
              cache
              
                .
              
              
                set
              
              
                (
              
              self
              
                .
              
              key
              
                ,
              
               history
              
                ,
              
               self
              
                .
              
              duration
              
                )
              
              
                return
              
              
                True
              
              

        latest_duration 
              
                =
              
               now 
              
                -
              
               history
              
                [
              
              
                0
              
              
                ]
              
              
                # 距离最近的一次放行时间间隔
              
              
        leak_count 
              
                =
              
              
                int
              
              
                (
              
              latest_duration 
              
                *
              
               self
              
                .
              
              rate
              
                )
              
              
                # 由间隔时间和漏斗流速计算此段时间漏斗腾出空间 
              
              
                for
              
               i 
              
                in
              
              
                range
              
              
                (
              
              leak_count
              
                )
              
              
                :
              
              
                if
              
               history
              
                :
              
              
                history
              
                .
              
              pop
              
                (
              
              
                )
              
              
                else
              
              
                :
              
              
                break
              
              
                # 在上述漏斗清理流出空间后,漏斗仍旧满量,直接判定不可访问
              
              
                if
              
              
                len
              
              
                (
              
              history
              
                )
              
              
                >=
              
               self
              
                .
              
              capacity
              
                :
              
              
                return
              
              
                False
              
              
                # 如果可访问,请求进入漏斗计量
              
              
        history
              
                .
              
              insert
              
                (
              
              
                0
              
              
                ,
              
               now
              
                )
              
              
        self
              
                .
              
              cache
              
                .
              
              
                set
              
              
                (
              
              self
              
                .
              
              key
              
                ,
              
               history
              
                ,
              
               self
              
                .
              
              duration
              
                )
              
              
                return
              
              
                True
              
            
          

Note:
1,漏斗限流方式和之前窗口限流所用的数据结构在cache中基本一致,只因判定算法不同,所达到的限流效果,完全不同
2,漏斗限流,进入漏斗计量的点,表示一律放行通过了,只是,在漏斗中会根据下一次访问进入时间判定该点是否由漏斗的rate失效,而达到容量合理,限制流速的效果

Redis 漏斗限流 (redis-cell)

上述的漏斗限流算法,在Redis的模块中已经内置实现了一个,具体介绍请参见Github redis-cell详细介绍 笔者安装在MacOS上,基本没有问题:

            
              
                # 下载mac版本安装包
              
              
https://github.com/brandur/redis-cell/releases

              
                # 解压
              
              
                tar
              
               -zxf redis-cell-*.tar.gz

              
                # 复制可执行文件
              
              
                cp
              
               libredis_cell.dylib /your_redis_server_localtion

              
                # 重启redis-server,把libredis_cell.dylib加载上
              
              
redis-server --loadmodule /path/to/modules/libredis_cell.dylib

            
          

安装重启后,可以在redis中执行 CL.THROTTLE 命令:

            
              # CL.THROTTLE user123 15 30 60 1和实现算法中的配置类似,user123表示限流key,15: capacity,30: total,60: duration,
127.0.0.1:6379> CL.THROTTLE user123 15 30 60 1
1) (integer) 0  # 0表示允许,1表示拒绝
2) (integer) 16  # 漏斗容量 max_burst + 1 = 15 +1 =16
3) (integer) 15  #  漏斗剩余容量
4) (integer) -1  #  如果被拒绝,多少秒后重试
5) (integer) 2  # 多长时间后漏斗完全漏空

            
          

但是redis-cell没有找到对应的sdk

Python Bound method

            
              
                # python 3.x
              
              
                def
              
              
                func
              
              
                (
              
              
                )
              
              
                :
              
              
                pass
              
              
                class
              
              
                A
              
              
                :
              
              
	@
              
                classmethod
              
              
                def
              
              
                method_cls
              
              
                (
              
              cls
              
                )
              
              
                :
              
              
                pass
              
              
                def
              
              
                method_a
              
              
                (
              
              self
              
                )
              
              
                :
              
              
                pass
              
              
                class
              
              
                B
              
              
                (
              
              A
              
                )
              
              
                :
              
              
                pass
              
              

a
              
                ,
              
               b 
              
                =
              
               A
              
                (
              
              
                )
              
              
                ,
              
               B
              
                (
              
              
                )
              
              
                print
              
              
                (
              
              func
              
                )
              
              
                # 
                
              
              
                print
              
              
                (
              
              a
              
                .
              
              method_a
              
                )
              
              
                # 
                
                  <__main__.A object at 0x10ef11978>>
                
              
              
                print
              
              
                (
              
              b
              
                .
              
              method_cls
              
                )
              
              
                # 
                
                  
                    >
                  
                
              
            
          

对于上文中 func 就是一个函数对象,而 method_a method_cls 是归属类A的所以,是一个 bound method ,那么如何查看一个 bound method 的归属呢?
Python 2.x中提供了 im_func,im_class,im_self三个属性:

  • im_func is the function object.
  • im_class is the class the method comes from.
  • im_self is the self object the method is bound to.

Python3.x中

  • __func__ replace im_func
  • __self__ replace im_self
    2.x中的 im_class 取消
            
              
                # python 3.x
              
              
                print
              
              
                (
              
              a
              
                .
              
              method_a
              
                .
              
              __self__
              
                )
              
              
                print
              
              
                (
              
              b
              
                .
              
              method_cls
              
                .
              
              __self__
              
                )
              
              
                # print(func.__self__) error func 无 __self__
              
              
                print
              
              
                (
              
              b
              
                .
              
              method_cls
              
                .
              
              __self__
              
                .
              
              __name__
              
                )
              
              
                # print(b.method_cls.__self__.__name__) error b.method_cls.__self__是一个实例,无__name__属性
              
            
          

关于 __name__ __qualname__ 请参见 PEP 3155


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论