Flink 创建Flink应用程序

  • 创建一个Flink App

    在本章中,我们将学习如何创建Flink应用程序。打开Eclipse IDE,单击New Project并选择Java Project。
    我们将创建一个APP 来统计Flink包下面的REDEME.txt的单词
    flink
    创建一个类
    flink
    写入以下源码,(包自定义)
    flink
    
    package com.test;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.util.Collector;
    
    public class WordCount {
    
       // *************************************************************************
       // PROGRAM
       // *************************************************************************
       public static void main(String[] args) throws Exception {
          final ParameterTool params = ParameterTool.fromArgs(args);
          // set up the execution environment
          final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
          // make parameters available in the web interface
          env.getConfig().setGlobalJobParameters(params);
          // get input data
          DataSet<String> text = env.readTextFile(params.get("input"));
          DataSet<Tuple2<String, Integer>> counts =
          // split up the lines in pairs (2-tuples) containing: (word,1)
          text.flatMap(new Tokenizer())
          // group by the tuple field "0" and sum up tuple field "1"
          .groupBy(0)
          .sum(1);
          // emit result
          if (params.has("output")) {
             counts.writeAsCsv(params.get("output"), "\n", " ");
             // execute program
             env.execute("WordCount Example");
          } else {
             System.out.println("Printing result to stdout. Use --output to specify output path.");
             counts.print();
          }
       }
       
       // *************************************************************************
       // USER FUNCTIONS
       // *************************************************************************
       public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
          /**
             * 
             */
            private static final long serialVersionUID = 7336944010299293015L;
    
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
             // normalize and split the line
             String[] tokens = value.toLowerCase().split("\\W+");
             // emit the pairs
             for (String token : tokens) {
                if (token.length() > 0) {
                   out.collect(new Tuple2<>(token, 1));
                }
             }
          }
       }
    }
    
    这时候我们发现,导入的依赖上面eclipse提示找不到我们需要导入外部的flink库
    flink
    添加外部库,从flink解压的文件夹lib文件夹下面添加所有的jar文件
    flink
    这时候我们发现eclipse不报错了。
    flink
    这时候我们需要把我们的应用导出
    flink
    选择java下的JAR文件进行导出
    flink
    选择下一步
    flink
    在main类那里选择我们刚刚创建的应用
    flink
    这时候运行flink执行刚刚的应用(输入文件,输出文件请对应自己的实际目录)
    提示:需要在flink集群启动的情况下,运行下面命令,启动参考 Flink 设置安装一章
    
    ./bin/flink run /mnt/myweb/wordcount.jar --input README.txt --output output
    
    flink
    这时候我们查看执行过后输出的文件,出现单词的统计情况
    flink