Commit d418ab7c authored by Florian Fittkau's avatar Florian Fittkau

optimizations

parent fc300807
......@@ -36,13 +36,13 @@ explorviz.live_trace_processing.worker_enabled=true
explorviz.live_trace_processing.reader_listening_port=10133
explorviz.live_trace_processing.tcp_reader_output_buffer_size=16384
explorviz.live_trace_processing.tcp_reader_disruptor_size=64
explorviz.live_trace_processing.tcp_reader_output_buffer_size=512
explorviz.live_trace_processing.tcp_reader_disruptor_size=16
explorviz.live_trace_processing.trace_reconstruction_output_buffer_size=1024
explorviz.live_trace_processing.trace_reconstruction_disruptor_size=64
explorviz.live_trace_processing.trace_reconstruction_output_buffer_size=128
explorviz.live_trace_processing.trace_reconstruction_disruptor_size=16
explorviz.live_trace_processing.trace_reconstruction_buffer_initial_size=1024
explorviz.live_trace_processing.trace_reconstruction_buffer_initial_size=128
explorviz.live_trace_processing.trace_summarization_output_buffer_size=256
explorviz.live_trace_processing.trace_summarization_disruptor_size=64
explorviz.live_trace_processing.trace_summarization_output_buffer_size=128
explorviz.live_trace_processing.trace_summarization_disruptor_size=16
......@@ -19,8 +19,8 @@ public class FilterConfiguration {
final SinglePipeConnector<RecordArrayEvent> traceReductionConnector = new SinglePipeConnector<RecordArrayEvent>(
Constants.TRACE_RECONSTRUCTION_DISRUPTOR_SIZE);
new TracesSummarizationFilter(traceReductionConnector, TimeUnit.MILLISECONDS.toNanos(990),
sink).start();
new TracesSummarizationFilter(traceReductionConnector, TimeUnit.SECONDS.toNanos(1), sink)
.start();
final PipesMerger<RecordArrayEvent> traceReconstructionMerger = new PipesMerger<RecordArrayEvent>(
Constants.TCP_READER_DISRUPTOR_SIZE);
......
......@@ -3,6 +3,7 @@ package explorviz.live_trace_processing.main;
import java.io.IOException;
import java.util.Queue;
import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.configuration.Configuration;
import explorviz.live_trace_processing.configuration.ConfigurationFactory;
import explorviz.live_trace_processing.connector.TCPConnector;
......@@ -24,7 +25,7 @@ public class WorkerStarter {
if (isWorker) {
final SinglePipeConnector<RecordArrayEvent> tcpConnectorConnector = new SinglePipeConnector<RecordArrayEvent>(
16);
Constants.TRACE_SUMMARIZATION_DISRUPTOR_SIZE);
final TCPConnector connector = new TCPConnector(tcpConnectorConnector,
configuration.getStringProperty(ConfigurationFactory.WRITER_TARGET_IP),
......@@ -38,11 +39,11 @@ public class WorkerStarter {
sink = tcpConnectorConnector.registerProducer();
} else {
final SinglePipeConnector<RecordArrayEvent> recordCountingConnector = new SinglePipeConnector<RecordArrayEvent>(
16);
Constants.TRACE_SUMMARIZATION_DISRUPTOR_SIZE);
new RecordCountingFilter(recordCountingConnector, null).start();
final SinglePipeConnector<RecordArrayEvent> traceCountingConnector = new SinglePipeConnector<RecordArrayEvent>(
16);
Constants.TRACE_SUMMARIZATION_DISRUPTOR_SIZE);
new TraceCountingFilter(traceCountingConnector,
recordCountingConnector.registerProducer()).start();
......@@ -60,12 +61,12 @@ public class WorkerStarter {
if (loadBalancerEnabled) {
new LoadBalancer(
configuration.getStringProperty(ConfigurationFactory.LOAD_BALANCER_IP),
configuration.getIntProperty(ConfigurationFactory.LOAD_BALANCER_PORT, 9999),
configuration.getIntProperty(ConfigurationFactory.LOAD_BALANCER_PORT, 10200),
configuration.getIntProperty(ConfigurationFactory.LOAD_BALANCER_WAIT_TIME,
20000),
configuration
configuration
.getStringProperty(ConfigurationFactory.LOAD_BALANCER_SCALING_GROUP),
tcpConnector);
tcpConnector);
} else {
try {
tcpConnector.connect();
......
......@@ -45,7 +45,7 @@ class TCPReaderOneClient extends Thread {
private HostApplicationMetaDataRecord hostApplicationMetadata;
private final StringRegistry stringRegistry = new StringRegistry(null);
private final List<byte[]> waitingForStringMessages = new ArrayList<byte[]>(1024);
private final List<byte[]> waitingForStringMessages = new ArrayList<byte[]>(128);
private final SocketChannel socketChannel;
......@@ -68,7 +68,7 @@ class TCPReaderOneClient extends Thread {
@Override
public void run() {
final ByteBuffer buffer = ByteBuffer.allocateDirect(16 * 1024 * 1024);
final ByteBuffer buffer = ByteBuffer.allocate(16 * 1024 * 1024);
try {
String remoteAddress = "";
if (socketChannel.isConnected()) {
......@@ -85,11 +85,11 @@ class TCPReaderOneClient extends Thread {
buffer.flip();
messagesfromByteArray(buffer);
}
LOG.info("Client " + remoteAddress + " disconnected.");
} catch (final IOException ex) {
RemoteConfigurationServlet.getConnectedChildren().remove(remoteAddress);
LOG.info("Error in read() " + ex.getMessage());
} finally {
LOG.info("Client " + remoteAddress + " disconnected.");
flushOutputBuffer();
merger.deregisterProducer(queue);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment