Commit b0b47170 authored by Florian Fittkau's avatar Florian Fittkau

WiP

parent 6597f68d
......@@ -16,16 +16,24 @@ public class MonitoringStringRegistry {
private static final ConcurrentHashMap<String, Integer> stringReg = new ConcurrentHashMap<String, Integer>();
private static final AtomicInteger stringRegIndex = new AtomicInteger(1);
private static final ByteBuffer sendOutAllStringRecordsBuffer = ByteBuffer
.allocate(Constants.SENDING_BUFFER_SIZE);
public static final Integer getIdForString(final String value) {
Integer result = stringReg.get(value);
final Integer result = stringReg.get(value);
if (result == null) {
result = stringRegIndex.getAndIncrement();
final Integer wasAbsent = stringReg.putIfAbsent(value, result);
final Integer perhapsNewIndex = stringRegIndex.getAndIncrement();
final Integer wasPresent = stringReg.putIfAbsent(value, perhapsNewIndex);
if (wasAbsent == null) {
final ByteBuffer stringRegistryRecord = buildStringRegistryRecord(value, result);
if (wasPresent == null) {
final ByteBuffer stringRegistryRecord = buildStringRegistryRecord(value,
perhapsNewIndex);
MonitoringController.sendOutBuffer(stringRegistryRecord);
return perhapsNewIndex;
} else {
return wasPresent;
}
}
......@@ -68,13 +76,12 @@ public class MonitoringStringRegistry {
stringReg.entrySet());
int currentIndex = 0;
int lastWrittenIndex = 0;
final ByteBuffer buffer = ByteBuffer.allocate(Constants.SENDING_BUFFER_SIZE);
final ByteBuffer buffer = sendOutAllStringRecordsBuffer;
while (currentIndex < keyValues.size()) {
final Entry<String, Integer> entry = keyValues.get(currentIndex);
final byte[] keyAsBytes = entry.getKey().getBytes();
while (buffer.remaining() >= (keyAsBytes.length + StringRegistryRecord.BYTE_LENGTH_WITHOUT_STRING_WITH_CLAZZ_ID)) {
while (buffer.remaining() >= (keyValues.get(currentIndex).getKey().getBytes().length + StringRegistryRecord.BYTE_LENGTH_WITHOUT_STRING_WITH_CLAZZ_ID)) {
final Entry<String, Integer> entry = keyValues.get(currentIndex);
final byte[] keyAsBytes = entry.getKey().getBytes();
buffer.put(StringRegistryRecord.CLAZZ_ID);
buffer.putInt(entry.getValue());
buffer.putInt(keyAsBytes.length);
......@@ -90,7 +97,7 @@ public class MonitoringStringRegistry {
.println("No progress in sendOutAllStringRegistry. Buffer size: "
+ buffer.remaining()
+ " and wanted to write: "
+ (keyAsBytes.length + StringRegistryRecord.BYTE_LENGTH_WITHOUT_STRING_WITH_CLAZZ_ID));
+ (keyValues.get(currentIndex).getKey().getBytes().length + StringRegistryRecord.BYTE_LENGTH_WITHOUT_STRING_WITH_CLAZZ_ID));
}
lastWrittenIndex = currentIndex;
......
......@@ -13,7 +13,6 @@ import com.lmax.disruptor.EventHandler;
import explorviz.live_trace_processing.configuration.Configuration;
import explorviz.live_trace_processing.configuration.ConfigurationFactory;
import explorviz.live_trace_processing.debug.DebugFileByteBufferWriter;
import explorviz.live_trace_processing.filter.IPipeReceiver;
import explorviz.live_trace_processing.main.ByteBufferEvent;
import explorviz.live_trace_processing.main.MonitoringStringRegistry;
......@@ -22,9 +21,6 @@ import explorviz.live_trace_processing.record.trace.HostApplicationMetaDataRecor
public class TCPWriter implements IPipeReceiver<ByteBufferEvent>, IWriter,
EventHandler<ByteBufferEvent> {
private final boolean debug;
private DebugFileByteBufferWriter debugWriter;
private URL providerURL;
private SocketChannel socketChannel;
......@@ -43,6 +39,9 @@ EventHandler<ByteBufferEvent> {
private static String applicationName;
private final ByteBuffer bufferForMetaData = ByteBuffer
.allocate(HostApplicationMetaDataRecord.BYTE_LENGTH_WITH_CLAZZ_ID);
public static String getApplicationName() {
return applicationName;
}
......@@ -57,10 +56,6 @@ EventHandler<ByteBufferEvent> {
e.printStackTrace();
}
debug = configuration.getBooleanProperty(ConfigurationFactory.DEBUG);
if (debug) {
debugWriter = new DebugFileByteBufferWriter("debug.trace");
}
}
@Override
......@@ -151,8 +146,6 @@ EventHandler<ByteBufferEvent> {
socketChannel = SocketChannel.open(new InetSocketAddress(getProviderURL().getHost(),
getProviderURL().getPort()));
final ByteBuffer bufferForMetaData = ByteBuffer
.allocateDirect(HostApplicationMetaDataRecord.BYTE_LENGTH_WITH_CLAZZ_ID);
bufferForMetaData.put(HostApplicationMetaDataRecord.CLAZZ_ID);
bufferForMetaData.putInt(systemnameId);
......@@ -162,11 +155,19 @@ EventHandler<ByteBufferEvent> {
bufferForMetaData.putInt(languageId);
bufferForMetaData.flip();
while (!socketChannel.isConnected()) {
try {
Thread.sleep(1);
} catch (final InterruptedException e) {
}
}
if (socketChannel.isConnected()) {
socketChannel.write(bufferForMetaData);
MonitoringStringRegistry.sendOutAllStringRegistryRecords(socketChannel);
metaDataSent = true;
} else {
System.out.println("SEVERE: Could not send meta data buffer");
}
}
......@@ -187,9 +188,6 @@ EventHandler<ByteBufferEvent> {
}
}
}
if (debug) {
debugWriter.writeBuffer(buffer);
}
send(buffer);
buffer.clear();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment