# define INCLUDE_FILE_IO # include "dgd.h" # include "str.h" # include "array.h" # include "object.h" # include "interpret.h" # include "data.h" # include "comm.h" # include "call_out.h" # define CYCBUF_SIZE 128 /* cyclic buffer size, power of 2 */ # define CYCBUF_MASK (CYCBUF_SIZE - 1) /* cyclic buffer mask */ # define SWPERIOD 60 /* swaprate buffer size */ typedef struct { uindex handle; /* callout handle */ uindex oindex; /* index in object table */ Uint time; /* when to call */ } call_out; # define prev oindex # define next time static char co_layout[] = "uui"; typedef struct { uindex list; /* list */ uindex last; /* last in list */ } cbuf; static char cb_layout[] = "uu"; static call_out *cotab; /* callout table */ static uindex cotabsz; /* callout table size */ static uindex queuebrk; /* queue brk */ static uindex cycbrk; /* cyclic buffer brk */ static uindex flist; /* free list index */ static uindex nzero; /* # immediate callouts */ static uindex nshort; /* # short-term callouts, incl. nzero */ static cbuf c0first, c0next; /* immediate callouts */ static cbuf cycbuf[CYCBUF_SIZE]; /* cyclic buffer of callout lists */ static Uint timestamp; /* cycbuf start time */ static Uint timeout; /* time the last alarm came */ static Uint timediff; /* stored/actual time difference */ static int fragment; /* swap fragment */ static uindex swapped1[SWPERIOD]; /* swap info for last minute */ static uindex swapped5[SWPERIOD]; /* swap info for last five minutes */ static int swapidx1, swapidx5; /* index in swap info arrays */ static int incr5; /* 5 second period counter */ static uindex swaps5; /* swapout info for last 5 seconds */ static Uint swaprate1; /* swaprate per minute */ static Uint swaprate5; /* swaprate per 5 minutes */ /* * NAME: call_out->init() * DESCRIPTION: initialize callout handling */ void co_init(max, frag) unsigned int max; int frag; { if (max != 0) { cotab = ALLOC(call_out, max + 1); cotab[0].time = 0; /* sentinel for the heap */ cotab++; flist = 0; /* only if callouts are enabled */ timeout = timestamp = P_time(); timediff = 0; P_alarm(1); } c0first.list = c0next.list = 0; memset(cycbuf, '\0', sizeof(cycbuf)); cycbrk = cotabsz = max; queuebrk = 0; nzero = nshort = 0; fragment = frag; memset(swapped1, '\0', sizeof(swapped1)); memset(swapped5, '\0', sizeof(swapped5)); swapidx1 = swapidx5 = swaps5 = 0; incr5 = 5; swaprate1 = swaprate5 = 0; } /* * NAME: enqueue() * DESCRIPTION: put a callout in the queue */ static uindex enqueue(t) register Uint t; { register uindex i, j; register call_out *l; if (queuebrk == cycbrk) { error("Too many callouts"); } /* * create a free spot in the heap, and sift it upward */ i = ++queuebrk; l = cotab - 1; for (j = i >> 1; l[j].time > t; i = j, j >>= 1) { l[i] = l[j]; } /* return index of free spot */ return i - 1; } /* * NAME: dequeue() * DESCRIPTION: remove a callout from the queue */ static void dequeue(i) register uindex i; { register Uint t; register uindex j; register call_out *l; l = cotab - 1; i++; t = l[queuebrk].time; if (t < l[i].time) { /* sift upward */ for (j = i >> 1; l[j].time > t; i = j, j >>= 1) { l[i] = l[j]; } } else { /* sift downward */ for (j = i << 1; j < queuebrk; i = j, j <<= 1) { if (l[j].time > l[j + 1].time) { j++; } if (t <= l[j].time) { break; } l[i] = l[j]; } } /* put into place */ l[i] = l[queuebrk--]; } /* * NAME: newcallout() * DESCRIPTION: get a new callout for the cyclic buffer */ static uindex newcallout() { register uindex i; if (flist != 0) { /* get callout from free list */ i = flist; flist = cotab[i].next; } else { /* allocate new callout */ if (cycbrk == queuebrk || cycbrk == 1) { error("Too many callouts"); } i = --cycbrk; } nshort++; return i; } /* * NAME: freecallout() * DESCRIPTION: remove a callout from the cyclic buffer */ static void freecallout(i) register uindex i; { register call_out *l; l = &cotab[i]; l->handle = 0; /* mark as unused */ if (i == cycbrk) { /* * callout at the edge */ while (++cycbrk != cotabsz && (++l)->handle == 0) { /* followed by free callout */ if (cycbrk == flist) { /* first in the free list */ flist = l->next; } else { /* connect previous to next */ cotab[l->prev].next = l->next; if (l->next != 0) { /* connect next to previous */ cotab[l->next].prev = l->prev; } } } } else { /* add to free list */ if (flist != 0) { /* link next to current */ cotab[flist].prev = i; } /* link to next */ l->next = flist; flist = i; } --nshort; } /* * NAME: call_out->new() * DESCRIPTION: add a new callout */ uindex co_new(obj, str, delay, f, nargs) object *obj; string *str; Int delay; frame *f; int nargs; { Uint t; uindex i; register call_out *co; if (cotabsz == 0) { /* * Call_outs are disabled. Return immediately. */ return 0; } if (delay == 0) { /* * immediate callout */ i = newcallout(); if (c0next.list == 0) { /* first one in list */ c0next.list = i; } else { /* add to list */ cotab[c0next.last].next = i; } co = &cotab[c0next.last = i]; co->next = 0; nzero++; t = 0; } else { /* * delayed callout */ t = P_time(); if (t + delay < t) { error("Too long delay"); } t += delay; if (t <= timeout) { t = timeout + 1; } if (t < timestamp + CYCBUF_SIZE) { register cbuf *cyc; /* add to cyclic buffer */ i = newcallout(); cyc = &cycbuf[t & CYCBUF_MASK]; if (cyc->list == 0) { /* first one in list */ cyc->list = i; } else { /* add to list */ cotab[cyc->last].next = i; } co = &cotab[cyc->last = i]; co->next = 0; } else { /* put in queue */ i = enqueue(t); co = &cotab[i]; co->time = t; } t -= timediff; } co->handle = d_new_call_out(o_dataspace(obj), str, t, f, nargs); co->oindex = obj->index; return co->handle; } /* * NAME: rmshort() * DESCRIPTION: remove a short-term callout */ bool rmshort(cyc, i, handle) register cbuf *cyc; register uindex i, handle; { register uindex j, k; register call_out *l; k = cyc->list; if (k != 0) { /* * this time-slot is in use */ l = cotab; if (l[k].oindex == i && l[k].handle == handle) { /* first element in list */ cyc->list = l[k].next; freecallout(k); return TRUE; } if (k != cyc->last) { /* * list contains more than 1 element */ j = k; k = l[j].next; do { if (l[k].oindex == i && l[k].handle == handle) { /* found it */ if (k == cyc->last) { /* last element of the list */ l[cyc->last = j].next = 0; } else { /* connect previous to next */ l[j].next = l[k].next; } freecallout(k); return TRUE; } j = k; } while ((k=l[j].next) != 0); } } return FALSE; } /* * NAME: call_out->del() * DESCRIPTION: remove a callout */ Int co_del(obj, handle) object *obj; register unsigned int handle; { register uindex i; register call_out *l; Uint t; int nargs; /* * get the callout */ if (d_get_call_out(o_dataspace(obj), handle, &t, (frame *) NULL, &nargs) == (string *) NULL) { /* no such callout */ return -1; } i = obj->index; if (t == 0) { /* * immediate callout */ rmshort(&c0first, i, handle) || rmshort(&c0next, i, handle); --nzero; return 0; } t += timediff; if (t < timestamp + CYCBUF_SIZE) { /* * try to find the callout in the cyclic buffer */ if (rmshort(&cycbuf[t & CYCBUF_MASK], i, handle)) { return t - timeout; } } /* * Not found in the cyclic buffer; it <must> be in the queue. */ l = cotab; for (;;) { if (l->oindex == i && l->handle == handle) { dequeue(l - cotab); return t - timeout; } l++; # ifdef DEBUG if (l == cotab + queuebrk) { fatal("failed to remove callout"); } # endif } } /* * NAME: call_out->list() * DESCRIPTION: return an array with the callouts of an object */ array *co_list(data, obj) dataspace *data; object *obj; { return d_list_callouts(data, o_dataspace(obj), timeout - timediff); } /* * NAME: call_out->call() * DESCRIPTION: call expired callouts */ void co_call(f) frame *f; { register uindex i, handle; object *obj; string *str; Uint t; int nargs; if (c0first.list == 0) { c0first = c0next; c0next.list = 0; } if (c0first.list != 0) { /* * immediate callouts */ while (ec_push((ec_ftn) errhandler)) { endthread(); } while ((i=c0first.list) != 0) { c0first.list = cotab[i].next; handle = cotab[i].handle; obj = &otable[cotab[i].oindex]; freecallout(i); --nzero; str = d_get_call_out(o_dataspace(obj), handle, &t, f, &nargs); if (i_call(f, obj, str->text, str->len, TRUE, nargs)) { /* function exists */ i_del_value(f->sp++); str_del((f->sp++)->u.string); } else { /* function doesn't exist */ str_del((f->sp++)->u.string); } endthread(); } ec_pop(); } if (P_timeout()) { /* * delayed callouts */ if ((t=P_time()) <= timeout) { return; } timeout = t; while (ec_push((ec_ftn) errhandler)) { endthread(); } for (;;) { if (queuebrk > 0 && cotab[0].time <= timeout) { /* * queued callout */ handle = cotab[0].handle; obj = &otable[cotab[0].oindex]; dequeue(0); } else if (timestamp <= timeout && (i=cycbuf[timestamp & CYCBUF_MASK].list) != 0) { /* * next from cyclic buffer list */ cycbuf[timestamp & CYCBUF_MASK].list = cotab[i].next; handle = cotab[i].handle; obj = &otable[cotab[i].oindex]; freecallout(i); } else if (timestamp < timeout) { /* * check next list */ timestamp++; continue; } else { /* * no more callouts to do: * set the alarm for the next round */ if (fragment > 0) { uindex swaps1; /* * swap out a fragment of all control and data blocks */ swaps1 = d_swapout(fragment); swaprate1 += swaps1 - swapped1[swapidx1]; swapped1[swapidx1++] = swaps1; if (swapidx1 == SWPERIOD) { swapidx1 = 0; } swaps5 += swaps1; swaprate5 += swaps1; if (--incr5 == 0) { swaprate5 -= swapped5[swapidx5]; swapped5[swapidx5++] = swaps5; if (swapidx5 == SWPERIOD) { swapidx5 = 0; } swaps5 = 0; incr5 = 5; } } /* allow as much time for non-callouts as for callouts */ t = P_time(); P_alarm((t <= timeout || !comm_active()) ? 1 : t - timeout); break; } str = d_get_call_out(o_dataspace(obj), handle, &t, f, &nargs); if (i_call(f, obj, str->text, str->len, TRUE, nargs)) { /* function exists */ i_del_value(f->sp++); str_del((f->sp++)->u.string); } else { /* function doesn't exist */ str_del((f->sp++)->u.string); } endthread(); } ec_pop(); } } /* * NAME: call_out->info() * DESCRIPTION: give information about callouts */ void co_info(n1, n2) uindex *n1, *n2; { *n1 = nshort; *n2 = queuebrk; } /* * NAME: call_out->ready() * DESCRIPTION: return TRUE if callouts are ready to run */ bool co_ready() { return (nzero != 0); } /* * NAME: call_out->swaprate1 * DESCRIPTION: return the number of objects swapped out per minute */ long co_swaprate1() { return swaprate1; } /* * NAME: call_out->swaprate5 * DESCRIPTION: return the number of objects swapped out per 5 minutes */ long co_swaprate5() { return swaprate5; } typedef struct { uindex cotabsz; /* callout table size */ uindex queuebrk; /* queue brk */ uindex cycbrk; /* cyclic buffer brk */ uindex flist; /* free list index */ uindex nshort; /* # of short-term callouts */ uindex nlong0; /* # of long-term callouts and imm. callouts */ Uint timestamp; /* time the last alarm came */ Uint timediff; /* accumulated time difference */ } dump_header; static char dh_layout[] = "uuuuuuii"; /* * NAME: call_out->dump * DESCRIPTION: dump callout table */ bool co_dump(fd) int fd; { dump_header dh; register uindex list, last; register cbuf *cb; int ret; /* fill in header */ dh.cotabsz = cotabsz; dh.queuebrk = queuebrk; dh.cycbrk = cycbrk; dh.flist = flist; dh.nshort = nshort; dh.nlong0 = queuebrk + nzero; dh.timestamp = timestamp; dh.timediff = timediff; /* deal with immediate callouts */ if (nzero != 0) { if (c0first.list != 0) { list = c0first.list; if (c0next.list != 0) { cotab[c0first.last].next = c0next.list; last = c0next.last; } else { last = c0first.last; } } else { list = c0next.list; last = c0next.last; } cb = &cycbuf[timestamp & CYCBUF_MASK]; cotab[last].next = cb->list; cb->list = list; } /* write header and callouts */ ret = (write(fd, (char *) &dh, sizeof(dump_header)) > 0 && (queuebrk == 0 || write(fd, (char *) cotab, queuebrk * sizeof(call_out)) > 0) && (cycbrk == cotabsz || write(fd, (char *) (cotab + cycbrk), (cotabsz - cycbrk) * sizeof(call_out)) > 0) && write(fd, (char *) cycbuf, CYCBUF_SIZE * sizeof(cbuf)) > 0); /* fix up immediate callouts */ if (nzero != 0) { cb->list = cotab[last].next; if (c0first.list != 0) { cotab[c0first.last].next = 0; } if (c0next.list != 0) { cotab[c0next.last].next = 0; } } return ret; } /* * NAME: call_out->restore * DESCRIPTION: restore callout table */ void co_restore(fd, t) int fd; register Uint t; { register uindex i, offset, last; register call_out *co; register cbuf *cb; dump_header dh; cbuf buffer[CYCBUF_SIZE]; /* read and check header */ conf_dread(fd, (char *) &dh, dh_layout, (Uint) 1); queuebrk = dh.queuebrk; offset = cotabsz - dh.cotabsz; cycbrk = dh.cycbrk + offset; if (queuebrk > cycbrk + offset || cycbrk == 0) { error("Restored too many callouts"); } /* read tables */ conf_dread(fd, (char *) cotab, co_layout, (Uint) queuebrk); conf_dread(fd, (char *) (cotab + cycbrk), co_layout, (Uint) cotabsz - cycbrk); conf_dread(fd, (char *) buffer, cb_layout, (Uint) CYCBUF_SIZE); flist = dh.flist; nshort = dh.nshort; nzero = dh.nlong0 - dh.queuebrk; timestamp = t; t -= dh.timestamp; timediff = dh.timediff + t; /* patch callouts in queue */ for (i = queuebrk, co = cotab; i > 0; --i, co++) { co->time += t; } /* cycle around cyclic buffer */ t &= CYCBUF_MASK; memcpy(cycbuf + t, buffer, (unsigned int) (CYCBUF_SIZE - t) * sizeof(cbuf)); memcpy(cycbuf, buffer + CYCBUF_SIZE - t, (unsigned int) t * sizeof(cbuf)); if (offset != 0) { /* patch callout references */ if (flist != 0) { flist += offset; } for (i = CYCBUF_SIZE, cb = cycbuf; i > 0; --i, cb++) { if (cb->list != 0) { cb->list += offset; cb->last += offset; } } for (i = cotabsz - cycbrk, co = cotab + cycbrk; i > 0; --i, co++) { if (co->handle == 0) { co->prev += offset; } if (co->next != 0) { co->next += offset; } } } /* fix up immediate callouts */ if (nzero != 0) { cb = &cycbuf[timestamp & CYCBUF_MASK]; c0next.list = cb->list; for (i = nzero - 1, last = cb->list; i != 0; --i) { last = cotab[last].next; } cb->list = cotab[last].next; cotab[last].next = 0; } }