At NITS Solutions, we’ve been storing all our data in Oracle from day one, and for many years it did a great job. As our business continued to grow, however, Oracle started to creak and groan under the weight of so much data processing. Pending jobs piled up as new products, re-summarizations, and analyst queries continued to increase. We had to find a way to lessen the demands on our database by offloading tasks to a new system. That is what spurred our investigation into data lake technologies.
Our goal is not to replace Oracle entirely, but to ease the burden placed on it. At a minimum, we need to provide an environment that is up-to-date and accessible to all of our analysts so that they can move their queries and data dumps away from Oracle. This should free up quite a bit of processing power for more mission-critical tasks. Furthermore, the solution should leave open the possibility of moving some of our summarization jobs off of Oracle and onto Spark instead. These jobs have been the most taxing on our database, and having the option to execute them elsewhere in the future would be a big win.
The above goals are essential; if we cannot achieve them, the project will be a failure. That being said, there are some additional considerations to keep in mind as well:
1. We run a large part of our infrastructure on AWS, so any solution should integrate with AWS services if at all possible.
2. We have a couple of in-house developers who know Scala, but many more who know Python.
3. Only a couple of team members have prior exposure to data lake technology, so the more we can ease the rest of the team into this process, the better.
4. Speed is always a plus, but since the data is not real-time we can afford to be a bit more relaxed in this department.
With those goals in mind, we settled on implementing a strategy to re-create our Oracle tables in AWS S3 buckets, using Athena as our query engine. This would a) provide our analysts with a location to query against outside of the database, and b) create table files in a data lake format that could be leveraged for computation in an EMR cluster. The rest of the data processing (loading, cleaning, rule application, etc.) would remain in Oracle.
Moving Data from Oracle
The easiest way to retrieve data from a relational database would be to use Sqoop, a powerful Hadoop-based tool. With Sqoop you can query a database using one of several connection types (ODBC, JDBC, etc.) and define multiple workers to retrieve the data in parallel. Sounds great, right? It is, but in our case it’s suboptimal. The whole point of this project was to decrease the query load on our database, and a fleet of parallel Sqoop workers would only add to it. We also wanted to keep our tech overhead low, and using Sqoop would force us to install and manage Hadoop/HDFS on our machines.
Instead we worked with our database architects to track fact tables of interest. Every time a certain dealership (NITS provides data warehousing in the automotive space) changed their data for a given month in one of these tables, we would have a procedure to dump all of the data for that dealer and month into a csv file and upload it to an S3 bucket.
In order to get all of these files properly reorganized and reformatted, we created a program to automate the process. While there are many ways this could be done (AWS CLI in Bash, AWS SDK in Scala/Java, etc.), we decided to write a Python program using the boto3 package to connect to AWS and the fastparquet package to convert from csv to parquet format. We also used multiprocessing to speed up execution by taking advantage of parallelism without having to spin up an EMR cluster.
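To make the shape of that program concrete, here is a minimal sketch of the core loop. The job tuples, field names, and worker count are all illustrative assumptions, not our production code; the pattern is simply a multiprocessing pool fanning out csv-group conversions, with pandas/fastparquet doing the conversion and boto3 doing the upload.

```python
from multiprocessing import Pool

def convert_and_upload(job):
    """Convert one group of csv files into a single gzip-compressed
    parquet file and push it to S3. Third-party imports live inside the
    worker so each pool process loads them independently."""
    import pandas as pd    # third-party
    import fastparquet     # third-party
    import boto3           # third-party

    csv_paths, dtypes, out_path, bucket, key = job
    # Concatenate every csv in the group, applying the column types
    # from our schema file so Athena sees consistent columns.
    df = pd.concat(pd.read_csv(p, dtype=dtypes) for p in csv_paths)
    fastparquet.write(out_path, df, compression="GZIP")
    boto3.client("s3").upload_file(out_path, bucket, key)
    return key

def run(jobs, workers=8):
    # The csv groups are independent, so this parallelizes cleanly
    # without an EMR cluster.
    with Pool(workers) as pool:
        return pool.map(convert_and_upload, jobs)
```

Keeping the third-party imports inside the worker function also means the script fails fast with a clear error on any box missing a dependency.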
While this may not be the most powerful solution, it fits the skill set of our team and keeps the process lean. A single Python file can be deployed to any server, scheduled as a cron job, and run daily to produce the desired tables in Athena. This makes for a very simple process that many people on our team can troubleshoot and modify immediately.
As you may recall from Part II: Understanding the Data Lake Tech Stack, choosing the right file type is a very important decision. We had two major considerations when deciding on a file type:
1. It should be schema-based so that our analysts have an easy transition from querying our database to querying the data lake.
2. It should be fast to query, since the point of the lake is to give our analysts a high-speed query engine. Columnar file types work best here, as queries typically require only a fraction of the columns in a table.
With those two points in mind, creating schema files and saving everything in parquet format made the most sense. Our program references those schema files and applies their data types as it converts each group of csv files to a single parquet file. Because parquet is a columnar file type, the files stored on S3 are very quick to query against. We also compressed them with gzip for lower storage costs.
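The post doesn't show our schema files themselves, but one simple representation (the format below is an assumption, not our exact config) is a JSON file mapping column names to pandas-style dtypes, which the conversion step can load and pass straight to `read_csv`:

```python
import json

# Hypothetical schema file for a parts sales table; the real files
# mirror the Oracle column definitions.
SCHEMA_JSON = """
{
  "dealer_code": "str",
  "sale_month":  "str",
  "part_number": "str",
  "quantity":    "int64",
  "unit_price":  "float64"
}
"""

def load_dtypes(schema_text):
    """Parse a schema file into the dtype mapping pandas expects."""
    return json.loads(schema_text)

dtypes = load_dtypes(SCHEMA_JSON)
```

Keeping the schema in a plain config file means the same source of truth can drive both the csv-to-parquet conversion and the Athena table definitions later on.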
The final product shouldn’t contain too many files: a large file count is a) hard to track and b) slows the query engine down when it has to open a ton of files for each query. We also wanted the process to be modular, changing as little data as possible rather than re-creating full tables every day. Keep in mind that we tend to get frequent data updates for recent months and very infrequent updates otherwise.
With that in mind, for every table we decided to group each dealer-month csv file with the other files for that month whose dealer codes (a unique ID for each dealer) end in the same digit. For example, if we had three years of data for a parts sales table, we’d have 360 total data files (36 months * 10 possible ending digits). Grouping on the last digit of the dealer code yields files with close to the same amount of data, which could be handy if we want to apply parallel processing in the future.
Check out the table below for a better intuition of the organization. Notice how only rows 1 and 3 would end up in the same file.
The last thing we have to do is upload our files to a location accessible to everyone who uses our data lake. For this we created an S3 bucket with a folder for each table we were re-creating. Each table folder holds 36 month folders, each with 10 files, one per ending digit of the dealer code. Each day we can now process the parts sales for all dealers and replace only those parquet files that need to be edited. So, if we get data for all dealers for the current month only, we need to replace just 10 of the 360 files. If one dealer replaces all their historical data, we need to replace only 36 files, one for each month. Though this methodology is a bit unconventional, it greatly speeds up processing time while still keeping a low file count.
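Under that layout, working out which parquet files an incoming batch of dealer-month updates touches is a small set computation. The key scheme below is hypothetical (our real bucket and file names differ), but the arithmetic matches the examples above:

```python
def parquet_key(table, month, digit):
    """One file per (table, month, last-digit-of-dealer-code)."""
    return f"{table}/{month}/digit_{digit}.parquet"

def keys_to_replace(table, updates):
    """updates: iterable of (dealer_code, month) pairs that changed.
    Returns the distinct parquet files that must be rebuilt."""
    return {parquet_key(table, month, str(code)[-1])
            for code, month in updates}

# All dealers send data for the current month only:
# only 10 files out of 360 need replacing.
updates = [(code, "201912") for code in range(1000, 1100)]
touched = keys_to_replace("parts_sales", updates)
```

De-duplicating through a set is what keeps the daily job cheap: a hundred dealer updates in one month still collapse to at most ten file rebuilds.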
Now that we have a well-organized folder structure in place, getting everything to play nicely with Athena is a breeze. It’s fairly straightforward to create table definitions in Athena and point each table at its pre-existing folder location in S3. Since we already have config files that contain our table schemas, creating table definitions is even easier.
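Since we keep schemas in config files anyway, the table DDL can be generated rather than hand-written. The column names, types, and bucket below are invented for illustration, but the statement shape is standard Athena DDL for external parquet tables:

```python
# Hypothetical column -> Athena type mapping derived from a schema file.
COLUMNS = {
    "dealer_code": "string",
    "part_number": "string",
    "quantity":    "bigint",
    "unit_price":  "double",
}

def create_table_sql(table, bucket, columns):
    """CREATE EXTERNAL TABLE statement pointing Athena at the
    table's S3 folder, partitioned by month."""
    cols = ",\n  ".join(f"{name} {typ}" for name, typ in columns.items())
    return (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {table} (\n  {cols}\n)\n"
        f"PARTITIONED BY (month string)\n"
        f"STORED AS PARQUET\n"
        f"LOCATION 's3://{bucket}/{table}/'"
    )

ddl = create_table_sql("parts_sales", "nits-data-lake", COLUMNS)
```

The generated statement can be pasted into the Athena console or submitted programmatically; either way, one schema file now drives both the parquet conversion and the table definition.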
Using parquet files is one way to speed up querying in Athena, but there’s another trick you can use to see large increases in performance: partitions. By setting up partitions, you tell Athena to search only certain locations whenever you submit a query. This greatly reduces the time it takes to fetch results by reducing the amount of data that has to be scanned. We elected to partition by month, as that is a very common filter for us to use in a query. Always think about how you will want to partition your data when deciding on your folder structure. Partition columns should be ones commonly used as query filters, and they should cut down the amount of data Athena has to scan as much as possible.
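Because our month folders aren't named in the Hive `month=...` style, each one has to be registered with the table explicitly. A sketch of generating those statements (table, bucket, and folder names are assumptions) follows; the resulting SQL can be run from the Athena console or submitted with boto3's `start_query_execution`:

```python
def add_partition_sql(table, month, bucket):
    """ALTER TABLE statement registering one month folder as a
    partition of an existing Athena table."""
    return (
        f"ALTER TABLE {table} ADD IF NOT EXISTS "
        f"PARTITION (month = '{month}') "
        f"LOCATION 's3://{bucket}/{table}/{month}/'"
    )

sql = add_partition_sql("parts_sales", "201901", "nits-data-lake")
```

With partitions registered this way, a query filtered on `month` scans only the matching folder's ten parquet files instead of the whole table.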
Wrapping It Up
Our end product was an S3 location for each table filled with useful partitions and optimized file types. This can now be efficiently queried against by data analysts via an AWS Athena connection, or accessed programmatically from the AWS CLI, SDKs, or other clients with AWS connectivity. If we wish to run heavy-duty jobs using a Spark cluster, that’s now a breeze as well. We can spin up an AWS EMR Cluster and either download all of the parquet files to the HDFS on our cluster or consume them remotely via the AWS SDK for Java. Using spark-submit we can then run Scala or Java programs on the cluster to process any of the data in our lake in parallel.
That concludes our three-part series on data lakes. Thanks for reading and if you have any questions, drop a comment below and I’ll do my best to help out.