詞雲 bot:原項目地址: https://github.com/devourbots/word_cloud_bot#
優化原因:#
閱讀過代碼會發現,此機器人語句分析是在定時任務或者 rank 執行時實現的,一般時候只是純粹地接受用戶聊天記錄。故對於大量群組使用時會造成壓力,主要原因是負載不均衡。故我準備將語句分析遷移至平時。(即傳入一句分析一句)
使用環境#
- 沿襲 python3 的使用
- 使用 sqlite3 數據庫 原因為均衡了負載,對數據庫性能沒有較大要求
代碼實現#
數據庫操作模塊#
- 建立一個操作類
import sqlite3
class db_manager:
def __init__(self,Db_name):
conn = sqlite3.connect(Db_name,check_same_thread=False) //①
print(f'連接數據庫{Db_name}成功')
self.db=conn
self.cursor=conn.cursor()
以上代碼初始化了一個數據庫
①處若存在數據庫則連接,若不存在則創建,且這個數據庫可以多線程操作
- 檢查表是否存在
def check_table(self,chatId):
sql=f"""SELECT count(*) FROM sqlite_master WHERE type="table" AND name = '{chatId}'"""//①
return self.cursor.execute(sql).fetchone()[0]
①處為 sql 語句,意思為查找表名為chatId
的表
return self.cursor.execute(sql).fetchone()[0]
處執行語句,並返回執行結果
- 創建分詞數據表
- 群組詞頻表
def creat_group_table(self,chatId):
sql=f"""CREATE TABLE "{chatId}"(
groupData NCHAR(20) UNIQUE, //詞語字段 ①
times SMALLINT //詞語頻率
)
"""
self.cursor.execute(sql)
self.db.commit()
①處為詞語字段,因為會出現中文,故設為 NCHAR 類型,然後設置為 UNIQUE,後面會解釋用法
- 用戶發言次數表
def creat_user_table(self,chatId):
sql=f"""CREATE TABLE "{chatId}"(
userId CHAR(20) UNIQUE, //用戶id
times SMALLINT, //發言次數
name NCHAR(20) //用戶名
)
"""
self.cursor.execute(sql)
self.db.commit()
- 添加詞語,更新詞頻
- 詞頻
def add_user(self,chatId,userId,name):
chatId=f"{chatId}_user" //該群組詞表名稱
if not self.check_table(chatId)://調用之前函數檢查表是否存在
self.creat_user_table(chatId)//若不存在則創建
times=int(self.serch_user_data(chatId,userId))+1 //次數加一
sql = f"""
REPLACE INTO '{chatId}' VALUES('{userId}','{times}','{name}') //①
"""
self.cursor.execute(sql)
self.db.commit()
return True
①處 sql 語句意思是:若表中 UNIQUE 字段存在一致,則更新數據,若不存在,則創建字段。
- 用戶發言
def add_user(self,chatId,userId,name):
chatId=f"{chatId}_user"
if not self.check_table(chatId):
self.creat_user_table(chatId)
times=int(self.serch_user_data(chatId,userId))+1
sql = f"""
REPLACE INTO '{chatId}' VALUES('{userId}','{times}','{name}')
"""
self.cursor.execute(sql)
self.db.commit()
return True
基本與詞頻統計部分一致
- 提取詞頻率(用於生成詞雲)
def serch_all(self,chatId):
sql = f'''
SELECT * FROM '{chatId}'; //①
'''
try:
self.cursor.execute(sql)
result = self.cursor.fetchall()
except:
result=None
return result
①處 sql 語句為檢索chatId
表裡所有的值,返回格式為(("詞語1","次數"),("詞語2","次數")...)
,因此可以用 dict () 函數將此元組轉換為字典
- 查找所有表
def all_table(self):
sql=f"""
SELECT name _id FROM sqlite_master WHERE type ='table'
"""
data=self.cursor.execute(sql).fetchall()
result=[]
for i in data:
if "group" in i[0]:
result.append(i[0].split("_")[0])
return result
- 刪除所有表
def del_all(self):
sql=f"""
SELECT name _id FROM sqlite_master WHERE type ='table'
"""
data=self.cursor.execute(sql).fetchall()
for i in data:
sql=f"""
DROP TABLE '{i[0]}'
"""
self.cursor.execute(sql)
self.db.commit()
return True
用於收集一周期後產生新一輪詞雲
數據庫操作模塊完整代碼#
db_manager.py
import sqlite3
class db_manager:
def __init__(self,Db_name):
conn = sqlite3.connect(Db_name,check_same_thread=False)
print(f'連接數據庫{Db_name}成功')
self.db=conn
self.cursor=conn.cursor()
def creat_group_table(self,chatId):
sql=f"""CREATE TABLE "{chatId}"(
groupData NCHAR(20) UNIQUE,
times SMALLINT
)
"""
self.cursor.execute(sql)
self.db.commit()
def creat_user_table(self,chatId):
sql=f"""CREATE TABLE "{chatId}"(
userId CHAR(20) UNIQUE,
times SMALLINT,
name NCHAR(20)
)
"""
self.cursor.execute(sql)
self.db.commit()
def check_table(self,chatId):
sql=f"""SELECT count(*) FROM sqlite_master WHERE type="table" AND name = '{chatId}'"""
return self.cursor.execute(sql).fetchone()[0]
def serch_group_data(self,chatId,message):
sql = f'''
SELECT * FROM '{chatId}' WHERE groupData = '{message}';
'''
try:
self.cursor.execute(sql)
result = self.cursor.fetchone()[1]
except Exception as e:
result=0
return result
def serch_user_data(self,chatId,message):
sql = f'''
SELECT * FROM '{chatId}' WHERE userId = '{message}';
'''
try:
self.cursor.execute(sql)
result = self.cursor.fetchone()[1]
except Exception as e:
result=0
return result
def add_message(self,chatId,message):
chatId=f"{chatId}_group"
if not self.check_table(chatId):
self.creat_group_table(chatId)
times=int(self.serch_group_data(chatId,message))+1
sql = f"""
REPLACE INTO '{chatId}' VALUES('{message}','{times}')
"""
self.cursor.execute(sql)
self.db.commit()
return True
def add_user(self,chatId,userId,name):
chatId=f"{chatId}_user"
if not self.check_table(chatId):
self.creat_user_table(chatId)
times=int(self.serch_user_data(chatId,userId))+1
sql = f"""
REPLACE INTO '{chatId}' VALUES('{userId}','{times}','{name}')
"""
self.cursor.execute(sql)
self.db.commit()
return True
def serch_all(self,chatId):
sql = f'''
SELECT * FROM '{chatId}';
'''
try:
self.cursor.execute(sql)
result = self.cursor.fetchall()
except:
result=None
return result
def all_table(self):
sql=f"""
SELECT name _id FROM sqlite_master WHERE type ='table'
"""
data=self.cursor.execute(sql).fetchall()
result=[]
for i in data:
if "group" in i[0]:
result.append(i[0].split("_")[0])
return result
def del_all(self):
sql=f"""
SELECT name _id FROM sqlite_master WHERE type ='table'
"""
data=self.cursor.execute(sql).fetchall()
for i in data:
sql=f"""
DROP TABLE '{i[0]}'
"""
self.cursor.execute(sql)
self.db.commit()
return True
機器人主程序部分#
- 導入包
import telegram
from telegram.ext import Updater, MessageHandler, Filters, CommandHandler
from telegram import Bot, ParseMode, MessageEntity, User
import jieba
import jieba.posseg as pseg
import logging
import schedule
import time
import re
import threading
from db_manager import db_manager as dm //導入先前寫好的操作模塊
db=dm("data.db") //實例化類
mk = imageio.imread("circle.png") //讀取圖片(用於生成詞雲)
- 先寫一個獲取用戶名的函數 方便 message 處理程序使用
def schedule_task():
for group in db.all_table():
try:
dic_to_pic(group)
bot.send_photo(
chat_id=group,
photo=open("{}_pic.png".format(group), "rb")
)
os.remove(f"{group}_pic.png")
except:
continue
- 消息處理函數
def msg_handler(update,context): //後面會寫一個消息處理器來不斷傳入參數至這個函數
try: //嘗試獲取用戶消息文本
text = update.message.text
except:
return
userId = str(update.effective_user.id) //獲取用戶id
chatId = str(update.effective_message.chat_id) //獲取群組id
name=get_name(update) //獲取用戶名(調用 2 中的函數)
db.add_user(chatId, userId, name) //調用 add_user 方法
text=re.sub('\W*', '',text) //使用正則表達式,去除標點符號
if len(text)<2:
return //如果是單字,就結束,不繼續分詞
elif len(text)<7:
text=' '.join(jieba.cut(text, cut_all=True, HMM=True)) //小於7個字用cut_all參數找出所有可能詞,大於則使用精確匹配
words=pseg.cut(text,use_paddle=True)
for word ,flag in words: //返回格式(("詞語1","詞性"),("詞語2","詞性")...)
if flag in ["n", "nr","a","v","vd","nz", "PER", "f", "ns", "LOC", "s", "nt", "ORG", "nw","vn"]: //去除虛詞干擾
if len(word)>1: //如果是詞語(即大於等於兩個字)
db.add_message(chatId, word) //加入數據庫
return //結束
- 詞雲圖片生成函數
def dic_to_pic(chatId):
group=chatId
chatId=f"{chatId}_group"
result=db.serch_all(chatId)
if result:
wordcloud.WordCloud(width=800,
height=800,
background_color='white',
font_path='font.ttf', //讀取字體
mask=mk, //圖片
scale=5).generate_from_frequencies(dict(result)).to_file(f"{group}_pic.png")
//generate_from_frequencies 按字典加權詞語比重,輸出至本目錄下 以groupid+_pic.png為名字的文件中
- 手動生成圖片情況
def make(update,context):
group = str(update.effective_message.chat_id) //獲取請求生成的群組
dic_to_pic(group) //生成圖片
try:
context.bot.send_photo(
chat_id=group,
photo=open(f"{group}_pic.png", "rb")
) //發送圖片
os.remove(f"{group}_pic.png") //刪除圖片
except:
pass
- 定時任務的情況
def schedule_task():
for group in db.all_table(): //查找所有表,並遍歷所有有消息的群組
try:
dic_to_pic(group)
bot.send_photo(
chat_id=group,
photo=open("{}_pic.png".format(group), "rb")
)
os.remove(f"{group}_pic.png")
except:
continue
- 清除所有數據
def reset():
db.del_all()
- 定時任務
def check_schedule():
while True:
try:
schedule.run_pending() //檢查是否有到期定時任務
time.sleep(1)
except:
time.sleep(7200)
這個函數後面會使用
- 主程序
if __name__ == '__main__':
TOKEN=123456787878:xxxxxxxxxxxxxxxxxxxxxxxxxxxxx //TOKEN自己申請填入
bot = telegram.Bot(token=TOKEN)
updater = Updater(token=TOKEN,workers=10) //同時可並行任務數,可自行調整
dp = updater.dispatcher //觸發器
print("working")
schedule.every().day.at('10:00').do(schedule_task)
schedule.every().day.at('12:10').do(schedule_task)
schedule.every().day.at('18:00').do(schedule_task)
schedule.every().day.at('22:00').do(schedule_task)
schedule.every().day.at('04:00').do(schedule_task)
schedule.every().day.at('04:08').do(reset) //添加定時任務
threading.Thread(target=check_schedule).start() //創建一個新線程運行 8 中函數
dp.add_handler(CommandHandler("make", make, run_async=True)) //創建一個命令觸發器,用戶發送 /make便會執行make函數
dp.add_handler(MessageHandler(Filters.text, msg_handler)) //創建一個消息觸發器,過濾條件為純文本
這樣添加機器人權限後就可以正常運行了!
機器人主程序部分完整代碼#
bot.py
import telegram
from telegram.ext import Updater, MessageHandler, Filters, CommandHandler
from telegram import Bot, ParseMode, MessageEntity, User
import jieba
import jieba.posseg as pseg
import logging
import schedule
import time
import re
import threading
from db_manager import db_manager as dm //導入先前寫好的操作模塊
db=dm("data.db") //實例化類
mk = imageio.imread("circle.png") //讀取圖片(用於生成詞雲)
def get_name(update):
user = update.message.from_user
firstname = str(user["first_name"])
lastname = str(user["last_name"])
name = ""
if firstname != "None":
name = firstname + " "
if lastname != "None":
name += lastname
if len(name) == 0:
try:
name = update.effective_user.username
except:
name = update.effective_user.id
return name
def msg_handler(update,context):
try:
text = update.message.text
except:
return
userId = str(update.effective_user.id)
chatId = str(update.effective_message.chat_id)
name=get_name(update)
db.add_user(chatId, userId, name)
text=re.sub('\W*', '',text)
if len(text)<2:
return
elif len(text)<7:
text=' '.join(jieba.cut(text, cut_all=True, HMM=True))
words=pseg.cut(text,use_paddle=True)
for word ,flag in words:
if flag in ["n", "nr","a","v","vd","nz", "PER", "f", "ns", "LOC", "s", "nt", "ORG", "nw","vn"]:
if len(word)>1:
db.add_message(chatId, word)
return
def make(update,context):
group = str(update.effective_message.chat_id)
dic_to_pic(group)
try:
context.bot.send_photo(
chat_id=group,
photo=open(f"{group}_pic.png", "rb")
)
os.remove(f"{group}_pic.png")
except:
pass
def dic_to_pic(chatId):
group=chatId
chatId=f"{chatId}_group"
result=db.serch_all(chatId)
if result:
wordcloud.WordCloud(width=800,
height=800,
background_color='white',
font_path='font.ttf',
mask=mk,
scale=5).generate_from_frequencies(dict(result)).to_file(f"{group}_pic.png")
def schedule_task():
for group in db.all_table():
try:
dic_to_pic(group)
bot.send_photo(
chat_id=group,
photo=open("{}_pic.png".format(group), "rb")
)
os.remove(f"{group}_pic.png")
except:
continue
def reset():
db.del_all()
def check_schedule():
while True:
try:
schedule.run_pending()
time.sleep(1)
except:
time.sleep(7200)
if __name__ == '__main__':
TOKEN="123456787878:xxxxxxxxxxxxxxxxxxxxxxxxxxxxx" //TOKEN自己申請填入
bot = telegram.Bot(token=TOKEN)
updater = Updater(token=TOKEN,workers=10) //同時可並行任務數,可自行調整
dp = updater.dispatcher //觸發器
print("working")
schedule.every().day.at('10:00').do(schedule_task)
schedule.every().day.at('12:10').do(schedule_task)
schedule.every().day.at('18:00').do(schedule_task)
schedule.every().day.at('22:00').do(schedule_task)
schedule.every().day.at('04:00').do(schedule_task)
schedule.every().day.at('04:08').do(reset) //添加定時任務
threading.Thread(target=check_schedule).start() //創建一個新線程運行 8 中函數
dp.add_handler(CommandHandler("make", make, run_async=True)) //創建一個命令觸發器,用戶發送 /make便會執行make函數
dp.add_handler(MessageHandler(Filters.text, msg_handler)) //創建一個消息觸發器,過濾條件為純文本
總結#
[todo-t] 統計詞頻 [/todo-t]
[todo-t] 統計用戶發言 [/todo-t]
[todo-t] 接受指令生成詞雲 [/todo-t]
[todo-t] 定時生成詞雲並發送群組 [/todo-t]
[todo-f] 用戶熱度統計 [/todo-f]
[todo-f] 權限限制 [/todo-f]