From Commodious Owl, 7 Years ago, written in Plain Text.
- view diff
Embed
  1. Scheduler
  2.  
  3. ks_scheduler_t uses ks_thread_pool_t internally, and likely one regular thread dedicated to polling a sorted list of scheduled tasks to determine whether a thread pool job running ks_scheduler_task_job needs to be started to run the next task and reschedule it as appropriate
  4.  
  5. A scheduled task callback needs the ability to return a delay for when to schedule running again, or a reserved value (0) to indicate that the task is finished and must not be rescheduled
  6.  
  7. Using a thread pool to run each scheduled task callback will require a small stub, ks_scheduler_task_job, to capture the return value and lock the scheduler to re-add the task to the sorted list
  8.  
  9. Scheduled task entries should be recycled internally to avoid reallocating entries frequently; however, it is more important that thread pool jobs are updated to recycle internally
  10.  
  11.  
  12. Example:
  13. The scheduler would allocate its own internal pool, similar to ks_thread_pool_t, and would be stored within the blade_handle_t
  14.  
  15. A session would create a new scheduled task which replaces the thread running the internal state machine, each iteration of the task representing one iteration of the current thread loop
  16.  
  17. Since a session would reschedule on every iteration, and does not own the thread, it is able to clean up without waiting to join/destroy a thread owned by the session; this allows the session to clean itself up when the state machine reaches cleanup
  18.  
  19. If a session reaches a CLEANUP state, blade_session_destroy() can be called and 0 returned to the scheduler, without the deadlock that would normally occur when auto cleanup for a ks_thread_t attempts to join the running thread and waits for it to finish executing
  20.  
  21.  
  22. struct ks_scheduler_s { // scheduler state: a dispatch thread polls the scheduled list and starts thread pool jobs for tasks that are due
  23.         ks_pool_t *pool; // internal only, for the mutex, tasks, etc.; the ks_scheduler_t itself would be allocated in a different pool (for example the one owned by blade_handle_t), so that the race condition of the scheduler is handled by initiating from a different pool
  24.         ks_scheduler_flags_t flags; // reserved for future use
  25.         ks_thread_t *dispatch_thread; // runs ks_scheduler_dispatch
  26.         ks_bool_t shutdown; // terminates the dispatch_thread after waiting for any currently running tasks to finish; no further scheduled tasks will be executed
  27.  
  28.         ks_thread_pool_t *tpool; // runs ks_scheduler_task_job, which executes a task the dispatcher has moved to the running list, removes it from the running list, then either adds it back to the scheduled list in the right order (if the callback asked to be rescheduled) or recycles it
  29.  
  30.         ks_mutex_t *mutex; // NOTE(review): presumably what lock()/unlock() on the scheduler acquire to guard the lists below — confirm
  31.         // the following lists could use faster sorted lists, but it is expected that most tasks can find their position by searching from the tail end within a handful of nodes
  32.         ks_scheduler_task_t *scheduled_first; // head of the scheduled list, the task the dispatcher checks next; manual linked lists avoid node allocations that a simclist would need, we recycle our own here to keep it all efficient
  33.         ks_scheduler_task_t *scheduled_last; // tail of the scheduled list; check this first, a task that runs later than this one can simply be appended, otherwise search backwards to find its position by scheduled time
  34.  
  35.         ks_scheduler_task_t *running_first; // head of the running list, kept separate from the scheduled list because a sudden cleanup must wait for this list to empty first
  36.         ks_scheduler_task_t *running_last; // tail of the running list, kept so that unlinking stays efficient when a task in the middle of the list finishes next
  37.  
  38.         ks_scheduler_task_t *recycled; // recycled task structures kept to avoid allocations; only a singly linked list/queue is needed here, popped from the top as needed
  39. };
  40.  
  41. struct ks_scheduler_task_s { // one scheduled unit of work; a task is linked into exactly one scheduler list at a time
  42.         ks_pool_t *pool; // the pool (from the scheduler) that this task was allocated from; kept only for convenience in cleanup and to avoid exposing a function to get the pool from the scheduler
  43.         ks_scheduler_t *scheduler; // the scheduler this task belongs to
  44.  
  45.         ks_scheduler_task_t *prev; // linked list pointers; only one set is needed because each task participates in only one list at a time
  46.         ks_scheduler_task_t *next;
  47.  
  48.         ks_time_t runat; // the next time the dispatcher should run an iteration of this task by starting a thread pool job for ks_scheduler_task_job
  49.         void *data; // passed in when creating a task and handed to the callback, e.g. a ks_session_t* rather than a session ID, because a session cannot be destroyed without the task terminating first (and this avoids the ID hash lookup and contention every 500ms for each session)
  50.  
  51.         ks_scheduler_task_callback_t callback; // the callback to execute once runat has been reached
  52.         // consider adding an optional finalizer callback for final completion (called once in the thread pool task job after the last run, when callback returns a time of 0)
  53. };
  54.  
  55. typedef ks_time_t (*ks_scheduler_task_callback_t)(void *data); // task callback type (typedef so it is usable as a field and parameter type); data is taken from the task, the return value dictates when to reschedule the next iteration, and 0 indicates the task is finished and must not be rescheduled
  56.  
  57. Standard ks_scheduler_create(), ks_scheduler_destroy(), ks_scheduler_startup(), ks_scheduler_shutdown(), internal link/unlink functions
  58. Standard ks_scheduler_task_create(), ks_scheduler_task_destroy(), ks_scheduler_task_initialize()
  59.  
  60. ks_status_t ks_scheduler_run(ks_scheduler_t *scheduler, ks_time_t startat, ks_scheduler_task_callback_t callback) // schedules callback to first run at startat, reusing a recycled task entry when one is available; returns KS_STATUS_SUCCESS, or the failure status from creating a new task entry
  61. {
  62.         ks_status_t ret = KS_STATUS_SUCCESS;
  63.         ks_scheduler_task_t *task = NULL;
  64.         lock(scheduler); // the recycled and scheduled lists are only touched while the scheduler is locked
  65.         if (scheduler->recycled) {
  66.                 task = scheduler->recycled; // pop the top of the recycled list instead of allocating
  67.                 scheduler->recycled = task->next;
  68.         } else ret = ks_scheduler_task_create(&task, scheduler->pool, scheduler); // NOTE(review): assumes ks_scheduler_task_create returns a ks_status_t — confirm against its definition
  69.         if (ret == KS_STATUS_SUCCESS) { // never initialize or link a task that failed to allocate
  70.                 ks_scheduler_task_initialize(task, startat, callback); // NOTE(review): no data pointer is accepted or forwarded here although tasks carry one for the callback — confirm how it is meant to be set
  71.                 link_to_scheduled(task); // same helper ks_scheduler_task_job uses to put a task on the scheduled list
  72.         }
  73.         unlock(scheduler);
  74.         return ret;
  75. }
  76.  
  77. static void *ks_scheduler_dispatch(ks_thread_t *thread, void *data) // dispatch thread entry point: until shutdown is set, moves the first scheduled task to the running list once its runat has been reached and starts a thread pool job for it; always returns NULL
  78. {
  79.         ks_scheduler_t *scheduler = data; // NOTE(review): assumes the scheduler is passed as the thread's data pointer — confirm where dispatch_thread is created
  80.         while (!scheduler->shutdown) {
  81.                 ks_bool_t ran = KS_FALSE;
  82.                 lock(scheduler);
  83.                 if (!scheduler->shutdown && scheduler->scheduled_first && scheduler->scheduled_first->runat <= ks_time_now()) { // shutdown is re-checked under the lock as part of the condition, rather than breaking out of the loop, so the unlock below always runs
  84.                         ks_scheduler_task_t *task = scheduler->scheduled_first;
  85.                         unlink_from_scheduled(task);
  86.                         link_to_running(task);
  87.                         ran = KS_TRUE;
  88.                         start_thread_pool_job(task);
  89.                 }
  90.                 unlock(scheduler);
  91.                 if (!ran && !scheduler->shutdown) ks_sleep(500); // idle sleep, intended as half a millisecond (assumes ks_sleep takes microseconds — TODO confirm); precision on task scheduling beyond this is probably not necessary... could be configured as part of scheduler creation though
  92.         }
  93.         return NULL;
  94. }
  95.  
  96. static void *ks_scheduler_task_job(ks_thread_t *thread, void *data) // thread pool job stub: runs one iteration of the task passed in data, then reschedules or recycles it under the scheduler lock; always returns NULL
  97. {
  98.         ks_scheduler_task_t *job = data;
  99.         job->runat = job->callback(job->data); // runs outside the scheduler lock; NOTE(review): the return value is stored directly as the next run time — confirm whether callbacks return an absolute time or a delay
  100.         lock(job->scheduler);
  101.         unlink_from_running(job);
  102.         if (!job->runat) { // 0 means the task is finished; a finalizer callback, if added, would run here
  103.                 link_to_recycled(job);
  104.         } else {
  105.                 link_to_scheduled(job);
  106.         }
  107.         unlock(job->scheduler);
  108.         return NULL;
  109. }