现有情形下,选择一门适合自己的语言来编写QQ bot才是一个最好的选择。
编辑于 2021.7.16 转载请注明作者
简介
首先,我个人不是什么正经的开发者——甚至没有上过什么课,或者看过什么正经的教程,所需要的东西完全来自于网络星星点点的缝合,因此有很多地方的写法和语法都会非常的令人头大与迷惑。
但尽管如此,我还是希望通过这一篇文章让大部分明白——基于GOCQ开发QQ bot并非一件难事,只要假以时日都能有所进展。
本项目已上传github,如有需要可自行取用作为参考——虽然我觉得大部分人都不需要就是了。
思路
选取链接方式
GO CQHTTP允许采用POST、反向POST、正向Websocket和反向Websocket四种方式进行框架和程序的沟通;出于各种方便的考虑,我选用了websocket进行事件的监听,通过http post进行API的调用——这样的一个好处是,如果完全使用ws,那么你的API调用的回执会混杂在ws中,在高并发环境下分离这些回执信息变得有些复杂。
当选择完成后,则需要考虑使用python进行链接。
首先,我们的主线程一定是以监听事件为主的,因此:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
import sys #导入独立包路径 sys.path.append('.\\Lib\\site-packages') #导入模块 #ws模块 import websockets #异步模块 import asyncio #导入自定义模块 import mapping_flow import api #从全局变量管理中获取对接ws的链接 ws_uri = api.glo_get('ws_uri') #从网上抄的异步ws对接 async def async_processing(): async with websockets.connect(ws_uri) as websocket: while True: try: message = await websocket.recv() #将接受到的信息异步的送入消息匹配流程中 await mapping_flow.main_process(message) except websockets.ConnectionClosed: print('ConnectionClosed') break asyncio.get_event_loop().run_until_complete(async_processing()) |
这样的一个对接流程,便能够将接受到的所有信息,送入到mapping_flow模块下的main_process的一个匹配流函数中。
匹配
在匹配过程中,我们需要将多种不同的消息类型完全的分割开来,并且将不同的类型送入到不同的函数中。
根据GO-CQHTTP提供的事件数据,只需要解析json就可以通过一系列判断将其分离。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
async def main_process(ori_data): if ori_data: data = json.loads(ori_data) if data['post_type'] == 'notice': if data['notice_type'] == 'notify': if data['sub_type'] == 'poke': #戳一戳事件 await poke_event(data) elif data['sub_type'] == 'lucky_king': #运气王事件 await common_event() elif data['sub_type'] == 'honor': #群荣耀变化事件 await common_event() else: await unknown_event() elif data['notice_type'] == 'group_upload': #群文件上传事件 await common_event() elif data['notice_type'] == 'group_admin': #群管理变动事件 await common_event() elif data['notice_type'] == 'group_increase': #群成员增加事件 await common_event() elif data['notice_type'] == 'group_decrease': #群成员减少事件 await common_event() . . . |
在实际开发时可以只选择自己需要的事件进行分离,其余不需要的直接丢弃——在每个elif后面,则是对json信息的一个校验,检查是否属于当前事件,如果是,那么就异步调用(await)后面的event函数。
这样,我们只需要定义event函数内部的内容,就可以控制bot在收到指定消息的时候进行相对应的处理。
1 2 3 4 |
#戳一戳事件处理 async def poke_event(data): await setu.setu(data, False) return |
比如上文中,当发生了戳一戳事件时(poke_event),则会直接调用setu模块的setu函数,其中传入的两个参数分别为接收到的消息数据data和一个是否r18的Flag。
调用API
那么我们已经接收到了事件,该如何回应呢?
如果我们调用print,那它只会从console里面发送出一段log,如果要让bot能够在QQ中进行回复,我们必须要调用POST方法,将自己需要回复的内容反馈给GOCQ框架提供的API。
因此我新建了一个api模块,专门用以储存封装GOCQ现有的API方案。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
import json from aiohttp import http from asyncio.base_events import Server import aiohttp import asyncio import re import requests #上报通用函数 async def post(data, url = http_url): async with aiohttp.ClientSession() as session: headers = { 'Content-Type': 'application/json' } jdata = json.dumps(data['params']) async with session.post(url + data['action'], data=jdata, headers=headers) as response: res = await response.text() if res: return json.loads(res) return #私聊消息发送函数封装 async def send_private_msg(qq, group, msg): data, params= dict(), dict() params['user_id'] = qq params['group_id'] = group if group else 0 params['message'] = msg data['action'] = 'send_private_msg' data['params'] = params res = await post(data) return res #群聊消息发送 async def send_group_msg(group, msg): data, params= dict(), dict() params['group_id'] = group if group else 0 params['message'] = msg data['action'] = 'send_group_msg' data['params'] = params res = await post(data) return res #图片cq码 async def cq_pic(file): return '[CQ:image,file=' + file + ']' #自适应消息发送 async def send_msg(qq, group, msg): data, params = dict(), dict() if group == 0 and qq!=0: params['user_id'] = qq elif group!=0: params['group_id'] = group params['message'] = msg data['action'] = 'send_msg' data['params'] = params res = await post(data) return res #已知消息,跟踪用户对象 class MsgUser: def __init__(self, data): self.qq = data.get('user_id', 0) self.group = data.get('group_id', 0) return #重定向 async def redirect_qq(self, user_id): self.qq = user_id return async def redirect_group(self, group_id): self.group = group_id return #自适应回复 async def send(self,msg): msg_id = await send_msg(self.qq, self.group, msg) return msg_id #快捷获取昵称 async def name(self): if self.name: self.name = await get_name(self.qq, self.group) return self.name |
请注意,上文的代码只是一个简单的摘抄和举例!并不能真正的使用!如果想要得到完整版代码,还请前往github一探究竟。
回应
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
import sqlite3 import re import api async def setu(data, r_18 = False): #建立数据库连接 conn = sqlite3.connect('.\\database\\setu.db') #获取游标对象 cursor = conn.cursor() state = 'SETU' if r_18 else 'SETU2' #随机提取色图 sql_sen = 'SELECT * FROM ' + state + ' ORDER BY RANDOM() limit 1;' cursor.execute(sql_sen) value = cursor.fetchall() ori_url = value[0][3] pid = str(value[0][0]) #根据footer和pid重新拼接新的储存桶链接 footer_obj = re.search('_p\d(\.[a-z]{3})',ori_url,re.M|re.I) footer = footer_obj.group(1) url = 'https://picbucket-1257117970.cos.ap-beijing.myqcloud.com/' + pid + footer #建立消息对象 user = api.MsgUser(data) await user.send(await api.cq_pic(url)) #关闭数据库服务 conn.close() return |
这是一个简单的程序,提供了一个向QQ群发送图片的示例。
去耦合
但是我们注意到,这样的匹配方案中,我们编写“插件”,每次都需要修改mapping_flow中的内容,这样的结果并不是我想要的——我希望能够通过一个方案,不修改mapping_flow的内容来达到增加可匹配关键词的结果——这不仅是一种去耦合的要求,也是一种灵活性的要求。
因此我采用了铃心的传统,将大部分的消息事件分为了私聊和群聊两类,并且进行了三种匹配方式的导入(正则、完全、前缀)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
#mapping_flow.py async def flow_deal(data, command:str, flow): for obj in flow: key = obj['key'] #进行正则匹配 if obj['match_type'] == 'reg': #正则表达式匹配 match_obj = re.match(key, command, re.M|re.I) #如果成功 if match_obj: #调用函数 await obj['function'](data, command) break #进行完全匹配 if obj['match_type'] == 'abs' and command == key: await obj['function'](data, command) break #进行前缀匹配 if obj['match_type'] == 'pre' and command.startswith(key): await obj['function'](data, command) break return #戳一戳事件处理 async def poke_event(data): await setu.setu(data, False) return #私聊事件 async def private_event(data): msg = str(data['message']) #多匹配机制 if msg.startswith(('.', '。')): #清除空格和首位 command = msg[1:].strip() #进行循环匹配 await flow_deal(data, command, match_flow_private) return #群聊事件 async def group_event(data): msg = str(data['message']) #清除前缀干扰并且添加at标识 if msg.startswith(at_id): msg = msg.rstrip(at_id).strip() flag_at = True #多匹配机制 if msg.startswith(('.', '。')): #清除空格和首位 command = msg[1:].strip() #进行循环匹配 await flow_deal(data, command, match_flow_group) return |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
#api.py global_dict = {} match_map ={'private':[], 'group':[]} #进行全局变量管理 def glo_set(key, value): global_dict[key] = value return def glo_get(key, defValue = None): try: return global_dict[key] except KeyError: return defValue async def asy_glo_set(key, value): global_dict[key] = value return async def asy_glo_get(key, defValue = None): try: return global_dict[key] except KeyError: return defValue #设置常用全局变量 glo_set('secret', secret) glo_set('ws_uri', ws_uri) glo_set('http_url', http_url) glo_set('self_id', self_id) #更新匹配结构 def match_update(msg_type: str, key: str, fun: str, match_type = 'reg', priority = 100): #优先级越大越优先,默认为100 #match_map本身类型为承载msg_type触发类型的字典 #msg_type下则为一个依照优先级顺序排序的列表 #列表中每个元素对应不同回复的属性字典 index = -1 for i in range(0, len(match_map[msg_type])-1): #每次插入进行一次独立排序,得到优先级队列 if match_map[msg_type][i]['priority'] < priority: index = i break match_map[msg_type].insert(index, {'match_type':match_type, 'key':key, 'function':fun, 'priority':priority}) print("已导入: %s 的回复" % (key)) return def get_match_map(): return match_map |
1 2 3 4 5 6 7 8 9 10 11 12 13 |
#setu.py #用于异步调用的函数 async def send_setu(data, command_obj): await setu(data, False) return async def send_setub(data, command_obj): await setu(data, True) return #匹配类型:私聊、匹配依据:setu、调用函数:send_setu、匹配方式:前缀匹配:prefix、匹配优先级:50(越大越优先) api.match_update('private', 'setu', send_setu, 'pre', 50) api.match_update('private', 'setub', send_setub, 'pre', 50) api.match_update('group', 'setu', send_setu, 'pre', 50) |
值得注意的是,因为python的特殊性,在api.py中加入了一个全局变量管理模块,用以在多文件间传递变量。

添加secret
对于公网上的服务器,请务必添加secret防止恶意攻击。
Comments | NOTHING