Flink迷你集群是一个轻量级的本地集群,可用于在本地环境中快速开发和测试Flink应用程序。迷你集群不需要任何复杂的配置和管理,只需要提供一个简单的配置即可使用。在启动迷你集群时,必须指定Flink应用程序所需的资源,如TaskManager的数量、内存大小、并行度等。
迷你集群的默认参数通常由Flink框架的配置文件控制。以下是一些常见的默认值:
- singleRpcService: SHARED(默认情况下,所有的RPC服务都共享同一个线程池)
- numTaskManagers: 1(默认情况下使用一个TaskManager节点)
- commonBindAddress: null(默认情况下使用本地地址)
- taskmanager.memory.network.min: 64 mb(TaskManager网络内存的最小值)
- taskmanager.memory.network.max: 64 mb(TaskManager网络内存的最大值)
- taskmanager.memory.managed.size: 128 mb(TaskManager管理内存的默认大小)
- taskmanager.numberOfTaskSlots: 12(每个TaskManager节点的默认任务插槽数量)
- parallelism.default: 1(操作符默认并行度)
- execution.target: local(默认情况下在本地运行)
- execution.runtime-mode: AUTOMATIC(默认情况下自适应选择执行模式)
- taskmanager.cpu.cores: 1.7976931348623157E308(TaskManager可以使用的CPU核心数最大值)
- taskmanager.memory.task.heap.size: 9223372036854775807 bytes(TaskManager堆内存的最大大小)
- taskmanager.memory.task.off-heap.size: 9223372036854775807 bytes(TaskManager堆外内存的最大大小)
- rest.bind-port: 0(默认情况下,REST API将绑定到系统可用端口)
- rest.address: localhost(REST API的默认地址)。
这些参数可以在Flink框架的配置文件中进行修改。在使用迷你集群时,可以使用命令行选项或Java代码覆盖这些默认设置。其中,最常用的命令行选项是-D
和-yT
。-D
选项可以在JVM启动参数中设置要覆盖的配置参数,-yT
选项可以从外部文件中加载YAML格式的配置文件。
简单demo代码如下:
import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;public class FlinkDemo {public static void main(String[] args) throws Exception {// 1.准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置运行模式env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 2.加载数据源DataStreamSource<String> elementsSource = env.fromElements("java,scala,php,c++","java,scala,php", "java,scala", "java,c,c++,python,go");// 3.数据转换DataStream<String> flatMap = elementsSource.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String element, Collector<String> out) throws Exception {String[] wordArr = element.split(",");for (String word : wordArr) {out.collect(word);}}});//DataStream 下边为DataStream子类SingleOutputStreamOperator<String> source = flatMap.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {return value.toUpperCase();}}).returns(String.class);// 4.数据输出source.print();// 5.执行程序env.execute("FlinkDemo");} }