Commit e60fd647 authored by Christian Wulf's avatar Christian Wulf

refactored DivideAndConquerStage

parent a8854ba6
......@@ -198,7 +198,7 @@ public abstract class AbstractStage implements StateLoggable {
}
/**
* @threadsafe
* <i>This method is threadsafe.</i>
*/
public StageState getCurrentState() {
return currentState;
......
......@@ -154,6 +154,9 @@ class DivideAndConquerRecursivePipe<P extends AbstractDivideAndConquerProblem<P,
S secondSolution = solve(dividedProblem.rightProblem); // recursive call
solution = firstSolution.combine(secondSolution);
}
// solution = (S) ((QuicksortProblem) problem).solveDirectly();
return solution;
}
}
......@@ -24,12 +24,13 @@ import teetime.framework.divideandconquer.AbstractDivideAndConquerProblem;
import teetime.framework.divideandconquer.AbstractDivideAndConquerSolution;
import teetime.framework.divideandconquer.DividedDCProblem;
import teetime.framework.signal.ISignal;
import teetime.framework.signal.StartingSignal;
import teetime.framework.signal.TerminatingSignal;
/**
* A stage to solve divide and conquer problems
*
* @author Robin Mohr
* @author Robin Mohr, Christian Wulf
*
* @param <P>
* type of elements that represent a problem to be solved.
......@@ -39,8 +40,10 @@ import teetime.framework.signal.TerminatingSignal;
*/
public class DivideAndConquerStage<P extends AbstractDivideAndConquerProblem<P, S>, S extends AbstractDivideAndConquerSolution<S>> extends AbstractStage {
private final AtomicInteger numInstances;
private int threshold;
private static final int DEFAULT_THRESHOLD = Runtime.getRuntime().availableProcessors();
private final AtomicInteger numCopiedInstances;
private final int threshold;
private boolean firstExecution;
private final IntObjectMap<S> solutionBuffer = new IntObjectHashMap<S>();
......@@ -54,30 +57,37 @@ public class DivideAndConquerStage<P extends AbstractDivideAndConquerProblem<P,
private final OutputPort<P> rightOutputPort = this.createOutputPort();
/**
* Creates a new divide and conquer stage and connects the additional in- and output ports with {@link teetime.framework.DivideAndConquerRecursivePipe}.
*
* Creates a new divide and conquer stage and connects the additional in- and output ports with {@link teetime.framework.DivideAndConquerRecursivePipe} with a
* default threshold of {@code Runtime.getRuntime().availableProcessors()}.
*/
public DivideAndConquerStage() {
this(new AtomicInteger(0), Runtime.getRuntime().availableProcessors() - 1);
this(DEFAULT_THRESHOLD);
}
/**
* Creates a new divide and conquer stage and connects the additional in- and output ports with {@link teetime.framework.DivideAndConquerRecursivePipe}.
*/
public DivideAndConquerStage(final int threshold) {
this(new AtomicInteger(-1), threshold);
}
/**
*
* @param numInstances
* @param numCopiedInstances
* shared atomic counter
* @param threshold
* positive number indicated the maximal number of threads
*/
DivideAndConquerStage(final AtomicInteger numInstances, final int threshold) {
DivideAndConquerStage(final AtomicInteger numCopiedInstances, final int threshold) {
new DivideAndConquerRecursivePipe<P, S>(this.leftOutputPort, this.leftInputPort);
new DivideAndConquerRecursivePipe<P, S>(this.rightOutputPort, this.rightInputPort);
this.numInstances = numInstances;
this.numCopiedInstances = numCopiedInstances;
// threshold should be odd: 1 for the root and each 2 for the divided instances
this.threshold = (threshold % 2 != 0) ? threshold : threshold - 1; // NOPMD (reasonable use of the ternary operator)
this.firstExecution = true;
numInstances.incrementAndGet();
logger.debug("New number of instances: {}", numInstances.get());
numCopiedInstances.incrementAndGet();
logger.debug("New number of instances: {}", numCopiedInstances.get());
}
@Override
......@@ -92,6 +102,8 @@ public class DivideAndConquerStage<P extends AbstractDivideAndConquerProblem<P,
// logger.debug("left: {}, right: {}", leftInputPort.isClosed(), rightInputPort.isClosed());
leftOutputPort.getPipe().close();
rightOutputPort.getPipe().close();
// leftOutputPort.sendSignal(new TerminatingSignal());
// rightOutputPort.sendSignal(new TerminatingSignal());
}
returnNoElement();
......@@ -103,20 +115,12 @@ public class DivideAndConquerStage<P extends AbstractDivideAndConquerProblem<P,
if (signal instanceof TerminatingSignal && inputPort == this.inputPort) {
leftOutputPort.getPipe().close();
rightOutputPort.getPipe().close();
// leftOutputPort.sendSignal(signal);
// rightOutputPort.sendSignal(signal);
}
super.onSignal(signal, inputPort);
}
/**
* Sets the threshold for parallelism to the specified value.
*
* @param threshold
* Number of new threads to create.
*/
public void setThreshold(final int threshold) {
this.threshold = threshold;
}
/**
* @return <code>InputPort</code>
*/
......@@ -224,24 +228,28 @@ public class DivideAndConquerStage<P extends AbstractDivideAndConquerProblem<P,
return problem != null;
}
/**
* A method to add a new copy (new instance) of this stage to the configuration, which should be executed in a own thread.
*/
private void createCopies() {
DivideAndConquerStageFactory.getInstance().makeCopy(leftOutputPort, leftInputPort, this);
DivideAndConquerStageFactory.getInstance().makeCopy(rightOutputPort, rightInputPort, this);
if (isThresholdReached()) {
new DivideAndConquerRecursivePipe<P, S>(leftOutputPort, leftInputPort);
new DivideAndConquerRecursivePipe<P, S>(rightOutputPort, rightInputPort);
} else {
copy(leftOutputPort, leftInputPort, this);
copy(rightOutputPort, rightInputPort, this);
}
this.firstExecution = false;
}
protected boolean isThresholdReached() {
return threshold - numInstances.get() <= 0;
private void copy(final OutputPort<P> outputPort, final InputPort<S> inputPort, final DivideAndConquerStage<P, S> callingStage) {
DivideAndConquerStage<P, S> newStage = new DivideAndConquerStage<P, S>(numCopiedInstances, threshold);
DynamicConfigurationContext.INSTANCE.connectPorts(outputPort, newStage.getInputPort());
DynamicConfigurationContext.INSTANCE.connectPorts(newStage.getOutputPort(), inputPort);
outputPort.sendSignal(new StartingSignal());
RuntimeServiceFacade.INSTANCE.startWithinNewThread(callingStage, newStage);
}
AtomicInteger getNumInstances() { // NOPMD (package-private)
return numInstances;
private boolean isThresholdReached() {
return threshold - numCopiedInstances.get() <= 0;
}
int getThreshold() { // NOPMD (package-private)
return threshold;
}
}
/**
* Copyright © 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime-framework.github.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework;
import teetime.framework.divideandconquer.AbstractDivideAndConquerProblem;
import teetime.framework.divideandconquer.AbstractDivideAndConquerSolution;
import teetime.framework.signal.StartingSignal;
/**
* Used by {@link teetime.framework.DivideAndConquerStage} for thread-safe instantiation.
*
* @author Robin Mohr
*
*/
public class DivideAndConquerStageFactory {
private DivideAndConquerStageFactory() {}
/**
* This inner class is used for thread-safe initialization of the {@link teetime.framework.divideandconquer.DivideAndConquerStageFactory}
*/
private static class Initialization {
private static final DivideAndConquerStageFactory INSTANCE = new DivideAndConquerStageFactory();
}
/**
* Returns the instance or creates a new one if none is present.
*/
public static DivideAndConquerStageFactory getInstance() {
return Initialization.INSTANCE;
}
/**
* Receives and processes incoming problems to divide or solve.
*
* @param inputPort
* The <code>InputPort</code> to connect the new stage to.
*
* @param outputPort
* The <code>OutputPort</code> to connect the new stage to.
*
* @param callingStage
* The existing Stage to connect the new Stage to.
*
* @param <P>
* Type of problem.
* @param <S>
* Type of solution.
*
*/
protected synchronized <P extends AbstractDivideAndConquerProblem<P, S>, S extends AbstractDivideAndConquerSolution<S>> void makeCopy(
final OutputPort<P> outputPort,
final InputPort<S> inputPort, final DivideAndConquerStage<P, S> callingStage) {
if (callingStage.isThresholdReached()) {
new DivideAndConquerRecursivePipe<P, S>(outputPort, inputPort);
} else {
DivideAndConquerStage<P, S> newStage = new DivideAndConquerStage<P, S>(callingStage.getNumInstances(), callingStage.getThreshold());
DynamicConfigurationContext.INSTANCE.connectPorts(outputPort, newStage.getInputPort());
DynamicConfigurationContext.INSTANCE.connectPorts(newStage.getOutputPort(), inputPort);
outputPort.sendSignal(new StartingSignal());
RuntimeServiceFacade.INSTANCE.startWithinNewThread(callingStage, newStage);
}
}
}
......@@ -128,7 +128,9 @@ public final class Execution<T extends Configuration> {
}
}
// TODO: implement
/**
* Terminates all producer stages, interrupts all threads, and waits for a graceful termination.
*/
public void abortEventually() {
configurationContext.abortConfigurationRun();
waitForTermination();
......
......@@ -20,7 +20,7 @@ import java.util.Map;
/**
* Represents a exception, which is thrown by an analysis,
* if any problems occured within its execution.
* if any problems occurred within its execution.
* A collection of thrown exceptions within the analysis
* can be retrieved with {@link #getThrownExceptions()}.
*
......@@ -33,7 +33,8 @@ public class ExecutionException extends RuntimeException {
private final Map<Thread, List<Exception>> exceptions;
public ExecutionException(final Map<Thread, List<Exception>> exceptions) {
super("3003 - " + ((exceptions.size() == 1) ? exceptions.toString() : exceptions.size()) + " error(s) occurred while execution. Check thrown exception(s).");
super("3003 - " + ((exceptions.size() == 1) ? exceptions.toString() : exceptions.size())
+ " error(s) occurred while execution. Check thrown exception(s).");
this.exceptions = exceptions;
}
......
......@@ -25,11 +25,12 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class Identifiable {
private static AtomicInteger nextId = new AtomicInteger();
private static final AtomicInteger ID_GENERATOR = new AtomicInteger();
private final int identifier;
protected Identifiable() {
this.identifier = nextId.incrementAndGet();
this.identifier = ID_GENERATOR.incrementAndGet();
}
protected Identifiable(final int newID) {
......
......@@ -15,13 +15,13 @@
*/
package teetime.framework.exceptionHandling;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public abstract class AbstractExceptionListenerFactory<T extends AbstractExceptionListener> {
private final Map<Thread, List<Exception>> threadExceptionsMap = new HashMap<Thread, List<Exception>>();
private final Map<Thread, List<Exception>> threadExceptionsMap = new ConcurrentHashMap<Thread, List<Exception>>();
protected abstract T createInstance();
......
......@@ -113,6 +113,45 @@ public final class QuicksortProblem extends AbstractDivideAndConquerProblem<Quic
new QuicksortProblem(this.getID(), lowPointer, high, numbers));
}
public QuicksortSolution solveDirectly() {
solveDirectly(numbers, low, high);
return baseSolve();
}
private void solveDirectly(final int[] numbers, final int low, final int high) {
final int middle = low + (high - low) / 2; // pick the pivot
final int pivot = numbers[middle];
// make left < pivot and right > pivot
int lowPointer = low;
int highPointer = high;
while (lowPointer <= highPointer) {
while (numbers[lowPointer] < pivot) {
lowPointer++;
}
while (numbers[highPointer] > pivot) {
highPointer--;
}
if (lowPointer <= highPointer) {
int temp = numbers[lowPointer];
numbers[lowPointer] = numbers[highPointer];
numbers[highPointer] = temp;
lowPointer++;
highPointer--;
}
}
// recursively sort two sub parts
if (low < highPointer) {
solveDirectly(numbers, low, highPointer);
}
if (lowPointer < high) {
solveDirectly(numbers, lowPointer, high);
}
}
@Override
public QuicksortSolution baseSolve() {
return new QuicksortSolution(
......@@ -121,4 +160,12 @@ public final class QuicksortProblem extends AbstractDivideAndConquerProblem<Quic
this.high,
this.numbers);
}
public QuicksortSolution baseSolve(final int low, final int high, final int[] numbers) {
return new QuicksortSolution(
this.getID(),
low,
high,
numbers);
}
}
<configuration>
<!-- do not ship a log configuration with teetime -->
</configuration>
\ No newline at end of file
<configuration>
<statusListener class="ch.qos.logback.core.status.OnConsoleStatusListener" />
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<file>teetime.log</file>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<append>false</append>
<!-- encoders are assigned the type ch.qos.logback.classic.encoder.PatternLayoutEncoder
by default -->
<encoder>
<pattern>%msg%n</pattern>
</encoder>
</appender>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} %level %logger - %msg%n</pattern>
</encoder>
</appender>
<logger name="teetime.framework" level="TRACE" />
<logger name="util.TimingsReader" level="TRACE">
<appender-ref ref="FILE" />
</logger>
<logger name="util.BucketTimingsReader" level="TRACE">
<appender-ref ref="FILE" />
</logger>
<root level="WARN">
<appender-ref ref="CONSOLE" />
</root>
</configuration>
\ No newline at end of file
......@@ -29,10 +29,9 @@ public class QuicksortConfiguration extends Configuration {
public QuicksortConfiguration(final List<QuicksortProblem> inputs, final List<QuicksortSolution> results) {
// set up stages
InitialElementProducer<QuicksortProblem> initialElementProducer = new InitialElementProducer<QuicksortProblem>(inputs);
DivideAndConquerStage<QuicksortProblem, QuicksortSolution> quicksortStage = new DivideAndConquerStage<QuicksortProblem, QuicksortSolution>();
DivideAndConquerStage<QuicksortProblem, QuicksortSolution> quicksortStage = new DivideAndConquerStage<QuicksortProblem, QuicksortSolution>(2);
CollectorSink<QuicksortSolution> collectorSink = new CollectorSink<QuicksortSolution>(results);
quicksortStage.declareActive();
quicksortStage.setThreshold(2); // set parallelism level to 2
// connect ports
connectPorts(initialElementProducer.getOutputPort(), quicksortStage.getInputPort());
......
/**
* Copyright © 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime-framework.github.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.testutil;
import java.util.Random;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment