Commit dd9a7e86 authored by Florian Fittkau's avatar Florian Fittkau

major refactoring

added CPU and memory monitoring
parent 78e1a50f
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<launchConfiguration type="org.eclipse.jdt.launching.localJavaApplication">
<listAttribute key="org.eclipse.debug.core.MAPPED_RESOURCE_PATHS">
<listEntry value="/monitored-application/test/testpackage/TestStart.java"/>
<listEntry value="/monitored-application/test/explorviz/monitoring/main/MonitoringStarter.java"/>
</listAttribute>
<listAttribute key="org.eclipse.debug.core.MAPPED_RESOURCE_TYPES">
<listEntry value="1"/>
......@@ -9,7 +9,7 @@
<listAttribute key="org.eclipse.debug.ui.favoriteGroups">
<listEntry value="org.eclipse.debug.ui.launchGroup.run"/>
</listAttribute>
<stringAttribute key="org.eclipse.jdt.launching.MAIN_TYPE" value="testpackage.TestStart"/>
<stringAttribute key="org.eclipse.jdt.launching.MAIN_TYPE" value="explorviz.monitoring.main.MonitoringStarter"/>
<stringAttribute key="org.eclipse.jdt.launching.PROJECT_ATTR" value="monitored-application"/>
<stringAttribute key="org.eclipse.jdt.launching.VM_ARGUMENTS" value="-javaagent:lib/aspectjweaver-1.7.4.jar -Xmx4G -Dkieker.monitoring.writer=kieker.monitoring.writer.tcp.TCPDisruptorWriter"/>
</launchConfiguration>
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<launchConfiguration type="org.eclipse.jdt.launching.localJavaApplication">
<listAttribute key="org.eclipse.debug.core.MAPPED_RESOURCE_PATHS">
<listEntry value="/monitored-application/test/testpackage/TestStart.java"/>
<listEntry value="/monitored-application/test/explorviz/monitoring/main/MonitoringStarter.java"/>
</listAttribute>
<listAttribute key="org.eclipse.debug.core.MAPPED_RESOURCE_TYPES">
<listEntry value="1"/>
......@@ -9,7 +9,7 @@
<listAttribute key="org.eclipse.debug.ui.favoriteGroups">
<listEntry value="org.eclipse.debug.ui.launchGroup.run"/>
</listAttribute>
<stringAttribute key="org.eclipse.jdt.launching.MAIN_TYPE" value="testpackage.TestStart"/>
<stringAttribute key="org.eclipse.jdt.launching.MAIN_TYPE" value="explorviz.monitoring.main.MonitoringStarter"/>
<stringAttribute key="org.eclipse.jdt.launching.PROJECT_ATTR" value="monitored-application"/>
<stringAttribute key="org.eclipse.jdt.launching.VM_ARGUMENTS" value="-javaagent:lib/aspectjweaver-1.7.4.jar -Xmx4G"/>
</launchConfiguration>
......@@ -6,7 +6,7 @@
</weaver>
<aspects>
<concrete-aspect name="explorviz.hpc_monitoring.probe.TargetedAspect" extends="explorviz.hpc_monitoring.probe.AbstractAspect">
<concrete-aspect name="explorviz.live_trace_processing.probe.TargetedAspect" extends="explorviz.live_trace_processing.probe.AbstractAspect">
<pointcut name="monitoredOperation" expression="execution(* testpackage.TestClass*.*(..))" />
</concrete-aspect>
</aspects>
......
package explorviz.hpc_monitoring.writer;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.net.URL;
final class LoadBalancerThread extends Thread {
private final URL loadBalancerProviderURL;
private final int timeIntervalToWait;
private final TCPWriterWithLoadBalancer belongingWriter;
public LoadBalancerThread(final URL loadBalancerProviderURL, final int timeIntervalToWait,
final TCPWriterWithLoadBalancer belongingWriter) {
this.loadBalancerProviderURL = loadBalancerProviderURL;
this.timeIntervalToWait = timeIntervalToWait;
this.belongingWriter = belongingWriter;
}
@Override
public void run() {
while (!Thread.interrupted()) {
try {
final URL newProviderURL = fetchNewProviderURL();
belongingWriter.setProvider(newProviderURL);
Thread.sleep(timeIntervalToWait);
} catch (final InterruptedException e) {
return;
} catch (final IOException e) {
return;
}
}
}
private URL fetchNewProviderURL() throws IOException, UnsupportedEncodingException {
final BufferedReader in = new BufferedReader(new InputStreamReader(
loadBalancerProviderURL.openStream(), "UTF-8"));
final String newProviderURL = in.readLine();
in.close();
return new URL(newProviderURL);
}
}
package explorviz.hpc_monitoring.writer;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import explorviz.hpc_monitoring.configuration.Configuration;
public class TCPWriterWithLoadBalancer extends TCPWriter {
private URL loadBalancerURL;
private final int loadBalancerWaitTimeInMillis;
private LoadBalancerThread loadBalanceThread;
public TCPWriterWithLoadBalancer(final String loadBalancerHostname, final int loadBalancerPort,
final int loadBalancerWaitTimeInMillis, final Configuration configuration) {
super(configuration);
try {
loadBalancerURL = new URL("http://" + loadBalancerHostname + ":" + loadBalancerPort);
} catch (final MalformedURLException e) {
e.printStackTrace();
}
this.loadBalancerWaitTimeInMillis = loadBalancerWaitTimeInMillis;
createLoadBalancer();
}
private void createLoadBalancer() {
setProviderURL(null);
if (loadBalanceThread != null) {
loadBalanceThread.interrupt();
}
loadBalanceThread = new LoadBalancerThread(loadBalancerURL, loadBalancerWaitTimeInMillis,
this);
loadBalanceThread.start();
synchronized (this) {
while (getProviderURL() == null) {
try {
this.wait();
} catch (final InterruptedException e) {
return;
}
}
}
}
@Override
public final void cleanup() {
if (loadBalanceThread != null) {
loadBalanceThread.interrupt();
}
disconnect();
}
public final void setProvider(final URL newProviderURL) {
synchronized (this) {
if (!newProviderURL.getHost().equals(getProviderURL().getHost())
|| (newProviderURL.getPort() != getProviderURL().getPort())) {
disconnect();
try {
setProviderURL(newProviderURL);
connect();
notifyAll();
} catch (final IOException e) {
setProviderURL(null);
e.printStackTrace();
}
}
}
}
}
package explorviz.live_trace_processing.main;
import java.nio.ByteBuffer;
import com.lmax.disruptor.EventFactory;
import explorviz.live_trace_processing.Constants;
/**
* WARNING: This is a mutable object which will be recycled by the RingBuffer.
* You must take a copy of data it holds before the framework recycles it.
*/
public final class ByteBufferEvent {
private ByteBuffer value = ByteBuffer
.allocateDirect(Constants.MONITORING_MESSAGE_BUFFER_SIZE);
public final ByteBuffer getValue() {
return value;
}
public void setValue(final ByteBuffer value) {
this.value = value;
}
public final static EventFactory<ByteBufferEvent> EVENT_FACTORY = new EventFactory<ByteBufferEvent>() {
@Override
public ByteBufferEvent newInstance() {
return new ByteBufferEvent();
}
};
}
\ No newline at end of file
package explorviz.hpc_monitoring;
package explorviz.live_trace_processing.main;
import java.io.Serializable;
import java.nio.ByteBuffer;
......@@ -16,11 +16,9 @@ import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import explorviz.hpc_monitoring.configuration.Configuration;
import explorviz.hpc_monitoring.configuration.ConfigurationFactory;
import explorviz.hpc_monitoring.disruptor.ByteBufferEvent;
import explorviz.hpc_monitoring.helper.SignatureToStringConverter;
import explorviz.hpc_monitoring.writer.TCPWriter;
import explorviz.live_trace_processing.configuration.Configuration;
import explorviz.live_trace_processing.configuration.ConfigurationFactory;
import explorviz.live_trace_processing.writer.TCPWriter;
public class MonitoringController {
private static final RingBuffer<ByteBufferEvent> ringBuffer;
......
package explorviz.hpc_monitoring.helper;
package explorviz.live_trace_processing.main;
import java.lang.reflect.Modifier;
......
package explorviz.hpc_monitoring;
package explorviz.live_trace_processing.main;
import java.nio.ByteBuffer;
import java.util.Map;
......@@ -6,7 +6,7 @@ import java.util.Map.Entry;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import explorviz.hpc_monitoring.record.StringRegistryRecord;
import explorviz.live_trace_processing.record.misc.StringRegistryRecord;
public class StringRegistry {
private static final Map<String, Integer> stringReg = new ConcurrentSkipListMap<String, Integer>();
......
package explorviz.hpc_monitoring.probe;
package explorviz.live_trace_processing.probe;
import java.nio.ByteBuffer;
......@@ -7,14 +7,15 @@ import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import explorviz.hpc_monitoring.Constants;
import explorviz.hpc_monitoring.MonitoringController;
import explorviz.hpc_monitoring.StringRegistry;
import explorviz.hpc_monitoring.reader.TimeProvider;
import explorviz.hpc_monitoring.record.event.normal.AfterFailedOperationEvent;
import explorviz.hpc_monitoring.record.event.normal.AfterOperationEvent;
import explorviz.hpc_monitoring.record.event.normal.BeforeOperationEvent;
import explorviz.hpc_monitoring.threadlocal.ThreadLocalByteBuffer;
import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.main.MonitoringController;
import explorviz.live_trace_processing.main.StringRegistry;
import explorviz.live_trace_processing.probe.tracemanagement.ProbeTraceMetaData;
import explorviz.live_trace_processing.probe.tracemanagement.TraceRegistry;
import explorviz.live_trace_processing.reader.TimeProvider;
import explorviz.live_trace_processing.record.event.normal.AfterFailedOperationEventRecord;
import explorviz.live_trace_processing.record.event.normal.AfterOperationEventRecord;
import explorviz.live_trace_processing.record.event.normal.BeforeOperationEventRecord;
@Aspect
public abstract class AbstractAspect {
......@@ -43,11 +44,11 @@ public abstract class AbstractAspect {
final int signatureId = MonitoringController
.getIdForSignature(thisJoinPoint.getSignature());
if (BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID > buffer.remaining()) {
if (BeforeOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID > buffer.remaining()) {
MonitoringController.sendOutBuffer(buffer);
}
buffer.put(BeforeOperationEvent.CLAZZ_ID);
buffer.put(BeforeOperationEventRecord.CLAZZ_ID);
buffer.putLong(TimeProvider.getCurrentTimestamp());
buffer.putLong(trace.getTraceId());
buffer.putInt(trace.getNextOrderId());
......@@ -58,11 +59,11 @@ public abstract class AbstractAspect {
try {
retval = thisJoinPoint.proceed();
} catch (final Throwable th) {
if (AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID > buffer.remaining()) {
if (AfterFailedOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID > buffer.remaining()) {
MonitoringController.sendOutBuffer(buffer);
}
buffer.put(AfterFailedOperationEvent.CLAZZ_ID);
buffer.put(AfterFailedOperationEventRecord.CLAZZ_ID);
buffer.putLong(TimeProvider.getCurrentTimestamp());
buffer.putLong(trace.getTraceId());
buffer.putInt(trace.getNextOrderId());
......@@ -78,11 +79,11 @@ public abstract class AbstractAspect {
throw th;
}
if (AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID > buffer.remaining()) {
if (AfterOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID > buffer.remaining()) {
MonitoringController.sendOutBuffer(buffer);
}
buffer.put(AfterOperationEvent.CLAZZ_ID);
buffer.put(AfterOperationEventRecord.CLAZZ_ID);
buffer.putLong(TimeProvider.getCurrentTimestamp());
buffer.putLong(trace.getTraceId());
buffer.putInt(trace.getNextOrderId());
......
package explorviz.live_trace_processing.probe;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class ThreadLocalByteBuffer extends ThreadLocal<ByteBuffer> {
private final int messageBufferSize;
public final List<ByteBuffer> allBuffers = Collections
.synchronizedList(new ArrayList<ByteBuffer>());
public ThreadLocalByteBuffer(final int messageBufferSize) {
this.messageBufferSize = messageBufferSize;
}
@Override
protected ByteBuffer initialValue() {
final ByteBuffer result = ByteBuffer.allocateDirect(messageBufferSize);
allBuffers.add(result);
return result;
}
}
package explorviz.hpc_monitoring.probe;
package explorviz.live_trace_processing.probe.tracemanagement;
public class ProbeTraceMetaData {
private long traceId;
......
package explorviz.hpc_monitoring.probe;
package explorviz.live_trace_processing.probe.tracemanagement;
public final class ThreadLocalProbeTraceMetaData extends
......
package explorviz.hpc_monitoring.probe;
package explorviz.live_trace_processing.probe.tracemanagement;
import java.nio.ByteBuffer;
import java.security.SecureRandom;
......
package explorviz.live_trace_processing.system_mon;
import java.lang.management.ManagementFactory;
import com.sun.management.OperatingSystemMXBean;
public class SystemMonitor {
private final static OperatingSystemMXBean osBean = ManagementFactory
.getPlatformMXBean(OperatingSystemMXBean.class);
public static double getSystemCpuLoad() {
return osBean.getSystemCpuLoad();
}
public static long getTotalPhysicalMemorySize() {
return osBean.getTotalPhysicalMemorySize();
}
public static long getFreePhysicalMemorySize() {
return osBean.getFreePhysicalMemorySize();
}
public static long getUsedPhysicalMemorySize() {
return osBean.getTotalPhysicalMemorySize() - osBean.getFreePhysicalMemorySize();
}
}
package explorviz.hpc_monitoring.writer;
package explorviz.live_trace_processing.writer;
import java.io.IOException;
import java.net.InetSocketAddress;
......@@ -10,13 +10,14 @@ import java.nio.channels.SocketChannel;
import com.lmax.disruptor.EventHandler;
import explorviz.hpc_monitoring.StringRegistry;
import explorviz.hpc_monitoring.configuration.Configuration;
import explorviz.hpc_monitoring.configuration.ConfigurationFactory;
import explorviz.hpc_monitoring.disruptor.ByteBufferEvent;
import explorviz.hpc_monitoring.record.HostApplicationMetaData;
import explorviz.live_trace_processing.configuration.Configuration;
import explorviz.live_trace_processing.configuration.ConfigurationFactory;
import explorviz.live_trace_processing.main.ByteBufferEvent;
import explorviz.live_trace_processing.main.StringRegistry;
import explorviz.live_trace_processing.record.misc.HostApplicationMetaDataRecord;
import explorviz.live_trace_processing.writer.IWriter;
public class TCPWriter implements EventHandler<ByteBufferEvent> {
public class TCPWriter implements EventHandler<ByteBufferEvent>, IWriter {
private URL providerURL;
private SocketChannel socketChannel;
......@@ -40,11 +41,13 @@ public class TCPWriter implements EventHandler<ByteBufferEvent> {
providerURL = null;
}
protected URL getProviderURL() {
@Override
public URL getProviderURL() {
return providerURL;
}
protected void setProviderURL(final URL providerURL) {
@Override
public void setProviderURL(final URL providerURL) {
this.providerURL = providerURL;
}
......@@ -73,13 +76,13 @@ public class TCPWriter implements EventHandler<ByteBufferEvent> {
applicationId = StringRegistry.getIdForString(applicationName);
}
protected void connect() throws IOException {
public void connect() throws IOException {
socketChannel = SocketChannel.open(new InetSocketAddress(getProviderURL().getHost(),
getProviderURL().getPort()));
final ByteBuffer bufferForMetaData = ByteBuffer
.allocateDirect(HostApplicationMetaData.BYTE_LENGTH_WITH_CLAZZ_ID);
.allocateDirect(HostApplicationMetaDataRecord.BYTE_LENGTH_WITH_CLAZZ_ID);
bufferForMetaData.put(HostApplicationMetaData.CLAZZ_ID);
bufferForMetaData.put(HostApplicationMetaDataRecord.CLAZZ_ID);
bufferForMetaData.putInt(hostnameId);
bufferForMetaData.putInt(applicationId);
bufferForMetaData.flip();
......@@ -113,7 +116,7 @@ public class TCPWriter implements EventHandler<ByteBufferEvent> {
}
}
protected final void disconnect() {
public final void disconnect() {
if (socketChannel.isConnected()) {
try {
socketChannel.close();
......
package testpackage;
package explorviz.monitoring.main;
public class TestStart {
import testpackage.TestClass;
public class MonitoringStarter {
public static void main(final String[] args) {
for (int i = 0; i < 4; i++) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment