Commit 518b1ac3 authored by Florian Fittkau's avatar Florian Fittkau

moved from disruptor to jctools

parent e3092294
......@@ -5,7 +5,11 @@
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
<classpathentry kind="lib" path="lib/slf4j-api-1.7.5.jar"/>
<classpathentry kind="lib" path="lib/jctools-core-1.0.jar">
<attributes>
<attribute name="javadoc_location" value="jar:platform:/resource/worker/lib/jctools-core-1.0-javadoc.jar!/"/>
</attributes>
</classpathentry>
<classpathentry combineaccessrules="false" kind="src" path="/common-monitoring"/>
<classpathentry kind="lib" path="lib/disruptor-3.3.0.jar"/>
<classpathentry kind="output" path="bin"/>
</classpath>
......@@ -11,5 +11,5 @@
</listAttribute>
<stringAttribute key="org.eclipse.jdt.launching.MAIN_TYPE" value="explorviz.live_trace_processing.main.WorkerStarter"/>
<stringAttribute key="org.eclipse.jdt.launching.PROJECT_ATTR" value="worker"/>
<stringAttribute key="org.eclipse.jdt.launching.VM_ARGUMENTS" value="-Xmx4G -Dexplorviz.live_trace_processing.worker_enabled=false -Dexplorviz.live_trace_processing.reader_listening_port=10134"/>
<stringAttribute key="org.eclipse.jdt.launching.VM_ARGUMENTS" value="-Xmx4G -Dexplorviz.live_trace_processing.worker_enabled=false -Dexplorviz.live_trace_processing.reader_listening_port=10133"/>
</launchConfiguration>
......@@ -30,7 +30,7 @@
</manifest>
<fileset dir="${build.dir}" excludes="*common*.jar" />
<fileset dir="lib" includes="*LICENSE*" />
<zipfileset excludes="META-INF/*.SF" src="lib/disruptor-3.3.0.jar"/>
<zipfileset excludes="META-INF/*.SF" src="lib/jctools-core-1.0.jar"/>
<zipfileset excludes="META-INF/*.SF" src="${build.dir}/explorviz-common.jar"/>
<zipfileset excludes="META-INF/*.SF" src="lib/slf4j-api-1.7.5.jar"/>
</jar>
......
This diff is collapsed.
package explorviz.live_trace_processing.filter.counting;
import explorviz.live_trace_processing.filter.IPipeReceiver;
public interface IRecordCounting extends IPipeReceiver {
}
......@@ -2,6 +2,7 @@ package explorviz.live_trace_processing.filter.counting;
import explorviz.live_trace_processing.filter.AbstractFilter;
import explorviz.live_trace_processing.filter.IPipeReceiver;
import explorviz.live_trace_processing.filter.RecordArrayEvent;
import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.event.AbstractBeforeOperationEventRecord;
import explorviz.live_trace_processing.record.event.AbstractEventRecord;
......@@ -9,9 +10,9 @@ import explorviz.live_trace_processing.record.misc.TerminateRecord;
import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
import explorviz.live_trace_processing.record.trace.Trace;
public class RecordCountingFilter extends AbstractFilter implements IRecordCounting {
public class RecordCountingFilter extends AbstractFilter {
public RecordCountingFilter(final IPipeReceiver receiver) {
public RecordCountingFilter(final IPipeReceiver<RecordArrayEvent> receiver) {
super(receiver, 16, 64, "MethodCalls/10 sec", 1000 * 10);
counter.setEnabled(true);
}
......
......@@ -2,14 +2,15 @@ package explorviz.live_trace_processing.filter.counting;
import explorviz.live_trace_processing.filter.AbstractFilter;
import explorviz.live_trace_processing.filter.IPipeReceiver;
import explorviz.live_trace_processing.filter.RecordArrayEvent;
import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.misc.TerminateRecord;
import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
import explorviz.live_trace_processing.record.trace.Trace;
public class TraceCountingFilter extends AbstractFilter implements IRecordCounting {
public class TraceCountingFilter extends AbstractFilter {
public TraceCountingFilter(final IPipeReceiver receiver) {
public TraceCountingFilter(final IPipeReceiver<RecordArrayEvent> receiver) {
super(receiver, 16, 64, "TraceCalls/10 sec", 1000 * 10);
counter.setEnabled(true);
}
......
package explorviz.live_trace_processing.filter.reconstruction;
import explorviz.live_trace_processing.filter.IPipeReceiver;
import explorviz.live_trace_processing.filter.Merger;
import explorviz.live_trace_processing.filter.RecordArrayEvent;
public interface ITraceReconstruction extends IPipeReceiver {
public interface ITraceReconstruction extends IPipeReceiver<RecordArrayEvent> {
void setMerger(Merger<RecordArrayEvent> traceReconstructionMerger);
}
......@@ -8,7 +8,9 @@ import java.util.TreeMap;
import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.filter.AbstractFilter;
import explorviz.live_trace_processing.filter.reduction.ITraceReduction;
import explorviz.live_trace_processing.filter.IPipeReceiver;
import explorviz.live_trace_processing.filter.Merger;
import explorviz.live_trace_processing.filter.RecordArrayEvent;
import explorviz.live_trace_processing.reader.TimeProvider;
import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.event.AbstractEventRecord;
......@@ -16,18 +18,33 @@ import explorviz.live_trace_processing.record.misc.TerminateRecord;
import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
import explorviz.live_trace_processing.record.trace.Trace;
public final class TraceReconstructionFilter extends AbstractFilter implements ITraceReconstruction {
public final class TraceReconstructionFilter extends AbstractFilter implements
ITraceReconstruction, Runnable {
private final long maxTraceTimeout;
private final Map<Long, TraceReconstructionBuffer> traceId2trace = new TreeMap<Long, TraceReconstructionBuffer>();
private Merger<RecordArrayEvent> traceReconstructionMerger;
public TraceReconstructionFilter(final long maxTraceTimeout,
final ITraceReduction traceReduction) {
final IPipeReceiver<RecordArrayEvent> traceReduction) {
super(traceReduction, Constants.TRACE_RECONSTRUCTION_DISRUPTOR_SIZE,
Constants.TRACE_RECONSTRUCTION_OUTPUT_BUFFER_SIZE, "Reconstructed traces/sec", 1000);
this.maxTraceTimeout = maxTraceTimeout;
}
@Override
public void run() {
traceReconstructionMerger.setReceiver(this);
traceReconstructionMerger.process();
}
@Override
public void setMerger(final Merger<RecordArrayEvent> traceReconstructionMerger) {
this.traceReconstructionMerger = traceReconstructionMerger;
}
@Override
public final void processRecord(final IRecord record) {
if (record instanceof AbstractEventRecord) {
......
package explorviz.live_trace_processing.filter.reduction;
import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.filter.AbstractFilter;
import explorviz.live_trace_processing.filter.IPipeReceiver;
import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.misc.TerminateRecord;
import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
import explorviz.live_trace_processing.record.trace.Trace;
public abstract class AbstractReductionFilter extends AbstractFilter implements ITraceReduction {
public AbstractReductionFilter(final IPipeReceiver receiver, final String counterString) {
super(receiver, Constants.TRACE_SUMMARIZATION_DISRUPTOR_SIZE,
Constants.TRACE_SUMMARIZATION_OUTPUT_BUFFER_SIZE, counterString, 1000);
}
public Trace testReduction(final Trace trace) {
return reduceTrace(trace);
}
@Override
protected void processRecord(final IRecord record) {
if (record instanceof Trace) {
final Trace trace = (Trace) record;
if (trace.isValid()) {
final Trace reducedTrace = reduceTrace(trace);
deliver(reducedTrace);
} else {
deliver(trace);
}
} else if (record instanceof TimedPeriodRecord) {
periodicFlush(record);
deliver(record);
} else if (record instanceof TerminateRecord) {
System.out.println("terminate...");
deliver(record);
} else {
deliver(record);
}
}
protected abstract Trace reduceTrace(Trace trace);
}
package explorviz.live_trace_processing.filter.reduction;
import explorviz.live_trace_processing.filter.IPipeReceiver;
import explorviz.live_trace_processing.record.trace.Trace;
public class DummyFilter extends AbstractReductionFilter {
public DummyFilter(final IPipeReceiver receiver) {
super(receiver, "dummy");
}
@Override
protected Trace reduceTrace(final Trace trace) {
return trace;
}
}
package explorviz.live_trace_processing.filter.reduction;
import explorviz.live_trace_processing.filter.IPipeReceiver;
public interface ITraceReduction extends IPipeReceiver {
}
......@@ -8,6 +8,7 @@ import java.util.TreeMap;
import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.filter.AbstractFilter;
import explorviz.live_trace_processing.filter.IPipeReceiver;
import explorviz.live_trace_processing.filter.RecordArrayEvent;
import explorviz.live_trace_processing.reader.TimeProvider;
import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.event.AbstractBeforeEventRecord;
......@@ -18,14 +19,14 @@ import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
import explorviz.live_trace_processing.record.trace.Trace;
import explorviz.live_trace_processing.record.trace.TraceComperator;
public class TracesSummarizationFilter extends AbstractFilter implements ITraceReduction {
public class TracesSummarizationFilter extends AbstractFilter {
private final long maxCollectionDuration;
private final Map<Trace, TracesSummarizationBuffer> trace2buffer = new TreeMap<Trace, TracesSummarizationBuffer>(
new TraceComperator());
public TracesSummarizationFilter(final long maxCollectionDuration,
final IPipeReceiver sinkReceiver) {
final IPipeReceiver<RecordArrayEvent> sinkReceiver) {
super(sinkReceiver, Constants.TRACE_SUMMARIZATION_DISRUPTOR_SIZE,
Constants.TRACE_SUMMARIZATION_OUTPUT_BUFFER_SIZE, "Reduced traces/sec", 1000);
......@@ -83,7 +84,7 @@ public class TracesSummarizationFilter extends AbstractFilter implements ITraceR
final AbstractBeforeOperationEventRecord abstractBeforeOperationEventRecord = (AbstractBeforeOperationEventRecord) abstractBeforeEventRecord;
abstractBeforeOperationEventRecord.getRuntimeStatisticInformation()
.makeAccumulator(abstractBeforeOperationEventRecord.getObjectId());
.makeAccumulator(abstractBeforeOperationEventRecord.getObjectId());
} else {
abstractBeforeEventRecord.getRuntimeStatisticInformation().makeAccumulator(0);
......
......@@ -2,26 +2,33 @@ package explorviz.live_trace_processing.main;
import java.util.concurrent.TimeUnit;
import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.configuration.Configuration;
import explorviz.live_trace_processing.configuration.ConfigurationFactory;
import explorviz.live_trace_processing.filter.IPipeReceiver;
import explorviz.live_trace_processing.filter.reconstruction.ITraceReconstruction;
import explorviz.live_trace_processing.filter.Merger;
import explorviz.live_trace_processing.filter.RecordArrayEvent;
import explorviz.live_trace_processing.filter.reconstruction.TraceReconstructionFilter;
import explorviz.live_trace_processing.filter.reduction.ITraceReduction;
import explorviz.live_trace_processing.filter.reduction.TracesSummarizationFilter;
import explorviz.live_trace_processing.reader.TCPReader;
public class FilterConfiguration {
public static void configureAndStartFilters(final Configuration configuration,
final IPipeReceiver sink) {
final IPipeReceiver<RecordArrayEvent> sink) {
final ITraceReduction traceReduction = new TracesSummarizationFilter(
final IPipeReceiver<RecordArrayEvent> traceReduction = new TracesSummarizationFilter(
TimeUnit.MILLISECONDS.toNanos(990), sink);
final ITraceReconstruction traceReconstruction = new TraceReconstructionFilter(
final Merger<RecordArrayEvent> traceReconstructionMerger = new Merger<RecordArrayEvent>(
Constants.TCP_READER_DISRUPTOR_SIZE);
final TraceReconstructionFilter traceReconstruction = new TraceReconstructionFilter(
TimeUnit.SECONDS.toNanos(4), traceReduction);
traceReconstruction.setMerger(traceReconstructionMerger);
new Thread(traceReconstruction).start();
new TCPReader(configuration.getIntProperty(ConfigurationFactory.READER_LISTENING_PORT,
10133), traceReconstruction).read();
10133), traceReconstructionMerger).read();
}
}
......@@ -5,22 +5,12 @@ import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.filter.RecordArrayEvent;
import explorviz.live_trace_processing.filter.RecordArrayEventFactory;
import explorviz.live_trace_processing.filter.reconstruction.ITraceReconstruction;
import explorviz.live_trace_processing.filter.Merger;
import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
public final class TCPReader implements IPeriodicTimeSignalReceiver {
......@@ -32,23 +22,14 @@ public final class TCPReader implements IPeriodicTimeSignalReceiver {
private ServerSocketChannel serversocket;
private final RingBuffer<RecordArrayEvent> ringBuffer;
private final List<TCPReaderOneClient> threads = new ArrayList<TCPReaderOneClient>();
public TCPReader(final int listeningPort, final ITraceReconstruction traceReconstruction) {
this.listeningPort = listeningPort;
private final Merger merger;
final Disruptor<RecordArrayEvent> disruptor = new Disruptor<RecordArrayEvent>(
new RecordArrayEventFactory(Constants.TCP_READER_OUTPUT_BUFFER_SIZE),
Constants.TCP_READER_DISRUPTOR_SIZE, Executors.newCachedThreadPool(),
ProducerType.MULTI, new BlockingWaitStrategy());
public TCPReader(final int listeningPort, final Merger traceReconstructionMerger) {
this.listeningPort = listeningPort;
@SuppressWarnings("unchecked")
final EventHandler<RecordArrayEvent>[] eventHandlers = new EventHandler[1];
eventHandlers[0] = traceReconstruction;
disruptor.handleEventsWith(eventHandlers);
ringBuffer = disruptor.start();
merger = traceReconstructionMerger;
new TimeSignalReader(TimeUnit.SECONDS.toMillis(1), this).start();
}
......@@ -82,7 +63,7 @@ public final class TCPReader implements IPeriodicTimeSignalReceiver {
open();
while (active) {
final TCPReaderOneClient thread = new TCPReaderOneClient(serversocket.accept(),
ringBuffer);
merger);
thread.start();
threads.add(thread);
}
......
......@@ -6,19 +6,16 @@ import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Queue;
import java.util.concurrent.locks.LockSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.lmax.disruptor.RingBuffer;
import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.IdNotAvailableException;
import explorviz.live_trace_processing.StringRegistry;
import explorviz.live_trace_processing.adaptive_monitoring.AdaptiveMonitoringPatternList;
import explorviz.live_trace_processing.filter.Merger;
import explorviz.live_trace_processing.filter.RecordArrayEvent;
import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.event.AbstractEventRecord;
......@@ -41,7 +38,6 @@ import explorviz.live_trace_processing.record.misc.StringRegistryRecord;
import explorviz.live_trace_processing.record.misc.SystemMonitoringRecord;
import explorviz.live_trace_processing.record.trace.HostApplicationMetaDataRecord;
import explorviz.live_trace_processing.record.trace.Trace;
import explorviz.live_trace_processing.writer.RemoteConfigurator;
class TCPReaderOneClient extends Thread {
......@@ -53,7 +49,6 @@ class TCPReaderOneClient extends Thread {
private final List<byte[]> waitingForStringMessages = new ArrayList<byte[]>(1024);
private final SocketChannel socketChannel;
private final RingBuffer<RecordArrayEvent> ringBuffer;
private IRecord[] outputBuffer = new IRecord[Constants.TCP_READER_OUTPUT_BUFFER_SIZE];
private int outputBufferIndex = 0;
......@@ -62,10 +57,13 @@ class TCPReaderOneClient extends Thread {
private Trace currentlyOpenTrace;
public TCPReaderOneClient(final SocketChannel socketChannel,
final RingBuffer<RecordArrayEvent> ringBuffer) {
private final Queue<RecordArrayEvent> queue;
private final Merger merger;
public TCPReaderOneClient(final SocketChannel socketChannel, final Merger merger) {
this.socketChannel = socketChannel;
this.ringBuffer = ringBuffer;
this.merger = merger;
queue = merger.registerProducer();
}
@Override
......@@ -91,18 +89,21 @@ class TCPReaderOneClient extends Thread {
} catch (final IOException ex) {
RemoteConfigurationServlet.getConnectedChildren().remove(remoteAddress);
LOG.info("Error in read() " + ex.getMessage());
} finally {
merger.deregisterProducer(queue);
}
}
private void sendAdaptiveMonitoringList() {
final ConcurrentHashMap<String, Set<String>> patternMap = AdaptiveMonitoringPatternList
.getApplicationToPatternMap();
for (final Entry<String, Set<String>> entry : patternMap.entrySet()) {
for (final String pattern : entry.getValue()) {
RemoteConfigurator.addPattern(remoteAddress, pattern, entry.getKey());
}
}
}
// private void sendAdaptiveMonitoringList() {
// final ConcurrentHashMap<String, Set<String>> patternMap =
// AdaptiveMonitoringPatternList
// .getApplicationToPatternMap();
// for (final Entry<String, Set<String>> entry : patternMap.entrySet()) {
// for (final String pattern : entry.getValue()) {
// RemoteConfigurator.addPattern(remoteAddress, pattern, entry.getKey());
// }
// }
// }
private final void messagesfromByteArray(final ByteBuffer buffer) {
boolean shouldProceed = true;
......@@ -714,15 +715,18 @@ class TCPReaderOneClient extends Thread {
public final void flushOutputBuffer() {
synchronized (this) {
if (outputBufferIndex > 0) {
final long hiseq = ringBuffer.next();
final RecordArrayEvent valueEvent = ringBuffer.get(hiseq);
final IRecord[] oldValues = valueEvent.getValues();
valueEvent.setValues(outputBuffer);
valueEvent.setValueSize(outputBufferIndex);
ringBuffer.publish(hiseq);
final RecordArrayEvent arrayEvent = new RecordArrayEvent(
Constants.TCP_READER_OUTPUT_BUFFER_SIZE);
final IRecord[] savedBuffer = arrayEvent.getValues();
arrayEvent.setValues(outputBuffer);
arrayEvent.setValueSize(outputBufferIndex);
outputBuffer = oldValues;
while (!queue.offer(arrayEvent)) {
LockSupport.parkNanos(1);
}
outputBuffer = savedBuffer;
outputBufferIndex = 0;
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment