Commit 652eb69a authored by Florian Fittkau's avatar Florian Fittkau

runtime for each host

parent 14a8964f
......@@ -9,6 +9,7 @@ import explorviz.live_trace_processing.record.event.AbstractBeforeOperationEvent
import explorviz.live_trace_processing.record.event.AbstractEventRecord;
import explorviz.live_trace_processing.record.misc.TerminateRecord;
import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
import explorviz.live_trace_processing.record.trace.RuntimeStatisticInformation;
import explorviz.live_trace_processing.record.trace.Trace;
public class RecordCountingFilter extends AbstractFilter {
......@@ -36,8 +37,11 @@ public class RecordCountingFilter extends AbstractFilter {
for (final AbstractEventRecord event : trace.getTraceEvents()) {
if (event instanceof AbstractBeforeOperationEventRecord) {
final AbstractBeforeOperationEventRecord abstractBeforeEventRecord = (AbstractBeforeOperationEventRecord) event;
counter.inputObjectsCount(abstractBeforeEventRecord
.getRuntimeStatisticInformation().getCount());
for (final RuntimeStatisticInformation runtime : abstractBeforeEventRecord
.getRuntimeStatisticInformationList()) {
counter.inputObjectsCount(runtime.getCount());
}
}
}
if (receiver != null) {
......
......@@ -9,6 +9,7 @@ import explorviz.live_trace_processing.record.event.AbstractBeforeOperationEvent
import explorviz.live_trace_processing.record.event.AbstractEventRecord;
import explorviz.live_trace_processing.record.misc.TerminateRecord;
import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
import explorviz.live_trace_processing.record.trace.RuntimeStatisticInformation;
import explorviz.live_trace_processing.record.trace.Trace;
public class TraceCountingFilter extends AbstractFilter {
......@@ -35,8 +36,10 @@ public class TraceCountingFilter extends AbstractFilter {
final AbstractEventRecord abstractEventRecord = trace.getTraceEvents().get(0);
if (abstractEventRecord instanceof AbstractBeforeOperationEventRecord) {
final AbstractBeforeOperationEventRecord abstractBeforeOperationEventRecord = (AbstractBeforeOperationEventRecord) abstractEventRecord;
counter.inputObjectsCount(abstractBeforeOperationEventRecord
.getRuntimeStatisticInformation().getCount());
for (final RuntimeStatisticInformation runtime : abstractBeforeOperationEventRecord
.getRuntimeStatisticInformationList()) {
counter.inputObjectsCount(runtime.getCount());
}
}
}
deliver(record);
......
......@@ -33,9 +33,11 @@ class TraceReconstructionBuffer {
if ((event instanceof AbstractBeforeEventRecord)) {
openEvents++;
final AbstractBeforeEventRecord beforeEvent = (AbstractBeforeEventRecord) event;
if (beforeEvent.getRuntimeStatisticInformation() == null) {
beforeEvent.setRuntimeStatisticInformation(new RuntimeStatisticInformation(1, -1,
-1));
if (beforeEvent.getRuntimeStatisticInformationList() == null) {
final ArrayList<RuntimeStatisticInformation> runtime = new ArrayList<RuntimeStatisticInformation>(
1);
runtime.add(new RuntimeStatisticInformation(1, -1, -1));
beforeEvent.setRuntimeStatisticInformationList(runtime);
}
} else if ((event instanceof AbstractAfterFailedEventRecord)
|| (event instanceof AbstractAfterEventRecord)) {
......@@ -77,7 +79,7 @@ class TraceReconstructionBuffer {
return ((openEvents != 0) || events.isEmpty() || ((maxOrderIndex + 1) != events.size()));
}
public final Trace toTrace() {
public final Trace toTrace(final boolean valid) {
final Stack<AbstractBeforeEventRecord> stack = new Stack<AbstractBeforeEventRecord>();
boolean containsRemoteRecord = false;
......@@ -90,27 +92,32 @@ class TraceReconstructionBuffer {
containsRemoteRecord = true;
}
} else if (event instanceof AbstractAfterEventRecord) {
final AbstractAfterEventRecord abstractAfterEventRecord = (AbstractAfterEventRecord) event;
if (!stack.isEmpty()) {
final AbstractBeforeEventRecord beforeEvent = stack.pop();
if (beforeEvent.getRuntimeStatisticInformation().getAverage() < 0) {
final long time = abstractAfterEventRecord.getMethodDuration();
beforeEvent.getRuntimeStatisticInformation().set(1, time, time * time);
}
}
initRuntimeIfNeccessary(stack,
((AbstractAfterEventRecord) event).getMethodDuration());
} else if (event instanceof AbstractAfterFailedEventRecord) {
final AbstractAfterFailedEventRecord abstractAfterFailedEventRecord = (AbstractAfterFailedEventRecord) event;
if (!stack.isEmpty()) {
final AbstractBeforeEventRecord beforeEvent = stack.pop();
if (beforeEvent.getRuntimeStatisticInformation().getAverage() < 0) {
final long time = abstractAfterFailedEventRecord.getMethodDuration();
beforeEvent.getRuntimeStatisticInformation().set(1, time, time * time);
}
initRuntimeIfNeccessary(stack,
((AbstractAfterFailedEventRecord) event).getMethodDuration());
}
}
return new Trace(new ArrayList<AbstractEventRecord>(events), valid, containsRemoteRecord);
}
private void initRuntimeIfNeccessary(final Stack<AbstractBeforeEventRecord> stack,
final long methodDuration) {
if (!stack.isEmpty()) {
final AbstractBeforeEventRecord beforeEvent = stack.pop();
final List<RuntimeStatisticInformation> runtimeStatisticInformationList = beforeEvent
.getRuntimeStatisticInformationList();
if (runtimeStatisticInformationList.size() == 1) { // only on first
// reconstruction
final RuntimeStatisticInformation runtimeStatisticInformation = runtimeStatisticInformationList
.get(0);
if (runtimeStatisticInformation.getAverage() < 0) {
runtimeStatisticInformation.set(1, methodDuration, methodDuration
* methodDuration);
}
}
}
return new Trace(new ArrayList<AbstractEventRecord>(events), containsRemoteRecord);
}
}
package explorviz.live_trace_processing.filter.reconstruction;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
......@@ -13,11 +14,14 @@ import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.event.AbstractEventRecord;
import explorviz.live_trace_processing.record.misc.TerminateRecord;
import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
import explorviz.live_trace_processing.record.trace.HostApplicationMetaDataRecord;
import explorviz.live_trace_processing.record.trace.Trace;
public final class TraceReconstructionFilter extends AbstractFilter implements Runnable {
private final long maxTraceTimeout;
private final Map<Long, TraceReconstructionBuffer> traceId2trace = new TreeMap<Long, TraceReconstructionBuffer>();
private final Map<AbstractEventRecord, TraceReconstructionBuffer> traceIdAndHost2trace = new TreeMap<AbstractEventRecord, TraceReconstructionBuffer>(
new TraceIdAndHostComperator());
private final PipesMerger<IRecord> traceReconstructionMerger;
......@@ -36,16 +40,15 @@ public final class TraceReconstructionFilter extends AbstractFilter implements R
@Override
public final void processRecord(final IRecord record) {
if (record instanceof AbstractEventRecord) {
final AbstractEventRecord abstractOperationEvent = ((AbstractEventRecord) record);
final AbstractEventRecord abstractEvent = ((AbstractEventRecord) record);
final long traceId = abstractOperationEvent.getTraceId();
final TraceReconstructionBuffer traceBuffer = getBufferForTraceId(abstractOperationEvent
.getTraceId());
traceBuffer.insertEvent(abstractOperationEvent);
final TraceReconstructionBuffer traceBuffer = getBufferForRecord(abstractEvent);
traceBuffer.insertEvent(abstractEvent);
if (traceBuffer.isFinished()) {
deliver(traceBuffer.toTrace());
traceId2trace.remove(traceId);
final Trace trace = traceBuffer.toTrace(true);
traceIdAndHost2trace.remove(trace.getTraceEvents().get(0));
deliver(traceBuffer.toTrace(true));
}
} else if (record instanceof TimedPeriodRecord) {
checkForTimeouts(TimeProvider.getCurrentTimestamp());
......@@ -58,11 +61,11 @@ public final class TraceReconstructionFilter extends AbstractFilter implements R
}
}
private final TraceReconstructionBuffer getBufferForTraceId(final long traceId) {
TraceReconstructionBuffer traceBuffer = traceId2trace.get(traceId);
private final TraceReconstructionBuffer getBufferForRecord(final AbstractEventRecord record) {
TraceReconstructionBuffer traceBuffer = traceIdAndHost2trace.get(record);
if (traceBuffer == null) {
traceBuffer = new TraceReconstructionBuffer();
traceId2trace.put(traceId, traceBuffer);
traceIdAndHost2trace.put(record, traceBuffer);
}
return traceBuffer;
}
......@@ -70,11 +73,11 @@ public final class TraceReconstructionFilter extends AbstractFilter implements R
private void checkForTimeouts(final long timestamp) {
final long traceTimeout = timestamp - maxTraceTimeout;
final Iterator<Entry<Long, TraceReconstructionBuffer>> iterator = traceId2trace.entrySet()
.iterator();
final Iterator<Entry<AbstractEventRecord, TraceReconstructionBuffer>> iterator = traceIdAndHost2trace
.entrySet().iterator();
while (iterator.hasNext()) {
final Entry<Long, TraceReconstructionBuffer> entry = iterator.next();
final Entry<AbstractEventRecord, TraceReconstructionBuffer> entry = iterator.next();
final TraceReconstructionBuffer traceBuffer = entry.getValue();
if (traceBuffer.isUpdatedInThisPeriod()) {
traceBuffer.setUpdatedInThisPeriod(false);
......@@ -82,7 +85,7 @@ public final class TraceReconstructionFilter extends AbstractFilter implements R
traceBuffer.updateLastBufferInsert();
if ((timestamp - traceBuffer.getLastBufferInsert()) <= traceTimeout) {
deliver(traceBuffer.toTrace());
deliver(traceBuffer.toTrace(false));
iterator.remove();
}
}
......@@ -90,9 +93,42 @@ public final class TraceReconstructionFilter extends AbstractFilter implements R
}
private void terminate() {
for (final TraceReconstructionBuffer entry : traceId2trace.values()) {
deliver(entry.toTrace());
for (final TraceReconstructionBuffer entry : traceIdAndHost2trace.values()) {
deliver(entry.toTrace(false));
}
traceId2trace.clear();
traceIdAndHost2trace.clear();
}
}
\ No newline at end of file
}
class TraceIdAndHostComperator implements Comparator<AbstractEventRecord> {
@Override
public int compare(final AbstractEventRecord o1, final AbstractEventRecord o2) {
final long cmpTraceId = o1.getTraceId() - o2.getTraceId();
if (cmpTraceId != 0) {
return (int) cmpTraceId;
}
final int cmpHostLength = o1.getHostApplicationMetadataList().size()
- o2.getHostApplicationMetadataList().size();
if (cmpHostLength != 0) {
return cmpHostLength;
}
for (final HostApplicationMetaDataRecord hostMeta1 : o1.getHostApplicationMetadataList()) {
boolean foundMatch = false;
for (final HostApplicationMetaDataRecord hostMeta2 : o2
.getHostApplicationMetadataList()) {
if (hostMeta1.equals(hostMeta2)) {
foundMatch = true;
break;
}
}
if (!foundMatch) {
return -1;
}
}
return 0;
}
}
package explorviz.live_trace_processing.filter.reduction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map.Entry;
import explorviz.live_trace_processing.record.event.AbstractBeforeEventRecord;
import explorviz.live_trace_processing.record.event.AbstractBeforeOperationEventRecord;
import explorviz.live_trace_processing.record.event.AbstractEventRecord;
import explorviz.live_trace_processing.record.trace.HostApplicationMetaDataRecord;
import explorviz.live_trace_processing.record.trace.RuntimeStatisticInformation;
import explorviz.live_trace_processing.record.trace.Trace;
class TracesSummarizationBuffer {
......@@ -34,26 +39,89 @@ class TracesSummarizationBuffer {
final AbstractEventRecord event = aggregatedRecords.get(i);
final AbstractEventRecord event2 = records.get(i);
event.getHostApplicationMetadataList().addAll(
event2.getHostApplicationMetadataList());
if (event instanceof AbstractBeforeEventRecord) {
final AbstractBeforeEventRecord abstractBeforeEventRecord = (AbstractBeforeEventRecord) event;
final AbstractBeforeEventRecord abstractBeforeEventRecord2 = (AbstractBeforeEventRecord) event2;
if (abstractBeforeEventRecord2 instanceof AbstractBeforeOperationEventRecord) {
final AbstractBeforeOperationEventRecord abstractBeforeOperationEventRecord = (AbstractBeforeOperationEventRecord) abstractBeforeEventRecord2;
abstractBeforeEventRecord
.getRuntimeStatisticInformation()
.merge(abstractBeforeOperationEventRecord
.getRuntimeStatisticInformation(),
abstractBeforeOperationEventRecord.getObjectId());
} else {
abstractBeforeEventRecord.getRuntimeStatisticInformation().merge(
abstractBeforeEventRecord2.getRuntimeStatisticInformation(), 0);
final List<HostApplicationMetaDataRecord> hostMetaList1 = event
.getHostApplicationMetadataList();
final List<HostApplicationMetaDataRecord> hostMetaList2 = event2
.getHostApplicationMetadataList();
final List<RuntimeStatisticInformation> runtimeList1 = abstractBeforeEventRecord
.getRuntimeStatisticInformationList();
final List<RuntimeStatisticInformation> runtimeList2 = abstractBeforeEventRecord2
.getRuntimeStatisticInformationList();
final HashMap<HostApplicationMetaDataRecord, RuntimeStatisticInformation> toAdd = new HashMap<HostApplicationMetaDataRecord, RuntimeStatisticInformation>();
for (int j = 0; j < hostMetaList2.size(); j++) {
final HostApplicationMetaDataRecord host2 = hostMetaList2.get(j);
final int indexInHostList1 = indexOfHost(host2, hostMetaList1);
if (indexInHostList1 >= 0) {
// found, so merge the entries
final RuntimeStatisticInformation runtime1 = runtimeList1
.get(indexInHostList1);
final RuntimeStatisticInformation runtime2 = runtimeList2.get(j);
if ((runtimeList1.size() == 1) && (runtimeList2.size() == 1)) {
runtime1.merge(runtime2,
getRightObjectId(abstractBeforeEventRecord2));
} else {
runtime1.merge(runtime2);
}
} else {
// not found, so insert and merge later
toAdd.put(host2, runtimeList2.get(j));
System.out.println("not found...");
}
}
for (final Entry<HostApplicationMetaDataRecord, RuntimeStatisticInformation> entry : toAdd
.entrySet()) {
hostMetaList1.add(entry.getKey());
runtimeList1.add(entry.getValue());
}
} else {
final List<HostApplicationMetaDataRecord> hostMetaList1 = event
.getHostApplicationMetadataList();
final List<HostApplicationMetaDataRecord> hostMetaList2 = event2
.getHostApplicationMetadataList();
final List<HostApplicationMetaDataRecord> toAdd = new ArrayList<HostApplicationMetaDataRecord>(
2);
for (int j = 0; j < hostMetaList2.size(); j++) {
final int indexInHostList1 = indexOfHost(hostMetaList2.get(j),
hostMetaList1);
if (indexInHostList1 == -1) {
// not found, so insert later
toAdd.add(hostMetaList2.get(j));
}
}
hostMetaList1.addAll(toAdd);
}
}
}
}
private int indexOfHost(final HostApplicationMetaDataRecord hostToSeek,
final List<HostApplicationMetaDataRecord> hostMetaList) {
for (int i = 0; i < hostMetaList.size(); i++) {
if (hostMetaList.get(i).equals(hostToSeek)) {
return i;
}
}
return -1;
}
private int getRightObjectId(final AbstractBeforeEventRecord abstractBeforeEventRecord2) {
int objectId = 0;
if (abstractBeforeEventRecord2 instanceof AbstractBeforeOperationEventRecord) {
objectId = ((AbstractBeforeOperationEventRecord) abstractBeforeEventRecord2)
.getObjectId();
}
return objectId;
}
}
\ No newline at end of file
......@@ -16,6 +16,7 @@ import explorviz.live_trace_processing.record.event.AbstractBeforeOperationEvent
import explorviz.live_trace_processing.record.event.AbstractEventRecord;
import explorviz.live_trace_processing.record.misc.TerminateRecord;
import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
import explorviz.live_trace_processing.record.trace.RuntimeStatisticInformation;
import explorviz.live_trace_processing.record.trace.Trace;
import explorviz.live_trace_processing.record.trace.TraceComperator;
......@@ -45,10 +46,10 @@ public class TracesSummarizationFilter extends AbstractFilter {
if (record instanceof Trace) {
final Trace trace = (Trace) record;
if (!trace.containsRemoteRecord()) {
if (!trace.containsRemoteRecord() && trace.isValid()) {
insertIntoBuffer(trace);
} else {
// trace with remote records cant be reduced
// trace with remote records or invalid trace cant be reduced
makeTraceElementsAccumulator(trace);
deliver(trace);
}
......@@ -82,14 +83,17 @@ public class TracesSummarizationFilter extends AbstractFilter {
if (event instanceof AbstractBeforeEventRecord) {
final AbstractBeforeEventRecord abstractBeforeEventRecord = (AbstractBeforeEventRecord) event;
int objectId = 0;
if (abstractBeforeEventRecord instanceof AbstractBeforeOperationEventRecord) {
final AbstractBeforeOperationEventRecord abstractBeforeOperationEventRecord = (AbstractBeforeOperationEventRecord) abstractBeforeEventRecord;
objectId = abstractBeforeOperationEventRecord.getObjectId();
}
abstractBeforeOperationEventRecord.getRuntimeStatisticInformation()
.makeAccumulator(abstractBeforeOperationEventRecord.getObjectId());
} else {
abstractBeforeEventRecord.getRuntimeStatisticInformation().makeAccumulator(0);
final List<RuntimeStatisticInformation> runtimeStatisticInformationList = abstractBeforeEventRecord
.getRuntimeStatisticInformationList();
if (runtimeStatisticInformationList.size() == 1) {
runtimeStatisticInformationList.get(0).makeAccumulator(objectId);
}
}
}
......
......@@ -284,7 +284,9 @@ class TCPReaderOneClient extends Thread {
if (buffer.remaining() >= recordSize) {
try {
putInQueue(new BeforeOperationEventRecord(buffer, stringRegistry));
final BeforeOperationEventRecord beforeOperationEventRecord = new BeforeOperationEventRecord(
buffer, stringRegistry);
putInQueue(beforeOperationEventRecord);
} catch (final IdNotAvailableException e) {
// should not happen
e.printStackTrace();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment