Commit d3fc9234 authored by Florian Fittkau's avatar Florian Fittkau

WiP

parent c6419ebe
...@@ -46,10 +46,11 @@ public class TracesSummarizationFilter extends AbstractFilter { ...@@ -46,10 +46,11 @@ public class TracesSummarizationFilter extends AbstractFilter {
if (record instanceof Trace) { if (record instanceof Trace) {
final Trace trace = (Trace) record; final Trace trace = (Trace) record;
if (!trace.containsRemoteRecord() && trace.isValid()) { if (trace.isValid() && !trace.containsRemoteRecord()) {
insertIntoBuffer(trace); insertIntoBuffer(trace);
} else { } else {
// trace with remote records or invalid trace cant be reduced // trace with remote records or invalid trace cant be reduced
System.out.println("invalid trace..." + trace.getTraceEvents().size());
makeTraceElementsAccumulator(trace); makeTraceElementsAccumulator(trace);
deliver(trace); deliver(trace);
} }
......
...@@ -9,7 +9,6 @@ import explorviz.live_trace_processing.configuration.ConfigurationFactory; ...@@ -9,7 +9,6 @@ import explorviz.live_trace_processing.configuration.ConfigurationFactory;
import explorviz.live_trace_processing.connector.TCPConnector; import explorviz.live_trace_processing.connector.TCPConnector;
import explorviz.live_trace_processing.filter.SinglePipeConnector; import explorviz.live_trace_processing.filter.SinglePipeConnector;
import explorviz.live_trace_processing.filter.counting.RecordCountingFilter; import explorviz.live_trace_processing.filter.counting.RecordCountingFilter;
import explorviz.live_trace_processing.filter.counting.TraceCountingFilter;
import explorviz.live_trace_processing.record.IRecord; import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.writer.load_balancer.LoadBalancer; import explorviz.live_trace_processing.writer.load_balancer.LoadBalancer;
...@@ -42,12 +41,13 @@ public class WorkerStarter { ...@@ -42,12 +41,13 @@ public class WorkerStarter {
Constants.TRACE_SUMMARIZATION_DISRUPTOR_SIZE); Constants.TRACE_SUMMARIZATION_DISRUPTOR_SIZE);
new RecordCountingFilter(recordCountingConnector, null).start(); new RecordCountingFilter(recordCountingConnector, null).start();
final SinglePipeConnector<IRecord> traceCountingConnector = new SinglePipeConnector<IRecord>( // final SinglePipeConnector<IRecord> traceCountingConnector = new
Constants.TRACE_SUMMARIZATION_DISRUPTOR_SIZE); // SinglePipeConnector<IRecord>(
new TraceCountingFilter(traceCountingConnector, // Constants.TRACE_SUMMARIZATION_DISRUPTOR_SIZE);
recordCountingConnector.registerProducer()).start(); // new TraceCountingFilter(traceCountingConnector,
// recordCountingConnector.registerProducer()).start();
sink = traceCountingConnector.registerProducer(); sink = recordCountingConnector.registerProducer();
} }
FilterConfiguration.configureAndStartFilters(configuration, sink); FilterConfiguration.configureAndStartFilters(configuration, sink);
...@@ -64,9 +64,9 @@ public class WorkerStarter { ...@@ -64,9 +64,9 @@ public class WorkerStarter {
configuration.getIntProperty(ConfigurationFactory.LOAD_BALANCER_PORT, 10200), configuration.getIntProperty(ConfigurationFactory.LOAD_BALANCER_PORT, 10200),
configuration.getIntProperty(ConfigurationFactory.LOAD_BALANCER_WAIT_TIME, configuration.getIntProperty(ConfigurationFactory.LOAD_BALANCER_WAIT_TIME,
20000), 20000),
configuration configuration
.getStringProperty(ConfigurationFactory.LOAD_BALANCER_SCALING_GROUP), .getStringProperty(ConfigurationFactory.LOAD_BALANCER_SCALING_GROUP),
tcpConnector); tcpConnector);
} else { } else {
try { try {
tcpConnector.connect(); tcpConnector.connect();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment