Commit 723a3ae4 authored by Christian Wulf's avatar Christian Wulf

upgraded to kieker 1.10-snapshot;

added logs and analysis for eprints, kieker, kieker2
parent b5dc965a
......@@ -13,12 +13,12 @@
</attributes>
</classpathentry>
<classpathentry kind="src" path="conf"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6">
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
......
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>net.sourceforge.teetime</groupId>
<artifactId>teetime</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<groupId>net.sourceforge.teetime</groupId>
<artifactId>teetime</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>teetime</name>
<url>http://maven.apache.org</url>
<name>teetime</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>net.kieker-monitoring</groupId>
<artifactId>kieker</artifactId>
<version>1.9</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>17.0</version>
</dependency>
</dependencies>
<repositories>
<repository>
<id>sonatype.oss.snapshots</id>
<url>https://oss.sonatype.org/content/repositories/snapshots/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>net.kieker-monitoring</groupId>
<artifactId>kieker</artifactId>
<version>1.10-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>17.0</version>
</dependency>
</dependencies>
</project>
......@@ -40,10 +40,6 @@ public class StatisticsUtil {
// utility class
}
public static void calculateAvg(final List<Long> durations) {
}
public static PerformanceResult printStatistics(final long overallDurationInNs, final List<TimestampObject> timestampObjects) {
PerformanceResult performanceResult = new PerformanceResult();
......
......@@ -26,7 +26,7 @@ public class Clock extends ProducerStage<Void, Long> {
}
// System.out.println("Emitting timestamp");
this.send(this.getCurrentTimeInNs());
this.getOutputPort().send(this.getCurrentTimeInNs());
}
private void sleep(final long delayInMs) {
......
......@@ -2,6 +2,7 @@ package teetime.variant.methodcallWithPorts.stage;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
......@@ -17,8 +18,8 @@ public class ThroughputFilter<T> extends ConsumerStage<T, T> {
@Override
protected void execute5(final T element) {
Long trigger = this.triggerInputPort.receive();
if (trigger != null) {
Long timestampInNs = this.triggerInputPort.receive();
if (timestampInNs != null) {
this.computeThroughput();
this.resetTimestamp();
}
......@@ -34,9 +35,10 @@ public class ThroughputFilter<T> extends ConsumerStage<T, T> {
private void computeThroughput() {
long diffInNs = System.nanoTime() - this.timestamp;
long throughput = this.numPassedElements / diffInNs;
// this.throughputs.add(throughput);
this.logger.info("Throughput: " + throughput + " ns");
long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs);
long throughputPerMs = this.numPassedElements / diffInMs;
this.throughputs.add(throughputPerMs);
// this.logger.info("Throughput: " + throughputPerMs + " elements/ms");
}
private void resetTimestamp() {
......@@ -49,7 +51,7 @@ public class ThroughputFilter<T> extends ConsumerStage<T, T> {
}
public InputPort<Long> getTriggerInputPort() {
return triggerInputPort;
return this.triggerInputPort;
}
}
......@@ -7,13 +7,13 @@ import java.util.List;
import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.Cache;
import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.CollectorSink;
import teetime.variant.methodcallWithPorts.stage.CountingFilter;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.ThroughputFilter;
import teetime.variant.methodcallWithPorts.stage.kieker.Dir2RecordsFilter;
......@@ -32,6 +32,7 @@ public class TraceReconstructionAnalysis extends Analysis {
private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
private Thread producerThread;
private Thread clockThread;
private ClassNameRegistryRepository classNameRegistryRepository;
......@@ -39,29 +40,28 @@ public class TraceReconstructionAnalysis extends Analysis {
private CountingFilter<TraceEventRecords> traceCounter;
private ThroughputFilter<TraceEventRecords> throughputFilter;
private ThroughputFilter<IFlowRecord> throughputFilter;
private File inputDir;
@Override
public void init() {
super.init();
Pipeline<Void, Void> clockPipeline = this.buildClockPipeline();
this.producerThread = new Thread(new RunnableStage(clockPipeline));
StageWithPort<Void, Long> clockStage = this.buildClockPipeline();
this.clockThread = new Thread(new RunnableStage(clockStage));
Pipeline<File, Void> producerPipeline = this.buildProducerPipeline(clockPipeline);
Pipeline<File, Void> producerPipeline = this.buildProducerPipeline(clockStage);
this.producerThread = new Thread(new RunnableStage(producerPipeline));
}
private Pipeline<Void, Void> buildClockPipeline() {
private StageWithPort<Void, Long> buildClockPipeline() {
Clock clock = new Clock();
clock.setIntervalDelayInMs(1000);
clock.setIntervalDelayInMs(50);
Pipeline<Void, Void> pipeline = new Pipeline<Void, Void>();
pipeline.setFirstStage(clock);
pipeline.setLastStage(new EndStage<Void>());
return pipeline;
return clock;
}
private Pipeline<File, Void> buildProducerPipeline(final Pipeline<Void, Void> clockPipeline) {
private Pipeline<File, Void> buildProducerPipeline(final StageWithPort<Void, Long> clockStage) {
this.classNameRegistryRepository = new ClassNameRegistryRepository();
// final IsIMonitoringRecordInRange isIMonitoringRecordInRange = new IsIMonitoringRecordInRange(0, 1000);
......@@ -79,8 +79,8 @@ public class TraceReconstructionAnalysis extends Analysis {
// isOperationExecutionRecordTraceIdPredicate);
final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
IFlowRecord.class);
this.throughputFilter = new ThroughputFilter<IFlowRecord>();
final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter();
this.throughputFilter = new ThroughputFilter<TraceEventRecords>();
this.traceCounter = new CountingFilter<TraceEventRecords>();
final CollectorSink<TraceEventRecords> collector = new CollectorSink<TraceEventRecords>(this.elementCollection);
......@@ -94,15 +94,16 @@ public class TraceReconstructionAnalysis extends Analysis {
SingleElementPipe.connect(this.recordCounter.getOutputPort(), cache.getInputPort());
SingleElementPipe.connect(cache.getOutputPort(), stringBufferFilter.getInputPort());
SingleElementPipe.connect(stringBufferFilter.getOutputPort(), instanceOfFilter.getInputPort());
SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.throughputFilter.getInputPort());
SingleElementPipe.connect(this.throughputFilter.getOutputPort(), this.traceCounter.getInputPort());
SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.throughputFilter.getInputPort());
SingleElementPipe.connect(this.throughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
// SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceCounter.getInputPort());
SingleElementPipe.connect(this.traceCounter.getOutputPort(), collector.getInputPort());
SpScPipe.connect(clockPipeline.getOutputPort(), this.throughputFilter.getTriggerInputPort(), 1);
SpScPipe.connect(clockStage.getOutputPort(), this.throughputFilter.getTriggerInputPort(), 1);
// fill input ports
dir2RecordsFilter.getInputPort().getPipe().add(new File("src/test/data/Eprints-logs"));
dir2RecordsFilter.getInputPort().getPipe().add(this.inputDir);
// create and configure pipeline
Pipeline<File, Void> pipeline = new Pipeline<File, Void>();
......@@ -111,8 +112,8 @@ public class TraceReconstructionAnalysis extends Analysis {
pipeline.addIntermediateStage(cache);
pipeline.addIntermediateStage(stringBufferFilter);
pipeline.addIntermediateStage(instanceOfFilter);
pipeline.addIntermediateStage(traceReconstructionFilter);
pipeline.addIntermediateStage(this.throughputFilter);
pipeline.addIntermediateStage(traceReconstructionFilter);
pipeline.addIntermediateStage(this.traceCounter);
pipeline.setLastStage(collector);
return pipeline;
......@@ -122,6 +123,7 @@ public class TraceReconstructionAnalysis extends Analysis {
public void start() {
super.start();
this.clockThread.start();
this.producerThread.start();
try {
......@@ -129,6 +131,7 @@ public class TraceReconstructionAnalysis extends Analysis {
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
this.clockThread.interrupt();
}
public List<TraceEventRecords> getElementCollection() {
......@@ -146,4 +149,12 @@ public class TraceReconstructionAnalysis extends Analysis {
public List<Long> getThroughputs() {
return this.throughputFilter.getThroughputs();
}
public File getInputDir() {
return inputDir;
}
public void setInputDir(File inputDir) {
this.inputDir = inputDir;
}
}
......@@ -17,12 +17,15 @@ package teetime.variant.methodcallWithPorts.examples.traceReconstruction;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import teetime.util.StatisticsUtil;
import teetime.util.StopWatch;
import kieker.analysis.plugin.filter.flow.TraceEventRecords;
......@@ -48,8 +51,9 @@ public class TraceReconstructionAnalysisTest {
}
@Test
public void performAnalysis() {
public void performAnalysisWithEprintsLogs() {
final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis();
analysis.setInputDir(new File("src/test/data/Eprints-logs"));
analysis.init();
this.stopWatch.start();
......@@ -63,13 +67,68 @@ public class TraceReconstructionAnalysisTest {
assertEquals(50002, analysis.getNumRecords());
assertEquals(2, analysis.getNumTraces());
assertEquals(2, analysis.getElementCollection().size());
TraceEventRecords trace6884 = analysis.getElementCollection().get(0);
assertEquals(6884, trace6884.getTraceMetadata().getTraceId());
TraceEventRecords trace6886 = analysis.getElementCollection().get(1);
assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms");
}
@Test
public void performAnalysisWithKiekerLogs() {
final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis();
analysis.setInputDir(new File("src/test/data/kieker-logs"));
analysis.init();
this.stopWatch.start();
try {
analysis.start();
} finally {
this.stopWatch.end();
analysis.onTerminate();
}
assertEquals(1489902, analysis.getNumRecords());
assertEquals(24013, analysis.getNumTraces());
TraceEventRecords trace0 = analysis.getElementCollection().get(0);
assertEquals(8974347286117089280l, trace0.getTraceMetadata().getTraceId());
TraceEventRecords trace1 = analysis.getElementCollection().get(1);
assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms");
}
@Test
public void performAnalysisWithKieker2Logs() {
final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis();
analysis.setInputDir(new File("src/test/data/kieker2-logs"));
analysis.init();
this.stopWatch.start();
try {
analysis.start();
} finally {
this.stopWatch.end();
analysis.onTerminate();
}
assertEquals(17371, analysis.getNumRecords());
assertEquals(22, analysis.getNumTraces());
TraceEventRecords trace0 = analysis.getElementCollection().get(0);
assertEquals(0, trace0.getTraceMetadata().getTraceId());
TraceEventRecords trace1 = analysis.getElementCollection().get(1);
assertEquals(1, trace1.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms");
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment