Commit 26d25f26 authored by Christian Wulf's avatar Christian Wulf

Merge branch 'issue-303-Finding-producer-stages-in-not-connected-architecture' into 'master'

Issue 303 finding producer stages in not connected architecture



See merge request !83
parents cdbfb33d a6da1a4a
......@@ -54,40 +54,34 @@ abstract class AbstractRunnableStage implements Runnable {
logger.debug("Executing runnable stage...");
try {
beforeStageExecution();
if (stage.getOwningContext() == null) {
throw new IllegalArgumentException("Argument stage may not have a nullable owning context");
}
stopWatch.start();
try {
beforeStageExecution();
if (stage.getOwningContext() == null) {
throw new IllegalArgumentException("Argument stage may not have a nullable owning context");
}
stopWatch.start();
try {
while (!stage.shouldBeTerminated()) {
executeStage();
}
} catch (TerminateException e) {
stage.abort();
stage.getOwningContext().abortConfigurationRun();
} finally {
stopWatch.end();
durationsInNs.put(stage, stopWatch.getDurationInNs());
afterStageExecution();
while (!stage.shouldBeTerminated()) {
executeStage();
}
} catch (RuntimeException e) {
logger.error(TERMINATING_THREAD_DUE_TO_THE_FOLLOWING_EXCEPTION, e);
throw e;
} catch (InterruptedException e) {
// logger.error(TERMINATING_THREAD_DUE_TO_THE_FOLLOWING_EXCEPTION, e);
stage.getExceptionListener().reportException(e, stage);
}
} finally {
if (stage.getTerminationStrategy() != TerminationStrategy.BY_INTERRUPT) {
stage.getOwningContext().getThreadService().getRunnableCounter().dec();
} catch (TerminateException e) {
stage.abort();
stage.getOwningContext().abortConfigurationRun();
} finally {
stopWatch.end();
durationsInNs.put(stage, stopWatch.getDurationInNs());
afterStageExecution();
}
} catch (RuntimeException e) {
logger.error(TERMINATING_THREAD_DUE_TO_THE_FOLLOWING_EXCEPTION, e);
throw e;
} catch (InterruptedException e) {
// logger.error(TERMINATING_THREAD_DUE_TO_THE_FOLLOWING_EXCEPTION, e);
stage.getExceptionListener().reportException(e, stage);
}
if (logger.isDebugEnabled()) {
logger.debug("Finished runnable stage. (" + stage.getId() + ")");
logger.debug("Finished runnable stage. ({})", stage.getId());
}
}
......@@ -97,12 +91,4 @@ abstract class AbstractRunnableStage implements Runnable {
protected abstract void afterStageExecution();
// static AbstractRunnableStage create(final AbstractStage stage) {
// if (stage.getInputPorts().size() > 0) {
// return new RunnableConsumerStage(stage);
// } else {
// return new RunnableProducerStage(stage);
// }
// }
}
......@@ -15,6 +15,10 @@
*/
package teetime.framework;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import teetime.framework.exceptionHandling.AbstractExceptionListenerFactory;
import teetime.framework.exceptionHandling.TerminatingExceptionListenerFactory;
......@@ -30,10 +34,10 @@ public class Configuration extends CompositeStage {
private final AbstractExceptionListenerFactory<?> factory;
private final ConfigurationContext context;
private final Set<AbstractStage> startStages;
private boolean initialized;
private boolean executed;
private AbstractStage startStage;
public Configuration() {
this(new TerminatingExceptionListenerFactory());
......@@ -42,6 +46,7 @@ public class Configuration extends CompositeStage {
public Configuration(final AbstractExceptionListenerFactory<?> factory) {
this.factory = factory;
this.context = new ConfigurationContext(this);
this.startStages = new HashSet<AbstractStage>();
}
boolean isInitialized() {
......@@ -71,12 +76,12 @@ public class Configuration extends CompositeStage {
* A custom pipe instance
*/
public void registerCustomPipe(final AbstractPipe<?> pipe) {
startStage = pipe.getSourcePort().getOwningStage(); // memorize an arbitrary stage as starting point for traversing
startStages.add(pipe.getSourcePort().getOwningStage()); // memorize all source stages as starting point for traversing
}
@Override
public <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
startStage = sourcePort.getOwningStage(); // memorize an arbitrary stage as starting point for traversing
startStages.add(sourcePort.getOwningStage()); // memorize all source stages as starting point for traversing
super.connectPorts(sourcePort, targetPort, capacity);
}
......@@ -84,8 +89,8 @@ public class Configuration extends CompositeStage {
return context;
}
AbstractStage getStartStage() {
return startStage;
public Collection<AbstractStage> getStartStages() {
return startStages;
}
}
......@@ -33,8 +33,21 @@ public class TeeTimeThread extends Thread {
@Override
public void start() {
synchronized (this) {
runnable.stage.getOwningContext().getThreadService().getRunnableCounter().inc();
if (runnable.stage.getTerminationStrategy() != TerminationStrategy.BY_INTERRUPT) {
runnable.stage.getOwningContext().getThreadService().getRunnableCounter().inc();
}
super.start();
}
}
@Override
public void run() {
try {
super.run();
} finally {
if (runnable.stage.getTerminationStrategy() != TerminationStrategy.BY_INTERRUPT) {
runnable.stage.getOwningContext().getThreadService().getRunnableCounter().dec();
}
}
}
}
......@@ -15,6 +15,8 @@
*/
package teetime.framework;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
......@@ -53,30 +55,33 @@ class ThreadService extends AbstractService<ThreadService> { // NOPMD
@Override
void onInitialize() {
AbstractStage startStage = configuration.getStartStage();
Collection<AbstractStage> startStages = configuration.getStartStages();
Set<AbstractStage> newThreadableStages = initialize(startStage);
Set<AbstractStage> newThreadableStages = initialize(startStages);
startThreads(newThreadableStages);
}
void startStageAtRuntime(final AbstractStage newStage) {
newStage.declareActive();
List<AbstractStage> newStages = Arrays.asList(newStage);
Set<AbstractStage> newThreadableStages = initialize(newStage);
Set<AbstractStage> newThreadableStages = initialize(newStages);
startThreads(newThreadableStages);
sendStartingSignal(newThreadableStages);
}
// extracted for runtime use
private Set<AbstractStage> initialize(final AbstractStage startStage) {
if (startStage == null) {
private Set<AbstractStage> initialize(final Collection<AbstractStage> startStages) {
if (startStages.isEmpty()) {
throw new IllegalStateException("The start stage may not be null.");
}
A1ThreadableStageCollector stageCollector = new A1ThreadableStageCollector();
Traverser traversor = new Traverser(stageCollector, Direction.BOTH);
traversor.traverse(startStage);
for (AbstractStage startStage : startStages) {
traversor.traverse(startStage);
}
Set<AbstractStage> newThreadableStages = stageCollector.getThreadableStages();
......@@ -90,7 +95,9 @@ class ThreadService extends AbstractService<ThreadService> { // NOPMD
A3PipeInstantiation pipeVisitor = new A3PipeInstantiation();
traversor = new Traverser(pipeVisitor, Direction.BOTH);
traversor.traverse(startStage);
for (AbstractStage startStage : startStages) {
traversor.traverse(startStage);
}
A4StageAttributeSetter attributeSetter = new A4StageAttributeSetter(configuration, newThreadableStages);
attributeSetter.setAttributes();
......
package teetime.examples.clocktermination;
import teetime.framework.Configuration;
import teetime.stage.Clock;
import teetime.stage.InitialElementProducer;
import teetime.stage.basic.Sink;
/**
* This configuration is special because it did not terminate in previous versions of teetime.
* In these versions, all stages were collected which were reachable from the stage which was connected first by <code>connectPorts()</code>.
* Since the <code>firstProducer</code> cannot be reached by this stage (here: the clock stage),
* the <code>firstProducer</code> is not recognized as a stage and especially not as a producer stage.
* Since the clock is an infinite producer, the execution waits for its termination an infinite amount of time.
*
* @author Christian Wulf
*
*/
public class ClockTerminationConfig extends Configuration {
public ClockTerminationConfig() {
InitialElementProducer<Integer> firstProducer = new InitialElementProducer<Integer>(0, 1);
Clock clock = new Clock();
Sink<Integer> firstSink = new Sink<Integer>();
Sink<Long> secondSink = new Sink<Long>();
connectPorts(firstProducer.getOutputPort(), firstSink.getInputPort());
connectPorts(clock.getOutputPort(), secondSink.getInputPort());
}
}
package teetime.examples.clocktermination;
import org.junit.Test;
import teetime.framework.Configuration;
import teetime.framework.Execution;
public class ClockTerminationConfigTest {
@Test(timeout = 1000)
public void executeWithoutTimeout() throws Exception {
ClockTerminationConfig configuration = new ClockTerminationConfig();
Execution<Configuration> execution = new Execution<Configuration>(configuration);
execution.executeBlocking();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment