Hello. During this week, we discuss real-world applications and dive deep into different big data technologies and algorithms. Put your masks on, grab the snorkels. Here we go. In this lesson, I will show you how to analyze telecommunication activity and how to efficiently tackle different technical problems that may arise. Let us consider the following dataset, which is publicly available on the Dandelion website. Quite a name for an engineering site. Well done, marketing experts. You have ten-minute aggregations of data for different locations over the city of Milan. The aggregated data includes the number of SMS sent and received, the number of calls made, and the amount of Internet traffic consumed. You may wonder, what is the most talkative spot of the city? The spot you are thinking of? Ladies, no offense. Still, we are going to validate the following hypothesis: is it true that people who live in the north of the city make twice as many calls as people who live in the south?

Your data contains a Square ID for each aggregated record. A Square ID is an identifier of a polygon on the map of Milan. Therefore, you need to join your data with spatial data to validate the hypothesis. You have a big dataset of telecommunication data and a small dataset of spatial data. Let us solve this problem with Hadoop MapReduce. I am going to start with the simplest possible solution and elaborate on it to make it more efficient and scalable. When you have a big dataset, you don't have a wide range of choices. In the mapper, you should read a piece of telecommunication data. Then you should somehow join this piece of data with the spatial data stored in HDFS. No worries, you can read this data directly from your streaming script. Here on the slide, you see the Python streaming script that does this job. There is a special function to read the data from HDFS and to load the Milan grid into memory. You call this function at the beginning of the script, and then you iterate over the lines of input with telecommunication data. For each line of input, you join this data by the grid ID with the spatial data, so you can find out whether these statistics relate to the north or the south. After running this application, you will see the following output. All of that happens within the map phase, without any data transferred between a mapper and a reducer.

But what are the possible drawbacks of this solution? Usually, your data is stored with a replication factor of three, while the number of mappers usually corresponds to the number of cores on the machines. It means that you have to transfer all of this spatial data over the network to each mapper. Do you know how to make the solution more efficient? Of course you do. The time spent on this course is time well spent. Here, you have to use the distributed cache. This data will be replicated to each node once and will be available locally for each mapper. Therefore, you will dramatically reduce the network load and increase the level of data locality. In your script, you have to change the way you access the data. Instead of reading it from HDFS, you will read it from the local file system. The rest of the script is left unchanged. When you run it again, you will see the same output. Please bear in mind the API for passing an HDFS file to the distributed cache: you have to prefix the path with hdfs://. And of course, you should expect a lower overall map phase time. This approach, which uses the distributed cache to load small data into memory, is called a Map-Side Join.
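To make this concrete, here is a minimal sketch of such a streaming mapper, assuming the grid file has been shipped through the distributed cache with something like `-files hdfs://.../milan_grid.tsv#grid.tsv`, so it appears as `grid.tsv` in the task's working directory. The file name, column layout, and helper names are my own illustration, not the exact script from the slide.

```python
#!/usr/bin/env python
# Map-Side Join sketch: the small spatial dataset is shipped to every node
# via the distributed cache and loaded into memory once per mapper.
import sys

def load_grid(path="grid.tsv"):
    """Read the Milan grid from the local file system (distributed cache).
    Assumed format: <square_id> TAB <region>, where region is 'north' or 'south'."""
    grid = {}
    with open(path) as f:
        for line in f:
            square_id, region = line.rstrip("\n").split("\t")
            grid[square_id] = region
    return grid

def main():
    grid = load_grid()
    for line in sys.stdin:
        # Assumed telecom format: <square_id> TAB <calls> (other fields omitted).
        square_id, calls = line.rstrip("\n").split("\t")[:2]
        region = grid.get(square_id)
        if region is not None:
            # Emit the region as the key so calls can be summed per region.
            print("{}\t{}".format(region, calls))

if __name__ == "__main__":
    main()
```

A trivial reducer can then sum the emitted calls per region to check the north-versus-south hypothesis; no telecommunication data ever leaves the map phase for the join itself.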
Imagine that your telecommunication company has grown, and you have to aggregate data over thousands of cities. You also have better equipment to locate grids, so your spatial data is more granular and not small anymore. You need to find a way to join several big datasets. As you could have guessed, if there is a Map-Side Join, then there should be a Reduce-Side Join. That is exactly what I am going to show you. On this slide, you see two big datasets, A and B. During the map phase, you do nothing except parse the data into key-value pairs. Then, during the shuffle and sort phase, data is distributed by keys in a way that allows you to perform the join during the reduce phase. The first question you may have is: how do you differentiate between the two datasets in the reducer script? The answer is simple. You know the input file location during the map phase, so you can tag each record appropriately. Here, I read the environment variable mapreduce_map_input_file and tag each record of spatial data with the word "grid". All other records I tag with the word "logs". When you run this mapper script, you will get a label for each record in the output. The first column is the square ID, the second is a label, and the last is a value. As you can see, the value has different types for different labels: it is a string for spatial data, and it is numeric for logs records.

Let us add the shuffle and sort phase to see how the data will be distributed over the reducers. For some square IDs, you will see the grid records at the end. For some square IDs, you will see the grid records at the beginning. And for others, you can find them in the middle of the logs records. If you would like to join the grid and logs records for each square ID, then you have to store in memory all the data for a specific square ID. It is a working solution, but there is no guarantee that you can store all the logs records in memory for each square ID. Your data can be skewed. For example, if you are not able to locate the square ID for a call, then you store null in the record. If you have quite a large number of records with nulls, for example 10%, then this solution will try to load 10% of a distributed dataset into memory on one machine. You do understand it is a bad idea, don't you?

You need to sort the data by the tag within each reducer. This way, you only keep in memory the location on the map for the current square ID, for example the south or the north, and then iterate over the logs records with the same square ID. The only problem is that in MapReduce, you can only sort data by keys. You need to be careful here to make no mistakes. Your key is composite and consists of two strings separated by the tab character: the square ID and the label. You partition data only by the square ID, as in the previous examples, but you sort data by both of them. This technique is called Secondary Sort. Just as a reminder: if you need to sort your data in a different order, then you can use the KeyFieldBasedComparator available in the streaming JAR. Now, your attention, ladies and gentlemen. The final command to run the Reduce-Side Join using the Secondary Sort, together with the corresponding output, you can see on this slide. Let me go through the reducer script to close the loop on this subject. You iterate over the standard input line by line. If you read a grid label, then you output the previously collected statistics, if there are any. Otherwise, you just accumulate values for the current grid. In your case, it is the average number of text messages received by a person in this region over a period of ten minutes.
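To close the loop in code as well, here is a rough sketch of such a reducer. It assumes the mapper emits a composite key of the form <square_id> TAB <label> with the labels "grid" and "logs", that the job partitions by the first field only, and that the shuffle sorts the full key so the single "grid" record arrives before the "logs" records of the same square ID. For simplicity it outputs a plain average of the numeric values per square ID rather than the exact per-person statistic from the slide, and the streaming options shown in the comment are one possible way to configure the Secondary Sort, not the exact command from the lecture.

```python
#!/usr/bin/env python
# Reduce-Side Join sketch with Secondary Sort.
# One possible streaming invocation (assumed, adapt to your cluster):
#   hadoop jar hadoop-streaming.jar \
#     -D stream.num.map.output.key.fields=2 \
#     -D mapreduce.partition.keypartitioner.options=-k1,1 \
#     -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
#     -mapper mapper.py -reducer reducer.py -input ... -output ...
import sys

current_square = None   # square ID we are currently accumulating
current_region = None   # its location on the map, e.g. 'north' or 'south'
total = 0.0             # running sum of the numeric values
count = 0               # number of logs records seen for this square ID

def flush(square_id, region, total, count):
    """Emit the average value collected for the previous square ID."""
    if square_id is not None and region is not None and count > 0:
        print("{}\t{}\t{}".format(square_id, region, total / count))

for line in sys.stdin:
    # Each input line looks like: <square_id> TAB <label> TAB <value>
    square_id, label, value = line.rstrip("\n").split("\t")
    if label == "grid":
        # The secondary sort guarantees the 'grid' record arrives first,
        # so a new square ID starts here: flush the previous one.
        flush(current_square, current_region, total, count)
        current_square, current_region = square_id, value
        total, count = 0.0, 0
    elif square_id == current_square:
        # A 'logs' record for the current square ID: accumulate only numbers,
        # never the records themselves, so the memory footprint stays tiny.
        total += float(value)
        count += 1
    # 'logs' records with no matching 'grid' record (null square IDs) are skipped.

flush(current_square, current_region, total, count)
```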
In this video, you have learned several best practices for doing joins. If you have a small dataset, then you can perform a Map-Side Join with the help of the distributed cache. If you have several big datasets, then you can perform a Reduce-Side Join. You can use and configure Secondary Sort to reduce the memory footprint. In this case, you should take into consideration the number of records in the different datasets for each key.