This article summarizes my reading of the HTTP filter code in Envoy. It aims to explain how Envoy manages HTTP filters internally, and how requests and responses flow through them.
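HTTP filters run inside the HTTP connection manager, which is itself a network filter. Because Envoy offers fine-grained control over HTTP, it adds a second filter chain layer that applies only to the HTTP protocol.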
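Envoy handles HTTP filters in a producer/consumer pattern. Filter information from the configuration file or from xDS configuration data is stored in fixed lists. When a request or response arrives, an asynchronous event triggers the matching handler, which takes the filters from these lists and runs them in turn. That is how the HTTP filters get used.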
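The producer/consumer split described above can be sketched in a few lines. This is a minimal illustration, not Envoy code: StreamSketch, FilterFactoryCbSketch, produce and consume are hypothetical names, and a filter is reduced to its name.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-in for a per-stream filter chain; not an Envoy type.
struct StreamSketch {
  std::vector<std::string> filters;  // names of the filters installed on this stream
};

// Hypothetical stand-in for a filter factory callback: installs one filter on a stream.
using FilterFactoryCbSketch = std::function<void(StreamSketch&)>;

// Producer: runs once per configured filter when the configuration is loaded.
inline void produce(std::vector<FilterFactoryCbSketch>& factories, const std::string& name) {
  assert(!name.empty());
  factories.push_back([name](StreamSketch& stream) { stream.filters.push_back(name); });
}

// Consumer: runs once per stream and invokes every stored callback in configuration order.
inline StreamSketch consume(const std::vector<FilterFactoryCbSketch>& factories) {
  StreamSketch stream;
  for (const auto& cb : factories) {
    cb(stream);
  }
  return stream;
}
```

The callbacks are produced once but consumed for every new stream, so each stream gets its own filter instances while sharing the same configuration.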
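1. Producer-side logic:
When Envoy initializes, or when the HTTP filter configuration is updated, HttpConnectionManagerConfig stores each HTTP filter's factory in filter_factories_, in configuration order.
The call chain is as follows: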
Network::FilterFactoryCb
HttpConnectionManagerFilterConfigFactory::createFilterFactoryFromProtoTyped()
---→
std::shared_ptr<HttpConnectionManagerConfig> Utility::createConfig()
---→
std::make_shared<HttpConnectionManagerConfig>() {
  ......
  // Iterate over http_filters
  const auto& filters = config.http_filters();
  DependencyManager dependency_manager;
  for (int32_t i = 0; i < filters.size(); i++) {
    processFilter(filters[i], i, "http", "http", i == filters.size() - 1, filter_factories_,
                  dependency_manager);
  }
  ......
}
----→
void HttpConnectionManagerConfig::processFilter() {
  ......
  auto* factory =
      Config::Utility::getAndCheckFactory<Server::Configuration::NamedHttpFilterConfigFactory>(
          proto_config, proto_config.is_optional());
  ProtobufTypes::MessagePtr message = Config::Utility::translateToFactoryConfig(
      proto_config, context_.messageValidationVisitor(), *factory);
  Http::FilterFactoryCb callback =
      factory->createFilterFactoryFromProto(*message, stats_prefix_, context_);
  ......
  // The FilterFactoryCb is stored in filter_factories here
  filter_factories.push_back(std::move(filter_config_provider));
}
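The lookup step in processFilter() (find a factory by name, let it build a callback, append the callback to the list) can be sketched as follows. All names are hypothetical stand-ins. In particular, the treatment of is_optional (skip an unknown filter instead of failing) is an assumption about what the flag is for, not something the excerpt above shows.

```cpp
#include <functional>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for one entry of http_filters.
struct FilterConfigSketch {
  std::string name;
  bool is_optional;
};

// Hypothetical callback type: here it only reports which filter it would create.
using CallbackSketch = std::function<std::string()>;
// Hypothetical factory type: turns a filter config into a callback.
using FactorySketch = std::function<CallbackSketch(const FilterConfigSketch&)>;

// Helper for the sketch: a factory whose callback returns the configured filter name.
inline FactorySketch namedFactory() {
  return [](const FilterConfigSketch& config) -> CallbackSketch {
    std::string name = config.name;
    return [name]() { return name; };
  };
}

// Returns true if a callback was stored, false if an unknown optional filter was skipped
// (assumed behavior). Throws for an unknown filter that is not optional.
inline bool processFilterSketch(const std::map<std::string, FactorySketch>& registry,
                                const FilterConfigSketch& config,
                                std::vector<CallbackSketch>& filter_factories) {
  auto it = registry.find(config.name);
  if (it == registry.end()) {
    if (config.is_optional) {
      return false;
    }
    throw std::invalid_argument("unknown HTTP filter: " + config.name);
  }
  filter_factories.push_back(it->second(config));
  return true;
}
```

The point of the sketch is that the list holds callbacks, not filter instances: nothing is instantiated until a stream later consumes the list.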
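2. Consumer-side logic
After onFileEvent fires, one step of the event handler calls createFilterChain(), which consumes the FilterFactoryCb callbacks stored in filter_factories_. These registered callbacks add the HTTP filters to decoder_filters_, encoder_filters_, or both.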
ConnectionImpl::onFileEvent()-→
.......
------>
Envoy::StatusOr<ParserStatus> ServerConnectionImpl::onHeadersCompleteBase() {
  ......
  active_request.request_decoder_->decodeHeaders(std::move(headers), false);
  ......
}
----→
void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapPtr&& headers, bool end_stream) {
  ......
  const bool upgrade_rejected = filter_manager_.createFilterChain() == false;
  ......
}
---→
bool FilterManager::createFilterChain() {
  ......
  if (upgrade != nullptr) {
    const Router::RouteEntry::UpgradeMap* upgrade_map = filter_manager_callbacks_.upgradeMap();
    if (filter_chain_factory_.createUpgradeFilterChain(upgrade->value().getStringView(),
                                                       upgrade_map, *this)) {
      filter_manager_callbacks_.upgradeFilterChainCreated();
      return true;
    } else {
      upgrade_rejected = true;
      // Fall through to the default filter chain. The function calling this
      // will send a local reply indicating that the upgrade failed.
    }
  }
  filter_chain_factory_.createFilterChain(*this);
  ......
}
----→
HttpConnectionManagerConfig::createFilterChain()
HttpConnectionManagerConfig::createUpgradeFilterChain()
----→
HttpConnectionManagerConfig::createFilterChainForFactories() {
  ......
  for (const auto& filter_config_provider : filter_factories) {
    auto config = filter_config_provider->config();
    if (config.has_value()) {
      // Invoke the FilterFactoryCb that the HTTP filter's factory created
      config.value()(callbacks);
      continue;
    }
    ......
  }
}
-----→
Taking BandwidthLimitFilterConfig as an example, config.value()(callbacks) corresponds to:
[filter_config](Http::FilterChainFactoryCallbacks& callbacks) -> void {
  // callbacks here is the filter manager
  callbacks.addStreamFilter(std::make_shared<BandwidthLimiter>(filter_config));
}
// Because callbacks is the filter manager, the call above lands in the following function:
----→
void addStreamFilter(StreamFilterSharedPtr filter) override {
  addStreamDecoderFilterWorker(filter, nullptr, true);
  addStreamEncoderFilterWorker(filter, nullptr, true);
  StreamDecoderFilter* decoder_filter = filter.get();
  filters_.push_back(decoder_filter);
}
---→
LinkedList::moveIntoListBack(std::move(wrapper), decoder_filters_); or
LinkedList::moveIntoList(std::move(wrapper), encoder_filters_); or both, when the filter is added to both lists.
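One consequence of the two calls above is worth spelling out: decoder filters are appended at the back of their list, while encoder filters are inserted at the front of theirs, so the encoder list ends up in reverse configuration order. A minimal sketch, assuming std::list in place of Envoy's LinkedList helpers and filter names in place of filter wrappers (FilterChainsSketch is a hypothetical type):

```cpp
#include <list>
#include <string>

// Hypothetical stand-in for the two per-stream filter lists.
struct FilterChainsSketch {
  std::list<std::string> decoder_filters;
  std::list<std::string> encoder_filters;

  // A filter that handles both directions is added to both lists.
  void addStreamFilter(const std::string& name) {
    decoder_filters.push_back(name);   // plays the role of moveIntoListBack
    encoder_filters.push_front(name);  // plays the role of moveIntoList
  }
};
```

Adding filters "a", "b", "c" in that order yields a, b, c on the decoder list and c, b, a on the encoder list, so a response passes through the filters in the opposite order from the request.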
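3. How encoder_filters_ and decoder_filters_ are consumed
Both paths start from libevent events that are bound to the read and write handlers. The handlers eventually take each HTTP filter from these two lists in turn and run it, which is how the filter chain gets used.
Flow for consuming encoder_filters_: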
ParserStatus ServerConnectionImpl::onMessageCompleteBase()
ParserStatus ClientConnectionImpl::onMessageCompleteBase()
----→
void ConnectionManagerImpl::ActiveStream::encodeHeaders(ResponseHeaderMap& response_headers, bool end_stream)
void encodeData(Buffer::Instance& data, bool end_stream)
void encodeTrailers(ResponseTrailerMap& trailers)
----→
void FilterManager::encodeHeaders(ActiveStreamEncoderFilter* filter, ResponseHeaderMap& headers, bool end_stream)
void encodeData(Buffer::Instance& data, bool end_stream)
void encodeTrailers(ResponseTrailerMapPtr&& trailers)
----→ iterates over encoder_filters_ and calls the matching
encodeHeaders() / encodeData() / encodeTrailers() function on each filter in turn
Flow for consuming decoder_filters_:
ParserStatus ServerConnectionImpl::onMessageCompleteBase()
ParserStatus ClientConnectionImpl::onMessageCompleteBase()
----→
void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapPtr&& headers,bool end_stream)
void decodeData(Buffer::Instance& data, bool end_stream)
void decodeTrailers(RequestTrailerMapPtr&& trailers)
---→
void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHeaderMap& headers,
bool end_stream)
void FilterManager::decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instance& data,
bool end_stream,
FilterIterationStartState filter_iteration_start_state)
void FilterManager::decodeTrailers(ActiveStreamDecoderFilter* filter, RequestTrailerMap& trailers)
--→ iterates over decoder_filters_ and calls the matching
decodeHeaders() / decodeData() / decodeTrailers() function on each filter in turn
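The traversal at the end of both flows can be sketched as a loop over the filter list. The types below are hypothetical simplifications: the status enum is modeled on the Continue and StopIteration values that Envoy filters return from their header callbacks, and the sketch simply stops at the first filter that asks to stop. Resuming a paused chain is out of scope here.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Hypothetical, reduced status enum for the sketch.
enum class HeadersStatusSketch { Continue, StopIteration };

// Hypothetical decoder filter: a name plus a decodeHeaders callback.
struct DecoderFilterSketch {
  std::string name;
  std::function<HeadersStatusSketch(std::vector<std::string>&)> decodeHeaders;
};

// Helper for the sketch: a filter that records its name and returns a fixed status.
inline DecoderFilterSketch makeFilter(const std::string& name, HeadersStatusSketch status) {
  return DecoderFilterSketch{name, [name, status](std::vector<std::string>& trace) {
                               trace.push_back(name);
                               return status;
                             }};
}

// Runs the filters in list order and returns how many were invoked.
inline std::size_t runDecodeHeaders(std::vector<DecoderFilterSketch>& filters,
                                    std::vector<std::string>& trace) {
  std::size_t invoked = 0;
  for (auto& filter : filters) {
    ++invoked;
    if (filter.decodeHeaders(trace) == HeadersStatusSketch::StopIteration) {
      break;  // later filters do not see the headers in this pass
    }
  }
  return invoked;
}
```

The encoder side would follow the same shape over encoder_filters_, calling the encode* callbacks instead.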
Supplementary code: the full onMessageCompleteBase functions in ServerConnectionImpl and ClientConnectionImpl are shown below.
ParserStatus ServerConnectionImpl::onMessageCompleteBase() {
  ASSERT(!handling_upgrade_);
  if (active_request_.has_value()) {
    auto& active_request = active_request_.value();
    if (active_request.request_decoder_) {
      active_request.response_encoder_.readDisable(true);
    }
    active_request.remote_complete_ = true;
    if (deferred_end_stream_headers_) {
      active_request.request_decoder_->decodeHeaders(
          std::move(absl::get<RequestHeaderMapPtr>(headers_or_trailers_)), true);
      deferred_end_stream_headers_ = false;
    } else if (processing_trailers_) {
      active_request.request_decoder_->decodeTrailers(
          std::move(absl::get<RequestTrailerMapPtr>(headers_or_trailers_)));
    } else {
      Buffer::OwnedImpl buffer;
      active_request.request_decoder_->decodeData(buffer, true);
    }
    // Reset to ensure no information from one requests persists to the next.
    headers_or_trailers_.emplace<RequestHeaderMapPtr>(nullptr);
  }
  // Always pause the parser so that the calling code can process 1 request at a time and apply
  // back pressure. However this means that the calling code needs to detect if there is more data
  // in the buffer and dispatch it again.
  return parser_->pause();
}

ParserStatus ClientConnectionImpl::onMessageCompleteBase() {
  ENVOY_CONN_LOG(trace, "message complete", connection_);
  if (ignore_message_complete_for_1xx_) {
    ignore_message_complete_for_1xx_ = false;
    return ParserStatus::Success;
  }
  if (pending_response_.has_value()) {
    ASSERT(!pending_response_done_);
    // After calling decodeData() with end stream set to true, we should no longer be able to reset.
    PendingResponse& response = pending_response_.value();
    // Encoder is used as part of decode* calls later in this function so pending_response_ can not
    // be reset just yet. Preserve the state in pending_response_done_ instead.
    pending_response_done_ = true;
    if (deferred_end_stream_headers_) {
      response.decoder_->decodeHeaders(
          std::move(absl::get<ResponseHeaderMapPtr>(headers_or_trailers_)), true);
      deferred_end_stream_headers_ = false;
    } else if (processing_trailers_) {
      response.decoder_->decodeTrailers(
          std::move(absl::get<ResponseTrailerMapPtr>(headers_or_trailers_)));
    } else {
      Buffer::OwnedImpl buffer;
      response.decoder_->decodeData(buffer, true);
    }
    // Reset to ensure no information from one requests persists to the next.
    pending_response_.reset();
    headers_or_trailers_.emplace<ResponseHeaderMapPtr>(nullptr);
  }
  // Pause the parser after a response is complete. Any remaining data indicates an error.
  return parser_->pause();
}
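Both functions above share the same three-way decision about how to signal the end of a message to the decoder. A minimal sketch of just that decision, with a hypothetical enum and function name that do not exist in Envoy:

```cpp
// Hypothetical names for the three ways the end of a message is delivered.
enum class FinalCallSketch {
  HeadersWithEndStream,   // headers were deferred, send them with end_stream = true
  Trailers,               // the message ended with trailers
  EmptyDataWithEndStream  // otherwise, send an empty data buffer with end_stream = true
};

// Mirrors the if / else if / else ladder in onMessageCompleteBase().
inline FinalCallSketch pickFinalCall(bool deferred_end_stream_headers, bool processing_trailers) {
  if (deferred_end_stream_headers) {
    return FinalCallSketch::HeadersWithEndStream;
  }
  if (processing_trailers) {
    return FinalCallSketch::Trailers;
  }
  return FinalCallSketch::EmptyDataWithEndStream;
}
```

Deferred headers take priority over trailers because the first branch is checked first, which matches the order of the branches in the code above.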
Reference: https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/http/http_filters#arch-overview-http-filters