/* ** j###t ########## #### #### ** j###t ########## #### #### ** j###T "###L J###" ** ######P' ########## ######### ** ######k, ########## T######T ** ####~###L #### ** #### q###L ########## .##### ** #### \###L ########## #####" */ package key; import key.util.Bits; import key.util.LinkedList; import java.net.*; import java.io.*; import java.util.*; import java.lang.*; public final class Registry implements Externalizable { private static final long serialVersionUID = 8124646714892406403L; public static final int FILES_PER_DIRECTORY = 100; public static final int INITIAL_SIZE = 1000; public static final int CAPACITY_INCREMENT = 500; // if latency sleep time is 5 minutes, a value // of 10 here will checkpoint atoms every 5m * 10 = // 50 minutes, or once an hour. public static final int CHECKPOINT_EVERY_N_LATENCY_TICKS = 10; // this field initialised by Key during startup public static Registry instance; protected static File baseDirectory; protected transient Object[] elementData; protected int[] timestamps; //protected java.lang.ref.WeakReference[] onlineReferences; //protected long elementFlags[]; protected int highestNewId; protected int timestampUpto; /** * Free indexes in the registry. A bit is set to 1 if * that index is not being used. (Therefore, we initially * assume that every index is being used) */ protected transient Bits freeSpace; public Registry() { init(); freeSpace = new Bits( INITIAL_SIZE ); } public String statistics() { StringBuffer sb = new StringBuffer(); sb.append( "Registry\n\n" ); sb.append( "current timestamp: " ); sb.append( timestampUpto ); sb.append( "\n\n" ); sb.append( highestNewId ); sb.append( " allocated slots, " ); sb.append( (elementData.length-highestNewId) ); sb.append( " slots remaining.\n\n" ); sb.append( "freespace: " ); sb.append( freeSpace.toString() ); sb.append( "\n\n" ); return( sb.toString() ); } public void readExternal( ObjectInput from ) throws IOException { try { int count = from.readInt(); timestampUpto = from.readInt(); ensureCapacity( count ); highestNewId = count; for( int i = 0; i < count; i++ ) timestamps[i] = from.readInt(); for( int i = 0; i < count; i++ ) { try { Atom a = (Atom) from.readObject(); if( a != null ) { // if you're getting this exception a lot and need to recover // the db, just take out this check, and the system will still // load, even if the references might be -everywhere-. if( a.index != i ) throw new UnexpectedResult( "database corrupted" ); setElementAt( a, i ); //System.out.println( "coreload #" + a.index + ": " + a.getName() ); } } catch( ClassNotFoundException e ) { Log.error( "Registry", e ); } } freeSpace = (Bits) from.readObject(); } catch( ClassNotFoundException e ) { throw new UnexpectedResult( e.toString() ); } } public void writeExternal( ObjectOutput to ) throws IOException { Bits bs = (Bits) freeSpace.clone(); to.writeInt( highestNewId ); to.writeInt( timestampUpto ); for( int i = 0; i < highestNewId; i++ ) { // if this is a temporary atom, we should // clear the timestamp in the saved file. // this way might be inefficient - we could // clone this array, preprocess the objects // below, and write it out last, instead. if( elementData[ i ] instanceof TemporaryAtom ) to.writeInt( -1 ); else to.writeInt( timestamps[i] ); } for( int i = 0; i < highestNewId; i++ ) { Object o = elementData[ i ]; if( o instanceof Atom ) to.writeObject( o ); else { // we don't write anything about distinct atoms to.writeObject( null ); if( o instanceof TemporaryAtom ) { // mark this as free space in the saved file bs.set( i ); } } } to.writeObject( bs ); } private void init() { elementData = new Object[ INITIAL_SIZE ]; timestamps = new int[ INITIAL_SIZE ]; //onlineReferences = new java.lang.ref.WeakReference[ INITIAL_SIZE ]; highestNewId = 0; timestampUpto = 0; if( instance == null ) instance = this; else throw new UnexpectedResult( "Attempting to create an additional Registry" ); } private void ensureCapacity( int minCapacity ) { if( elementData == null ) init(); int oldCapacity = elementData.length; if( minCapacity < oldCapacity ) return; Object oldData[] = elementData; int oldTS[] = timestamps; int newCapacity = oldCapacity + CAPACITY_INCREMENT; if( newCapacity < minCapacity ) { newCapacity = minCapacity; } elementData = new Object[ newCapacity ]; timestamps = new int[ newCapacity ]; if( highestNewId > 0 ) { System.arraycopy( oldData, 0, elementData, 0, (highestNewId-1) ); System.arraycopy( oldTS, 0, timestamps, 0, (highestNewId-1) ); } } /** ensure that the timestamp is valid before calling this routine */ private final Object elementAt( int index ) { if( index >= highestNewId ) { throw new ArrayIndexOutOfBoundsException(index + " >= " + highestNewId); } return elementData[ index ]; } //public Reference getReplacementFor( Reference r ) //{ // //} private final void setElementAt( Object obj, int index ) { if( index >= highestNewId ) { throw new UnexpectedResult( "index >= highestNewId in registry" ); /* ensureCapacity( index ); highestNewId = index+1; */ } elementData[index] = obj; freeSpace.clear( index ); } private final int getNewIndex() { // scan to see if we have any free if( freeSpace != null ) { int lowestFree = freeSpace.firstSet(); if( lowestFree != Integer.MAX_VALUE ) { if( freeSpace.get( lowestFree ) ) { freeSpace.clear( lowestFree ); //System.out.println( "REGISTRY: re-using slot #" + lowestFree + " (old ts=" + timestamps[lowestFree] ); return( lowestFree ); } else throw new UnexpectedResult( "firstSet not behaving correctly in Bits" ); } //else //System.out.println( "REGISTRY: freeSpace.firstSet() couldn't find any free space" ); } //else //System.out.println( "REGISTRY: freeSpace is null" ); //System.out.println( "REGISTRY: allocating new id #" + highestNewId ); int s = highestNewId; highestNewId++; if( highestNewId > elementData.length ) ensureCapacity( highestNewId ); return( s ); } /** * for newly constructed atoms, generates a new index for them, * sets it on the atom, sets it in the registry, and returns it. */ synchronized int allocateNewIndex( Atom a ) { int newIdx = getNewIndex(); int ts = timestampUpto++; elementData[ newIdx ] = a; timestamps[ newIdx ] = ts; a.setIndex( newIdx, ts ); return( newIdx ); } synchronized int allocateTemporaryIndex( Atom a ) { int newIdx = getNewIndex(); int ts = timestampUpto++; elementData[ newIdx ] = new TemporaryAtom( a ); timestamps[ newIdx ] = ts; a.setIndex( newIdx, ts ); return( newIdx ); } synchronized void upgradeTemporaryIndex( int index, int ts ) { ensureValidReference( index, ts ); Object o = elementData[ index ]; if( o instanceof TemporaryAtom ) { TemporaryAtom ta = (TemporaryAtom) o; elementData[ index ] = ta.actual; } } /** * Makes this Atom temporary (if it is in the main db at the moment) */ void makeTemporary( Atom a ) { int index = a.index; int ts = a.timestamp; Object n = elementData[ index ]; if( n instanceof TemporaryAtom ) return; elementData[ index ] = new TemporaryAtom( a ); if( n == null || n instanceof LoadedAtom ) deleteFileFor( index ); if( n instanceof LoadedAtom ) { LoadedAtom la = (LoadedAtom) n; Key.getLatencyCache().removeFromCache( la ); la.actual = null; } } void deleteIfTemporary( Atom a ) { int index = a.index; int ts = a.timestamp; if( isReferenceValid( index, ts ) ) { Object n = elementData[ index ]; if( n instanceof TemporaryAtom ) { if( a == ((TemporaryAtom)n).actual ) { delete( a ); } else throw new UnexpectedResult( "deleteIfTemporary found unmatching index/timestamp -> atom pairs" ); } } } /** * Used to write a loaded atom to disk (actually write it * now, as opposed to waiting for it to be swapped out) */ synchronized void write( int index, int ts ) { ensureValidReference( index, ts ); Object o = elementAt( index ); if( o instanceof LoadedAtom ) { writeLoadedAtom( (LoadedAtom) o ); System.out.println( "write forced for #" + ((LoadedAtom)o).actual.toString() ); } } void syncDistinct( int index, int ts ) { synchronized( Key.getLatencyCache() ) { synchronized( this ) { ensureValidReference( index, ts ); Object o = elementAt( index ); LoadedAtom la = null; if( o instanceof LoadedAtom ) { la = ((LoadedAtom)o); } else { Atom a = null; if( o instanceof Atom ) a = (Atom)o; else if( o instanceof TemporaryAtom ) a = ((TemporaryAtom)o).actual; if( a == null ) return; // build a new loaded atom and write it in... la = buildLoadedAtom( a, index ); // now, if we sync something distinctly, we should // also sync all of it's part atoms distinctly (otherwise // we're storing part of the atom in the main database) Factory.distinctSyncFields( a ); // if we're a container, sync children if( a instanceof Container ) { Container c = (Container) a; for( Enumeration e = c.elements(); e.hasMoreElements(); ) { Atom child = (Atom) e.nextElement(); syncDistinct( child.index, child.timestamp ); } } } // now write the loaded atom // writeLoadedAtom( la ); } } } /** * All Atom's should call this when they are loaded from disk */ synchronized void registerIndex( Atom a, int idx, int ts ) { ensureValidReference( idx, ts ); if( idx > elementData.length ) { throw new UnexpectedResult( "index >= elementData.length in registry" ); /* highestNewId = idx+1; ensureCapacity( highestNewId ); */ } elementData[ idx ] = a; } public final void ensureValidReference( int index, int ts ) { try { if( timestamps[ index ] != ts ) throw new OutOfDateReferenceException( "index " + index + " has a timestamp of " + timestamps[ index ] + ", not " + ts ); } catch( IndexOutOfBoundsException e ) { throw new OutOfDateReferenceException( "index " + index + " is out of bounds" ); } } public final boolean isReferenceValid( int index, int ts ) { return( timestamps[ index ] == ts ); } private final void clearIndex( int index ) { freeSpace.set( index ); elementData[ index ] = null; timestamps[ index ] = -1; } /** * for when we're deleting an atom forever - removes it from the * registry. Call a.dispose(), not registry.delete() to delete * an object. */ void delete( Atom a ) { synchronized( Key.getLatencyCache() ) { synchronized( this ) { // already been deleted if( !isReferenceValid( a.index, a.timestamp ) ) { System.out.println( a.getName() + " has already been deleted" ); return; } // check if locked after decrement if( !a.willbeLockedAfterDecrement() ) { //System.out.println( "Deleting atom " + a.toString() ); // clean up transient: // - take out of scapes & notify lists // - disconnect if player a.clearTransient(); LoadedAtom la = null; int index = a.index; { Object o = null; o = elementData[ a.index ]; if( o instanceof LoadedAtom ) { la = (LoadedAtom) o; // remove it from the latent cache Key.getLatencyCache().removeFromCache( la ); } } // make the atom unavailable if( index < elementData.length ) clearIndex( index ); // decrement implicit reference counts // important for the next step! a.prepareForSwap(); // delete fields if they're atomic, delete // contents if it is a non-reference container a.delete(); // delete distinct file if( la != null ) deleteFileFor( index ); } else { throw new UnexpectedResult( "Could not delete " + a.getKey() + ": Atom is implicitly locked" ); } }} } private void deleteFileFor( int index ) { File f = getFileFor( index ); f.delete(); //Log.debug( this, "REGISTRY: Deleted distinct file " + f.toString() ); } /** * used to swap an atom out of memory. doesn't need * to remove the item from the latent cache, since * the LC itself handles this when swapout is called, * which is only ever from deallocate() in LoadedAtom. */ void swapout( LoadedAtom la ) { if( la.actual == null ) return; synchronized( Key.getLatencyCache() ) { synchronized( this ) { Atom a = la.actual; boolean notrunning = !Key.isRunning(); //System.out.println( "trying to swap out " + a.getKey() ); if( a.canSwap() ) { // check if locked after decrement if( !a.willbeLockedAfterDecrement() ) { //System.out.println( "Swapping out atom " + a.toString() ); // clean up transient: // - take out of scapes & notify lists // - disconnect if player a.clearTransient(); // decrement implicit reference counts a.prepareForSwap(); // sync writeLoadedAtom( la ); // make the atom unavailable if( la.index < elementData.length ) elementData[ la.index ] = null; // clear all references to it la.actual = null; a = null; } } if( a != null && notrunning ) { // sync writeLoadedAtom( la ); } }} } /** * Attempts to reduce the amount of memory in use by * swapping out all loaded atoms (that can be swapped) * to disk. * <BR> * <B>For use by players with the beyond bit only.</B> */ public void compress() { Player p = Player.getCurrent(); if( p != null ) { if( !p.isBeyond() ) throw new AccessViolationException( this, "You may not attempt to compress the registry." ); } LatentCache lc = Key.getLatencyCache(); int c = 0; synchronized( lc ) { synchronized( this ) { for( int i = 0; i < elementData.length; i++ ) { Object o = elementData[i]; if( o instanceof LoadedAtom ) { //lc.resetModify(); //lc.interrupt(); lc.deallocateNow( (LatentlyCached) o ); c++; } } } } /* System.out.println( "registry compress: " + c + " items deallocated. " + lc.countEntries() + " latency entries" ); int atoms, containers, scapes, players, lists; atoms = Atom.getTotalAtoms(); containers = Container.getTotalContainers(); atoms -= containers; scapes = Scape.getTotalScapes(); containers -= scapes; players = Player.getTotalPlayers(); containers -= players; lists = LinkedList.getTotalLists(); System.out.println( "Totals: " + atoms + " atoms, " + containers + " containers, " + scapes + " scapes, " + players + " players, " + lists + " lists" ); */ } void swapout( Atom a ) { LatentCache lc = Key.getLatencyCache(); synchronized( lc ) { synchronized( this ) { Object o = elementData[ a.index ]; if( o instanceof LoadedAtom ) { //lc.resetModify(); //lc.interrupt(); lc.deallocateNow( (LatentlyCached) o ); } } } } void writeLoadedAtom( LoadedAtom la ) { Factory.storeObject( la.actual, getFileFor( la.index ), true ); //System.out.println( "Wrote #" + la.index ); key.sql.Storage.storeObject( la.actual ); } LoadedAtom buildLoadedAtom( Atom a, int idx ) { LoadedAtom la = new LoadedAtom( idx, a ); Key.getLatencyCache().addToCache( la ); setElementAt( la, idx ); return( la ); } public static final int STORAGE_UNLOADED = 0; public static final int STORAGE_DB = 1; public static final int STORAGE_LOADED = 2; public static final int STORAGE_TEMPORARY = 3; public static final int STORAGE_UNKNOWN = 3; int getStorageTypeIndex( int index, int ts ) { ensureValidReference( index, ts ); Object o = elementAt( index ); if( o == null ) return( STORAGE_UNLOADED ); else if( o instanceof Atom ) return( STORAGE_DB ); else if( o instanceof LoadedAtom ) return( STORAGE_LOADED ); else if( o instanceof TemporaryAtom ) return( STORAGE_TEMPORARY ); else return( STORAGE_UNKNOWN ); } String getStorageType( int index, int ts ) { ensureValidReference( index, ts ); Object o = elementAt( index ); if( o == null ) return( "Not loaded (Distinct)" ); else if( o instanceof Atom ) return( "Database" ); else if( o instanceof LoadedAtom ) return( "Distinct" ); else if( o instanceof TemporaryAtom ) return( "Temporary" ); else return( "Unknown" ); } /** * Returns the atom iff it is in memory already */ Atom getIfInDatabase( int index, int ts ) { ensureValidReference( index, ts ); Object o = elementAt( index ); if( o instanceof Atom ) return( (Atom) o ); else if( o instanceof LoadedAtom ) return( ((LoadedAtom)o).actual ); else return null; } /** * A special 'get' routine that is used by Search.java in order * to match user entered id #'s. */ synchronized Atom get( int idx ) { if( idx >= 0 && idx < elementData.length ) { int ts = timestamps[ idx ]; if( ts != -1 ) return( get( idx, ts ) ); } return( null ); } /** * retrieves this atom from disk */ public synchronized Atom get( int idx, int timestamp ) { ensureValidReference( idx, timestamp ); /* * - throw an exception, don't return null if( !isReferenceValid( idx, timestamp ) ) return null; */ Object o = elementAt( idx ); if( o instanceof Atom ) return( (Atom)o ); else if( o instanceof LoadedAtom ) { LoadedAtom la = (LoadedAtom) o; la.active = true; return( la.actual ); } else if( o instanceof TemporaryAtom ) { return( ((TemporaryAtom)o).actual ); } else { Atom a; File f = getFileFor( idx ); //System.out.println( "--- Loading #" + idx ); // a read atom will call 'makeTemporaryAvailable' // during this call, which is a little bit of overhead, // but not drastic. we limit it by allowTemporaryAvailability = true; a = (Atom) Factory.loadObject( f ); allowTemporaryAvailability = false; if( a == null ) { System.out.println( "REGISTRY: Could not load #" + idx + ", clearing..." ); Log.debug( this, "REGISTRY: Could not load #" + idx + ", clearing..." ); clearIndex( idx ); return( null ); } LoadedAtom la = buildLoadedAtom( a, idx ); //System.out.println( "Loaded #" + idx + " (" + a.getKey() + ")" ); return( a ); } } /** * This code is called from Atom.readObject() in order * to make this atom available to the registry as soon * as possible. This is necessary when circular * stored implicit references occur. See the comments * in KeyInputStream.resolveObject(), as well as Atom.readObject() * also. * * This method must be called only after a.timestamp and a.index * have been correctly loaded. */ synchronized void makeTemporarilyAvailable( Atom a ) { if( allowTemporaryAvailability ) { ensureValidReference( a.index, a.timestamp ); setElementAt( new TemporaryAtom( a ), a.index ); } } private transient boolean allowTemporaryAvailability; private File getFileFor( int index ) { File directory = new File( baseDirectory, Integer.toString( index / 150 ) ); if( !directory.exists() ) directory.mkdir(); return( new File( directory, Integer.toString( index ) ) ); } boolean indexLoaded( int idx, int timestamp ) { if( !isReferenceValid( idx, timestamp ) ) return( false ); if( idx >= highestNewId ) return( false ); else return( elementData[ idx ] != null ); } final class TemporaryAtom { Atom actual; public TemporaryAtom( Atom a ) { actual = a; } } final class LoadedAtom implements LatentlyCached { int index; boolean active; Atom actual; int sinceSyncCount = 0; public LoadedAtom( int idx, Atom a ) { index = idx; actual = a; active = true; } public void deallocate() { // this call will set 'actual' to true Registry.instance.swapout( this ); } public boolean modified() { //System.out.println( "Registry: " + actual.getName() + " active=" + active + " canSwap=" + actual.canSwap() + " willbeLocked=" + actual.willbeLockedAfterDecrement() ); // the shortcutting here prevents the expensive willbe operation // from being called every-time: it only happens when it is needed if( actual == null ) return( false ); else return( active || !actual.canSwap() || actual.willbeLockedAfterDecrement() ); } public void resetModify() { active = false; sinceSyncCount++; if( sinceSyncCount > CHECKPOINT_EVERY_N_LATENCY_TICKS ) { sinceSyncCount = 0; //Log.debug( "Registry", "checkpointing " + actual.getId() ); Registry.instance.writeLoadedAtom( this ); } } public String toString() { return( "LoadedAtom #" + index + " (" + actual.getKey() + ")" ); } } }