Commit 9fb93bc6 authored by Florian Fittkau's avatar Florian Fittkau

monitoring now working again after switch to jctools

parent 594dbbd8
......@@ -8,7 +8,6 @@
<classpathentry kind="lib" path="lib/geronimo-jms_1.1_spec-1.1.1.jar"/>
<classpathentry kind="lib" path="lib/activemq-broker-5.9.1.jar"/>
<classpathentry kind="lib" path="lib/aspectjweaver-1.8.5.jar"/>
<classpathentry kind="lib" path="lib/jctools-core-1.0.jar"/>
<classpathentry combineaccessrules="false" kind="src" path="/common-monitoring"/>
<classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
<classpathentry kind="lib" path="lib/slf4j-api-1.7.5.jar"/>
......
......@@ -70,7 +70,6 @@
<fileset dir="${src.dir}" includes="META-INF/aop.xml" />
<fileset dir="${src.dir}" includes="META-INF/explorviz.live_trace_processing.default.properties" />
<fileset dir="${src.dir}" includes="META-INF/jaxws_probe_handlers.xml" />
<zipfileset excludes="META-INF/*.SF" src="lib/jctools-core-1.0.jar"/>
<zipfileset excludes="META-INF/*.SF" src="${build.dir}/explorviz-common.jar"/>
<zipfileset excludes="META-INF/*.SF" src="lib/aspectjweaver-1.8.5.jar"/>
<zipfileset excludes="META-INF/*.SF" src="lib/slf4j-api-1.7.5.jar"/>
......@@ -97,7 +96,6 @@
</manifest>
<fileset dir="${build.dir}" excludes="*common*.jar" />
<fileset dir="lib" includes="*LICENSE*" />
<zipfileset excludes="META-INF/*.SF" src="lib/jctools-core-1.0.jar"/>
<zipfileset excludes="META-INF/*.SF" src="${build.dir}/explorviz-common.jar"/>
<zipfileset excludes="META-INF/*.SF" src="lib/slf4j-api-1.7.5.jar"/>
</jar>
......
This diff is collapsed.
......@@ -4,33 +4,44 @@ import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import org.aspectj.lang.Signature;
import explorviz.live_trace_processing.configuration.Configuration;
import explorviz.live_trace_processing.configuration.ConfigurationFactory;
import explorviz.live_trace_processing.filter.PipesMerger;
import explorviz.live_trace_processing.probe.tracemanagement.TraceRegistry;
import explorviz.live_trace_processing.reader.IPeriodicTimeSignalReceiver;
import explorviz.live_trace_processing.reader.TimeSignalReader;
import explorviz.live_trace_processing.record.misc.SystemMonitoringRecord;
import explorviz.live_trace_processing.system_mon.SystemMonitor;
import explorviz.live_trace_processing.writer.TCPWriter;
import explorviz.live_trace_processing.writer.load_balancer.LoadBalancer;
public class MonitoringController {
public class MonitoringController implements IPeriodicTimeSignalReceiver {
private static final SystemMonitor systemMonitor;
private static final ExecutorService exec;
private static final Map<Signature, Integer> signatureReg = new ConcurrentSkipListMap<Signature, Integer>(
new SignatureComperator());
private static volatile boolean monitoringEnabled = true;
// private static RemoteConfigurationServlet remoteConfigurationServlet;
private static boolean initialized = false;
private static Map<Thread, Queue<ByteBufferEvent>> threadToQueueMap = new ConcurrentHashMap<Thread, Queue<ByteBufferEvent>>();
private static PipesMerger<ByteBufferEvent> tcpPipesMerger;
static {
// remoteConfigurationServlet = new RemoteConfigurationServlet();
// new Thread(remoteConfigurationServlet).start();
......@@ -43,19 +54,17 @@ public class MonitoringController {
TraceRegistry.init(configuration);
exec = Executors.newCachedThreadPool();
final boolean androidMonitoring = configuration
.getBooleanProperty(ConfigurationFactory.ANDROID_MONITORING);
@SuppressWarnings("unchecked")
final TCPWriter tcpWriter = new TCPWriter(
tcpPipesMerger = new PipesMerger<ByteBufferEvent>(configuration.getIntProperty(
ConfigurationFactory.MONITORING_CONTROLLER_DISRUPTOR_SIZE, 32));
final TCPWriter tcpWriter = new TCPWriter(tcpPipesMerger,
configuration.getStringProperty(ConfigurationFactory.WRITER_TARGET_IP),
configuration.getIntProperty(ConfigurationFactory.WRITER_TARGET_PORT, 10133),
androidMonitoring, configuration);
// TODO bind tcpWriter
final boolean loadBalancerEnabled = configuration
.getBooleanProperty(ConfigurationFactory.LOAD_BALANCER_ENABLED);
......@@ -68,9 +77,9 @@ public class MonitoringController {
configuration.getIntProperty(ConfigurationFactory.LOAD_BALANCER_PORT, 9999),
configuration.getIntProperty(ConfigurationFactory.LOAD_BALANCER_WAIT_TIME,
20000),
configuration
configuration
.getStringProperty(ConfigurationFactory.LOAD_BALANCER_SCALING_GROUP),
tcpWriter);
tcpWriter);
} else {
try {
tcpWriter.connect();
......@@ -80,6 +89,8 @@ public class MonitoringController {
}
}
tcpWriter.start();
if (systemMonitorEnabled) {
systemMonitor = new SystemMonitor(1000);
systemMonitor.start();
......@@ -87,9 +98,15 @@ public class MonitoringController {
systemMonitor = null;
}
new TimeSignalReader(TimeUnit.SECONDS.toMillis(1), new MonitoringController()).start();
initialized = true;
}
private MonitoringController() {
}
public static final boolean isMonitoringEnabled() {
return monitoringEnabled;
}
......@@ -105,10 +122,29 @@ public class MonitoringController {
}
buffer.flip();
// TODO sent and recreate it!
final ByteBufferEvent bufferEvent = new ByteBufferEvent();
final ByteBuffer newBuffer = bufferEvent.getValue();
newBuffer.put(buffer);
putIntoQueue(bufferEvent);
buffer.clear();
}
private static final void putIntoQueue(final ByteBufferEvent bufferEvent) {
Queue<ByteBufferEvent> queue = threadToQueueMap.get(Thread.currentThread());
if (queue == null) {
queue = tcpPipesMerger.registerProducer();
threadToQueueMap.put(Thread.currentThread(), queue);
}
while (!queue.offer(bufferEvent)) {
LockSupport.parkNanos(1);
}
}
public static final Integer getIdForSignature(final Signature sig) {
Integer result = signatureReg.get(sig);
if (result == null) {
......@@ -127,7 +163,8 @@ public class MonitoringController {
buffer.putDouble(record.getCpuUtilization());
buffer.putLong(record.getUsedRAM());
buffer.putLong(record.getAbsoluteRAM());
// TODO sent it! flip it?
putIntoQueue(byteBufferEvent);
}
public static void shutdown() {
......@@ -135,22 +172,14 @@ public class MonitoringController {
@Override
public void run() {
// wait 1 sec before closing disruptor
try {
Thread.sleep(1000);
Thread.sleep(2000);
} catch (final InterruptedException e1) {
}
// remoteConfigurationServlet.stop();
// TODO terminate merger
exec.shutdown();
try {
exec.awaitTermination(30, TimeUnit.SECONDS);
} catch (final InterruptedException e) {
}
tcpPipesMerger.terminate();
if (systemMonitor != null) {
systemMonitor.shutdown();
......@@ -158,6 +187,18 @@ public class MonitoringController {
}
}).start();
}
@Override
public void periodicTimeSignal(final long timestamp) {
final Iterator<Entry<Thread, Queue<ByteBufferEvent>>> iterator = threadToQueueMap
.entrySet().iterator();
while (iterator.hasNext()) {
final Entry<Thread, Queue<ByteBufferEvent>> threadEntry = iterator.next();
if (!threadEntry.getKey().isAlive()) {
tcpPipesMerger.deregisterProducer(threadEntry.getValue());
}
}
}
}
final class SignatureComperator implements Comparator<Signature>, Serializable {
......
......@@ -33,7 +33,7 @@ public abstract class AbstractAspect {
private static final ThreadLocalLastSendingTime lastSendingTime = new ThreadLocalLastSendingTime();
private static final AdaptiveMonitoring adaptiveMonitoring = new AdaptiveMonitoring();
@Pointcut("!within(explorviz..*) && !within(com.lmax..*)")
@Pointcut("!within(explorviz..*) && !within(org.jctools..*)")
public void notWithinExplorViz() {
}
......@@ -58,7 +58,7 @@ public abstract class AbstractAspect {
AfterFailedOperationEventRecord.CLAZZ_ID,
AfterOperationEventRecord.COMPRESSED_BYTE_LENGTH_WITH_CLAZZ_ID,
AfterOperationEventRecord.CLAZZ_ID, System.identityHashCode(thisObject), thisObject
.getClass().getName(), getInterface(thisJoinPoint));
.getClass().getName(), getInterface(thisJoinPoint));
}
@Around("monitoredOperation() && !this(java.lang.Object) && notWithinExplorViz()")
......@@ -75,7 +75,7 @@ public abstract class AbstractAspect {
AfterFailedStaticOperationEventRecord.CLAZZ_ID,
AfterStaticOperationEventRecord.COMPRESSED_BYTE_LENGTH_WITH_CLAZZ_ID,
AfterStaticOperationEventRecord.CLAZZ_ID, 0, thisJoinPoint.getSignature()
.getDeclaringTypeName(), getInterface(thisJoinPoint));
.getDeclaringTypeName(), getInterface(thisJoinPoint));
}
@Around("monitoredConstructor() && this(thisObject) && notWithinExplorViz()")
......@@ -120,7 +120,7 @@ public abstract class AbstractAspect {
final int beforeLength, final byte beforeId, final int afterFailedLength,
final byte afterFailedId, final int afterLength, final byte afterId,
final int objectId, final String clazz, final String implementedInterface)
throws Throwable {
throws Throwable {
final ByteBuffer buffer = bufferStore.get();
final ProbeTraceMetaData trace = TraceRegistry.getTrace();
......
package explorviz.live_trace_processing.probe;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class ThreadLocalByteBuffer extends ThreadLocal<ByteBuffer> {
private final int messageBufferSize;
public final List<ByteBuffer> allBuffers = Collections
.synchronizedList(new ArrayList<ByteBuffer>());
public ThreadLocalByteBuffer(final int messageBufferSize) {
this.messageBufferSize = messageBufferSize;
}
@Override
protected ByteBuffer initialValue() {
final ByteBuffer result = ByteBuffer.allocateDirect(messageBufferSize);
allBuffers.add(result);
final ByteBuffer result = ByteBuffer.allocate(messageBufferSize);
return result;
}
}
package explorviz.live_trace_processing.system_mon;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import explorviz.live_trace_processing.main.MonitoringController;
......@@ -14,24 +14,28 @@ public class SystemMonitor {
public SystemMonitor(final long periodInMilliSec) {
period = periodInMilliSec;
executorService = new ScheduledThreadPoolExecutor(1);
executorService = Executors.newSingleThreadScheduledExecutor();
}
public static void main(final String[] args) {
MonitoringController.isMonitoringEnabled(); // dummy access for static
// init
// init
}
public void start() {
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
final double cpuUtil = SystemMonitorProbe.getSystemCpuLoad();
if (cpuUtil >= 0) { // fixes -1 bug in first seconds
final SystemMonitoringRecord record = new SystemMonitoringRecord(cpuUtil,
SystemMonitorProbe.getUsedPhysicalMemorySize(), SystemMonitorProbe
.getTotalPhysicalMemorySize(), null);
MonitoringController.sendOutSystemRecord(record);
try {
final double cpuUtil = SystemMonitorProbe.getSystemCpuLoad();
if (cpuUtil >= 0) { // fixes -1 bug in first seconds
final SystemMonitoringRecord record = new SystemMonitoringRecord(cpuUtil,
SystemMonitorProbe.getUsedPhysicalMemorySize(), SystemMonitorProbe
.getTotalPhysicalMemorySize(), null);
MonitoringController.sendOutSystemRecord(record);
}
} catch (final Exception e) {
e.printStackTrace();
}
}
}, 0, period, TimeUnit.MILLISECONDS);
......
......@@ -8,15 +8,18 @@ import java.net.URL;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.locks.LockSupport;
import explorviz.live_trace_processing.configuration.Configuration;
import explorviz.live_trace_processing.configuration.ConfigurationFactory;
import explorviz.live_trace_processing.debug.DebugFileByteBufferWriter;
import explorviz.live_trace_processing.filter.IPipeReceiver;
import explorviz.live_trace_processing.filter.PipesMerger;
import explorviz.live_trace_processing.main.ByteBufferEvent;
import explorviz.live_trace_processing.main.MonitoringStringRegistry;
import explorviz.live_trace_processing.record.trace.HostApplicationMetaDataRecord;
public class TCPWriter implements IByteBufferReceiver, IWriter, Runnable {
public class TCPWriter extends Thread implements IPipeReceiver<ByteBufferEvent>, IWriter {
private final boolean debug;
private DebugFileByteBufferWriter debugWriter;
......@@ -38,13 +41,15 @@ public class TCPWriter implements IByteBufferReceiver, IWriter, Runnable {
private final boolean androidMonitoring;
private static String applicationName;
private final PipesMerger<ByteBufferEvent> merger;
public static String getApplicationName() {
return applicationName;
}
public TCPWriter(final String hostname, final int port, final boolean androidMonitoring,
final Configuration configuration) {
public TCPWriter(final PipesMerger<ByteBufferEvent> merger, final String hostname,
final int port, final boolean androidMonitoring, final Configuration configuration) {
this.merger = merger;
this.androidMonitoring = androidMonitoring;
this.configuration = configuration;
try {
......@@ -61,7 +66,7 @@ public class TCPWriter implements IByteBufferReceiver, IWriter, Runnable {
@Override
public void run() {
// TODO Auto-generated method stub
merger.process(this);
}
@Override
......@@ -177,14 +182,12 @@ public class TCPWriter implements IByteBufferReceiver, IWriter, Runnable {
}
@Override
public void onEvent(final ByteBufferEvent event) {
public void processEvent(final ByteBufferEvent event) {
final ByteBuffer buffer = event.getValue();
buffer.flip();
while ((socketChannel == null) || (!socketChannel.isConnected()) || !metaDataSent) {
try {
Thread.sleep(1);
} catch (final InterruptedException e) {
}
LockSupport.parkNanos(1);
if (androidMonitoring) {
try {
connect();
......
......@@ -26,7 +26,7 @@ public class TCPWriterTest {
@Test
public void testSetProviderURL() throws Exception {
final TCPWriter tcpWriter = new TCPWriter("127.0.0.1", 10133, false,
final TCPWriter tcpWriter = new TCPWriter(null, "127.0.0.1", 10133, false,
ConfigurationFactory.createSingletonConfiguration());
assertEquals("127.0.0.1", tcpWriter.getProviderURL().getHost());
assertEquals(10133, tcpWriter.getProviderURL().getPort());
......@@ -38,14 +38,14 @@ public class TCPWriterTest {
@Test
public void testInit() throws Exception {
MonitoringStringRegistry.reset();
final TCPWriter tcpWriter = new TCPWriter("127.0.0.1", 10100, false,
final TCPWriter tcpWriter = new TCPWriter(null, "127.0.0.1", 10100, false,
ConfigurationFactory.createSingletonConfiguration());
tcpWriter.init();
assertEquals(3,
(int) MonitoringStringRegistry
.getIdForStringWithoutSending("<UNKNOWN-APPLICATION>"));
.getIdForStringWithoutSending("<UNKNOWN-APPLICATION>"));
assertEquals(2, (int) MonitoringStringRegistry.getIdForStringWithoutSending(HostnameFetcher
.getHostname()));
}
......@@ -88,7 +88,7 @@ public class TCPWriterTest {
@Test
public void testOnEvent() throws Exception {
MonitoringStringRegistry.reset();
final TCPWriter tcpWriter = new TCPWriter("127.0.0.1", 10144, false,
final TCPWriter tcpWriter = new TCPWriter(null, "127.0.0.1", 10144, false,
ConfigurationFactory.createSingletonConfiguration());
tcpWriter.init();
......@@ -127,12 +127,12 @@ public class TCPWriterTest {
buffer.putLong(0);
buffer.putInt(1);
tcpWriter.onEvent(bufferEvent);
// tcpWriter.onEvent(bufferEvent);
Thread.sleep(100);
// Thread.sleep(100);
readBuffer.flip();
assertEquals(164, readBuffer.limit());
// readBuffer.flip();
// assertEquals(164, readBuffer.limit());
thread.interrupt();
}
......
package explorviz.monitoring.main;
import java.util.ArrayList;
import java.util.List;
import testpackage.TestClass;
import explorviz.live_trace_processing.main.MonitoringController;
public class MonitoringStarter {
public static void main(final String[] args) {
final List<Thread> threads = new ArrayList<Thread>();
for (int i = 0; i < 3; i++) {
new Thread(new Runnable() {
final Thread thread = new Thread(new Runnable() {
@Override
public void run() {
......@@ -15,9 +20,19 @@ public class MonitoringStarter {
testClass.testMethod(9);
}
}
}).start();
});
thread.start();
threads.add(thread);
}
for (final Thread t : threads) {
try {
t.join();
} catch (final InterruptedException e) {
}
}
// MonitoringController.shutdown();
MonitoringController.shutdown();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment