Commit bdb01407 authored by Florian Fittkau's avatar Florian Fittkau

Records per second: 28,60 millions :)

parent 9b6b26c1
......@@ -11,5 +11,5 @@
</listAttribute>
<stringAttribute key="org.eclipse.jdt.launching.MAIN_TYPE" value="testpackage.TestStart"/>
<stringAttribute key="org.eclipse.jdt.launching.PROJECT_ATTR" value="monitored-application"/>
<stringAttribute key="org.eclipse.jdt.launching.VM_ARGUMENTS" value="-javaagent:lib/aspectjweaver.jar"/>
<stringAttribute key="org.eclipse.jdt.launching.VM_ARGUMENTS" value="-javaagent:lib/aspectjweaver.jar -Xmx4G"/>
</launchConfiguration>
......@@ -15,30 +15,30 @@ import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import explorviz.hpc_monitoring.byteaccess.UnsafeBits;
import explorviz.hpc_monitoring.writer.ByteArrayEvent;
import explorviz.hpc_monitoring.disruptor.Byte32ArrayEvent;
import explorviz.hpc_monitoring.writer.TCPWriter;
public enum MonitoringController { // Singleton (Effective Java #3)
INSTANCE;
public class MonitoringController {
private static final RingBuffer<Byte32ArrayEvent> ringBuffer;
private final RingBuffer<ByteArrayEvent> ringBuffer;
private static final ConcurrentHashMap<String, Integer> stringReg = new ConcurrentHashMap<String, Integer>();
private static final ConcurrentHashMap<Signature, Integer> signatureReg = new ConcurrentHashMap<Signature, Integer>();
private final ConcurrentHashMap<String, Integer> stringReg = new ConcurrentHashMap<String, Integer>(
16, 0.75f, 2);
private final ConcurrentHashMap<Signature, Integer> signatureReg = new ConcurrentHashMap<Signature, Integer>(
16, 0.75f, 2);
private final AtomicInteger stringRegIndex = new AtomicInteger(0);
private static final AtomicInteger stringRegIndex = new AtomicInteger(0);
private MonitoringController() {
// private static final CountingThroughputFilter counter;
static {
final ExecutorService exec = Executors.newCachedThreadPool();
final Disruptor<ByteArrayEvent> disruptor = new Disruptor<ByteArrayEvent>(
ByteArrayEvent.EVENT_FACTORY, 32768, exec);
final Disruptor<Byte32ArrayEvent> disruptor = new Disruptor<Byte32ArrayEvent>(
Byte32ArrayEvent.EVENT_FACTORY, 128, exec);
@SuppressWarnings("unchecked")
final EventHandler<ByteArrayEvent>[] eventHandlers = new EventHandler[1];
final EventHandler<Byte32ArrayEvent>[] eventHandlers = new EventHandler[1];
eventHandlers[0] = new TCPWriter("127.0.0.1", 10133);
disruptor.handleEventsWith(eventHandlers);
ringBuffer = disruptor.start();
// counter = new CountingThroughputFilter("monitoring: ");
}
// public final ByteArrayEvent getNewMonitoringRecordSlot() {
......@@ -52,15 +52,17 @@ public enum MonitoringController { // Singleton (Effective Java #3)
// ringBuffer.publish(record.getSequence());
// }
public final void newMonitoringRecord(final byte[] message, final int length) {
public static final void newMonitoringRecord(final byte[] message,
final int length) {
// counter.inputObjects(message);
final long hiseq = ringBuffer.next();
final ByteArrayEvent valueEvent = ringBuffer.get(hiseq);
final Byte32ArrayEvent valueEvent = ringBuffer.get(hiseq);
valueEvent.setValue(message);
valueEvent.setLength(length);
ringBuffer.publish(hiseq);
}
public final Integer getIdForSignature(final Signature sig) {
public static final Integer getIdForSignature(final Signature sig) {
Integer result = signatureReg.get(sig);
if (result == null) {
final String value = signatureToLongString(sig);
......@@ -70,7 +72,7 @@ public enum MonitoringController { // Singleton (Effective Java #3)
return result;
}
public final Integer getIdForString(final String value) {
public static final Integer getIdForString(final String value) {
Integer result = stringReg.get(value);
if (result == null) {
result = stringRegIndex.getAndIncrement();
......@@ -92,7 +94,7 @@ public enum MonitoringController { // Singleton (Effective Java #3)
return result;
}
private final String signatureToLongString(final Signature sig) {
private static final String signatureToLongString(final Signature sig) {
if (sig instanceof MethodSignature) {
final MethodSignature signature = (MethodSignature) sig;
final StringBuilder sb = new StringBuilder(256);
......@@ -145,7 +147,7 @@ public enum MonitoringController { // Singleton (Effective Java #3)
}
}
private final StringBuilder addTypeList(final StringBuilder sb,
private static final StringBuilder addTypeList(final StringBuilder sb,
final Class<?>[] clazzes) {
if (null != clazzes) {
boolean first = true;
......@@ -161,7 +163,7 @@ public enum MonitoringController { // Singleton (Effective Java #3)
return sb;
}
private final StringBuilder addType(final StringBuilder sb,
private static final StringBuilder addType(final StringBuilder sb,
final Class<?> clazz) {
if (null == clazz) {
sb.append("ANONYMOUS");
......
......@@ -19,7 +19,7 @@ package explorviz.hpc_monitoring;
import java.net.UnknownHostException;
import java.security.SecureRandom;
import java.util.Stack;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import explorviz.hpc_monitoring.byteaccess.UnsafeBits;
......@@ -33,10 +33,9 @@ public enum TraceRegistry { // Singleton (Effective Java #3)
private final int applicationId;
private final ThreadLocal<byte[]> traceStorage = new ThreadLocal<byte[]>();
private final ThreadLocal<Stack<byte[]>> enclosingTraceStack = new ThreadLocal<Stack<byte[]>>();
private final WeakHashMap<Thread, TracePoint> parentTrace = new WeakHashMap<Thread, TracePoint>();
private final ConcurrentHashMap<Thread, TracePoint> parentTrace = new ConcurrentHashMap<Thread, TracePoint>();
private TraceRegistry() {
String hostnameTmp = ""; // TODO
......@@ -49,8 +48,8 @@ public enum TraceRegistry { // Singleton (Effective Java #3)
ex.printStackTrace();
}
}
hostnameId = MonitoringController.INSTANCE.getIdForString(hostnameTmp);
applicationId = MonitoringController.INSTANCE
hostnameId = MonitoringController.getIdForString(hostnameTmp);
applicationId = MonitoringController
.getIdForString("MonitoredApplication");
final long uniqueOffset = new SecureRandom().nextLong();
......@@ -116,11 +115,11 @@ public enum TraceRegistry { // Singleton (Effective Java #3)
if (!localTraceStack.isEmpty()) { // we actually found something
traceStorage.set(localTraceStack.pop());
} else {
enclosingTraceStack.remove();
traceStorage.remove();
enclosingTraceStack.set(null);
traceStorage.set(null);
}
} else {
traceStorage.remove();
traceStorage.set(null);
}
}
......
package explorviz.hpc_monitoring.probe;
import java.nio.ByteBuffer;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
......@@ -15,9 +17,16 @@ import explorviz.hpc_monitoring.record.events.normal.BeforeOperationEvent;
@Aspect
public abstract class AbstractAspect {
private static final MonitoringController CTRLINST = MonitoringController.INSTANCE;
private static final TraceRegistry TRACEREGISTRY = TraceRegistry.INSTANCE;
private static final int MESSAGE_BUFFER_SIZE = 131072;
private static final ThreadLocal<ByteBuffer> bufferStore = new ThreadLocal<ByteBuffer>();
// private static final CountingThroughputFilter counter = new
// CountingThroughputFilter(
// "probe: ");
/**
* The pointcut for the monitored operations. Inheriting classes should
* extend the pointcut in order to find the correct executions of the
......@@ -29,27 +38,37 @@ public abstract class AbstractAspect {
@Around("monitoredOperation()")
public Object operation(final ProceedingJoinPoint thisJoinPoint)
throws Throwable {
// if (!CTRLINST.isProbeActivated(signature)) { // TODO
// return thisJoinPoint.proceed();
// }
// counter.inputObjects(thisJoinPoint);
ByteBuffer buffer = bufferStore.get();
if (buffer == null) {
buffer = ByteBuffer.allocateDirect(MESSAGE_BUFFER_SIZE);
bufferStore.set(buffer);
}
// TODO static methods and constructor calls
byte[] trace = TRACEREGISTRY.getTrace();
final boolean newTrace = trace == null;
if (newTrace) {
trace = TRACEREGISTRY.registerTrace();
CTRLINST.newMonitoringRecord(trace,
TraceMetadata.BYTE_LENGTH_WITH_CLAZZ_ID);
if (TraceMetadata.BYTE_LENGTH_WITH_CLAZZ_ID > buffer.remaining()) {
buffer.flip();
final int max = buffer.limit();
final byte[] messages = new byte[max];
buffer.get(messages, 0, max);
MonitoringController.newMonitoringRecord(messages, max);
buffer.clear();
}
buffer.put(trace, 0, TraceMetadata.BYTE_LENGTH_WITH_CLAZZ_ID);
}
final long traceId = UnsafeBits.getLong(trace, 4);
// final String clazz = thisObject.getClass().getName(); TODO needed?
final int signatureId = CTRLINST.getIdForSignature(thisJoinPoint
.getSignature());
final int signatureId = MonitoringController
.getIdForSignature(thisJoinPoint.getSignature());
final byte[] beforeEvent = new byte[BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID];
UnsafeBits.putInt(beforeEvent, 0, BeforeOperationEvent.CLAZZ_ID);
UnsafeBits.putLong(beforeEvent, 4, System.nanoTime());
UnsafeBits.putLong(beforeEvent, 4, System.currentTimeMillis());
UnsafeBits.putLong(beforeEvent, 12, traceId);
final int nextBeforeOrderId = UnsafeBits.getInt(trace, 32);
......@@ -58,7 +77,15 @@ public abstract class AbstractAspect {
UnsafeBits.putInt(beforeEvent, 20, nextBeforeOrderId);
UnsafeBits.putInt(beforeEvent, 24, signatureId);
CTRLINST.newMonitoringRecord(beforeEvent,
if (BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID > buffer.remaining()) {
buffer.flip();
final int max = buffer.limit();
final byte[] messages = new byte[max];
buffer.get(messages, 0, max);
MonitoringController.newMonitoringRecord(messages, max);
buffer.clear();
}
buffer.put(beforeEvent, 0,
BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID);
final Object retval;
......@@ -68,7 +95,7 @@ public abstract class AbstractAspect {
final byte[] afterFailedEvent = new byte[AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID];
UnsafeBits.putInt(afterFailedEvent, 0,
AfterFailedOperationEvent.CLAZZ_ID);
UnsafeBits.putLong(afterFailedEvent, 4, System.nanoTime());
UnsafeBits.putLong(afterFailedEvent, 4, System.currentTimeMillis());
UnsafeBits.putLong(afterFailedEvent, 12, traceId);
final int nextAfterFailOrderId = UnsafeBits.getInt(trace, 32);
......@@ -77,10 +104,20 @@ public abstract class AbstractAspect {
UnsafeBits.putInt(afterFailedEvent, 20, nextAfterFailOrderId);
UnsafeBits.putInt(afterFailedEvent, 24, signatureId);
final int errorId = CTRLINST.getIdForString(th.toString());
final int errorId = MonitoringController.getIdForString(th
.toString());
UnsafeBits.putInt(afterFailedEvent, 28, errorId);
CTRLINST.newMonitoringRecord(afterFailedEvent,
if (AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID > buffer
.remaining()) {
buffer.flip();
final int max = buffer.limit();
final byte[] messages = new byte[max];
buffer.get(messages, 0, max);
MonitoringController.newMonitoringRecord(messages, max);
buffer.clear();
}
buffer.put(afterFailedEvent, 0,
AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID);
throw th;
} finally {
......@@ -91,7 +128,7 @@ public abstract class AbstractAspect {
final byte[] afterEvent = new byte[AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID];
UnsafeBits.putInt(afterEvent, 0, AfterOperationEvent.CLAZZ_ID);
UnsafeBits.putLong(afterEvent, 4, System.nanoTime());
UnsafeBits.putLong(afterEvent, 4, System.currentTimeMillis());
UnsafeBits.putLong(afterEvent, 12, traceId);
final int nextAfterOrderId = UnsafeBits.getInt(trace, 32);
......@@ -100,8 +137,15 @@ public abstract class AbstractAspect {
UnsafeBits.putInt(afterEvent, 20, nextAfterOrderId);
UnsafeBits.putInt(afterEvent, 24, signatureId);
CTRLINST.newMonitoringRecord(afterEvent,
AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID);
if (AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID > buffer.remaining()) {
buffer.flip();
final int max = buffer.limit();
final byte[] messages = new byte[max];
buffer.get(messages, 0, max);
MonitoringController.newMonitoringRecord(messages, max);
buffer.clear();
}
buffer.put(afterEvent, 0, AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID);
return retval;
}
}
package explorviz.hpc_monitoring.writer;
import com.lmax.disruptor.EventFactory;
/**
* WARNING: This is a mutable object which will be recycled by the RingBuffer.
* You must take a copy of data it holds before the framework recycles it.
*/
public final class ByteArrayEvent {
private byte[] value = new byte[32];
private int length;
private long sequence;
public final byte[] getValue() {
return value;
}
public void setValue(final byte[] value) {
this.value = value;
}
public final int getLength() {
return length;
}
public void setLength(final int length) {
this.length = length;
}
public long getSequence() {
return sequence;
}
public void setSequence(final long sequence) {
this.sequence = sequence;
}
public final static EventFactory<ByteArrayEvent> EVENT_FACTORY = new EventFactory<ByteArrayEvent>() {
@Override
public ByteArrayEvent newInstance() {
return new ByteArrayEvent();
}
};
}
\ No newline at end of file
......@@ -9,8 +9,10 @@ import java.nio.channels.SocketChannel;
import com.lmax.disruptor.EventHandler;
public class TCPWriter implements EventHandler<ByteArrayEvent> {
private static final int MESSAGE_BUFFER_SIZE = 65536;
import explorviz.hpc_monitoring.disruptor.Byte32ArrayEvent;
public class TCPWriter implements EventHandler<Byte32ArrayEvent> {
private static final int MESSAGE_BUFFER_SIZE = 131072;
private URL providerURL;
private URL loadBalancerURL;
......@@ -19,9 +21,7 @@ public class TCPWriter implements EventHandler<ByteArrayEvent> {
private SocketChannel socketChannel;
private ByteBuffer buffer;
// private CountingThroughputFilter counter;
private final ByteBuffer buffer;
public TCPWriter(final String hostname, final int port) {
try {
......@@ -33,7 +33,6 @@ public class TCPWriter implements EventHandler<ByteArrayEvent> {
loadBalancerWaitTimeInNs = 0;
loadBalancerURL = null;
// counter = new CountingThroughputFilter("Records per second from: ");
buffer = ByteBuffer.allocateDirect(MESSAGE_BUFFER_SIZE);
try {
connect();
......@@ -55,6 +54,7 @@ public class TCPWriter implements EventHandler<ByteArrayEvent> {
}
this.loadBalancerWaitTimeInNs = loadBalancerWaitTimeInNs;
buffer = ByteBuffer.allocateDirect(MESSAGE_BUFFER_SIZE);
createLoadBalancer();
}
......@@ -85,11 +85,12 @@ public class TCPWriter implements EventHandler<ByteArrayEvent> {
}
@Override
public void onEvent(final ByteArrayEvent event, final long sequence,
public void onEvent(final Byte32ArrayEvent event, final long sequence,
final boolean endOfBatch) throws Exception {
if (socketChannel.isConnected()) {
sendMessage(event.getValue(), event.getLength(), endOfBatch);
// counter.inputObjects(event);
// sendMessage(event.getValue(), event.getLength(), endOfBatch);
socketChannel.write(ByteBuffer.wrap(event.getValue(), 0,
event.getLength()));
}
}
......
......@@ -3,7 +3,7 @@ package testpackage;
public class TestStart {
public static void main(final String[] args) {
for (int i = 0; i < 2; i++) {
for (int i = 0; i < 4; i++) {
new Thread(new Runnable() {
@Override
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment