Skip to main content
Ctrl+K
如果你觉得内容对你有帮助,请在 GitHub 上点个 star 吧!
Flink 原理与实践 - Home
  • GitHub
  • Flink 原理与实践
  • 1. 大数据技术概述
    • 1.1. 什么是大数据
    • 1.2. 从批处理到流处理
    • 1.3. 代表性大数据技术
    • 1.4. 从Lambda到Kappa:大数据处理平台的演进
    • 1.5. 流处理基础概念
    • 1.6. 编程语言的选择
    • 1.7. 案例实战:使用 Kafka 构建文本数据流
  • 2. 大数据必备编程知识
    • 2.1. 继承和多态
    • 2.2. 泛型
    • 2.3. 函数式编程
    • 2.4. 案例实战 Flink开发环境搭建
  • 3. Flink的设计与运行原理
    • 3.1. Flink 数据流图
    • 3.2. 架构与核心组件
    • 3.3. 任务执行与资源划分
    • 3.4. 练习:WordCount
  • 4. DataStream API
    • 4.1. Flink程序的骨架结构
    • 4.2. Transformations
    • 4.3. 数据类型和序列化
    • 4.4. 用户自定义函数
    • 4.5. 习题 股票数据流处理
  • 5. 时间和窗口
    • 5.1. Flink的时间语义
    • 5.2. ProcessFunction
    • 5.3. 窗口算子
    • 5.4. 双流关联
    • 5.5. 迟到数据
    • 5.6. 练习:股票价格
  • 6. 状态与检查点
    • 6.1. Checkpoint
    • 6.2. Savepoint
    • 6.3. 实现有状态的计算
    • 6.4. 练习:电商用户行为
  • 7. Flink连接器
    • 7.1. Flink端到端的Exactly-Once保障
    • 7.2. 自定义Source和Sink
    • 7.3. Flink中常用的Connector
    • 7.4. 实验 读取并输出股票价格数据流
  • 8. Table API & SQL
    • 8.1. Table API & SQL综述
    • 8.2. 动态表和持续查询
    • 8.3. 时间和窗口
    • 8.4. Join
    • 8.5. SQL DDL
    • 8.6. 系统函数
    • 8.7. 用户自定义函数
    • 8.8. 练习:SQL on IoT
  • 9. Flink的部署和配置
    • 9.1. Flink集群部署模式
    • 9.2. 配置文件
    • 9.3. 算子链与槽位共享
    • 9.4. 命令行工具
    • 9.5. 与Hadoop集成
    • 9.6. 实验 作业编码、打包与提交
  • .md

案例实战 Flink开发环境搭建

本节目录

  • 2.4.1 准备所需软件
  • 2.4.2 下载并安装Flink
  • 2.4.3 创建Flink工程
  • 2.4.4 调试和运行Flink程序
  • 2.4.5 运行程序
  • 本章小结

2.4. 案例实战 Flink开发环境搭建#

本案例实战主要带领读者完成对Flink开发环境的搭建。

2.4.1 准备所需软件#

在1.7节中我们简单提到了Kafka的安装部署所需的软件环境,这里我们再次梳理一下Flink开发所需的软件环境。

  1. 操作系统

    • 目前,我们可以在Linux、macOS和Windows操作系统上开发和运行Flink。类UNIX操作系统(Linux或macOS)是大数据首选的操作系统,它们对Flink的支持更好,适合进行Flink学习和开发。后文会假设读者已经拥有了一个类UNIX操作系统。Windows用户为了构建一个类UNIX环境,可以使用专门为Linux操作系统打造的子系统(Windows subsystem for Linux,即WSL)或者是Cygwin,又或者创建一个虚拟机,在虚拟机中安装Linux操作系统。

  2. JDK

    • 和Kafka一样,Flink开发基于JDK,因此也需要提前安装好JDK 1.8+ (Java 8或更高的版本),配置好Java环境变量。

  3. 其他工具

    • 其他的工具因开发者习惯不同来安装,不是Flink开发所必需的,但这里仍然建议提前安装好以下工具。

      • Apache Maven 3.0+

        • Apache Maven是一个项目管理工具,可以对Java或Scala项目进行构建及依赖管理,是进行大数据开发必备的工具。这里推荐使用Maven是因为Flink源码工程和本书的示例代码工程均使用Maven进行管理。

      • IntelliJ IDEA

        • IntelliJ IDEA是一个非常强大的编辑器和开发工具,内置了Maven等一系列工具,是大数据开发必不可少的利器。Intellij IDEA本来是一个商业软件,它提供了社区免费版本,免费版本已经基本能满足绝大多数的开发需求。

        • 除IntelliJ IDEA之外,还有Eclipse IDE或NetBeans IDE等开发工具,读者可以根据自己的使用习惯选择。由于IntelliJ IDEA对Scala的支持更好,本书建议读者使用IntelliJ IDEA。

2.4.2 下载并安装Flink#

从Flink官网下载编译好的Flink程序,把下载的.tgz压缩包放在你想放置的目录。在下载时,Flink提供了不同的选项,包括Scala 2.11、Scala 2.12、源码版等。其中,前两个版本是Flink官方提供的可执行版,解压后可直接使用,无须从源码开始编译打包。Scala不同版本间兼容性较差,对于Scala开发者来说,需要选择自己常用的版本,对于Java开发者来说,选择哪个Scala版本区别不大。本书写作时,使用的是Flink 1.11和Scala 2.11,读者可以根据自身情况下载相应版本。

按照下面的方式,解压该压缩包,进入解压目录,并启动Flink集群。

$ tar -zxvf flink-1.11.2-bin-scala_2.11.tgz  # 解压
$ cd flink-1.11.2-bin-scala_2.11  # 进入解压目录
$ ./bin/start-cluster.sh  # 启动 Flink 集群

成功启动后,打开浏览器,输入http://localhost:8081,可以进入Flink集群的仪表盘(WebUI),如图2-4所示。Flink WebUI可以对Flink集群进行管理和监控。

图2-4  Flink WebUI

2.4.3 创建Flink工程#

我们使用Maven从零开始创建一个Flink工程。

$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.11.2 \
    -DgroupId=com.myflink \
    -DartifactId=flink-study-scala \
    -Dversion=0.1 \
    -Dpackage=quickstart \
    -DinteractiveMode=false

archetype是Maven提供的一种项目模板,是别人提前准备好了的项目的结构框架,用户只需要使用Maven工具下载这个模板,在这个模板的基础上丰富并完善代码逻辑。主流框架一般都准备好了archetype,如Spring、Hadoop等。

不熟悉Maven的读者可以先使用IntelliJ IDEA内置的Maven工具,熟悉Maven的读者可直接跳过这部分。

如图2-5所示,在IntelliJ IDEA里依次单击“File”→“New”→“Project”,创建一个新工程。

图2-5  在IntelliJ IDEA中创建新工程

如图2-6所示,选择左侧的“Maven”,并勾选“Create from archetype”,并单击右侧的“Add Archetype”按钮。

图2-6  添加Maven项目

如图2-7所示,在弹出的窗口中填写archetype信息。其中GroupId为org.apache.flink,ArtifactId为flink-quickstart-java,Version为1.11.2,然后单击“OK”。这里主要是告诉Maven去资源库中下载哪个版本的模板。随着Flink的迭代开发,Version也在不断更新,读者可以在Flink的Maven资源库中查看最新的版本。GroupId、ArtifactId、Version可以唯一表示一个发布出来的Java程序包。配置好后,单击Next按钮进入下一步。

图2-7  填写archetype信息

如图2-8所示,这一步是建立你自己的Maven工程,以区别其他Maven工程,GroupId是你的公司或部门名称(可以随意填写),ArtifactId是工程发布时的Java归档(Java Archive,JAR)包名,Version是工程的版本。这些配置主要用于区别不同公司所发布的不同包,这与Maven和版本控制相关,Maven的教程中都会介绍这些概念,这里不赘述。

图2-8  配置你的工程信息

接下来可以继续单击“Next”按钮,注意最后一步选择你的工程所在的磁盘位置,单击“Finish”按钮,如图2-9所示。至此,一个Flink模板就下载好了。

图2-9  配置本工程的位置

工程结构如图2-10所示。左侧的“Project”栏是工程结构,其中src/main/java文件夹是Java代码文件存放位置,src/main/scala是Scala代码文件存放位置。我们可以在StreamingJob这个文件上继续修改,也可以重新创建一个新文件。

图2-10  工程结构

注意,开发前要单击右下角的“Import Changes”,让Maven导入所依赖的包。

2.4.4 调试和运行Flink程序#

我们创建一个新的文件,名为WordCountKafkaInStdOut.java,开始编写第一个Flink程序—流式词频统计(WordCount)程序。这个程序接收一个Kafka文本数据流,进行词频统计,然后输出到标准输出上。这里先不对程序做深入分析,后文中将会做更详细的解释。

首先要设置Flink的运行环境。

// 设置Flink运行环境
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

设置Kafka相关参数,连接对应的服务器和端口号,读取名为Shakespeare的Topic中的数据源,将数据源命名为stream。

// Kafka参数
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "flink-group");
String inputTopic = "Shakespeare";
// Source
FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<String>(inputTopic, new SimpleStringSchema(), properties);
DataStream<String> stream = env.addSource(consumer);

使用Flink API处理这个数据流。

// Transformation
// 使用Flink API对输入流的文本进行操作
// 切词转换、分组、设置时间窗口、聚合
DataStream<Tuple2<String, Integer>> wordCount = stream
    .flatMap((String line, Collector<Tuple2<String, Integer>> collector) -> {
      String[] tokens = line.split("\\s");
      // 输出结果
      for (String token : tokens) {
        if (token.length() > 0) {
          collector.collect(new Tuple2<>(token, 1));
        }
      }
    })
    .returns(Types.TUPLE(Types.STRING, Types.INT))
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1);

将数据流输出。

// Sink
wordCount.print();

最后运行该程序。

// execute
env.execute("kafka streaming word count");

env.execute() 是启动Flink作业所必需的,只有在execute()方法被调用时,之前调用的各个操作才会被提交到集群上或本地计算机上运行。

该程序的完整代码如代码清单2-9所示。

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class WordCountKafkaInStdOut {

    public static void main(String[] args) throws Exception {

        // 设置Flink执行环境
        StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka参数
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "flink-group");
        String inputTopic = "Shakespeare";
        String outputTopic = "WordCount";

        // Source
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<String>(inputTopic, new SimpleStringSchema(), properties);
        DataStream<String> stream = env.addSource(consumer);

        // Transformation
        // 使用Flink API对输入流的文本进行操作
        // 按空格切词、计数、分区、设置时间窗口、聚合
        DataStream<Tuple2<String, Integer>> wordCount = stream
            .flatMap((String line, Collector<Tuple2<String, Integer>> collector) -> {
                String[] tokens = line.split("\\s");
                // 输出结果
                for (String token : tokens) {
                    if (token.length() > 0) {
                        collector.collect(new Tuple2<>(token, 1));
                    }
                }
            })
            .returns(Types.TUPLE(Types.STRING, Types.INT))
            .keyBy(0)
            .timeWindow(Time.seconds(5))
            .sum(1);

        // Sink
        wordCount.print();

        // execute
        env.execute("kafka streaming word count");

    }
}

代码写完后,我们还要在Maven的项目对象模型(Project Object Model,POM)文件中引入下面的依赖,让Maven可以引用Kafka。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>

其中,${scala.binary.version}是所用的Scala版本号,可以是2.11或2.12,${flink.version}是所用的Flink的版本号,比如1.11.2。

2.4.5 运行程序#

我们在1.7节中展示过如何启动一个Kafka集群,并向某个Topic内发送数据流。在本次Flink作业启动之前,我们还要按照1.7节提到的方式启动一个Kafka集群、创建对应的Topic,并向Topic中写入数据。

  1. 在IntelliJ IDEA中运行程序

    • 在IntelliJ IDEA中,单击绿色运行按钮,运行这个程序。图2-11所示的两个绿色运行按钮中的任意一个都可以运行这个程序。

    • IntelliJ IDEA下方的“Run”栏会显示程序的输出,包括本次需要输出的结果,如图2-12所示。

图2-11  在IntelliJ IDEA中运行Flink程序

图2-12  WordCount程序运行结果

恭喜你,你的第一个Flink程序运行成功!

提示

如果在Intellij IDEA中运行程序时遇到java.lang.NoClassDefFoundError 报错,这是因为没有把依赖的类都加载进来。在Intellij IDEA中单击“Run”->“Edit configurations…”,在“Use classpath of module”选项上选择当前工程,并且勾选“Include dependencies with‘Provided’ Scope”

  1. 向集群提交作业

    • 目前,我们学会了先下载并启动本地集群,接着在模板的基础上添加代码,并在IntelliJ IDEA中运行程序。而在生产环境中,我们一般需要将代码编译打包,提交到集群上。我们将在第9章详细介绍如何向Flink集群提交作业。

    • 注意,这里涉及两个目录:一个是我们存放刚刚编写代码的工程目录,简称工程目录;另一个是从Flink官网下载解压的Flink主目录,主目录下的bin目录中有Flink提供的命令行工具。

    • 进入工程目录,使用Maven命令行将代码编译打包。

# 使用Maven命令行将代码编译打包
# 打好的包一般放在工程目录的target目录下
$ mvn clean package

回到Flink主目录,使用Flink提供的命令行工具flink,将打包好的作业提交到集群上。命令行的参数–class用来指定哪个主类作为入口。我们之后会介绍命令行的具体使用方法。

$ bin/flink run --class
com.flink.tutorials.java.api.projects.wordcount.WordCountKafkaInStdOut
/Users/luweizheng/Projects/big-data/flink-tutorials/target/flink-tutorials-0.1.jar

如图2-13所示,这时,Flink WebUI上就多了一个Flink作业。

图2-13  Flink WebUI中多了一个Flink作业

程序的输出会保存到Flink主目录下面的log目录下的.out文件中,可以使用下面的命令查看结果。

$ tail -f log/flink-*-taskexecutor-*.out

必要时,可以使用下面的命令关停本地集群。

$ ./bin/stop-cluster.sh

Flink开发和调试过程中,一般有如下几种方式运行程序。

  • 使用IntelliJ IDEA内置的绿色运行按钮。这种方式主要在本地调试时使用。

  • 使用Flink提供的命令行工具向集群提交作业,包括Java和Scala程序。这种方式更适合生产环境。

  • 使用Flink提供的其他命令行工具,比如针对Scala、Python和SQL的交互式环境。

对于新手,可以先使用IntelliJ IDEA提供的内置运行按钮,熟练后再使用命令行工具。

本章小结#

本章中,我们回顾了Flink开发经常用到的继承和多态、泛型和函数式编程等概念,在本地搭建了一个Flink集群,创建了第一个Flink工程,并学会了如何运行Flink程序。

previous

2.3. 函数式编程

next

3. Flink的设计与运行原理

本节目录
  • 2.4.1 准备所需软件
  • 2.4.2 下载并安装Flink
  • 2.4.3 创建Flink工程
  • 2.4.4 调试和运行Flink程序
  • 2.4.5 运行程序
  • 本章小结

By Weizheng Lu

© Copyright 2023-2024.