spark 读es

embedded/2024/11/15 0:57:33/

idea maven 依赖

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
<version>7.11.1</version>
</dependency>

<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.1.1</version>
</dependency>

import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future
import org.apache.http.HttpResponse
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.rdd.EsSpark

import java.io.{BufferedWriter, FileWriter}


object ReadEs {
def main(args: Array[String]): Unit = {

val spark = SparkSession
.builder()
//.master("local[1]") //本地跑打开
.appName("read es")
.config("pushdown", "true")
.config("es.nodes", "host1,host2,host3")
.config("es.port", "9200")
.config("es.index.auto.create", "true")
.config("es.index.read.missing.as.empty", "yes")
.config("es.mapping.date.rich", "false")
.getOrCreate()


val query =
s"""
{
  "query": {
    "match_all": {}
  }
}
|""".stripMargin


val value = EsSpark.esRDD(spark.sparkContext, "index*", query)


value.map(it => {
val x= it._2("x").toString
val t= it._2("t").toString.toInt

(x, t)
})
.reduceByKey(_+_)
.foreachPartition(mp=>{
//写本地文件
val file = new BufferedWriter(new FileWriter("data/1.txt", true))

mp.foreach(it => {
val t= it._2
val x= it._1

file.append(s"$x ${t}\n")
})

file.flush()
file.close()
})


spark.stop()
}
}


http://www.ppmy.cn/embedded/114158.html

相关文章

Python和C++气候模型算法模型气候学模拟和统计学数据可视化及指标评估

&#x1f3af;要点 贝叶斯推理气候模型辐射对流及干湿能量平衡模型时间空间气象变化预测模型评估统计指标气象预测数据变换天气和气象变化长短期影响预估降低气候信息尺度评估算法气象行为模拟&#xff1a;碳循环、辐射强迫和温度响应温室气体排放碳循环温室诱导气候变化评估气…

企业微信应用消息收发实施记录

一、前置配置 1.1 进入我的企业页面&#xff0c;记录下企业ID。 1.2 创建企微应用&#xff0c;记录下应用的 AgentId 和 Secret。 1.3 设置应用的企业可信IP&#xff0c;将服务器公网 IP 填入即可。 1.4 设置应用接收消息API 填入服务器 API 地址&#xff0c;并记录下随机获取…

oracle查询历史操作记录

示例&#xff1a; SELECTsubstr( a.sql_text, 1, 256 ) "SQL Text",( SELECT b1.username FROM all_users b1 WHERE b1.user_id a.parsing_user_id ) "Parsing User Name",a.users_executing "Users Executing",a.rows_processed "Rows P…

激光干涉仪的系统校准时需要注意的关键步骤

在进行激光干涉仪的系统校准时&#xff0c;以下是一些关键步骤和注意事项&#xff1a; 环境条件控制&#xff1a;确保测量环境的稳定性&#xff0c;控制温度、湿度和气压的变化&#xff0c;因为这些因素都可能影响激光的传播和干涉图的形成。预热&#xff1a;在开始校准前&…

Golang | Leetcode Golang题解之第419题棋盘上的战舰

题目&#xff1a; 题解&#xff1a; func countBattleships(board [][]byte) (ans int) {for i, row : range board {for j, ch : range row {if ch X && !(i > 0 && board[i-1][j] X || j > 0 && board[i][j-1] X) {ans}}}return }

Kubernetes 安装网络插件flannel报错Init:ImagePullBackOff,flannel下载镜像报错问题解决

Kubernetes1.28安装网络插件flannel&#xff0c;报错Init:ImagePullBackOff &#xff0c;flannel安装下载镜像失败 问题 1.安装flannel kubectl apply -f https://github.com/flannel-io/flannel/releases/latest/download/kube-flannel.yml 2.flannel报错信息 执行查看安装…

828华为云征文 | 使用Flexus云服务器X实例部署GLPI资产管理系统

828华为云征文 | 使用Flexus云服务器X实例部署GLPI资产管理系统 1. 部署环境说明2. 部署基础环境2.1. 操作系统基本配置2.2. 部署Nginx2.3. 部署MySQL2.4. 部署PHP 3. 部署GLPI资产管理系统 1. 部署环境说明 本次环境选择使用华为云Flexus云服务器X实例&#xff0c;因为其具有高…

详解Linux中cat命令

在 Linux 命令的世界中&#xff0c;cat 命令就像是一位多才多艺的艺术家&#xff0c;它能够将文本文件的美妙旋律编织在一起&#xff0c;或者单独演奏它们的每一个音符。下面&#xff0c;让我们以一种充满情感的方式&#xff0c;用 Markdown 格式来探索 cat 命令的多种用途。 …