ワードクラウドボット:原プロジェクトアドレス: https://github.com/devourbots/word_cloud_bot#
最適化理由:#
コードを読んでみると、このボットの文分析は定期的なタスクやランク実行時に実現されており、通常はユーザーのチャット履歴を単純に受け取るだけです。そのため、大量のグループで使用する際に負担がかかることがあります。主な理由は負荷の不均衡です。したがって、私は文分析を通常の時に移行することを考えています。(つまり、文を受け取るたびに分析します)
使用環境#
- python3 の使用を継承
- sqlite3 データベースを使用する理由は、負荷を均衡させ、データベースのパフォーマンスに大きな要求がないためです。
コード実装#
データベース操作モジュール#
- 操作クラスを作成
import sqlite3
class db_manager:
def __init__(self,Db_name):
conn = sqlite3.connect(Db_name,check_same_thread=False) //①
print(f'データベース{Db_name}に接続成功')
self.db=conn
self.cursor=conn.cursor()
上記のコードはデータベースを初期化します。
①の部分は、データベースが存在する場合は接続し、存在しない場合は作成します。このデータベースはマルチスレッド操作が可能です。
- テーブルが存在するか確認
def check_table(self,chatId):
sql=f"""SELECT count(*) FROM sqlite_master WHERE type="table" AND name = '{chatId}'"""//①
return self.cursor.execute(sql).fetchone()[0]
①の部分は SQL 文で、chatId
という名前のテーブルを探します。
return self.cursor.execute(sql).fetchone()[0]
の部分で文を実行し、実行結果を返します。
- 単語分割データテーブルを作成
- グループ単語頻度テーブル
def creat_group_table(self,chatId):
sql=f"""CREATE TABLE "{chatId}"(
groupData NCHAR(20) UNIQUE, //単語フィールド ①
times SMALLINT //単語頻度
)
"""
self.cursor.execute(sql)
self.db.commit()
①の部分は単語フィールドです。中国語が含まれるため、NCHAR 型に設定し、UNIQUE に設定します。後で使い方を説明します。
- ユーザー発言回数テーブル
def creat_user_table(self,chatId):
sql=f"""CREATE TABLE "{chatId}"(
userId CHAR(20) UNIQUE, //ユーザーID
times SMALLINT, //発言回数
name NCHAR(20) //ユーザー名
)
"""
self.cursor.execute(sql)
self.db.commit()
- 単語を追加し、頻度を更新
- 単語頻度
def add_user(self,chatId,userId,name):
chatId=f"{chatId}_user" //このグループの単語テーブル名
if not self.check_table(chatId)://以前の関数を呼び出してテーブルが存在するか確認
self.creat_user_table(chatId)//存在しない場合は作成
times=int(self.serch_user_data(chatId,userId))+1 //回数を1増やす
sql = f"""
REPLACE INTO '{chatId}' VALUES('{userId}','{times}','{name}') //①
"""
self.cursor.execute(sql)
self.db.commit()
return True
①の部分の SQL 文は、UNIQUE フィールドが一致する場合はデータを更新し、一致しない場合はフィールドを作成することを意味します。
- ユーザー発言
def add_user(self,chatId,userId,name):
chatId=f"{chatId}_user"
if not self.check_table(chatId):
self.creat_user_table(chatId)
times=int(self.serch_user_data(chatId,userId))+1
sql = f"""
REPLACE INTO '{chatId}' VALUES('{userId}','{times}','{name}')
"""
self.cursor.execute(sql)
self.db.commit()
return True
基本的には単語頻度統計部分と一致します。
- 単語頻度を抽出(ワードクラウド生成に使用)
def serch_all(self,chatId):
sql = f'''
SELECT * FROM '{chatId}'; //①
'''
try:
self.cursor.execute(sql)
result = self.cursor.fetchall()
except:
result=None
return result
①の部分の SQL 文は、chatId
テーブル内のすべての値を検索し、返す形式は(("単語1","回数"),("単語2","回数")...)
です。したがって、dict () 関数を使用してこのタプルを辞書に変換できます。
- すべてのテーブルを検索
def all_table(self):
sql=f"""
SELECT name _id FROM sqlite_master WHERE type ='table'
"""
data=self.cursor.execute(sql).fetchall()
result=[]
for i in data:
if "group" in i[0]:
result.append(i[0].split("_")[0])
return result
- すべてのテーブルを削除
def del_all(self):
sql=f"""
SELECT name _id FROM sqlite_master WHERE type ='table'
"""
data=self.cursor.execute(sql).fetchall()
for i in data:
sql=f"""
DROP TABLE '{i[0]}'
"""
self.cursor.execute(sql)
self.db.commit()
return True
周期後に新しいワードクラウドを生成するために使用します。
データベース操作モジュールの完全なコード#
db_manager.py
import sqlite3
class db_manager:
def __init__(self,Db_name):
conn = sqlite3.connect(Db_name,check_same_thread=False)
print(f'データベース{Db_name}に接続成功')
self.db=conn
self.cursor=conn.cursor()
def creat_group_table(self,chatId):
sql=f"""CREATE TABLE "{chatId}"(
groupData NCHAR(20) UNIQUE,
times SMALLINT
)
"""
self.cursor.execute(sql)
self.db.commit()
def creat_user_table(self,chatId):
sql=f"""CREATE TABLE "{chatId}"(
userId CHAR(20) UNIQUE,
times SMALLINT,
name NCHAR(20)
)
"""
self.cursor.execute(sql)
self.db.commit()
def check_table(self,chatId):
sql=f"""SELECT count(*) FROM sqlite_master WHERE type="table" AND name = '{chatId}'"""
return self.cursor.execute(sql).fetchone()[0]
def serch_group_data(self,chatId,message):
sql = f'''
SELECT * FROM '{chatId}' WHERE groupData = '{message}';
'''
try:
self.cursor.execute(sql)
result = self.cursor.fetchone()[1]
except Exception as e:
result=0
return result
def serch_user_data(self,chatId,message):
sql = f'''
SELECT * FROM '{chatId}' WHERE userId = '{message}';
'''
try:
self.cursor.execute(sql)
result = self.cursor.fetchone()[1]
except Exception as e:
result=0
return result
def add_message(self,chatId,message):
chatId=f"{chatId}_group"
if not self.check_table(chatId):
self.creat_group_table(chatId)
times=int(self.serch_group_data(chatId,message))+1
sql = f"""
REPLACE INTO '{chatId}' VALUES('{message}','{times}')
"""
self.cursor.execute(sql)
self.db.commit()
return True
def add_user(self,chatId,userId,name):
chatId=f"{chatId}_user"
if not self.check_table(chatId):
self.creat_user_table(chatId)
times=int(self.serch_user_data(chatId,userId))+1
sql = f"""
REPLACE INTO '{chatId}' VALUES('{userId}','{times}','{name}')
"""
self.cursor.execute(sql)
self.db.commit()
return True
def serch_all(self,chatId):
sql = f'''
SELECT * FROM '{chatId}';
'''
try:
self.cursor.execute(sql)
result = self.cursor.fetchall()
except:
result=None
return result
def all_table(self):
sql=f"""
SELECT name _id FROM sqlite_master WHERE type ='table'
"""
data=self.cursor.execute(sql).fetchall()
result=[]
for i in data:
if "group" in i[0]:
result.append(i[0].split("_")[0])
return result
def del_all(self):
sql=f"""
SELECT name _id FROM sqlite_master WHERE type ='table'
"""
data=self.cursor.execute(sql).fetchall()
for i in data:
sql=f"""
DROP TABLE '{i[0]}'
"""
self.cursor.execute(sql)
self.db.commit()
return True
ボットのメインプログラム部分#
- パッケージをインポート
import telegram
from telegram.ext import Updater, MessageHandler, Filters, CommandHandler
from telegram import Bot, ParseMode, MessageEntity, User
import jieba
import jieba.posseg as pseg
import logging
import schedule
import time
import re
import threading
from db_manager import db_manager as dm //先に書いた操作モジュールをインポート
db=dm("data.db") //クラスをインスタンス化
mk = imageio.imread("circle.png") //画像を読み込む(ワードクラウド生成に使用)
- ユーザー名を取得する関数を先に書いて、メッセージ処理プログラムで使用します
def schedule_task():
for group in db.all_table():
try:
dic_to_pic(group)
bot.send_photo(
chat_id=group,
photo=open("{}_pic.png".format(group), "rb")
)
os.remove(f"{group}_pic.png")
except:
continue
- メッセージ処理関数
def msg_handler(update,context): //後でメッセージ処理器を作成して、この関数にパラメータを渡し続けます
try: //ユーザーのメッセージテキストを取得しようとします
text = update.message.text
except:
return
userId = str(update.effective_user.id) //ユーザーIDを取得
chatId = str(update.effective_message.chat_id) //グループIDを取得
name=get_name(update) //ユーザー名を取得(2の関数を呼び出す)
db.add_user(chatId, userId, name) //add_userメソッドを呼び出す
text=re.sub('\W*', '',text) //正規表現を使用して、句読点を除去
if len(text)<2:
return //単語が1文字の場合、終了し、分割しない
elif len(text)<7:
text=' '.join(jieba.cut(text, cut_all=True, HMM=True)) //7文字未満の場合はcut_allパラメータを使用してすべての可能な単語を見つけ、大きい場合は精密マッチを使用
words=pseg.cut(text,use_paddle=True)
for word ,flag in words: //返される形式は(("単語1","品詞"),("単語2","品詞")...)
if flag in ["n", "nr","a","v","vd","nz", "PER", "f", "ns", "LOC", "s", "nt", "ORG", "nw","vn"]: //虚詞の干渉を除去
if len(word)>1: //単語(つまり2文字以上)の場合
db.add_message(chatId, word) //データベースに追加
return //終了
- ワードクラウド画像生成関数
def dic_to_pic(chatId):
group=chatId
chatId=f"{chatId}_group"
result=db.serch_all(chatId)
if result:
wordcloud.WordCloud(width=800,
height=800,
background_color='white',
font_path='font.ttf', //フォントを読み込む
mask=mk, //画像
scale=5).generate_from_frequencies(dict(result)).to_file(f"{group}_pic.png")
//generate_from_frequenciesは辞書に基づいて単語の重みを加え、出力をこのディレクトリにgroupid+_pic.pngという名前のファイルにします
- 手動生成画像の状況
def make(update,context):
group = str(update.effective_message.chat_id) //生成をリクエストしたグループを取得
dic_to_pic(group) //画像を生成
try:
context.bot.send_photo(
chat_id=group,
photo=open(f"{group}_pic.png", "rb")
) //画像を送信
os.remove(f"{group}_pic.png") //画像を削除
except:
pass
- 定期的なタスクの状況
def schedule_task():
for group in db.all_table(): //すべてのテーブルを検索し、メッセージのあるすべてのグループを反復
try:
dic_to_pic(group)
bot.send_photo(
chat_id=group,
photo=open("{}_pic.png".format(group), "rb")
)
os.remove(f"{group}_pic.png")
except:
continue
- すべてのデータをクリア
def reset():
db.del_all()
- 定期的なタスク
def check_schedule():
while True:
try:
schedule.run_pending() //期限が来た定期的なタスクがあるか確認
time.sleep(1)
except:
time.sleep(7200)
この関数は後で使用されます。
- メインプログラム
if __name__ == '__main__':
TOKEN=123456787878:xxxxxxxxxxxxxxxxxxxxxxxxxxxxx //TOKENは自分で申請して入力
bot = telegram.Bot(token=TOKEN)
updater = Updater(token=TOKEN,workers=10) //同時に並行タスク数、調整可能
dp = updater.dispatcher //トリガー
print("動作中")
schedule.every().day.at('10:00').do(schedule_task)
schedule.every().day.at('12:10').do(schedule_task)
schedule.every().day.at('18:00').do(schedule_task)
schedule.every().day.at('22:00').do(schedule_task)
schedule.every().day.at('04:00').do(schedule_task)
schedule.every().day.at('04:08').do(reset) //定期的なタスクを追加
threading.Thread(target=check_schedule).start() //新しいスレッドを作成して8の関数を実行
dp.add_handler(CommandHandler("make", make, run_async=True)) //コマンドトリガーを作成し、ユーザーが/makeを送信するとmake関数が実行されます
dp.add_handler(MessageHandler(Filters.text, msg_handler)) //メッセージトリガーを作成し、条件は純粋なテキスト
こうしてボットの権限を追加した後、正常に動作します!
ボットのメインプログラム部分の完全なコード#
bot.py
import telegram
from telegram.ext import Updater, MessageHandler, Filters, CommandHandler
from telegram import Bot, ParseMode, MessageEntity, User
import jieba
import jieba.posseg as pseg
import logging
import schedule
import time
import re
import threading
from db_manager import db_manager as dm //先に書いた操作モジュールをインポート
db=dm("data.db") //クラスをインスタンス化
mk = imageio.imread("circle.png") //画像を読み込む(ワードクラウド生成に使用)
def get_name(update):
user = update.message.from_user
firstname = str(user["first_name"])
lastname = str(user["last_name"])
name = ""
if firstname != "None":
name = firstname + " "
if lastname != "None":
name += lastname
if len(name) == 0:
try:
name = update.effective_user.username
except:
name = update.effective_user.id
return name
def msg_handler(update,context):
try:
text = update.message.text
except:
return
userId = str(update.effective_user.id)
chatId = str(update.effective_message.chat_id)
name=get_name(update)
db.add_user(chatId, userId, name)
text=re.sub('\W*', '',text)
if len(text)<2:
return
elif len(text)<7:
text=' '.join(jieba.cut(text, cut_all=True, HMM=True))
words=pseg.cut(text,use_paddle=True)
for word ,flag in words:
if flag in ["n", "nr","a","v","vd","nz", "PER", "f", "ns", "LOC", "s", "nt", "ORG", "nw","vn"]:
if len(word)>1:
db.add_message(chatId, word)
return
def make(update,context):
group = str(update.effective_message.chat_id)
dic_to_pic(group)
try:
context.bot.send_photo(
chat_id=group,
photo=open(f"{group}_pic.png", "rb")
)
os.remove(f"{group}_pic.png")
except:
pass
def dic_to_pic(chatId):
group=chatId
chatId=f"{chatId}_group"
result=db.serch_all(chatId)
if result:
wordcloud.WordCloud(width=800,
height=800,
background_color='white',
font_path='font.ttf',
mask=mk,
scale=5).generate_from_frequencies(dict(result)).to_file(f"{group}_pic.png")
def schedule_task():
for group in db.all_table():
try:
dic_to_pic(group)
bot.send_photo(
chat_id=group,
photo=open("{}_pic.png".format(group), "rb")
)
os.remove(f"{group}_pic.png")
except:
continue
def reset():
db.del_all()
def check_schedule():
while True:
try:
schedule.run_pending()
time.sleep(1)
except:
time.sleep(7200)
if __name__ == '__main__':
TOKEN="123456787878:xxxxxxxxxxxxxxxxxxxxxxxxxxxxx" //TOKENは自分で申請して入力
bot = telegram.Bot(token=TOKEN)
updater = Updater(token=TOKEN,workers=10) //同時に並行タスク数、調整可能
dp = updater.dispatcher //トリガー
print("動作中")
schedule.every().day.at('10:00').do(schedule_task)
schedule.every().day.at('12:10').do(schedule_task)
schedule.every().day.at('18:00').do(schedule_task)
schedule.every().day.at('22:00').do(schedule_task)
schedule.every().day.at('04:00').do(schedule_task)
schedule.every().day.at('04:08').do(reset) //定期的なタスクを追加
threading.Thread(target=check_schedule).start() //新しいスレッドを作成して8の関数を実行
dp.add_handler(CommandHandler("make", make, run_async=True)) //コマンドトリガーを作成し、ユーザーが/makeを送信するとmake関数が実行されます
dp.add_handler(MessageHandler(Filters.text, msg_handler)) //メッセージトリガーを作成し、条件は純粋なテキスト
まとめ#
[todo-t] 単語頻度を統計する [/todo-t]
[todo-t] ユーザーの発言を統計する [/todo-t]
[todo-t] 指令を受けてワードクラウドを生成する [/todo-t]
[todo-t] 定期的にワードクラウドを生成し、グループに送信する [/todo-t]
[todo-f] ユーザーの熱度を統計する [/todo-f]
[todo-f] 権限制限 [/todo-f]