/
ColdCore-3.0a8/
ColdCore-3.0a8/src/
new object $scheduler: $utilities;

var $root created_on = 810294797;
var $root flags = ['methods, 'code, 'variables, 'core];
var $root inited = 1;
var $root managed = [$scheduler];
var $root manager = $scheduler;
var $scheduler blocked_tasks = #[];
var $scheduler expected_lag = 0;
var $scheduler server_lag = 0;
var $scheduler sub_schedulers = [$heart];
var $scheduler suspended_tasks = #[];
var $scheduler task_index = 2;
var $scheduler task_queue = [];

public method .add_sub_scheduler() {
    arg object;
    
    // Register object as a sub scheduler; .pulse() calls object.pulse()
    // for every entry of sub_schedulers each time it runs.
    // Throws ~perm unless $sys.is_admin(sender()), and ~type unless object
    // is an objnum.
    if (!($sys.is_admin(sender())))
        throw(~perm, "Only admins may add sub schedulers.");
    if (type(object) != 'objnum)
        throw(~type, "Object must be a dbref.");
    
    // A second entry for the same object would make .pulse() call it twice
    // per pulse, so registering an already registered object is a no-op.
    if (!(object in sub_schedulers))
        sub_schedulers += [object];
};

public method .add_task() {
    arg time, method, @args;
    var task, tid, flags, task_time;
    
    // Queue a call of `method' with `args' on behalf of sender(), to become
    // due `time' seconds from now.  Returns the new task id.
    // Throws ~type when time is not an integer or method is not a symbol,
    // and ~time when time is negative or more than 31536000 seconds
    // (365 days).
    if ((type(time) != 'integer) || ((type(method) != 'symbol) || (type(args) != 'list)))
        throw(~type, "Arguments are not an integer, symbol, and list.");
    if (time < 0)
        throw(~time, "Time is negative.");
    if (time > 31536000)
        throw(~time, "Try to schedule a task LESS than a year from now?");
    
    // get a new db task id
    ++task_index;
    tid = task_index;
    
    // flags can be set in a system only add_task, for now set them as 'system
    flags = ['system];
    task_time = time();
    
    // Task record layout:
    // [tid, due time, creation time, sender, caller, method, flags, args]
    task = [tid, time + task_time, task_time, sender(), caller(), method, flags, args];
    .add_to_task_queue(task);
    return tid;
};

private method .add_to_task_queue() {
    arg task;
    var queue;
    
    // refresh() comes first: we don't want to run out of ticks while the
    // heap is being updated.
    refresh();
    
    // Push the task record onto the heap kept in task_queue.  The 2 is
    // handed to $heap.push unchanged (presumably the index of the field the
    // heap is ordered by, i.e. the due time -- confirm against $heap).
    queue = $heap.push(task_queue, task, 2);
    task_queue = queue;
};

public method .block_task() {
    arg ident;
    var waiting;
    
    // I want to be atomic!
    // Append the current task id to the list of tasks blocked on ident,
    // starting a new list when nothing is blocked on ident yet.
    waiting = [task_id()];
    if (blocked_tasks.contains(ident))
        waiting = (blocked_tasks[ident]) + waiting;
    blocked_tasks = blocked_tasks.add(ident, waiting);
    
    // Suspend the current task; .unblock_task() resumes it later.
    $sys.suspend();
};

public method .cancel() {
    arg task_id;
    var owners;
    
    // Cancel the task task_id.  When the task was suspended through
    // .suspend(), only the objects recorded for it, or a system object,
    // may cancel it; its entry in suspended_tasks is dropped first.
    // Tasks without an entry in suspended_tasks are not checked here.
    owners = (| suspended_tasks[task_id] |);
    if (owners) {
        if (!((sender() in owners) || ($sys.is_system(sender()))))
            throw(~perm, (sender() + " may not cancel task ") + task_id);
        suspended_tasks = dict_del(suspended_tasks, task_id);
    }
    
    // Errors from the cancel() builtin propagate to the caller.
    return (> cancel(task_id) <);
};

root method .core_scheduler() {
    // Reset the scheduler's runtime state: forget every queued task and
    // every record of a suspended task.
    task_queue = [];
    suspended_tasks = #[];
};

private method .del_from_task_queue() {
    arg i;
    var queue;
    
    // refresh() is called before the heap is touched.
    refresh();
    
    // Remove the entry at heap position i.  This must *not* throw.
    queue = $heap.del(task_queue, i, 2);
    task_queue = queue;
};

public method .del_sub_scheduler() {
    arg object;
    var pos;
    
    // Unregister object as a sub scheduler.
    // Throws ~perm unless $sys.is_admin(sender()), ~type unless object is
    // an objnum, and ~objnf when object is not in sub_schedulers.
    if (!($sys.is_admin(sender())))
        throw(~perm, "Only admins may delete sub schedulers.");
    if (type(object) != 'objnum)
        throw(~type, "Object must be a dbref.");
    pos = object in sub_schedulers;
    if (!pos)
        throw(~objnf, "Object not a sub scheduler.");
    
    // Drop exactly the entry at pos.  The previous code rebuilt the list
    // from two subranges but wrapped each subrange in another list, which
    // left sub_schedulers holding nested lists instead of objects, and it
    // compared the partial result (a list) against the list length where
    // pos was meant.
    sub_schedulers = sub_schedulers.delete(pos);
};

public method .del_task() {
    arg tid;
    var entry, pos;
    
    // Remove the queued task whose id is tid and return 1.
    // Allowed for the sender/caller pair that queued the task; anyone else
    // must pass .perms(sender(), 'system).
    // Throws ~type when tid is not an integer and ~tasknf when no queued
    // task has that id.
    if (type(tid) != 'integer)
        throw(~type, "Task Identification must be an integer");
    
    // pos follows the position of entry inside task_queue.
    for entry in (task_queue) {
        pos++;
        if ((entry[1]) != tid)
            continue;
        if ((sender() != (entry[4])) || (caller() != (entry[5])))
            (> .perms(sender(), 'system) <);
        .del_from_task_queue(pos);
        return 1;
    }
    throw(~tasknf, "No task found by that TID");
};

public method .has_blocked_tasks() {
    arg ident;
    
    // Returns whether blocked_tasks has an entry for ident, i.e. whether
    // some task recorded by .block_task(ident) has not been unblocked yet.
    return blocked_tasks.contains(ident);
};

public method .pulse() {
    var task, sub, t;
    
    // called by $sys.heartbeat
    // Runs every queued task whose due time (field 2 of the task record)
    // is earlier than now, then pulses each sub scheduler.
    // Throws ~perm when the caller is not $sys.
    if (caller() != $sys)
        throw(~perm, "Sender is not system");
    t = time();
    while (task_queue && (t > ((task_queue[1])[2]))) {
        task = task_queue[1];
        
        // Dequeue before running.  If the task suspends, this method is
        // suspended with it, and a task still sitting at the head of the
        // queue would be run again by the next pulse.  If the task changes
        // the queue, removing "the first task" afterwards could drop a
        // task that never ran.
        .remove_first_task();
        
        // Run the method (field 6) with its args (field 8) on the object
        // that queued it (field 4).  A failure is reported to that
        // object's manager and must not stop the remaining tasks.
        catch any
            (> (task[4]).as_this_run(task[4], task[6], task[8]) <);
        with
            (| ((task[4]).manager()).tell_traceback(traceback()) |);
    }
    
    // call sub schedulers; errors from a sub scheduler are ignored
    for sub in (sub_schedulers)
        (| sub.pulse() |);
};

private method .remove_first_task() {
    // Drop the head of the task queue.  Any error raised by the removal is
    // deliberately swallowed (it is not even logged).
    (| .del_from_task_queue(1) |);
    
    // Once the queue has drained, task ids start over.
    if (!task_queue)
        task_index = 1;
};

public method .resume() {
    arg task_id, @return_value;
    var owners;
    
    // Resume the task task_id, passing return_value on to the resume()
    // builtin.  When the task was suspended through .suspend(), only the
    // objects recorded for it, or a system object, may resume it; its
    // entry in suspended_tasks is dropped first.  Tasks without an entry
    // in suspended_tasks are not checked here.
    owners = (| suspended_tasks[task_id] |);
    if (owners) {
        if (!((sender() in owners) || ($sys.is_system(sender()))))
            throw(~perm, (sender() + " may not resume task ") + task_id);
        suspended_tasks = dict_del(suspended_tasks, task_id);
    }
    
    // Errors from the resume() builtin propagate to the caller.
    return (> resume(task_id, @return_value) <);
};

protected method .resume_job() {
    arg tid;
    
    // Task method queued by .sleep(): wakes the sleeping task tid.
    // Errors from .resume() propagate.
    (> .resume(tid) <);
};

public method .sleep() {
    arg howlong;
    var tid;
    
    // Put the current task to sleep for howlong seconds: queue a
    // 'resume_job task for it, then suspend it.
    tid = task_id();
    .add_task(howlong, 'resume_job, tid);
    
    // this() is recorded for the task so that the scheduler itself is
    // allowed to resume it from .resume_job().
    .suspend(this());
};

public method .suspend() {
    arg @objs;
    var allowed;
    
    // Suspend the current task.  The objects passed in, plus user(), are
    // recorded in suspended_tasks under the task id; .resume() and
    // .cancel() consult that list.
    allowed = objs + [user()];
    suspended_tasks = dict_add(suspended_tasks, task_id(), allowed);
    
    // Returns whatever the suspend() builtin returns; its errors propagate.
    return (> suspend() <);
};

public method .suspended_task() {
    arg task;
    
    // Returns the value stored in suspended_tasks for task (the object list
    // recorded by .suspend()).  The lookup error propagates when there is
    // no entry for task.
    return (> suspended_tasks[task] <);
};

public method .sys_add_task() {
    arg time, method, sender, caller, flags, @args;
    var task, tid, task_time;
    
    // use `@info $scheduler' for more information. 
    // [tid, time, time(), sender(), caller(), method, flags, args]
    //
    // Like .add_task(), but the sender, caller and flags stored in the
    // task record are supplied as arguments.  Returns the new task id.
    // Throws ~perm unless $sys.is_agent(sender()), ~type for badly typed
    // or invalid arguments, and ~time when time is negative or more than
    // 31536000 seconds (365 days).
    if (!($sys.is_agent(sender())))
        throw(~perm, "Sender is not an agent or admin, use .add_task()");
    if ((type(time) != 'integer) || ((type(method) != 'symbol) || (type(args) != 'list)))
        throw(~type, "Arguments are not an integer, symbol, and list.");
    
    // A delay of 0 is accepted, as in .add_task(); the previous `time < 1'
    // rejected it with the misleading message "Time is negative."
    if (time < 0)
        throw(~time, "Time is negative.");
    if (time > 31536000)
        throw(~time, "Try to schedule a task LESS than a year from now?");
    if (!valid(sender))
        throw(~type, "The argument for sender is not a valid object");
    if (!valid(caller))
        throw(~type, "The argument for caller is not a valid object");
    if (type(flags) != 'list)
        throw(~type, "Send flags as a list of symbols");
    
    // get a new db task id
    ++task_index;
    tid = task_index;
    task_time = time();
    
    // unlike .add_task(), the flags are the ones passed in by the caller
    task = [tid, task_time + time, task_time, sender, caller, method, flags, args];
    .add_to_task_queue(task);
    return tid;
};

public method .task_info() {
    arg @args;
    
    // Guarded pass-through to the task_info() builtin: args are forwarded
    // unchanged and its result is returned.  An error from the .perms()
    // check propagates first (presumably raised when sender() lacks
    // 'system permission -- confirm against .perms).
    (> .perms(sender(), 'system) <);
    return (> task_info(@args) <);
};

public method .task_queue() {
    // sender must be system, for now
    // Returns the raw task_queue list; an error from the .perms() check
    // propagates to the caller.
    (> .perms(sender(), 'system) <);
    return task_queue;
};

public method .unblock_task() {
    arg ident;
    var waiting;
    
    // I want to be atomic!
    // Nothing blocked on ident: the caller should have checked first, but
    // we fail silently.
    if (!(.has_blocked_tasks(ident)))
        return;
    
    // Task ids blocked on this identifier, oldest first.
    waiting = blocked_tasks[ident];
    
    // Take the first task off the list; when it was the only one, drop the
    // whole entry for ident instead of storing an empty list.
    if ((waiting.length()) != 1)
        blocked_tasks = blocked_tasks.add(ident, waiting.delete(1));
    else
        blocked_tasks = blocked_tasks.del(ident);
    
    // Wake the task that has been blocked the longest.
    $sys.resume(waiting[1]);
};