Commit 08c655ce authored by Christian Wulf's avatar Christian Wulf

added logging.properties;

changed successor concept
parent 3757e902
......@@ -104,7 +104,7 @@ sp_cleanup.remove_unused_private_methods=true
sp_cleanup.remove_unused_private_types=true
sp_cleanup.sort_members=false
sp_cleanup.sort_members_all=false
sp_cleanup.use_blocks=false
sp_cleanup.use_blocks=true
sp_cleanup.use_blocks_only_for_return_and_throw=false
sp_cleanup.use_parentheses_in_expressions=false
sp_cleanup.use_this_for_non_static_field_access=true
......
.handlers = java.util.logging.ConsoleHandler
.level= ALL
java.util.logging.ConsoleHandler.level = ALL
#java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
#teetime.level = ALL
\ No newline at end of file
......@@ -25,8 +25,6 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
private int index;
private StageWithPort<?, ?> successor;
private boolean reschedulable;
public AbstractStage() {
......@@ -70,18 +68,6 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
return null;
}
// @Override
// public void executeWithPorts() {
// CommittableQueue execute;
// do {
// // execute = this.next().execute2(this.outputElements);
// // execute = this.next().execute2(this.getOutputPort().pipe.getElements());
// this.next().executeWithPorts();
// } while (this.next().isReschedulable());
// }
// protected abstract void execute4(CommittableQueue<I> elements);
protected void execute4(final CommittableQueue<I> elements) {
throw new IllegalStateException(); // default implementation
}
......@@ -89,22 +75,18 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
protected abstract void execute5(I element);
protected final void send(final O element) {
// this.outputElements.addToTailUncommitted(element);
// this.outputElements.commit();
this.send(this.getOutputPort(), element);
}
this.getOutputPort().send(element);
protected final void send(final OutputPort<O> outputPort, final O element) {
outputPort.send(element);
// CommittableQueue execute;
StageWithPort<O, ?> next = outputPort.getPipe().getTargetStage();
StageWithPort<?, ?> next = this.next();
// StageWithPort<?, ?> next = this.next();
do {
// execute = this.next().execute2(this.outputElements);
// execute = this.next().execute2(this.getOutputPort().pipe.getElements());
next.executeWithPorts();
// System.out.println("Executed " + this.next().getClass().getSimpleName());
} while (next.isReschedulable());
// } while (this.next().getInputPort().pipe.size() > 0);
// } while (execute.size() > 0);
}
// @Override
......@@ -139,16 +121,6 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
this.parentStage = parentStage;
}
@Override
public StageWithPort<?, ?> next() {
return this.successor;
}
@Override
public void setSuccessor(final StageWithPort<? super O, ?> successor) {
this.successor = successor;
}
@Override
public boolean isReschedulable() {
return this.reschedulable;
......
......@@ -20,6 +20,7 @@ public abstract class ConsumerStage<I, O> extends AbstractStage<I, O> {
@Override
public void executeWithPorts() {
this.logger.debug("Executing stage...");
I element = this.getInputPort().receive();
this.setReschedulable(this.getInputPort().getPipe().size() > 0);
......
......@@ -3,16 +3,27 @@ package teetime.variant.methodcallWithPorts.framework.core;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import teetime.util.list.CommittableQueue;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import kieker.common.logging.Log;
import kieker.common.logging.LogFactory;
public class Pipeline<I, O> implements StageWithPort<I, O> {
private final String id;
/**
* A unique logger instance per stage instance
*/
protected Log logger;
private StageWithPort<I, ?> firstStage;
private final List<StageWithPort<?, ?>> intermediateStages = new LinkedList<StageWithPort<?, ?>>();
private StageWithPort<?, O> lastStage;
private StageWithPort<?, ?> successor;
private StageWithPort<?, ?>[] stages;
private StageWithPort<?, ?> parentStage;
private int index;
......@@ -21,6 +32,15 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
private boolean reschedulable;
private int firstStageIndex;
public Pipeline() {
this.id = UUID.randomUUID().toString(); // the id should only be represented by a UUID, not additionally by the class name
this.logger = LogFactory.getLog(this.id);
}
public String getId() {
return this.id;
}
public void setFirstStage(final StageWithPort<I, ?> stage) {
this.firstStage = stage;
}
......@@ -60,6 +80,7 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
@Override
public void executeWithPorts() {
this.logger.debug("Executing stage...");
StageWithPort<?, ?> firstStage = this.stages[this.firstStageIndex];
firstStage.executeWithPorts();
......@@ -71,7 +92,8 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
StageWithPort<?, ?> currentStage = stage;
while (!currentStage.isReschedulable()) {
this.firstStageIndex++;
currentStage = currentStage.next();
// currentStage = currentStage.getOutputPort().getPipe().getTargetStage(); // FIXME what to do with a stage with more than one output port?
currentStage = this.stages[this.firstStageIndex];
if (currentStage == null) { // loop reaches the last stage
this.setReschedulable(false);
this.cleanUp();
......@@ -120,17 +142,17 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
}
this.stages[this.stages.length - 1] = this.lastStage;
for (int i = 0; i < this.stages.length; i++) {
// StageWithPort<?, ?> stage = this.stages[i];
// stage.setParentStage(this, i);
// stage.setListener(this);
}
// for (int i = 0; i < this.stages.length; i++) {
// StageWithPort<?, ?> stage = this.stages[i];
// stage.setParentStage(this, i);
// stage.setListener(this);
// }
for (int i = 0; i < this.stages.length - 1; i++) {
StageWithPort stage = this.stages[i];
stage.setSuccessor(this.stages[i + 1]);
}
this.stages[this.stages.length - 1].setSuccessor(new EndStage<Object>());
// for (int i = 0; i < this.stages.length - 1; i++) {
// StageWithPort stage = this.stages[i];
// stage.setSuccessor(this.stages[i + 1]);
// }
// this.stages[this.stages.length - 1].setSuccessor(new EndStage<Object>());
for (StageWithPort<?, ?> stage : this.stages) {
stage.onStart();
......@@ -148,16 +170,6 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
this.parentStage = parentStage;
}
@Override
public StageWithPort<?, ?> next() {
throw new IllegalStateException();
}
@Override
public void setSuccessor(final StageWithPort<? super O, ?> successor) {
throw new IllegalStateException();
}
@Override
public boolean isReschedulable() {
return this.reschedulable;
......@@ -183,7 +195,7 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
StageWithPort<?, ?> stage = this.stages[i];
stage.setParentStage(null, i);
// stage.setListener(null);
stage.setSuccessor(null);
// stage.setSuccessor(null);
}
this.firstStage = null;
......
......@@ -10,26 +10,14 @@ public interface StageWithPort<I, O> {
void executeWithPorts();
// void executeWithPorts(Object element);
// O execute(Object element);
// CommittableQueue<O> execute2();
CommittableQueue<O> execute2(CommittableQueue<I> elements);
// SchedulingInformation getSchedulingInformation();
StageWithPort<?, ?> getParentStage();
void setParentStage(StageWithPort<?, ?> parentStage, int index);
// void setListener(OnDisableListener listener);
StageWithPort<?, ?> next();
void setSuccessor(StageWithPort<? super O, ?> successor);
boolean isReschedulable();
void onIsPipelineHead();
......
......@@ -2,9 +2,12 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe;
import java.util.concurrent.atomic.AtomicBoolean;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
public abstract class AbstractPipe<T> implements IPipe<T> {
private final AtomicBoolean closed = new AtomicBoolean();
private StageWithPort<T, ?> targetStage;
@Override
public boolean isClosed() {
......@@ -16,4 +19,13 @@ public abstract class AbstractPipe<T> implements IPipe<T> {
this.closed.lazySet(true); // lazySet is legal due to our single-writer requirement
}
@Override
public StageWithPort<T, ?> getTargetStage() {
return this.targetStage;
}
public void setTargetStage(StageWithPort<T, ?> targetStage) {
this.targetStage = targetStage;
}
}
package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
public interface IPipe<T> {
public abstract void add(T element);
......@@ -16,4 +18,6 @@ public interface IPipe<T> {
public abstract boolean isClosed();
public abstract StageWithPort<T, ?> getTargetStage();
}
......@@ -27,25 +27,13 @@ public class EndStage<T> implements StageWithPort<T, T> {
}
@Override
public StageWithPort getParentStage() {
public StageWithPort<?, ?> getParentStage() {
// TODO Auto-generated method stub
return null;
}
@Override
public void setParentStage(final StageWithPort parentStage, final int index) {
// TODO Auto-generated method stub
}
@Override
public StageWithPort next() {
// TODO Auto-generated method stub
return null;
}
@Override
public void setSuccessor(final StageWithPort<? super T, ?> successor) {
public void setParentStage(final StageWithPort<?, ?> parentStage, final int index) {
// TODO Auto-generated method stub
}
......
......@@ -16,13 +16,20 @@ public class FileExtensionSwitch extends ConsumerStage<File, File> {
@Override
protected void execute5(final File file) {
String fileExtension = Files.getFileExtension(file.getAbsolutePath());
this.logger.debug("fileExtension: " + fileExtension);
OutputPort<File> outputPort = this.fileExtensions.get(fileExtension);
outputPort.send(file);
if (outputPort != null) {
this.send(outputPort, file);
}
}
public OutputPort<File> addFileExtension(final String fileExtension) {
public OutputPort<File> addFileExtension(String fileExtension) {
if (fileExtension.startsWith(".")) {
fileExtension = fileExtension.substring(1);
}
OutputPort<File> outputPort = new OutputPort<File>();
this.fileExtensions.put(fileExtension, outputPort);
this.logger.debug("SUCCESS: Registered output port for '" + fileExtension + "'");
return outputPort;
}
......
$0=kieker.common.record.misc.KiekerMetadataRecord
$1=kieker.common.record.controlflow.OperationExecutionRecord
......@@ -19,12 +19,11 @@ import java.io.File;
import java.util.LinkedList;
import java.util.List;
import teetime.util.ConstructorClosure;
import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.CollectorSink;
import teetime.variant.methodcallWithPorts.stage.kieker.File2RecordFilter;
import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository;
......@@ -38,11 +37,7 @@ import kieker.common.record.IMonitoringRecord;
*/
public class RecordReaderAnalysis extends Analysis {
private int numInputObjects;
private ConstructorClosure<TimestampObject> inputObjectCreator;
private int numNoopFilters;
private final List<IMonitoringRecord> timestampObjectsList = new LinkedList<IMonitoringRecord>();
private final List<IMonitoringRecord> elementCollection = new LinkedList<IMonitoringRecord>();
private Thread producerThread;
......@@ -51,15 +46,15 @@ public class RecordReaderAnalysis extends Analysis {
@Override
public void init() {
super.init();
Pipeline<File, Object> producerPipeline = this.buildProducerPipeline(this.numInputObjects, this.inputObjectCreator);
Pipeline<File, Object> producerPipeline = this.buildProducerPipeline();
this.producerThread = new Thread(new RunnableStage(producerPipeline));
}
private Pipeline<File, Object> buildProducerPipeline(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) {
private Pipeline<File, Object> buildProducerPipeline() {
this.classNameRegistryRepository = new ClassNameRegistryRepository();
// create stages
File2RecordFilter file2RecordFilter = new File2RecordFilter(this.classNameRegistryRepository);
CollectorSink<IMonitoringRecord> collector = new CollectorSink<IMonitoringRecord>(this.timestampObjectsList);
CollectorSink<IMonitoringRecord> collector = new CollectorSink<IMonitoringRecord>(this.elementCollection);
final Pipeline<File, Object> pipeline = new Pipeline<File, Object>();
pipeline.setFirstStage(file2RecordFilter);
......@@ -67,6 +62,10 @@ public class RecordReaderAnalysis extends Analysis {
SingleElementPipe.connect(file2RecordFilter.getOutputPort(), collector.getInputPort());
SpScPipe<File> dirInputPipe = new SpScPipe<File>(1);
dirInputPipe.add(new File("src/test/data/bookstore-logs"));
file2RecordFilter.getInputPort().setPipe(dirInputPipe);
return pipeline;
}
......@@ -83,21 +82,8 @@ public class RecordReaderAnalysis extends Analysis {
}
}
public void setInput(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) {
this.numInputObjects = numInputObjects;
this.inputObjectCreator = inputObjectCreator;
}
public int getNumNoopFilters() {
return this.numNoopFilters;
}
public void setNumNoopFilters(final int numNoopFilters) {
this.numNoopFilters = numNoopFilters;
}
public List<IMonitoringRecord> getTimestampObjectsList() {
return this.timestampObjectsList;
public List<IMonitoringRecord> getElementCollection() {
return this.elementCollection;
}
}
......@@ -15,32 +15,30 @@
***************************************************************************/
package teetime.variant.methodcallWithPorts.examples.recordReader;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import teetime.util.ConstructorClosure;
import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
import test.PerformanceTest;
import teetime.util.StopWatch;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class RecordReaderAnalysisTest extends PerformanceTest {
public class RecordReaderAnalysisTest {
@Test
public void performAnalysis(final int numThreads) {
System.out.println("Testing teetime (mc) with NUM_OBJECTS_TO_CREATE=" + NUM_OBJECTS_TO_CREATE + ", NUM_NOOP_FILTERS="
+ NUM_NOOP_FILTERS + "...");
private StopWatch stopWatch;
@Before
public void before() {
this.stopWatch = new StopWatch();
}
@Test
public void performAnalysis() {
final RecordReaderAnalysis analysis = new RecordReaderAnalysis();
analysis.setNumNoopFilters(NUM_NOOP_FILTERS);
analysis.setInput(NUM_OBJECTS_TO_CREATE, new ConstructorClosure<TimestampObject>() {
@Override
public TimestampObject create() {
return new TimestampObject();
}
});
analysis.init();
this.stopWatch.start();
......@@ -51,7 +49,8 @@ public class RecordReaderAnalysisTest extends PerformanceTest {
analysis.onTerminate();
}
// this.timestampObjects = analysis.getTimestampObjectsList();
long overallDurationInNs = this.stopWatch.getDurationInNs();
System.out.println("Duration: " + TimeUnit.NANOSECONDS.toMillis(overallDurationInNs) + " ms");
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment