Envoy:httpfilter相关代码阅读

news/2025/1/9 0:06:18/

本篇文章是envoy httpfilter相关代码阅读的整理和总结,笔者试图通过这篇文章将http filter在envoy内部的管控讲清楚,并且将request和response是如何使用这部分 http filter功能的流程介绍清楚。

httpfilter是netfilter中的一种filter,因为envoy对http支持的细粒度管控很全面,所以将httpfilter又做了一层只是针对http协议的filter chain的管控处理逻辑。

httpfilter 在envoy中采用的是生产者和消费者的处理模式,通过配置文件或者xds协议的配置数据将http filter相关的信息,存放到固定的列表中,在有消息request和response到来的时候,通过异步事件触发对应的响应函数,进而从这些列表中取出对应的filter,依次执行filter的功能,达到使用http filter的目的。

f2bd6310e70a843a1616ceefca2a1ac1.png

一、生产者部分的逻辑:

在envoy初始化的时候,或者更新httpfilter配置的时候,通过httpconnectionManagerconfig依次将httpfilter存放到filter_factories中。

逻辑代码如下所示:

Network::FilterFactoryCb
HttpConnectionManagerFilterConfigFactory::createFilterFactoryFromProtoTyped()
---→
std::shared_ptr<HttpConnectionManagerConfig>  Utility::createConfig()
---→ 
std::make_shared<HttpConnectionManagerConfig>() { ......// 操作的是http_filtersconst auto& filters = config.http_filters(); DependencyManager dependency_manager;for (int32_t i = 0; i < filters.size(); i++) {processFilter(filters[i], i, "http", "http", i == filters.size() - 1, filter_factories_,dependency_manager);}......
}
----→ 
void HttpConnectionManagerConfig::processFilter() {......auto* factory =Config::Utility::getAndCheckFactory<Server::Configuration::NamedHttpFilterConfigFactory>(proto_config, proto_config.is_optional());ProtobufTypes::MessagePtr message = Config::Utility::translateToFactoryConfig(proto_config, context_.messageValidationVisitor(), *factory);Http::FilterFactoryCb callback =factory->createFilterFactoryFromProto(*message, stats_prefix_, context_);......// 这里将filterfactorycb存放到filter_factories中filter_factories.push_back(std::move(filter_config_provider));
}

二、消费者部分的逻辑

事件响应函数,在触发onFileEvent之后,有一个环节会调用createFilterChain()去消费filter_factories中的filterfactorycb函数,并通过这些已经注册好的cb函数,将http filter添加到decoder_filter 或者encoder_filter中。

ConnectionImpl::onFileEvent()-→
.......
------>
Envoy::StatusOr<ParserStatus> ServerConnectionImpl::onHeadersCompleteBase() {
......active_request.request_decoder_->decodeHeaders(std::move(headers), false);
......
}
----→ 
void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapPtr&& headers, bool end_stream) {
......const bool upgrade_rejected = filter_manager_.createFilterChain() == false;
......
}
---→ 
bool FilterManager::createFilterChain() {
......if (upgrade != nullptr) {const Router::RouteEntry::UpgradeMap* upgrade_map = filter_manager_callbacks_.upgradeMap();if (filter_chain_factory_.createUpgradeFilterChain(upgrade->value().getStringView(),upgrade_map, *this)) {filter_manager_callbacks_.upgradeFilterChainCreated();return true;} else {upgrade_rejected = true;// Fall through to the default filter chain. The function calling this// will send a local reply indicating that the upgrade failed.}}
filter_chain_factory_.createFilterChain(*this);
......
}
----→ 
HttpConnectionManagerConfig::createFilterChain()
HttpConnectionManagerConfig::createUpgradeFilterChain() 
----→ 
HttpConnectionManagerConfig::createFilterChainForFactories() {......for (const auto& filter_config_provider : filter_factories) {auto config = filter_config_provider->config();if (config.has_value()) {// 这里对应的是http_filter 创建的工厂调用函数里面的FilterFactoryCb函数config.value()(callbacks);continue;}
......
}
-----→
以BandwidthLimitFilterConfig 为例子,config.value()(callbacks) 对应的是:
[filter_config](Http::FilterChainFactoryCallbacks& callbacks) -> void { // 这里的callbacks对应的是fiter_managercallbacks.addStreamFilter(std::make_shared<BandwidthLimiter>(filter_config))
} //callback对应的是fiter_manager,所以这里调用的是下面的函数:
----→ 
void addStreamFilter(StreamFilterSharedPtr filter) override {addStreamDecoderFilterWorker(filter, nullptr, true);addStreamEncoderFilterWorker(filter, nullptr, true);StreamDecoderFilter* decoder_filter = filter.get();filters_.push_back(decoder_filter);
}
---→
LinkedList::moveIntoListBack(std::move(wrapper), decoder_filters_); 或者
LinkedList::moveIntoList(std::move(wrapper), encoder_filters_);或者同时添加进去。

三、encoder_filter和decoder_filter的消费逻辑

这里的入口都是通过libevent里面的消息响应事件,关联到读和写的相应函数,最终从这两个列表里面依次取出对应的http filter进而执行相应的filter里面的功能,达到使用filter chain的目的。

fbbebef06e07991e5a2daec75cfc12ba.png

消费encoder_filters_ 的流程:

ParserStatus ServerConnectionImpl::onMessageCompleteBase() 
ParserStatus ClientConnectionImpl::onMessageCompleteBase()
----→ 
void ConnectionManagerImpl::ActiveStream::encodeHeaders(ResponseHeaderMap& response_headers, bool end_stream) 
void encodeData(Buffer::Instance& data, bool end_stream) 
void encodeTrailers(ResponseTrailerMap& trailers)
----→ 
void FilterManager::encodeHeaders(ActiveStreamEncoderFilter* filter, ResponseHeaderMap& headers, bool end_stream)  
void encodeData(Buffer::Instance& data, bool end_stream)
void encodeTrailers(ResponseTrailerMapPtr&& trailers)
----→ 会去遍历encoder_filters去依次执行对应的
encodeHeaders()\encodeData()\encodeTrailers()函数

消费decoder_filters_的流程:

ParserStatus ServerConnectionImpl::onMessageCompleteBase() 
ParserStatus ClientConnectionImpl::onMessageCompleteBase()
----→  
void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapPtr&& headers,bool end_stream)
void decodeData(Buffer::Instance& data, bool end_stream)
void decodeTrailers(RequestTrailerMapPtr&& trailers)
---→ 
void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHeaderMap& headers,
bool end_stream) 
void FilterManager::decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instance& data,
bool end_stream,
FilterIterationStartState filter_iteration_start_state)
void FilterManager::decodeTrailers(ActiveStreamDecoderFilter* filter, RequestTrailerMap& trailers)
--→ 会去遍历decoder_filters去依次执行对应的
decodeHeaders()\decodeData()\decodeTrailers()函数

补充代码信息:下面是onMessageCompleteBase函数在ServerConnectionImpl和ClientConnectionImpl中的详细代码信息

ParserStatus ServerConnectionImpl::onMessageCompleteBase() {ASSERT(!handling_upgrade_);if (active_request_.has_value()) {auto& active_request = active_request_.value();if (active_request.request_decoder_) {active_request.response_encoder_.readDisable(true);}active_request.remote_complete_ = true;if (deferred_end_stream_headers_) {active_request.request_decoder_->decodeHeaders(std::move(absl::get<RequestHeaderMapPtr>(headers_or_trailers_)), true);deferred_end_stream_headers_ = false;} else if (processing_trailers_) {active_request.request_decoder_->decodeTrailers(std::move(absl::get<RequestTrailerMapPtr>(headers_or_trailers_)));} else {Buffer::OwnedImpl buffer;active_request.request_decoder_->decodeData(buffer, true);}// Reset to ensure no information from one requests persists to the next.headers_or_trailers_.emplace<RequestHeaderMapPtr>(nullptr);}// Always pause the parser so that the calling code can process 1 request at a time and apply// back pressure. However this means that the calling code needs to detect if there is more data// in the buffer and dispatch it again.return parser_->pause();
}ParserStatus ClientConnectionImpl::onMessageCompleteBase() {ENVOY_CONN_LOG(trace, "message complete", connection_);if (ignore_message_complete_for_1xx_) {ignore_message_complete_for_1xx_ = false;return ParserStatus::Success;}if (pending_response_.has_value()) {ASSERT(!pending_response_done_);// After calling decodeData() with end stream set to true, we should no longer be able to reset.PendingResponse& response = pending_response_.value();// Encoder is used as part of decode* calls later in this function so pending_response_ can not// be reset just yet. Preserve the state in pending_response_done_ instead.pending_response_done_ = true;if (deferred_end_stream_headers_) {response.decoder_->decodeHeaders(std::move(absl::get<ResponseHeaderMapPtr>(headers_or_trailers_)), true);deferred_end_stream_headers_ = false;} else if (processing_trailers_) {response.decoder_->decodeTrailers(std::move(absl::get<ResponseTrailerMapPtr>(headers_or_trailers_)));} else {Buffer::OwnedImpl buffer;response.decoder_->decodeData(buffer, true);}// Reset to ensure no information from one requests persists to the next.pending_response_.reset();headers_or_trailers_.emplace<ResponseHeaderMapPtr>(nullptr);}// Pause the parser after a response is complete. Any remaining data indicates an error.return parser_->pause();
}

参考文档:https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/http/http_filters#arch-overview-http-filters


http://www.ppmy.cn/news/38540.html

相关文章

[Linux]环境变量

一.什么是环境变量 为了满足不同的运行场景&#xff0c;操作系统预先设置了一大批全局变量&#xff0c;这种可以指定操作系统运行环境的变量就是环境变量。 我们平常使用的指令本质上也是用C语言实现的一个个小程序&#xff0c;但是我们在执行我们自己的可执行程序时往往是类…

Vue路由

vue路由 文章目录vue路由1.vue-router简单使用1.1vue-router的下载1.2在main.js1.3router的配置1.4 路由的切换1.5路由指定位置的显示2.嵌套路由2.1 多级路由的配置2.2 多级路由的query传参2.3 多级路由的params传参2.4 接受路由的props参数2.编程试路由2.1 按钮的跳转2.2页面的…

ptrace注入游戏介绍

Android系统采用的是Linux内核&#xff0c;很多Linux系统上的技术都可以应用在Android系统上&#xff0c;Android系统上ptrace注入远程进程的技术就是其中一种。本章节将对ptrace注入的完整流程进行介绍。 一、ptrace函数介绍 ptrace注入技术的核心就是ptrace函数&#xff0c…

云计算基础——云存储技术简介

云存储的种类及其合适的应用 可以把云存储分成块存储与文件存储两类。 块存储 快速更改的单一文件系统针对单一文件大量写的高性能计算&#xff08;HPC&#xff09; 文件存储 文件及内容搜寻Tier-2 NAS多文件大量写入的应用数据大量读写的应用多个使用端都希望读取同一个文…

2007-2020年上市公司数字化转型数字化无形资产占比仅计算结果

1、时间&#xff1a;2007-2020年 2、范围&#xff1a;包括3600多家公司 3、方法说明&#xff1a; 借鉴祁怀锦等&#xff08;2020&#xff09;的方法&#xff0c;根据数字化相关词频手工识别企业数字化相关无形资产占比&#xff0c;相关词频在附件中。 据企业数字化转型的定…

Java 缺失的特性:扩展方法

作者&#xff1a;周密(之叶) 什么是扩展方法 扩展方法&#xff0c;就是能够向现有类型直接“添加”方法&#xff0c;而无需创建新的派生类型、重新编译或以其他方式修改现有类型。调用扩展方法的时候&#xff0c;与调用在类型中实际定义的方法相比没有明显的差异。 为什么需…

jsp056ssm客户关系管理系统的设计与实现hsg570685程序

1&#xff0e;系统登录&#xff1a;系统登录是管理员访问系统的路口&#xff0c;设计了系统登录界面&#xff0c;包括管理员名、密码和验证码&#xff0c;然后对登录进来的管理员判断身份信息&#xff0c;判断是管理员管理员还是普通用户。 2&#xff0e;管理员管理&#xff1a…

【《C Primer Plus》读书笔记】第14章:结构和其他数据形式

【《C Primer Plus》读书笔记】第14章&#xff1a;结构和其他数据形式14.1 C 结构体定义结构结构体变量的初始化访问结构成员结构作为函数参数指向结构的指针复合字面量和结构&#xff08;C99&#xff09;伸缩型数组成员&#xff08;C99&#xff09;匿名结构&#xff08;C11&am…