Commit f40d945e authored by Florian Fittkau's avatar Florian Fittkau

disruptor in again

parent c17f98e4
......@@ -8,6 +8,7 @@
<classpathentry kind="lib" path="lib/geronimo-jms_1.1_spec-1.1.1.jar"/>
<classpathentry kind="lib" path="lib/activemq-broker-5.9.1.jar"/>
<classpathentry kind="lib" path="lib/aspectjweaver-1.8.5.jar"/>
<classpathentry kind="lib" path="lib/disruptor-3.3.0.jar"/>
<classpathentry combineaccessrules="false" kind="src" path="/common-monitoring"/>
<classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
<classpathentry kind="lib" path="lib/slf4j-api-1.7.5.jar"/>
......
......@@ -73,6 +73,7 @@
<zipfileset excludes="META-INF/*.SF" src="${build.dir}/explorviz-common.jar"/>
<zipfileset excludes="META-INF/*.SF" src="lib/aspectjweaver-1.8.5.jar"/>
<zipfileset excludes="META-INF/*.SF" src="lib/slf4j-api-1.7.5.jar"/>
<zipfileset excludes="META-INF/*.SF" src="lib/disruptor-3.3.0.jar"/>
<!-- <zipfileset excludes="META-INF/*.SF" src="lib/hsqldb-2.3.1.jar"/> -->
<!-- <zipfileset excludes="META-INF/*.SF" src="lib/servlet-api-3.0.jar"/> -->
</jar>
......@@ -98,6 +99,7 @@
<fileset dir="lib" includes="*LICENSE*" />
<zipfileset excludes="META-INF/*.SF" src="${build.dir}/explorviz-common.jar"/>
<zipfileset excludes="META-INF/*.SF" src="lib/slf4j-api-1.7.5.jar"/>
<zipfileset excludes="META-INF/*.SF" src="lib/disruptor-3.3.0.jar"/>
</jar>
</target>
......
This diff is collapsed.
......@@ -2,6 +2,8 @@ package explorviz.live_trace_processing.main;
import java.nio.ByteBuffer;
import com.lmax.disruptor.EventFactory;
import explorviz.live_trace_processing.Constants;
public final class ByteBufferEvent {
......@@ -14,4 +16,11 @@ public final class ByteBufferEvent {
public void setValue(final ByteBuffer value) {
this.value = value;
}
public final static EventFactory<ByteBufferEvent> EVENT_FACTORY = new EventFactory<ByteBufferEvent>() {
@Override
public ByteBufferEvent newInstance() {
return new ByteBufferEvent();
}
};
}
\ No newline at end of file
......@@ -4,28 +4,29 @@ import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.aspectj.lang.Signature;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.configuration.Configuration;
import explorviz.live_trace_processing.configuration.ConfigurationFactory;
import explorviz.live_trace_processing.filter.PipesMerger;
import explorviz.live_trace_processing.probe.tracemanagement.TraceRegistry;
import explorviz.live_trace_processing.reader.IPeriodicTimeSignalReceiver;
import explorviz.live_trace_processing.reader.TimeSignalReader;
import explorviz.live_trace_processing.record.misc.SystemMonitoringRecord;
import explorviz.live_trace_processing.system_mon.SystemMonitor;
import explorviz.live_trace_processing.writer.TCPWriter;
import explorviz.live_trace_processing.writer.load_balancer.LoadBalancer;
public class MonitoringController implements IPeriodicTimeSignalReceiver {
public class MonitoringController {
private static final SystemMonitor systemMonitor;
private static final Map<Signature, Integer> signatureReg = new ConcurrentSkipListMap<Signature, Integer>(
......@@ -37,9 +38,7 @@ public class MonitoringController implements IPeriodicTimeSignalReceiver {
private static boolean initialized = false;
private static Map<Thread, Queue<ByteBufferEvent>> threadToQueueMap = new ConcurrentHashMap<Thread, Queue<ByteBufferEvent>>();
private static PipesMerger<ByteBufferEvent> tcpPipesMerger;
private static RingBuffer<ByteBufferEvent> ringBuffer;
static {
// remoteConfigurationServlet = new RemoteConfigurationServlet();
......@@ -53,17 +52,32 @@ public class MonitoringController implements IPeriodicTimeSignalReceiver {
TraceRegistry.init(configuration);
final ExecutorService exec = Executors.newCachedThreadPool();
final Disruptor<ByteBufferEvent> disruptor = new Disruptor<ByteBufferEvent>(
ByteBufferEvent.EVENT_FACTORY, Constants.MONITORING_CONTROLLER_DISRUPTOR_SIZE,
exec, ProducerType.MULTI, new BlockingWaitStrategy());
final boolean androidMonitoring = configuration
.getBooleanProperty(ConfigurationFactory.ANDROID_MONITORING);
tcpPipesMerger = new PipesMerger<ByteBufferEvent>(configuration.getIntProperty(
ConfigurationFactory.MONITORING_CONTROLLER_DISRUPTOR_SIZE, 32));
@SuppressWarnings("unchecked")
final EventHandler<ByteBufferEvent>[] eventHandlers = new EventHandler[1];
final TCPWriter tcpWriter = new TCPWriter(tcpPipesMerger,
final TCPWriter tcpWriter = new TCPWriter(
configuration.getStringProperty(ConfigurationFactory.WRITER_TARGET_IP),
configuration.getIntProperty(ConfigurationFactory.WRITER_TARGET_PORT, 10133),
androidMonitoring, configuration);
eventHandlers[0] = tcpWriter;
disruptor.handleEventsWith(eventHandlers);
ringBuffer = disruptor.start();
// final TCPWriter tcpWriter = new TCPWriter(tcpPipesMerger,
// configuration.getStringProperty(ConfigurationFactory.WRITER_TARGET_IP),
// configuration.getIntProperty(ConfigurationFactory.WRITER_TARGET_PORT,
// 10133),
// androidMonitoring, configuration);
final boolean loadBalancerEnabled = configuration
.getBooleanProperty(ConfigurationFactory.LOAD_BALANCER_ENABLED);
......@@ -76,9 +90,9 @@ public class MonitoringController implements IPeriodicTimeSignalReceiver {
configuration.getIntProperty(ConfigurationFactory.LOAD_BALANCER_PORT, 9999),
configuration.getIntProperty(ConfigurationFactory.LOAD_BALANCER_WAIT_TIME,
20000),
configuration
configuration
.getStringProperty(ConfigurationFactory.LOAD_BALANCER_SCALING_GROUP),
tcpWriter);
tcpWriter);
} else {
try {
tcpWriter.connect();
......@@ -88,8 +102,6 @@ public class MonitoringController implements IPeriodicTimeSignalReceiver {
}
}
tcpWriter.start();
if (systemMonitorEnabled) {
systemMonitor = new SystemMonitor(1000);
systemMonitor.start();
......@@ -97,7 +109,8 @@ public class MonitoringController implements IPeriodicTimeSignalReceiver {
systemMonitor = null;
}
new TimeSignalReader(TimeUnit.SECONDS.toMillis(10), new MonitoringController()).start();
// new TimeSignalReader(TimeUnit.SECONDS.toMillis(10), new
// MonitoringController()).start();
initialized = true;
}
......@@ -120,33 +133,18 @@ public class MonitoringController implements IPeriodicTimeSignalReceiver {
return;
}
buffer.flip();
final ByteBufferEvent bufferEvent = new ByteBufferEvent();
final ByteBuffer newBuffer = bufferEvent.getValue();
newBuffer.put(buffer);
putIntoQueue(bufferEvent);
final long hiseq = ringBuffer.next();
final ByteBufferEvent valueEvent = ringBuffer.get(hiseq);
buffer.flip();
final ByteBuffer oldBuffer = valueEvent.getValue();
oldBuffer.clear();
oldBuffer.put(buffer);
valueEvent.setValue(oldBuffer);
ringBuffer.publish(hiseq);
buffer.clear();
}
private static final void putIntoQueue(final ByteBufferEvent bufferEvent) {
Queue<ByteBufferEvent> queue = threadToQueueMap.get(Thread.currentThread());
if (queue == null) {
queue = tcpPipesMerger.registerProducer();
threadToQueueMap.put(Thread.currentThread(), queue);
}
while (!queue.offer(bufferEvent)) {
try {
Thread.sleep(1);
} catch (final InterruptedException e) {
}
}
}
public static final Integer getIdForSignature(final Signature sig) {
Integer result = signatureReg.get(sig);
if (result == null) {
......@@ -158,15 +156,16 @@ public class MonitoringController implements IPeriodicTimeSignalReceiver {
}
public static final void sendOutSystemRecord(final SystemMonitoringRecord record) {
final ByteBufferEvent byteBufferEvent = new ByteBufferEvent();
final ByteBuffer buffer = byteBufferEvent.getValue();
final long hiseq = ringBuffer.next();
final ByteBufferEvent valueEvent = ringBuffer.get(hiseq);
final ByteBuffer buffer = valueEvent.getValue();
buffer.clear();
buffer.put(SystemMonitoringRecord.CLAZZ_ID);
buffer.putDouble(record.getCpuUtilization());
buffer.putLong(record.getUsedRAM());
buffer.putLong(record.getAbsoluteRAM());
putIntoQueue(byteBufferEvent);
valueEvent.setValue(buffer);
ringBuffer.publish(hiseq);
}
public static void shutdown() {
......@@ -181,26 +180,12 @@ public class MonitoringController implements IPeriodicTimeSignalReceiver {
// remoteConfigurationServlet.stop();
tcpPipesMerger.terminate();
if (systemMonitor != null) {
systemMonitor.shutdown();
}
}
}).start();
}
@Override
public void periodicTimeSignal(final long timestamp) {
final Iterator<Entry<Thread, Queue<ByteBufferEvent>>> iterator = threadToQueueMap
.entrySet().iterator();
while (iterator.hasNext()) {
final Entry<Thread, Queue<ByteBufferEvent>> threadEntry = iterator.next();
if (!threadEntry.getKey().isAlive()) {
tcpPipesMerger.deregisterProducer(threadEntry.getValue());
}
}
}
}
final class SignatureComperator implements Comparator<Signature>, Serializable {
......
......@@ -31,7 +31,7 @@ public abstract class AbstractAspect {
private static final ThreadLocalLastSendingTime lastSendingTime = new ThreadLocalLastSendingTime();
private static final AdaptiveMonitoring adaptiveMonitoring = new AdaptiveMonitoring();
@Pointcut("!within(explorviz..*) && !within(org.jctools..*)")
@Pointcut("!within(explorviz..*) && !within(com.lmax..*)")
public void notWithinExplorViz() {
}
......@@ -56,7 +56,7 @@ public abstract class AbstractAspect {
AfterFailedOperationEventRecord.CLAZZ_ID,
AfterOperationEventRecord.COMPRESSED_BYTE_LENGTH_WITH_CLAZZ_ID,
AfterOperationEventRecord.CLAZZ_ID, System.identityHashCode(thisObject), thisObject
.getClass().getName(), getInterface(thisJoinPoint));
.getClass().getName(), getInterface(thisJoinPoint));
}
@Around("monitoredOperation() && !this(java.lang.Object) && notWithinExplorViz()")
......@@ -73,7 +73,7 @@ public abstract class AbstractAspect {
AfterFailedStaticOperationEventRecord.CLAZZ_ID,
AfterStaticOperationEventRecord.COMPRESSED_BYTE_LENGTH_WITH_CLAZZ_ID,
AfterStaticOperationEventRecord.CLAZZ_ID, 0, thisJoinPoint.getSignature()
.getDeclaringTypeName(), getInterface(thisJoinPoint));
.getDeclaringTypeName(), getInterface(thisJoinPoint));
}
@Around("monitoredConstructor() && this(thisObject) && notWithinExplorViz()")
......@@ -118,7 +118,7 @@ public abstract class AbstractAspect {
final int beforeLength, final byte beforeId, final int afterFailedLength,
final byte afterFailedId, final int afterLength, final byte afterId,
final int objectId, final String clazz, final String implementedInterface)
throws Throwable {
throws Throwable {
final ByteBuffer buffer = bufferStore.get();
final ProbeTraceMetaData trace = TraceRegistry.getTrace();
......
......@@ -9,16 +9,18 @@ import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import com.lmax.disruptor.EventHandler;
import explorviz.live_trace_processing.configuration.Configuration;
import explorviz.live_trace_processing.configuration.ConfigurationFactory;
import explorviz.live_trace_processing.debug.DebugFileByteBufferWriter;
import explorviz.live_trace_processing.filter.IPipeReceiver;
import explorviz.live_trace_processing.filter.PipesMerger;
import explorviz.live_trace_processing.main.ByteBufferEvent;
import explorviz.live_trace_processing.main.MonitoringStringRegistry;
import explorviz.live_trace_processing.record.trace.HostApplicationMetaDataRecord;
public class TCPWriter extends Thread implements IPipeReceiver<ByteBufferEvent>, IWriter {
public class TCPWriter implements IPipeReceiver<ByteBufferEvent>, IWriter,
EventHandler<ByteBufferEvent> {
private final boolean debug;
private DebugFileByteBufferWriter debugWriter;
......@@ -40,15 +42,13 @@ public class TCPWriter extends Thread implements IPipeReceiver<ByteBufferEvent>,
private final boolean androidMonitoring;
private static String applicationName;
private final PipesMerger<ByteBufferEvent> merger;
public static String getApplicationName() {
return applicationName;
}
public TCPWriter(final PipesMerger<ByteBufferEvent> merger, final String hostname,
final int port, final boolean androidMonitoring, final Configuration configuration) {
this.merger = merger;
public TCPWriter(final String hostname, final int port, final boolean androidMonitoring,
final Configuration configuration) {
this.androidMonitoring = androidMonitoring;
this.configuration = configuration;
try {
......@@ -63,11 +63,6 @@ public class TCPWriter extends Thread implements IPipeReceiver<ByteBufferEvent>,
}
}
@Override
public void run() {
merger.process(this);
}
@Override
public URL getProviderURL() {
return providerURL;
......@@ -240,4 +235,10 @@ public class TCPWriter extends Thread implements IPipeReceiver<ByteBufferEvent>,
shouldDisconnect = true;
}
}
@Override
public void onEvent(final ByteBufferEvent arg0, final long arg1, final boolean arg2)
throws Exception {
processEvent(arg0);
}
}
......@@ -26,7 +26,7 @@ public class TCPWriterTest {
@Test
public void testSetProviderURL() throws Exception {
final TCPWriter tcpWriter = new TCPWriter(null, "127.0.0.1", 10133, false,
final TCPWriter tcpWriter = new TCPWriter("127.0.0.1", 10133, false,
ConfigurationFactory.createSingletonConfiguration());
assertEquals("127.0.0.1", tcpWriter.getProviderURL().getHost());
assertEquals(10133, tcpWriter.getProviderURL().getPort());
......@@ -38,14 +38,14 @@ public class TCPWriterTest {
@Test
public void testInit() throws Exception {
MonitoringStringRegistry.reset();
final TCPWriter tcpWriter = new TCPWriter(null, "127.0.0.1", 10100, false,
final TCPWriter tcpWriter = new TCPWriter("127.0.0.1", 10100, false,
ConfigurationFactory.createSingletonConfiguration());
tcpWriter.init();
assertEquals(3,
(int) MonitoringStringRegistry
.getIdForStringWithoutSending("<UNKNOWN-APPLICATION>"));
.getIdForStringWithoutSending("<UNKNOWN-APPLICATION>"));
assertEquals(2, (int) MonitoringStringRegistry.getIdForStringWithoutSending(HostnameFetcher
.getHostname()));
}
......@@ -88,7 +88,7 @@ public class TCPWriterTest {
@Test
public void testOnEvent() throws Exception {
MonitoringStringRegistry.reset();
final TCPWriter tcpWriter = new TCPWriter(null, "127.0.0.1", 10144, false,
final TCPWriter tcpWriter = new TCPWriter("127.0.0.1", 10144, false,
ConfigurationFactory.createSingletonConfiguration());
tcpWriter.init();
......
......@@ -10,7 +10,7 @@ public class MonitoringStarter {
public static void main(final String[] args) {
final List<Thread> threads = new ArrayList<Thread>();
for (int i = 0; i < 3; i++) {
for (int i = 0; i <= 3; i++) {
final Thread thread = new Thread(new Runnable() {
@Override
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment