超时和取消
发布者:admin 发表于:417天前 阅读数:679 评论:0

在编写并发代码时,超时和取消会频繁出现。我们将在本节中看到,超时对于创建一个可以健壮易读的程序至关重要。取消是对超时的回应。我们还将探讨在并发进程中引发取消的其他原因。

那么,我们为什么需要并发程序支持超时呢?

系统饱和

正如我们在“队列”部分所讨论的那样,如果系统已经达到最大负荷(即,它的处理请求的能力达到了极限),我们可能希望系统的请求超时而不是花很长时间等待。你选择哪条路线取决于你的实际业务,但这里有一些关于何时触发超时的一般性指导:

如果请求在超时情况下不太可能重复发送。

如果没有资源来存储请求(例如,临时队列的内存,持久队列的磁盘空间)。

如果请求或其发送的数据的过期(我们将在下面讨论)。 如果一个请求可能会重复发生,那么系统将会接受并超时请求。 如果开销超过我们系统的容量,这可能导致死亡螺旋。 但是,如果我们缺乏将请求存储在队列中所需的系统资源,这是一个有争议的问题。 即使我们符合这两条准则,将该请求加入队列也没有什么意义,只要我们可以处理请求,请求就会过期。 这给我们带来了超时的下一个理由。

数据过期

有时数据有一个窗口,在这个窗口中必须优先处理部分相关数据,或者处理数据的需求已过期。如果一个并发进程比这个窗口花费更长的时间来处理数据,我们希望超时并取消该进程。例如,如果某个并发进程在长时间等待后发起请求,则在队列中请求或其数据可能已过期。

如果这个窗口已经事先知晓,那么将context.WithDeadline或context.WithTimeout创建的context.Context传递给我们的并发进程是有意义的。 如果不是,我们希望并发进程的父节点能够在需求不再需要时取消并发进程。 context.WithCancel完美适用于此目的。

防止死锁

在大型系统中,尤其是分布式系统中,有时难以理解数据流动的方式或系统边界可能出现的情况。这并非毫无道理,甚至有人建议将超时放置在所有并发操作上,以确保系统不会发生死锁。 超时时间不一定要接近执行并发操作所需的实际时间。 设置超时时间的目的仅仅是为了防止死锁,所以它只需要足够短以满足死锁系统会在合理的时间内解锁即可。

我们在“死锁,活锁和锁的饥饿问题”章节中提到过,通过设置超时来避免死锁可能会将问题从死锁变为活锁。在大型系统中,由于存在更多的移动部件,因此与死锁相比,系统遇到不同的时序配置文件的可能性更大。因此,最好有机会锁定并修复进程,而非直接让系统死锁最终不得不重启。

请注意,这不是关于如何正确构建系统的建议。而是建议你考思考在开发和测试期间的时间、时序问题。我建议你使用超时,但是目标应该集中在一个没有死锁的系统上,在这种系统中,超时基本不会触发。

现在我们了解了何时使用超时,让我们将注意力转向取消,以及如何构建并发进程以优雅地处理取消。并发进程可能被取消的原因有很多:

超时

超时是隐式的取消操作。

用户干预

为了获得良好的用户体验,通常建议如果启动长时间运行的进程时,向服务器做轮询将状态报告给用户,或允许用户查看他们的状态。当面向用户的并发操作时,有时需要允许用户取消他们已经开始的操作。

父节点取消

如果作为子节点的任何父节点停止,我们应当执行取消。

重复请求

我们可能希望将数据发送到多个并发进程,以尝试从其中一个进程获得更快的响应。 当收到响应时,需要取消其余的处理。 我们将在“重复请求”一节中详细讨论。

此外,也可能有其他的原因。然而,“为什么”这个问题并不像“如何”这样的问题那么困难或有趣。在第4章中,我们探讨了两种取消并发进程的方法:使用done通道和context.Context类型。 但这里我们要探索更复杂的问题:当一个并发进程被取消时,这对正在执行的算法及其下游消费者意味着什么?在编写可随时终止的并发代码时,需要考虑哪些事项?

为了回答这些问题,我们需要探索的第一件事是并发进程的可抢占性。下面是一个简单的例子:

var value interface{}
select {
case <-done:
    return
case value = <-valueStream:
}

result := reallyLongCalculation(value)

select {
case <-done:
    return
case resultStream <- result:
}

我们已经将valueStream的读取和resultStream的写入耦合起来,并检查done通道,看看goroutine是否已被取消,但这里存在问题。reallyLongCalculation看起来并不会执行抢占操作,而且根据名字,它看起来可能需要很长时间。这意味着,如果在reallyLongCalculation正在执行时某些事件试图取消这个goroutine,则可能需要很长时间才能确认取消并停止。让我们试着让reallyLongCalculation抢占进程,看看会发生什么:

reallyLongCalculation := func(done <-chan interface{}, value interface{}) interface{} {
    intermediateResult := longCalculation(value)
    select {
    case <-done:
        return nil
    default:
    }

    return longCaluclation(intermediateResult)
}

我们已经取得了一些进展:reallyLongCalculation现在可以抢占进程。但问题依然存在:我们只能对调用该函数的地方进行抢占。为了解决这个问题,我们需要继续调整

reallyLongCalculation := func(done <-chan interface{}, value interface{}) interface{} {
    intermediateResult := longCalculation(done, value)
    return longCaluclation(done, intermediateResult)
}

如果将这一推理结果归纳一下,我们会看到当前必须做两件事:定义并发进程可抢占的时间段,并确保任何花费比此时间段更多时间的函数本身是可抢占的。一个简单的方法就是将你的goroutine分解成更小的部分。 你应该瞄准所有不可抢占的原子操作,以便在更短的时间内完成。

这里还存在另一个问题:如果goroutine恰好修改共享状态(例如,数据库,文件,内存数据结构),那么当goroutine被取消时会发生什么?goroutine是否尝试回滚? 这项工作需要多长时间?既然goroutine已经开始运行,它应该停下来,所以这个时间不应该花太长的时间来执行回滚,对吧?

如何处理这个问题很难给出一般性的建议,因为算法的性质决定了你如何处理这种情况; 如果在较小的范围内保留对任何共享状态的修改,无论是否需要确保这些修改回滚,通常都可以很好地处理。如果可能的话,在内存中建立临时存储,然后尽可能快地修改状态。作为一个例子,这是错误的做法:

result := add(1, 2, 3)
writeTallyToState(result)
result = add(result, 4, 5, 6)
writeTallyToState(result)
result = add(result, 7, 8, 9)
writeTallyToState(result)

我们在这里向state写如三次。如果运行此代码的goroutine在最终写入之前被取消,我们需要以某种方式回滚之前的写入。对比这种方法:

result := add(1, 2, 3, 4, 5, 6, 7, 8, 9)
writeTallyToState(result)

这里需要担心的回滚范围要小得多。 如果在我们调用writeToState之后取消,仍然需要一种方法来退出更改,但发生这种情况的可能性会很小,因为我们只修改一次状态。

你需要关心的另一个问题是消息重复。假设你的管道有三个阶段:generator阶段,A阶段和B阶段。generator阶段监控A阶段,跟踪自上次从其通道读取以来的时间长度,如果当前实例不正常,就创建一个新实例 A2。如果发生这种情况,阶段B可能会收到重复的消息(图5-1)。

可以在此看到,如果在阶段A已经将阶段B的结果发送到阶段B后取消消息进入,则阶段B可能会收到重复的消息。

有几种方法可以避免这样的情况发生。最简单的方法(以及我推荐的方法)是,在子例程已经报告结果后,父例程不再发送取消信号。这需要各个阶段之间的双向通信,我们将在“心跳”一节中详细介绍。其他方法是:

接受返回的第一个或最后一个结果

如果你的算法允许,或者你的并发进程是幂等的,那么可以简单地在下游进程中允许重复消息,并选择是否接受你收到的第一条或最后一条消息。

检查goroutine许可

可以使用与父节点的双向通信来明确请求发送消息的权限。这种方法类似于心跳。它看起来像这样。

因为我们明确要求许可执行写入B的通道,所以这是比心跳更安全的方法。然而在实践中很少这样做,因为它比心跳更复杂,所以我建议你只是使用心跳。

在设计并发进程时,一定要考虑超时和取消。像软件工程中的许多其他技术问题一样,如果在项目初期忽略超时和取消,然后尝试将它们放在项目后期加入,有点像试图在蛋糕烘烤后再将蛋添加到蛋糕中。