Commit c912651d authored by Christian Wulf's avatar Christian Wulf

switched to the generic distributor

parent 92215792
......@@ -8,6 +8,6 @@ java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s)%n
#teetime.level = ALL
#teetime.variant.methodcallWithPorts.framework.level = ALL
#teetime.variant.methodcallWithPorts.framework.core.level = ALL
#teetime.variant.methodcallWithPorts.stage.level = FINE
teetime.variant.methodcallWithPorts.framework.core.level = FINE
teetime.variant.methodcallWithPorts.stage.level = INFO
#teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE
......@@ -107,8 +107,7 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
*/
@Override
public void onSignal(final Signal signal, final InputPort<?> inputPort) {
this.logger.debug("Got signal: " + signal + " from input port: " + inputPort);
// System.out.println("Got signal: " + signal + " from input port: " + this.getClass().getSimpleName() + "." + inputPort);
this.logger.info("Got signal: " + signal + " from input port: " + inputPort);
switch (signal) {
case FINISHED:
......
package teetime.variant.methodcallWithPorts.stage;
import java.util.ArrayList;
import java.util.List;
import teetime.util.concurrent.spsc.Pow2;
import teetime.util.list.CommittableQueue;
import teetime.variant.methodcallWithPorts.framework.core.AbstractStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.Signal;
public final class Distributor<T> extends AbstractStage<T, T> {
// TODO do not inherit from AbstractStage since it provides the default output port that is unnecessary for the distributor
private final List<OutputPort<T>> outputPortList = new ArrayList<OutputPort<T>>();
private OutputPort<T>[] outputPorts;
private int nextOutputPortIndex;
private int size;
// private int mask;
@Override
protected void execute4(final CommittableQueue<T> elements) {
// TODO Auto-generated method stub
}
@Override
protected void execute5(final T element) {
OutputPort<T> outputPort = this.outputPorts[this.nextOutputPortIndex % this.size];
this.nextOutputPortIndex++;
outputPort.send(element);
}
@Override
public void onIsPipelineHead() {
// for (OutputPort<?> op : this.outputPorts) {
// op.getPipe().close();
// if (this.logger.isDebugEnabled()) {
// this.logger.debug("End signal sent, size: " + op.getPipe().size());
// }
// }
// for (OutputPort<?> op : this.outputPorts) {
// op.pipe = null;
// }
// this.outputPorts = null;
// this.outputPortList.clear();
}
@Override
public void onSignal(final Signal signal, final InputPort<?> inputPort) {
this.logger.debug("Got signal: " + signal + " from input port: " + inputPort);
// System.out.println("Got signal: " + signal + " from input port: " + this.getClass().getSimpleName() + "." + inputPort);
switch (signal) {
case FINISHED:
this.onFinished();
break;
default:
this.logger.warn("Aborted sending signal " + signal + ". Reason: Unknown signal.");
break;
}
for (OutputPort<?> op : this.outputPorts) {
op.sendSignal(signal);
}
}
@SuppressWarnings("unchecked")
@Override
public void onStart() {
this.size = this.outputPortList.size();
// this.mask = this.size - 1;
int sizeInPow2 = Pow2.findNextPositivePowerOfTwo(this.size); // is not necessary so far
this.outputPorts = this.outputPortList.toArray(new OutputPort[sizeInPow2]);
// System.out.println("outputPorts: " + this.outputPorts);
}
@Override
public OutputPort<T> getOutputPort() {
return this.getNewOutputPort();
}
private OutputPort<T> getNewOutputPort() {
OutputPort<T> outputPort = new OutputPort<T>();
this.outputPortList.add(outputPort);
return outputPort;
}
@Override
public void executeWithPorts() {
T element = this.getInputPort().receive();
this.setReschedulable(this.getInputPort().getPipe().size() > 0);
this.execute5(element);
}
}
......@@ -37,23 +37,23 @@ public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T, T> {
long diffInNs = timestampInNs - this.lastTimestampInNs;
// BETTER use the TimeUnit of the clock
long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs);
if (diffInMs > 0) {
long throughputPerSec = this.numPassedElements / diffInMs;
this.throughputs.add(throughputPerSec);
this.logger.info("Throughput: " + throughputPerSec + " elements/ms" + " -> numPassedElements=" + this.numPassedElements);
long throughputPerTimeUnit = -1;
this.resetTimestamp(timestampInNs);
long diffInSec = TimeUnit.NANOSECONDS.toSeconds(diffInNs);
if (diffInSec > 0) {
throughputPerTimeUnit = this.numPassedElements / diffInSec;
this.logger.info("Throughput: " + throughputPerTimeUnit + " elements/s" + " -> numPassedElements=" + this.numPassedElements);
} else {
long diffInSec = TimeUnit.NANOSECONDS.toSeconds(diffInNs);
if (diffInSec > 0) {
long throughputPerSec = this.numPassedElements / diffInSec;
this.throughputs.add(throughputPerSec);
this.logger.info("Throughput: " + throughputPerSec + " elements/s" + " -> numPassedElements=" + this.numPassedElements);
long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs);
if (diffInMs > 0) {
throughputPerTimeUnit = this.numPassedElements / diffInMs;
this.logger.info("Throughput: " + throughputPerTimeUnit + " elements/ms" + " -> numPassedElements=" + this.numPassedElements);
this.resetTimestamp(timestampInNs);
}
}
this.throughputs.add(throughputPerTimeUnit);
this.resetTimestamp(timestampInNs);
}
private void resetTimestamp(final Long timestampInNs) {
......
......@@ -20,7 +20,9 @@ import java.util.ArrayList;
import java.util.List;
import teetime.variant.methodcallWithPorts.framework.core.AbstractStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.Signal;
/**
* @author Christian Wulf
......@@ -39,12 +41,13 @@ public class Distributor<T> extends AbstractStage<T, T> {
private IDistributorStrategy<T> strategy = new RoundRobinStrategy<T>();
public IDistributorStrategy<T> getStrategy() {
return this.strategy;
}
@Override
public void executeWithPorts() {
T element = this.getInputPort().receive();
public void setStrategy(final IDistributorStrategy<T> strategy) {
this.strategy = strategy;
this.setReschedulable(this.getInputPort().getPipe().size() > 0);
this.execute5(element);
}
@Override
......@@ -60,6 +63,24 @@ public class Distributor<T> extends AbstractStage<T, T> {
// }
}
@Override
public void onSignal(final Signal signal, final InputPort<?> inputPort) {
this.logger.info("Got signal: " + signal + " from input port: " + inputPort);
switch (signal) {
case FINISHED:
this.onFinished();
break;
default:
this.logger.warn("Aborted sending signal " + signal + ". Reason: Unknown signal.");
break;
}
for (OutputPort<T> op : this.outputPortList) {
op.sendSignal(signal);
}
}
@Override
public OutputPort<T> getOutputPort() {
return this.getNewOutputPort();
......@@ -75,13 +96,12 @@ public class Distributor<T> extends AbstractStage<T, T> {
return this.outputPortList;
}
@Override
public void executeWithPorts() {
T element = this.getInputPort().receive();
this.setReschedulable(this.getInputPort().getPipe().size() > 0);
public IDistributorStrategy<T> getStrategy() {
return this.strategy;
}
this.execute5(element);
public void setStrategy(final IDistributorStrategy<T> strategy) {
this.strategy = strategy;
}
}
......@@ -28,12 +28,12 @@ import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.CollectorSink;
import teetime.variant.methodcallWithPorts.stage.Distributor;
import teetime.variant.methodcallWithPorts.stage.NoopFilter;
import teetime.variant.methodcallWithPorts.stage.ObjectProducer;
import teetime.variant.methodcallWithPorts.stage.Relay;
import teetime.variant.methodcallWithPorts.stage.StartTimestampFilter;
import teetime.variant.methodcallWithPorts.stage.StopTimestampFilter;
import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
/**
* @author Christian Wulf
......
......@@ -30,7 +30,6 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.UnorderedGrowablePipe;
import teetime.variant.methodcallWithPorts.stage.CollectorSink;
import teetime.variant.methodcallWithPorts.stage.Distributor;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.NoopFilter;
import teetime.variant.methodcallWithPorts.stage.ObjectProducer;
......@@ -38,6 +37,7 @@ import teetime.variant.methodcallWithPorts.stage.Relay;
import teetime.variant.methodcallWithPorts.stage.Sink;
import teetime.variant.methodcallWithPorts.stage.StartTimestampFilter;
import teetime.variant.methodcallWithPorts.stage.StopTimestampFilter;
import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
/**
* @author Christian Wulf
......
......@@ -28,12 +28,12 @@ import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.UnorderedGrowablePipe;
import teetime.variant.methodcallWithPorts.stage.CollectorSink;
import teetime.variant.methodcallWithPorts.stage.Distributor;
import teetime.variant.methodcallWithPorts.stage.NoopFilter;
import teetime.variant.methodcallWithPorts.stage.ObjectProducer;
import teetime.variant.methodcallWithPorts.stage.Relay;
import teetime.variant.methodcallWithPorts.stage.StartTimestampFilter;
import teetime.variant.methodcallWithPorts.stage.StopTimestampFilter;
import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
/**
* @author Christian Wulf
......
......@@ -28,12 +28,12 @@ import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.OrderedGrowableArrayPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.CollectorSink;
import teetime.variant.methodcallWithPorts.stage.Distributor;
import teetime.variant.methodcallWithPorts.stage.NoopFilter;
import teetime.variant.methodcallWithPorts.stage.ObjectProducer;
import teetime.variant.methodcallWithPorts.stage.Relay;
import teetime.variant.methodcallWithPorts.stage.StartTimestampFilter;
import teetime.variant.methodcallWithPorts.stage.StopTimestampFilter;
import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
/**
* @author Christian Wulf
......
......@@ -12,10 +12,10 @@ import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.Distributor;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.Relay;
import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
......
......@@ -14,10 +14,10 @@ import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.Distributor;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.Relay;
import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReduction.TraceAggregationBuffer;
......
......@@ -14,10 +14,10 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.CountingFilter;
import teetime.variant.methodcallWithPorts.stage.Distributor;
import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
......
......@@ -17,12 +17,12 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.CountingFilter;
import teetime.variant.methodcallWithPorts.stage.Distributor;
import teetime.variant.methodcallWithPorts.stage.ElementDelayMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.Relay;
import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
......
......@@ -18,12 +18,12 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.CountingFilter;
import teetime.variant.methodcallWithPorts.stage.Distributor;
import teetime.variant.methodcallWithPorts.stage.ElementDelayMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.Relay;
import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReduction.TraceAggregationBuffer;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment