Flink API 概念

  • API

    Flink具有丰富的API集,开发人员可以使用它们对批处理和实时数据进行转换。各种转换包括映射,过滤,排序,联接,分组和聚合。Apache Flink的这些转换是在分布式数据上执行的。让我们讨论Apache Flink提供的不同API。
  • 数据集API

    Apache Flink中的数据集API用于在一段时间内对数据执行批处理操作。该API可在Java,Scala和Python中使用。它可以对数据集应用不同类型的转换,例如过滤,映射,聚合,联接和分组。可以从诸如本地文件之类的源中创建数据集,也可以通过从特定源中读取文件来创建数据集,并且可以将结果数据写入不同的接收器(如分布式文件或命令行终端)中。Java和Scala编程语言均支持此API。这是Dataset API的Wordcount程序-
    
    public class WordCountProg {
       public static void main(String[] args) throws Exception {
          final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
          DataSet<String> text = env.fromElements(
          "Hello",
          "My Dataset API Flink Program");
    
          DataSet<Tuple2<String, Integer>> wordCounts = text
          .flatMap(new LineSplitter())
          .groupBy(0)
          .sum(1);
    
          wordCounts.print();
       }
    
       public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
          @Override
          public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
             for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
             }
          }
       }
    }
    
  • 数据流API

    此API用于处理连续流中的数据。您可以对流数据执行各种操作,例如过滤,映射,窗口化,聚合。此数据流上有各种来源,例如消息队列,文件,套接字流,并且结果数据可以写在不同的接收器(如命令行终端)上。Java和Scala编程语言均支持此API。这是DataStream API的流式Wordcount程序,您可以在其中连续不断地进行字数统计并将数据分组在第二个窗口中。
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;
    public class WindowWordCountProg {
       public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          DataStream<Tuple2<String, Integer>> dataStream = env
          .socketTextStream("localhost", 9999)
          .flatMap(new Splitter())
          .keyBy(0)
          .timeWindow(Time.seconds(5))
          .sum(1);
          dataStream.print();
          env.execute("Streaming WordCount Example");
       }
       public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
          @Override
          public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
             for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
             }
          }
       }
    }