Commit 83f07347 authored by Florian Fittkau's avatar Florian Fittkau

monitoring now working again after switch to jctools

parent 518b1ac3
......@@ -4,12 +4,7 @@
<classpathentry kind="src" path="test"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
<classpathentry kind="lib" path="lib/slf4j-api-1.7.5.jar"/>
<classpathentry kind="lib" path="lib/jctools-core-1.0.jar">
<attributes>
<attribute name="javadoc_location" value="jar:platform:/resource/worker/lib/jctools-core-1.0-javadoc.jar!/"/>
</attributes>
</classpathentry>
<classpathentry combineaccessrules="false" kind="src" path="/common-monitoring"/>
<classpathentry kind="lib" path="lib/slf4j-api-1.7.5.jar"/>
<classpathentry kind="output" path="bin"/>
</classpath>
......@@ -11,5 +11,5 @@
</listAttribute>
<stringAttribute key="org.eclipse.jdt.launching.MAIN_TYPE" value="explorviz.live_trace_processing.main.WorkerStarter"/>
<stringAttribute key="org.eclipse.jdt.launching.PROJECT_ATTR" value="worker"/>
<stringAttribute key="org.eclipse.jdt.launching.VM_ARGUMENTS" value="-Xmx4G -Dexplorviz.live_trace_processing.worker_enabled=false -Dexplorviz.live_trace_processing.reader_listening_port=10133"/>
<stringAttribute key="org.eclipse.jdt.launching.VM_ARGUMENTS" value="-Xmx4G -Dexplorviz.live_trace_processing.worker_enabled=false -Dexplorviz.live_trace_processing.reader_listening_port=10134"/>
</launchConfiguration>
......@@ -30,7 +30,6 @@
</manifest>
<fileset dir="${build.dir}" excludes="*common*.jar" />
<fileset dir="lib" includes="*LICENSE*" />
<zipfileset excludes="META-INF/*.SF" src="lib/jctools-core-1.0.jar"/>
<zipfileset excludes="META-INF/*.SF" src="${build.dir}/explorviz-common.jar"/>
<zipfileset excludes="META-INF/*.SF" src="lib/slf4j-api-1.7.5.jar"/>
</jar>
......
......@@ -6,11 +6,14 @@ import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.locks.LockSupport;
import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.StringRegistry;
import explorviz.live_trace_processing.configuration.Configuration;
import explorviz.live_trace_processing.filter.AbstractSink;
import explorviz.live_trace_processing.filter.RecordArrayEvent;
import explorviz.live_trace_processing.filter.SinglePipeConnector;
import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.ISerializableRecord;
import explorviz.live_trace_processing.record.misc.StringRegistryRecord;
......@@ -28,11 +31,15 @@ public class TCPConnector extends AbstractSink implements IWriter, IStringRecord
private final StringRegistry stringRegistry = new StringRegistry(this);
private final ByteBuffer buffer = ByteBuffer.allocateDirect(Constants.SENDING_BUFFER_SIZE);
private final ByteBuffer buffer = ByteBuffer.allocate(Constants.SENDING_BUFFER_SIZE);
private volatile boolean shouldDisconnect = false;
public TCPConnector(final String hostname, final int port, final Configuration configuration) {
private final SinglePipeConnector<RecordArrayEvent> tcpConnectorConnector;
public TCPConnector(final SinglePipeConnector<RecordArrayEvent> tcpConnectorConnector,
final String hostname, final int port, final Configuration configuration) {
this.tcpConnectorConnector = tcpConnectorConnector;
buffer.clear();
try {
setProviderURL(new URL("http://" + hostname + ":" + port));
......@@ -41,6 +48,11 @@ public class TCPConnector extends AbstractSink implements IWriter, IStringRecord
}
}
@Override
public void run() {
tcpConnectorConnector.process(this);
}
@Override
public URL getProviderURL() {
return providerURL;
......@@ -54,10 +66,7 @@ public class TCPConnector extends AbstractSink implements IWriter, IStringRecord
@Override
public void connect() throws IOException {
while (shouldDisconnect) {
try {
Thread.sleep(1);
} catch (final InterruptedException e) {
}
LockSupport.parkNanos(1);
}
socketChannel = SocketChannel.open(new InetSocketAddress(getProviderURL().getHost(),
......@@ -67,13 +76,14 @@ public class TCPConnector extends AbstractSink implements IWriter, IStringRecord
@Override
public void sendOutStringRecord(final StringRegistryRecord record) {
final ByteBuffer stringBuffer = ByteBuffer.allocateDirect(record.getRecordSizeInBytes());
final ByteBuffer stringBuffer = ByteBuffer.allocate(record.getRecordSizeInBytes());
record.putIntoByteBuffer(stringBuffer, stringRegistry, this);
send(stringBuffer);
}
@Override
protected void processRecord(final IRecord record) {
System.out.println(record);
if (record instanceof ISerializableRecord) {
final ISerializableRecord serializableRecord = (ISerializableRecord) record;
if (buffer.remaining() < serializableRecord.getRecordSizeInBytes()) {
......@@ -94,10 +104,7 @@ public class TCPConnector extends AbstractSink implements IWriter, IStringRecord
@Override
public void send(final ByteBuffer buffer) {
while ((socketChannel == null) || (!socketChannel.isConnected())) {
try {
Thread.sleep(1);
} catch (final InterruptedException e) {
}
LockSupport.parkNanos(1);
}
try {
......
package explorviz.live_trace_processing.filter.counting;
import java.util.Queue;
import explorviz.live_trace_processing.filter.AbstractFilter;
import explorviz.live_trace_processing.filter.IPipeReceiver;
import explorviz.live_trace_processing.filter.RecordArrayEvent;
import explorviz.live_trace_processing.filter.SinglePipeConnector;
import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.event.AbstractBeforeOperationEventRecord;
import explorviz.live_trace_processing.record.event.AbstractEventRecord;
......@@ -12,11 +14,20 @@ import explorviz.live_trace_processing.record.trace.Trace;
public class RecordCountingFilter extends AbstractFilter {
public RecordCountingFilter(final IPipeReceiver<RecordArrayEvent> receiver) {
super(receiver, 16, 64, "MethodCalls/10 sec", 1000 * 10);
private final SinglePipeConnector<RecordArrayEvent> pipeConnector;
public RecordCountingFilter(final SinglePipeConnector<RecordArrayEvent> pipeConnector,
final Queue<RecordArrayEvent> receiver) {
super(receiver, 64, "MethodCalls/10 sec", 1000 * 10);
this.pipeConnector = pipeConnector;
counter.setEnabled(true);
}
@Override
public void run() {
pipeConnector.process(this);
}
@Override
public void processRecord(final IRecord record) {
if (record instanceof Trace) {
......
package explorviz.live_trace_processing.filter.counting;
import java.util.Queue;
import explorviz.live_trace_processing.filter.AbstractFilter;
import explorviz.live_trace_processing.filter.IPipeReceiver;
import explorviz.live_trace_processing.filter.RecordArrayEvent;
import explorviz.live_trace_processing.filter.SinglePipeConnector;
import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.misc.TerminateRecord;
import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
......@@ -10,11 +12,20 @@ import explorviz.live_trace_processing.record.trace.Trace;
public class TraceCountingFilter extends AbstractFilter {
public TraceCountingFilter(final IPipeReceiver<RecordArrayEvent> receiver) {
super(receiver, 16, 64, "TraceCalls/10 sec", 1000 * 10);
private final SinglePipeConnector<RecordArrayEvent> pipeConnector;
public TraceCountingFilter(final SinglePipeConnector<RecordArrayEvent> pipeConnector,
final Queue<RecordArrayEvent> receiver) {
super(receiver, 64, "TraceCalls/10 sec", 1000 * 10);
this.pipeConnector = pipeConnector;
counter.setEnabled(true);
}
@Override
public void run() {
pipeConnector.process(this);
}
@Override
public void processRecord(final IRecord record) {
if (record instanceof Trace) {
......
package explorviz.live_trace_processing.filter.reconstruction;
import explorviz.live_trace_processing.filter.IPipeReceiver;
import explorviz.live_trace_processing.filter.Merger;
import explorviz.live_trace_processing.filter.PipesMerger;
import explorviz.live_trace_processing.filter.RecordArrayEvent;
public interface ITraceReconstruction extends IPipeReceiver<RecordArrayEvent> {
void setMerger(Merger<RecordArrayEvent> traceReconstructionMerger);
void setMerger(PipesMerger<RecordArrayEvent> traceReconstructionMerger);
}
......@@ -4,12 +4,12 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.TreeMap;
import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.filter.AbstractFilter;
import explorviz.live_trace_processing.filter.IPipeReceiver;
import explorviz.live_trace_processing.filter.Merger;
import explorviz.live_trace_processing.filter.PipesMerger;
import explorviz.live_trace_processing.filter.RecordArrayEvent;
import explorviz.live_trace_processing.reader.TimeProvider;
import explorviz.live_trace_processing.record.IRecord;
......@@ -18,31 +18,24 @@ import explorviz.live_trace_processing.record.misc.TerminateRecord;
import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
import explorviz.live_trace_processing.record.trace.Trace;
public final class TraceReconstructionFilter extends AbstractFilter implements
ITraceReconstruction, Runnable {
public final class TraceReconstructionFilter extends AbstractFilter implements Runnable {
private final long maxTraceTimeout;
private final Map<Long, TraceReconstructionBuffer> traceId2trace = new TreeMap<Long, TraceReconstructionBuffer>();
private Merger<RecordArrayEvent> traceReconstructionMerger;
private final PipesMerger<RecordArrayEvent> traceReconstructionMerger;
public TraceReconstructionFilter(final long maxTraceTimeout,
final IPipeReceiver<RecordArrayEvent> traceReduction) {
super(traceReduction, Constants.TRACE_RECONSTRUCTION_DISRUPTOR_SIZE,
Constants.TRACE_RECONSTRUCTION_OUTPUT_BUFFER_SIZE, "Reconstructed traces/sec", 1000);
public TraceReconstructionFilter(final PipesMerger<RecordArrayEvent> traceReconstructionMerger,
final long maxTraceTimeout, final Queue<RecordArrayEvent> receiverQueue) {
super(receiverQueue, Constants.TRACE_RECONSTRUCTION_OUTPUT_BUFFER_SIZE,
"Reconstructed traces/sec", 1000);
this.traceReconstructionMerger = traceReconstructionMerger;
this.maxTraceTimeout = maxTraceTimeout;
}
@Override
public void run() {
traceReconstructionMerger.setReceiver(this);
traceReconstructionMerger.process();
}
@Override
public void setMerger(final Merger<RecordArrayEvent> traceReconstructionMerger) {
this.traceReconstructionMerger = traceReconstructionMerger;
traceReconstructionMerger.process(this);
}
@Override
......
......@@ -3,12 +3,13 @@ package explorviz.live_trace_processing.filter.reduction;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.TreeMap;
import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.filter.AbstractFilter;
import explorviz.live_trace_processing.filter.IPipeReceiver;
import explorviz.live_trace_processing.filter.RecordArrayEvent;
import explorviz.live_trace_processing.filter.SinglePipeConnector;
import explorviz.live_trace_processing.reader.TimeProvider;
import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.event.AbstractBeforeEventRecord;
......@@ -25,14 +26,22 @@ public class TracesSummarizationFilter extends AbstractFilter {
private final Map<Trace, TracesSummarizationBuffer> trace2buffer = new TreeMap<Trace, TracesSummarizationBuffer>(
new TraceComperator());
public TracesSummarizationFilter(final long maxCollectionDuration,
final IPipeReceiver<RecordArrayEvent> sinkReceiver) {
super(sinkReceiver, Constants.TRACE_SUMMARIZATION_DISRUPTOR_SIZE,
Constants.TRACE_SUMMARIZATION_OUTPUT_BUFFER_SIZE, "Reduced traces/sec", 1000);
private final SinglePipeConnector<RecordArrayEvent> pipeConnector;
public TracesSummarizationFilter(final SinglePipeConnector<RecordArrayEvent> pipeConnector,
final long maxCollectionDuration, final Queue<RecordArrayEvent> sinkReceiver) {
super(sinkReceiver, Constants.TRACE_SUMMARIZATION_OUTPUT_BUFFER_SIZE, "Reduced traces/sec",
1000);
this.pipeConnector = pipeConnector;
this.maxCollectionDuration = maxCollectionDuration;
}
@Override
public void run() {
pipeConnector.process(this);
}
@Override
public void processRecord(final IRecord record) {
if (record instanceof Trace) {
......@@ -84,7 +93,7 @@ public class TracesSummarizationFilter extends AbstractFilter {
final AbstractBeforeOperationEventRecord abstractBeforeOperationEventRecord = (AbstractBeforeOperationEventRecord) abstractBeforeEventRecord;
abstractBeforeOperationEventRecord.getRuntimeStatisticInformation()
.makeAccumulator(abstractBeforeOperationEventRecord.getObjectId());
.makeAccumulator(abstractBeforeOperationEventRecord.getObjectId());
} else {
abstractBeforeEventRecord.getRuntimeStatisticInformation().makeAccumulator(0);
......@@ -103,7 +112,6 @@ public class TracesSummarizationFilter extends AbstractFilter {
if (traceBufferEntry.getValue().getBufferCreatedTimestamp() <= bufferTimeout) {
final Trace aggregatedTrace = traceBufferEntry.getValue().getAggregatedTrace();
deliver(aggregatedTrace);
iter.remove();
}
......
package explorviz.live_trace_processing.main;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.configuration.Configuration;
import explorviz.live_trace_processing.configuration.ConfigurationFactory;
import explorviz.live_trace_processing.filter.IPipeReceiver;
import explorviz.live_trace_processing.filter.Merger;
import explorviz.live_trace_processing.filter.PipesMerger;
import explorviz.live_trace_processing.filter.RecordArrayEvent;
import explorviz.live_trace_processing.filter.SinglePipeConnector;
import explorviz.live_trace_processing.filter.reconstruction.TraceReconstructionFilter;
import explorviz.live_trace_processing.filter.reduction.TracesSummarizationFilter;
import explorviz.live_trace_processing.reader.TCPReader;
public class FilterConfiguration {
public static void configureAndStartFilters(final Configuration configuration,
final IPipeReceiver<RecordArrayEvent> sink) {
final Queue<RecordArrayEvent> sink) {
final SinglePipeConnector<RecordArrayEvent> traceReductionConnector = new SinglePipeConnector<RecordArrayEvent>(
Constants.TRACE_RECONSTRUCTION_DISRUPTOR_SIZE);
final IPipeReceiver<RecordArrayEvent> traceReduction = new TracesSummarizationFilter(
TimeUnit.MILLISECONDS.toNanos(990), sink);
new TracesSummarizationFilter(traceReductionConnector, TimeUnit.MILLISECONDS.toNanos(990),
sink).start();
final Merger<RecordArrayEvent> traceReconstructionMerger = new Merger<RecordArrayEvent>(
final PipesMerger<RecordArrayEvent> traceReconstructionMerger = new PipesMerger<RecordArrayEvent>(
Constants.TCP_READER_DISRUPTOR_SIZE);
final TraceReconstructionFilter traceReconstruction = new TraceReconstructionFilter(
TimeUnit.SECONDS.toNanos(4), traceReduction);
traceReconstruction.setMerger(traceReconstructionMerger);
new Thread(traceReconstruction).start();
new TraceReconstructionFilter(traceReconstructionMerger, TimeUnit.SECONDS.toNanos(4),
traceReductionConnector.registerProducer()).start();
new TCPReader(configuration.getIntProperty(ConfigurationFactory.READER_LISTENING_PORT,
10133), traceReconstructionMerger).read();
......
package explorviz.live_trace_processing.main;
import java.io.IOException;
import java.util.Queue;
import explorviz.live_trace_processing.configuration.Configuration;
import explorviz.live_trace_processing.configuration.ConfigurationFactory;
import explorviz.live_trace_processing.connector.TCPConnector;
import explorviz.live_trace_processing.filter.IPipeReceiver;
import explorviz.live_trace_processing.filter.RecordArrayEvent;
import explorviz.live_trace_processing.filter.SinglePipeConnector;
import explorviz.live_trace_processing.filter.counting.RecordCountingFilter;
import explorviz.live_trace_processing.filter.counting.TraceCountingFilter;
import explorviz.live_trace_processing.writer.load_balancer.LoadBalancer;
......@@ -18,19 +20,33 @@ public class WorkerStarter {
final boolean isWorker = configuration
.getBooleanProperty(ConfigurationFactory.WORKER_ENABLED);
IPipeReceiver sink = null;
Queue<RecordArrayEvent> sink = null;
if (isWorker) {
final TCPConnector connector = new TCPConnector(
final SinglePipeConnector<RecordArrayEvent> tcpConnectorConnector = new SinglePipeConnector<RecordArrayEvent>(
16);
final TCPConnector connector = new TCPConnector(tcpConnectorConnector,
configuration.getStringProperty(ConfigurationFactory.WRITER_TARGET_IP),
configuration.getIntProperty(ConfigurationFactory.WRITER_TARGET_PORT, 10133),
configuration);
configureLoadBalancerIfEnabled(configuration, connector);
sink = connector;
connector.start();
sink = tcpConnectorConnector.registerProducer();
} else {
final RecordCountingFilter recordCountingFilter = new RecordCountingFilter(sink);
sink = new TraceCountingFilter(recordCountingFilter);
final SinglePipeConnector<RecordArrayEvent> recordCountingConnector = new SinglePipeConnector<RecordArrayEvent>(
16);
new RecordCountingFilter(recordCountingConnector, null).start();
final SinglePipeConnector<RecordArrayEvent> traceCountingConnector = new SinglePipeConnector<RecordArrayEvent>(
16);
new TraceCountingFilter(traceCountingConnector,
recordCountingConnector.registerProducer()).start();
sink = traceCountingConnector.registerProducer();
}
FilterConfiguration.configureAndStartFilters(configuration, sink);
......@@ -47,9 +63,9 @@ public class WorkerStarter {
configuration.getIntProperty(ConfigurationFactory.LOAD_BALANCER_PORT, 9999),
configuration.getIntProperty(ConfigurationFactory.LOAD_BALANCER_WAIT_TIME,
20000),
configuration
configuration
.getStringProperty(ConfigurationFactory.LOAD_BALANCER_SCALING_GROUP),
tcpConnector);
tcpConnector);
} else {
try {
tcpConnector.connect();
......
......@@ -4,13 +4,18 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import explorviz.live_trace_processing.filter.Merger;
import explorviz.live_trace_processing.filter.PipesMerger;
import explorviz.live_trace_processing.filter.RecordArrayEvent;
import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
public final class TCPReader implements IPeriodicTimeSignalReceiver {
......@@ -22,11 +27,13 @@ public final class TCPReader implements IPeriodicTimeSignalReceiver {
private ServerSocketChannel serversocket;
private final List<TCPReaderOneClient> threads = new ArrayList<TCPReaderOneClient>();
private final List<TCPReaderOneClient> threads = Collections
.synchronizedList(new ArrayList<TCPReaderOneClient>());
private final Merger merger;
private final PipesMerger<RecordArrayEvent> merger;
public TCPReader(final int listeningPort, final Merger traceReconstructionMerger) {
public TCPReader(final int listeningPort,
final PipesMerger<RecordArrayEvent> traceReconstructionMerger) {
this.listeningPort = listeningPort;
merger = traceReconstructionMerger;
......@@ -36,25 +43,35 @@ public final class TCPReader implements IPeriodicTimeSignalReceiver {
@Override
public void periodicTimeSignal(final long timestamp) {
final List<TCPReaderOneClient> toRemove = new ArrayList<TCPReaderOneClient>();
boolean alreadySent = false;
for (final TCPReaderOneClient thread : threads) {
if (!thread.isAlive()) {
toRemove.add(thread);
}
if (!alreadySent) {
thread.putInRingBuffer(new TimedPeriodRecord());
alreadySent = true;
synchronized (threads) {
final Iterator<TCPReaderOneClient> iterator = threads.iterator();
while (iterator.hasNext()) {
final TCPReaderOneClient thread = iterator.next();
if (!thread.isAlive()) {
iterator.remove();
} else {
if (!alreadySent) {
thread.putInQueue(new TimedPeriodRecord());
alreadySent = true;
}
thread.flushOutputBuffer();
}
}
thread.flushOutputBuffer();
}
for (final TCPReaderOneClient toRemoveThread : toRemove) {
threads.remove(toRemoveThread);
if (!alreadySent) {
final Queue<RecordArrayEvent> queue = merger.registerProducer();
final RecordArrayEvent recordArrayEvent = new RecordArrayEvent(1);
recordArrayEvent.getValues()[0] = new TimedPeriodRecord();
recordArrayEvent.setValueSize(1);
while (!queue.offer(recordArrayEvent)) {
LockSupport.parkNanos(1);
}
merger.deregisterProducer(queue);
}
}
......
......@@ -15,7 +15,7 @@ import org.slf4j.LoggerFactory;
import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.IdNotAvailableException;
import explorviz.live_trace_processing.StringRegistry;
import explorviz.live_trace_processing.filter.Merger;
import explorviz.live_trace_processing.filter.PipesMerger;
import explorviz.live_trace_processing.filter.RecordArrayEvent;
import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.event.AbstractEventRecord;
......@@ -58,9 +58,10 @@ class TCPReaderOneClient extends Thread {
private Trace currentlyOpenTrace;
private final Queue<RecordArrayEvent> queue;
private final Merger merger;
private final PipesMerger<RecordArrayEvent> merger;
public TCPReaderOneClient(final SocketChannel socketChannel, final Merger merger) {
public TCPReaderOneClient(final SocketChannel socketChannel,
final PipesMerger<RecordArrayEvent> merger) {
this.socketChannel = socketChannel;
this.merger = merger;
queue = merger.registerProducer();
......@@ -68,7 +69,7 @@ class TCPReaderOneClient extends Thread {
@Override
public void run() {
final ByteBuffer buffer = ByteBuffer.allocateDirect(1024 * 1024);
final ByteBuffer buffer = ByteBuffer.allocateDirect(8 * 1024 * 1024);
try {
String remoteAddress = "";
if (socketChannel.isConnected()) {
......@@ -90,6 +91,7 @@ class TCPReaderOneClient extends Thread {
RemoteConfigurationServlet.getConnectedChildren().remove(remoteAddress);
LOG.info("Error in read() " + ex.getMessage());
} finally {
flushOutputBuffer();
merger.deregisterProducer(queue);
}
}
......@@ -328,7 +330,7 @@ class TCPReaderOneClient extends Thread {
case SystemMonitoringRecord.CLAZZ_ID_FROM_WORKER: {
if (buffer.remaining() >= SystemMonitoringRecord.BYTE_LENGTH) {
try {
putInRingBuffer(SystemMonitoringRecord.createFromByteBuffer(buffer,
putInQueue(SystemMonitoringRecord.createFromByteBuffer(buffer,
stringRegistry));
} catch (final IdNotAvailableException e) {
// should not happen
......@@ -372,7 +374,7 @@ class TCPReaderOneClient extends Thread {
if (currentlyOpenTrace.getTraceEvents().size() == currentlyOpenTrace
.getEventsLength()) {
putInRingBuffer(currentlyOpenTrace);
putInQueue(currentlyOpenTrace);
currentlyOpenTrace = null;
return true;
}
......@@ -421,8 +423,8 @@ class TCPReaderOneClient extends Thread {
final String clazz = stringRegistry.getStringFromId(clazzId);
final String implementedInterface = stringRegistry.getStringFromId(interfaceId);
putInRingBuffer(new BeforeOperationEventRecord(timestamp, traceId, orderIndex,
objectId, operation, clazz, implementedInterface, hostApplicationMetadata));
putInQueue(new BeforeOperationEventRecord(timestamp, traceId, orderIndex, objectId,
operation, clazz, implementedInterface, hostApplicationMetadata));
} catch (final IdNotAvailableException e) {
putInWaitingMessages(buffer, BeforeOperationEventRecord.COMPRESSED_BYTE_LENGTH + 1);
}
......@@ -437,8 +439,8 @@ class TCPReaderOneClient extends Thread {
try {
final String cause = stringRegistry.getStringFromId(causeId);
putInRingBuffer(new AfterFailedOperationEventRecord(timestamp, traceId, orderIndex,
cause, hostApplicationMetadata));
putInQueue(new AfterFailedOperationEventRecord(timestamp, traceId, orderIndex, cause,
hostApplicationMetadata));
} catch (final IdNotAvailableException e) {
putInWaitingMessages(buffer,
AfterFailedOperationEventRecord.COMPRESSED_BYTE_LENGTH_WITH_CLAZZ_ID);
......@@ -449,7 +451,7 @@ class TCPReaderOneClient extends Thread {
final long timestamp = buffer.getLong();
final long traceId = buffer.getLong();
final int orderIndex = buffer.getInt();
putInRingBuffer(new AfterOperationEventRecord(timestamp, traceId, orderIndex,
putInQueue(new AfterOperationEventRecord(timestamp, traceId, orderIndex,
hostApplicationMetadata));
}
......@@ -458,7 +460,7 @@ class TCPReaderOneClient extends Thread {
final long usedRAM = buffer.getLong();
final long absoluteRAM = buffer.getLong();
putInRingBuffer(new SystemMonitoringRecord(cpuUtil, usedRAM, absoluteRAM,
putInQueue(new SystemMonitoringRecord(cpuUtil, usedRAM, absoluteRAM,
hostApplicationMetadata));
}
......@@ -476,8 +478,8 @@ class TCPReaderOneClient extends Thread {
final String clazz = stringRegistry.getStringFromId(clazzId);
final String implementedInterface = stringRegistry.getStringFromId(interfaceId);
putInRingBuffer(new BeforeConstructorEventRecord(timestamp, traceId, orderIndex,
objectId, operation, clazz, implementedInterface, hostApplicationMetadata));
putInQueue(new BeforeConstructorEventRecord(timestamp, traceId, orderIndex, objectId,
operation, clazz, implementedInterface, hostApplicationMetadata));
} catch (final IdNotAvailableException e) {
putInWaitingMessages(buffer,
BeforeConstructorEventRecord.COMPRESSED_BYTE_LENGTH_WITH_CLAZZ_ID);
......@@ -491,7 +493,7 @@ class TCPReaderOneClient extends Thread {
final int causeId = buffer.getInt();
try {
putInRingBuffer(new AfterFailedConstructorEventRecord(timestamp, traceId, orderIndex,
putInQueue(new AfterFailedConstructorEventRecord(timestamp, traceId, orderIndex,
stringRegistry.getStringFromId(causeId), hostApplicationMetadata));
} catch (final IdNotAvailableException e) {
putInWaitingMessages(buffer,
......@@ -504,7 +506,7 @@ class TCPReaderOneClient extends Thread {
final long traceId = buffer.getLong();
final int orderIndex = buffer.getInt();
putInRingBuffer(new AfterConstructorEventRecord(timestamp, traceId, orderIndex,
putInQueue(new AfterConstructorEventRecord(timestamp, traceId, orderIndex,
hostApplicationMetadata));
}
......@@ -516,8 +518,8 @@ class TCPReaderOneClient extends Thread {
final long traceId = buffer.getLong();
final int orderIndex = buffer.getInt();
putInRingBuffer(new BeforeReceivedRemoteCallRecord(timestamp, callerTraceId,
callerOrderIndex, traceId, orderIndex, hostApplicationMetadata));
putInQueue(new BeforeReceivedRemoteCallRecord(timestamp, callerTraceId, callerOrderIndex,
traceId, orderIndex, hostApplicationMetadata));
}
private final void readInBeforeStaticOperationEvent(final ByteBuffer buffer) {
......@@ -533,7 +535,7 @@ class TCPReaderOneClient extends Thread {
final String clazz = stringRegistry.getStringFromId(clazzId);
final String implementedInterface = stringRegistry.getStringFromId(interfaceId);
putInRingBuffer(new BeforeStaticOperationEventRecord(timestamp, traceId, orderIndex,
putInQueue(new BeforeStaticOperationEventRecord(timestamp, traceId, orderIndex,
operation, clazz, implementedInterface, hostApplicationMetadata));
} catch (final IdNotAvailableException e) {
putInWaitingMessages(buffer,
......@@ -550,8 +552,8 @@ class TCPReaderOneClient extends Thread {
try {
final String cause = stringRegistry.getStringFromId(causeId);
putInRingBuffer(new AfterFailedStaticOperationEventRecord(timestamp, traceId,
orderIndex, cause, hostApplicationMetadata));
putInQueue(new AfterFailedStaticOperationEventRecord(timestamp, traceId, orderIndex,
cause, hostApplicationMetadata));
} catch (final IdNotAvailableException e) {
putInWaitingMessages(buffer,
AfterFailedStaticOperationEventRecord.COMPRESSED_BYTE_LENGTH_WITH_CLAZZ_ID);
......@@ -563,7 +565,7 @@ class TCPReaderOneClient extends Thread {
final long traceId = buffer.getLong();
final int orderIndex = buffer.getInt();
putInRingBuffer(new AfterStaticOperationEventRecord(timestamp, traceId, orderIndex,
putInQueue(new AfterStaticOperationEventRecord(timestamp, traceId, orderIndex,
hostApplicationMetadata));
}
......@@ -574,7 +576,7 @@ class TCPReaderOneClient extends Thread {
final int technologyId = buffer.getInt();
try {
putInRingBuffer(new BeforeSentRemoteCallRecord(timestamp,
putInQueue(new BeforeSentRemoteCallRecord(timestamp,
stringRegistry.getStringFromId(technologyId), traceId, orderIndex,
hostApplicationMetadata));
} catch (final IdNotAvailableException e) {
......@@ -588,7 +590,7 @@ class TCPReaderOneClient extends Thread {
final long traceId = buffer.getLong();
final int orderIndex = buffer.getInt();
putInRingBuffer(new AfterSentRemoteCallRecord(timestamp, traceId, orderIndex,
putInQueue(new AfterSentRemoteCallRecord(timestamp, traceId, orderIndex,
hostApplicationMetadata));
}
......@@ -602,7 +604,7 @@ class TCPReaderOneClient extends Thread {
final int destionationId = buffer.getInt();
try {
putInRingBuffer(new BeforeUnknownReceivedRemoteCallRecord(timestamp,