Commit a1abe1ff authored by Florian Fittkau's avatar Florian Fittkau

minor

parent 576f0072
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry excluding="kieker/monitoring/writer/mq/|kieker/monitoring/writer/" kind="src" path="src"/>
<classpathentry kind="src" path="test"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="lib" path="lib/aspectjweaver.jar"/>
<classpathentry kind="lib" path="lib/disruptor-3.2.0.jar">
......
......@@ -2,91 +2,12 @@
<aspectj>
<weaver options=""> <!-- options="-verbose -showWeaveInfo -Xjoinpoints:synchronization" -->
<!-- Use the exclude/include directives to specify which classes are (not) to be considered for weaving.
Some examples are given below. -->
<!-- Use * to consider all classes. In this case, you typically need some additional excludes. -->
<include within="*"/>
<!-- The monitoring solution used within in Kieker must not be monitored! -->
<!-- <exclude within="org.apache.commons.logging..*" /> -->
<!-- <exclude within="org.slf4j..*" /> -->
<!-- <exclude within="java.util.logging..*" /> -->
<!-- <exclude within="org.apache.log4j..*" /> -->
<!-- instrument the kieker.tests -->
<!-- <include within="kieker.tests..*" /> -->
<!-- Example instrumentation for the iBATIS JPetStore -->
<!-- <include within="com.ibatis.jpetstore..*"/> -->
<!-- <include within="org.apache.struts.action.ActionServlet" /> -->
<!-- Example instrumentation for Sun's Java Pet Store -->
<!-- <include within="com.sun.j2ee.blueprints..*"/> -->
<!-- <exclude within="java..*,com.sun.corba..*,com.sun.enterprise..*,com.sun.appserv..*"/> -->
<!-- <exclude within="com.sun.j2ee.blueprints.waf.controller.web.EventMapping,com.sun.j2ee.blueprints.waf.processmanager..*"/> -->
<!-- Instrumentation of the dacapo eclipse benchmark -->
<!-- <include within="org.eclipse..*"/> -->
<!-- include this to enable javac compilation (instead of ajc) of the probes -->
<!-- <include within="kieker.monitoring.probe.aspectj..*"/> -->
</weaver>
<aspects>
<!-- Use the aspect directives to specify the aspect(s) to use (typically only one). -->
<!-- <aspect name="kieker.monitoring.probe.aspectj.operationExecution.OperationExecutionAspectAnnotation"/> -->
<!-- <aspect name="kieker.monitoring.probe.aspectj.operationExecution.OperationExecutionAspectAnnotationServlet"/> -->
<!-- <aspect name="kieker.monitoring.probe.aspectj.operationExecution.OperationExecutionAspectFull"/> -->
<!-- <aspect name="kieker.monitoring.probe.aspectj.operationExecution.OperationExecutionAspectFullNoGetterAndSetter"/> -->
<!-- <aspect name="kieker.monitoring.probe.aspectj.operationExecution.OperationExecutionAspectFullServlet"/> -->
<!-- <aspect name="kieker.monitoring.probe.aspectj.operationExecution.OperationExecutionAspectFullNoGetterAndSetterServlet"/> -->
<!-- KiekerFlow Aspects -->
<!-- ################## -->
<!-- Concurrency -->
<!-- <aspect name="kieker.monitoring.probe.aspectj.flow.concurrency.ThreadingAspect" /> -->
<!-- <aspect name="kieker.monitoring.probe.aspectj.flow.concurrency.SynchronizedAspect" /> -->
<!-- Construction -->
<!-- <aspect name="kieker.monitoring.probe.aspectj.flow.construction.FullInstrumentation" /> -->
<!-- <aspect name="kieker.monitoring.probe.aspectj.flow.construction.Annotation" /> -->
<!-- ConstructorExecution -->
<!-- <aspect name="kieker.monitoring.probe.aspectj.flow.constructorExecution.FullInstrumentation" /> -->
<!-- <aspect name="kieker.monitoring.probe.aspectj.flow.constructorExecution.Annotation" /> -->
<!-- ConstructorCall -->
<!-- <aspect name="kieker.monitoring.probe.aspectj.flow.constructorCall.FullInstrumentation" /> -->
<!-- <aspect name="kieker.monitoring.probe.aspectj.flow.constructorCall.Annotation" /> -->
<!-- OperationExecution -->
<!-- <aspect name="kieker.monitoring.probe.aspectj.flow.operationExecution.FullInstrumentation" /> -->
<!-- <aspect name="kieker.monitoring.probe.aspectj.flow.operationExecution.FullInstrumentationNoGetterAndSetter" /> -->
<!-- <aspect name="kieker.monitoring.probe.aspectj.flow.operationExecution.Annotation" /> -->
<!-- OperationExecutionObject -->
<!-- <aspect name="kieker.monitoring.probe.aspectj.flow.operationExecutionObject.FullInstrumentation" /> -->
<!-- <aspect name="kieker.monitoring.probe.aspectj.flow.operationExecutionObject.FullInstrumentationNoGetterAndSetter" /> -->
<!-- <aspect name="kieker.monitoring.probe.aspectj.flow.operationExecutionObject.Annotation" /> -->
<!-- OperationCall -->
<!-- <aspect name="kieker.monitoring.probe.aspectj.flow.operationCall.FullInstrumentation" /> -->
<!-- <aspect name="kieker.monitoring.probe.aspectj.flow.operationCall.FullInstrumentationNoGetterAndSetter" /> -->
<!-- <aspect name="kieker.monitoring.probe.aspectj.flow.operationCall.Annotation" /> -->
<!-- ObjectOperationCall -->
<!-- <aspect name="kieker.monitoring.probe.aspectj.flow.operationCallObject.FullInstrumentation" /> -->
<!-- <aspect name="kieker.monitoring.probe.aspectj.flow.operationCallObject.FullInstrumentationNoGetterAndSetter" /> -->
<!-- <aspect name="kieker.monitoring.probe.aspectj.flow.operationCallObject.Annotation" /> -->
<!-- Definition of additional targeted aspects (example for a single additional getter) -->
<concrete-aspect name="explorviz.hpc_monitoring.probe.TargetedAspect" extends="explorviz.hpc_monitoring.probe.AbstractAspect">
<pointcut name="monitoredOperation" expression="execution(* testpackage.TestClass*.*(..))" />
</concrete-aspect>
</aspects>
</aspectj>
This diff is collapsed.
......@@ -2,136 +2,133 @@ package explorviz.hpc_monitoring.writer;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.net.*;
import java.net.MalformedURLException;
import java.net.Socket;
import java.net.URL;
import com.lmax.disruptor.EventHandler;
public class TCPWriter implements EventHandler<ByteArrayEvent> {
private static final int MESSAGE_BUFFER_SIZE = 65536;
private URL providerURL;
private URL loadBalancerURL;
private final int loadBalancerWaitTimeInNs;
private LoadBalancerThread loadBalanceThread;
private Socket socket;
private BufferedOutputStream bufferedOutputStream;
public TCPWriter(final String hostname, final int port) {
try {
providerURL = new URL("http://" + hostname + ":" + port);
}
catch (final MalformedURLException e) {
e.printStackTrace();
}
loadBalancerWaitTimeInNs = 0;
loadBalancerURL = null;
try {
connect();
}
catch (final IOException e) {
e.printStackTrace();
}
}
public TCPWriter(final String hostname, final int port,
final String loadBalancerHostname, final int loadBalancerPort,
final int loadBalancerWaitTimeInNs) {
try {
providerURL = new URL("http://" + hostname + ":" + port);
loadBalancerURL = new URL("http://" + loadBalancerHostname + ":"
+ loadBalancerPort);
}
catch (final MalformedURLException e) {
loadBalancerURL = null;
e.printStackTrace();
}
this.loadBalancerWaitTimeInNs = loadBalancerWaitTimeInNs;
createLoadBalancer();
}
private void createLoadBalancer() {
providerURL = null;
if (loadBalanceThread != null) {
loadBalanceThread.interrupt();
}
loadBalanceThread = new LoadBalancerThread(loadBalancerURL,
loadBalancerWaitTimeInNs, this);
loadBalanceThread.start();
synchronized (this) {
while (providerURL == null) {
try {
this.wait();
}
catch (final InterruptedException e) {
return;
}
}
}
}
private void connect() throws IOException {
socket = new Socket(providerURL.getHost(), providerURL.getPort());
bufferedOutputStream = new BufferedOutputStream(
socket.getOutputStream(), MESSAGE_BUFFER_SIZE);
}
@Override
public void onEvent(final ByteArrayEvent event, final long sequence,
final boolean endOfBatch) throws Exception {
sendMessage(event.getValue(), event.getLength());
}
private final void sendMessage(final byte[] message, final int length) {
try {
bufferedOutputStream.write(message, 0, length);
// if (endOfBatch) {
// bufferedOutputStream.flush();
// }
}
catch (final IOException e) {
e.printStackTrace();
}
}
public final void cleanup() {
if (loadBalanceThread != null) {
loadBalanceThread.interrupt();
}
disconnect();
}
private void disconnect() {
if (socket.isConnected()) {
try {
socket.close();
}
catch (final IOException e) {
e.printStackTrace();
}
}
}
public void setProvider(final URL newProviderURL) {
synchronized (this) {
if (!newProviderURL.getHost().equals(providerURL.getHost())
|| (newProviderURL.getPort() != providerURL.getPort())) {
disconnect();
try {
connect();
providerURL = newProviderURL;
notifyAll();
}
catch (final IOException e) {
e.printStackTrace();
}
}
}
}
private static final int MESSAGE_BUFFER_SIZE = 65536;
private URL providerURL;
private URL loadBalancerURL;
private final int loadBalancerWaitTimeInNs;
private LoadBalancerThread loadBalanceThread;
private Socket socket;
private BufferedOutputStream bufferedOutputStream;
public TCPWriter(final String hostname, final int port) {
try {
providerURL = new URL("http://" + hostname + ":" + port);
} catch (final MalformedURLException e) {
e.printStackTrace();
}
loadBalancerWaitTimeInNs = 0;
loadBalancerURL = null;
try {
connect();
} catch (final IOException e) {
e.printStackTrace();
}
}
public TCPWriter(final String hostname, final int port,
final String loadBalancerHostname, final int loadBalancerPort,
final int loadBalancerWaitTimeInNs) {
try {
providerURL = new URL("http://" + hostname + ":" + port);
loadBalancerURL = new URL("http://" + loadBalancerHostname + ":"
+ loadBalancerPort);
} catch (final MalformedURLException e) {
loadBalancerURL = null;
e.printStackTrace();
}
this.loadBalancerWaitTimeInNs = loadBalancerWaitTimeInNs;
createLoadBalancer();
}
private void createLoadBalancer() {
providerURL = null;
if (loadBalanceThread != null) {
loadBalanceThread.interrupt();
}
loadBalanceThread = new LoadBalancerThread(loadBalancerURL,
loadBalancerWaitTimeInNs, this);
loadBalanceThread.start();
synchronized (this) {
while (providerURL == null) {
try {
this.wait();
} catch (final InterruptedException e) {
return;
}
}
}
}
private void connect() throws IOException {
socket = new Socket(providerURL.getHost(), providerURL.getPort());
bufferedOutputStream = new BufferedOutputStream(
socket.getOutputStream(), MESSAGE_BUFFER_SIZE);
}
@Override
public void onEvent(final ByteArrayEvent event, final long sequence,
final boolean endOfBatch) throws Exception {
sendMessage(event.getValue(), event.getLength(), endOfBatch);
}
private final void sendMessage(final byte[] message, final int length,
final boolean endOfBatch) {
try {
bufferedOutputStream.write(message, 0, length);
if (endOfBatch) {
bufferedOutputStream.flush();
}
} catch (final IOException e) {
e.printStackTrace();
}
}
public final void cleanup() {
if (loadBalanceThread != null) {
loadBalanceThread.interrupt();
}
disconnect();
}
private void disconnect() {
if (socket.isConnected()) {
try {
socket.close();
} catch (final IOException e) {
e.printStackTrace();
}
}
}
public void setProvider(final URL newProviderURL) {
synchronized (this) {
if (!newProviderURL.getHost().equals(providerURL.getHost())
|| (newProviderURL.getPort() != providerURL.getPort())) {
disconnect();
try {
connect();
providerURL = newProviderURL;
notifyAll();
} catch (final IOException e) {
e.printStackTrace();
}
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment