首页IT科技go实现并发(《Go 语言并发之道》读书笔记(七))

go实现并发(《Go 语言并发之道》读书笔记(七))

时间2025-06-18 21:19:43分类IT科技浏览4688
导读:今天这篇笔记我们来学习Go 限流 限流是分布式系统中经常需要用到的技术,因为我们让请求没有限制,很容易就出现某个用户开很多线程把我们的服务拉跨,进而影响到别的用户。...

今天这篇笔记我们来学习Go 限流

限流是分布式系统中经常需要用到的技术             ,因为我们让请求没有限制                    ,很容易就出现某个用户开很多线程把我们的服务拉跨       ,进而影响到别的用户             。

限流

我们来看下Go语言层面可以怎么做到限流      ,先看一段不限流的代码                    ,

type APIConnection struct{} func Open() *APIConnection { return &APIConnection{} } func (a *APIConnection) ReadFile(ctx context.Context) error { //假装我们在这里有运行 return nil } func (a *APIConnection) ResolveAddress(ctx context.Context) error { //假装我们在这里有运行 return nil } func main() { defer log.Printf("Done") log.SetOutput(os.Stdout) log.SetFlags(log.Ltime | log.LUTC) apiConnection := Open() var wg sync.WaitGroup wg.Add(20) for i := 0; i < 10; i++ { go func() { defer wg.Done() err := apiConnection.ReadFile(context.Background()) if err != nil { log.Printf("cannot ReadFile : %v", err) } log.Printf("ReadFile") }() } for i := 0; i < 10; i++ { go func() { defer wg.Done() err := apiConnection.ResolveAddress(context.Background()) if err != nil { log.Printf("cannot ResolveAddress : %v", err) } log.Printf("ResolveAddress") }() } wg.Wait() }

上面的代码我们定义了两个假想的方法ReadFile 和 ResolveAddress             , 假设他们是去访问文件和读取网络      ,都是比较耗资源的操作                    。然后开启了20个goroutine去调用这两个方法

这段代码的运行结果如下 02:32:52 ResolveAddress 02:32:52 ResolveAddress 02:32:52 ResolveAddress 02:32:52 ResolveAddress 02:32:52 ResolveAddress 02:32:52 ResolveAddress 02:32:52 ResolveAddress 02:32:52 ResolveAddress 02:32:52 ResolveAddress 02:32:52 ReadFile 02:32:52 ReadFile 02:32:52 ReadFile 02:32:52 ReadFile 02:32:52 ReadFile 02:32:52 ReadFile 02:32:52 ReadFile 02:32:52 ReadFile 02:32:52 ReadFile 02:32:52 ResolveAddress 02:32:52 ReadFile 02:32:52 Done

我们可以看到一瞬间就都运行完了                    ,如果我们访问了实际的资源             ,然后又开了很多的goroutine,那么很容易就耗尽资源       。 为了防止这样的事情发生                    ,我们引入限流                    ,限定一段时间内,只能访问一定的资源             。 我们今天要讲的是基于令牌桶算法的限速             ,令牌桶是什么算法呢? 很简单就是有一个基础的令牌数d, 然后有固定的速度r往令牌桶中放令牌, 用户拿到令牌才能进行下一步                    ,拿不到就等待                   。

我们来看代码 type APIConnection struct { rateLimiter *rate.Limiter } func Open() *APIConnection { return &APIConnection{ rateLimiter: rate.NewLimiter(rate.Limit(2), 5), } } func (a *APIConnection) ReadFile(ctx context.Context) error { if err := a.rateLimiter.Wait(ctx); err != nil { return err } return nil } func (a *APIConnection) ResolveAddress(ctx context.Context) error { if err := a.rateLimiter.Wait(ctx); err != nil { return err } return nil }

main func我们没有修改       ,这里只是在APIConnection 中增加了一个

rateLimiter: rate.NewLimiter(rate.Limit(2), 5)

rate是golang.org/x/time/rate 下面的一个包             , rate.NewLimiter是限速器                    ,方法定义如下

func NewLimiter(r Limit, b int) *Limiter

r就是我们前面说的速率       ,每秒多少个令牌

b 就是令牌桶的高度      ,开始的时候有几个       。

然后在ReadFile 和 ResolveAddress 方法中增加了a.rateLimiter.Wait(ctx)                    , Wait就是等待有令牌出现       。

运行的结果如下所示 02:48:16 ReadFile 02:48:16 ReadFile 02:48:16 ReadFile 02:48:16 ReadFile 02:48:16 ResolveAddress 02:48:17 ResolveAddress 02:48:17 ResolveAddress 02:48:18 ReadFile 02:48:18 ResolveAddress 02:48:19 ResolveAddress 02:48:19 ResolveAddress 02:48:20 ResolveAddress 02:48:20 ReadFile 02:48:21 ResolveAddress 02:48:21 ReadFile 02:48:22 ReadFile 02:48:22 ReadFile 02:48:23 ResolveAddress 02:48:23 ReadFile 02:48:24 ResolveAddress 02:48:24 Done

通过时间我们可以看到前面很快执行了5次             ,就是拿到了令牌桶中的5个令牌      ,后面每秒中执行两次                    ,也就是我们的速率2个/秒                   。程序运行符合我们预期             ,达到了限速的效果             。

组合限流

书中作者还举了两个例子,运用组合来限速                    ,比如要求一秒中不能超过两个                    ,同时一分钟不能超过10个       。 属于Go语言的一点组合功能,示例代码如下

type RateLimiter interface { Wait(context.Context) error Limit() rate.Limit } type multiLimiter struct { limiters []RateLimiter } func MultiLimiter(limiters ...RateLimiter) *multiLimiter { byLimit := func(i, j int) bool { return limiters[i].Limit() < limiters[j].Limit() } sort.Slice(limiters, byLimit) return &multiLimiter{limiters: limiters} } func (l *multiLimiter) Wait(ctx context.Context) error { for _, l := range l.limiters { if err := l.Wait(ctx); err != nil { return err } } return nil } func (l *multiLimiter) Limit() rate.Limit { return l.limiters[0].Limit() } func Per(eventCount int, duration time.Duration) rate.Limit { return rate.Every(duration / time.Duration(eventCount)) } type APIConnection struct { rateLimiter RateLimiter } func Open() *APIConnection { secondLimit := rate.NewLimiter(Per(2, time.Second), 1) minuteLimit := rate.NewLimiter(Per(10, time.Minute), 10) return &APIConnection{rateLimiter: MultiLimiter(secondLimit, minuteLimit)} } func (a *APIConnection) ReadFile(ctx context.Context) error { if err := a.rateLimiter.Wait(ctx); err != nil { return err } return nil } func (a *APIConnection) ResolveAddress(ctx context.Context) error { if err := a.rateLimiter.Wait(ctx); err != nil { return err } return nil }

定义了multiLimiter来组合这些限速器             ,然后定义了Wait方法                    。比较简单                    ,这里不详述

还可以不同的设备分不同的限速器, 这里也是贴出代码不详述 type APIConnection struct { networkLimit, diskLimit, apiLimit RateLimiter } func Open() *APIConnection { return &APIConnection{ apiLimit: MultiLimiter( rate.NewLimiter(Per(2, time.Second), 1), rate.NewLimiter(Per(10, time.Minute), 10), ), diskLimit: MultiLimiter( rate.NewLimiter(rate.Limit(1), 1), ), networkLimit: MultiLimiter( rate.NewLimiter(Per(3, time.Second), 3), ), } } func (a *APIConnection) ReadFile(ctx context.Context) error { if err := MultiLimiter(a.apiLimit, a.diskLimit).Wait(ctx); err != nil { return err } return nil } func (a *APIConnection) ResolveAddress(ctx context.Context) error { if err := MultiLimiter(a.apiLimit, a.networkLimit).Wait(ctx); err != nil { return err } return nil }

不同用户限流

我们做web请求的时候       ,会遇到这样的需求             ,根据不同的用户给不同的限速                    ,这里简单的给个sample, 其实就是用map把用户和限速器关联起来             。

var userLimit = make(map[string]*rate.Limiter) func doWork(user string) { if userLimit[user].Allow() { log.Printf("%s do work \n", user) } else { log.Printf("%s not work \n", user) } } func Per(eventCount int, duration time.Duration) rate.Limit { return rate.Every(duration / time.Duration(eventCount)) } func main() { log.SetOutput(os.Stdout) log.SetFlags(log.Ltime | log.LUTC) userLimit["user1"] = rate.NewLimiter(Per(2, time.Second), 1) userLimit["user2"] = rate.NewLimiter(Per(2, time.Minute), 5) var wg sync.WaitGroup wg.Add(20) for i := 0; i < 10; i++ { go func() { defer wg.Done() doWork("user1") }() time.Sleep(500 * time.Millisecond) } for i := 0; i < 10; i++ { go func() { defer wg.Done() doWork("user2") }() time.Sleep(500 * time.Millisecond) } wg.Wait() }

上面的例子中用户1被限制1秒访问2次       ,用户2被限制1分钟访问2次

03:19:14 user1 do work 03:19:15 user1 do work 03:19:15 user1 do work 03:19:16 user1 do work 03:19:16 user1 do work 03:19:17 user1 do work 03:19:17 user1 do work 03:19:18 user1 do work 03:19:18 user1 do work 03:19:19 user1 do work 03:19:20 user2 do work 03:19:20 user2 do work 03:19:21 user2 do work 03:19:21 user2 do work 03:19:22 user2 do work 03:19:22 user2 not work 03:19:23 user2 not work 03:19:23 user2 not work 03:19:24 user2 not work 03:19:24 user2 not work

这里用户1基本能得到执行      ,用户2执行了5次后                    ,由于没有拿到令牌             ,就不能work了。这样达到了不同用户      ,不同的限速器                    。

总结

对于限速                    ,可以在服务器层面进行限速             ,我们这里是在后台程序端进行限速, 也有不少现成的解决方案 https://www.jianshu.com/p/c13843d2e1ec

对于分布式的系统这样的限速自然是不够的                    ,可以结合redis的功能来进行限速                    ,网上有看到些方法: https://blog.csdn.net/jim_007/article/details/110084822

没有实际操作,后面我们再实际操作下再来记录                    。

创心域SEO版权声明:以上内容作者已申请原创保护,未经允许不得转载,侵权必究!授权事宜、对本内容有异议或投诉,敬请联系网站管理员,我们将尽快回复您,谢谢合作!

展开全文READ MORE
无人机飞行仿真模拟训练系统(6款常见的无人机仿真开发平台(附超详细特点功能对比)) 独立站如何打造爆款(探秘独立站SEO:如何提升网站排名吸引流量)