Commit 1dc8d1bf authored by Florian Fittkau's avatar Florian Fittkau

record batching now superfluous

parent d418ab7c
......@@ -36,13 +36,9 @@ explorviz.live_trace_processing.worker_enabled=true
explorviz.live_trace_processing.reader_listening_port=10133
explorviz.live_trace_processing.tcp_reader_output_buffer_size=512
explorviz.live_trace_processing.tcp_reader_disruptor_size=16
explorviz.live_trace_processing.trace_reconstruction_output_buffer_size=128
explorviz.live_trace_processing.trace_reconstruction_disruptor_size=16
explorviz.live_trace_processing.tcp_reader_disruptor_size=16384
explorviz.live_trace_processing.trace_reconstruction_disruptor_size=4096
explorviz.live_trace_processing.trace_reconstruction_buffer_initial_size=128
explorviz.live_trace_processing.trace_summarization_output_buffer_size=128
explorviz.live_trace_processing.trace_summarization_disruptor_size=16
explorviz.live_trace_processing.trace_summarization_disruptor_size=256
......@@ -11,7 +11,6 @@ import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.StringRegistry;
import explorviz.live_trace_processing.configuration.Configuration;
import explorviz.live_trace_processing.filter.AbstractSink;
import explorviz.live_trace_processing.filter.RecordArrayEvent;
import explorviz.live_trace_processing.filter.SinglePipeConnector;
import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.ISerializableRecord;
......@@ -23,7 +22,7 @@ import explorviz.live_trace_processing.writer.IStringRecordSender;
import explorviz.live_trace_processing.writer.IWriter;
public class TCPConnector extends AbstractSink implements IWriter, IStringRecordSender,
IRecordSender {
IRecordSender {
private URL providerURL;
private SocketChannel socketChannel;
......@@ -34,9 +33,9 @@ public class TCPConnector extends AbstractSink implements IWriter, IStringRecord
private volatile boolean shouldDisconnect = false;
private final SinglePipeConnector<RecordArrayEvent> tcpConnectorConnector;
private final SinglePipeConnector<IRecord> tcpConnectorConnector;
public TCPConnector(final SinglePipeConnector<RecordArrayEvent> tcpConnectorConnector,
public TCPConnector(final SinglePipeConnector<IRecord> tcpConnectorConnector,
final String hostname, final int port, final Configuration configuration) {
this.tcpConnectorConnector = tcpConnectorConnector;
buffer.clear();
......@@ -84,7 +83,7 @@ public class TCPConnector extends AbstractSink implements IWriter, IStringRecord
}
@Override
protected void processRecord(final IRecord record) {
public void processRecord(final IRecord record) {
if (record instanceof ISerializableRecord) {
final ISerializableRecord serializableRecord = (ISerializableRecord) record;
if (buffer.remaining() < serializableRecord.getRecordSizeInBytes()) {
......
......@@ -3,7 +3,6 @@ package explorviz.live_trace_processing.filter.counting;
import java.util.Queue;
import explorviz.live_trace_processing.filter.AbstractFilter;
import explorviz.live_trace_processing.filter.RecordArrayEvent;
import explorviz.live_trace_processing.filter.SinglePipeConnector;
import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.event.AbstractBeforeOperationEventRecord;
......@@ -14,12 +13,14 @@ import explorviz.live_trace_processing.record.trace.Trace;
public class RecordCountingFilter extends AbstractFilter {
private final SinglePipeConnector<RecordArrayEvent> pipeConnector;
private final SinglePipeConnector<IRecord> pipeConnector;
private final Queue<IRecord> receiver;
public RecordCountingFilter(final SinglePipeConnector<RecordArrayEvent> pipeConnector,
final Queue<RecordArrayEvent> receiver) {
super(receiver, 64, "MethodCalls/10 sec", 1000 * 10);
public RecordCountingFilter(final SinglePipeConnector<IRecord> pipeConnector,
final Queue<IRecord> receiver) {
super(receiver, "MethodCalls/10 sec", 1000 * 10);
this.pipeConnector = pipeConnector;
this.receiver = receiver;
counter.setEnabled(true);
}
......@@ -39,18 +40,27 @@ public class RecordCountingFilter extends AbstractFilter {
.getRuntimeStatisticInformation().getCount());
}
}
deliver(record);
if (receiver != null) {
deliver(record);
}
} else if (record instanceof AbstractBeforeOperationEventRecord) {
counter.inputObjectsCount(1);
deliver(record);
if (receiver != null) {
deliver(record);
}
} else if (record instanceof TimedPeriodRecord) {
periodicFlush(record);
// deliver(record);
if (receiver != null) {
deliver(record);
}
} else if (record instanceof TerminateRecord) {
terminate();
deliver(record);
if (receiver != null) {
deliver(record);
}
} else {
deliver(record);
if (receiver != null) {
deliver(record);
}
}
}
......
......@@ -3,7 +3,6 @@ package explorviz.live_trace_processing.filter.counting;
import java.util.Queue;
import explorviz.live_trace_processing.filter.AbstractFilter;
import explorviz.live_trace_processing.filter.RecordArrayEvent;
import explorviz.live_trace_processing.filter.SinglePipeConnector;
import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.misc.TerminateRecord;
......@@ -12,11 +11,11 @@ import explorviz.live_trace_processing.record.trace.Trace;
public class TraceCountingFilter extends AbstractFilter {
private final SinglePipeConnector<RecordArrayEvent> pipeConnector;
private final SinglePipeConnector<IRecord> pipeConnector;
public TraceCountingFilter(final SinglePipeConnector<RecordArrayEvent> pipeConnector,
final Queue<RecordArrayEvent> receiver) {
super(receiver, 64, "TraceCalls/10 sec", 1000 * 10);
public TraceCountingFilter(final SinglePipeConnector<IRecord> pipeConnector,
final Queue<IRecord> receiver) {
super(receiver, "TraceCalls/10 sec", 1000 * 10);
this.pipeConnector = pipeConnector;
counter.setEnabled(true);
}
......@@ -33,8 +32,7 @@ public class TraceCountingFilter extends AbstractFilter {
counter.inputObjectsCount(trace.getCalledTimes());
deliver(record);
} else if (record instanceof TimedPeriodRecord) {
periodicFlush(record);
// deliver(record);
deliver(record);
} else if (record instanceof TerminateRecord) {
terminate();
deliver(record);
......
package explorviz.live_trace_processing.filter.reconstruction;
import explorviz.live_trace_processing.filter.IPipeReceiver;
import explorviz.live_trace_processing.filter.PipesMerger;
import explorviz.live_trace_processing.filter.RecordArrayEvent;
public interface ITraceReconstruction extends IPipeReceiver<RecordArrayEvent> {
void setMerger(PipesMerger<RecordArrayEvent> traceReconstructionMerger);
}
......@@ -7,10 +7,8 @@ import java.util.Map.Entry;
import java.util.Queue;
import java.util.TreeMap;
import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.filter.AbstractFilter;
import explorviz.live_trace_processing.filter.PipesMerger;
import explorviz.live_trace_processing.filter.RecordArrayEvent;
import explorviz.live_trace_processing.reader.TimeProvider;
import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.event.AbstractEventRecord;
......@@ -23,12 +21,11 @@ public final class TraceReconstructionFilter extends AbstractFilter implements R
private final Map<Long, TraceReconstructionBuffer> traceId2trace = new TreeMap<Long, TraceReconstructionBuffer>();
private final PipesMerger<RecordArrayEvent> traceReconstructionMerger;
private final PipesMerger<IRecord> traceReconstructionMerger;
public TraceReconstructionFilter(final PipesMerger<RecordArrayEvent> traceReconstructionMerger,
final long maxTraceTimeout, final Queue<RecordArrayEvent> receiverQueue) {
super(receiverQueue, Constants.TRACE_RECONSTRUCTION_OUTPUT_BUFFER_SIZE,
"Reconstructed traces/sec", 1000);
public TraceReconstructionFilter(final PipesMerger<IRecord> traceReconstructionMerger,
final long maxTraceTimeout, final Queue<IRecord> receiverQueue) {
super(receiverQueue, "Reconstructed traces/sec", 1000);
this.traceReconstructionMerger = traceReconstructionMerger;
this.maxTraceTimeout = maxTraceTimeout;
}
......@@ -61,8 +58,7 @@ public final class TraceReconstructionFilter extends AbstractFilter implements R
}
} else if (record instanceof TimedPeriodRecord) {
checkForTimeouts(TimeProvider.getCurrentTimestamp());
periodicFlush(record);
// deliver(record);
deliver(record);
} else if (record instanceof TerminateRecord) {
terminate();
deliver(record);
......
......@@ -6,9 +6,7 @@ import java.util.Map.Entry;
import java.util.Queue;
import java.util.TreeMap;
import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.filter.AbstractFilter;
import explorviz.live_trace_processing.filter.RecordArrayEvent;
import explorviz.live_trace_processing.filter.SinglePipeConnector;
import explorviz.live_trace_processing.reader.TimeProvider;
import explorviz.live_trace_processing.record.IRecord;
......@@ -26,12 +24,11 @@ public class TracesSummarizationFilter extends AbstractFilter {
private final Map<Trace, TracesSummarizationBuffer> trace2buffer = new TreeMap<Trace, TracesSummarizationBuffer>(
new TraceComperator());
private final SinglePipeConnector<RecordArrayEvent> pipeConnector;
private final SinglePipeConnector<IRecord> pipeConnector;
public TracesSummarizationFilter(final SinglePipeConnector<RecordArrayEvent> pipeConnector,
final long maxCollectionDuration, final Queue<RecordArrayEvent> sinkReceiver) {
super(sinkReceiver, Constants.TRACE_SUMMARIZATION_OUTPUT_BUFFER_SIZE, "Reduced traces/sec",
1000);
public TracesSummarizationFilter(final SinglePipeConnector<IRecord> pipeConnector,
final long maxCollectionDuration, final Queue<IRecord> sinkReceiver) {
super(sinkReceiver, "Reduced traces/sec", 1000);
this.pipeConnector = pipeConnector;
this.maxCollectionDuration = maxCollectionDuration;
......@@ -61,8 +58,7 @@ public class TracesSummarizationFilter extends AbstractFilter {
}
} else if (record instanceof TimedPeriodRecord) {
processTimeoutQueue(TimeProvider.getCurrentTimestamp());
periodicFlush(record);
// deliver(record);
deliver(record);
} else if (record instanceof TerminateRecord) {
terminate();
deliver(record);
......@@ -93,7 +89,7 @@ public class TracesSummarizationFilter extends AbstractFilter {
final AbstractBeforeOperationEventRecord abstractBeforeOperationEventRecord = (AbstractBeforeOperationEventRecord) abstractBeforeEventRecord;
abstractBeforeOperationEventRecord.getRuntimeStatisticInformation()
.makeAccumulator(abstractBeforeOperationEventRecord.getObjectId());
.makeAccumulator(abstractBeforeOperationEventRecord.getObjectId());
} else {
abstractBeforeEventRecord.getRuntimeStatisticInformation().makeAccumulator(0);
......
......@@ -7,22 +7,22 @@ import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.configuration.Configuration;
import explorviz.live_trace_processing.configuration.ConfigurationFactory;
import explorviz.live_trace_processing.filter.PipesMerger;
import explorviz.live_trace_processing.filter.RecordArrayEvent;
import explorviz.live_trace_processing.filter.SinglePipeConnector;
import explorviz.live_trace_processing.filter.reconstruction.TraceReconstructionFilter;
import explorviz.live_trace_processing.filter.reduction.TracesSummarizationFilter;
import explorviz.live_trace_processing.reader.TCPReader;
import explorviz.live_trace_processing.record.IRecord;
public class FilterConfiguration {
public static void configureAndStartFilters(final Configuration configuration,
final Queue<RecordArrayEvent> sink) {
final SinglePipeConnector<RecordArrayEvent> traceReductionConnector = new SinglePipeConnector<RecordArrayEvent>(
final Queue<IRecord> sink) {
final SinglePipeConnector<IRecord> traceReductionConnector = new SinglePipeConnector<IRecord>(
Constants.TRACE_RECONSTRUCTION_DISRUPTOR_SIZE);
new TracesSummarizationFilter(traceReductionConnector, TimeUnit.SECONDS.toNanos(1), sink)
.start();
final PipesMerger<RecordArrayEvent> traceReconstructionMerger = new PipesMerger<RecordArrayEvent>(
final PipesMerger<IRecord> traceReconstructionMerger = new PipesMerger<IRecord>(
Constants.TCP_READER_DISRUPTOR_SIZE);
new TraceReconstructionFilter(traceReconstructionMerger, TimeUnit.SECONDS.toNanos(4),
......
......@@ -7,10 +7,10 @@ import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.configuration.Configuration;
import explorviz.live_trace_processing.configuration.ConfigurationFactory;
import explorviz.live_trace_processing.connector.TCPConnector;
import explorviz.live_trace_processing.filter.RecordArrayEvent;
import explorviz.live_trace_processing.filter.SinglePipeConnector;
import explorviz.live_trace_processing.filter.counting.RecordCountingFilter;
import explorviz.live_trace_processing.filter.counting.TraceCountingFilter;
import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.writer.load_balancer.LoadBalancer;
public class WorkerStarter {
......@@ -21,10 +21,10 @@ public class WorkerStarter {
final boolean isWorker = configuration
.getBooleanProperty(ConfigurationFactory.WORKER_ENABLED);
Queue<RecordArrayEvent> sink = null;
Queue<IRecord> sink = null;
if (isWorker) {
final SinglePipeConnector<RecordArrayEvent> tcpConnectorConnector = new SinglePipeConnector<RecordArrayEvent>(
final SinglePipeConnector<IRecord> tcpConnectorConnector = new SinglePipeConnector<IRecord>(
Constants.TRACE_SUMMARIZATION_DISRUPTOR_SIZE);
final TCPConnector connector = new TCPConnector(tcpConnectorConnector,
......@@ -38,11 +38,11 @@ public class WorkerStarter {
sink = tcpConnectorConnector.registerProducer();
} else {
final SinglePipeConnector<RecordArrayEvent> recordCountingConnector = new SinglePipeConnector<RecordArrayEvent>(
final SinglePipeConnector<IRecord> recordCountingConnector = new SinglePipeConnector<IRecord>(
Constants.TRACE_SUMMARIZATION_DISRUPTOR_SIZE);
new RecordCountingFilter(recordCountingConnector, null).start();
final SinglePipeConnector<RecordArrayEvent> traceCountingConnector = new SinglePipeConnector<RecordArrayEvent>(
final SinglePipeConnector<IRecord> traceCountingConnector = new SinglePipeConnector<IRecord>(
Constants.TRACE_SUMMARIZATION_DISRUPTOR_SIZE);
new TraceCountingFilter(traceCountingConnector,
recordCountingConnector.registerProducer()).start();
......@@ -64,9 +64,9 @@ public class WorkerStarter {
configuration.getIntProperty(ConfigurationFactory.LOAD_BALANCER_PORT, 10200),
configuration.getIntProperty(ConfigurationFactory.LOAD_BALANCER_WAIT_TIME,
20000),
configuration
configuration
.getStringProperty(ConfigurationFactory.LOAD_BALANCER_SCALING_GROUP),
tcpConnector);
tcpConnector);
} else {
try {
tcpConnector.connect();
......
......@@ -5,7 +5,6 @@ import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
......@@ -14,7 +13,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import explorviz.live_trace_processing.filter.PipesMerger;
import explorviz.live_trace_processing.filter.RecordArrayEvent;
import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
public final class TCPReader implements IPeriodicTimeSignalReceiver {
......@@ -29,10 +28,9 @@ public final class TCPReader implements IPeriodicTimeSignalReceiver {
private final List<TCPReaderOneClient> threads = Collections
.synchronizedList(new ArrayList<TCPReaderOneClient>());
private final PipesMerger<RecordArrayEvent> merger;
private final PipesMerger<IRecord> merger;
public TCPReader(final int listeningPort,
final PipesMerger<RecordArrayEvent> traceReconstructionMerger) {
public TCPReader(final int listeningPort, final PipesMerger<IRecord> traceReconstructionMerger) {
this.listeningPort = listeningPort;
merger = traceReconstructionMerger;
......@@ -42,39 +40,15 @@ public final class TCPReader implements IPeriodicTimeSignalReceiver {
@Override
public void periodicTimeSignal(final long timestamp) {
boolean alreadySent = false;
synchronized (threads) {
final Iterator<TCPReaderOneClient> iterator = threads.iterator();
while (iterator.hasNext()) {
final TCPReaderOneClient thread = iterator.next();
if (!thread.isAlive()) {
iterator.remove();
} else {
if (!alreadySent) {
thread.putInQueue(new TimedPeriodRecord());
alreadySent = true;
}
thread.flushOutputBuffer();
}
}
}
if (!alreadySent) {
final Queue<RecordArrayEvent> queue = merger.registerProducer();
final RecordArrayEvent recordArrayEvent = new RecordArrayEvent(1);
recordArrayEvent.getValues()[0] = new TimedPeriodRecord();
recordArrayEvent.setValueSize(1);
while (!queue.offer(recordArrayEvent)) {
try {
Thread.sleep(1);
} catch (final InterruptedException e) {
}
final Queue<IRecord> queue = merger.registerProducer();
final TimedPeriodRecord periodRecord = new TimedPeriodRecord();
while (!queue.offer(periodRecord)) {
try {
Thread.sleep(1);
} catch (final InterruptedException e) {
}
merger.deregisterProducer(queue);
}
merger.deregisterProducer(queue);
}
public final void read() {
......
......@@ -11,11 +11,9 @@ import java.util.Queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.IdNotAvailableException;
import explorviz.live_trace_processing.StringRegistry;
import explorviz.live_trace_processing.filter.PipesMerger;
import explorviz.live_trace_processing.filter.RecordArrayEvent;
import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.event.AbstractEventRecord;
import explorviz.live_trace_processing.record.event.constructor.AfterConstructorEventRecord;
......@@ -49,18 +47,14 @@ class TCPReaderOneClient extends Thread {
private final SocketChannel socketChannel;
private IRecord[] outputBuffer = new IRecord[Constants.TCP_READER_OUTPUT_BUFFER_SIZE];
private int outputBufferIndex = 0;
private final String remoteAddress = "";
private Trace currentlyOpenTrace;
private final Queue<RecordArrayEvent> queue;
private final PipesMerger<RecordArrayEvent> merger;
private final Queue<IRecord> queue;
private final PipesMerger<IRecord> merger;
public TCPReaderOneClient(final SocketChannel socketChannel,
final PipesMerger<RecordArrayEvent> merger) {
public TCPReaderOneClient(final SocketChannel socketChannel, final PipesMerger<IRecord> merger) {
this.socketChannel = socketChannel;
this.merger = merger;
queue = merger.registerProducer();
......@@ -90,7 +84,6 @@ class TCPReaderOneClient extends Thread {
LOG.info("Error in read() " + ex.getMessage());
} finally {
LOG.info("Client " + remoteAddress + " disconnected.");
flushOutputBuffer();
merger.deregisterProducer(queue);
}
}
......@@ -746,33 +739,10 @@ class TCPReaderOneClient extends Thread {
}
public final void putInQueue(final IRecord message) {
synchronized (this) {
outputBuffer[outputBufferIndex++] = message;
if (outputBufferIndex == Constants.TCP_READER_OUTPUT_BUFFER_SIZE) {
flushOutputBuffer();
}
}
}
public final void flushOutputBuffer() {
synchronized (this) {
if (outputBufferIndex > 0) {
final RecordArrayEvent arrayEvent = new RecordArrayEvent(
Constants.TCP_READER_OUTPUT_BUFFER_SIZE);
final IRecord[] savedBuffer = arrayEvent.getValues();
arrayEvent.setValues(outputBuffer);
arrayEvent.setValueSize(outputBufferIndex);
while (!queue.offer(arrayEvent)) {
try {
Thread.sleep(1);
} catch (final InterruptedException e) {
}
}
outputBuffer = savedBuffer;
outputBufferIndex = 0;
while (!queue.offer(message)) {
try {
Thread.sleep(1);
} catch (final InterruptedException e) {
}
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment