Commit 3b7fa326 authored by Sören Henning's avatar Sören Henning

minor fixes and documentation

parent 46b2ddc5
......@@ -27,13 +27,11 @@ public class AnalysisConfiguration extends Configuration {
final TcpReaderStage tcpReaderStage = new TcpReaderStage();
final InstanceOfFilter<IMonitoringRecord, IFlowRecord> flowRecordFilter = new InstanceOfFilter<>(IFlowRecord.class);
final RecordReconstructorStage recordReconstructor = new RecordReconstructorStage();
// final PrinterStage printerStage = new PrinterStage(); // TODO Temp
// Connect the stages
super.connectPorts(tcpReaderStage.getOutputPort(), flowRecordFilter.getInputPort());
super.connectPorts(flowRecordFilter.getMatchedOutputPort(), recordReconstructor.getInputPort());
super.connectPorts(recordReconstructor.getOutputPort(), this.distributor.getInputPort());
// super.connectPorts(this.distributor.getNewOutputPort(RecordFilter.builder().build()), printerStage.getInputPort()); // TODO Temp
}
......
......@@ -14,6 +14,8 @@ public class CassandraManager {
private final static int WAITING_SLEEP_MILLIS = 1000;
// BETTER A logger should be used to replace the System.out.println()
public CassandraManager(final String host, final int port, final String keyspace, final int timeoutInMillis) {
createSession(host, port, keyspace, timeoutInMillis);
}
......@@ -32,7 +34,7 @@ public class CassandraManager {
break;
} catch (NoHostAvailableException exception) {
// Host not unavailable
System.out.println("Waiting for host..."); // TODO
// System.out.println("Waiting for host...");
if (Duration.between(start, Instant.now()).toMillis() < timeoutInMillis) {
cluster.close();
cluster = Cluster.builder().addContactPoint(host).withPort(port).build();
......@@ -46,7 +48,7 @@ public class CassandraManager {
}
} catch (InvalidQueryException exception) {
// Keyspace does not exist
System.out.println("Create Keyspace..."); // TODO
// System.out.println("Create Keyspace...");
createKeyspaceIfNotExists(cluster, keyspace);
}
}
......
package kiekpad.analysis;
import kieker.common.record.IMonitoringRecord;
import kieker.common.record.flow.IFlowRecord;
import kiekpad.analysis.domain.MonitoringRecord;
import kiekpad.analysis.stage.RecordConverterStage;
import kiekpad.analysis.stage.RecordReconstructorStage;
import teead.AnomalyDetectionStage;
import teead.StorableAnomalyDetectionStage;
import teetime.framework.Configuration;
import teetime.stage.InstanceOfFilter;
import teetime.stage.basic.distributor.Distributor;
import teetime.stage.basic.distributor.strategy.CopyByReferenceStrategy;
import teetime.stage.io.network.TcpReaderStage;
public class SimpleConfiguration extends Configuration {
public SimpleConfiguration() {
// Create the stages
final TcpReaderStage tcpReaderStage = new TcpReaderStage();
final InstanceOfFilter<IMonitoringRecord, IFlowRecord> flowRecordFilter = new InstanceOfFilter<>(IFlowRecord.class);
final RecordReconstructorStage recordReconstructor = new RecordReconstructorStage();
final Distributor<MonitoringRecord> distributor = new Distributor<>(new CopyByReferenceStrategy());
final RecordConverterStage recordConverter = new RecordConverterStage();
final AnomalyDetectionStage anomalyDetector = new StorableAnomalyDetectionStage(null, null, null, null, null);
// Connect the stages
super.connectPorts(tcpReaderStage.getOutputPort(), flowRecordFilter.getInputPort());
super.connectPorts(flowRecordFilter.getMatchedOutputPort(), recordReconstructor.getInputPort());
super.connectPorts(recordReconstructor.getOutputPort(), distributor.getInputPort());
super.connectPorts(distributor.getNewOutputPort(), recordConverter.getInputPort());
super.connectPorts(recordConverter.getOutputPort(), anomalyDetector.getInputPort());
}
}
package kiekpad.analysis.domain;
import java.time.Instant;
public class MonitoringRecord {
private String operationSignature;
private String classSignature;
private String hostname;
private String sessionId;
private long threadId;
private Instant time;
private long duration; // in nanoseconds
public MonitoringRecord() {}
public MonitoringRecord(final String operationSignature, final String classSignature, final String hostname, final String sessionId, final long threadId,
final Instant time, final long duration) {
this.operationSignature = operationSignature;
this.classSignature = classSignature;
this.hostname = hostname;
this.sessionId = sessionId;
this.threadId = threadId;
this.time = time;
this.duration = duration;
}
public String getOperationSignature() {
return operationSignature;
}
public void setOperationSignature(final String operationSignature) {
this.operationSignature = operationSignature;
}
public String getClassSignature() {
return classSignature;
}
public void setClassSignature(final String classSignature) {
this.classSignature = classSignature;
}
public String getHostname() {
return hostname;
}
public void setHostname(final String hostname) {
this.hostname = hostname;
}
public String getSessionId() {
return sessionId;
}
public void setSessionId(final String sessionId) {
this.sessionId = sessionId;
}
public long getThreadId() {
return threadId;
}
public void setThreadId(final long threadId) {
this.threadId = threadId;
}
public Instant getTime() {
return time;
}
public void setTime(final Instant time) {
this.time = time;
}
public long getDuration() {
return duration;
}
public void setDuration(final long duration) {
this.duration = duration;
}
}
package kiekpad.analysis.domain;
import java.time.Instant;
/**
* This class represents a monitored operation call.
*
* @author Sören Henning
*
*/
public class MonitoringRecord {
private String operationSignature;
private String classSignature;
private String hostname;
private String sessionId;
private long threadId;
private Instant time;
private long duration; // in nanoseconds
public MonitoringRecord() {}
public MonitoringRecord(final String operationSignature, final String classSignature, final String hostname, final String sessionId, final long threadId,
final Instant time, final long duration) {
this.operationSignature = operationSignature;
this.classSignature = classSignature;
this.hostname = hostname;
this.sessionId = sessionId;
this.threadId = threadId;
this.time = time;
this.duration = duration;
}
public String getOperationSignature() {
return operationSignature;
}
public void setOperationSignature(final String operationSignature) {
this.operationSignature = operationSignature;
}
public String getClassSignature() {
return classSignature;
}
public void setClassSignature(final String classSignature) {
this.classSignature = classSignature;
}
public String getHostname() {
return hostname;
}
public void setHostname(final String hostname) {
this.hostname = hostname;
}
public String getSessionId() {
return sessionId;
}
public void setSessionId(final String sessionId) {
this.sessionId = sessionId;
}
public long getThreadId() {
return threadId;
}
public void setThreadId(final long threadId) {
this.threadId = threadId;
}
public Instant getTime() {
return time;
}
public void setTime(final Instant time) {
this.time = time;
}
public long getDuration() {
return duration;
}
public void setDuration(final long duration) {
this.duration = duration;
}
}
......@@ -2,6 +2,14 @@ package kiekpad.analysis.domain;
import java.util.function.Predicate;
/**
* A record filter tests whether monitoring records are of a certain type.
* It can be configured by the criteria operation signature, class signature, host name, session identifier, and thread identifier.
* If no value is set for a criteria, this means it is not tested and acts like a wildcard.
*
* @author Sören Henning
*
*/
public class RecordFilter implements Predicate<MonitoringRecord> {
private String operationSignature = null;
......
package kiekpad.analysis.domain;
import java.util.ArrayDeque;
import java.util.Deque;
import kieker.common.record.flow.trace.TraceMetadata;
import kieker.common.record.flow.trace.operation.BeforeOperationEvent;
public class Trace {
private final Deque<BeforeOperationEvent> buffer = new ArrayDeque<>();
private final String hostname;
private final String sessionId;
private final long threadId;
public Trace(final TraceMetadata record) {
this.hostname = record.getHostname();
this.sessionId = record.getSessionId();
this.threadId = record.getThreadId();
}
public void pushEvent(final BeforeOperationEvent event) {
this.buffer.push(event);
}
public BeforeOperationEvent popEvent() {
return this.buffer.pop();
}
public boolean isBufferEmpty() {
return this.buffer.isEmpty();
}
public String getHostname() {
return hostname;
}
public String getSessionId() {
return sessionId;
}
public long getThreadId() {
return threadId;
}
}
package kiekpad.analysis.domain;
import java.util.ArrayDeque;
import java.util.Deque;
import kieker.common.record.flow.trace.TraceMetadata;
import kieker.common.record.flow.trace.operation.BeforeOperationEvent;
/**
* A class represents traces.
*
* @author Sören Henning
*
*/
public class Trace {
private final Deque<BeforeOperationEvent> buffer = new ArrayDeque<>();
private final String hostname;
private final String sessionId;
private final long threadId;
public Trace(final TraceMetadata record) {
this.hostname = record.getHostname();
this.sessionId = record.getSessionId();
this.threadId = record.getThreadId();
}
public void pushEvent(final BeforeOperationEvent event) {
this.buffer.push(event);
}
public BeforeOperationEvent popEvent() {
return this.buffer.pop();
}
public boolean isBufferEmpty() {
return this.buffer.isEmpty();
}
public String getHostname() {
return hostname;
}
public String getSessionId() {
return sessionId;
}
public long getThreadId() {
return threadId;
}
}
......@@ -3,6 +3,12 @@ package kiekpad.analysis.stage;
import kiekpad.analysis.domain.MonitoringRecord;
import teetime.framework.AbstractConsumerStage;
/**
* A stage that prints the data of {@code MonitoringRecord}s to System.out
*
* @author Sören Henning
*
*/
public class PrinterStage extends AbstractConsumerStage<MonitoringRecord> {
@Override
......
package kiekpad.analysis.stage;
import java.time.Instant;
import kiekpad.analysis.domain.MonitoringRecord;
import teead.measurement.Measurement;
import teetime.stage.basic.AbstractTransformation;
public class RecordConverterStage extends AbstractTransformation<MonitoringRecord, Measurement> {
@Override
protected void execute(final MonitoringRecord record) {
final Instant time = record.getTime();
final double value = record.getDuration();
final Measurement measurement = new Measurement(time, value);
this.outputPort.send(measurement);
}
}
package kiekpad.analysis.stage;
import java.time.Instant;
import kiekpad.analysis.domain.MonitoringRecord;
import teead.measurement.Measurement;
import teetime.stage.basic.AbstractTransformation;
/**
* A TeeTime stage that transforms {@link MonitoringRecord}s to {@link Measurement}s
*
* @author Sören Henning
*
*/
public class RecordConverterStage extends AbstractTransformation<MonitoringRecord, Measurement> {
@Override
protected void execute(final MonitoringRecord record) {
final Instant time = record.getTime();
final double value = record.getDuration();
final Measurement measurement = new Measurement(time, value);
this.outputPort.send(measurement);
}
}
package kiekpad.analysis.stage;
import kiekpad.analysis.domain.MonitoringRecord;
import kiekpad.analysis.domain.RecordFilter;
import kiekpad.analysis.util.FilterStage;
import teetime.framework.CompositeStage;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.stage.basic.distributor.Distributor;
import teetime.stage.basic.distributor.strategy.CopyByReferenceStrategy;
public class RecordDistributorStage extends CompositeStage {
private final InputPort<MonitoringRecord> inputPort;
private final Distributor<MonitoringRecord> distributor;
public RecordDistributorStage() {
this.distributor = new Distributor<>(new CopyByReferenceStrategy());
this.inputPort = this.distributor.getInputPort();
}
public OutputPort<MonitoringRecord> getNewOutputPort(final RecordFilter filter) {
final FilterStage<MonitoringRecord> filterStage = new FilterStage<>(filter);
super.connectPorts(this.distributor.getNewOutputPort(), filterStage.getInputPort());
return filterStage.getOutputPort();
}
public InputPort<MonitoringRecord> getInputPort() {
return this.inputPort;
}
}
package kiekpad.analysis.stage;
import kiekpad.analysis.domain.MonitoringRecord;
import kiekpad.analysis.domain.RecordFilter;
import kiekpad.analysis.util.FilterStage;
import teetime.framework.CompositeStage;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.stage.basic.distributor.Distributor;
import teetime.stage.basic.distributor.strategy.CopyByReferenceStrategy;
/**
* A TeeTime stage that distributes {@link MonitoringRecord}s to output ports if they match a given {@link RecordFilter}
*
* @author Sören Henning
*
*/
public class RecordDistributorStage extends CompositeStage {
private final InputPort<MonitoringRecord> inputPort;
private final Distributor<MonitoringRecord> distributor;
public RecordDistributorStage() {
this.distributor = new Distributor<>(new CopyByReferenceStrategy());
this.inputPort = this.distributor.getInputPort();
}
public OutputPort<MonitoringRecord> getNewOutputPort(final RecordFilter filter) {
final FilterStage<MonitoringRecord> filterStage = new FilterStage<>(filter);
super.connectPorts(this.distributor.getNewOutputPort(), filterStage.getInputPort());
return filterStage.getOutputPort();
}
public InputPort<MonitoringRecord> getInputPort() {
return this.inputPort;
}
}
package kiekpad.analysis.stage;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import kieker.common.record.flow.IFlowRecord;
import kieker.common.record.flow.trace.TraceMetadata;
import kieker.common.record.flow.trace.operation.AbstractOperationEvent;
import kieker.common.record.flow.trace.operation.AfterOperationEvent;
import kieker.common.record.flow.trace.operation.AfterOperationFailedEvent;
import kieker.common.record.flow.trace.operation.BeforeOperationEvent;
import kiekpad.analysis.domain.MonitoringRecord;
import kiekpad.analysis.domain.Trace;
import teetime.stage.basic.AbstractTransformation;
public class RecordReconstructorStage extends AbstractTransformation<IFlowRecord, MonitoringRecord> {
private final Map<Long, Trace> traces = new HashMap<>();
private Instant start = null;
private long offsetInNs;
@Override
protected void execute(final IFlowRecord record) {
if (this.start == null) {
this.start = Instant.now();
this.offsetInNs = record.getLoggingTimestamp();
}
if (record instanceof TraceMetadata) {
this.handleMetadataRecord((TraceMetadata) record);
} else if (record instanceof AbstractOperationEvent) {
this.handleOperationEventRecord((AbstractOperationEvent) record);
}
}
private void handleMetadataRecord(final TraceMetadata record) {
// Create a new trace buffer for the new incoming trace
final long traceID = record.getTraceId();
final Trace trace = new Trace(record);
this.traces.put(traceID, trace);
}
private void handleOperationEventRecord(final AbstractOperationEvent event) {
// Check whether there was an incoming trace meta data record before
if (this.traces.get(event.getTraceId()) != null) {
if (event instanceof BeforeOperationEvent) {
this.handleBeforeOperationEventRecord((BeforeOperationEvent) event);
} else if (event instanceof AfterOperationEvent) {
this.handleAfterOperationEventRecord((AfterOperationEvent) event);
}
} else {
// TODO how to handle this?
}
}
private void handleBeforeOperationEventRecord(final BeforeOperationEvent event) {
this.traces.get(event.getTraceId()).pushEvent(event);
}
private void handleAfterOperationEventRecord(final AfterOperationEvent event) {
final Trace trace = this.traces.get(event.getTraceId());
final BeforeOperationEvent beforeEvent = trace.popEvent();
if (trace.isBufferEmpty()) {
this.traces.remove(event.getTraceId());
}
if (event instanceof AfterOperationFailedEvent) {
// TODO
}
MonitoringRecord record = new MonitoringRecord();
record.setOperationSignature(event.getOperationSignature());
record.setClassSignature(event.getClassSignature());
record.setHostname(trace.getHostname());
record.setSessionId(trace.getSessionId());
record.setThreadId(trace.getThreadId());
record.setTime(this.start.plusNanos(beforeEvent.getTimestamp() - this.offsetInNs));
record.setDuration(event.getTimestamp() - beforeEvent.getTimestamp());
this.outputPort.send(record);
}
}
package kiekpad.analysis.stage;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import kieker.common.record.flow.IFlowRecord;
import kieker.common.record.flow.trace.TraceMetadata;
import kieker.common.record.flow.trace.operation.AbstractOperationEvent;
import kieker.common.record.flow.trace.operation.AfterOperationEvent;
import kieker.common.record.flow.trace.operation.AfterOperationFailedEvent;
import kieker.common.record.flow.trace.operation.BeforeOperationEvent;
import kiekpad.analysis.domain.MonitoringRecord;
import kiekpad.analysis.domain.Trace;
import teetime.stage.basic.AbstractTransformation;
/**
* A TeeTime stage that reconstructs a sequence of {@link IFlowRecord}s to a sequence of {@link MonitoringRecord}s
*
* @author Sören Henning
*
*/
public class RecordReconstructorStage extends AbstractTransformation<IFlowRecord, MonitoringRecord> {
private final Map<Long, Trace> traces = new HashMap<>();
private Instant start = null;
private long offsetInNs;
@Override
protected void execute(final IFlowRecord record) {
if (this.start == null) {
this.start = Instant.now();
this.offsetInNs = record.getLoggingTimestamp();
}
if (record instanceof TraceMetadata) {
this.handleMetadataRecord((TraceMetadata) record);
} else if (record instanceof AbstractOperationEvent) {
this.handleOperationEventRecord((AbstractOperationEvent) record);
}
}
private void handleMetadataRecord(final TraceMetadata record) {
// Create a new trace buffer for the new incoming trace
final long traceID = record.getTraceId();
final Trace trace = new Trace(record);
this.traces.put(traceID, trace);
}
private void handleOperationEventRecord(final AbstractOperationEvent event) {
// Check whether there was an incoming trace meta data record before
if (this.traces.get(event.getTraceId()) != null) {
if (event instanceof BeforeOperationEvent) {
this.handleBeforeOperationEventRecord((BeforeOperationEvent) event);
} else if (event instanceof AfterOperationEvent) {
this.handleAfterOperationEventRecord((AfterOperationEvent) event);
}
} else {
// TODO how to handle this?
}
}
private void handleBeforeOperationEventRecord(final BeforeOperationEvent event) {
this.traces.get(event.getTraceId()).pushEvent(event);
}
private void handleAfterOperationEventRecord(final AfterOperationEvent event) {
final Trace trace = this.traces.get(event.getTraceId());
final BeforeOperationEvent beforeEvent = trace.popEvent();
if (trace.isBufferEmpty()) {
this.traces.remove(event.getTraceId());
}
if (event instanceof AfterOperationFailedEvent) {
// TODO
}
MonitoringRecord record = new MonitoringRecord();
record.setOperationSignature(event.getOperationSignature());
record.setClassSignature(event.getClassSignature());
record.setHostname(trace.getHostname());
record.setSessionId(trace.getSessionId());
record.setThreadId(trace.getThreadId());
record.setTime(this.start.plusNanos(beforeEvent.getTimestamp() - this.offsetInNs));
record.setDuration(event.getTimestamp() - beforeEvent.getTimestamp());
this.outputPort.send(record);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment