如何基于Hadoop MapReduce从文本文件中实现共同好友查找?
嘿,作为刚入坑MapReduce的新手,能通过实战项目来提升技术真的太赞了!针对你要实现的「基于Hadoop的共同好友查找」任务,我来一步步给你拆解思路和代码实现,保证你能快速上手~
核心思路拆解
先给你两种常见的实现思路,新手优先看第一种,逻辑更直观,容易理解:
思路1:基于好友对的交集计算(推荐新手)
- Map阶段:
- 读取每一行输入,拆分出用户ID(记为
u)和他的好友列表。 - 遍历好友列表里的每个好友
f,为了避免重复计算(比如用户0和1的共同好友,不用同时处理(0,1)和(1,0)),我们把用户ID和好友ID按升序排列组成唯一键(比如u < f时键为u,f,否则为f,u)。 - 输出键为这个有序好友对,值为当前用户
u的完整好友列表。
- 读取每一行输入,拆分出用户ID(记为
- Reduce阶段:
- 同一个有序好友对的所有值(也就是两个用户各自的好友列表)会被分到同一个Reduce任务。
- 计算这两个列表的交集,这个交集就是这对用户的共同好友。
- 最终输出格式为
用户对\t共同好友列表。
思路2:基于共同好友的反向映射
这种思路适合数据量较大的场景,核心是反向推导:
- Map阶段:读取每行输入,拆分用户
u和好友列表,然后对每个好友f,输出键为f,值为u(表示u是f的好友)。 - Reduce阶段:对每个键
f,拿到所有关注他的用户列表,生成这些用户的两两组合,把f作为他们的共同好友之一,最后汇总每个用户对的所有共同好友。
具体代码实现(Java版本)
Hadoop生态下Java是最常用的开发语言,下面是思路1的完整代码实现:
Mapper类
负责拆分输入,生成有序好友对和对应好友列表:
import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class CommonFriendsMapper extends Mapper<Object, Text, Text, Text> { private Text pairKey = new Text(); private Text friendsList = new Text(); @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { // 拆分输入行:格式为 <User><TAB><Friends> String[] parts = value.toString().split("\t"); if (parts.length != 2) { // 跳过格式错误的行 return; } String user = parts[0]; String[] friends = parts[1].split(","); // 遍历每个好友,生成有序键避免重复计算 for (String friend : friends) { String sortedPair; if (user.compareTo(friend) < 0) { sortedPair = user + "," + friend; } else { sortedPair = friend + "," + user; } pairKey.set(sortedPair); friendsList.set(parts[1]); // 输出当前用户的完整好友列表 context.write(pairKey, friendsList); } } }
Reducer类
负责计算好友列表的交集,输出共同好友结果:
import java.io.IOException; import java.util.HashSet; import java.util.Set; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class CommonFriendsReducer extends Reducer<Text, Text, Text, Text> { private Text result = new Text(); @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { Set<String> commonFriends = new HashSet<>(); boolean firstList = true; for (Text val : values) { String[] friends = val.toString().split(","); Set<String> friendSet = new HashSet<>(); for (String f : friends) { friendSet.add(f); } if (firstList) { // 第一个好友列表直接存入集合 commonFriends.addAll(friendSet); firstList = false; } else { // 后续列表与现有集合取交集,保留共同好友 commonFriends.retainAll(friendSet); } } // 把集合转成逗号分隔的字符串 StringBuilder sb = new StringBuilder(); for (String f : commonFriends) { if (sb.length() > 0) { sb.append(","); } sb.append(f); } result.set(sb.toString()); context.write(key, result); } }
Driver类(主程序入口)
负责配置和提交MapReduce任务:
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class CommonFriendsDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "Common Friends Finder"); // 设置任务相关类 job.setJarByClass(CommonFriendsDriver.class); job.setMapperClass(CommonFriendsMapper.class); job.setReducerClass(CommonFriendsReducer.class); // 设置输出键值类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // 设置输入输出路径(从命令行参数获取) FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 提交任务并等待完成 System.exit(job.waitForCompletion(true) ? 0 : 1); } }
输入输出示例
简化版输入:
0 1,2,3
1 0,2,4
2 0,1,5
对应输出:
0,1 2
0,2 1
1,2 0
运行注意事项
- 确保Hadoop环境已启动(集群模式),或在本地模式下运行(需在Driver类的配置中添加
conf.set("mapreduce.framework.name", "local");)。 - 打包Jar包:先用Hadoop类路径编译代码,再打包:
javac -cp $(hadoop classpath) CommonFriends*.java jar cvf commonfriends.jar CommonFriends*.class - 提交任务命令:
注意:hadoop jar commonfriends.jar CommonFriendsDriver /input/path /output/path/output/path必须是不存在的目录,Hadoop会自动创建。
内容的提问来源于stack exchange,提问作者Damian




