Word Cloud Bot: Original Project Address: https://github.com/devourbots/word_cloud_bot#
Optimization Reason:#
After reviewing the code, I found that the original bot performs sentence analysis only when a scheduled task or the rank command runs; the rest of the time it merely stores users' chat records. In large groups this produces an uneven load, with all the pressure concentrated at analysis time. I therefore plan to move sentence analysis to message time (i.e., analyze each sentence as it arrives).
Usage Environment#
- Python 3
- An SQLite3 database: because the analysis load is spread evenly across messages, there are no high performance requirements for the database.
Code Implementation#
Database Operation Module#
- Establish an operation class
import sqlite3
class db_manager:
    """Thin wrapper around one SQLite connection and a cursor on it."""

    def __init__(self, Db_name):
        """Connect to the SQLite database ``Db_name``.

        The connection is stored as ``self.db`` and a cursor on it as
        ``self.cursor``.
        """
        # ① check_same_thread=False turns off sqlite3's same-thread check
        # for this connection.
        conn = sqlite3.connect(Db_name, check_same_thread=False)
        print(f'Connected to database {Db_name} successfully')
        self.db = conn
        self.cursor = conn.cursor()
The above code initializes a database.
At point ①, if the database file exists, it is opened; if not, it is created. Passing check_same_thread=False allows this connection to be used from multiple threads.
- Check if the table exists
def check_table(self, chatId):
    """Return how many tables are named ``chatId`` (0 when it does not exist)."""
    # ① Look the table up in sqlite_master. The name is bound as a parameter
    # rather than pasted into the statement, so quotes in it cannot alter the SQL.
    sql = "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = ?"
    return self.cursor.execute(sql, (str(chatId),)).fetchone()[0]
At point ①, the SQL statement counts the tables named chatId that are recorded in sqlite_master. The line `return self.cursor.execute(sql).fetchone()[0]` executes the statement and returns that count, which is 0 when the table does not exist.
- Create a word frequency table
- Group word frequency table
def creat_group_table(self, chatId):
    """Create the word-frequency table named ``chatId`` and commit.

    Columns: ``groupData`` (the word, UNIQUE) and ``times`` (its count).
    """
    # Table names cannot be bound as parameters, so quote the identifier
    # and double any embedded double quote.
    table = '"' + str(chatId).replace('"', '""') + '"'
    # Inside the statement, annotations must be SQL comments ("--");
    # "//" would be a syntax error.
    sql = f"""CREATE TABLE {table}(
            groupData NCHAR(20) UNIQUE, -- word field ①
            times SMALLINT              -- word frequency
        )
    """
    self.cursor.execute(sql)
    self.db.commit()
At point ①, it is the word field. Since Chinese characters may appear, it is set as NCHAR type and then set as UNIQUE, which will be explained later.
- User speaking frequency table
def creat_user_table(self, chatId):
    """Create the per-user speaking-frequency table named ``chatId`` and commit.

    Columns: ``userId`` (UNIQUE), ``times`` (message count) and ``name``.
    """
    # Table names cannot be bound as parameters, so quote the identifier
    # and double any embedded double quote.
    table = '"' + str(chatId).replace('"', '""') + '"'
    # Inside the statement, annotations must be SQL comments ("--");
    # "//" would be a syntax error.
    sql = f"""CREATE TABLE {table}(
            userId CHAR(20) UNIQUE, -- user ID
            times SMALLINT,         -- speaking frequency
            name NCHAR(20)          -- username
        )
    """
    self.cursor.execute(sql)
    self.db.commit()
- Add words, update word frequency
- Word frequency
def add_message(self, chatId, message):
    """Increment the stored count of the word ``message`` for chat ``chatId``.

    The word table ``<chatId>_group`` is created on first use.
    Always returns True.
    """
    chatId = f"{chatId}_group"  # this group's word table name
    if not self.check_table(chatId):  # call the previous function to check if the table exists
        self.creat_group_table(chatId)  # if not, create it
    times = int(self.serch_group_data(chatId, message)) + 1  # increase count by one
    # Table names cannot be bound as parameters; quote the identifier instead.
    table = '"' + chatId.replace('"', '""') + '"'
    # ① REPLACE INTO: a row with the same UNIQUE word is replaced, otherwise
    # a new row is inserted. Values are bound, never pasted into the SQL.
    self.cursor.execute(f"REPLACE INTO {table} VALUES(?, ?)", (str(message), times))
    self.db.commit()
    return True
At point ①, the REPLACE INTO statement means: if a row with the same value in the UNIQUE column already exists, it is replaced with the new data; if not, a new row is inserted.
- User speaking
def add_user(self, chatId, userId, name):
    """Increment the message count of ``userId`` in chat ``chatId`` and store ``name``.

    The user table ``<chatId>_user`` is created on first use.
    Always returns True.
    """
    chatId = f"{chatId}_user"
    if not self.check_table(chatId):
        self.creat_user_table(chatId)
    times = int(self.serch_user_data(chatId, userId)) + 1
    # Table names cannot be bound as parameters; quote the identifier instead.
    table = '"' + chatId.replace('"', '""') + '"'
    # The display name is user-controlled text, so it must be bound as a
    # parameter: a name containing a quote would otherwise break the statement.
    self.cursor.execute(
        f"REPLACE INTO {table} VALUES(?, ?, ?)",
        (str(userId), times, str(name)),
    )
    self.db.commit()
    return True
Basically consistent with the word frequency statistics part.
- Extract word frequency (for generating word clouds)
def serch_all(self, chatId):
    """Return every row of table ``chatId`` as a list of tuples.

    Returns None when the query fails (for example, the table does not exist).
    """
    # Table names cannot be bound as parameters; quote the identifier instead.
    table = '"' + str(chatId).replace('"', '""') + '"'
    try:
        # ① fetch all rows of the table
        return self.cursor.execute(f"SELECT * FROM {table}").fetchall()
    except sqlite3.Error:
        return None
At point ①, the SQL statement retrieves all rows from the chatId table. The result has the format (("word1", count), ("word2", count), ...), so it can be converted into a dictionary with the dict() function.
- Find all tables
def all_table(self):
    """Return the chat ids that have a word table.

    A word table is any table whose name ends in ``_group``; the id is the
    name with that suffix removed.
    """
    suffix = "_group"
    rows = self.cursor.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table'"
    ).fetchall()
    return [name[:-len(suffix)] for (name,) in rows if name.endswith(suffix)]
- Delete all tables
def del_all(self):
    """Drop every user-created table in the database and commit.

    Always returns True.
    """
    rows = self.cursor.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table'"
    ).fetchall()
    for (name,) in rows:
        if name.startswith("sqlite_"):
            continue  # internal SQLite tables cannot be dropped
        # Table names cannot be bound as parameters; quote the identifier instead.
        table = '"' + name.replace('"', '""') + '"'
        self.cursor.execute(f"DROP TABLE {table}")
    self.db.commit()
    return True
Used to clear the collected data so that a fresh word cloud can be built in the next cycle.
Complete Code of Database Operation Module#
db_manager.py
import sqlite3
class db_manager:
    """SQLite-backed store for per-chat word counts and per-user message counts.

    Each chat uses two tables, both created on first write:
    ``<chatId>_group`` (word, count) and ``<chatId>_user`` (user id, count, name).
    All values are bound as SQL parameters; table names, which cannot be
    bound, are quoted as identifiers.
    """

    def __init__(self, Db_name):
        """Connect to the SQLite database ``Db_name``.

        The connection is opened with ``check_same_thread=False`` so that it
        can be used from several threads. A re-entrant lock serializes every
        operation on the shared cursor.
        """
        import threading  # local import: the module header only imports sqlite3

        conn = sqlite3.connect(Db_name, check_same_thread=False)
        print(f'Connected to database {Db_name} successfully')
        self.db = conn
        self.cursor = conn.cursor()
        # Re-entrant because add_message/add_user call other locked methods.
        self._lock = threading.RLock()

    @staticmethod
    def _quote(name):
        """Return ``name`` as a double-quoted SQL identifier."""
        return '"' + str(name).replace('"', '""') + '"'

    def _lookup_times(self, table, key_column, key):
        """Return the ``times`` value of the row whose ``key_column`` equals ``key``.

        Returns 0 when there is no such row or the query fails
        (for example, the table does not exist).
        """
        sql = f"SELECT times FROM {self._quote(table)} WHERE {key_column} = ?"
        with self._lock:
            try:
                row = self.cursor.execute(sql, (str(key),)).fetchone()
            except sqlite3.Error:
                return 0
        return row[0] if row is not None else 0

    def creat_group_table(self, chatId):
        """Create the word table named ``chatId`` (word UNIQUE, count) and commit."""
        sql = f"""CREATE TABLE {self._quote(chatId)}(
                groupData NCHAR(20) UNIQUE,
                times SMALLINT
            )
        """
        with self._lock:
            self.cursor.execute(sql)
            self.db.commit()

    def creat_user_table(self, chatId):
        """Create the user table named ``chatId`` (user id UNIQUE, count, name) and commit."""
        sql = f"""CREATE TABLE {self._quote(chatId)}(
                userId CHAR(20) UNIQUE,
                times SMALLINT,
                name NCHAR(20)
            )
        """
        with self._lock:
            self.cursor.execute(sql)
            self.db.commit()

    def check_table(self, chatId):
        """Return how many tables are named ``chatId`` (0 when it does not exist)."""
        sql = "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = ?"
        with self._lock:
            return self.cursor.execute(sql, (str(chatId),)).fetchone()[0]

    def serch_group_data(self, chatId, message):
        """Return the stored count of word ``message`` in table ``chatId``, or 0."""
        return self._lookup_times(chatId, "groupData", message)

    def serch_user_data(self, chatId, message):
        """Return the stored count of user id ``message`` in table ``chatId``, or 0."""
        return self._lookup_times(chatId, "userId", message)

    def add_message(self, chatId, message):
        """Increment the count of word ``message`` for chat ``chatId``. Returns True."""
        table = f"{chatId}_group"
        # Hold the lock across read + write so concurrent callers cannot
        # both read the same old count.
        with self._lock:
            if not self.check_table(table):
                self.creat_group_table(table)
            times = int(self.serch_group_data(table, message)) + 1
            self.cursor.execute(
                f"REPLACE INTO {self._quote(table)} VALUES(?, ?)",
                (str(message), times),
            )
            self.db.commit()
        return True

    def add_user(self, chatId, userId, name):
        """Increment the message count of ``userId`` in chat ``chatId`` and store ``name``.

        Returns True.
        """
        table = f"{chatId}_user"
        with self._lock:
            if not self.check_table(table):
                self.creat_user_table(table)
            times = int(self.serch_user_data(table, userId)) + 1
            # ``name`` is user-controlled text; binding keeps quotes in it harmless.
            self.cursor.execute(
                f"REPLACE INTO {self._quote(table)} VALUES(?, ?, ?)",
                (str(userId), times, str(name)),
            )
            self.db.commit()
        return True

    def serch_all(self, chatId):
        """Return every row of table ``chatId`` as a list of tuples, or None on failure."""
        with self._lock:
            try:
                return self.cursor.execute(
                    f"SELECT * FROM {self._quote(chatId)}"
                ).fetchall()
            except sqlite3.Error:
                return None

    def all_table(self):
        """Return the chat ids that have a word table (a table named ``<id>_group``)."""
        suffix = "_group"
        with self._lock:
            rows = self.cursor.execute(
                "SELECT name FROM sqlite_master WHERE type = 'table'"
            ).fetchall()
        return [name[:-len(suffix)] for (name,) in rows if name.endswith(suffix)]

    def del_all(self):
        """Drop every user-created table and commit. Returns True."""
        with self._lock:
            rows = self.cursor.execute(
                "SELECT name FROM sqlite_master WHERE type = 'table'"
            ).fetchall()
            for (name,) in rows:
                if name.startswith("sqlite_"):
                    continue  # internal SQLite tables cannot be dropped
                self.cursor.execute(f"DROP TABLE {self._quote(name)}")
            self.db.commit()
        return True
Main Program Part of the Bot#
- Import packages
import logging
import os  # used by make() and schedule_task() to delete the generated picture
import re
import threading
import time

import imageio  # used below to read the mask image
import jieba
import jieba.posseg as pseg
import schedule
import telegram
import wordcloud  # used by dic_to_pic() to render the word cloud
from telegram import Bot, ParseMode, MessageEntity, User
from telegram.ext import Updater, MessageHandler, Filters, CommandHandler

from db_manager import db_manager as dm  # import the previously written operation module

db = dm("data.db")  # instantiate the class
mk = imageio.imread("circle.png")  # read the mask image (for generating the word cloud)
- First, write a function to get the username for easy message handling
def get_name(update):
    """Return a display name for the sender of ``update``.

    Uses "first_name last_name" when either part is set, otherwise the
    username, otherwise the numeric user id as a string.
    """
    user = update.message.from_user
    parts = [part for part in (user.first_name, user.last_name) if part]
    if parts:
        return " ".join(parts)
    return user.username or str(user.id)
- Message handling function
# Part-of-speech tags that are counted; other tags (function words etc.) are ignored.
_KEPT_FLAGS = frozenset({
    "n", "nr", "a", "v", "vd", "nz", "PER", "f", "ns", "LOC", "s", "nt", "ORG", "nw", "vn",
})


def msg_handler(update, context):
    """Record one incoming text message: count the speaker and the words used.

    Registered with a MessageHandler, which passes every matching update here.
    """
    message = update.message
    if message is None or message.text is None or update.effective_user is None:
        return  # nothing to analyse (no message text or no sender)
    userId = str(update.effective_user.id)  # user ID
    chatId = str(update.effective_message.chat_id)  # group ID
    name = get_name(update)  # display name of the sender
    db.add_user(chatId, userId, name)
    # Strip every non-word character (punctuation and whitespace).
    text = re.sub(r'\W+', '', message.text)
    if len(text) < 2:
        return  # a single character is not segmented
    if len(text) < 7:
        # Short text: cut_all lists every possible word; longer text goes
        # straight to the part-of-speech segmentation below.
        text = ' '.join(jieba.cut(text, cut_all=True, HMM=True))
    # pseg.cut yields (word, part-of-speech flag) pairs.
    for word, flag in pseg.cut(text, use_paddle=True):
        if flag in _KEPT_FLAGS and len(word) > 1:  # keep words of two or more characters
            db.add_message(chatId, word)
- Word cloud image generation function
def dic_to_pic(chatId):
    """Render the word cloud of chat ``chatId`` to ``<chatId>_pic.png``.

    Does nothing when the chat has no stored words.
    """
    rows = db.serch_all(f"{chatId}_group")
    if not rows:
        return
    cloud = wordcloud.WordCloud(
        width=800,
        height=800,
        background_color='white',
        font_path='font.ttf',  # font file
        mask=mk,  # mask image
        scale=5,
    )
    # generate_from_frequencies builds the cloud from the word -> count dictionary;
    # the picture is written to the current directory as <chatId>_pic.png.
    cloud.generate_from_frequencies(dict(rows)).to_file(f"{chatId}_pic.png")
- Manual image generation situation
def make(update, context):
    """Handle the /make command: render and send the word cloud of the requesting chat."""
    import os  # local import: the module header does not import os

    group = str(update.effective_message.chat_id)  # chat the picture is requested for
    picture = f"{group}_pic.png"
    dic_to_pic(group)  # generate the picture
    if not os.path.exists(picture):
        return  # no picture was produced (the chat has no stored words)
    try:
        with open(picture, "rb") as photo:  # closed even if sending fails
            context.bot.send_photo(chat_id=group, photo=photo)
    except Exception:
        # Best effort: a failed send must not crash the handler, but it is logged.
        logging.exception("Failed to send word cloud to chat %s", group)
    finally:
        os.remove(picture)  # delete the picture
- Scheduled task situation
def schedule_task():
    """Render and send the word cloud to every chat that has stored words.

    A failure for one chat is logged and does not stop the remaining chats.
    """
    import os  # local import: the module header does not import os

    for group in db.all_table():  # all chats that have a word table
        picture = f"{group}_pic.png"
        try:
            dic_to_pic(group)
            with open(picture, "rb") as photo:  # closed even if sending fails
                bot.send_photo(chat_id=group, photo=photo)
        except Exception:
            logging.exception("Failed to send scheduled word cloud to chat %s", group)
        finally:
            if os.path.exists(picture):
                os.remove(picture)
- Clear all data
def reset():
    """Clear all collected data by calling ``db.del_all()``."""
    db.del_all()
- Scheduled tasks
def check_schedule():
    """Run pending scheduled jobs forever, polling once per second.

    If running the jobs raises, the error is logged and the loop pauses for
    two hours before polling again.
    """
    while True:
        try:
            schedule.run_pending()  # run any scheduled job that is due
        except Exception:
            logging.exception("Scheduled job failed; pausing the scheduler for 7200 seconds")
            time.sleep(7200)
        else:
            time.sleep(1)
This function will be used later.
- Main program
if __name__ == '__main__':
    TOKEN = "123456787878:xxxxxxxxxxxxxxxxxxxxxxxxxxxxx"  # fill in the TOKEN you applied for
    # Module-level name: schedule_task() sends pictures through this bot.
    bot = telegram.Bot(token=TOKEN)
    updater = Updater(token=TOKEN, workers=10)  # number of concurrent workers, can be adjusted
    dp = updater.dispatcher  # dispatcher the handlers are registered on
    print("working")
    # Scheduled tasks: send the word cloud five times a day, then clear the data.
    for send_time in ('10:00', '12:10', '18:00', '22:00', '04:00'):
        schedule.every().day.at(send_time).do(schedule_task)
    schedule.every().day.at('04:08').do(reset)
    # Run check_schedule in its own thread; daemon so it does not keep the
    # process alive after the updater stops.
    threading.Thread(target=check_schedule, daemon=True).start()
    # /make runs the make function.
    dp.add_handler(CommandHandler("make", make, run_async=True))
    # Every plain-text message goes to msg_handler.
    dp.add_handler(MessageHandler(Filters.text, msg_handler))
    # Without these two calls no update is ever fetched and the script exits.
    updater.start_polling()
    updater.idle()
With this in place, once the bot has been added to a group and granted the permissions it needs, it runs normally!
Complete Code of Main Program Part#
bot.py
import logging
import os  # used by make() and schedule_task() to delete the generated picture
import re
import threading
import time

import imageio  # used below to read the mask image
import jieba
import jieba.posseg as pseg
import schedule
import telegram
import wordcloud  # used by dic_to_pic() to render the word cloud
from telegram import Bot, ParseMode, MessageEntity, User
from telegram.ext import Updater, MessageHandler, Filters, CommandHandler

from db_manager import db_manager as dm  # import the previously written operation module

db = dm("data.db")  # instantiate the class
mk = imageio.imread("circle.png")  # read the mask image (for generating the word cloud)
def get_name(update):
    """Return a display name for the sender of ``update``.

    Uses "first_name last_name" when either part is set, otherwise the
    username, otherwise the numeric user id as a string.
    """
    user = update.message.from_user
    # Test the values themselves rather than comparing str(...) with "None",
    # which would also discard a user whose name really is "None".
    parts = [part for part in (user.first_name, user.last_name) if part]
    if parts:
        return " ".join(parts)
    return user.username or str(user.id)
# Part-of-speech tags that are counted; other tags (function words etc.) are ignored.
_KEPT_FLAGS = frozenset({
    "n", "nr", "a", "v", "vd", "nz", "PER", "f", "ns", "LOC", "s", "nt", "ORG", "nw", "vn",
})


def msg_handler(update, context):
    """Record one incoming text message: count the speaker and the words used."""
    message = update.message
    if message is None or message.text is None or update.effective_user is None:
        return  # nothing to analyse (no message text or no sender)
    userId = str(update.effective_user.id)
    chatId = str(update.effective_message.chat_id)
    name = get_name(update)
    db.add_user(chatId, userId, name)
    # Strip every non-word character (punctuation and whitespace).
    text = re.sub(r'\W+', '', message.text)
    if len(text) < 2:
        return  # a single character is not segmented
    if len(text) < 7:
        # Short text: cut_all lists every possible word.
        text = ' '.join(jieba.cut(text, cut_all=True, HMM=True))
    # pseg.cut yields (word, part-of-speech flag) pairs.
    for word, flag in pseg.cut(text, use_paddle=True):
        if flag in _KEPT_FLAGS and len(word) > 1:
            db.add_message(chatId, word)
def make(update, context):
    """Handle the /make command: render and send the word cloud of the requesting chat."""
    import os  # local import: the module header does not import os

    group = str(update.effective_message.chat_id)
    picture = f"{group}_pic.png"
    dic_to_pic(group)
    if not os.path.exists(picture):
        return  # no picture was produced (the chat has no stored words)
    try:
        with open(picture, "rb") as photo:  # closed even if sending fails
            context.bot.send_photo(chat_id=group, photo=photo)
    except Exception:
        # Best effort: a failed send must not crash the handler, but it is logged.
        logging.exception("Failed to send word cloud to chat %s", group)
    finally:
        os.remove(picture)
def dic_to_pic(chatId):
    """Render the word cloud of chat ``chatId`` to ``<chatId>_pic.png``.

    Does nothing when the chat has no stored words.
    """
    rows = db.serch_all(f"{chatId}_group")
    if not rows:
        return
    cloud = wordcloud.WordCloud(
        width=800,
        height=800,
        background_color='white',
        font_path='font.ttf',
        mask=mk,
        scale=5,
    )
    # Build the cloud from the word -> count dictionary and write the picture.
    cloud.generate_from_frequencies(dict(rows)).to_file(f"{chatId}_pic.png")
def schedule_task():
    """Render and send the word cloud to every chat that has stored words.

    A failure for one chat is logged and does not stop the remaining chats.
    """
    import os  # local import: the module header does not import os

    for group in db.all_table():
        picture = f"{group}_pic.png"
        try:
            dic_to_pic(group)
            with open(picture, "rb") as photo:  # closed even if sending fails
                bot.send_photo(chat_id=group, photo=photo)
        except Exception:
            logging.exception("Failed to send scheduled word cloud to chat %s", group)
        finally:
            if os.path.exists(picture):
                os.remove(picture)
def reset():
    """Clear all collected data by calling ``db.del_all()``."""
    db.del_all()
def check_schedule():
    """Run pending scheduled jobs forever, polling once per second.

    If running the jobs raises, the error is logged and the loop pauses for
    two hours before polling again.
    """
    while True:
        try:
            schedule.run_pending()
        except Exception:
            logging.exception("Scheduled job failed; pausing the scheduler for 7200 seconds")
            time.sleep(7200)
        else:
            time.sleep(1)
if __name__ == '__main__':
    TOKEN = "123456787878:xxxxxxxxxxxxxxxxxxxxxxxxxxxxx"  # fill in the TOKEN you applied for
    # Module-level name: schedule_task() sends pictures through this bot.
    bot = telegram.Bot(token=TOKEN)
    updater = Updater(token=TOKEN, workers=10)  # number of concurrent workers, can be adjusted
    dp = updater.dispatcher  # dispatcher the handlers are registered on
    print("working")
    # Scheduled tasks: send the word cloud five times a day, then clear the data.
    for send_time in ('10:00', '12:10', '18:00', '22:00', '04:00'):
        schedule.every().day.at(send_time).do(schedule_task)
    schedule.every().day.at('04:08').do(reset)
    # Run check_schedule in its own thread; daemon so it does not keep the
    # process alive after the updater stops.
    threading.Thread(target=check_schedule, daemon=True).start()
    # /make runs the make function.
    dp.add_handler(CommandHandler("make", make, run_async=True))
    # Every plain-text message goes to msg_handler.
    dp.add_handler(MessageHandler(Filters.text, msg_handler))
    # Without these two calls no update is ever fetched and the script exits.
    updater.start_polling()
    updater.idle()
Summary#
[todo-t] Count word frequency [/todo-t]
[todo-t] Count user speaking [/todo-t]
[todo-t] Accept commands to generate word clouds [/todo-t]
[todo-t] Generate word clouds regularly and send to groups [/todo-t]
[todo-f] User popularity statistics [/todo-f]
[todo-f] Permission restrictions [/todo-f]