Flink程序富函数中使用定时任务查询mysql出现内存堆积
问题描述
flink程序在跑的过程中, 发现跑几天就停掉了, 看日志发现是代码中的ResultSet和Statement堆积的太多,引起的内存溢出,但是代码中确实已经关闭了ResultSet和Statement,但是还是一直出现OOM的问题
具体案例
在open方法中的案例
override def open(parameters: Configuration): Unit = {parameterTool = getRuntimeContext.getExecutionConfig.getGlobalJobParameters.asInstanceOf[ParameterTool]queryMysql()TimerUtil.schedule(30000, 30000, queryMysql())}
说明
定时任务中链接问题:1.connection链接 ,这个放在close方法里面进行关闭即可;2.ResultSet链接 , 即用即关;3.Statement链接 ,即用即关;
例如1:
def getMysqlEvent(): Map[Int, mutable.Set[String]] = {val sql = "SELECT * FROM test WHERE status = false"var statement: Statement = nullvar result: ResultSet = nulltry {statement = connection.createStatement()result = statement.executeQuery(sql)val map: mutable.Map[Int, mutable.Set[String]] = mutable.Map()while (result.next()) {val projectId = result.getInt(1)}eventMap.toMap} catch {case e: Exception =>e.printStackTrace()throw new Exception("error", e)} finally {DbUtils.close(result)DbUtils.close(statement)}}
例如2
(重点关注代码中的while循环那块,会产生内存堆积就在这块):
def getStudents(): util.HashMap[Int, util.List[Student]] = {val sql = s"""select id from test01""".stripMarginvar eventState: Statement = nullvar eventResult: ResultSet = nullval map = new util.HashMap[Int, util.List[Student]]()val a11 = new util.ArrayList[Student]()val bbb = new util.ArrayList[Mater]try {eventState = connection.createStatement()eventResult = eventState.executeQuery(sql)while (eventResult.next()) {var attrStatement: Statement = nullvar resultSet: ResultSet = nulltry {val id = eventResult.getInt(1)val projectId = eventResult.getInt(2)val attrSql =s"""select * from aaa """.stripMarginattrStatement = connection.createStatement()resultSet = attrStatement.executeQuery(attrSql)while (resultSet.next()) {val id = resultSet.getInt(1)}} catch {case e: Exception => throw new Exception("Error ", e)}finally {DbUtils.close(resultSet)DbUtils.close(attrStatement)}}ttt}catch {case e: Exception => throw new Exception("Error ", e)}finally {DbUtils.close(eventResult)DbUtils.close(eventState)}}
解释:
在例1中 ,整个方法中只有一次sql查询,所以直接在代码最后面关闭即可在例2中,整个方法中有两次sql查询,第二次sql查询会把第一次sql查询的结果进行循环遍历,然后再进行查询,,注意 while循环 , 这块会产生多个Statement和ResultSet对象,所以需要再代码中每一次查询结束后就会进行一次数据的查询