Commit 643a2166 authored by Florian Fittkau's avatar Florian Fittkau

WiP

parent e2dd81f6
......@@ -9,7 +9,7 @@ explorviz.live_trace_processing.writer_load_balancing_port=9999
explorviz.live_trace_processing.writer_load_balancing_wait_time=20000
explorviz.live_trace_processing.writer_load_balancing_scaling_group=analysis-worker
explorviz.live_trace_processing.sending_buffer_size=2097152
explorviz.live_trace_processing.sending_buffer_size=262144
######################## Monitoring ########################
......
......@@ -6,6 +6,7 @@ import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.List;
import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.StringRegistry;
......@@ -14,22 +15,25 @@ import explorviz.live_trace_processing.filter.AbstractSink;
import explorviz.live_trace_processing.filter.SinglePipeConnector;
import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.ISerializableRecord;
import explorviz.live_trace_processing.record.event.AbstractEventRecord;
import explorviz.live_trace_processing.record.misc.StringRegistryRecord;
import explorviz.live_trace_processing.record.misc.TerminateRecord;
import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
import explorviz.live_trace_processing.record.trace.Trace;
import explorviz.live_trace_processing.writer.IRecordSender;
import explorviz.live_trace_processing.writer.IStringRecordSender;
import explorviz.live_trace_processing.writer.IWriter;
public class TCPConnector extends AbstractSink implements IWriter, IStringRecordSender,
IRecordSender {
IRecordSender {
private URL providerURL;
private SocketChannel socketChannel;
private final StringRegistry stringRegistry = new StringRegistry(this);
private final ByteBuffer buffer = ByteBuffer.allocateDirect(Constants.SENDING_BUFFER_SIZE);
private final ByteBuffer sendingBuffer = ByteBuffer.allocate(Constants.SENDING_BUFFER_SIZE);
private final ByteBuffer stringBuffer = ByteBuffer.allocate(Constants.SENDING_BUFFER_SIZE);
private volatile boolean shouldDisconnect = false;
......@@ -38,7 +42,7 @@ IRecordSender {
public TCPConnector(final SinglePipeConnector<IRecord> tcpConnectorConnector,
final String hostname, final int port, final Configuration configuration) {
this.tcpConnectorConnector = tcpConnectorConnector;
buffer.clear();
sendingBuffer.clear();
try {
setProviderURL(new URL("http://" + hostname + ":" + port));
} catch (final MalformedURLException e) {
......@@ -77,24 +81,31 @@ IRecordSender {
@Override
public void sendOutStringRecord(final StringRegistryRecord record) {
final ByteBuffer stringBuffer = ByteBuffer.allocate(record.getRecordSizeInBytes());
record.putIntoByteBuffer(stringBuffer, stringRegistry, this);
send(stringBuffer);
}
@Override
public void processRecord(final IRecord record) {
if (record instanceof ISerializableRecord) {
if (record instanceof Trace) {
final Trace trace = (Trace) record;
final List<AbstractEventRecord> traceEvents = trace.getTraceEvents();
for (final AbstractEventRecord event : traceEvents) {
if (sendingBuffer.remaining() < event.getRecordSizeInBytes()) {
send(sendingBuffer);
}
event.putIntoByteBuffer(sendingBuffer, stringRegistry, this);
}
send(sendingBuffer);
} else if (record instanceof ISerializableRecord) {
final ISerializableRecord serializableRecord = (ISerializableRecord) record;
if (buffer.remaining() < serializableRecord.getRecordSizeInBytes()) {
send(buffer);
System.out.println("required: " + serializableRecord.getRecordSizeInBytes()
+ " and capacity is " + buffer.capacity());
if (sendingBuffer.remaining() < serializableRecord.getRecordSizeInBytes()) {
send(sendingBuffer);
}
serializableRecord.putIntoByteBuffer(buffer, stringRegistry, this);
serializableRecord.putIntoByteBuffer(sendingBuffer, stringRegistry, this);
} else if (record instanceof TimedPeriodRecord) {
// send(buffer);
// this the end of timedperiodrecords - master has own
// this the end of timedperiodrecords - master has its own
} else if (record instanceof TerminateRecord) {
terminate();
}
......
......@@ -77,7 +77,7 @@ class TraceReconstructionBuffer {
return ((openEvents != 0) || events.isEmpty() || ((maxOrderIndex + 1) != events.size()));
}
public final Trace toTrace(final boolean valid) {
public final Trace toTrace() {
final Stack<AbstractBeforeEventRecord> stack = new Stack<AbstractBeforeEventRecord>();
boolean containsRemoteRecord = false;
......@@ -89,21 +89,30 @@ class TraceReconstructionBuffer {
|| (event instanceof BeforeSentRemoteCallRecord)) {
containsRemoteRecord = true;
}
} else if ((event instanceof AbstractAfterEventRecord)
|| (event instanceof AbstractAfterFailedEventRecord)) {
} else if (event instanceof AbstractAfterEventRecord) {
final AbstractAfterEventRecord abstractAfterEventRecord = (AbstractAfterEventRecord) event;
if (!stack.isEmpty()) {
final AbstractBeforeEventRecord beforeEvent = stack.pop();
if (beforeEvent.getRuntimeStatisticInformation() == null) {
final long time = event.getLoggingTimestamp()
- beforeEvent.getLoggingTimestamp();
final long time = abstractAfterEventRecord.getMethodDuration();
beforeEvent.setRuntimeStatisticInformation(new RuntimeStatisticInformation(
1, time, time * time));
}
}
} else if (event instanceof AbstractAfterFailedEventRecord) {
final AbstractAfterFailedEventRecord abstractAfterFailedEventRecord = (AbstractAfterFailedEventRecord) event;
if (!stack.isEmpty()) {
final AbstractBeforeEventRecord beforeEvent = stack.pop();
if (beforeEvent.getRuntimeStatisticInformation() == null) {
final long time = abstractAfterFailedEventRecord.getMethodDuration();
beforeEvent.setRuntimeStatisticInformation(new RuntimeStatisticInformation(
1, time, time * time));
}
}
}
}
return new Trace(new ArrayList<AbstractEventRecord>(events), valid, containsRemoteRecord,
events.size());
return new Trace(new ArrayList<AbstractEventRecord>(events), containsRemoteRecord);
}
}
package explorviz.live_trace_processing.filter.reconstruction;
import java.util.ArrayList;
import java.util.List;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
......@@ -14,7 +13,6 @@ import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.event.AbstractEventRecord;
import explorviz.live_trace_processing.record.misc.TerminateRecord;
import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
import explorviz.live_trace_processing.record.trace.Trace;
public final class TraceReconstructionFilter extends AbstractFilter implements Runnable {
private final long maxTraceTimeout;
......@@ -46,16 +44,9 @@ public final class TraceReconstructionFilter extends AbstractFilter implements R
traceBuffer.insertEvent(abstractOperationEvent);
if (traceBuffer.isFinished()) {
deliver(traceBuffer.toTrace(true));
deliver(traceBuffer.toTrace());
traceId2trace.remove(traceId);
}
} else if (record instanceof Trace) {
final Trace trace = (Trace) record;
if (trace.isValid()) {
deliver(trace);
} else {
getBufferForTraceId(trace.getTraceEvents().get(0).getTraceId());
}
} else if (record instanceof TimedPeriodRecord) {
checkForTimeouts(TimeProvider.getCurrentTimestamp());
deliver(record);
......@@ -78,9 +69,12 @@ public final class TraceReconstructionFilter extends AbstractFilter implements R
private void checkForTimeouts(final long timestamp) {
final long traceTimeout = timestamp - maxTraceTimeout;
final List<Long> traceIdsToRemove = new ArrayList<Long>();
for (final Entry<Long, TraceReconstructionBuffer> entry : traceId2trace.entrySet()) {
final Iterator<Entry<Long, TraceReconstructionBuffer>> iterator = traceId2trace.entrySet()
.iterator();
while (iterator.hasNext()) {
final Entry<Long, TraceReconstructionBuffer> entry = iterator.next();
final TraceReconstructionBuffer traceBuffer = entry.getValue();
if (traceBuffer.isUpdatedInThisPeriod()) {
traceBuffer.setUpdatedInThisPeriod(false);
......@@ -88,20 +82,16 @@ public final class TraceReconstructionFilter extends AbstractFilter implements R
traceBuffer.updateLastBufferInsert();
if ((timestamp - traceBuffer.getLastBufferInsert()) <= traceTimeout) {
deliver(traceBuffer.toTrace(false));
traceIdsToRemove.add(entry.getKey());
deliver(traceBuffer.toTrace());
iterator.remove();
}
}
}
for (final long traceIdToRemove : traceIdsToRemove) {
traceId2trace.remove(traceIdToRemove);
}
}
private void terminate() {
for (final TraceReconstructionBuffer entry : traceId2trace.values()) {
deliver(entry.toTrace(false));
deliver(entry.toTrace());
}
traceId2trace.clear();
}
......
......@@ -45,15 +45,10 @@ public class TracesSummarizationFilter extends AbstractFilter {
if (record instanceof Trace) {
final Trace trace = (Trace) record;
if (trace.isValid()) {
if (!trace.containsRemoteRecord()) {
insertIntoBuffer(trace);
} else {
// trace with remote records cant be reduced
makeTraceElementsAccumulator(trace);
deliver(trace);
}
if (!trace.containsRemoteRecord()) {
insertIntoBuffer(trace);
} else {
// trace with remote records cant be reduced
makeTraceElementsAccumulator(trace);
deliver(trace);
}
......@@ -91,7 +86,7 @@ public class TracesSummarizationFilter extends AbstractFilter {
final AbstractBeforeOperationEventRecord abstractBeforeOperationEventRecord = (AbstractBeforeOperationEventRecord) abstractBeforeEventRecord;
abstractBeforeOperationEventRecord.getRuntimeStatisticInformation()
.makeAccumulator(abstractBeforeOperationEventRecord.getObjectId());
.makeAccumulator(abstractBeforeOperationEventRecord.getObjectId());
} else {
abstractBeforeEventRecord.getRuntimeStatisticInformation().makeAccumulator(0);
......
......@@ -19,13 +19,13 @@ public class FilterConfiguration {
final SinglePipeConnector<IRecord> traceReductionConnector = new SinglePipeConnector<IRecord>(
Constants.TRACE_RECONSTRUCTION_DISRUPTOR_SIZE);
new TracesSummarizationFilter(traceReductionConnector, TimeUnit.SECONDS.toNanos(4), sink)
new TracesSummarizationFilter(traceReductionConnector, TimeUnit.SECONDS.toNanos(3), sink)
.start();
final PipesMerger<IRecord> traceReconstructionMerger = new PipesMerger<IRecord>(
Constants.TCP_READER_DISRUPTOR_SIZE);
new TraceReconstructionFilter(traceReconstructionMerger, TimeUnit.SECONDS.toNanos(4),
new TraceReconstructionFilter(traceReconstructionMerger, TimeUnit.SECONDS.toNanos(3),
traceReductionConnector.registerProducer()).start();
new TCPReader(configuration.getIntProperty(ConfigurationFactory.READER_LISTENING_PORT,
......
......@@ -3,9 +3,6 @@ package explorviz.live_trace_processing.reader;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
......@@ -25,9 +22,6 @@ public final class TCPReader implements IPeriodicTimeSignalReceiver {
private ServerSocketChannel serversocket;
private final List<TCPReaderOneClient> threads = Collections
.synchronizedList(new ArrayList<TCPReaderOneClient>());
private final PipesMerger<IRecord> merger;
public TCPReader(final int listeningPort, final PipesMerger<IRecord> traceReconstructionMerger) {
......@@ -55,10 +49,7 @@ public final class TCPReader implements IPeriodicTimeSignalReceiver {
try {
open();
while (active) {
final TCPReaderOneClient thread = new TCPReaderOneClient(serversocket.accept(),
merger);
thread.start();
threads.add(thread);
new TCPReaderOneClient(serversocket.accept(), merger).start();
}
} catch (final IOException ex) {
LOG.info("Error in read() " + ex.getMessage());
......
......@@ -12,7 +12,7 @@ public class TraceReconstructionBufferTest {
@Test
public void testInsertEvent() throws Exception {
final TraceReconstructionBuffer traceReconstructionBuffer = new TraceReconstructionBuffer();
traceReconstructionBuffer.insertEvent(new BeforeOperationEventRecord(1000, 1, 0, 0, "test",
traceReconstructionBuffer.insertEvent(new BeforeOperationEventRecord(1, 0, 0, "test",
"TestClazz", "", new HostApplicationMetaDataRecord("testSystem", "testIp",
"testHost", "testApp", "Java")));
assertTrue(true);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment