Commit 1a954821 authored by Christian Wulf's avatar Christian Wulf

fixed bugs due to the new signal concept

parent 78182435
.handlers = java.util.logging.ConsoleHandler
.level = WARNING
.level = ALL
java.util.logging.ConsoleHandler.level = ALL
#java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s)%n
#teetime.level = ALL
teetime.level = ALL
#teetime.variant.methodcallWithPorts.framework.core.level = ALL
#teetime.variant.methodcallWithPorts.stage.level = FINE
teetime.variant.methodcallWithPorts.framework.level = ALL
teetime.variant.methodcallWithPorts.framework.core.level = ALL
teetime.variant.methodcallWithPorts.stage.level = FINE
#teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE
......@@ -107,6 +107,9 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
*/
@Override
public void onSignal(final Signal signal, final InputPort<?> inputPort) {
this.logger.debug("Got signal: " + signal + " from input port: " + inputPort);
// System.out.println("Got signal: " + signal + " from input port: " + this.getClass().getSimpleName() + "." + inputPort);
switch (signal) {
case FINISHED:
this.onFinished();
......@@ -116,11 +119,12 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
break;
}
this.getOutputPort().sendSignal(signal);
this.outputPort.sendSignal(signal);
}
protected void onFinished() {
// empty default implementation
this.onIsPipelineHead();
}
@Override
......
package teetime.variant.methodcallWithPorts.framework.core;
public abstract class ConsumerStage<I, O> extends AbstractStage<I, O> {
@Override
......
......@@ -35,7 +35,9 @@ public class OutputPort<T> {
}
public void sendSignal(final Signal signal) {
this.pipe.setSignal(signal);
if (this.pipe != null) { // if the output port is connected with a pipe
this.pipe.setSignal(signal);
}
}
}
......@@ -68,25 +68,27 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
// headStage.sendFinishedSignalToAllSuccessorStages();
// this.updateRescheduable(headStage);
}
private final void updateRescheduable(final StageWithPort<?, ?> stage) {
StageWithPort<?, ?> currentStage = stage;
do {
this.firstStageIndex++;
// currentStage = currentStage.getOutputPort().getPipe().getTargetStage(); // FIXME what to do with a stage with more than one output port?
// if (currentStage == null) { // loop reaches the last stage
if (this.firstStageIndex == this.stages.length) { // loop reaches the last stage
this.setReschedulable(false);
this.cleanUp();
return;
}
currentStage = this.stages[this.firstStageIndex];
currentStage.onIsPipelineHead();
} while (!currentStage.isReschedulable());
this.setReschedulable(true);
}
// this.setReschedulable(headStage.isReschedulable());
}
// private final void updateRescheduable(final StageWithPort<?, ?> stage) {
// StageWithPort<?, ?> currentStage = stage;
// do {
// this.firstStageIndex++;
// // currentStage = currentStage.getOutputPort().getPipe().getTargetStage(); // FIXME what to do with a stage with more than one output port?
// // if (currentStage == null) { // loop reaches the last stage
// if (this.firstStageIndex == this.stages.length) { // loop reaches the last stage
// this.setReschedulable(false);
// this.cleanUp();
// return;
// }
// currentStage = this.stages[this.firstStageIndex];
// currentStage.onIsPipelineHead();
// } while (!currentStage.isReschedulable());
//
// this.setReschedulable(true);
// }
@Override
public void onIsPipelineHead() {
......@@ -133,12 +135,13 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
@Override
public boolean isReschedulable() {
return this.reschedulable;
// return this.reschedulable;
return this.firstStage.isReschedulable();
}
public void setReschedulable(final boolean reschedulable) {
this.reschedulable = reschedulable;
}
// public void setReschedulable(final boolean reschedulable) {
// this.reschedulable = reschedulable;
// }
@Override
public InputPort<I> getInputPort() {
......@@ -166,7 +169,7 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
@Override
public void onSignal(final Signal signal, final InputPort<?> inputPort) {
throw new IllegalStateException("Should not be used since the signal is directly passed via the first stage's input port.");
this.firstStage.onSignal(signal, inputPort);
}
}
package teetime.variant.methodcallWithPorts.framework.core;
public abstract class ProducerStage<I, O> extends AbstractStage<I, O> {
public ProducerStage() {
......@@ -9,6 +8,10 @@ public abstract class ProducerStage<I, O> extends AbstractStage<I, O> {
@Override
public void executeWithPorts() {
// if (this.logger.isDebugEnabled()) {
// this.logger.debug("Executing stage...");
// }
this.execute5(null);
// if (!this.getOutputPort().pipe.isEmpty()) {
......
......@@ -15,6 +15,8 @@ public class RunnableStage<I> implements Runnable {
@Override
public void run() {
this.logger.debug("Executing runnable stage...");
try {
this.stage.onStart();
......
......@@ -6,7 +6,9 @@ public abstract class IntraThreadPipe<T> extends AbstractPipe<T> {
@Override
public void setSignal(final Signal signal) {
this.getTargetPort().getOwningStage().onSignal(signal, this.getTargetPort());
if (this.getTargetPort() != null) {
this.getTargetPort().getOwningStage().onSignal(signal, this.getTargetPort());
}
}
}
......@@ -49,13 +49,14 @@ public class CollectorSink<T> extends ConsumerStage<T, Void> {
@Override
protected void execute5(final T element) {
this.elements.add(element);
if ((this.elements.size() % THRESHOLD) == 0) {
System.out.println("size: " + this.elements.size());
}
if (this.elements.size() > 90000) {
// System.out.println("size > 90000: " + this.elements.size());
}
// if (this.elements.size() > 90000) {
// // System.out.println("size > 90000: " + this.elements.size());
// }
}
}
......@@ -6,7 +6,9 @@ import java.util.List;
import teetime.util.concurrent.spsc.Pow2;
import teetime.util.list.CommittableQueue;
import teetime.variant.methodcallWithPorts.framework.core.AbstractStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.Signal;
public final class Distributor<T> extends AbstractStage<T, T> {
......@@ -50,6 +52,25 @@ public final class Distributor<T> extends AbstractStage<T, T> {
// this.outputPortList.clear();
}
@Override
public void onSignal(final Signal signal, final InputPort<?> inputPort) {
this.logger.debug("Got signal: " + signal + " from input port: " + inputPort);
// System.out.println("Got signal: " + signal + " from input port: " + this.getClass().getSimpleName() + "." + inputPort);
switch (signal) {
case FINISHED:
this.onFinished();
break;
default:
this.logger.warn("Aborted sending signal " + signal + ". Reason: Unknown signal.");
break;
}
for (OutputPort<?> op : this.outputPorts) {
op.sendSignal(signal);
}
}
@SuppressWarnings("unchecked")
@Override
public void onStart() {
......
......@@ -53,23 +53,6 @@ public class ObjectProducer<T> extends ProducerStage<Void, T> {
this.inputObjectCreator = inputObjectCreator;
}
// @Override
// protected void execute3() {
// if (this.numInputObjects == 0) {
// // this.getOutputPort().send((T) END_SIGNAL);
// return;
// }
//
// try {
// final T newObject = this.inputObjectCreator.call();
// this.numInputObjects--;
//
// // this.getOutputPort().send(newObject);
// } catch (final Exception e) {
// throw new IllegalStateException(e);
// }
// }
@Override
protected void execute4(final CommittableQueue<Void> elements) {
this.execute5(null);
......@@ -77,6 +60,8 @@ public class ObjectProducer<T> extends ProducerStage<Void, T> {
@Override
protected void execute5(final Void element) {
// this.logger.debug("Executing object producer...");
T newObject = null;
newObject = this.inputObjectCreator.create();
this.numInputObjects--;
......@@ -90,10 +75,4 @@ public class ObjectProducer<T> extends ProducerStage<Void, T> {
this.send(newObject);
}
// @Override
// public void onIsPipelineHead() {
// // this.getOutputPort().pipe = null; // no performance increase
// super.onIsPipelineHead();
// }
}
......@@ -7,7 +7,7 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
public class Relay<T> extends AbstractStage<T, T> {
private SpScPipe<T> inputPipe;
private SpScPipe<T> cachedCastedInputPipe;
public Relay() {
this.setReschedulable(true);
......@@ -18,7 +18,7 @@ public class Relay<T> extends AbstractStage<T, T> {
T element = this.getInputPort().receive();
if (null == element) {
// if (this.getInputPort().getPipe().isClosed()) {
if (this.inputPipe.getSignal() == Signal.FINISHED) {
if (this.cachedCastedInputPipe.getSignal() == Signal.FINISHED) {
this.setReschedulable(false);
assert 0 == this.getInputPort().getPipe().size();
}
......@@ -30,7 +30,7 @@ public class Relay<T> extends AbstractStage<T, T> {
@Override
public void onStart() {
this.inputPipe = (SpScPipe<T>) this.getInputPort().getPipe();
this.cachedCastedInputPipe = (SpScPipe<T>) this.getInputPort().getPipe();
super.onStart();
}
......
......@@ -45,8 +45,8 @@ public class MethodCallThroughputAnalysis11 extends Analysis {
@Override
public void init() {
super.init();
Pipeline<?, ?> pipeline = this.buildPipeline(this.numInputObjects, this.inputObjectCreator);
this.runnable = new RunnableStage(pipeline);
Pipeline<Void, ?> pipeline = this.buildPipeline(this.numInputObjects, this.inputObjectCreator);
this.runnable = new RunnableStage<Void>(pipeline);
}
/**
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment