Commit ca40988a authored by Christian Wulf's avatar Christian Wulf

Analysis now returns its threads' exceptions;

Added FiniteSignalPassingTest
parent a6b27b97
......@@ -56,7 +56,6 @@ public abstract class AbstractStage implements StageWithPort {
// return outputPort.send(element);
}
@SuppressWarnings("unchecked")
private void connectUnconnectedOutputPorts() {
for (OutputPort<?> outputPort : this.cachedOutputPorts) {
if (null == outputPort.getPipe()) { // if port is unconnected
......
package teetime.variant.methodcallWithPorts.framework.core;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Analysis {
import teetime.util.Pair;
public class Analysis implements UncaughtExceptionHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(Analysis.class);
......@@ -16,6 +21,8 @@ public class Analysis {
private final List<Thread> finiteProducerThreads = new LinkedList<Thread>();
private final List<Thread> infiniteProducerThreads = new LinkedList<Thread>();
private final Collection<Pair<Thread, Throwable>> exceptions = new ConcurrentLinkedQueue<Pair<Thread, Throwable>>();
public Analysis(final Configuration configuration) {
this.configuration = configuration;
}
......@@ -37,17 +44,24 @@ public class Analysis {
}
}
public void start() {
/**
*
* @return a map of thread/throwable pair
*/
public Collection<Pair<Thread, Throwable>> start() {
// start analysis
for (Thread thread : this.consumerThreads) {
thread.setUncaughtExceptionHandler(this);
thread.start();
}
for (Thread thread : this.finiteProducerThreads) {
thread.setUncaughtExceptionHandler(this);
thread.start();
}
for (Thread thread : this.infiniteProducerThreads) {
thread.setUncaughtExceptionHandler(this);
thread.start();
}
......@@ -75,9 +89,16 @@ public class Analysis {
for (Thread thread : this.infiniteProducerThreads) {
thread.interrupt();
}
return this.exceptions;
}
public Configuration getConfiguration() {
return this.configuration;
}
@Override
public void uncaughtException(final Thread t, final Throwable e) {
this.exceptions.add(Pair.of(t, e));
}
}
......@@ -42,6 +42,9 @@ public class RunnableStage implements Runnable {
TerminatingSignal terminatingSignal = new TerminatingSignal();
this.stage.onSignal(terminatingSignal, null);
} catch (Error e) {
this.logger.error("Terminating thread due to the following exception: ", e);
throw e;
} catch (RuntimeException e) {
this.logger.error("Terminating thread due to the following exception: ", e);
throw e;
......
package teetime.variant.methodcallWithPorts.examples.loopStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.ProducerStage;
public class Countdown extends ProducerStage<Void> {
private final InputPort<Integer> countdownInputPort = this.createInputPort();
private final OutputPort<Integer> newCountdownOutputPort = this.createOutputPort();
private final Integer initialCountdown;
public Countdown(final Integer initialCountdown) {
this.initialCountdown = initialCountdown;
}
@Override
public void onStarting() {
this.countdownInputPort.getPipe().add(this.initialCountdown);
super.onStarting();
}
@Override
protected void execute() {
Integer countdown = this.countdownInputPort.receive();
if (countdown == 0) {
this.send(this.outputPort, null);
this.terminate();
} else {
this.send(this.newCountdownOutputPort, --countdown);
}
}
public InputPort<Integer> getCountdownInputPort() {
return this.countdownInputPort;
}
public OutputPort<Integer> getNewCountdownOutputPort() {
return this.newCountdownOutputPort;
}
}
package teetime.variant.methodcallWithPorts.examples.loopStage;
import static org.junit.Assert.assertEquals;
import java.util.Collection;
import org.junit.Test;
import teetime.util.Pair;
import teetime.variant.methodcallWithPorts.framework.core.Analysis;
public class FiniteSignalPassingTest {
@Test(timeout = 5000)
// may not run infinitely, so we set an arbitrary timeout that is high enough
public void testStartSignalDoesNotEndUpInInfiniteLoop() throws Exception {
LoopStageAnalysisConfiguration configuration = new LoopStageAnalysisConfiguration();
Analysis analysis = new Analysis(configuration);
analysis.init();
Collection<Pair<Thread, Throwable>> exceptions = analysis.start();
assertEquals(0, exceptions.size());
}
}
package teetime.variant.methodcallWithPorts.examples.loopStage;
import teetime.variant.methodcallWithPorts.framework.core.Configuration;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication;
public class LoopStageAnalysisConfiguration extends Configuration {
public LoopStageAnalysisConfiguration() {
Countdown countdown = new Countdown(10);
PipeFactory.INSTANCE.create(ThreadCommunication.INTRA)
.connectPorts(countdown.getNewCountdownOutputPort(), countdown.getCountdownInputPort());
this.getFiniteProducerStages().add(countdown);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment