You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何基于Hadoop MapReduce从文本文件中实现共同好友查找?

嘿,作为刚入坑MapReduce的新手,能通过实战项目来提升技术真的太赞了!针对你要实现的「基于Hadoop的共同好友查找」任务,我来一步步给你拆解思路和代码实现,保证你能快速上手~

核心思路拆解

先给你两种常见的实现思路,新手优先看第一种,逻辑更直观,容易理解:

思路1:基于好友对的交集计算(推荐新手)

  1. Map阶段
    • 读取每一行输入,拆分出用户ID(记为u)和他的好友列表。
    • 遍历好友列表里的每个好友f,为了避免重复计算(比如用户0和1的共同好友,不用同时处理(0,1)(1,0)),我们把用户ID和好友ID按升序排列组成唯一键(比如u < f时键为u,f,否则为f,u)。
    • 输出键为这个有序好友对,值为当前用户u的完整好友列表。
  2. Reduce阶段
    • 同一个有序好友对的所有值(也就是两个用户各自的好友列表)会被分到同一个Reduce任务。
    • 计算这两个列表的交集,这个交集就是这对用户的共同好友。
    • 最终输出格式为用户对\t共同好友列表

思路2:基于共同好友的反向映射

这种思路适合数据量较大的场景,核心是反向推导:

  1. Map阶段:读取每行输入,拆分用户u和好友列表,然后对每个好友f,输出键为f,值为u(表示uf的好友)。
  2. Reduce阶段:对每个键f,拿到所有关注他的用户列表,生成这些用户的两两组合,把f作为他们的共同好友之一,最后汇总每个用户对的所有共同好友。
具体代码实现(Java版本)

Hadoop生态下Java是最常用的开发语言,下面是思路1的完整代码实现:

Mapper类

负责拆分输入,生成有序好友对和对应好友列表:

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CommonFriendsMapper extends Mapper<Object, Text, Text, Text> {
    private Text pairKey = new Text();
    private Text friendsList = new Text();

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // 拆分输入行:格式为 <User><TAB><Friends>
        String[] parts = value.toString().split("\t");
        if (parts.length != 2) {
            // 跳过格式错误的行
            return;
        }
        String user = parts[0];
        String[] friends = parts[1].split(",");
        
        // 遍历每个好友,生成有序键避免重复计算
        for (String friend : friends) {
            String sortedPair;
            if (user.compareTo(friend) < 0) {
                sortedPair = user + "," + friend;
            } else {
                sortedPair = friend + "," + user;
            }
            pairKey.set(sortedPair);
            friendsList.set(parts[1]); // 输出当前用户的完整好友列表
            context.write(pairKey, friendsList);
        }
    }
}

Reducer类

负责计算好友列表的交集,输出共同好友结果:

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class CommonFriendsReducer extends Reducer<Text, Text, Text, Text> {
    private Text result = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        Set<String> commonFriends = new HashSet<>();
        boolean firstList = true;
        
        for (Text val : values) {
            String[] friends = val.toString().split(",");
            Set<String> friendSet = new HashSet<>();
            for (String f : friends) {
                friendSet.add(f);
            }
            
            if (firstList) {
                // 第一个好友列表直接存入集合
                commonFriends.addAll(friendSet);
                firstList = false;
            } else {
                // 后续列表与现有集合取交集,保留共同好友
                commonFriends.retainAll(friendSet);
            }
        }
        
        // 把集合转成逗号分隔的字符串
        StringBuilder sb = new StringBuilder();
        for (String f : commonFriends) {
            if (sb.length() > 0) {
                sb.append(",");
            }
            sb.append(f);
        }
        result.set(sb.toString());
        context.write(key, result);
    }
}

Driver类(主程序入口)

负责配置和提交MapReduce任务:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CommonFriendsDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Common Friends Finder");
        
        // 设置任务相关类
        job.setJarByClass(CommonFriendsDriver.class);
        job.setMapperClass(CommonFriendsMapper.class);
        job.setReducerClass(CommonFriendsReducer.class);
        
        // 设置输出键值类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        
        // 设置输入输出路径(从命令行参数获取)
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        // 提交任务并等待完成
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
输入输出示例

简化版输入:
0 1,2,3
1 0,2,4
2 0,1,5

对应输出:
0,1 2
0,2 1
1,2 0

运行注意事项
  • 确保Hadoop环境已启动(集群模式),或在本地模式下运行(需在Driver类的配置中添加conf.set("mapreduce.framework.name", "local");)。
  • 打包Jar包:先用Hadoop类路径编译代码,再打包:
    javac -cp $(hadoop classpath) CommonFriends*.java
    jar cvf commonfriends.jar CommonFriends*.class
    
  • 提交任务命令:
    hadoop jar commonfriends.jar CommonFriendsDriver /input/path /output/path
    
    注意:/output/path必须是不存在的目录,Hadoop会自动创建。

内容的提问来源于stack exchange,提问作者Damian

火山引擎 最新活动