- Apache Flink 教程
- Apache Flink - 首页
- Apache Flink - 大数据平台
- 批处理与实时处理
- Apache Flink - 简介
- Apache Flink - 架构
- Apache Flink - 系统要求
- Apache Flink - 设置/安装
- Apache Flink - API 概念
- Apache Flink - Table API 与 SQL
- 创建一个 Flink 应用程序
- Apache Flink - 运行 Flink 程序
- Apache Flink - 库
- Apache Flink - 机器学习
- Apache Flink - 使用案例
- Apache Flink - Flink 与 Spark 与 Hadoop 对比
- Apache Flink - 结论
- Apache Flink 资源
- Apache Flink - 快速指南
- Apache Flink - 有用资源
- Apache Flink - 讨论
Apache Flink - 创建 Flink 应用程序
在本章中,我们将学习如何创建 Flink 应用程序。
打开 Eclipse IDE,单击“新建项目”并选择 Java 项目。
给出项目名称,然后单击“完成”。
现在,单击完成,如下面的屏幕截图所示。
现在,右键单击src并选择“新建” >> “类”。
给出类名,然后单击“完成”。
将下面的代码复制并粘贴到编辑器中。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;
public class WordCount {
// *************************************************************************
// PROGRAM
// *************************************************************************
public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// make parameters available in the web interface
env.getConfig().setGlobalJobParameters(params);
// get input data
DataSet<String> text = env.readTextFile(params.get("input"));
DataSet<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
.groupBy(0)
.sum(1);
// emit result
if (params.has("output")) {
counts.writeAsCsv(params.get("output"), "\n", " ");
// execute program
env.execute("WordCount Example");
} else {
System.out.println("Printing result to stdout. Use --output to specify output path.");
counts.print();
}
}
// *************************************************************************
// USER FUNCTIONS
// *************************************************************************
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// normalize and split the line
String[] tokens = value.toLowerCase().split("\\W+");
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<>(token, 1));
}
}
}
}
}
你将在编辑器中遇到很多错误,因为需要将 Flink 库添加到此项目。
右键单击项目 >> 构建路径 >> 配置构建路径。
选择“库”标签,然后单击“添加外部 JAR”。
转到 Flink 的 lib 目录,选择所有 4 个库,然后单击“确定”。
转到“顺序和导出”标签,选择所有库,然后单击“确定”。
你将看到,错误已经消失了。
现在,让我们导出这个应用程序。右键单击该项目,然后单击“导出”。
选择 JAR 文件并单击“下一步”
给出目标路径并单击“下一步”
单击“下一步 >”
单击“浏览”,选择主要类 (WordCount),然后单击“完成”。
注意 - 如果出现任何警告,请单击“确定”。
运行下面的命令。它将进一步运行你刚刚创建的 Flink 应用程序。
./bin/flink run /home/ubuntu/wordcount.jar --input README.txt --output /home/ubuntu/output
广告