ResourceLoader: Fix edge cases in the management of user tokens

1. Make handling of user tokens atomic:
   Loads started with the external-facing API used to perform a two-step setup of the user token. Between both, the mutex was unlocked without its reference count having been increased. A non-user-initiated load could therefore destroy the load task when it unreferenced the token.
   Those stages now happen atomically so in the one hand, the described race condition can't happen so the load task life insurance doesn't have a gap anymore and, on the other hand, the ugliness that the call to load could return `ERR_BUSY` if happening while other thread was between both steps is gone.
   The code has been refactored so the user token concerns are still outside the inner load start function, which is agnostic to that for a cleaner implementation.
2. Clear ambiguity between load operations running on `WorkerThreadPool`:
   The two cases are: single-loaded thread directly started by a user pool task and a load started by the system as part of a multi-threaded load.
   Since ensuring all the code dealing with this distinction would make it very complex, and error-prone, a different measure is applied instead: just take one of the cases out of the dicotomy. We now ensure every load happening on a pool thread has been initiated by the system.
   The way of achieving that is that a single-threaded user-started load initiated from a pool thread, is run as another task.
This commit is contained in:
Pedro J. Estébanez 2024-07-15 10:30:02 +02:00
parent 5c970db2e4
commit df23858488
2 changed files with 102 additions and 86 deletions

View File

@ -231,23 +231,22 @@ void ResourceLoader::LoadToken::clear() {
WorkerThreadPool::TaskID task_to_await = 0;
// User-facing tokens shouldn't be deleted until completely claimed.
DEV_ASSERT(user_rc == 0 && user_path.is_empty());
if (!local_path.is_empty()) { // Empty is used for the special case where the load task is not registered.
DEV_ASSERT(thread_load_tasks.has(local_path));
ThreadLoadTask &load_task = thread_load_tasks[local_path];
if (!load_task.awaited) {
if (load_task.task_id && !load_task.awaited) {
task_to_await = load_task.task_id;
load_task.awaited = true;
}
// Removing a task which is still in progress would be catastrophic.
// Tokens must be alive until the task thread function is done.
DEV_ASSERT(load_task.status == THREAD_LOAD_FAILED || load_task.status == THREAD_LOAD_LOADED);
thread_load_tasks.erase(local_path);
local_path.clear();
}
if (!user_path.is_empty()) {
DEV_ASSERT(user_load_tokens.has(user_path));
user_load_tokens.erase(user_path);
user_path.clear();
}
thread_load_mutex.unlock();
// If task is unused, await it here, locally, now the token data is consistent.
@ -461,36 +460,44 @@ static String _validate_local_path(const String &p_path) {
}
Error ResourceLoader::load_threaded_request(const String &p_path, const String &p_type_hint, bool p_use_sub_threads, ResourceFormatLoader::CacheMode p_cache_mode) {
thread_load_mutex.lock();
if (user_load_tokens.has(p_path)) {
print_verbose("load_threaded_request(): Another threaded load for resource path '" + p_path + "' has been initiated. Not an error.");
user_load_tokens[p_path]->reference(); // Additional request.
thread_load_mutex.unlock();
return OK;
}
user_load_tokens[p_path] = nullptr;
thread_load_mutex.unlock();
Ref<ResourceLoader::LoadToken> token = _load_start(p_path, p_type_hint, p_use_sub_threads ? LOAD_THREAD_DISTRIBUTE : LOAD_THREAD_SPAWN_SINGLE, p_cache_mode, true);
return token.is_valid() ? OK : FAILED;
}
Ref<ResourceLoader::LoadToken> token = _load_start(p_path, p_type_hint, p_use_sub_threads ? LOAD_THREAD_DISTRIBUTE : LOAD_THREAD_SPAWN_SINGLE, p_cache_mode);
if (token.is_valid()) {
thread_load_mutex.lock();
token->user_path = p_path;
token->reference(); // First request.
user_load_tokens[p_path] = token.ptr();
print_lt("REQUEST: user load tokens: " + itos(user_load_tokens.size()));
thread_load_mutex.unlock();
return OK;
ResourceLoader::LoadToken *ResourceLoader::_load_threaded_request_reuse_user_token(const String &p_path) {
HashMap<String, LoadToken *>::Iterator E = user_load_tokens.find(p_path);
if (E) {
print_verbose("load_threaded_request(): Another threaded load for resource path '" + p_path + "' has been initiated. Not an error.");
LoadToken *token = E->value;
token->user_rc++;
return token;
} else {
return FAILED;
return nullptr;
}
}
void ResourceLoader::_load_threaded_request_setup_user_token(LoadToken *p_token, const String &p_path) {
p_token->user_path = p_path;
p_token->reference(); // Extra RC until all user requests have been gotten.
p_token->user_rc = 1;
user_load_tokens[p_path] = p_token;
print_lt("REQUEST: user load tokens: " + itos(user_load_tokens.size()));
}
Ref<Resource> ResourceLoader::load(const String &p_path, const String &p_type_hint, ResourceFormatLoader::CacheMode p_cache_mode, Error *r_error) {
if (r_error) {
*r_error = OK;
}
Ref<LoadToken> load_token = _load_start(p_path, p_type_hint, LOAD_THREAD_FROM_CURRENT, p_cache_mode);
LoadThreadMode thread_mode = LOAD_THREAD_FROM_CURRENT;
if (WorkerThreadPool::get_singleton()->get_caller_task_id() != WorkerThreadPool::INVALID_TASK_ID) {
// If user is initiating a single-threaded load from a WorkerThreadPool task,
// we instead spawn a new task so there's a precondition that a load in a pool task
// is always initiated by the engine. That makes certain aspects simpler, such as
// cyclic load detection and awaiting.
thread_mode = LOAD_THREAD_SPAWN_SINGLE;
}
Ref<LoadToken> load_token = _load_start(p_path, p_type_hint, thread_mode, p_cache_mode);
if (!load_token.is_valid()) {
if (r_error) {
*r_error = FAILED;
@ -502,7 +509,7 @@ Ref<Resource> ResourceLoader::load(const String &p_path, const String &p_type_hi
return res;
}
Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path, const String &p_type_hint, LoadThreadMode p_thread_mode, ResourceFormatLoader::CacheMode p_cache_mode) {
Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path, const String &p_type_hint, LoadThreadMode p_thread_mode, ResourceFormatLoader::CacheMode p_cache_mode, bool p_for_user) {
String local_path = _validate_local_path(p_path);
bool ignoring_cache = p_cache_mode == ResourceFormatLoader::CACHE_MODE_IGNORE || p_cache_mode == ResourceFormatLoader::CACHE_MODE_IGNORE_DEEP;
@ -515,6 +522,13 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path,
{
MutexLock thread_load_lock(thread_load_mutex);
if (p_for_user) {
LoadToken *existing_token = _load_threaded_request_reuse_user_token(p_path);
if (existing_token) {
return Ref<LoadToken>(existing_token);
}
}
if (!ignoring_cache && thread_load_tasks.has(local_path)) {
load_token = Ref<LoadToken>(thread_load_tasks[local_path].load_token);
if (load_token.is_valid()) {
@ -528,6 +542,9 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path,
load_token.instantiate();
load_token->local_path = local_path;
if (p_for_user) {
_load_threaded_request_setup_user_token(load_token.ptr(), p_path);
}
//create load task
{
@ -545,6 +562,7 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path,
load_task.resource = existing;
load_task.status = THREAD_LOAD_LOADED;
load_task.progress = 1.0;
DEV_ASSERT(!thread_load_tasks.has(local_path));
thread_load_tasks[local_path] = load_task;
return load_token;
}
@ -576,7 +594,7 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path,
} else {
load_task_ptr->task_id = WorkerThreadPool::get_singleton()->add_native_task(&ResourceLoader::_run_load_task, load_task_ptr);
}
}
} // MutexLock(thread_load_mutex).
if (run_on_current_thread) {
_run_load_task(load_task_ptr);
@ -666,13 +684,7 @@ Ref<Resource> ResourceLoader::load_threaded_get(const String &p_path, Error *r_e
}
LoadToken *load_token = user_load_tokens[p_path];
if (!load_token) {
// This happens if requested from one thread and rapidly querying from another.
if (r_error) {
*r_error = ERR_BUSY;
}
return Ref<Resource>();
}
DEV_ASSERT(load_token->user_rc >= 1);
// Support userland requesting on the main thread before the load is reported to be complete.
if (Thread::is_main_thread() && !load_token->local_path.is_empty()) {
@ -689,8 +701,15 @@ Ref<Resource> ResourceLoader::load_threaded_get(const String &p_path, Error *r_e
}
res = _load_complete_inner(*load_token, r_error, thread_load_lock);
if (load_token->unreference()) {
memdelete(load_token);
load_token->user_rc--;
if (load_token->user_rc == 0) {
load_token->user_path.clear();
user_load_tokens.erase(p_path);
if (load_token->unreference()) {
memdelete(load_token);
load_token = nullptr;
}
}
}
@ -733,55 +752,45 @@ Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro
}
bool loader_is_wtp = load_task.task_id != 0;
Error wtp_task_err = FAILED;
if (loader_is_wtp) {
// Loading thread is in the worker pool.
load_task.awaited = true;
thread_load_mutex.unlock();
PREPARE_FOR_WTP_WAIT
wtp_task_err = WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id);
RESTORE_AFTER_WTP_WAIT
}
if (load_task.status == THREAD_LOAD_IN_PROGRESS) { // If early errored, awaiting would deadlock.
if (loader_is_wtp) {
if (wtp_task_err == ERR_BUSY) {
// The WorkerThreadPool has reported that the current task wants to await on an older one.
// That't not allowed for safety, to avoid deadlocks. Fortunately, though, in the context of
// resource loading that means that the task to wait for can be restarted here to break the
// cycle, with as much recursion into this process as needed.
// When the stack is eventually unrolled, the original load will have been notified to go on.
_run_load_task(&load_task);
Ref<Resource> resource = load_task.resource;
if (r_error) {
*r_error = load_task.error;
}
thread_load_mutex.lock();
return resource;
} else {
DEV_ASSERT(wtp_task_err == OK);
thread_load_mutex.lock();
}
} else if (load_task.need_wait) {
// Loading thread is main or user thread.
if (!load_task.cond_var) {
load_task.cond_var = memnew(ConditionVariable);
}
load_task.awaiters_count++;
do {
load_task.cond_var->wait(p_thread_load_lock);
DEV_ASSERT(thread_load_tasks.has(p_load_token.local_path) && p_load_token.get_reference_count());
} while (load_task.need_wait);
load_task.awaiters_count--;
if (load_task.awaiters_count == 0) {
memdelete(load_task.cond_var);
load_task.cond_var = nullptr;
}
PREPARE_FOR_WTP_WAIT
Error wait_err = WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id);
RESTORE_AFTER_WTP_WAIT
DEV_ASSERT(!wait_err || wait_err == ERR_BUSY);
if (wait_err == ERR_BUSY) {
// The WorkerThreadPool has reported that the current task wants to await on an older one.
// That't not allowed for safety, to avoid deadlocks. Fortunately, though, in the context of
// resource loading that means that the task to wait for can be restarted here to break the
// cycle, with as much recursion into this process as needed.
// When the stack is eventually unrolled, the original load will have been notified to go on.
_run_load_task(&load_task);
}
} else {
if (loader_is_wtp) {
thread_load_mutex.lock();
thread_load_mutex.lock();
load_task.awaited = true;
DEV_ASSERT(load_task.status == THREAD_LOAD_FAILED || load_task.status == THREAD_LOAD_LOADED);
} else if (load_task.need_wait) {
// Loading thread is main or user thread.
if (!load_task.cond_var) {
load_task.cond_var = memnew(ConditionVariable);
}
load_task.awaiters_count++;
do {
load_task.cond_var->wait(p_thread_load_lock);
DEV_ASSERT(thread_load_tasks.has(p_load_token.local_path) && p_load_token.get_reference_count());
} while (load_task.need_wait);
load_task.awaiters_count--;
if (load_task.awaiters_count == 0) {
memdelete(load_task.cond_var);
load_task.cond_var = nullptr;
}
DEV_ASSERT(load_task.status == THREAD_LOAD_FAILED || load_task.status == THREAD_LOAD_LOADED);
}
}
@ -1221,10 +1230,13 @@ void ResourceLoader::clear_thread_load_tasks() {
}
while (user_load_tokens.begin()) {
// User load tokens remove themselves from the map on destruction.
memdelete(user_load_tokens.begin()->value);
LoadToken *user_token = user_load_tokens.begin()->value;
user_load_tokens.remove(user_load_tokens.begin());
DEV_ASSERT(user_token->user_rc > 0 && !user_token->user_path.is_empty());
user_token->user_path.clear();
user_token->user_rc = 0;
user_token->unreference();
}
user_load_tokens.clear();
thread_load_tasks.clear();

View File

@ -123,6 +123,7 @@ public:
struct LoadToken : public RefCounted {
String local_path;
String user_path;
uint32_t user_rc = 0; // Having user RC implies regular RC incremented in one, until the user RC reaches zero.
Ref<Resource> res_if_unregistered;
void clear();
@ -132,10 +133,13 @@ public:
static const int BINARY_MUTEX_TAG = 1;
static Ref<LoadToken> _load_start(const String &p_path, const String &p_type_hint, LoadThreadMode p_thread_mode, ResourceFormatLoader::CacheMode p_cache_mode);
static Ref<LoadToken> _load_start(const String &p_path, const String &p_type_hint, LoadThreadMode p_thread_mode, ResourceFormatLoader::CacheMode p_cache_mode, bool p_for_user = false);
static Ref<Resource> _load_complete(LoadToken &p_load_token, Error *r_error);
private:
static LoadToken *_load_threaded_request_reuse_user_token(const String &p_path);
static void _load_threaded_request_setup_user_token(LoadToken *p_token, const String &p_path);
static Ref<Resource> _load_complete_inner(LoadToken &p_load_token, Error *r_error, MutexLock<SafeBinaryMutex<BINARY_MUTEX_TAG>> &p_thread_load_lock);
static Ref<ResourceFormatLoader> loader[MAX_LOADERS];