Wednesday, April 11, 2012

Understanding Context Switch From a Java Programmer's View

Multithreading comes up often in high-performance programming. In the beginning, a common assumption was that code running on multiple threads in parallel is always faster than the same code on a single thread.
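
A toy benchmark makes the point (this is only an illustrative sketch; the class name, thread count, and loop sizes are arbitrary, not taken from the post): when the unit of work is tiny and threads contend on a shared lock, the blocking and the resulting context switches can make many threads slower than one.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Minimal sketch: incrementing a shared counter under a lock. With a critical
 * section this small, lock contention and the context switches it triggers
 * can easily make 16 threads slower than 1.
 */
public class ContextSwitchDemo {
    private static final int TOTAL = 10000000;
    private static long counter = 0;
    private static final Object lock = new Object();

    static void run(final int threads) throws InterruptedException {
        counter = 0;
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        long start = System.nanoTime();
        for (int t = 0; t < threads; t++) {
            pool.execute(new Runnable() {
                public void run() {
                    for (int i = 0; i < TOTAL / threads; i++) {
                        synchronized (lock) {
                            counter++;
                        }
                    }
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.MINUTES);
        System.out.println(threads + " thread(s): "
                + (System.nanoTime() - start) / 1000000 + " ms, counter=" + counter);
    }

    public static void main(String[] args) throws Exception {
        run(1);   // single thread: no contention, no involuntary switches
        run(16);  // many threads: heavy lock contention, frequent context switches
    }
}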

Wednesday, January 11, 2012

Terrible memory leak on jvm native heap when running a storm topology

Issue


A few days ago, our testing storm cluster went down without leaving any clue in /var/log/messages. Fortunately, Ganglia showed a dramatic increase in memory usage before the machines died. Below is the graph of a slave node that didn't survive.

Memory usage grew very fast and quickly ate into swap until no space was left.

Analysis


Later, I found that a topology written by a colleague could reproduce the problem. I resubmitted it and investigated with the following steps.
  1. Find the pid of a worker by jps

    $ jps
    23361 worker
    7237 supervisor
    23690 Jps


  2. Check its memory usage by jmap


    $ jmap -heap 23361
    Attaching to process ID 23361, please wait...
    Debugger attached successfully.
    Server compiler detected.
    JVM version is 19.0-b09

    using thread-local object allocation.
    Parallel GC with 13 thread(s)

    Heap Configuration:
    MinHeapFreeRatio = 40
    MaxHeapFreeRatio = 70
    MaxHeapSize = 805306368 (768.0MB)
    NewSize = 1310720 (1.25MB)
    MaxNewSize = 17592186044415 MB
    OldSize = 5439488 (5.1875MB)
    NewRatio = 2
    SurvivorRatio = 8
    PermSize = 21757952 (20.75MB)
    MaxPermSize = 268435456 (256.0MB)

    Heap Usage:
    PS Young Generation
    Eden Space:
    capacity = 263454720 (251.25MB)
    used = 74242272 (70.80294799804688MB)
    free = 189212448 (180.44705200195312MB)
    28.18027781016791% used
    From Space:
    capacity = 2424832 (2.3125MB)
    used = 2423600 (2.3113250732421875MB)
    free = 1232 (0.0011749267578125MB)
    99.94919235641892% used
    To Space:
    capacity = 2555904 (2.4375MB)
    used = 0 (0.0MB)
    free = 2555904 (2.4375MB)
    0.0% used
    PS Old Generation
    capacity = 536870912 (512.0MB)
    used = 20846872 (19.881126403808594MB)
    free = 516024040 (492.1188735961914MB)
    3.883032500743866% used
    PS Perm Generation
    capacity = 28442624 (27.125MB)
    used = 28372328 (27.057960510253906MB)
    free = 70296 (0.06703948974609375MB)
    99.75284980738768% used

    To my surprise, the JVM heap itself was not using much memory at all.
  3. Confirm that there are no JVM heap problems in this process with jstat

    $ jstat -gcutil 23361 1000
    S0 S1 E O P YGC YGCT FGC FGCT GCT
    39.43 0.00 60.61 2.18 99.60 118 0.232 0 0.000 0.232
    39.43 0.00 60.72 2.18 99.60 118 0.232 0 0.000 0.232
    39.43 0.00 62.06 2.18 99.60 118 0.232 0 0.000 0.232
    39.43 0.00 87.31 2.18 99.60 118 0.232 0 0.000 0.232
    0.00 64.42 38.64 2.18 99.61 119 0.233 0 0.000 0.233
    0.00 64.42 67.66 2.18 99.61 119 0.233 0 0.000 0.233
    67.51 0.00 16.28 2.19 99.61 120 0.234 0 0.000 0.234
    67.51 0.00 61.42 2.19 99.61 120 0.234 0 0.000 0.234
    0.00 43.17 26.09 2.19 99.61 121 0.236 0 0.000 0.236
    0.00 43.17 80.43 2.19 99.62 121 0.236 0 0.000 0.236
    64.52 0.00 7.98 2.20 99.62 122 0.237 0 0.000 0.237
    64.52 0.00 30.61 2.20 99.62 122 0.237 0 0.000 0.237
    64.52 0.00 44.38 2.20 99.62 122 0.237 0 0.000 0.237
    64.52 0.00 78.44 2.20 99.62 122 0.237 0 0.000 0.237
    0.00 70.64 8.95 2.21 99.62 123 0.238 0 0.000 0.238
    0.00 70.64 83.90 2.21 99.62 123 0.238 0 0.000 0.2389


  4. List the top memory-consuming process a few minutes after submitting the bad topology

    $ ps auxk -rss | less
    USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
    50957 23361 10.3 10.4 3630988 2589912 ? Sl 09:53 2:04 java -server -Xmx768m -Xms768m -XX:MaxPermSize=256m -XX:+UseParallelGC -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib:/usr/lib64:/lib64:/usr/local/lib64 -Dlogfile.name=worker-6700.log -Dlog4j.configuration=storm.log.properties -cp /home/min/storm-0.6.0/storm-0.6.0.jar:/home/min/storm-0.6.0/lib/slf4j-log4j12-1.5.8.jar:/home/min/storm-0.6.0/lib/clout-0.4.1.jar:/home/min/storm-0.6.0/lib/commons-io-1.4.jar:/home/min/storm-0.6.0/lib/ring-servlet-0.3.11.jar:/home/min/storm-0.6.0/lib/jvyaml-1.0.0.jar:/home/min/storm-0.6.0/lib/json-simple-1.1.jar:/home/min/storm-0.6.0/lib/compojure-0.6.4.jar:/home/min/storm-0.6.0/lib/clojure-1.2.0.jar:/home/min/storm-0.6.0/lib/httpcore-4.0.1.jar:/home/min/storm-0.6.0/lib/httpclient-4.0.1.jar:/home/min/storm-0.6.0/lib/clojure-contrib-1.2.0.jar:/home/min/storm-0.6.0/lib/clj-time-0.3.0.jar:/home/min/storm-0.6.0/lib/ring-core-0.3.10.jar:/home/min/storm-0.6.0/lib/log4j-1.2.16.jar:/home/min/storm-0.6.0/lib/commons-codec-1.3.jar:/home/min/storm-0.6.0/lib/carbonite-1.0.0.jar:/home/min/storm-0.6.0/lib/asm-3.2.jar:/home/min/storm-0.6.0/lib/kryo-1.04.jar:/home/min/storm-0.6.0/lib/slf4j-api-1.5.8.jar:/home/min/storm-0.6.0/lib/hiccup-0.3.6.jar:/home/min/storm-0.6.0/lib/commons-fileupload-1.2.1.jar:/home/min/storm-0.6.0/lib/ring-jetty-adapter-0.3.11.jar:/home/min/storm-0.6.0/lib/commons-lang-2.5.jar:/home/min/storm-0.6.0/lib/jline-0.9.94.jar:/home/min/storm-0.6.0/lib/servlet-api-2.5.jar:/home/min/storm-0.6.0/lib/libthrift7-0.7.0.jar:/home/min/storm-0.6.0/lib/tools.macro-0.1.0.jar:/home/min/storm-0.6.0/lib/minlog-1.2.jar:/home/min/storm-0.6.0/lib/joda-time-1.6.jar:/home/min/storm-0.6.0/lib/servlet-api-2.5-20081211.jar:/home/min/storm-0.6.0/lib/reflectasm-1.01.jar:/home/min/storm-0.6.0/lib/jetty-util-6.1.26.jar:/home/min/storm-0.6.0/lib/commons-exec-1.1.jar:/home/min/storm-0.6.0/lib/jetty-6.1.26.jar:/home/min/storm-0.6.0/lib/junit-3.8.1.jar:/home/min/storm-0.6.0/lib/core.incubator-0.1.0.jar:/home/min/storm-0.6.0/lib/zookeeper-3.3.2.jar:/home/min/storm-0.6.0/lib/jzmq-2.1.0.jar:/home/min/storm-0.6.0/lib/commons-logging-1.1.1.jar:/home/min/storm-0.6.0/log4j:/home/min/storm-0.6.0/conf:/disk1/storm/supervisor/stormdist/hbasebmwusersextra-267-1326246807/stormjar.jar backtype.storm.daemon.worker hbasebmwusersextra-267-1326246807 86f82bde-eee2-4e8d-92a8-ec3746769c67 6700 a453430f-b314-40cc-8a01-12dd8f92cf06

    2589912 kilobytes (about 2.5 GB of RSS) were held by this process after only 2 minutes, far more than the 768 MB we specified with the java option "-Xmx768m". I soon realized there was a native memory leak outside the JVM heap.
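
For what it's worth, the same sanity check can be scripted from inside the JVM on Linux. The class below is only an illustrative sketch (NativeMemoryCheck is a made-up name, not part of Storm or the JDK): it compares the kernel-reported RSS in /proc/self/status with the heap ceiling the JVM was given, and a large gap points at memory outside the heap.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

/**
 * Sketch (Linux only): compare the process RSS reported by the kernel with the
 * JVM's configured heap limit. When RSS keeps growing far past -Xmx while jstat
 * looks healthy, the growth is in native memory.
 */
public class NativeMemoryCheck {
    public static void main(String[] args) throws IOException {
        long rssKb = -1;
        BufferedReader r = new BufferedReader(new FileReader("/proc/self/status"));
        try {
            String line;
            while ((line = r.readLine()) != null) {
                if (line.startsWith("VmRSS:")) {              // e.g. "VmRSS:  2589912 kB"
                    rssKb = Long.parseLong(line.replaceAll("[^0-9]", ""));
                }
            }
        } finally {
            r.close();
        }
        long maxHeapKb = Runtime.getRuntime().maxMemory() / 1024; // roughly what -Xmx allows
        System.out.println("VmRSS    = " + rssKb + " kB");
        System.out.println("Max heap = " + maxHeapKb + " kB");
        // Very rough: assumes the heap is fully resident; the remainder is
        // native allocations, perm gen, thread stacks, mapped libraries, etc.
        System.out.println("Off-heap ~= " + (rssKb - maxHeapKb) + " kB");
    }
}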


Profiling


Since the problem was a native heap leak, JVM tools couldn't help. I decided to use Valgrind, because it can trace the memory usage of child processes (--trace-children), which suits Storm's model of a supervisor launching workers. I modified bin/storm to add a command-line prefix to the supervisor, so that after deploying the script to one node of the cluster, it would profile the workers launched by the supervisor on that node.

diff --git a/bin/storm b/bin/storm
index 779f61b..4457b0a 100755
--- a/bin/storm
+++ b/bin/storm
@@ -91,7 +91,7 @@ def nimbus():
def supervisor():
cppaths = [STORM_DIR + "/log4j", STORM_DIR + "/conf"]
childopts = confvalue("nimbus.childopts", cppaths) + " -Dlogfile.name=supervisor.log -Dlog4j.configuration=storm.log.properties"
- exec_storm_class("backtype.storm.daemon.supervisor", jvmtype="-server", extrajars=cppaths, childopts=childopts)
+ exec_storm_class("backtype.storm.daemon.supervisor", jvmtype="-server", extrajars=cppaths, childopts=childopts, prefix="valgrind --trace-children=yes --leak-check=full")

def ui():
childopts = "-Xmx768m -Dlogfile.name=ui.log -Dlog4j.configuration=storm.log.properties"

Unfortunately, the supervisor hung after restarting. I then tried modifying the supervisor code itself to run the workers under Valgrind and Google perftools, but hit a similar failure: the workers hung.

diff --git a/src/clj/backtype/storm/daemon/supervisor.clj b/src/clj/backtype/storm/daemon/supervisor.clj
index a38e726..da38845 100644
--- a/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/src/clj/backtype/storm/daemon/supervisor.clj
@@ -369,7 +369,7 @@
classpath (add-to-classpath (current-classpath) [stormjar])
childopts (.replaceAll (conf WORKER-CHILDOPTS) "%ID%" (str port))
logfilename (str "worker-" port ".log")
- command (str "java -server " childopts
+ command (str "env LD_PRELOAD=/lib/libtcmalloc.so HEAPPROFILE=/home/min/logs/perf java -server " childopts
" -Djava.library.path=" (conf JAVA-LIBRARY-PATH)
" -Dlogfile.name=" logfilename
" -Dlog4j.configuration=storm.log.properties"

Simulating


At the same time, I did a quick inspection of that topology's code and wrote some code to track each phase of it. The result showed that the producer side (spout) emitted 5 messages every 100 ms, each about 130 kilobytes long, while the consumer side (bolt) took about 40-80 ms to consume a single one. That is roughly 6.5 MB/s produced versus at most about 3 MB/s consumed, so production was much faster than consumption and messages apparently accumulated in memory. I wrote the code below to simulate this and saw the same memory leak as expected.

import java.util.Map;

import backtype.storm.topology.TopologyBuilder;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.Config;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.StormSubmitter;

/**
 * This class simulates a topology which causes every worker to leak memory
 * badly, without any OOM exception, until the node goes down.
 */
public class MemLeakTopology {

    public static class MemLeakSpout implements IRichSpout {
        SpoutOutputCollector _collector;

        public void open(Map conf, TopologyContext context,
                SpoutOutputCollector collector) {
            _collector = collector;
        }

        // Emit five ~128 KB tuples every 100 ms.
        private byte[] buffer = new byte[131072];
        public void nextTuple() {
            Utils.sleep(100);
            for (int i = 0; i < 5; i++) {
                _collector.emit(new Values(buffer));
            }
        }

        public boolean isDistributed() { return true; }
        public void close() {}
        public void ack(Object msgId) {}
        public void fail(Object msgId) {}
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("buffer"));
        }
    }

    public static class MemLeakBolt implements IRichBolt {

        // Spend 100 ms on each tuple, i.e. consume far slower than the spout emits.
        public void execute(Tuple input) {
            Utils.sleep(100);
        }

        public void cleanup() {}
        public void declareOutputFields(OutputFieldsDeclarer declarer) {}
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {}
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new MemLeakSpout(), 2);
        builder.setBolt("bolt", new MemLeakBolt(), 2)
               .shuffleGrouping("spout");
        Config conf = new Config();
        conf.setMaxTaskParallelism(2);
        conf.setNumWorkers(2);
        conf.setMaxSpoutPending(20);
        StormSubmitter.submitTopology("mem_leak_topology", conf,
                builder.createTopology());
    }
}
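
For reference, the per-tuple timing mentioned above can be collected with a thin wrapper around a bolt. The following is only a rough sketch of that kind of instrumentation (TimingBolt is a hypothetical name, not the exact tracking code I used):

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

/**
 * Sketch: wraps an existing bolt and logs how long each execute() call takes,
 * which is the kind of measurement behind the 40-80 ms per-tuple figure above.
 */
public class TimingBolt implements IRichBolt {
    private final IRichBolt delegate;

    public TimingBolt(IRichBolt delegate) {
        this.delegate = delegate;
    }

    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        delegate.prepare(stormConf, context, collector);
    }

    public void execute(Tuple input) {
        long start = System.currentTimeMillis();
        delegate.execute(input);
        System.out.println("execute took "
                + (System.currentTimeMillis() - start) + " ms");
    }

    public void cleanup() {
        delegate.cleanup();
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        delegate.declareOutputFields(declarer);
    }
}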

Profiling with Systemtap


Since Valgrind and Google perftools didn't work out, another powerful tool, SystemTap, could help, provided the glibc debuginfo package was installed. The script is quite simple: it just prints a user-space backtrace on every call to malloc(). It is attached to the worker process with stap -x <pid>, which is what sets target().

probe process("/lib64/libc.so.6").function("malloc") {
    if (target() == pid()) {
        print_ubacktrace();
    }
}


The result showed that libzmq called malloc() again and again:
0x3eb0274b96 : malloc+0x16/0x230 [/lib64/libc-2.5.so]
0x2aaab36a4209 [/usr/lib64/libzmq.so.1.0.0+0x2d209/0x7c000]
0x3eb0274b96 : malloc+0x16/0x230 [/lib64/libc-2.5.so]
0x2aaab36a4209 [/usr/lib64/libzmq.so.1.0.0+0x2d209/0x7c000]
...

This doesn't completely prove that zeromq is the cause of the memory leak, since the library might subsequently free() those allocations; still, zeromq was at least a suspect.

Profiling with Google perftools


The reason I stopped using SystemTap is that someone on the mailing list pointed out my mistake in how I had built perftools on an x86_64 system:
This is because you are using libunwind. See the README and INSTALL
file for some of the dangers of doing so. You may have to compile
perftools with --enable-frame-pointers instead.

Using the libunwind library to get stack traces can cause program hangs or crashes on 64-bit x86_64 systems. After rebuilding perftools with --enable-frame-pointers, things started working, and the major culprit was finally caught:

Total: 15310.7 MB
14894.8 97.3% 97.3% 14894.8 97.3% zmq_msg_init_size
299.4 2.0% 99.2% 299.4 2.0% os::malloc
51.0 0.3% 99.6% 51.0 0.3% init
29.5 0.2% 99.8% 29.5 0.2% zcalloc
27.4 0.2% 99.9% 27.4 0.2% _init@c9e8
6.0 0.0% 100.0% 6.0 0.0% readCEN
1.0 0.0% 100.0% 1.0 0.0% ObjectSynchronizer::inflate
0.3 0.0% 100.0% 0.3 0.0% _dl_new_object
0.2 0.0% 100.0% 5.5 0.0% nmethod::new_nmethod

Unconsumed messages accumulate in ZeroMQ's native buffers. You may object that this isn't really a memory leak, since message accumulation is expected to occupy memory. However, I think Storm should at least throttle that accumulation, because it can bring the whole cluster down.
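
On the topology side, one way to get that throttling with the existing API is to make Storm track the tuples. The sketch below is my own illustration (ThrottledSpout is a hypothetical name): it emits each tuple with a message id, so that topology.max.spout.pending, already set to 20 in the simulation above, actually caps the number of in-flight tuples.

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

/**
 * Sketch of a "reliable" version of MemLeakSpout: each emit carries a message
 * id, so Storm counts it as pending and stops calling nextTuple() once
 * topology.max.spout.pending tuples are outstanding. That bounds how much data
 * can pile up in ZeroMQ's native buffers.
 */
public class ThrottledSpout implements IRichSpout {
    private SpoutOutputCollector _collector;
    private byte[] buffer = new byte[131072];
    private long msgId = 0;

    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        _collector = collector;
    }

    public void nextTuple() {
        Utils.sleep(100);
        for (int i = 0; i < 5; i++) {
            // The second argument is the message id that makes the tuple tracked.
            _collector.emit(new Values(buffer), msgId++);
        }
    }

    public void ack(Object msgId) {}   // tuple fully processed; pending slot freed
    public void fail(Object msgId) {}  // could re-emit the payload here

    public boolean isDistributed() { return true; }
    public void close() {}
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("buffer"));
    }
}

The bolt must also ack each tuple (collector.ack(input) in execute) so pending tuples are released; without message ids, Storm has nothing to count and the max-spout-pending setting has no effect.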



Friday, January 6, 2012

Understanding Hadoop 0.23 Internals: Yarn

SEDA style model

// ContainerExecutor.java
/**
 * Prepare the environment for containers in this application to execute.
 * For $x in local.dirs
 *   create $x/$user/$appId
 * Copy $nmLocal/appTokens -> $N/$user/$appId
 * For $rsrc in private resources
 *   Copy $rsrc -> $N/$user/filecache/[idef]
 * For $rsrc in job resources
 *   Copy $rsrc -> $N/$user/$appId/filecache/idef
 * @param user user name of application owner
 * @param appId id of the application
 * @param nmPrivateContainerTokens path to localized credentials, rsrc by NM
 * @param nmAddr RPC address to contact NM
 * @param localDirs nm-local-dirs
 * @param logDirs nm-log-dirs
 * @throws IOException For most application init failures
 * @throws InterruptedException If application init thread is halted by NM
 */
public abstract void startLocalizer(Path nmPrivateContainerTokens,
    InetSocketAddress nmAddr, String user, String appId, String locId,
    List<String> localDirs, List<String> logDirs)
    throws IOException, InterruptedException;


TBD...

Hadoop 0.23 Building From the Source

Hadoop 0.23 was released a while ago. The most notable improvements in this version are two architectural changes.
  1. NameNode Federation For better scalability, the NameNode is split into several nodes, each of which holds an independent namespace. DataNodes register with all of those namespaces and provide storage for data blocks.

  2. Yarn Also for better scalability, the functionality of the JobTracker is separated into two parts: resource management and job monitoring/tracking. The first part, named Yarn, is abstracted into a new layer of the Hadoop infrastructure, much as the kernel is to Linux. The MapReduce framework becomes just a user-space application on top of Yarn.

In order to build Hadoop 0.23, you should prepare the executables and libraries listed below
  1. JDK1.6

  2. Maven Hadoop 0.23 no longer uses Ant. You can download Maven from here and install it following the instructions at that link.

  3. Protobuf Google protobuf is used in Hadoop 0.23 for the new version of RPC, and protoc is needed during the build. If it isn't installed, the build will fail with an error, so download protobuf from here and install it with ./configure && make && sudo make install

NOTE It is better not to build on an Ubuntu system, or the build will fail when compiling the native code:
[INFO] /bin/bash ./libtool  --tag=CC   --mode=compile gcc -DHAVE_CONFIG_H -I.  -I/home/min/software/jdk1.6.0_29/include -I/home/min/software/jdk1.6.0_29/include/linux -I/home/min/code/hadoop/hadoop/branch-0.23/hadoop-common-project/hadoop-common/target/native/src -I/home/min/code/hadoop/hadoop/branch-0.23/hadoop-common-project/hadoop-common/target/native/javah -I/usr/local/include -g -Wall -fPIC -O2 -m32 -g -O2 -MT ZlibCompressor.lo -MD -MP -MF .deps/ZlibCompressor.Tpo -c -o ZlibCompressor.lo `test -f 'src/org/apache/hadoop/io/compress/zlib/ZlibCompressor.c' || echo './'`src/org/apache/hadoop/io/compress/zlib/ZlibCompressor.c
[INFO] libtool: compile: gcc -DHAVE_CONFIG_H -I. -I/home/min/software/jdk1.6.0_29/include -I/home/min/software/jdk1.6.0_29/include/linux -I/home/min/code/hadoop/hadoop/branch-0.23/hadoop-common-project/hadoop-common/target/native/src -I/home/min/code/hadoop/hadoop/branch-0.23/hadoop-common-project/hadoop-common/target/native/javah -I/usr/local/include -g -Wall -fPIC -O2 -m32 -g -O2 -MT ZlibCompressor.lo -MD -MP -MF .deps/ZlibCompressor.Tpo -c src/org/apache/hadoop/io/compress/zlib/ZlibCompressor.c -fPIC -DPIC -o .libs/ZlibCompressor.o
[INFO] src/org/apache/hadoop/io/compress/zlib/ZlibCompressor.c: In function 'Java_org_apache_hadoop_io_compress_zlib_ZlibCompressor_initIDs':
[INFO] src/org/apache/hadoop/io/compress/zlib/ZlibCompressor.c:71:41: error: expected expression before ',' token
[INFO] make: *** [ZlibCompressor.lo] Error 1

Here is an explanation from an Ubuntu core developer:

More recent versions of Ubuntu only link 'as-needed', so the macro used to determine the name of libz.so fails.

Now you can download a tarball release of Hadoop from here, or fetch the source code with Subversion.
 svn co http://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23

Enter the source directory and package it with Maven.

cd branch-0.23
mvn clean install package -Pdist -Dtar -Pnative -DskipTests=true

Finally, make sure all target tarballs are built.

$ find . -name "*.tar.gz"
./hadoop-mapreduce-project/target/hadoop-mapreduce-0.23.1-SNAPSHOT.tar.gz
./hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/test.tar.gz
./hadoop-tools/hadoop-tools-dist/target/hadoop-tools-dist-0.23.1-SNAPSHOT.tar.gz
./hadoop-project-dist/target/hadoop-project-dist-0.23.1-SNAPSHOT.tar.gz
./hadoop-hdfs-project/hadoop-hdfs-httpfs/downloads/tomcat.tar.gz
./hadoop-hdfs-project/hadoop-hdfs-httpfs/target/hadoop-hdfs-httpfs-0.23.1-SNAPSHOT/share/hadoop/httpfs/tomcat/bin/tomcat-native.tar.gz
./hadoop-hdfs-project/hadoop-hdfs-httpfs/target/hadoop-hdfs-httpfs-0.23.1-SNAPSHOT/share/hadoop/httpfs/tomcat/bin/commons-daemon-native.tar.gz
./hadoop-hdfs-project/hadoop-hdfs-httpfs/target/hadoop-hdfs-httpfs-0.23.1-SNAPSHOT.tar.gz
./hadoop-hdfs-project/hadoop-hdfs/downloads/commons-daemon-1.0.3-bin-linux-i686.tar.gz
./hadoop-hdfs-project/hadoop-hdfs/target/hadoop-hdfs-0.23.1-SNAPSHOT.tar.gz
./hadoop-dist/target/hadoop-0.23.1-SNAPSHOT/share/hadoop/httpfs/tomcat/bin/tomcat-native.tar.gz
./hadoop-dist/target/hadoop-0.23.1-SNAPSHOT/share/hadoop/httpfs/tomcat/bin/commons-daemon-native.tar.gz
./hadoop-dist/target/hadoop-0.23.1-SNAPSHOT.tar.gz
./hadoop-common-project/hadoop-common/target/hadoop-common-0.23.1-SNAPSHOT.tar.gz



Wednesday, March 25, 2009

large-scale support vector machines

newton method


