Commit a8854ba6 authored by Christian Wulf's avatar Christian Wulf

fixed DivideAndConquerStage

parent 659bd58c
......@@ -80,9 +80,7 @@ abstract class AbstractRunnableStage implements Runnable {
stage.getExceptionListener().reportException(e, stage);
}
if (logger.isDebugEnabled()) {
logger.debug("Finished runnable stage. ({})", stage.getId());
}
logger.debug("Finished runnable stage. ({})", stage.getId());
}
protected abstract void beforeStageExecution() throws InterruptedException;
......
......@@ -107,6 +107,7 @@ public abstract class AbstractStage implements StateLoggable {
INSTANCES_COUNTER.clear();
}
// only used by consumer stages
protected final void returnNoElement() {
// If the stage get null-element it can't be active. If it's the first time
// after being active the according time stamp is saved so that one can gather
......@@ -196,6 +197,9 @@ public abstract class AbstractStage implements StateLoggable {
return outputPorts.getOpenedPorts(); // TODO consider to publish a read-only version
}
/**
* @threadsafe
*/
public StageState getCurrentState() {
return currentState;
}
......@@ -225,6 +229,7 @@ public abstract class AbstractStage implements StateLoggable {
this.logger.warn("Received more than one signal - " + signal + " - from input port: " + inputPort);
return;
}
if (signal.mayBeTriggered(signalReceivedInputPorts, getInputPorts())) {
try {
signal.trigger(this);
......@@ -236,7 +241,6 @@ public abstract class AbstractStage implements StateLoggable {
for (OutputPort<?> outputPort : outputPorts.getOpenedPorts()) {
outputPort.sendSignal(signal);
}
}
}
......@@ -264,11 +268,11 @@ public abstract class AbstractStage implements StateLoggable {
final boolean signalAlreadyReceived = this.triggeredSignalTypes.contains(signal.getClass());
if (signalAlreadyReceived) {
if (logger.isTraceEnabled()) {
logger.trace("Got signal again: " + signal + " from input port: " + inputPort);
logger.trace("Got signal again: {} from input port: {}", signal, inputPort);
}
} else {
if (logger.isTraceEnabled()) {
logger.trace("Got signal: " + signal + " from input port: " + inputPort);
logger.trace("Got signal: {} from input port: {}", signal, inputPort);
}
this.triggeredSignalTypes.add(signal.getClass());
}
......@@ -471,7 +475,7 @@ public abstract class AbstractStage implements StateLoggable {
protected void abort() {
this.terminateStage();
this.getOwningThread().interrupt();
};
}
protected boolean shouldBeTerminated() {
return (getCurrentState() == StageState.TERMINATING);
......@@ -509,22 +513,6 @@ public abstract class AbstractStage implements StateLoggable {
inputPorts.addPortRemovedListener(inputPortRemovedListener);
}
private String getSimpleClassName() {
String simpleName = this.getClass().getSimpleName();
if (simpleName.isEmpty()) {
simpleName = this.getClass().getSuperclass().getSimpleName();
}
return simpleName;
}
protected int getInstanceCount() {
String simpleClassName = getSimpleClassName();
Integer numInstances = INSTANCES_COUNTER.get(simpleClassName);
if (null == numInstances) {
numInstances = 0;
}
return numInstances;
}
//
// /**
// * This should check, if the OutputPorts are connected correctly. This is needed to avoid NullPointerExceptions and other errors.
......
......@@ -15,6 +15,8 @@
*/
package teetime.framework;
import java.util.concurrent.atomic.AtomicInteger;
import com.carrotsearch.hppc.IntObjectHashMap;
import com.carrotsearch.hppc.IntObjectMap;
......@@ -37,12 +39,9 @@ import teetime.framework.signal.TerminatingSignal;
*/
public class DivideAndConquerStage<P extends AbstractDivideAndConquerProblem<P, S>, S extends AbstractDivideAndConquerSolution<S>> extends AbstractStage {
private final AtomicInteger numInstances;
private int threshold;
private boolean firstExecution;
private int problemsReceived;
private int solutionsSent;
private boolean signalsSent;
private final IntObjectMap<S> solutionBuffer = new IntObjectHashMap<S>();
......@@ -59,12 +58,53 @@ public class DivideAndConquerStage<P extends AbstractDivideAndConquerProblem<P,
*
*/
public DivideAndConquerStage() {
this(new AtomicInteger(0), Runtime.getRuntime().availableProcessors() - 1);
}
/**
*
* @param numInstances
* shared atomic counter
* @param threshold
* positive number indicated the maximal number of threads
*/
DivideAndConquerStage(final AtomicInteger numInstances, final int threshold) {
new DivideAndConquerRecursivePipe<P, S>(this.leftOutputPort, this.leftInputPort);
new DivideAndConquerRecursivePipe<P, S>(this.rightOutputPort, this.rightInputPort);
this.threshold = Runtime.getRuntime().availableProcessors();
this.numInstances = numInstances;
// threshold should be odd: 1 for the root and each 2 for the divided instances
this.threshold = (threshold % 2 != 0) ? threshold : threshold - 1; // NOPMD (reasonable use of the ternary operator)
this.firstExecution = true;
this.solutionsSent = 0;
this.problemsReceived = 0;
numInstances.incrementAndGet();
logger.debug("New number of instances: {}", numInstances.get());
}
@Override
protected void execute() {
boolean receivedLeftSolution = checkForSolutions(leftInputPort);
boolean receivedRightSolution = checkForSolutions(rightInputPort);
boolean receivedNewProblem = checkForProblems(inputPort);
// logger.trace("Received: {} {} {}", receivedLeftSolution, receivedRightSolution, receivedNewProblem);
// logger.trace("Closed input ports: {} {} {}", leftInputPort.isClosed(), rightInputPort.isClosed(), inputPort.isClosed());
if (!(receivedLeftSolution || receivedRightSolution || receivedNewProblem)) {
if (inputPort.isClosed()) { // check explicitly when this stage is active
// logger.debug("left: {}, right: {}", leftInputPort.isClosed(), rightInputPort.isClosed());
leftOutputPort.getPipe().close();
rightOutputPort.getPipe().close();
}
returnNoElement();
}
}
@Override
public void onSignal(final ISignal signal, final InputPort<?> inputPort) {
if (signal instanceof TerminatingSignal && inputPort == this.inputPort) {
leftOutputPort.getPipe().close();
rightOutputPort.getPipe().close();
}
super.onSignal(signal, inputPort);
}
/**
......@@ -119,33 +159,6 @@ public class DivideAndConquerStage<P extends AbstractDivideAndConquerProblem<P,
return this.rightOutputPort;
}
@Override
protected void execute() {
checkForSolutions(leftInputPort);
checkForSolutions(rightInputPort);
checkForProblems(inputPort);
checkForTermination();
}
/**
* Checks whether or not to terminate this stage and all child stages.
* The stage will termintate if there is no more input to process, a <code>TerminatingSignal</code> has been received and all child stages are terminated.
*/
private void checkForTermination() {
if (this.inputPort.isClosed() && solutionsSent > 0) { // no more input, time to terminate child stages
if (!signalsSent && problemsReceived == solutionsSent) {
this.getleftOutputPort().sendSignal(new TerminatingSignal());// send signal to terminate child stages first
this.getrightOutputPort().sendSignal(new TerminatingSignal());
this.signalsSent = true;
}
if (this.leftInputPort.isClosed() && this.rightInputPort.isClosed()) {// all child stages terminated
final ISignal signal = new TerminatingSignal();
this.getOutputPort().sendSignal(signal); // terminate stages following the DC stage
this.returnNoElement(); // terminate this stage
}
}
}
/**
* Receives and processes incoming solutions to combine or send to the next stage.
*
......@@ -153,22 +166,21 @@ public class DivideAndConquerStage<P extends AbstractDivideAndConquerProblem<P,
* The <code>InputPort</code> to receive solutions from.
*
* @return
* <code>true</code> if there was input to receive, <code>false</code> otherwise
* <code>true</code> if there was input to receive, <code>false</code> otherwise
*/
private boolean checkForSolutions(final InputPort<S> port) {
S solution = port.receive();
final S solution = port.receive();
if (solution != null) {
int solutionID = solution.getID();
if (isInBuffer(solutionID)) {
S bufferedSolution = getSolutionFromBuffer(solutionID);
S combinedSolution = solution.combine(bufferedSolution);
outputPort.send(combinedSolution);
this.solutionsSent++;
} else {
addToBuffer(solutionID, solution);
}
}
return solution == null;
return solution != null;
}
private S getSolutionFromBuffer(final int solutionID) {
......@@ -192,16 +204,14 @@ public class DivideAndConquerStage<P extends AbstractDivideAndConquerProblem<P,
* The <code>InputPort</code> to receive problems from.
*
* @return
* <code>true</code> if there was input to receive, <code>false</code> otherwise
* <code>true</code> if there was input to receive, <code>false</code> otherwise
*/
private boolean checkForProblems(final InputPort<P> port) {
P problem = port.receive();
final P problem = port.receive();
if (problem != null) {
this.problemsReceived++;
if (problem.isBaseCase()) {
S solution = problem.baseSolve();
this.getOutputPort().send(solution);
this.solutionsSent++;
} else {
if (firstExecution) {
createCopies();
......@@ -211,38 +221,27 @@ public class DivideAndConquerStage<P extends AbstractDivideAndConquerProblem<P,
this.getrightOutputPort().send(dividedProblem.rightProblem); // second recursive call
}
}
return problem == null;
return problem != null;
}
/**
* A method to add a new copy (new instance) of this stage to the configuration, which should be executed in a own thread.
*
*/
private void createCopies() {
DivideAndConquerStageFactory.getInstance().makeCopy(leftOutputPort, leftInputPort, this);
DivideAndConquerStageFactory.getInstance().makeCopy(rightOutputPort, rightInputPort, this);
if (this.inputPort.isClosed()) {
this.leftOutputPort.sendSignal(new TerminatingSignal());
this.rightOutputPort.sendSignal(new TerminatingSignal());
}
this.firstExecution = false;
}
protected boolean isThresholdReached() {
return this.threshold - this.getInstanceCount() <= 0;
return threshold - numInstances.get() <= 0;
}
@Override
public void onSignal(final ISignal signal, final InputPort<?> inputPort) {
if (!this.signalAlreadyReceived(signal, inputPort) && !(signal instanceof TerminatingSignal)) {
try {
signal.trigger(this);
} catch (Exception e) {
this.getOwningContext().abortConfigurationRun();
}
for (OutputPort<?> outputPort : getOutputPorts()) {
outputPort.sendSignal(signal);
}
}
AtomicInteger getNumInstances() { // NOPMD (package-private)
return numInstances;
}
int getThreshold() { // NOPMD (package-private)
return threshold;
}
}
......@@ -27,10 +27,6 @@ import teetime.framework.signal.StartingSignal;
*/
public class DivideAndConquerStageFactory {
private enum parallelismMode {
ALL, DIVIDE, COMBINE
};
private DivideAndConquerStageFactory() {}
/**
......@@ -71,7 +67,7 @@ public class DivideAndConquerStageFactory {
if (callingStage.isThresholdReached()) {
new DivideAndConquerRecursivePipe<P, S>(outputPort, inputPort);
} else {
DivideAndConquerStage<P, S> newStage = new DivideAndConquerStage<P, S>();
DivideAndConquerStage<P, S> newStage = new DivideAndConquerStage<P, S>(callingStage.getNumInstances(), callingStage.getThreshold());
DynamicConfigurationContext.INSTANCE.connectPorts(outputPort, newStage.getInputPort());
DynamicConfigurationContext.INSTANCE.connectPorts(newStage.getOutputPort(), inputPort);
outputPort.sendSignal(new StartingSignal());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment