ClickStreamPageView Java 代码分析: 从日志文件生成页面访问数据

该 Java 代码片段 ClickStreamPageView 使用 Hadoop MapReduce 框架,从日志文件解析页面访问信息,生成包含用户、时间、页面、步骤、停留时长、来源页面、浏览器信息等数据的结构化数据。

代码结构:

  • ClickStreamMapper 类:负责读取日志文件,将每行日志解析为 WebLogBean 对象,并根据用户 IP 地址分组。
  • ClickStreamReducer 类:接收每个用户的所有访问记录,根据时间顺序排序,并根据时间间隔判断用户访问的不同页面,为每个页面标号步骤,最后输出结构化数据。

代码解析:

public class ClickStreamPageView {

	static class ClickStreamMapper extends Mapper<LongWritable, Text, Text, WebLogBean> {

		Text k = new Text();
		WebLogBean v = new WebLogBean();

		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

			String line = value.toString();

			String[] fields = line.split("\001");
			if (fields.length < 9) return;
			// 将切分出来的各字段 set 到 weblogbean 中
			// fields[0].equals("true")
			v.set('true'.equals(fields[0]) ? true : false, fields[1], fields[2], fields[3], fields[4], fields[5], fields[6], fields[7], fields[8]);
			// 只有有效记录才进入后续处理
			if (v.isValid()) {
				// 此处用 ip 地址来标识用户
				k.set(v.getRemote_addr());
				context.write(k, v);
			}
		}
	}

	static class ClickStreamReducer extends Reducer<Text, WebLogBean, NullWritable, Text> {

		Text v = new Text();

		@Override
		protected void reduce(Text key, Iterable<WebLogBean> values, Context context) throws IOException, InterruptedException {
			ArrayList<WebLogBean> beans = new ArrayList<WebLogBean>();

			// 先将一个用户的所有访问记录中的时间拿出来排序
			try {
				for (WebLogBean bean : values) {
					WebLogBean webLogBean = new WebLogBean();
					try {
						BeanUtils.copyProperties(webLogBean, bean);
					} catch (Exception e) {
						e.printStackTrace();
					}
					beans.add(webLogBean);
				}


				// 将 bean 按时间先后顺序排序
				Collections.sort(beans, new Comparator<WebLogBean>() {

					@Override
					public int compare(WebLogBean o1, WebLogBean o2) {
						try {
							Date d1 = toDate(o1.getTime_local());
							Date d2 = toDate(o2.getTime_local());
							if (d1 == null || d2 == null)
								return 0;
							return d1.compareTo(d2);
						} catch (Exception e) {
							e.printStackTrace();
							return 0;
						}
					}

				});

				/**
				 * 以下逻辑为:从有序 bean 中分辨出各次 visit,并对一次 visit 中所访问的 page 按顺序标号 step
				 * 核心思想:
				 * 就是比较相邻两条记录中的时间差,如果时间差<30分钟,则该两条记录属于同一个 session
				 * 否则,就属于不同的 session
				 *
				 */
				int step = 1;
				String session = UUID.randomUUID().toString();
				for (int i = 0; i < beans.size(); i++) {
					WebLogBean bean = beans.get(i);
					// 如果仅有 1 条数据,则直接输出
					if (1 == beans.size()) {
						// 设置默认停留时长为 60s
						v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001" + bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001"
								+ bean.getStatus());
						context.write(NullWritable.get(), v);
						session = UUID.randomUUID().toString();
						break;
					}

					// 如果不止 1 条数据,则将第一条跳过不输出,遍历第二条时再输出
					if (i == 0) {
						continue;
					}

					// 求近两次时间差
					long timeDiff = timeDiff(toDate(bean.getTime_local()), toDate(beans.get(i - 1).getTime_local()));
					// 如果本次-上次时间差<30分钟,则输出前一次的页面访问信息
					if (timeDiff < 30 * 60 * 1000) {
						v.set(session + "\001" + key.toString() + "\001" + beans.get(i - 1).getRemote_user() + "\001" + beans.get(i - 1).getTime_local() + "\001" + beans.get(i - 1).getRequest() + "\001" + step + "\001" + (timeDiff / 1000) + "\001" + beans.get(i - 1).getHttp_referer() + "\001"
							+ beans.get(i - 1).getHttp_user_agent() + "\001" + beans.get(i - 1).getBody_bytes_sent() + "\001" + beans.get(i - 1).getStatus());
						context.write(NullWritable.get(), v);
						step++;
					} else {
						// 如果本次-上次时间差>30分钟,则输出前一次的页面访问信息且将 step 重置,以分隔为新的 visit
						v.set(session + "\001" + key.toString() + "\001" + beans.get(i - 1).getRemote_user() + "\001" + beans.get(i - 1).getTime_local() + "\001" + beans.get(i - 1).getRequest() + "\001" + (step) + "\001" + (60) + "\001" + beans.get(i - 1).getHttp_referer() + "\001"
							+ beans.get(i - 1).getHttp_user_agent() + "\001" + beans.get(i - 1).getBody_bytes_sent() + "\001" + beans.get(i - 1).getStatus());
						context.write(NullWritable.get(), v);
						System.out.println("0-0-0-0-1" + session);
						// 输出完上一条之后,重置 step 编号
						step = 1;
						session = UUID.randomUUID().toString();
					}

					// 如果此次遍历的是最后一条,则将本条直接输出
					if (i == beans.size() - 1) {
						// 设置默认停留市场为 60s
						v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001" + bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001" + bean.getStatus());
						context.write(NullWritable.get(), v);
					}
				}

			} catch (ParseException e) {
				e.printStackTrace();

			}

		}

		private String toStr(Date date) {
			SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
			return df.format(date);
		}

		private Date toDate(String timeStr) throws ParseException {
			SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
			return df.parse(timeStr);
		}

		private long timeDiff(String time1, String time2) throws ParseException {
			Date d1 = toDate(time1);
			Date d2 = toDate(time2);
			return d1.getTime() - d2.getTime();

		}

		private long timeDiff(Date time1, Date time2) throws ParseException {

			return time1.getTime() - time2.getTime();

		}

	}

	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(ClickStreamPageView.class);

		job.setMapperClass(ClickStreamMapper.class);
		job.setReducerClass(ClickStreamReducer.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(WebLogBean.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		//		FileInputFormat.setInputPaths(job, new Path(args[0]));
		//		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		FileInputFormat.setInputPaths(job, new Path("C:\\Users\\66666\\Desktop\\毕业设计素材\\龙咏\\input\\*"));
		FileOutputFormat.setOutputPath(job, new Path("C:\\Users\\66666\\Desktop\\毕业设计素材\\龙咏\\output"));
		// 设置 reduce tsk 的个数为 0,意味着没有 reduce 阶段
		job.setNumReduceTasks(0);

		boolean res = job.waitForCompletion(true);
		System.exit(res ? 0 : 1);
	}
}

可能出现问题:

  • 输入数据格式问题: 检查输入数据是否符合预期,以及是否存在缺失、错误、重复等问题。
  • 路径问题: 确认输入路径和输出路径是否正确,是否包含空格、特殊字符等。
  • 权限问题: 确保程序有权限访问输入文件和输出目录。
  • WebLogBean 类: 检查 WebLogBean 类中的 isValid 方法,确保它能够正确判断日志数据是否有效。
  • BeanUtils 类: 检查 BeanUtils 类是否被正确引入,并确保它能够正确复制数据。
  • 数据量问题: 如果数据量过大,可能需要调整 MapReduce 任务的配置,比如增加 Mapper 和 Reducer 的数量,或使用更高效的算法。

解决问题的方法:

  • 调试 Mapper:map 方法中添加日志输出,查看是否正确读取和解析数据。
  • 调试 Reducer:reduce 方法中添加日志输出,查看是否正确接收 Mapper 的输出,以及是否正确生成最终结果。
  • 使用调试工具: 可以使用 Hadoop 的调试工具,比如 yarn logs 命令,查看任务执行过程中的日志,帮助排查问题。

建议:

  • 添加日志输出: 在关键代码段添加日志输出,记录程序执行过程中的状态和数据,帮助排查问题。
  • 使用调试工具: 使用 Hadoop 的调试工具,可以方便地查看任务执行过程中的日志,帮助排查问题。
  • 使用测试数据: 使用少量测试数据,验证程序逻辑是否正确,确保程序能够正常运行。

其他建议:

  • 使用更具体的描述: 在代码中添加注释,解释每个代码块的功能,以便于理解和维护。
  • 使用更具可读性的命名: 使用更具可读性的变量名和方法名,以便于理解代码逻辑。
  • 使用更规范的代码格式: 遵循代码规范,使用统一的代码格式,提高代码可读性和可维护性。

希望以上分析能够帮助您理解 ClickStreamPageView 代码,解决程序运行过程中遇到的问题。

ClickStreamPageView Java 代码分析: 从日志文件生成页面访问数据

原文地址: https://www.cveoy.top/t/topic/ml8R 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录