实时用户数统计是很多线上服务需要的基础功能,比如统计当前在线人数、当日活跃用户数等,Redis因为高性能、支持原子操作的特点,成为这类场景的常用存储工具,结合Python可以很方便地实现相关功能。

核心实现思路
实时用户数统计的核心需求是快速更新、快速查询,同时保证计数准确。Redis提供了多种数据结构可以适配不同场景:
- 如果是统计当前在线用户数,适合用
SET结构存储用户唯一标识,利用SCARD命令获取集合大小 - 如果是统计时间窗口内的活跃用户数,可以用
HyperLogLog结构,占用空间小且支持去重计数 - 如果是需要记录用户最后活跃时间并自动过期,可以结合
STRING或者HASH结构设置过期时间
环境准备
首先需要在环境中安装Redis服务和Python的Redis客户端,执行以下命令安装依赖:
# 安装Python Redis客户端 pip install redis
场景一:统计当前在线用户数
这种场景适合用Redis的SET结构,用户登录时把用户ID加入集合,用户退出或者超时下线时从集合移除,需要查询时直接获取集合元素个数。
import redis
import time
# 连接Redis,默认本地6379端口,无密码
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
def user_login(user_id):
"""用户登录,加入在线集合"""
# 集合key为online_users,存储所有在线用户ID
r.sadd('online_users', user_id)
print(f"用户{user_id}登录,当前在线用户数:{r.scard('online_users')}")
def user_logout(user_id):
"""用户退出,从在线集合移除"""
r.srem('online_users', user_id)
print(f"用户{user_id}退出,当前在线用户数:{r.scard('online_users')}")
def get_online_count():
"""获取当前在线用户数"""
return r.scard('online_users')
# 测试示例
if __name__ == '__main__':
user_login('user_1001')
user_login('user_1002')
user_login('user_1003')
print(f"当前在线总人数:{get_online_count()}")
time.sleep(2)
user_logout('user_1002')
print(f"当前在线总人数:{get_online_count()}")场景二:统计时间窗口内活跃用户数
如果需要统计比如最近1小时、1天内的活跃用户数,不需要精确知道每个用户的状态,只需要去重计数,可以用HyperLogLog结构,占用空间极小,适合大用户量场景。
import redis
from datetime import datetime
# 连接Redis
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
def record_active_user(user_id, time_window='daily'):
"""记录时间窗口内的活跃用户,time_window可选daily/hourly"""
if time_window == 'daily':
# 按天统计,key格式为active_users:日期
key = f"active_users:{datetime.now().strftime('%Y%m%d')}"
elif time_window == 'hourly':
# 按小时统计,key格式为active_users:日期_小时
key = f"active_users:{datetime.now().strftime('%Y%m%d_%H')}"
else:
key = 'active_users:default'
# 向HyperLogLog中添加用户ID,自动去重
r.pfadd(key, user_id)
# 可以设置过期时间,比如按天的key30天后过期
if time_window == 'daily':
r.expire(key, 30 * 86400)
def get_active_user_count(time_window='daily'):
"""获取时间窗口内的活跃用户数"""
if time_window == 'daily':
key = f"active_users:{datetime.now().strftime('%Y%m%d')}"
elif time_window == 'hourly':
key = f"active_users:{datetime.now().strftime('%Y%m%d_%H')}"
else:
key = 'active_users:default'
# 获取HyperLogLog的近似计数
return r.pfcount(key)
# 测试示例
if __name__ == '__main__':
record_active_user('user_1001', 'daily')
record_active_user('user_1002', 'daily')
record_active_user('user_1001', 'daily') # 重复添加不会重复计数
print(f"今日活跃用户数:{get_active_user_count('daily')}")注意事项
实际使用中需要注意几个问题:
- 用户唯一标识尽量选择不会重复的值,比如用户ID、设备ID等
- 如果是分布式服务,多个实例操作同一个Redis,不需要额外加锁,Redis的命令是原子操作,不会出现计数错误
- HyperLogLog的计数是近似值,误差在0.81%左右,如果需要精确计数不要使用这个结构
- 在线用户统计如果依赖用户主动退出,可能存在用户直接关闭页面没有触发退出的情况,可以结合心跳机制,定期清理超过一定时间没有活跃的用户
心跳机制适配在线统计
为了解决用户异常下线的问题,可以加一个心跳接口,用户每隔一段时间调用一次,更新自己的最后活跃时间,后台定时任务清理超时的用户。
import redis
import time
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
def user_heartbeat(user_id):
"""用户心跳,更新最后活跃时间,同时确保在在线集合中"""
# 用HASH结构存储用户的最后活跃时间戳
r.hset('user_last_active', user_id, int(time.time()))
# 确保在在线集合中
r.sadd('online_users', user_id)
def clean_offline_users(timeout=300):
"""清理超过timeout秒没有心跳的用户,默认5分钟""
current_time = int(time.time())
# 获取所有用户的最后活跃时间
all_users = r.hgetall('user_last_active')
offline_users = []
for user_id, last_active in all_users.items():
if current_time - int(last_active) > timeout:
offline_users.append(user_id)
if offline_users:
# 从在线集合移除
r.srem('online_users', *offline_users)
# 从活跃时间记录中移除
r.hdel('user_last_active', *offline_users)
print(f"清理离线用户:{offline_users},当前在线数:{r.scard('online_users')}")
# 测试示例,模拟后台定时执行clean_offline_users即可