flink源码分析-获取JVM最大堆内存

news/2024/12/22 13:08:48/

flink版本: flink-1.11.2

代码位置: org.apache.flink.runtime.util.EnvironmentInformation#getMaxJvmHeapMemory

如果设置了-Xmx参数,就返回这个参数,如果没设置就返回机器物理内存的1/4.  这里主要看各个机器内存的获取方法。

	/*** The maximum JVM heap size, in bytes.** <p>This method uses the <i>-Xmx</i> value of the JVM, if set. If not set, it returns (as* a heuristic) 1/4th of the physical memory size.** @return The maximum JVM heap size, in bytes.*/public static long getMaxJvmHeapMemory() {final long maxMemory = Runtime.getRuntime().maxMemory();if(maxMemory != Long.MAX_VALUE) {// we have the proper max memoryreturn maxMemory;} else {// max JVM heap size is not set - use the heuristic to use 1/4th of the physical memoryfinal long physicalMemory = Hardware.getSizeOfPhysicalMemory();if(physicalMemory != -1) {// got proper value for physical memoryreturn physicalMemory / 4;} else {throw new RuntimeException("Could not determine the amount of free memory.\n" + "Please set the maximum memory for the JVM, e.g. -Xmx512M for 512 megabytes.");}}}

进入getSizeOfPhysicalMemory()方法,里面有获取各种操作系统物理内存的方法:

/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.flink.runtime.util;import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.regex.Matcher;
import java.util.regex.Pattern;import org.apache.flink.util.OperatingSystem;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** Convenience class to extract hardware specifics of the computer executing the running JVM.*/
public class Hardware {private static final Logger LOG = LoggerFactory.getLogger(Hardware.class);private static final String LINUX_MEMORY_INFO_PATH = "/proc/meminfo";private static final Pattern LINUX_MEMORY_REGEX = Pattern.compile("^MemTotal:\\s*(\\d+)\\s+kB$");// ------------------------------------------------------------------------/*** Gets the number of CPU cores (hardware contexts) that the JVM has access to.* * @return The number of CPU cores.*/public static int getNumberCPUCores() {// TODO_MA 注释: 获取 Cpu Coresreturn Runtime.getRuntime().availableProcessors();}/*** Returns the size of the physical memory in bytes.* * @return the size of the physical memory in bytes or {@code -1}, if*         the size could not be determined.*/public static long getSizeOfPhysicalMemory() {// first try if the JVM can directly tell us what the system memory is// this works only on Oracle JVMstry {Class<?> clazz = Class.forName("com.sun.management.OperatingSystemMXBean");Method method = clazz.getMethod("getTotalPhysicalMemorySize");OperatingSystemMXBean operatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean();// someone may install different beans, so we need to check whether the bean// is in fact the sun management beanif (clazz.isInstance(operatingSystemMXBean)) {return (Long) method.invoke(operatingSystemMXBean);}}catch (ClassNotFoundException e) {// this happens on non-Oracle JVMs, do nothing and use the alternative code paths}catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {LOG.warn("Access to physical memory size: " +"com.sun.management.OperatingSystemMXBean incompatibly changed.", e);}// we now try the OS specific access pathsswitch (OperatingSystem.getCurrentOperatingSystem()) {case LINUX:return getSizeOfPhysicalMemoryForLinux();case WINDOWS:return getSizeOfPhysicalMemoryForWindows();case MAC_OS:return getSizeOfPhysicalMemoryForMac();case FREE_BSD:return getSizeOfPhysicalMemoryForFreeBSD();case UNKNOWN:LOG.error("Cannot determine size of physical memory for unknown operating system");return -1;default:LOG.error("Unrecognized OS: " + OperatingSystem.getCurrentOperatingSystem());return -1;}}/*** Returns the size of the physical memory in bytes on a Linux-based* operating system.* * @return the size of the physical memory in bytes or {@code -1}, if*         the size could not be determined*/private static long getSizeOfPhysicalMemoryForLinux() {try (BufferedReader lineReader = new BufferedReader(new FileReader(LINUX_MEMORY_INFO_PATH))) {String line;while ((line = lineReader.readLine()) != null) {Matcher matcher = LINUX_MEMORY_REGEX.matcher(line);if (matcher.matches()) {String totalMemory = matcher.group(1);return Long.parseLong(totalMemory) * 1024L; // Convert from kilobyte to byte}}// expected line did not comeLOG.error("Cannot determine the size of the physical memory for Linux host (using '/proc/meminfo'). " +"Unexpected format.");return -1;}catch (NumberFormatException e) {LOG.error("Cannot determine the size of the physical memory for Linux host (using '/proc/meminfo'). " +"Unexpected format.");return -1;}catch (Throwable t) {LOG.error("Cannot determine the size of the physical memory for Linux host (using '/proc/meminfo') ", t);return -1;}}/*** Returns the size of the physical memory in bytes on a Mac OS-based* operating system* * @return the size of the physical memory in bytes or {@code -1}, if*         the size could not be determined*/private static long getSizeOfPhysicalMemoryForMac() {BufferedReader bi = null;try {Process proc = Runtime.getRuntime().exec("sysctl hw.memsize");bi = new BufferedReader(new InputStreamReader(proc.getInputStream()));String line;while ((line = bi.readLine()) != null) {if (line.startsWith("hw.memsize")) {long memsize = Long.parseLong(line.split(":")[1].trim());bi.close();proc.destroy();return memsize;}}} catch (Throwable t) {LOG.error("Cannot determine physical memory of machine for MacOS host", t);return -1;} finally {if (bi != null) {try {bi.close();} catch (IOException ignored) {}}}return -1;}/*** Returns the size of the physical memory in bytes on FreeBSD.* * @return the size of the physical memory in bytes or {@code -1}, if*         the size could not be determined*/private static long getSizeOfPhysicalMemoryForFreeBSD() {BufferedReader bi = null;try {Process proc = Runtime.getRuntime().exec("sysctl hw.physmem");bi = new BufferedReader(new InputStreamReader(proc.getInputStream()));String line;while ((line = bi.readLine()) != null) {if (line.startsWith("hw.physmem")) {long memsize = Long.parseLong(line.split(":")[1].trim());bi.close();proc.destroy();return memsize;}}LOG.error("Cannot determine the size of the physical memory for FreeBSD host " +"(using 'sysctl hw.physmem').");return -1;}catch (Throwable t) {LOG.error("Cannot determine the size of the physical memory for FreeBSD host " +"(using 'sysctl hw.physmem')", t);return -1;}finally {if (bi != null) {try {bi.close();} catch (IOException ignored) {}}}}/*** Returns the size of the physical memory in bytes on Windows.* * @return the size of the physical memory in bytes or {@code -1}, if*         the size could not be determined*/private static long getSizeOfPhysicalMemoryForWindows() {BufferedReader bi = null;try {Process proc = Runtime.getRuntime().exec("wmic memorychip get capacity");bi = new BufferedReader(new InputStreamReader(proc.getInputStream()));String line = bi.readLine();if (line == null) {return -1L;}if (!line.startsWith("Capacity")) {return -1L;}long sizeOfPhyiscalMemory = 0L;while ((line = bi.readLine()) != null) {if (line.isEmpty()) {continue;}line = line.replaceAll(" ", "");sizeOfPhyiscalMemory += Long.parseLong(line);}return sizeOfPhyiscalMemory;}catch (Throwable t) {LOG.error("Cannot determine the size of the physical memory for Windows host " +"(using 'wmic memorychip')", t);return -1L;}finally {if (bi != null) {try {bi.close();} catch (Throwable ignored) {}}}}// --------------------------------------------------------------------------------------------private Hardware() {}
}

另外注意try catch的这种用法:

		try (BufferedReader lineReader = new BufferedReader(new FileReader(LINUX_MEMORY_INFO_PATH))) {String line;while ((line = lineReader.readLine()) != null) {Matcher matcher = LINUX_MEMORY_REGEX.matcher(line);if (matcher.matches()) {String totalMemory = matcher.group(1);return Long.parseLong(totalMemory) * 1024L; // Convert from kilobyte to byte}}// expected line did not comeLOG.error("Cannot determine the size of the physical memory for Linux host (using '/proc/meminfo'). " +"Unexpected format.");return -1;}catch (NumberFormatException e) {LOG.error("Cannot determine the size of the physical memory for Linux host (using '/proc/meminfo'). " +"Unexpected format.");return -1;}


http://www.ppmy.cn/news/987697.html

相关文章

SQL基础使用

SQL的概述 SQL全称&#xff1a; Structured Query Language&#xff0c;结构化查询语言&#xff0c;用于访问和处理数据库的标准的计算机语言。 SQL语言1974年由Boyce和Chamberlin提出&#xff0c;并首先在IBM公司研制的关系数据库系统SystemR上实现。 经过多年发…

问题解决——datagrid远程连接虚拟机中ubuntu的mysql失败

问题解决——datagrid远程连接虚拟机中ubuntu的mysql失败 情况&#xff1a;datagrid远程win11系统下虚拟机里的ubuntu20.04的mysql&#xff0c;连接失败。 1 如果是防火墙没开放3306端口&#xff0c;则需要开放&#xff1a;linux 3306端口无法连接 无法通过防火墙的解决办法 …

ICML 2023 | 拓展机器学习的边界

编者按&#xff1a;如今&#xff0c;机器学习已成为人类未来发展的焦点领域&#xff0c;如何进一步拓展机器学习技术和理论的边界&#xff0c;是一个极富挑战性的重要话题。7月23日至29日&#xff0c;第四十届国际机器学习大会 ICML 2023 在美国夏威夷举行。该大会是由国际机器…

app上有个播放视频的功能,客户反馈某个时间段会卡顿,初步分析是用户多,让测试一下性能

当应用上有播放视频的功能&#xff0c;而用户反馈在某个时间段会出现卡顿问题时&#xff0c;你可以通过进行性能测试来分析这个问题。性能测试是一种评估应用在不同条件下行为的测试方法&#xff0c;以确保它满足所需的性能标准。以下是一些步骤&#xff1a; 设置测试环境&…

反复 Failed to connect to github.com port 443 after xxx ms

前提&#xff1a;使用了代理&#xff0c;浏览器能稳定访问github&#xff0c;但git clone一直超时 解决方案&#xff1a; 1. git config --global http.proxy http://127.0.0.1:1080 2. 代理设置端口1080 3. 1080可自定义 感谢来自这篇博客和评论区的提醒&#xff1a;解决…

CAS比较并交换概述

1.概述 CAS(Compare And Swap)表示比较并交换&#xff0c;是乐观锁(简单理解为不加锁)的实现&#xff0c;采用的是自旋锁的思想。底层是通过Unsafe类中compareAndSwapInt等方法实现。 CAS包含三个操作数&#xff0c;分别为&#xff1a;内存值&#xff0c;预估值&#xff0c;更新…

推荐功能强大的活码管理平台(支持淘宝客和分享卡片)

功能强大的活码管理平台源码-支持淘宝客和分享卡片等功能 演示地址&#xff1a;runruncode.com/code/19494.html 首页 查看群活码、客服码、渠道码当天总访问量查看成员账号个数查看群活码、客服码、渠道码当天各时段访问量 群活码 创建、编辑、删除、分享群活码查看群活码…

使用TensorFlow训练深度学习模型实战(上)

大家好&#xff0c;尽管大多数关于神经网络的文章都强调数学&#xff0c;而TensorFlow文档则强调使用现成数据集进行快速实现&#xff0c;但将这些资源应用于真实世界数据集是很有挑战性的&#xff0c;很难将数学概念和现成数据集与我的具体用例联系起来。本文旨在提供一个实用…