V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
OneAPM
V2EX  ›  程序员

新手福利: Apache Spark 入门攻略 - Part 2

  •  6
     
  •   OneAPM ·
    oneapm · 2015-07-15 13:51:18 +08:00 · 2566 次点击
    这是一个创建于 3208 天前的主题,其中的信息可能已经有所发展或是发生改变。

    接上一篇: http://v2ex.com/t/205851

    六、RDD持久性

    Apache Spark 中一个主要的能力就是在集群内存中持久化/缓存 RDD 。这将显著地提升交互速度。下表显示了 Spark 中各种选项。

    Storage Level |Purpose|
    --|---|
    MEMORY_ONLY (Default level)| This option stores RDD in available cluster memory as deserialized Java objects. Some partitions may not be cached if there is not enough cluster memory. Those partitions will be recalculated on the fly as needed.|
    MEMORY_AND_DISK |This option stores RDD as deserialized Java objects. If RDD does not fit in cluster memory, then store those partitions on the disk and read them as needed.|
    MEMORY_ONLY_SER| This options stores RDD as serialized Java objects (One byte array per partition). This is more CPU intensive but saves memory as it is more space efficient. Some partitions may not be cached. Those will be recalculated on the fly as needed.|
    MEMORY_ONLY_DISK_SER |This option is same as above except that disk is used when memory is not sufficient.|
    DISC_ONLY |This option stores the RDD only on the disk|
    MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. |Same as other levels but partitions are replicated on 2 slave nodes|

    上面的存储等级可以通过 RDD. cache() 操作上的 persist () 操作访问,可以方便地指定 MEMORY_ONLY 选项。关于持久化等级的更多信息,可以访问这里 http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence。

    Spark 使用 Least Recently Used (LRU) 算法来移除缓存中旧的、不常用的 RDD ,从而释放出更多可用内存。同样还提供了一个 unpersist() 操作来强制移除缓存/持久化的 RDD 。

    七、变量共享

    Accumulators。Spark 提供了一个非常便捷地途径来避免可变的计数器和计数器同步问题—— Accumulators 。Accumulators 在一个 Spark context 中通过默认值初始化,这些计数器在 Slaves 节点上可用,但是 Slaves 节点不能对其进行读取。它们的作用就是来获取原子更新,并将其转发到 Master 。 Master 是唯一可以读取和计算所有更新合集的节点。举个例子:

    akuntamukkala@localhost~/temp$ cat output.log
    error
    warning
    info
    trace
    error
    info
    info
    scala> val nErrors=sc.accumulator(0.0)
    scala> val logs = sc.textFile(“/Users/akuntamukkala/temp/output.log”)
    scala> logs.filter(_.contains(“error”)).foreach(x=>nErrors+=1)
    scala> nErrors.value
    Result:Int = 2

    Broadcast Variables。实际生产中,通过指定 key 在 RDDs 上对数据进行合并的场景非常常见。在这种情况下,很可能会出现给 slave nodes 发送大体积数据集的情况,让其负责托管需要做 join 的数据。因此,这里很可能存在巨大的性能瓶颈,因为网络 IO 比内存访问速度慢 100 倍。为了解决这个问题,Spark 提供了 Broadcast Variables,如其名称一样,它会向 slave nodes 进行广播。因此,节点上的 RDD 操作可以快速访问 Broadcast Variables 值。举个例子,期望计算一个文件中所有路线项的运输成本。通过一个 look-up table指定每种运输类型的成本,这个look-up table 就可以作为 Broadcast Variables 。

    akuntamukkala@localhost~/temp$ cat packagesToShip.txt ground
    express
    media
    priority
    priority
    ground
    express
    media
    scala> val map = sc.parallelize(Seq((“ground”,1),(“med”,2), (“priority”,5),(“express”,10))).collect().toMap
    map: scala.collection.immutable.Map[String,Int] = Map(ground -> 1, media -> 2, priority -> 5, express -> 10)
    scala> val bcMailRates = sc.broadcast(map)

    上述命令中,我们建立了一个 broadcast variable,基于服务类别成本的 map 。

    scala> val pts = sc.textFile(“/Users/akuntamukkala/temp/packagesToShip.txt”)

    在上述命令中,我们通过 broadcast variable 的 mailing rates 来计算运输成本。

    scala> pts.map(shipType=>(shipType,1)).reduceByKey(+). map{case (shipType,nPackages)=>(shipType,nPackages*bcMailRates. value(shipType))}.collect()

    通过上述命令,我们使用 accumulator 来累加所有运输的成本。详细信息可通过下面的 PDF 查看 http://ampcamp.berkeley.edu/wp-content/uploads/2012/06/matei-zaharia-amp-camp-2012-advanced-spark.pdf。

    八、Spark SQL

    通过 Spark Engine,Spark SQL 提供了一个便捷的途径来进行交互式分析,使用一个被称为 SchemaRDD 类型的 RDD 。SchemaRDD 可以通过已有 RDDs 建立,或者其他外部数据格式,比如 Parquet files、JSON 数据,或者在 Hive 上运行 HQL。SchemaRDD 非常类似于 RDBMS 中的表格。一旦数据被导入 SchemaRDD,Spark 引擎就可以对它进行批或流处理。Spark SQL 提供了两种类型的 Contexts——SQLContext 和 HiveContext,扩展了 SparkContext 的功能。

    SparkContext 提供了到简单 SQL parser 的访问,而 HiveContext 则提供了到 HiveQL parser 的访问。HiveContext 允许企业利用已有的 Hive 基础设施。

    这里看一个简单的 SQLContext 示例。

    下面文本中的用户数据通过 “ | ” 来分割。

    John Smith|38|M|201 East Heading Way #2203,Irving, TX,75063 Liana Dole|22|F|1023 West Feeder Rd, Plano,TX,75093 Craig Wolf|34|M|75942 Border Trail,Fort Worth,TX,75108 John Ledger|28|M|203 Galaxy Way,Paris, TX,75461 Joe Graham|40|M|5023 Silicon Rd,London,TX,76854

    定义 Scala case class 来表示每一行:

    case class Customer(name:String,age:Int,gender:String,address: String)
    

    下面的代码片段体现了如何使用 SparkContext 来建立 SQLContext ,读取输入文件,将每一行都转换成 SparkContext 中的一条记录,并通过简单的 SQL 语句来查询 30 岁以下的男性用户。

    val sparkConf = new SparkConf().setAppName(“Customers”)
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    val r = sc.textFile(“/Users/akuntamukkala/temp/customers.txt”) val records = r.map(_.split(‘|’))
    val c = records.map(r=>Customer(r(0),r(1).trim.toInt,r(2),r(3))) c.registerAsTable(“customers”)
    sqlContext.sql(“select * from customers where gender=’M’ and age <
                30”).collect().foreach(println) Result:[John Ledger,28,M,203 Galaxy Way,Paris,
                TX,75461]
    

    更多使用 SQL 和 HiveQL 的示例请访问下面链接 https://spark.apache.org/docs/latest/sql-programming-guide.html、https://databricks-training.s3.amazonaws.com/data-exploration-using-spark-sql.html。

    p11

    九、Spark Streaming

    Spark Streaming 提供了一个可扩展、容错、高效的途径来处理流数据,同时还利用了 Spark 的简易编程模型。从真正意义上讲,Spark Streaming 会将流数据转换成 micro batches,从而将 Spark 批处理编程模型应用到流用例中。这种统一的编程模型让 Spark 可以很好地整合批量处理和交互式流分析。下图显示了 Spark Streaming 可以从不同数据源中读取数据进行分析。
    p12

    Spark Streaming 中的核心抽象是 Discretized Stream(DStream)。DStream 由一组 RDD 组成,每个 RDD 都包含了规定时间(可配置)流入的数据。上图很好地展示了 Spark Streaming 如何通过将流入数据转换成一系列的 RDDs,再转换成 DStream 。每个 RDD 都包含两秒(设定的区间长度)的数据。在 Spark Streaming 中,最小长度可以设置为 0.5 秒,因此处理延时可以达到 1 秒以下。

    Spark Streaming 同样提供了 window operators ,它有助于更有效率在一组 RDD ( a rolling window of time)上进行计算。同时,DStream 还提供了一个 API ,其操作符(transformations 和 output operators)可以帮助用户直接操作 RDD 。下面不妨看向包含在 Spark Streaming 下载中的一个简单示例。示例是在 Twitter 流中找出趋势 hashtags ,详见下面代码。

    spark-1.0.1/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
    val sparkConf = new SparkConf().setAppName(“TwitterPopularTags”)
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    val stream = TwitterUtils.createStream(ssc, None, filters)
    

    上述代码用于建立 Spark Streaming Context 。Spark Streaming 将在 DStream 中建立一个 RDD ,包含了每 2 秒流入的 tweets 。

    val hashTags = stream.flatMap(status => status.getText.split(“ “).filter(_.startsWith(“#”)))
    

    上述代码片段将 Tweet 转换成一组 words ,并过滤出所有以 a# 开头的。

    val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)).map{case (topic, count) => (count, topic)}. transform(_.sortByKey(false))
    

    上述代码展示了如何整合计算 60 秒内一个 hashtag 流入的总次数。

    topCounts60.foreachRDD(rdd => {
    val topList = rdd.take(10)
    println(“\nPopular topics in last 60 seconds (%s
    total):”.format(rdd.count())) topList.foreach{case (count, tag) => println(“%s (%s
    tweets)”.format(tag, count))} })
    

    上面代码将找出 top 10 趋势 tweets ,然后将其打印。

    ssc.start()
    

    上述代码让 Spark Streaming Context 开始检索 tweets 。一起聚焦一些常用操作,假设我们正在从一个 socket 中读入流文本。

    al lines = ssc.socketTextStream(“localhost”, 9999, StorageLevel.MEMORY_AND_DISK_SER)
    

    p14


    更多 operators 请访问 http://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations

    Spark Streaming 拥有大量强大的 output operators ,比如上文提到的 foreachRDD(),了解更多可访问 http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations。

    十、附加学习资源

    原文链接:Apache Spark:An Engine for Large-Scale Data Processing

    本文系 OneAPM 工程师编译整理。OneAPM 是中国基础软件领域的新兴领军企业,能帮助企业用户和开发者轻松实现:缓慢的程序代码和 SQL 语句的实时抓取。想阅读更多技术文章,请访问 OneAPM 官方博客

    1 条回复    2015-07-16 11:59:32 +08:00
    duobei
        1
    duobei  
       2015-07-16 11:59:32 +08:00
    好文,谢谢
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   1098 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 27ms · UTC 23:23 · PVG 07:23 · LAX 16:23 · JFK 19:23
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.