新闻资讯news(新闻个性化推荐系统源码之构建离线用户画像)

wufei123 发布于 2023-12-25 阅读(231)

用户画像往往是大型网站的重要模块,基于用户画像不仅可以实现个性化推荐,还可以实现用户分群、精准推送、精准营销以及用户行为预测、商业化转化分析等,为商业决策提供数据支持通常用户画像包括用户属性信息(性别、年龄、出生日期等)、用户行为信息(浏览、收藏、点赞等)以及环境信息(时间、地理位置等)。

处理用户行为数据在数据准备阶段,我们通过 Flume 已经可以将用户行为数据收集到 Hive 的 user_action 表的 HDFS 路径中,先来看一下这些数据长什么样子,我们读取当天的用户行为数据,注意读取之前要先关联分区

_day = time.strftime("%Y-%m-%d", time.localtime()) _localions = /user/hive/warehouse/profile.db/user_action/ + _day if fs.exists(_localions):

# 如果有该文件直接关联,捕获关联重复异常 try: self.spark.sql("altertable user_action addpartition (dt=%s) location

%s" % (_day, _localions)) except Exception as e: pass self.spark.sql("use profile") user_action = self.spark.sql("

select actionTime, readTime, channelId, param.articleId, param.algorithmCombine, param.action, param.userId

from user_action where dt>=" + _day) user_action 结果如下所示

useraction可以发现,上面的一条记录代表用户对文章的一次行为,但通常我们需要查询某个用户对某篇文章的所有行为,所以,我们要将这里用户对文章的多条行为数据合并为一条,其中包括用户对文章的所有行为我们需要新建一个 Hive 表 user_article_basic,这张表包括了用户 ID、文章 ID、是否曝光、是否点击、阅读时间等等,随后我们将处理好的用户行为数据存储到此表中。

createtable user_article_basic ( user_id BIGINTcomment"userID", action_time STRINGcomment

"user actions time", article_id BIGINTcomment"articleid", channel_id INTcomment"channel_id"

, sharedBOOLEANcomment"is shared", clicked BOOLEANcomment"is clicked", collected BOOLEAN

comment"is collected", exposure BOOLEANcomment"is exposured", read_time STRINGcomment"reading time"

) COMMENT"user_article_basic" CLUSTERED by (user_id) into2 buckets STOREDas textfile LOCATION

/user/hive/warehouse/profile.db/user_article_basic; 遍历每一条原始用户行为数据,判断用户对文章的行为,在 user_action_basic 中将该用户与该文章对应的行为设置为 True

if user_action.collect(): def_generate(row): _list = [] if row.action == exposure

: for article_id in eval(row.articleId): # ["user_id", "action_time","article_id", "channel_id", "shared", "clicked", "collected", "exposure", "read_time"]

_list.append( [row.userId, row.actionTime, article_id, row.channelId,

False, False, False, True, row.readTime]) return _list else: classTemp

(object): shared = False clicked = False collected =

False read_time = "" _tp = Temp() if row.action == click: _tp.clicked =

Trueelif row.action == share: _tp.shared = Trueelif row.action == collect: _tp.collected =

Trueelif row.action == read: _tp.clicked = True _list.append( [row.userId, row.actionTime, int(row.articleId), row.channelId, _tp.shared, _tp.clicked, _tp.collected,

True, row.readTime]) return _list user_action_basic = user_action.rdd.flatMap(_generate) user_action_basic = user_action_basic.toDF( [

"user_id", "action_time", "article_id", "channel_id", "shared", "clicked", "collected", "exposure",

"read_time"]) user_action_basic 结果如下所示,这里的一条记录包括了某个用户对某篇文章的所有行为

user_action_basic由于 Hive 目前还不支持 pyspark 的原子性操作,所以 user_article_basic 表的用户行为数据只能全量更新(实际场景中可以选择其他语言或数据库实现)。

这里,我们需要将当天的用户行为与 user_action_basic 的历史用户行为进行合并old_data = uup.spark.sql("select * from user_article_basic"

) new_data = old_data.unionAll(user_action_basic) 合并后又会产生一个新的问题,那就是用户 ID 和文章 ID 可能重复,因为今天某个用户对某篇文章的记录可能在历史数据中也存在,而 unionAll() 方法并没有去重,这里我们可以按照用户 ID 和文章 ID 进行分组,利用 max() 方法得到 action_time, channel_id, shared, clicked, collected, exposure, read_time 即可,去重后直接存储到 user_article_basic 表中

new_data.registerTempTable("temptable") self.spark.sql(insert overwrite table user_article_basic select

user_id, max(action_time) as action_time, article_id, max(channel_id) as channel_id, max(shared

) asshared, max(clicked) as clicked, max(collected) as collected, max(exposure) as exposure,

max(read_time) as read_time from temptable groupby user_id, article_id) 表 user_article_basic 结果如下所示

user_article_basic计算用户画像我们选择将用户画像存储在 Hbase 中,因为 Hbase 支持原子性操作和快速读取,并且 Hive 也可以通过创建外部表关联到 Hbase,进行离线分析,如果要删除 Hive 外部表的话,对 Hbase 也没有影响。

首先,在 Hbase 中创建用户画像表createuser_profile, basic,partial,env在 Hive 中创建 Hbase 外部表,注意字段类型设置为 mapcreateexternal

table user_profile_hbase ( user_id STRINGcomment"userID", information MAP

DOUBLE> comment"user basic information", article_partial MAP comment"article partial"

, env MAP comment"user env" ) COMMENT"user profile table"STOREDBYorg.apache.hadoop.hive.hbase.HBaseStorageHandler

WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,basic:,partial:,env:") TBLPROPERTIES ("hbase.table.name"

= "user_profile"); 创建外部表之后,还需要导入一些依赖包cp -r /root/bigdata/hbase/lib/hbase-*.jar /root/bigdata/spark/jars/ cp -r /root/bigdata/hive/lib/h

*.jar /root/bigdata/spark/jars/ 接下来,读取处理好的用户行为数据,由于日志中的 channel_id 有可能是来自于推荐频道(0),而不是文章真实的频道,所以这里要将 channel_id 列删除

spark.sql("use profile") user_article_basic = spark.sql("select * from user_article_basic").drop(channel_id

) 通过文章 ID,将用户行为数据与文章画像数据进行连接,从而得到文章频道 ID 和文章主题词spark.sql(use article) article_topic = spark.sql("select article_id, channel_id, topics from article_profile"

) user_article_topic = user_article_basic.join(article_topic, how=left, on=[article_id]) user_article_topic 结果如下图所示,其中 topics 列即为文章主题词列表,如 [补码, 字符串, 李白, ...]

user_article_topic接下来,我们需要计算每一个主题词对于用户的权重,所以需要将 topics 列中的每个主题词都拆分为单独的一条记录可以利用 Spark 的 explode() 方法,达到类似“爆炸”的效果。

import pyspark.sql.functions as F user_article_topic = user_topic.withColumn(topic, F.explode(topics

)).drop(topics) user_article_topic 如下图所示

user_article_topic我们通过用户对哪些文章发生了行为以及该文章有哪些主题词,计算出了用户对哪些主题词发生了行为这样,我们就可以根据用户对主题词的行为来计算主题词对用户的权重,并且将这些主题词作为用户的标签。

那么,用户标签权重的计算公式为:用户标签权重 =(用户行为分值之和)x 时间衰减其中,时间衰减公式为:时间衰减系数 = 1 / (log(t) + 1),其中 t 为发生行为的时间距离当前时间的大小不同的用户行为对应不同的权重,如下所示

用户行为分值阅读时间(=1000)2收藏2分享3点击5计算用户标签及权重,并存储到 Hbase 中 user_profile 表的 partial 列族中注意,这里我们将频道 ID 和标签一起作为 partial 列族的键存储,这样我们就方便查询不同频道的标签及权重了。

defcompute_user_label_weights(partitions):""" 计算用户标签权重 """ action_weight = { "read_min"

: 1, "read_middle": 2, "collect": 2, "share": 3, "click": 5 }

from datetime import datetime import numpy as np # 循环处理每个用户对应的每个主题词for row in partitions:

# 计算时间衰减系数 t = datetime.now() - datetime.strptime(row.action_time, %Y-%m-%d %H:%M:%S) alpha =

1 / (np.log(t.days + 1) + 1) if row.read_time == : read_t = 0else: read_t = int(row.read_time)

# 计算阅读时间的行为分数 read_score = action_weight[read_middle] if read_t > 1000else action_weight[read_min

] # 计算各种行为的权重和并乘以时间衰减系数 weights = alpha * (row.shared * action_weight[share] + row.clicked * action_weight[

click] + row.collected * action_weight[collect] + read_score)

# 更新到user_profilehbase表with pool.connection() as conn: table = conn.table(user_profile) table.put(

user:{}.format(row.user_id).encode(), {partial:{}:{}.format(row.channel_id, row.topic).encode(): json.dumps( weights).encode()}) conn.close() user_topic.foreachPartition(compute_user_label_weights)

在 Hive 中查询用户标签及权重hive> select * from user_profile_hbase limit 1; OK user:1 {"birthday":0.0,"gender":

null} {"18:##":0.25704484358604845,"18:&#":0.25704484358604845,"18:+++":0.23934588700996243,"18:+++++"

:0.23934588700996243,"18:AAA":0.2747964402379244,"18:Animal":0.2747964402379244,"18:Author":0.2747964402379244

,"18:BASE":0.23934588700996243,"18:BBQ":0.23934588700996243,"18:Blueprint":1.6487786414275463,"18:Code"

:0.23934588700996243,"18:DIR...... 接下来,要将用户属性信息加入到用户画像中读取用户基础信息,存储到用户画像表的 basic 列族即可defupdate_user_info。

():""" 更新用户画像的属性信息 :return: """ spark.sql("use toutiao") user_basic = spark.sql("select user_id, gender, birthday from user_profile"

) defudapte_user_basic(partition):import happybase # 用于读取hbase缓存结果配置 pool = happybase.ConnectionPool(size=

10, host=172.17.0.134, port=9090) for row in partition: from datetime import date age =

0if row.birthday != null: born = datetime.strptime(row.birthday, %Y-%m-%d) today = date.today() age = today.year - born.year - ((today.month, today.day) < (born.month, born.day))

with pool.connection() as conn: table = conn.table(user_profile) table.put(

user:{}.format(row.user_id).encode(), {basic:gender.encode(): json.dumps(row.gender).encode()}) table.put(

user:{}.format(row.user_id).encode(), {basic:birthday.encode(): json.dumps(age).encode()}) conn.close() user_basic.foreachPartition(udapte_user_basic)

到这里,我们的用户画像就计算完成了Apscheduler 定时更新定义更新用户画像方法,首先处理用户行为日志,拆分文章主题词,接着计算用户标签的权重,最后再将用户属性信息加入到用户画像中defupdate_user_profile。

():""" 定时更新用户画像的逻辑 :return: """ up = UpdateUserProfile() if up.update_user_action_basic(): up.update_user_label() up.update_user_info()

在 Apscheduler 中添加定时更新用户画像任务,设定每隔 2 个小时更新一次from apscheduler.schedulers.blocking import BlockingScheduler

from apscheduler.executors.pool import ProcessPoolExecutor # 创建scheduler,多进程执行 executors = { default

: ProcessPoolExecutor(3) } scheduler = BlockingScheduler(executors=executors) # 添加一个定时运行文章画像更新的任务, 每隔1个小时运行一次

scheduler.add_job(update_article_profile, trigger=interval, hours=1) # 添加一个定时运行用户画像更新的任务, 每隔2个小时运行一次

scheduler.add_job(update_user_profile, trigger=interval, hours=2) scheduler.start() 另外说一下,在实际场景中,用户画像往往是非常复杂的,下面是电商场景的用户画像,可以了解一下。

用户画像

亲爱的读者们,感谢您花时间阅读本文。如果您对本文有任何疑问或建议,请随时联系我。我非常乐意与您交流。

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。