Commit 4eef37c8 authored by Christian Wulf's avatar Christian Wulf

added DummyPipe;

removed code for leading zeros
parent 281780ef
......@@ -26,4 +26,4 @@ logger "teetime.variant.methodcallWithPorts.stage", INFO
logger "teetime.variant.methodcallWithPorts.framework.core.pipe", INFO
logger "teetime.variant.methodcallWithPorts.examples.kiekerdays.TimingsReader", TRACE, ["FILE"]
\ No newline at end of file
logger "util.TimingsReader", TRACE, ["FILE"]
\ No newline at end of file
......@@ -8,6 +8,7 @@ import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import teetime.variant.methodcallWithPorts.framework.core.pipe.DummyPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
public abstract class AbstractStage implements StageWithPort {
......@@ -59,6 +60,18 @@ public abstract class AbstractStage implements StageWithPort {
public void onStart() {
this.cachedInputPorts = this.inputPortList.toArray(new InputPort<?>[0]);
this.cachedOutputPorts = this.outputPortList.toArray(new OutputPort<?>[0]);
this.connectUnconnectedOutputPorts();
}
@SuppressWarnings("unchecked")
private void connectUnconnectedOutputPorts() {
for (OutputPort<?> outputPort : this.cachedOutputPorts) {
if (null == outputPort.getPipe()) { // if port is unconnected
this.logger.warn("Unconnected output port: " + outputPort + ". Connecting with a dummy output port.");
outputPort.setPipe(new DummyPipe());
}
}
}
protected void onFinished() {
......
......@@ -4,7 +4,7 @@ public class OutputPort<T> extends AbstractPort<T> {
/**
* Performance cache: Avoids the following method chain
*
*
* <pre>
* this.getPipe().getTargetPort().getOwningStage()
* </pre>
......@@ -16,7 +16,7 @@ public class OutputPort<T> extends AbstractPort<T> {
}
/**
*
*
* @param element
* @return <code>true</code> iff the given <code>element</code> could be sent, <code>false</code> otherwise (then use a re-try strategy)
*/
......@@ -33,9 +33,7 @@ public class OutputPort<T> extends AbstractPort<T> {
}
public void sendSignal(final Signal signal) {
if (this.pipe != null) { // if the output port is connected with a pipe
this.pipe.setSignal(signal);
}
this.pipe.setSignal(signal);
}
}
package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.Signal;
/**
* A pipe implementation used to connect unconnected output ports.
*
* @author Christian Wulf
*
*/
@SuppressWarnings("rawtypes")
public final class DummyPipe implements IPipe {
@Override
public boolean add(final Object element) {
return false;
}
@Override
public Object removeLast() {
return null;
}
@Override
public boolean isEmpty() {
return true;
}
@Override
public int size() {
return 0;
}
@Override
public Object readLast() {
return null;
}
@Override
public InputPort<Object> getTargetPort() {
return null;
}
@Override
public void setTargetPort(final InputPort targetPort) {}
@Override
public void setSignal(final Signal signal) {}
@Override
public void connectPorts(final OutputPort sourcePort, final InputPort targetPort) {}
}
......@@ -170,13 +170,12 @@ public class TCPReader extends ProducerStage<IMonitoringRecord> {
}
}
private void createAndSendRecord(final ByteBuffer buffer) {
private final void createAndSendRecord(final ByteBuffer buffer) {
final int clazzid = buffer.getInt();
final long loggingTimestamp = buffer.getLong();
final IMonitoringRecord record;
try { // NOCS (Nested try-catch)
try {
// record = this.recordFactory.create(clazzid, buffer, this.stringRegistry);
record = AbstractMonitoringRecord.createFromByteBuffer(clazzid, buffer, this.stringRegistry);
final IMonitoringRecord record = AbstractMonitoringRecord.createFromByteBuffer(clazzid, buffer, this.stringRegistry);
record.setLoggingTimestamp(loggingTimestamp);
this.send(this.outputPort, record);
} catch (final MonitoringRecordException ex) {
......
......@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.variant.methodcallWithPorts.examples.kiekerdays;
package teetime.variant.methodcallWithPorts.stage.kieker;
import java.io.IOException;
import java.net.InetSocketAddress;
......
package teetime.variant.methodcallWithPorts.examples.kiekerdays;
package util;
import java.io.BufferedOutputStream;
import java.io.File;
......
package teetime.variant.methodcallWithPorts.examples.kiekerdays;
package util;
import java.io.File;
import java.io.IOException;
......@@ -9,8 +9,6 @@ import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import util.StatisticsUtil;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.google.common.io.CharSource;
......@@ -40,9 +38,6 @@ public class TimingsReader {
durationsInNs.add(timing);
}
LOGGER.trace("Removing leading zeros...");
StatisticsUtil.removeLeadingZeroThroughputs(durationsInNs);
LOGGER.trace("Calculating quantiles...");
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(durationsInNs);
LOGGER.info(StatisticsUtil.getQuantilesString(quintiles));
......
......@@ -3,6 +3,7 @@ package teetime.variant.methodcallWithPorts.examples.kiekerdays;
import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
public class TcpTraceLogging extends Analysis {
......@@ -29,15 +30,9 @@ public class TcpTraceLogging extends Analysis {
}
private StageWithPort buildTcpPipeline() {
TCPReaderSink tcpReader = new TCPReaderSink();
// EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>();
//
// SingleElementPipe.connect(tcpReader.getOutputPort(), endStage.getInputPort());
//
// // create and configure pipeline
// Pipeline<Void, IMonitoringRecord> pipeline = new Pipeline<Void, IMonitoringRecord>();
// pipeline.setFirstStage(tcpReader);
// pipeline.setLastStage(endStage);
// TCPReaderSink tcpReader = new TCPReaderSink();
TCPReader tcpReader = new TCPReader();
return tcpReader;
}
......
Subproject commit 88e1e25f9519b250258c7e5ada30935975ab2d10
Subproject commit 75998aa20b7ec897ec321c1f94192de888f2dc6e
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment