关于上网时长的高性能计算问题
背景:
原始数据是上线一条数据纪录,下线一条数据数据纪录。并且数据量巨大,每秒产生20K~50K条数据。
数据纪录格式为文本,大概是这样:
IP地址,端口号,时间,上线/下线
现在需要当收到下线数据纪录时,找到之前的上线数据纪录。同时计算出该用户上网的时长。上网时间=下线时间-上线时间。
需要注意的问题:
1、上网时间可能会超过一周,因此前面上线纪录的数据需要保存较长时间,假设最长一周。
2、如何高效迅速的查到的对应的上线数据纪录,而不至于整个处理程序被堵塞。
3、按照IP地址、端口号来标识唯一一次用户上线行为,用户记录数最大可能会达到20M条纪录。假设可以全部放入内存中。
4、收到用户下线纪录后,完成上线时长的计算后,应该将内存中的相应上线纪录数据删除。删除也会有性能开销。
请各位大牛给一些思路和方案,感觉应该有对应这个问题的数据结构和算法。谢谢!!!最多只能一次给100分,如果问题解决,可以另外开帖子散分。 高性能计算 内存 算法
[解决办法]
建议:使用IP+Port做成一个关键字或组合
设:IP不变,
IP:PortA 上线,
IP:PortB 上线
IP:PortA 下线,
IP:PortB 下线,
上述时序中,如果只针对IP进行计算,就会出错。每次都要查询,有点像.net的垃圾回收机制,很复杂。所以建议将IP+Port做成一个关键字或组合。这样做会导致2-3之间重复计时。不知你能否接受。
不知道你的开发需求,要快速或质量就用数据库,要特殊平台就自己写。
数据结构很简单
ID
IP
Port
Time
[解决办法]
我这有个应用和LZ的很相似,数据结构几乎一样
说一下我处理的办法
10分钟计算一次所有用户在线时长
假设某用户在线
1、该用户只有登入记录,没有登出记录
2、计算完成后迁走本次计算的所有数据,并把该用户的在线时长插入Mysql和HBASE中
3、对没有登出记录的用户,在下一个10分钟的原数据前插入一条登入记录
这样做是为了节省硬盘
同时每天删除被迁走的记录,释放硬盘空间
有两种计算框架,都是用JAVA做的。
1、调用RDBMS的SP,只针对一些小数据量业务
2、MapReduce,针对那些无比庞大的数据
不管哪一种思路都是一样的
贴上部分源码
Map函数
public static class PlayerLoginMapper_KickPlayerByCCUDetail extends Mapper<LongWritable, Text, Text, Text>
{
Text outputkey = new Text();
Text outputvalue = new Text();
class PlayerLogCCUKickPlayer extends StructPlayerLogCCU {
public PlayerLogCCUKickPlayer() {
PlayerID = "";
LogTime = "";
LogType = DEFAULT_ZERO_TYPE;
}
public PlayerLogCCUKickPlayer(String _Value) {
String[] Kval = _Value.split("\t");
PlayerID = Kval[0];
LogTime = Kval[1];
LogType = PLAYER_LOGOUT;
}
}
PlayerLogCCUKickPlayer playerLogin = new PlayerLogCCUKickPlayer();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String vstr = value.toString();
try {
playerLogin = new PlayerLogCCUKickPlayer(vstr);
}
catch (Exception ex) {
return;
}
outputkey.set(playerLogin.getPlayerID() + "\t"
+ playerLogin.getLogTime() + "\t" + playerLogin.getLogType());
outputvalue.set("");
context.write(outputkey, outputvalue);
}
}
public static class PlayerOnlineTimeCombiner extends Reducer<Text, Text, Text, Text>
{
Text outputkey = new Text();
Text outputvalue = new Text();
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text val : values) {
outputkey = key;
if (Integer.valueOf(outputkey.toString().split("\t", 3)[2]) != PLAYER_LOGINGETWAY)
context.write(outputkey, val);
}
}
}
public static class PlayerOnlineTimeReducer extends Reducer<Text, Text, Text, Text>
{
Text outputkey = new Text();
Text outputvalue = new Text();
String PlayerLoginBeginTime, PlayerLoginEndTime;
@Override
protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException {
PlayerLoginBeginTime = context.getConfiguration().get("PlayerLoginBeginTime");
PlayerLoginEndTime = context.getConfiguration().get("PlayerLoginEndTime");
}
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
int OnlineTime = 0;
loginTime BeginTime = new loginTime();
loginTime EndTime = new loginTime();
String BeginTimeStrTmp = "";
String EndTimeStrTmp = "";
for (@SuppressWarnings("unused")
Text val : values) {
String[] LogValue = key.toString().split("\t", 3);
EndTime.setloginTime(LogValue[1], LogValue[2]);
if (BeginTime.getTime().equals("")) {
outputkey.set(LogValue[0]);
BeginTime.setloginTime(LogValue[1], LogValue[2]);
}
/*****************************************************************/
/* 处理分段数据的代码 */
/* if (EndTime.getTimeType() == PLAYER_LOGINGETWAY) { */
/* continue; */
/* } */
/* else {} */
/*****************************************************************/
if (EndTime.getTime().compareTo(PlayerLoginBeginTime) < 0) {
BeginTime.setloginTime(EndTime.getTime(), EndTime.getTimeType());
continue;
}
if (BeginTime.getTime().compareTo(PlayerLoginEndTime) > 0) {
BeginTime.setloginTime(EndTime.getTime(), EndTime.getTimeType());
break;
}
if (BeginTime.getTimeType() == PLAYER_LOGINZONE) {
if (BeginTime.getTime().compareTo(PlayerLoginBeginTime) <= 0
&& EndTime.getTime().compareTo(PlayerLoginBeginTime) >= 0) {
BeginTime.setloginTime(PlayerLoginBeginTime);
}
if (BeginTime.getTime().compareTo(PlayerLoginEndTime) <= 0
&& EndTime.getTime().compareTo(PlayerLoginEndTime) >= 0) {
EndTime.setloginTime(PlayerLoginEndTime);
}
try {
OnlineTime += loginTime.compTime(BeginTime.getTime(), EndTime.getTime());
}
catch (Exception ex) {
System.err.println(BeginTime.getTime() + "\t\t" + EndTime.getTime());
}
if (BeginTimeStrTmp.equals(""))
BeginTimeStrTmp = BeginTime.getTime();
EndTimeStrTmp = EndTime.getTime();
}
BeginTime.setloginTime(EndTime.getTime(), EndTime.getTimeType());
}
if (BeginTime.getTime().equals(EndTime.getTime())
&& BeginTime.getTimeType() == EndTime.getTimeType()
&& BeginTime.getTimeType() == PLAYER_LOGINZONE
&& BeginTime.getTime().compareTo(PlayerLoginEndTime) < 0) {
EndTime.setloginTime(PlayerLoginEndTime);
BeginTime.setloginTime(BeginTime.getTime().compareTo(PlayerLoginBeginTime) > 0
? BeginTime.getTime() : PlayerLoginBeginTime);
try {
OnlineTime += loginTime.compTime(BeginTime.getTime(), EndTime.getTime());
}
catch (Exception ex) {
System.err.println(BeginTime.getTime() + "\t\t" + EndTime.getTime());
}
if (BeginTimeStrTmp.equals(""))
BeginTimeStrTmp = BeginTime.getTime();
EndTimeStrTmp = PlayerLoginEndTime;
}
if (OnlineTime != 0) {
outputvalue.set(BeginTimeStrTmp + "\t" + EndTimeStrTmp + "\t" + OnlineTime);
context.write(outputkey, outputvalue);
}
}
}