diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp index f852e8d3824..c68191554c3 100644 --- a/core/io/resource_loader.cpp +++ b/core/io/resource_loader.cpp @@ -476,9 +476,6 @@ Ref ResourceLoader::_load_start(const String &p_path, if (run_on_current_thread) { load_task_ptr->thread_id = Thread::get_caller_id(); - if (must_not_register) { - load_token->res_if_unregistered = load_task_ptr->resource; - } } else { load_task_ptr->task_id = WorkerThreadPool::get_singleton()->add_native_task(&ResourceLoader::_thread_load_function, load_task_ptr); } @@ -486,6 +483,9 @@ Ref ResourceLoader::_load_start(const String &p_path, if (run_on_current_thread) { _thread_load_function(load_task_ptr); + if (must_not_register) { + load_token->res_if_unregistered = load_task_ptr->resource; + } } return load_token; @@ -613,14 +613,33 @@ Ref ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro return Ref(); } - if (load_task.task_id != 0 && !load_task.awaited) { - // Loading thread is in the worker pool and still not awaited. + if (load_task.task_id != 0) { + // Loading thread is in the worker pool. load_task.awaited = true; thread_load_mutex.unlock(); - WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id); - thread_load_mutex.lock(); + Error err = WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id); + if (err == ERR_BUSY) { + // The WorkerThreadPool has scheduled tasks in a way that the current load depends on + // another one in a lower stack frame. Restart such load here. When the stack is eventually + // unrolled, the original load will have been notified to go on. +#ifdef DEV_ENABLED + print_verbose("ResourceLoader: Load task happened to wait on another one deep in the call stack. Attempting to avoid deadlock by re-issuing the load now."); +#endif + // CACHE_MODE_IGNORE is needed because, otherwise, the new request would just see there's + // an ongoing load for that resource and wait for it again. This value forces a new load. + Ref token = _load_start(load_task.local_path, load_task.type_hint, LOAD_THREAD_DISTRIBUTE, ResourceFormatLoader::CACHE_MODE_IGNORE); + Ref resource = _load_complete(*token.ptr(), &err); + if (r_error) { + *r_error = err; + } + thread_load_mutex.lock(); + return resource; + } else { + DEV_ASSERT(err == OK); + thread_load_mutex.lock(); + } } else { - // Loading thread is main or user thread, or in the worker pool, but already awaited by some other thread. + // Loading thread is main or user thread. if (!load_task.cond_var) { load_task.cond_var = memnew(ConditionVariable); } diff --git a/core/object/worker_thread_pool.cpp b/core/object/worker_thread_pool.cpp index e59ab3d6ae8..afe6ecd1b3d 100644 --- a/core/object/worker_thread_pool.cpp +++ b/core/object/worker_thread_pool.cpp @@ -51,6 +51,23 @@ void WorkerThreadPool::_process_task_queue() { void WorkerThreadPool::_process_task(Task *p_task) { bool low_priority = p_task->low_priority; + int pool_thread_index = -1; + Task *prev_low_prio_task = nullptr; // In case this is recursively called. + + if (!use_native_low_priority_threads) { + pool_thread_index = thread_ids[Thread::get_caller_id()]; + ThreadData &curr_thread = threads[pool_thread_index]; + task_mutex.lock(); + p_task->pool_thread_index = pool_thread_index; + if (low_priority) { + low_priority_tasks_running++; + prev_low_prio_task = curr_thread.current_low_prio_task; + curr_thread.current_low_prio_task = p_task; + } else { + curr_thread.current_low_prio_task = nullptr; + } + task_mutex.unlock(); + } if (p_task->group) { // Handling a group @@ -126,21 +143,36 @@ void WorkerThreadPool::_process_task(Task *p_task) { p_task->callable.callp(nullptr, 0, ret, ce); } + task_mutex.lock(); p_task->completed = true; - p_task->done_semaphore.post(); + for (uint8_t i = 0; i < p_task->waiting; i++) { + p_task->done_semaphore.post(); + } + if (!use_native_low_priority_threads) { + p_task->pool_thread_index = -1; + } + task_mutex.unlock(); // Keep mutex down to here since on unlock the task may be freed. } - if (!use_native_low_priority_threads && low_priority) { - // A low prioriry task was freed, so see if we can move a pending one to the high priority queue. + // Task may have been freed by now (all callers notified). + p_task = nullptr; + + if (!use_native_low_priority_threads) { bool post = false; task_mutex.lock(); - if (low_priority_task_queue.first()) { - Task *low_prio_task = low_priority_task_queue.first()->self(); - low_priority_task_queue.remove(low_priority_task_queue.first()); - task_queue.add_last(&low_prio_task->task_elem); - post = true; - } else { + ThreadData &curr_thread = threads[pool_thread_index]; + curr_thread.current_low_prio_task = prev_low_prio_task; + if (low_priority) { low_priority_threads_used--; + low_priority_tasks_running--; + // A low prioriry task was freed, so see if we can move a pending one to the high priority queue. + if (_try_promote_low_priority_task()) { + post = true; + } + + if (low_priority_tasks_awaiting_others == low_priority_tasks_running) { + _prevent_low_prio_saturation_deadlock(); + } } task_mutex.unlock(); if (post) { @@ -198,6 +230,35 @@ void WorkerThreadPool::_post_task(Task *p_task, bool p_high_priority) { } } +bool WorkerThreadPool::_try_promote_low_priority_task() { + if (low_priority_task_queue.first()) { + Task *low_prio_task = low_priority_task_queue.first()->self(); + low_priority_task_queue.remove(low_priority_task_queue.first()); + task_queue.add_last(&low_prio_task->task_elem); + low_priority_threads_used++; + return true; + } else { + return false; + } +} + +void WorkerThreadPool::_prevent_low_prio_saturation_deadlock() { + if (low_priority_tasks_awaiting_others == low_priority_tasks_running) { +#ifdef DEV_ENABLED + print_verbose("WorkerThreadPool: Low-prio slots saturated with tasks all waiting for other low-prio tasks. Attempting to avoid deadlock by scheduling one extra task."); +#endif + // In order not to create dependency cycles, we can only schedule the next one. + // We'll keep doing the same until the deadlock is broken, + SelfList *to_promote = low_priority_task_queue.first(); + if (to_promote) { + low_priority_task_queue.remove(to_promote); + task_queue.add_last(to_promote); + low_priority_threads_used++; + task_available_semaphore.post(); + } + } +} + WorkerThreadPool::TaskID WorkerThreadPool::add_native_task(void (*p_func)(void *), void *p_userdata, bool p_high_priority, const String &p_description) { return _add_task(Callable(), p_func, p_userdata, nullptr, p_high_priority, p_description); } @@ -238,66 +299,113 @@ bool WorkerThreadPool::is_task_completed(TaskID p_task_id) const { return completed; } -void WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) { +Error WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) { task_mutex.lock(); Task **taskp = tasks.getptr(p_task_id); if (!taskp) { task_mutex.unlock(); - ERR_FAIL_MSG("Invalid Task ID"); // Invalid task + ERR_FAIL_V_MSG(ERR_INVALID_PARAMETER, "Invalid Task ID"); // Invalid task } Task *task = *taskp; - if (task->waiting) { - String description = task->description; - task_mutex.unlock(); - if (description.is_empty()) { - ERR_FAIL_MSG("Another thread is waiting on this task: " + itos(p_task_id)); // Invalid task - } else { - ERR_FAIL_MSG("Another thread is waiting on this task: " + description + " (" + itos(p_task_id) + ")"); // Invalid task + if (!task->completed) { + if (!use_native_low_priority_threads && task->pool_thread_index != -1) { // Otherwise, it's not running yet. + int caller_pool_th_index = thread_ids.has(Thread::get_caller_id()) ? thread_ids[Thread::get_caller_id()] : -1; + if (caller_pool_th_index == task->pool_thread_index) { + // Deadlock prevention. + // Waiting for a task run on this same thread? That means the task to be awaited started waiting as well + // and another task was run to make use of the thread in the meantime, with enough bad luck as to + // the need to wait for the original task arose in turn. + // In other words, the task we want to wait for is buried in the stack. + // Let's report the caller about the issue to it handles as it sees fit. + task_mutex.unlock(); + return ERR_BUSY; + } } - } - task->waiting = true; + task->waiting++; - task_mutex.unlock(); - - if (use_native_low_priority_threads && task->low_priority) { - task->low_priority_thread->wait_to_finish(); - - task_mutex.lock(); - native_thread_allocator.free(task->low_priority_thread); - } else { - int *index = thread_ids.getptr(Thread::get_caller_id()); - - if (index) { - // We are an actual process thread, we must not be blocked so continue processing stuff if available. - bool must_exit = false; - while (true) { - if (task->done_semaphore.try_wait()) { - // If done, exit - break; - } - if (!must_exit && task_available_semaphore.try_wait()) { - if (exit_threads) { - must_exit = true; - } else { - // Solve tasks while they are around. - _process_task_queue(); - continue; + bool is_low_prio_waiting_for_another = false; + if (!use_native_low_priority_threads) { + // Deadlock prevention: + // If all low-prio tasks are waiting for other low-prio tasks and there are no more free low-prio slots, + // we have a no progressable situation. We can apply a workaround, consisting in promoting an awaited queued + // low-prio task to the schedule queue so it can run and break the "impasse". + // NOTE: A similar reasoning could be made about high priority tasks, but there are usually much more + // than low-prio. Therefore, a deadlock there would only happen when dealing with a very complex task graph + // or when there are too few worker threads (limited platforms or exotic settings). If that turns out to be + // an issue in the real world, a further fix can be applied against that. + if (task->low_priority) { + bool awaiter_is_a_low_prio_task = thread_ids.has(Thread::get_caller_id()) && threads[thread_ids[Thread::get_caller_id()]].current_low_prio_task; + if (awaiter_is_a_low_prio_task) { + is_low_prio_waiting_for_another = true; + low_priority_tasks_awaiting_others++; + if (low_priority_tasks_awaiting_others == low_priority_tasks_running) { + _prevent_low_prio_saturation_deadlock(); } } - OS::get_singleton()->delay_usec(1); // Microsleep, this could be converted to waiting for multiple objects in supported platforms for a bit more performance. } - } else { + } + + task_mutex.unlock(); + + if (use_native_low_priority_threads && task->low_priority) { task->done_semaphore.wait(); + } else { + bool current_is_pool_thread = thread_ids.has(Thread::get_caller_id()); + if (current_is_pool_thread) { + // We are an actual process thread, we must not be blocked so continue processing stuff if available. + bool must_exit = false; + while (true) { + if (task->done_semaphore.try_wait()) { + // If done, exit + break; + } + if (!must_exit) { + if (task_available_semaphore.try_wait()) { + if (exit_threads) { + must_exit = true; + } else { + // Solve tasks while they are around. + _process_task_queue(); + continue; + } + } else if (!use_native_low_priority_threads && task->low_priority) { + // A low prioriry task started waiting, so see if we can move a pending one to the high priority queue. + task_mutex.lock(); + bool post = _try_promote_low_priority_task(); + task_mutex.unlock(); + if (post) { + task_available_semaphore.post(); + } + } + } + OS::get_singleton()->delay_usec(1); // Microsleep, this could be converted to waiting for multiple objects in supported platforms for a bit more performance. + } + } else { + task->done_semaphore.wait(); + } } task_mutex.lock(); + if (is_low_prio_waiting_for_another) { + low_priority_tasks_awaiting_others--; + } + + task->waiting--; + } + + if (task->waiting == 0) { + if (use_native_low_priority_threads && task->low_priority) { + task->low_priority_thread->wait_to_finish(); + native_thread_allocator.free(task->low_priority_thread); + } + tasks.erase(p_task_id); + task_allocator.free(task); } - tasks.erase(p_task_id); - task_allocator.free(task); task_mutex.unlock(); + return OK; } WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_callable, void (*p_func)(void *, uint32_t), void *p_userdata, BaseTemplateUserdata *p_template_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) { @@ -429,7 +537,7 @@ void WorkerThreadPool::init(int p_thread_count, bool p_use_native_threads_low_pr if (p_use_native_threads_low_priority) { max_low_priority_threads = 0; } else { - max_low_priority_threads = CLAMP(p_thread_count * p_low_priority_task_ratio, 1, p_thread_count); + max_low_priority_threads = CLAMP(p_thread_count * p_low_priority_task_ratio, 1, p_thread_count - 1); } use_native_low_priority_threads = p_use_native_threads_low_priority; diff --git a/core/object/worker_thread_pool.h b/core/object/worker_thread_pool.h index d47c6ad7147..d4d93877658 100644 --- a/core/object/worker_thread_pool.h +++ b/core/object/worker_thread_pool.h @@ -81,10 +81,11 @@ private: bool completed = false; Group *group = nullptr; SelfList task_elem; - bool waiting = false; // Waiting for completion + uint32_t waiting = 0; bool low_priority = false; BaseTemplateUserdata *template_userdata = nullptr; Thread *low_priority_thread = nullptr; + int pool_thread_index = -1; void free_template_userdata(); Task() : @@ -104,6 +105,7 @@ private: struct ThreadData { uint32_t index; Thread thread; + Task *current_low_prio_task = nullptr; }; TightLocalVector threads; @@ -116,6 +118,8 @@ private: bool use_native_low_priority_threads = false; uint32_t max_low_priority_threads = 0; uint32_t low_priority_threads_used = 0; + uint32_t low_priority_tasks_running = 0; + uint32_t low_priority_tasks_awaiting_others = 0; uint64_t last_task = 1; @@ -127,6 +131,9 @@ private: void _post_task(Task *p_task, bool p_high_priority); + bool _try_promote_low_priority_task(); + void _prevent_low_prio_saturation_deadlock(); + static WorkerThreadPool *singleton; TaskID _add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description); @@ -169,7 +176,7 @@ public: TaskID add_task(const Callable &p_action, bool p_high_priority = false, const String &p_description = String()); bool is_task_completed(TaskID p_task_id) const; - void wait_for_task_completion(TaskID p_task_id); + Error wait_for_task_completion(TaskID p_task_id); template GroupID add_template_group_task(C *p_instance, M p_method, U p_userdata, int p_elements, int p_tasks = -1, bool p_high_priority = false, const String &p_description = String()) { diff --git a/doc/classes/WorkerThreadPool.xml b/doc/classes/WorkerThreadPool.xml index ace5f95506e..136c6279d7b 100644 --- a/doc/classes/WorkerThreadPool.xml +++ b/doc/classes/WorkerThreadPool.xml @@ -100,10 +100,13 @@ - + Pauses the thread that calls this method until the task with the given ID is completed. + Returns [constant @GlobalScope.OK] if the task could be successfully awaited. + Returns [constant @GlobalScope.ERR_INVALID_PARAMETER] if a task with the passed ID does not exist (maybe because it was already awaited and disposed of). + Returns [constant @GlobalScope.ERR_BUSY] if the call is made from another running task and, due to task scheduling, the task to await is at a lower level in the call stack and therefore can't progress. This is an advanced situation that should only matter when some tasks depend on others.