Commit 34195e54 authored by Florian Fittkau's avatar Florian Fittkau

35 million records per second :)

parent 9ca90467
......@@ -10,6 +10,5 @@
</attributes>
</classpathentry>
<classpathentry combineaccessrules="false" kind="src" path="/common-monitoring"/>
<classpathentry combineaccessrules="false" kind="src" path="/kieker"/>
<classpathentry kind="output" path="bin"/>
</classpath>
......@@ -6,7 +6,7 @@
</weaver>
<aspects>
<concrete-aspect name="kieker.monitoring.probe.aspectj.flow.operationExecutionFlat.TargetedAspect" extends="kieker.monitoring.probe.aspectj.flow.operationExecutionFlat.AbstractAspect">
<concrete-aspect name="explorviz.hpc_monitoring.probe.TargetedAspect" extends="explorviz.hpc_monitoring.probe.AbstractAspect">
<pointcut name="monitoredOperation" expression="execution(* testpackage.TestClass*.*(..))" />
</concrete-aspect>
</aspects>
......
......@@ -6,7 +6,7 @@
</weaver>
<aspects>
<concrete-aspect name="explorviz.hpc_monitoring.probe.TargetedAspect" extends="explorviz.hpc_monitoring.probe.AbstractAspect">
<concrete-aspect name="kieker.monitoring.probe.aspectj.flow.operationExecutionFlat.TargetedAspect" extends="kieker.monitoring.probe.aspectj.flow.operationExecutionFlat.AbstractAspect">
<pointcut name="monitoredOperation" expression="execution(* testpackage.TestClass*.*(..))" />
</concrete-aspect>
</aspects>
......
......@@ -13,6 +13,7 @@ import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import explorviz.hpc_monitoring.disruptor.ByteArrayEvent;
import explorviz.hpc_monitoring.probe.ThreadLocalByteBuffer;
import explorviz.hpc_monitoring.writer.TCPWriter;
public class MonitoringController {
......@@ -25,6 +26,10 @@ public class MonitoringController {
private static volatile boolean monitoringEnabled = true;
private static final int MESSAGE_BUFFER_SIZE = 131072;
public static final ThreadLocalByteBuffer bufferStore = new ThreadLocalByteBuffer(
MESSAGE_BUFFER_SIZE);
static {
final ExecutorService exec = Executors.newCachedThreadPool();
final Disruptor<ByteArrayEvent> disruptor = new Disruptor<ByteArrayEvent>(
......@@ -52,7 +57,8 @@ public class MonitoringController {
public static final Integer getIdForSignature(final Signature sig) {
Integer result = signatureReg.get(sig);
if (result == null) {
final String value = SignatureToStringConverter.signatureToLongString(sig);
final String value = SignatureToStringConverter
.signatureToLongString(sig);
result = getIdForString(value);
signatureReg.put(sig, result);
}
......
......@@ -14,11 +14,6 @@ import explorviz.hpc_monitoring.record.events.normal.BeforeOperationEvent;
@Aspect
public abstract class AbstractAspect {
private static final int MESSAGE_BUFFER_SIZE = 131072;
private static final ThreadLocalByteBuffer bufferStore = new ThreadLocalByteBuffer(
MESSAGE_BUFFER_SIZE);
// private static final CountingThroughputFilter counter = new
// CountingThroughputFilter(
// "probe: ");
......@@ -37,7 +32,7 @@ public abstract class AbstractAspect {
// return thisJoinPoint.proceed();
// }
final ByteBuffer buffer = bufferStore.get();
final ByteBuffer buffer = MonitoringController.bufferStore.get();
final ProbeTraceMetaData trace = TraceRegistry.getTrace();
final boolean newTrace = trace.isNewTrace();
......@@ -58,12 +53,13 @@ public abstract class AbstractAspect {
}
buffer.put(BeforeOperationEvent.CLAZZ_ID);
buffer.putLong(System.nanoTime());
buffer.putLong(System.currentTimeMillis());
buffer.putLong(trace.getTraceId());
buffer.putInt(trace.getNextOrderId());
buffer.putInt(signatureId);
final Object retval;
try {
retval = thisJoinPoint.proceed();
} catch (final Throwable th) {
......@@ -78,7 +74,7 @@ public abstract class AbstractAspect {
}
buffer.put(AfterFailedOperationEvent.CLAZZ_ID);
buffer.putLong(System.nanoTime());
buffer.putLong(System.currentTimeMillis());
buffer.putLong(trace.getTraceId());
buffer.putInt(trace.getNextOrderId());
buffer.putInt(signatureId);
......@@ -86,11 +82,12 @@ public abstract class AbstractAspect {
final int errorId = MonitoringController.getIdForString(th
.toString());
buffer.putInt(errorId);
throw th;
} finally {
if (newTrace) { // close the trace
if (newTrace) {
TraceRegistry.unregisterTrace();
}
throw th;
}
if (AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID > buffer.remaining()) {
......@@ -103,11 +100,15 @@ public abstract class AbstractAspect {
}
buffer.put(AfterOperationEvent.CLAZZ_ID);
buffer.putLong(System.nanoTime());
buffer.putLong(System.currentTimeMillis());
buffer.putLong(trace.getTraceId());
buffer.putInt(trace.getNextOrderId());
buffer.putInt(signatureId);
if (newTrace) { // close the trace
TraceRegistry.unregisterTrace();
}
return retval;
}
}
......@@ -19,7 +19,7 @@ public class ProbeTraceMetaData {
}
public final int getNextOrderId() {
return ++nextOrderId;
return nextOrderId++;
}
public final void setNextOrderId(final int nextOrderId) {
......@@ -35,7 +35,7 @@ public class ProbeTraceMetaData {
}
public void reset() {
nextOrderId = -1;
nextOrderId = 0;
newTrace = true;
}
}
......@@ -96,9 +96,16 @@ public class TCPWriter implements EventHandler<ByteArrayEvent> {
@Override
public void onEvent(final ByteArrayEvent event, final long sequence,
final boolean endOfBatch) throws Exception {
send(event.getValue(), event.getLength());
}
private void send(final byte[] message, final int length) {
if (socketChannel.isConnected()) {
socketChannel.write(ByteBuffer.wrap(event.getValue(), 0,
event.getLength()));
try {
socketChannel.write(ByteBuffer.wrap(message, 0, length));
} catch (final IOException e) {
e.printStackTrace();
}
}
}
......@@ -150,5 +157,8 @@ public class TCPWriter implements EventHandler<ByteArrayEvent> {
hostnameId = MonitoringController.getIdForString(hostnameTmp);
applicationId = MonitoringController
.getIdForString("MonitoredApplication");
final byte[] message = new byte[HostApplicationMetadata.BYTE_LENGTH_WITH_CLAZZ_ID];
send(message, HostApplicationMetadata.BYTE_LENGTH_WITH_CLAZZ_ID);
}
}
......@@ -10,7 +10,7 @@ public class TestStart {
public void run() {
for (int i = 0; i < 400000000; i++) {
// final TestClass testClass = new TestClass();
TestClass.testMethod(1);
TestClass.testMethod(0);
}
}
}).run();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment