Commit db98abb3 authored by Christian Wulf's avatar Christian Wulf

added sysout maxSize for SpSc pipe in KiekerDays experiments

parent daf57da1
......@@ -8,6 +8,6 @@ java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s)%n
#teetime.level = ALL
#teetime.variant.methodcallWithPorts.framework.level = ALL
teetime.variant.methodcallWithPorts.framework.core.level = FINE
teetime.variant.methodcallWithPorts.stage.level = INFO
#teetime.variant.methodcallWithPorts.framework.core.level = FINE
#teetime.variant.methodcallWithPorts.stage.level = INFO
#teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE
package teetime.variant.methodcallWithPorts.examples.kiekerdays;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
......@@ -25,9 +26,11 @@ import kieker.common.record.flow.IFlowRecord;
public class TcpTraceReconstruction extends Analysis {
private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors();
private static final int TCP_RELAY_MAX_SIZE = 500000;
private static final int TCP_RELAY_MAX_SIZE = 100000;
private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
private final List<SpScPipe<IMonitoringRecord>> tcpRelayPipes = new ArrayList<SpScPipe<IMonitoringRecord>>();
private Thread tcpThread;
private Thread[] workerThreads;
......@@ -62,8 +65,6 @@ public class TcpTraceReconstruction extends Analysis {
return pipeline;
}
private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
private Pipeline<IMonitoringRecord, ?> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline) {
// create stages
Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>();
......@@ -73,7 +74,8 @@ public class TcpTraceReconstruction extends Analysis {
EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>();
// connect stages
SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
SpScPipe<IMonitoringRecord> tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
this.tcpRelayPipes.add(tcpRelayPipe);
SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort());
SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
......@@ -109,6 +111,16 @@ public class TcpTraceReconstruction extends Analysis {
}
}
@Override
public void onTerminate() {
int maxSize = 0;
for (SpScPipe<IMonitoringRecord> pipe : this.tcpRelayPipes) {
maxSize = Math.max(maxSize, pipe.getMaxSize());
}
System.out.println("max size of TcpRelayPipes: " + maxSize);
super.onTerminate();
}
public List<TraceEventRecords> getElementCollection() {
return this.elementCollection;
}
......
package teetime.variant.methodcallWithPorts.examples.kiekerdays;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
......@@ -31,9 +32,12 @@ import kieker.common.record.flow.IFlowRecord;
public class TcpTraceReduction extends Analysis {
private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors();
private static final int TCP_RELAY_MAX_SIZE = 500000;
private static final int TCP_RELAY_MAX_SIZE = 100000;
private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
private final Map<TraceEventRecords, TraceAggregationBuffer> trace2buffer = new TreeMap<TraceEventRecords, TraceAggregationBuffer>(new TraceComperator());
private final List<SpScPipe<IMonitoringRecord>> tcpRelayPipes = new ArrayList<SpScPipe<IMonitoringRecord>>();
private Thread tcpThread;
private Thread clockThread;
......@@ -87,9 +91,6 @@ public class TcpTraceReduction extends Analysis {
return pipeline;
}
private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
private final Map<TraceEventRecords, TraceAggregationBuffer> trace2buffer = new TreeMap<TraceEventRecords, TraceAggregationBuffer>(new TraceComperator());
private Pipeline<IMonitoringRecord, ?> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline,
final StageWithPort<Void, Long> clockStage) {
// create stages
......@@ -101,7 +102,8 @@ public class TcpTraceReduction extends Analysis {
EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>();
// connect stages
SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
SpScPipe<IMonitoringRecord> tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
this.tcpRelayPipes.add(tcpRelayPipe);
SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort());
SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
......@@ -143,6 +145,16 @@ public class TcpTraceReduction extends Analysis {
this.clockThread.interrupt();
}
@Override
public void onTerminate() {
int maxSize = 0;
for (SpScPipe<IMonitoringRecord> pipe : this.tcpRelayPipes) {
maxSize = Math.max(maxSize, pipe.getMaxSize());
}
System.out.println("max size of TcpRelayPipes: " + maxSize);
super.onTerminate();
}
public List<TraceEventRecords> getElementCollection() {
return this.elementCollection;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment