Commit 09e7fe2b authored by Florian Fittkau's avatar Florian Fittkau

optimizations

parent 34195e54
package explorviz.hpc_monitoring;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
......@@ -16,24 +19,34 @@ import explorviz.hpc_monitoring.disruptor.ByteArrayEvent;
import explorviz.hpc_monitoring.probe.ThreadLocalByteBuffer;
import explorviz.hpc_monitoring.writer.TCPWriter;
final class SignatureComperator implements Comparator<Signature>, Serializable {
private static final long serialVersionUID = 6319332357717471530L;
@Override
public int compare(final Signature o1, final Signature o2) {
return o1.hashCode() - o2.hashCode();
}
}
public class MonitoringController {
private static final RingBuffer<ByteArrayEvent> ringBuffer;
private static final int MESSAGE_BUFFER_SIZE = 131072;
public static final ThreadLocalByteBuffer bufferStore = new ThreadLocalByteBuffer(
MESSAGE_BUFFER_SIZE);
private static final ConcurrentHashMap<String, Integer> stringReg = new ConcurrentHashMap<String, Integer>();
private static final ConcurrentHashMap<Signature, Integer> signatureReg = new ConcurrentHashMap<Signature, Integer>();
private static final RingBuffer<ByteArrayEvent> ringBuffer;
private static final Map<String, Integer> stringReg = new ConcurrentSkipListMap<String, Integer>();
private static final AtomicInteger stringRegIndex = new AtomicInteger(0);
private static volatile boolean monitoringEnabled = true;
private static final Map<Signature, Integer> signatureReg = new ConcurrentSkipListMap<Signature, Integer>(
new SignatureComperator());
private static final int MESSAGE_BUFFER_SIZE = 131072;
public static final ThreadLocalByteBuffer bufferStore = new ThreadLocalByteBuffer(
MESSAGE_BUFFER_SIZE);
private static volatile boolean monitoringEnabled = true;
static {
final ExecutorService exec = Executors.newCachedThreadPool();
final Disruptor<ByteArrayEvent> disruptor = new Disruptor<ByteArrayEvent>(
ByteArrayEvent.EVENT_FACTORY, 128, exec);
ByteArrayEvent.EVENT_FACTORY, 256, exec);
@SuppressWarnings("unchecked")
final EventHandler<ByteArrayEvent>[] eventHandlers = new EventHandler[1];
......@@ -45,12 +58,10 @@ public class MonitoringController {
tcpWriter.init();
}
public static final void newMonitoringRecord(final byte[] message,
final int length) {
public static final void newMonitoringRecord(final byte[] message) {
final long hiseq = ringBuffer.next();
final ByteArrayEvent valueEvent = ringBuffer.get(hiseq);
valueEvent.setValue(message);
valueEvent.setLength(length);
ringBuffer.publish(hiseq);
}
......@@ -82,7 +93,7 @@ public class MonitoringController {
buffer.put(value.getBytes());
newMonitoringRecord(regRecord, regRecordLength);
newMonitoringRecord(regRecord);
}
return result;
......
......@@ -48,7 +48,7 @@ public abstract class AbstractAspect {
final int max = buffer.limit();
final byte[] messages = new byte[max];
buffer.get(messages, 0, max);
MonitoringController.newMonitoringRecord(messages, max);
MonitoringController.newMonitoringRecord(messages);
buffer.clear();
}
......@@ -69,7 +69,7 @@ public abstract class AbstractAspect {
final int max = buffer.limit();
final byte[] messages = new byte[max];
buffer.get(messages, 0, max);
MonitoringController.newMonitoringRecord(messages, max);
MonitoringController.newMonitoringRecord(messages);
buffer.clear();
}
......@@ -77,7 +77,7 @@ public abstract class AbstractAspect {
buffer.putLong(System.currentTimeMillis());
buffer.putLong(trace.getTraceId());
buffer.putInt(trace.getNextOrderId());
buffer.putInt(signatureId);
buffer.putInt(signatureId); // TODO might not be needed
final int errorId = MonitoringController.getIdForString(th
.toString());
......@@ -95,7 +95,7 @@ public abstract class AbstractAspect {
final int max = buffer.limit();
final byte[] messages = new byte[max];
buffer.get(messages, 0, max);
MonitoringController.newMonitoringRecord(messages, max);
MonitoringController.newMonitoringRecord(messages);
buffer.clear();
}
......@@ -103,7 +103,7 @@ public abstract class AbstractAspect {
buffer.putLong(System.currentTimeMillis());
buffer.putLong(trace.getTraceId());
buffer.putInt(trace.getNextOrderId());
buffer.putInt(signatureId);
buffer.putInt(signatureId); // TODO might not be needed
if (newTrace) { // close the trace
TraceRegistry.unregisterTrace();
......
......@@ -96,13 +96,23 @@ public class TCPWriter implements EventHandler<ByteArrayEvent> {
@Override
public void onEvent(final ByteArrayEvent event, final long sequence,
final boolean endOfBatch) throws Exception {
send(event.getValue(), event.getLength());
send(event.getValue());
}
private void send(final byte[] message, final int length) {
private void send(final byte[] message) {
if (socketChannel.isConnected()) {
try {
socketChannel.write(ByteBuffer.wrap(message, 0, length));
socketChannel.write(ByteBuffer.wrap(message));
} catch (final IOException e) {
e.printStackTrace();
}
}
}
private void send(final ByteBuffer buffer) {
if (socketChannel.isConnected()) {
try {
socketChannel.write(buffer);
} catch (final IOException e) {
e.printStackTrace();
}
......@@ -159,6 +169,11 @@ public class TCPWriter implements EventHandler<ByteArrayEvent> {
.getIdForString("MonitoredApplication");
final byte[] message = new byte[HostApplicationMetadata.BYTE_LENGTH_WITH_CLAZZ_ID];
send(message, HostApplicationMetadata.BYTE_LENGTH_WITH_CLAZZ_ID);
final ByteBuffer buffer = ByteBuffer.wrap(message);
buffer.put(HostApplicationMetadata.CLAZZ_ID);
buffer.putInt(hostnameId);
buffer.putInt(applicationId);
buffer.flip();
send(buffer);
}
}
......@@ -10,7 +10,7 @@ public class TestStart {
public void run() {
for (int i = 0; i < 400000000; i++) {
// final TestClass testClass = new TestClass();
TestClass.testMethod(0);
TestClass.testMethod(9);
}
}
}).run();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment