Commit e3092294 authored by Florian Fittkau's avatar Florian Fittkau

Merge branch 'master' of build.se.informatik.uni-kiel.de:explorviz/worker

parents fee094cf e8fdabc8
......@@ -36,13 +36,13 @@ explorviz.live_trace_processing.worker_enabled=true
explorviz.live_trace_processing.reader_listening_port=10133
explorviz.live_trace_processing.tcp_reader_output_buffer_size=8192
explorviz.live_trace_processing.tcp_reader_disruptor_size=32
explorviz.live_trace_processing.tcp_reader_output_buffer_size=16384
explorviz.live_trace_processing.tcp_reader_disruptor_size=64
explorviz.live_trace_processing.trace_reconstruction_output_buffer_size=64
explorviz.live_trace_processing.trace_reconstruction_disruptor_size=16
explorviz.live_trace_processing.trace_reconstruction_output_buffer_size=1024
explorviz.live_trace_processing.trace_reconstruction_disruptor_size=64
explorviz.live_trace_processing.trace_reconstruction_buffer_initial_size=1024
explorviz.live_trace_processing.trace_summarization_output_buffer_size=64
explorviz.live_trace_processing.trace_summarization_disruptor_size=16
explorviz.live_trace_processing.trace_summarization_output_buffer_size=256
explorviz.live_trace_processing.trace_summarization_disruptor_size=64
......@@ -5,21 +5,22 @@ import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.filter.AbstractFilter;
import explorviz.live_trace_processing.filter.RecordArrayEvent;
import explorviz.live_trace_processing.filter.RecordArrayEventFactory;
import explorviz.live_trace_processing.filter.reconstruction.ITraceReconstruction;
import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
public final class TCPReader implements IPeriodicTimeSignalReceiver {
......@@ -40,7 +41,8 @@ public final class TCPReader implements IPeriodicTimeSignalReceiver {
final Disruptor<RecordArrayEvent> disruptor = new Disruptor<RecordArrayEvent>(
new RecordArrayEventFactory(Constants.TCP_READER_OUTPUT_BUFFER_SIZE),
Constants.TCP_READER_DISRUPTOR_SIZE, AbstractFilter.cachedThreadPool);
Constants.TCP_READER_DISRUPTOR_SIZE, Executors.newCachedThreadPool(),
ProducerType.MULTI, new BlockingWaitStrategy());
@SuppressWarnings("unchecked")
final EventHandler<RecordArrayEvent>[] eventHandlers = new EventHandler[1];
......@@ -55,26 +57,24 @@ public final class TCPReader implements IPeriodicTimeSignalReceiver {
public void periodicTimeSignal(final long timestamp) {
final List<TCPReaderOneClient> toRemove = new ArrayList<TCPReaderOneClient>();
boolean alreadySent = false;
for (final TCPReaderOneClient thread : threads) {
if (!thread.isAlive()) {
toRemove.add(thread);
}
if (!alreadySent) {
thread.putInRingBuffer(new TimedPeriodRecord());
alreadySent = true;
}
thread.flushOutputBuffer();
}
for (final TCPReaderOneClient toRemoveThread : toRemove) {
threads.remove(toRemoveThread);
}
final long hiseq = ringBuffer.next();
final RecordArrayEvent valueEvent = ringBuffer.get(hiseq);
final IRecord[] buffer = valueEvent.getValues();
buffer[0] = new TimedPeriodRecord();
valueEvent.setValues(buffer);
valueEvent.setValueSize(1);
ringBuffer.publish(hiseq);
}
public final void read() {
......
......@@ -70,7 +70,7 @@ class TCPReaderOneClient extends Thread {
@Override
public void run() {
final ByteBuffer buffer = ByteBuffer.allocateDirect(16 * 1024 * 1024);
final ByteBuffer buffer = ByteBuffer.allocateDirect(1024 * 1024);
try {
String remoteAddress = "";
if (socketChannel.isConnected()) {
......@@ -702,7 +702,7 @@ class TCPReaderOneClient extends Thread {
}
}
private final void putInRingBuffer(final IRecord message) {
public final void putInRingBuffer(final IRecord message) {
synchronized (this) {
outputBuffer[outputBufferIndex++] = message;
if (outputBufferIndex == Constants.TCP_READER_OUTPUT_BUFFER_SIZE) {
......@@ -711,7 +711,7 @@ class TCPReaderOneClient extends Thread {
}
}
public void flushOutputBuffer() {
public final void flushOutputBuffer() {
synchronized (this) {
if (outputBufferIndex > 0) {
final long hiseq = ringBuffer.next();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment