词云 bot:原项目地址: https://github.com/devourbots/word_cloud_bot#
优化原因:#
阅读过代码会发现,此机器人语句分析是在定时任务或者 rank 执行时实现的,一般时候只是纯粹地接受用户聊天记录。故对于大量群组使用时会造成压力,主要原因是负载不均衡。故我准备将语句分析迁移至平时。(即传入一句分析一句)
使用环境#
- 沿袭 python3 的使用
- 使用 sqlite3 数据库 原因为均衡了负载,对数据库性能没有较大要求
代码实现#
数据库操作模块#
- 建立一个操作类
import sqlite3
class db_manager:
def __init__(self,Db_name):
conn = sqlite3.connect(Db_name,check_same_thread=False) //①
print(f'连接数据库{Db_name}成功')
self.db=conn
self.cursor=conn.cursor()
以上代码初始化了一个数据库
①处若存在数据库则连接,若不存在则创建,且这个数据库可以多线程操作
- 检查表是否存在
def check_table(self,chatId):
sql=f"""SELECT count(*) FROM sqlite_master WHERE type="table" AND name = '{chatId}'"""//①
return self.cursor.execute(sql).fetchone()[0]
①处为 sql 语句,意思为查找表名为chatId
的表
return self.cursor.execute(sql).fetchone()[0]
处执行语句,并返回执行结果
- 创建分词数据表
- 群组词频表
def creat_group_table(self,chatId):
sql=f"""CREATE TABLE "{chatId}"(
groupData NCHAR(20) UNIQUE, //词语字段 ①
times SMALLINT //词语频率
)
"""
self.cursor.execute(sql)
self.db.commit()
①处为词语字段,因为会出现中文,故设为 NCHAR 类型,然后设置为 UNIQUE,后面会解释用法
- 用户发言次数表
def creat_user_table(self,chatId):
sql=f"""CREATE TABLE "{chatId}"(
userId CHAR(20) UNIQUE, //用户id
times SMALLINT, //发言次数
name NCHAR(20) //用户名
)
"""
self.cursor.execute(sql)
self.db.commit()
- 添加词语,更新词频
- 词频
def add_user(self,chatId,userId,name):
chatId=f"{chatId}_user" //该群组词表名称
if not self.check_table(chatId)://调用之前函数检查表是否存在
self.creat_user_table(chatId)//若不存在则创建
times=int(self.serch_user_data(chatId,userId))+1 //次数加一
sql = f"""
REPLACE INTO '{chatId}' VALUES('{userId}','{times}','{name}') //①
"""
self.cursor.execute(sql)
self.db.commit()
return True
①处 sql 语句意思是:若表中 UNIQUE 字段存在一致,则更新数据,若不存在,则创建字段。
- 用户发言
def add_user(self,chatId,userId,name):
chatId=f"{chatId}_user"
if not self.check_table(chatId):
self.creat_user_table(chatId)
times=int(self.serch_user_data(chatId,userId))+1
sql = f"""
REPLACE INTO '{chatId}' VALUES('{userId}','{times}','{name}')
"""
self.cursor.execute(sql)
self.db.commit()
return True
基本与词频统计部分一致
- 提取词频率(用于生成词云)
def serch_all(self,chatId):
sql = f'''
SELECT * FROM '{chatId}'; //①
'''
try:
self.cursor.execute(sql)
result = self.cursor.fetchall()
except:
result=None
return result
①处 sql 语句为检索chatId
表里所有的值,返回格式为(("词语1","次数"),("词语2","次数")...)
,因此可以用 dict () 函数将此元组转换为字典
- 查找所有表
def all_table(self):
sql=f"""
SELECT name _id FROM sqlite_master WHERE type ='table'
"""
data=self.cursor.execute(sql).fetchall()
result=[]
for i in data:
if "group" in i[0]:
result.append(i[0].split("_")[0])
return result
- 删除所有表
def del_all(self):
sql=f"""
SELECT name _id FROM sqlite_master WHERE type ='table'
"""
data=self.cursor.execute(sql).fetchall()
for i in data:
sql=f"""
DROP TABLE '{i[0]}'
"""
self.cursor.execute(sql)
self.db.commit()
return True
用于收集一周期后产生新一轮词云
数据库操作模块完整代码#
db_manager.py
import sqlite3
class db_manager:
def __init__(self,Db_name):
conn = sqlite3.connect(Db_name,check_same_thread=False)
print(f'连接数据库{Db_name}成功')
self.db=conn
self.cursor=conn.cursor()
def creat_group_table(self,chatId):
sql=f"""CREATE TABLE "{chatId}"(
groupData NCHAR(20) UNIQUE,
times SMALLINT
)
"""
self.cursor.execute(sql)
self.db.commit()
def creat_user_table(self,chatId):
sql=f"""CREATE TABLE "{chatId}"(
userId CHAR(20) UNIQUE,
times SMALLINT,
name NCHAR(20)
)
"""
self.cursor.execute(sql)
self.db.commit()
def check_table(self,chatId):
sql=f"""SELECT count(*) FROM sqlite_master WHERE type="table" AND name = '{chatId}'"""
return self.cursor.execute(sql).fetchone()[0]
def serch_group_data(self,chatId,message):
sql = f'''
SELECT * FROM '{chatId}' WHERE groupData = '{message}';
'''
try:
self.cursor.execute(sql)
result = self.cursor.fetchone()[1]
except Exception as e:
result=0
return result
def serch_user_data(self,chatId,message):
sql = f'''
SELECT * FROM '{chatId}' WHERE userId = '{message}';
'''
try:
self.cursor.execute(sql)
result = self.cursor.fetchone()[1]
except Exception as e:
result=0
return result
def add_message(self,chatId,message):
chatId=f"{chatId}_group"
if not self.check_table(chatId):
self.creat_group_table(chatId)
times=int(self.serch_group_data(chatId,message))+1
sql = f"""
REPLACE INTO '{chatId}' VALUES('{message}','{times}')
"""
self.cursor.execute(sql)
self.db.commit()
return True
def add_user(self,chatId,userId,name):
chatId=f"{chatId}_user"
if not self.check_table(chatId):
self.creat_user_table(chatId)
times=int(self.serch_user_data(chatId,userId))+1
sql = f"""
REPLACE INTO '{chatId}' VALUES('{userId}','{times}','{name}')
"""
self.cursor.execute(sql)
self.db.commit()
return True
def serch_all(self,chatId):
sql = f'''
SELECT * FROM '{chatId}';
'''
try:
self.cursor.execute(sql)
result = self.cursor.fetchall()
except:
result=None
return result
def all_table(self):
sql=f"""
SELECT name _id FROM sqlite_master WHERE type ='table'
"""
data=self.cursor.execute(sql).fetchall()
result=[]
for i in data:
if "group" in i[0]:
result.append(i[0].split("_")[0])
return result
def del_all(self):
sql=f"""
SELECT name _id FROM sqlite_master WHERE type ='table'
"""
data=self.cursor.execute(sql).fetchall()
for i in data:
sql=f"""
DROP TABLE '{i[0]}'
"""
self.cursor.execute(sql)
self.db.commit()
return True
机器人主程序部分#
- 导入包
import telegram
from telegram.ext import Updater, MessageHandler, Filters, CommandHandler
from telegram import Bot, ParseMode, MessageEntity, User
import jieba
import jieba.posseg as pseg
import logging
import schedule
import time
import re
import threading
from db_manager import db_manager as dm //导入先前写好的操作模块
db=dm("data.db") //实例化类
mk = imageio.imread("circle.png") //读取图片(用于生成词云)
- 先写一个获取用户名的函数 方便 message 处理程序使用
def schedule_task():
for group in db.all_table():
try:
dic_to_pic(group)
bot.send_photo(
chat_id=group,
photo=open("{}_pic.png".format(group), "rb")
)
os.remove(f"{group}_pic.png")
except:
continue
- 消息处理函数
def msg_handler(update,context): //后面会写一个消息处理器来不断传入参数至这个函数
try: //尝试获取用户消息文本
text = update.message.text
except:
return
userId = str(update.effective_user.id) //获取用户id
chatId = str(update.effective_message.chat_id) //获取群组id
name=get_name(update) //获取用户名(调用 2 中的函数)
db.add_user(chatId, userId, name) //调用 add_user 方法
text=re.sub('\W*', '',text) //使用正则表达式,去除标点符号
if len(text)<2:
return //如果是单字,就结束,不继续分词
elif len(text)<7:
text=' '.join(jieba.cut(text, cut_all=True, HMM=True)) //小于7个字用cut_all参数找出所有可能词,大于则使用精准匹配
words=pseg.cut(text,use_paddle=True)
for word ,flag in words: //返回格式(("词语1","词性"),("词语2","词性")...)
if flag in ["n", "nr","a","v","vd","nz", "PER", "f", "ns", "LOC", "s", "nt", "ORG", "nw","vn"]: //去除虚词干扰
if len(word)>1: //如果是词语(即大于等于两个字)
db.add_message(chatId, word) //加入数据库
return //结束
- 词云图片生成函数
def dic_to_pic(chatId):
group=chatId
chatId=f"{chatId}_group"
result=db.serch_all(chatId)
if result:
wordcloud.WordCloud(width=800,
height=800,
background_color='white',
font_path='font.ttf', //读取字体
mask=mk, //图片
scale=5).generate_from_frequencies(dict(result)).to_file(f"{group}_pic.png")
//generate_from_frequencies 按字典加权词语比重,输出至本目录下 以groupid+_pic.png为名字的文件中
- 手动生成图片情况
def make(update,context):
group = str(update.effective_message.chat_id) //获取请求生成的群组
dic_to_pic(group) //生成图片
try:
context.bot.send_photo(
chat_id=group,
photo=open(f"{group}_pic.png", "rb")
) //发送图片
os.remove(f"{group}_pic.png") //删除图片
except:
pass
- 定时任务的情况
def schedule_task():
for group in db.all_table(): //查找所有表,并遍历所有有消息的群组
try:
dic_to_pic(group)
bot.send_photo(
chat_id=group,
photo=open("{}_pic.png".format(group), "rb")
)
os.remove(f"{group}_pic.png")
except:
continue
- 清除所有数据
def reset():
db.del_all()
- 定时任务
def check_schedule():
while True:
try:
schedule.run_pending() //检查是否有到期定时任务
time.sleep(1)
except:
time.sleep(7200)
这个函数后面会使用
- 主程序
if __name__ == '__main__':
TOKEN=123456787878:xxxxxxxxxxxxxxxxxxxxxxxxxxxxx //TOKEN自己申请填入
bot = telegram.Bot(token=TOKEN)
updater = Updater(token=TOKEN,workers=10) //同时可并行任务数,可自行调整
dp = updater.dispatcher //触发器
print("working")
schedule.every().day.at('10:00').do(schedule_task)
schedule.every().day.at('12:10').do(schedule_task)
schedule.every().day.at('18:00').do(schedule_task)
schedule.every().day.at('22:00').do(schedule_task)
schedule.every().day.at('04:00').do(schedule_task)
schedule.every().day.at('04:08').do(reset) //添加定时任务
threading.Thread(target=check_schedule).start() //创建一个新线程运行 8 中函数
dp.add_handler(CommandHandler("make", make, run_async=True)) //创建一个命令触发器,用户发送 /make便会执行make函数
dp.add_handler(MessageHandler(Filters.text, msg_handler)) //创建一个消息触发器,过滤条件为纯文本
这样添加机器人权限后就可以正常运行了!
机器人主程序部分完整代码#
bot.py
import telegram
from telegram.ext import Updater, MessageHandler, Filters, CommandHandler
from telegram import Bot, ParseMode, MessageEntity, User
import jieba
import jieba.posseg as pseg
import logging
import schedule
import time
import re
import threading
from db_manager import db_manager as dm //导入先前写好的操作模块
db=dm("data.db") //实例化类
mk = imageio.imread("circle.png") //读取图片(用于生成词云)
def get_name(update):
user = update.message.from_user
firstname = str(user["first_name"])
lastname = str(user["last_name"])
name = ""
if firstname != "None":
name = firstname + " "
if lastname != "None":
name += lastname
if len(name) == 0:
try:
name = update.effective_user.username
except:
name = update.effective_user.id
return name
def msg_handler(update,context):
try:
text = update.message.text
except:
return
userId = str(update.effective_user.id)
chatId = str(update.effective_message.chat_id)
name=get_name(update)
db.add_user(chatId, userId, name)
text=re.sub('\W*', '',text)
if len(text)<2:
return
elif len(text)<7:
text=' '.join(jieba.cut(text, cut_all=True, HMM=True))
words=pseg.cut(text,use_paddle=True)
for word ,flag in words:
if flag in ["n", "nr","a","v","vd","nz", "PER", "f", "ns", "LOC", "s", "nt", "ORG", "nw","vn"]:
if len(word)>1:
db.add_message(chatId, word)
return
def make(update,context):
group = str(update.effective_message.chat_id)
dic_to_pic(group)
try:
context.bot.send_photo(
chat_id=group,
photo=open(f"{group}_pic.png", "rb")
)
os.remove(f"{group}_pic.png")
except:
pass
def dic_to_pic(chatId):
group=chatId
chatId=f"{chatId}_group"
result=db.serch_all(chatId)
if result:
wordcloud.WordCloud(width=800,
height=800,
background_color='white',
font_path='font.ttf',
mask=mk,
scale=5).generate_from_frequencies(dict(result)).to_file(f"{group}_pic.png")
def schedule_task():
for group in db.all_table():
try:
dic_to_pic(group)
bot.send_photo(
chat_id=group,
photo=open("{}_pic.png".format(group), "rb")
)
os.remove(f"{group}_pic.png")
except:
continue
def reset():
db.del_all()
def check_schedule():
while True:
try:
schedule.run_pending()
time.sleep(1)
except:
time.sleep(7200)
if __name__ == '__main__':
TOKEN="123456787878:xxxxxxxxxxxxxxxxxxxxxxxxxxxxx" //TOKEN自己申请填入
bot = telegram.Bot(token=TOKEN)
updater = Updater(token=TOKEN,workers=10) //同时可并行任务数,可自行调整
dp = updater.dispatcher //触发器
print("working")
schedule.every().day.at('10:00').do(schedule_task)
schedule.every().day.at('12:10').do(schedule_task)
schedule.every().day.at('18:00').do(schedule_task)
schedule.every().day.at('22:00').do(schedule_task)
schedule.every().day.at('04:00').do(schedule_task)
schedule.every().day.at('04:08').do(reset) //添加定时任务
threading.Thread(target=check_schedule).start() //创建一个新线程运行 8 中函数
dp.add_handler(CommandHandler("make", make, run_async=True)) //创建一个命令触发器,用户发送 /make便会执行make函数
dp.add_handler(MessageHandler(Filters.text, msg_handler)) //创建一个消息触发器,过滤条件为纯文本
总结#
[todo-t] 统计词频 [/todo-t]
[todo-t] 统计用户发言 [/todo-t]
[todo-t] 接受指令生成词云 [/todo-t]
[todo-t] 定时生成词云并发送群组 [/todo-t]
[todo-f] 用户热度统计 [/todo-f]
[todo-f] 权限限制 [/todo-f]