Spark Deployment

  • Spark Deployment

    A Spark application is deployed on a cluster with spark-submit, a shell command. It works with all the supported cluster managers through a uniform interface, so you do not have to configure your application separately for each one.
    Let us take the same word-count example we ran earlier with shell commands and write it as a standalone Spark application.
    Sample Input
    The following text is the input data, stored in a file named in.txt.
    
    people are not as beautiful as they look, 
    as they walk or as they talk. 
    they are only as beautiful  as they love, 
    as they care as they share.
    
    Look at the following program -
    SparkWordCount.scala
    
    import org.apache.spark.SparkContext 
    import org.apache.spark.SparkContext._ 
    import org.apache.spark._  
    
    object SparkWordCount { 
       def main(args: Array[String]): Unit = { 
    
          val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Seq(), Map()) 
          
       /* local = master URL; Word Count = application name; */  
       /* /usr/local/spark = Spark home; Seq() = jars to ship to the cluster; */ 
       /* Map() = environment variables to pass to worker nodes */ 
          /*creating an inputRDD to read text file (in.txt) through Spark context*/ 
          val input = sc.textFile("in.txt") 
          /* Transform the inputRDD into countRDD */ 
          
          val count = input.flatMap(line ⇒ line.split(" ")).map(word ⇒ (word, 1)).reduceByKey(_ + _) 
           
          /* saveAsTextFile is an action; it triggers execution and writes the RDD to disk */  
          count.saveAsTextFile("outfile") 
          System.out.println("OK"); 
       } 
    } 
    
    Save the above program in a file named SparkWordCount.scala and place it in a user-defined directory named spark-application.
    Note - While transforming the inputRDD into countRDD, flatMap() tokenizes each line (read from the text file) into words, map() pairs every word with a count of 1, and reduceByKey() adds up the counts for each distinct word. The sketch below breaks the same chained expression into named steps.
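    
    The following fragment is purely illustrative; the names wordsRDD, pairsRDD and countRDD do not appear in SparkWordCount.scala, and the lines are assumed to sit inside main() after input has been defined.
    
    /* same logic as the chained expression above, split into named intermediate RDDs */
    val wordsRDD = input.flatMap(line => line.split(" "))  // one RDD element per word
    val pairsRDD = wordsRDD.map(word => (word, 1))         // pair every occurrence with the count 1
    val countRDD = pairsRDD.reduceByKey(_ + _)             // sum the counts per distinct word
    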
    Follow the steps given below to submit this application. Execute all of them from the spark-application directory in a terminal.
    Step 1: Download the Spark core jar
    The Spark core jar is required for compilation. Download spark-core_2.12-3.0.0.jar and move it from the download directory into the spark-application directory.
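    
    The download link itself is not reproduced here. One way to obtain the jar, assuming you are willing to pull it from Maven Central (which hosts the spark-core_2.12 3.0.0 artifact), is -
    
    $ wget https://repo1.maven.org/maven2/org/apache/spark/spark-core_2.12/3.0.0/spark-core_2.12-3.0.0.jar
    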
    Step 2: Compile the program
    Compile the above program using the command given below, executed from the spark-application directory. Here, /usr/local/spark/jar/hadoop-common-3.2.0.jar is a Hadoop support jar taken from the Spark library.
    
    $ scalac -classpath "spark-core_2.12-3.0.0.jar:/usr/local/spark/jar/hadoop-common-3.2.0.jar" SparkWordCount.scala
    
    Step 3: Create a JAR
    Create a jar file of the Spark application using the following command. Here, wordcount is the name of the jar file.
    
    jar -cvf wordcount.jar SparkWordCount*.class 
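    
    Optionally, you can confirm that the compiled classes were packaged by listing the archive contents; for the object above you would expect to see SparkWordCount.class and SparkWordCount$.class.
    
    $ jar -tf wordcount.jar
    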
    
    Step 4: Submit the Spark application
    Submit the Spark application using the following command -
    
    spark-submit --class SparkWordCount --master local wordcount.jar
    
    If it is executed successfully, you will find the output given below. The OK in the following output is there for user identification; it is printed by the last line of the program. If you read the output carefully, you will find other things as well, such as -
    • Successfully started service 'sparkDriver' on port 36325
    • MemoryStore started with capacity 413.9 MiB
    • Started SparkUI at http://192.168.61.201:4041 (Service 'SparkUI' could not bind on port 4040: the spark-shell opened earlier in this tutorial also started a SparkUI and occupies port 4040, so this run automatically falls back to port 4041)
    • Added JAR file:/root/spark-application/wordcount.jar at spark://localhost:36325/jars/wordcount.jar
    • ResultStage 1 (runJob at SparkHadoopWriter.scala:78) finished in 0.540 s
    • Stopped Spark web UI at http://192.168.61.201:4041
    • MemoryStore cleared
    
    20/12/25 11:51:21 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 192.168.61.201 instead (on interface enp0s3)
    20/12/25 11:51:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
    20/12/25 11:51:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    20/12/25 11:51:22 INFO SparkContext: Running Spark version 3.0.0
    20/12/25 11:51:22 INFO ResourceUtils: ==============================================================
    20/12/25 11:51:22 INFO ResourceUtils: Resources for spark.driver:
    
    20/12/25 11:51:22 INFO ResourceUtils: ==============================================================
    20/12/25 11:51:22 INFO SparkContext: Submitted application: Word Count
    20/12/25 11:51:22 INFO SecurityManager: Changing view acls to: root
    20/12/25 11:51:22 INFO SecurityManager: Changing modify acls to: root
    20/12/25 11:51:22 INFO SecurityManager: Changing view acls groups to: 
    20/12/25 11:51:22 INFO SecurityManager: Changing modify acls groups to: 
    20/12/25 11:51:22 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
    20/12/25 11:51:23 INFO Utils: Successfully started service 'sparkDriver' on port 36325.
    20/12/25 11:51:23 INFO SparkEnv: Registering MapOutputTracker
    20/12/25 11:51:23 INFO SparkEnv: Registering BlockManagerMaster
    20/12/25 11:51:23 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
    20/12/25 11:51:23 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
    20/12/25 11:51:23 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
    20/12/25 11:51:23 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-27d998a6-84bb-42ee-b924-519055f345da
    20/12/25 11:51:23 INFO MemoryStore: MemoryStore started with capacity 413.9 MiB
    20/12/25 11:51:23 INFO SparkEnv: Registering OutputCommitCoordinator
    20/12/25 11:51:23 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
    20/12/25 11:51:23 INFO Utils: Successfully started service 'SparkUI' on port 4041.
    20/12/25 11:51:23 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://localhost:4041
    20/12/25 11:51:23 INFO SparkContext: Added JAR file:/root/spark-application/wordcount.jar at spark://localhost:36325/jars/wordcount.jar with timestamp 1608868283968
    20/12/25 11:51:24 INFO Executor: Starting executor ID driver on host localhost
    20/12/25 11:51:24 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 44684.
    20/12/25 11:51:24 INFO NettyBlockTransferService: Server created on localhost:44684
    20/12/25 11:51:24 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
    20/12/25 11:51:24 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, localhost, 44684, None)
    20/12/25 11:51:24 INFO BlockManagerMasterEndpoint: Registering block manager localhost:44684 with 413.9 MiB RAM, BlockManagerId(driver, localhost, 44684, None)
    20/12/25 11:51:24 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, localhost, 44684, None)
    20/12/25 11:51:24 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, localhost, 44684, None)
    20/12/25 11:51:25 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 293.7 KiB, free 413.6 MiB)
    20/12/25 11:51:25 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 26.9 KiB, free 413.6 MiB)
    20/12/25 11:51:25 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:44684 (size: 26.9 KiB, free: 413.9 MiB)
    20/12/25 11:51:25 INFO SparkContext: Created broadcast 0 from textFile at SparkWordCount.scala:14
    20/12/25 11:51:26 INFO FileInputFormat: Total input files to process : 1
    20/12/25 11:51:26 INFO deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
    20/12/25 11:51:26 INFO HadoopMapRedCommitProtocol: Using output committer class org.apache.hadoop.mapred.FileOutputCommitter
    20/12/25 11:51:26 INFO FileOutputCommitter: File Output Committer Algorithm version is 2
    20/12/25 11:51:26 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
    20/12/25 11:51:26 INFO SparkContext: Starting job: runJob at SparkHadoopWriter.scala:78
    20/12/25 11:51:27 INFO DAGScheduler: Registering RDD 3 (map at SparkWordCount.scala:18) as input to shuffle 0
    20/12/25 11:51:27 INFO DAGScheduler: Got job 0 (runJob at SparkHadoopWriter.scala:78) with 1 output partitions
    20/12/25 11:51:27 INFO DAGScheduler: Final stage: ResultStage 1 (runJob at SparkHadoopWriter.scala:78)
    20/12/25 11:51:27 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
    20/12/25 11:51:27 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
    20/12/25 11:51:27 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[3] at map at SparkWordCount.scala:18), which has no missing parents
    20/12/25 11:51:27 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 6.7 KiB, free 413.6 MiB)
    20/12/25 11:51:27 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 3.9 KiB, free 413.6 MiB)
    20/12/25 11:51:27 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:44684 (size: 3.9 KiB, free: 413.9 MiB)
    20/12/25 11:51:27 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1200
    20/12/25 11:51:28 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[3] at map at SparkWordCount.scala:18) (first 15 tasks are for partitions Vector(0))
    20/12/25 11:51:28 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
    20/12/25 11:51:28 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 7365 bytes)
    20/12/25 11:51:28 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
    20/12/25 11:51:28 INFO Executor: Fetching spark://localhost:36325/jars/wordcount.jar with timestamp 1608868283968
    20/12/25 11:51:28 INFO TransportClientFactory: Successfully created connection to localhost/127.0.0.1:36325 after 92 ms (0 ms spent in bootstraps)
    20/12/25 11:51:28 INFO Utils: Fetching spark://localhost:36325/jars/wordcount.jar to /tmp/spark-24f41970-5ea6-4f24-b1c8-8744d908a237/userFiles-ee247a6f-3eda-4e39-a327-cb2180afe220/fetchFileTemp8416307429326821809.tmp
    20/12/25 11:51:28 INFO Executor: Adding file:/tmp/spark-24f41970-5ea6-4f24-b1c8-8744d908a237/userFiles-ee247a6f-3eda-4e39-a327-cb2180afe220/wordcount.jar to class loader
    20/12/25 11:51:29 INFO HadoopRDD: Input split: file:/root/spark-application/in.txt:0+145
    20/12/25 11:51:29 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1330 bytes result sent to driver
    20/12/25 11:51:29 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1722 ms on localhost (executor driver) (1/1)
    20/12/25 11:51:29 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
    20/12/25 11:51:29 INFO DAGScheduler: ShuffleMapStage 0 (map at SparkWordCount.scala:18) finished in 2.061 s
    20/12/25 11:51:29 INFO DAGScheduler: looking for newly runnable stages
    20/12/25 11:51:29 INFO DAGScheduler: running: Set()
    20/12/25 11:51:29 INFO DAGScheduler: waiting: Set(ResultStage 1)
    20/12/25 11:51:29 INFO DAGScheduler: failed: Set()
    20/12/25 11:51:29 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkWordCount.scala:22), which has no missing parents
    20/12/25 11:51:29 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 85.6 KiB, free 413.5 MiB)
    20/12/25 11:51:29 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 30.8 KiB, free 413.5 MiB)
    20/12/25 11:51:29 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:44684 (size: 30.8 KiB, free: 413.9 MiB)
    20/12/25 11:51:29 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1200
    20/12/25 11:51:29 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkWordCount.scala:22) (first 15 tasks are for partitions Vector(0))
    20/12/25 11:51:29 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
    20/12/25 11:51:29 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, executor driver, partition 0, NODE_LOCAL, 7143 bytes)
    20/12/25 11:51:29 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
    20/12/25 11:51:30 INFO ShuffleBlockFetcherIterator: Getting 1 (156.0 B) non-empty blocks including 1 (156.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) remote blocks
    20/12/25 11:51:30 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 13 ms
    20/12/25 11:51:30 INFO HadoopMapRedCommitProtocol: Using output committer class org.apache.hadoop.mapred.FileOutputCommitter
    20/12/25 11:51:30 INFO FileOutputCommitter: File Output Committer Algorithm version is 2
    20/12/25 11:51:30 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
    20/12/25 11:51:30 INFO FileOutputCommitter: Saved output of task 'attempt_20201225115126_0005_m_000000_0' to file:/root/spark-application/outfile
    20/12/25 11:51:30 INFO SparkHadoopMapRedUtil: attempt_20201225115126_0005_m_000000_0: Committed
    20/12/25 11:51:30 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1674 bytes result sent to driver
    20/12/25 11:51:30 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 460 ms on localhost (executor driver) (1/1)
    20/12/25 11:51:30 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
    20/12/25 11:51:30 INFO DAGScheduler: ResultStage 1 (runJob at SparkHadoopWriter.scala:78) finished in 0.540 s
    20/12/25 11:51:30 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
    20/12/25 11:51:30 INFO TaskSchedulerImpl: Killing all running tasks in stage 1: Stage finished
    20/12/25 11:51:30 INFO DAGScheduler: Job 0 finished: runJob at SparkHadoopWriter.scala:78, took 3.830881 s
    20/12/25 11:51:30 INFO SparkHadoopWriter: Job job_20201225115126_0005 committed.
    OK
    20/12/25 11:51:30 INFO SparkContext: Invoking stop() from shutdown hook
    20/12/25 11:51:30 INFO SparkUI: Stopped Spark web UI at http://localhost:4041
    20/12/25 11:51:30 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    20/12/25 11:51:30 INFO MemoryStore: MemoryStore cleared
    20/12/25 11:51:30 INFO BlockManager: BlockManager stopped
    20/12/25 11:51:30 INFO BlockManagerMaster: BlockManagerMaster stopped
    20/12/25 11:51:30 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    20/12/25 11:51:30 INFO SparkContext: Successfully stopped SparkContext
    20/12/25 11:51:30 INFO ShutdownHookManager: Shutdown hook called
    20/12/25 11:51:30 INFO ShutdownHookManager: Deleting directory /tmp/spark-24f41970-5ea6-4f24-b1c8-8744d908a237
    20/12/25 11:51:30 INFO ShutdownHookManager: Deleting directory /tmp/spark-6048ded9-925a-46f4-8976-d8619633bbfd
    
    Step 5: Checking the output
    After successful execution of the program, you will find a directory named outfile in the spark-application directory.
    The following commands are used for opening and checking the list of files in the outfile directory.
    
    $ cd outfile 
    $ ls 
    part-00000 part-00001 _SUCCESS
    
    The following command is used to check the output in the part-00000 file -
    
    $ cat part-00000 
    (talk.,1)
    (are,2)
    (not,1)
    (people,1)
    (share.,1)
    (or,1)
    (only,1)
    (as,8)
    (,1)
    (care,1)
    (they,7)
    (beautiful,2)
    (walk,1)
    (love,,1)
    (look,,1)
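    
    Since the number of part files can vary, a small convenience (not part of the original walkthrough) is to print all of them at once -
    
    $ cat part-*
    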
    
    Go through the following section to know more about the 'spark-submit' command.
  • spark-submit Syntax

    
    spark-submit [options] <app jar | python file> [app arguments]
    
    [options]
    The table below describes these options; an illustrative invocation that combines several of them follows the table.
    Option  Description
    --master  spark://host:port, mesos://host:port, yarn, or local.
    --deploy-mode  Whether to launch the driver program locally ("client") or on one of the worker machines inside the cluster ("cluster") (Default: client).
    --class  Your application's main class (for Java / Scala apps).
    --name  A name for your application.
    --jars  Comma-separated list of local jars to include on the driver and executor classpaths.
    --packages  Comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths.
    --repositories  Comma-separated list of additional remote repositories to search for the Maven coordinates given with --packages.
    --py-files  Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python applications.
    --files  Comma-separated list of files to be placed in the working directory of each executor.
    --conf (prop=val)  Arbitrary Spark configuration property.
    --properties-file  Path to a file from which to load extra properties. If not specified, this looks for conf/spark-defaults.conf.
    --driver-memory  Memory for the driver (e.g. 1000M, 2G) (Default: 512M).
    --driver-java-options  Extra Java options to pass to the driver.
    --driver-library-path  Extra library path entries to pass to the driver.
    --driver-class-path  Extra class path entries to pass to the driver. Note that jars added with --jars are automatically included in the classpath.
    --executor-memory  Memory per executor (e.g. 1000M, 2G) (Default: 1G).
    --proxy-user  User to impersonate when submitting the application.
    --help, -h  Show this help message and exit.
    --verbose, -v  Print additional debug output.
    --version  Print the version of the current Spark.
    --driver-cores NUM  Cores for the driver (Default: 1).
    --supervise  If given, restart the driver on failure.
    --kill  If given, kill the driver specified.
    --status  If given, request the status of the driver specified.
    --total-executor-cores  Total cores for all executors.
    --executor-cores  Number of cores per executor (Default: 1 in YARN mode, or all available cores on the worker in standalone mode).
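    
    As a sketch of how several of these options combine, the command below would resubmit the word-count jar to a hypothetical standalone cluster; the master host, memory and core values are placeholders, not values taken from the walkthrough above.
    
    spark-submit \
      --class SparkWordCount \
      --master spark://<master-host>:7077 \
      --name "Word Count" \
      --executor-memory 1G \
      --total-executor-cores 2 \
      wordcount.jar
    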