Commit edb4c297 authored by Maximilian	Brütt's avatar Maximilian Brütt

added boundary for ws victim choice

parent f1097240
......@@ -34,15 +34,19 @@ public final class WorkStealingPipe<T> extends AbstractSynchedPipe<T> implements
// statistics
private int numWaits;
private int numSteals;
// TODO strategy
// private final int WORK_STEALING_MAX_DEPTH = 1;
// TODO strategy implementation to slim the paramters
private final int haulSize;
private final int instanceIndex;
private final int indexTheftBound;
protected WorkStealingPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int haulSize) {
protected WorkStealingPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int haulSize, final int instanceIndex,
final int indexTheftBound) {
super(sourcePort, targetPort, 1024);
this.peers = new ArrayList<WorkStealingPipe<T>>();
this.queue = new ObservableSpMcArrayQueue<Object>(1024);
this.haulSize = haulSize;
this.instanceIndex = instanceIndex;
this.indexTheftBound = indexTheftBound;
}
@Override
......@@ -50,9 +54,15 @@ public final class WorkStealingPipe<T> extends AbstractSynchedPipe<T> implements
Object obj = this.queue.poll();
if (obj != null) {
return obj;
} else { // when the pipe's queue is empty...
// choose victim
WorkStealingPipe<T> victim = peers.get((new Random()).nextInt(peers.size()));
} else {
int index;
// indexTheftBound of 0 equals unbounded and thus randomized stealing behaviour.
if (indexTheftBound == 0) {
index = (new Random()).nextInt(peers.size());
} else {
index = (instanceIndex + (new Random()).nextInt(indexTheftBound)) % peers.size();
}
WorkStealingPipe<T> victim = peers.get(index);
// int maxSteal = Math.min(victim.size(), haulSize);
for (int i = 0; i < haulSize; i++) {
......@@ -75,7 +85,6 @@ public final class WorkStealingPipe<T> extends AbstractSynchedPipe<T> implements
}
}
return this.queue.poll();
// return null;
}
}
......
/**
* Copyright © 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime-framework.github.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework.pipe;
import java.util.ArrayList;
......@@ -12,9 +27,24 @@ import teetime.framework.OutputPort;
public class WorkStealingPipeFactory<T> {
private final List<WorkStealingPipe<T>> instances = new ArrayList<WorkStealingPipe<T>>();
private int numOfInstances = 0;
public WorkStealingPipe<T> create(final OutputPort<? extends T> source, final InputPort<T> target, final int haulSize) {
WorkStealingPipe<T> newPipe = new WorkStealingPipe<T>(source, target, haulSize);
/**
* Create a new {@link WorkStealingPipe} and inform it about its peers previously created by this factory.
* Pipes created afterwards will also be informed about its existence.
*
* @param source
* {@link OutputPort} the pipe takes its input from
* @param target
* {@link InputPort} the pipe takes its input to
* @param haulSize
* Number of objects the pipe is able to steal in one operation. Defaults to 1.
* @param theftBound
* The number of peers the pipe should be able to steal from. Enter 0 for default (randomized) behaviour.
* @return The pipe, ready to use.
*/
public WorkStealingPipe<T> create(final OutputPort<? extends T> source, final InputPort<T> target, final int theftBound, final int haulSize) {
WorkStealingPipe<T> newPipe = new WorkStealingPipe<T>(source, target, haulSize, numOfInstances++, theftBound);
for (WorkStealingPipe<T> pipe : instances) {
pipe.addPeer(newPipe);
......@@ -24,8 +54,12 @@ public class WorkStealingPipeFactory<T> {
return newPipe;
}
public WorkStealingPipe<T> create(final OutputPort<? extends T> source, final InputPort<T> target, final int theftBound) {
return create(source, target, theftBound, 1);
}
public WorkStealingPipe<T> create(final OutputPort<? extends T> source, final InputPort<T> target) {
return create(source, target, 1);
return create(source, target, 0, 1);
}
}
......@@ -92,7 +92,7 @@ public class WordCounterWorkStealingConfiguration extends Configuration {
final WordCounter threadableStage = wc;
if (WS) {
registerCustomPipe(factory.create(distributor.getNewOutputPort(), threadableStage.getInputPort()));
registerCustomPipe(factory.create(distributor.getNewOutputPort(), threadableStage.getInputPort(), 3));
} else {
connectPorts(distributor.getNewOutputPort(), threadableStage.getInputPort(), 1024);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment