【译】异步计算:Web服务器+ Dask

原文链接: Asynchronous Computation: Web Servers + Dask

让我们想象一个简单的Web服务器,它既可以快速加载页面,也可以在较慢的加载页面上执行一些计算。在我们的例子中,这将是一个简单的Fibonnaci服务应用程序,但您可以想象替换fib函数在一些输入数据上运行机器学习模型,从数据库中获取结果等。

In [1]:
import tornado.ioloop
import tornado.web

def fib(n):
    if n < 2:
        return n
    else:
        return fib(n - 1) + fib(n - 2)

class FibHandler(tornado.web.RequestHandler):
    def get(self, n):
        result = fib(int(n))
        self.write(str(result))
        
class FastHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello!") 
        
def make_app():
    return tornado.web.Application([
        (r"/fast", FastHandler),
        (r"/fib/(\d+)", FibHandler),
    ])

app = make_app()
app.listen(8000)
Out[1]:
<tornado.httpserver.HTTPServer at 0x11911e5f8>

速度

我们知道用户会将响应时间快速与否和网站权威内容和信任联系起来,因此我们希望衡量页面加载的速度。我们重点感兴趣的在于,在模拟我们的web服务器为许多用户提供服务时,在许多同时加载请求期间执行此操作的耗时。

In [2]:
import tornado.httpclient

client = tornado.httpclient.AsyncHTTPClient()

import tornado.gen
from time import time

@tornado.gen.coroutine
def measure(url, n=100):
    """ Get url n times concurrently.  Print duration. """
    start = time()
    futures = [client.fetch(url) for i in range(n)]
    results = yield futures
    end = time()
    print(url, ', %d simultaneous requests, ' %  n, 'total time: ', (end - start))

耗时

我们来看看以下几种执行情况的执行耗时

  • Tornado 一次执行 fast 返回耗时 10ms
    (实际上在我的机器上这个执行要更快)
  • 执行100次 fast 大约在 100ms 左右, 所以这个是比较好的接近并发的效率
    (实际上,tornado在没有特殊处理的时候,仍然是一个串型执行的过程,原文给的例子中看不出来,但是从我的执行结果中可以很好的看出来,实际上并没有并发)
  • 调用 fib 函数需要的时间
  • 调用 fib 100次需要大约100倍的时间,并没有那么多的并行性效率 (递归执行,这个差距更加明显)
In [3]:
tornado.ioloop.IOLoop.current().add_callback(measure, 'http://localhost:8000/fast', n=1)
http://localhost:8000/fast , 1 simultaneous requests,  total time:  0.006060123443603516
In [4]:
tornado.ioloop.IOLoop.current().add_callback(measure, 'http://localhost:8000/fast', n=100)
http://localhost:8000/fast , 100 simultaneous requests,  total time:  0.2758209705352783
In [5]:
tornado.ioloop.IOLoop.current().add_callback(measure, 'http://localhost:8000/fib/28', n=1)
http://localhost:8000/fib/28 , 1 simultaneous requests,  total time:  0.16449999809265137
In [6]:
tornado.ioloop.IOLoop.current().add_callback(measure, 'http://localhost:8000/fib/28', n=100)
http://localhost:8000/fib/28 , 100 simultaneous requests,  total time:  17.069420099258423

异步阻塞

在下面的示例中,我们看到对路由fib/ 的一次缓慢调用会阻塞其他更快的请求:

In [7]:
tornado.ioloop.IOLoop.current().add_callback(measure, 'http://localhost:8000/fib/35', n=1)
tornado.ioloop.IOLoop.current().add_callback(measure, 'http://localhost:8000/fast', n=1)
http://localhost:8000/fast , 1 simultaneous requests,  total time:  5.0035529136657715
http://localhost:8000/fib/35 , 1 simultaneous requests,  total time:  5.003870010375977

讨论

这里存在两个问题:

  • 我们所有的fib调用都是独立的,我们希望与多个内核或附近的集群并行运行这些计算。

  • 我们缓慢的计算密集的fib函数的请求可能会妨碍我们的快速请求。一个慢用户可以影响其他所有人。

使用dask 进行异步进程计算

要解决这两个问题,我们将使用Dask将计算派发到其他进程或计算机中。因为Dask是一个异步框架,它可以很好地与Tornado或Asyncio集成。

In [8]:
from dask.distributed import Client
dask_client = Client(asynchronous=True)  # use local processes for now


def fib(n):
    if n < 2:
        return n
    else:
        return fib(n - 1) + fib(n - 2)

    
class FibHandler(tornado.web.RequestHandler):
    async def get(self, n):
        future = dask_client.submit(fib, int(n))  # submit work to happen elsewhere
        result = await future
        self.write(str(result))

        
class MainHandler(tornado.web.RequestHandler):
    async def get(self):
        self.write("Hello, world")

        
def make_app():
    return tornado.web.Application([
        (r"/fast", MainHandler),
        (r"/fib/(\d+)", FibHandler),
                
    ])

app = make_app()
app.listen(9000)
Out[8]:
<tornado.httpserver.HTTPServer at 0x1190f9550>

性能变化

通过将 fib 派发到dask其他进程上,我们解决了两件事情

并行计算

我们现在支持在更少的时间处理更多的请求了。接下来的测验将会测验同时发起20个访问 fib(28) 的请求。在旧版本中,我们在几秒钟内按顺序计算这些(当浏览器完成时最后一个请求也会等待同样长的时间)。而在这个新版本中,许多都可以并行计算,所以这些请求都将会在几百毫秒内得到结果。

In [11]:
# Before parallelism
tornado.ioloop.IOLoop.current().add_callback(measure, 'http://localhost:8000/fib/28', n=20)
http://localhost:8000/fib/28 , 20 simultaneous requests,  total time:  3.5174269676208496
In [12]:
# After parallelism
tornado.ioloop.IOLoop.current().add_callback(measure, 'http://localhost:9000/fib/28', n=20)
http://localhost:9000/fib/28 , 20 simultaneous requests,  total time:  0.3605771064758301

异步计算

在之前当一个请求在忙于计算 fib(...) 陷入阻塞 的时候 Tornado 同样也被阻塞了。此时 Tornado 它无法处理任何其他请求。当我们的服务提供这些昂贵、便宜的计算时,这将会成为严重的问题,开销少的请求会因为这种不必要的请求而挂起。

由于Dask能够与Tornado或Asyncio等异步系统集成,因此我们的Web服务器可以在多个请求之间自由跳转,即使在后台进行计算时也是如此。在下面的例子中,我们看到即使首先开始慢速计算,快速计算也只需几毫秒就会返回。

In [9]:
# Before async
tornado.ioloop.IOLoop.current().add_callback(measure, 'http://localhost:8000/fib/35', n=1)
tornado.ioloop.IOLoop.current().add_callback(measure, 'http://localhost:8000/fast', n=1)
http://localhost:8000/fast , 1 simultaneous requests,  total time:  4.998321056365967
http://localhost:8000/fib/35 , 1 simultaneous requests,  total time:  5.000464916229248
In [10]:
# After async
tornado.ioloop.IOLoop.current().add_callback(measure, 'http://localhost:9000/fib/35', n=1) 
tornado.ioloop.IOLoop.current().add_callback(measure, 'http://localhost:9000/fast', n=1) # 这个请求不会被阻塞
http://localhost:9000/fast , 1 simultaneous requests,  total time:  0.021255016326904297
http://localhost:9000/fib/35 , 1 simultaneous requests,  total time:  5.034911870956421

其他注意点

在这些情况下,今天人们倾向于使用 concurrent.futuresCelery

  • concurrent.futures 允许在一台机器上轻松实现并行性,并且可以很好地集成到异步框架中。 API正是我们上面展示的(Dask实现了concurrent.futures API)。但是,concurrent.futures不容易扩展到群集。
  • Celery更容易扩展到多台机器,但是具有更高的延迟,不能很好地缩小,并且需要一些努力来集成到异步框架中(或者至少这是我的理解,我的经验很浅)

在这种情况下,Dask提供了两者的一些好处。它很容易在常见的单机情况下进行设置和使用,但也可以扩展到集群。它与异步框架很好地集成,只增加了非常小的延迟。

In [13]:
async def f():
    start = time()
    result = await dask_client.submit(lambda x: x + 1, 10)
    end = time()
    print('Roundtrip latency: %.2f ms' % ((end - start) * 1000))
    
tornado.ioloop.IOLoop.current().add_callback(f)
Roundtrip latency: 14.16 ms

Comments !