Commit b99a9b87 authored by Sören Henning's avatar Sören Henning

added storage functions

parent 62ee09f2
......@@ -3,17 +3,26 @@ package anomalydetection;
import java.io.File;
import java.time.Duration;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import anomalydetection.aggregation.MeanAggregator;
import anomalydetection.forecast.RegressionForecaster;
import anomalydetection.storage.CassandraDriver;
import teetime.framework.Configuration;
public class AnomalyDetectionConfiguration extends Configuration {
public AnomalyDetectionConfiguration() {
// Create the stages
String ipAddress = "192.168.99.100";
int port = 32770;
String keyspace = "demo3";
MeasurementGenerator gen = new MeasurementGenerator();
Cluster cluster = Cluster.builder().addContactPoint(ipAddress).withPort(port).build();
Session session = cluster.connect(keyspace);
final MeasurementGenerator gen = new MeasurementGenerator();
gen.setFunction(x -> 500 * Math.sin(x / 60) + 2000);
gen.setNoise(250);
gen.setAnomalyProbability(0.01);
......@@ -21,7 +30,8 @@ public class AnomalyDetectionConfiguration extends Configuration {
gen.setMinStepDistance(Duration.ofMillis(500));
gen.setMinStepDistance(Duration.ofSeconds(2));
final MeasurementGeneratorStage generator = new MeasurementGeneratorStage(gen, 3600);
// Create the stages
final RealTimeMeasurementGeneratorStage generator = new RealTimeMeasurementGeneratorStage(gen, 3600);
// final MeassurementsGeneratorStage generator = new MeassurementsGeneratorStage(x -> 500 * Math.sin(x / 60) + 2000, 250, 0.01, 1000,
// Duration.ofSeconds(1),3600);
// final MeassurementsGeneratorStage generator = new MeassurementsGeneratorStage(x -> 500 * Math.pow(Math.sin(x / 60), 1 / 101) + 2000, 250, 0.01, 1000,
......@@ -30,8 +40,10 @@ public class AnomalyDetectionConfiguration extends Configuration {
// 900);
// final AnomalyDetectionStage anomalyDetector = new AnomalyDetectionStage(Duration.ofHours(1), Duration.ofSeconds(30), new MeanAggregator(), new
// ARIMAForecaster("192.168.99.100", 6311));
final AnomalyDetectionStage anomalyDetector = new AnomalyDetectionStage(Duration.ofHours(1), Duration.ofSeconds(5), new MeanAggregator(),
new RegressionForecaster());
// final AnomalyDetectionStage anomalyDetector = new AnomalyDetectionStage(Duration.ofHours(1), Duration.ofSeconds(5), new MeanAggregator(),new
// RegressionForecaster());
final AnomalyDetectionStage anomalyDetector = new StorableAnomalyDetectionStage(Duration.ofHours(1), Duration.ofSeconds(5), new MeanAggregator(),
new RegressionForecaster(), new CassandraDriver(session, "measurements"));
final AlertPrinterStage alerter = new AlertPrinterStage();
final PrinterStage printer = new PrinterStage(); // TODO Temp
final JSONExportStage jsonExporter = new JSONExportStage(new File("values.json")); // TODO Temp
......
......@@ -19,17 +19,16 @@ public class AnomalyDetectionStage extends CompositeStage {
final Distributor<AnomalyScoredMeasurement> anomalyScoreDistributor = new Distributor<>(new CopyByReferenceStrategy());
public AnomalyDetectionStage(final Duration slidingWindowDuration, final Duration normalizationDuration, final Aggregator aggregator,
protected AnomalyDetectionStage(final BoundedTimeSeries slidingWindow, final Duration normalizationDuration, final Aggregator aggregator,
final Forecaster forecaster1) {
// Create the stages
final Distributor<Measurement> measurementDistributor = new Distributor<>(new CopyByReferenceStrategy());
final ExtractorStage extractor = new ExtractorStage(new BoundedTimeSeries(slidingWindowDuration));
final ExtractorStage extractor = new ExtractorStage(slidingWindow);
final NormalizerStage normalizerStage = new NormalizerStage(normalizationDuration, aggregator);
final ForecastStage forecaster = new ForecastStage(forecaster1);
final MeasurementForecastDecorationStage measurementForecastDecorator = new MeasurementForecastDecorationStage();
final AnomalyScoreCalculatorStage anomalyScoreCalculator = new AnomalyScoreCalculatorStage();
final StorageStage storager = new StorageStage();
this.inputPort = measurementDistributor.getInputPort();
......@@ -41,8 +40,11 @@ public class AnomalyDetectionStage extends CompositeStage {
super.connectPorts(measurementDistributor.getNewOutputPort(), measurementForecastDecorator.getInputPort2());
super.connectPorts(measurementForecastDecorator.getOutputPort(), anomalyScoreCalculator.getInputPort());
super.connectPorts(anomalyScoreCalculator.getOutputPort(), anomalyScoreDistributor.getInputPort());
super.connectPorts(anomalyScoreDistributor.getNewOutputPort(), storager.getInputPort());
}
public AnomalyDetectionStage(final Duration slidingWindowDuration, final Duration normalizationDuration, final Aggregator aggregator,
final Forecaster forecaster) {
this(new BoundedTimeSeries(slidingWindowDuration), normalizationDuration, aggregator, forecaster);
}
public OutputPort<AnomalyScoredMeasurement> getNewOutputPort() {
......
package anomalydetection;
import java.time.Duration;
import java.time.Instant;
import anomalydetection.aggregation.Aggregator;
import anomalydetection.forecast.Forecaster;
import anomalydetection.storage.StorageDriver;
import anomalydetection.timeseries.BoundedTimeSeries;
public class StorableAnomalyDetectionStage extends AnomalyDetectionStage {
public StorableAnomalyDetectionStage(final Duration slidingWindowDuration, final Duration normalizationDuration, final Aggregator aggregator,
final Forecaster forecaster, final StorageDriver storageDriver) {
super(new BoundedTimeSeries(slidingWindowDuration, storageDriver.retrieveTimeSeries("abc", Instant.now().minus(slidingWindowDuration), Instant.now())),
normalizationDuration, aggregator, forecaster);
final StorageStage storageStage = new StorageStage(storageDriver);
super.connectPorts(super.getNewOutputPort(), storageStage.getInputPort());
}
}
package anomalydetection;
import anomalydetection.measurement.AnomalyScoredMeasurement;
import anomalydetection.storage.StorageDriver;
import teetime.framework.AbstractConsumerStage;
/**
......@@ -11,9 +12,15 @@ import teetime.framework.AbstractConsumerStage;
*/
public class StorageStage extends AbstractConsumerStage<AnomalyScoredMeasurement> {
private final StorageDriver storageDriver;
public StorageStage(final StorageDriver storageDriver) {
this.storageDriver = storageDriver;
}
@Override
protected void execute(final AnomalyScoredMeasurement measurement) {
// Do Nothing
storageDriver.storeMeasurement("temp", measurement); // TODO seriesid
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment