Commit a8007685 authored by Florian Fittkau's avatar Florian Fittkau

refactoring and removed some TODOs

parent 92318842
......@@ -266,7 +266,7 @@ org.eclipse.jdt.core.formatter.keep_else_statement_on_same_line=false
org.eclipse.jdt.core.formatter.keep_empty_array_initializer_on_one_line=false
org.eclipse.jdt.core.formatter.keep_imple_if_on_one_line=false
org.eclipse.jdt.core.formatter.keep_then_statement_on_same_line=false
org.eclipse.jdt.core.formatter.lineSplit=80
org.eclipse.jdt.core.formatter.lineSplit=100
org.eclipse.jdt.core.formatter.never_indent_block_comments_on_first_column=false
org.eclipse.jdt.core.formatter.never_indent_line_comments_on_first_column=false
org.eclipse.jdt.core.formatter.number_of_blank_lines_at_beginning_of_method_body=0
......
package explorviz.hpc_monitoring.connector;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.net.Socket;
import com.lmax.disruptor.EventHandler;
import explorviz.hpc_monitoring.disruptor.ByteArrayEvent;
public class TCPConnector implements EventHandler<ByteArrayEvent> {
private static final int MESSAGE_BUFFER_SIZE = 65536;
private String providerUrl;
private final int providerPort;
private Socket socket;
private BufferedOutputStream bufferedOutputStream;
public TCPConnector(final String providerUrl, final int providerPort) {
this.providerUrl = providerUrl;
this.providerPort = providerPort;
try {
connect(providerUrl);
} catch (final IOException e) {
e.printStackTrace();
}
}
private void connect(final String provider) throws IOException {
socket = new Socket(providerUrl, providerPort);
bufferedOutputStream = new BufferedOutputStream(
socket.getOutputStream(), MESSAGE_BUFFER_SIZE);
}
public final void sendMessage(final byte[] message) {
try {
bufferedOutputStream.write(message);
// if (endOfBatch) {
// bufferedOutputStream.flush();
// }
} catch (final IOException e) {
e.printStackTrace();
}
}
public final void cleanup() {
disconnect();
}
private void disconnect() {
if (socket.isConnected()) {
try {
socket.close();
} catch (final IOException e) {
System.out.println(e.toString());
}
}
}
public void setProvider(final String provider) {
synchronized (this) {
if (!provider.equals(providerUrl)) {
disconnect();
try {
connect(provider);
providerUrl = provider;
notifyAll();
} catch (final IOException e) {
e.printStackTrace();
}
}
}
}
@Override
public void onEvent(final ByteArrayEvent event, final long sequence,
final boolean endOfBatch) throws Exception {
sendMessage(event.getValue());
}
}
package explorviz.hpc_monitoring.filter.reconstruction;
import java.io.Serializable;
import java.util.Comparator;
import java.util.Iterator;
import java.util.TreeSet;
import explorviz.hpc_monitoring.record.HostApplicationMetadata;
import explorviz.hpc_monitoring.record.Trace;
import explorviz.hpc_monitoring.record.HostApplicationMetaData;
import explorviz.hpc_monitoring.record.event.AbstractOperationEvent;
import explorviz.hpc_monitoring.record.event.normal.AfterFailedOperationEvent;
import explorviz.hpc_monitoring.record.event.normal.AfterOperationEvent;
import explorviz.hpc_monitoring.record.event.normal.BeforeOperationEvent;
import explorviz.hpc_monitoring.record.trace.Trace;
public class TraceBuffer {
private static final Comparator<AbstractOperationEvent> COMPARATOR = new AbstractOperationEventComperator();
private HostApplicationMetadata traceMetadata;
private HostApplicationMetaData traceMetadata;
private final TreeSet<AbstractOperationEvent> events = new TreeSet<AbstractOperationEvent>(
COMPARATOR);
......@@ -46,8 +45,8 @@ public class TraceBuffer {
}
if (!events.add(event)) {
System.out.println("Duplicate entry for orderIndex " + orderIndex
+ " with traceId " + event.getTraceId());
System.out.println("Duplicate entry for orderIndex " + orderIndex + " with traceId "
+ event.getTraceId());
damaged = true;
}
}
......@@ -67,7 +66,7 @@ public class TraceBuffer {
return orderIndex;
}
public void setTrace(final HostApplicationMetadata trace) {
public void setTrace(final HostApplicationMetaData trace) {
if (traceMetadata != null) {
damaged = true;
return;
......@@ -80,13 +79,11 @@ public class TraceBuffer {
}
public final boolean isInvalid() {
return ((openEvents != 0) || ((maxOrderIndex + 1) != events.size())
|| events.isEmpty() || damaged);
return ((openEvents != 0) || ((maxOrderIndex + 1) != events.size()) || events.isEmpty() || damaged);
}
public final Trace toTrace() {
final AbstractOperationEvent[] arrayEvents = new AbstractOperationEvent[events
.size()];
final AbstractOperationEvent[] arrayEvents = new AbstractOperationEvent[events.size()];
final Iterator<AbstractOperationEvent> iterator = events.iterator();
int index = 0;
while (iterator.hasNext()) {
......@@ -94,19 +91,15 @@ public class TraceBuffer {
index++;
}
// TODO set runtimes
return new Trace(traceMetadata, arrayEvents);
}
/**
* @author Jan Waller
*/
private static final class AbstractOperationEventComperator implements
Comparator<AbstractOperationEvent>, Serializable {
private static final long serialVersionUID = 8920737343446332517L;
Comparator<AbstractOperationEvent> {
@Override
public int compare(final AbstractOperationEvent o1,
final AbstractOperationEvent o2) {
public int compare(final AbstractOperationEvent o1, final AbstractOperationEvent o2) {
return o1.getOrderIndex() - o2.getOrderIndex();
}
}
......
package explorviz.hpc_monitoring.filter.reconstruction;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
......@@ -15,22 +17,22 @@ import explorviz.hpc_monitoring.disruptor.RecordEvent;
import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter;
import explorviz.hpc_monitoring.filter.reduction.TracePatternSummarizationFilter;
import explorviz.hpc_monitoring.reader.IPeriodicTimeSignalReceiver;
import explorviz.hpc_monitoring.reader.TimedReader;
import explorviz.hpc_monitoring.record.HostApplicationMetadata;
import explorviz.hpc_monitoring.reader.TimeSignalReader;
import explorviz.hpc_monitoring.record.HostApplicationMetaData;
import explorviz.hpc_monitoring.record.IRecord;
import explorviz.hpc_monitoring.record.Trace;
import explorviz.hpc_monitoring.record.event.AbstractOperationEvent;
import explorviz.hpc_monitoring.record.trace.Trace;
public final class TraceReconstructionFilter implements
EventHandler<RecordArrayEvent>, IPeriodicTimeSignalReceiver {
public final class TraceReconstructionFilter implements EventHandler<RecordArrayEvent>,
IPeriodicTimeSignalReceiver {
private static final int OUTPUT_MESSAGE_BUFFER_SIZE = 256;
private static final CountingThroughputFilter counter = new CountingThroughputFilter(
"Reconstructed traces per second");
"Reconstructed traces/sec");
private final long maxTraceTimeout;
private final Map<Long, TraceBuffer> traceId2trace = new TreeMap<Long, TraceBuffer>();
private final Map<Long, TraceBuffer> traceId2trace = new ConcurrentSkipListMap<Long, TraceBuffer>();
private final RingBuffer<RecordArrayEvent> ringBuffer;
private IRecord[] outputBuffer = new IRecord[OUTPUT_MESSAGE_BUFFER_SIZE];
......@@ -46,30 +48,36 @@ public final class TraceReconstructionFilter implements
@SuppressWarnings("unchecked")
final EventHandler<RecordArrayEvent>[] eventHandlers = new EventHandler[1];
eventHandlers[0] = new TracePatternSummarizationFilter(1 * 1000,
endReceiver);
eventHandlers[0] = new TracePatternSummarizationFilter(1 * 1000, endReceiver);
disruptor.handleEventsWith(eventHandlers);
ringBuffer = disruptor.start();
new TimedReader(1 * 1000, this).start();
new TimeSignalReader(1 * 1000, this).start();
}
@Override
public void periodicTimeSignal(final long timestamp) {
checkForTimeouts(timestamp); // TODO comes from other thread -
// synchronize!
flushOutputBuffer();
synchronized (this) {
checkForTimeouts(timestamp);
flushOutputBuffer();
}
}
private void checkForTimeouts(final long timestamp) {
final long traceTimeout = timestamp - maxTraceTimeout;
final List<Long> traceIdsToRemove = new ArrayList<Long>();
for (final Entry<Long, TraceBuffer> entry : traceId2trace.entrySet()) {
final TraceBuffer traceBuffer = entry.getValue();
if (traceBuffer.getMaxLoggingTimestamp() <= traceTimeout) {
sendOutInvalidTrace(traceBuffer.toTrace());
// TODO remove from traceId2trace
traceIdsToRemove.add(entry.getKey());
}
}
for (final Long traceIdToRemove : traceIdsToRemove) {
traceId2trace.remove(traceIdToRemove);
}
}
private void sendOutValidTrace(final Trace trace) {
......@@ -78,13 +86,12 @@ public final class TraceReconstructionFilter implements
private void sendOutInvalidTrace(final Trace trace) {
// putInRingBuffer(trace); // TODO
System.out.println("Invalid trace: "
+ trace.getTraceEvents()[0].getTraceId());
System.out.println("Invalid trace: " + trace.getTraceEvents()[0].getTraceId());
}
private void putInRingBuffer(final IRecord message) {
counter.inputObjects(message);
synchronized (this) { // TODO remove
synchronized (this) {
outputBuffer[outputBufferIndex++] = message;
if (outputBufferIndex == OUTPUT_MESSAGE_BUFFER_SIZE) {
......@@ -94,61 +101,57 @@ public final class TraceReconstructionFilter implements
}
private void flushOutputBuffer() {
synchronized (this) { // TODO remove
if (outputBufferIndex > 0) {
final long hiseq = ringBuffer.next();
final RecordArrayEvent valueEvent = ringBuffer.get(hiseq);
valueEvent.setValues(outputBuffer);
ringBuffer.publish(hiseq);
outputBuffer = new IRecord[OUTPUT_MESSAGE_BUFFER_SIZE]; // TODO
// object
// reusage?
outputBufferIndex = 0;
}
if (outputBufferIndex > 0) {
final long hiseq = ringBuffer.next();
final RecordArrayEvent valueEvent = ringBuffer.get(hiseq);
valueEvent.setValues(outputBuffer);
valueEvent.setValuesLength(outputBufferIndex);
ringBuffer.publish(hiseq);
outputBuffer = new IRecord[OUTPUT_MESSAGE_BUFFER_SIZE];
outputBufferIndex = 0;
}
}
@Override
public void onEvent(final RecordArrayEvent event, final long sequence,
final boolean endOfBatch) throws Exception {
for (final IRecord record : event.getValues()) { // TODO save length in
// event
if (record != null) {
final AbstractOperationEvent abstractOperationEvent = ((AbstractOperationEvent) record);
final long traceId = abstractOperationEvent.getTraceId();
final TraceBuffer traceBuffer = getBufferForTraceId(
abstractOperationEvent.getTraceId(),
event.getMetadata());
traceBuffer.insertEvent(abstractOperationEvent);
if (traceBuffer.isFinished()) {
traceId2trace.remove(traceId);
sendOutValidTrace(traceBuffer.toTrace());
}
public void onEvent(final RecordArrayEvent event, final long sequence, final boolean endOfBatch)
throws Exception {
final IRecord[] values = event.getValues();
final int valuesLength = event.getValuesLength();
for (int i = 0; i < valuesLength; i++) {
final IRecord record = values[i];
final AbstractOperationEvent abstractOperationEvent = ((AbstractOperationEvent) record);
final long traceId = abstractOperationEvent.getTraceId();
final TraceBuffer traceBuffer = getBufferForTraceId(
abstractOperationEvent.getTraceId(), event.getMetadata());
traceBuffer.insertEvent(abstractOperationEvent);
if (traceBuffer.isFinished()) {
traceId2trace.remove(traceId);
sendOutValidTrace(traceBuffer.toTrace());
}
}
}
private TraceBuffer getBufferForTraceId(final long traceId,
final HostApplicationMetadata metadata) {
final HostApplicationMetaData metadata) {
TraceBuffer traceBuffer = traceId2trace.get(traceId);
if (traceBuffer == null) {
traceBuffer = new TraceBuffer(); // TODO dont create new - keep old
// ones and reset!
traceBuffer.setTrace(metadata); // TODO reuse...
traceBuffer = new TraceBuffer();
traceBuffer.setTrace(metadata);
traceId2trace.put(traceId, traceBuffer);
}
return traceBuffer;
}
public void terminate() {
for (final Object entry : traceId2trace.values()) {
if (entry instanceof TraceBuffer) {
sendOutInvalidTrace(((TraceBuffer) entry).toTrace());
synchronized (this) {
for (final TraceBuffer entry : traceId2trace.values()) {
sendOutInvalidTrace(entry.toTrace());
}
traceId2trace.clear();
}
traceId2trace.clear();
}
}
\ No newline at end of file
package explorviz.hpc_monitoring.filter.reduction;
import explorviz.hpc_monitoring.record.Trace;
import explorviz.hpc_monitoring.record.event.AbstractOperationEvent;
import explorviz.hpc_monitoring.record.trace.Trace;
public class TraceAggregationBuffer {
private Trace accumulator;
......
package explorviz.hpc_monitoring.filter.reduction;
import java.util.Comparator;
import explorviz.hpc_monitoring.record.event.AbstractOperationEvent;
import explorviz.hpc_monitoring.record.event.normal.AfterFailedOperationEvent;
import explorviz.hpc_monitoring.record.trace.Trace;
public class TraceComperator implements Comparator<Trace> {
@Override
public int compare(final Trace t1, final Trace t2) {
final AbstractOperationEvent[] recordsT1 = t1.getTraceEvents();
final AbstractOperationEvent[] recordsT2 = t2.getTraceEvents();
if ((recordsT1.length - recordsT2.length) != 0) {
return recordsT1.length - recordsT2.length;
}
final int cmpHostnames = t1.getTraceMetadata().getHostname()
.compareTo(t2.getTraceMetadata().getHostname());
if (cmpHostnames != 0) {
return cmpHostnames;
}
final int cmpApplicationNames = t1.getTraceMetadata().getApplication()
.compareTo(t2.getTraceMetadata().getApplication());
if (cmpApplicationNames != 0) {
return cmpApplicationNames;
}
for (int i = 0; i < recordsT1.length; i++) {
final AbstractOperationEvent recordT1 = recordsT1[i];
final AbstractOperationEvent recordT2 = recordsT2[i];
final int cmpSignature = recordT1.getOperationSignatureId()
- recordT2.getOperationSignatureId();
if (cmpSignature != 0) {
return cmpSignature;
}
final int cmpClass = recordT1.getClass().getName()
.compareTo(recordT2.getClass().getName());
if (cmpClass != 0) {
return cmpClass;
}
if (recordT1 instanceof AfterFailedOperationEvent) {
final int cmpError = ((AfterFailedOperationEvent) recordT1).getCause().compareTo(
((AfterFailedOperationEvent) recordT2).getCause());
if (cmpError != 0) {
return cmpClass;
}
}
}
return 0;
}
}
package explorviz.hpc_monitoring.filter.reduction;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
......@@ -16,20 +15,20 @@ import explorviz.hpc_monitoring.disruptor.RecordArrayEvent;
import explorviz.hpc_monitoring.disruptor.RecordEvent;
import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter;
import explorviz.hpc_monitoring.reader.IPeriodicTimeSignalReceiver;
import explorviz.hpc_monitoring.reader.TimedReader;
import explorviz.hpc_monitoring.reader.TimeProvider;
import explorviz.hpc_monitoring.reader.TimeSignalReader;
import explorviz.hpc_monitoring.record.IRecord;
import explorviz.hpc_monitoring.record.Trace;
import explorviz.hpc_monitoring.record.event.AbstractOperationEvent;
import explorviz.hpc_monitoring.record.trace.Trace;
public class TracePatternSummarizationFilter implements
EventHandler<RecordArrayEvent>, IPeriodicTimeSignalReceiver {
public class TracePatternSummarizationFilter implements EventHandler<RecordArrayEvent>,
IPeriodicTimeSignalReceiver {
private final long maxCollectionDuration;
private final Map<Trace, TraceAggregationBuffer> trace2buffer = new TreeMap<Trace, TraceAggregationBuffer>(
private final Map<Trace, TraceAggregationBuffer> trace2buffer = new ConcurrentSkipListMap<Trace, TraceAggregationBuffer>(
new TraceComperator());
private static final CountingThroughputFilter counter = new CountingThroughputFilter(
"Reduced trace results per second");
"Reduced traces/sec");
private final RingBuffer<RecordEvent> ringBuffer;
......@@ -48,12 +47,37 @@ public class TracePatternSummarizationFilter implements
disruptor.handleEventsWith(eventHandlers);
}
ringBuffer = disruptor.start();
new TimedReader(1 * 1000, this).start();
new TimeSignalReader(1 * 1000, this).start();
}
@Override
public void onEvent(final RecordArrayEvent event, final long sequence, final boolean endOfBatch)
throws Exception {
final IRecord[] values = event.getValues();
final int valuesLength = event.getValuesLength();
synchronized (this) {
for (int i = 0; i < valuesLength; i++) {
final IRecord record = values[i];
insertIntoBuffer((Trace) record);
}
}
}
private void insertIntoBuffer(final Trace trace) {
TraceAggregationBuffer traceAggregationBuffer = trace2buffer.get(trace);
if (traceAggregationBuffer == null) {
traceAggregationBuffer = new TraceAggregationBuffer(TimeProvider.getCurrentTimestamp());
trace2buffer.put(trace, traceAggregationBuffer);
}
traceAggregationBuffer.insertTrace(trace);
}
@Override
public void periodicTimeSignal(final long timestamp) {
processTimeoutQueue(timestamp);
synchronized (this) {
processTimeoutQueue(timestamp);
}
}
private void processTimeoutQueue(final long timestamp) {
......@@ -73,64 +97,16 @@ public class TracePatternSummarizationFilter implements
private void sendOutTrace(final Trace aggregatedTrace) {
counter.inputObjects(aggregatedTrace);
putInRingBuffer(aggregatedTrace);
}
private void putInRingBuffer(final IRecord record) {
final long hiseq = ringBuffer.next();
final RecordEvent valueEvent = ringBuffer.get(hiseq);
valueEvent.setValue(record);
valueEvent.setValue(aggregatedTrace);
ringBuffer.publish(hiseq);
}
@Override
public void onEvent(final RecordArrayEvent event, final long sequence,
final boolean endOfBatch) throws Exception {
for (final IRecord record : event.getValues()) {
if (record != null) {
if (record instanceof Trace) {
insertIntoBuffer((Trace) record);
}
}
}
}
private void insertIntoBuffer(final Trace trace) {
TraceAggregationBuffer traceAggregationBuffer = trace2buffer.get(trace);
if (traceAggregationBuffer == null) {
traceAggregationBuffer = new TraceAggregationBuffer(
System.currentTimeMillis());
trace2buffer.put(trace, traceAggregationBuffer);
}
traceAggregationBuffer.insertTrace(trace);
}
public void terminate(final boolean error) {
for (final TraceAggregationBuffer traceBuffer : trace2buffer.values()) {
sendOutTrace(traceBuffer.getAggregatedTrace());
}
trace2buffer.clear();
}
private static final class TraceComperator implements Comparator<Trace> {
@Override
public int compare(final Trace t1, final Trace t2) {
final AbstractOperationEvent[] recordsT1 = t1.getTraceEvents();
final AbstractOperationEvent[] recordsT2 = t2.getTraceEvents();
if ((recordsT1.length - recordsT2.length) != 0) {
return recordsT1.length - recordsT2.length;
}
// final int cmpHostnames = t1.getTraceMetadata().getHostname()
// .compareTo(t2.getTraceMetadata().getHostname());
// if (cmpHostnames != 0) {
// return cmpHostnames;
// }
// TODO deep check records
return 0;
}
}
}
package explorviz.hpc_monitoring.reader;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import com.lmax.disruptor.RingBuffer;
import explorviz.hpc_monitoring.disruptor.RecordArrayEvent;
import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter;
import explorviz.hpc_monitoring.record.HostApplicationMetaData;
import explorviz.hpc_monitoring.record.IRecord;
import explorviz.hpc_monitoring.record.StringRegistryRecord;
import explorviz.hpc_monitoring.record.event.normal.AfterFailedOperationEvent;
import explorviz.hpc_monitoring.record.event.normal.AfterOperationEvent;
import explorviz.hpc_monitoring.record.event.normal.BeforeOperationEvent;
public class TCPReaderThread extends Thread implements IPeriodicTimeSignalReceiver {
private static final int MESSAGE_BUFFER_SIZE = 131072;
private static final int OUTPUT_MESSAGE_BUFFER_SIZE = 16384;
private HostApplicationMetaData hostApplicationMetadata;
private final static Map<Integer, String> stringRegistry = new TreeMap<Integer, String>();
private final static List<byte[]> waitingForStringMessages = new ArrayList<byte[]>(1024);
private static final CountingThroughputFilter counter = new CountingThroughputFilter(
"Received records/sec in Reader" + Thread.currentThread().getId());
private final SocketChannel socketChannel;
private final RingBuffer<RecordArrayEvent> ringBuffer;
private IRecord[] outputBuffer = new IRecord[OUTPUT_MESSAGE_BUFFER_SIZE];
private int outputBufferIndex = 0;
public TCPReaderThread(final SocketChannel socketChannel,
final RingBuffer<RecordArrayEvent> ringBuffer) {
this.socketChannel = socketChannel;
this.ringBuffer = ringBuffer;
new TimeSignalReader(1 * 1000, this).start();
}
@Override
public void run() {
final ByteBuffer buffer = ByteBuffer.allocateDirect(MESSAGE_BUFFER_SIZE);
try {
while ((socketChannel.read(buffer)) != -1) {
buffer.flip();
messagesfromByteArray(buffer);
}
} catch (final IOException ex) {
System.out.println("Error in read() " + ex.getMessage());
}
}
@Override
public void periodicTimeSignal(final long timestamp) {
synchronized (this) { // TODO remove
flushOutputBuffer();
}
}
private final void messagesfromByteArray(final ByteBuffer buffer) {
while (buffer.remaining() > 0) {
buffer.mark();
try {
final byte clazzId = buffer.get();
switch (clazzId) {
case HostApplicationMetaData.CLAZZ_ID: {
readInTraceMetadata(buffer);
break;
}
case BeforeOperationEvent.CLAZZ_ID: {
readInBeforeOperationEvent(buffer);
break;
}
case AfterFailedOperationEvent.CLAZZ_ID: {
readInAfterFailedOperationEvent(buffer);
break;
}
case AfterOperationEvent.CLAZZ_ID: {
readInAfterOperationEvent(buffer);
break;
}
case StringRegistryRecord.CLAZZ_ID: {
final int mapId = buffer.getInt<