Commit d67edc39 authored by Christian Wulf's avatar Christian Wulf

initial commit

parents
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" output="target/classes" path="src/main/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/test-classes" path="src/test/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="output" path="target/classes"/>
</classpath>
/target
/big.txt
/timings.txt
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>wordcounter-2.x</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.m2e.core.maven2Builder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
<nature>org.eclipse.m2e.core.maven2Nature</nature>
</natures>
</projectDescription>
eclipse.preferences.version=1
encoding//src/main/java=UTF-8
encoding//src/main/resources=UTF-8
encoding//src/test/java=UTF-8
encoding//src/test/resources=UTF-8
encoding/<project>=UTF-8
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
org.eclipse.jdt.core.compiler.compliance=1.8
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
org.eclipse.jdt.core.compiler.source=1.8
activeProfiles=
eclipse.preferences.version=1
resolveWorkspaceProjects=true
version=1
File added
rem Launches teetime.wordcounter.WordCounterTest from the assembled
rem jar-with-dependencies. The four variables below are passed to the program
rem as positional arguments, in the order in which they are defined here.
rem First argument: number of worker threads.
set numWorkerThreadsParam=2
rem Second argument: number of warm-up runs.
set numWarmUpsParam=1
rem Third argument: path of the input text file.
set fileNameParam="./big.txt"
rem Fourth argument: whether monitoring is enabled.
set monitoringEnabledParam=true
rem The trailing ^ continues the java command on the following line.
java -Dlogback.configurationFile=./src/main/resources/logback.xml -cp target/wordcounter-2.x-0.0.1-SNAPSHOT-jar-with-dependencies.jar teetime.wordcounter.WordCounterTest ^
%numWorkerThreadsParam% %numWarmUpsParam% %fileNameParam% %monitoringEnabledParam%
<!-- Maven build for the wordcounter-2.x project: compiles the sources and
     additionally packages an executable-style jar that bundles all dependencies. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Project coordinates; artifactId and version determine the name of the built jar. -->
<groupId>org.teetime</groupId>
<artifactId>wordcounter-2.x</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- Java level referenced by the compiler plugin configuration below. -->
<javac.target>1.8</javac.target>
</properties>
<dependencies>
<!-- The only directly declared dependency. -->
<dependency>
<groupId>net.sourceforge.teetime</groupId>
<artifactId>teetime</artifactId>
<version>2.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Compiles with source and target level taken from the javac.target property. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<compilerVersion>${javac.target}</compilerVersion>
<source>${javac.target}</source>
<target>${javac.target}</target>
</configuration>
</plugin>
<!-- jar builder -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.6</version>
<configuration>
<descriptorRefs>
<!-- Predefined assembly descriptor that adds the dependencies to the jar. -->
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id> <!-- this is used for inheritance merges -->
<phase>package</phase> <!-- bind to the packaging phase -->
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
/**
* Copyright © 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime-framework.github.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.wordcounter;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import teetime.framework.AbstractPort;
import teetime.framework.Configuration;
import teetime.framework.MonitoringThread;
import teetime.stage.CountingMapMerger;
import teetime.stage.InitialElementProducer;
import teetime.stage.basic.distributor.Distributor;
import teetime.stage.basic.distributor.strategy.NonBlockingRoundRobinStrategy;
import teetime.stage.basic.merger.Merger;
import teetime.stage.io.File2SeqOfWords;
import teetime.stage.string.WordCounter;
import teetime.stage.util.CountingMap;
/**
* A simple configuration that counts the words of a set of files. The execution of this configuration is demonstrated
* in {@link WordCounterTest}.
*
* This configuration is divided into three parts. The first part reads files and distributes their words to different
* {@link WordCounter} instances. The second part consists of a number of WordCounter instances. The number of
* concurrent WordCounter instances is specified on construction of this class with the {@code threads} parameter. The
* third and last part collects the results from all WordCounter instances and merges them. This final result can be
* read afterwards.
*
*
* @author Nelson Tavares de Sousa
*
*/
public class WordCounterConfiguration extends Configuration {

    // Capacity argument handed to connectPorts for the pipes into and out of every WordCounter.
    private static final int WORKER_PIPE_CAPACITY = 1024;

    // The last stage is kept as a field so that the merged result can be read after the execution.
    private final CountingMapMerger<String> result = new CountingMapMerger<>();
    // Input ports of all WordCounter stages (the receiving ends of the distributor's pipes).
    private final List<AbstractPort<?>> distributorPorts = new ArrayList<>();
    // Output ports of all WordCounter stages (the sending ends of the merger's pipes).
    private final List<AbstractPort<?>> mergerPorts = new ArrayList<>();
    private final MonitoringThread monitoringThread;
    private final Distributor<String> distributor;

    /**
     * Builds the pipeline: producer, file reader, distributor, {@code threads} WordCounter stages, merger and the
     * final CountingMapMerger.
     *
     * @param threads
     *            number of WordCounter stages to create; no stage is created for values below one
     * @param input
     *            the files handed to the initial producer
     */
    public WordCounterConfiguration(final int threads, final File... input) {
        // Stages of the first part.
        final InitialElementProducer<File> producer = new InitialElementProducer<>(input);
        final File2SeqOfWords wordReader = new File2SeqOfWords("UTF-8", 512);
        this.distributor = new Distributor<>(new NonBlockingRoundRobinStrategy());

        // Stage of the last part; the CountingMapMerger behind it already exists as a field.
        final Merger<CountingMap<String>> merger = new Merger<>();

        // Wire the first part.
        connectPorts(producer.getOutputPort(), wordReader.getInputPort());
        connectPorts(wordReader.getOutputPort(), this.distributor.getInputPort());

        this.monitoringThread = new MonitoringThread();

        // Middle part: one WordCounter per requested thread, placed between distributor and merger.
        for (int remaining = threads; remaining > 0; remaining--) {
            addWordCounter(merger);
        }

        // Wire the last part.
        connectPorts(merger.getOutputPort(), this.result.getInputPort());

        // The first and the last part are declared active as well.
        producer.declareActive();
        merger.declareActive();
    }

    /**
     * Creates one WordCounter, connects it to the distributor and to the given merger, declares it active and
     * registers its ports for the statistics and for the monitoring thread.
     */
    private void addWordCounter(final Merger<CountingMap<String>> merger) {
        final WordCounter counter = new WordCounter();

        connectPorts(this.distributor.getNewOutputPort(), counter.getInputPort(), WORKER_PIPE_CAPACITY);
        connectPorts(counter.getOutputPort(), merger.getNewInputPort(), WORKER_PIPE_CAPACITY);

        // Declare the WordCounter as a threadable stage, so it runs in its own thread.
        counter.getInputPort().getOwningStage().declareActive();

        this.distributorPorts.add(counter.getInputPort());
        this.mergerPorts.add(counter.getOutputPort());
        this.monitoringThread.addPort(counter.getInputPort());
    }

    /** Returns the monitoring thread to which the input port of every WordCounter has been added. */
    public MonitoringThread getMonitoringThread() {
        return this.monitoringThread;
    }

    /** Returns the result held by the final CountingMapMerger stage. */
    public CountingMap<String> getResult() {
        return this.result.getResult();
    }

    /** Returns the input ports of all WordCounter stages, in creation order. */
    public List<AbstractPort<?>> getDistributorPorts() {
        return this.distributorPorts;
    }

    /** Returns the output ports of all WordCounter stages, in creation order. */
    public List<AbstractPort<?>> getMergerPorts() {
        return this.mergerPorts;
    }

    /** Returns the distributor stage that feeds the WordCounter stages. */
    public Distributor<String> getDistributor() {
        return this.distributor;
    }
}
/**
* Copyright © 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime-framework.github.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.wordcounter;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
import com.google.common.primitives.Longs;
import teetime.framework.AbstractPort;
import teetime.framework.Execution;
import teetime.framework.pipe.IMonitorablePipe;
import teetime.stage.basic.distributor.strategy.NonBlockingRoundRobinStrategy;
import teetime.util.StopWatch;
/**
 * Command-line driver that runs {@link WordCounterConfiguration} on one input file, measures the duration of the
 * execution and appends it to {@code timings.txt}.
 *
 * Arguments (all positional): number of worker threads (default 3), number of warm-up runs (default 1), input file
 * name (required in practice, there is no usable default), monitoring enabled (default true).
 */
public class WordCounterTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(WordCounterTest.class);

    /**
     * Appends the given timings as one space-separated line to the given file.
     *
     * @param outputFile
     *            file to append to; it is created if it does not exist
     * @param timings
     *            the values to write
     * @throws UnsupportedEncodingException
     *             if UTF-8 is not supported
     * @throws FileNotFoundException
     *             if the file cannot be opened for writing
     */
    public static void writeTimingsToFile(final File outputFile, final long[] timings) throws UnsupportedEncodingException, FileNotFoundException {
        try (PrintStream ps = new PrintStream(new BufferedOutputStream(new FileOutputStream(outputFile, true), 8192 * 8), false, "UTF-8")) {
            ps.println(Joiner.on(' ').join(Longs.asList(timings)));
            // PrintStream never throws on write errors; it only records them. Surface them instead of losing data silently.
            if (ps.checkError()) {
                LOGGER.warn("Could not write the timings to {}", outputFile);
            }
        }
    }

    /**
     * Runs the warm-ups and the measured analysis, writes the timing and logs pipe statistics.
     *
     * @param args
     *            see the class comment
     * @throws UnsupportedEncodingException
     *             if UTF-8 is not supported when writing the timings
     * @throws FileNotFoundException
     *             if the input file does not exist or the timings file cannot be opened
     */
    public static void main(final String[] args) throws UnsupportedEncodingException, FileNotFoundException {
        final String numWorkerThreadsParam = (args.length > 0) ? args[0] : "3";
        final String numWarmUpsParam = (args.length > 1) ? args[1] : "1";
        final String fileNameParam = (args.length > 2) ? args[2] : "no default file name";
        final String monitoringEnabledParam = (args.length > 3) ? args[3] : "true";

        final int numWorkerThreads = parseAsInteger(numWorkerThreadsParam, 3);
        LOGGER.info("# worker threads: {}", numWorkerThreads);
        final int numWarmUps = parseAsInteger(numWarmUpsParam, 1);
        LOGGER.info("# warm ups: {}", numWarmUps);

        final File testFile = new File(fileNameParam);
        // Fail fast: without a readable input file the measured timing would be meaningless.
        if (!testFile.isFile()) {
            throw new FileNotFoundException("Input file not found: " + testFile.getAbsolutePath());
        }
        final boolean monitoringEnabled = Boolean.parseBoolean(monitoringEnabledParam);

        runWarmUps(numWarmUps, numWorkerThreads, testFile);

        LOGGER.info("Starting analysis...");
        final WordCounterConfiguration wcc = new WordCounterConfiguration(numWorkerThreads, testFile);
        final Execution<?> analysis = new Execution<>(wcc);

        if (monitoringEnabled) {
            wcc.getMonitoringThread().start();
            LOGGER.info("Monitoring started.");
        }
        final StopWatch stopWatch = new StopWatch();
        try {
            stopWatch.start();
            analysis.executeBlocking();
            stopWatch.end();
        } finally {
            // Also reached when the execution fails, so that the monitoring thread does not keep running.
            wcc.getMonitoringThread().terminate();
        }
        logDuration(stopWatch);

        // results for some words to verify the correctness of the word counter
        // final CountingMap<String> map = wcc.getResult();
        // LOGGER.info("vero: " + (map.get("vero") == 3813850) + "->" + map.get("vero") + " should be " + 3813850);
        // LOGGER.info("sit: " + (map.get("sit") == 7627700) + "->" + map.get("sit") + " should be " + 7627700);

        final long[] timings = { stopWatch.getDurationInNs() };
        writeTimingsToFile(new File("timings.txt"), timings);

        // some statistics about the output pipes of the distributor
        LOGGER.info("distributor pipes:");
        logNumWaits(wcc.getDistributorPorts());
        LOGGER.info("distributor waits: {}", ((NonBlockingRoundRobinStrategy) wcc.getDistributor().getStrategy()).getNumWaits());

        // some statistics about the input pipes of the merger
        LOGGER.info("merger pipes:");
        logNumWaits(wcc.getMergerPorts());
    }

    /** Executes the configuration {@code numWarmUps} times on a fresh instance each time and logs every duration. */
    private static void runWarmUps(final int numWarmUps, final int numWorkerThreads, final File testFile) {
        final StopWatch stopWatch = new StopWatch();
        for (int i = 0; i < numWarmUps; i++) {
            LOGGER.info("Warm up #{}", i);
            final WordCounterConfiguration wcc = new WordCounterConfiguration(numWorkerThreads, testFile);
            final Execution<?> analysis = new Execution<>(wcc);

            stopWatch.start();
            analysis.executeBlocking();
            stopWatch.end();

            logDuration(stopWatch);
        }
    }

    /** Logs the duration measured by the given stop watch in whole seconds. */
    private static void logDuration(final StopWatch stopWatch) {
        LOGGER.info("duration: {} secs", TimeUnit.NANOSECONDS.toSeconds(stopWatch.getDurationInNs()));
    }

    /** Logs the number of waits of the pipe behind each of the given ports. */
    private static void logNumWaits(final Iterable<AbstractPort<?>> ports) {
        for (final AbstractPort<?> port : ports) {
            final IMonitorablePipe pipe = (IMonitorablePipe) port.getPipe();
            LOGGER.info("numWaits: {}", pipe.getNumWaits());
        }
    }

    /**
     * Parses {@code value} as a decimal integer.
     *
     * @return the parsed value, or {@code defaultValue} (with a logged warning) if {@code value} is not a valid integer
     */
    private static int parseAsInteger(final String value, final int defaultValue) {
        try {
            return Integer.parseInt(value);
        } catch (final NumberFormatException e) {
            LOGGER.warn("'{}' is not a valid integer; using the default {}", value, defaultValue);
            return defaultValue;
        }
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<!-- Logback configuration: everything is logged to the console; the default level is WARN,
     while the teetime.framework and teetime.wordcounter loggers are lowered to INFO. -->
<configuration>
<!-- Prints logback's own status messages to the console. -->
<statusListener class="ch.qos.logback.core.status.OnConsoleStatusListener" />
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<!-- Output format: time of day, level, logger name, message. -->
<pattern>%d{HH:mm:ss.SSS} %level %logger - %msg%n</pattern>
</encoder>
</appender>
<!-- Default for all loggers that are not configured explicitly below. -->
<root level="WARN">
<appender-ref ref="CONSOLE" />
</root>
<logger name="teetime.framework" level="INFO" />
<logger name="teetime.wordcounter" level="INFO" />
</configuration>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment