Go 协程超时控制 Select 阻塞方式 Context 方式

先说个场景:

假设业务中 A 服务需要调用 服务B,要求设置 5s 超时,那么如何优雅实现?

Select 超时控制

考虑是否可以用 select + time.After 方式进行实现

这里主要利用的是通道在携程之间通信的特点,当程序调用成功后,会向通道中发送信号。没调用成功前,通道会阻塞。

select { case res := <-c2: fmt.Println(res) case <-time.After(time.Second * 3): fmt.Println(“timeout 2”) }

当 c2 通道中有数据时,并且超时时间没有达到 3s,走 case res := <-c2 这个业务逻辑,当超时时间达到 3s , 走的 case <-time.After(time.Second * 3) 这个业务逻辑, 这样就可以实现超时 3s 的控制。

res:= <-c2 是因为channel 可以实现阻塞,那么 time.After 为啥可以阻塞呢?

看 After 源码。sleep.go 可以看到其实也是 channel

func After(d Duration) <-chan Time { return NewTimer(d).C}

完整代码示例:

package timeoutimport ( “fmt” “testing” “time”)func TestSelectTimeOut(t *testing.T) { // 在这个例子中, 假设我们执行了一个外部调用, 2秒之后将结果写入c1 c1 := make(chan string, 1) go func() { time.Sleep(time.Second * 2) c1 <- “result 1” }() // 这里使用select来实现超时, `res := <-c1`等待通道结果, // `<- Time.After`则在等待1秒后返回一个值, 因为select首先 // 执行那些不再阻塞的case, 所以这里会执行超时程序, 如果 // `res := <-c1`超过1秒没有执行的话 select { case res := <-c1: fmt.Println(res) case <-time.After(time.Second * 1): fmt.Println(“timeout 1”) } // 如果我们将超时时间设为3秒, 这个时候`res := <-c2`将在 // 超时case之前执行, 从而能够输出写入通道c2的值 c2 := make(chan string, 1) go func() { time.Sleep(time.Second * 2) c2 <- “result 2” }() select { case res := <-c2: fmt.Println(res) case <-time.After(time.Second * 3): fmt.Println(“timeout 2”) }}

运行结果:

=== RUN   TestSelectTimeOut timeout 1 result 2 — PASS: TestSelectTimeOut (3.00s) PASS

go timer 计时器

这个是 timer 类似的计时器实现,通用也是通过通道来发送数据。

package mainimport “time”import “fmt”func main() { // Ticker使用和Timer相似的机制, 同样是使用一个通道来发送数据。 // 这里我们使用range函数来遍历通道数据, 这些数据每隔500毫秒被 // 发送一次, 这样我们就可以接收到 ticker := time.NewTicker(time.Millisecond * 500) go func() { for t := range ticker.C { fmt.Println(“Tick at”, t) } }() // Ticker和Timer一样可以被停止。 一旦Ticker停止后, 通道将不再 // 接收数据, 这里我们将在1500毫秒之后停止 time.Sleep(time.Millisecond * 1500) ticker.Stop() fmt.Println(“Ticker stopped”)} go context

context 监听是否有 IO 操作,开始从当前连接中读取网络请求,每当读取到一个请求则会将该cancelCtx传入,用以传递取消信号,可发送取消信号,取消所有进行中的网络请求。

go func(ctx context.Context, info *Info) { timeLimit := 120 timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(timeLimit)*time.Millisecond) defer func() { cancel() wg.Done() }() resp := DoHttp(timeoutCtx, info.req) }(ctx, info)

关键看业务代码: resp := DoHttp(timeoutCtx, info.req) 业务代码中包含 http 调用 NewRequestWithContext

req, err := http.NewRequestWithContext(ctx, “POST”, url, strings.NewReader(paramString))

上面的代码,设置了过期时间,当DoHttp(timeoutCtx, info.req) 处理时间超过超时时间时,会自动截止,并且打印 context deadline exceeded。

看个代码:

package mainimport ( “context” “fmt” “testing” “time”)func TestTimerContext(t *testing.T) { now := time.Now() later, _ := time.ParseDuration(“10s”) ctx, cancel := context.WithDeadline(context.Background(), now.Add(later)) defer cancel() go Monitor(ctx) time.Sleep(20 * time.Second)}func Monitor(ctx context.Context) { select { case <-ctx.Done(): fmt.Println(ctx.Err()) case <-time.After(20 * time.Second): fmt.Println(“stop monitor”) }}

运行结果:

=== RUN   TestTimerContext context deadline exceeded — PASS: TestTimerContext (20.00s) PASS

Context 接口有如下:

type Context interface { Deadline() (deadline time.Time, ok bool) Done() <-chan struct{} Err() error Value(key interface{}) interface{}} Deadline — 返回 context.Context 被取消的时间,也就是完成工作的截止日期; Done — 返回一个 Channel,这个 Channel 会在当前工作完成或者上下文被取消之后关闭,多次调用 Done 方法会返回同一个 Channel; Err — 返回 context.Context 结束的原因,它只会在 Done 返回的 Channel 被关闭时才会返回非空的值; 如果 context.Context 被取消,会返回 Canceled 错误; 如果 context.Context 超时,会返回 DeadlineExceeded 错误; Value — 从 context.Context 中获取键对应的值,对于同一个上下文来说,多次调用 Value 并传入相同的 Key 会返回相同的结果,该方法可以用来传递请求特定的数据;

到此这篇关于

Go 协程超时控制 Select 阻塞方式 Context 方式

先说个场景:

假设业务中 A 服务需要调用 服务B,要求设置 5s 超时,那么如何优雅实现?

Select 超时控制

考虑是否可以用 select + time.After 方式进行实现

这里主要利用的是通道在携程之间通信的特点,当程序调用成功后,会向通道中发送信号。没调用成功前,通道会阻塞。

select { case res := <-c2: fmt.Println(res) case <-time.After(time.Second * 3): fmt.Println(“timeout 2”) }

当 c2 通道中有数据时,并且超时时间没有达到 3s,走 case res := <-c2 这个业务逻辑,当超时时间达到 3s , 走的 case <-time.After(time.Second * 3) 这个业务逻辑, 这样就可以实现超时 3s 的控制。

res:= <-c2 是因为channel 可以实现阻塞,那么 time.After 为啥可以阻塞呢?

看 After 源码。sleep.go 可以看到其实也是 channel

func After(d Duration) <-chan Time { return NewTimer(d).C}

完整代码示例:

package timeoutimport ( “fmt” “testing” “time”)func TestSelectTimeOut(t *testing.T) { // 在这个例子中, 假设我们执行了一个外部调用, 2秒之后将结果写入c1 c1 := make(chan string, 1) go func() { time.Sleep(time.Second * 2) c1 <- “result 1” }() // 这里使用select来实现超时, `res := <-c1`等待通道结果, // `<- Time.After`则在等待1秒后返回一个值, 因为select首先 // 执行那些不再阻塞的case, 所以这里会执行超时程序, 如果 // `res := <-c1`超过1秒没有执行的话 select { case res := <-c1: fmt.Println(res) case <-time.After(time.Second * 1): fmt.Println(“timeout 1”) } // 如果我们将超时时间设为3秒, 这个时候`res := <-c2`将在 // 超时case之前执行, 从而能够输出写入通道c2的值 c2 := make(chan string, 1) go func() { time.Sleep(time.Second * 2) c2 <- “result 2” }() select { case res := <-c2: fmt.Println(res) case <-time.After(time.Second * 3): fmt.Println(“timeout 2”) }}

运行结果:

=== RUN   TestSelectTimeOut timeout 1 result 2 — PASS: TestSelectTimeOut (3.00s) PASS

go timer 计时器

这个是 timer 类似的计时器实现,通用也是通过通道来发送数据。

package mainimport “time”import “fmt”func main() { // Ticker使用和Timer相似的机制, 同样是使用一个通道来发送数据。 // 这里我们使用range函数来遍历通道数据, 这些数据每隔500毫秒被 // 发送一次, 这样我们就可以接收到 ticker := time.NewTicker(time.Millisecond * 500) go func() { for t := range ticker.C { fmt.Println(“Tick at”, t) } }() // Ticker和Timer一样可以被停止。 一旦Ticker停止后, 通道将不再 // 接收数据, 这里我们将在1500毫秒之后停止 time.Sleep(time.Millisecond * 1500) ticker.Stop() fmt.Println(“Ticker stopped”)} go context

context 监听是否有 IO 操作,开始从当前连接中读取网络请求,每当读取到一个请求则会将该cancelCtx传入,用以传递取消信号,可发送取消信号,取消所有进行中的网络请求。

go func(ctx context.Context, info *Info) { timeLimit := 120 timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(timeLimit)*time.Millisecond) defer func() { cancel() wg.Done() }() resp := DoHttp(timeoutCtx, info.req) }(ctx, info)

关键看业务代码: resp := DoHttp(timeoutCtx, info.req) 业务代码中包含 http 调用 NewRequestWithContext

req, err := http.NewRequestWithContext(ctx, “POST”, url, strings.NewReader(paramString))

上面的代码,设置了过期时间,当DoHttp(timeoutCtx, info.req) 处理时间超过超时时间时,会自动截止,并且打印 context deadline exceeded。

看个代码:

package mainimport ( “context” “fmt” “testing” “time”)func TestTimerContext(t *testing.T) { now := time.Now() later, _ := time.ParseDuration(“10s”) ctx, cancel := context.WithDeadline(context.Background(), now.Add(later)) defer cancel() go Monitor(ctx) time.Sleep(20 * time.Second)}func Monitor(ctx context.Context) { select { case <-ctx.Done(): fmt.Println(ctx.Err()) case <-time.After(20 * time.Second): fmt.Println(“stop monitor”) }}

运行结果:

=== RUN   TestTimerContext context deadline exceeded — PASS: TestTimerContext (20.00s) PASS

Context 接口有如下:

type Context interface { Deadline() (deadline time.Time, ok bool) Done() <-chan struct{} Err() error Value(key interface{}) interface{}} Deadline — 返回 context.Context 被取消的时间,也就是完成工作的截止日期; Done — 返回一个 Channel,这个 Channel 会在当前工作完成或者上下文被取消之后关闭,多次调用 Done 方法会返回同一个 Channel; Err — 返回 context.Context 结束的原因,它只会在 Done 返回的 Channel 被关闭时才会返回非空的值; 如果 context.Context 被取消,会返回 Canceled 错误; 如果 context.Context 超时,会返回 DeadlineExceeded 错误; Value — 从 context.Context 中获取键对应的值,对于同一个上下文来说,多次调用 Value 并传入相同的 Key 会返回相同的结果,该方法可以用来传递请求特定的数据;

到此这篇关于Go语言

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。