创建一个Flink App
在本章中,我们将学习如何创建Flink应用程序。打开Eclipse IDE,单击New Project并选择Java Project。
我们将创建一个APP 来统计Flink包下面的REDEME.txt的单词
创建一个类
写入以下源码,(包自定义)
package com.test;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;
public class WordCount {
// *************************************************************************
// PROGRAM
// *************************************************************************
public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// make parameters available in the web interface
env.getConfig().setGlobalJobParameters(params);
// get input data
DataSet<String> text = env.readTextFile(params.get("input"));
DataSet<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
.groupBy(0)
.sum(1);
// emit result
if (params.has("output")) {
counts.writeAsCsv(params.get("output"), "\n", " ");
// execute program
env.execute("WordCount Example");
} else {
System.out.println("Printing result to stdout. Use --output to specify output path.");
counts.print();
}
}
// *************************************************************************
// USER FUNCTIONS
// *************************************************************************
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
/**
*
*/
private static final long serialVersionUID = 7336944010299293015L;
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// normalize and split the line
String[] tokens = value.toLowerCase().split("\\W+");
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<>(token, 1));
}
}
}
}
}
这时候我们发现,导入的依赖上面eclipse提示找不到我们需要导入外部的flink库
添加外部库,从flink解压的文件夹lib文件夹下面添加所有的jar文件
这时候我们发现eclipse不报错了。
这时候我们需要把我们的应用导出
选择java下的JAR文件进行导出
选择下一步
在main类那里选择我们刚刚创建的应用
这时候运行flink执行刚刚的应用(输入文件,输出文件请对应自己的实际目录)
提示:需要在flink集群启动的情况下,运行下面命令,启动参考 Flink 设置安装一章
./bin/flink run /mnt/myweb/wordcount.jar --input README.txt --output output
这时候我们查看执行过后输出的文件,出现单词的统计情况