Commit a8a40570 authored by Sören Henning's avatar Sören Henning

reorganize code in/between stage and configuration

parent 1337601d
package anomalydetection;
import java.io.File;
import java.time.Duration;
import anomalydetection.aggregation.MeanAggregator;
import anomalydetection.forecast.ARIMAForecaster;
import teetime.framework.Configuration;
public class AnomalyDetectionConfiguration extends Configuration {
......@@ -9,14 +12,23 @@ public class AnomalyDetectionConfiguration extends Configuration {
public AnomalyDetectionConfiguration() {
// Create the stages
final MeassurementsGeneratorStage generator = new MeassurementsGeneratorStage(x -> 500 * Math.sin(x / 240) + 2000, 250, 0.01, 1000, Duration.ofSeconds(1),
3600);
final AnomalyDetectionStage anomalyDetector = new AnomalyDetectionStage();
// final MeassurementsGeneratorStage generator = new MeassurementsGeneratorStage(x -> 500 * Math.sin(x / 240) + 2000, 250, 0.01, 1000, Duration.ofSeconds(1),
// 3600);
final MeassurementsGeneratorStage generator = new MeassurementsGeneratorStage(x -> 500 * Math.sin(x / 60) + 2000, 500, 0.01, 1000, Duration.ofSeconds(1),
100);
// final MeassurementsGeneratorStage generator = new MeassurementsGeneratorStage(x -> 0.5 * x + 1000, 250, 0.01, 1000, Duration.ofSeconds(1),
// 900);
final AnomalyDetectionStage anomalyDetector = new AnomalyDetectionStage(Duration.ofHours(1), Duration.ofSeconds(30), new MeanAggregator(),
new ARIMAForecaster("192.168.99.100", 6311));
final SimpleAlertStage alerter = new SimpleAlertStage();
final PrinterStage printer = new PrinterStage(); // TODO Temp
final JSONExporter jsonExporter = new JSONExporter(new File("values.json")); // TODO Temp
// Connect the stages
super.connectPorts(generator.getOutputPort(), anomalyDetector.getInputPort());
super.connectPorts(anomalyDetector.getNewOutputPort(0.9), alerter.getInputPort());
super.connectPorts(anomalyDetector.getNewOutputPort(), printer.getInputPort());
super.connectPorts(anomalyDetector.getNewOutputPort(), jsonExporter.getInputPort());
}
......
package anomalydetection;
import java.io.File;
import java.time.Duration;
import anomalydetection.aggregation.MeanAggregator;
import anomalydetection.forecast.RegressionForecaster;
import anomalydetection.aggregation.Aggregator;
import anomalydetection.forecast.Forecaster;
import anomalydetection.measurement.AnomalyScoredMeasurement;
import anomalydetection.measurement.Measurement;
import anomalydetection.timeseries.BoundedTimeSeries;
......@@ -20,17 +19,16 @@ public class AnomalyDetectionStage extends CompositeStage {
final Distributor<AnomalyScoredMeasurement> anomalyScoreDistributor = new Distributor<>(new CopyByReferenceStrategy());
public AnomalyDetectionStage() {
public AnomalyDetectionStage(final Duration slidingWindowDuration, final Duration normalizationDuration, final Aggregator aggregator,
final Forecaster forecaster1) {
// Create the stages
final Distributor<Measurement> measurementDistributor = new Distributor<>(new CopyByReferenceStrategy());
final ExtractorStage extractor = new ExtractorStage(new BoundedTimeSeries(Duration.ofHours(1)));
final NormalizerStage normalizerStage = new NormalizerStage(Duration.ofSeconds(5), new MeanAggregator());
final ForecastStage forecaster = new ForecastStage(new RegressionForecaster());
final ExtractorStage extractor = new ExtractorStage(new BoundedTimeSeries(slidingWindowDuration));
final NormalizerStage normalizerStage = new NormalizerStage(normalizationDuration, aggregator);
final ForecastStage forecaster = new ForecastStage(forecaster1);
final MeasurementForecastDecorationStage measurementForecastDecorator = new MeasurementForecastDecorationStage();
final AnomalyScoreCalculatorStage anomalyScoreCalculator = new AnomalyScoreCalculatorStage();
final PrinterStage printer = new PrinterStage(); // TODO Temp
final JSONExporter jsonExporter = new JSONExporter(new File("values.json")); // TODO Temp
final StorageStage storager = new StorageStage();
this.inputPort = measurementDistributor.getInputPort();
......@@ -43,12 +41,14 @@ public class AnomalyDetectionStage extends CompositeStage {
super.connectPorts(measurementDistributor.getNewOutputPort(), measurementForecastDecorator.getInputPort2());
super.connectPorts(measurementForecastDecorator.getOutputPort(), anomalyScoreCalculator.getInputPort());
super.connectPorts(anomalyScoreCalculator.getOutputPort(), anomalyScoreDistributor.getInputPort());
super.connectPorts(anomalyScoreDistributor.getNewOutputPort(), printer.getInputPort());
super.connectPorts(anomalyScoreDistributor.getNewOutputPort(), jsonExporter.getInputPort());
super.connectPorts(anomalyScoreDistributor.getNewOutputPort(), storager.getInputPort());
}
public OutputPort<AnomalyScoredMeasurement> getNewOutputPort() {
return this.anomalyScoreDistributor.getNewOutputPort();
}
public OutputPort<AnomalyScoredMeasurement> getNewOutputPort(final double threshold) {
return getNewOutputPort(new ThresholdFilter(threshold));
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment