【envoy源码走读Envoy的初始化

2023年 10月 10日 60.8k 0

Envoy作为容器sidecar被注入应用Pod内的主要流程如下:主要的组件就是Listener和Cluster,Listener负责监控Downstream发送的请求,最终由Cluster来选择Upstream中的某个节点来处理该请求。

image.png

Envoy会从控制面或者静态配置中获取路由的集群信息,保存在内存中,当请求从Downstream流向Upstream时,会通过Filter,根据路由和集群的配置,选择后端的节点建立连接,将请求发送出去。

接下来,我们首先来看下Envoy是如何初始化的。
image.png

main入口

Envoy进程的初始化入口位于source/exec/main.cc的main方法中,main方法是对source/exec/main_common.cc的静态方法Envoy::MainCommon::main的封装,同时可以传入PostServerHook函数回调,创建完成server后会调用。在main方法中创建了main_common对象(使用std::make_unique)。

int main(int argc, char** argv) {
  ... ...
  std::unique_ptr main_common;
  try {
    main_common = std::make_unique(argc, argv);
  ... ...
}

创建MainCommon

进入MainCommon构造方法中,通过初始化列表执行了成员options_base_

MainCommon::MainCommon(int argc, const char* const* argv)
    : options_(argc, argv, &MainCommon::hotRestartVersion, spdlog::level::info),
      base_(options_, real_time_system_, default_test_hooks_,prod_component_factory_,
      std::make_unique(),platform_impl_.threadFactory(),
      platform_impl_.fileSystem()) {}
  • options_的构造方法在source/server/opitions_impl.cc中,使用开源的 tclap 解析库。OptionsImpl 支持很多参数配置,具体的参数配置参考 operation/cli。
  • base_的构造方法MainCommonBase,会初始化全局的参数,接着调用服务进行初始化。
    • 首先执行configureHotRestarter处理重启新老进程间的热替换问题。
    • 然后创建ThreadLocal对象,用来作为线程局部存储空间;并创建stats_store_用来为stats分配存储空间。
    • 最后创建了最重要的一个对象Server::InstanceImpl

服务 InstanceImpl 初始化☆

InstanceImpl的实现在source/server/server.cc中,下面具体研究下,此类很重要,该文章本质上就是走读该类的具体实现!! ,同时在构造方法中创建Api::Impl api_对象作为底层dispatcher的系统操作执行接口层(应该就是对底层的封装,比如不同操作系统的thread、time、file、random、process等底层系统调用)。

启动参数bootstrap的初始化

InstanceImpl 在核心函数中先加载配置文件,通过参数 -c 配置。
配置文件是一个 json 格式,包括以下几项:

  • node:节点标识,会在管理服务器中呈现,用于标识目的(例如,头域中生成相应的字段)。
  • static_resources:指定静态资源配置。
  • dynamic_resources:动态发现服务源配置。
  • cluster_manager:该服务所有的上游群集的群集管理配置。
  • hds_config:服务健康检查配置。
  • flags_path:用于启动文件标志的路径。
  • stats_sinks:统计汇总设置
  • stats_config:配置内部处理统计信息。
  • stats_flush_interval:刷新到统计信息服务的周期时间。出于性能方面的考虑,Envoy锁定计数器,并且只是周期性地刷新计数器和计量器。如果未指定,则默认值为5000毫秒(5秒)。
  • watchdog:看门狗配置。
  • tracing:配置外置的追踪服务程序。如果没有指定,则不会执行追踪。
  • runtime:配置运行时配置分发服务程序。
  • admin: 配置本地管理的HTTP服务。
  • overload_manager:过载管理配置(资源限制)。

InstanceImpl::InstanceImpl() -> initialize() -> InstanceUtil::loadBootstrapConfig() 启动参数bootstrap的初始化是通过InstanceUtil::loadBootstrapConfig()方法实现的,Envoy进程在启动时用--config-path--config-yaml指定启动配置文件,通过MessageUtil::loadFromFileMessageUtil::loadFromYaml分别加载配置文件,并通过bootstrap.MergeFrom进行配置合并。最终赋值到instancebootstrap的成员变量envoy::config::bootstrap::v3::Bootstrap,就是一个protobuf的message而已。

void InstanceUtil::loadBootstrapConfig(envoy::config::bootstrap::v3::Bootstrap& bootstrap,
                                       const Options& options,
                                       ProtobufMessage::ValidationVisitor& validation_visitor,
                                       Api::Api& api) {
  const std::string& config_path = options.configPath();
  const std::string& config_yaml = options.configYaml();
  const envoy::config::bootstrap::v3::Bootstrap& config_proto = options.configProto();

  if (!config_path.empty()) {
    MessageUtil::loadFromFile(config_path, bootstrap, validation_visitor, api);
  }
  if (!config_yaml.empty()) {
    envoy::config::bootstrap::v3::Bootstrap bootstrap_override;
    MessageUtil::loadFromYaml(config_yaml, bootstrap_override, validation_visitor);
    // TODO(snowp): The fact that we do a merge here doesn't seem to be covered under test.
    bootstrap.MergeFrom(bootstrap_override);
  }
  if (config_proto.ByteSize() != 0) {
    bootstrap.MergeFrom(config_proto);
  }
  MessageUtil::validate(bootstrap, validation_visitor);
}

初始化 admin 服务

image.png
Envoy进程需要对外暴露RESTFUL服务,与外部控制面或观测数据收集系统进行交互,该服务由Envoy主线程的Admin模块承担。相关配置在boostrap的admin部分指定的。
source/server/server.cc:531

  admin_ = std::make_unique(initial_config.admin().profilePath(), *this, initial_config.admin().ignoreGlobalConnLimit());
         
  if (initial_config.admin().address()) {
    admin_->startHttpListener(initial_config.admin().accessLogs(), options_.adminAddressPath(),
                              initial_config.admin().address(),
                              initial_config.admin().socketOptions(),
                              stats_store_.createScope("listener.admin."));
  }
  if (initial_config.admin().address()) {
    admin_->addListenerToHandler(handler_.get());
  }

注册url handler

Admin模块的实现位于source/server/admin/admin.cc中,其构造方法位于AdminImpl::AdminImpl中,将创建前缀为http.admin的观测指标。
主要部分就是定义url的handler处理器,定义了一个UrlHandler的结构体来管理处理器。

  struct UrlHandler {
    const std::string prefix_;
    const std::string help_text_;
    const HandlerCb handler_;
    const bool removable_;
    const bool mutates_server_state_;
  };

对应的,在Admin模块中注册功能处理器时,每个URL的handler配置信息如下:

      handlers_{
          {"/", "Admin home page", MAKE_ADMIN_HANDLER(handlerAdminHome), false, false},
          {"/certs", "print certs on machine",
           MAKE_ADMIN_HANDLER(server_info_handler_.handlerCerts), false, false},
          {"/clusters", "upstream cluster status",
           MAKE_ADMIN_HANDLER(clusters_handler_.handlerClusters), false, false},

下面取其中一条具体分析下

{"/config_dump", "dump current Envoy configs (experimental)", MAKE_ADMIN_HANDLER(config_dump_handler_.handlerConfigDump), false, false},

在每一条配置的MAKE_ADMIN_HANDLER中都会调用prefix匹配到的handler,此处即为:config_dump_handler_.handlerConfigDump(path_and_query, response_headers, data, admin_stream)

启动http服务

启动Admin监听,创建监听Socket和监听器AdminListener。

admin_->startHttpListener(initial_config.admin().accessLogPath(), options.adminAddressPath(),
                              initial_config.admin().address(),
                              stats_store_.createScope("listener.admin."));

在启动服务函数内部会创建 TcpListenSocket 和 AdminListener。

void AdminImpl::startHttpListener(const std::string& access_log_path,
                                  const std::string& address_out_path,
                                  Network::Address::InstanceConstSharedPtr address,
                                  Stats::ScopePtr&& listener_scope) {
  ... ...
  socket_ = std::make_unique(address, nullptr, true);
  listener_ = std::make_unique(*this, std::move(listener_scope));
  ... ...
}

(1)创建socket。初始化 TcpListenSocket 时会在内部创建一个 socket 后再进行 bind。

using TcpListenSocket = NetworkListenSocket;

template  class NetworkListenSocket : public ListenSocketImpl {
public:
  NetworkListenSocket(const Address::InstanceConstSharedPtr& address,
                      const Network::Socket::OptionsSharedPtr& options, bool bind_to_port)
      // socket 创建
      : ListenSocketImpl(address->socket(T::type), address) {
    RELEASE_ASSERT(io_handle_->fd() != -1, "");

    setPrebindSocketOptions();

    setupSocket(options, bind_to_port);
  }

void ListenSocketImpl::setupSocket(const Network::Socket::OptionsSharedPtr& options, bool bind_to_port) {
  setListenSocketOptions(options);
  //准备进行绑定
  if (bind_to_port) {
    doBind();
  }
}

void ListenSocketImpl::doBind() {
  //绑定
  const Api::SysCallIntResult result = local_address_->bind(io_handle_->fd());
  ... ...
}

(2)创建AdminListener。构造方法就是进行参数初始化。

AdminListener(AdminImpl& parent, Stats::ScopePtr&& listener_scope)
  : parent_(parent), name_("admin"), scope_(std::move(listener_scope)),
    stats_(Http::ConnectionManagerImpl::generateListenerStats("http.admin.", *scope_)) {}

AdminListener加入到监听器队列

将 AdminListener 通过 handler 加入监听队列。handler就是ConnectionHandlerImpl,是在 InstanceImpl 的构造函数内初始化的。

InstanceImpl::InstanceImpl(... ...)
  : handler_(new ConnectionHandlerImpl(ENVOY_LOGGER(), *dispatcher_)),
  ... ...{
  }

void InstanceImpl::initialize(... ...)
{
  ... ...
  //将 AdminListener 加入 ConnectionHandler
  if (initial_config.admin().address()) {
    admin_->addListenerToHandler(handler_.get());
  }
  ... ...
}

void AdminImpl::addListenerToHandler(Network::ConnectionHandler* handler) {
  // 这里的 listener_ 就是上面生成的 AdminListener
  if (listener_) {
    handler->addListener(*listener_);
  }
}

source/server/connection_handler_impl.cc:67ConnectionHandlerImpl::addListener方法中,根据adminConfig创建ActiveTcpListener,将listener添加到server的listeners列表中。ActiveTcpListener构造方法中通过dispatcher创建listener,在底层的libevent中注册了读写事件。等有新连接到来时,会回调 onAccept。

ConnectionHandlerImpl::ActiveListener::ActiveListener(ConnectionHandlerImpl& parent,Network::ListenerConfig& config)
    : ActiveListener(
          parent,
          // 创建listen
          parent.dispatcher_.createListener(config.socket(), *this, config.bindToPort(),
                                            config.handOffRestoredDestinationConnections()),
          config) {}

Network::ListenerPtr
DispatcherImpl::createListener(Network::Socket& socket, Network::ListenerCallbacks& cb, bool bind_to_port, bool hand_off_restored_destination_connections) {
  ASSERT(isThreadSafe());
  return Network::ListenerPtr{new Network::ListenerImpl(*this, socket, cb, bind_to_port,
                                                        hand_off_restored_destination_connections)};
}

void ListenerImpl::setupServerSocket(Event::DispatcherImpl& dispatcher, Socket& socket) {
  listener_.reset(
      //创建 evconnlistener_new ,有连接回调listenCallback
      evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.ioHandle().fd()));
  ... ...
}

void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr* remote_addr,int remote_addr_len, void* arg) {
  ... ...

  //回调ActiveListener的onAccept
  listener->cb_.onAccept(
    std::make_unique(std::move(io_handle), local_address, remote_address),
    listener->hand_off_restored_destination_connections_);
}

同时,AdminImpl中通过override createFilterChain方法,增加filter回调AdminFilter。

void AdminImpl::createFilterChain(Http::FilterChainFactoryCallbacks& callbacks) {
  callbacks.addStreamFilter(std::make_shared(createCallbackFunction()));
}
  AdminFilter::AdminServerCallbackFunction createCallbackFunction() {
    return [this](absl::string_view path_and_query, Http::ResponseHeaderMap& response_headers,
                  Buffer::OwnedImpl& response, AdminFilter& filter) -> Http::Code {
      return runCallback(path_and_query, response_headers, response, filter);
    };
  }

admin请求处理

从上面可以看到,对于进入的访问Admin的http请求,将会执行到AdminFilter对象的runCallback回调方法。runCallback方法中将轮询构造方法中注册的handlers_匹配的path_and_query参数,匹配后的restful请求将调用已注册的handler.handler_处理方法。

Http::Code AdminImpl::runCallback(absl::string_view path_and_query,
                                  Http::ResponseHeaderMap& response_headers,
                                  Buffer::Instance& response, AdminStream& admin_stream) {
  std::string::size_type query_index = path_and_query.find('?');
  if (query_index == std::string::npos) {
    query_index = path_and_query.size();
  }

  for (const UrlHandler& handler : handlers_) {
    if (path_and_query.compare(0, query_index, handler.prefix_) == 0) {
      found_handler = true;
...
      code = handler.handler_(path_and_query, response_headers, response, admin_stream);
      break;
    }
  }
  return code;
}

创建ListenerManager☆

创建ListenerManagerImpl,用于管理监听,因为Downstream要访问Upstream的时候,envoy会进行监听,Downstream会连接监听的端口。

ListenerManager拥有一个或多个工作线程,每个工作线程回去处理一个给定的Downstream的tcp连接。ListenerManager负责创建这些工作线程,工作线程创建好了以后ListenerManager就创建完成了。

source/server/server.cc:586InstanceImpl::initialize中是创建ListenerManager对象:

  // Workers get created first so they register for thread local updates.
  listener_manager_ =
      std::make_unique(*this, listener_component_factory_, worker_factory_,
                                            bootstrap_.enable_dispatcher_stats(), quic_stat_names_);

该构造函数传入了4个参数:

  • 第一个参数this,是这里的server对象;
  • 第二个参数listener_component_factory_是监听器组件工厂类ProdListenerComponentFactory,它将创建真实的套接字,在系统热启动时它会尝试从父进程获取套接字;
  • 第三个参数worker_factory_是一个工作线程工厂类ProdWorkerFactory,它将用于为服务器创建工作线程;
  • 第四个参数是个布尔类型,指示是否启用事件分发器的统计功能,这个值默认是false,因为启用了事件分发器的统计功能后数据量可能会非常大,如果需要启用该统计功能,需要将bootstrap的enable_dispatcher_stats的值配置为true。

进入source/server/listener_manager_impl.cc:237文件中,ListenerManagerImpl的构造方法中,创建了很多的worker,Envoy采用libevent监听socket的事件,当有一个新的连接来的时候,会将任务分配给某个worker进行处理,从而实现异步的处理。

根据配置的工作线程数创建工作线程,并通过ProdWorkerFactory来创建新的工作线程对象,工作线程名使用worker_开头。注意哈,此处创建的是worker对象,并没有创建启动系统线程,在后续的envoy运行环节才会有。

ListenerManagerImpl::ListenerManagerImpl(Instance& server,
...
{
  for (uint32_t i = 0; i < server.options().concurrency(); i++) {
    workers_.emplace_back(
        worker_factory.createWorker(i, server.overloadManager(), absl::StrCat("worker_", i)));
  }
}

source/server/worker_impl.cc:17文件中,createWorker创建WorkerImpl实例。

WorkerPtr ProdWorkerFactory::createWorker(uint32_t index, OverloadManager& overload_manager,
                                          const std::string& worker_name) {
  Event::DispatcherPtr dispatcher(
      api_.allocateDispatcher(worker_name, overload_manager.scaledTimerFactory()));
  auto conn_handler = std::make_unique(*dispatcher, index);
  return std::make_unique(tls_, hooks_, std::move(dispatcher), std::move(conn_handler),
                                      overload_manager, api_, stat_names_);
}

createWorker会创建WorkerImpl,初始化WorkerImpl需要两个重要的参数。

  • 一个是api_.allocateDispatcher创建出来的DispatcherImpl,用来封装libevent的事件分发的。
    • dispatcher作为网络事件及其他内部事件调度器,会创建WatermarkBufferFactory工厂实例,用于创建分配网络请求内存buffer的watermarkBuffer实例。
    • watermarkBuffer可以监控请求内存已分配内存的大小是否超出设置的阈值,如果是则触发L4 ConnectionImpl连接对象上的onWriteBufferLowWatermark方法,暂停接收新请求来保护Envoy进程。
    • dispatcher对象还会负责生命周期较短对象内存的延迟释放,解决这类对象由于被其他生命周期较长对象访问时出现的野指针问题。
    • 同时,dispatcher会负责创建与底层libevent库的通信。
  • 一个是ConnectionHandlerImpl,用来管理一个连接的。
    source/server/worker_impl.cc文件中,是WorkerImpl的构造方法,注册工作线程对象到registered_threads_列表中。后续主线程可以通过runOnAllThreads来让工作线程执行任务。此处涉及到的worker线程为了共享主线程数据,需要使用threadlocal注册到主线程registered_threads_列表中。后续主线程可以通过runOnAllThreads来让工作线程执行任务。

所有要进行数据共享的线程都需要通过registerThread接口进行注册,dispatcher接口则是用来返回当前线程对应的Dispatcher对象。参考文章

WorkerImpl::WorkerImpl(ThreadLocal::Instance& tls, ListenerHooks& hooks,
...
  tls_.registerThread(*dispatcher_, false);
...
}

创建ClusterManager☆

  • 启动的时候,载入bootstrap配置,创建ProdClusterManagerFactory。在source/server/server.cc:656文件中,InstanceImpl::initialize方法中执行了cluster服务发现的初始化。
  config_.initialize(bootstrap_, *this, *cluster_manager_factory_);
  • 进入source/server/configuration_impl.cc:74中,就是上面的initialize方法MainImpl::initialize,创建cluster管理器,并且根据静态配置创建listener,并添加到active_listeners_或warming_listeners_中。
void MainImpl::initialize(const envoy::config::bootstrap::v3::Bootstrap& bootstrap,
                          Instance& server,
                          Upstream::ClusterManagerFactory& cluster_manager_factory) {

  cluster_manager_ = cluster_manager_factory.clusterManagerFromProto(bootstrap);
  const auto& listeners = bootstrap.static_resources().listeners();
  for (ssize_t i = 0; i < listeners.size(); i++) {
    ENVOY_LOG(debug, "listener #{}:", i);
    server.listenerManager().addOrUpdateListener(listeners[i], "", false);
  }
}
  • 通过ProdClusterManagerFactoryclusterManagerFromProto方法创建ClusterManager。注意在cluster_manager_factory.clusterManagerFromProto(bootstrap)中创建ClusterManagerImpl对象。
    构造方法中会针对每个工作线程创建ThreadLocalClusterManagerImpl,解决多个工作线程访问cluster配置的锁问题。

虽然我们有ClusterManager接口,但是这个接口是提供个主线程的,用来添加、更新、删除集群的,并非是线程安全的。而且我们在worker线程也不需要做集群的添加、更新和删除。 因此这里有了ThreadLocalClusterManagerImpl,他实现了ThreadLocalCluster接口,并提供了一些有限的一些接口提供给worker线程来访问我们的Cluster。

  tls_.set([this, local_cluster_params](Event::Dispatcher& dispatcher) {
    return std::make_shared(*this, dispatcher, local_cluster_params);
  });
  • ClusterManagerImpl构造函数内部开始创建cds、载入primary集群、static集群等操作。
    source/common/upstream/cluster_manager_impl.cc文件中,ClusterManagerImpl的构造方法。cluster的load分为两个阶段,第一阶段会载入所有的primary集群,第二阶段才会载入secondary集群。is_primary_cluster用来判断一个集群是否是primary集群。 目前除了EDS类型的集群并且不是基于文件发现的方式,其他的都是primary集群。

    • 开始载入primary集群。
      优先载入primary集群,因为Secondary集群的初始化可能会依赖primary集群,比如EDS类型的Cluster就需要依赖一个primary类型的xds集群来提供xds server的地址。
      source/common/upstream/cluster_manager_impl.cc:324
      for (const auto& cluster : bootstrap.static_resources().clusters()) {
        if (is_primary_cluster(cluster)) {
          primary_clusters_.insert(cluster.name());
        }
      }
      // Load all the primary clusters.
      for (const auto& cluster : bootstrap.static_resources().clusters()) {
        if (is_primary_cluster(cluster)) {
          loadCluster(cluster, MessageUtil::hash(cluster), "", false, active_clusters_);
        }
      }
    
    • 开启创建adx通道,后续要在这个通道上创建cds api

核心方法

  • 重点看下loadCluster。无论是Primary cluster、还是Secondary Cluster,最终都是通过loadCluster把Cluster Protobuf变成Cluster对象。
    • 通过ClusterManagerFactory以及Cluster的Protobuf来创建ClusterThreadAwareLoadBalancer
      std::pair new_cluster_pair =
      factory_.clusterFromProto(cluster, *this, outlier_event_logger_, added_via_api);
      auto& new_cluster = new_cluster_pair.first;
      Cluster& cluster_reference = *new_cluster;
    
    • 设置healthChecker、outlierDetector等callback。
    if (new_cluster->healthChecker() != nullptr) {
    new_cluster->healthChecker()->addHostCheckCompleteCb(
        [this](HostSharedPtr host, HealthTransition changed_state) {
          if (changed_state == HealthTransition::Changed &&
              host->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)) {
            postThreadLocalHealthFailure(host);
          }
        });
    }
    
    if (new_cluster->outlierDetector() != nullptr) {
    new_cluster->outlierDetector()->addChangedStateCb([this](HostSharedPtr host) {
      if (host->healthFlagGet(Host::HealthFlag::FAILED_OUTLIER_CHECK)) {
        ENVOY_LOG_EVENT(debug, "outlier_detection_ejection",
                        "host {} in cluster {} was ejected by the outlier detector",
                        host->address(), host->cluster().name());
        postThreadLocalHealthFailure(host);
      }
    });
    }
    

    这么做的目的是因为,当开启健康检查和离群检测的时候,有出现异常机器需要通知所有的线程进行连接的处理,比如drain connection、close conntions然后从连接池中移除等操作。

    • 从ClusterMap中查找Cluster,如果已经存在就替换,没有则新增。
      ClusterDataPtr result;
      auto cluster_entry_it = cluster_map.find(cluster_reference.info()->name());
      if (cluster_entry_it != cluster_map.end()) {
        result = std::exchange(cluster_entry_it->second,
                               std::make_unique(cluster, cluster_hash, version_info,
                                                             added_via_api, std::move(new_cluster),
                                                             time_source_));
      } else {
        bool inserted = false;
        std::tie(cluster_entry_it, inserted) = cluster_map.emplace(
            cluster_reference.info()->name(),
            std::make_unique(cluster, cluster_hash, version_info, added_via_api,
                                          std::move(new_cluster), time_source_));
        ASSERT(inserted);
      }
    
  • cds初始化。
    仍然在source/common/upstream/cluster_manager_impl.cc文件中,ClusterManagerImpl的构造方法中,如果在启动文件中配置了cds,那么会创建cds对象。
  // We can now potentially create the CDS API once the backing cluster exists.
  if (dyn_resources.has_cds_config() || !dyn_resources.cds_resources_locator().empty()) {
...
    cds_api_ = factory_.createCds(dyn_resources.cds_config(), cds_resources_locator.get(), *this);
    init_helper_.setCds(cds_api_.get());
  }
CdsApiPtr
ProdClusterManagerFactory::createCds(const envoy::config::core::v3::ConfigSource& cds_config,
                                     const xds::core::v3::ResourceLocator* cds_resources_locator,
                                     ClusterManager& cm) {
  return CdsApiImpl::create(cds_config, cds_resources_locator, cm, stats_,
                            validation_context_.dynamicValidationVisitor());
}

进入source/common/upstream/cds_api_impl.cc文件中,

CdsApiPtr CdsApiImpl::create(const envoy::config::core::v3::ConfigSource& cds_config,
                             const xds::core::v3::ResourceLocator* cds_resources_locator,
                             ClusterManager& cm, Stats::Scope& scope,
                             ProtobufMessage::ValidationVisitor& validation_visitor) {
  return CdsApiPtr{
      new CdsApiImpl(cds_config, cds_resources_locator, cm, scope, validation_visitor)};
}

CdsApiImpl的构造方法中,会注册对cds资源的订阅。

CdsApiImpl::CdsApiImpl(const envoy::config::core::v3::ConfigSource& cds_config,
                       const xds::core::v3::ResourceLocator* cds_resources_locator,
                       ClusterManager& cm, Stats::Scope& scope,
                       ProtobufMessage::ValidationVisitor& validation_visitor)
    : Envoy::Config::SubscriptionBase(validation_visitor,
                                                                           "name"),
  const auto resource_name = getResourceName();
  if (cds_resources_locator == nullptr) {
    subscription_ = cm_.subscriptionFactory().subscriptionFromConfigSource(
        cds_config, Grpc::Common::typeUrl(resource_name), *scope_, *this, resource_decoder_, {});
  } else {
    subscription_ = cm.subscriptionFactory().collectionSubscriptionFromUrl(
        *cds_resources_locator, cds_config, resource_name, *scope_, *this, resource_decoder_);
  }
}

每当有cds配置事件发生变化时,都通过回调方法进行cluster更新。

Dispatcher内存延迟析构

参考

在Envoy的代码中Dispatcher是随处可见的,可以说在Envoy中有着举足轻重的地位,一个Dispatcher就是一个EventLoop,其承担了任务队列、网络事件处理、定时器、信号处理等核心功能。

Dispatcher 和 Libevent

Dispatcher本质上就是一个EventLoop,Envoy并没有重新实现,而是复用了Libevent中的event_base,在Libevent的基础上进行了二次封装并抽象出一些事件类,比如FileEventSignalEventTimer等。Libevent是一个C库,而Envoy是C++,为了避免手动管理这些C结构的内存,Envoy通过继承unique_ptr的方式重新封装了这些libevent暴露出来的C结构。

DeferredDeletable

最后讲一下Dispatcher中比较难理解也很重要的DeferredDeletable,它是一个空接口,所有要进行延迟析构的对象都要继承自这个空接口。在Envoy的代码中像下面这样继承自DeferredDeletable的类随处可见。

class DeferredDeletable { public:   virtual ~DeferredDeletable() {} }; 

那何为延迟析构呢?用在哪个场景呢?延迟析构指的是将析构的动作交由Dispatcher来完成,所以DeferredDeletableDispatcher密切相关。Dispatcher对象有一个vector保存了所有要延迟析构的对象。

class DispatcherImpl : public Dispatcher {   
private:   ........   
    std::vector to_delete_1_;
    std::vector to_delete_2_;
    std::vector* current_to_delete_;  
} 

to_delete_1_to_delete_2_就是用来存放所有的要延迟析构的对象,这里使用两个vector存放,为什么要这样做呢?。current_to_delete_始终指向当前正要析构的对象列表,每次执行完析构后就交替指向另外一个对象列表,来回交替。

void DispatcherImpl::clearDeferredDeleteList() { 
    std::vector* to_delete = current_to_delete_; 
    size_t num_to_delete = to_delete->size(); 
    if (current_to_delete_ == &to_delete_1_) { current_to_delete_ = &to_delete_2_; } 
    else { current_to_delete_ = &to_delete_1_; } 
    deferred_deleting_ = true; 
    for (size_t i = 0; i clear(); 
    deferred_deleting_ = false; 
}

上面的代码在执行对象析构的时候先使用to_delete来指向当前正要析构的对象列表,然后将current_to_delete_指向另外一个列表,这样在添加延迟删除的对象时,就可以做到安全的把对象添加到列表中了。因为deferredDeleteclearDeferredDeleteList都是在同一个线程中运行,所以current_to_delete_是一个普通的指针,可以安全的更改指针指向另外一个,而不用担心有线程安全问题。

void DispatcherImpl::deferredDelete(DeferredDeletablePtr&& to_delete) { 
    ASSERT(isThreadSafe());   
    current_to_delete_->emplace_back(std::move(to_delete));   
    ENVOY_LOG(trace, "item added to deferred deletion list (size={})", current_to_delete_->size());   
    if (1 == current_to_delete_->size()) {     
        deferred_delete_timer_->enableTimer(std::chrono::milliseconds(0));   
    } 
} 

当有要进行延迟析构的对象时,调用deferredDelete即可,这个函数内部会通过current_to_delete_把对象放到要延迟析构的列表中,最后判断下当前要延迟析构的列表大小是否是1,如果是1表明这是第一次添加延迟析构的对象,那么就需要通过deferred_delete_timer_把背后的线程唤醒执行clearDeferredDeleteList函数。这样做的原因是避免多次唤醒,因为有一种情况是线程已经唤醒了正在执行clearDeferredDeleteList,在这个过程中又有其他的对象需要析构而加入到vector中。

到此为止deferredDelete的实现原理就基本分析完了,可以看出它的实现和任务队列的实现很类似,只不过一个是循环执行callback所代表的任务,另一个是对对象进行析构。最后我们来看一下deferredDelete的应用场景,却“为何要进行延迟析构?”在Envoy的源代码中经常会看到像下面这样的代码片段。

ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPtr&& socket, TransportSocketPtr&& transport_socket, bool connected) { ...... } // 传递裸指针到回调中 

file_event_ = dispatcher_.createFileEvent( fd(), 
        [this](uint32_t events) -> void { onFileEvent(events); },
        Event::FileTriggerType::Edge, Event::FileReadyType::Read | Event::FileReadyType::Write); 
        ...... 
}

传递给Dispatchercallback都是通过裸指针的方式进行回调,如果进行回调的时候对象已经析构了,就会出现野指针的问题,我相信C++水平还可以的同学都会看出这个问题,除非能在逻辑上保证Dispatcher的生命周期比所有对象都短,这样就能保证在回调的时候对象肯定不会析构,但是这不可能成立的,因为DispatcherEventLoop的核心。

一个线程运行一个EventLoop直到线程结束,Dispatcher对象才会析构,这意味着Dispatcher对象的生命周期是最长的。所以从逻辑上没办法保证进行回调的时候对象没有析构。可能有人会有疑问,对象在析构的时候把注册的事件取消不就可以避免野指针的问题吗? 那如果事件已经触发了,callback正在等待运行呢? 又或者callback运行了一半呢?前者libevent是可以保证的,在调用event_del的时候可以把处于等待运行的事件取消掉,但是后者就无能为力了,这个时候如果对象析构了,那行为就是未定义了。沿着这个思路想一想,是不是只要保证对象析构的时候没有callback正在运行就可以解决问题了呢?是的,只要保证所有在执行中的callback执行完了,再做对象析构就可以了。可以利用Dispatcher是顺序执行所有callback的特点,向Dispatcher中插入一个任务就是用来对象析构的,那么当这个任务执行的时候是可以保证没有其他任何callback在运行。通过这个方法就完美解决了这里遇到的野指针问题了。

未完待续。。。

参考文档

www.cnblogs.com/mathli/p/10…
blog.csdn.net/wyy4045/art…
sq.sf.163.com/blog/articl…
zyfjeff.github.io/%E5%BC%80%E…

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论