PostgreSQL数据库FDW——Parquet S3 DefaultParquetReader类

news/2024/12/2 15:55:47/

S3RandomAccessFile

S3RandomAccessFile类定义在parquet_s3_fdw.hpp,用于访问s3对象存储的类。其成员函数定义在parquet_s3_fdw.cpp文件中,S3RandomAccessFile构造函数用于初始化private成员(offset设置为0,isclosed设置为false)。

class S3RandomAccessFile : public arrow::io::RandomAccessFile{private:Aws::String bucket_;Aws::String object_;Aws::S3::S3Client *s3_client_;int64_t offset;bool isclosed;public:S3RandomAccessFile(Aws::S3::S3Client *s3_client, const Aws::String &bucket, const Aws::String &object);arrow::Status Close();arrow::Result<int64_t>Tell() const;       // return offsetbool closed() const;                      // return isclosedarrow::Status Seek(int64_t position);     // offset = positionarrow::Result<int64_t> Read(int64_t nbytes, void* out);arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes);arrow::Result<int64_t> GetSize();
};

Read函数首先初始化object_request,调用s3_client_->GetObject尝试获取数据,然后通过stringstream,将数据读取到out缓冲区中。

arrow::Result<int64_t> S3RandomAccessFile::Read(int64_t nbytes, void* out){Aws::S3::Model::GetObjectRequest object_request;object_request.WithBucket(bucket_.c_str()).WithKey(object_.c_str());string bytes = "bytes=" + to_string(offset) + "-" + to_string(offset + nbytes - 1);object_request.SetRange(bytes.c_str());object_request.SetBucket(this->bucket_);object_request.SetKey(this->object_);object_request.SetResponseStreamFactory([](){ return Aws::New<Aws::StringStream >(S3_ALLOCATION_TAG); });Aws::S3::Model::GetObjectOutcome get_object_outcome = this->s3_client_->GetObject(object_request);if (!get_object_outcome.IsSuccess()) {auto err = get_object_outcome.GetError();Aws::String msg = "GetObject failed. " + err.GetExceptionName() + ": " + err.GetMessage();return arrow::Status(arrow::StatusCode::IOError, msg.c_str());}int64_t n_read = get_object_outcome.GetResult().GetContentLength();offset += n_read;std::stringstream string_stream;string_stream << get_object_outcome.GetResult().GetBody().rdbuf();string_stream.read((char*)out, n_read);return n_read;
}

另外一个read函数通过调用上一个read函数获取数据,然后封装到arrow::Buffer中返回。

arrow::Result<std::shared_ptr<arrow::Buffer>> S3RandomAccessFile::Read(int64_t nbytes){char *out = (char*)malloc(nbytes);arrow::Result<int64_t> res = this->Read(nbytes, out);int64_t n = res.ValueOrDie();std::shared_ptr<arrow::Buffer> buf = make_shared<arrow::Buffer>((const uint8_t*)out, n);return buf;
}

GetSize函数通过HeadObjectRequest获取数据大小。

arrow::Result<int64_t> S3RandomAccessFile::GetSize(){Aws::S3::Model::HeadObjectRequest headObj;headObj.SetBucket(bucket_); headObj.SetKey(object_);auto object = this->s3_client_->HeadObject(headObj);if (!object.IsSuccess()){return arrow::Status(arrow::StatusCode::IOError, "HeadObject failed");}int64_t fileSize = object.GetResultWithOwnership().GetContentLength();return fileSize;
}

ReaderCacheEntry

arrow::FileReader类作为Arrow读取适配器类,用于将Parquet文件反序列化为Arrow行批处理。该接口满足不同的用例,因此提供了不同的接口。在其最简单的形式中,我们迎合了希望使用FileReader::ReadTable方法一次读取整个Parquet数据。更高级的用户如果也希望在每个Parquet文件上实现并行性,应该在RowGroup级别执行此操作。为此,他们可以调用FileReader::RowGroup(i)->ReadTable,以仅接收指定的RowGroup作为表。在最高级的情况下,消费者希望并行独立读取RowGroup并单独使用每一列,他们可以调用FileReader::RowGroup(i)->column(j)->read并接收arrow::column实例。parquet格式支持可分配给字段field的可选整数field_id。Arrow会将这些字段id转换为相应字段上名为PARQUET:field_id的元数据键。Arrow read adapter class for deserializing Parquet files as Arrow row batches. This interfaces caters for different use cases and thus provides different interfaces. In its most simplistic form, we cater for a user that wants to read the whole Parquet at once with the FileReader::ReadTable method. More advanced users that also want to implement parallelism on top of each single Parquet files should do this on the RowGroup level. For this, they can call FileReader::RowGroup(i)->ReadTable to receive only the specified RowGroup as a table. In the most advanced situation, where a consumer wants to independently read RowGroups in parallel and consume each column individually, they can call FileReader::RowGroup(i)->Column(j)->Read and receive an arrow::Column instance. The parquet format supports an optional integer field_id which can be assigned to a field. Arrow will convert these field IDs to a metadata key named PARQUET:field_id on the appropriate field.
从下面注释中可以看出这里为何需要用FileReaderCache类包装std::unique_ptr<parquet::arrow::FileReader>。We would like to cache FileReader. When creating new hash entry, the memory of entry is allocated by PostgreSQL core. But FileReader is a unique_ptr. In order to initialize it in parquet_s3_fdw, we define FileReaderCache class and the cache entry has the pointer of this class.

class FileReaderCache{public: std::unique_ptr<parquet::arrow::FileReader> reader;
};

FileReaderHash变量为指向ReaderCache HTAB的指针,ReaderCacheKey为其key,ReaderCacheEntry为其键。

static HTAB *FileReaderHash = NULL;
typedef struct ReaderCacheKey{char dname[256]; char fname[256];
} ReaderCacheKey;
typedef struct ReaderCacheEntry{ReaderCacheKey key;			/* hash key (must be first) */FileReaderCache *file_reader;arrow::MemoryPool *pool;
} ReaderCacheEntry;

parquetGetFileReader函数定义在parquet_s3_fdw_connection.cpp文件中,用于获取ReaderCacheEntry(ReaderCacheKey为其key,ReaderCacheEntry为其键)。

ReaderCacheEntry *parquetGetFileReader(Aws::S3::S3Client *s3client, char *dname, char *fname){bool		found;ReaderCacheEntry *entry; ReaderCacheKey key = {0};	if (FileReaderHash == NULL){ /* First time through, initialize connection cache hashtable */HASHCTL		ctl; MemSet(&ctl, 0, sizeof(ctl));ctl.keysize = sizeof(ReaderCacheKey); ctl.entrysize = sizeof(ReaderCacheEntry);		ctl.hcxt = CacheMemoryContext; /* allocate ConnectionHash in the cache context */FileReaderHash = hash_create("parquet_s3_fdw file reader cache", 8, &ctl,
#if (PG_VERSION_NUM >= 140000)HASH_ELEM | HASH_BLOBS);
#elseHASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
#endif/* Register some callback functions that manage connection cleanup. This should be done just once in each backend. */CacheRegisterSyscacheCallback(FOREIGNSERVEROID, parquet_fdw_inval_callback, (Datum) 0);CacheRegisterSyscacheCallback(USERMAPPINGOID, parquet_fdw_inval_callback, (Datum) 0);}/* Create hash key for the entry.  Assume no pad bytes in key struct */strcpy(key.dname, dname); strcpy(key.fname, fname);/* Find or create cached entry for requested connection. */entry = (ReaderCacheEntry *) hash_search(FileReaderHash, &key, HASH_ENTER, &found);if (!found){/* We need only clear "file_reader" here; remaining fields will be filled later when "file_reader" is set. */entry->file_reader = NULL;}/* If cache entry doesn't have a reader, we have to establish a new reader. */if (entry->file_reader == NULL || entry->file_reader->reader == nullptr){ // 构造新的arrow::FileReaderstd::unique_ptr<parquet::arrow::FileReader> reader;entry->pool = arrow::default_memory_pool();std::shared_ptr<arrow::io::RandomAccessFile> input(new S3RandomAccessFile(s3client, dname, fname)); // 使用上述S3RandomAccessFile类构造arrow::io::RandomAccessFilearrow::Status status = parquet::arrow::OpenFile(input, entry->pool, &reader);if (!status.ok())  throw Error("failed to open Parquet file %s", status.message().c_str());if (!entry->file_reader) entry->file_reader = new FileReaderCache();entry->file_reader->reader = std::move(reader);elog(DEBUG3, "parquet_s3_fdw: new parquet file reader for s3handle %p %s/%s", s3client, dname, fname);}return entry;
}

DefaultParquetReader

DefaultParquetReader类继承自ParquetReader类,其包含了用于封装row group的table成员,chunk_info中包含了row group中列数据的ChunkInfo信息,chunks中包含了row group中列数据。

class DefaultParquetReader : public ParquetReader{
private:std::shared_ptr<arrow::Table>   table; /* Current row group */struct ChunkInfo{int     chunk;      /* current chunk number */int64   pos;        /* current pos within chunk */int64   len;        /* current chunk length */ChunkInfo (int64 len) : chunk(0), pos(0), len(len) {}};std::vector<ChunkInfo> chunk_info;  /* current chunk and position per-column */std::vector<arrow::Array *>     chunks; /* Plain pointers to inner the structures of row group. It's needed to prevent excessive shared_ptr management. */int             row_group;          /* current row group index */uint32_t        row;                /* current row within row group */uint32_t        num_rows;           /* total rows in row group */
...

open

open成员函数主要工作就是获取FileReader,有形参的函数调用上一节介绍的parquetGetFileReader函数,没有形参的函数直接使用arrow::FileReader::Make创建FileReader,不涉及ReaderCache对FileReader的缓存。

    void open(const char *dirname, Aws::S3::S3Client *s3_client){arrow::Status   status;char *dname; char *fname;parquetSplitS3Path(dirname, filename.c_str(), &dname, &fname);this->reader_entry = parquetGetFileReader(s3_client, dname, fname);elog(DEBUG1, "parquet_s3_fdw: open Parquet file on S3. %s%s", dname, fname);pfree(dname); pfree(fname);this->reader = std::move(this->reader_entry->file_reader->reader); // this->reader_entry和this->reader的关系      this->reader->set_use_threads(this->use_threads && parquet_fdw_use_threads); /* Enable parallel columns decoding/decompression if needed */}void open() {arrow::Status   status;std::unique_ptr<parquet::arrow::FileReader> reader;status = parquet::arrow::FileReader::Make( arrow::default_memory_pool(), parquet::ParquetFileReader::OpenFile(filename, use_mmap), &reader);if (!status.ok()) throw Error("parquet_s3_fdw: failed to open Parquet file %s", status.message().c_str());this->reader = std::move(reader);        this->reader->set_use_threads(this->use_threads && parquet_fdw_use_threads); /* Enable parallel columns decoding/decompression if needed */}

next

next函数用于对外返回rowgroup提取的记录(通过read_next_rowgroup函数),并将其放置到TupleTableSlot中(通过populate_slot函数,populate_slot函数也会提取记录)。

    ReadStatus next(TupleTableSlot *slot, bool fake=false){allocator->recycle();if (this->row >= this->num_rows) {/* Read next row group. We do it in a loop to skip possibly empty row groups. */do {if (!this->read_next_rowgroup())return RS_EOF;} while (!this->num_rows);}this->populate_slot(slot, fake);this->row++;return RS_SUCCESS;}

read_next_rowgroup提取数据到chunk

read_next_rowgroup函数首先需要确定row_group序号,如果是并行foreign scan,需要确定本DefaultParquetReader reader_id标识在coordinator类中所存储的rowgroup计数器的值;否则是串行foreign scan,直接递增DefaultParquetReader的row_group计数值。通过row_group计数值计数值确定DefaultParquetReader类保存的this->rowgroups[this->row_group]rowgroups号。利用reader->RowGroup(rowgroup)->ReadTable(this->indices, &this->table)函数获取rowgroup数据。通过遍历查询涉及project column,提取row group中的列数据和其长度,并保存到chunk_info和chunk中。

    bool read_next_rowgroup() {arrow::Status               status;       if (coordinator) { /* In case of parallel query get the row group index from the coordinator. Otherwise just increment it. */coordinator->lock();if ((this->row_group = coordinator->next_rowgroup(reader_id)) == -1){coordinator->unlock();return false;}coordinator->unlock();} elsethis->row_group++;/* row_group cannot be less than zero at this point so it is safe to cast it to unsigned int  */if ((uint) this->row_group >= this->rowgroups.size())  return false;int  rowgroup = this->rowgroups[this->row_group];auto rowgroup_meta = this->reader->parquet_reader()->metadata()->RowGroup(rowgroup);status = this->reader->RowGroup(rowgroup)->ReadTable(this->indices, &this->table);if (!status.ok()) throw Error("parquet_s3_fdw: failed to read rowgroup #%i: %s", rowgroup, status.message().c_str());if (!this->table) throw std::runtime_error("parquet_s3_fdw: got empty table");/* TODO: don't clear each time */this->chunk_info.clear(); this->chunks.clear();for (uint64_t i = 0; i < types.size(); ++i){  // 遍历查询涉及project columnconst auto &column = this->table->column(i);int64 len = column->chunk(0)->length();this->chunk_info.emplace_back(len);              // 列数据长度信息this->chunks.push_back(column->chunk(0).get());  // 列数据}this->row = 0;this->num_rows = this->table->num_rows();return true;}

populate_slot从chunks中获取记录组装成TupleTableSlot

populate_slot函数用于从parquet row中提取的数据封装成TupleTableSlot。如果是无schema模式,则调用schemaless_populate_slot函数。否则就是正常指定schema模式,仅仅关注targetlist或clauses涉及的列,slot中未涉及的列全部设置为null;

    /* populate_slot Fill slot with the values from parquet row.* If `fake` set to true the actual reading and populating the slot is skipped.* The purpose of this feature is to correctly skip rows to collect sparse* samples. */void populate_slot(TupleTableSlot *slot, bool fake=false){if (this->schemaless == true) {schemaless_populate_slot(slot, fake); return;}/* Fill slot values */for (int attr = 0; attr < slot->tts_tupleDescriptor->natts; attr++) {int arrow_col = this->map[attr]; /* We only fill slot attributes if column was referred in targetlist or clauses. In other cases mark attribute as NULL. */if (arrow_col >= 0){ChunkInfo   &chunkInfo = this->chunk_info[arrow_col];arrow::Array *array = this->chunks[arrow_col];TypeInfo    &typinfo = this->types[arrow_col];if (chunkInfo.pos >= chunkInfo.len) { // chunk中数据已经读取完成const auto &column = this->table->column(arrow_col);           if (++chunkInfo.chunk >= column->num_chunks()) /* There are no more chunks */break;array = column->chunk(chunkInfo.chunk).get();this->chunks[arrow_col] = array;chunkInfo.pos = 0;chunkInfo.len = array->length();}if (fake) continue; /* Don't do actual reading data into slot in fake mode */if (array->IsNull(chunkInfo.pos)) { // 确定该列数据是否为nullslot->tts_isnull[attr] = true; chunkInfo.pos++; continue;}slot->tts_isnull[attr] = false;/* Currently only primitive types and lists are supported */ // 根据arrow列类型进行相应的转换为postgres类型数据switch (typinfo.arrow.type_id) {case arrow::Type::LIST:{arrow::ListArray   *larray = (arrow::ListArray *) array;slot->tts_values[attr] = this->nested_list_to_datum(larray, chunkInfo.pos, typinfo);break;}case arrow::Type::MAP: {arrow::MapArray* maparray = (arrow::MapArray*) array;slot->tts_values[attr] = this->map_to_datum(maparray, chunkInfo.pos, typinfo);break;}default: slot->tts_values[attr] =  this->read_primitive_type(array, typinfo, chunkInfo.pos);}chunkInfo.pos++;}else{slot->tts_isnull[attr] = true;}}}};

schemaless_populate_slot函数用于在无schema模式下从parquet row中提取的数据封装成TupleTableSlot。主要依赖read_schemaless_column从chunks中获取一条记录,并将其组装成jsonb;并调用allocator函数在内存上下文中申请空间,将数据拷贝其中,再设置到slot的tts_values中。

    /* schemaless_populate_slot      Fill slot with the jsonb values made from parquet row.* If `fake` set to true the actual reading and populating the slot is skipped.* The purpose of this feature is to correctly skip rows to collect sparse* samples. */void schemaless_populate_slot(TupleTableSlot *slot, bool fake=false){memset(slot->tts_isnull, true, sizeof(bool) * slot->tts_tupleDescriptor->natts);for (int attr = 0; attr < slot->tts_tupleDescriptor->natts; attr++) {Jsonb *result;Form_pg_attribute attr_desc = TupleDescAttr(slot->tts_tupleDescriptor, attr);if (attr_desc->attisdropped || attr_desc->atttypid != JSONBOID)  continue;result = read_schemaless_column(fake); // 从chunks中获取一条记录,并将其组装成jsonb/* Copy jsonb into memory block allocated by FastAllocator to prevent its destruction though to be able to recycle it once it fulfilled its purpose. */void *res = allocator->fast_alloc(VARSIZE_ANY(result));memcpy(res, (Jsonb *) result, VARSIZE_ANY(result));pfree((Jsonb *) result);slot->tts_isnull[attr] = false;slot->tts_values[attr] = (Datum) res;break;}}

read_schemaless_column函数主要功能是从chunks中获取一条记录,并将其组装成jsonb。

    /* Get a record and contruct it to an jsonb value */Jsonb *read_schemaless_column(bool fake=false) {JsonbParseState *jb_pstate = NULL; JsonbValue     *result;        result = pushJsonbValue(&jb_pstate, WJB_BEGIN_OBJECT, NULL); /* Fill jsonb values */for (size_t i = 0; i < this->column_names.size(); i++) {char *key = pstrdup(this->column_names[i].c_str());bool    isnull = false; Datum value = (Datum) 0;ChunkInfo   &chunkInfo = this->chunk_info[i]; // 第i列数据信息arrow::Array *array = this->chunks[i];    // 第i列数据chunk     TypeInfo    &typinfo = this->types[i];    // 第i列数据类型转换信息int sorted_col = this->sorted_col_map[i]; // 第i列sorted columnOid         value_type = InvalidOid; Oid			typoutput; bool		typIsVarlena;          FmgrInfo    finfo;if (chunkInfo.pos >= chunkInfo.len){ // 该chunk中没有数据了,尝试获取下一个chunkconst auto &column = this->table->column(chunkInfo.pos);        if (++chunkInfo.chunk >= column->num_chunks()) break; /* There are no more chunks */ // 没有数据了array = column->chunk(chunkInfo.chunk).get(); // 获取下一个chunk数据this->chunks[i] = array;chunkInfo.pos = 0; chunkInfo.len = array->length();}if (fake) continue; /* Don't do actual reading data into slot in fake mode */if (strlen(key) == 0) throw std::runtime_error("key is null");if (!array->IsNull(chunkInfo.pos)){/* Currently only primitive types, lists and map are supported */switch (typinfo.arrow.type_id) {case arrow::Type::LIST: {arrow::ListArray   *larray = (arrow::ListArray *) array;value = this->nested_list_to_jsonb_datum(larray, chunkInfo.pos, typinfo);value_type = JSONBOID;break;}case arrow::Type::MAP:{arrow::MapArray* maparray = (arrow::MapArray*) array;value = this->map_to_datum(maparray, chunkInfo.pos, typinfo);value_type = JSONBOID;break;}default:{value_type = to_postgres_type(typinfo.arrow.type_id);value = this->read_primitive_type(array, typinfo, chunkInfo.pos);}}getTypeOutputInfo(value_type, &typoutput, &typIsVarlena);fmgr_info(typoutput, &finfo);if (sorted_col >= 0) {this->sorted_cols_data[sorted_col].val = value;this->sorted_cols_data[sorted_col].is_null = false;}} else {isnull = true;elog(DEBUG2, "key %s is null", key);}/* TODO: adding cstring would be cheaper than adding text */push_jsonb_string_key(jb_pstate, key);datum_to_jsonb(value, value_type, isnull, &finfo, jb_pstate, WJB_VALUE);chunkInfo.pos++;}result = pushJsonbValue(&jb_pstate, WJB_END_OBJECT, NULL);return JsonbValueToJsonb(result);}

http://www.ppmy.cn/news/11895.html

相关文章

yolov5+车道线检测

目标检测与车道线检测在自动驾驶以及车辆定位中起着重要的辅助作用&#xff0c;是环境感知中不可缺少的一个部分。基于深度学习的车道线检测方法近年来也在不断的提升&#xff0c;比如论文&#xff1a;Ultra Fast Deep Lane Detection with HybridAnchor Driven Ordinal Classi…

[ 环境搭建篇 ] 安装 java 环境并配置环境变量(附 JDK1.8 安装包)

&#x1f36c; 博主介绍 &#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;我是 _PowerShell &#xff0c;很高兴认识大家~ ✨主攻领域&#xff1a;【渗透领域】【数据通信】 【通讯安全】 【web安全】【面试分析】 &#x1f389;点赞➕评论➕收藏 养成习…

包体积优化 · 实战论 · 怎么做包体优化? 做好能晋升吗? 能涨多少钱?

“ 【小木箱成长营】包体积优化系列文章&#xff1a; 包体积优化 方法论 揭开包体积优化神秘面纱 包体积优化 工具论 初识包体积优化 BaguTree 包体积优化录播视频课 ”一、引言 Hello&#xff0c;我是小木箱&#xff0c;欢迎来到小木箱成长营系列教程&#xff0c;今天将分…

锁策略和synchronized

1.常见的锁策略&#xff08;1&#xff09;乐观锁 和 悲观锁乐观锁&#xff1a;预测锁竞争的情况不激烈&#xff08;工作量较少&#xff09;悲观锁&#xff1a;预测锁竞争的情况很激烈&#xff08;工作量较多&#xff09;&#xff08;2&#xff09;轻量级锁 和 重量级锁轻量级锁…

【每日一题】【LeetCode】【第十二天】区域和检索 - 数组不可变

解决之路 题目描述 测试案例&#xff08;部分&#xff09; 第一次 emmm&#xff0c;说实话&#xff0c;一开始我还真没看懂题目是什么意思。。。。 自己按我自己理解的方式写了一下代码&#xff0c;用测试案例跑了下&#xff0c;成功了。 不过&#xff0c;放进去跑不通&…

【Linux】六、Linux 基础IO(一)|重谈文件|C语言文件操作|操作系统文件操作(系统文件I/O)|文件描述符

目录 一、重谈文件 二、C语言文件操作 2.1 重谈C语言文件操作 2.2 补充细节 三、操作系统文件操作&#xff08;系统文件I/O&#xff09; 3.1 文件相关系统调用&#xff1a;close 3.2 文件相关系统调用&#xff1a;open 3.2.1 open 的第二个参数 flags 3.2.2 open 的第…

自定义类型:结构体,枚举,联合(2)

TIPS 1. 类型的定义可以考虑放在头文件里头。 2. 一个汉字存储的时候占两个字节空间 3. 关于结构体变量初始化的一些细节 4. 关于结构体内存对齐的补充 1. 2. S1和S2类型的成员一模一样&#xff0c;但是S1和S2所占空间的大小有了一些区别。 3. 这两个结构体类型成员都…

C++ Socket 构造函数参数解析

int socket(int af, int type, int protocol); 1、) af 为地址族&#xff08;Address Family&#xff09;&#xff0c;也就是 IP 地址类型&#xff0c;常用的有 AF_INET 和 AF_INET6。AF 是“Address Family”的简写&#xff0c;INET是“Inetnet”的简写。AF_INET 表示 IPv4 地…