Commit 9a2e265c authored by Christian Wulf's avatar Christian Wulf

added delay and throughput measuring stages

parent 79d4e679
......@@ -8,5 +8,5 @@ java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s)%n
#teetime.level = ALL
teetime.variant.methodcallWithPorts.framework.core.level = ALL
#teetime.variant.methodcallWithPorts.stage.level = ALL
teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE
\ No newline at end of file
teetime.variant.methodcallWithPorts.stage.level = WARNING
teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE
......@@ -81,8 +81,6 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
@Override
public void executeWithPorts() {
this.logger.debug("Executing stage...");
// StageWithPort<?, ?> headStage = this.currentHeads.next();
StageWithPort<?, ?> headStage = this.stages[this.firstStageIndex];
......@@ -187,8 +185,6 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
this.firstStage = null;
this.intermediateStages.clear();
this.lastStage = null;
System.out.println("cleaned up");
}
}
......@@ -25,7 +25,7 @@ public class Clock extends ProducerStage<Void, Long> {
this.sleep(this.intervalDelayInMs);
}
this.logger.debug("Emitting timestamp");
// this.logger.debug("Emitting timestamp");
this.send(this.getCurrentTimeInNs());
}
......
package teetime.variant.methodcallWithPorts.stage;
import java.util.LinkedList;
import java.util.List;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
public class ElementDelayMeasuringStage<T> extends ConsumerStage<T, T> {
private final InputPort<Long> triggerInputPort = new InputPort<Long>(this);
private long numPassedElements;
private long lastTimestampInNs;
private final List<Long> delays = new LinkedList<Long>();
@Override
protected void execute5(final T element) {
Long timestampInNs = this.triggerInputPort.receive();
if (timestampInNs != null) {
this.computeElementDelay(System.nanoTime());
}
this.numPassedElements++;
this.send(element);
}
@Override
public void onStart() {
this.resetTimestamp(System.nanoTime());
super.onStart();
}
private void computeElementDelay(final Long timestampInNs) {
long diffInNs = timestampInNs - this.lastTimestampInNs;
if (this.numPassedElements > 0) {
long delayInNsPerElement = diffInNs / this.numPassedElements;
this.delays.add(delayInNsPerElement);
this.logger.info("Delay: " + delayInNsPerElement + " time units/element");
this.resetTimestamp(timestampInNs);
}
}
private void resetTimestamp(final Long timestampInNs) {
this.numPassedElements = 0;
this.lastTimestampInNs = timestampInNs;
}
public List<Long> getDelays() {
return this.delays;
}
public InputPort<Long> getTriggerInputPort() {
return this.triggerInputPort;
}
}
......@@ -2,17 +2,16 @@ package teetime.variant.methodcallWithPorts.stage;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
public class ThroughputFilter<T> extends ConsumerStage<T, T> {
public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T, T> {
private final InputPort<Long> triggerInputPort = new InputPort<Long>(this);
private long numPassedElements;
private long timestamp;
private long lastTimestampInNs;
private final List<Long> throughputs = new LinkedList<Long>();
......@@ -20,8 +19,7 @@ public class ThroughputFilter<T> extends ConsumerStage<T, T> {
protected void execute5(final T element) {
Long timestampInNs = this.triggerInputPort.receive();
if (timestampInNs != null) {
this.computeThroughput();
this.resetTimestamp();
this.computeElementThroughput(System.nanoTime());
}
this.numPassedElements++;
this.send(element);
......@@ -29,24 +27,24 @@ public class ThroughputFilter<T> extends ConsumerStage<T, T> {
@Override
public void onStart() {
this.resetTimestamp();
this.resetTimestamp(System.nanoTime());
super.onStart();
}
private void computeThroughput() {
long diffInNs = System.nanoTime() - this.timestamp;
long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs);
long throughputPerMs = this.numPassedElements / diffInMs;
this.throughputs.add(throughputPerMs);
// this.logger.info("Throughput: " + throughputPerMs + " elements/ms");
private void computeElementThroughput(final Long timestampInNs) {
long diffInNs = timestampInNs - this.lastTimestampInNs;
if (diffInNs > 0) {
long throughputInNsPerElement = this.numPassedElements / diffInNs;
this.throughputs.add(throughputInNsPerElement);
this.logger.info("Throughput: " + throughputInNsPerElement + " elements/time unit");
// long diffInSec = TimeUnit.NANOSECONDS.toSeconds(diffInNs);
// long throughputPerSec = this.numPassedElements / diffInSec;
this.resetTimestamp(timestampInNs);
}
}
private void resetTimestamp() {
private void resetTimestamp(final Long timestampInNs) {
this.numPassedElements = 0;
this.timestamp = System.nanoTime();
this.lastTimestampInNs = timestampInNs;
}
public List<Long> getThroughputs() {
......
......@@ -79,7 +79,7 @@ public class ChwHomeTraceReconstructionAnalysisTest {
assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element");
}
@Test
......@@ -106,7 +106,7 @@ public class ChwHomeTraceReconstructionAnalysisTest {
assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element");
assertThat(quintiles.get(0.5), is(both(greaterThan(1700l)).and(lessThan(1900l))));
}
......@@ -135,7 +135,7 @@ public class ChwHomeTraceReconstructionAnalysisTest {
assertEquals(1, trace1.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element");
}
}
......@@ -61,7 +61,7 @@ public class ChwWorkTcpTraceReconstructionAnalysisTest {
}
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs());
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element");
// assertEquals(1000, analysis.getNumTraces());
assertEquals(1000000, analysis.getNumTraces());
......
......@@ -79,7 +79,7 @@ public class ChwWorkTraceReconstructionAnalysisTest {
assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element");
}
@Test
......@@ -106,7 +106,7 @@ public class ChwWorkTraceReconstructionAnalysisTest {
assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element");
assertThat(quintiles.get(0.5), is(both(greaterThan(1100l)).and(lessThan(1400l))));
}
......@@ -135,7 +135,7 @@ public class ChwWorkTraceReconstructionAnalysisTest {
assertEquals(1, trace1.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element");
}
}
......@@ -76,7 +76,7 @@ public class NieWorkKiekerTraceReconstructionAnalysisTest {
assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms");
System.out.println("Mean throughput: " + quintiles.get(0.5) + " time units/element");
}
@Test
......@@ -102,7 +102,7 @@ public class NieWorkKiekerTraceReconstructionAnalysisTest {
assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms");
System.out.println("Mean throughput: " + quintiles.get(0.5) + " time units/element");
assertThat(quintiles.get(0.5), is(both(greaterThan(1100l)).and(lessThan(1400l))));
}
......@@ -130,7 +130,7 @@ public class NieWorkKiekerTraceReconstructionAnalysisTest {
assertEquals(1, trace1.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms");
System.out.println("Mean throughput: " + quintiles.get(0.5) + " time units/element");
}
}
......@@ -12,9 +12,9 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.CountingFilter;
import teetime.variant.methodcallWithPorts.stage.Distributor;
import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.ThroughputFilter;
import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
......@@ -34,8 +34,8 @@ public class TcpTraceReconstructionAnalysis extends Analysis {
private CountingFilter<TraceEventRecords> traceCounter;
private ThroughputFilter<IFlowRecord> recordThroughputFilter;
private ThroughputFilter<TraceEventRecords> traceThroughputFilter;
private ElementThroughputMeasuringStage<IFlowRecord> recordThroughputFilter;
private ElementThroughputMeasuringStage<TraceEventRecords> traceThroughputFilter;
@Override
public void init() {
......@@ -70,9 +70,9 @@ public class TcpTraceReconstructionAnalysis extends Analysis {
this.recordCounter = new CountingFilter<IMonitoringRecord>();
final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
IFlowRecord.class);
this.recordThroughputFilter = new ThroughputFilter<IFlowRecord>();
this.recordThroughputFilter = new ElementThroughputMeasuringStage<IFlowRecord>();
final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter();
this.traceThroughputFilter = new ThroughputFilter<TraceEventRecords>();
this.traceThroughputFilter = new ElementThroughputMeasuringStage<TraceEventRecords>();
this.traceCounter = new CountingFilter<TraceEventRecords>();
EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>();
......
......@@ -14,8 +14,8 @@ import teetime.variant.methodcallWithPorts.stage.Cache;
import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.CollectorSink;
import teetime.variant.methodcallWithPorts.stage.CountingFilter;
import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.ThroughputFilter;
import teetime.variant.methodcallWithPorts.stage.kieker.Dir2RecordsFilter;
import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
......@@ -40,7 +40,7 @@ public class TraceReconstructionAnalysis extends Analysis {
private CountingFilter<TraceEventRecords> traceCounter;
private ThroughputFilter<IFlowRecord> throughputFilter;
private ElementThroughputMeasuringStage<IFlowRecord> throughputFilter;
private File inputDir;
......@@ -72,7 +72,7 @@ public class TraceReconstructionAnalysis extends Analysis {
final StringBufferFilter<IMonitoringRecord> stringBufferFilter = new StringBufferFilter<IMonitoringRecord>();
final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
IFlowRecord.class);
this.throughputFilter = new ThroughputFilter<IFlowRecord>();
this.throughputFilter = new ElementThroughputMeasuringStage<IFlowRecord>();
final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter();
this.traceCounter = new CountingFilter<TraceEventRecords>();
final CollectorSink<TraceEventRecords> collector = new CollectorSink<TraceEventRecords>(this.elementCollection);
......
......@@ -80,8 +80,11 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest {
System.out.println("Max size of tcp-relay pipe: " + analysis.getTcpRelayPipe().getMaxSize());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs());
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(analysis.getRecordDelays());
System.out.println("Median record delay: " + recordQuintiles.get(0.5) + " time units/record");
Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceDelays());
System.out.println("Median trace delay: " + traceQuintiles.get(0.5) + " time units/trace");
// assertEquals(1000, analysis.getNumTraces());
assertEquals(1000000, analysis.getNumTraces());
......
package teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
......@@ -12,10 +15,10 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.CountingFilter;
import teetime.variant.methodcallWithPorts.stage.Distributor;
import teetime.variant.methodcallWithPorts.stage.ElementDelayMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.Relay;
import teetime.variant.methodcallWithPorts.stage.ThroughputFilter;
import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
......@@ -35,13 +38,6 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
private Thread clock2Thread;
private Thread[] workerThreads;
private CountingFilter<IMonitoringRecord> recordCounter;
private CountingFilter<TraceEventRecords> traceCounter;
private ThroughputFilter<IFlowRecord> recordThroughputFilter;
private ThroughputFilter<TraceEventRecords> traceThroughputFilter;
private SpScPipe<IMonitoringRecord> tcpRelayPipe;
private int numWorkerThreads;
......@@ -81,6 +77,7 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
private StageWithPort<Void, Long> buildClockPipeline(final long intervalDelayInMs) {
Clock clock = new Clock();
clock.setInitialDelayInMs(intervalDelayInMs);
clock.setIntervalDelayInMs(intervalDelayInMs);
Distributor<Long> distributor = new Distributor<Long>();
......@@ -93,18 +90,66 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
return pipeline;
}
private static class StageFactory<T extends StageWithPort<?, ?>> {
private final Constructor<T> constructor;
private final List<T> stages = new ArrayList<T>();
public StageFactory(final Constructor<T> constructor) {
this.constructor = constructor;
}
public T create(final Object... initargs) {
try {
T stage = this.constructor.newInstance(initargs);
this.stages.add(stage);
return stage;
} catch (InstantiationException e) {
throw new IllegalStateException(e);
} catch (IllegalAccessException e) {
throw new IllegalStateException(e);
} catch (IllegalArgumentException e) {
throw new IllegalStateException(e);
} catch (InvocationTargetException e) {
throw new IllegalStateException(e);
}
}
public List<T> getStages() {
return this.stages;
}
}
private final StageFactory<CountingFilter<IMonitoringRecord>> recordCounterFactory;
private final StageFactory<ElementDelayMeasuringStage<IMonitoringRecord>> recordThroughputFilterFactory;
private final StageFactory<CountingFilter<TraceEventRecords>> traceCounterFactory;
private final StageFactory<ElementDelayMeasuringStage<TraceEventRecords>> traceThroughputFilterFactory;
public TcpTraceReconstructionAnalysisWithThreads() {
try {
this.recordCounterFactory = new StageFactory(CountingFilter.class.getConstructor());
this.recordThroughputFilterFactory = new StageFactory(ElementDelayMeasuringStage.class.getConstructor());
this.traceCounterFactory = new StageFactory(CountingFilter.class.getConstructor());
this.traceThroughputFilterFactory = new StageFactory(ElementDelayMeasuringStage.class.getConstructor());
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException(e);
} catch (SecurityException e) {
throw new IllegalArgumentException(e);
}
}
private Pipeline<IMonitoringRecord, ?> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline,
final StageWithPort<Void, Long> clockStage,
final StageWithPort<Void, Long> clock2Stage) {
// create stages
Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>();
this.recordCounter = new CountingFilter<IMonitoringRecord>();
CountingFilter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create();
final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
IFlowRecord.class);
this.recordThroughputFilter = new ThroughputFilter<IFlowRecord>();
ElementDelayMeasuringStage<IMonitoringRecord> recordThroughputFilter = this.recordThroughputFilterFactory.create();
final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter();
this.traceThroughputFilter = new ThroughputFilter<TraceEventRecords>();
this.traceCounter = new CountingFilter<TraceEventRecords>();
CountingFilter<TraceEventRecords> traceCounter = this.traceCounterFactory.create();
ElementDelayMeasuringStage<TraceEventRecords> traceThroughputFilter = this.traceThroughputFilterFactory.create();
// EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>();
EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>();
......@@ -118,7 +163,9 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
// SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort());
// SingleElementPipe.connect(relay.getOutputPort(), sysout.getInputPort());
// SingleElementPipe.connect(sysout.getOutputPort(), endStage.getInputPort());
SingleElementPipe.connect(relay.getOutputPort(), endStage.getInputPort());
SingleElementPipe.connect(relay.getOutputPort(), recordThroughputFilter.getInputPort());
SingleElementPipe.connect(recordThroughputFilter.getOutputPort(), endStage.getInputPort());
// // SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort());
// // SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
// SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
......@@ -127,15 +174,15 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
// // SingleElementPipe.connect(this.traceCounter.getOutputPort(), endStage.getInputPort());
// SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), endStage.getInputPort());
SpScPipe.connect(clockStage.getOutputPort(), sysout.getTriggerInputPort(), 10);
// SpScPipe.connect(clockStage.getOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 10);
SpScPipe.connect(clock2Stage.getOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 10);
// SpScPipe.connect(clockStage.getOutputPort(), sysout.getTriggerInputPort(), 10);
SpScPipe.connect(clockStage.getOutputPort(), recordThroughputFilter.getTriggerInputPort(), 10);
// SpScPipe.connect(clock2Stage.getOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 10);
// create and configure pipeline
// Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>();
Pipeline<IMonitoringRecord, IMonitoringRecord> pipeline = new Pipeline<IMonitoringRecord, IMonitoringRecord>();
pipeline.setFirstStage(relay);
// pipeline.addIntermediateStage(this.recordCounter);
pipeline.addIntermediateStage(recordThroughputFilter);
// pipeline.addIntermediateStage(sysout);
// pipeline.addIntermediateStage(instanceOfFilter);
// pipeline.addIntermediateStage(this.recordThroughputFilter);
......@@ -151,7 +198,7 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
super.start();
this.tcpThread.start();
// this.clockThread.start();
this.clockThread.start();
// this.clock2Thread.start();
for (Thread workerThread : this.workerThreads) {
......@@ -176,19 +223,35 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
}
public int getNumRecords() {
return this.recordCounter.getNumElementsPassed();
int sum = 0;
for (CountingFilter<IMonitoringRecord> stage : this.recordCounterFactory.getStages()) {
sum += stage.getNumElementsPassed();
}
return sum;
}
public int getNumTraces() {
return this.traceCounter.getNumElementsPassed();
int sum = 0;
for (CountingFilter<TraceEventRecords> stage : this.traceCounterFactory.getStages()) {
sum += stage.getNumElementsPassed();
}
return sum;
}
public List<Long> getRecordThroughputs() {
return this.recordThroughputFilter.getThroughputs();
public List<Long> getRecordDelays() {
List<Long> throughputs = new LinkedList<Long>();
for (ElementDelayMeasuringStage<IMonitoringRecord> stage : this.recordThroughputFilterFactory.getStages()) {
throughputs.addAll(stage.getDelays());
}
return throughputs;
}
public List<Long> getTraceThroughputs() {
return this.traceThroughputFilter.getThroughputs();
public List<Long> getTraceDelays() {
List<Long> throughputs = new LinkedList<Long>();
for (ElementDelayMeasuringStage<TraceEventRecords> stage : this.traceThroughputFilterFactory.getStages()) {
throughputs.addAll(stage.getDelays());
}
return throughputs;
}
public SpScPipe<IMonitoringRecord> getTcpRelayPipe() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment