Commit a6da1a4a authored by Christian Wulf's avatar Christian Wulf

fixed incrementation of the runnable counter

parent a7170c5e
......@@ -54,40 +54,34 @@ abstract class AbstractRunnableStage implements Runnable {
logger.debug("Executing runnable stage...");
try {
beforeStageExecution();
if (stage.getOwningContext() == null) {
throw new IllegalArgumentException("Argument stage may not have a nullable owning context");
}
stopWatch.start();
try {
beforeStageExecution();
if (stage.getOwningContext() == null) {
throw new IllegalArgumentException("Argument stage may not have a nullable owning context");
}
stopWatch.start();
try {
while (!stage.shouldBeTerminated()) {
executeStage();
}
} catch (TerminateException e) {
stage.abort();
stage.getOwningContext().abortConfigurationRun();
} finally {
stopWatch.end();
durationsInNs.put(stage, stopWatch.getDurationInNs());
afterStageExecution();
while (!stage.shouldBeTerminated()) {
executeStage();
}
} catch (RuntimeException e) {
logger.error(TERMINATING_THREAD_DUE_TO_THE_FOLLOWING_EXCEPTION, e);
throw e;
} catch (InterruptedException e) {
// logger.error(TERMINATING_THREAD_DUE_TO_THE_FOLLOWING_EXCEPTION, e);
stage.getExceptionListener().reportException(e, stage);
}
} finally {
if (stage.getTerminationStrategy() != TerminationStrategy.BY_INTERRUPT) {
stage.getOwningContext().getThreadService().getRunnableCounter().dec();
} catch (TerminateException e) {
stage.abort();
stage.getOwningContext().abortConfigurationRun();
} finally {
stopWatch.end();
durationsInNs.put(stage, stopWatch.getDurationInNs());
afterStageExecution();
}
} catch (RuntimeException e) {
logger.error(TERMINATING_THREAD_DUE_TO_THE_FOLLOWING_EXCEPTION, e);
throw e;
} catch (InterruptedException e) {
// logger.error(TERMINATING_THREAD_DUE_TO_THE_FOLLOWING_EXCEPTION, e);
stage.getExceptionListener().reportException(e, stage);
}
if (logger.isDebugEnabled()) {
logger.debug("Finished runnable stage. (" + stage.getId() + ")");
logger.debug("Finished runnable stage. ({})", stage.getId());
}
}
......@@ -97,12 +91,4 @@ abstract class AbstractRunnableStage implements Runnable {
protected abstract void afterStageExecution();
// static AbstractRunnableStage create(final AbstractStage stage) {
// if (stage.getInputPorts().size() > 0) {
// return new RunnableConsumerStage(stage);
// } else {
// return new RunnableProducerStage(stage);
// }
// }
}
......@@ -33,8 +33,21 @@ public class TeeTimeThread extends Thread {
@Override
public void start() {
synchronized (this) {
runnable.stage.getOwningContext().getThreadService().getRunnableCounter().inc();
if (runnable.stage.getTerminationStrategy() != TerminationStrategy.BY_INTERRUPT) {
runnable.stage.getOwningContext().getThreadService().getRunnableCounter().inc();
}
super.start();
}
}
@Override
public void run() {
try {
super.run();
} finally {
if (runnable.stage.getTerminationStrategy() != TerminationStrategy.BY_INTERRUPT) {
runnable.stage.getOwningContext().getThreadService().getRunnableCounter().dec();
}
}
}
}
......@@ -7,8 +7,7 @@ import teetime.framework.Execution;
public class ClockTerminationConfigTest {
// @Test(timeout = 500)
@Test
@Test(timeout = 1000)
public void executeWithoutTimeout() throws Exception {
ClockTerminationConfig configuration = new ClockTerminationConfig();
Execution<Configuration> execution = new Execution<Configuration>(configuration);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment