package com.planet_ink.coffee_mud.core.threads; import java.util.Collection; import java.util.Enumeration; import java.util.HashMap; import java.util.LinkedList; import java.util.Map; import java.util.concurrent.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import com.planet_ink.coffee_mud.Common.interfaces.Session; import com.planet_ink.coffee_mud.core.CMLib; import com.planet_ink.coffee_mud.core.Log; import com.planet_ink.coffee_mud.core.collections.Pair; import com.planet_ink.coffee_mud.core.collections.PairSVector; import com.planet_ink.coffee_mud.core.collections.SLinkedList; import com.planet_ink.coffee_mud.core.collections.STreeMap; import com.planet_ink.coffee_mud.core.collections.STreeSet; import com.planet_ink.coffee_mud.core.collections.UniqueEntryBlockingQueue; import com.planet_ink.coffee_mud.core.interfaces.TickableGroup; /* Copyright 2010-2019 Bo Zimmerman Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ public class CMThreadPoolExecutor extends ThreadPoolExecutor { protected Map<Runnable,Thread> active = new HashMap<Runnable,Thread>(); protected long timeoutMillis; protected CMThreadFactory threadFactory; protected int queueSize = 0; protected String poolName = "Pool"; protected volatile long lastRejectTime = 0; protected volatile int rejectCount = 0; protected static class CMArrayBlockingQueue<E> extends ArrayBlockingQueue<E>{ private static final long serialVersionUID = -4557809818979881831L; public CMThreadPoolExecutor executor = null; public CMArrayBlockingQueue(final int capacity) { super(capacity); } @Override public boolean offer(final E o) { final int allWorkingThreads = executor.getActiveCount() + super.size(); return (allWorkingThreads < executor.getPoolSize()) && super.offer(o); } } public CMThreadPoolExecutor(final String poolName, final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final long timeoutMins, final int queueSize) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new CMArrayBlockingQueue<Runnable>(queueSize)); ((CMArrayBlockingQueue<Runnable>)this.getQueue()).executor=this; timeoutMillis=timeoutMins * 60 * 1000; this.poolName=poolName; threadFactory=new CMThreadFactory(poolName); setThreadFactory(threadFactory); this.queueSize=queueSize; setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) { try { executor.getQueue().put(r); } catch (final InterruptedException e) { throw new RejectedExecutionException(e); } } }); } @Override protected void beforeExecute(final Thread t, final Runnable r) { synchronized(active) { if(t instanceof CMFactoryThread) ((CMFactoryThread)t).setRunnable(r); active.put(r,t); } } @Override protected void afterExecute(final Runnable r, final Throwable t) { synchronized(active) { final Thread th=active.get(r); if(th instanceof CMFactoryThread) ((CMFactoryThread)th).setRunnable(null); active.remove(r); } } @Override public int getActiveCount() { return active.size(); } public boolean isActive(final Runnable r) { return active.containsKey(r); } public boolean isActiveOrQueued(final Runnable r) { return active.containsKey(r) || getQueue().contains(r); } @Override public void execute(final Runnable r) { try { if(this.getQueue().contains(r)) return; super.execute(r); if((rejectCount>0)&&(System.currentTimeMillis()-lastRejectTime)>5000) { Log.warnOut(rejectCount+" Pool_"+poolName,"Threads rejected."); rejectCount=0; } } catch(final RejectedExecutionException e) { if(r instanceof CMRunnable) { final Collection<CMRunnable> runsKilled = getTimeoutOutRuns(1); for(final CMRunnable runnable : runsKilled) { if(runnable instanceof Session) { final Session S=(Session)runnable; final StringBuilder sessionInfo=new StringBuilder(""); sessionInfo.append("status="+S.getStatus()+" "); sessionInfo.append("active="+S.activeTimeMillis()+" "); sessionInfo.append("online="+S.getMillisOnline()+" "); sessionInfo.append("lastloop="+(System.currentTimeMillis()-S.getInputLoopTime())+" "); sessionInfo.append("addr="+S.getAddress()+" "); sessionInfo.append("mob="+((S.mob()==null)?"null":S.mob().Name())); Log.errOut("Pool_"+poolName,"Timed-Out Runnable: "+sessionInfo.toString()); } else if(runnable instanceof TickableGroup) { final TickableGroup G=(TickableGroup)runnable; Log.errOut("Pool_"+poolName,"Timed-Out Runnable: "+G.getName()+"-"+G.getStatus()+"\n\r"); } else Log.errOut("Pool_"+poolName,"Timed-Out Runnable: "+runnable.toString()); } } lastRejectTime=System.currentTimeMillis(); rejectCount++; } } public Collection<CMRunnable> getTimeoutOutRuns(final int maxToKill) { final LinkedList<CMRunnable> timedOut=new LinkedList<CMRunnable>(); if(timeoutMillis<=0) return timedOut; final LinkedList<Thread> killedOut=new LinkedList<Thread>(); synchronized(active) { try { for (final Runnable runnable : active.keySet()) { if(runnable instanceof CMRunnable) { final CMRunnable cmRunnable=(CMRunnable)runnable; final Thread thread=active.get(runnable); if(cmRunnable.activeTimeMillis() > timeoutMillis) { if(timedOut.size() >= maxToKill) { CMRunnable leastWorstOffender=null; for(final CMRunnable r : timedOut) { if((leastWorstOffender != null) &&(r.activeTimeMillis() < leastWorstOffender.activeTimeMillis())) leastWorstOffender=r; } if(leastWorstOffender!=null) { if(cmRunnable.activeTimeMillis() < leastWorstOffender.activeTimeMillis()) continue; else timedOut.remove(leastWorstOffender); } } timedOut.add(cmRunnable); killedOut.add(thread); } } } } catch(final Exception e) { } } try { while(killedOut.size()>0) { final Thread t = killedOut.remove(); active.remove(t); CMLib.killThread(t,100,3); } } catch(final Exception e) { } return timedOut; } }