限流器
# 限流器
# 分析问题
在服务端接入第三方系统时,api 请求通常会有限流要求,但对于 api 提供方的限流机制往往是未知的,所以在限流时需要做最严格的限流,才能保证 api 访问的成功率。毕竟 api 请求失败的代价与限流相比是很大的。
这里的限流器使用最保守的方法设计,即保证限流频次,以要求的最大频次去限流。以每秒最大访问次数 10 次为例,则每 0.1 秒分发一次请求。但 api 提供方的限流方式未知,一个 api 在对方系统中的处理时间如果超过了 0.1 秒,可能会对对方系统造成压力,并且如果对方系统以【流入-流出】整个环节作为限流标准的话,也可能出现被限流情况。
实际开发中发现了两个问题:
- 分页访问时,页码大于 1 的请求,应该有限访问
- api 提供方,对部分 api 请求限流要更严格,即统一个限流器对流速也要进行动态调控
# 限流器代码
package limiter
import "time"
// LocalLimiter 进程内限流器,支持按优先级排序触发,支持以最小限流频次倍增限流
// 使用方法:
// 1. 通过 NewLocalLimiter 方法新建限流器
// 2. 通过 Run 添加要限流的函数 f
// 在f执行时会增加限流计数,限流计数标识当前正在执行的函数总数
// 若正在执行的函数总数,达到频次上线,则被限流
// 3. 通过 FuncDone 通知函数执行完成
// 减少对应的限流计数
type LocalLimiter struct {
itemTail *localLimiterItem
itemHead *localLimiterItem
addC chan *localLimiterItem
doingC chan bool
tickerC *time.Ticker
frequency int
duration time.Duration
tickDuration time.Duration
}
type localLimiterItem struct {
priority int
multiple float32
f func()
next *localLimiterItem
}
// FuncDone 函数执行完成,返还限流计数
func (l *LocalLimiter) FuncDone() {
<-l.doingC
}
// Run 以go routine方式执行一个函数f
// 参数 priority 排序优先级,限流器将优先执行优先级高的函数
// 参数 multiple 当前函数执行要占用的限流计数倍数
func (l *LocalLimiter) Run(priority int, multiple float32, f func()) {
if multiple < 1 {
multiple = 1
}
item := &localLimiterItem{
priority: priority,
multiple: multiple,
f: f,
}
l.addC <- item
}
// NewLocalLimiter 新建本地限流器
// 参数 frequency 每秒请求的频次
// 例:frequency = 2,即限制每1/2秒触发一次
func NewLocalLimiter(frequency int) *LocalLimiter {
duration := time.Second/time.Duration(frequency) + 1
l := &LocalLimiter{
frequency: frequency,
duration: duration,
itemHead: &localLimiterItem{},
tickerC: time.NewTicker(duration),
doingC: make(chan bool, frequency),
addC: make(chan *localLimiterItem),
}
l.itemTail = l.itemHead
l.itemHead.next = l.itemTail
go l.run()
return l
}
func (l *LocalLimiter) run() {
for {
select {
case item := <-l.addC:
l.add(item)
case <-l.tickerC.C:
l.do()
}
}
}
func (l *LocalLimiter) do() {
if l.itemHead == l.itemTail {
return
}
item := l.itemHead.next
if cap(l.doingC) < int(float32(len(l.doingC))*item.multiple) {
return
}
l.itemHead.next = item.next
if item == l.itemTail {
l.itemTail = l.itemHead
}
l.doingC <- true
go item.f()
}
func (l *LocalLimiter) add(item *localLimiterItem) {
if l.itemTail != l.itemHead && item.priority > 0 {
p := l.itemHead
for p.next != nil {
if p.next.priority < item.priority {
item.next = p.next
p.next = item
// 插入成功直接返回
return
}
p = p.next
}
}
// 在尾节点前直接插入
l.itemTail.next = item
l.itemTail = item
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# 使用方法
limiter := NewLocalLimiter(10)
priority := 1
mutiple := 2
limiter.Run(priority, multiple, func() {
defer limiter.FuncDone()
// 其他代码
})
1
2
3
4
5
6
7
2
3
4
5
6
7
编辑 (opens new window)
上次更新: 2023/02/24, 10:34:03