
Use Xargs to Handle split-by Skew in Sqoop


I often use Sqoop for migrating relational data to Hadoop. However, I frequently encounter data skew in my “split-by” column, which generally leads to poor utilization of the parallelism: some mappers get a lot of work while others get none. For example, imagine an integer column called “order_date” with data stored as YYYYMMDD (e.g. 20190101). Yes, using integers to store dates is not the best choice (IMHO), but it frequently occurs in the wild. In this situation, if I let Sqoop determine how to allocate the work to 10 mappers for a month of data, say 20190101 to 20190201, Sqoop will incorrectly allocate the mappers as follows…

Mapper StartDate EndDate Notes
01 20190101 20190110 10 days of orders
02 20190111 20190120 10 days of orders
03 20190121 20190130 10 days of orders
04 20190131 20190140 01 days of orders
05 20190141 20190150 00 days of orders
06 20190151 20190160 00 days of orders
07 20190161 20190170 00 days of orders
08 20190171 20190180 00 days of orders
09 20190181 20190190 00 days of orders
10 20190191 20190200 00 days of orders


Because Sqoop tries to evenly allocate the split-by keys, mappers 01, 02, and 03 will end up processing most of the daily orders. Mapper 04 will only process one day of orders, and mappers 05 through 10 will not process any orders.
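For reference, the kind of naive, single-job import I’m describing looks something like the sketch below (the connection string, table name, and paths are made-up placeholders, not from a real system):

```bash
# Naive import: let Sqoop split the integer "order_date" column evenly
# across 10 mappers -- this produces the skewed allocation shown above.
sqoop import \
  --connect "jdbc:mysql://dbhost:3306/sales" \
  --username "sqoop_user" \
  --password-file "/user/me/.sqoop-password" \
  --table "orders" \
  --where "order_date >= 20190101 AND order_date < 20190201" \
  --split-by "order_date" \
  --num-mappers 10 \
  --target-dir "/data/raw/orders"
```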

It would be better if I could specifically tell each mapper which records to process. This would allow me to manually divide up the work (thus, avoiding the skew). Since Sqoop doesn’t really have a native way to solve this problem, my first thought was to run Sqoop inside a loop, such that each iteration processed a specific allocation of data splits. For example…
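A minimal sketch of that loop might look like this (same hypothetical connection details and table as above). Each pass pulls a single order_date with a single mapper, using a WHERE clause instead of Sqoop’s split-by logic:

```bash
#!/bin/bash
# Sequential version: one Sqoop job per order_date, one mapper each.
# Each iteration blocks until the previous import finishes.
for d in 20190101 20190102 20190103 20190104 20190105; do
  sqoop import \
    --connect "jdbc:mysql://dbhost:3306/sales" \
    --username "sqoop_user" \
    --password-file "/user/me/.sqoop-password" \
    --table "orders" \
    --where "order_date = ${d}" \
    --num-mappers 1 \
    --target-dir "/data/raw/orders/${d}"
done
```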

Note… In my use case, each order_date has millions of orders.

However, the problem with this approach is that each iteration of the loop waits for Sqoop to finish before continuing. Although this approach improves mapper utilization by evenly allocating data splits, I now have a parallelization problem, as I’m only running one mapper at a time. In addition to having precise control over data splits, I also need a way to parallelize the work.

To solve the parallelization problem, I thought I would just subprocess each call to Sqoop, which would give me multiple concurrent jobs, each running one mapper as a background job. This can be achieved by adding an ampersand “&” to the end of the call to Sqoop (this forks and runs the command in a separate sub-shell, as an asynchronous job).
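In sketch form, it’s the same loop with an ampersand appended to the Sqoop command:

```bash
# Forked version: the trailing "&" runs each import as an asynchronous
# background job, so the loop no longer waits between iterations.
for d in 20190101 20190102 20190103 20190104 20190105; do
  sqoop import \
    --connect "jdbc:mysql://dbhost:3306/sales" \
    --username "sqoop_user" \
    --password-file "/user/me/.sqoop-password" \
    --table "orders" \
    --where "order_date = ${d}" \
    --num-mappers 1 \
    --target-dir "/data/raw/orders/${d}" &
done
wait   # block until all background jobs have finished
```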

Unfortunately, this technique introduces two problems. First, the stdout and stderr for all the jobs get jumbled together. While this is not critically important, I will address this problem later. More importantly, now I have a problem of flooding the source database with too much concurrent work. Every turn of the loop kicks off a background Sqoop job with one mapper. If I have to pull more than a few days of data, the source database will quickly get overwhelmed. So now, in addition to having precise control over splitting the data, and having a way to parallelize multiple jobs, I also need a way of throttling concurrent work.

After a little research on multithreading, subprocessing, and parallelizing bash scripts, I found that xargs can throttle subprocesses. That is, I can arbitrarily limit concurrent xargs subprocesses by setting the “max-procs” argument. When one job completes, another job will auto spawn to take its place until all jobs are complete. In other words, xargs will handle the complexity of tracking, running, and throttling concurrent work. Armed with this new information, I now had all of the pieces needed to control data splits, parallelize jobs, and throttle concurrent work.
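A toy example (nothing Sqoop-specific) shows the throttling: ten jobs are queued, but xargs keeps at most three running at any moment.

```bash
# Ten fake "jobs"; --max-procs=3 caps concurrency at 3. As each job
# finishes, xargs spawns the next until the input list is exhausted.
seq 1 10 | xargs --max-procs=3 -I {} sh -c 'echo "start {}"; sleep 2; echo "done {}"'
```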

The solution I came up with was to define a generic bash function to perform the Sqoop ingest. Using a function encapsulates all the common logic of each iteration and simplifies the call to xargs. To this function I pass the filter criteria, which gives me manual control over the data splits in place of Sqoop’s split-by logic. Xargs iteratively calls the function with each item from a list of filter criteria, and its “max-procs” argument limits the concurrent work (this provides the throttling). The following script is a simplified version of my approach: I pass a list of filter criteria to xargs, which hands each item to the “do_work” function, and no more than 3 jobs run concurrently because “max-procs=3”.
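In sketch form (with placeholder connection details and table name rather than my real ones), it looks something like this:

```bash
#!/bin/bash

# One Sqoop import per filter value (here, one order_date), one mapper each.
do_work() {
  local filter="$1"
  sqoop import \
    --connect "jdbc:mysql://dbhost:3306/sales" \
    --username "sqoop_user" \
    --password-file "/user/me/.sqoop-password" \
    --table "orders" \
    --where "order_date = ${filter}" \
    --num-mappers 1 \
    --target-dir "/data/raw/orders/${filter}"
}
export -f do_work   # make the function visible to the sub-shells xargs spawns

# The list of filter criteria: one item per job / data split.
DATES="20190101 20190102 20190103 20190104 20190105 20190106 20190107"

# xargs hands each date to do_work; --max-procs=3 caps concurrency at 3 jobs.
echo "${DATES}" | xargs -n 1 --max-procs=3 bash -c 'do_work "$@"' _
```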

After testing this idea, everything seemed to work, except that all the concurrent jobs were dumping stderr and stdout to the terminal (visually, every job’s output was jumbled together). If something went wrong, it was difficult to tell which job I needed to troubleshoot. What I needed was for each job to redirect its output stream to an individual file; then I could go back, look at that job’s log, and troubleshoot the issue. The solution was to use the “tee” command to copy Sqoop’s output streams to files in HDFS (you could, however, write to local disk if you wanted to). The following is a simplified version of my final solution.
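In sketch form it looks something like the following (paths, table, and connection details are still placeholders). The key change is merging each job’s stderr into stdout and piping the stream through tee; one way to land the copy in HDFS is a process substitution into “hdfs dfs -put”, which reads from stdin when the source is “-”:

```bash
#!/bin/bash

# Common settings shared by every Sqoop job, exported so the sub-shells
# spawned by xargs can see them. (The password lives in a file, not here.)
export JDBC_URL="jdbc:mysql://dbhost:3306/sales"
export DB_USER="sqoop_user"
export PWD_FILE="/user/me/.sqoop-password"
export TARGET_BASE="/data/raw/orders"
export LOG_DIR="/logs/sqoop/orders"   # HDFS directory for per-job logs (must already exist)

do_work() {
  local filter="$1"
  # 2>&1 merges stderr into stdout; tee copies the combined stream into a
  # per-job log file in HDFS ("hdfs dfs -put -f -" reads from stdin).
  sqoop import \
    --connect "${JDBC_URL}" \
    --username "${DB_USER}" \
    --password-file "${PWD_FILE}" \
    --table "orders" \
    --where "order_date = ${filter}" \
    --num-mappers 1 \
    --target-dir "${TARGET_BASE}/${filter}" 2>&1 |
    tee >(hdfs dfs -put -f - "${LOG_DIR}/sqoop_${filter}.log")
}
export -f do_work

DATES="20190101 20190102 20190103 20190104 20190105 20190106 20190107"

# Still at most 3 concurrent jobs (and therefore 3 mappers) hitting the source.
echo "${DATES}" | xargs -n 1 --max-procs=3 bash -c 'do_work "$@"' _
```

If you would rather keep the logs on local disk, point tee at a local path (e.g. tee "/var/log/sqoop/sqoop_${filter}.log") instead of using the process substitution.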

Note that I export common variables that will be the same for every Sqoop job (this is not required if you define the variables within the do_work function). Also, it goes without saying: please don’t put your Sqoop password in your script.

Although I only allocate one mapper in my function, it is possible to concurrently run Sqoop jobs that have multiple mappers. This would be useful if you had large data splits for a column that also had some significant skew. Just add the “split-by” argument and set “num-mappers” inside your call to Sqoop. However, be careful how high you set “max-procs” because the concurrent load you put on the source database will be (max-procs * num-mappers).
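For example, a variant of do_work might take a date range instead of a single day and fan it out across several mappers (again with placeholder names, and reusing the exported variables from the script above). With “max-procs=3” and 4 mappers per job, up to 12 mappers would hit the source database at once:

```bash
# Each concurrent job now covers a date range and splits it across 4 mappers.
do_work() {
  local start_date="$1" end_date="$2"
  sqoop import \
    --connect "${JDBC_URL}" \
    --username "${DB_USER}" \
    --password-file "${PWD_FILE}" \
    --table "orders" \
    --where "order_date >= ${start_date} AND order_date < ${end_date}" \
    --split-by "order_date" \
    --num-mappers 4 \
    --target-dir "${TARGET_BASE}/${start_date}"
}
export -f do_work

# Feed xargs start/end pairs; -n 2 hands two arguments to each invocation.
echo "20190101 20190108 20190108 20190115 20190115 20190122" |
  xargs -n 2 --max-procs=3 bash -c 'do_work "$@"' _
```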

Although this solution doesn’t fix all skew problems for Sqoop, I have tweaked the approach in a few projects, and, so far, I’ve had pretty good success in achieving a better level of utilization of my Sqoop mappers.

One caveat: you end up running multiple application masters (each call to Sqoop is a new MapReduce job). However, the overhead of those extra application masters is a good trade against wasting resources on mappers that do very little work, or nothing at all.

Hopefully this post gives you another technique for optimizing mapper utilization with Sqoop. If this helped you, or if you have ideas or suggestions, please let me know in the comments.