Commit 6891fdec authored by Christian Wulf's avatar Christian Wulf

added awaitThreadState();

fixed DynamicTaskFarmStageTest;
worked on TaskFarmSchedulerImpl
parent ba5d1ecc
...@@ -251,14 +251,17 @@ public abstract class AbstractStage { ...@@ -251,14 +251,17 @@ public abstract class AbstractStage {
/** /**
* Declares this stage to be executed by an own thread. * Declares this stage to be executed by an own thread.
* <p>
* Depends on the used scheduler.
*/ */
public void declareActive() { public void declareActive() {
if (getCurrentState() == StageState.STARTED) { // if (getCurrentState() == StageState.STARTED) {
// TODO implement so that active/passive can be changed even at runtime // TODO implement so that active/passive can be changed even at runtime
// requires: volatile isActive // implemented for the dynamic task farm stage, but not yet for arbitrary stages
// requires: to declare further stages active (cascading) // requires: volatile isActive
throw new UnsupportedOperationException("Declaring a stage 'active' at runtime is not yet supported."); // requires: to declare further stages active (cascading)
} // throw new UnsupportedOperationException("Declaring a stage 'active' at runtime is not yet supported.");
// }
// serves as acknowledgement and thus must be set at the end // serves as acknowledgement and thus must be set at the end
this.isActive = true; this.isActive = true;
...@@ -266,6 +269,8 @@ public abstract class AbstractStage { ...@@ -266,6 +269,8 @@ public abstract class AbstractStage {
/** /**
* Declares this stage to be executed by the thread of its predecessor stage. * Declares this stage to be executed by the thread of its predecessor stage.
* <p>
* Depends on the used scheduler.
*/ */
public void declarePassive() { public void declarePassive() {
// TODO implement so that active/passive can be changed even at runtime // TODO implement so that active/passive can be changed even at runtime
......
...@@ -17,6 +17,7 @@ package teetime.framework; ...@@ -17,6 +17,7 @@ package teetime.framework;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
/** /**
* Represents a exception, which is thrown by an analysis, * Represents a exception, which is thrown by an analysis,
...@@ -33,8 +34,7 @@ public class ExecutionException extends RuntimeException { ...@@ -33,8 +34,7 @@ public class ExecutionException extends RuntimeException {
private final Map<Thread, List<Exception>> exceptions; private final Map<Thread, List<Exception>> exceptions;
public ExecutionException(final Map<Thread, List<Exception>> exceptions) { public ExecutionException(final Map<Thread, List<Exception>> exceptions) {
super("3003 - " + ((exceptions.size() == 1) ? exceptions.toString() : exceptions.size()) super(); // we do not pass a message. Instead, we override getMesage()
+ " error(s) occurred while execution. Check thrown exception(s).");
this.exceptions = exceptions; this.exceptions = exceptions;
} }
...@@ -49,4 +49,23 @@ public class ExecutionException extends RuntimeException { ...@@ -49,4 +49,23 @@ public class ExecutionException extends RuntimeException {
return exceptions; return exceptions;
} }
@Override
public String getMessage() {
StringBuilder builder = new StringBuilder();
builder.append("3003 - " + exceptions.size() + " error(s) occurred while execution.");
for (Entry<Thread, List<Exception>> threadExceptions : exceptions.entrySet()) {
builder.append("\n\t");
builder.append(threadExceptions.getKey());
builder.append(": ");
for (Exception exception : threadExceptions.getValue()) {
builder.append("\n\t\t");
builder.append(exception.toString());
}
}
builder.append("\n");
return builder.toString();
}
} }
...@@ -112,13 +112,14 @@ public class DynamicTaskFarmStage<I, O, T extends ITaskFarmDuplicable<I, O>> ext ...@@ -112,13 +112,14 @@ public class DynamicTaskFarmStage<I, O, T extends ITaskFarmDuplicable<I, O>> ext
LOGGER.debug("Adding stage (current amount of stages: {})", getWorkerStages().size()); LOGGER.debug("Adding stage (current amount of stages: {})", getWorkerStages().size());
} }
if (!getMerger().isActive()) { TaskFarmScheduler scheduler = configuration.getScheduler();
getMerger().declareActive(); if (!scheduler.isActive(getMerger())) {
scheduler.declareActive(getMerger());
} }
AbstractStage basicEnclosedStage = getBasicEnclosedStage().getInputPort().getOwningStage(); AbstractStage basicEnclosedStage = getBasicEnclosedStage().getInputPort().getOwningStage();
if (!basicEnclosedStage.isActive()) { if (!scheduler.isActive(basicEnclosedStage)) {
basicEnclosedStage.declareActive(); scheduler.declareActive(basicEnclosedStage);
} }
final ITaskFarmDuplicable<I, O> newStage = getBasicEnclosedStage().duplicate(); final ITaskFarmDuplicable<I, O> newStage = getBasicEnclosedStage().duplicate();
...@@ -129,12 +130,14 @@ public class DynamicTaskFarmStage<I, O, T extends ITaskFarmDuplicable<I, O>> ext ...@@ -129,12 +130,14 @@ public class DynamicTaskFarmStage<I, O, T extends ITaskFarmDuplicable<I, O>> ext
getDistributor().addPortActionRequest(distributorPortAction); getDistributor().addPortActionRequest(distributorPortAction);
distributorPortAction.waitForCompletion(); distributorPortAction.waitForCompletion();
System.out.println("distributorPortAction completed");
final CreatePortActionMerger<O> mergerPortAction = new CreatePortActionMerger<O>(newStage.getOutputPort(), final CreatePortActionMerger<O> mergerPortAction = new CreatePortActionMerger<O>(newStage.getOutputPort(),
getPipeCapacity()); getPipeCapacity());
getMerger().addPortActionRequest(mergerPortAction); getMerger().addPortActionRequest(mergerPortAction);
mergerPortAction.waitForCompletion(); mergerPortAction.waitForCompletion();
System.out.println("mergerPortAction completed");
// the validating and the starting signal is sent by the create action // the validating and the starting signal is sent by the create action
RuntimeServiceFacade.INSTANCE.startWithinNewThread(getDistributor(), newStage.getInputPort().getOwningStage()); RuntimeServiceFacade.INSTANCE.startWithinNewThread(getDistributor(), newStage.getInputPort().getOwningStage());
......
package teetime.stage.taskfarm; package teetime.stage.taskfarm;
import teetime.framework.AbstractStage; import java.util.List;
import teetime.framework.TeeTimeService;
import teetime.framework.*;
public class TaskFarmSchedulerImpl implements TaskFarmScheduler { public class TaskFarmSchedulerImpl implements TaskFarmScheduler {
...@@ -15,20 +16,31 @@ public class TaskFarmSchedulerImpl implements TaskFarmScheduler { ...@@ -15,20 +16,31 @@ public class TaskFarmSchedulerImpl implements TaskFarmScheduler {
@Override @Override
public void declareActive(final AbstractStage stage) { public void declareActive(final AbstractStage stage) {
List<InputPort<?>> inputPorts = StageFacade.INSTANCE.getInputPorts(stage);
for (InputPort<?> inputPort : inputPorts) {
OutputPort<?> sourcePort = inputPort.getPipe().getSourcePort();
// FIXME connect a synched pipe between sourcePort-->inputPort
// new BoundedSynchedPipe<T>(sourcePort, inputPort, capacity);
// sourcePort.sendSignal(new ValidatingSignal());
// sourcePort.sendSignal(new StartingSignal());
// FIXME the executing thread must also be informed about the new pipe impl of the sourcePort
}
scheduler.startStageAtRuntime(stage);
stage.declareActive(); stage.declareActive();
memoryBarrier = true; memoryBarrier = true;
} }
@Override @Override
public void declarePassive(final AbstractStage stage) { public void declarePassive(final AbstractStage stage) {
stage.declarePassive(); throw new UnsupportedOperationException("Method is not used by the task farm.");
memoryBarrier = true;
} }
@Override @Override
public boolean isActive(final AbstractStage stage) { public boolean isActive(final AbstractStage stage) {
memoryBarrier = stage.isActive(); boolean isActive = memoryBarrier;
return memoryBarrier; isActive = stage.isActive();
return isActive;
} }
} }
...@@ -18,6 +18,8 @@ package teetime.stage.taskfarm; ...@@ -18,6 +18,8 @@ package teetime.stage.taskfarm;
import static org.hamcrest.CoreMatchers.*; import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import java.lang.Thread.State;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
...@@ -27,6 +29,7 @@ import teetime.framework.TeeTimeService; ...@@ -27,6 +29,7 @@ import teetime.framework.TeeTimeService;
import teetime.framework.scheduling.pushpullmodel.PushPullScheduling; import teetime.framework.scheduling.pushpullmodel.PushPullScheduling;
import teetime.stage.CollectorSink; import teetime.stage.CollectorSink;
import teetime.stage.Counter; import teetime.stage.Counter;
import teetime.testutil.AssertHelper;
public class DynamicTaskFarmStageTest { public class DynamicTaskFarmStageTest {
...@@ -36,7 +39,7 @@ public class DynamicTaskFarmStageTest { ...@@ -36,7 +39,7 @@ public class DynamicTaskFarmStageTest {
final Integer[] elements = { 1, 2, 3 }; final Integer[] elements = { 1, 2, 3 };
final ElementTrigger<Integer> producer = new ElementTrigger<Integer>(elements); final ElementTrigger<Integer> producer = new ElementTrigger<Integer>(elements);
final DynamicTaskFarmStage<Integer, Integer, Counter<Integer>> dynamicTaskFarmStage = new DynamicTaskFarmStage<Integer, Integer, Counter<Integer>>( final DynamicTaskFarmStage<Integer, Integer, Counter<Integer>> dynamicTaskFarmStage = new DynamicTaskFarmStage<>(
new Counter<Integer>(), 1); new Counter<Integer>(), 1);
final CollectorSink<Integer> collectorSink = new CollectorSink<Integer>(); final CollectorSink<Integer> collectorSink = new CollectorSink<Integer>();
...@@ -56,13 +59,39 @@ public class DynamicTaskFarmStageTest { ...@@ -56,13 +59,39 @@ public class DynamicTaskFarmStageTest {
producer.trigger(); producer.trigger();
assertThat(dynamicTaskFarmStage.getMerger().isActive(), is(false)); assertThat(dynamicTaskFarmStage.getMerger().isActive(), is(false));
dynamicTaskFarmStage.addStageAtRuntime(); Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
dynamicTaskFarmStage.addStageAtRuntime();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
System.out.println("Thread 1 finished");
}
});
thread.start();
AssertHelper.awaitThreadStatus(thread, State.WAITING);
producer.trigger(); producer.trigger();
// assertThat(dynamicTaskFarmStage.getMerger().isActive(), is(true)); // TODO uncomment if "declareActive at runtime" is implemented assertThat(dynamicTaskFarmStage.getMerger().isActive(), is(true));
thread = new Thread(new Runnable() {
@Override
public void run() {
try {
dynamicTaskFarmStage.removeStageAtRuntime();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
System.out.println("Thread 2 finished");
}
});
thread.start();
AssertHelper.awaitThreadStatus(thread, State.WAITING);
dynamicTaskFarmStage.removeStageAtRuntime();
producer.trigger(); producer.trigger();
// assertThat(dynamicTaskFarmStage.getMerger().isActive(), is(false)); // TODO uncomment if "declareActive at runtime" is implemented // assertThat(dynamicTaskFarmStage.getMerger().isActive(), is(false));
execution.abortEventually(); execution.abortEventually();
......
...@@ -15,7 +15,9 @@ ...@@ -15,7 +15,9 @@
*/ */
package teetime.stage.taskfarm; package teetime.stage.taskfarm;
import java.util.*; import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
...@@ -24,12 +26,13 @@ import teetime.framework.AbstractProducerStage; ...@@ -24,12 +26,13 @@ import teetime.framework.AbstractProducerStage;
public class ElementTrigger<E> extends AbstractProducerStage<E> { public class ElementTrigger<E> extends AbstractProducerStage<E> {
private static final Object TRIGGER = new Object(); private static final Object TRIGGER = new Object();
private static final Object TERMINATE_TRIGGER = null; private static final Object TERMINATE_TRIGGER = new Object();
private final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>(); private final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
private final Collection<E> elements; private final Collection<E> elements;
private Iterator<E> iterator; private Iterator<E> iterator;
@SafeVarargs
public ElementTrigger(final E... elements) { public ElementTrigger(final E... elements) {
this(Arrays.asList(elements)); this(Arrays.asList(elements));
} }
......
...@@ -15,9 +15,11 @@ ...@@ -15,9 +15,11 @@
*/ */
package teetime.testutil; package teetime.testutil;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.*;
import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertThat; import static org.junit.Assert.*;
import java.lang.Thread.State;
public final class AssertHelper { public final class AssertHelper {
...@@ -40,4 +42,10 @@ public final class AssertHelper { ...@@ -40,4 +42,10 @@ public final class AssertHelper {
} }
} }
} }
public static void awaitThreadStatus(final Thread thread, final State state) throws InterruptedException {
while (thread.getState() != state) {
Thread.sleep(1);
}
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment