ClickStreamPageView - Analyze User Clickstream Data Using Hadoop MapReduce
public class ClickStreamPageView {
static class ClickStreamMapper extends Mapper<LongWritable, Text, Text, WebLogBean> {
Text k = new Text();
WebLogBean v = new WebLogBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split("\001");
if (fields.length < 9) return;
//将切分出来的各字段set到weblogbean中
//fields[0].equals("true")
v.set('true'.equals(fields[0]) ? true : false, fields[1], fields[2], fields[3], fields[4], fields[5], fields[6], fields[7], fields[8]);
//只有有效记录才进入后续处理
if (v.isValid()) {
//此处用ip地址来标识用户
k.set(v.getRemote_addr());
context.write(k, v);
}
}
}
static class ClickStreamReducer extends Reducer<Text, WebLogBean, NullWritable, Text> {
Text v = new Text();
@Override
protected void reduce(Text key, Iterable<WebLogBean> values, Context context) throws IOException, InterruptedException {
ArrayList<WebLogBean> beans = new ArrayList<WebLogBean>();
// for (WebLogBean b : values) {
// beans.add(b);
// }
// 先将一个用户的所有访问记录中的时间拿出来排序
try {
for (WebLogBean bean : values) {
WebLogBean webLogBean = new WebLogBean();
try {
BeanUtils.copyProperties(webLogBean, bean);
} catch(Exception e) {
e.printStackTrace();
}
beans.add(webLogBean);
}
//将bean按时间先后顺序排序
Collections.sort(beans, new Comparator<WebLogBean>() {
@Override
public int compare(WebLogBean o1, WebLogBean o2) {
try {
Date d1 = toDate(o1.getTime_local());
Date d2 = toDate(o2.getTime_local());
if (d1 == null || d2 == null)
return 0;
return d1.compareTo(d2);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
});
/**
* 以下逻辑为:从有序bean中分辨出各次visit,并对一次visit中所访问的page按顺序标号step
* 核心思想:
* 就是比较相邻两条记录中的时间差,如果时间差<30分钟,则该两条记录属于同一个session
* 否则,就属于不同的session
*
*/
int step = 1;
String session = UUID.randomUUID().toString();
for (int i = 0; i < beans.size(); i++) {
WebLogBean bean = beans.get(i);
// 如果仅有1条数据,则直接输出
if (1 == beans.size()) {
// 设置默认停留时长为60s
v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001" + bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean.getHttp_referer() + "\001" +
bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001" +
bean.getStatus());
context.write(NullWritable.get(), v);
session = UUID.randomUUID().toString();
break;
}
// 如果不止1条数据,则将第一条跳过不输出,遍历第二条时再输出
if (i == 0) {
continue;
}
// 求近两次时间差
long timeDiff = timeDiff(toDate(bean.getTime_local()), toDate(beans.get(i - 1).getTime_local()));
// 如果本次-上次时间差<30分钟,则输出前一次的页面访问信息
if (timeDiff < 30 * 60 * 1000) {
v.set(session + "\001" + key.toString() + "\001" + beans.get(i - 1).getRemote_user() + "\001" + beans.get(i - 1).getTime_local() + "\001" + beans.get(i - 1).getRequest() + "\001" + step + "\001" + (timeDiff / 1000) + "\001" + beans.get(i - 1).getHttp_referer() + "\001" +
beans.get(i - 1).getHttp_user_agent() + "\001" + beans.get(i - 1).getBody_bytes_sent() + "\001" + beans.get(i - 1).getStatus());
context.write(NullWritable.get(), v);
step++;
} else {
// 如果本次-上次时间差>30分钟,则输出前一次的页面访问信息且将step重置,以分隔为新的visit
v.set(session + "\001" + key.toString() + "\001" + beans.get(i - 1).getRemote_user() + "\001" + beans.get(i - 1).getTime_local() + "\001" + beans.get(i - 1).getRequest() + "\001" + (step) + "\001" + (60) + "\001" + beans.get(i - 1).getHttp_referer() + "\001" +
beans.get(i - 1).getHttp_user_agent() + "\001" + beans.get(i - 1).getBody_bytes_sent() + "\001" + beans.get(i - 1).getStatus());
context.write(NullWritable.get(), v);
System.out.println("0-0-0-0-1" + session);
// 输出完上一条之后,重置step编号
step = 1;
session = UUID.randomUUID().toString();
}
// 如果此次遍历的是最后一条,则将本条直接输出
if (i == beans.size() - 1) {
// 设置默认停留市场为60s
v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001" + bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001" + bean.getStatus());
context.write(NullWritable.get(), v);
}
}
} catch (ParseException e) {
e.printStackTrace();
}
}
private String toStr(Date date) {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
return df.format(date);
}
private Date toDate(String timeStr) throws ParseException {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
return df.parse(timeStr);
}
private long timeDiff(String time1, String time2) throws ParseException {
Date d1 = toDate(time1);
Date d2 = toDate(time2);
return d1.getTime() - d2.getTime();
}
private long timeDiff(Date time1, Date time2) throws ParseException {
return time1.getTime() - time2.getTime();
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(ClickStreamPageView.class);
job.setMapperClass(ClickStreamMapper.class);
job.setReducerClass(ClickStreamReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(WebLogBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// FileInputFormat.setInputPaths(job, new Path(args[0]));
// FileOutputFormat.setOutputPath(job, new Path(args[1]));
FileInputFormat.setInputPaths(job, new Path("C:\\Users\\66666\\Desktop\\毕业设计素材\\龙咏\\input/*"));
FileOutputFormat.setOutputPath(job, new Path("C:\\Users\\66666\\Desktop\\毕业设计素材\\龙咏\\output"));
// 设置reduce tsk的个数为0,意味着没有reduce阶段
job.setNumReduceTasks(0);
boolean res = job.waitForCompletion(true);
System.exit(res ? 0 : 1);
}
}
The code you provided uses Hadoop MapReduce to process clickstream data. The core logic analyzes user sessions, determines visit steps, and outputs page view data. Here's a breakdown of the code and its key functionalities:
1. ClickStreamMapper:
- Input: Reads lines of clickstream data (presumably from a file). Each line represents a user's page view.
- Output: Emits a key-value pair:
- Key: The user's IP address, which is used to identify the user.
- Value: A
WebLogBeanobject containing relevant information about the page view, such as the timestamp, URL, and user agent.
- Logic: Parses the input line into fields, creates a
WebLogBeanobject, and sets its attributes based on the parsed fields. Only valid records (based onv.isValid()) are emitted.
2. ClickStreamReducer:
- Input: Receives a key (IP address) and an iterable of
WebLogBeanobjects (page views) associated with that user. - Output: Emits a key-value pair:
- Key:
NullWritable(not used in this case). - Value: A
Textobject containing a string representation of the processed data. This string includes information like the session ID, IP address, timestamp, URL, visit step, and other relevant details.
- Key:
- Logic: Performs the following steps:
- Sorting: Sorts the incoming
WebLogBeanobjects by timestamp to determine the order of page views. - Session Identification: Iterates through the sorted list of page views and identifies user sessions by comparing timestamps. If the time difference between consecutive page views is less than 30 minutes, they belong to the same session. Otherwise, a new session is started.
- Step Numbering: Within each session, the code assigns a step number (e.g., 1, 2, 3) to each page view, representing the order in which the pages were visited.
- Output Generation: Outputs the processed data for each page view as a string containing session ID, IP address, timestamp, URL, visit step, and other details. The code handles cases where there's only one page view in a session or when a session ends.
- Sorting: Sorts the incoming
3. Main Function:
- Job Configuration: Sets up the Hadoop MapReduce job, specifying the mapper and reducer classes, input and output paths, and other configuration settings.
- Running the Job: Submits the job to Hadoop for execution.
Key Points:
- Sessionization: The core logic of this code is to identify user sessions based on timestamps and assign visit steps within each session. This is a crucial step for clickstream analysis.
- Data Output Format: The output data is formatted as a string, making it easy to parse and analyze further. The string includes details like session ID, timestamp, URL, and visit step, which are important for understanding user behavior.
- Reduce Task: The code uses a zero reduce task setup (
job.setNumReduceTasks(0);). This indicates that the output from the mapper is not combined and directly written to the output directory. The reason for this could be to avoid shuffling and sorting in the reduce phase. Instead, the mapper directly writes the results to the output directory, which may be beneficial for certain use cases.
Debugging Tip: The most likely reason for your empty output file is an issue with the output format or the data being written. Double-check the v.set() method in both the ClickStreamMapper and ClickStreamReducer to ensure the correct format of the output string. You might want to include a simple System.out.println() in both classes to verify that data is being processed correctly and to see what is actually being written to the output. Make sure the data being read from the input file matches the expected format used in the ClickStreamMapper.
Remember to test your code thoroughly and analyze the output data to verify its accuracy. Clickstream analysis can be a powerful tool for understanding user behavior and optimizing your web applications. Good luck!
原文地址: https://www.cveoy.top/t/topic/ml8N 著作权归作者所有。请勿转载和采集!