Commit 98152bf5 authored by Christian Wulf's avatar Christian Wulf

added tcp load driver for kieker records

parent 13a75c42
......@@ -3,12 +3,12 @@
java.util.logging.ConsoleHandler.level = ALL
#java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s)%n
java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s) %6$s %n
#teetime.level = ALL
#teetime.variant.methodcallWithPorts.framework.level = ALL
#teetime.variant.methodcallWithPorts.framework.core.level = FINE
#teetime.variant.methodcallWithPorts.stage.level = INFO
teetime.variant.methodcallWithPorts.stage.level = INFO
#teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE
teetime.variant.methodcallWithPorts.examples.kiekerdays.level = FINE
......@@ -18,7 +18,7 @@ package teetime.variant.explicitScheduling.framework.core;
/**
* @author Christian Wulf
*
*
* @since 1.10
*/
public class Analysis {
......@@ -32,6 +32,6 @@ public class Analysis {
}
public void onTerminate() {
// System.out.println("Analysis stopped.");
System.out.println("Analysis stopped.");
}
}
package teetime.variant.methodcallWithPorts.stage.explorviz;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import teetime.variant.methodcallWithPorts.framework.core.ProducerStage;
import kieker.common.record.IMonitoringRecord;
import kieker.common.record.flow.trace.operation.AfterOperationEvent;
import kieker.common.record.flow.trace.operation.BeforeOperationEvent;
import kieker.common.util.registry.ILookup;
import kieker.common.util.registry.Lookup;
public class KiekerRecordTcpReader extends ProducerStage<IMonitoringRecord> {
private static final int MESSAGE_BUFFER_SIZE = 65535;
private static final byte HostApplicationMetaDataRecord_CLAZZ_ID = 0;
private static final byte BeforeOperationEventRecord_CLAZZ_ID = 1;
private static final byte AfterOperationEventRecord_CLAZZ_ID = 3;
private static final byte StringRegistryRecord_CLAZZ_ID = 4;
private static final int HostApplicationMetaDataRecord_BYTE_LENGTH = 16;
private static final int BeforeOperationEventRecord_COMPRESSED_BYTE_LENGTH = 36;
private static final int AfterOperationEventRecord_COMPRESSED_BYTE_LENGTH = 20;
private final ILookup<String> stringRegistry = new Lookup<String>();
private final List<byte[]> waitingForStringMessages = new ArrayList<byte[]>(1024);
private int port1 = 10133;
public final int getPort1() {
return this.port1;
}
public final void setPort1(final int port1) {
this.port1 = port1;
}
@Override
protected void execute() {
ServerSocketChannel serversocket = null;
try {
serversocket = ServerSocketChannel.open();
serversocket.socket().bind(new InetSocketAddress(this.port1));
if (super.logger.isDebugEnabled()) {
super.logger.debug("Listening on port " + this.port1);
}
// BEGIN also loop this one?
final SocketChannel socketChannel = serversocket.accept();
final ByteBuffer buffer = ByteBuffer.allocateDirect(MESSAGE_BUFFER_SIZE);
while (socketChannel.read(buffer) != -1) {
buffer.flip();
// System.out.println("Reading, remaining:" + buffer.remaining());
try {
while (buffer.hasRemaining()) {
buffer.mark();
this.messagesfromByteArray(buffer);
}
buffer.clear();
} catch (final BufferUnderflowException ex) {
buffer.reset();
// System.out.println("Underflow, remaining:" + buffer.remaining());
buffer.compact();
}
}
// System.out.println("Channel closing...");
socketChannel.close();
// END also loop this one?
} catch (final IOException ex) {
super.logger.error("Error while reading", ex);
} finally {
if (null != serversocket) {
try {
serversocket.close();
} catch (final IOException e) {
if (super.logger.isDebugEnabled()) {
super.logger.debug("Failed to close TCP connection!", e);
}
}
}
this.setReschedulable(false);
}
}
private final void messagesfromByteArray(final ByteBuffer buffer) {
final byte clazzId = buffer.get();
switch (clazzId) {
case HostApplicationMetaDataRecord_CLAZZ_ID: {
if (buffer.remaining() >= HostApplicationMetaDataRecord_BYTE_LENGTH) {
this.readInHostApplicationMetaData(buffer);
break;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
case BeforeOperationEventRecord_CLAZZ_ID: {
if (buffer.remaining() >= BeforeOperationEventRecord_COMPRESSED_BYTE_LENGTH) {
this.readInBeforeOperationEvent(buffer);
break;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
case AfterOperationEventRecord_CLAZZ_ID: {
if (buffer.remaining() >= AfterOperationEventRecord_COMPRESSED_BYTE_LENGTH) {
this.readInAfterOperationEvent(buffer);
break;
}
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
case StringRegistryRecord_CLAZZ_ID: {
int mapId = 0;
int stringLength = 0;
if (buffer.remaining() >= 8) {
mapId = buffer.getInt();
stringLength = buffer.getInt();
} else {
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
if (buffer.remaining() >= stringLength) {
final byte[] stringByteArray = new byte[stringLength];
buffer.get(stringByteArray);
this.stringRegistry.set(new String(stringByteArray), mapId);
this.checkWaitingMessages();
} else {
buffer.position(buffer.position() - 9);
buffer.compact();
return;
}
break;
}
default: {
System.out.println("unknown class id " + clazzId + " at offset "
+ (buffer.position() - 1));
return;
}
}
}
private final void readInHostApplicationMetaData(final ByteBuffer buffer) {
final int systemnameId = buffer.getInt();
final int ipaddressId = buffer.getInt();
final int hostnameId = buffer.getInt();
final int applicationId = buffer.getInt();
// just consume; not necessary for kieker
}
private final void readInBeforeOperationEvent(final ByteBuffer buffer) {
final long timestamp = buffer.getLong();
final long traceId = buffer.getLong();
final int orderIndex = buffer.getInt();
final int objectId = buffer.getInt();
final int operationId = buffer.getInt();
final int clazzId = buffer.getInt();
final int interfaceId = buffer.getInt();
final String operation = this.stringRegistry.get(operationId);
final String clazz = this.stringRegistry.get(clazzId);
if (operation == null || clazz == null) {
this.putInWaitingMessages(buffer, BeforeOperationEventRecord_COMPRESSED_BYTE_LENGTH + 1);
return;
}
final IMonitoringRecord record = new BeforeOperationEvent(timestamp, traceId, orderIndex, operation, clazz);
this.send(this.outputPort, record);
}
private final void readInAfterOperationEvent(final ByteBuffer buffer) {
final long timestamp = buffer.getLong();
final long traceId = buffer.getLong();
final int orderIndex = buffer.getInt();
final IMonitoringRecord record = new AfterOperationEvent(timestamp, traceId, orderIndex, null, null);
this.send(this.outputPort, record);
}
private final void putInWaitingMessages(final ByteBuffer buffer, final int length) {
final byte[] message = new byte[length];
buffer.position(buffer.position() - length);
buffer.get(message);
this.waitingForStringMessages.add(message);
}
private final void checkWaitingMessages() {
final List<byte[]> localWaitingList = new ArrayList<byte[]>();
for (final byte[] waitingMessage : this.waitingForStringMessages) {
localWaitingList.add(waitingMessage);
}
this.waitingForStringMessages.clear();
for (final byte[] waitingMessage : localWaitingList) {
final ByteBuffer buffer = ByteBuffer.wrap(waitingMessage);
final byte waitingMessageClazzId = buffer.get();
switch (waitingMessageClazzId) {
case HostApplicationMetaDataRecord_CLAZZ_ID:
this.readInHostApplicationMetaData(buffer);
break;
case BeforeOperationEventRecord_CLAZZ_ID:
this.readInBeforeOperationEvent(buffer);
break;
case AfterOperationEventRecord_CLAZZ_ID:
this.readInAfterOperationEvent(buffer);
break;
default:
break;
}
}
}
}
......@@ -41,9 +41,9 @@ import kieker.common.util.registry.Lookup;
/**
* This is a reader which reads the records from a TCP port.
*
*
* @author Jan Waller, Nils Christian Ehmke
*
*
* @since 1.10
*/
public class TCPReader extends ProducerStage<IMonitoringRecord> {
......@@ -140,17 +140,7 @@ public class TCPReader extends ProducerStage<IMonitoringRecord> {
try {
while (buffer.hasRemaining()) {
buffer.mark();
final int clazzid = buffer.getInt();
final long loggingTimestamp = buffer.getLong();
final IMonitoringRecord record;
try { // NOCS (Nested try-catch)
// record = this.recordFactory.create(clazzid, buffer, this.stringRegistry);
record = AbstractMonitoringRecord.createFromByteBuffer(clazzid, buffer, this.stringRegistry);
record.setLoggingTimestamp(loggingTimestamp);
this.send(this.outputPort, record);
} catch (final MonitoringRecordException ex) {
super.logger.error("Failed to create record.", ex);
}
this.createAndSendRecord(buffer);
}
buffer.clear();
} catch (final BufferUnderflowException ex) {
......@@ -180,10 +170,24 @@ public class TCPReader extends ProducerStage<IMonitoringRecord> {
}
}
private void createAndSendRecord(final ByteBuffer buffer) {
final int clazzid = buffer.getInt();
final long loggingTimestamp = buffer.getLong();
final IMonitoringRecord record;
try { // NOCS (Nested try-catch)
// record = this.recordFactory.create(clazzid, buffer, this.stringRegistry);
record = AbstractMonitoringRecord.createFromByteBuffer(clazzid, buffer, this.stringRegistry);
record.setLoggingTimestamp(loggingTimestamp);
this.send(this.outputPort, record);
} catch (final MonitoringRecordException ex) {
super.logger.error("Failed to create record.", ex);
}
}
/**
*
*
* @author Jan Waller
*
*
* @since 1.8
*/
private static class TCPStringReader extends Thread {
......
package teetime.variant.methodcallWithPorts.examples.kiekerdays;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.CollectorSink;
import teetime.variant.methodcallWithPorts.stage.kieker.Dir2RecordsFilter;
import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository;
import kieker.common.record.IMonitoringRecord;
import kieker.common.util.registry.IMonitoringRecordReceiver;
import kieker.common.util.registry.Registry;
public class KiekerLoadDriver {
private final List<IMonitoringRecord> elementCollection = new LinkedList<IMonitoringRecord>();
private final RunnableStage runnableStage;
private long[] timings;
public KiekerLoadDriver(final File directory) {
StageWithPort producerPipeline = this.buildProducerPipeline(directory);
this.runnableStage = new RunnableStage(producerPipeline);
}
private StageWithPort buildProducerPipeline(final File directory) {
ClassNameRegistryRepository classNameRegistryRepository = new ClassNameRegistryRepository();
// create stages
Dir2RecordsFilter dir2RecordsFilter = new Dir2RecordsFilter(classNameRegistryRepository);
CollectorSink<IMonitoringRecord> collector = new CollectorSink<IMonitoringRecord>(this.elementCollection);
final Pipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>> pipeline = new Pipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>>();
pipeline.setFirstStage(dir2RecordsFilter);
pipeline.setLastStage(collector);
dir2RecordsFilter.getInputPort().setPipe(new SpScPipe<File>(1));
SingleElementPipe.connect(dir2RecordsFilter.getOutputPort(), collector.getInputPort());
dir2RecordsFilter.getInputPort().getPipe().add(directory);
return pipeline;
}
public Collection<IMonitoringRecord> load() {
this.runnableStage.run();
return this.elementCollection;
}
private static class RecordReceiver implements IMonitoringRecordReceiver {
private final Registry<String> stringRegistry;
private final ByteBuffer buffer = ByteBuffer.allocateDirect(Short.MAX_VALUE * 10);
private SocketChannel socketChannel;
public RecordReceiver(final Registry<String> stringRegistry) throws IOException {
this.stringRegistry = stringRegistry;
}
@Override
public boolean newMonitoringRecord(final IMonitoringRecord record) {
System.out.println("Registering " + record);
record.writeBytes(this.buffer, this.stringRegistry);
this.buffer.flip();
try {
int writtenBytes = this.socketChannel.write(this.buffer);
System.out.println("writtenBytes: " + writtenBytes);
this.buffer.clear();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return true;
}
public void connect() throws IOException {
String hostname = "localhost";
int port = 10134;
System.out.println("Connecting to " + hostname + ":" + port);
this.socketChannel = SocketChannel.open();
this.socketChannel.connect(new InetSocketAddress(hostname, port));
}
// public void sendRegistryRecords() throws IOException {
// String hostname = "localhost";
// int port = 10134;
// System.out.println("Connecting to " + hostname + ":" + port);
//
// SocketChannel socketChannel = SocketChannel.open();
// try {
// socketChannel.connect(new InetSocketAddress(hostname, port));
// this.buffer.flip();
// socketChannel.write(this.buffer);
// } finally {
// socketChannel.close();
// }
// }
public void close() throws IOException {
this.socketChannel.close();
}
}
public static void main(final String[] args) throws IOException {
final File directory = new File(args[0]);
final File outputFile = new File(args[1]);
final int runs = Integer.parseInt(args[2]);
KiekerLoadDriver kiekerLoadDriver = new KiekerLoadDriver(directory);
kiekerLoadDriver.timings = new long[runs];
Collection<IMonitoringRecord> records = kiekerLoadDriver.load();
final Registry<String> stringRegistry = new Registry<String>();
ByteBuffer recordBuffer = ByteBuffer.allocateDirect(Short.MAX_VALUE);
RecordReceiver recordReceiver = new RecordReceiver(stringRegistry);
stringRegistry.setRecordReceiver(recordReceiver);
try {
recordReceiver.connect();
for (IMonitoringRecord record : records) {
int clazzId = stringRegistry.get(record.getClass().getName());
recordBuffer.putInt(clazzId);
recordBuffer.putLong(record.getLoggingTimestamp());
record.writeBytes(recordBuffer, stringRegistry);
}
String hostname = "localhost";
int port = 10133;
System.out.println("Connecting to " + hostname + ":" + port);
SocketChannel socketChannel = SocketChannel.open();
try {
socketChannel.connect(new InetSocketAddress(hostname, port));
for (int i = 0; i < runs; i++) {
recordBuffer.flip();
// System.out.println("position: " + recordBuffer.position());
// System.out.println("limit: " + recordBuffer.limit());
long start_ns = System.nanoTime();
int writtenBytes = socketChannel.write(recordBuffer);
long stop_ns = System.nanoTime();
kiekerLoadDriver.timings[i] = stop_ns - start_ns;
if ((i % 100000) == 0) {
System.out.println(i); // NOPMD (System.out)
}
// System.out.println("writtenBytes (record): " + writtenBytes);
}
} finally {
socketChannel.close();
}
} finally {
recordReceiver.close();
}
PrintStream ps = new PrintStream(new BufferedOutputStream(new FileOutputStream(outputFile, true), 8192 * 8), false, "UTF-8");
try {
for (long timing : kiekerLoadDriver.timings) {
ps.println("KiekerLoadDriver;" + timing);
}
} finally {
ps.close();
}
}
}
......@@ -39,9 +39,9 @@ import kieker.common.util.registry.Lookup;
/**
* This is a reader which reads the records from a TCP port.
*
*
* @author Jan Waller, Nils Christian Ehmke
*
*
* @since 1.10
*/
public class TCPReaderSink extends ProducerStage<IMonitoringRecord> {
......@@ -156,9 +156,9 @@ public class TCPReaderSink extends ProducerStage<IMonitoringRecord> {
}
/**
*
*
* @author Jan Waller
*
*
* @since 1.8
*/
private static class TCPStringReader extends Thread {
......
package teetime.variant.methodcallWithPorts.examples.kiekerdays;
import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.stage.basic.Sink;
import teetime.variant.methodcallWithPorts.stage.explorviz.KiekerRecordTcpReader;
import kieker.common.record.IMonitoringRecord;
public class TcpTraceLoggingExplorviz extends Analysis {
private Thread tcpThread;
@Override
public void init() {
super.init();
StageWithPort tcpPipeline = this.buildTcpPipeline();
this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
}
@Override
public void start() {
super.start();
this.tcpThread.start();
try {
this.tcpThread.join();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
private StageWithPort buildTcpPipeline() {
KiekerRecordTcpReader tcpReader = new KiekerRecordTcpReader();
Sink<IMonitoringRecord> endStage = new Sink<IMonitoringRecord>();
SingleElementPipe.connect(tcpReader.getOutputPort(), endStage.getInputPort());
// create and configure pipeline
Pipeline<KiekerRecordTcpReader, Sink<IMonitoringRecord>> pipeline = new Pipeline<KiekerRecordTcpReader, Sink<IMonitoringRecord>>();
pipeline.setFirstStage(tcpReader);
pipeline.setLastStage(endStage);
return tcpReader;
}
public static void main(final String[] args) {
final TcpTraceLoggingExplorviz analysis = new TcpTraceLoggingExplorviz();
analysis.init();
try {
analysis.start();
} finally {
analysis.onTerminate();
}
}
}
......@@ -49,6 +49,7 @@ public class TcpTraceLoggingExtAnalysis extends Analysis {
SingleElementPipe.connect(tcpReader.getOutputPort(), this.recordCounter.getInputPort());
SingleElementPipe.connect(this.recordCounter.getOutputPort(), this.recordThroughputStage.getInputPort());
SingleElementPipe.connect(this.recordThroughputStage.getOutputPort(), endStage.getInputPort());
// SingleElementPipe.connect(this.recordCounter.getOutputPort(), endStage.getInputPort());
SpScPipe.connect(previousClockStage.getNewOutputPort(), this.recordThroughputStage.getTriggerInputPort(), 10);
......@@ -56,7 +57,7 @@ public class TcpTraceLoggingExtAnalysis extends Analysis {
Pipeline<TCPReader, Sink<IMonitoringRecord>> pipeline = new Pipeline<TCPReader, Sink<IMonitoringRecord>>();
pipeline.setFirstStage(tcpReader);
pipeline.addIntermediateStage(this.recordCounter);
pipeline.addIntermediateStage(this.recordThroughputStage);
// pipeline.addIntermediateStage(this.recordThroughputStage);
pipeline.setLastStage(endStage);
return pipeline;
}
......
......@@ -84,12 +84,12 @@ public class TcpTraceReconstructionAnalysis extends Analysis {
// connect stages
SpScPipe.connect(tcpReader.getOutputPort(), this.recordCounter.getInputPort(), TCP_RELAY_MAX_SIZE);
SingleElementPipe.connect(this.recordCounter.getOutputPort(), instanceOfFilter.getInputPort());
SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort());
SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
// SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
// SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceThroughputFilter.getInputPort());
// SingleElementPipe.connect(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort());
SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), this.traceCounter.getInputPort());
// SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort());
// SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), this.traceThroughputFilter.getInputPort());
SingleElementPipe.connect(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort());
// SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), this.traceCounter.getInputPort());
SingleElementPipe.connect(this.traceCounter.getOutputPort(), endStage.getInputPort());
SpScPipe.connect(clockStage.getNewOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 10);
......@@ -100,9 +100,9 @@ public class TcpTraceReconstructionAnalysis extends Analysis {
pipeline.setFirstStage(tcpReader);
pipeline.addIntermediateStage(this.recordCounter);
pipeline.addIntermediateStage(instanceOfFilter);
pipeline.addIntermediateStage(this.recordThroughputFilter);
// pipeline.addIntermediateStage(this.recordThroughputFilter);
pipeline.addIntermediateStage(traceReconstructionFilter);
// pipeline.addIntermediateStage(this.traceThroughputFilter);
pipeline.addIntermediateStage(this.traceThroughputFilter);
pipeline.addIntermediateStage(this.traceCounter);
pipeline.setLastStage(endStage);
return pipeline;
......@@ -113,8 +113,8 @@ public class TcpTraceReconstructionAnalysis extends Analysis {
super.start();
this.workerThread.start();
this.clockThread.start();
// this.clock2Thread.start();
// this.clockThread.start();
this.clock2Thread.start();
try {
this.workerThread.join();
......@@ -122,7 +122,7 @@ public class TcpTraceReconstructionAnalysis extends Analysis {
throw new IllegalStateException(e);
}
this.clockThread.interrupt();
// this.clock2Thread.interrupt();
this.clock2Thread.interrupt();
}
public List<TraceEventRecords> getElementCollection() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment