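Promise and Future are the concurrency primitives Scala uses to make asynchronous calls and collect their results. Scala's Future plays the same role as the Future interface in JUC (java.util.concurrent): it is a handle to a result that becomes available later, though Scala's version also supports callbacks and combinators. Promise is harder to grasp at first. I'll study the semantics and use cases of Promise and Future in more detail when I have time; for now, see the Scala online documentation: http://docs.scala-lang.org/sips/completed/futures-promises.html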
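The code below comes from the fetchBlockSync method of BlockTransferService. This method fetches the data of only one Block, so Spark makes it a synchronous (blocking) fetch instead of an asynchronous one. The asynchronous fetch is implemented by BlockTransferService's fetchBlocks method. That method can fetch several Blocks in one batch and delivers each result as a ManagedBuffer to a callback.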
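The code first defines a variable named result of type Promise. A ManagedBuffer is later written into this Promise. When that happens, result.future completes, and any thread waiting on it (here through Await.result) returns. So the semantics of a Promise are: a value will be written into it at some point. Promise.future is the read side of that value. Waiting on the future blocks until the value has been written and returns as soon as the write happens. The future object itself does not block. The blocking comes from Await.result.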
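The following sketch shows this hand-off in isolation: one thread writes into a Promise, and the calling thread blocks on its future with Await.result until the write happens. The object name `PromiseHandoff`, the plain `Thread`, and the 5-second bound are illustrative choices, not anything taken from Spark.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object PromiseHandoff {
  // Completes a Promise from a separate thread after a short delay, while the
  // calling thread blocks on the Promise's future via Await.result.
  def handOff(value: String, delayMs: Long): String = {
    val p = Promise[String]()   // write side: empty until completed
    val f = p.future            // read side: completes when p is written

    val writer = new Thread(() => {
      Thread.sleep(delayMs)     // simulate asynchronous work
      p.success(value)          // writing the value releases the waiting thread
    })
    writer.start()

    // f itself does not block; Await.result parks the current thread
    // until f completes (bounded here by 5 seconds).
    Await.result(f, 5.seconds)
  }
}
```

For example, `PromiseHandoff.handOff("block-0", 50)` returns `"block-0"` after about 50 ms.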
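A value is written into a Promise with Promise.success or Promise.failure. success stores the correct result of the asynchronous operation. failure stores the exception when the operation fails.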
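The sketch below shows what a reader observes in each case. After success, Await.result returns the value. After failure, Await.result rethrows the stored exception. The `PromiseOutcomes` name and the `IOException` are illustrative assumptions.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.Try

object PromiseOutcomes {
  // Completes a fresh Promise with either success(42) or failure(IOException)
  // and returns what a blocked reader observes, wrapped in a Try.
  def complete(ok: Boolean): Try[Int] = {
    val p = Promise[Int]()
    if (ok) p.success(42)
    else p.failure(new java.io.IOException("fetch failed"))
    // Await.result returns the value on success and rethrows the stored
    // exception on failure; Try captures either case.
    Try(Await.result(p.future, 1.second))
  }
}
```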
```scala
/**
 * A special case of [[fetchBlocks]], as it fetches only one block and is blocking.
 *
 * It is also only available after [[init]] is invoked.
 */
def fetchBlockSync(host: String, port: Int, execId: String, blockId: String): ManagedBuffer = {
  // A monitor for the thread to wait on.
  // Create the Promise `result`; result.future completes once a value is written into it.
  val result = Promise[ManagedBuffer]()
  // Start an asynchronous fetch through fetchBlocks. When the request returns, the listener's
  // onBlockFetchFailure or onBlockFetchSuccess is called, and each one writes the outcome
  // into the Promise, which completes result.future.
  fetchBlocks(host, port, execId, Array(blockId),
    new BlockFetchingListener {
      override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
        result.failure(exception)
      }
      override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
        // Copy the fetched bytes into a new heap ByteBuffer before completing the Promise.
        val ret = ByteBuffer.allocate(data.size.toInt)
        ret.put(data.nioByteBuffer())
        ret.flip()
        result.success(new NioManagedBuffer(ret))
      }
    })
  // Await.result is the blocking-wait primitive of Scala's concurrency library. It waits on
  // its first argument, here result.future, with no timeout (Duration.Inf).
  Await.result(result.future, Duration.Inf)
}
```
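The same sync-over-async pattern can be reproduced outside Spark. In this minimal sketch, `FetchListener`, `fetchBlocks`, and the thread-based fake transport are hypothetical stand-ins that only mimic the callback shape. They are not Spark's API. The Promise/Await bridge in `fetchBlockSync` mirrors the Spark method above.

```scala
import java.nio.ByteBuffer
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

// Hypothetical stand-in for Spark's BlockFetchingListener (callback shape only).
trait FetchListener {
  def onSuccess(blockId: String, data: ByteBuffer): Unit
  def onFailure(blockId: String, e: Throwable): Unit
}

object SyncOverAsync {
  // Hypothetical asynchronous, callback-based fetch that answers on another thread.
  // Block ids starting with "missing" simulate a failed fetch.
  def fetchBlocks(blockIds: Seq[String], listener: FetchListener): Unit =
    new Thread(() => blockIds.foreach { id =>
      if (id.startsWith("missing")) listener.onFailure(id, new NoSuchElementException(id))
      else listener.onSuccess(id, ByteBuffer.wrap(id.getBytes("UTF-8")))
    }).start()

  // Blocking wrapper in the style of fetchBlockSync: a Promise bridges the
  // callback thread and the caller, and Await.result waits on its future.
  def fetchBlockSync(blockId: String): ByteBuffer = {
    val result = Promise[ByteBuffer]()
    fetchBlocks(Seq(blockId), new FetchListener {
      override def onFailure(id: String, e: Throwable): Unit = result.failure(e)
      override def onSuccess(id: String, data: ByteBuffer): Unit = {
        // Copy into a fresh heap buffer, as the Spark code does.
        val copy = ByteBuffer.allocate(data.remaining())
        copy.put(data)
        copy.flip()
        result.success(copy)
      }
    })
    Await.result(result.future, Duration.Inf) // rethrows the exception on failure
  }
}
```

With `Duration.Inf`, the caller hangs forever if no callback ever fires. A finite timeout is a safer choice when the transport cannot guarantee a response.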
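The Await.result method below takes an Awaitable as its first parameter. So result.future is passed in as an Awaitable instance, which means Future extends the Awaitable trait: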
```scala
object Await {
  // ... (other members omitted)
  @throws(classOf[Exception])
  def result[T](awaitable: Awaitable[T], atMost: Duration): T =
    blocking(awaitable.result(atMost)(AwaitPermission))
}
```
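Because Await.result accepts any `Awaitable` with an `atMost` bound, a finite duration turns an indefinite wait into a `java.util.concurrent.TimeoutException`. The sketch below wraps that behavior. The helper name `waitFor` and the `Either` result are illustrative choices.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Awaitable, Promise}
import scala.concurrent.duration._

object AwaitTimeouts {
  // Await.result accepts any Awaitable; a Future qualifies because Future extends Awaitable.
  // Returns Right(value) if the Awaitable completes in time, Left(message) on timeout.
  def waitFor[T](a: Awaitable[T], atMost: FiniteDuration): Either[String, T] =
    try Right(Await.result(a, atMost))
    catch { case _: TimeoutException => Left(s"timed out after $atMost") }
}
```

For example, waiting 100 ms on the future of a Promise that is never completed yields a `Left`. An already completed future yields a `Right` immediately.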
The doc comment of the Awaitable trait:
```scala
/**
 * An object that may eventually be completed with a result value of type `T` which may be
 * awaited using blocking methods.
 *
 * The [[Await]] object provides methods that allow accessing the result of an `Awaitable`
 * by blocking the current thread until the `Awaitable` has been completed or a timeout has
 * occurred.
 */
trait Awaitable[+T] {
  // ... (members omitted)
}
```
The Future trait:
```scala
/** The trait that represents futures.
 *
 *  Asynchronous computations that yield futures are created with the `future` call:
 *
 *  {{{
 *  val s = "Hello"
 *  val f: Future[String] = future {
 *    s + " future!"
 *  }
 *  f onSuccess {
 *    case msg => println(msg)
 *  }
 *  }}}
 *
 *  @author  Philipp Haller, Heather Miller, Aleksandar Prokopec, Viktor Klang
 *
 *  @define multipleCallbacks
 *  Multiple callbacks may be registered; there is no guarantee that they will be
 *  executed in a particular order.
 *
 *  @define caughtThrowables
 *  The future may contain a throwable object and this means that the future failed.
 *  Futures obtained through combinators have the same exception as the future they were obtained from.
 *  The following throwable objects are not contained in the future:
 *  - `Error` - errors are not contained within futures
 *  - `InterruptedException` - not contained within futures
 *  - all `scala.util.control.ControlThrowable` except `NonLocalReturnControl` - not contained within futures
 *
 *  Instead, the future is completed with an ExecutionException with one of the exceptions above
 *  as the cause.
 *  If a future is failed with a `scala.runtime.NonLocalReturnControl`,
 *  it is completed with a value from that throwable instead.
 *
 *  @define nonDeterministic
 *  Note: using this method yields nondeterministic dataflow programs.
 *
 *  @define forComprehensionExamples
 *  Example:
 *
 *  {{{
 *  val f = future { 5 }
 *  val g = future { 3 }
 *  val h = for {
 *    x: Int <- f // returns Future(5)
 *    y: Int <- g // returns Future(3)
 *  } yield x + y
 *  }}}
 *
 *  is translated to:
 *
 *  {{{
 *  f flatMap { (x: Int) => g map { (y: Int) => x + y } }
 *  }}}
 *
 *  @define callbackInContext
 *  The provided callback always runs in the provided implicit
 *  `ExecutionContext`, though there is no guarantee that the
 *  `execute()` method on the `ExecutionContext` will be called once
 *  per callback or that `execute()` will be called in the current
 *  thread. That is, the implementation may run multiple callbacks
 *  in a batch within a single `execute()` and it may run
 *  `execute()` either immediately or asynchronously.
 */
trait Future[+T] extends Awaitable[T] {
  // ... (members omitted)
}
```
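The for-comprehension example in the doc comment can be run directly. The `future { ... }` call in this older comment is deprecated in newer Scala versions, so the sketch uses `Future.apply` (written `Future { ... }`) with the global `ExecutionContext`. The object and method names are illustrative. The sketch shows that the sugared form and its `flatMap`/`map` translation produce the same result.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ForComprehension {
  // The doc comment's example, using Future.apply instead of the older future { ... }.
  def sugared(): Future[Int] = {
    val f = Future { 5 }
    val g = Future { 3 }
    for {
      x <- f // x is bound to 5 once f completes
      y <- g // y is bound to 3 once g completes
    } yield x + y
  }

  // The flatMap/map form that the compiler rewrites the for-comprehension into.
  def desugared(): Future[Int] = {
    val f = Future { 5 }
    val g = Future { 3 }
    f.flatMap(x => g.map(y => x + y))
  }
}
```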
On how Promise is used, the Stack Overflow answer at http://stackoverflow.com/questions/13381134/what-are-the-use-cases-of-scala-concurrent-promise explains the basic idea:
The Promise and Future are complementary concepts. The Future is a value which will be retrieved, well, sometime in the future and you can do stuff with it when that event happens. It is, therefore, the read or out endpoint of a computation - it is something that you retrieve a value from.
A Promise is, by analogy, the writing side of the computation. You create a promise which is the place where you'll put the result of the computation, and from that promise you get a future that will be used to read the result that was put into the promise. When you complete a Promise, either by failure or success, you will trigger all the behavior which was attached to the associated Future.
Regarding your first question, how can it be that for a promise p we have p.future == p: you can imagine this like a single-item buffer, a container which is initially empty and in which you can afterwards store one value which will become its content forever. Now, depending on your point of view, this is both a Promise and a Future. It is a promise for someone who intends to write the value into the buffer. It is a future for someone who waits for that value to be put in the buffer.
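The write-once buffer analogy can be checked against the standard `scala.concurrent.Promise` members `trySuccess`, `success`, and `future.value`. Whether `p.future` is literally the same object as `p` is an implementation detail, so this sketch checks only the observable behavior. `SingleAssignment` is just an illustrative name.

```scala
import scala.concurrent.Promise
import scala.util.{Success, Try}

object SingleAssignment {
  // A Promise behaves like a write-once, one-slot buffer.
  def demo(): (Boolean, Boolean, Option[Try[String]]) = {
    val p = Promise[String]()
    val first  = p.trySuccess("first")  // true: the slot was empty
    val second = p.trySuccess("second") // false: already completed, value unchanged
    (first, second, p.future.value)     // the future sees Some(Success("first"))
  }

  // Unlike trySuccess/tryFailure, success/failure throw IllegalStateException
  // when the Promise is already completed.
  def secondSuccessThrows(): Boolean = {
    val p = Promise[Int]()
    p.success(1)
    Try(p.success(2)).isFailure
  }
}
```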
Regarding the real-world use: most of the time you won't deal with promises directly. If you use a library which performs asynchronous computation, you'll just work with the futures returned by the library's methods. Promises are, in this case, created by the library; you're just working with the reading end of what those methods do.
But if you need to implement your own asynchronous API, you'll have to start working with them. Suppose you need to implement an async HTTP client on top of, let's say, Netty. Then your code will look somewhat like this:
```scala
def makeHTTPCall(request: Request): Future[Response] = {
  val p = Promise[Response]
  registerOnCompleteCallback(buffer => {
    val response = makeResponse(buffer)
    p success response
  })
  p.future
}
```
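To make the makeHTTPCall shape runnable, the sketch below replaces the answer's undefined `registerOnCompleteCallback` and `makeResponse` with a hypothetical callback-style `FakeClient` that answers on another thread. `Request`, `Response`, and `FakeClient` are assumptions for illustration and not a Netty API. Unlike fetchBlockSync, this wrapper never blocks. It hands back only the read side (the Future), and the caller composes on it.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical request/response types and callback-style client (not a real Netty API).
final case class Request(path: String)
final case class Response(status: Int, body: String)

object FakeClient {
  // Invokes onComplete (or onError for an empty path) later, on another thread.
  def send(req: Request)(onComplete: String => Unit, onError: Throwable => Unit): Unit =
    new Thread(() =>
      if (req.path.isEmpty) onError(new IllegalArgumentException("empty path"))
      else onComplete(s"hello from ${req.path}")
    ).start()
}

object AsyncApi {
  // Same shape as makeHTTPCall: create a Promise, complete it from the callback,
  // and return only the Future to the caller.
  def makeHTTPCall(request: Request): Future[Response] = {
    val p = Promise[Response]()
    FakeClient.send(request)(
      raw => p.success(Response(200, raw)),
      err => p.failure(err)
    )
    p.future
  }
}
```

A caller can then write `AsyncApi.makeHTTPCall(Request("/blocks")).map(_.body)` and keep working asynchronously. It blocks with `Await.result` only if it really has to.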