Commit 594dbbd8 authored by Florian Fittkau's avatar Florian Fittkau

moved from disruptor to jctools

parent cc0fb281
......@@ -8,9 +8,9 @@
<classpathentry kind="lib" path="lib/geronimo-jms_1.1_spec-1.1.1.jar"/>
<classpathentry kind="lib" path="lib/activemq-broker-5.9.1.jar"/>
<classpathentry kind="lib" path="lib/aspectjweaver-1.8.5.jar"/>
<classpathentry kind="lib" path="lib/jctools-core-1.0.jar"/>
<classpathentry combineaccessrules="false" kind="src" path="/common-monitoring"/>
<classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
<classpathentry kind="lib" path="lib/slf4j-api-1.7.5.jar"/>
<classpathentry kind="lib" path="lib/disruptor-3.3.0.jar"/>
<classpathentry kind="output" path="bin"/>
</classpath>
......@@ -70,7 +70,7 @@
<fileset dir="${src.dir}" includes="META-INF/aop.xml" />
<fileset dir="${src.dir}" includes="META-INF/explorviz.live_trace_processing.default.properties" />
<fileset dir="${src.dir}" includes="META-INF/jaxws_probe_handlers.xml" />
<zipfileset excludes="META-INF/*.SF" src="lib/disruptor-3.3.0.jar"/>
<zipfileset excludes="META-INF/*.SF" src="lib/jctools-core-1.0.jar"/>
<zipfileset excludes="META-INF/*.SF" src="${build.dir}/explorviz-common.jar"/>
<zipfileset excludes="META-INF/*.SF" src="lib/aspectjweaver-1.8.5.jar"/>
<zipfileset excludes="META-INF/*.SF" src="lib/slf4j-api-1.7.5.jar"/>
......@@ -97,7 +97,7 @@
</manifest>
<fileset dir="${build.dir}" excludes="*common*.jar" />
<fileset dir="lib" includes="*LICENSE*" />
<zipfileset excludes="META-INF/*.SF" src="lib/disruptor-3.3.0.jar"/>
<zipfileset excludes="META-INF/*.SF" src="lib/jctools-core-1.0.jar"/>
<zipfileset excludes="META-INF/*.SF" src="${build.dir}/explorviz-common.jar"/>
<zipfileset excludes="META-INF/*.SF" src="lib/slf4j-api-1.7.5.jar"/>
</jar>
......
......@@ -2,16 +2,10 @@ package explorviz.live_trace_processing.main;
import java.nio.ByteBuffer;
import com.lmax.disruptor.EventFactory;
import explorviz.live_trace_processing.Constants;
/**
* WARNING: This is a mutable object which will be recycled by the RingBuffer.
* You must take a copy of data it holds before the framework recycles it.
*/
public final class ByteBufferEvent {
private ByteBuffer value = ByteBuffer.allocateDirect(Constants.SENDING_BUFFER_SIZE);
private ByteBuffer value = ByteBuffer.allocate(Constants.SENDING_BUFFER_SIZE);
public final ByteBuffer getValue() {
return value;
......@@ -20,11 +14,4 @@ public final class ByteBufferEvent {
public void setValue(final ByteBuffer value) {
this.value = value;
}
public final static EventFactory<ByteBufferEvent> EVENT_FACTORY = new EventFactory<ByteBufferEvent>() {
@Override
public ByteBufferEvent newInstance() {
return new ByteBufferEvent();
}
};
}
\ No newline at end of file
......@@ -12,14 +12,6 @@ import java.util.concurrent.TimeUnit;
import org.aspectj.lang.Signature;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.configuration.Configuration;
import explorviz.live_trace_processing.configuration.ConfigurationFactory;
import explorviz.live_trace_processing.probe.tracemanagement.TraceRegistry;
......@@ -30,8 +22,6 @@ import explorviz.live_trace_processing.writer.load_balancer.LoadBalancer;
public class MonitoringController {
private static final SystemMonitor systemMonitor;
private static final RingBuffer<ByteBufferEvent> ringBuffer;
private static final Disruptor<ByteBufferEvent> disruptor;
private static final ExecutorService exec;
private static final Map<Signature, Integer> signatureReg = new ConcurrentSkipListMap<Signature, Integer>(
......@@ -53,30 +43,18 @@ public class MonitoringController {
TraceRegistry.init(configuration);
final WaitStrategy waitStrategy = new BlockingWaitStrategy(); // new
// YieldingWaitStrategy();
// if ((monitoringEnabled == false) && (systemMonitorEnabled == true)) {
// waitStrategy = new BlockingWaitStrategy();
// }
exec = Executors.newCachedThreadPool();
disruptor = new Disruptor<ByteBufferEvent>(ByteBufferEvent.EVENT_FACTORY,
Constants.MONITORING_CONTROLLER_DISRUPTOR_SIZE, exec, ProducerType.MULTI,
waitStrategy);
final boolean androidMonitoring = configuration
.getBooleanProperty(ConfigurationFactory.ANDROID_MONITORING);
@SuppressWarnings("unchecked")
final EventHandler<ByteBufferEvent>[] eventHandlers = new EventHandler[1];
final TCPWriter tcpWriter = new TCPWriter(
configuration.getStringProperty(ConfigurationFactory.WRITER_TARGET_IP),
configuration.getIntProperty(ConfigurationFactory.WRITER_TARGET_PORT, 10133),
androidMonitoring, configuration);
eventHandlers[0] = tcpWriter;
disruptor.handleEventsWith(eventHandlers);
ringBuffer = disruptor.start();
// TODO bind tcpWriter
final boolean loadBalancerEnabled = configuration
.getBooleanProperty(ConfigurationFactory.LOAD_BALANCER_ENABLED);
......@@ -90,9 +68,9 @@ public class MonitoringController {
configuration.getIntProperty(ConfigurationFactory.LOAD_BALANCER_PORT, 9999),
configuration.getIntProperty(ConfigurationFactory.LOAD_BALANCER_WAIT_TIME,
20000),
configuration
configuration
.getStringProperty(ConfigurationFactory.LOAD_BALANCER_SCALING_GROUP),
tcpWriter);
tcpWriter);
} else {
try {
tcpWriter.connect();
......@@ -126,15 +104,8 @@ public class MonitoringController {
return;
}
final long hiseq = ringBuffer.next();
final ByteBufferEvent valueEvent = ringBuffer.get(hiseq);
buffer.flip();
final ByteBuffer oldBuffer = valueEvent.getValue();
oldBuffer.clear();
oldBuffer.put(buffer);
valueEvent.setValue(oldBuffer);
ringBuffer.publish(hiseq);
// TODO sent and recreate it!
buffer.clear();
}
......@@ -149,16 +120,14 @@ public class MonitoringController {
}
public static final void sendOutSystemRecord(final SystemMonitoringRecord record) {
final long hiseq = ringBuffer.next();
final ByteBufferEvent valueEvent = ringBuffer.get(hiseq);
final ByteBuffer buffer = valueEvent.getValue();
final ByteBufferEvent byteBufferEvent = new ByteBufferEvent();
final ByteBuffer buffer = byteBufferEvent.getValue();
buffer.clear();
buffer.put(SystemMonitoringRecord.CLAZZ_ID);
buffer.putDouble(record.getCpuUtilization());
buffer.putLong(record.getUsedRAM());
buffer.putLong(record.getAbsoluteRAM());
valueEvent.setValue(buffer);
ringBuffer.publish(hiseq);
// TODO sent it! flip it?
}
public static void shutdown() {
......@@ -174,7 +143,7 @@ public class MonitoringController {
// remoteConfigurationServlet.stop();
disruptor.shutdown();
// TODO terminate merger
exec.shutdown();
......
package explorviz.live_trace_processing.writer;
import explorviz.live_trace_processing.main.ByteBufferEvent;
public interface IByteBufferReceiver {
void onEvent(ByteBufferEvent event);
}
......@@ -9,8 +9,6 @@ import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import com.lmax.disruptor.EventHandler;
import explorviz.live_trace_processing.configuration.Configuration;
import explorviz.live_trace_processing.configuration.ConfigurationFactory;
import explorviz.live_trace_processing.debug.DebugFileByteBufferWriter;
......@@ -18,7 +16,7 @@ import explorviz.live_trace_processing.main.ByteBufferEvent;
import explorviz.live_trace_processing.main.MonitoringStringRegistry;
import explorviz.live_trace_processing.record.trace.HostApplicationMetaDataRecord;
public class TCPWriter implements EventHandler<ByteBufferEvent>, IWriter {
public class TCPWriter implements IByteBufferReceiver, IWriter, Runnable {
private final boolean debug;
private DebugFileByteBufferWriter debugWriter;
......@@ -61,6 +59,11 @@ public class TCPWriter implements EventHandler<ByteBufferEvent>, IWriter {
}
}
@Override
public void run() {
// TODO Auto-generated method stub
}
@Override
public URL getProviderURL() {
return providerURL;
......@@ -174,14 +177,19 @@ public class TCPWriter implements EventHandler<ByteBufferEvent>, IWriter {
}
@Override
public void onEvent(final ByteBufferEvent event, final long sequence, final boolean endOfBatch)
throws Exception {
public void onEvent(final ByteBufferEvent event) {
final ByteBuffer buffer = event.getValue();
buffer.flip();
while ((socketChannel == null) || (!socketChannel.isConnected()) || !metaDataSent) {
Thread.sleep(1);
try {
Thread.sleep(1);
} catch (final InterruptedException e) {
}
if (androidMonitoring) {
connect();
try {
connect();
} catch (final IOException e) {
}
}
}
if (debug) {
......
......@@ -111,7 +111,7 @@ public class TCPWriterTest {
assertEquals(3, readBuffer.getInt());
readBuffer.clear();
final ByteBufferEvent bufferEvent = ByteBufferEvent.EVENT_FACTORY.newInstance();
final ByteBufferEvent bufferEvent = new ByteBufferEvent();
final ByteBuffer buffer = bufferEvent.getValue();
buffer.put(BeforeOperationEventRecord.CLAZZ_ID);
......@@ -127,7 +127,7 @@ public class TCPWriterTest {
buffer.putLong(0);
buffer.putInt(1);
tcpWriter.onEvent(bufferEvent, 0, true);
tcpWriter.onEvent(bufferEvent);
Thread.sleep(100);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment