Wednesday, January 11, 2012

Terrible memory leak in the JVM native heap when running a Storm topology

Issue


A few days ago, our testing Storm cluster went down without leaving any clue in /var/log/messages. Fortunately, through Ganglia I spotted a terrifying memory increase shortly before the machines went down. The graph below shows a slave node that didn't survive.

Memory usage increased very fast and quickly ate into the remaining swap space until there was none left.

Analysis


Later, I found that a topology written by my colleague could cause this behaviour. I resubmitted it and tracked down the reason with the following steps.
  1. Find the PID of a worker with jps

    $ jps
    23361 worker
    7237 supervisor
    23690 Jps


  2. Check its heap usage with jmap


    $ jmap -heap 23361
    Attaching to process ID 23361, please wait...
    Debugger attached successfully.
    Server compiler detected.
    JVM version is 19.0-b09

    using thread-local object allocation.
    Parallel GC with 13 thread(s)

    Heap Configuration:
    MinHeapFreeRatio = 40
    MaxHeapFreeRatio = 70
    MaxHeapSize = 805306368 (768.0MB)
    NewSize = 1310720 (1.25MB)
    MaxNewSize = 17592186044415 MB
    OldSize = 5439488 (5.1875MB)
    NewRatio = 2
    SurvivorRatio = 8
    PermSize = 21757952 (20.75MB)
    MaxPermSize = 268435456 (256.0MB)

    Heap Usage:
    PS Young Generation
    Eden Space:
    capacity = 263454720 (251.25MB)
    used = 74242272 (70.80294799804688MB)
    free = 189212448 (180.44705200195312MB)
    28.18027781016791% used
    From Space:
    capacity = 2424832 (2.3125MB)
    used = 2423600 (2.3113250732421875MB)
    free = 1232 (0.0011749267578125MB)
    99.94919235641892% used
    To Space:
    capacity = 2555904 (2.4375MB)
    used = 0 (0.0MB)
    free = 2555904 (2.4375MB)
    0.0% used
    PS Old Generation
    capacity = 536870912 (512.0MB)
    used = 20846872 (19.881126403808594MB)
    free = 516024040 (492.1188735961914MB)
    3.883032500743866% used
    PS Perm Generation
    capacity = 28442624 (27.125MB)
    used = 28372328 (27.057960510253906MB)
    free = 70296 (0.06703948974609375MB)
    99.75284980738768% used

    To my surprise, the heap usage of this process looked quite normal.
  3. Confirm with jstat that there are no JVM heap problems in this process (the old generation stays around 2% used and no full GCs occur)

    $ jstat -gcutil 23361 1000
    S0 S1 E O P YGC YGCT FGC FGCT GCT
    39.43 0.00 60.61 2.18 99.60 118 0.232 0 0.000 0.232
    39.43 0.00 60.72 2.18 99.60 118 0.232 0 0.000 0.232
    39.43 0.00 62.06 2.18 99.60 118 0.232 0 0.000 0.232
    39.43 0.00 87.31 2.18 99.60 118 0.232 0 0.000 0.232
    0.00 64.42 38.64 2.18 99.61 119 0.233 0 0.000 0.233
    0.00 64.42 67.66 2.18 99.61 119 0.233 0 0.000 0.233
    67.51 0.00 16.28 2.19 99.61 120 0.234 0 0.000 0.234
    67.51 0.00 61.42 2.19 99.61 120 0.234 0 0.000 0.234
    0.00 43.17 26.09 2.19 99.61 121 0.236 0 0.000 0.236
    0.00 43.17 80.43 2.19 99.62 121 0.236 0 0.000 0.236
    64.52 0.00 7.98 2.20 99.62 122 0.237 0 0.000 0.237
    64.52 0.00 30.61 2.20 99.62 122 0.237 0 0.000 0.237
    64.52 0.00 44.38 2.20 99.62 122 0.237 0 0.000 0.237
    64.52 0.00 78.44 2.20 99.62 122 0.237 0 0.000 0.237
    0.00 70.64 8.95 2.21 99.62 123 0.238 0 0.000 0.238
    0.00 70.64 83.90 2.21 99.62 123 0.238 0 0.000 0.2389


  4. A few minutes after submitting the bad topology, list the processes occupying the most memory (sorted by RSS) and look at the top one.

    $ ps auxk -rss | less
    USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
    50957 23361 10.3 10.4 3630988 2589912 ? Sl 09:53 2:04 java -server -Xmx768m -Xms768m -XX:MaxPermSize=256m -XX:+UseParallelGC -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib:/usr/lib64:/lib64:/usr/local/lib64 -Dlogfile.name=worker-6700.log -Dlog4j.configuration=storm.log.properties -cp /home/min/storm-0.6.0/storm-0.6.0.jar:/home/min/storm-0.6.0/lib/slf4j-log4j12-1.5.8.jar:/home/min/storm-0.6.0/lib/clout-0.4.1.jar:/home/min/storm-0.6.0/lib/commons-io-1.4.jar:/home/min/storm-0.6.0/lib/ring-servlet-0.3.11.jar:/home/min/storm-0.6.0/lib/jvyaml-1.0.0.jar:/home/min/storm-0.6.0/lib/json-simple-1.1.jar:/home/min/storm-0.6.0/lib/compojure-0.6.4.jar:/home/min/storm-0.6.0/lib/clojure-1.2.0.jar:/home/min/storm-0.6.0/lib/httpcore-4.0.1.jar:/home/min/storm-0.6.0/lib/httpclient-4.0.1.jar:/home/min/storm-0.6.0/lib/clojure-contrib-1.2.0.jar:/home/min/storm-0.6.0/lib/clj-time-0.3.0.jar:/home/min/storm-0.6.0/lib/ring-core-0.3.10.jar:/home/min/storm-0.6.0/lib/log4j-1.2.16.jar:/home/min/storm-0.6.0/lib/commons-codec-1.3.jar:/home/min/storm-0.6.0/lib/carbonite-1.0.0.jar:/home/min/storm-0.6.0/lib/asm-3.2.jar:/home/min/storm-0.6.0/lib/kryo-1.04.jar:/home/min/storm-0.6.0/lib/slf4j-api-1.5.8.jar:/home/min/storm-0.6.0/lib/hiccup-0.3.6.jar:/home/min/storm-0.6.0/lib/commons-fileupload-1.2.1.jar:/home/min/storm-0.6.0/lib/ring-jetty-adapter-0.3.11.jar:/home/min/storm-0.6.0/lib/commons-lang-2.5.jar:/home/min/storm-0.6.0/lib/jline-0.9.94.jar:/home/min/storm-0.6.0/lib/servlet-api-2.5.jar:/home/min/storm-0.6.0/lib/libthrift7-0.7.0.jar:/home/min/storm-0.6.0/lib/tools.macro-0.1.0.jar:/home/min/storm-0.6.0/lib/minlog-1.2.jar:/home/min/storm-0.6.0/lib/joda-time-1.6.jar:/home/min/storm-0.6.0/lib/servlet-api-2.5-20081211.jar:/home/min/storm-0.6.0/lib/reflectasm-1.01.jar:/home/min/storm-0.6.0/lib/jetty-util-6.1.26.jar:/home/min/storm-0.6.0/lib/commons-exec-1.1.jar:/home/min/storm-0.6.0/lib/jetty-6.1.26.jar:/home/min/storm-0.6.0/lib/junit-3.8.1.jar:/home/min/storm-0.6.0/lib/core.incubator-0.1.0.jar:/home/min/storm-0.6.0/lib/zookeeper-3.3.2.jar:/home/min/storm-0.6.0/lib/jzmq-2.1.0.jar:/home/min/storm-0.6.0/lib/commons-logging-1.1.1.jar:/home/min/storm-0.6.0/log4j:/home/min/storm-0.6.0/conf:/disk1/storm/supervisor/stormdist/hbasebmwusersextra-267-1326246807/stormjar.jar backtype.storm.daemon.worker hbasebmwusersextra-267-1326246807 86f82bde-eee2-4e8d-92a8-ec3746769c67 6700 a453430f-b314-40cc-8a01-12dd8f92cf06

    This process had allocated 2589912 kilobytes (roughly 2.5 GB) of resident memory in only 2 minutes, far more than the 768 MB heap (plus 256 MB perm gen) we specified with the Java options "-Xmx768m" and "-XX:MaxPermSize=256m". I soon realized that there was a native memory leak outside the JVM heap (see the sketch after this list).
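
As a side note, the same comparison (what the OS says the process occupies versus what the JVM thinks it uses) can also be made from inside the worker. The sketch below is mine rather than part of the original investigation; the class name is made up and it is Linux-only. It prints the heap and non-heap usage reported by the JVM next to the VmRSS figure from /proc/self/status. If VmRSS keeps climbing while the JVM-side numbers stay flat, the growth is happening in native memory.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;

/**
 * Hypothetical helper (Linux only): periodically print the memory the JVM
 * knows about next to the resident set size the kernel reports.
 */
public class NativeLeakWatcher implements Runnable {

    public void run() {
        MemoryMXBean mem = ManagementFactory.getMemoryMXBean();
        try {
            while (true) {
                long heapKb = mem.getHeapMemoryUsage().getUsed() / 1024;
                long nonHeapKb = mem.getNonHeapMemoryUsage().getUsed() / 1024;
                System.out.println("heap=" + heapKb + " kB, non-heap=" + nonHeapKb
                        + " kB, VmRSS=" + readRssKb() + " kB");
                Thread.sleep(5000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Reads the "VmRSS:   123456 kB" line of /proc/self/status. */
    private static long readRssKb() throws IOException {
        BufferedReader reader = new BufferedReader(new FileReader("/proc/self/status"));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.startsWith("VmRSS:")) {
                    return Long.parseLong(line.replaceAll("[^0-9]", ""));
                }
            }
            return -1;
        } finally {
            reader.close();
        }
    }

    public static void main(String[] args) {
        new Thread(new NativeLeakWatcher()).start();
    }
}

Running such a watcher thread inside a worker would show the same gap that jmap, jstat, and ps revealed from the outside.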


Profiling


Since the problem was a native heap leak, JVM tools couldn't help. I decided to use Valgrind because it can trace the memory usage of subprocesses, which makes it a good fit for profiling Storm. I modified bin/storm to add a command-line prefix to the supervisor, so that after deploying the script to one node of our cluster it would profile the workers launched by the supervisor on that node.

diff --git a/bin/storm b/bin/storm
index 779f61b..4457b0a 100755
--- a/bin/storm
+++ b/bin/storm
@@ -91,7 +91,7 @@ def nimbus():
def supervisor():
cppaths = [STORM_DIR + "/log4j", STORM_DIR + "/conf"]
childopts = confvalue("nimbus.childopts", cppaths) + " -Dlogfile.name=supervisor.log -Dlog4j.configuration=storm.log.properties"
- exec_storm_class("backtype.storm.daemon.supervisor", jvmtype="-server", extrajars=cppaths, childopts=childopts)
+ exec_storm_class("backtype.storm.daemon.supervisor", jvmtype="-server", extrajars=cppaths, childopts=childopts, prefix="valgrind --trace-children=yes --leak-check=full")

def ui():
childopts = "-Xmx768m -Dlogfile.name=ui.log -Dlog4j.configuration=storm.log.properties"

Unfortunately, the supervisor hung after restarting. I also tried modifying the supervisor code so that workers would run under Valgrind and Google perftools, but got a similar failure: the workers hung.

diff --git a/src/clj/backtype/storm/daemon/supervisor.clj b/src/clj/backtype/storm/daemon/supervisor.clj
index a38e726..da38845 100644
--- a/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/src/clj/backtype/storm/daemon/supervisor.clj
@@ -369,7 +369,7 @@
classpath (add-to-classpath (current-classpath) [stormjar])
childopts (.replaceAll (conf WORKER-CHILDOPTS) "%ID%" (str port))
logfilename (str "worker-" port ".log")
- command (str "java -server " childopts
+ command (str "env LD_PRELOAD=/lib/libtcmalloc.so HEAPPROFILE=/home/min/logs/perf java -server " childopts
" -Djava.library.path=" (conf JAVA-LIBRARY-PATH)
" -Dlogfile.name=" logfilename
" -Dlog4j.configuration=storm.log.properties"

Simulating


At the same time, I did a quick inspection of that topology's code and wrote some instrumentation to track each phase of it. The results showed that the producer side (spout) emitted 5 messages every 100 ms, each about 130 kilobytes long, i.e. roughly 50 messages (over 6 MB) per second, while the consumer side (bolt) took about 40-80 ms to consume one, i.e. only about 12-25 messages per second. Production was much faster than consumption, so messages seemed to accumulate in memory. I wrote the following code to simulate this and saw the same memory leak as expected.

import java.util.Map;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

/**
 * This class simulates a topology which causes all workers to leak
 * memory terribly, without any OOM exception, until the node goes down.
 */
public class MemLeakTopology {

    public static class MemLeakSpout implements IRichSpout {
        SpoutOutputCollector _collector;

        // ~130 KB payload, roughly the size of the real topology's messages
        private byte[] buffer = new byte[131072];

        public void open(Map conf, TopologyContext context,
                         SpoutOutputCollector collector) {
            _collector = collector;
        }

        // Emit 5 tuples every 100 ms, much faster than the bolt consumes them
        public void nextTuple() {
            Utils.sleep(100);
            for (int i = 0; i < 5; i++) {
                _collector.emit(new Values(buffer));
            }
        }

        public boolean isDistributed() { return true; }
        public void close() {}
        public void ack(Object msgId) {}
        public void fail(Object msgId) {}
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("buffer"));
        }
    }

    public static class MemLeakBolt implements IRichBolt {

        // Take 100 ms per tuple, so the bolt falls behind the spout
        public void execute(Tuple input) {
            Utils.sleep(100);
        }

        public void cleanup() {}
        public void declareOutputFields(OutputFieldsDeclarer declarer) {}
        public void prepare(Map stormConf, TopologyContext context,
                            OutputCollector collector) {}
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new MemLeakSpout(), 2);
        builder.setBolt("bolt", new MemLeakBolt(), 2)
               .shuffleGrouping("spout");
        Config conf = new Config();
        conf.setMaxTaskParallelism(2);
        conf.setNumWorkers(2);
        conf.setMaxSpoutPending(20);
        StormSubmitter.submitTopology("mem_leak_topology", conf,
                builder.createTopology());
    }
}

Profiling with Systemtap


Since Valgrind and Google perftools didn't work out, a more powerful tool, SystemTap, could help, provided that the glibc debuginfo package was installed. The script is quite simple: just print a user-space backtrace for every call to malloc() in the target process.

probe process("/lib64/libc.so.6").function("malloc") {
    if (target() == pid()) {
        print_ubacktrace();
    }
}


The result showed that libzmq called malloc() again and again:
0x3eb0274b96 : malloc+0x16/0x230 [/lib64/libc-2.5.so]
0x2aaab36a4209 [/usr/lib64/libzmq.so.1.0.0+0x2d209/0x7c000]
0x3eb0274b96 : malloc+0x16/0x230 [/lib64/libc-2.5.so]
0x2aaab36a4209 [/usr/lib64/libzmq.so.1.0.0+0x2d209/0x7c000]
...

This doesn't conclusively prove that ZeroMQ is the cause of the memory leak, since the library might subsequently call free() on those allocations; still, ZeroMQ was at least the prime suspect.

Profiling with Google perftools


The reason I stopped using SystemTap is that someone on the mailing list had pointed out my mistake in installing perftools on an x86_64 system:
This is because you are using libunwind. See the README and INSTALL
file for some of the dangers of doing so. You may have to compile
perftools with --enable-frame-pointers instead.

Using the libunwind library to get stack traces can cause program hangs or crashes on 64-bit x86_64 systems. After rebuilding perftools with --enable-frame-pointers, things started working, and the major culprit was finally caught.

Total: 15310.7 MB
14894.8 97.3% 97.3% 14894.8 97.3% zmq_msg_init_size
299.4 2.0% 99.2% 299.4 2.0% os::malloc
51.0 0.3% 99.6% 51.0 0.3% init
29.5 0.2% 99.8% 29.5 0.2% zcalloc
27.4 0.2% 99.9% 27.4 0.2% _init@c9e8
6.0 0.0% 100.0% 6.0 0.0% readCEN
1.0 0.0% 100.0% 1.0 0.0% ObjectSynchronizer::inflate
0.3 0.0% 100.0% 0.3 0.0% _dl_new_object
0.2 0.0% 100.0% 5.5 0.0% nmethod::new_nmethod

Unconsumed messages accumulate in memory. You may object to calling this a memory leak, since with such message accumulation a high memory footprint is to be expected. However, I think Storm should at least throttle that accumulation, which otherwise can bring the whole cluster down.
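
One knob that already exists on the application side is topology.max.spout.pending: to my understanding it caps the number of un-acked tuples per spout task, but only when the spout emits with a message id and the downstream bolt acks each tuple. The simulation above calls conf.setMaxSpoutPending(20), yet because its spout emits unanchored tuples, the cap never kicks in. Below is a rough sketch (the class names are mine, not from the original post) of how the simulated spout and bolt could be made reliable so that the pending limit actually throttles production:

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

/** Sketch: a reliable variant of the simulation, so max.spout.pending can throttle it. */
public class ThrottledMemLeakTopology {

    public static class ThrottledSpout implements IRichSpout {
        private SpoutOutputCollector _collector;
        private final AtomicLong _msgId = new AtomicLong();
        private final byte[] buffer = new byte[131072];

        public void open(Map conf, TopologyContext context,
                         SpoutOutputCollector collector) {
            _collector = collector;
        }

        public void nextTuple() {
            Utils.sleep(100);
            for (int i = 0; i < 5; i++) {
                // Emitting with a message id marks the tuple as reliable, so Storm
                // stops calling nextTuple() once max.spout.pending tuples are in flight.
                _collector.emit(new Values(buffer), _msgId.incrementAndGet());
            }
        }

        public boolean isDistributed() { return true; }
        public void close() {}
        public void ack(Object msgId) {}
        public void fail(Object msgId) {}
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("buffer"));
        }
    }

    public static class AckingBolt implements IRichBolt {
        private OutputCollector _collector;

        public void prepare(Map stormConf, TopologyContext context,
                            OutputCollector collector) {
            _collector = collector;
        }

        public void execute(Tuple input) {
            Utils.sleep(100);
            _collector.ack(input); // completes the tuple, freeing a pending slot for the spout
        }

        public void cleanup() {}
        public void declareOutputFields(OutputFieldsDeclarer declarer) {}
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new ThrottledSpout(), 2);
        builder.setBolt("bolt", new AckingBolt(), 2).shuffleGrouping("spout");
        Config conf = new Config();
        conf.setNumWorkers(2);
        conf.setMaxSpoutPending(20); // now actually limits un-acked tuples per spout task
        StormSubmitter.submitTopology("throttled_mem_leak_topology", conf,
                builder.createTopology());
    }
}

Whether or not one calls the accumulation a leak, bounding the number of in-flight tuples like this keeps the ZeroMQ buffers from growing without limit; built-in back pressure inside Storm itself would of course be the nicer fix.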



Comments:
Great post. It really helped me figure out what the hell was going on with my cluster. Thanks!
 
