Spark Deployment
spark-submit is a shell command used to deploy a Spark application on a cluster. It works with every supported cluster manager through a single, uniform interface, so you do not have to configure your application separately for each one.
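For example, once the wordcount.jar built later in this chapter exists, the same jar could in principle be submitted to different cluster managers simply by changing the --master option (the host names below are placeholders, not part of this chapter's setup):
spark-submit --class SparkWordCount --master local[*] wordcount.jar
spark-submit --class SparkWordCount --master spark://<master-host>:7077 wordcount.jar
spark-submit --class SparkWordCount --master yarn --deploy-mode cluster wordcount.jar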
Example − Let us take the same word count example that we used earlier with shell commands. Here, we consider the same example as a Spark application.
Sample Input
The following text is the input data, and the file is named in.txt.
people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.
Look at the following program −
SparkWordCount.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._
object SparkWordCount {
def main(args: Array[String]): Unit = {
val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Seq(), Map())
/* local = master URL; Word Count = application name; */
/* /usr/local/spark = Spark home; Seq() = jars; */
/* Map() = environment variables passed to worker nodes */
/*creating an inputRDD to read text file (in.txt) through Spark context*/
val input = sc.textFile("in.txt")
/* Transform the inputRDD into countRDD */
val count = input.flatMap(line ⇒ line.split(" ")).map(word ⇒ (word, 1)).reduceByKey(_ + _)
/* saveAsTextFile is an action; it triggers the computation and writes the RDD to the outfile directory */
count.saveAsTextFile("outfile")
System.out.println("OK");
}
}
Save the above program into a file named SparkWordCount.scala and place it in a user-defined directory named spark-application.
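As an aside, the five-argument SparkContext constructor used above (master URL, application name, Spark home, jars, environment) is an older form. A minimal equivalent sketch using SparkConf, the more common style in Spark 3.x, would look roughly like this:
import org.apache.spark.{SparkConf, SparkContext}
/* Same master and application name as above; Spark home, jars and environment are left to their defaults */
val conf = new SparkConf().setMaster("local").setAppName("Word Count")
val sc = new SparkContext(conf)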
Note − While transforming the inputRDD into countRDD, we use flatMap() to tokenize the lines (from the text file) into words, map() to pair each word with a count of 1, and reduceByKey() to sum the counts of every repeated word.
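To make each step of that chain easier to follow, the same pipeline can also be written with named intermediate RDDs; this is just an expanded sketch of the one-liner in the program above:
/* one element per word, produced by splitting each line on spaces */
val words = input.flatMap(line => line.split(" "))
/* every occurrence of a word becomes a (word, 1) pair */
val pairs = words.map(word => (word, 1))
/* the 1s are summed per distinct word, e.g. (as,8) and (they,7) for the sample input */
val counts = pairs.reduceByKey(_ + _)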
Use the following steps to submit this application. Execute all the steps in the spark-application directory through the terminal.
Step 1: Download the Spark jar
The Spark core jar is required for compilation. Therefore, download spark-core_2.12-3.0.0.jar from the following link (Spark core jar) and move the jar file from the download directory to the spark-application directory.
Step 2: Compile the program
Compile the above program using the command given below. This command should be executed from the spark-application directory. Here, /usr/local/spark/jar/hadoop-common-3.2.0.jar is a Hadoop support jar taken from the Spark installation.
$ scalac -classpath "spark-core_2.12-3.0.0.jar:/usr/local/spark/jar/hadoop-common-3.2.0.jar" SparkWordCount.scala
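Alternatively, if sbt is installed, a minimal build.sbt along the following lines could compile and package the same program with the sbt package command; this is only a sketch under that assumption and is not required for the steps in this chapter:
/* build.sbt (hypothetical); the Scala and Spark versions match the jar used above */
name := "wordcount"
scalaVersion := "2.12.10"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.0" % "provided"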
Step 3: Create a JAR
Create a jar file of the Spark application using the following command. Here, wordcount is the file name of the jar file.
jar -cvf wordcount.jar SparkWordCount*.class
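Optionally, you can list the contents of the jar to confirm that the compiled SparkWordCount classes were packaged before submitting it:
$ jar -tf wordcount.jar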
Step 4: Submit the Spark application
Submit the Spark application using the following command −
spark-submit --class SparkWordCount --master local wordcount.jar
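One practical note if you re-run the application: saveAsTextFile fails when the output directory already exists, so remove the previous outfile directory before submitting again, for example:
$ rm -r outfile
$ spark-submit --class SparkWordCount --master local wordcount.jar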
If it executes successfully, you will find the output given below. The OK in the following output is for user identification; it is printed by the last line of the program. If you read the following output carefully, you will find other things, such as −
- Successfully started service 'sparkDriver' on port 36325
- MemoryStore started with capacity 413.9 MiB
- Started SparkUI at http://192.168.61.201:4041 (Service 'SparkUI' could not bind on port 4040). The spark-shell opened earlier in this tutorial also started a SparkUI, which occupies port 4040, so Spark automatically listens on port 4041 here.
- Added JAR file:/root/spark-application/wordcount.jar at spark://localhost:36325/jars/wordcount.jar
- ResultStage 1 (runJob at SparkHadoopWriter.scala:78) finished in 0.540 s
- Stopped Spark web UI at http://192.168.61.201:4041
- MemoryStore cleared
20/12/25 11:51:21 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 192.168.61.201 instead (on interface enp0s3)
20/12/25 11:51:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
20/12/25 11:51:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/12/25 11:51:22 INFO SparkContext: Running Spark version 3.0.0
20/12/25 11:51:22 INFO ResourceUtils: ==============================================================
20/12/25 11:51:22 INFO ResourceUtils: Resources for spark.driver:
20/12/25 11:51:22 INFO ResourceUtils: ==============================================================
20/12/25 11:51:22 INFO SparkContext: Submitted application: Word Count
20/12/25 11:51:22 INFO SecurityManager: Changing view acls to: root
20/12/25 11:51:22 INFO SecurityManager: Changing modify acls to: root
20/12/25 11:51:22 INFO SecurityManager: Changing view acls groups to:
20/12/25 11:51:22 INFO SecurityManager: Changing modify acls groups to:
20/12/25 11:51:22 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
20/12/25 11:51:23 INFO Utils: Successfully started service 'sparkDriver' on port 36325.
20/12/25 11:51:23 INFO SparkEnv: Registering MapOutputTracker
20/12/25 11:51:23 INFO SparkEnv: Registering BlockManagerMaster
20/12/25 11:51:23 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
20/12/25 11:51:23 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
20/12/25 11:51:23 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
20/12/25 11:51:23 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-27d998a6-84bb-42ee-b924-519055f345da
20/12/25 11:51:23 INFO MemoryStore: MemoryStore started with capacity 413.9 MiB
20/12/25 11:51:23 INFO SparkEnv: Registering OutputCommitCoordinator
20/12/25 11:51:23 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
20/12/25 11:51:23 INFO Utils: Successfully started service 'SparkUI' on port 4041.
20/12/25 11:51:23 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://localhost:4041
20/12/25 11:51:23 INFO SparkContext: Added JAR file:/root/spark-application/wordcount.jar at spark://localhost:36325/jars/wordcount.jar with timestamp 1608868283968
20/12/25 11:51:24 INFO Executor: Starting executor ID driver on host localhost
20/12/25 11:51:24 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 44684.
20/12/25 11:51:24 INFO NettyBlockTransferService: Server created on localhost:44684
20/12/25 11:51:24 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/12/25 11:51:24 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, localhost, 44684, None)
20/12/25 11:51:24 INFO BlockManagerMasterEndpoint: Registering block manager localhost:44684 with 413.9 MiB RAM, BlockManagerId(driver, localhost, 44684, None)
20/12/25 11:51:24 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, localhost, 44684, None)
20/12/25 11:51:24 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, localhost, 44684, None)
20/12/25 11:51:25 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 293.7 KiB, free 413.6 MiB)
20/12/25 11:51:25 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 26.9 KiB, free 413.6 MiB)
20/12/25 11:51:25 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:44684 (size: 26.9 KiB, free: 413.9 MiB)
20/12/25 11:51:25 INFO SparkContext: Created broadcast 0 from textFile at SparkWordCount.scala:14
20/12/25 11:51:26 INFO FileInputFormat: Total input files to process : 1
20/12/25 11:51:26 INFO deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
20/12/25 11:51:26 INFO HadoopMapRedCommitProtocol: Using output committer class org.apache.hadoop.mapred.FileOutputCommitter
20/12/25 11:51:26 INFO FileOutputCommitter: File Output Committer Algorithm version is 2
20/12/25 11:51:26 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
20/12/25 11:51:26 INFO SparkContext: Starting job: runJob at SparkHadoopWriter.scala:78
20/12/25 11:51:27 INFO DAGScheduler: Registering RDD 3 (map at SparkWordCount.scala:18) as input to shuffle 0
20/12/25 11:51:27 INFO DAGScheduler: Got job 0 (runJob at SparkHadoopWriter.scala:78) with 1 output partitions
20/12/25 11:51:27 INFO DAGScheduler: Final stage: ResultStage 1 (runJob at SparkHadoopWriter.scala:78)
20/12/25 11:51:27 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
20/12/25 11:51:27 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
20/12/25 11:51:27 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[3] at map at SparkWordCount.scala:18), which has no missing parents
20/12/25 11:51:27 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 6.7 KiB, free 413.6 MiB)
20/12/25 11:51:27 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 3.9 KiB, free 413.6 MiB)
20/12/25 11:51:27 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:44684 (size: 3.9 KiB, free: 413.9 MiB)
20/12/25 11:51:27 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1200
20/12/25 11:51:28 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[3] at map at SparkWordCount.scala:18) (first 15 tasks are for partitions Vector(0))
20/12/25 11:51:28 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
20/12/25 11:51:28 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 7365 bytes)
20/12/25 11:51:28 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
20/12/25 11:51:28 INFO Executor: Fetching spark://localhost:36325/jars/wordcount.jar with timestamp 1608868283968
20/12/25 11:51:28 INFO TransportClientFactory: Successfully created connection to localhost/127.0.0.1:36325 after 92 ms (0 ms spent in bootstraps)
20/12/25 11:51:28 INFO Utils: Fetching spark://localhost:36325/jars/wordcount.jar to /tmp/spark-24f41970-5ea6-4f24-b1c8-8744d908a237/userFiles-ee247a6f-3eda-4e39-a327-cb2180afe220/fetchFileTemp8416307429326821809.tmp
20/12/25 11:51:28 INFO Executor: Adding file:/tmp/spark-24f41970-5ea6-4f24-b1c8-8744d908a237/userFiles-ee247a6f-3eda-4e39-a327-cb2180afe220/wordcount.jar to class loader
20/12/25 11:51:29 INFO HadoopRDD: Input split: file:/root/spark-application/in.txt:0+145
20/12/25 11:51:29 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1330 bytes result sent to driver
20/12/25 11:51:29 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1722 ms on localhost (executor driver) (1/1)
20/12/25 11:51:29 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
20/12/25 11:51:29 INFO DAGScheduler: ShuffleMapStage 0 (map at SparkWordCount.scala:18) finished in 2.061 s
20/12/25 11:51:29 INFO DAGScheduler: looking for newly runnable stages
20/12/25 11:51:29 INFO DAGScheduler: running: Set()
20/12/25 11:51:29 INFO DAGScheduler: waiting: Set(ResultStage 1)
20/12/25 11:51:29 INFO DAGScheduler: failed: Set()
20/12/25 11:51:29 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkWordCount.scala:22), which has no missing parents
20/12/25 11:51:29 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 85.6 KiB, free 413.5 MiB)
20/12/25 11:51:29 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 30.8 KiB, free 413.5 MiB)
20/12/25 11:51:29 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:44684 (size: 30.8 KiB, free: 413.9 MiB)
20/12/25 11:51:29 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1200
20/12/25 11:51:29 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkWordCount.scala:22) (first 15 tasks are for partitions Vector(0))
20/12/25 11:51:29 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
20/12/25 11:51:29 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, executor driver, partition 0, NODE_LOCAL, 7143 bytes)
20/12/25 11:51:29 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
20/12/25 11:51:30 INFO ShuffleBlockFetcherIterator: Getting 1 (156.0 B) non-empty blocks including 1 (156.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) remote blocks
20/12/25 11:51:30 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 13 ms
20/12/25 11:51:30 INFO HadoopMapRedCommitProtocol: Using output committer class org.apache.hadoop.mapred.FileOutputCommitter
20/12/25 11:51:30 INFO FileOutputCommitter: File Output Committer Algorithm version is 2
20/12/25 11:51:30 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
20/12/25 11:51:30 INFO FileOutputCommitter: Saved output of task 'attempt_20201225115126_0005_m_000000_0' to file:/root/spark-application/outfile
20/12/25 11:51:30 INFO SparkHadoopMapRedUtil: attempt_20201225115126_0005_m_000000_0: Committed
20/12/25 11:51:30 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1674 bytes result sent to driver
20/12/25 11:51:30 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 460 ms on localhost (executor driver) (1/1)
20/12/25 11:51:30 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
20/12/25 11:51:30 INFO DAGScheduler: ResultStage 1 (runJob at SparkHadoopWriter.scala:78) finished in 0.540 s
20/12/25 11:51:30 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
20/12/25 11:51:30 INFO TaskSchedulerImpl: Killing all running tasks in stage 1: Stage finished
20/12/25 11:51:30 INFO DAGScheduler: Job 0 finished: runJob at SparkHadoopWriter.scala:78, took 3.830881 s
20/12/25 11:51:30 INFO SparkHadoopWriter: Job job_20201225115126_0005 committed.
OK
20/12/25 11:51:30 INFO SparkContext: Invoking stop() from shutdown hook
20/12/25 11:51:30 INFO SparkUI: Stopped Spark web UI at http://localhost:4041
20/12/25 11:51:30 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/12/25 11:51:30 INFO MemoryStore: MemoryStore cleared
20/12/25 11:51:30 INFO BlockManager: BlockManager stopped
20/12/25 11:51:30 INFO BlockManagerMaster: BlockManagerMaster stopped
20/12/25 11:51:30 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/12/25 11:51:30 INFO SparkContext: Successfully stopped SparkContext
20/12/25 11:51:30 INFO ShutdownHookManager: Shutdown hook called
20/12/25 11:51:30 INFO ShutdownHookManager: Deleting directory /tmp/spark-24f41970-5ea6-4f24-b1c8-8744d908a237
20/12/25 11:51:30 INFO ShutdownHookManager: Deleting directory /tmp/spark-6048ded9-925a-46f4-8976-d8619633bbfd
Step 5: Checking the output
After successful execution of the program, you will find a directory named outfile in the spark-application directory.
The following commands are used for opening and checking the list of files in the outfile directory.
$ cd outfile
$ ls
part-00000  _SUCCESS
Following is the command to check the output in the part-00000 file −
$ cat part-00000
(talk.,1)
(are,2)
(not,1)
(people,1)
(share.,1)
(or,1)
(only,1)
(as,8)
(,1)
(care,1)
(they,7)
(beautiful,2)
(walk,1)
(love,,1)
(look,,1)
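If you prefer to inspect the result from a spark-shell instead of cat, the saved text can be read back as an RDD. A small sketch, assuming the shell is started from the spark-application directory:
/* each line of the saved output is one "(word,count)" pair stored as plain text */
val saved = sc.textFile("outfile")
saved.collect().foreach(println)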
Go through the following section to learn more about the 'spark-submit' command.