Commit 3620961a authored by Christian Wulf's avatar Christian Wulf

replaced logging in SpScPipe by #waits

parent b85916cc
......@@ -57,7 +57,7 @@
<build>
<plugins>
<!-- we want JDK 1.6 source and binary compatiblility -->
<!-- we want JDK 1.6 source and binary compatibility -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
......
......@@ -44,16 +44,19 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
protected abstract void execute5(I element);
/**
* Sends the <code>element</code> using the default output port
* Sends the given <code>element</code> using the default output port
*
* @param element
* @return <code>true</code> iff the given element could be sent, <code>false</code> otherwise (then use a re-try strategy)
*/
protected final void send(final O element) {
this.send(this.getOutputPort(), element);
protected final boolean send(final O element) {
return this.send(this.getOutputPort(), element);
}
protected final void send(final OutputPort<O> outputPort, final O element) {
outputPort.send(element);
protected final boolean send(final OutputPort<O> outputPort, final O element) {
if (!outputPort.send(element)) {
return false;
}
// StageWithPort<?, ?> next = outputPort.getPipe().getTargetPort().getOwningStage();
StageWithPort<?, ?> next = outputPort.getCachedTargetStage();
......@@ -61,6 +64,8 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
do {
next.executeWithPorts(); // PERFORMANCE use the return value as indicator for re-schedulability instead
} while (next.isReschedulable());
return true;
}
// public void disable() {
......
......@@ -14,8 +14,13 @@ public class OutputPort<T> {
*/
private StageWithPort<?, ?> cachedTargetStage;
public void send(final T element) {
this.pipe.add(element);
/**
*
* @param element
* @return <code>true</code> iff the given <code>element</code> could be sent, <code>false</code> otherwise (then use a re-try strategy)
*/
public boolean send(final T element) {
return this.pipe.add(element);
}
public IPipe<T> getPipe() {
......
......@@ -5,7 +5,7 @@ import teetime.variant.methodcallWithPorts.framework.core.Signal;
public interface IPipe<T> {
void add(T element);
boolean add(T element);
T removeLast();
......
......@@ -26,8 +26,9 @@ public class OrderedGrowableArrayPipe<T> extends IntraThreadPipe<T> {
}
@Override
public void add(final T element) {
public boolean add(final T element) {
this.elements.put(this.tail++, element);
return true;
}
@Override
......
......@@ -25,8 +25,8 @@ public class OrderedGrowablePipe<T> extends IntraThreadPipe<T> {
}
@Override
public void add(final T element) {
this.elements.offer(element);
public boolean add(final T element) {
return this.elements.offer(element);
}
@Override
......
......@@ -21,9 +21,10 @@ public class Pipe<T> extends IntraThreadPipe<T> {
* @see teetime.examples.throughput.methodcall.IPipe#add(T)
*/
@Override
public void add(final T element) {
public boolean add(final T element) {
this.elements.addToTailUncommitted(element);
this.elements.commit();
return true;
}
/*
......
......@@ -15,8 +15,9 @@ public class SingleElementPipe<T> extends IntraThreadPipe<T> {
}
@Override
public void add(final T element) {
public boolean add(final T element) {
this.element = element;
return true;
}
@Override
......
......@@ -15,8 +15,9 @@ import teetime.variant.methodcallWithPorts.framework.core.Signal;
public class SpScPipe<T> extends AbstractPipe<T> {
private final Queue<T> queue;
private int maxSize;
private final AtomicReference<Signal> signal = new AtomicReference<Signal>();
// statistics
private int numWaits;
public SpScPipe(final int capacity) {
ConcurrentQueueSpec concurrentQueueSpec = new ConcurrentQueueSpec(1, 1, capacity, Ordering.FIFO, Preference.THROUGHPUT);
......@@ -32,9 +33,16 @@ public class SpScPipe<T> extends AbstractPipe<T> {
}
@Override
public void add(final T element) {
this.queue.offer(element);
this.maxSize = Math.max(this.queue.size(), this.maxSize);
public boolean add(final T element) {
// this.maxSize = Math.max(this.queue.size(), this.maxSize);
// BETTER introduce a QueueIsFullStrategy
while (!this.queue.offer(element)) {
this.numWaits++;
Thread.yield();
}
return true;
}
@Override
......@@ -58,8 +66,8 @@ public class SpScPipe<T> extends AbstractPipe<T> {
}
// BETTER find a solution w/o any thread-safe code in this stage
public synchronized int getMaxSize() {
return this.maxSize;
public synchronized int getNumWaits() {
return this.numWaits;
}
@Override
......
......@@ -25,13 +25,14 @@ public class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> {
}
@Override
public void add(final T element) {
public boolean add(final T element) {
if (this.lastFreeIndex == this.elements.length) {
// if (this.lastFreeIndex == this.elements.getCapacity()) {
this.elements = this.grow();
}
this.elements[this.lastFreeIndex++] = element;
// this.elements.put(this.lastFreeIndex++, element);
return true;
}
@Override
......
......@@ -31,6 +31,7 @@ public final class RoundRobinStrategy<T> implements IDistributorStrategy<T> {
@Override
public boolean distribute(final List<OutputPort<T>> outputPorts, final T element) {
final OutputPort<T> outputPort = this.getNextPortInRoundRobinOrder(outputPorts);
outputPort.send(element);
return true;
......
......@@ -114,11 +114,11 @@ public class TcpTraceReconstruction extends Analysis {
@Override
public void onTerminate() {
int maxSize = 0;
int maxNumWaits = 0;
for (SpScPipe<IMonitoringRecord> pipe : this.tcpRelayPipes) {
maxSize = Math.max(maxSize, pipe.getMaxSize());
maxNumWaits = Math.max(maxNumWaits, pipe.getNumWaits());
}
System.out.println("max size of TcpRelayPipes: " + maxSize);
System.out.println("max #waits of TcpRelayPipes: " + maxNumWaits);
super.onTerminate();
}
......
......@@ -148,11 +148,11 @@ public class TcpTraceReduction extends Analysis {
@Override
public void onTerminate() {
int maxSize = 0;
int maxNumWaits = 0;
for (SpScPipe<IMonitoringRecord> pipe : this.tcpRelayPipes) {
maxSize = Math.max(maxSize, pipe.getMaxSize());
maxNumWaits = Math.max(maxNumWaits, pipe.getNumWaits());
}
System.out.println("max size of TcpRelayPipes: " + maxSize);
System.out.println("max #waits of TcpRelayPipes: " + maxNumWaits);
super.onTerminate();
}
......
......@@ -99,11 +99,11 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest {
analysis.onTerminate();
}
int maxSize = 0;
int maxNumWaits = 0;
for (SpScPipe<IMonitoringRecord> pipe : analysis.getTcpRelayPipes()) {
maxSize = Math.max(maxSize, pipe.getMaxSize());
maxNumWaits = Math.max(maxNumWaits, pipe.getNumWaits());
}
System.out.println("Max size of tcp-relay pipe: " + maxSize);
System.out.println("max #waits of TcpRelayPipes: " + maxNumWaits);
// System.out.println("#traceMetadata read: " + analysis.getNumTraceMetadatas());
// System.out.println("Max #trace created: " + analysis.getMaxElementsCreated());
......
......@@ -83,7 +83,7 @@ public class ChwWorkTcpTraceReductionAnalysisWithThreadsTest {
analysis.onTerminate();
}
System.out.println("Max size of tcp-relay pipe: " + analysis.getTcpRelayPipe().getMaxSize());
System.out.println("#waits of tcp-relay pipe: " + analysis.getTcpRelayPipe().getNumWaits());
// System.out.println("#traceMetadata read: " + analysis.getNumTraceMetadatas());
// System.out.println("Max #trace created: " + analysis.getMaxElementsCreated());
System.out.println("TraceThroughputs: " + analysis.getTraceThroughputs());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment