banan-os/kernel/kernel/Scheduler.cpp
Bananymous 9084d9305c Kernel: Change preemption condition
Instead of keeping track of the current time and rescheduling once the
interval has passed, keep track of the next expected reschedule time.
This prevents theoretically missing every second preemption when the
scheduler's timer interrupts at the same rate as the interval.
2026-04-11 03:30:52 +03:00

#include <BAN/Optional.h>
#include <BAN/Sort.h>
#include <kernel/InterruptController.h>
#include <kernel/Lock/Mutex.h>
#include <kernel/Process.h>
#include <kernel/Scheduler.h>
#include <kernel/SchedulerQueueNode.h>
#include <kernel/Thread.h>
#include <kernel/Timer/Timer.h>
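// Setting SCHEDULER_ASSERT to 0 compiles the assertions in this file out of the scheduler hot paths.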
#define SCHEDULER_ASSERT 1
#if SCHEDULER_ASSERT == 0
#undef ASSERT
#define ASSERT(...)
#endif
namespace Kernel
{
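// A thread is preempted after 10 ms; cross-CPU load balancing runs once per second.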
static constexpr uint64_t s_reschedule_interval_ns = 10'000'000;
static constexpr uint64_t s_load_balance_interval_ns = 1'000'000'000;
static BAN::Atomic<uint8_t> s_schedulers_initialized { 0 };
struct ProcessorInfo
{
uint64_t idle_time_ns { s_load_balance_interval_ns };
uint32_t max_load_threads { 0 };
};
static SpinLock s_processor_info_time_lock;
static BAN::Array<ProcessorInfo, 0xFF> s_processor_infos;
static BAN::Atomic<size_t> s_next_processor_index { 0 };
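// SchedulerQueue is an intrusive doubly-linked list with one node per thread.
// The queues are per-CPU state, so every operation asserts that interrupts are disabled.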
void SchedulerQueue::add_thread_to_back(Node* node)
{
ASSERT(Processor::get_interrupt_state() == InterruptState::Disabled);
node->next = nullptr;
node->prev = m_tail;
(m_tail ? m_tail->next : m_head) = node;
m_tail = node;
}
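// Insert while keeping the queue sorted by ascending wake_time_ns, so that
// wake_up_sleeping_threads() only ever has to inspect the front node.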
void SchedulerQueue::add_thread_with_wake_time(Node* node)
{
ASSERT(Processor::get_interrupt_state() == InterruptState::Disabled);
if (m_tail == nullptr || node->wake_time_ns >= m_tail->wake_time_ns)
return add_thread_to_back(node);
Node* next = m_head;
Node* prev = nullptr;
while (next && node->wake_time_ns > next->wake_time_ns)
{
prev = next;
next = next->next;
}
node->next = next;
node->prev = prev;
(next ? next->prev : m_tail) = node;
(prev ? prev->next : m_head) = node;
}
template<typename F>
SchedulerQueue::Node* SchedulerQueue::remove_with_condition(F callback)
{
ASSERT(Processor::get_interrupt_state() == InterruptState::Disabled);
for (Node* node = m_head; node; node = node->next)
{
if (!callback(node))
continue;
remove_node(node);
return node;
}
return nullptr;
}
void SchedulerQueue::remove_node(Node* node)
{
(node->prev ? node->prev->next : m_head) = node->next;
(node->next ? node->next->prev : m_tail) = node->prev;
node->prev = nullptr;
node->next = nullptr;
}
SchedulerQueue::Node* SchedulerQueue::front()
{
ASSERT(Processor::get_interrupt_state() == InterruptState::Disabled);
ASSERT(!empty());
return m_head;
}
SchedulerQueue::Node* SchedulerQueue::pop_front()
{
ASSERT(Processor::get_interrupt_state() == InterruptState::Disabled);
if (empty())
return nullptr;
Node* result = m_head;
m_head = m_head->next;
(m_head ? m_head->prev : m_tail) = nullptr;
result->next = nullptr;
return result;
}
BAN::ErrorOr<Scheduler*> Scheduler::create()
{
auto* scheduler = new Scheduler();
if (scheduler == nullptr)
return BAN::Error::from_errno(ENOMEM);
return scheduler;
}
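// Runs once on every CPU during boot: create the per-CPU idle thread,
// stagger this CPU's load-balance window, then wait for all other
// schedulers before enabling SMP.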
BAN::ErrorOr<void> Scheduler::initialize()
{
m_idle_thread = TRY(Thread::create_kernel([](void*) { asm volatile("1: hlt; jmp 1b"); }, nullptr));
ASSERT(m_idle_thread);
size_t processor_index = 0;
for (; processor_index < Processor::count(); processor_index++)
if (Processor::id_from_index(processor_index) == Processor::current_id())
break;
ASSERT(processor_index < Processor::count());
// each CPU load balances at a different time; this calculates this CPU's offset relative to the others
m_last_load_balance_ns = s_load_balance_interval_ns * processor_index / Processor::count();
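// Presumably offsets the idle time accumulated before this CPU's first
// (staggered) load-balance window, relying on unsigned wraparound.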
m_idle_ns = -m_last_load_balance_ns;
s_schedulers_initialized++;
while (s_schedulers_initialized < Processor::count())
__builtin_ia32_pause();
if (Processor::count() > 1)
Processor::set_smp_enabled();
m_next_reschedule_ns = SystemTimer::get().ns_since_boot();
return {};
}
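// m_most_loaded_threads is a small array sorted by descending time_used_ns
// (empty slots last). It remembers which queue each candidate sits in so
// do_load_balancing() can later detach and migrate it.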
void Scheduler::add_current_to_most_loaded(SchedulerQueue* target_queue)
{
ASSERT(Processor::get_interrupt_state() == InterruptState::Disabled);
bool has_current = false;
for (auto& info : m_most_loaded_threads)
{
if (info.node == m_current)
{
info.queue = target_queue;
has_current = true;
break;
}
}
if (!has_current)
{
size_t index = 0;
for (; index < m_most_loaded_threads.size() - 1; index++)
if (m_most_loaded_threads[index].node == nullptr)
break;
m_most_loaded_threads[index].queue = target_queue;
m_most_loaded_threads[index].node = m_current;
}
BAN::sort::sort(m_most_loaded_threads.begin(), m_most_loaded_threads.end(),
[](const ThreadInfo& a, const ThreadInfo& b) -> bool
{
if (a.node == nullptr || b.node == nullptr)
return a.node;
return a.node->time_used_ns > b.node->time_used_ns;
}
);
}
void Scheduler::update_most_loaded_node_queue(SchedulerQueue::Node* node, SchedulerQueue* target_queue)
{
ASSERT(Processor::get_interrupt_state() == InterruptState::Disabled);
for (auto& info : m_most_loaded_threads)
{
if (info.node == node)
{
info.queue = target_queue;
break;
}
}
}
void Scheduler::remove_node_from_most_loaded(SchedulerQueue::Node* node)
{
ASSERT(Processor::get_interrupt_state() == InterruptState::Disabled);
size_t i = 0;
for (; i < m_most_loaded_threads.size(); i++)
if (m_most_loaded_threads[i].node == node)
break;
for (; i < m_most_loaded_threads.size() - 1; i++)
m_most_loaded_threads[i] = m_most_loaded_threads[i + 1];
m_most_loaded_threads.back().node = nullptr;
m_most_loaded_threads.back().queue = nullptr;
}
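// The context-switch core, reached from Processor::yield() through
// scheduler_on_yield(). Saves the outgoing thread's registers and requeues
// it (or reaps it if terminated), then picks the next runnable thread,
// falling back to the idle thread, and loads its registers and page table.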
void Scheduler::reschedule(YieldRegisters* yield_registers)
{
ASSERT(Processor::get_interrupt_state() == InterruptState::Disabled);
// If there are no other threads in the run queue, reschedule can be a no-op :)
if (m_run_queue.empty() && (!m_current || !m_current->blocked) && current_thread().state() == Thread::State::Executing)
return;
if (m_current == nullptr)
m_idle_ns += SystemTimer::get().ns_since_boot() - m_idle_start_ns;
else
{
switch (m_current->thread->state())
{
case Thread::State::Terminated:
remove_node_from_most_loaded(m_current);
if (&PageTable::current() != &PageTable::kernel())
PageTable::kernel().load();
delete m_current->thread;
delete m_current;
m_thread_count--;
break;
case Thread::State::Executing:
{
const uint64_t current_ns = SystemTimer::get().ns_since_boot();
m_current->thread->yield_registers() = *yield_registers;
m_current->time_used_ns += current_ns - m_current->last_start_ns;
add_current_to_most_loaded(m_current->blocked ? &m_block_queue : &m_run_queue);
if (!m_current->blocked)
m_run_queue.add_thread_to_back(m_current);
else
m_block_queue.add_thread_with_wake_time(m_current);
break;
}
case Thread::State::NotStarted:
ASSERT(!m_current->blocked);
m_current->time_used_ns = 0;
remove_node_from_most_loaded(m_current);
m_run_queue.add_thread_to_back(m_current);
break;
}
}
while ((m_current = m_run_queue.pop_front()))
{
if (m_current->thread->state() != Thread::State::Terminated)
break;
remove_node_from_most_loaded(m_current);
if (&PageTable::current() != &PageTable::kernel())
PageTable::kernel().load();
delete m_current->thread;
delete m_current;
m_thread_count--;
}
if (m_current == nullptr)
{
if (&PageTable::current() != &PageTable::kernel())
PageTable::kernel().load();
*yield_registers = m_idle_thread->yield_registers();
m_idle_thread->m_state = Thread::State::Executing;
m_idle_start_ns = SystemTimer::get().ns_since_boot();
return;
}
update_most_loaded_node_queue(m_current, nullptr);
auto* thread = m_current->thread;
auto& page_table = thread->has_process() ? thread->process().page_table() : PageTable::kernel();
if (&PageTable::current() != &page_table)
page_table.load();
if (thread->state() == Thread::State::NotStarted)
{
thread->m_state = Thread::State::Executing;
thread->set_cpu_time_start();
}
if (thread->is_userspace())
{
const vaddr_t kernel_stack_top = thread->kernel_stack_top();
Processor::gdt().set_tss_stack(kernel_stack_top);
Processor::set_thread_syscall_stack(kernel_stack_top);
Processor::load_segments();
}
(Processor::get_current_sse_thread() == thread)
? Processor::enable_sse()
: Processor::disable_sse();
*yield_registers = thread->yield_registers();
m_current->last_start_ns = SystemTimer::get().ns_since_boot();
}
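// Move every blocked thread whose wake time has passed back to the run
// queue; the block queue is sorted by wake time, so only the front node
// has to be checked.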
void Scheduler::wake_up_sleeping_threads()
{
const uint64_t current_ns = SystemTimer::get().ns_since_boot();
while (!m_block_queue.empty() && current_ns >= m_block_queue.front()->wake_time_ns)
{
auto* node = m_block_queue.pop_front();
{
SpinLockGuard _(node->blocker_lock);
if (node->blocker)
node->blocker->remove_blocked_thread(node);
}
node->blocked = false;
update_most_loaded_node_queue(node, &m_run_queue);
m_run_queue.add_thread_to_back(node);
}
}
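// Used while this CPU is idle: wake any due sleepers and yield immediately
// if that made a thread runnable.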
void Scheduler::reschedule_if_idle()
{
ASSERT(Processor::get_interrupt_state() == InterruptState::Disabled);
if (m_current != nullptr)
return;
if (m_run_queue.empty())
wake_up_sleeping_threads();
if (!m_run_queue.empty())
Processor::yield();
}
extern "C" void scheduler_on_yield(YieldRegisters* yield_registers)
{
Processor::scheduler().reschedule(yield_registers);
}
void Scheduler::timer_interrupt()
{
ASSERT(Processor::get_interrupt_state() == InterruptState::Disabled);
if (Processor::is_smp_enabled())
do_load_balancing();
wake_up_sleeping_threads();
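// Compare against the next expected reschedule time instead of measuring
// the time since the last reschedule; if the timer fires at exactly the
// reschedule interval, the latter could miss every second preemption.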
if (SystemTimer::get().ns_since_boot() >= m_next_reschedule_ns)
{
m_next_reschedule_ns += s_reschedule_interval_ns;
Processor::yield();
}
}
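// May be called for a thread owned by another CPU, in which case the
// unblock is forwarded to that CPU as an SMP message.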
void Scheduler::unblock_thread(SchedulerQueue::Node* node)
{
auto state = Processor::get_interrupt_state();
Processor::set_interrupt_state(InterruptState::Disabled);
if (node->processor_id == Processor::current_id())
{
if (!node->blocked)
return;
if (node != m_current)
m_block_queue.remove_node(node);
{
SpinLockGuard _(node->blocker_lock);
if (node->blocker)
node->blocker->remove_blocked_thread(node);
}
node->blocked = false;
if (node != m_current)
m_run_queue.add_thread_to_back(node);
update_most_loaded_node_queue(node, &m_run_queue);
}
else
{
Processor::send_smp_message(node->processor_id, {
.type = Processor::SMPMessage::Type::UnblockThread,
.unblock_thread = node
});
}
Processor::set_interrupt_state(state);
}
void Scheduler::add_thread(SchedulerQueue::Node* node)
{
auto state = Processor::get_interrupt_state();
Processor::set_interrupt_state(InterruptState::Disabled);
ASSERT(node->processor_id == Processor::current_id());
if (!node->blocked)
m_run_queue.add_thread_to_back(node);
else
m_block_queue.add_thread_with_wake_time(node);
if (auto* thread = node->thread; thread->is_userspace() && thread->has_process())
thread->update_processor_index_address();
m_thread_count++;
Processor::set_interrupt_state(state);
}
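// Pick a migration target: the CPU that was the most idle during the last
// window, preferring fewer max-load threads as a tie breaker.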
ProcessorID Scheduler::find_least_loaded_processor() const
{
ProcessorID least_loaded_id = Processor::current_id();
uint64_t most_idle_ns = m_idle_ns;
uint32_t least_max_load_threads = static_cast<uint32_t>(-1);
for (uint8_t i = 0; i < Processor::count(); i++)
{
const auto processor_id = Processor::id_from_index(i);
if (processor_id == Processor::current_id())
continue;
const auto& info = s_processor_infos[processor_id.as_u32()];
if (info.idle_time_ns < most_idle_ns || info.max_load_threads > least_max_load_threads)
continue;
least_loaded_id = processor_id;
most_idle_ns = info.idle_time_ns;
least_max_load_threads = info.max_load_threads;
}
return least_loaded_id;
}
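// Runs at most once per s_load_balance_interval_ns. Updates this CPU's
// usage accounting, then tries to hand its most loaded threads to less
// loaded CPUs via SMP messages; all per-window counters are reset at the end.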
void Scheduler::do_load_balancing()
{
ASSERT(Processor::get_interrupt_state() == InterruptState::Disabled);
const uint64_t current_ns = SystemTimer::get().ns_since_boot();
if (current_ns < m_last_load_balance_ns + s_load_balance_interval_ns)
return;
if (m_current == nullptr)
{
m_idle_ns += current_ns - m_idle_start_ns;
m_idle_start_ns = current_ns;
}
else
{
m_current->time_used_ns += current_ns - m_current->last_start_ns;
m_current->last_start_ns = current_ns;
add_current_to_most_loaded(nullptr);
}
if constexpr(DEBUG_SCHEDULER)
{
const uint64_t duration_ns = current_ns - m_last_load_balance_ns;
const uint64_t processing_ns = duration_ns - m_idle_ns;
{
const uint64_t load_percent_x1000 = BAN::Math::div_round_up<uint64_t>(processing_ns * 100'000, duration_ns);
dprintln("CPU {}: { 2}.{3}% ({} threads)", Processor::current_id(), load_percent_x1000 / 1000, load_percent_x1000 % 1000, m_thread_count);
}
if (m_current)
{
const char* name = "<unknown>";
if (m_current->thread->has_process() && *m_current->thread->process().name())
name = m_current->thread->process().name();
const uint64_t load_percent_x1000 = BAN::Math::div_round_up<uint64_t>(m_current->time_used_ns * 100'000, processing_ns);
dprintln(" tid { 2}: { 3}.{3}% <{}> current", m_current->thread->tid(), load_percent_x1000 / 1000, load_percent_x1000 % 1000, name);
}
m_run_queue.remove_with_condition(
[&](SchedulerQueue::Node* node)
{
const uint64_t load_percent_x1000 = BAN::Math::div_round_up<uint64_t>(node->time_used_ns * 100'000, processing_ns);
dprintln(" tid { 2}: { 3}.{3}% active", node->thread->tid(), load_percent_x1000 / 1000, load_percent_x1000 % 1000);
return false;
}
);
m_block_queue.remove_with_condition(
[&](SchedulerQueue::Node* node)
{
const uint64_t load_percent_x1000 = BAN::Math::div_round_up<uint64_t>(node->time_used_ns * 100'000, processing_ns);
dprintln(" tid { 2}: { 3}.{3}% blocked", node->thread->tid(), load_percent_x1000 / 1000, load_percent_x1000 % 1000);
return false;
}
);
}
if (!s_processor_info_time_lock.try_lock_interrupts_disabled())
{
dprintln_if(DEBUG_SCHEDULER, "Load balancing cannot keep up");
return;
}
if (m_idle_ns == 0 && m_should_calculate_max_load_threads)
{
const auto& most_loaded_thread = m_most_loaded_threads.front();
if (most_loaded_thread.node == nullptr || most_loaded_thread.node->time_used_ns == 0)
s_processor_infos[Processor::current_id().as_u32()].max_load_threads = 0;
else
{
const uint64_t duration_ns = current_ns - m_last_load_balance_ns;
const uint64_t max_thread_load_x1000 = 1000 * most_loaded_thread.node->time_used_ns / duration_ns;
if (max_thread_load_x1000 == 0)
s_processor_infos[Processor::current_id().as_u32()].max_load_threads = 0;
else
{
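// Effectively round(1000 / max_thread_load_x1000) in integer arithmetic:
// how many threads at this load it would take to saturate a single CPU.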
const uint64_t max_load_thread_count = ((2000 / max_thread_load_x1000) + 1) / 2;
s_processor_infos[Processor::current_id().as_u32()].max_load_threads = max_load_thread_count;
}
}
}
constexpr auto absolute_difference_u64 = [](uint64_t a, uint64_t b) { return (a < b) ? (b - a) : (a - b); };
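// Walk the most loaded threads starting from index 1, presumably so that
// the single hottest thread always stays on this CPU.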
for (size_t i = 1; i < m_most_loaded_threads.size(); i++)
{
auto& thread_info = m_most_loaded_threads[i];
if (thread_info.node == nullptr)
break;
if (thread_info.node == m_current || thread_info.queue == nullptr)
continue;
auto least_loaded_id = find_least_loaded_processor();
if (least_loaded_id == Processor::current_id())
break;
auto& most_idle_info = s_processor_infos[least_loaded_id.as_u32()];
auto& my_info = s_processor_infos[Processor::current_id().as_u32()];
if (m_idle_ns == 0)
{
if (my_info.max_load_threads == 0)
break;
if (most_idle_info.idle_time_ns == 0)
{
if (most_idle_info.max_load_threads + 1 > my_info.max_load_threads - 1)
break;
my_info.max_load_threads -= 1;
most_idle_info.max_load_threads += 1;
dprintln_if(DEBUG_SCHEDULER, "CPU {}: sending tid {} to CPU {} (max load)", Processor::current_id(), thread_info.node->thread->tid(), least_loaded_id);
}
else
{
my_info.max_load_threads -= 1;
most_idle_info.idle_time_ns = 0;
most_idle_info.max_load_threads = 1;
dprintln_if(DEBUG_SCHEDULER, "CPU {}: sending tid {} to CPU {}", Processor::current_id(), thread_info.node->thread->tid(), least_loaded_id);
}
}
else
{
const uint64_t my_current_proc_ns = s_load_balance_interval_ns - BAN::Math::min(s_load_balance_interval_ns, m_idle_ns);
const uint64_t other_current_proc_ns = s_load_balance_interval_ns - BAN::Math::min(s_load_balance_interval_ns, most_idle_info.idle_time_ns);
const uint64_t current_proc_diff_ns = absolute_difference_u64(my_current_proc_ns, other_current_proc_ns);
const uint64_t my_new_proc_ns = my_current_proc_ns - BAN::Math::min(thread_info.node->time_used_ns, my_current_proc_ns);
const uint64_t other_new_proc_ns = other_current_proc_ns + thread_info.node->time_used_ns;
const uint64_t new_proc_diff_ns = absolute_difference_u64(my_new_proc_ns, other_new_proc_ns);
// require at least a 10% decrease in the load difference between the two CPUs before sending the thread over
if (new_proc_diff_ns >= current_proc_diff_ns || (100 * (current_proc_diff_ns - new_proc_diff_ns) / current_proc_diff_ns) < 10)
continue;
most_idle_info.idle_time_ns -= BAN::Math::min(thread_info.node->time_used_ns, most_idle_info.idle_time_ns);
m_idle_ns += thread_info.node->time_used_ns;
dprintln_if(DEBUG_SCHEDULER, "CPU {}: sending tid {} to CPU {}", Processor::current_id(), thread_info.node->thread->tid(), least_loaded_id);
}
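// If the migrating thread owns this CPU's SSE state, save it into the
// thread and drop ownership before the node is handed to another CPU.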
if (auto* thread = thread_info.node->thread; thread == Processor::get_current_sse_thread())
{
Processor::enable_sse();
thread->save_sse();
Processor::set_current_sse_thread(nullptr);
Processor::disable_sse();
}
thread_info.node->time_used_ns = 0;
{
auto& my_queue = (thread_info.queue == &m_run_queue) ? m_run_queue : m_block_queue;
my_queue.remove_node(thread_info.node);
m_thread_count--;
}
thread_info.node->processor_id = least_loaded_id;
Processor::send_smp_message(least_loaded_id, {
.type = Processor::SMPMessage::Type::NewThread,
.new_thread = thread_info.node
});
thread_info.node = nullptr;
thread_info.queue = nullptr;
if (m_idle_ns == 0)
break;
}
s_processor_infos[Processor::current_id().as_u32()].idle_time_ns = m_idle_ns;
s_processor_info_time_lock.unlock(InterruptState::Disabled);
if (m_current)
m_current->time_used_ns = 0;
for (auto& thread_info : m_most_loaded_threads)
thread_info = {};
m_run_queue .remove_with_condition([&](SchedulerQueue::Node* node) { node->time_used_ns = 0; return false; });
m_block_queue.remove_with_condition([&](SchedulerQueue::Node* node) { node->time_used_ns = 0; return false; });
m_idle_ns = 0;
m_should_calculate_max_load_threads = true;
m_last_load_balance_ns += s_load_balance_interval_ns;
}
BAN::ErrorOr<void> Scheduler::bind_thread_to_processor(Thread* thread, ProcessorID processor_id)
{
ASSERT(thread->m_scheduler_node == nullptr);
auto* new_node = new SchedulerQueue::Node(thread);
if (new_node == nullptr)
return BAN::Error::from_errno(ENOMEM);
ASSERT(processor_id != PROCESSOR_NONE);
new_node->processor_id = processor_id;
thread->m_scheduler_node = new_node;
return {};
}
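// Threads with no explicit processor binding are distributed round-robin;
// if the node belongs to another CPU, hand it over with an SMP message.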
BAN::ErrorOr<void> Scheduler::add_thread(Thread* thread)
{
if (thread->m_scheduler_node == nullptr)
{
const size_t processor_index = s_next_processor_index++ % Processor::count();
const auto processor_id = Processor::id_from_index(processor_index);
TRY(bind_thread_to_processor(thread, processor_id));
}
auto* node = thread->m_scheduler_node;
if (node->processor_id == Processor::current_id())
add_thread(node);
else
{
Processor::send_smp_message(node->processor_id, {
.type = Processor::SMPMessage::Type::NewThread,
.new_thread = node
});
}
return {};
}
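// Marks the current thread blocked and yields. If a mutex is passed, it is
// fully released (remembering its lock depth) before the yield and
// re-acquired once the thread wakes up.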
void Scheduler::block_current_thread(ThreadBlocker* blocker, uint64_t wake_time_ns, BaseMutex* mutex)
{
auto state = Processor::get_interrupt_state();
Processor::set_interrupt_state(InterruptState::Disabled);
ASSERT(m_current->processor_id == Processor::current_id());
ASSERT(!m_current->blocked);
m_current->blocked = true;
m_current->wake_time_ns = wake_time_ns;
{
SpinLockGuard _(m_current->blocker_lock);
if (blocker)
blocker->add_thread_to_block_queue(m_current);
}
update_most_loaded_node_queue(m_current, &m_block_queue);
uint32_t lock_depth = 0;
if (mutex != nullptr)
{
ASSERT(mutex->is_locked() && mutex->locker() == m_current->thread->tid());
lock_depth = mutex->lock_depth();
}
for (uint32_t i = 0; i < lock_depth; i++)
mutex->unlock();
Processor::yield();
Processor::set_interrupt_state(state);
for (uint32_t i = 0; i < lock_depth; i++)
mutex->lock();
}
void Scheduler::unblock_thread(Thread* thread)
{
unblock_thread(thread->m_scheduler_node);
}
Thread& Scheduler::current_thread()
{
if (m_current)
return *m_current->thread;
return *m_idle_thread;
}
Thread& Scheduler::idle_thread()
{
return *m_idle_thread;
}
pid_t Scheduler::current_tid() const
{
return m_current ? m_current->thread->tid() : 0;
}
bool Scheduler::is_idle() const
{
return m_current == nullptr;
}
}