Background
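I needed to add an endpoint that checks whether a model loaded successfully, which means reading the model_config_file contents that TF-Serving keeps in memory after loading.
My first attempt was to add a getConfig() function to ServerCore that takes the lock and reads the class's config_ member, which holds the model configuration.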
Status ServerCore::getConfig(std::unordered_map<string, string>* config_map) const {
  mutex_lock l(config_mu_);
  for (const ModelConfig& model_config : config_.model_config_list().config()) {
    config_map->insert(std::make_pair(model_config.base_path(), model_config.name()));
  }
  return Status::OK();
}
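However, when the model config file contains an error and the new model cannot be loaded, requests that call this function hang indefinitely. My initial guess was that this happens because getConfig has to acquire the lock.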
Entry point for loading the model config file
Server::BuildAndStart(const Options& server_options) starts a thread that periodically calls PollFilesystemAndReloadConfig(model_config_file).
PollFilesystemAndReloadConfig parses the contents of model_config_file and calls ServerCore::ReloadConfig(config) to reload the configuration.
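ReloadConfig first locks config_mu_, the mutex that protects config_. It then validates the new model config (along with some other checks) and calls AddModelsViaModelConfigList() to load the models.
AddModelsViaModelConfigList() first builds a FileSystemStoragePathSourceConfig from the config, creates the Routes, and so on.
It then works out which models are new, calls ReloadStoragePathSourceConfig, ReloadRoutes, and so on, and finally calls WaitUntilModelsAvailable.
WaitUntilModelsAvailable calls WaitUntilServablesReachState to wait for the models to become available; if they never do, execution blocks here. Because config_mu_ is still held at that point, any call to getConfig() blocks as well.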
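The hang in getConfig() follows from the reload path keeping the config mutex locked for the whole wait. A minimal sketch of one way a status endpoint could avoid waiting on it, assuming it is acceptable to report the last published config: keep a separate snapshot guarded by its own short-lived mutex. ConfigHolder, reload_mu(), Publish and Snapshot are hypothetical names for illustration, not TF-Serving APIs.

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <string>

// Hypothetical holder, not part of TF-Serving.
class ConfigHolder {
 public:
  using ConfigMap = std::map<std::string, std::string>;  // base_path -> name

  // Long-held lock: the reload path is assumed to hold this for the whole
  // reload, the way ReloadConfig holds config_mu_.
  std::mutex& reload_mu() { return reload_mu_; }

  // Publishes a copy of the config for readers.
  void Publish(const ConfigMap& config) {
    std::lock_guard<std::mutex> l(snapshot_mu_);
    snapshot_ = config;
  }

  // Read path for a status endpoint: takes only the short-lived snapshot
  // lock, so it does not wait for an in-flight reload.
  ConfigMap Snapshot() const {
    std::lock_guard<std::mutex> l(snapshot_mu_);
    return snapshot_;
  }

 private:
  std::mutex reload_mu_;
  mutable std::mutex snapshot_mu_;
  ConfigMap snapshot_;
};
```

The trade-off is that readers may see a config that is one reload behind, which is usually fine for a health check.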
WaitUntilServablesReachState takes servables, which contains every model that needs to be loaded, and goal_state, which here is available.
It calls NotifyWhenServablesReachState() and then waits for the notification.
bool ServableStateMonitor::WaitUntilServablesReachState(  // linhongyun: entry point
    const std::vector<ServableRequest>& servables,
    const ServableState::ManagerState goal_state,
    std::map<ServableId, ServableState::ManagerState>* const states_reached) {
  bool reached_goal_state;
  Notification notified;
  NotifyWhenServablesReachState(
      servables, goal_state,
      [&](const bool incoming_reached_goal_state,
          const std::map<ServableId, ServableState::ManagerState>&
              incoming_states_reached) {
        if (states_reached != nullptr) {
          *states_reached = incoming_states_reached;
        }
        // Records whether the goal was reached; the current ManagerState
        // values go back to the caller through states_reached.
        reached_goal_state = incoming_reached_goal_state;
        notified.Notify();  // the notification is sent here
      });
  notified.WaitForNotification();  // stuck here: the notification never arrives
  return reached_goal_state;
}
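WaitForNotification() has no deadline, which is why the caller can stay blocked indefinitely. The sketch below is a simplified stand-in for a one-shot notification built on std::condition_variable, with a bounded wait added for illustration. SimpleNotification and WaitFor are names chosen here; this is not the Notification class that TF-Serving uses.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Simplified one-shot notification (illustrative only).
class SimpleNotification {
 public:
  // Marks the notification as delivered and wakes every waiter.
  void Notify() {
    {
      std::lock_guard<std::mutex> l(mu_);
      assert(!notified_);  // one-shot: Notify() is expected only once
      notified_ = true;
    }
    cv_.notify_all();
  }

  // Blocks with no deadline, like the call that hangs in the code above.
  void WaitForNotification() {
    std::unique_lock<std::mutex> l(mu_);
    cv_.wait(l, [this] { return notified_; });
  }

  // Bounded wait: returns true if notified before the timeout expires.
  bool WaitFor(std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> l(mu_);
    return cv_.wait_for(l, timeout, [this] { return notified_; });
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  bool notified_ = false;
};
```

With a bounded wait the caller can at least log or give up when a model never becomes available, instead of holding its locks forever.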
NotifyWhenServablesReachState packs the models to load and the goal state into servable_state_notification_requests_, then calls MaybeSendStateReachedNotifications(), which walks servable_state_notification_requests_ (in this flow it holds zero or one entry) and checks whether the servables match goal_state. If they do, it calls notifier_fn. notifier_fn is set in WaitUntilServablesReachState: it fills in that function's states_reached output and return value, then sends the notification, which wakes WaitUntilServablesReachState at WaitForNotification().
// Stores the arguments in servable_state_notification_requests_, then walks
// servable_state_notification_requests_ (zero or one entry) to check whether
// the servables match goal_state; if so, calls notifier_fn to send the
// notification.
void ServableStateMonitor::NotifyWhenServablesReachState(
    const std::vector<ServableRequest>& servables,
    const ServableState::ManagerState goal_state,
    const ServableStateNotifierFn& notifier_fn) {
  // linhongyun: note that the lock is taken before
  // MaybeSendStateReachedNotifications() is called!
  mutex_lock l(mu_);
  servable_state_notification_requests_.push_back(
      {servables, goal_state, notifier_fn});
  // Walks servable_state_notification_requests_ and calls notifier_fn for
  // each request whose servables match goal_state.
  MaybeSendStateReachedNotifications();
}
MaybeSendStateReachedNotifications() walks servable_state_notification_requests_ and calls ShouldSendStateReachedNotification for each request. That function checks whether the servables match goal_state and, once a verdict is available, returns {reached goal, <servable_id, ManagerState>}. MaybeSendStateReachedNotifications() then passes that result to the callback notifier_fn, which was set in WaitUntilServablesReachState(). notifier_fn uses the boolean as the return value of WaitUntilServablesReachState, hands the <servable_id, ManagerState> map back through the pointer argument, and finally sends the notification that wakes WaitUntilServablesReachState().
void ServableStateMonitor::MaybeSendStateReachedNotifications() {
  // Walks servable_state_notification_requests_ and checks whether the
  // servables of each request match goal_state; if so, calls notifier_fn to
  // send the notification.
  for (auto iter = servable_state_notification_requests_.begin();
       iter != servable_state_notification_requests_.end();) {
    const ServableStateNotificationRequest& notification_request = *iter;
    // Checks the servables in notification_request against goal_state and,
    // if a verdict is available, returns {reached goal, ManagerState map}.
    const optional<
        std::pair<bool, std::map<ServableId, ServableState::ManagerState>>>
        opt_state_and_states_reached =
            ShouldSendStateReachedNotification(notification_request);
    if (opt_state_and_states_reached) {  // notify at this point
      LOG(INFO) << "Send notify";
      // Arguments: reached-goal flag, then the current servable ids and
      // their manager states.
      notification_request.notifier_fn(opt_state_and_states_reached->first,
                                       opt_state_and_states_reached->second);
      iter = servable_state_notification_requests_.erase(iter);
    } else {
      LOG(INFO) << "Not send, next";
      ++iter;
    }
  }
}
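The register-then-check pattern above can be modelled in a few lines. In this toy version (ToyMonitor, Register, SetAvailable and MaybeSend are made-up names, and a servable is just a string), a request stays queued until every servable it names is available, and each pass over the queue fires and erases the requests that are ready. SetAvailable deliberately does not re-run the check, so the sketch makes no claim about who triggers later passes in the real monitor.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <list>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Toy model of the pending-request queue (not TF-Serving code).
class ToyMonitor {
 public:
  using Callback = std::function<void()>;

  // Queues a request, then checks the queue once.
  void Register(std::vector<std::string> servables, Callback cb) {
    assert(cb != nullptr);  // a request without a callback could notify nobody
    requests_.push_back({std::move(servables), std::move(cb)});
    MaybeSend();
  }

  // Records a state change only; it does not re-check the queue by itself.
  void SetAvailable(const std::string& name) { available_.insert(name); }

  // Fires and erases every request whose servables are all available.
  void MaybeSend() {
    for (auto it = requests_.begin(); it != requests_.end();) {
      if (AllAvailable(it->servables)) {
        it->cb();
        it = requests_.erase(it);  // erase() returns the next valid iterator
      } else {
        ++it;
      }
    }
  }

  std::size_t pending() const { return requests_.size(); }

 private:
  struct Request {
    std::vector<std::string> servables;
    Callback cb;
  };

  bool AllAvailable(const std::vector<std::string>& servables) const {
    for (const auto& s : servables) {
      if (available_.count(s) == 0) return false;
    }
    return true;
  }

  std::list<Request> requests_;
  std::set<std::string> available_;
};
```

If nothing calls MaybeSend() again after the states change, the queued callback never runs, which is exactly the situation a blocked waiter would be in.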
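ShouldSendStateReachedNotification() walks the servables in notification_request and checks whether each one matches goal_state; if so it returns {reached goal, <servable_id, ManagerState>}.
Note that if even one of the models to load has reached neither goal_state nor END, this function returns an empty result.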
optional<std::pair<bool, std::map<ServableId, ServableState::ManagerState>>>
ServableStateMonitor::ShouldSendStateReachedNotification(
    const ServableStateNotificationRequest& notification_request) {
  bool reached_goal_state = true;
  std::map<ServableId, ServableState::ManagerState> states_reached;
  for (const auto& servable_request : notification_request.servables) {
    if (servable_request.version) {
      const ServableId servable_id = {servable_request.name,
                                      *servable_request.version};
      // linhongyun: check whether the state recorded in states_ has reached
      // goal_state or END; if so, return the current ManagerState.
      const optional<ServableState::ManagerState> opt_state =
          HasSpecificServableReachedState(servable_id,
                                          notification_request.goal_state,
                                          GetStateAndTimeInternal(servable_id));
      if (!opt_state) {
        return {};
      }
      // Remains false once false.
      reached_goal_state =
          reached_goal_state && *opt_state == notification_request.goal_state;
      states_reached[servable_id] = *opt_state;
    } else {  // linhongyun: this branch is taken
      // Returns the id as soon as any one version is available.
      const optional<ServableId> opt_servable_id =
          HasAnyServableInStreamReachedState(
              servable_request.name, notification_request.goal_state, states_);
      if (!opt_servable_id) {  // no version is available
        return {};
      }
      const ServableState::ManagerState reached_state =
          GetStateAndTimeInternal(*opt_servable_id)->state.manager_state;
      // Remains false once false.
      reached_goal_state = reached_goal_state &&
                           reached_state == notification_request.goal_state;
      // One states_reached entry per new model.
      states_reached[*opt_servable_id] = reached_state;
    }
  }
  return {{reached_goal_state, states_reached}};
}
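The aggregation rule is easy to lose in the nesting, so here is a reduced sketch of it. It assumes each servable is in one of three toy states and returns "no verdict" while any servable is still in flight. ToyState, Verdict and Evaluate are illustrative names, and the version-specific versus any-version distinction in the real function is left out.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

enum class ToyState { kLoading, kAvailable, kEnd };

struct Verdict {
  bool ready = false;         // false: at least one servable is still in flight
  bool reached_goal = false;  // meaningful only when ready is true
};

// A verdict exists only once every servable is at the goal state or at kEnd;
// reached_goal is true only if all of them are at the goal state.
Verdict Evaluate(const std::vector<std::string>& names,
                 const std::map<std::string, ToyState>& states,
                 ToyState goal) {
  Verdict v;
  bool all_at_goal = true;
  for (const auto& name : names) {
    const auto it = states.find(name);
    // Unknown or still-loading servable: no verdict yet, so a waiter
    // would keep waiting.
    if (it == states.end() ||
        (it->second != goal && it->second != ToyState::kEnd)) {
      return v;
    }
    all_at_goal = all_at_goal && it->second == goal;  // stays false once false
  }
  v.ready = true;
  v.reached_goal = all_at_goal;
  return v;
}
```

A model that is neither available nor ended therefore holds back the verdict for the whole request, not just for itself.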
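So far the flow is strictly linear: if the models to load have not reached the goal state, the caller blocks waiting for the notification. This raises a question: if the new model is not yet available at the moment of the check, is there never another chance to check, so that the caller stays blocked forever? (The flow traced so far checks the model state only once rather than repeatedly.)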
Who sends the notification?
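When the model config file is wrong, TF-Serving keeps printing the following log line:
FileSystemStoragePathSource encountered a filesystem access error: ...
If there were only the single reload attempt seen in the previous part, this line would not be printed over and over.
Tracing this log line leads to FileSystemStoragePathSource::SetAspiredVersionsCallback(), which starts a periodic thread that calls PollFileSystemAndInvokeCallback() to check the models and so on.
SetAspiredVersionsCallback() is called in target.h: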
template <typename T>
void ConnectSourceToTarget(Source<T>* source, Target<T>* target) {
  source->SetAspiredVersionsCallback(target->GetAspiredVersionsCallback());
}
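ConnectSourceToTarget is only a hand-off of a callback: the target exposes a function, and the source stores it and calls it whenever it has something to report. A stripped-down sketch of that wiring follows. ToySource, ToyTarget, ToyCallback, Emit and Connect are illustrative names, and the real Source<T>/Target<T> interfaces pass more than a bare value.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

template <typename T>
using ToyCallback = std::function<void(const T&)>;

// Receives values; hands out a callback bound to itself, so the target
// must outlive any source that holds the callback.
template <typename T>
class ToyTarget {
 public:
  ToyCallback<T> GetCallback() {
    return [this](const T& value) { received_.push_back(value); };
  }
  const std::vector<T>& received() const { return received_; }

 private:
  std::vector<T> received_;
};

// Produces values; forwards each one through the stored callback.
template <typename T>
class ToySource {
 public:
  void SetCallback(ToyCallback<T> cb) { cb_ = std::move(cb); }
  void Emit(const T& value) {
    assert(cb_ != nullptr);  // the source must be connected before it emits
    cb_(value);
  }

 private:
  ToyCallback<T> cb_;
};

// Same shape as ConnectSourceToTarget: pass the target's callback to the source.
template <typename T>
void Connect(ToySource<T>* source, ToyTarget<T>* target) {
  source->SetCallback(target->GetCallback());
}
```

Once connected, whatever drives the source (a periodic poll, in the case traced here) also drives the target, without the two knowing about each other.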
That function is called from ServerCore::CreateStoragePathSource:
Status ServerCore::CreateStoragePathSource(
    const FileSystemStoragePathSourceConfig& config,
    Target<StoragePath>* target,
    std::unique_ptr<FileSystemStoragePathSource>* source) const {
  const Status status = FileSystemStoragePathSource::Create(config, source);
  if (!status.ok()) {
    VLOG(1) << "Unable to create FileSystemStoragePathSource due to: "
            << status;
    return status;
  }
  // linhongyun: is this where SetAspiredVersionsCallback gets called?
  LOG(INFO) << "before ConnectSourceToTarget";
  ConnectSourceToTarget(source->get(), target);
  return Status::OK();
}
CreateStoragePathSource is in turn called from AddModelsViaModelConfigList:
Status ServerCore::AddModelsViaModelConfigList() {
  const bool is_first_config = storage_path_source_and_router_ == nullopt;
  // Create/reload the source, source router and source adapters.
  const FileSystemStoragePathSourceConfig source_config =
      CreateStoragePathSourceConfig(config_);
  DynamicSourceRouter<StoragePath>::Routes routes;
  TF_RETURN_IF_ERROR(CreateStoragePathRoutes(config_, &routes));
  if (is_first_config) {
    LOG(INFO) << "first config";
    // Construct the following source topology:
    //   Source -> Router -> Adapter_0 (for models using platform 0)
    //                    -> Adapter_1 (for models using platform 1)
    //                    -> ...
    //                    -> ErrorAdapter (for unrecognized models)
    SourceAdapters adapters;
    TF_RETURN_IF_ERROR(CreateAdapters(&adapters));
    std::unique_ptr<DynamicSourceRouter<StoragePath>> router;
    TF_RETURN_IF_ERROR(CreateRouter(routes, &adapters, &router));
    std::unique_ptr<FileSystemStoragePathSource> source;
    TF_RETURN_IF_ERROR(
        CreateStoragePathSource(source_config, router.get(), &source));
    // Connect the adapters to the manager, and wait for the models to load.
    TF_RETURN_IF_ERROR(ConnectAdaptersToManagerAndAwaitModelLoads(&adapters));
    // Stow the source components.
    storage_path_source_and_router_ = {source.get(), router.get()};
    manager_.AddDependency(std::move(source));
    manager_.AddDependency(std::move(router));
    for (auto& entry : adapters.platform_adapters) {
      auto& adapter = entry.second;
      manager_.AddDependency(std::move(adapter));
    }
    manager_.AddDependency(std::move(adapters.error_adapter));
  } else {  // this branch is taken
    LOG(INFO) << "not first config";
    // Create a fresh servable state monitor, to avoid getting confused if we're
    // re-loading a model-version that has previously been unloaded.
    ServableStateMonitor fresh_servable_state_monitor(
        servable_event_bus_.get());
    // Figure out which models are new.
    const std::set<string> new_models = NewModelNamesInSourceConfig(
        storage_path_source_and_router_->source->config(), source_config);
    // Now we're ready to start reconfiguring the elements of the Source->
    // Manager pipeline ...
    // First, add the new routes without removing the old ones.
    DynamicSourceRouter<StoragePath>::Routes old_and_new_routes;
    LOG(INFO) << "before get routes";
    const Status union_status =
        UnionRoutes(storage_path_source_and_router_->router->GetRoutes(),
                    routes, &old_and_new_routes);
    LOG(INFO) << "after get routes";
    if (!union_status.ok()) {
      // ValidateNoModelsChangePlatforms() should have detected any conflict.
      DCHECK(false);
      LOG(INFO) << "Old and new routes conflict.";
      return errors::Internal("Old and new routes conflict.");
    }
    TF_RETURN_IF_ERROR(ReloadRoutes(old_and_new_routes));
    // Change the source config. Among other things this will cause it to emit
    // tear-downs of any models that aren't present in the new config.
    TF_RETURN_IF_ERROR(ReloadStoragePathSourceConfig(source_config));
    // Now that any old models are out of the picture, remove the old routes.
    TF_RETURN_IF_ERROR(ReloadRoutes(routes));
    // Wait for any new models to get loaded and become available.
    TF_RETURN_IF_ERROR(
        WaitUntilModelsAvailable(new_models, &fresh_servable_state_monitor));
    LOG(INFO) << "AFTER WaitUntilModelsAvailable";
  }
  LOG(INFO) << "Before return";
  return Status::OK();
}
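The reload branch leans on two small set operations: finding the model names that appear only in the new config, and merging old and new routes while rejecting conflicts. A rough sketch of both follows, assuming a route table is a map from model name to an output port index. ToyRoutes, NewNames and UnionToyRoutes are illustrative names, not the ServerCore helpers, and NewNames compares route tables here purely for brevity.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>

// Model name -> output port (assumed shape for this sketch).
using ToyRoutes = std::map<std::string, int>;

// Names present in new_routes but not in old_routes.
std::set<std::string> NewNames(const ToyRoutes& old_routes,
                               const ToyRoutes& new_routes) {
  std::set<std::string> result;
  for (const auto& entry : new_routes) {
    if (old_routes.count(entry.first) == 0) result.insert(entry.first);
  }
  return result;
}

// Merges a and b into *out. Returns false if one name maps to two
// different ports.
bool UnionToyRoutes(const ToyRoutes& a, const ToyRoutes& b, ToyRoutes* out) {
  assert(out != nullptr);
  *out = a;
  for (const auto& entry : b) {
    const auto inserted = out->insert(entry);
    if (!inserted.second && inserted.first->second != entry.second) {
      return false;  // conflict: same model routed to different ports
    }
  }
  return true;
}
```

Applying the union first, then the new source config, then the trimmed route table follows the order given in the comments of the function above: old models are torn down before their routes disappear.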