Commit b5dc965a authored by Christian Wulf's avatar Christian Wulf

added ThroughputFilter

parent bda33879
package teetime.variant.methodcallWithPorts.stage;
import java.util.LinkedList;
import java.util.List;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
public class ThroughputFilter<T> extends ConsumerStage<T, T> {
private final InputPort<Long> triggerInputPort = new InputPort<Long>(this);
private long numPassedElements;
private long timestamp;
private final List<Long> throughputs = new LinkedList<Long>();
@Override
protected void execute5(final T element) {
Long trigger = this.triggerInputPort.receive();
if (trigger != null) {
this.computeThroughput();
this.resetTimestamp();
}
this.numPassedElements++;
this.send(element);
}
@Override
public void onStart() {
this.resetTimestamp();
super.onStart();
}
private void computeThroughput() {
long diffInNs = System.nanoTime() - this.timestamp;
long throughput = this.numPassedElements / diffInNs;
// this.throughputs.add(throughput);
this.logger.info("Throughput: " + throughput + " ns");
}
private void resetTimestamp() {
this.numPassedElements = 0;
this.timestamp = System.nanoTime();
}
public List<Long> getThroughputs() {
return this.throughputs;
}
public InputPort<Long> getTriggerInputPort() {
return triggerInputPort;
}
}
......@@ -108,14 +108,13 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord, TraceE
@Override
public void onIsPipelineHead() {
this.logger.info("traceId2trace: " + TraceReconstructionFilter.traceId2trace.keySet());
Iterator<TraceBuffer> iterator = TraceReconstructionFilter.traceId2trace.values().iterator();
while (iterator.hasNext()) {
TraceBuffer traceBuffer = iterator.next();
this.put(traceBuffer);
iterator.remove();
}
super.onIsPipelineHead();
}
......
......@@ -10,9 +10,12 @@ import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.Cache;
import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.CollectorSink;
import teetime.variant.methodcallWithPorts.stage.CountingFilter;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.ThroughputFilter;
import teetime.variant.methodcallWithPorts.stage.kieker.Dir2RecordsFilter;
import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
......@@ -36,14 +39,29 @@ public class TraceReconstructionAnalysis extends Analysis {
private CountingFilter<TraceEventRecords> traceCounter;
private ThroughputFilter<TraceEventRecords> throughputFilter;
@Override
public void init() {
super.init();
Pipeline<File, Void> producerPipeline = this.buildProducerPipeline();
Pipeline<Void, Void> clockPipeline = this.buildClockPipeline();
this.producerThread = new Thread(new RunnableStage(clockPipeline));
Pipeline<File, Void> producerPipeline = this.buildProducerPipeline(clockPipeline);
this.producerThread = new Thread(new RunnableStage(producerPipeline));
}
private Pipeline<File, Void> buildProducerPipeline() {
private Pipeline<Void, Void> buildClockPipeline() {
Clock clock = new Clock();
clock.setIntervalDelayInMs(1000);
Pipeline<Void, Void> pipeline = new Pipeline<Void, Void>();
pipeline.setFirstStage(clock);
pipeline.setLastStage(new EndStage<Void>());
return pipeline;
}
private Pipeline<File, Void> buildProducerPipeline(final Pipeline<Void, Void> clockPipeline) {
this.classNameRegistryRepository = new ClassNameRegistryRepository();
// final IsIMonitoringRecordInRange isIMonitoringRecordInRange = new IsIMonitoringRecordInRange(0, 1000);
......@@ -62,8 +80,8 @@ public class TraceReconstructionAnalysis extends Analysis {
final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
IFlowRecord.class);
final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter();
this.throughputFilter = new ThroughputFilter<TraceEventRecords>();
this.traceCounter = new CountingFilter<TraceEventRecords>();
final CollectorSink<TraceEventRecords> collector = new CollectorSink<TraceEventRecords>(this.elementCollection);
// configure stages
......@@ -77,9 +95,12 @@ public class TraceReconstructionAnalysis extends Analysis {
SingleElementPipe.connect(cache.getOutputPort(), stringBufferFilter.getInputPort());
SingleElementPipe.connect(stringBufferFilter.getOutputPort(), instanceOfFilter.getInputPort());
SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceCounter.getInputPort());
SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.throughputFilter.getInputPort());
SingleElementPipe.connect(this.throughputFilter.getOutputPort(), this.traceCounter.getInputPort());
SingleElementPipe.connect(this.traceCounter.getOutputPort(), collector.getInputPort());
SpScPipe.connect(clockPipeline.getOutputPort(), this.throughputFilter.getTriggerInputPort(), 1);
// fill input ports
dir2RecordsFilter.getInputPort().getPipe().add(new File("src/test/data/Eprints-logs"));
......@@ -91,6 +112,7 @@ public class TraceReconstructionAnalysis extends Analysis {
pipeline.addIntermediateStage(stringBufferFilter);
pipeline.addIntermediateStage(instanceOfFilter);
pipeline.addIntermediateStage(traceReconstructionFilter);
pipeline.addIntermediateStage(this.throughputFilter);
pipeline.addIntermediateStage(this.traceCounter);
pipeline.setLastStage(collector);
return pipeline;
......@@ -120,4 +142,8 @@ public class TraceReconstructionAnalysis extends Analysis {
public int getNumTraces() {
return this.traceCounter.getNumElementsPassed();
}
public List<Long> getThroughputs() {
return this.throughputFilter.getThroughputs();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment