
Use Xargs to Handle split-by Skew in Sqoop


I often use Sqoop to migrate relational data to Hadoop. However, I frequently encounter data skew in my “split-by” column, which leads to poor utilization of the parallel mappers: some mappers get a lot of work while others get none. For example, imagine an integer column called “order_date” with dates stored as YYYYMMDD (e.g. 20190101). Yes, using integers to store dates is not the best choice (IMHO), but it occurs frequently in the wild. In this situation, if I let Sqoop decide how to allocate a month of data, say 20190101 to 20190201, across 10 mappers, Sqoop will allocate the work like this…

Mapper StartDate EndDate Notes
01 20190101 20190110 10 days of orders
02 20190111 20190120 10 days of orders
03 20190121 20190130 10 days of orders
04 20190131 20190140 01 days of orders
05 20190141 20190150 00 days of orders
06 20190151 20190160 00 days of orders
07 20190161 20190170 00 days of orders
08 20190171 20190180 00 days of orders
09 20190181 20190190 00 days of orders
10 20190191 20190200 00 days of orders


Because Sqoop tries to evenly allocate the split-by keys, mappers 01, 02, and 03 will end up processing most of the daily orders. Mapper 04 will only process one day of orders, and mappers 05 through 10 will not process any orders.

It would be better if I could specifically tell each mapper which records to process. This would allow me to manually divide up the work (thus, avoiding the skew). Since Sqoop doesn’t really have a native way to solve this problem, my first thought was to run Sqoop inside a loop, such that each iteration processed a specific allocation of data splits. For example…
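A sketch of that loop (the connection string, table name, and directory layout are placeholders for whatever your source and target look like):

```bash
#!/bin/bash
# Serial approach: one Sqoop job per order_date, each with a single mapper.
for order_date in 20190101 20190102 20190103; do
  sqoop import \
    --connect "jdbc:oracle:thin:@//dbhost:1521/ORCL" \
    --username "$SQOOP_USER" \
    --password "$SQOOP_PASSWORD" \
    --table ORDERS \
    --where "order_date = ${order_date}" \
    --target-dir "/data/orders/order_date=${order_date}" \
    --num-mappers 1
done
```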

Note… In my use case, each order_date has millions of orders.

However, the problem with this approach is that each iteration of the loop waits for Sqoop to finish before continuing. Although this approach improves mapper utilization by evenly allocating data splits, I now have a parallelization problem, as I’m only running one mapper at a time. In addition to having precise control over data splits, I also need a way to parallelize the work.

To solve the parallelization problem, I thought I would just subprocess each call to Sqoop, which would give me multiple concurrent jobs, each running one mapper as a background job. This can be achieved by adding an ampersand “&” to the end of the call to Sqoop (this forks and runs the command in a separate sub-shell, as an asynchronous job).
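As a minimal illustration of the “&” behavior (using sleep as a stand-in for the Sqoop call):

```bash
# Each command is forked into the background; "wait" blocks until all of them finish.
for n in 1 2 3; do
  sleep 5 &
done
wait
```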

Unfortunately, this technique introduces two problems. First, the stdout and stderr for all the jobs get jumbled together. While this is not critically important, I will address this problem later. More importantly, now I have a problem of flooding the source database with too much concurrent work. Every turn of the loop kicks off a background Sqoop job with one mapper. If I have to pull more than a few days of data, the source database will quickly get overwhelmed. So now, in addition to having precise control over splitting the data, and having a way to parallelize multiple jobs, I also need a way of throttling concurrent work.

After a little research on multithreading, subprocessing, and parallelizing bash scripts, I found that xargs can throttle subprocesses. That is, I can arbitrarily limit concurrent xargs subprocesses by setting the “max-procs” argument. When one job completes, another job will auto spawn to take its place until all jobs are complete. In other words, xargs will handle the complexity of tracking, running, and throttling concurrent work. Armed with this new information, I now had all of the pieces needed to control data splits, parallelize jobs, and throttle concurrent work.
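A quick way to see the throttling in action (again with sleep standing in for real work): the pipeline below starts at most three subprocesses at a time, launching a new one as each finishes.

```bash
# Six jobs, but never more than 3 running concurrently.
printf '%s\n' 5 5 5 5 5 5 | xargs --max-procs=3 -I{} sleep {}
```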

The solution I came up with was to define a generic bash function to perform the Sqoop ingest. Using a function encapsulates all the common logic of each iteration and simplifies the call to xargs. To this function I pass the filter criteria, which gives me manual control over the data splits that Sqoop’s “split-by” would normally compute. Xargs then calls the function once for each item in a list of filter criteria, and its “max-procs” argument limits the concurrent work (this provides the throttling). The following script is a simplified version of my approach: I pass a list of filter criteria to xargs, which passes each item to the “do_work” function, and no more than 3 jobs run concurrently because “max-procs=3”.
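Something along these lines (the connection string, table name, and list of dates are placeholders):

```bash
#!/bin/bash
# Export variables that are the same for every Sqoop job.
export SQOOP_CONNECT="jdbc:oracle:thin:@//dbhost:1521/ORCL"
export SQOOP_USER="etl_user"
export SQOOP_PASSWORD="changeme"   # demo only -- don't hard-code a real password

do_work() {
  local order_date="$1"
  sqoop import \
    --connect "$SQOOP_CONNECT" \
    --username "$SQOOP_USER" \
    --password "$SQOOP_PASSWORD" \
    --table ORDERS \
    --where "order_date = ${order_date}" \
    --target-dir "/data/orders/order_date=${order_date}" \
    --num-mappers 1
}
export -f do_work

# One filter value per line; xargs calls do_work for each, at most 3 at a time.
printf '%s\n' 20190101 20190102 20190103 20190104 20190105 |
  xargs --max-procs=3 -I{} bash -c 'do_work "$@"' _ {}
```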

After testing this idea, everything seemed to work, except I noticed that all the concurrent jobs were dumping stderr and stdout to the terminal (visually, every job’s output was jumbled together). If something went wrong, it was difficult to tell which job I needed to troubleshoot. What I needed was for each job to redirect its output stream to an individual file; then I could go back, look at each job’s log, and troubleshoot any issues. The solution was to use the “tee” command to copy each Sqoop job’s output stream to a file in HDFS (you could write to local disk instead if you wanted to). The following is a simplified version of my final solution.
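A sketch of that final version: the same script, but each job’s stdout and stderr are piped through “tee” into a per-job log file, which is then copied into HDFS (tee could just as easily write only to local disk). Paths, table, and connection details are placeholders.

```bash
#!/bin/bash
export SQOOP_CONNECT="jdbc:oracle:thin:@//dbhost:1521/ORCL"
export SQOOP_USER="etl_user"
export SQOOP_PASSWORD="changeme"          # demo only -- don't hard-code a real password
export LOG_DIR="/data/logs/sqoop_orders"  # HDFS directory for the per-job logs

do_work() {
  local order_date="$1"

  # Merge stderr into stdout and copy the combined stream to a per-job log file.
  sqoop import \
    --connect "$SQOOP_CONNECT" \
    --username "$SQOOP_USER" \
    --password "$SQOOP_PASSWORD" \
    --table ORDERS \
    --where "order_date = ${order_date}" \
    --target-dir "/data/orders/order_date=${order_date}" \
    --num-mappers 1 2>&1 | tee "/tmp/sqoop_${order_date}.log"

  # Copy the log into HDFS so all the job logs end up in one place.
  hdfs dfs -put -f "/tmp/sqoop_${order_date}.log" "${LOG_DIR}/"
}
export -f do_work

printf '%s\n' 20190101 20190102 20190103 20190104 20190105 |
  xargs --max-procs=3 -I{} bash -c 'do_work "$@"' _ {}
```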

Note that I export the common variables that are the same for every Sqoop job (this is not required if you define the variables within the do_work function). Also, it goes without saying, please don’t put your Sqoop password in your script.

Although I only allocate one mapper in my function, it is possible to concurrently run Sqoop jobs that have multiple mappers. This would be useful if you had large data splits for a column that also had some significant skew. Just add the “split-by” argument and set “num-mappers” inside your call to Sqoop. However, be careful how high you set “max-procs” because the concurrent load you put on the source database will be (max-procs * num-mappers).
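For example, assuming a hypothetical numeric ORDER_ID column to split on, the call inside do_work could look like the following. With max-procs=3 and num-mappers=4, that is up to 12 concurrent connections hitting the source database.

```bash
do_work() {
  local order_date="$1"
  sqoop import \
    --connect "$SQOOP_CONNECT" \
    --username "$SQOOP_USER" \
    --password "$SQOOP_PASSWORD" \
    --table ORDERS \
    --where "order_date = ${order_date}" \
    --target-dir "/data/orders/order_date=${order_date}" \
    --split-by ORDER_ID \
    --num-mappers 4
}
```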

Although this solution doesn’t fix all skew problems for Sqoop, I have tweaked the approach in a few projects, and, so far, I’ve had pretty good success in achieving a better level of utilization of my Sqoop mappers.

One caveat: you end up running multiple application masters, because each call to Sqoop is a new MapReduce job. However, the overhead of multiple application masters is a good trade compared to wasting resources on mappers that do very little work, or nothing at all.

Hopefully this post gives you another technique for optimizing mapper utilization with Sqoop. If this helped you, or if you have ideas or suggestions, please let me know in the comments.

Monitoring Temperature With Raspberry Pi


The Problem:

I recently remodeled my home office, and I now have a dedicated closet for my electronics (server, NAS, AV receiver, etc.). During the build I planned for heat remediation by installing an exhaust fan that dumps air from the closet into my adjoining office. However, the temperature in the closet hovers around 90°F (32°C), even with the fan on. Although this temperature is within hardware thresholds, it’s a bit warmer than I would prefer. To get a better understanding of my heat dissipation needs, I decided to monitor and record temperature fluctuations over several days to see what temperature ranges I was experiencing.

Monitoring temperature levels is a perfect project for the Raspberry Pi. I had used an analog TMP36GZ low-voltage temperature sensor before in an Arduino project, but this would be my first attempt at using the Raspberry Pi’s GPIO pins. Unfortunately, after a bit of research, I discovered that my analog temperature sensor wouldn’t work with the Raspberry Pi’s digital-only IO pins. While I could have prototyped a solution using an ADC and some spare components, I really wanted a simple build so I could just start coding on the Pi.

The solution to my problem was a DS18B20 digital temperature sensor IC, which I found on Amazon.com. The DS18B20 uses the 1-Wire communication bus, which is perfect for the BCM GPIO4 pin (PIN 7) on the Raspberry Pi. Other nice features: you can work with the DS18B20 from the Linux terminal, and you can connect multiple 1-Wire devices to the same bus on PIN 7.

The Build:

I had some spare CAT5e cable, so I stripped and soldered three wires to the three pins on the sensor: orange for +3.3v, brown for ground, and green for data. The DS18B20 also requires a pull-up resistor (typically 4.7 kΩ) between the power and data leads.

DS18B20 CAT5e connection

Then, I used electrical tape to insulate the exposed areas and I shrink-wrapped everything to protect the connections.

DS18B20 Shrink-wrapped

To the other end of the CAT5e cable I attached three female jumper-wire housing connectors. These connect to a splitter rather than directly to the Pi, because I need to connect several devices to a single pin (specifically PIN 7 for 1-Wire).

DS18B20 Female Connectors

Next, I manufactured three tiny Y-splitters (2 male to 1 female) to join the VDD, DQ, and GND lines from 2 sensors before connecting to the Pi.

DS18B20 Y-Splitter

Finally, I made a second sensor and attached both to the Raspberry Pi using the following arrangement.
DS18B20 Schematic

Here is the finished build. Note the three splitters are plugged into PIN1 (orange/3.3v), PIN6 (brown/GND), and PIN7 (green/data).

DS18B20FinishedBuild

The Code:

After connecting the DS18B20s to the Raspberry Pi, you can interact with the devices using the terminal commands below. Note that your device IDs will be specific to your own 1-Wire devices; in my case, they are 28-0000055f311a and 28-0000055f327d.
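Roughly, the commands look like this (on newer Raspbian releases you may also need “dtoverlay=w1-gpio” in /boot/config.txt and a reboot before the bus shows up):

```bash
# Load the 1-Wire bus master and thermometer kernel modules.
sudo modprobe w1-gpio
sudo modprobe w1-therm

# Each sensor appears as a directory named after its device ID.
ls /sys/bus/w1/devices/

# Read a sensor by dumping its w1_slave file (substitute your own IDs).
cat /sys/bus/w1/devices/28-0000055f311a/w1_slave
cat /sys/bus/w1/devices/28-0000055f327d/w1_slave
```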

Here is what my terminal window looks like after running the above.

DS18B202Terminal

In the terminal window above, the interesting parts of the device output are the first line, ending with “YES” (which means the read had no errors), and the second line, ending with “t=” followed by the temperature in thousandths of a degree Celsius (°C * 1000). Using this information, we can divide that value by 1000 to get Celsius and convert to Fahrenheit using…
°F = °C * (9/5) + 32

Once we verify that the DS18B20s are operating correctly from the terminal, we can write a Python script to read from the devices and append the temperatures to a CSV file. Later we will plot the temperature readings to see how much they fluctuate over time.
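A sketch of such a script (the script name and CSV path are assumptions; the device IDs are the two sensors from this build):

```python
#!/usr/bin/env python
# temperature.py -- read each DS18B20 and append the readings to a CSV file.

import csv
import datetime

DEVICES = ['28-0000055f311a', '28-0000055f327d']
CSV_FILE = '/home/pi/temperature_log.csv'


def read_temp_f(device_id):
    """Return the sensor reading in Fahrenheit, or None if the read failed."""
    path = '/sys/bus/w1/devices/{0}/w1_slave'.format(device_id)
    with open(path) as f:
        lines = f.read().splitlines()
    # The first line must end in "YES" (the CRC check passed).
    if not lines[0].strip().endswith('YES'):
        return None
    # The second line ends with "t=" followed by thousandths of a degree Celsius.
    temp_c = float(lines[1].split('t=')[1]) / 1000.0
    return temp_c * 9.0 / 5.0 + 32.0


def main():
    timestamp = datetime.datetime.now()
    with open(CSV_FILE, 'a') as f:
        writer = csv.writer(f)
        for device_id in DEVICES:
            temp_f = read_temp_f(device_id)
            if temp_f is not None:
                writer.writerow([device_id, timestamp, temp_f])


if __name__ == '__main__':
    main()
```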

Once created, you can run the above Python script from the terminal by using the following command.
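Assuming the script was saved as /home/pi/temperature.py:

```bash
python /home/pi/temperature.py
```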

Running the above terminal command a few times produces output in the CSV file like the sample below. Note that I have two temperature sensors connected to the Pi: one hangs in the center of the top of the closet, measuring ambient temperature, and the other is attached to the hottest exterior surface of my server.

Device TimeStamp Temp °F
28-0000055f311a 2014-05-21 22:21:02.585486 80.9366
28-0000055f327d 2014-05-21 22:21:02.585486 119.3
28-0000055f311a 2014-05-21 22:21:09.331944 81.05
28-0000055f327d 2014-05-21 22:21:09.331944 119.4116
28-0000055f311a 2014-05-21 22:21:13.082604 81.05
28-0000055f327d 2014-05-21 22:21:13.082604 119.3

The Schedule:

Now that we have a script to collect temperature data and save it to a CSV file, we need to schedule it to run periodically. In my case, I wanted to run the script every minute of every day for several days. To schedule the Python script, we can use crontab from the terminal.
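Open the crontab editor:

```bash
crontab -e
```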

While in edit mode, add a crontab job using the following syntax. Note that this command should be all on one line in crontab.
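Assuming the script lives at /home/pi/temperature.py, the entry looks like this:

```
* * * * * python /home/pi/temperature.py > /dev/null 2>&1
```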

The five asterisks (*) mean run the job every minute, of every hour, of every day of the month, of every month, and on every day of the week. The “> /dev/null 2>&1” part means don’t save the output or errors to a log and don’t print anything to the screen (i.e. run silently, even if errors occur).

The Results:

After a few days of collecting data, I can plot the results to see how effective I am at dissipating heat from my closet. Below is a chart of the output after running the Python script, via crontab, for several days.

DS18B20TrendData3

Now that I have a baseline, I can experiment with different heat dissipating methods to find what works best to keep my electronics cool. When I’m done monitoring my electronics closet, I can see myself redeploying the rig to other projects such as attics, crawl spaces, automotive projects, and mini-fridge hacking. Let me know in the comments if you come up with your own temperature monitoring projects.

Running Linux Commands from PowerShell

In my lab, I occasionally need to automate maintenance tasks that involve Windows and Linux systems. For example, I need to back up Windows directories to a Linux-based NAS device, compress and decompress files, delete old backups, etc. Sometimes, what I need to do is run SSH commands from PowerShell in a dynamic way. I found some examples online, but they only ran one command at a time. For me, it would be better if I could dynamically build a set of commands and then have them all run consecutively in one SSH call.

To do this, first you need to define the statements you want to run in an array. In my case, I wanted something dynamic so I came up with the following.
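Something along these lines (the NAS path and archive name are placeholders):

```powershell
# Values that drive the dynamic command list.
$remotePath = "/share/Backups"
$zipFile    = "backup.zip"

# SSH commands to run on the NAS, in order. Note the ";" terminating each one.
$commands = @(
    "cat /etc/*release;",
    "cd $remotePath;",
    "pwd;",
    "unzip -o $zipFile;",
    "rm $zipFile;"
)
```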

Basically, the above commands will display the Linux distribution release info, change the working directory, print the working directory, unzip a file, and then remove the zip file. Note the “;” after each command is required. Alternatively, you can use “and list” (&&) or “or list” (||) instead of “;” if you understand how they work.

Now that I have the SSH commands that I want to run, how do I pass them to Linux? Manually, when I want to remotely connect to Linux in an interactive way, I use PuTTY. However, by itself, PuTTY doesn’t have a Windows command-line interface. Thankfully, the makers of PuTTY released Plink, aka “PuTTY Link”, which is a command-line connection tool for PuTTY. Armed with this new information, I downloaded Plink to the same directory as PuTTY and added an alias to my PowerShell script.
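Assuming PuTTY and Plink live in the default install location:

```powershell
# Create an alias so plink can be called like any other PowerShell command.
New-Alias -Name plink -Value "C:\Program Files\PuTTY\plink.exe"
```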

Now that I have an alias for Plink, I can pass my array of SSH commands directly to my Linux machine in one line of code.
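Assuming $user, $password, and $nasHost are defined earlier in the script:

```powershell
# Join the command array into one string and run it in a single SSH session.
plink -ssh -pw $password "$($user)@$($nasHost)" ($commands -join " ")
```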

One nice thing about this approach is that the output of the SSH commands is displayed in the PowerShell console. That way, you can see any Linux-side warnings or errors as they occur.

In the above example, I’ve added my user name and password as parameters in the command-line. Obviously, in a production environment this is not desirable. You can get around this by using public keys for SSH authentication. For more information, check out PuTTY’s help documentation. At the time of this writing, Chapter 8 covered how to set up public keys for SSH authentication.
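For example, once a key pair is set up, Plink can authenticate with a PuTTY-format private key (.ppk) instead of a password; the key path below is a placeholder.

```powershell
plink -ssh -i "C:\Keys\nas_key.ppk" "$($user)@$($nasHost)" ($commands -join " ")
```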

Here is the finished script.
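A sketch of the assembled script, with placeholder host, credentials, and paths:

```powershell
# Connection details and remote paths (placeholders).
$nasHost    = "mynas"
$user       = "admin"
$password   = "NotMyRealPassword"   # better: switch to public-key authentication
$remotePath = "/share/Backups"
$zipFile    = "backup.zip"

# SSH commands to run on the NAS, in order.
$commands = @(
    "cat /etc/*release;",
    "cd $remotePath;",
    "pwd;",
    "unzip -o $zipFile;",
    "rm $zipFile;"
)

# Alias for Plink (installed alongside PuTTY).
New-Alias -Name plink -Value "C:\Program Files\PuTTY\plink.exe"

# Run everything in one SSH session; output is echoed to the PowerShell console.
plink -ssh -pw $password "$($user)@$($nasHost)" ($commands -join " ")
```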

Some notes worth sharing… Initially, my instinct told me that zipping a large directory locally on the NAS device would be faster than remotely zipping the files from my Windows PC. I assumed the network overhead of downloading the files and then uploading the compressed archive back to the NAS would be a bottleneck. In fact, in my case, it was faster to do it remotely from Windows, because the limited RAM and CPU of my consumer-grade NAS device were quickly overwhelmed by the compression task. My Windows box, with a dual-core CPU, 4GB of RAM, a Gigabit NIC, and an SSD, could compress the files faster than the NAS device despite having to send the data over the network both ways. Other tasks, such as deleting large directories, were significantly faster when run locally on the NAS. Therefore, you will have to experiment to find out what works best for you.