Commit 029dc624 authored by Christian Wulf's avatar Christian Wulf

added dur from beginning of cache

parent 6d154c78
.handlers = java.util.logging.ConsoleHandler
.level= ALL
.level = ALL
java.util.logging.ConsoleHandler.level = INFO
java.util.logging.ConsoleHandler.level = WARNING
#java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s)%n
......
......@@ -17,6 +17,9 @@ package kieker.analysis.stage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import teetime.util.StopWatch;
import kieker.analysis.IProjectContext;
import kieker.analysis.plugin.annotation.InputPort;
......@@ -45,9 +48,13 @@ public class CacheFilter extends AbstractFilterPlugin {
@Override
public void terminate(final boolean error) {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
for (final Object data : this.cache) {
super.deliver(EmptyPassOnFilter.OUTPUT_PORT_NAME, data);
}
stopWatch.end();
System.out.println("dur: " + TimeUnit.NANOSECONDS.toMillis(stopWatch.getDurationInNs()) + " ms");
super.terminate(error);
}
......
......@@ -20,7 +20,10 @@ public abstract class ConsumerStage<I, O> extends AbstractStage<I, O> {
@Override
public void executeWithPorts() {
this.logger.debug("Executing stage...");
// if (this.logger.isDebugEnabled()) {
// this.logger.debug("Executing stage...");
// }
I element = this.getInputPort().receive();
this.setReschedulable(this.getInputPort().getPipe().size() > 0);
......
......@@ -2,7 +2,9 @@ package teetime.variant.methodcallWithPorts.stage;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import teetime.util.StopWatch;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
public class Cache<T> extends ConsumerStage<T, T> {
......@@ -17,9 +19,13 @@ public class Cache<T> extends ConsumerStage<T, T> {
@Override
public void onIsPipelineHead() {
this.logger.debug("Emitting cached elements...");
StopWatch stopWatch = new StopWatch();
stopWatch.start();
for (T cachedElement : this.cachedObjects) {
this.send(cachedElement);
}
stopWatch.end();
System.out.println("dur: " + TimeUnit.NANOSECONDS.toMillis(stopWatch.getDurationInNs()) + " ms");
super.onIsPipelineHead();
}
......
......@@ -9,6 +9,7 @@ public class CountingFilter<T> extends ConsumerStage<T, T> {
@Override
protected void execute5(final T element) {
this.numElementsPassed++;
// this.logger.info("count: " + this.numElementsPassed);
this.send(element);
}
......
......@@ -35,12 +35,13 @@ public class ThroughputFilter<T> extends ConsumerStage<T, T> {
private void computeThroughput() {
long diffInNs = System.nanoTime() - this.timestamp;
// long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs);
// long throughputPerMs = this.numPassedElements / diffInMs;
long diffInSec = TimeUnit.NANOSECONDS.toSeconds(diffInNs);
long throughputPerSec = this.numPassedElements / diffInSec;
this.throughputs.add(throughputPerSec);
this.logger.info("Throughput: " + throughputPerSec + " elements/s");
long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs);
long throughputPerMs = this.numPassedElements / diffInMs;
this.throughputs.add(throughputPerMs);
// this.logger.info("Throughput: " + throughputPerMs + " elements/ms");
// long diffInSec = TimeUnit.NANOSECONDS.toSeconds(diffInNs);
// long throughputPerSec = this.numPassedElements / diffInSec;
}
private void resetTimestamp() {
......
......@@ -52,7 +52,9 @@ public class Merger<T> extends ConsumerStage<T, T> {
@Override
public void executeWithPorts() {
this.logger.debug("Executing stage...");
// if (this.logger.isDebugEnabled()) {
// this.logger.debug("Executing stage...");
// }
this.execute5(null);
......
......@@ -56,7 +56,7 @@ public class TraceReconstructionAnalysis extends Analysis {
private StageWithPort<Void, Long> buildClockPipeline() {
Clock clock = new Clock();
clock.setIntervalDelayInMs(50);
clock.setIntervalDelayInMs(100);
return clock;
}
......@@ -64,19 +64,12 @@ public class TraceReconstructionAnalysis extends Analysis {
private Pipeline<File, Void> buildPipeline(final StageWithPort<Void, Long> clockStage) {
this.classNameRegistryRepository = new ClassNameRegistryRepository();
// final IsIMonitoringRecordInRange isIMonitoringRecordInRange = new IsIMonitoringRecordInRange(0, 1000);
// final IsOperationExecutionRecordTraceIdPredicate isOperationExecutionRecordTraceIdPredicate = new IsOperationExecutionRecordTraceIdPredicate(
// false, null);
// create stages
final Dir2RecordsFilter dir2RecordsFilter = new Dir2RecordsFilter(this.classNameRegistryRepository);
this.recordCounter = new CountingFilter<IMonitoringRecord>();
final Cache<IMonitoringRecord> cache = new Cache<IMonitoringRecord>();
final StringBufferFilter<IMonitoringRecord> stringBufferFilter = new StringBufferFilter<IMonitoringRecord>();
// final PredicateFilter<IMonitoringRecord> timestampFilter = new PredicateFilter<IMonitoringRecord>(
// isIMonitoringRecordInRange);
// final PredicateFilter<OperationExecutionRecord> traceIdFilter = new PredicateFilter<OperationExecutionRecord>(
// isOperationExecutionRecordTraceIdPredicate);
final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
IFlowRecord.class);
this.throughputFilter = new ThroughputFilter<IFlowRecord>();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment