Allen_Jeo

Word Cloud Robot Optimization Implementation

Word Cloud Bot: Original Project Address: https://github.com/devourbots/word_cloud_bot#

Optimization Reason:#

After reading the code, I found that the original bot only stores users' chat messages as they arrive and defers all sentence analysis until a scheduled task or a rank command runs. In large groups this concentrates the work into bursts, so the load is uneven and the bot comes under pressure. I therefore plan to move the sentence analysis into normal message handling, i.e., analyze each message as it is received.

Usage Environment#

  1. Python 3, the same as the original project.
  2. SQLite3 as the database: because the analysis load is now spread evenly over time, the database does not need to be high-performance.

Code Implementation#

Database Operation Module#

  1. Establish an operation class
import sqlite3
class db_manager:
    def __init__(self, Db_name):
        conn = sqlite3.connect(Db_name, check_same_thread=False)  # ①
        print(f'Connected to database {Db_name} successfully')
        self.db = conn
        self.cursor = conn.cursor()

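The above code opens the database connection.
At point ①, sqlite3.connect opens the database file if it exists and creates it if it does not. Passing check_same_thread=False allows the connection to be used from multiple threads; sqlite3 does not serialize those accesses itself, so concurrent writes still have to be coordinated by the caller.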
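Since check_same_thread=False only turns off the sqlite3 module's same-thread check, one way to keep concurrent handlers from interleaving their writes is to guard the shared connection with a lock. The following is a minimal sketch of that idea; the LockedDb class, the log table, and the worker function are hypothetical and are not part of the bot.

```python
import sqlite3
import threading


class LockedDb:
    """Hypothetical wrapper (not part of the bot): one connection plus a lock."""

    def __init__(self, db_name):
        # check_same_thread=False only disables sqlite3's same-thread check.
        self.db = sqlite3.connect(db_name, check_same_thread=False)
        self.lock = threading.Lock()

    def execute(self, sql, params=()):
        # Only one thread at a time may use the shared connection.
        with self.lock:
            rows = self.db.execute(sql, params).fetchall()
            self.db.commit()
            return rows


demo = LockedDb(":memory:")
demo.execute("CREATE TABLE log(msg TEXT)")


def worker(n):
    # Each worker inserts 50 rows through the locked wrapper.
    for i in range(50):
        demo.execute("INSERT INTO log VALUES (?)", (f"{n}-{i}",))


threads = [threading.Thread(target=worker, args=(n,)) for n in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

total = demo.execute("SELECT count(*) FROM log")[0][0]
print(total)
```

The same pattern could be applied inside db_manager by taking the lock around each execute/commit pair.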

  2. Check if the table exists
    def check_table(self, chatId):
        sql = f"""SELECT count(*) FROM sqlite_master WHERE type="table" AND name = '{chatId}'"""  # ①
        return self.cursor.execute(sql).fetchone()[0]

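At point ①, the SQL statement counts the tables named chatId in sqlite_master, SQLite's built-in catalog of schema objects.
return self.cursor.execute(sql).fetchone()[0] executes the statement and returns that count: 1 if the table exists and 0 if it does not, so the result can be used directly as a truth value.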
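The existence check can be tried in isolation against an in-memory database. This is only a sketch: the table_exists helper and the chat ID -1001234 are made up for illustration, and the table name is passed as a bound parameter rather than formatted into the SQL string.

```python
import sqlite3


def table_exists(conn, table_name):
    """Return True if a table with exactly this name exists (hypothetical helper)."""
    sql = "SELECT count(*) FROM sqlite_master WHERE type = 'table' AND name = ?"
    return conn.execute(sql, (table_name,)).fetchone()[0] == 1


conn = sqlite3.connect(":memory:")
# Table names follow the "<chat id>_group" pattern used by the bot.
conn.execute('CREATE TABLE "-1001234_group"(groupData TEXT UNIQUE, times INTEGER)')

print(table_exists(conn, "-1001234_group"))
print(table_exists(conn, "-1001234_user"))
```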

  3. Create the word frequency tables
  • Group word frequency table
    def creat_group_table(self, chatId):
        sql = f"""CREATE TABLE "{chatId}"(
            groupData NCHAR(20) UNIQUE, -- Word field ①
            times SMALLINT  -- Word frequency
        )
        """
        self.cursor.execute(sql)
        self.db.commit()

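At point ①, groupData is the word field. Because Chinese characters may appear, it is declared as NCHAR. It is also declared UNIQUE, which is what makes the REPLACE INTO statement used for updating word counts work; this is explained in the step on adding words.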
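It may help to see how SQLite actually treats these column declarations. In SQLite, a declared type such as NCHAR(20) or SMALLINT only selects a type affinity (TEXT and INTEGER respectively); the length 20 and the SMALLINT range are not enforced. The UNIQUE constraint, on the other hand, is enforced. A small sketch with a throwaway table named demo (a hypothetical name):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo(groupData NCHAR(20) UNIQUE, times SMALLINT)")

# A 30-character word and a count above the usual SMALLINT range are both stored unchanged.
long_word = "词" * 30
conn.execute("INSERT INTO demo VALUES (?, ?)", (long_word, 100000))
stored_word, word_type, stored_times = conn.execute(
    "SELECT groupData, typeof(groupData), times FROM demo"
).fetchone()

# A plain INSERT of the same word violates the UNIQUE constraint.
try:
    conn.execute("INSERT INTO demo VALUES (?, ?)", (long_word, 1))
    duplicate_rejected = False
except sqlite3.IntegrityError:
    duplicate_rejected = True

print(len(stored_word), word_type, stored_times, duplicate_rejected)
```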

  • User speaking frequency table
    def creat_user_table(self, chatId):
        sql = f"""CREATE TABLE "{chatId}"(
            userId CHAR(20) UNIQUE, -- User ID
            times SMALLINT, -- Speaking frequency
            name NCHAR(20) -- Username
        )
        """
        self.cursor.execute(sql)
        self.db.commit()
  4. Add words, update word frequency
  • Word frequency
    def add_message(self, chatId, message):
        chatId = f"{chatId}_group"  # This group's word table name
        if not self.check_table(chatId):  # Call the previous function to check if the table exists
            self.creat_group_table(chatId)  # If not, create it
        times = int(self.serch_group_data(chatId, message)) + 1  # Current count (see the complete code) plus one
        sql = f"""
        REPLACE INTO '{chatId}' VALUES(?, ?) -- ①
        """
        self.cursor.execute(sql, (message, times))  # Values are passed as parameters
        self.db.commit()
        return True

At point ①, REPLACE INTO means: if a row with the same value in the UNIQUE column already exists, it is replaced by the new row; if not, a new row is inserted. This is why the word column was declared UNIQUE.
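The read-increment-replace pattern can be checked on its own with an in-memory database. This is a minimal sketch; the words table and the add_word function are hypothetical stand-ins for the group table and the method above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE words(groupData TEXT UNIQUE, times INTEGER)")


def add_word(word):
    # Read the current count (0 if the word is new), then write count + 1.
    row = conn.execute(
        "SELECT times FROM words WHERE groupData = ?", (word,)
    ).fetchone()
    times = (row[0] if row else 0) + 1
    # Because groupData is UNIQUE, REPLACE overwrites the existing row
    # instead of adding a second row for the same word.
    conn.execute("REPLACE INTO words VALUES (?, ?)", (word, times))
    conn.commit()


for w in ["天气", "天气", "电影"]:
    add_word(w)

counts = dict(conn.execute("SELECT groupData, times FROM words").fetchall())
print(counts)
```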

  • User speaking
    def add_user(self, chatId, userId, name):
        chatId = f"{chatId}_user"  # This group's user table name
        if not self.check_table(chatId):
            self.creat_user_table(chatId)
        times = int(self.serch_user_data(chatId, userId)) + 1
        sql = f"""
        REPLACE INTO '{chatId}' VALUES(?, ?, ?)
        """
        self.cursor.execute(sql, (userId, times, name))  # Parameters keep names containing quotes from breaking the SQL
        self.db.commit()
        return True

This is essentially the same as the word frequency part, with an additional column for the user's name.

  5. Extract word frequency (for generating word clouds)
    def serch_all(self, chatId):
        sql = f'''
            SELECT * FROM '{chatId}' -- ①
            '''
        try:
            self.cursor.execute(sql)
            result = self.cursor.fetchall()
        except sqlite3.Error:  # For example, the table does not exist yet
            result = None
        return result

At point ①, the SQL statement retrieves every row of the chatId table. The result is a list of tuples of the form [("word1", count1), ("word2", count2), ...], so it can be converted directly into a dictionary with the dict() function.
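As a quick illustration of that conversion, the sketch below fills a throwaway table (the name demo and its contents are made up) and turns the fetched rows into the word-to-frequency dictionary that the word cloud step consumes.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo(groupData TEXT UNIQUE, times INTEGER)")
conn.executemany("INSERT INTO demo VALUES (?, ?)", [("bot", 5), ("cloud", 3)])

rows = conn.execute("SELECT * FROM demo").fetchall()  # list of (word, count) tuples
frequencies = dict(rows)  # each two-item tuple becomes a key/value pair

print(frequencies)
```

This works only because the group table has exactly two columns; the three-column user table cannot be passed to dict() in the same way.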

  6. Find all tables
    def all_table(self):
        sql = f"""
        SELECT name _id FROM sqlite_master WHERE type ='table'
        """
        data = self.cursor.execute(sql).fetchall()
        result = []
        for i in data:
            if "group" in i[0]:
                result.append(i[0].split("_")[0])
        return result
  7. Delete all tables
    def del_all(self):
        sql = f"""
        SELECT name _id FROM sqlite_master WHERE type ='table'
        """
        data = self.cursor.execute(sql).fetchall()
        for i in data:
            sql = f"""
            DROP TABLE '{i[0]}'
            """
            self.cursor.execute(sql)
            self.db.commit()
        return True

This is used to clear the data at the end of a cycle, so that the next cycle's word clouds are built from fresh statistics.
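The two helpers above can be exercised together on an in-memory database. This is a sketch under assumptions: the chat IDs -1001 and -2002 and the table_names helper are hypothetical, and the group tables are matched with endswith("_group") rather than the substring test used in all_table.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
for name in ["-1001_group", "-1001_user", "-2002_group"]:
    conn.execute(f'CREATE TABLE "{name}"(k TEXT UNIQUE, times INTEGER)')


def table_names(conn):
    """List every table recorded in sqlite_master (hypothetical helper)."""
    sql = "SELECT name FROM sqlite_master WHERE type = 'table'"
    return [row[0] for row in conn.execute(sql).fetchall()]


# Recover the group IDs from the "<chat id>_group" table names.
group_ids = sorted(n.split("_")[0] for n in table_names(conn) if n.endswith("_group"))

# Drop every table to start a new cycle.
for n in table_names(conn):
    conn.execute(f'DROP TABLE "{n}"')
conn.commit()
remaining = table_names(conn)

print(group_ids, remaining)
```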

Complete Code of Database Operation Module#

db_manager.py

import sqlite3
class db_manager:
    def __init__(self, Db_name):
        conn = sqlite3.connect(Db_name, check_same_thread=False)
        print(f'Connected to database {Db_name} successfully')
        self.db = conn
        self.cursor = conn.cursor()

    def creat_group_table(self, chatId):
        sql = f"""CREATE TABLE "{chatId}"(
            groupData NCHAR(20) UNIQUE,
            times SMALLINT
        )
        """
        self.cursor.execute(sql)
        self.db.commit()

    def creat_user_table(self, chatId):
        sql = f"""CREATE TABLE "{chatId}"(
            userId CHAR(20) UNIQUE,
            times SMALLINT,
            name NCHAR(20)
        )
        """
        self.cursor.execute(sql)
        self.db.commit()

    def check_table(self, chatId):
        sql = f"""SELECT count(*) FROM sqlite_master WHERE type="table" AND name = '{chatId}'"""
        return self.cursor.execute(sql).fetchone()[0]

    def serch_group_data(self, chatId, message):
        sql = f'''
            SELECT * FROM '{chatId}' WHERE groupData = '{message}';
            '''
        try:
            self.cursor.execute(sql)
            result = self.cursor.fetchone()[1]
        except Exception as e:
            result = 0
        return result

    def serch_user_data(self, chatId, message):
        sql = f'''
            SELECT * FROM '{chatId}' WHERE userId = '{message}';
            '''
        try:
            self.cursor.execute(sql)
            result = self.cursor.fetchone()[1]
        except Exception as e:
            result = 0
        return result

    def add_message(self, chatId, message):
        chatId = f"{chatId}_group"
        if not self.check_table(chatId):
            self.creat_group_table(chatId)
        times = int(self.serch_group_data(chatId, message)) + 1
        sql = f"""
        REPLACE INTO '{chatId}' VALUES(?, ?)
        """
        self.cursor.execute(sql, (message, times))
        self.db.commit()
        return True

    def add_user(self, chatId, userId, name):
        chatId = f"{chatId}_user"
        if not self.check_table(chatId):
            self.creat_user_table(chatId)
        times = int(self.serch_user_data(chatId, userId)) + 1
        sql = f"""
        REPLACE INTO '{chatId}' VALUES(?, ?, ?)
        """
        self.cursor.execute(sql, (userId, times, name))
        self.db.commit()
        return True
        
    def serch_all(self, chatId):
        sql = f'''
            SELECT * FROM '{chatId}';
            '''
        try:
            self.cursor.execute(sql)
            result = self.cursor.fetchall()
        except sqlite3.Error:
            result = None
        return result

    def all_table(self):
        sql = f"""
        SELECT name _id FROM sqlite_master WHERE type ='table'
        """
        data = self.cursor.execute(sql).fetchall()
        result = []
        for i in data:
            if "group" in i[0]:
                result.append(i[0].split("_")[0])
        return result

    def del_all(self):
        sql = f"""
        SELECT name _id FROM sqlite_master WHERE type ='table'
        """
        data = self.cursor.execute(sql).fetchall()
        for i in data:
            sql = f"""
            DROP TABLE '{i[0]}'
            """
            self.cursor.execute(sql)
            self.db.commit()
        return True

Main Program Part of the Bot#

  1. Import packages
import os
import telegram
from telegram.ext import Updater, MessageHandler, Filters, CommandHandler  # Updater/Filters API of python-telegram-bot before v20
from telegram import Bot, ParseMode, MessageEntity, User
import imageio
import wordcloud
import jieba
import jieba.posseg as pseg
import logging
import schedule
import time
import re
import threading
from db_manager import db_manager as dm  # Import the previously written operation module
db = dm("data.db")  # Instantiate the class
mk = imageio.imread("circle.png")  # Read image (for generating word cloud)
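  2. First, write a function to get the username for easy message handling
def get_name(update):
    user = update.message.from_user
    firstname = str(user["first_name"])
    lastname = str(user["last_name"])
    name = ""
    if firstname != "None":
        name = firstname + " "
    if lastname != "None":
        name += lastname
    if len(name) == 0:  # No first or last name: fall back to the username, then the user ID
        try:
            name = update.effective_user.username
        except:
            name = update.effective_user.id
    return name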
  3. Message handling function
def msg_handler(update, context):  # The MessageHandler registered in the main program passes every text message to this function
    try:  # Try to get the user message text
        text = update.message.text
    except:
        return
    userId = str(update.effective_user.id)  # Get user ID
    chatId = str(update.effective_message.chat_id)  # Get group ID
    name = get_name(update)  # Get username (call the function in 2)
    db.add_user(chatId, userId, name)  # Call add_user method
    text = re.sub(r'\W+', '', text)  # Use a regular expression to remove punctuation and whitespace
    if len(text) < 2:
        return  # If it is a single character, end and do not continue word segmentation
    elif len(text) < 7:
        # Fewer than 7 characters: use cut_all to find all possible words; longer text goes straight to precise matching
        text = ' '.join(jieba.cut(text, cut_all=True, HMM=True))
    words = pseg.cut(text, use_paddle=True)
    for word, flag in words:  # Yields pairs of the form ("word", "part of speech")
        if flag in ["n", "nr", "a", "v", "vd", "nz", "PER", "f", "ns", "LOC", "s", "nt", "ORG", "nw", "vn"]:  # Remove interference from function words
            if len(word) > 1:  # If it is a word (i.e., two or more characters)
                db.add_message(chatId, word)  # Add to database
    return  # End
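The cleaning and filtering steps of the handler can be tried without Telegram or jieba. The sketch below uses only the standard library; clean_text, keep_words, and the hand-written part-of-speech tags are hypothetical stand-ins (real tags would come from pseg.cut), while the set of kept flags is copied from the handler.

```python
import re

# Part-of-speech flags kept by the handler (nouns, verbs, adjectives, named entities, ...).
KEEP_FLAGS = {"n", "nr", "a", "v", "vd", "nz", "PER", "f", "ns",
              "LOC", "s", "nt", "ORG", "nw", "vn"}


def clean_text(text):
    """Remove every run of non-word characters (punctuation, spaces)."""
    return re.sub(r"\W+", "", text)


def keep_words(tagged_pairs):
    """Keep words of two or more characters whose flag is in KEEP_FLAGS."""
    return [w for w, flag in tagged_pairs if flag in KEEP_FLAGS and len(w) > 1]


cleaned = clean_text("今天, 天气真好!")

# Hand-written tags for illustration only; not actual jieba output.
tagged = [("今天", "t"), ("天气", "n"), ("真", "d"), ("好", "a")]
kept = keep_words(tagged)

print(cleaned, kept)
```

In Python 3, \W is Unicode-aware for str patterns, so Chinese characters count as word characters and survive the cleaning step.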
  4. Word cloud image generation function
def dic_to_pic(chatId):
    group = chatId
    chatId = f"{chatId}_group"
    result = db.serch_all(chatId)
    if result:
        # generate_from_frequencies builds the word cloud from a {word: frequency} dictionary;
        # the image is written to the current directory as <group id>_pic.png
        wordcloud.WordCloud(width=800,
                            height=800,
                            background_color='white',
                            font_path='font.ttf',  # Read font
                            mask=mk,  # Mask image that gives the cloud its shape
                            scale=5).generate_from_frequencies(dict(result)).to_file(f"{group}_pic.png")
  5. Manual image generation (the /make command)
def make(update, context):
    group = str(update.effective_message.chat_id)  # Get the group that requested the image
    dic_to_pic(group)  # Generate image
    try:
        context.bot.send_photo(
        chat_id=group,
        photo=open(f"{group}_pic.png", "rb")
        )  # Send image
        os.remove(f"{group}_pic.png")  # Delete image
    except:
        pass  # No image was generated (no data yet) or sending failed
  6. Image generation by scheduled task
def schedule_task():
    for group in db.all_table():  # Find all tables and iterate through all groups with messages
        try:
            dic_to_pic(group)
            bot.send_photo(
            chat_id=group,
            photo=open("{}_pic.png".format(group), "rb")
            )
            os.remove(f"{group}_pic.png")
        except:
            continue
  7. Clear all data
def reset():
    db.del_all()
  8. Scheduled tasks
def check_schedule():
    while True:
        try:
            schedule.run_pending()  # Run any scheduled tasks that are due
            time.sleep(1)
        except:
            time.sleep(7200)

The main program runs this function in a separate thread.

  9. Main program
if __name__ == '__main__':
    TOKEN = "123456787878:xxxxxxxxxxxxxxxxxxxxxxxxxxxxx"  # Fill in the TOKEN you applied for
    bot = telegram.Bot(token=TOKEN)
    updater = Updater(token=TOKEN, workers=10)  # Number of concurrent tasks, can be adjusted
    dp = updater.dispatcher  # Dispatcher that routes updates to handlers
    print("working")
    schedule.every().day.at('10:00').do(schedule_task)
    schedule.every().day.at('12:10').do(schedule_task)
    schedule.every().day.at('18:00').do(schedule_task)
    schedule.every().day.at('22:00').do(schedule_task)
    schedule.every().day.at('04:00').do(schedule_task)
    schedule.every().day.at('04:08').do(reset)  # Add scheduled tasks
    threading.Thread(target=check_schedule).start()  # Create a new thread to run the function in 8
    dp.add_handler(CommandHandler("make", make, run_async=True))  # Command handler: when the user sends /make, the make function is executed
    dp.add_handler(MessageHandler(Filters.text, msg_handler))  # Message handler: the filter condition is plain text
    updater.start_polling()  # Start receiving updates; without this the handlers are never called
    updater.idle()  # Block until the process is interrupted

Once the bot has been given the permissions it needs in the group, it can run normally!

Complete Code of Main Program Part#

bot.py

import os
import telegram
from telegram.ext import Updater, MessageHandler, Filters, CommandHandler  # Updater/Filters API of python-telegram-bot before v20
from telegram import Bot, ParseMode, MessageEntity, User
import imageio
import wordcloud
import jieba
import jieba.posseg as pseg
import logging
import schedule
import time
import re
import threading
from db_manager import db_manager as dm  # Import the previously written operation module
db = dm("data.db")  # Instantiate the class
mk = imageio.imread("circle.png")  # Read image (for generating word cloud)

def get_name(update):
    user = update.message.from_user
    firstname = str(user["first_name"])
    lastname = str(user["last_name"])
    name = ""
    if firstname != "None":
        name = firstname + " "
    if lastname != "None":
        name += lastname
    if len(name) == 0:
        try:
            name = update.effective_user.username
        except:
            name = update.effective_user.id
    return name


def msg_handler(update, context):
    try:
        text = update.message.text
    except:
        return
    userId = str(update.effective_user.id)
    chatId = str(update.effective_message.chat_id)
    name = get_name(update)
    db.add_user(chatId, userId, name)
    text = re.sub(r'\W+', '', text)
    if len(text) < 2:
        return
    elif len(text) < 7:
        text = ' '.join(jieba.cut(text, cut_all=True, HMM=True))
    words = pseg.cut(text, use_paddle=True)
    for word, flag in words:
        if flag in ["n", "nr", "a", "v", "vd", "nz", "PER", "f", "ns", "LOC", "s", "nt", "ORG", "nw", "vn"]:
            if len(word) > 1:
                db.add_message(chatId, word)
    return

def make(update, context):
    group = str(update.effective_message.chat_id)
    dic_to_pic(group)
    try:
        context.bot.send_photo(
        chat_id=group,
        photo=open(f"{group}_pic.png", "rb")
        )
        os.remove(f"{group}_pic.png")
    except:
        pass

def dic_to_pic(chatId):
    group = chatId
    chatId = f"{chatId}_group"
    result = db.serch_all(chatId)
    if result:
        wordcloud.WordCloud(width=800,
                            height=800,
                            background_color='white',
                            font_path='font.ttf',
                            mask=mk,
                            scale=5).generate_from_frequencies(dict(result)).to_file(f"{group}_pic.png")

def schedule_task():
    for group in db.all_table():
        try:
            dic_to_pic(group)
            bot.send_photo(
            chat_id=group,
            photo=open("{}_pic.png".format(group), "rb")
            )
            os.remove(f"{group}_pic.png")
        except:
            continue

def reset():
    db.del_all()
		
def check_schedule():
    while True:
        try:
            schedule.run_pending()
            time.sleep(1)
        except:
            time.sleep(7200)

if __name__ == '__main__':
    TOKEN = "123456787878:xxxxxxxxxxxxxxxxxxxxxxxxxxxxx"  # Fill in the TOKEN you applied for
    bot = telegram.Bot(token=TOKEN)
    updater = Updater(token=TOKEN, workers=10)  # Number of concurrent tasks, can be adjusted
    dp = updater.dispatcher  # Dispatcher that routes updates to handlers
    print("working")
    schedule.every().day.at('10:00').do(schedule_task)
    schedule.every().day.at('12:10').do(schedule_task)
    schedule.every().day.at('18:00').do(schedule_task)
    schedule.every().day.at('22:00').do(schedule_task)
    schedule.every().day.at('04:00').do(schedule_task)
    schedule.every().day.at('04:08').do(reset)  # Add scheduled tasks
    threading.Thread(target=check_schedule).start()  # Create a new thread to run check_schedule
    dp.add_handler(CommandHandler("make", make, run_async=True))  # Command handler: when the user sends /make, the make function is executed
    dp.add_handler(MessageHandler(Filters.text, msg_handler))  # Message handler: the filter condition is plain text
    updater.start_polling()  # Start receiving updates; without this the handlers are never called
    updater.idle()  # Block until the process is interrupted

Summary#

  • [x] Count word frequency
  • [x] Count user speaking
  • [x] Accept commands to generate word clouds
  • [x] Generate word clouds regularly and send to groups
  • [ ] User popularity statistics
  • [ ] Permission restrictions

File Download#

  1. Image
  2. Font

Please include the source when reprinting the article: https://z-r.cc/2022/05/11/%E8%AF%8D%E4%BA%91%E6%9C%BA%E5%99%A8%E4%BA%BA%E7%9A%84%E4%BC%98%E5%8C%96%E5%AE%9E%E7%8E%B0.html
