第一个flink项目
时间: 2025-02-12 17:19:21 浏览: 38
### 创建并运行第一个Apache Flink项目
#### 准备工作环境
为了成功创建并运行首个Flink应用,需先安装配置好Java开发环境以及Maven构建工具。确保已设置JAVA_HOME环境变量指向JDK路径,并能通过命令行访问`mvn`命令。
#### 初始化Maven项目结构
使用Maven原型插件来初始化一个新的Flink应用程序骨架:
```bash
mvn archetype:generate \
-DgroupId=com.example.flink \
-DartifactId=my-flink-project \
-DarchetypeArtifactId=maven-archetype-quickstart \
-DinteractiveMode=false
```
这会生成一个基础的Maven工程目录树[^2]。
#### 添加依赖项至pom.xml文件
编辑项目的`pom.xml`文件,加入必要的Flink库依赖声明以便后续编译链接时能够识别这些类库:
```xml
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.14.0</version>
</dependency>
<!-- Other dependencies... -->
</dependencies>
```
上述XML片段展示了如何引入用于流处理的核心模块版本号为1.14.0的具体实例[^1]。
#### 编写简单的Word Count程序作为入门案例
在src/main/java下新建名为`com/example/flink/WordCount.java`源码文件,输入如下所示的基础代码实现单词计数逻辑功能:
```java
package com.example.flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCount {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> text = env.fromElements(
"To be, or not to be, that is the question",
"Whether 'tis nobler in the mind to suffer"
);
text.flatMap(new Tokenizer())
.keyBy(value -> value.f0)
.sum(1)
.print();
env.execute("Word Count Example");
}
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
String[] tokens = value.toLowerCase().split("\\W+");
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<>(token, 1));
}
}
}
}
}
```
这段代码定义了一个基本的数据流水线,它读取字符串列表中的句子,将其拆分为单独词语并通过键控窗口聚合统计每个词频次最后打印输出结果。
#### 构建可执行jar包
完成编码后回到IDEA界面依次操作菜单栏选项:File->Project Structure->Artifacts 来指定要被打包的内容;接着选择 Build->Build Artifacts 完成整个过程。
#### 部署到远程集群执行
假设已经有一个正在运行着单节点模式下的Flink集群,则可通过SCP协议把本地生成好的jar复制过去放置于服务器特定位置等待提交任务给JobManager调度器去启动作业流程:
```bash
docker cp my-flink-projec2.jar master:/opt/
```
随后登录目标机器终端连接进入容器内部利用CLI客户端提交该job请求即可开始正式跑批运算任务。
阅读全文
相关推荐


















