Asynchronous code is hard. Everyone knows that. Testing asynchronous code is even harder. I recently fixed a flaky test, and I want to share some thoughts about writing asynchronous tests.

In this post we explore a common problem with asynchronous tests: how to force a specific ordering between threads, so that certain operations in some threads complete before certain operations in others. Normally we do not want to enforce ordering between threads, because that defeats the purpose of using them. Threads exist to allow concurrency and to let the scheduler choose the best order of execution given the current resources and the state of the application. In tests, however, deterministic ordering is sometimes required to keep the test stable.

Testing a Throttler

A throttler is a software pattern that limits the number of concurrent operations in order to stay within some resource quota, such as a connection pool, a network buffer, or a CPU-intensive operation. Unlike other synchronization tools, a throttler is meant to fail fast: over-quota requests are rejected immediately instead of waiting. Failing fast matters because the alternative, waiting, itself consumes resources such as ports, threads, and memory.
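The following minimal sketch makes the fail-fast distinction concrete. It uses java.util.concurrent.Semaphore directly. The object FailFastDemo and its run method are hypothetical names chosen for illustration. Once the quota is exhausted, tryAcquire() returns false at once. The timed tryAcquire(timeout, unit), by contrast, keeps the calling thread blocked for up to the timeout before it gives up:

```scala
import java.util.concurrent.{Semaphore, TimeUnit}

object FailFastDemo {
  /** Returns (fail-fast result, timed-wait result, milliseconds spent in the timed wait). */
  def run(): (Boolean, Boolean, Long) = {
    val quota = new Semaphore(1) // a deliberately tiny quota: one permit
    quota.acquire()              // exhaust the quota

    // Fail fast: returns false immediately because no permit is free.
    val failFast = quota.tryAcquire()

    // Waiting: blocks this thread for up to 100 ms before giving up.
    val start = System.nanoTime()
    val waited = quota.tryAcquire(100, TimeUnit.MILLISECONDS)
    val elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)

    quota.release() // hand the permit back
    (failFast, waited, elapsedMs)
  }
}
```

Both calls fail. The waiting variant, however, also ties up a thread for the whole timeout, and that is exactly the cost a throttler is meant to avoid.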

Here is a simple implementation of a throttler. It is basically a wrapper around a Semaphore; a real-world version might add waiting, retries, and so on:

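import java.util.concurrent.Semaphore

// Thrown when a call arrives while all permits are taken.
class ThrottledException extends RuntimeException("Throttled!")

// Allows at most `count` concurrent executions of the wrapped block.
class Throttler(count: Int) {
  private val semaphore = new Semaphore(count)
  def apply(f: => Unit): Unit = {
    // Fail fast: tryAcquire() returns immediately instead of blocking.
    if (!semaphore.tryAcquire()) throw new ThrottledException
    try {
      f
    } finally {
      // Return the permit even if f throws.
      semaphore.release()
    }
  }
}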
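The try/finally is a design choice worth checking. The permit must be returned even when the wrapped block throws; otherwise the throttler would slowly leak capacity. The sketch below repeats the Throttler above so that it compiles on its own. ReleaseOnFailureDemo is a hypothetical helper that runs one failing call through a single-permit throttler and then counts how many later calls still succeed:

```scala
import java.util.concurrent.Semaphore

class ThrottledException extends RuntimeException("Throttled!")

// Same Throttler as above, repeated so this sketch compiles on its own.
class Throttler(count: Int) {
  private val semaphore = new Semaphore(count)
  def apply(f: => Unit): Unit = {
    if (!semaphore.tryAcquire()) throw new ThrottledException
    try f
    finally semaphore.release() // runs even when f throws
  }
}

object ReleaseOnFailureDemo {
  /** Returns how many of three follow-up calls completed after one failing call. */
  def run(): Int = {
    val throttler = new Throttler(1) // a single permit

    // The body throws, but the finally block still returns the permit.
    try { throttler { throw new IllegalStateException("boom") } }
    catch { case _: IllegalStateException => () }

    // Had the permit leaked, each of these calls would throw ThrottledException.
    var completed = 0
    for (_ <- 1 to 3) throttler { completed += 1 }
    completed
  }
}
```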

Let’s start with a basic unit test of the throttler on a single thread (we use specs2 for testing). This test verifies that we can make more sequential calls than the throttler’s maximum number of concurrent calls (the maxCount variable below). Because we use a single thread, we never saturate the throttler, so this test does not exercise its fail-fast behavior. It only checks that the throttler does not abort operations while it is not saturated.

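import org.specs2.mutable.Specification
import org.specs2.specification.Scope

class ThrottlerTest extends Specification {
  "Throttler" should {
    "execute sequential" in new ctx {
      var invocationCount = 0
      // maxCount + 1 calls in a row, never more than one in flight
      for (i <- 0 to maxCount) {
        throttler {
          invocationCount += 1
        }
      }
      invocationCount must be_==(maxCount + 1)
    }
  }
  // A fresh context per example; Scope lets `new ctx { ... }` serve as an example body.
  trait ctx extends Scope {
    val maxCount = 3
    val throttler = new Throttler(maxCount)
  }
}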

Testing a Throttler Asynchronously

In the previous test we did not saturate the throttler, because a single thread making sequential calls never holds more than one permit at a time. The next step is to test that the throttler behaves correctly in a multithreaded environment.

The setup:

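import java.util.concurrent.{CountDownLatch, Executors}
import scala.concurrent.ExecutionContext

// Assumed to live in the per-example test context (e.g. a ctx trait that
// mixes in specs2's After, so `after` runs at the end of each example).
val e = Executors.newCachedThreadPool()
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(e)
private val waitForeverLatch = new CountDownLatch(1)

// Runs after each example: release blocked threads, then stop the pool.
override def after: Any = {
  waitForeverLatch.countDown()
  e.shutdownNow()
}

// Blocks the calling thread until after() releases the latch.
def waitForever(): Unit = try {
  waitForeverLatch.await()
} catch {
  // shutdownNow() interrupts pool threads; treat that as a normal exit.
  case _: InterruptedException =>
  case ex: Throwable => throw ex
}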

The ExecutionContext is used to construct the Futures. The waitForever method blocks the calling thread until waitForeverLatch is released at the end of the test. The after function releases the latch and then shuts down the executor service.
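The same lifecycle can be sketched without specs2. This is a stand-in built on assumptions, not the author’s test. LifecycleDemo and run are hypothetical names, and the “after” step is simply performed inline. The sketch shows that a future blocked in waitForever cannot complete until the latch is counted down:

```scala
import java.util.concurrent.{CountDownLatch, Executors}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object LifecycleDemo {
  /** Returns (blocked before release, completed after release). */
  def run(): (Boolean, Boolean) = {
    val e = Executors.newCachedThreadPool()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(e)
    val waitForeverLatch = new CountDownLatch(1)

    def waitForever(): Unit =
      try waitForeverLatch.await()
      catch { case _: InterruptedException => () } // shutdownNow() may interrupt the wait

    val blocked = Future(waitForever())
    // Whether or not the task has started yet, it cannot finish while the latch is closed.
    val blockedBeforeRelease = !blocked.isCompleted

    // What after() does: release the latch, then stop the pool.
    waitForeverLatch.countDown()
    Await.ready(blocked, 5.seconds) // wait for the released task to finish
    e.shutdownNow()
    (blockedBeforeRelease, blocked.isCompleted)
  }
}
```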

A simplistic way to test multithreaded behavior of the throttler is the following:

"throw exception once reached the limit [naive,flaky]" in new ctx {
  for (i <- 1 to maxCount) {
    Future {
      throttler(waitForever())
    }
  }
  throttler {} must throwA[ThrottledException]
}

Here we create maxCount futures (the Future {} calls). Each one runs on a pool thread and calls waitForever, which blocks until the end of the test. Then we try to perform one more operation, call number maxCount + 1, which exceeds the throttler’s limit. By design, this call should throw a ThrottledException. The exception may never come, however. The main thread’s final call to the throttler (the one with the expectation) may run before one of the futures has started. In that case, the main thread takes the last free permit, and the exception is thrown inside the late future instead of at the expectation.
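This race can be reproduced deterministically. In the sketch below, RaceDemo and the gate latch are hypothetical and exist only for illustration. The gate holds every future back before it reaches the throttler, which mimics a scheduler that simply has not run the futures yet. The main thread’s over-quota call then succeeds instead of throwing:

```scala
import java.util.concurrent.{CountDownLatch, Executors, Semaphore}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Try

class ThrottledException extends RuntimeException("Throttled!")

// Same Throttler as above, repeated so this sketch compiles on its own.
class Throttler(count: Int) {
  private val semaphore = new Semaphore(count)
  def apply(f: => Unit): Unit = {
    if (!semaphore.tryAcquire()) throw new ThrottledException
    try f finally semaphore.release()
  }
}

object RaceDemo {
  /** Returns true if the main thread's over-quota call was throttled. */
  def mainCallWasThrottled(): Boolean = {
    val maxCount = 3
    val throttler = new Throttler(maxCount)
    val pool = Executors.newCachedThreadPool()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

    val gate = new CountDownLatch(1)    // hypothetical: holds futures back before the throttler
    val release = new CountDownLatch(1) // plays the role of waitForeverLatch

    val futures = (1 to maxCount).map { _ =>
      Future {
        gate.await()               // "not scheduled yet"
        throttler(release.await()) // would hold a permit until released
      }
    }

    // The main thread arrives first: all permits are still free, so nothing is thrown.
    val throttled = Try(throttler {}).isFailure

    gate.countDown()
    release.countDown()
    futures.foreach(f => Await.ready(f, 5.seconds))
    pool.shutdown()
    throttled
  }
}
```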

The problem with the naive test is this: before the main thread tries to exceed the throttler’s limit and expects an exception, we do not ensure that all the futures have started and are waiting in waitForever. To fix it, we need a way to wait until all the futures have started. Here is an approach familiar to many of us: just add a Thread.sleep call with some reasonable duration.

"throw exception once reached the limit [naive, bad]" in new ctx {
  for (i <- 1 to maxCount) {
    Future {
      throttler(waitForever())
    }
  }
  Thread.sleep(1000)
  throttler {} must throwA[ThrottledException]
}

OK, now our test will almost always pass, but this approach is wrong for at least two reasons:

The test will always take at least as long as the “reasonable duration” we set.

In rare situations, such as when the machine is under heavy load, this reasonable duration won’t be enough, and the test will fail again.

If you’re still in doubt, search Google for more reasons.

A better approach is to synchronize the start of our threads (futures) with our expectation. Let’s use the CountDownLatch class from java.util.concurrent:

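import java.util.concurrent.{CountDownLatch, TimeUnit}

"throw exception once reached the limit [working]" in new ctx {
  // One count per future; reaches zero when all of them hold a permit.
  val barrier = new CountDownLatch(maxCount)

  for (i <- 1 to maxCount) {
    Future {
      throttler {
        barrier.countDown() // signal: this future is inside the throttler
        waitForever()       // keep holding the permit until the test ends
      }
    }
  }

  // Block the main thread until all maxCount futures are inside, or give up after 5 s.
  barrier.await(5, TimeUnit.SECONDS) must beTrue

  // The throttler is now saturated, so this call must fail fast.
  throttler {} must throwA[ThrottledException]
}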

We use CountDownLatch for barrier synchronization. Its await method blocks the main thread until the latch count reaches zero. As the other threads run (let’s call them the futures), each future calls the barrier’s countDown method, which lowers the latch count by one. Once the count reaches zero, every future has acquired a permit and is inside the throttler, either on its way into waitForever or already blocked there.

At that point, we can be sure that the throttler is saturated, with maxCount threads inside it, so any other thread that tries to enter will get an exception. This gives us a deterministic setup. The main thread resumes only when the latch count reaches zero and the CountDownLatch releases it, so its attempt to enter the throttler is guaranteed to happen after all maxCount permits have been taken.
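For readers who want to run the idea outside specs2, here is a minimal plain-Scala sketch of the working test. BarrierDemo and overQuotaCallIsThrottled are hypothetical names. The Throttler is repeated from above, and waitForever is replaced by a direct await on a local latch:

```scala
import java.util.concurrent.{CountDownLatch, Executors, Semaphore, TimeUnit}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

class ThrottledException extends RuntimeException("Throttled!")

// Same Throttler as above, repeated so this sketch compiles on its own.
class Throttler(count: Int) {
  private val semaphore = new Semaphore(count)
  def apply(f: => Unit): Unit = {
    if (!semaphore.tryAcquire()) throw new ThrottledException
    try f finally semaphore.release()
  }
}

object BarrierDemo {
  /** Returns true if, once the throttler is saturated, one more call is rejected. */
  def overQuotaCallIsThrottled(): Boolean = {
    val maxCount = 3
    val throttler = new Throttler(maxCount)
    val pool = Executors.newCachedThreadPool()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    val barrier = new CountDownLatch(maxCount)   // one count per future
    val waitForeverLatch = new CountDownLatch(1) // released when the "test" ends

    try {
      for (_ <- 1 to maxCount) {
        Future {
          throttler {
            barrier.countDown()      // this future now holds a permit
            waitForeverLatch.await() // keep holding it
          }
        }
      }
      // true only once all maxCount futures are inside the throttler
      val allInside = barrier.await(5, TimeUnit.SECONDS)
      val rejected = Try(throttler {}).failed.toOption.exists(_.isInstanceOf[ThrottledException])
      allInside && rejected
    } finally {
      waitForeverLatch.countDown() // let the futures finish and release their permits
      pool.shutdown()
    }
  }
}
```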

We pass a generous timeout (5 seconds) to await as a safeguard against blocking forever if something unexpected happens. If the timeout expires, await returns false and the test fails. In the normal case the timeout does not affect the test duration, because await returns as soon as the count reaches zero.
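The timeout path can be checked in isolation. This sketch (TimeoutSafeguardDemo is a hypothetical name) simulates a broken setup in which one of two expected countDown calls never happens. In that case, await gives up after the timeout and returns false instead of hanging. On the happy path, it returns true immediately:

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

object TimeoutSafeguardDemo {
  /** Broken setup: returns (await result, milliseconds spent waiting). */
  def brokenSetup(): (Boolean, Long) = {
    val barrier = new CountDownLatch(2) // expects two futures to check in
    barrier.countDown()                 // ...but only one ever does
    val start = System.nanoTime()
    val reachedZero = barrier.await(200, TimeUnit.MILLISECONDS) // gives up instead of hanging
    (reachedZero, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start))
  }

  /** Happy path: the count is already zero, so await returns true at once. */
  def happyPath(): Boolean = {
    val barrier = new CountDownLatch(1)
    barrier.countDown()
    barrier.await(5, TimeUnit.SECONDS)
  }
}
```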

Conclusion

When testing asynchronous code, a test quite often requires a specific ordering of operations between threads. Without any synchronization, such tests are flaky: they sometimes pass and sometimes fail. Using Thread.sleep makes tests slower and less flaky, but it does not solve the problem.

In most cases when we need to enforce ordering between threads in a test, we can use a CountDownLatch instead of Thread.sleep. The advantage of CountDownLatch is that we decide exactly when to release the waiting thread. This gives us two important benefits: deterministic ordering (and therefore more reliable tests) and faster tests. Even for trivial waiting, such as the waitForever function, we could have used something like Thread.sleep(Long.MAX_VALUE), but it’s always better to avoid fragile approaches.
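The same pattern applies to any ordering requirement between two threads, not just throttlers. Here is a minimal sketch (OrderingDemo is a hypothetical name). The second thread is started first on purpose, yet the latch guarantees that its event is recorded after the first thread’s:

```scala
import java.util.concurrent.CountDownLatch
import scala.collection.mutable.ListBuffer

object OrderingDemo {
  /** Returns the recorded events; the latch forces "first" before "second". */
  def run(): List[String] = {
    val events = ListBuffer.empty[String]
    def record(event: String): Unit = events.synchronized { events += event; () }
    val firstDone = new CountDownLatch(1)

    val second = new Thread(() => {
      firstDone.await() // blocks until the first thread signals
      record("second")
    })
    val first = new Thread(() => {
      record("first")
      firstDone.countDown() // lets the second thread proceed
    })

    second.start() // started first on purpose; the latch still makes it go last
    first.start()
    first.join()
    second.join()
    events.synchronized(events.toList)
  }
}
```

No sleep is involved. The second thread waits exactly as long as it needs to and no longer.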

You can find the full code on GitHub.