Commit 6509abb0 authored by Sören Henning's avatar Sören Henning

Add 'src/main/java/anomalydetection/' from commit 'c7935d8b'

git-subtree-dir: src/main/java/anomalydetection
git-subtree-mainline: 2f4107e8
git-subtree-split: c7935d8b
parents 2f4107e8 c7935d8b
package anomalydetection;
import anomalydetection.measurement.Measurement;
import teetime.framework.AbstractProducerStage;
abstract class AbstractMeassurementsGeneratorStage extends AbstractProducerStage<Measurement> {
protected final MeasurementGenerator generator;
protected final long generations;
public AbstractMeassurementsGeneratorStage(final MeasurementGenerator generator, final long generations) {
super();
this.generator = generator;
this.generations = generations;
}
@Override
protected void execute() {
for (int i = 0; i < this.generations; i++) {
sendMeasurement();
}
this.terminateStage();
}
protected abstract void sendMeasurement();
}
package anomalydetection;
import java.io.PrintStream;
import anomalydetection.measurement.AnomalyScoredMeasurement;
import teetime.framework.AbstractConsumerStage;
public class AlertPrinterStage extends AbstractConsumerStage<AnomalyScoredMeasurement> {
private final PrintStream stream;
public AlertPrinterStage() {
this.stream = System.out;
}
public AlertPrinterStage(final PrintStream stream) {
this.stream = stream;
}
@Override
protected void execute(final AnomalyScoredMeasurement measurement) {
this.stream.println("ALERT! Score: " + measurement.getAnomalyScore());
}
}
package anomalydetection;
import java.io.File;
import java.time.Duration;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import anomalydetection.aggregation.MeanAggregator;
import anomalydetection.forecast.RegressionForecaster;
import anomalydetection.storage.CassandraDriver;
import teetime.framework.Configuration;
public class AnomalyDetectionConfiguration extends Configuration {
public AnomalyDetectionConfiguration() {
String ipAddress = "192.168.99.100";
int port = 32770;
String keyspace = "demo3";
Cluster cluster = Cluster.builder().addContactPoint(ipAddress).withPort(port).build();
Session session = cluster.connect(keyspace);
final MeasurementGenerator gen = new MeasurementGenerator();
gen.setFunction(x -> 500 * Math.sin(x / 60) + 2000);
gen.setNoise(250);
gen.setAnomalyProbability(0.01);
gen.setAnomayStrength(1000);
gen.setMinStepDistance(Duration.ofMillis(500));
gen.setMinStepDistance(Duration.ofSeconds(2));
// Create the stages
final RealTimeMeasurementGeneratorStage generator = new RealTimeMeasurementGeneratorStage(gen, 3600);
// final MeassurementsGeneratorStage generator = new MeassurementsGeneratorStage(x -> 500 * Math.sin(x / 60) + 2000, 250, 0.01, 1000,
// Duration.ofSeconds(1),3600);
// final MeassurementsGeneratorStage generator = new MeassurementsGeneratorStage(x -> 500 * Math.pow(Math.sin(x / 60), 1 / 101) + 2000, 250, 0.01, 1000,
// Duration.ofSeconds(5), 3600);
// final MeassurementsGeneratorStage generator = new MeassurementsGeneratorStage(x -> 0.5 * x + 1000, 250, 0.01, 1000, Duration.ofSeconds(1),
// 900);
// final AnomalyDetectionStage anomalyDetector = new AnomalyDetectionStage(Duration.ofHours(1), Duration.ofSeconds(30), new MeanAggregator(), new
// ARIMAForecaster("192.168.99.100", 6311));
// final AnomalyDetectionStage anomalyDetector = new AnomalyDetectionStage(Duration.ofHours(1), Duration.ofSeconds(5), new MeanAggregator(),new
// RegressionForecaster());
final AnomalyDetectionStage anomalyDetector = new StorableAnomalyDetectionStage(Duration.ofHours(1), Duration.ofSeconds(5), new MeanAggregator(),
new RegressionForecaster(), new CassandraDriver(session, "measurements", "temp"));
final AlertPrinterStage alerter = new AlertPrinterStage();
final PrinterStage printer = new PrinterStage(); // TODO Temp
final JSONExportStage jsonExporter = new JSONExportStage(new File("values.json")); // TODO Temp
// Connect the stages
super.connectPorts(generator.getOutputPort(), anomalyDetector.getInputPort());
super.connectPorts(anomalyDetector.getNewOutputPort(0.9), alerter.getInputPort());
super.connectPorts(anomalyDetector.getNewOutputPort(), printer.getInputPort());
super.connectPorts(anomalyDetector.getNewOutputPort(), jsonExporter.getInputPort());
// cluster.close();
}
}
package anomalydetection;
import java.time.Duration;
import anomalydetection.aggregation.Aggregator;
import anomalydetection.forecast.Forecaster;
import anomalydetection.measurement.AnomalyScoredMeasurement;
import anomalydetection.measurement.Measurement;
import anomalydetection.timeseries.BoundedTimeSeries;
import teetime.framework.CompositeStage;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.stage.basic.distributor.Distributor;
import teetime.stage.basic.distributor.strategy.CopyByReferenceStrategy;
public class AnomalyDetectionStage extends CompositeStage {
private final InputPort<Measurement> inputPort;
final Distributor<AnomalyScoredMeasurement> anomalyScoreDistributor = new Distributor<>(new CopyByReferenceStrategy());
protected AnomalyDetectionStage(final BoundedTimeSeries slidingWindow, final Duration normalizationDuration, final Aggregator aggregator,
final Forecaster forecaster1) {
// Create the stages
final Distributor<Measurement> measurementDistributor = new Distributor<>(new CopyByReferenceStrategy());
final ExtractorStage extractor = new ExtractorStage(slidingWindow);
final NormalizerStage normalizerStage = new NormalizerStage(normalizationDuration, aggregator);
final ForecastStage forecaster = new ForecastStage(forecaster1);
final MeasurementForecastDecorationStage measurementForecastDecorator = new MeasurementForecastDecorationStage();
final AnomalyScoreCalculatorStage anomalyScoreCalculator = new AnomalyScoreCalculatorStage();
this.inputPort = measurementDistributor.getInputPort();
// Connect the stages
super.connectPorts(measurementDistributor.getNewOutputPort(), extractor.getInputPort());
super.connectPorts(extractor.getOutputPort(), normalizerStage.getInputPort());
super.connectPorts(normalizerStage.getOutputPort(), forecaster.getInputPort());
super.connectPorts(forecaster.getOutputPort(), measurementForecastDecorator.getInputPort1());
super.connectPorts(measurementDistributor.getNewOutputPort(), measurementForecastDecorator.getInputPort2());
super.connectPorts(measurementForecastDecorator.getOutputPort(), anomalyScoreCalculator.getInputPort());
super.connectPorts(anomalyScoreCalculator.getOutputPort(), anomalyScoreDistributor.getInputPort());
}
public AnomalyDetectionStage(final Duration slidingWindowDuration, final Duration normalizationDuration, final Aggregator aggregator,
final Forecaster forecaster) {
this(new BoundedTimeSeries(slidingWindowDuration), normalizationDuration, aggregator, forecaster);
}
public OutputPort<AnomalyScoredMeasurement> getNewOutputPort() {
return this.anomalyScoreDistributor.getNewOutputPort();
}
public OutputPort<AnomalyScoredMeasurement> getNewOutputPort(final double threshold) {
return getNewOutputPort(new ThresholdFilter(threshold));
}
public OutputPort<AnomalyScoredMeasurement> getNewOutputPort(final double threshold, final ThresholdFilter.Comparator comparator) {
return getNewOutputPort(new ThresholdFilter(threshold, comparator));
}
private OutputPort<AnomalyScoredMeasurement> getNewOutputPort(final ThresholdFilter thresholdFilter) {
super.connectPorts(this.anomalyScoreDistributor.getNewOutputPort(), thresholdFilter.getInputPort());
return thresholdFilter.getOutputPort();
}
public InputPort<Measurement> getInputPort() {
return inputPort;
}
}
package anomalydetection;
import teetime.framework.Execution;
public class AnomalyDetectionTool {
public static void main(final String[] args) {
// String ipAddress = "192.168.99.100";
// int port = 32770;
// String keyspace = "demo3";
//
// Cluster cluster = Cluster.builder().addContactPoint(ipAddress).withPort(port).build();
// Session session = cluster.connect(keyspace);
//
// CassandraDriver cassandraDriver = new CassandraDriver(session, "measurements");
// cassandraDriver.storeMeasurement("abc", new AnomalyScoredMeasurement(Instant.now(), 1200, 1100, 0.11));
// cassandraDriver.storeMeasurement("abc", new AnomalyScoredMeasurement(Instant.now(), 1150, 1000, 0.12));
// cassandraDriver.storeMeasurement("abc", new AnomalyScoredMeasurement(Instant.now(), 1150, 1200, 0.13));
// cassandraDriver.storeMeasurement("abc", new AnomalyScoredMeasurement(Instant.now(), 1200, 1100, 0.12));
//
// TimeSeries timeSeries = cassandraDriver.retrieveTimeSeries("abc", Instant.now().minusSeconds(60 * 60), Instant.now());
// System.out.println(timeSeries);
//
// cassandraDriver.storeMeasurement("abc", new AnomalyScoredMeasurement(Instant.now(), 1300, 1000, 0.14));
// cassandraDriver.storeMeasurement("abc", new AnomalyScoredMeasurement(Instant.now(), 1350, 1050, 0.13));
//
// session.close();
final AnomalyDetectionConfiguration configuration = new AnomalyDetectionConfiguration();
final Execution<AnomalyDetectionConfiguration> analysis = new Execution<AnomalyDetectionConfiguration>(configuration);
analysis.executeBlocking();
}
}
package anomalydetection;
import anomalydetection.anomalyscore.AnomalyScoreCalculator;
import anomalydetection.anomalyscore.SimpleAnomalyScoreCalculator;
import anomalydetection.measurement.AnomalyScoredMeasurement;
import anomalydetection.measurement.ForecastedMeassurement;
import teetime.stage.basic.AbstractTransformation;
public class AnomalyScoreCalculatorStage extends AbstractTransformation<ForecastedMeassurement, AnomalyScoredMeasurement> {
private final AnomalyScoreCalculator anomalyScoreCalculator;
public AnomalyScoreCalculatorStage() {
this.anomalyScoreCalculator = new SimpleAnomalyScoreCalculator();
}
public AnomalyScoreCalculatorStage(final AnomalyScoreCalculator anomalyScoreCalculator) {
this.anomalyScoreCalculator = anomalyScoreCalculator;
}
@Override
protected void execute(final ForecastedMeassurement forecastedMeassurement) {
final double anomalyScore = anomalyScoreCalculator.calculate(forecastedMeassurement.getValue(), forecastedMeassurement.getPrediction());
final AnomalyScoredMeasurement scoredMeasurement = new AnomalyScoredMeasurement(forecastedMeassurement, anomalyScore);
this.outputPort.send(scoredMeasurement);
}
}
package anomalydetection;
import anomalydetection.measurement.Measurement;
import anomalydetection.timeseries.BoundedTimeSeries;
import anomalydetection.timeseries.TimeSeries;
import teetime.stage.basic.AbstractTransformation;
public class ExtractorStage extends AbstractTransformation<Measurement, TimeSeries> {
private final TimeSeriesExtractor extractor;
public ExtractorStage(final BoundedTimeSeries timeSeries) {
this.extractor = new TimeSeriesExtractor(timeSeries);
}
@Override
protected void execute(final Measurement measurement) {
this.outputPort.send(this.extractor.extract(measurement));
}
}
package anomalydetection;
import anomalydetection.forecast.Forecaster;
import anomalydetection.timeseries.EquidistantTimeSeries;
import teetime.stage.basic.AbstractTransformation;
public class ForecastStage extends AbstractTransformation<EquidistantTimeSeries, Double> {
private final Forecaster forecaster;
public ForecastStage(final Forecaster forecaster) {
super();
this.forecaster = forecaster;
}
@Override
protected void execute(final EquidistantTimeSeries timeSeries) {
final double forecast = forecaster.forecast(timeSeries);
this.outputPort.send(forecast);
}
}
package anomalydetection;
import java.io.File;
import java.io.IOException;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import anomalydetection.measurement.AnomalyScoredMeasurement;
import teetime.framework.AbstractConsumerStage;
public class JSONExportStage extends AbstractConsumerStage<AnomalyScoredMeasurement> {
private final JsonGenerator jsonGenerator;
public JSONExportStage(final File file) {
try {
JsonFactory jsonFactory = new JsonFactory();
this.jsonGenerator = jsonFactory.createGenerator(file, JsonEncoding.UTF8);
this.jsonGenerator.writeStartArray();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
throw new IllegalStateException(e);
}
}
@Override
protected void execute(final AnomalyScoredMeasurement measurement) {
try {
this.jsonGenerator.writeStartObject();
writeNumberFieldOrNull("time", measurement.getTime().toEpochMilli());
writeNumberFieldOrNull("measurement", measurement.getValue());
writeNumberFieldOrNull("prediction", measurement.getPrediction());
writeNumberFieldOrNull("anomalyscore", measurement.getAnomalyScore());
this.jsonGenerator.writeEndObject();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
throw new IllegalStateException(e);
}
}
@Override
public void onTerminating() throws Exception {
jsonGenerator.writeEndArray();
jsonGenerator.close();
super.onTerminating();
}
private void writeNumberFieldOrNull(final String key, final double value) throws IOException {
if (Double.isFinite(value)) {
this.jsonGenerator.writeNumberField(key, value);
} else {
this.jsonGenerator.writeNullField(key);
}
}
}
package anomalydetection;
import kieker.analysis.util.AbstractBiCombinerStage;
import anomalydetection.measurement.ForecastedMeassurement;
import anomalydetection.measurement.Measurement;
import teetime.framework.OutputPort;
public class MeasurementForecastDecorationStage extends AbstractBiCombinerStage<Double, Measurement> {
private final OutputPort<ForecastedMeassurement> outputPort = createOutputPort();
@Override
protected void combine(final Double prediction, final Measurement measurement) {
final ForecastedMeassurement forecastedMeassurement = new ForecastedMeassurement(measurement, prediction);
this.outputPort.send(forecastedMeassurement);
}
public final OutputPort<ForecastedMeassurement> getOutputPort() {
return outputPort;
}
}
package anomalydetection;
import java.time.Duration;
import java.time.Instant;
import java.util.function.Function;
import anomalydetection.measurement.Measurement;
public class MeasurementGenerator {
private Function<Double, Double> function;
private double noise;
private double anomalyProbability;
private double anomayStrength;
private Instant startTime;
private Duration minStepDistance;
private Duration maxStepDistance;
private Instant time;
private int index = 0;
public MeasurementGenerator() {
this.function = x -> 0.0;
this.noise = 0;
this.anomalyProbability = 0;
this.anomayStrength = 0;
this.startTime = Instant.now();
this.minStepDistance = Duration.ofSeconds(1);
this.maxStepDistance = Duration.ofSeconds(1);
this.time = this.startTime;
}
public MeasurementGenerator(final Function<Double, Double> function, final double noise, final double anomalyProbability, final double anomayStrength,
final Instant startTime,
final Duration minStepDistance, final Duration maxStepDistance) {
this.function = function;
this.noise = noise;
this.anomalyProbability = anomalyProbability;
this.anomayStrength = anomayStrength;
this.startTime = startTime;
this.minStepDistance = minStepDistance;
this.maxStepDistance = maxStepDistance;
this.time = this.startTime;
}
public Function<Double, Double> getFunction() {
return function;
}
public void setFunction(final Function<Double, Double> function) {
this.function = function;
}
public double getNoise() {
return noise;
}
public void setNoise(final double noise) {
this.noise = noise;
}
public double getAnomalyProbability() {
return anomalyProbability;
}
public void setAnomalyProbability(final double anomalyProbability) {
this.anomalyProbability = anomalyProbability;
}
public double getAnomayStrength() {
return anomayStrength;
}
public void setAnomayStrength(final double anomayStrength) {
this.anomayStrength = anomayStrength;
}
public Instant getStartTime() {
return startTime;
}
public void setStartTime(final Instant startTime) {
this.startTime = startTime;
}
public void setStepDistance(final Duration stepDistance) {
this.minStepDistance = stepDistance;
this.maxStepDistance = stepDistance;
}
public Duration getMinStepDistance() {
return minStepDistance;
}
public void setMinStepDistance(final Duration minStepDistance) {
this.minStepDistance = minStepDistance;
}
public Duration getMaxStepDistance() {
return maxStepDistance;
}
public void setMaxStepDistance(final Duration maxStepDistance) {
this.maxStepDistance = maxStepDistance;
}
public void reset() {
this.time = startTime;
this.index = 0;
}
public Measurement getNext() {
final double value = Math.max(0, calcFunctionValue() + calcNoise() + calcAnomaly());
final Measurement measurement = new Measurement(this.time, value);
this.index++;
this.time = this.time.plus(calcStepDistance());
return measurement;
}
private double calcFunctionValue() {
return this.function.apply((double) index);
}
private double calcNoise() {
// BETTER Should be the same
// return ((Math.random() * 2) - 1) * this.noise;
return (Math.random() * 2 * this.noise) - this.noise;
}
private double calcAnomaly() {
if (Math.random() < this.anomalyProbability) {
return this.anomayStrength * (Math.random() < 0.5 ? 1 : -1);
} else {
return 0;
}
}
private Duration calcStepDistance() {
final long minMillis = this.minStepDistance.toMillis();
final long maxMillis = this.maxStepDistance.toMillis();
final long randomMillis = minMillis + (long) (Math.random() * ((maxMillis - minMillis) + 1));
return Duration.ofMillis(randomMillis);
}
}
package anomalydetection;
import anomalydetection.measurement.Measurement;
public class MeasurementGeneratorStage extends AbstractMeassurementsGeneratorStage {
public MeasurementGeneratorStage(final MeasurementGenerator generator, final long generations) {
super(generator, generations);
}
@Override
protected void sendMeasurement() {
final Measurement measurement = this.generator.getNext();
this.outputPort.send(measurement);
}
}
package anomalydetection;
import java.time.Duration;
import anomalydetection.aggregation.Aggregator;
import anomalydetection.timeseries.EquidistantTimeSeries;
import anomalydetection.timeseries.TimeSeries;
import teetime.stage.basic.AbstractTransformation;
public class NormalizerStage extends AbstractTransformation<TimeSeries, EquidistantTimeSeries> {
private final TimeSeriesNormalizer normalizer;
public NormalizerStage(final Duration stepSize, final Aggregator aggregator) {
this.normalizer = new TimeSeriesNormalizer(stepSize, aggregator);
}
@Override
protected void execute(final TimeSeries timeSeries) {
EquidistantTimeSeries normalizedTimeSeries = this.normalizer.normalize(timeSeries);
this.outputPort.send(normalizedTimeSeries);
}
}
package anomalydetection;
import java.io.PrintStream;
import java.util.Formatter;
import anomalydetection.measurement.AnomalyScoredMeasurement;
import teetime.framework.AbstractConsumerStage;
public class PrinterStage extends AbstractConsumerStage<AnomalyScoredMeasurement> {
private final PrintStream stream;
public PrinterStage() {
this.stream = System.out;
}
public PrinterStage(final PrintStream stream) {
this.stream = stream;
}
@Override
protected void execute(final AnomalyScoredMeasurement measurement) {
Formatter formatter = new Formatter();
formatter.format("T: %s | M: %6f | P: %6f | S: %6f", measurement.getTime(), measurement.getValue(), measurement.getPrediction(),
measurement.getAnomalyScore());
this.stream.println(formatter.toString());
formatter.close();
}
}
package anomalydetection;
import java.time.Duration;
import java.time.Instant;
import anomalydetection.measurement.Measurement;
public class RealTimeMeasurementGeneratorStage extends AbstractMeassurementsGeneratorStage {
private Instant lastTime = null;
public RealTimeMeasurementGeneratorStage(final MeasurementGenerator generator, final long generations) {
super(generator, generations);
}
@Override
protected void sendMeasurement() {
final Measurement measurement = this.generator.getNext();
if (lastTime != null) {
this.sleep(Duration.between(lastTime, measurement.getTime()));
}
this.outputPort.send(measurement);
this.lastTime = measurement.getTime();
}
private void sleep(final Duration delay) {
try {
Thread.sleep(delay.toMillis());
} catch (InterruptedException e) {
this.terminateStage();
}
}
}
package anomalydetection;
import java.time.Duration;
import java.time.Instant;
import anomalydetection.aggregation.Aggregator;
import anomalydetection.forecast.Forecaster;
import anomalydetection.storage.StorageDriver;
import anomalydetection.timeseries.BoundedTimeSeries;
public class StorableAnomalyDetectionStage extends AnomalyDetectionStage {
public StorableAnomalyDetectionStage(final Duration slidingWindowDuration, final Duration normalizationDuration, final Aggregator aggregator,
final Forecaster forecaster, final StorageDriver storageDriver) {
super(new BoundedTimeSeries(slidingWindowDuration, storageDriver.retrieveTimeSeries(Instant.now().minus(slidingWindowDuration), Instant.now())),
normalizationDuration, aggregator, forecaster);
final StorageStage storageStage = new StorageStage(storageDriver);
super.connectPorts(super.getNewOutputPort(), storageStage.getInputPort());