Hadoop Cluster Configuration Tips

Planning

Plan to use a centralized configuration management framework once your cluster approaches five nodes. Your approach should include both tools and processes for version control, copying files, and keeping cluster configurations in sync. Use monitoring tools such as Nagios to track the performance and health of systems throughout the cluster.

Network Setup

The network setup is crucial for a working Hadoop configuration. Don't configure Hadoop with IP addresses for each item in the cluster (jobtracker, namenode, slaves); Hadoop looks up the DNS name for each IP address and uses that name instead. Setting up correct /etc/hosts files is important. Each node in the network should be able to resolve its own IP address to the correct DNS name and to resolve the DNS name of every other node to its correct IP address.
Another common problem is a node that has both an internal and an external network interface. Select the interface to use with the parameters mapred.tasktracker.dns.interface and dfs.datanode.dns.interface.
Disabling IPv6 can resolve problems and improve throughput (add -Djava.net.preferIPv4Stack=true to HADOOP_OPTS).
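
The forward and reverse lookup requirement can be checked with a small script. The following is a minimal sketch, not part of Hadoop or Datameer X: the function name check_node_dns is hypothetical, and the lookups can be swapped for stub functions so the logic can be tried without a network. It treats any difference between the original name and the reverse-resolved name as a problem, so a short name that resolves back to a fully qualified name is reported as well.

```python
import socket


def check_node_dns(hostname, forward=socket.gethostbyname, reverse=None):
    """Return a list of DNS problems for one hostname (empty if none found)."""
    if reverse is None:
        # gethostbyaddr returns (primary_name, aliases, addresses)
        reverse = lambda ip: socket.gethostbyaddr(ip)[0]
    try:
        ip = forward(hostname)
    except OSError as exc:
        return [f"{hostname}: forward lookup failed ({exc})"]
    try:
        name = reverse(ip)
    except OSError as exc:
        return [f"{hostname}: reverse lookup of {ip} failed ({exc})"]
    if name != hostname:
        return [f"{hostname}: {ip} resolves back to {name}"]
    return []
```

Running check_node_dns for every host name in the cluster, on every node, gives a quick indication of whether the /etc/hosts files agree with each other.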

Directories and Partitioning

This guide assumes the following good foundation: N>2 disks, 1 system partition mirrored across at least 2 disks, and N data partitions.

  • Configure the directories that hold large data (dfs.data.dir, mapred.local.dir) to use all N data partitions.
  • Reserve enough space (dfs.datanode.du.reserved) for the temporary data of the largest Hadoop job you expect to run.
  • Log files and the general tmp folder should remain on the system partition, since they stay small and the system remains fully functional if one disk fails.
  • The directories for the FileSystem-Image data should also remain on the system partition.

Backup

It is critical to back up the FileSystem-Image data managed by the namenode (dfs.name.dir: latest image + edit logs) and by the secondary namenode (fs.checkpoint.dir: latest image). There are various options:

  • Keep the data on multiple disks (on the master host), either by mirroring the system partition or by specifying multiple directories separated by commas.
  • Start the secondary namenode on a second machine (so you have the latest image on two hosts and the edit logs on one host).
  • Write one or both directories to an NFS share as well. Caution: pointing dfs.name.dir at an NFS share can slow down the namenode.
  • Have an external backup mechanism which grabs the directories periodically.

Replication

The replication factor of the data affects the cluster in several ways:

  • Greater replication means less free disk space.
  • Greater replication provides a greater chance of data locality and better performance.
  • Greater replication means better data reliability.
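
The disk space trade-off is simple arithmetic. As a rough sketch (the function name is hypothetical, and space reserved through dfs.datanode.du.reserved or used by temporary data is ignored), the capacity left for distinct data is the raw capacity divided by the replication factor:

```python
def usable_capacity_gb(raw_capacity_gb, replication=3):
    """Approximate space for distinct data when every block is stored `replication` times."""
    if replication < 1:
        raise ValueError("replication must be at least 1")
    return raw_capacity_gb / replication
```

For example, 12000 GB of raw data partitions hold about 4000 GB of distinct data at a replication factor of 3, and about 6000 GB at a factor of 2.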

Compression

  • BLOCK compression can have huge benefits (in both space and performance).
  • Compression must be enabled explicitly (mapred.compress.map.output, mapred.output.compress).
  • Compression supports different codecs; ZLib is the default (also available as a native implementation).
  • LZO doesn't compress as well as GZip, but it is a lot faster.
    • Note that it uses a GPL license

Changing DFS Block Size

  • The DFS block size defaults to 64 MB in Datameer X.
  • Open the Datameer X application.
  • Click on the Administration tab at the top of the screen.
  • Select Hadoop Cluster from the navigation bar on the left and Hadoop Cluster in the mode settings.
  • In the custom properties box, type the new block size:
    • dfs.block.size=[size]

The block size must be an integer number of bytes, not a string value.

Example (134217728 bytes = 128 MB):

dfs.block.size=134217728
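
Because the value has to be given in bytes, it is easy to mistype. A minimal sketch of the conversion from megabytes, using a hypothetical helper name:

```python
def block_size_property(megabytes):
    """Build the custom property line for a block size given in whole megabytes."""
    if not isinstance(megabytes, int) or megabytes <= 0:
        raise ValueError("block size must be a positive whole number of MB")
    size_bytes = megabytes * 1024 * 1024
    return f"dfs.block.size={size_bytes}"
```

Calling block_size_property(128) yields the line shown in the example above.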

Memory and Task Calculation

Setting the task count is essential for a well-functioning cluster. Together, mapred.tasktracker.map.tasks.maximum and mapred.tasktracker.reduce.tasks.maximum specify how many tasks can run on one node at a time. The optimal numbers depend on various factors:

  • Available RAM
    • The tasktracker and datanode each take 1 GB of RAM by default.
    • For each task, budget the amount of RAM set in mapred.child.java.opts (200 MB by default).
  • Number of cores and average percentage of CPU used per task
    • Plan one task per core.
    • If you know that your tasks usually occupy only 75% of the CPU, the number of tasks can be greater than the number of cores.
    • Example: 4 cores, jobs use ~75% of CPU: free slots = 5, maxMapTasks = 3, maxReduceTasks = 2.
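
The calculation above can be sketched as follows. The function names are hypothetical, and the 60/40 split between map and reduce slots is an assumption chosen to reproduce the example, not a Hadoop rule. The RAM estimate counts only the daemon defaults and the configured child heap, so treat it as a lower bound.

```python
def task_slots(cores, cpu_percent_per_task=100, map_percent=60):
    """Return (total_slots, max_map_tasks, max_reduce_tasks) for one node."""
    if cores < 1 or not 0 < cpu_percent_per_task <= 100 or not 0 < map_percent < 100:
        raise ValueError("invalid input")
    # One task per core, scaled up when a task uses less than a full core.
    total = cores * 100 // cpu_percent_per_task
    # Ceiling division: map tasks take the larger share of the slots.
    max_map = -(-total * map_percent // 100)
    return total, max_map, total - max_map


def ram_needed_mb(total_slots, child_heap_mb=200, daemon_mb=1024):
    """RAM for the tasktracker and datanode daemons plus one child heap per slot."""
    return 2 * daemon_mb + total_slots * child_heap_mb
```

With 4 cores and tasks that use about 75% of a core, task_slots(4, 75) gives 5 slots split into 3 map and 2 reduce tasks, and ram_needed_mb(5) estimates 3048 MB with the default settings.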

Resource Allocation

Factors in resource allocation include CPU power, disk capacity, network speed, and which other processes share a node. As you set up your system, record values, observe performance, make adjustments, and track whether each change improves performance. In a shared environment, Hadoop can't effectively determine how to allocate data blocks or where to schedule tasks. Consider the network architecture and avoid spreading the Hadoop cluster across a data center.

If you see excessive swapping, long-running tasks, or out of memory failures, you need to change how you allocate resources. 

Here are a few tips:

  • If you have a CPU-intensive MapReduce application, start with higher values and adjust down as resources are maxed out.
  • If you have an I/O intensive application, start with lower values and adjust up.
  • Increasing DataNode and TaskTracker settings has an adverse impact on the RAM available to individual MapReduce tasks. On large hardware, they can be set high. In general, unless you have several dozen or more nodes working together, higher settings waste system resources such as RAM that could be better applied to running your mapper and reducer code.
  • The operations between Map and Reduce phases of a job can cause a lot of network traffic. Plan for it accordingly if you are using a shared network.
  • Set up the Hadoop cluster with dedicated hardware if possible. For example, set up a dedicated switch or set of switches and a dedicated rack or set of racks to optimize performance when nodes communicate with each other.
  • If you have processes that open many files, you may want to set the ulimit higher than the default. You can try using 16384 and adjust as you monitor performance. Use the command # ulimit -n 16384. You can also set the ulimit for the hadoop user in /etc/security/limits.conf, which makes the value persistent. Add the line hadoop hard nofile 16384.
  • If you have lots of RAM available, set io.sort.factor to 25 or 32 (up from 10). io.sort.mb should be 10 * io.sort.factor. Remember to multiply io.sort.mb by the number of concurrent tasks to determine how much RAM you’re allocating here, to prevent swapping. (So 10 task instances with io.sort.mb = 320 means you’re allocating 3.2 GB of RAM for sorting, up from 1.0 GB.)
  • Try setting io.file.buffer.size to 65536.
  • If the NameNode and JobTracker are on big hardware, try setting both dfs.namenode.handler.count and mapred.job.tracker.handler.count to 64.
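
The io.sort.factor tip in the list above involves a small calculation that is worth checking before changing the settings. A minimal sketch, with a hypothetical function name, that applies the rule io.sort.mb = 10 * io.sort.factor and multiplies by the number of concurrent tasks:

```python
def sort_memory_mb(io_sort_factor, concurrent_tasks):
    """Return (io_sort_mb, total_sort_memory_mb) for one node."""
    io_sort_mb = 10 * io_sort_factor
    return io_sort_mb, io_sort_mb * concurrent_tasks
```

For 10 concurrent tasks, sort_memory_mb(32, 10) gives an io.sort.mb of 320 and a total of 3200 MB, compared with 1000 MB for the default factor of 10.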

Job Scheduling

When not configured otherwise, Hadoop uses FIFO job scheduling. When several jobs are submitted, the first job uses all available resources (map and reduce slots), and subsequent jobs have to wait for resources to be freed. Fortunately, Hadoop's scheduler is pluggable, and there are two major implementations: the Fair Scheduler and the Capacity Scheduler. They bring features such as:

  • Share resources between queues--jobs can be submitted to specific queues which can have a specific resource share
  • Share resources between users
  • Priority jobs

Log Files

  • Spend extra time at first getting to know the common patterns in your log files so you know where to look to find out if hardware is having a problem or why a task has taken longer than usual to run.
  • Hadoop Daemon Logs: File names use the pattern hadoop-<user-running-hadoop>-<daemon>-<hostname>.log. In them, look for cluster-specific errors and warnings that might have to do with daemons running incorrectly.
  • Job Configuration XML: These logs are stored in /var/log/hadoop and /var/log/hadoop/history. The XML file describes the job configuration. File names in /var/log/hadoop are constructed as follows: job_<job_ID>_conf.xml. These files might be of more interest to developers than to system administrators, because their contents are job-specific. You can clear these logs periodically without affecting Hadoop. However, don't move or delete a file that is currently being written to by a running job.
  • Job Statistics: Files are named <hostname><epoch-of-jobtracker-start(in milliseconds)><job-id>_<job-name>. You can clear these logs periodically without affecting Hadoop. However, don't move or delete a file that is currently being written to by a running job.
  • Standard Error (for a particular task attempt): These logs are stored in /var/log/hadoop/userlogs/attempt_<job-id><map-or-reduce><attempt-id>. They contain information written to standard error (stderr) while a task attempt runs. These logs can be used for debugging. For example, a developer can include System.err.println calls in the job code; the output appears in the standard error files. Note that <map-or-reduce> is either “m” if the task attempt was a mapper or “r” if it was a reducer. You can clear these logs periodically without affecting Hadoop. However, don't move or delete a file that is currently being written to by a running job.
  • Standard Out (for a particular task attempt)
  • log4j: Informational messages from within the task process, containing Hadoop internal diagnostic information. You can clear these logs periodically without affecting Hadoop. However, don't move or delete a file that is currently being written to by a running job.

The Hadoop components produce the following logs when Hadoop is installed from CDH RPM or DEB packages:

  • On the job tracker:

/var/log/hadoop
    /hadoop-* = daemon logs
    /job_*.xml = job configuration XML logs
    /history
        /*_conf.xml = job configuration logs
        <everything else> = job statistics logs

  • On the namenode:

/var/log/hadoop
    /hadoop-* = daemon logs

  • On the secondary namenode:

/var/log/hadoop
    /hadoop-* = daemon logs

  • On the datanode:

/var/log/hadoop
    /hadoop-* = daemon logs

  • On the tasktracker:

/var/log/hadoop
    /hadoop-* = daemon logs
    /userlogs
        /attempt_*
            /stderr = standard error logs
            /stdout = standard out logs
            /syslog = log4j logs

Tools, tutorials, and extra features

See: Hadoop Tutorials and Extra Features for a list of tutorial links and an introduction to extra features that can be enabled through configuration.

Other tips:

  • Don't set up $HADOOP_HOME as an NFS mount. Doing so can act like a denial-of-service attack on the NFS server.
  • Make sure the domain name service (DNS) in the cluster is configured correctly.
  • RAID is often turned on by default, although it isn't recommended for use with Hadoop. You might need to turn it off.

Required Configuration

hadoop-site.xml templates

conf/core-site.xml

| Name | Value | Reasoning |
|---|---|---|
| fs.default.name | hdfs://<masterhost>:9000 | Nodes need to know the namenode address. |
| fs.checkpoint.dir | <comma-separated list of local folders where the secondary namenode stores its images> | If the namenode crashes, you might need those images to recreate a namenode. The default location is inside /tmp, which is dangerous. Avoid problems by backing up those files or storing them on an NFS share. |
| hadoop.tmp.dir | <local folder where Hadoop daemons store their temporary data and, depending on your default settings, some permanent data> | Can grow huge if mapred.local.dir isn't configured. The username should be removed from the path because it can lead to problems (if mapred.local.dir isn't configured). |

conf/mapred-site.xml

| Name | Value | Reasoning |
|---|---|---|
| mapred.job.tracker | <masterhost>:9001 | Nodes need to know the jobtracker address. |
| mapred.tasktracker.map.tasks.maximum | <how many map tasks run concurrently per tasktracker> | Makes optimal use of the power of your machines. Maximum slots should be slightly more than the number of cores; map tasks take the majority of free slots (6 cores -> 8 slots -> 6 map tasks max). Sensitive to the memory available on the machine: each slot needs <mapred.child.java.opts> of memory. |
| mapred.tasktracker.reduce.tasks.maximum | <how many reduce tasks run concurrently per tasktracker> | Makes optimal use of the power of your machines. Maximum slots should be slightly more than the number of cores; reduce tasks take the minority of free slots (6 cores -> 8 slots -> 2 reduce tasks max). Sensitive to the memory available on the machine: each slot needs <mapred.child.java.opts> of memory. |
| mapred.child.java.opts (mapred.map.child.java.opts and mapred.reduce.child.java.opts for version >= 0.21) | -Xmx500m | Datameer X needs a minimum of 500 MB per task JVM with its default configuration. Interacts with <mapred.tasktracker.map.tasks.maximum> and <mapred.tasktracker.reduce.tasks.maximum>. |
| mapred.local.dir | <comma-separated list of all folders where the big temporary data should go> | Should be distributed over all data partitions, together with dfs.data.dir. |
| mapred.system.dir | <path in HDFS where small control files are stored> | The default location is inside /tmp, which is dangerous. |

conf/hdfs-site.xml

| Name | Value | Reasoning |
|---|---|---|
| dfs.data.dir | <comma-separated list of all folders where the HDFS data should go> | Should be distributed over all data partitions. The default location is inside /tmp, which is dangerous. Avoid problems by backing up those files or storing them on an NFS share. |
| dfs.name.dir | <comma-separated list of local folders where the namenode stores its name table (fsimage)> | The default location is inside /tmp, which is dangerous. Avoid problems by backing up those files or storing them on an NFS share. |
| dfs.datanode.du.reserved | | The amount of space datanodes keep free. Nodes need a lot of space for the temporary data of MapReduce jobs, and that space should be reserved. |
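
Each of the site files above uses the same layout: a configuration root element containing one property element per setting, each with a name and a value child. As a hedged sketch, the following helper (a hypothetical name, not a Hadoop tool) renders a dictionary of settings into that layout; the paths and the reserved-space value in the usage note are placeholders, not recommendations.

```python
import xml.etree.ElementTree as ET


def render_site_xml(properties):
    """Render a dict of Hadoop properties as the text of a *-site.xml file."""
    root = ET.Element("configuration")
    for name, value in properties.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = str(value)
    if hasattr(ET, "indent"):  # pretty-printing is available in Python 3.9+
        ET.indent(root)
    return ET.tostring(root, encoding="unicode")
```

For example, render_site_xml({"dfs.data.dir": "/data1/dfs,/data2/dfs", "dfs.datanode.du.reserved": 10737418240}) produces a fragment that could serve as a starting point for conf/hdfs-site.xml. Keeping the settings in one dictionary per file also fits the version-control approach recommended in the Planning section.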

Optional Configuration

| Name | Value | Default | Influence | Location |
|---|---|---|---|---|
| dfs.replication | | 3 | A greater replication factor means less free disk space but greater data reliability and availability (and a lesser value means the opposite). | hdfs-site.xml |
| fs.trash.interval | Number of minutes deleted files are kept in trash | 0 (trash feature is disabled) | Provides a rescue time for erroneous deletes. | core-site.xml |
| mapred.map.tasks.speculative.execution | Boolean | true | Slow tasks are re-executed in parallel. | mapred-site.xml |
| mapred.tasktracker.dns.interface | (eth0, eth1,...) | default (first one) | Use internal instead of external interface. | mapred-site.xml |
| dfs.datanode.dns.interface | (eth0, eth1,...) | default (first one) | Use internal instead of external interface. | hdfs-site.xml |
| dfs.permissions | true/false | true | Enables/disables Unix-like permissions on HDFS. Enabling permissions usually makes things harder to work with while bringing limited advantages (it's not so much for securing things as for preventing users from mistakenly messing up other users' data). | hdfs-site.xml |

Tuning Configuration

| Name | Recommended value | Default | Influence | Location |
|---|---|---|---|---|
| mapred.job.reuse.jvm.num.tasks | (-1 is unlimited) | 1 | When the task JVM is reused between tasks of a job, task-preparation time is saved. | mapred-site.xml |
| io.sort.mb | >100 (dependent on available memory) | 100 | Amount of buffer memory for sorting, in MB. | mapred-site.xml |
| io.sort.factor | 50 (for io.sort.mb=100) | 10 | Number of merge streams for sorting (highly dependent on io.sort.mb; each thread has io.sort.mb/io.sort.factor memory available). | mapred-site.xml |
| dfs.block.size | 64MB, 128MB, 512MB | 64MB | Maximum files in HDFS. Split size / data locality (and so performance). | hdfs-site.xml |
| dfs.datanode.max.xcievers | 4096 | 256 | Upper limit for datanode threads. | hdfs-site.xml |
| io.file.buffer.size | 65536 | 4096 | Buffer for reading/writing to files. | core-site.xml |

Datameer-Specific Configuration

| Name | Recommended Value | Description | Location |
|---|---|---|---|
| mapred.map.tasks.speculative.execution, mapred.reduce.tasks.speculative.execution | false | Datameer X currently doesn't support speculative execution. However, you don't need to configure these properties cluster-wide, because Datameer X disables speculative execution for every job it submits. You only need to make sure that these properties aren't set cluster-wide to true with the final parameter also set to true, which would prevent a client from changing the property per job. | mapred-site.xml |
| mapred.map.output.compression.codec, mapred.output.compression.codec | Usually one of: org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec, com.hadoop.compression.lzo.LzoCodec, com.hadoop.compression.lzo.LzopCodec | Datameer X cares about the what and the when of compression but not about the how, that is, the codec. It uses the codec you configured for the cluster. If you've configured a non-standard codec like LZO, you have to make sure that Datameer X has access to the codec as well. See Frequently Asked Hadoop Questions: "How do I configure Datameer/Hadoop to use native compression?" | mapred-site.xml |

Resources