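TF-Serving Source Code Analysis: Model Loading

Background

I needed to add an interface that checks whether a model has loaded successfully, which requires reading the model_config_file contents that TF-Serving keeps in memory after loading.

My first attempt was to add a getConfig() function to ServerCore that takes the lock and reads config_, the member that stores the model configuration.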

Status ServerCore::getConfig(std::unordered_map<string, string>* config_map) const {
    mutex_lock l(config_mu_);
    for (const ModelConfig& model_config : config_.model_config_list().config()) {
        config_map->insert(std::make_pair(model_config.base_path(), model_config.name()));
    }
    return Status::OK();
}

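However, when the model config file contains an error and a new model cannot be loaded, requests that call this function hang indefinitely. My initial guess was that getConfig() is blocked waiting to acquire the lock.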

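Entry point for loading the model config file

In Server::BuildAndStart(const Options& server_options), a thread is started that periodically calls PollFilesystemAndReloadConfig(model_config_file).

PollFilesystemAndReloadConfig parses the contents of model_config_file and calls ServerCore::ReloadConfig(config) to reload the configuration.

ReloadConfig first locks config_mu_, the mutex that protects config_. It then validates the new model configuration, runs some other checks, and calls AddModelsViaModelConfigList() to load the models.

AddModelsViaModelConfigList() first builds a FileSystemStoragePathSourceConfig from the config, creates the Routes, and so on.

It then works out which models are new, calls ReloadStoragePathSourceConfig, ReloadRoutes and so on, and finally calls WaitUntilModelsAvailable.

WaitUntilModelsAvailable calls WaitUntilServablesReachState to wait for the models to become available. If a model never becomes available, execution blocks here, with config_mu_ still held by ReloadConfig.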
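
The call chain above suggests why a getConfig() request would hang: the reload path takes config_mu_ and then blocks while still holding it. Below is a minimal standalone sketch of that situation using only the C++ standard library. FakeServerCore, TryGetConfig and DemoReaderBlockedDuringReload are hypothetical names made up for illustration, not TF-Serving code, and the timed try-lock is my own substitution so that the reader gives up instead of blocking forever.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical stand-in for ServerCore: one mutex guards the config, and the
// reload path keeps holding it while it waits for a model to become available.
class FakeServerCore {
 public:
  // Mimics the shape of ReloadConfig(): take the lock, then block.
  void ReloadConfig(std::promise<void>* lock_taken,
                    std::future<void> model_available) {
    std::lock_guard<std::timed_mutex> l(config_mu_);
    lock_taken->set_value();  // Tell the demo that the lock is now held.
    model_available.wait();   // The lock stays held for the whole wait.
  }

  // Mimics getConfig(), but gives up after `timeout` instead of hanging.
  bool TryGetConfig(std::chrono::milliseconds timeout) {
    std::unique_lock<std::timed_mutex> l(config_mu_, timeout);
    return l.owns_lock();
  }

 private:
  std::timed_mutex config_mu_;
};

// Returns {read succeeded during the stuck reload, read succeeded afterwards}.
std::pair<bool, bool> DemoReaderBlockedDuringReload() {
  FakeServerCore core;
  std::promise<void> lock_taken;
  std::promise<void> model_available;
  std::future<void> lock_taken_future = lock_taken.get_future();

  std::thread reloader(
      [&core, &lock_taken, fut = model_available.get_future()]() mutable {
        core.ReloadConfig(&lock_taken, std::move(fut));
      });

  lock_taken_future.wait();  // The reload thread now owns config_mu_.
  const bool during = core.TryGetConfig(std::chrono::milliseconds(50));

  model_available.set_value();  // The "model" finally becomes available.
  reloader.join();
  const bool after = core.TryGetConfig(std::chrono::milliseconds(50));
  return {during, after};
}
```

In this sketch the first read times out while the reload is stuck and the second one succeeds once the reload thread has released the mutex, which matches the hanging behaviour I observed.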

WaitUntilServablesReachState

The servables argument of this function contains every model that needs to be loaded, and goal_state is kAvailable.

It calls NotifyWhenServablesReachState() and then waits for the notification.

bool ServableStateMonitor::WaitUntilServablesReachState(  // linhongyun: entry point
    const std::vector<ServableRequest>& servables,
    const ServableState::ManagerState goal_state,
    std::map<ServableId, ServableState::ManagerState>* const states_reached) {
  bool reached_goal_state;
  Notification notified;
  NotifyWhenServablesReachState(
      servables, goal_state,
      [&](const bool incoming_reached_goal_state,
          const std::map<ServableId, ServableState::ManagerState>&
              incoming_states_reached) {
        if (states_reached != nullptr) {
          *states_reached = incoming_states_reached;
        }
        // Record whether the goal was reached; the per-servable ManagerState
        // map has already been handed back through states_reached above.
        reached_goal_state = incoming_reached_goal_state;
        notified.Notify();  // The notification is sent here.
      });
  notified.WaitForNotification();  // Blocks here; the notification never arrives.
  return reached_goal_state;
}
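
The blocking behaviour of WaitForNotification() is easier to see with a toy one-shot notification. The following is a minimal sketch built on std::condition_variable. SimpleNotification and WaitForResult are hypothetical names of mine, not the Notification class TF-Serving uses, and the worker thread stands in for whoever eventually invokes the callback.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical one-shot notification: waiters block until Notify() is called.
class SimpleNotification {
 public:
  void Notify() {
    {
      std::lock_guard<std::mutex> l(mu_);
      notified_ = true;
    }
    cv_.notify_all();
  }

  void WaitForNotification() {
    std::unique_lock<std::mutex> l(mu_);
    // The predicate guards against spurious wake-ups and against Notify()
    // having already run before the wait starts.
    cv_.wait(l, [this] { return notified_; });
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  bool notified_ = false;
};

// Mirrors the shape of WaitUntilServablesReachState(): a callback running on
// another thread stores the result and then wakes the waiting caller.
bool WaitForResult(bool result_to_report) {
  bool reached_goal_state = false;
  SimpleNotification notified;
  std::thread worker([&] {
    reached_goal_state = result_to_report;  // Written before Notify().
    notified.Notify();
  });
  notified.WaitForNotification();  // Would block forever without Notify().
  worker.join();
  return reached_goal_state;
}
```

If nothing ever calls Notify(), the waiter never returns, which is exactly the symptom seen when the new model cannot be loaded.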

NotifyWhenServablesReachState packs the models to load and the goal state into servable_state_notification_requests_, then calls MaybeSendStateReachedNotifications(), which iterates over servable_state_notification_requests_ (zero or one entry here) and checks whether the servables match goal_state. If they do, it calls notifier_fn. notifier_fn is set in WaitUntilServablesReachState: it fills in that function's states_reached output and then sends the notification, so WaitUntilServablesReachState wakes up at WaitForNotification().

// Stores the arguments in servable_state_notification_requests_, then scans
// that list (zero or one entry here) to see whether the servables match
// goal_state; if so, notifier_fn is called to send the notification.
void ServableStateMonitor::NotifyWhenServablesReachState(
    const std::vector<ServableRequest>& servables,
    const ServableState::ManagerState goal_state,
    const ServableStateNotifierFn& notifier_fn) {
  // linhongyun: note that mu_ is locked before
  // MaybeSendStateReachedNotifications() is called.
  mutex_lock l(mu_);
  servable_state_notification_requests_.push_back(
      {servables, goal_state, notifier_fn});
  MaybeSendStateReachedNotifications();
}

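MaybeSendStateReachedNotifications() iterates over servable_state_notification_requests_ and calls ShouldSendStateReachedNotification for each request. That function checks whether the servables match goal_state and, once a result is ready, returns {reached goal?, <servable_id, ManagerState>}. MaybeSendStateReachedNotifications() then passes that result to the callback notifier_fn, which was set in WaitUntilServablesReachState(). notifier_fn stores the {reached goal?} flag as the return value of WaitUntilServablesReachState, hands the {<servable_id, ManagerState>} map back through the pointer argument, and finally sends the notification that wakes WaitUntilServablesReachState().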

void ServableStateMonitor::MaybeSendStateReachedNotifications() {
  // Scan servable_state_notification_requests_ and check whether each
  // request's servables match its goal_state; if so, call notifier_fn.
  for (auto iter = servable_state_notification_requests_.begin();
       iter != servable_state_notification_requests_.end();) {
    const ServableStateNotificationRequest& notification_request = *iter;
    // Returns {reached goal?, <servable_id, ManagerState>} once every servable
    // in the request has settled, or an empty optional otherwise.
    const optional<
        std::pair<bool, std::map<ServableId, ServableState::ManagerState>>>
        opt_state_and_states_reached =
            ShouldSendStateReachedNotification(notification_request);
    if (opt_state_and_states_reached) {  // Send the notification now.
      LOG(INFO) << "Send notify";
      // first: whether the goal state was reached;
      // second: map from servable id to its current ManagerState.
      notification_request.notifier_fn(opt_state_and_states_reached->first,
                                       opt_state_and_states_reached->second);
      iter = servable_state_notification_requests_.erase(iter);
    } else {
      LOG(INFO) << "Not send, next";
      ++iter;
    }
  }
}
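
The scan-and-erase loop above is a common pattern: pending requests stay in the list until their condition holds, then the callback fires once and the entry is removed. Here is a small single-threaded sketch of that pattern. TinyMonitor and all of its members are hypothetical names for illustration. In particular, MarkAvailable re-running the scan on a state change is an assumption of this sketch; the code quoted so far only shows the scan happening at registration time.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, single-threaded model of a pending-notification list.
class TinyMonitor {
 public:
  using NotifierFn = std::function<void(const std::string&)>;

  // Registers a request and immediately checks whether it can fire.
  void NotifyWhenAvailable(const std::string& model, NotifierFn fn) {
    pending_.push_back({model, std::move(fn)});
    MaybeSendNotifications();
  }

  // Assumption of this sketch: a state change triggers another scan.
  void MarkAvailable(const std::string& model) {
    available_.insert(model);
    MaybeSendNotifications();
  }

  std::size_t pending_count() const { return pending_.size(); }

 private:
  struct Request {
    std::string model;
    NotifierFn notifier_fn;
  };

  // Fires and erases every satisfied request; the rest stay pending.
  void MaybeSendNotifications() {
    for (auto iter = pending_.begin(); iter != pending_.end();) {
      if (available_.count(iter->model) > 0) {
        iter->notifier_fn(iter->model);
        iter = pending_.erase(iter);  // erase() returns the next iterator.
      } else {
        ++iter;
      }
    }
  }

  std::set<std::string> available_;
  std::vector<Request> pending_;
};

// Runs a short scenario and returns a log of what happened, in order.
std::vector<std::string> DemoPendingRequests() {
  std::vector<std::string> log;
  TinyMonitor monitor;
  auto record = [&log](const std::string& m) { log.push_back("fired:" + m); };

  monitor.MarkAvailable("old_model");
  monitor.NotifyWhenAvailable("old_model", record);  // Fires right away.
  monitor.NotifyWhenAvailable("new_model", record);  // Stays pending.
  log.push_back("pending:" + std::to_string(monitor.pending_count()));
  monitor.MarkAvailable("new_model");                // Now it fires.
  log.push_back("pending:" + std::to_string(monitor.pending_count()));
  return log;
}
```

The request for a model that is already available fires during registration, while the other one stays in the list until some later scan finds it satisfied.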

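ShouldSendStateReachedNotification() walks the servables in notification_request and checks each one against goal_state. If every one of them has settled, it returns {reached goal?, <servable_id, ManagerState>}.

Note that if even one of the models to load has reached neither goal_state nor kEnd, this function returns an empty optional.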

optional<std::pair<bool, std::map<ServableId, ServableState::ManagerState>>>
ServableStateMonitor::ShouldSendStateReachedNotification(
    const ServableStateNotificationRequest& notification_request) {
  bool reached_goal_state = true;
  std::map<ServableId, ServableState::ManagerState> states_reached;
  for (const auto& servable_request : notification_request.servables) {
    if (servable_request.version) {
      const ServableId servable_id = {servable_request.name,
                                      *servable_request.version};
      // linhongyun: checks whether the state recorded in states_ has reached
      // goal_state or kEnd; if so, returns the current ManagerState.
      const optional<ServableState::ManagerState> opt_state =
          HasSpecificServableReachedState(servable_id,
                                          notification_request.goal_state,
                                          GetStateAndTimeInternal(servable_id));
      if (!opt_state) {
        return {};
      }
      // Remains false once false.
      reached_goal_state =
          reached_goal_state && *opt_state == notification_request.goal_state;
      states_reached[servable_id] = *opt_state;
    } else {  // linhongyun: this is the branch taken
      // Returns the id of a version as soon as any one version qualifies.
      const optional<ServableId> opt_servable_id =
          HasAnyServableInStreamReachedState(
              servable_request.name, notification_request.goal_state, states_);
      if (!opt_servable_id) {  // No version qualifies yet.
        return {};
      }
      const ServableState::ManagerState reached_state =
          GetStateAndTimeInternal(*opt_servable_id)->state.manager_state;
      // Remains false once false.
      reached_goal_state = reached_goal_state &&
                           reached_state == notification_request.goal_state;
      // Each new model contributes one entry to states_reached.
      states_reached[*opt_servable_id] = reached_state;
    }
  }
  return {{reached_goal_state, states_reached}};
}
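
The decision rule in ShouldSendStateReachedNotification() can be condensed into a few lines. The sketch below is a simplified model under my own assumptions: models are identified by plain strings, the State enum is a cut-down stand-in for ServableState::ManagerState, and CheckRequest is a hypothetical name. It keeps the two properties that matter here: the result is empty while any model is still in progress, and the boolean is true only if every model reached the goal.

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Cut-down stand-in for the manager states (illustration only).
enum class State { kStart, kLoading, kAvailable, kEnd };

using Result = std::pair<bool, std::map<std::string, State>>;

// Returns std::nullopt while any requested model has reached neither the goal
// state nor kEnd. Otherwise returns {all reached goal?, state per model}.
std::optional<Result> CheckRequest(
    const std::vector<std::string>& models, State goal,
    const std::map<std::string, State>& current) {
  bool reached_goal = true;
  std::map<std::string, State> states_reached;
  for (const std::string& model : models) {
    const auto it = current.find(model);
    const bool settled = it != current.end() &&
                         (it->second == goal || it->second == State::kEnd);
    if (!settled) {
      return std::nullopt;  // One unfinished model blocks the whole request.
    }
    // Remains false once false.
    reached_goal = reached_goal && it->second == goal;
    states_reached[model] = it->second;
  }
  return Result{reached_goal, states_reached};
}
```

A model stuck in a loading state therefore keeps the whole request pending, while a model that ended in kEnd lets the request complete with the flag set to false.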

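Up to this point the flow is strictly linear: if the models to load have not reached the goal state, execution blocks waiting for the notification. That raises a question. If the new model is not yet available at the moment of the check, is there no further chance to check, so that the wait blocks forever? (The flow traced so far queries the model state only once rather than repeatedly.)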

Who sends the notification?

When the model config file is wrong, TF-Serving keeps printing the following log line:

FileSystemStoragePathSource encountered a filesystem access error: ...

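If there were only the single reload attempt seen in the previous section, this line would not be logged over and over.

Searching for this log line leads to FileSystemStoragePathSource::SetAspiredVersionsCallback(), which starts a periodic thread that calls PollFileSystemAndInvokeCallback() to check the models and so on.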
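
A periodic thread of this kind explains the repeated log line: the same function runs again after every interval until the owner shuts it down. The following is a minimal sketch of such a poller written with the standard library only. PeriodicPoller and DemoPollCount are hypothetical names of mine and say nothing about how TF-Serving implements its own periodic thread.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical periodic poller: calls fn, sleeps for interval, and repeats
// until the object is destroyed.
class PeriodicPoller {
 public:
  PeriodicPoller(std::chrono::milliseconds interval, std::function<void()> fn)
      : interval_(interval), fn_(std::move(fn)), thread_([this] { Run(); }) {}

  ~PeriodicPoller() {
    {
      std::lock_guard<std::mutex> l(mu_);
      stop_ = true;
    }
    cv_.notify_all();  // Cut the current sleep short.
    thread_.join();
  }

 private:
  void Run() {
    std::unique_lock<std::mutex> l(mu_);
    while (!stop_) {
      l.unlock();
      fn_();  // An error inside fn_ simply shows up again on the next round.
      l.lock();
      cv_.wait_for(l, interval_, [this] { return stop_; });
    }
  }

  const std::chrono::milliseconds interval_;
  const std::function<void()> fn_;
  std::mutex mu_;
  std::condition_variable cv_;
  bool stop_ = false;
  std::thread thread_;  // Declared last so the other members exist first.
};

// Lets the poller run until it has fired at least three times.
int DemoPollCount() {
  std::atomic<int> polls{0};
  {
    PeriodicPoller poller(std::chrono::milliseconds(1), [&polls] { ++polls; });
    while (polls.load() < 3) {
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
  }  // The destructor stops and joins the polling thread.
  return polls.load();
}
```

With a poll function that fails every time, a loop like this would emit the same error once per interval, which is the pattern seen in the log.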

SetAspiredVersionsCallback() is called in target.h:

template <typename T>
void ConnectSourceToTarget(Source<T>* source, Target<T>* target) {
  source->SetAspiredVersionsCallback(target->GetAspiredVersionsCallback());
}

ConnectSourceToTarget is called in ServerCore::CreateStoragePathSource:

Status ServerCore::CreateStoragePathSource(
    const FileSystemStoragePathSourceConfig& config,
    Target<StoragePath>* target,
    std::unique_ptr<FileSystemStoragePathSource>* source) const {
  const Status status = FileSystemStoragePathSource::Create(config, source);
  if (!status.ok()) {
    VLOG(1) << "Unable to create FileSystemStoragePathSource due to: "
            << status;
    return status;
  }
  //call SetAspiredVersionsCallback
  //linhongyun:here?
  LOG(INFO) << "before ConnectSourceToTarget";
  ConnectSourceToTarget(source->get(), target);
  return Status::OK();
}
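
The wiring done by ConnectSourceToTarget can be pictured with a toy source and target: the target hands out a callback, and the source stores it and invokes it whenever it has something to report. The sketch below uses a deliberately simplified callback signature and hypothetical names (ToySource, ToyTarget, ConnectToySourceToTarget, DemoConnect); the real TF-Serving callback type is richer than this.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Simplified callback: a servable name plus the version paths found for it.
using ToyCallback = std::function<void(const std::string& servable_name,
                                       const std::vector<std::string>& paths)>;

// Hypothetical source: stores the callback and invokes it on each poll.
class ToySource {
 public:
  void SetAspiredVersionsCallback(ToyCallback callback) {
    callback_ = std::move(callback);
  }

  // Stands in for one round of file-system polling.
  void Poll(const std::string& name, const std::vector<std::string>& paths) {
    if (callback_) callback_(name, paths);
  }

 private:
  ToyCallback callback_;
};

// Hypothetical target: exposes a callback that records what it receives.
class ToyTarget {
 public:
  ToyCallback GetAspiredVersionsCallback() {
    return [this](const std::string& name,
                  const std::vector<std::string>& paths) {
      received_.push_back(name + ":" + std::to_string(paths.size()));
    };
  }

  const std::vector<std::string>& received() const { return received_; }

 private:
  std::vector<std::string> received_;
};

// Same shape as ConnectSourceToTarget, specialised for the toy types.
void ConnectToySourceToTarget(ToySource* source, ToyTarget* target) {
  source->SetAspiredVersionsCallback(target->GetAspiredVersionsCallback());
}

// Connects a source to a target and pushes one poll result through.
std::vector<std::string> DemoConnect() {
  ToySource source;
  ToyTarget target;
  ConnectToySourceToTarget(&source, &target);
  source.Poll("my_model", {"/models/my_model/1", "/models/my_model/2"});
  return target.received();
}
```

After the connection is made, every poll result from the source flows straight into the target without the source knowing what sits downstream.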

CreateStoragePathSource is in turn called from ServerCore::AddModelsViaModelConfigList():

Status ServerCore::AddModelsViaModelConfigList() {
  const bool is_first_config = storage_path_source_and_router_ == nullopt;

  // Create/reload the source, source router and source adapters.
  const FileSystemStoragePathSourceConfig source_config =
      CreateStoragePathSourceConfig(config_);
  DynamicSourceRouter<StoragePath>::Routes routes;
  TF_RETURN_IF_ERROR(CreateStoragePathRoutes(config_, &routes));
  if (is_first_config) {
      LOG(INFO) << "first config";
    // Construct the following source topology:
    //   Source -> Router -> Adapter_0 (for models using platform 0)
    //                    -> Adapter_1 (for models using platform 1)
    //                    -> ...
    //                    -> ErrorAdapter (for unrecognized models)
    SourceAdapters adapters;
    TF_RETURN_IF_ERROR(CreateAdapters(&adapters));
    std::unique_ptr<DynamicSourceRouter<StoragePath>> router;
    TF_RETURN_IF_ERROR(CreateRouter(routes, &adapters, &router));
    std::unique_ptr<FileSystemStoragePathSource> source;
    TF_RETURN_IF_ERROR(
        CreateStoragePathSource(source_config, router.get(), &source));

    // Connect the adapters to the manager, and wait for the models to load.
    TF_RETURN_IF_ERROR(ConnectAdaptersToManagerAndAwaitModelLoads(&adapters));

    // Stow the source components.
    storage_path_source_and_router_ = {source.get(), router.get()};
    manager_.AddDependency(std::move(source));
    manager_.AddDependency(std::move(router));
    for (auto& entry : adapters.platform_adapters) {
      auto& adapter = entry.second;
      manager_.AddDependency(std::move(adapter));
    }
    manager_.AddDependency(std::move(adapters.error_adapter));
  } else {  // this is the branch taken
      LOG(INFO) << "not first config";
    // Create a fresh servable state monitor, to avoid getting confused if we're
    // re-loading a model-version that has previously been unloaded.
    ServableStateMonitor fresh_servable_state_monitor(
        servable_event_bus_.get());

    // Figure out which models are new.
    const std::set<string> new_models = NewModelNamesInSourceConfig(
        storage_path_source_and_router_->source->config(), source_config);

    // Now we're ready to start reconfiguring the elements of the Source->
    // Manager pipeline ...

    // First, add the new routes without removing the old ones.
    DynamicSourceRouter<StoragePath>::Routes old_and_new_routes;
    LOG(INFO) << "before get routes";
    const Status union_status =
        UnionRoutes(storage_path_source_and_router_->router->GetRoutes(),
                    routes, &old_and_new_routes);
    LOG(INFO) << "after get routes";
    if (!union_status.ok()) {
      // ValidateNoModelsChangePlatforms() should have detected any conflict.
      DCHECK(false);
      LOG(INFO) << "Old and new routes conflict.";
      return errors::Internal("Old and new routes conflict.");
    }
    TF_RETURN_IF_ERROR(ReloadRoutes(old_and_new_routes));

    // Change the source config. Among other things this will cause it to emit
    // tear-downs of any models that aren't present in the new config.
    TF_RETURN_IF_ERROR(ReloadStoragePathSourceConfig(source_config));

    // Now that any old models are out of the picture, remove the old routes.
    TF_RETURN_IF_ERROR(ReloadRoutes(routes));

    // Wait for any new models to get loaded and become available.
    TF_RETURN_IF_ERROR(
        WaitUntilModelsAvailable(new_models, &fresh_servable_state_monitor));
    LOG(INFO) << "AFTER WaitUntilModelsAvailable";
  }
  LOG(INFO) << "Before return";
  return Status::OK();
}
