Spark 核心编程
-
Spark 核心编程
Spark Core是整个项目的基础。它提供了分布式任务分配,调度和基本的I/O功能。Spark使用称为RDD(弹性分布式数据集)的专用基础数据结构,该结构是跨机器分区的逻辑数据集合。RDD可以通过两种方式创建:一种是通过引用外部存储系统中的数据集,第二种是通过对现有RDD进行转换(例如,映射,过滤器,化简,联接)。RDD抽象是通过语言集成的API公开的。这简化了编程的复杂性,因为应用程序处理RDD的方式类似于处理本地数据集合。 -
Spark Shell
Spark提供了一个交互式Shell - 一个强大的工具,可以交互式地分析数据。它支持Scala或Python语言。Spark的主要抽象是称为弹性分布数据集(RDD)的项目的分布式集合。可以从Hadoop输入格式(例如HDFS文件)或通过转换其他RDD创建RDD。打开spark shell以下命令用于打开Spark Shell。$ spark-shell
创建简单的RDD让我们从文本文件创建一个简单的RDD。使用以下命令创建一个简单的RDD。scala> val inputfile = sc.textFile(“input.txt”)
上面命令的输出是inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12
Spark RDD API引入了一些2转换和一些的操作来操纵RDD。 -
RDD 转换
RDD转换返回指向新RDD的指针,并允许您在RDD之间创建依赖关系。依赖关系链中的每个RDD(依赖关系的字符串)都具有计算其数据的功能,并具有指向其父RDD的指针(依赖关系)。Spark是懒惰的,因此除非您调用将触发作业创建和执行的某些转换或操作,否则将不会执行任何操作。请看下面的单词计数示例片段。因此,RDD转换不是一组数据,而是程序中的一个步骤(可能是唯一的步骤),告诉Spark如何获取数据以及如何处理数据。下面给出了 RDD 转换的列表。转换 意义 map(func) 返回一个新的分布式数据集,该数据集是通过将源的每个元素传递给函数func形成的。 filter(func) 返回一个新的数据集,该数据集是通过选择源中func返回true的那些元素形成的。 flatMap(func) 与map相似,但是每个输入项都可以映射到0个或多个输出项(因此func应该返回Seq而不是单个项)。 mapPartitions(func) 与map相似,但是分别在RDD的每个分区(块)上运行,因此func在类型T的RDD上运行时必须为Iterator <T>⇒Iterator <U>类型。 mapPartitionsWithIndex(func) 与map Partitions类似,但它还为func提供表示分区索引的整数值,因此当在类型T的RDD上运行时,func的类型必须为(Int,Iterator <T>)⇒Iterator <U>。 sample(withReplacement, fraction, seed) 使用给定的随机数生成器种子,对一部分数据进行抽样,无论是否进行替换。 union(otherDataset) 返回一个新的数据集,其中包含源数据集中的元素与参数的并集。 intersection(otherDataset) 返回一个新的RDD,其中包含源数据集中的元素和参数的交集。 distinct([numTasks]) 返回一个新的数据集,其中包含源数据集的不同元素。 groupByKey([numTasks]) 在(K,V)对的数据集上调用时,返回(K,Iterable <V>)对的数据集。注–如果要分组以便对每个键执行聚合(例如求和或平均值),则使用reduceByKey或AggregateByKey将产生更好的性能。 reduceByKey(func, [numTasks]) 在(K,V)对的数据集上调用时,返回(K,V)对的数据集,其中每个键的值使用给定的reduce函数func进行汇总,该函数必须为(V,V)⇒V类型与groupByKey中一样,reduce任务的数量可以通过可选的第二个参数进行配置。 aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 在(K,V)对的数据集上调用时,返回(K,U)对的数据集,其中每个键的值使用给定的Combine函数和中性的“零”值进行汇总。允许与输入值类型不同的聚合值类型,同时避免不必要的分配。像groupByKey中一样,reduce任务的数量可以通过可选的第二个参数进行配置。 sortByKey([ascending], [numTasks]) 在由K实现Ordered的(K,V)对的数据集上调用时,返回(K,V)对的数据集,按布尔值升序参数指定按键升序或降序排序。 join(otherDataset, [numTasks]) 在(K,V)和(K,W)类型的数据集上调用时,返回(K,(V,W))对的数据集,其中每个键都有所有成对的元素。通过leftOuterJoin,rightOuterJoin和fullOuterJoin支持外部联接。 cogroup(otherDataset, [numTasks]) 在(K,V)和(K,W)类型的数据集上调用时,返回(K,(Iterable <V>,Iterable <W>))元组的数据集。此操作也称为分组方式。 cartesian(otherDataset) 在类型T和U的数据集上调用时,返回(T,U)对(所有元素对)的数据集。 pipe(command, [envVars]) 通过shell命令通过管道传输RDD的每个分区Perl或bash脚本。将RDD元素写入进程的stdin,并将输出到其stdout的行作为字符串的RDD返回。 coalesce(numPartitions) 将RDD中的分区数减少到numPartitions。筛选大型数据集后,对于更有效地运行操作很有用。 repartition(numPartitions) 随机地重新随机排列RDD中的数据以创建更多或更少的分区,并在整个分区之间保持平衡。这始终会拖曳网络上的所有数据。 repartitionAndSortWithinPartitions(partitioner) 根据给定的分区程序对RDD进行重新分区,并在每个结果分区中,按其键对记录进行排序。这比调用重新分区然后在每个分区内进行排序更有效,因为它可以将排序向下推入洗牌机制。 -
动作
下表列出了返回值的动作列表。动作 意义 reduce(func) 使用函数func(使用两个参数并返回一个参数)聚合数据集的元素。 该函数应该是可交换的和关联的,以便可以并行正确地计算它。 collect() 在驱动程序中将数据集的所有元素作为数组返回。 这通常在返回足够小的数据子集的过滤器或其他操作之后很有用。 count() 返回数据集中的元素数。 first() 返回数据集的第一个元素(类似于take(1))。 take(n) 返回具有数据集前n个元素的数组。 takeSample (withReplacement,num, [seed]) 返回一个数组,该数组包含数据集的num个元素的随机样本,有或没有替换,可以选择预先指定一个随机数生成器种子。 takeOrdered(n, [ordering]) 使用自然顺序或自定义比较器返回RDD的前n个元素。 saveAsTextFile(path) 将数据集的元素作为文本文件(或文本文件集)写入本地文件系统,HDFS或任何其他Hadoop支持的文件系统中的给定目录中。 Spark在每个元素上调用toString将其转换为文件中的一行文本。 saveAsSequenceFile(path) (Java and Scala) 将数据集的元素作为Hadoop SequenceFile写入本地文件系统,HDFS或任何其他Hadoop支持的文件系统中的给定路径中。这在实现Hadoop的Writable接口的键/值对的RDD上可用。在Scala中,它也可用于隐式转换为Writable的类型(Spark包括对基本类型(如Int,Double,String等)的转换。 saveAsObjectFile(path) (Java and Scala) 使用Java序列化以简单格式写入数据集的元素,然后可以使用SparkContext.objectFile()进行加载。 countByKey() 仅在类型(K,V)的RDD上可用。返回(K,Int)对的哈希图以及每个键的计数。 foreach(func) 在数据集的每个元素上运行函数func。通常这样做是出于副作用,例如更新累加器或与外部存储系统交互。注意-在foreach()之外修改除Accumulators以外的变量可能会导致未定义的行为。有关更多详细信息,请参见了解闭包。 -
用RDD编程
让我们借助示例来了解RDD编程中一些RDD转换和动作的实现。例 - 考虑一个单词计数示例-它计算文档中出现的每个单词。将以下文本视为输入,并将其另存为主目录中的input.txt文件。input.txt-输入文件。people are not as beautiful as they look, as they walk or as they talk. they are only as beautiful as they love, as they care as they share.
请按照下面给出的步骤执行给定的示例。打开Spark-Shell以下命令用于打开Spark-Shell。通常,使用Scala构建spark。因此,Spark程序在Scala环境中运行。$ spark-shell
如果Spark Shell成功打开,则将找到以下输出。在开始程序的第一步之前,应创建SparkContext对象。To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). Spark context Web UI available at http://localhost:4040 Spark context available as 'sc' (master = local[*], app id = local-1608794330204). Spark session available as 'spark'. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 3.0.0 /_/ Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_262) Type in expressions to have them evaluated. Type :help for more information. scala>
创建一个RDD首先,我们必须使用Spark-Scala API读取输入文件并创建一个RDD。以下命令用于从给定位置读取文件。在这里,使用输入文件的名称创建新的RDD。在textFile("") 方法中作为参数给出的String是输入文件名的绝对路径。但是,如果仅给出文件名,则表示输入文件位于当前位置。scala> val inputfile = sc.textFile("input.txt")
执行字数转换我们的目的是计算文件中的单词数。创建一个平面地图,将每行分割成多个单词(flatMap(line⇒line.split(" "))。接下来,使用映射函数(map(word⇒(word,1)),将每个单词作为键读取,值为'1'(<key,value> = <word,1>)。最后,通过添加相似键的值(reduceByKey(_ + _))来减少这些键。以下命令用于执行字数逻辑。执行此操作后,您将找不到任何输出,因为这不是操作,而是转换。指向新的RDD或告知Spark如何处理给定的数据)scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);
当前RDD在使用RDD时,如果您想了解当前的RDD,请使用以下命令。它将向您显示有关当前RDD及其调试依赖关系的描述。scala> counts.toDebugString
缓存转换您可以使用其上的persist()或cache()方法将RDD标记为要持久。第一次在操作中对其进行计算时,它将被保存在节点上的内存中。使用以下命令将中间转换存储在内存中。scala> counts.cache()
应用动作应用动作(如存储所有转换)将结果生成文本文件。saveAsTextFile("") 方法的String参数是输出文件夹的绝对路径。尝试使用以下命令将输出保存在文本文件中。在以下示例中,“输出”文件夹位于当前位置。scala> counts.saveAsTextFile("output")
检查输出打开另一个终端以转到主目录(在另一个终端中执行spark)。使用以下命令检查输出目录。[hadoop@localhost ~]$ cd output/ [hadoop@localhost output]$ ls -1
part-00000 part-00001 _SUCCESS
以下命令用于查看Part-00000文件的输出。[hadoop@localhost output]$ cat part-00000
输出(people,1) (are,2) (not,1) (as,8) (beautiful,2) (they, 7) (look,1)
以下命令用于查看Part-00001文件的输出。[hadoop@localhost output]$ cat part-00001
输出(walk, 1) (or, 1) (talk, 1) (only, 1) (love, 1) (care, 1) (share, 1)
-
取消持久存储
取消永久保留之前,如果要查看用于该应用程序的存储空间,请在浏览器中使用以下URL。http://localhost:4040
您将看到以下屏幕,其中显示了用于应用程序的存储空间,这些存储空间正在Spark Shell上运行。如果要取消永久保留特定RDD的存储空间,请使用以下命令。Scala> counts.unpersist()
您将看到如下输出:2020/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list 2020/06/27 00:57:33 INFO BlockManager: Removing RDD 9 2020/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1 2020/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810) 2020/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0 2020/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106) res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14
要验证浏览器中的存储空间,请使用以下URL。http://localhost:4040/
您将看到以下屏幕。它显示了用于应用程序的存储空间,这些存储空间在Spark Shell上运行。