Commit 6b16890a authored by Christian Wulf's avatar Christian Wulf

fixed bug in Clock;

added SysOutFilter
parent 1aaed454
.handlers = java.util.logging.ConsoleHandler
.level = ALL
.level = WARNING
java.util.logging.ConsoleHandler.level = WARNING
java.util.logging.ConsoleHandler.level = ALL
#java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s)%n
#teetime.level = ALL
\ No newline at end of file
#teetime.level = ALL
teetime.variant.methodcallWithPorts.framework.core.level = ALL
#teetime.variant.methodcallWithPorts.stage.level = ALL
teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE
\ No newline at end of file
package teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
import kieker.common.record.IMonitoringRecord;
public class SysOutFilter<T> extends ConsumerStage<T, T> {
private final InputPort<Long> triggerInputPort = new InputPort<Long>(this);
private final IPipe<IMonitoringRecord> pipe;
public SysOutFilter(final IPipe<IMonitoringRecord> pipe) {
this.pipe = pipe;
}
@Override
protected void execute5(final T element) {
Long timestamp = this.triggerInputPort.receive();
if (timestamp != null) {
// this.logger.info("pipe.size: " + this.pipe.size());
System.out.println("pipe.size: " + this.pipe.size());
}
this.send(element);
}
public InputPort<Long> getTriggerInputPort() {
return this.triggerInputPort;
}
}
......@@ -25,8 +25,8 @@ public class Clock extends ProducerStage<Void, Long> {
this.sleep(this.intervalDelayInMs);
}
// System.out.println("Emitting timestamp");
this.getOutputPort().send(this.getCurrentTimeInNs());
this.logger.debug("Emitting timestamp");
this.send(this.getCurrentTimeInNs());
}
private void sleep(final long delayInMs) {
......
......@@ -15,7 +15,7 @@ public class Relay<T> extends AbstractStage<T, T> {
if (null == element) {
if (this.getInputPort().getPipe().isClosed()) {
this.setReschedulable(false);
System.out.println("got end signal; pipe.size: " + this.getInputPort().getPipe().size());
this.logger.debug("got end signal; pipe.size: " + this.getInputPort().getPipe().size());
assert 0 == this.getInputPort().getPipe().size();
}
return;
......@@ -25,7 +25,7 @@ public class Relay<T> extends AbstractStage<T, T> {
@Override
public void onIsPipelineHead() {
System.out.println("onIsPipelineHead");
this.logger.debug("onIsPipelineHead");
if (this.getInputPort().getPipe().isClosed()) {
this.setReschedulable(false);
}
......
......@@ -11,6 +11,7 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.CountingFilter;
import teetime.variant.methodcallWithPorts.stage.Distributor;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.ThroughputFilter;
......@@ -39,28 +40,28 @@ public class TcpTraceReconstructionAnalysis extends Analysis {
@Override
public void init() {
super.init();
StageWithPort<Void, Long> clockStage = this.buildClockPipeline();
StageWithPort<Void, Long> clockStage = this.buildClockPipeline(1000);
this.clockThread = new Thread(new RunnableStage(clockStage));
StageWithPort<Void, Long> clock2Stage = this.buildClock2Pipeline();
StageWithPort<Void, Long> clock2Stage = this.buildClockPipeline(2000);
this.clock2Thread = new Thread(new RunnableStage(clock2Stage));
Pipeline<?, ?> pipeline = this.buildPipeline(clockStage, clock2Stage);
this.workerThread = new Thread(new RunnableStage(pipeline));
}
private StageWithPort<Void, Long> buildClockPipeline() {
private StageWithPort<Void, Long> buildClockPipeline(final long intervalDelayInMs) {
Clock clock = new Clock();
clock.setIntervalDelayInMs(1000);
clock.setIntervalDelayInMs(intervalDelayInMs);
Distributor<Long> distributor = new Distributor<Long>();
return clock;
}
private StageWithPort<Void, Long> buildClock2Pipeline() {
Clock clock = new Clock();
clock.setIntervalDelayInMs(2000);
SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort());
return clock;
// create and configure pipeline
Pipeline<Void, Long> pipeline = new Pipeline<Void, Long>();
pipeline.setFirstStage(clock);
pipeline.setLastStage(distributor);
return pipeline;
}
private Pipeline<Void, TraceEventRecords> buildPipeline(final StageWithPort<Void, Long> clockStage, final StageWithPort<Void, Long> clock2Stage) {
......
......@@ -50,6 +50,7 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest {
@Test
public void performAnalysis() {
final TcpTraceReconstructionAnalysisWithThreads analysis = new TcpTraceReconstructionAnalysisWithThreads();
analysis.setNumWorkerThreads(2);
analysis.init();
this.stopWatch.start();
......@@ -60,7 +61,7 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest {
analysis.onTerminate();
}
System.out.println("Max size of pipe: " + analysis.getTcpRelayPipe().getMaxSize());
System.out.println("Max size of tcp-relay pipe: " + analysis.getTcpRelayPipe().getMaxSize());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs());
System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms");
......
......@@ -25,6 +25,7 @@ import kieker.common.record.flow.IFlowRecord;
public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors();
private static final int TCP_RELAY_MAX_SIZE = 500000;
private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
......@@ -32,7 +33,7 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
private Thread tcpThread;
private Thread clockThread;
private Thread clock2Thread;
private Thread workerThread;
private Thread[] workerThreads;
private CountingFilter<IMonitoringRecord> recordCounter;
......@@ -42,6 +43,7 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
private ThroughputFilter<TraceEventRecords> traceThroughputFilter;
private SpScPipe<IMonitoringRecord> tcpRelayPipe;
private int numWorkerThreads;
@Override
public void init() {
......@@ -49,15 +51,19 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
StageWithPort<Void, IMonitoringRecord> tcpPipeline = this.buildTcpPipeline();
this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
StageWithPort<Void, Long> clockStage = this.buildClockPipeline();
StageWithPort<Void, Long> clockStage = this.buildClockPipeline(1000);
this.clockThread = new Thread(new RunnableStage(clockStage));
StageWithPort<Void, Long> clock2Stage = this.buildClock2Pipeline();
StageWithPort<Void, Long> clock2Stage = this.buildClockPipeline(2000);
this.clock2Thread = new Thread(new RunnableStage(clock2Stage));
StageWithPort<?, ?> pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage);
this.workerThread = new Thread(new RunnableStage(pipeline));
this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads);
this.workerThreads = new Thread[this.numWorkerThreads];
for (int i = 0; i < this.workerThreads.length; i++) {
StageWithPort<?, ?> pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage);
this.workerThreads[i] = new Thread(new RunnableStage(pipeline));
}
}
private StageWithPort<Void, IMonitoringRecord> buildTcpPipeline() {
......@@ -73,21 +79,21 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
return pipeline;
}
private StageWithPort<Void, Long> buildClockPipeline() {
private StageWithPort<Void, Long> buildClockPipeline(final long intervalDelayInMs) {
Clock clock = new Clock();
clock.setIntervalDelayInMs(1000);
clock.setIntervalDelayInMs(intervalDelayInMs);
Distributor<Long> distributor = new Distributor<Long>();
return clock;
}
SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort());
private StageWithPort<Void, Long> buildClock2Pipeline() {
Clock clock = new Clock();
clock.setIntervalDelayInMs(2000);
return clock;
// create and configure pipeline
Pipeline<Void, Long> pipeline = new Pipeline<Void, Long>();
pipeline.setFirstStage(clock);
pipeline.setLastStage(distributor);
return pipeline;
}
private Pipeline<IMonitoringRecord, TraceEventRecords> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline,
private Pipeline<IMonitoringRecord, ?> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline,
final StageWithPort<Void, Long> clockStage,
final StageWithPort<Void, Long> clock2Stage) {
// create stages
......@@ -99,31 +105,43 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter();
this.traceThroughputFilter = new ThroughputFilter<TraceEventRecords>();
this.traceCounter = new CountingFilter<TraceEventRecords>();
EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>();
// EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>();
EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>();
// connect stages
this.tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
SingleElementPipe.connect(relay.getOutputPort(), this.recordCounter.getInputPort());
SingleElementPipe.connect(this.recordCounter.getOutputPort(), instanceOfFilter.getInputPort());
// SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort());
// SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceThroughputFilter.getInputPort());
SingleElementPipe.connect(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort());
SingleElementPipe.connect(this.traceCounter.getOutputPort(), endStage.getInputPort());
SpScPipe.connect(clockStage.getOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 1);
SpScPipe.connect(clock2Stage.getOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 1);
SysOutFilter<IMonitoringRecord> sysout = new SysOutFilter<IMonitoringRecord>(this.tcpRelayPipe);
// // SingleElementPipe.connect(relay.getOutputPort(), this.recordCounter.getInputPort());
// // SingleElementPipe.connect(this.recordCounter.getOutputPort(), instanceOfFilter.getInputPort());
// SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort());
// SingleElementPipe.connect(relay.getOutputPort(), sysout.getInputPort());
// SingleElementPipe.connect(sysout.getOutputPort(), endStage.getInputPort());
SingleElementPipe.connect(relay.getOutputPort(), endStage.getInputPort());
// // SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort());
// // SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
// SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
// // SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceThroughputFilter.getInputPort());
// // SingleElementPipe.connect(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort());
// // SingleElementPipe.connect(this.traceCounter.getOutputPort(), endStage.getInputPort());
// SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), endStage.getInputPort());
SpScPipe.connect(clockStage.getOutputPort(), sysout.getTriggerInputPort(), 10);
// SpScPipe.connect(clockStage.getOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 10);
SpScPipe.connect(clock2Stage.getOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 10);
// create and configure pipeline
Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>();
// Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>();
Pipeline<IMonitoringRecord, IMonitoringRecord> pipeline = new Pipeline<IMonitoringRecord, IMonitoringRecord>();
pipeline.setFirstStage(relay);
pipeline.addIntermediateStage(this.recordCounter);
pipeline.addIntermediateStage(instanceOfFilter);
// pipeline.addIntermediateStage(this.recordCounter);
pipeline.addIntermediateStage(sysout);
// pipeline.addIntermediateStage(instanceOfFilter);
// pipeline.addIntermediateStage(this.recordThroughputFilter);
pipeline.addIntermediateStage(traceReconstructionFilter);
pipeline.addIntermediateStage(this.traceThroughputFilter);
pipeline.addIntermediateStage(this.traceCounter);
// pipeline.addIntermediateStage(traceReconstructionFilter);
// pipeline.addIntermediateStage(this.traceThroughputFilter);
// pipeline.addIntermediateStage(this.traceCounter);
pipeline.setLastStage(endStage);
return pipeline;
}
......@@ -134,17 +152,18 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
this.tcpThread.start();
// this.clockThread.start();
this.clock2Thread.start();
this.workerThread.start();
// this.clock2Thread.start();
try {
this.tcpThread.join();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
for (Thread workerThread : this.workerThreads) {
workerThread.start();
}
try {
this.workerThread.join();
this.tcpThread.join();
for (Thread workerThread : this.workerThreads) {
workerThread.join();
}
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
......@@ -176,4 +195,12 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
return this.tcpRelayPipe;
}
public int getNumWorkerThreads() {
return this.numWorkerThreads;
}
public void setNumWorkerThreads(final int numWorkerThreads) {
this.numWorkerThreads = numWorkerThreads;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment