Commit 69c482e7 authored by Florian Fittkau's avatar Florian Fittkau

refactoring and removed some TODOs

parent b0d07464
......@@ -266,7 +266,7 @@ org.eclipse.jdt.core.formatter.keep_else_statement_on_same_line=false
org.eclipse.jdt.core.formatter.keep_empty_array_initializer_on_one_line=false
org.eclipse.jdt.core.formatter.keep_imple_if_on_one_line=false
org.eclipse.jdt.core.formatter.keep_then_statement_on_same_line=false
org.eclipse.jdt.core.formatter.lineSplit=80
org.eclipse.jdt.core.formatter.lineSplit=100
org.eclipse.jdt.core.formatter.never_indent_block_comments_on_first_column=false
org.eclipse.jdt.core.formatter.never_indent_line_comments_on_first_column=false
org.eclipse.jdt.core.formatter.number_of_blank_lines_at_beginning_of_method_body=0
......
explorviz.hpc_monitoring.host_name=
explorviz.hpc_monitoring.application_name=
explorviz.hpc_monitoring.monitoring_enabled=true
......@@ -7,7 +7,6 @@ import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.aspectj.lang.Signature;
......@@ -15,34 +14,32 @@ import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import explorviz.hpc_monitoring.configuration.Configuration;
import explorviz.hpc_monitoring.configuration.ConfigurationFactory;
import explorviz.hpc_monitoring.disruptor.ByteBufferEvent;
import explorviz.hpc_monitoring.helper.SignatureToStringConverter;
import explorviz.hpc_monitoring.threadlocal.ThreadLocalByteBuffer;
import explorviz.hpc_monitoring.writer.TCPWriter;
public class MonitoringController {
private static final int MESSAGE_BUFFER_SIZE = 131072;
public static final ThreadLocalByteBuffer bufferStore = new ThreadLocalByteBuffer(
MESSAGE_BUFFER_SIZE); // TODO flush after 1 sec
private static final RingBuffer<ByteBufferEvent> ringBuffer;
private static final Map<String, Integer> stringReg = new ConcurrentSkipListMap<String, Integer>();
private static final AtomicInteger stringRegIndex = new AtomicInteger(0);
private static final Map<Signature, Integer> signatureReg = new ConcurrentSkipListMap<Signature, Integer>(
new SignatureComperator());
private static volatile boolean monitoringEnabled = true;
static {
final Configuration configuration = ConfigurationFactory.createSingletonConfiguration();
setMonitoringEnabled(configuration
.getBooleanProperty(ConfigurationFactory.MONITORING_ENABLED));
final ExecutorService exec = Executors.newCachedThreadPool();
final Disruptor<ByteBufferEvent> disruptor = new Disruptor<ByteBufferEvent>(
ByteBufferEvent.EVENT_FACTORY, 256, exec);
@SuppressWarnings("unchecked")
final EventHandler<ByteBufferEvent>[] eventHandlers = new EventHandler[1];
final TCPWriter tcpWriter = new TCPWriter("127.0.0.1", 10133);
final TCPWriter tcpWriter = new TCPWriter("127.0.0.1", 10133, configuration);
eventHandlers[0] = tcpWriter;
disruptor.handleEventsWith(eventHandlers);
ringBuffer = disruptor.start();
......@@ -54,8 +51,7 @@ public class MonitoringController {
return monitoringEnabled;
}
public static final void setMonitoringEnabled(
final boolean monitoringEnabled) {
public static final void setMonitoringEnabled(final boolean monitoringEnabled) {
MonitoringController.monitoringEnabled = monitoringEnabled;
}
......@@ -68,6 +64,10 @@ public class MonitoringController {
buffer.clear();
}
public static void sendOutBufferWithoutCopy(final ByteBuffer buffer) {
putInRingbuffer(buffer);
}
private static final void putInRingbuffer(final ByteBuffer message) {
final long hiseq = ringBuffer.next();
final ByteBufferEvent valueEvent = ringBuffer.get(hiseq);
......@@ -78,42 +78,12 @@ public class MonitoringController {
public static final Integer getIdForSignature(final Signature sig) {
Integer result = signatureReg.get(sig);
if (result == null) {
final String value = SignatureToStringConverter
.signatureToLongString(sig);
result = getIdForString(value);
final String value = SignatureToStringConverter.signatureToLongString(sig);
result = StringRegistry.getIdForString(value);
signatureReg.put(sig, result);
}
return result;
}
public static final Integer getIdForString(final String value) {
Integer result = stringReg.get(value);
if (result == null) {
result = stringRegIndex.getAndIncrement();
stringReg.put(value, result);
final ByteBuffer stringRegistryRecord = buildStringRegistryRecord(
value, result);
putInRingbuffer(stringRegistryRecord);
}
return result;
}
private static ByteBuffer buildStringRegistryRecord(final String value,
final int result) {
final int regRecordLength = 1 + 4 + 4 + value.length();
final ByteBuffer buffer = ByteBuffer.allocateDirect(regRecordLength);
buffer.put((byte) 4);
buffer.putInt(result);
buffer.putInt(value.length());
buffer.put(value.getBytes());
buffer.flip();
return buffer;
}
}
final class SignatureComperator implements Comparator<Signature>, Serializable {
......
package explorviz.hpc_monitoring;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import explorviz.hpc_monitoring.record.StringRegistryRecord;
public class StringRegistry {
private static final Map<String, Integer> stringReg = new ConcurrentSkipListMap<String, Integer>();
private static final AtomicInteger stringRegIndex = new AtomicInteger(0);
public static final Integer getIdForString(final String value) {
Integer result = stringReg.get(value);
if (result == null) {
result = stringRegIndex.getAndIncrement();
stringReg.put(value, result);
final ByteBuffer stringRegistryRecord = buildStringRegistryRecord(
value, result);
MonitoringController.sendOutBufferWithoutCopy(stringRegistryRecord);
}
return result;
}
private static ByteBuffer buildStringRegistryRecord(final String value,
final int result) {
final int regRecordLength = StringRegistryRecord.BYTE_LENGTH_WITHOUT_STRING_WITH_CLAZZ_ID
+ value.length();
final ByteBuffer buffer = ByteBuffer.allocateDirect(regRecordLength);
buffer.put(StringRegistryRecord.CLAZZ_ID);
buffer.putInt(result);
buffer.putInt(value.length());
buffer.put(value.getBytes());
buffer.flip();
return buffer;
}
public static final ByteBuffer getAllStringRegistryRecords() {
int regRecordLengthTotal = 0;
for (final String key : stringReg.keySet()) {
regRecordLengthTotal += StringRegistryRecord.BYTE_LENGTH_WITHOUT_STRING_WITH_CLAZZ_ID
+ key.length();
}
final ByteBuffer buffer = ByteBuffer
.allocateDirect(regRecordLengthTotal);
for (final Entry<String, Integer> entry : stringReg.entrySet()) {
buffer.put(StringRegistryRecord.CLAZZ_ID);
buffer.putInt(entry.getValue());
buffer.putInt(entry.getKey().length());
buffer.put(entry.getKey().getBytes());
}
buffer.flip();
return buffer;
}
}
......@@ -8,13 +8,19 @@ import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import explorviz.hpc_monitoring.MonitoringController;
import explorviz.hpc_monitoring.StringRegistry;
import explorviz.hpc_monitoring.reader.TimeProvider;
import explorviz.hpc_monitoring.record.event.normal.AfterFailedOperationEvent;
import explorviz.hpc_monitoring.record.event.normal.AfterOperationEvent;
import explorviz.hpc_monitoring.record.event.normal.BeforeOperationEvent;
import explorviz.hpc_monitoring.threadlocal.ThreadLocalByteBuffer;
@Aspect
public abstract class AbstractAspect {
private static final int MESSAGE_BUFFER_SIZE = 131072;
public static final ThreadLocalByteBuffer bufferStore = new ThreadLocalByteBuffer(
MESSAGE_BUFFER_SIZE); // TODO flush after 1 sec
@Pointcut
public abstract void monitoredOperation();
......@@ -25,7 +31,7 @@ public abstract class AbstractAspect {
return thisJoinPoint.proceed();
}
final ByteBuffer buffer = MonitoringController.bufferStore.get();
final ByteBuffer buffer = bufferStore.get();
final ProbeTraceMetaData trace = TraceRegistry.getTrace();
final boolean newTrace = trace.isNewTrace();
......@@ -62,8 +68,7 @@ public abstract class AbstractAspect {
buffer.putInt(trace.getNextOrderId());
buffer.putInt(signatureId); // TODO might not be needed
final int errorId = MonitoringController.getIdForString(th
.toString());
final int errorId = StringRegistry.getIdForString(th.toString());
buffer.putInt(errorId);
if (newTrace) {
......
package explorviz.hpc_monitoring.writer;
import java.io.*;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.net.URL;
final class LoadBalancerThread extends Thread {
private final URL loadBalancerProviderURL;
private final int timeIntervalToWait;
private final TCPWriter belongingWriter;
private final URL loadBalancerProviderURL;
private final int timeIntervalToWait;
private final TCPWriterWithLoadBalancer belongingWriter;
public LoadBalancerThread(final URL loadBalancerProviderURL,
final int timeIntervalToWait, final TCPWriter belongingWriter) {
this.loadBalancerProviderURL = loadBalancerProviderURL;
this.timeIntervalToWait = timeIntervalToWait;
this.belongingWriter = belongingWriter;
}
public LoadBalancerThread(final URL loadBalancerProviderURL, final int timeIntervalToWait,
final TCPWriterWithLoadBalancer belongingWriter) {
this.loadBalancerProviderURL = loadBalancerProviderURL;
this.timeIntervalToWait = timeIntervalToWait;
this.belongingWriter = belongingWriter;
}
@Override
public void run() {
while (!Thread.interrupted()) {
try {
final URL newProviderURL = fetchNewProviderURL();
@Override
public void run() {
while (!Thread.interrupted()) {
try {
final URL newProviderURL = fetchNewProviderURL();
belongingWriter.setProvider(newProviderURL);
Thread.sleep(timeIntervalToWait);
} catch (final InterruptedException e) {
return;
} catch (final IOException e) {
return;
}
}
}
belongingWriter.setProvider(newProviderURL);
Thread.sleep(timeIntervalToWait);
}
catch (final InterruptedException e) {
return;
}
catch (final IOException e) {
return;
}
}
}
private URL fetchNewProviderURL() throws IOException, UnsupportedEncodingException {
final BufferedReader in = new BufferedReader(new InputStreamReader(
loadBalancerProviderURL.openStream(), "UTF-8"));
final String newProviderURL = in.readLine();
in.close();
return new URL(newProviderURL);
}
private URL fetchNewProviderURL() throws IOException, UnsupportedEncodingException {
final BufferedReader in = new BufferedReader(new InputStreamReader(
loadBalancerProviderURL.openStream(), "UTF-8"));
final String newProviderURL = in.readLine();
in.close();
return new URL(newProviderURL);
}
}
......@@ -10,110 +10,89 @@ import java.nio.channels.SocketChannel;
import com.lmax.disruptor.EventHandler;
import explorviz.hpc_monitoring.MonitoringController;
import explorviz.hpc_monitoring.StringRegistry;
import explorviz.hpc_monitoring.configuration.Configuration;
import explorviz.hpc_monitoring.configuration.ConfigurationFactory;
import explorviz.hpc_monitoring.disruptor.ByteBufferEvent;
import explorviz.hpc_monitoring.record.HostApplicationMetadata;
import explorviz.hpc_monitoring.record.HostApplicationMetaData;
public class TCPWriter implements EventHandler<ByteBufferEvent> {
private URL providerURL;
private URL loadBalancerURL;
private final int loadBalancerWaitTimeInNs;
private LoadBalancerThread loadBalanceThread;
private SocketChannel socketChannel;
private int hostnameId;
private int applicationId;
public TCPWriter(final String hostname, final int port) {
private final Configuration configuration;
public TCPWriter(final String hostname, final int port, final Configuration configuration) {
this.configuration = configuration;
try {
providerURL = new URL("http://" + hostname + ":" + port);
setProviderURL(new URL("http://" + hostname + ":" + port));
} catch (final MalformedURLException e) {
e.printStackTrace();
}
}
protected TCPWriter(final Configuration configuration) {
this.configuration = configuration;
providerURL = null;
}
loadBalancerWaitTimeInNs = 0;
loadBalancerURL = null;
protected URL getProviderURL() {
return providerURL;
}
protected void setProviderURL(final URL providerURL) {
this.providerURL = providerURL;
}
public void init() {
try {
connect();
} catch (final IOException e) {
e.printStackTrace();
}
}
public void init() {
String hostnameTmp = ""; // TODO
// configuration.getStringProperty(ConfigurationFactory.HOST_NAME);
if (hostnameTmp.length() == 0) {
hostnameTmp = "<UNKNOWN>";
String hostname = configuration.getStringProperty(ConfigurationFactory.HOST_NAME);
if (hostname.isEmpty()) {
try {
hostnameTmp = java.net.InetAddress.getLocalHost().getHostName();
hostname = java.net.InetAddress.getLocalHost().getHostName();
} catch (final UnknownHostException ex) {
hostname = "<UNKNOWN-HOST>";
}
}
hostnameId = MonitoringController.getIdForString(hostnameTmp);
applicationId = MonitoringController
.getIdForString("MonitoredApplication");
}
hostnameId = StringRegistry.getIdForString(hostname);
public TCPWriter(final String hostname, final int port,
final String loadBalancerHostname, final int loadBalancerPort,
final int loadBalancerWaitTimeInNs) {
try {
providerURL = new URL("http://" + hostname + ":" + port);
loadBalancerURL = new URL("http://" + loadBalancerHostname + ":"
+ loadBalancerPort);
} catch (final MalformedURLException e) {
loadBalancerURL = null;
e.printStackTrace();
String applicationName = configuration
.getStringProperty(ConfigurationFactory.APPLICATION_NAME);
if (applicationName.isEmpty()) {
applicationName = "<UNKNOWN-APPLICATION>";
}
this.loadBalancerWaitTimeInNs = loadBalancerWaitTimeInNs;
createLoadBalancer();
applicationId = StringRegistry.getIdForString(applicationName);
}
private void createLoadBalancer() {
providerURL = null;
if (loadBalanceThread != null) {
loadBalanceThread.interrupt();
}
protected void connect() throws IOException {
socketChannel = SocketChannel.open(new InetSocketAddress(getProviderURL().getHost(),
getProviderURL().getPort()));
final ByteBuffer bufferForMetaData = ByteBuffer
.allocateDirect(HostApplicationMetaData.BYTE_LENGTH_WITH_CLAZZ_ID);
loadBalanceThread = new LoadBalancerThread(loadBalancerURL,
loadBalancerWaitTimeInNs, this);
loadBalanceThread.start();
synchronized (this) {
while (providerURL == null) {
try {
this.wait();
} catch (final InterruptedException e) {
return;
}
}
}
}
bufferForMetaData.put(HostApplicationMetaData.CLAZZ_ID);
bufferForMetaData.putInt(hostnameId);
bufferForMetaData.putInt(applicationId);
bufferForMetaData.flip();
private void connect() throws IOException {
socketChannel = SocketChannel.open(new InetSocketAddress(providerURL
.getHost(), providerURL.getPort()));
final ByteBuffer buffer = ByteBuffer
.allocateDirect(HostApplicationMetadata.BYTE_LENGTH_WITH_CLAZZ_ID);
buffer.put(HostApplicationMetadata.CLAZZ_ID);
buffer.putInt(hostnameId);
buffer.putInt(applicationId);
buffer.flip();
// TODO send also the string registry!
if (socketChannel.isConnected()) {
socketChannel.write(buffer);
socketChannel.write(bufferForMetaData);
socketChannel.write(StringRegistry.getAllStringRegistryRecords());
}
}
@Override
public void onEvent(final ByteBufferEvent event, final long sequence,
final boolean endOfBatch) throws Exception {
public void onEvent(final ByteBufferEvent event, final long sequence, final boolean endOfBatch)
throws Exception {
send(event.getValue());
}
......@@ -124,28 +103,14 @@ public class TCPWriter implements EventHandler<ByteBufferEvent> {
socketChannel.write(buffer);
}
} catch (final IOException e) {
System.out
.println("Error: Connection was closed - disabling monitoring"); // TODO
// reconnect
disconnectAndDisableMonitoring();
System.out.println("WARNING: Connection was closed");
// TODO reconnect
disconnect();
}
}
}
public final void cleanup() {
if (loadBalanceThread != null) {
loadBalanceThread.interrupt();
}
disconnect();
}
private final void disconnectAndDisableMonitoring() {
MonitoringController.setMonitoringEnabled(false);
disconnect();
}
private final void disconnect() {
protected final void disconnect() {
if (socketChannel.isConnected()) {
try {
socketChannel.close();
......@@ -155,19 +120,7 @@ public class TCPWriter implements EventHandler<ByteBufferEvent> {
}
}
public final void setProvider(final URL newProviderURL) {
synchronized (this) {
if (!newProviderURL.getHost().equals(providerURL.getHost())
|| (newProviderURL.getPort() != providerURL.getPort())) {
disconnect();
try {
connect();
providerURL = newProviderURL;
notifyAll();
} catch (final IOException e) {
e.printStackTrace();
}
}
}
public void cleanup() {
disconnect();
}
}
package explorviz.hpc_monitoring.writer;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import explorviz.hpc_monitoring.configuration.Configuration;
public class TCPWriterWithLoadBalancer extends TCPWriter {
private URL loadBalancerURL;
private final int loadBalancerWaitTimeInMillis;
private LoadBalancerThread loadBalanceThread;
public TCPWriterWithLoadBalancer(final String loadBalancerHostname, final int loadBalancerPort,
final int loadBalancerWaitTimeInMillis, final Configuration configuration) {
super(configuration);
try {
loadBalancerURL = new URL("http://" + loadBalancerHostname + ":" + loadBalancerPort);
} catch (final MalformedURLException e) {
e.printStackTrace();
}
this.loadBalancerWaitTimeInMillis = loadBalancerWaitTimeInMillis;
createLoadBalancer();
}
private void createLoadBalancer() {
setProviderURL(null);
if (loadBalanceThread != null) {
loadBalanceThread.interrupt();
}
loadBalanceThread = new LoadBalancerThread(loadBalancerURL, loadBalancerWaitTimeInMillis,
this);
loadBalanceThread.start();
synchronized (this) {
while (getProviderURL() == null) {
try {
this.wait();
} catch (final InterruptedException e) {
return;
}
}
}
}
@Override
public final void cleanup() {
if (loadBalanceThread != null) {
loadBalanceThread.interrupt();
}
disconnect();
}
public final void setProvider(final URL newProviderURL) {
synchronized (this) {
if (!newProviderURL.getHost().equals(getProviderURL().getHost())
|| (newProviderURL.getPort() != getProviderURL().getPort())) {
disconnect();
try {
setProviderURL(newProviderURL);
connect();
notifyAll();
} catch (final IOException e) {
setProviderURL(null);
e.printStackTrace();
}
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment