Commit b55d8c18 authored by Christian Wulf's avatar Christian Wulf

added analysis configuration concept

parent 4bc98a06
statusListener(OnConsoleStatusListener)
root(WARN)
/*appender("FILE", FileAppender) {
file = "testFile.log"
append = true
......@@ -18,4 +16,6 @@ appender("CONSOLE", ConsoleAppender) {
}
}
root WARN, ["CONSOLE"]
//logger "teetime.variant.methodcallWithPorts.stage", DEBUG, ["CONSOLE"]
\ No newline at end of file
package teetime.variant.methodcallWithPorts.framework.core;
import java.util.LinkedList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Analysis {
private static final Logger LOGGER = LoggerFactory.getLogger(Analysis.class);
private Configuration configuration;
private final List<Thread> consumerThreads = new LinkedList<Thread>();
private final List<Thread> finiteProducerThreads = new LinkedList<Thread>();
private final List<Thread> infiniteProducerThreads = new LinkedList<Thread>();
public void init() {
for (StageWithPort stage : this.configuration.getConsumerStages()) {
Thread thread = new Thread(new RunnableStage(stage));
this.consumerThreads.add(thread);
}
for (StageWithPort stage : this.configuration.getFiniteProducerStages()) {
Thread thread = new Thread(new RunnableStage(stage));
this.finiteProducerThreads.add(thread);
}
for (StageWithPort stage : this.configuration.getInfiniteProducerStages()) {
Thread thread = new Thread(new RunnableStage(stage));
this.infiniteProducerThreads.add(thread);
}
}
public void start() {
// start analysis
for (Thread thread : this.consumerThreads) {
thread.start();
}
for (Thread thread : this.finiteProducerThreads) {
thread.start();
}
for (Thread thread : this.infiniteProducerThreads) {
thread.start();
}
// wait for the analysis to complete
try {
for (Thread thread : this.finiteProducerThreads) {
thread.join();
}
for (Thread thread : this.consumerThreads) {
thread.join();
}
} catch (InterruptedException e) {
LOGGER.error("Analysis has stopped unexpectedly", e);
for (Thread thread : this.finiteProducerThreads) {
thread.interrupt();
}
for (Thread thread : this.consumerThreads) {
thread.interrupt();
}
}
for (Thread thread : this.infiniteProducerThreads) {
thread.interrupt();
}
}
public Configuration getConfiguration() {
return this.configuration;
}
public void setConfiguration(final Configuration configuration) {
this.configuration = configuration;
}
}
package teetime.variant.methodcallWithPorts.framework.core;
import java.util.LinkedList;
import java.util.List;
public class Configuration {
private final List<StageWithPort> consumerStages = new LinkedList<StageWithPort>();
private final List<StageWithPort> finiteProducerStages = new LinkedList<StageWithPort>();
private final List<StageWithPort> infiniteProducerStages = new LinkedList<StageWithPort>();
public List<StageWithPort> getConsumerStages() {
return this.consumerStages;
}
public List<StageWithPort> getFiniteProducerStages() {
return this.finiteProducerStages;
}
public List<StageWithPort> getInfiniteProducerStages() {
return this.infiniteProducerStages;
}
}
......@@ -19,9 +19,9 @@ import java.io.File;
import java.util.LinkedList;
import java.util.List;
import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Configuration;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
......@@ -33,22 +33,30 @@ import kieker.common.record.IMonitoringRecord;
/**
* @author Christian Wulf
*
*
* @since 1.10
*/
public class RecordReaderAnalysis extends Analysis {
private final List<IMonitoringRecord> elementCollection = new LinkedList<IMonitoringRecord>();
private Thread producerThread;
private ClassNameRegistryRepository classNameRegistryRepository;
@Override
public void init() {
Configuration configuration = this.buildConfiguration();
this.setConfiguration(configuration);
super.init();
}
private Configuration buildConfiguration() {
Configuration localConfiguration = new Configuration();
StageWithPort producerPipeline = this.buildProducerPipeline();
this.producerThread = new Thread(new RunnableStage(producerPipeline));
localConfiguration.getFiniteProducerStages().add(producerPipeline);
return localConfiguration;
}
private StageWithPort buildProducerPipeline() {
......@@ -69,19 +77,6 @@ public class RecordReaderAnalysis extends Analysis {
return pipeline;
}
@Override
public void start() {
super.start();
this.producerThread.start();
try {
this.producerThread.join();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
public List<IMonitoringRecord> getElementCollection() {
return this.elementCollection;
}
......
......@@ -31,7 +31,7 @@ import kieker.common.record.misc.KiekerMetadataRecord;
/**
* @author Christian Wulf
*
*
* @since 1.10
*/
public class RecordReaderAnalysisTest {
......@@ -59,7 +59,6 @@ public class RecordReaderAnalysisTest {
analysis.start();
} finally {
this.stopWatch.end();
analysis.onTerminate();
}
assertEquals(6541, analysis.getElementCollection().size());
......
......@@ -41,7 +41,7 @@ import kieker.common.record.IMonitoringRecord;
/**
* @author Christian Wulf
*
*
* @since 1.10
*/
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
......@@ -89,7 +89,6 @@ public class ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest {
analysis.start();
} finally {
this.stopWatch.end();
analysis.onTerminate();
}
int maxNumWaits = 0;
......
......@@ -36,7 +36,7 @@ import kieker.common.record.IMonitoringRecord;
/**
* @author Christian Wulf
*
*
* @since 1.10
*/
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
......@@ -97,7 +97,6 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest {
analysis.start();
} finally {
this.stopWatch.end();
analysis.onTerminate();
}
int maxNumWaits = 0;
......
......@@ -8,9 +8,9 @@ import java.util.List;
import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
import teetime.util.concurrent.hashmap.TraceBuffer;
import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Configuration;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
......@@ -39,32 +39,66 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
private Thread tcpThread;
private Thread clockThread;
private Thread clock2Thread;
private Thread[] workerThreads;
private int numWorkerThreads;
private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
private final StageFactory<Counter<IMonitoringRecord>> recordCounterFactory;
private final StageFactory<ElementDelayMeasuringStage<IMonitoringRecord>> recordDelayFilterFactory;
private final StageFactory<ElementThroughputMeasuringStage<IMonitoringRecord>> recordThroughputFilterFactory;
private final StageFactory<InstanceCounter<IMonitoringRecord, TraceMetadata>> traceMetadataCounterFactory;
private final StageFactory<TraceReconstructionFilter> traceReconstructionFilterFactory;
private final StageFactory<Counter<TraceEventRecords>> traceCounterFactory;
private final StageFactory<ElementThroughputMeasuringStage<TraceEventRecords>> traceThroughputFilterFactory;
private final List<SpScPipe<IMonitoringRecord>> tcpRelayPipes = new LinkedList<SpScPipe<IMonitoringRecord>>();
@SuppressWarnings({ "rawtypes", "unchecked" })
public TcpTraceReconstructionAnalysisWithThreads() {
super();
try {
this.recordCounterFactory = new StageFactory(Counter.class.getConstructor());
this.recordDelayFilterFactory = new StageFactory(ElementDelayMeasuringStage.class.getConstructor());
this.recordThroughputFilterFactory = new StageFactory(ElementThroughputMeasuringStage.class.getConstructor());
this.traceMetadataCounterFactory = new StageFactory(InstanceCounter.class.getConstructor(Class.class));
this.traceReconstructionFilterFactory = new StageFactory(TraceReconstructionFilter.class.getConstructor(ConcurrentHashMapWithDefault.class));
this.traceCounterFactory = new StageFactory(Counter.class.getConstructor());
this.traceThroughputFilterFactory = new StageFactory(ElementThroughputMeasuringStage.class.getConstructor());
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException(e);
} catch (SecurityException e) {
throw new IllegalArgumentException(e);
}
}
@Override
public void init() {
Configuration configuration = this.buildConfiguration();
this.setConfiguration(configuration);
super.init();
Pipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
}
Pipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000);
this.clockThread = new Thread(new RunnableStage(clockStage));
private Configuration buildConfiguration() {
Configuration localConfiguration = new Configuration();
Pipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(2000);
this.clock2Thread = new Thread(new RunnableStage(clock2Stage));
final Pipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
localConfiguration.getFiniteProducerStages().add(tcpPipeline);
this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads);
this.workerThreads = new Thread[this.numWorkerThreads];
final Pipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000);
localConfiguration.getInfiniteProducerStages().add(clockStage);
final Pipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(2000);
localConfiguration.getInfiniteProducerStages().add(clock2Stage);
for (int i = 0; i < this.workerThreads.length; i++) {
this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads);
for (int i = 0; i < this.numWorkerThreads; i++) {
StageWithPort pipeline = this.buildPipeline(tcpPipeline.getLastStage(), clockStage.getLastStage(), clock2Stage.getLastStage());
this.workerThreads[i] = new Thread(new RunnableStage(pipeline));
localConfiguration.getConsumerStages().add(pipeline);
}
return localConfiguration;
}
private Pipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() {
......@@ -125,35 +159,6 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
}
}
private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
private final StageFactory<Counter<IMonitoringRecord>> recordCounterFactory;
private final StageFactory<ElementDelayMeasuringStage<IMonitoringRecord>> recordDelayFilterFactory;
private final StageFactory<ElementThroughputMeasuringStage<IMonitoringRecord>> recordThroughputFilterFactory;
private final StageFactory<InstanceCounter<IMonitoringRecord, TraceMetadata>> traceMetadataCounterFactory;
private final StageFactory<TraceReconstructionFilter> traceReconstructionFilterFactory;
private final StageFactory<Counter<TraceEventRecords>> traceCounterFactory;
private final StageFactory<ElementThroughputMeasuringStage<TraceEventRecords>> traceThroughputFilterFactory;
private final List<SpScPipe<IMonitoringRecord>> tcpRelayPipes = new LinkedList<SpScPipe<IMonitoringRecord>>();
@SuppressWarnings({ "rawtypes", "unchecked" })
public TcpTraceReconstructionAnalysisWithThreads() {
try {
this.recordCounterFactory = new StageFactory(Counter.class.getConstructor());
this.recordDelayFilterFactory = new StageFactory(ElementDelayMeasuringStage.class.getConstructor());
this.recordThroughputFilterFactory = new StageFactory(ElementThroughputMeasuringStage.class.getConstructor());
this.traceMetadataCounterFactory = new StageFactory(InstanceCounter.class.getConstructor(Class.class));
this.traceReconstructionFilterFactory = new StageFactory(TraceReconstructionFilter.class.getConstructor(ConcurrentHashMapWithDefault.class));
this.traceCounterFactory = new StageFactory(Counter.class.getConstructor());
this.traceThroughputFilterFactory = new StageFactory(ElementThroughputMeasuringStage.class.getConstructor());
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException(e);
} catch (SecurityException e) {
throw new IllegalArgumentException(e);
}
}
private StageWithPort buildPipeline(final Distributor<IMonitoringRecord> tcpReaderPipeline,
final Distributor<Long> clockStage, final Distributor<Long> clock2Stage) {
// create stages
......@@ -206,31 +211,6 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
return pipeline;
}
@Override
public void start() {
super.start();
this.clockThread.start();
this.clock2Thread.start();
this.tcpThread.start();
for (Thread workerThread : this.workerThreads) {
workerThread.start();
}
try {
this.tcpThread.join();
for (Thread workerThread : this.workerThreads) {
workerThread.join();
}
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
this.clockThread.interrupt();
this.clock2Thread.interrupt();
}
public List<TraceEventRecords> getElementCollection() {
return this.elementCollection;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment