Commit 4539b5cc authored by Christian Wulf's avatar Christian Wulf

fixed test

parent 1a954821
.handlers = java.util.logging.ConsoleHandler
.level = ALL
.level = WARNING
java.util.logging.ConsoleHandler.level = ALL
#java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s)%n
teetime.level = ALL
#teetime.level = ALL
teetime.variant.methodcallWithPorts.framework.level = ALL
teetime.variant.methodcallWithPorts.framework.core.level = ALL
teetime.variant.methodcallWithPorts.stage.level = FINE
#teetime.variant.methodcallWithPorts.framework.level = ALL
#teetime.variant.methodcallWithPorts.framework.core.level = ALL
#teetime.variant.methodcallWithPorts.stage.level = FINE
#teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE
......@@ -20,10 +20,8 @@ public class SpScPipe<T> extends AbstractPipe<T> {
public static <T> SpScPipe<T> connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort, final int capacity) {
SpScPipe<T> pipe = new SpScPipe<T>(capacity);
targetPort.setPipe(pipe);
if (sourcePort != null) {
sourcePort.setPipe(pipe);
sourcePort.setCachedTargetStage(targetPort.getOwningStage());
}
sourcePort.setPipe(pipe);
sourcePort.setCachedTargetStage(targetPort.getOwningStage());
return pipe;
}
......
......@@ -18,14 +18,14 @@ public class Cache<T> extends ConsumerStage<T, T> {
@Override
public void onIsPipelineHead() {
this.logger.debug("Emitting cached elements...");
this.logger.debug("Emitting " + this.cachedObjects.size() + " cached elements...");
StopWatch stopWatch = new StopWatch();
stopWatch.start();
for (T cachedElement : this.cachedObjects) {
this.send(cachedElement);
}
stopWatch.end();
System.out.println("dur: " + TimeUnit.NANOSECONDS.toMillis(stopWatch.getDurationInNs()) + " ms");
this.logger.debug("Emitting took " + TimeUnit.NANOSECONDS.toMillis(stopWatch.getDurationInNs()) + " ms");
super.onIsPipelineHead();
}
......
......@@ -27,12 +27,16 @@ import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
*/
public class CollectorSink<T> extends ConsumerStage<T, Void> {
private static final int THRESHOLD = 10000; // TODO make configurable or use an sysout stage instead
private final List<T> elements;
private final int threshold;
public CollectorSink(final List<T> list) {
public CollectorSink(final List<T> list, final int threshold) {
this.elements = list;
this.threshold = threshold;
}
public CollectorSink(final List<T> list) {
this(list, 100000);
}
@Override
......@@ -50,7 +54,7 @@ public class CollectorSink<T> extends ConsumerStage<T, Void> {
protected void execute5(final T element) {
this.elements.add(element);
if ((this.elements.size() % THRESHOLD) == 0) {
if ((this.elements.size() % this.threshold) == 0) {
System.out.println("size: " + this.elements.size());
}
......
......@@ -35,13 +35,24 @@ public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T, T> {
private void computeElementThroughput(final Long timestampInNs) {
long diffInNs = timestampInNs - this.lastTimestampInNs;
long diffInSec = TimeUnit.NANOSECONDS.toSeconds(diffInNs);
if (diffInSec > 0) {
long throughputPerSec = this.numPassedElements / diffInSec;
// BETTER use the TimeUnit of the clock
long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs);
if (diffInMs > 0) {
long throughputPerSec = this.numPassedElements / diffInMs;
this.throughputs.add(throughputPerSec);
this.logger.info("Throughput: " + throughputPerSec + " elements/s" + " -> numPassedElements=" + this.numPassedElements);
this.logger.info("Throughput: " + throughputPerSec + " elements/ms" + " -> numPassedElements=" + this.numPassedElements);
this.resetTimestamp(timestampInNs);
} else {
long diffInSec = TimeUnit.NANOSECONDS.toSeconds(diffInNs);
if (diffInSec > 0) {
long throughputPerSec = this.numPassedElements / diffInSec;
this.throughputs.add(throughputPerSec);
this.logger.info("Throughput: " + throughputPerSec + " elements/s" + " -> numPassedElements=" + this.numPassedElements);
this.resetTimestamp(timestampInNs);
}
}
}
......
......@@ -5,12 +5,16 @@ import java.util.HashMap;
import java.util.Map;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.Signal;
import com.google.common.io.Files;
public class FileExtensionSwitch extends ConsumerStage<File, File> {
// BETTER do not extends from AbstractStage since it provides another unused output port
private final Map<String, OutputPort<File>> fileExtensions = new HashMap<String, OutputPort<File>>();
@Override
......@@ -23,6 +27,24 @@ public class FileExtensionSwitch extends ConsumerStage<File, File> {
}
}
@Override
public void onSignal(final Signal signal, final InputPort<?> inputPort) {
this.logger.debug("Got signal: " + signal + " from input port: " + inputPort);
switch (signal) {
case FINISHED:
this.onFinished();
break;
default:
this.logger.warn("Aborted sending signal " + signal + ". Reason: Unknown signal.");
break;
}
for (OutputPort<File> op : this.fileExtensions.values()) {
op.sendSignal(signal);
}
}
public OutputPort<File> addFileExtension(String fileExtension) {
if (fileExtension.startsWith(".")) {
fileExtension = fileExtension.substring(1);
......
......@@ -21,6 +21,7 @@ import java.util.List;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.Signal;
/**
*
......@@ -40,6 +41,8 @@ public class Merger<T> extends ConsumerStage<T, T> {
// BETTER use an array since a list always creates a new iterator when looping
private final List<InputPort<T>> inputPortList = new ArrayList<InputPort<T>>();
private int finishedInputPorts;
private IMergerStrategy<T> strategy = new RoundRobinStrategy<T>();
public IMergerStrategy<T> getStrategy() {
......@@ -65,6 +68,29 @@ public class Merger<T> extends ConsumerStage<T, T> {
this.setReschedulable(isReschedulable);
}
@Override
public void onSignal(final Signal signal, final InputPort<?> inputPort) {
this.logger.debug("Got signal: " + signal + " from input port: " + inputPort);
switch (signal) {
case FINISHED:
this.onFinished();
break;
default:
this.logger.warn("Aborted sending signal " + signal + ". Reason: Unknown signal.");
break;
}
if (this.finishedInputPorts == this.inputPortList.size()) {
this.getOutputPort().sendSignal(signal);
}
}
@Override
protected void onFinished() {
this.finishedInputPorts++;
}
@Override
protected void execute5(final T element) {
final T token = this.strategy.getNextInput(this);
......
......@@ -18,7 +18,7 @@ package teetime.variant.methodcallWithPorts.stage.kieker.fileToRecord;
import java.io.File;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.stage.io.File2TextLinesFilter;
import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository;
import teetime.variant.methodcallWithPorts.stage.kieker.fileToRecord.textLine.TextLine2RecordFilter;
......@@ -40,6 +40,6 @@ public class DatFile2RecordFilter extends Pipeline<File, IMonitoringRecord> {
this.setLastStage(textLine2RecordFilter);
// BETTER let the framework choose the optimal pipe implementation
SpScPipe.connect(file2TextLinesFilter.getOutputPort(), textLine2RecordFilter.getInputPort(), 1);
SingleElementPipe.connect(file2TextLinesFilter.getOutputPort(), textLine2RecordFilter.getInputPort());
}
}
......@@ -23,6 +23,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import java.io.File;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
......@@ -78,8 +80,9 @@ public class ChwWorkTraceReconstructionAnalysisTest {
TraceEventRecords trace6886 = analysis.getElementCollection().get(1);
assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
this.removeFirstZeroThroughputs(analysis);
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element");
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit");
}
@Test
......@@ -105,8 +108,9 @@ public class ChwWorkTraceReconstructionAnalysisTest {
TraceEventRecords trace1 = analysis.getElementCollection().get(1);
assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId());
this.removeFirstZeroThroughputs(analysis);
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element");
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit");
assertThat(quintiles.get(0.5), is(both(greaterThan(1100l)).and(lessThan(1400l))));
}
......@@ -134,8 +138,21 @@ public class ChwWorkTraceReconstructionAnalysisTest {
TraceEventRecords trace1 = analysis.getElementCollection().get(1);
assertEquals(1, trace1.getTraceMetadata().getTraceId());
this.removeFirstZeroThroughputs(analysis);
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element");
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit");
}
private void removeFirstZeroThroughputs(final TraceReconstructionAnalysis analysis) {
List<Long> throughputs = analysis.getThroughputs();
Iterator<Long> iterator = throughputs.iterator();
while (iterator.hasNext()) {
if (iterator.next() == 0) {
iterator.remove();
} else {
break;
}
}
}
}
......@@ -50,10 +50,10 @@ public class TraceReconstructionAnalysis extends Analysis {
public void init() {
super.init();
StageWithPort<Void, Long> clockStage = this.buildClockPipeline();
this.clockThread = new Thread(new RunnableStage(clockStage));
this.clockThread = new Thread(new RunnableStage<Void>(clockStage));
Pipeline<?, ?> pipeline = this.buildPipeline(clockStage);
this.workerThread = new Thread(new RunnableStage(pipeline));
Pipeline<File, ?> pipeline = this.buildPipeline(clockStage);
this.workerThread = new Thread(new RunnableStage<File>(pipeline));
}
private StageWithPort<Void, Long> buildClockPipeline() {
......@@ -84,7 +84,7 @@ public class TraceReconstructionAnalysis extends Analysis {
stringBufferFilter.getDataTypeHandlers().add(new StringHandler());
// connect stages
SpScPipe.connect(null, dir2RecordsFilter.getInputPort(), 1);
dir2RecordsFilter.getInputPort().setPipe(new SingleElementPipe<File>());
SingleElementPipe.connect(dir2RecordsFilter.getOutputPort(), this.recordCounter.getInputPort());
SingleElementPipe.connect(this.recordCounter.getOutputPort(), cache.getInputPort());
SingleElementPipe.connect(cache.getOutputPort(), stringBufferFilter.getInputPort());
......@@ -126,7 +126,13 @@ public class TraceReconstructionAnalysis extends Analysis {
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
this.clockThread.interrupt();
try {
this.clockThread.join();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
public List<TraceEventRecords> getElementCollection() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment