【原创】通过S3接口将海量文件索引导入elasticsearch

embedded/2025/3/29 8:30:11/

 在医院海量影像文件通过s3传到蓝光存储时,要找一个文件需要全部文件遍历一遍,效率非常非常低。

S3 是对象存储服务,本身不是专门为快速文件查找设计的,而 Elasticsearch 是搜索引擎,在查找特定文件或数据方面具有明显优势,以下是对这种情况的分析:

S3 查找文件慢的原因

  • 存储结构和查询方式:S3 是一种基于对象的存储系统,它将文件作为对象存储在存储桶中。虽然可以通过文件名或键来检索对象,但这种检索方式相对简单,不支持复杂的查询条件和高效的索引机制。如果要在大量文件中查找特定文件,可能需要遍历整个存储桶或使用前缀匹配等有限的方式,这在文件数量庞大时效率较低。
  • 网络延迟:S3 是基于网络的存储服务,数据传输需要通过网络进行。当请求查找文件时,需要与 S3 服务器进行通信,网络延迟会影响查找速度。特别是在跨地域访问或网络环境不稳定的情况下,网络延迟可能会更加明显,导致查找文件的时间延长。

Elasticsearch 查找文件快的原因

  • 倒排索引:Elasticsearch 使用倒排索引来存储和检索数据。倒排索引将文档中的每个词项映射到包含该词项的文档列表,这使得在查找特定关键词或短语时能够快速定位到相关文档。通过对文件内容进行索引,Elasticsearch 可以在瞬间返回包含特定关键词的文件列表,大大提高了查找效率。
  • 分布式架构和并行处理:Elasticsearch 通常以分布式集群的方式部署,可以将数据分布在多个节点上进行存储和处理。在进行查询时,它可以并行地在多个节点上执行搜索操作,然后将结果合并返回。这种分布式架构和并行处理能力使得 Elasticsearch 能够处理大量的数据,并在短时间内返回查询结果。
  • 丰富的查询功能:Elasticsearch 提供了丰富的查询 DSL(Domain Specific Language),支持各种复杂的查询条件,如全文搜索、精确匹配、范围查询、布尔查询等。用户可以根据自己的需求灵活组合查询条件,快速定位到所需的文件。同时,Elasticsearch 还支持模糊查询、同义词查询等高级功能,进一步提高了查找的准确性和灵活性。
  • 上代码,通过go实现导入索引。
package mainimport ("bytes""context""encoding/json""fmt""github.com/aws/aws-sdk-go-v2/aws""github.com/aws/aws-sdk-go-v2/config""github.com/aws/aws-sdk-go-v2/credentials""github.com/aws/aws-sdk-go-v2/service/s3""github.com/elastic/go-elasticsearch/v8""github.com/elastic/go-elasticsearch/v8/esapi""gopkg.in/ini.v1""log""crypto/tls""net/http""time"
)type S3Config struct {BucketName  stringAccessKey   stringSecretKey   stringEndpointURL string
}type ESConfig struct {Host       stringUser       stringPass       stringIndexName  stringSearchType string
}func readConfig() (S3Config, ESConfig) {cfg, err := ini.Load("config.ini")if err != nil {log.Fatalf("无法读取配置文件: %v", err)}s3Cfg := S3Config{BucketName:  cfg.Section("s3").Key("bucket_name").String(),AccessKey:   cfg.Section("s3").Key("access_key").String(),SecretKey:   cfg.Section("s3").Key("secret_key").String(),EndpointURL: cfg.Section("s3").Key("endpoint_url").String(),}esCfg := ESConfig{Host:       cfg.Section("elasticsearch").Key("host").String(),User:       cfg.Section("elasticsearch").Key("user").String(),Pass:       cfg.Section("elasticsearch").Key("password").String(),IndexName:  cfg.Section("elasticsearch").Key("index_name").String(),SearchType: cfg.Section("elasticsearch").Key("search_type").String(),}return s3Cfg, esCfg
}func getS3ETag(s3Client *s3.Client, bucketName, fileKey string) string {resp, err := s3Client.HeadObject(context.TODO(), &s3.HeadObjectInput{Bucket: aws.String(bucketName),Key:    aws.String(fileKey),})if err != nil {log.Printf("获取 %s 的ETag失败: %v", fileKey, err)return ""}etag := aws.ToString(resp.ETag)if len(etag) > 0 && etag[0] == '"' && etag[len(etag)-1] == '"' {etag = etag[1 : len(etag)-1]}return etag
}func fetchS3Files(s3Client *s3.Client, esClient *elasticsearch.Client, bucketName, indexName string) {paginator := s3.NewListObjectsV2Paginator(s3Client, &s3.ListObjectsV2Input{Bucket: aws.String(bucketName),})for paginator.HasMorePages() {page, err := paginator.NextPage(context.TODO())if err != nil {log.Printf("获取S3文件列表页失败: %v", err)continue}for _, obj := range page.Contents {fileKey := aws.ToString(obj.Key)log.Printf("导入索引:",fileKey)fileSize := aws.ToInt64(obj.Size)lastModified := obj.LastModifiedstorageClass := string(obj.StorageClass) // 修复点etag := getS3ETag(s3Client, bucketName, fileKey)fileData := map[string]interface{}{"file_key":      fileKey,"file_size":     fileSize,"last_modified": lastModified,"storage_class": storageClass,"etag":          etag,}fileDataJSON, err := json.Marshal(fileData)if err != nil {log.Printf("将文件数据转换为JSON失败: %v", err)continue}req := esapi.IndexRequest{Index:   indexName,Body:    bytes.NewReader(fileDataJSON), // 修复点Refresh: "true",}resp, err := req.Do(context.TODO(), esClient)if err != nil {log.Printf("将文件数据索引到Elasticsearch失败: %v", err)continue}defer resp.Body.Close()}}fmt.Println("S3 文件索引完成")
}func main() {s3Cfg, esCfg := readConfig()customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {return aws.Endpoint{URL:               s3Cfg.EndpointURL,SigningRegion:     "us-east-1", // 替换为你的实际regionHostnameImmutable: true,}, nil})awsCfg, err := config.LoadDefaultConfig(context.TODO(),config.WithRegion("us-east-1"), // 替换为你的实际regionconfig.WithEndpointResolverWithOptions(customResolver),config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(s3Cfg.AccessKey,s3Cfg.SecretKey,"",)),)if err != nil {log.Fatalf("无法加载S3配置: %v", err)}s3Client := s3.NewFromConfig(awsCfg)esCfgOptions := elasticsearch.Config{Addresses: []string{esCfg.Host},Username:  esCfg.User,Password:  esCfg.Pass,Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true, // ⚠️ 跳过证书校验(不安全,仅限开发)},ResponseHeaderTimeout: 10 * time.Second,},}esClient, err := elasticsearch.NewClient(esCfgOptions)if err != nil {log.Fatalf("无法创建Elasticsearch客户端: %v", err)}fetchS3Files(s3Client, esClient, s3Cfg.BucketName, esCfg.IndexName)
}

 config.ini文件

[elasticsearch]
host = https://localhost:9200
user = elastic
password = UfI*****uq
index_name = test
search_type = wildcard[s3]
bucket_name = test
access_key = V4H***RPD6DB
secret_key = lHdm*********n9UjlS
endpoint_url = http://172.*.*.18:7480/
connect_timeout = 10


http://www.ppmy.cn/embedded/176681.html

相关文章

Python实现MySQL数据库对象的血缘分析

Python控制台的程序,实现遍历MySQL中所有的SQL对象(表、视图、用户定义函数、存储过程和触发器等),并取得它们之间之前的依赖性关系,并列出三张表,第一张表的第一列是所有的SQL对象名称,第二列是…

Debezium介绍

1.什么是Debezium Debezium 是一个开源的分布式平台,用于捕获数据库的变更事件(CDC,Change Data Capture)。它能够实时捕获数据库中的行级更改,并将这些更改作为事件流发送到消息中间件(如 Apache Kafka&a…

Categorical分布(分类分布):深度学习中的离散建模利器

Categorical分布:深度学习中的离散建模利器 引言 对于深度学习研究者来说,概率分布是模型设计和优化的基石。在许多生成模型中,如变分自编码器(VAE)及其变种VQ-VAE(Vector Quantized Variational Autoenc…

linux下配置allure的环境变量使之变为可执行文件

https://allurereport.org/docs/install-for-linux/ 操作步骤 1. 检查并删除已存在的符号链接(如果存在) 首先,检查/usr/bin/allure是否已经存在: ls -l /usr/bin/allure如果输出显示/usr/bin/allure已经存在,说明…

防重复请求方法总结 wx.request-微信小程序

在微信小程序中,为了防止 wx.request 的重复请求,可以通过以下几种方式来实现: 1.使用 wx.showLoading 和 wx.hideLoading 在请求中,使用 wx.showLoading 显示加载动画,请求完成后使用 wx.hideLoading 隐藏加载动画。…

Java 大视界 -- 基于 Java 的大数据机器学习模型的多模态融合技术与应用(143)

💖亲爱的朋友们,热烈欢迎来到 青云交的博客!能与诸位在此相逢,我倍感荣幸。在这飞速更迭的时代,我们都渴望一方心灵净土,而 我的博客 正是这样温暖的所在。这里为你呈上趣味与实用兼具的知识,也…

Kafka Snappy 压缩异常分析与解决方案

1. 问题描述 在使用 Kafka 进行消息发送时,遇到了以下异常: org.apache.kafka.common.KafkaException: java.lang.UnsatisfiedLinkError: /tmp/snappy-1.1.7-ee0a2284-1d05-4116-9ddc-a0d5d4b3f8cd-libsnappyjava.so: Error loading shared library ld…

C++类与对象的第一个简单的实战练习-3.24笔记

在哔哩哔哩学习的这个老师的C面向对象高级语言程序设计教程&#xff08;118集全&#xff09;讲的真的很不错 实战一&#xff1a; 情况一&#xff1a;将所有代码写到一个文件main.cpp中 #include<iostream> //不知道包含strcpy的头文件名称是什么,问ai可知 #include<…