Commit bda33879 authored by Christian Wulf's avatar Christian Wulf

added TraceReconstructionAnalysis

parent efc2fea4
......@@ -3,6 +3,6 @@
java.util.logging.ConsoleHandler.level = INFO
#java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %2$s %4$s: %5$s %n
java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s)%n
#teetime.level = ALL
\ No newline at end of file
package teetime.variant.methodcallWithPorts.stage;
import java.util.LinkedList;
import java.util.List;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
public class Cache<T> extends ConsumerStage<T, T> {
private final List<T> cachedObjects = new LinkedList<T>();
@Override
protected void execute5(final T element) {
this.cachedObjects.add(element);
}
@Override
public void onIsPipelineHead() {
this.logger.debug("Emitting cached elements...");
for (T cachedElement : this.cachedObjects) {
this.send(cachedElement);
}
super.onIsPipelineHead();
}
}
......@@ -25,7 +25,7 @@ import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
*
* @since 1.10
*/
public class CollectorSink<T> extends ConsumerStage<T, Object> {
public class CollectorSink<T> extends ConsumerStage<T, Void> {
private static final int THRESHOLD = 10000; // TODO make configurable or use an sysout stage instead
......
package teetime.variant.methodcallWithPorts.stage;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
public class CountingFilter<T> extends ConsumerStage<T, T> {
private int numElementsPassed;
@Override
protected void execute5(final T element) {
this.numElementsPassed++;
this.send(element);
}
public int getNumElementsPassed() {
return this.numElementsPassed;
}
}
package teetime.variant.methodcallWithPorts.stage;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
/**
* @author Jan Waller, Nils Christian Ehmke, Christian Wulf
*
* @since 1.10
*/
public class InstanceOfFilter<I, O> extends ConsumerStage<I, O> {
private Class<O> type;
/**
* @since 1.10
*/
public InstanceOfFilter(final Class<O> type) {
this.type = type;
}
@Override
protected void execute5(final I element) {
if (this.type.isInstance(element)) {
this.send(this.type.cast(element));
} else { // swallow up the element
if (this.logger.isDebugEnabled()) {
this.logger.debug("element is not an instance of " + this.type.getName() + ", but of " + element.getClass());
}
}
}
public Class<O> getType() {
return this.type;
}
public void setType(final Class<O> type) {
this.type = type;
}
}
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
import teetime.util.concurrent.hashmap.TraceBuffer;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import kieker.analysis.plugin.filter.flow.TraceEventRecords;
import kieker.common.record.flow.IFlowRecord;
import kieker.common.record.flow.trace.AbstractTraceEvent;
import kieker.common.record.flow.trace.TraceMetadata;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord, TraceEventRecords> {
private TimeUnit timeunit;
private long maxTraceDuration = Long.MAX_VALUE;
private long maxTraceTimeout = Long.MAX_VALUE;
private boolean timeout;
private long maxEncounteredLoggingTimestamp = -1;
private static final Map<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
@Override
protected void execute5(final IFlowRecord element) {
final Long traceId = this.reconstructTrace(element);
if (traceId != null) {
this.putIfFinished(traceId);
this.processTimestamp(element);
}
}
private void processTimestamp(final IFlowRecord record) {
if (this.timeout) {
synchronized (this) {
final long loggingTimestamp = this.getTimestamp(record);
// can we assume a rough order of logging timestamps? (yes, except with DB reader)
if (loggingTimestamp > this.maxEncounteredLoggingTimestamp) {
this.maxEncounteredLoggingTimestamp = loggingTimestamp;
}
this.processTimeoutQueue(this.maxEncounteredLoggingTimestamp);
}
}
}
private long getTimestamp(final IFlowRecord record) {
if (record instanceof AbstractTraceEvent) {
return ((AbstractTraceEvent) record).getTimestamp();
}
return -1;
}
private void putIfFinished(final Long traceId) {
final TraceBuffer traceBuffer = TraceReconstructionFilter.traceId2trace.get(traceId);
if (traceBuffer.isFinished()) {
synchronized (this) { // has to be synchronized because of timeout cleanup
TraceReconstructionFilter.traceId2trace.remove(traceId);
}
this.put(traceBuffer);
}
}
private Long reconstructTrace(final IFlowRecord record) {
Long traceId = null;
if (record instanceof TraceMetadata) {
traceId = ((TraceMetadata) record).getTraceId();
final TraceBuffer traceBuffer = TraceReconstructionFilter.traceId2trace.get(traceId);
traceBuffer.setTrace((TraceMetadata) record);
} else if (record instanceof AbstractTraceEvent) {
traceId = ((AbstractTraceEvent) record).getTraceId();
final TraceBuffer traceBuffer = TraceReconstructionFilter.traceId2trace.get(traceId);
traceBuffer.insertEvent((AbstractTraceEvent) record);
}
return traceId;
}
@Override
public void onStart() {
this.timeout = !((this.maxTraceTimeout == Long.MAX_VALUE) && (this.maxTraceDuration == Long.MAX_VALUE));
super.onStart();
}
@Override
public void onIsPipelineHead() {
this.logger.info("traceId2trace: " + TraceReconstructionFilter.traceId2trace.keySet());
Iterator<TraceBuffer> iterator = TraceReconstructionFilter.traceId2trace.values().iterator();
while (iterator.hasNext()) {
TraceBuffer traceBuffer = iterator.next();
this.put(traceBuffer);
iterator.remove();
}
super.onIsPipelineHead();
}
private void processTimeoutQueue(final long timestamp) {
final long duration = timestamp - this.maxTraceDuration;
final long traceTimeout = timestamp - this.maxTraceTimeout;
for (final Iterator<Entry<Long, TraceBuffer>> iterator = TraceReconstructionFilter.traceId2trace.entrySet().iterator(); iterator.hasNext();) {
final TraceBuffer traceBuffer = iterator.next().getValue();
if ((traceBuffer.getMaxLoggingTimestamp() <= traceTimeout) // long time no see
|| (traceBuffer.getMinLoggingTimestamp() <= duration)) { // max duration is gone
this.put(traceBuffer);
iterator.remove();
}
}
}
private void put(final TraceBuffer traceBuffer) {
// final IOutputPort<TraceReconstructionFilter, TraceEventRecords> outputPort =
// (traceBuffer.isInvalid()) ? this.traceInvalidOutputPort : this.traceValidOutputPort;
// context.put(outputPort, traceBuffer.toTraceEvents());
this.send(traceBuffer.toTraceEvents());
}
public TimeUnit getTimeunit() {
return this.timeunit;
}
public void setTimeunit(final TimeUnit timeunit) {
this.timeunit = timeunit;
}
public long getMaxTraceDuration() {
return this.maxTraceDuration;
}
public void setMaxTraceDuration(final long maxTraceDuration) {
this.maxTraceDuration = maxTraceDuration;
}
public long getMaxTraceTimeout() {
return this.maxTraceTimeout;
}
public void setMaxTraceTimeout(final long maxTraceTimeout) {
this.maxTraceTimeout = maxTraceTimeout;
}
public long getMaxEncounteredLoggingTimestamp() {
return this.maxEncounteredLoggingTimestamp;
}
public void setMaxEncounteredLoggingTimestamp(final long maxEncounteredLoggingTimestamp) {
this.maxEncounteredLoggingTimestamp = maxEncounteredLoggingTimestamp;
}
// public Map<Long, TraceBuffer> getTraceId2trace() {
// return TraceReconstructionFilter.traceId2trace;
// }
//
// public void setTraceId2trace(final Map<Long, TraceBuffer> traceId2trace) {
// TraceReconstructionFilter.traceId2trace = traceId2trace;
// }
}
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.variant.methodcallWithPorts.stage.stringBuffer;
import java.util.Collection;
import java.util.LinkedList;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.stage.stringBuffer.handler.AbstractDataTypeHandler;
import teetime.variant.methodcallWithPorts.stage.stringBuffer.util.KiekerHashMap;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class StringBufferFilter<T> extends ConsumerStage<T, T> {
// BETTER use a non shared data structure to avoid synchronization between threads
private KiekerHashMap kiekerHashMap = new KiekerHashMap();
private Collection<AbstractDataTypeHandler<?>> dataTypeHandlers = new LinkedList<AbstractDataTypeHandler<?>>();
@Override
protected void execute5(final T element) {
final T returnedElement = this.handle(element);
this.send(returnedElement);
}
@Override
public void onStart() {
for (final AbstractDataTypeHandler<?> handler : this.dataTypeHandlers) {
handler.setLogger(this.logger);
handler.setStringRepository(this.kiekerHashMap);
}
super.onStart();
}
private T handle(final T object) {
for (final AbstractDataTypeHandler<?> handler : this.dataTypeHandlers) {
if (handler.canHandle(object)) {
@SuppressWarnings("unchecked")
final T returnedObject = ((AbstractDataTypeHandler<T>) handler).handle(object);
return returnedObject;
}
}
return object; // else relay given object
}
public KiekerHashMap getKiekerHashMap() {
return this.kiekerHashMap;
}
public void setKiekerHashMap(final KiekerHashMap kiekerHashMap) {
this.kiekerHashMap = kiekerHashMap;
}
public Collection<AbstractDataTypeHandler<?>> getDataTypeHandlers() {
return this.dataTypeHandlers;
}
public void setDataTypeHandlers(final Collection<AbstractDataTypeHandler<?>> dataTypeHandlers) {
this.dataTypeHandlers = dataTypeHandlers;
}
}
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.variant.methodcallWithPorts.stage.stringBuffer.handler;
import teetime.variant.methodcallWithPorts.stage.stringBuffer.util.KiekerHashMap;
import kieker.common.logging.Log;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public abstract class AbstractDataTypeHandler<T> {
protected Log logger;
protected KiekerHashMap stringRepository;
/**
* @since 1.10
*/
public abstract boolean canHandle(Object object);
/**
* @since 1.10
*/
public abstract T handle(T object);
/**
* @since 1.10
*/
public void setLogger(final Log logger) {
this.logger = logger;
}
/**
* @since 1.10
*/
public void setStringRepository(final KiekerHashMap stringRepository) {
this.stringRepository = stringRepository;
}
}
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.variant.methodcallWithPorts.stage.stringBuffer.handler;
import kieker.common.exception.MonitoringRecordException;
import kieker.common.record.AbstractMonitoringRecord;
import kieker.common.record.IMonitoringRecord;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class IMonitoringRecordHandler extends AbstractDataTypeHandler<IMonitoringRecord> {
@Override
public boolean canHandle(final Object object) {
return object instanceof IMonitoringRecord;
}
@Override
public IMonitoringRecord handle(final IMonitoringRecord monitoringRecord) {
final Object[] objects = monitoringRecord.toArray();
boolean stringBuffered = false;
for (int i = 0; i < objects.length; i++) {
if (objects[i] instanceof String) {
objects[i] = this.stringRepository.get((String) objects[i]);
stringBuffered = true;
}
}
if (stringBuffered) {
try {
final IMonitoringRecord newRecord = AbstractMonitoringRecord.createFromArray(monitoringRecord.getClass(), objects);
newRecord.setLoggingTimestamp(monitoringRecord.getLoggingTimestamp());
return newRecord;
} catch (final MonitoringRecordException ex) {
this.logger.warn("Failed to recreate buffered monitoring record: " + monitoringRecord.toString(), ex);
}
}
return monitoringRecord;
}
}
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.variant.methodcallWithPorts.stage.stringBuffer.handler;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class StringHandler extends AbstractDataTypeHandler<String> {
@Override
public boolean canHandle(final Object object) {
return object instanceof String;
}
@Override
public String handle(final String object) {
return this.stringRepository.get(object);
}
}
$0=kieker.common.record.flow.trace.TraceMetadata
$1=kieker.common.record.flow.trace.operation.BeforeOperationEvent
$2=kieker.common.record.flow.trace.operation.AfterOperationEvent
This diff is collapsed.
$0=kieker.common.record.misc.KiekerMetadataRecord
$1=kieker.common.record.flow.trace.TraceMetadata
$2=kieker.common.record.flow.trace.operation.object.BeforeOperationObjectEvent
$3=kieker.common.record.flow.trace.operation.constructor.object.BeforeConstructorObjectEvent
$4=kieker.common.record.flow.trace.operation.constructor.object.AfterConstructorObjectEvent