Commit d2ff4570 authored by Christian Wulf's avatar Christian Wulf

added TraceReductionFilter and test

parent 2fff26a3
......@@ -8,5 +8,5 @@ java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s)%n
#teetime.level = ALL
teetime.variant.methodcallWithPorts.framework.core.level = ALL
teetime.variant.methodcallWithPorts.stage.level = WARNING
teetime.variant.methodcallWithPorts.stage.level = FINE
teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE
......@@ -4,8 +4,8 @@ java=~/jdk1.7.0_60/bin/java
cp=.:MooBench.jar:META-INF/kieker.monitoring.properties:META-INF/kieker.logging.properties
jvmParams="-javaagent:lib/kieker-1.9_aspectj.jar -Dorg.aspectj.weaver.loadtime.configuration=META-INF/kieker.aop.xml -Dorg.aspectj.weaver.showWeaveInfo=true -Daj.weaving.verbose=true -Dkieker.monitoring.writer=kieker.monitoring.writer.tcp.TCPWriter"
params="-d 10 -h 1 -m 0 -t 1000000 -o tmp/test.txt -q"
runs=$1
#runs=$1
for i in {1..${runs}}; do
for i in {1..3}; do
${java} -cp ${cp} ${jvmParams} mooBench.benchmark.Benchmark ${params};
done
......@@ -2,6 +2,7 @@ package teetime.variant.methodcallWithPorts.stage;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
......@@ -17,12 +18,13 @@ public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T, T> {
@Override
protected void execute5(final T element) {
this.numPassedElements++;
this.send(element);
Long timestampInNs = this.triggerInputPort.receive();
if (timestampInNs != null) {
this.computeElementThroughput(System.nanoTime());
}
this.numPassedElements++;
this.send(element);
}
@Override
......@@ -33,10 +35,11 @@ public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T, T> {
private void computeElementThroughput(final Long timestampInNs) {
long diffInNs = timestampInNs - this.lastTimestampInNs;
if (diffInNs > 0) {
long throughputInNsPerElement = this.numPassedElements / diffInNs;
this.throughputs.add(throughputInNsPerElement);
this.logger.info("Throughput: " + throughputInNsPerElement + " elements/time unit");
long diffInSec = TimeUnit.NANOSECONDS.toSeconds(diffInNs);
if (diffInSec > 0) {
long throughputPerSec = this.numPassedElements / diffInSec;
this.throughputs.add(throughputPerSec);
this.logger.info("Throughput: " + throughputPerSec + " elements/s" + " -> numPassedElements=" + this.numPassedElements);
this.resetTimestamp(timestampInNs);
}
......
......@@ -15,7 +15,6 @@ public class Relay<T> extends AbstractStage<T, T> {
if (null == element) {
if (this.getInputPort().getPipe().isClosed()) {
this.setReschedulable(false);
this.logger.debug("got end signal; pipe.size: " + this.getInputPort().getPipe().size());
assert 0 == this.getInputPort().getPipe().size();
}
Thread.yield();
......@@ -26,7 +25,6 @@ public class Relay<T> extends AbstractStage<T, T> {
@Override
public void onIsPipelineHead() {
this.logger.debug("onIsPipelineHead");
if (this.getInputPort().getPipe().isClosed()) {
this.setReschedulable(false);
}
......
......@@ -19,7 +19,6 @@ import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import teetime.util.HashMapWithDefault;
import teetime.util.concurrent.hashmap.TraceBuffer;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
......@@ -40,7 +39,12 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord, TraceE
private long maxTraceTimeout = Long.MAX_VALUE;
private long maxEncounteredLoggingTimestamp = -1;
private final Map<Long, TraceBuffer> traceId2trace = new HashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
private final Map<Long, TraceBuffer> traceId2trace;
public TraceReconstructionFilter(final Map<Long, TraceBuffer> traceId2trace) {
super();
this.traceId2trace = traceId2trace;
}
@Override
protected void execute5(final IFlowRecord element) {
......
package teetime.variant.methodcallWithPorts.stage.kieker.traceReduction;
import kieker.analysis.plugin.filter.flow.TraceEventRecords;
/**
* Buffer for similar traces that are to be aggregated into a single trace.
*
* @author Jan Waller, Florian Biss
*/
public final class TraceAggregationBuffer {
private final long bufferCreatedTimestamp;
private final TraceEventRecords aggregatedTrace;
private int countOfAggregatedTraces;
public TraceAggregationBuffer(final long bufferCreatedTimestamp, final TraceEventRecords trace) {
this.bufferCreatedTimestamp = bufferCreatedTimestamp;
this.aggregatedTrace = trace;
}
public void count() {
this.countOfAggregatedTraces++;
}
public long getBufferCreatedTimestamp() {
return this.bufferCreatedTimestamp;
}
public TraceEventRecords getTraceEventRecords() {
return this.aggregatedTrace;
}
public int getCount() {
return this.countOfAggregatedTraces;
}
}
package teetime.variant.methodcallWithPorts.stage.kieker.traceReduction;
import java.io.Serializable;
import java.util.Comparator;
import kieker.analysis.plugin.filter.flow.TraceEventRecords;
import kieker.common.record.flow.trace.AbstractTraceEvent;
import kieker.common.record.flow.trace.operation.AbstractOperationEvent;
import kieker.common.record.flow.trace.operation.AfterOperationFailedEvent;
/**
* @author Jan Waller, Florian Fittkau, Florian Biss
*/
public final class TraceComperator implements Comparator<TraceEventRecords>, Serializable {
private static final long serialVersionUID = 8920766818232517L;
/**
* Creates a new instance of this class.
*/
public TraceComperator() {
// default empty constructor
}
/**
* {@inheritDoc}
*/
@Override
public int compare(final TraceEventRecords t1, final TraceEventRecords t2) {
final AbstractTraceEvent[] recordsT1 = t1.getTraceEvents();
final AbstractTraceEvent[] recordsT2 = t2.getTraceEvents();
if (recordsT1.length != recordsT2.length) {
return recordsT1.length - recordsT2.length;
}
final int cmpHostnames = t1.getTraceMetadata().getHostname()
.compareTo(t2.getTraceMetadata().getHostname());
if (cmpHostnames != 0) {
return cmpHostnames;
}
for (int i = 0; i < recordsT1.length; i++) {
final AbstractTraceEvent recordT1 = recordsT1[i];
final AbstractTraceEvent recordT2 = recordsT2[i];
final int cmpClass = recordT1.getClass().getName()
.compareTo(recordT2.getClass().getName());
if (cmpClass != 0) {
return cmpClass;
}
if (recordT1 instanceof AbstractOperationEvent) {
final int cmpSignature = ((AbstractOperationEvent) recordT1).getOperationSignature()
.compareTo(((AbstractOperationEvent) recordT2).getOperationSignature());
if (cmpSignature != 0) {
return cmpSignature;
}
}
if (recordT1 instanceof AfterOperationFailedEvent) {
final int cmpError = ((AfterOperationFailedEvent) recordT1).getCause().compareTo(
((AfterOperationFailedEvent) recordT2).getCause());
if (cmpError != 0) {
return cmpClass;
}
}
}
// All records match.
return 0;
}
}
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.variant.methodcallWithPorts.stage.kieker.traceReduction;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import kieker.analysis.plugin.filter.flow.TraceEventRecords;
/**
* This filter collects incoming traces for a specified amount of time.
* Any traces representing the same series of events will be used to calculate statistical informations like the average runtime of this kind of trace.
* Only one specimen of these traces containing this information will be forwarded from this filter.
*
* Statistical outliers regarding the runtime of the trace will be treated special and therefore send out as they are and will not be mixed with others.
*
* @author Jan Waller, Florian Biss
*
* @since
*/
public class TraceReductionFilter extends ConsumerStage<TraceEventRecords, TraceEventRecords> {
private final InputPort<Long> triggerInputPort = new InputPort<Long>(this);
private final Map<TraceEventRecords, TraceAggregationBuffer> trace2buffer;
private long maxCollectionDurationInNs;
public TraceReductionFilter(final Map<TraceEventRecords, TraceAggregationBuffer> trace2buffer) {
this.trace2buffer = trace2buffer;
}
@Override
protected void execute5(final TraceEventRecords traceEventRecords) {
Long timestampInNs = this.triggerInputPort.receive();
if (timestampInNs != null) {
synchronized (this) {
this.processTimeoutQueue(timestampInNs);
}
}
final long timestamp = System.nanoTime();
synchronized (this) {
TraceAggregationBuffer traceBuffer = this.trace2buffer.get(traceEventRecords);
if (traceBuffer == null) { // NOCS (DCL)
traceBuffer = new TraceAggregationBuffer(timestamp, traceEventRecords);
this.trace2buffer.put(traceEventRecords, traceBuffer);
}
traceBuffer.count();
}
}
@Override
public void onIsPipelineHead() {
synchronized (this) {
for (final Entry<TraceEventRecords, TraceAggregationBuffer> entry : this.trace2buffer.entrySet()) {
final TraceAggregationBuffer buffer = entry.getValue();
final TraceEventRecords record = buffer.getTraceEventRecords();
record.setCount(buffer.getCount());
this.send(record);
}
this.trace2buffer.clear();
}
super.onIsPipelineHead();
}
private void processTimeoutQueue(final long timestampInNs) {
final long bufferTimeoutInNs = timestampInNs - this.maxCollectionDurationInNs;
for (final Iterator<Entry<TraceEventRecords, TraceAggregationBuffer>> iterator = this.trace2buffer.entrySet().iterator(); iterator.hasNext();) {
final TraceAggregationBuffer traceBuffer = iterator.next().getValue();
// this.logger.debug("traceBuffer.getBufferCreatedTimestamp(): " + traceBuffer.getBufferCreatedTimestamp() + " vs. " + bufferTimeoutInNs
// + " (bufferTimeoutInNs)");
if (traceBuffer.getBufferCreatedTimestamp() <= bufferTimeoutInNs) {
final TraceEventRecords record = traceBuffer.getTraceEventRecords();
record.setCount(traceBuffer.getCount());
this.send(record);
}
iterator.remove();
}
}
public long getMaxCollectionDuration() {
return this.maxCollectionDurationInNs;
}
public void setMaxCollectionDuration(final long maxCollectionDuration) {
this.maxCollectionDurationInNs = maxCollectionDuration;
}
public InputPort<Long> getTriggerInputPort() {
return this.triggerInputPort;
}
}
......@@ -61,7 +61,7 @@ public class ChwWorkTcpTraceReconstructionAnalysisTest {
}
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs());
System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element");
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit");
// assertEquals(1000, analysis.getNumTraces());
assertEquals(1000000, analysis.getNumTraces());
......
......@@ -2,7 +2,10 @@ package teetime.variant.methodcallWithPorts.examples.traceReconstruction;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
import teetime.util.concurrent.hashmap.TraceBuffer;
import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
......@@ -30,10 +33,10 @@ public class TcpTraceReconstructionAnalysis extends Analysis {
private Thread clock2Thread;
private Thread workerThread;
private CountingFilter<IMonitoringRecord> recordCounter;
private final Map<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
private CountingFilter<IMonitoringRecord> recordCounter;
private CountingFilter<TraceEventRecords> traceCounter;
private ElementThroughputMeasuringStage<IFlowRecord> recordThroughputFilter;
private ElementThroughputMeasuringStage<TraceEventRecords> traceThroughputFilter;
......@@ -71,7 +74,7 @@ public class TcpTraceReconstructionAnalysis extends Analysis {
final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
IFlowRecord.class);
this.recordThroughputFilter = new ElementThroughputMeasuringStage<IFlowRecord>();
final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter();
final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(this.traceId2trace);
this.traceThroughputFilter = new ElementThroughputMeasuringStage<TraceEventRecords>();
this.traceCounter = new CountingFilter<TraceEventRecords>();
EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>();
......
......@@ -3,7 +3,10 @@ package teetime.variant.methodcallWithPorts.examples.traceReconstruction;
import java.io.File;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
import teetime.util.concurrent.hashmap.TraceBuffer;
import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
......@@ -35,11 +38,10 @@ public class TraceReconstructionAnalysis extends Analysis {
private Thread workerThread;
private ClassNameRegistryRepository classNameRegistryRepository;
private final Map<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
private CountingFilter<IMonitoringRecord> recordCounter;
private CountingFilter<TraceEventRecords> traceCounter;
private ElementThroughputMeasuringStage<IFlowRecord> throughputFilter;
private File inputDir;
......@@ -73,7 +75,7 @@ public class TraceReconstructionAnalysis extends Analysis {
final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
IFlowRecord.class);
this.throughputFilter = new ElementThroughputMeasuringStage<IFlowRecord>();
final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter();
final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(this.traceId2trace);
this.traceCounter = new CountingFilter<TraceEventRecords>();
final CollectorSink<TraceEventRecords> collector = new CollectorSink<TraceEventRecords>(this.elementCollection);
......
......@@ -80,11 +80,13 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest {
System.out.println("Max size of tcp-relay pipe: " + analysis.getTcpRelayPipe().getMaxSize());
Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(analysis.getRecordDelays());
System.out.println("Median record delay: " + recordQuintiles.get(0.5) + " time units/record");
// Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(analysis.getRecordDelays());
// System.out.println("Median record delay: " + recordQuintiles.get(0.5) + " time units/record");
Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceDelays());
System.out.println("Median trace delay: " + traceQuintiles.get(0.5) + " time units/trace");
// Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceDelays());
// System.out.println("Median trace delay: " + traceQuintiles.get(0.5) + " time units/trace");
Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs());
System.out.println("Median trace throughput: " + traceQuintiles.get(0.5) + " traces/time unit");
// assertEquals(1000, analysis.getNumTraces());
assertEquals(1000000, analysis.getNumTraces());
......
......@@ -5,7 +5,10 @@ import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
import teetime.util.concurrent.hashmap.TraceBuffer;
import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
......@@ -16,6 +19,7 @@ import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.CountingFilter;
import teetime.variant.methodcallWithPorts.stage.Distributor;
import teetime.variant.methodcallWithPorts.stage.ElementDelayMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.Relay;
......@@ -120,17 +124,19 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
}
}
private final Map<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
private final StageFactory<CountingFilter<IMonitoringRecord>> recordCounterFactory;
private final StageFactory<ElementDelayMeasuringStage<IMonitoringRecord>> recordThroughputFilterFactory;
private final StageFactory<CountingFilter<TraceEventRecords>> traceCounterFactory;
private final StageFactory<ElementDelayMeasuringStage<TraceEventRecords>> traceThroughputFilterFactory;
private final StageFactory<ElementThroughputMeasuringStage<TraceEventRecords>> traceThroughputFilterFactory;
public TcpTraceReconstructionAnalysisWithThreads() {
try {
this.recordCounterFactory = new StageFactory(CountingFilter.class.getConstructor());
this.recordThroughputFilterFactory = new StageFactory(ElementDelayMeasuringStage.class.getConstructor());
this.traceCounterFactory = new StageFactory(CountingFilter.class.getConstructor());
this.traceThroughputFilterFactory = new StageFactory(ElementDelayMeasuringStage.class.getConstructor());
this.traceThroughputFilterFactory = new StageFactory(ElementThroughputMeasuringStage.class.getConstructor());
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException(e);
} catch (SecurityException e) {
......@@ -147,11 +153,11 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
IFlowRecord.class);
ElementDelayMeasuringStage<IMonitoringRecord> recordThroughputFilter = this.recordThroughputFilterFactory.create();
final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter();
final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(this.traceId2trace);
CountingFilter<TraceEventRecords> traceCounter = this.traceCounterFactory.create();
ElementDelayMeasuringStage<TraceEventRecords> traceThroughputFilter = this.traceThroughputFilterFactory.create();
// EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>();
EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>();
ElementThroughputMeasuringStage<TraceEventRecords> traceThroughputFilter = this.traceThroughputFilterFactory.create();
EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>();
// EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>();
// connect stages
this.tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
......@@ -160,34 +166,35 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
// // SingleElementPipe.connect(relay.getOutputPort(), this.recordCounter.getInputPort());
// // SingleElementPipe.connect(this.recordCounter.getOutputPort(), instanceOfFilter.getInputPort());
// SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort());
SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort());
// SingleElementPipe.connect(relay.getOutputPort(), sysout.getInputPort());
// SingleElementPipe.connect(sysout.getOutputPort(), endStage.getInputPort());
SingleElementPipe.connect(relay.getOutputPort(), recordThroughputFilter.getInputPort());
SingleElementPipe.connect(recordThroughputFilter.getOutputPort(), endStage.getInputPort());
// SingleElementPipe.connect(relay.getOutputPort(), recordThroughputFilter.getInputPort());
// SingleElementPipe.connect(recordThroughputFilter.getOutputPort(), endStage.getInputPort());
// // SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort());
// // SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
// SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
// // SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceThroughputFilter.getInputPort());
// // SingleElementPipe.connect(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort());
SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort());
// SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort());
SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), endStage.getInputPort());
// // SingleElementPipe.connect(this.traceCounter.getOutputPort(), endStage.getInputPort());
// SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), endStage.getInputPort());
// SpScPipe.connect(clockStage.getOutputPort(), sysout.getTriggerInputPort(), 10);
SpScPipe.connect(clockStage.getOutputPort(), recordThroughputFilter.getTriggerInputPort(), 10);
// SpScPipe.connect(clock2Stage.getOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 10);
// SpScPipe.connect(clockStage.getOutputPort(), recordThroughputFilter.getTriggerInputPort(), 10);
SpScPipe.connect(clockStage.getOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10);
// create and configure pipeline
// Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>();
Pipeline<IMonitoringRecord, IMonitoringRecord> pipeline = new Pipeline<IMonitoringRecord, IMonitoringRecord>();
Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>();
pipeline.setFirstStage(relay);
pipeline.addIntermediateStage(recordThroughputFilter);
// pipeline.addIntermediateStage(recordThroughputFilter);
// pipeline.addIntermediateStage(sysout);
// pipeline.addIntermediateStage(instanceOfFilter);
pipeline.addIntermediateStage(instanceOfFilter);
// pipeline.addIntermediateStage(this.recordThroughputFilter);
// pipeline.addIntermediateStage(traceReconstructionFilter);
// pipeline.addIntermediateStage(this.traceThroughputFilter);
pipeline.addIntermediateStage(traceReconstructionFilter);
pipeline.addIntermediateStage(traceThroughputFilter);
// pipeline.addIntermediateStage(this.traceCounter);
pipeline.setLastStage(endStage);
return pipeline;
......@@ -246,10 +253,10 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
return throughputs;
}
public List<Long> getTraceDelays() {
public List<Long> getTraceThroughputs() {
List<Long> throughputs = new LinkedList<Long>();
for (ElementDelayMeasuringStage<TraceEventRecords> stage : this.traceThroughputFilterFactory.getStages()) {
throughputs.addAll(stage.getDelays());
for (ElementThroughputMeasuringStage<TraceEventRecords> stage : this.traceThroughputFilterFactory.getStages()) {
throughputs.addAll(stage.getThroughputs());
}
return throughputs;
}
......
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.variant.methodcallWithPorts.examples.traceReductionWithThreads;
import static org.junit.Assert.assertEquals;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import teetime.util.StatisticsUtil;
import teetime.util.StopWatch;
/**
* @author Christian Wulf
*
* @since 1.10
*/
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ChwWorkTcpTraceReductionAnalysisWithThreadsTest {
private StopWatch stopWatch;
@Before
public void before() {
this.stopWatch = new StopWatch();
}
@After
public void after() {
long overallDurationInNs = this.stopWatch.getDurationInNs();
System.out.println("Duration: " + TimeUnit.NANOSECONDS.toMillis(overallDurationInNs) + " ms");
}
@Test
public void performAnalysisWith1Thread() {
this.performAnalysis(1);
}
@Test
public void performAnalysisWith2Threads() {
this.performAnalysis(2);
}
@Test
public void performAnalysisWith4Threads() {
this.performAnalysis(4);
}
void performAnalysis(final int numWorkerThreads) {
final TcpTraceReductionAnalysisWithThreads analysis = new TcpTraceReductionAnalysisWithThreads();
analysis.setNumWorkerThreads(numWorkerThreads);
analysis.init();
this.stopWatch.start();
try {
analysis.start();
} finally {
this.stopWatch.end();
analysis.onTerminate();
}
System.out.println("Max size of tcp-relay pipe: " + analysis.getTcpRelayPipe().getMaxSize());
// Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(analysis.getRecordDelays());
// System.out.println("Median record delay: " + recordQuintiles.get(0.5) + " time units/record");
// Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceDelays());
// System.out.println("Median trace delay: " + traceQuintiles.get(0.5) + " time units/trace");
Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs());
System.out.println("Median trace throughput: " + traceQuintiles.get(0.5) + " traces/time unit");
// assertEquals(1000, analysis.getNumTraces());
assertEquals(1000000, analysis.getNumTraces());
// TraceEventRecords trace6884 = analysis.getElementCollection().get(0);
// assertEquals(6884, trace6884.getTraceMetadata().getTraceId());
//
// TraceEventRecords trace6886 = analysis.getElementCollection().get(1);
// assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
// assertEquals(21001, analysis.getNumRecords());
assertEquals(21000001, analysis.getNumRecords());
}
public static void main(final String[] args) {
ChwWorkTcpTraceReductionAnalysisWithThreadsTest analysis = new ChwWorkTcpTraceReductionAnalysisWithThreadsTest();
analysis.before();
try {
analysis.performAnalysisWith1Thread();
} catch (Exception e) {
System.err.println(e);