Using broadcast variables: which province does an IP address belong to?

1. Why use broadcast variables?

Take a simple example: we need to process a log file in which every record contains an IP address.

20090121000132095572000|125.213.100.123|show.51.com|/shoplist.php?phpfile=shoplist2.php&style=1&s

We can extract the IP by splitting each line on "|". The task is then to find out which province that IP belongs to. The IP rules look like this (a small excerpt):

1.0.1.0|1.0.3.255|16777472|16778239|Asia, China, Fujian, Fuzhou, China Telecom |350100|China|CN|119.306239|26.075302
1.0.8.0|1.0.15.255|16779264|16781311|Asia, China, Guangdong, Guangzhou, China Telecom |440100|China|CN|113.280637|23.125178
1.0.32.0|1.0.63.255|16785408|16793599|Asia, China, Guangdong, Guangzhou, China Telecom |440100|China|CN|113.280637|23.125178
1.1.0.0|1.1.0.255|16842752|16843007|Asia, China, Fujian, Fuzhou, China Telecom |350100|China|CN|119.306239|26.075302
1.1.2.0|1.1.7.255|16843264|16844799|Asia, China, Fujian, Fuzhou, China Telecom |350100|China|CN|119.306239|26.075302
1.1.8.0|1.1.63.255|16844800|16859135|Asia, China, Guangdong, Guangzhou, China Telecom |440100|China|CN|113.280637|23.125178
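For illustration, here is a minimal Java sketch of the splitting step. It assumes the field layout visible in the samples above: in a log line the IP is the second "|"-separated field; in a rule line the third and fourth fields are the numeric start and end of the range, and the fifth field reads "continent, country, province, city, ISP", so the province is its third comma-separated item. The class LogParsing and its methods are hypothetical names.

```java
import java.util.regex.Pattern;

// Hypothetical helper: splits one log line and one IP-rule line into the fields used in this article.
final class LogParsing {
    // '|' is a regex metacharacter, so it is quoted before being passed to String.split.
    private static final String PIPE = Pattern.quote("|");

    // The IP is the second '|'-separated field of a log line.
    static String extractIp(String logLine) {
        return logLine.split(PIPE)[1];
    }

    // A parsed rule: numeric range [start, end] plus the province name.
    static final class Rule {
        final long start;
        final long end;
        final String province;

        Rule(long start, long end, String province) {
            this.start = start;
            this.end = end;
            this.province = province;
        }
    }

    // Fields at indexes 2 and 3 hold the range as decimal numbers; field 4 is
    // "continent, country, province, city, ISP", so the province is its third item.
    static Rule parseRule(String ruleLine) {
        String[] f = ruleLine.split(PIPE);
        String province = f[4].split(",")[2].trim();
        return new Rule(Long.parseLong(f[2]), Long.parseLong(f[3]), province);
    }
}
```

Because String.split takes a regular expression, "|" has to be quoted (Pattern.quote or "\\|"); splitting on a bare "|" would split between every character.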

Processing the IP rules

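// Convert a dotted-quad IP address (e.g. "1.0.1.0") to its decimal value (e.g. 16777472)
def ip2Long(ip: String): Long = {
  val fragments = ip.split("[.]")
  var ipNum = 0L
  for (i <- 0 until fragments.length) {
    // Shift the value accumulated so far left by 8 bits, then OR in the next octet
    ipNum = fragments(i).toLong | ipNum << 8L
  }
  ipNum
}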
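// Binary search for the rule whose [start, end] range contains ip.
// lines holds (startNum, endNum, info) tuples sorted by startNum; returns the matching index, or -1 if none matches.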
def binarySearch(lines: Array[(Long, Long, String)], ip: Long): Int = {
  var low = 0
  var high = lines.length - 1
  while (low <= high) {
    val middle = (low + high) / 2
    if ((ip >= lines(middle)._1) && (ip <= lines(middle)._2))
      return middle
    if (ip < lines(middle)._1)
      high = middle - 1
    else {
      low = middle + 1
    }
  }
  -1
}
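The same two helpers can be ported to Java and checked against the sample rules. This is a sketch: IpLookup, province and the long[][] {start, end} layout are hypothetical, and the array must be sorted by start IP (the sample rules already are), because binary search relies on that ordering.

```java
// Hypothetical Java port of ip2Long and binarySearch; ranges[i] = {start, end}, provinces[i] = its province.
final class IpLookup {
    // Shift the running value left by 8 bits and OR in the next octet.
    static long ip2Long(String ip) {
        long ipNum = 0L;
        for (String fragment : ip.split("[.]")) {
            ipNum = Long.parseLong(fragment) | ipNum << 8;
        }
        return ipNum;
    }

    // Returns the index of the range containing ip, or -1; ranges must be sorted and non-overlapping.
    static int binarySearch(long[][] ranges, long ip) {
        int low = 0;
        int high = ranges.length - 1;
        while (low <= high) {
            int middle = (low + high) >>> 1; // unsigned shift avoids int overflow on huge arrays
            if (ip >= ranges[middle][0] && ip <= ranges[middle][1]) {
                return middle;
            }
            if (ip < ranges[middle][0]) {
                high = middle - 1;
            } else {
                low = middle + 1;
            }
        }
        return -1;
    }

    // Convenience wrapper: the province for a dotted-quad IP, or null if no rule covers it.
    static String province(long[][] ranges, String[] provinces, String ip) {
        int i = binarySearch(ranges, ip2Long(ip));
        return i == -1 ? null : provinces[i];
    }
}
```

For example, 1.0.8.5 converts to 16779269, which falls in 16779264..16781311, so the lookup returns Guangdong; 1.0.4.0 (16778240) falls between two ranges and returns null.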

 


 

2. Problem description

In practice the data volume may be very large, so the IP rules have to be stored in HDFS. Spark starts one Task per input split it reads, but a single split may not contain all of the IP rules, so the province for an IP taken from the log file may not be found. That is the problem. Every Task therefore needs access to all of the IP rules. But the rules are stored across several splits, so how can each Task get all of them? The answer is to pull the IP rules from every split back to the Driver side (where the job was submitted), then send them to each Executor as a broadcast variable. Each Executor then holds a complete copy of the IP rules, and every Task can read them from its Executor while processing the log data.
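To make the failure concrete, the toy simulation below spreads four of the sample rules across two hypothetical input splits. A task that sees only the first split cannot resolve an IP whose rule lives in the second; merging every split into one sorted list, which is what the Driver does before broadcasting, fixes that. In a real Spark job the merge would typically be a collect() of the rules RDD on the Driver followed by sc.broadcast(...); here both are replaced by plain Java so the sketch runs without a cluster. SplitDemo, Rule, lookup and collectAll are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Toy simulation (no Spark): IP rules spread over two input splits.
final class SplitDemo {
    static final class Rule {
        final long start;
        final long end;
        final String province;

        Rule(long start, long end, String province) {
            this.start = start;
            this.end = end;
            this.province = province;
        }
    }

    // Two hypothetical splits of the rule file, using ranges from the sample rules.
    static final List<Rule> SPLIT_1 = List.of(
            new Rule(16777472L, 16778239L, "Fujian"),
            new Rule(16779264L, 16781311L, "Guangdong"));
    static final List<Rule> SPLIT_2 = List.of(
            new Rule(16842752L, 16843007L, "Fujian"),
            new Rule(16844800L, 16859135L, "Guangdong"));

    // Returns the province whose range contains ip, or null (a linear scan is enough here).
    static String lookup(List<Rule> rules, long ip) {
        for (Rule r : rules) {
            if (ip >= r.start && ip <= r.end) {
                return r.province;
            }
        }
        return null;
    }

    // Stand-in for the Driver side: gather all splits into one list sorted by start IP.
    static List<Rule> collectAll(List<List<Rule>> splits) {
        List<Rule> all = new ArrayList<>();
        for (List<Rule> split : splits) {
            all.addAll(split);
        }
        all.sort(Comparator.comparingLong(r -> r.start));
        return all;
    }
}
```

An IP such as 16844900 is covered only by a rule in SPLIT_2, so lookup(SPLIT_1, 16844900L) returns null, while the merged list resolves it to Guangdong.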

Advantages of broadcast variables

The benefit of a broadcast variable is that each task no longer gets its own copy of the variable; instead, the Executor on each node holds a single copy.
This greatly reduces the number of copies of the variable.

Usage of broadcast variables

Using broadcast variables is very simple.
Call the broadcast() method of SparkContext (JavaSparkContext in the Java API) and pass it the variable you want to broadcast.

// sc is the JavaSparkContext; fastutilDateHourExtractMap is the map to share with every Executor
final Broadcast<Map<String, Map<String, IntList>>> dateHourExtractMapBroadcast =
    sc.broadcast(fastutilDateHourExtractMap);

When you use the broadcast variable, call value() on the Broadcast object (here, dateHourExtractMapBroadcast.value())
to get back the original variable that was wrapped.

Initially, a broadcast variable has only one copy, on the Driver.

At runtime, when a task needs the data in a broadcast variable, it first tries to get a copy from the BlockManager of its local Executor.
If there is no local copy, it pulls the copy remotely from the Driver and saves it in the local BlockManager.
From then on, every task on that Executor uses the copy in the local BlockManager directly.
Besides pulling from the Driver, an Executor's BlockManager can also pull a copy of the variable from the BlockManager on another node.
There are two implementations: HttpBroadcast and TorrentBroadcast (the default). HttpBroadcast was removed in Spark 2.0.

BlockManager

The BlockManager manages an Executor's in-memory and on-disk data; a task first tries to find the broadcast map in its local BlockManager.
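The lookup order described above can be pictured with a toy model. This is not Spark's code: BroadcastFetchModel, Executor and the plain HashMap standing in for a BlockManager are hypothetical, and the real TorrentBroadcast splits the value into blocks and fetches them piece by piece rather than copying one object. The model only shows why the number of copies ends up equal to the number of Executors, and why the Driver does not have to serve every one of them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (not Spark's real classes): local store first, then a peer executor, then the driver.
final class BroadcastFetchModel {
    static final class Executor {
        final Map<Long, Object> localStore = new HashMap<>(); // stands in for the BlockManager
    }

    final Map<Long, Object> driverStore = new HashMap<>();
    final List<Executor> executors = new ArrayList<>();
    int driverFetches = 0;
    int peerFetches = 0;

    Object getValue(Executor self, long broadcastId) {
        Object v = self.localStore.get(broadcastId);
        if (v != null) {
            return v; // already cached on this executor
        }
        for (Executor peer : executors) { // try another executor that already has a copy
            if (peer != self && peer.localStore.containsKey(broadcastId)) {
                v = peer.localStore.get(broadcastId);
                peerFetches++;
                break;
            }
        }
        if (v == null) { // nobody else has it yet: fetch from the driver
            v = driverStore.get(broadcastId);
            driverFetches++;
        }
        self.localStore.put(broadcastId, v); // later tasks on this executor reuse it
        return v;
    }
}
```

Running 10 tasks round-robin over 3 Executors in this model fetches from the Driver once and from peers twice; the other 7 tasks hit their local store, leaving exactly 3 copies.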

For instance:
50 Executors, 1000 tasks, and a 10 MB map.
Without broadcasting, each of the 1000 tasks gets its own copy: 1000 copies, 10 GB of data sent over the network and 10 GB of memory consumed across the cluster.
With a broadcast variable, the 50 Executors hold 50 copies: 500 MB of data sent over the network.
Moreover, not every copy has to come from the Driver; an Executor's BlockManager may pull the copy from a nearby node,
which speeds up network transfer considerably. Memory consumption is likewise only 500 MB.
10,000 MB versus 500 MB is a factor of 20: network transfer and memory consumption are both reduced 20-fold.
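The same arithmetic in general form, with S the size of the map (10 MB here) and assuming every copy is shipped over the network and kept in memory:

```latex
\begin{aligned}
M_{\text{no broadcast}} &= N_{\text{tasks}} \cdot S = 1000 \times 10\ \text{MB} = 10{,}000\ \text{MB} \\
M_{\text{broadcast}} &= N_{\text{executors}} \cdot S = 50 \times 10\ \text{MB} = 500\ \text{MB} \\
\frac{M_{\text{no broadcast}}}{M_{\text{broadcast}}} &= \frac{N_{\text{tasks}}}{N_{\text{executors}}} = \frac{1000}{50} = 20
\end{aligned}
```

The saving depends only on the ratio of tasks to Executors, so it grows as the job runs more tasks per Executor.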
This is a real, measurable performance gain,
although not necessarily a decisive one. For example, a Spark job that runs for 30 minutes might finish 2 or 5 minutes faster after switching to broadcast variables.
But every small adjustment contributes, and in the end they add up.

Without any tuning, one Spark job took 16 hours; after the three basic tuning measures, it took 5 hours.
Next came a very important and especially impactful step, shuffle tuning, which brought it to 2 to 3 hours. Finally, after more than 10 further performance tuning techniques,
including JVM tuning and broadcast variables, it ran in 30 minutes: from 16 hours down to 30 minutes.

Finally, how do we apply this in practice? Turn dateHourExtractMap into a Broadcast variable.
