Spark 高级编程
-
高级Spark编程
Spark包含两种不同类型的共享变量 - 一种是 Broadcast variables(广播变量),另一种是 Accumulators (累加器)。- 广播变量-用于有效地分配较大的值。
- 累加器-用于汇总特定集合的信息。
-
广播变量
广播变量使程序员可以在每台计算机上保留一个只读变量,而不是将其副本与任务一起发送。例如,可以使用它们以有效的方式为每个节点提供大型输入数据集的副本。Spark还尝试使用有效的广播算法分配广播变量,以降低通信成本。Spark 动作是通过一组阶段执行的,这些阶段由分布式“随机”操作分开。Spark自动广播每个阶段中任务所需的通用数据。以这种方式广播的数据以序列化形式缓存,并在运行每个任务之前反序列化。这意味着仅当跨多个阶段的任务需要相同数据或以反序列化形式缓存数据非常重要时,显式创建广播变量才有用。广播变量是通过调用SparkContext.broadcast(v) 从变量v创建的。broadcast变量是v的包装,可以通过调用value方法来访问其值。下面给出的代码显示了这一点-scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
输出-broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
创建广播变量之后,在集群上运行的任何函数中都应使用它代替值v,以使v不会多次传送到节点。另外,对象v广播后不应修改,以确保所有节点都具有相同的广播变量值。 -
累加器
累加器是仅通过关联操作“添加”到的变量,因此可以有效地并行支持。它们可用于实现计数器(如在MapReduce中)或总和。Spark本身支持数字类型的累加器,程序员可以添加对新类型的支持。如果使用名称创建累加器,它们将显示在Spark的UI中。这对于理解运行阶段的进度很有用(注意-Python尚不支持此功能)。通过调用SparkContext.longAccumulator("name")创建一个Long累加器。然后,可以使用add方法向集群中运行的任务添加集群。但是,他们无法读取其值。只有驱动程序才能使用其value方法读取累加器的值。下面给出的代码显示了一个累加器,用于累加一个数组的元素-scala> val accum = sc.longAccumulator("longAccum") scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
如果您想查看以上代码的输出,请使用以下命令-scala> accum.value
res2: Long = 10
-
数字RDD运算
Spark允许您使用预定义的API方法之一对数字数据执行不同的操作。Spark的数字运算是通过流算法实现的,该算法允许一次仅一个元素地构建模型。通过调用status()方法来计算这些操作并将其作为StatusCounter对象返回。以下是StatusCounter中可用的数值方法的列表。方法 描述 count() RDD中的元素数。 Mean() RDD中元素的平均值。 Sum() RDD中元素的总值。 Max() RDD中所有元素之间的最大值。 Min() RDD中所有元素中的最小值。 Variance() 元素的差异。 Stdev() 标准偏差。 如果只想使用这些方法之一,则可以直接在RDD上调用相应的方法。