ClickStreamPageView Java 代码分析: 从日志文件生成页面访问数据
ClickStreamPageView Java 代码分析: 从日志文件生成页面访问数据
该 Java 代码片段 ClickStreamPageView 使用 Hadoop MapReduce 框架,从日志文件解析页面访问信息,生成包含用户、时间、页面、步骤、停留时长、来源页面、浏览器信息等数据的结构化数据。
代码结构:
ClickStreamMapper类:负责读取日志文件,将每行日志解析为WebLogBean对象,并根据用户 IP 地址分组。ClickStreamReducer类:接收每个用户的所有访问记录,根据时间顺序排序,并根据时间间隔判断用户访问的不同页面,为每个页面标号步骤,最后输出结构化数据。
代码解析:
public class ClickStreamPageView {
static class ClickStreamMapper extends Mapper<LongWritable, Text, Text, WebLogBean> {
Text k = new Text();
WebLogBean v = new WebLogBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split("\001");
if (fields.length < 9) return;
// 将切分出来的各字段 set 到 weblogbean 中
// fields[0].equals("true")
v.set('true'.equals(fields[0]) ? true : false, fields[1], fields[2], fields[3], fields[4], fields[5], fields[6], fields[7], fields[8]);
// 只有有效记录才进入后续处理
if (v.isValid()) {
// 此处用 ip 地址来标识用户
k.set(v.getRemote_addr());
context.write(k, v);
}
}
}
static class ClickStreamReducer extends Reducer<Text, WebLogBean, NullWritable, Text> {
Text v = new Text();
@Override
protected void reduce(Text key, Iterable<WebLogBean> values, Context context) throws IOException, InterruptedException {
ArrayList<WebLogBean> beans = new ArrayList<WebLogBean>();
// 先将一个用户的所有访问记录中的时间拿出来排序
try {
for (WebLogBean bean : values) {
WebLogBean webLogBean = new WebLogBean();
try {
BeanUtils.copyProperties(webLogBean, bean);
} catch (Exception e) {
e.printStackTrace();
}
beans.add(webLogBean);
}
// 将 bean 按时间先后顺序排序
Collections.sort(beans, new Comparator<WebLogBean>() {
@Override
public int compare(WebLogBean o1, WebLogBean o2) {
try {
Date d1 = toDate(o1.getTime_local());
Date d2 = toDate(o2.getTime_local());
if (d1 == null || d2 == null)
return 0;
return d1.compareTo(d2);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
});
/**
* 以下逻辑为:从有序 bean 中分辨出各次 visit,并对一次 visit 中所访问的 page 按顺序标号 step
* 核心思想:
* 就是比较相邻两条记录中的时间差,如果时间差<30分钟,则该两条记录属于同一个 session
* 否则,就属于不同的 session
*
*/
int step = 1;
String session = UUID.randomUUID().toString();
for (int i = 0; i < beans.size(); i++) {
WebLogBean bean = beans.get(i);
// 如果仅有 1 条数据,则直接输出
if (1 == beans.size()) {
// 设置默认停留时长为 60s
v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001" + bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001"
+ bean.getStatus());
context.write(NullWritable.get(), v);
session = UUID.randomUUID().toString();
break;
}
// 如果不止 1 条数据,则将第一条跳过不输出,遍历第二条时再输出
if (i == 0) {
continue;
}
// 求近两次时间差
long timeDiff = timeDiff(toDate(bean.getTime_local()), toDate(beans.get(i - 1).getTime_local()));
// 如果本次-上次时间差<30分钟,则输出前一次的页面访问信息
if (timeDiff < 30 * 60 * 1000) {
v.set(session + "\001" + key.toString() + "\001" + beans.get(i - 1).getRemote_user() + "\001" + beans.get(i - 1).getTime_local() + "\001" + beans.get(i - 1).getRequest() + "\001" + step + "\001" + (timeDiff / 1000) + "\001" + beans.get(i - 1).getHttp_referer() + "\001"
+ beans.get(i - 1).getHttp_user_agent() + "\001" + beans.get(i - 1).getBody_bytes_sent() + "\001" + beans.get(i - 1).getStatus());
context.write(NullWritable.get(), v);
step++;
} else {
// 如果本次-上次时间差>30分钟,则输出前一次的页面访问信息且将 step 重置,以分隔为新的 visit
v.set(session + "\001" + key.toString() + "\001" + beans.get(i - 1).getRemote_user() + "\001" + beans.get(i - 1).getTime_local() + "\001" + beans.get(i - 1).getRequest() + "\001" + (step) + "\001" + (60) + "\001" + beans.get(i - 1).getHttp_referer() + "\001"
+ beans.get(i - 1).getHttp_user_agent() + "\001" + beans.get(i - 1).getBody_bytes_sent() + "\001" + beans.get(i - 1).getStatus());
context.write(NullWritable.get(), v);
System.out.println("0-0-0-0-1" + session);
// 输出完上一条之后,重置 step 编号
step = 1;
session = UUID.randomUUID().toString();
}
// 如果此次遍历的是最后一条,则将本条直接输出
if (i == beans.size() - 1) {
// 设置默认停留市场为 60s
v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001" + bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001" + bean.getStatus());
context.write(NullWritable.get(), v);
}
}
} catch (ParseException e) {
e.printStackTrace();
}
}
private String toStr(Date date) {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
return df.format(date);
}
private Date toDate(String timeStr) throws ParseException {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
return df.parse(timeStr);
}
private long timeDiff(String time1, String time2) throws ParseException {
Date d1 = toDate(time1);
Date d2 = toDate(time2);
return d1.getTime() - d2.getTime();
}
private long timeDiff(Date time1, Date time2) throws ParseException {
return time1.getTime() - time2.getTime();
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(ClickStreamPageView.class);
job.setMapperClass(ClickStreamMapper.class);
job.setReducerClass(ClickStreamReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(WebLogBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// FileInputFormat.setInputPaths(job, new Path(args[0]));
// FileOutputFormat.setOutputPath(job, new Path(args[1]));
FileInputFormat.setInputPaths(job, new Path("C:\\Users\\66666\\Desktop\\毕业设计素材\\龙咏\\input\\*"));
FileOutputFormat.setOutputPath(job, new Path("C:\\Users\\66666\\Desktop\\毕业设计素材\\龙咏\\output"));
// 设置 reduce tsk 的个数为 0,意味着没有 reduce 阶段
job.setNumReduceTasks(0);
boolean res = job.waitForCompletion(true);
System.exit(res ? 0 : 1);
}
}
可能出现问题:
- 输入数据格式问题: 检查输入数据是否符合预期,以及是否存在缺失、错误、重复等问题。
- 路径问题: 确认输入路径和输出路径是否正确,是否包含空格、特殊字符等。
- 权限问题: 确保程序有权限访问输入文件和输出目录。
WebLogBean类: 检查WebLogBean类中的isValid方法,确保它能够正确判断日志数据是否有效。BeanUtils类: 检查BeanUtils类是否被正确引入,并确保它能够正确复制数据。- 数据量问题: 如果数据量过大,可能需要调整 MapReduce 任务的配置,比如增加 Mapper 和 Reducer 的数量,或使用更高效的算法。
解决问题的方法:
- 调试 Mapper: 在
map方法中添加日志输出,查看是否正确读取和解析数据。 - 调试 Reducer: 在
reduce方法中添加日志输出,查看是否正确接收 Mapper 的输出,以及是否正确生成最终结果。 - 使用调试工具: 可以使用 Hadoop 的调试工具,比如
yarn logs命令,查看任务执行过程中的日志,帮助排查问题。
建议:
- 添加日志输出: 在关键代码段添加日志输出,记录程序执行过程中的状态和数据,帮助排查问题。
- 使用调试工具: 使用 Hadoop 的调试工具,可以方便地查看任务执行过程中的日志,帮助排查问题。
- 使用测试数据: 使用少量测试数据,验证程序逻辑是否正确,确保程序能够正常运行。
其他建议:
- 使用更具体的描述: 在代码中添加注释,解释每个代码块的功能,以便于理解和维护。
- 使用更具可读性的命名: 使用更具可读性的变量名和方法名,以便于理解代码逻辑。
- 使用更规范的代码格式: 遵循代码规范,使用统一的代码格式,提高代码可读性和可维护性。
希望以上分析能够帮助您理解 ClickStreamPageView 代码,解决程序运行过程中遇到的问题。
原文地址: https://www.cveoy.top/t/topic/ml8R 著作权归作者所有。请勿转载和采集!