`
bit1129
  • 浏览: 1051565 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark101】Scala Promise/Future在Spark中的应用

 
阅读更多

Promise和Future是Scala用于异步调用并实现结果汇集的并发原语,Scala的Future同JUC里面的Future接口含义相同,Promise理解起来就有些绕。等有时间了再仔细的研究下Promise和Future的语义以及应用场景,具体参见Scala在线文档:http://docs.scala-lang.org/sips/completed/futures-promises.html

 

如下代码来自于BlockTransferService的fetchBlockSync方法,因为只是拉取一个Block的数据,Spark在此处定义为同步获取,而不是异步获取。异步获取的实现是BlockTransferService的fetchBlocks方法,它可以批量获取多个Blocks,返回结果放于回调函数的ManageBuffer中了。

 

如下代码,首先定义了Promis类型的result变量,Promise将放入ManagedBuffer类型的数据,一旦放入,那么Promise.future将从等待结果的状态中返回。因此,Promise的语义可以理解为Promise会在某个时间点放入一个数据,而Promise.future的语义是等待这个值的放入,放入完成后future从阻塞等待的状态立即返回。

 

Promise数据的放入是通过Promise.success和Promise.failure操作实现的,分别表示放入了异步操作得到正确的结果和异步操作失败而放入失败的结果。

 

 

 

 

  /**
   * A special case of [[fetchBlocks]], as it fetches only one block and is blocking.
   *
   * It is also only available after [[init]] is invoked.
   */
  def fetchBlockSync(host: String, port: Int, execId: String, blockId: String): ManagedBuffer = {
    // A monitor for the thread to wait on.
    //创建Promise对象result,result.future将等待Promise对象写入数据
    val result = Promise[ManagedBuffer]()
    //通过fetchBlocks发起异步获取Block的请求,请求返回后根据调用结果调用BlockFetchingListener的onBlockFetchFailure或者onBlockFetchSuccess方法,在两个方法中
    ///为Promise变量写入请求返回的数据值,此后,result.future将从等待状态返回
    fetchBlocks(host, port, execId, Array(blockId),
      new BlockFetchingListener {
        override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
          result.failure(exception)
        }
        override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
          val ret = ByteBuffer.allocate(data.size.toInt)
          ret.put(data.nioByteBuffer())
          ret.flip()
          result.success(new NioManagedBuffer(ret))
        }
      })
    //Await.result是Scala并发框架提供的同步等待原语,它等待的事件是它的第一个参数,此处是result.future
    Await.result(result.future, Duration.Inf)
  }

 

从下面Await的result方法可以看到,result.future是一个Awaitable类型的实例,即Future实现了Awaitable,

    @throws(classOf[Exception])
    def result[T](awaitable: Awaitable[T], atMost: Duration): T =
      blocking(awaitable.result(atMost)(AwaitPermission))
  }

 

Awaitable接口的注释:

/**
 * An object that may eventually be completed with a result value of type `T` which may be
 * awaited using blocking methods.
 * 
 * The [[Await]] object provides methods that allow accessing the result of an `Awaitable`
 * by blocking the current thread until the `Awaitable` has been completed or a timeout has
 * occurred.
 */
trait Awaitable[+T] {

 

Future接口

 

/** The trait that represents futures.
 *
 *  Asynchronous computations that yield futures are created with the `future` call:
 *
 *  {{{
 *  val s = "Hello"
 *  val f: Future[String] = future {
 *    s + " future!"
 *  }
 *  f onSuccess {
 *    case msg => println(msg)
 *  }
 *  }}}
 *
 *  @author  Philipp Haller, Heather Miller, Aleksandar Prokopec, Viktor Klang
 *
 *  @define multipleCallbacks
 *  Multiple callbacks may be registered; there is no guarantee that they will be
 *  executed in a particular order.
 *
 *  @define caughtThrowables
 *  The future may contain a throwable object and this means that the future failed.
 *  Futures obtained through combinators have the same exception as the future they were obtained from.
 *  The following throwable objects are not contained in the future:
 *  - `Error` - errors are not contained within futures
 *  - `InterruptedException` - not contained within futures
 *  - all `scala.util.control.ControlThrowable` except `NonLocalReturnControl` - not contained within futures
 *
 *  Instead, the future is completed with a ExecutionException with one of the exceptions above
 *  as the cause.
 *  If a future is failed with a `scala.runtime.NonLocalReturnControl`,
 *  it is completed with a value from that throwable instead.
 *
 *  @define nonDeterministic
 *  Note: using this method yields nondeterministic dataflow programs.
 *
 *  @define forComprehensionExamples
 *  Example:
 *
 *  {{{
 *  val f = future { 5 }
 *  val g = future { 3 }
 *  val h = for {
 *    x: Int <- f // returns Future(5)
 *    y: Int <- g // returns Future(5)
 *  } yield x + y
 *  }}}
 *
 *  is translated to:
 *
 *  {{{
 *  f flatMap { (x: Int) => g map { (y: Int) => x + y } }
 *  }}}
 *
 * @define callbackInContext
 * The provided callback always runs in the provided implicit
 *`ExecutionContext`, though there is no guarantee that the
 * `execute()` method on the `ExecutionContext` will be called once
 * per callback or that `execute()` will be called in the current
 * thread. That is, the implementation may run multiple callbacks
 * in a batch within a single `execute()` and it may run
 * `execute()` either immediately or asynchronously.
 */
trait Future[+T] extends Awaitable[T] {

 

 

关于Promise的用法,http://stackoverflow.com/questions/13381134/what-are-the-use-cases-of-scala-concurrent-promise解释了基本含义:

The Promise and Future are complementary concepts. The Future is a value which will be retrieved, well, sometime in the future and you can do stuff with it when that event happens. It is, therefore, the read or out endpoint of a computation - it is something that you retrieve a value from.

A Promise is, by analogy(与此类似), the writing side of the computation. You create a promise which is the place where you'll put the result of the computation and from that promise you get a future that will be used to read the result that was put into the promise. When you'll complete a Promise, either by failure or success, you will trigger all the behavior which was attached to the associated Future.

Regarding your first question, how can it be that for a promise p we have p.future == p. You can imagine this like a single-item buffer - a container which is initially empty and you can afterwords store one value which will become its content forever. Now, depending on your point of view this is both a Promise and a Future. It is promise for someone who intends to write the value in the buffer. It is a future for someone who waits for that value to be put in the buffer.

 

Regarding the real-world use: Most of the time you won't deal with promises directly. If you'll use a library which performs asynchronous computation then you'll just work with the futures returned by the library's methods. Promises are, in this case, created by the library - you're just working with the reading end of what those methods do.

But if you need to implement your own asynchronous API you'll have to start working with them. Suppose you need to implement an async HTTP client on top of, lets say, Netty. Then your code will look somewhat like this

 

    def makeHTTPCall(request: Request): Future[Response] = {
        val p = Promise[Response]
        registerOnCompleteCallback(buffer => {
            val response = makeResponse(buffer)
            p success response
        })
        p.future
    }

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics