子嘉的博客 子嘉的博客
首页
bic-bic
技术
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

高子嘉

没有比脚更长的路,没有比人更高的山
首页
bic-bic
技术
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • 技术文档

  • GitHub技巧

  • 博客搭建

  • 服务端

  • distributed

  • golang

    • gomod
    • compile
    • code_snippet

      • json反序列化 int64丢失
      • 限流器
        • 限流器
          • 分析问题
          • 限流器代码
          • 使用方法
    • protobuf
    • 单元测试
    • script

  • db

  • docker

  • linux

  • 技术
  • golang
  • code_snippet
子嘉
2022-10-11
目录

限流器

# 限流器

# 分析问题

在服务端接入第三方系统时,api 请求通常会有限流要求,但对于 api 提供方的限流机制往往是未知的,所以在限流时需要做最严格的限流,才能保证 api 访问的成功率。毕竟 api 请求失败的代价与限流相比是很大的。

这里的限流器使用最保守的方法设计,即保证限流频次,以要求的最大频次去限流。以每秒最大访问次数 10 次为例,则每 0.1 秒分发一次请求。但 api 提供方的限流方式未知,一个 api 在对方系统中的处理时间如果超过了 0.1 秒,可能会对对方系统造成压力,并且如果对方系统以【流入-流出】整个环节作为限流标准的话,也可能出现被限流情况。

实际开发中发现了两个问题:

  1. 分页访问时,页码大于 1 的请求,应该有限访问
  2. api 提供方,对部分 api 请求限流要更严格,即统一个限流器对流速也要进行动态调控

# 限流器代码

package limiter

import "time"

// LocalLimiter 进程内限流器,支持按优先级排序触发,支持以最小限流频次倍增限流
// 使用方法:
// 1. 通过 NewLocalLimiter 方法新建限流器
// 2. 通过 Run 添加要限流的函数 f
//		在f执行时会增加限流计数,限流计数标识当前正在执行的函数总数
//		若正在执行的函数总数,达到频次上线,则被限流
// 3. 通过 FuncDone 通知函数执行完成
//		减少对应的限流计数
type LocalLimiter struct {
	itemTail     *localLimiterItem
	itemHead     *localLimiterItem
	addC         chan *localLimiterItem
	doingC       chan bool
	tickerC      *time.Ticker
	frequency    int
	duration     time.Duration
	tickDuration time.Duration
}
type localLimiterItem struct {
	priority int
	multiple float32
	f        func()
	next     *localLimiterItem
}

// FuncDone 函数执行完成,返还限流计数
func (l *LocalLimiter) FuncDone() {
	<-l.doingC
}

// Run 以go routine方式执行一个函数f
// 参数 priority 排序优先级,限流器将优先执行优先级高的函数
// 参数 multiple 当前函数执行要占用的限流计数倍数
func (l *LocalLimiter) Run(priority int, multiple float32, f func()) {
	if multiple < 1 {
		multiple = 1
	}
	item := &localLimiterItem{
		priority: priority,
		multiple: multiple,
		f:        f,
	}
	l.addC <- item
}

// NewLocalLimiter 新建本地限流器
// 参数 frequency 每秒请求的频次
// 例:frequency = 2,即限制每1/2秒触发一次
func NewLocalLimiter(frequency int) *LocalLimiter {
	duration := time.Second/time.Duration(frequency) + 1
	l := &LocalLimiter{
		frequency: frequency,
		duration:  duration,
		itemHead:  &localLimiterItem{},
		tickerC:   time.NewTicker(duration),
		doingC:    make(chan bool, frequency),
		addC:      make(chan *localLimiterItem),
	}
	l.itemTail = l.itemHead
	l.itemHead.next = l.itemTail
	go l.run()
	return l
}

func (l *LocalLimiter) run() {
	for {
		select {
		case item := <-l.addC:
			l.add(item)
		case <-l.tickerC.C:
			l.do()
		}
	}
}

func (l *LocalLimiter) do() {
	if l.itemHead == l.itemTail {
		return
	}
	item := l.itemHead.next
	if cap(l.doingC) < int(float32(len(l.doingC))*item.multiple) {
		return
	}

	l.itemHead.next = item.next
	if item == l.itemTail {
		l.itemTail = l.itemHead
	}
	l.doingC <- true
	go item.f()
}

func (l *LocalLimiter) add(item *localLimiterItem) {
	if l.itemTail != l.itemHead && item.priority > 0 {
		p := l.itemHead
		for p.next != nil {
			if p.next.priority < item.priority {
				item.next = p.next
				p.next = item
				// 插入成功直接返回
				return
			}
			p = p.next
		}
	}
	// 在尾节点前直接插入
	l.itemTail.next = item
	l.itemTail = item
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114

# 使用方法

limiter := NewLocalLimiter(10)
priority := 1
mutiple := 2
limiter.Run(priority, multiple, func() {
    defer limiter.FuncDone()
    // 其他代码
})
1
2
3
4
5
6
7
编辑 (opens new window)
#golang#限流器
上次更新: 2023/02/24, 10:34:03
json反序列化 int64丢失
protobuf

← json反序列化 int64丢失 protobuf→

最近更新
01
mongodb restore
03-06
02
pytesseract
02-28
03
consul
02-24
更多文章>
Theme by Vdoing | Copyright © 2022-2025 子嘉 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式