From 8983b20ccd2f8a91d87789f2c0dd90f4e31b8b2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20J=2E=20Est=C3=A9banez?= Date: Sun, 5 Mar 2023 01:09:18 +0100 Subject: [PATCH] Avoid interaction issues between resource loading threads --- core/io/resource_format_binary.cpp | 47 +- core/io/resource_format_binary.h | 2 +- core/io/resource_loader.cpp | 642 +++++++++++++---------- core/io/resource_loader.h | 47 +- core/os/mutex.h | 19 +- core/os/thread.cpp | 5 +- core/os/thread.h | 2 +- main/main.cpp | 4 +- scene/resources/resource_format_text.cpp | 83 +-- scene/resources/resource_format_text.h | 2 +- 10 files changed, 461 insertions(+), 392 deletions(-) diff --git a/core/io/resource_format_binary.cpp b/core/io/resource_format_binary.cpp index 38f41d645c5..b4da314e96a 100644 --- a/core/io/resource_format_binary.cpp +++ b/core/io/resource_format_binary.cpp @@ -445,13 +445,12 @@ Error ResourceLoaderBinary::parse_variant(Variant &r_v) { WARN_PRINT("Broken external resource! (index out of size)"); r_v = Variant(); } else { - if (external_resources[erindex].cache.is_null()) { - //cache not here yet, wait for it? - if (use_sub_threads) { - Error err; - external_resources.write[erindex].cache = ResourceLoader::load_threaded_get(external_resources[erindex].path, &err); - - if (err != OK || external_resources[erindex].cache.is_null()) { + Ref &load_token = external_resources.write[erindex].load_token; + if (load_token.is_valid()) { // If not valid, it's OK since then we know this load accepts broken dependencies. + Error err; + Ref res = ResourceLoader::_load_complete(*load_token.ptr(), &err); + if (res.is_null()) { + if (!ResourceLoader::is_cleaning_tasks()) { if (!ResourceLoader::get_abort_on_missing_resources()) { ResourceLoader::notify_dependency_error(local_path, external_resources[erindex].path, external_resources[erindex].type); } else { @@ -459,12 +458,11 @@ Error ResourceLoaderBinary::parse_variant(Variant &r_v) { ERR_FAIL_V_MSG(error, "Can't load dependency: " + external_resources[erindex].path + "."); } } + } else { + r_v = res; } } - - r_v = external_resources[erindex].cache; } - } break; default: { ERR_FAIL_V(ERR_FILE_CORRUPT); @@ -684,28 +682,13 @@ Error ResourceLoaderBinary::load() { } external_resources.write[i].path = path; //remap happens here, not on load because on load it can actually be used for filesystem dock resource remap - - if (!use_sub_threads) { - external_resources.write[i].cache = ResourceLoader::load(path, external_resources[i].type); - - if (external_resources[i].cache.is_null()) { - if (!ResourceLoader::get_abort_on_missing_resources()) { - ResourceLoader::notify_dependency_error(local_path, path, external_resources[i].type); - } else { - error = ERR_FILE_MISSING_DEPENDENCIES; - ERR_FAIL_V_MSG(error, "Can't load dependency: " + path + "."); - } - } - - } else { - Error err = ResourceLoader::load_threaded_request(path, external_resources[i].type, use_sub_threads, ResourceFormatLoader::CACHE_MODE_REUSE, local_path); - if (err != OK) { - if (!ResourceLoader::get_abort_on_missing_resources()) { - ResourceLoader::notify_dependency_error(local_path, path, external_resources[i].type); - } else { - error = ERR_FILE_MISSING_DEPENDENCIES; - ERR_FAIL_V_MSG(error, "Can't load dependency: " + path + "."); - } + external_resources.write[i].load_token = ResourceLoader::_load_start(path, external_resources[i].type, use_sub_threads ? ResourceLoader::LOAD_THREAD_DISTRIBUTE : ResourceLoader::LOAD_THREAD_FROM_CURRENT, ResourceFormatLoader::CACHE_MODE_REUSE); + if (!external_resources[i].load_token.is_valid()) { + if (!ResourceLoader::get_abort_on_missing_resources()) { + ResourceLoader::notify_dependency_error(local_path, path, external_resources[i].type); + } else { + error = ERR_FILE_MISSING_DEPENDENCIES; + ERR_FAIL_V_MSG(error, "Can't load dependency: " + path + "."); } } } diff --git a/core/io/resource_format_binary.h b/core/io/resource_format_binary.h index add7cdf2974..30f16649838 100644 --- a/core/io/resource_format_binary.h +++ b/core/io/resource_format_binary.h @@ -60,7 +60,7 @@ class ResourceLoaderBinary { String path; String type; ResourceUID::ID uid = ResourceUID::INVALID_ID; - Ref cache; + Ref load_token; }; bool using_named_scene_ids = false; diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp index 9af3a7daedc..a00593c6793 100644 --- a/core/io/resource_loader.cpp +++ b/core/io/resource_loader.cpp @@ -202,20 +202,72 @@ void ResourceFormatLoader::_bind_methods() { /////////////////////////////////// +// This should be robust enough to be called redundantly without issues. +void ResourceLoader::LoadToken::clear() { + thread_load_mutex.lock(); + + Thread *thread_to_destroy = nullptr; + + if (!local_path.is_empty()) { // Empty is used for the special case where the load task is not registered. + DEV_ASSERT(thread_load_tasks.has(local_path)); + ThreadLoadTask &load_task = thread_load_tasks[local_path]; + thread_to_destroy = load_task.thread; + load_task.thread = nullptr; + thread_load_tasks.erase(local_path); + local_path.clear(); + } + + if (!user_path.is_empty()) { + DEV_ASSERT(user_load_tokens.has(user_path)); + user_load_tokens.erase(user_path); + user_path.clear(); + } + + thread_load_mutex.unlock(); + + // If thread is unused, destroy it here, locally, now the token data is consistent. + if (thread_to_destroy) { + if (thread_to_destroy->is_started()) { + thread_to_destroy->wait_to_finish(); + } + memdelete(thread_to_destroy); + } +} + +ResourceLoader::LoadToken::~LoadToken() { + clear(); +} + Ref ResourceLoader::_load(const String &p_path, const String &p_original_path, const String &p_type_hint, ResourceFormatLoader::CacheMode p_cache_mode, Error *r_error, bool p_use_sub_threads, float *r_progress) { - bool found = false; + load_nesting++; + if (load_paths_stack.size()) { + thread_load_mutex.lock(); + HashMap::Iterator E = thread_load_tasks.find(load_paths_stack[load_paths_stack.size() - 1]); + if (E) { + E->value.sub_tasks.insert(p_path); + } + thread_load_mutex.unlock(); + } + load_paths_stack.push_back(p_path); // Try all loaders and pick the first match for the type hint + bool found = false; + Ref res; for (int i = 0; i < loader_count; i++) { if (!loader[i]->recognize_path(p_path, p_type_hint)) { continue; } found = true; - Ref res = loader[i]->load(p_path, !p_original_path.is_empty() ? p_original_path : p_path, r_error, p_use_sub_threads, r_progress, p_cache_mode); - if (res.is_null()) { - continue; + res = loader[i]->load(p_path, !p_original_path.is_empty() ? p_original_path : p_path, r_error, p_use_sub_threads, r_progress, p_cache_mode); + if (!res.is_null()) { + break; } + } + load_paths_stack.resize(load_paths_stack.size() - 1); + load_nesting--; + + if (!res.is_null()) { return res; } @@ -232,47 +284,45 @@ Ref ResourceLoader::_load(const String &p_path, const String &p_origin void ResourceLoader::_thread_load_function(void *p_userdata) { ThreadLoadTask &load_task = *(ThreadLoadTask *)p_userdata; - load_task.loader_id = Thread::get_caller_id(); - - if (load_task.cond_var) { - //this is an actual thread, so wait for Ok from semaphore - thread_load_semaphore->wait(); //wait until its ok to start loading + // Thread-safe either if it's the current thread or a brand new one. + if (load_task.first_in_stack) { + if (!load_task.dependent_path.is_empty()) { + load_paths_stack.push_back(load_task.dependent_path); + } + } else { + DEV_ASSERT(load_task.dependent_path.is_empty()); } - load_task.resource = _load(load_task.remapped_path, load_task.remapped_path != load_task.local_path ? load_task.local_path : String(), load_task.type_hint, load_task.cache_mode, &load_task.error, load_task.use_sub_threads, &load_task.progress); + // -- + + Ref res = _load(load_task.remapped_path, load_task.remapped_path != load_task.local_path ? load_task.local_path : String(), load_task.type_hint, load_task.cache_mode, &load_task.error, load_task.use_sub_threads, &load_task.progress); + + thread_load_mutex.lock(); + + load_task.resource = res; load_task.progress = 1.0; //it was fully loaded at this point, so force progress to 1.0 - - thread_load_mutex->lock(); if (load_task.error != OK) { load_task.status = THREAD_LOAD_FAILED; } else { load_task.status = THREAD_LOAD_LOADED; } + if (load_task.cond_var) { - if (load_task.start_next && thread_waiting_count > 0) { - thread_waiting_count--; - //thread loading count remains constant, this ends but another one begins - thread_load_semaphore->post(); - } else { - thread_loading_count--; //no threads waiting, just reduce loading count - } - - print_lt("END: load count: " + itos(thread_loading_count) + " / wait count: " + itos(thread_waiting_count) + " / suspended count: " + itos(thread_suspended_count) + " / active: " + itos(thread_loading_count - thread_suspended_count)); - load_task.cond_var->notify_all(); memdelete(load_task.cond_var); load_task.cond_var = nullptr; } if (load_task.resource.is_valid()) { - load_task.resource->set_path(load_task.local_path); + if (load_task.cache_mode != ResourceFormatLoader::CACHE_MODE_IGNORE) { + load_task.resource->set_path(load_task.local_path); + } if (load_task.xl_remapped) { load_task.resource->set_as_translation_remapped(true); } #ifdef TOOLS_ENABLED - load_task.resource->set_edited(false); if (timestamp_on_load) { uint64_t mt = FileAccess::get_modified_time(load_task.remapped_path); @@ -286,7 +336,16 @@ void ResourceLoader::_thread_load_function(void *p_userdata) { } } - thread_load_mutex->unlock(); + if (load_nesting == 0) { + thread_active_count--; + if (thread_waiting_count) { + thread_active_cond_var.notify_one(); + } + } + + print_lt("END: load count: " + itos(thread_active_count + thread_suspended_count) + " / wait count: " + itos(thread_waiting_count) + " / suspended count: " + itos(thread_suspended_count) + " / active: " + itos(thread_active_count)); + + thread_load_mutex.unlock(); } static String _validate_local_path(const String &p_path) { @@ -299,91 +358,158 @@ static String _validate_local_path(const String &p_path) { return ProjectSettings::get_singleton()->localize_path(p_path); } } -Error ResourceLoader::load_threaded_request(const String &p_path, const String &p_type_hint, bool p_use_sub_threads, ResourceFormatLoader::CacheMode p_cache_mode, const String &p_source_resource) { - String local_path = _validate_local_path(p_path); - thread_load_mutex->lock(); - - if (!p_source_resource.is_empty()) { - //must be loading from this resource - if (!thread_load_tasks.has(p_source_resource)) { - thread_load_mutex->unlock(); - ERR_FAIL_V_MSG(ERR_INVALID_PARAMETER, "There is no thread loading source resource '" + p_source_resource + "'."); - } - //must not be already added as s sub tasks - if (thread_load_tasks[p_source_resource].sub_tasks.has(local_path)) { - thread_load_mutex->unlock(); - ERR_FAIL_V_MSG(ERR_INVALID_PARAMETER, "Thread loading source resource '" + p_source_resource + "' already is loading '" + local_path + "'."); - } - } - - if (thread_load_tasks.has(local_path)) { - thread_load_tasks[local_path].requests++; - if (!p_source_resource.is_empty()) { - thread_load_tasks[p_source_resource].sub_tasks.insert(local_path); - } - thread_load_mutex->unlock(); +Error ResourceLoader::load_threaded_request(const String &p_path, const String &p_type_hint, bool p_use_sub_threads, ResourceFormatLoader::CacheMode p_cache_mode) { + thread_load_mutex.lock(); + if (user_load_tokens.has(p_path)) { + print_verbose("load_threaded_request(): Another threaded load for resource path '" + p_path + "' has been initiated. Not an error."); + user_load_tokens[p_path]->reference(); // Additional request. + thread_load_mutex.unlock(); return OK; } + user_load_tokens[p_path] = nullptr; + thread_load_mutex.unlock(); + Ref token = _load_start(p_path, p_type_hint, p_use_sub_threads ? LOAD_THREAD_DISTRIBUTE : LOAD_THREAD_SPAWN_SINGLE, p_cache_mode); + if (token.is_valid()) { + thread_load_mutex.lock(); + token->user_path = p_path; + token->reference(); // First request. + user_load_tokens[p_path] = token.ptr(); + print_lt("REQUEST: user load tokens: " + itos(user_load_tokens.size())); + thread_load_mutex.unlock(); + return OK; + } else { + return FAILED; + } +} + +Ref ResourceLoader::load(const String &p_path, const String &p_type_hint, ResourceFormatLoader::CacheMode p_cache_mode, Error *r_error) { + if (r_error) { + *r_error = OK; + } + + Ref load_token = _load_start(p_path, p_type_hint, LOAD_THREAD_FROM_CURRENT, p_cache_mode); + if (!load_token.is_valid()) { + if (r_error) { + *r_error = FAILED; + } + return Ref(); + } + + Ref res = _load_complete(*load_token.ptr(), r_error); + return res; +} + +Ref ResourceLoader::_load_start(const String &p_path, const String &p_type_hint, LoadThreadMode p_thread_mode, ResourceFormatLoader::CacheMode p_cache_mode) { + String local_path = _validate_local_path(p_path); + + Ref load_token; + bool must_not_register = false; + ThreadLoadTask unregistered_load_task; // Once set, must be valid up to the call to do the load. + ThreadLoadTask *load_task_ptr = nullptr; + bool run_on_current_thread = false; { + MutexLock thread_load_lock(thread_load_mutex); + + if (thread_load_tasks.has(local_path)) { + load_token = Ref(thread_load_tasks[local_path].load_token); + if (!load_token.is_valid()) { + // The token is dying (reached 0 on another thread). + // Ensure it's killed now so the path can be safely reused right away. + thread_load_tasks[local_path].load_token->clear(); + } else { + if (p_cache_mode != ResourceFormatLoader::CACHE_MODE_IGNORE) { + return load_token; + } + } + } + + load_token.instantiate(); + load_token->local_path = local_path; + //create load task + { + ThreadLoadTask load_task; - ThreadLoadTask load_task; - - load_task.requests = 1; - load_task.remapped_path = _path_remap(local_path, &load_task.xl_remapped); - load_task.local_path = local_path; - load_task.type_hint = p_type_hint; - load_task.cache_mode = p_cache_mode; - load_task.use_sub_threads = p_use_sub_threads; - - { //must check if resource is already loaded before attempting to load it in a thread - - if (load_task.loader_id == Thread::get_caller_id()) { - thread_load_mutex->unlock(); - ERR_FAIL_V_MSG(ERR_INVALID_PARAMETER, "Attempted to load a resource already being loaded from this thread, cyclic reference?"); + load_task.remapped_path = _path_remap(local_path, &load_task.xl_remapped); + load_task.load_token = load_token.ptr(); + load_task.local_path = local_path; + load_task.type_hint = p_type_hint; + load_task.cache_mode = p_cache_mode; + load_task.use_sub_threads = p_thread_mode == LOAD_THREAD_DISTRIBUTE; + if (p_cache_mode != ResourceFormatLoader::CACHE_MODE_IGNORE) { + Ref existing = ResourceCache::get_ref(local_path); + if (existing.is_valid()) { + //referencing is fine + load_task.resource = existing; + load_task.status = THREAD_LOAD_LOADED; + load_task.progress = 1.0; + thread_load_tasks[local_path] = load_task; + return load_token; + } } - Ref existing = ResourceCache::get_ref(local_path); - - if (existing.is_valid()) { - //referencing is fine - load_task.resource = existing; - load_task.status = THREAD_LOAD_LOADED; - load_task.progress = 1.0; + // If we want to ignore cache, but there's another task loading it, we can't add this one to the map and we also have to finish unconditionally synchronously. + must_not_register = thread_load_tasks.has(local_path) && p_cache_mode == ResourceFormatLoader::CACHE_MODE_IGNORE; + if (must_not_register) { + load_token->local_path.clear(); + unregistered_load_task = load_task; + } else { + thread_load_tasks[local_path] = load_task; } + + load_task_ptr = must_not_register ? &unregistered_load_task : &thread_load_tasks[local_path]; } - if (!p_source_resource.is_empty()) { - thread_load_tasks[p_source_resource].sub_tasks.insert(local_path); + print_lt("REQUEST: load count: " + itos(thread_active_count + thread_suspended_count) + " / wait count: " + itos(thread_waiting_count) + " / suspended count: " + itos(thread_suspended_count) + " / active: " + itos(thread_active_count)); + + run_on_current_thread = must_not_register || p_thread_mode == LOAD_THREAD_FROM_CURRENT; + + if (!run_on_current_thread && thread_active_count >= thread_active_max && load_nesting > 0) { + // No free slots for another thread, but this one is already active, so keep working here. + run_on_current_thread = true; } - thread_load_tasks[local_path] = load_task; - } + load_task_ptr->first_in_stack = run_on_current_thread ? load_nesting == 0 : true; - ThreadLoadTask &load_task = thread_load_tasks[local_path]; + if (load_task_ptr->first_in_stack) { + if (!run_on_current_thread && load_paths_stack.size()) { + // The paths stack is lost across thread boundaries, so we have to remember what was the topmost path. + load_task_ptr->dependent_path = load_paths_stack[load_paths_stack.size() - 1]; + } + if (thread_active_count >= thread_active_max) { + // Either the current or a new thread needs to wait for a free slot to become active. + thread_waiting_count++; + do { + thread_active_cond_var.wait(thread_load_lock); + } while (thread_active_count >= thread_active_max); + thread_waiting_count--; + } + thread_active_count++; + } - if (load_task.resource.is_null()) { //needs to be loaded in thread + if (cleaning_tasks) { + load_task_ptr->status = THREAD_LOAD_FAILED; + return load_token; + } - load_task.cond_var = memnew(ConditionVariable); - if (thread_loading_count < thread_load_max) { - thread_loading_count++; - thread_load_semaphore->post(); //we have free threads, so allow one + if (run_on_current_thread) { + load_task_ptr->loader_id = Thread::get_caller_id(); + if (must_not_register) { + load_token->res_if_unregistered = load_task_ptr->resource; + } } else { - thread_waiting_count++; + load_task_ptr->thread = memnew(Thread); + load_task_ptr->loader_id = load_task_ptr->thread->start(_thread_load_function, load_task_ptr); } - - print_lt("REQUEST: load count: " + itos(thread_loading_count) + " / wait count: " + itos(thread_waiting_count) + " / suspended count: " + itos(thread_suspended_count) + " / active: " + itos(thread_loading_count - thread_suspended_count)); - - load_task.thread = memnew(Thread); - load_task.thread->start(_thread_load_function, &thread_load_tasks[local_path]); - load_task.loader_id = load_task.thread->get_id(); } - thread_load_mutex->unlock(); + if (run_on_current_thread) { + _thread_load_function(load_task_ptr); + } - return OK; + return load_token; } float ResourceLoader::_dependency_get_progress(const String &p_path) { @@ -409,13 +535,22 @@ float ResourceLoader::_dependency_get_progress(const String &p_path) { } ResourceLoader::ThreadLoadStatus ResourceLoader::load_threaded_get_status(const String &p_path, float *r_progress) { - String local_path = _validate_local_path(p_path); + MutexLock thread_load_lock(thread_load_mutex); - thread_load_mutex->lock(); - if (!thread_load_tasks.has(local_path)) { - thread_load_mutex->unlock(); + if (!user_load_tokens.has(p_path)) { + print_verbose("load_threaded_get_status(): No threaded load for resource path '" + p_path + "' has been initiated or its result has already been collected."); return THREAD_LOAD_INVALID_RESOURCE; } + + String local_path = _validate_local_path(p_path); + if (!thread_load_tasks.has(local_path)) { +#ifdef DEV_ENABLED + CRASH_NOW(); +#endif + // On non-dev, be defensive and at least avoid crashing (at this point at least). + return THREAD_LOAD_INVALID_RESOURCE; + } + ThreadLoadTask &load_task = thread_load_tasks[local_path]; ThreadLoadStatus status; status = load_task.status; @@ -423,198 +558,115 @@ ResourceLoader::ThreadLoadStatus ResourceLoader::load_threaded_get_status(const *r_progress = _dependency_get_progress(local_path); } - thread_load_mutex->unlock(); - return status; } Ref ResourceLoader::load_threaded_get(const String &p_path, Error *r_error) { - String local_path = _validate_local_path(p_path); - - MutexLock thread_load_lock(*thread_load_mutex); - if (!thread_load_tasks.has(local_path)) { - if (r_error) { - *r_error = ERR_INVALID_PARAMETER; - } - return Ref(); + if (r_error) { + *r_error = OK; } - ThreadLoadTask &load_task = thread_load_tasks[local_path]; + Ref res; + { + MutexLock thread_load_lock(thread_load_mutex); - if (load_task.status == THREAD_LOAD_IN_PROGRESS) { - if (load_task.loader_id == Thread::get_caller_id()) { - // Load is in progress, but it's precisely this thread the one in charge. - // That means this is a cyclic load. - if (r_error) { - *r_error = ERR_BUSY; - } - return Ref(); - } else if (!load_task.cond_var) { - // Load is in progress, but a condition variable was never created for it. - // That happens when a load has been initiated with subthreads disabled, - // but now another load thread needs to interact with this one (either - // because of subthreads being used this time, or because it's simply a - // threaded load running on a different thread). - // Since we want to be notified when the load ends, we must create the - // condition variable now. - load_task.cond_var = memnew(ConditionVariable); - } - } - - //cond var still exists, meaning it's still loading, request poll - if (load_task.cond_var) { - { - // As we got a cond var, this means we are going to have to wait - // until the sub-resource is done loading - // - // As this thread will become 'blocked' we should "exchange" its - // active status with a waiting one, to ensure load continues. - // - // This ensures loading is never blocked and that is also within - // the maximum number of active threads. - - if (thread_waiting_count > 0) { - thread_waiting_count--; - thread_loading_count++; - thread_load_semaphore->post(); - - load_task.start_next = false; //do not start next since we are doing it here - } - - thread_suspended_count++; - - print_lt("GET: load count: " + itos(thread_loading_count) + " / wait count: " + itos(thread_waiting_count) + " / suspended count: " + itos(thread_suspended_count) + " / active: " + itos(thread_loading_count - thread_suspended_count)); - } - - bool still_valid = true; - bool was_thread = load_task.thread; - do { - load_task.cond_var->wait(thread_load_lock); - if (!thread_load_tasks.has(local_path)) { //may have been erased during unlock and this was always an invalid call - still_valid = false; - break; - } - } while (load_task.cond_var); // In case of spurious wakeup. - - if (was_thread) { - thread_suspended_count--; - } - - if (!still_valid) { + if (!user_load_tokens.has(p_path)) { + print_verbose("load_threaded_get(): No threaded load for resource path '" + p_path + "' has been initiated or its result has already been collected."); if (r_error) { *r_error = ERR_INVALID_PARAMETER; } return Ref(); } - } - Ref resource = load_task.resource; - if (r_error) { - *r_error = load_task.error; - } - - load_task.requests--; - - if (load_task.requests == 0) { - if (load_task.thread) { //thread may not have been used - load_task.thread->wait_to_finish(); - memdelete(load_task.thread); + LoadToken *load_token = user_load_tokens[p_path]; + if (!load_token) { + // This happens if requested from one thread and rapidly querying from another. + if (r_error) { + *r_error = ERR_BUSY; + } + return Ref(); + } + res = _load_complete_inner(*load_token, r_error, thread_load_lock); + if (load_token->unreference()) { + memdelete(load_token); } - thread_load_tasks.erase(local_path); } - return resource; + print_lt("GET: user load tokens: " + itos(user_load_tokens.size())); + + return res; } -Ref ResourceLoader::load(const String &p_path, const String &p_type_hint, ResourceFormatLoader::CacheMode p_cache_mode, Error *r_error) { +Ref ResourceLoader::_load_complete(LoadToken &p_load_token, Error *r_error) { + MutexLock thread_load_lock(thread_load_mutex); + return _load_complete_inner(p_load_token, r_error, thread_load_lock); +} + +Ref ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Error *r_error, MutexLock> &p_thread_load_lock) { if (r_error) { - *r_error = ERR_CANT_OPEN; + *r_error = OK; } - String local_path = _validate_local_path(p_path); - - if (p_cache_mode != ResourceFormatLoader::CACHE_MODE_IGNORE) { - thread_load_mutex->lock(); - - //Is it already being loaded? poll until done - if (thread_load_tasks.has(local_path)) { - Error err = load_threaded_request(p_path, p_type_hint); - if (err != OK) { - if (r_error) { - *r_error = err; - } - thread_load_mutex->unlock(); - return Ref(); - } - thread_load_mutex->unlock(); - - return load_threaded_get(p_path, r_error); - } - - //Is it cached? - - Ref existing = ResourceCache::get_ref(local_path); - - if (existing.is_valid()) { - thread_load_mutex->unlock(); - + if (!p_load_token.local_path.is_empty()) { + if (!thread_load_tasks.has(p_load_token.local_path)) { +#ifdef DEV_ENABLED + CRASH_NOW(); +#endif + // On non-dev, be defensive and at least avoid crashing (at this point at least). if (r_error) { - *r_error = OK; + *r_error = ERR_BUG; } - - return existing; //use cached - } - - //load using task (but this thread) - ThreadLoadTask load_task; - - load_task.requests = 1; - load_task.local_path = local_path; - load_task.remapped_path = _path_remap(local_path, &load_task.xl_remapped); - load_task.type_hint = p_type_hint; - load_task.cache_mode = p_cache_mode; //ignore - load_task.loader_id = Thread::get_caller_id(); - - thread_load_tasks[local_path] = load_task; - - thread_load_mutex->unlock(); - - _thread_load_function(&thread_load_tasks[local_path]); - - return load_threaded_get(p_path, r_error); - - } else { - bool xl_remapped = false; - String path = _path_remap(local_path, &xl_remapped); - - if (path.is_empty()) { - ERR_FAIL_V_MSG(Ref(), "Remapping '" + local_path + "' failed."); - } - - print_verbose("Loading resource: " + path); - float p; - Ref res = _load(path, local_path, p_type_hint, p_cache_mode, r_error, false, &p); - - if (res.is_null()) { - print_verbose("Failed loading resource: " + path); return Ref(); } - if (xl_remapped) { - res->set_as_translation_remapped(true); + ThreadLoadTask &load_task = thread_load_tasks[p_load_token.local_path]; + + if (load_task.status == THREAD_LOAD_IN_PROGRESS) { + if (load_task.loader_id == Thread::get_caller_id()) { + // Load is in progress, but it's precisely this thread the one in charge. + // That means this is a cyclic load. + if (r_error) { + *r_error = ERR_BUSY; + } + return Ref(); + } else if (!load_task.cond_var) { + // This is the first time some thread needs to wait for this one. + load_task.cond_var = memnew(ConditionVariable); + } + + // Wait for load to complete. + thread_suspended_count++; + + print_lt("GET: load count: " + itos(thread_active_count + thread_suspended_count) + " / wait count: " + itos(thread_waiting_count) + " / suspended count: " + itos(thread_suspended_count) + " / active: " + itos(thread_active_count)); + + do { + load_task.cond_var->wait(p_thread_load_lock); + DEV_ASSERT(thread_load_tasks.has(p_load_token.local_path) && p_load_token.get_reference_count()); + } while (load_task.cond_var); + + thread_suspended_count--; } -#ifdef TOOLS_ENABLED - - res->set_edited(false); - if (timestamp_on_load) { - uint64_t mt = FileAccess::get_modified_time(path); - //printf("mt %s: %lli\n",remapped_path.utf8().get_data(),mt); - res->set_last_modified_time(mt); + if (cleaning_tasks) { + load_task.resource = Ref(); + load_task.error = FAILED; } -#endif - return res; + Ref resource = load_task.resource; + if (r_error) { + *r_error = load_task.error; + } + return resource; + } else { + // Special case of an unregistered task. + // The resource should have been loaded by now. + Ref resource = p_load_token.res_if_unregistered; + if (!resource.is_valid()) { + if (r_error) { + *r_error = FAILED; + } + } + return resource; } } @@ -958,32 +1010,43 @@ void ResourceLoader::clear_translation_remaps() { } void ResourceLoader::clear_thread_load_tasks() { - thread_load_mutex->lock(); + // Bring the thing down as quickly as possible without causing deadlocks or leaks. - for (KeyValue &E : thread_load_tasks) { - switch (E.value.status) { - case ResourceLoader::ThreadLoadStatus::THREAD_LOAD_LOADED: { - E.value.resource = Ref(); - } break; + thread_load_mutex.lock(); + cleaning_tasks = true; - case ResourceLoader::ThreadLoadStatus::THREAD_LOAD_IN_PROGRESS: { - if (E.value.thread != nullptr) { - E.value.thread->wait_to_finish(); - memdelete(E.value.thread); - E.value.thread = nullptr; + while (true) { + bool none_running = true; + if (thread_load_tasks.size()) { + for (KeyValue &E : thread_load_tasks) { + if (E.value.status == THREAD_LOAD_IN_PROGRESS) { + if (E.value.cond_var) { + E.value.cond_var->notify_all(); + memdelete(E.value.cond_var); + E.value.cond_var = nullptr; + } + none_running = false; } - E.value.resource = Ref(); - } break; - - case ResourceLoader::ThreadLoadStatus::THREAD_LOAD_FAILED: - default: { - // do nothing } } + if (none_running) { + break; + } + thread_active_cond_var.notify_all(); + thread_load_mutex.unlock(); + OS::get_singleton()->delay_usec(1000); + thread_load_mutex.lock(); } + + for (KeyValue &E : user_load_tokens) { + memdelete(E.value); + } + user_load_tokens.clear(); + thread_load_tasks.clear(); - thread_load_mutex->unlock(); + cleaning_tasks = false; + thread_load_mutex.unlock(); } void ResourceLoader::load_path_remaps() { @@ -1080,21 +1143,20 @@ void ResourceLoader::remove_custom_loaders() { } } -void ResourceLoader::initialize() { - thread_load_mutex = memnew(SafeBinaryMutex); - thread_load_max = OS::get_singleton()->get_processor_count(); - thread_loading_count = 0; - thread_waiting_count = 0; - thread_suspended_count = 0; - thread_load_semaphore = memnew(Semaphore); +bool ResourceLoader::is_cleaning_tasks() { + MutexLock lock(thread_load_mutex); + return cleaning_tasks; } -void ResourceLoader::finalize() { - clear_thread_load_tasks(); - memdelete(thread_load_mutex); - memdelete(thread_load_semaphore); +void ResourceLoader::initialize() { + thread_active_max = OS::get_singleton()->get_processor_count(); + thread_active_count = 0; + thread_waiting_count = 0; + thread_suspended_count = 0; } +void ResourceLoader::finalize() {} + ResourceLoadErrorNotify ResourceLoader::err_notify = nullptr; void *ResourceLoader::err_notify_ud = nullptr; @@ -1105,16 +1167,22 @@ bool ResourceLoader::create_missing_resources_if_class_unavailable = false; bool ResourceLoader::abort_on_missing_resource = true; bool ResourceLoader::timestamp_on_load = false; +thread_local int ResourceLoader::load_nesting = 0; +thread_local Vector ResourceLoader::load_paths_stack; + template <> thread_local uint32_t SafeBinaryMutex::count = 0; -SafeBinaryMutex *ResourceLoader::thread_load_mutex = nullptr; +SafeBinaryMutex ResourceLoader::thread_load_mutex; HashMap ResourceLoader::thread_load_tasks; -Semaphore *ResourceLoader::thread_load_semaphore = nullptr; +ConditionVariable ResourceLoader::thread_active_cond_var; -int ResourceLoader::thread_loading_count = 0; +int ResourceLoader::thread_active_count = 0; int ResourceLoader::thread_waiting_count = 0; int ResourceLoader::thread_suspended_count = 0; -int ResourceLoader::thread_load_max = 0; +int ResourceLoader::thread_active_max = 0; +bool ResourceLoader::cleaning_tasks = false; + +HashMap ResourceLoader::user_load_tokens; SelfList::List ResourceLoader::remapped_list; HashMap> ResourceLoader::translation_remaps; diff --git a/core/io/resource_loader.h b/core/io/resource_loader.h index 72c1f906531..0615ae63aaa 100644 --- a/core/io/resource_loader.h +++ b/core/io/resource_loader.h @@ -107,9 +107,30 @@ public: THREAD_LOAD_LOADED }; + enum LoadThreadMode { + LOAD_THREAD_FROM_CURRENT, + LOAD_THREAD_SPAWN_SINGLE, + LOAD_THREAD_DISTRIBUTE, + }; + + struct LoadToken : public RefCounted { + String local_path; + String user_path; + Ref res_if_unregistered; + + void clear(); + + virtual ~LoadToken(); + }; + static const int BINARY_MUTEX_TAG = 1; + static Ref _load_start(const String &p_path, const String &p_type_hint, LoadThreadMode p_thread_mode, ResourceFormatLoader::CacheMode p_cache_mode); + static Ref _load_complete(LoadToken &p_load_token, Error *r_error); + private: + static Ref _load_complete_inner(LoadToken &p_load_token, Error *r_error, MutexLock> &p_thread_load_lock); + static Ref loader[MAX_LOADERS]; static int loader_count; static bool timestamp_on_load; @@ -129,8 +150,7 @@ private: static SelfList::List remapped_list; friend class ResourceFormatImporter; - friend class ResourceInteractiveLoader; - // Internal load function. + static Ref _load(const String &p_path, const String &p_original_path, const String &p_type_hint, ResourceFormatLoader::CacheMode p_cache_mode, Error *r_error, bool p_use_sub_threads, float *r_progress); static ResourceLoadedCallback _loaded_callback; @@ -140,9 +160,12 @@ private: struct ThreadLoadTask { Thread *thread = nullptr; Thread::ID loader_id = 0; + bool first_in_stack = false; ConditionVariable *cond_var = nullptr; + LoadToken *load_token = nullptr; String local_path; String remapped_path; + String dependent_path; String type_hint; float progress = 0.0; ThreadLoadStatus status = THREAD_LOAD_IN_PROGRESS; @@ -151,24 +174,28 @@ private: Ref resource; bool xl_remapped = false; bool use_sub_threads = false; - bool start_next = true; - int requests = 0; HashSet sub_tasks; }; static void _thread_load_function(void *p_userdata); - static SafeBinaryMutex *thread_load_mutex; + + static thread_local int load_nesting; + static thread_local Vector load_paths_stack; + static SafeBinaryMutex thread_load_mutex; static HashMap thread_load_tasks; - static Semaphore *thread_load_semaphore; + static ConditionVariable thread_active_cond_var; + static int thread_active_count; static int thread_waiting_count; - static int thread_loading_count; static int thread_suspended_count; - static int thread_load_max; + static int thread_active_max; + static bool cleaning_tasks; + + static HashMap user_load_tokens; static float _dependency_get_progress(const String &p_path); public: - static Error load_threaded_request(const String &p_path, const String &p_type_hint = "", bool p_use_sub_threads = false, ResourceFormatLoader::CacheMode p_cache_mode = ResourceFormatLoader::CACHE_MODE_REUSE, const String &p_source_resource = String()); + static Error load_threaded_request(const String &p_path, const String &p_type_hint = "", bool p_use_sub_threads = false, ResourceFormatLoader::CacheMode p_cache_mode = ResourceFormatLoader::CACHE_MODE_REUSE); static ThreadLoadStatus load_threaded_get_status(const String &p_path, float *r_progress = nullptr); static Ref load_threaded_get(const String &p_path, Error *r_error = nullptr); @@ -237,6 +264,8 @@ public: static void set_create_missing_resources_if_class_unavailable(bool p_enable); _FORCE_INLINE_ static bool is_creating_missing_resources_if_class_unavailable_enabled() { return create_missing_resources_if_class_unavailable; } + static bool is_cleaning_tasks(); + static void initialize(); static void finalize(); }; diff --git a/core/os/mutex.h b/core/os/mutex.h index 90cc1632e8f..cee0f8af749 100644 --- a/core/os/mutex.h +++ b/core/os/mutex.h @@ -119,8 +119,25 @@ class MutexLock { public: _ALWAYS_INLINE_ explicit MutexLock(const MutexT &p_mutex) : + lock(p_mutex.mutex){}; +}; + +// This specialization is needed so manual locking and MutexLock can be used +// at the same time on a SafeBinaryMutex. +template +class MutexLock> { + friend class ConditionVariable; + + std::unique_lock lock; + +public: + _ALWAYS_INLINE_ explicit MutexLock(const SafeBinaryMutex &p_mutex) : lock(p_mutex.mutex) { - } + SafeBinaryMutex::count++; + }; + _ALWAYS_INLINE_ ~MutexLock() { + SafeBinaryMutex::count--; + }; }; using Mutex = MutexImpl; // Recursive, for general use diff --git a/core/os/thread.cpp b/core/os/thread.cpp index 502f82aaef9..c067ad1a6ac 100644 --- a/core/os/thread.cpp +++ b/core/os/thread.cpp @@ -66,11 +66,12 @@ void Thread::callback(ID p_caller_id, const Settings &p_settings, Callback p_cal } } -void Thread::start(Thread::Callback p_callback, void *p_user, const Settings &p_settings) { - ERR_FAIL_COND_MSG(id != UNASSIGNED_ID, "A Thread object has been re-started without wait_to_finish() having been called on it."); +Thread::ID Thread::start(Thread::Callback p_callback, void *p_user, const Settings &p_settings) { + ERR_FAIL_COND_V_MSG(id != UNASSIGNED_ID, UNASSIGNED_ID, "A Thread object has been re-started without wait_to_finish() having been called on it."); id = id_counter.increment(); std::thread new_thread(&Thread::callback, id, p_settings, p_callback, p_user); thread.swap(new_thread); + return id; } bool Thread::is_started() const { diff --git a/core/os/thread.h b/core/os/thread.h index a769bb1df42..3e307adfffe 100644 --- a/core/os/thread.h +++ b/core/os/thread.h @@ -109,7 +109,7 @@ public: static Error set_name(const String &p_name); - void start(Thread::Callback p_callback, void *p_user, const Settings &p_settings = Settings()); + ID start(Thread::Callback p_callback, void *p_user, const Settings &p_settings = Settings()); bool is_started() const; ///< waits until thread is finished, and deallocates it. void wait_to_finish(); diff --git a/main/main.cpp b/main/main.cpp index 797aa32441a..86de6497d0a 100644 --- a/main/main.cpp +++ b/main/main.cpp @@ -3446,6 +3446,8 @@ void Main::cleanup(bool p_force) { movie_writer->end(); } + ResourceLoader::clear_thread_load_tasks(); + ResourceLoader::remove_custom_loaders(); ResourceSaver::remove_custom_savers(); @@ -3462,8 +3464,6 @@ void Main::cleanup(bool p_force) { ResourceLoader::clear_translation_remaps(); ResourceLoader::clear_path_remaps(); - ResourceLoader::clear_thread_load_tasks(); - ScriptServer::finish_languages(); // Sync pending commands that may have been queued from a different thread during ScriptServer finalization diff --git a/scene/resources/resource_format_text.cpp b/scene/resources/resource_format_text.cpp index c30e0093566..4807af3c272 100644 --- a/scene/resources/resource_format_text.cpp +++ b/scene/resources/resource_format_text.cpp @@ -150,32 +150,31 @@ Error ResourceLoaderText::_parse_ext_resource(VariantParser::Stream *p_stream, R String path = ext_resources[id].path; String type = ext_resources[id].type; + Ref &load_token = ext_resources[id].load_token; - if (ext_resources[id].cache.is_valid()) { - r_res = ext_resources[id].cache; - } else if (use_sub_threads) { - Ref res = ResourceLoader::load_threaded_get(path); + if (load_token.is_valid()) { // If not valid, it's OK since then we know this load accepts broken dependencies. + Ref res = ResourceLoader::_load_complete(*load_token.ptr(), &err); if (res.is_null()) { - if (ResourceLoader::get_abort_on_missing_resources()) { - error = ERR_FILE_MISSING_DEPENDENCIES; - error_text = "[ext_resource] referenced nonexistent resource at: " + path; - _printerr(); - err = error; - } else { - ResourceLoader::notify_dependency_error(local_path, path, type); + if (!ResourceLoader::is_cleaning_tasks()) { + if (ResourceLoader::get_abort_on_missing_resources()) { + error = ERR_FILE_MISSING_DEPENDENCIES; + error_text = "[ext_resource] referenced non-existent resource at: " + path; + _printerr(); + err = error; + } else { + ResourceLoader::notify_dependency_error(local_path, path, type); + } } } else { - ext_resources[id].cache = res; +#ifdef TOOLS_ENABLED + //remember ID for saving + res->set_id_for_path(path, id); +#endif r_res = res; } } else { - error = ERR_FILE_MISSING_DEPENDENCIES; - error_text = "[ext_resource] referenced non-loaded resource at: " + path; - _printerr(); - err = error; + r_res = Ref(); } - } else { - r_res = Ref(); } VariantParser::get_token(p_stream, token, line, r_err_str); @@ -462,48 +461,20 @@ Error ResourceLoaderText::load() { path = remaps[path]; } - ExtResource er; - er.path = path; - er.type = type; - - if (use_sub_threads) { - Error err = ResourceLoader::load_threaded_request(path, type, use_sub_threads, ResourceFormatLoader::CACHE_MODE_REUSE, local_path); - - if (err != OK) { - if (ResourceLoader::get_abort_on_missing_resources()) { - error = ERR_FILE_CORRUPT; - error_text = "[ext_resource] referenced broken resource at: " + path; - _printerr(); - return error; - } else { - ResourceLoader::notify_dependency_error(local_path, path, type); - } - } - - } else { - Ref res = ResourceLoader::load(path, type); - - if (res.is_null()) { - if (ResourceLoader::get_abort_on_missing_resources()) { - error = ERR_FILE_CORRUPT; - error_text = "[ext_resource] referenced nonexistent resource at: " + path; - _printerr(); - return error; - } else { - ResourceLoader::notify_dependency_error(local_path, path, type); - } + ext_resources[id].path = path; + ext_resources[id].type = type; + ext_resources[id].load_token = ResourceLoader::_load_start(path, type, use_sub_threads ? ResourceLoader::LOAD_THREAD_DISTRIBUTE : ResourceLoader::LOAD_THREAD_FROM_CURRENT, ResourceFormatLoader::CACHE_MODE_REUSE); + if (!ext_resources[id].load_token.is_valid()) { + if (ResourceLoader::get_abort_on_missing_resources()) { + error = ERR_FILE_CORRUPT; + error_text = "[ext_resource] referenced non-existent resource at: " + path; + _printerr(); + return error; } else { -#ifdef TOOLS_ENABLED - //remember ID for saving - res->set_id_for_path(local_path, id); -#endif + ResourceLoader::notify_dependency_error(local_path, path, type); } - - er.cache = res; } - ext_resources[id] = er; - error = VariantParser::parse_tag(&stream, lines, error_text, next_tag, &rp); if (error) { diff --git a/scene/resources/resource_format_text.h b/scene/resources/resource_format_text.h index 25001d80232..c35a594f727 100644 --- a/scene/resources/resource_format_text.h +++ b/scene/resources/resource_format_text.h @@ -48,7 +48,7 @@ class ResourceLoaderText { VariantParser::StreamFile stream; struct ExtResource { - Ref cache; + Ref load_token; String path; String type; };