Commit a1561372 authored by Florian Fittkau's avatar Florian Fittkau

WiP

parent d3fc9234
......@@ -9,7 +9,7 @@ import java.nio.channels.SocketChannel;
import java.util.List;
import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.StringRegistry;
import explorviz.live_trace_processing.StringRegistrySender;
import explorviz.live_trace_processing.configuration.Configuration;
import explorviz.live_trace_processing.filter.AbstractSink;
import explorviz.live_trace_processing.filter.SinglePipeConnector;
......@@ -30,7 +30,7 @@ IRecordSender {
private SocketChannel socketChannel;
private final StringRegistry stringRegistry = new StringRegistry(this);
private final StringRegistrySender stringRegistry = new StringRegistrySender(this);
private final ByteBuffer sendingBuffer = ByteBuffer.allocate(Constants.SENDING_BUFFER_SIZE);
private final ByteBuffer stringBuffer = ByteBuffer.allocate(Constants.SENDING_BUFFER_SIZE);
......
......@@ -84,7 +84,7 @@ public final class TraceReconstructionFilter extends AbstractFilter implements R
} else {
traceBuffer.updateLastBufferInsert();
if ((timestamp - traceBuffer.getLastBufferInsert()) <= traceTimeout) {
if (traceBuffer.getLastBufferInsert() <= traceTimeout) {
deliver(traceBuffer.toTrace(false));
iterator.remove();
}
......
......@@ -50,7 +50,9 @@ public class TracesSummarizationFilter extends AbstractFilter {
insertIntoBuffer(trace);
} else {
// trace with remote records or invalid trace cant be reduced
System.out.println("invalid trace..." + trace.getTraceEvents().size());
System.out.println("invalid trace... size:" + trace.getTraceEvents().size());
// System.out.println("invalid trace... trace:" +
// trace.toString());
makeTraceElementsAccumulator(trace);
deliver(trace);
}
......
......@@ -19,13 +19,13 @@ public class FilterConfiguration {
final SinglePipeConnector<IRecord> traceReductionConnector = new SinglePipeConnector<IRecord>(
Constants.TRACE_RECONSTRUCTION_DISRUPTOR_SIZE);
new TracesSummarizationFilter(traceReductionConnector, TimeUnit.SECONDS.toNanos(2), sink)
.start();
new TracesSummarizationFilter(traceReductionConnector, TimeUnit.SECONDS.toNanos(3), sink)
.start();
final PipesMerger<IRecord> traceReconstructionMerger = new PipesMerger<IRecord>(
Constants.TCP_READER_DISRUPTOR_SIZE);
new TraceReconstructionFilter(traceReconstructionMerger, TimeUnit.SECONDS.toNanos(2),
new TraceReconstructionFilter(traceReconstructionMerger, TimeUnit.SECONDS.toNanos(5),
traceReductionConnector.registerProducer()).start();
new TCPReader(configuration.getIntProperty(ConfigurationFactory.READER_LISTENING_PORT,
......
......@@ -14,11 +14,12 @@ import explorviz.live_trace_processing.writer.load_balancer.LoadBalancer;
public class WorkerStarter {
public static boolean isWorker;
public static void main(final String[] args) {
final Configuration configuration = ConfigurationFactory.createSingletonConfiguration();
final boolean isWorker = configuration
.getBooleanProperty(ConfigurationFactory.WORKER_ENABLED);
isWorker = configuration.getBooleanProperty(ConfigurationFactory.WORKER_ENABLED);
Queue<IRecord> sink = null;
......@@ -64,9 +65,9 @@ public class WorkerStarter {
configuration.getIntProperty(ConfigurationFactory.LOAD_BALANCER_PORT, 10200),
configuration.getIntProperty(ConfigurationFactory.LOAD_BALANCER_WAIT_TIME,
20000),
configuration
configuration
.getStringProperty(ConfigurationFactory.LOAD_BALANCER_SCALING_GROUP),
tcpConnector);
tcpConnector);
} else {
try {
tcpConnector.connect();
......
......@@ -12,7 +12,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import explorviz.live_trace_processing.IdNotAvailableException;
import explorviz.live_trace_processing.StringRegistry;
import explorviz.live_trace_processing.StringRegistryReceiver;
import explorviz.live_trace_processing.filter.PipesMerger;
import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.event.constructor.AfterConstructorEventRecord;
......@@ -40,7 +40,7 @@ class TCPReaderOneClient extends Thread {
private HostApplicationMetaDataRecord hostApplicationMetadata;
private final StringRegistry stringRegistry = new StringRegistry(null);
private final StringRegistryReceiver stringRegistry = new StringRegistryReceiver();
private final List<byte[]> waitingForStringMessages = new ArrayList<byte[]>(32);
private final SocketChannel socketChannel;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment