578 lines
23 KiB
Python
578 lines
23 KiB
Python
import asyncio
|
|
import json
|
|
from typing import Any
|
|
from uuid import uuid4
|
|
|
|
import psycopg2
|
|
import requests
|
|
import websockets
|
|
|
|
from database import (user_exists, insert_user, guild_exists, insert_guild,
|
|
insert_poll, insert_vote_options, bet, get_user_vote, change_bet,
|
|
is_last_poll_created_opened, get_poll_id_opened, close_poll, get_bet_value,
|
|
get_bet_value_multipliers, get_vote_id_by_vote_option_and_poll_id,
|
|
get_users_and_bet_value_by_vote_id, add_user_points, minus_user_points, get_user_points,
|
|
get_user_by_id)
|
|
from enums import OpCode, EventTitle, InteractionType, ApplicationCommand
|
|
|
|
# Load the bot configuration (application id, token, database settings).
# The encoding is explicit so that non-ASCII values in the JSON file decode
# the same way regardless of the platform's default locale.
with open('configuration.json', 'r', encoding='utf-8') as file:
    CONFIG = json.load(file)

APPLICATION_ID: int = CONFIG["APPLICATION_ID"]
GATEWAY_URL: str = "wss://gateway.discord.gg/?v=10&encoding=json"
API_URL: str = "https://discord.com/api/v10"
TOKEN: str = CONFIG["TOKEN"]

# Module-wide PostgreSQL connection, passed as `conn` to every database helper.
# A connection failure propagates unchanged and aborts start-up; the former
# `try: ... except Exception as e: raise e` wrapper did exactly the same and
# only added noise to the traceback.
conn = psycopg2.connect(
    database=CONFIG["DB_NAME"],
    user=CONFIG["DB_USER"],
    password=CONFIG["DB_PASSWORD"],
    host=CONFIG["DB_ADDRESS"],
    port=CONFIG["DB_PORT"],
)
|
|
|
|
|
|
async def create_command():
    """Register the bot's slash commands with Discord.

    Four command definitions (``poll``, ``bet``, ``close`` and ``profil``) are
    POSTed one after another to the application's commands endpoint,
    authenticated with the bot token. The HTTP responses are not inspected.
    """
    endpoint = f"{API_URL}/applications/{APPLICATION_ID}/commands"
    auth_headers = {"Authorization": f"Bot {TOKEN}"}

    # /poll takes no option.
    poll_command = {
        "name": "poll",
        "type": ApplicationCommand.SUB_COMMAND,
        "description": "Des votes sur lesquels gamble !",
    }

    # /bet <somme>: one required integer option with a minimum of 0.
    bet_command = {
        "name": "bet",
        "type": ApplicationCommand.SUB_COMMAND,
        "description": "Pour pouvoir planifier le prochain pari !",
        "options": [
            {
                "name": "somme",
                "required": True,
                "min_value": 0,
                "description": "La somme du prochain pari",
                "type": ApplicationCommand.INTEGER,
            },
        ],
    }

    # /close <option>: one required string option.
    close_command = {
        "name": "close",
        "type": ApplicationCommand.SUB_COMMAND,
        "description": "Pour clôturer un sondage",
        "options": [
            {
                "name": "option",
                "description": "L'issue du sondage",
                "required": True,
                "type": ApplicationCommand.STRING,
            },
        ],
    }

    # /profil [user]: one optional user option.
    profile_command = {
        "name": "profil",
        "type": ApplicationCommand.SUB_COMMAND,
        "description": "Afficher des informations sur un utilisateur",
        "options": [
            {
                "name": "user",
                "description": "La personne concernée",
                "required": False,
                "type": ApplicationCommand.USER,
            },
        ],
    }

    for definition in (poll_command, bet_command, close_command, profile_command):
        requests.post(endpoint, json=definition, headers=auth_headers)
|
|
|
|
|
|
async def init_commands():
    """Delete every registered application command, then register them again.

    The current command list is fetched from the application's commands
    endpoint; each entry is deleted individually (a 204 status is reported as
    success) and ``create_command`` is awaited afterwards.

    If the list cannot be fetched, or the payload is not a list, the function
    reports the problem and returns without deleting or creating anything.
    Previously an error payload (a JSON object) was iterated key by key and
    crashed with ``TypeError`` on ``command['id']``.
    """
    commands_url = f"{API_URL}/applications/{APPLICATION_ID}/commands"
    headers = {"Authorization": f"Bot {TOKEN}"}

    res = requests.get(commands_url, headers=headers)
    if not res.ok:
        print(f"Suppression des commandes: impossible de récupérer la liste (HTTP {res.status_code})")
        return

    commands = res.json()
    if not isinstance(commands, list):
        print("Suppression des commandes: réponse inattendue")
        return

    for command in commands:
        response = requests.delete(f"{commands_url}/{command['id']}", headers=headers)
        if response.status_code == 204:
            print("Suppression des commandes: Supprimé avec succès !")
        else:
            print("Suppression des commandes: Une erreur est survenue")

    await create_command()
|
|
|
|
|
|
async def create_poll(guild_id: str, creator_id: str, poll_created_id: str, interaction_id: str,
                      interaction_token: str,
                      components: list[dict]):
    """Answer a submitted poll modal with the poll message and persist the poll.

    Args:
        guild_id: Guild the poll belongs to.
        creator_id: User who submitted the modal.
        poll_created_id: Identifier of the poll (the modal's ``custom_id``).
        interaction_id: Interaction id used in the callback URL.
        interaction_token: Interaction token used in the callback URL.
        components: Rows of the submitted modal; each row is read as
            ``row["components"][0]`` and must expose ``custom_id`` and ``value``.

    The input whose ``custom_id`` is ``"label"`` supplies the question (default
    ``"Question du poll"``). Every non-empty input whose ``custom_id`` contains
    ``"vote"`` becomes one button, duplicates being ignored. The poll and its
    options are written to the database only when Discord accepted the
    callback.
    """
    fields = [row["components"][0] for row in components]

    question = next(
        (field["value"] for field in fields if field["custom_id"] == "label"),
        "Question du poll"
    )

    option_lines = []
    buttons = []
    seen_texts = set()  # option texts already turned into a button
    for index, field in enumerate(fields):
        vote_text = field["value"]
        if "vote" not in field["custom_id"] or vote_text == "":
            continue
        if vote_text in seen_texts:
            continue
        seen_texts.add(vote_text)
        option_lines.append(f"Option {index}: {vote_text}\n")
        buttons.append({
            "type": 2,
            "label": vote_text,
            "style": 1,
            # NOTE(review): the option text is embedded in the button id, so a
            # very long text may exceed the custom_id length Discord accepts —
            # confirm against the component limits.
            "custom_id": f"{poll_created_id}_{vote_text}_{index}"
        })

    # Buttons are grouped five per action row.
    action_rows = [{"type": 1, "components": buttons[i:i + 5]} for i in range(0, len(buttons), 5)]

    body = {
        "type": 4,
        "data": {
            "embeds": [
                {
                    "title": "Les paris sont ouverts !",
                    "description": "Vous pouvez parier vos points, ils ne passeront pas en dessous de 50.",
                    "color": 5613215
                },
                {
                    "title": f"Sondage: {question}",
                    "description": "".join(option_lines),
                    "color": 16775222
                }
            ],
            "components": action_rows
        }
    }

    res = requests.post(
        f"{API_URL}/interactions/{interaction_id}/{interaction_token}/callback",
        json=body,
        headers={"Authorization": f"Bot {TOKEN}"}
    )

    # Any non-success status means the poll message was not posted. The former
    # check (`== 400` only) still stored the poll after other failures, leaving
    # an open poll in the database with no message to vote on.
    if not res.ok:
        print(res.status_code, res.text)
        return

    await insert_poll(conn=conn, creator=creator_id, poll_id=poll_created_id, guild_id=guild_id,
                      question=question)
    for option in buttons:
        await insert_vote_options(conn=conn, poll_id=poll_created_id, vote_id=option["custom_id"],
                                  vote_option=option["label"])
|
|
|
|
|
|
async def identify(websocket):
    """Send the IDENTIFY payload over the given gateway websocket.

    The payload carries the bot token, the client properties, the initial
    presence and the intents value, serialised as JSON.
    """
    client_properties = {
        "os": "linux",
        "browser": "gambling",
        "device": "gambling"
    }
    initial_presence = {
        "activities": [{
            "name": "son argent flamber",
            "type": 3,
        }],
        "status": "dnd",
        "since": 91879201,
        "afk": False
    }
    identify_data = {
        "token": TOKEN,
        "properties": client_properties,
        "presence": initial_presence,
        "intents": 36871,
    }
    message = json.dumps({"op": OpCode.IDENTIFY, "d": identify_data})
    await websocket.send(message)
|
|
|
|
|
|
async def create_poll_form(interaction_id: str, interaction_token: str, guild_id: str, user_id: str, username: str):
    """Answer the ``/poll`` command with the poll-creation modal.

    The guild and the user are inserted into the database when missing. If the
    user's last created poll is still open, an ephemeral refusal message is
    sent instead of the modal. The modal's ``custom_id`` is a fresh UUID.
    """
    poll_id = uuid4()

    guild_known = await guild_exists(conn=conn, guild_id=guild_id)
    if not guild_known:
        await insert_guild(conn=conn, guild_id=guild_id)

    user_known = await user_exists(conn=conn, user_id=user_id, guild_id=guild_id)
    if not user_known:
        await insert_user(conn=conn, user_id=user_id, guild_id=guild_id, username=username)

    def text_row(custom_id, label, placeholder, required):
        # One modal row wrapping a single short text input.
        return {
            "type": 1,
            "components": [
                {
                    "type": 4,
                    "custom_id": custom_id,
                    "label": label,
                    "placeholder": placeholder,
                    "style": 1,
                    "min_length": 1,
                    "required": required
                }
            ]
        }

    has_open_poll = await is_last_poll_created_opened(conn=conn, creator=user_id)
    if has_open_poll:
        # The previous poll must be closed first: reply only to the caller.
        body = {
            "type": 4,
            "data": {
                "content": f"<@{user_id}>, il faut d'abord clôturer ton sondage avant de pouvoir en créer un nouveau. "
                           f"(commande: **/close**)",
                "flags": 64
            }
        }
    else:
        # Question plus four options; only the question and the first option are required.
        body = {
            "type": 9,
            "data": {
                "title": "Créer un vote",
                "custom_id": str(poll_id),
                "components": [
                    text_row("label", "Question du vote", "Shy aime-t-il les robots?", True),
                    text_row("vote1", "Première option", "Oui", True),
                    text_row("vote2", "Deuxième option", "Non", False),
                    text_row("vote3", "Troisième option", "Peut-être", False),
                    text_row("vote4", "Quatrième option", "Pas du tout", False),
                ]
            }
        }

    callback_url = f"{API_URL}/interactions/{interaction_id}/{interaction_token}/callback"
    res = requests.post(callback_url, json=body, headers={"Authorization": f"Bot {TOKEN}"})
    print(res.json() if res.status_code == 400 else res)
|
|
|
|
|
|
async def heartbeat(websocket, interval):
    """Send a heartbeat over ``websocket`` every ``interval`` milliseconds.

    The loop sleeps first, then sends ``{"op": HEARTBEAT, "d": null}``. It ends
    as soon as a send fails, printing a message that says whether the socket
    was closed or another error occurred.
    """
    delay_seconds = interval / 1000
    while True:
        await asyncio.sleep(delay_seconds)
        try:
            beat = json.dumps({"op": OpCode.HEARTBEAT, "d": None})
            await websocket.send(beat)
        except websockets.exceptions.ConnectionClosed:
            print("WebSocket fermé")
            return
        except Exception as e:
            print(f"Erreur lors du heartbeat : {e}")
            return
|
|
|
|
|
|
def _parse_vote_id(vote_id: str) -> tuple[str, str]:
    """Split a ``<poll_id>_<label>_<index>`` button id into ``(poll_id, label)``."""
    # The label is free text and may itself contain underscores, so the id is
    # cut at its first and last underscore instead of being split on every one.
    poll_id, _, remainder = vote_id.partition("_")
    label, _, _index = remainder.rpartition("_")
    return poll_id, label


async def vote_confirmation(interaction_id: str, interaction_token: str, custom_id: str, user_id: str,
                            guild_id: str, username: str):
    """Record a bet for a clicked poll button and confirm it to the user.

    Args:
        interaction_id: Interaction id used in the callback URL.
        interaction_token: Interaction token used in the callback URL.
        custom_id: Id of the clicked button, ``<poll_id>_<label>_<index>``.
        user_id: User who clicked.
        guild_id: Guild where the click happened.
        username: Name stored when the user has to be inserted.

    If the user already voted in this poll, an ephemeral reminder naming the
    earlier choice is sent. Otherwise the user (and guild) are inserted when
    missing, the user's current bet value is placed on the option and
    subtracted from their points, and an ephemeral confirmation is sent.
    """
    poll_id, chosen_label = _parse_vote_id(custom_id)

    user_vote = await get_user_vote(conn=conn, user_id=user_id, poll_id=poll_id)
    if user_vote is not None:
        # user_vote[0] is read as the stored button id of the earlier vote.
        previous_label = _parse_vote_id(user_vote[0])[1]
        body = {
            "type": 4,
            "data": {
                "content": f"<@{user_id}>, tu as déjà voté pour **{previous_label}**",
                "flags": 64
            }
        }
    else:
        if not await user_exists(conn=conn, user_id=user_id, guild_id=guild_id):
            if not await guild_exists(conn=conn, guild_id=guild_id):
                await insert_guild(conn=conn, guild_id=guild_id)
            await insert_user(conn=conn, user_id=user_id, guild_id=guild_id, username=username)
        bet_value = await get_bet_value(conn=conn, user_id=user_id, guild_id=guild_id)
        await bet(conn=conn, guild_id=guild_id, user_id=user_id, vote_id=custom_id,
                  poll_id=poll_id, bet_value=bet_value)
        await minus_user_points(conn=conn, user_id=user_id, bet_value=bet_value)
        body = {
            "type": 4,
            "data": {
                "content": f"<@{user_id}>, tu as misé {bet_value} sur l'option **{chosen_label}** ! 🎉",
                "flags": 64
            }
        }

    res = requests.post(
        f"{API_URL}/interactions/{interaction_id}/{interaction_token}/callback",
        json=body,
        headers={"Authorization": f"Bot {TOKEN}", "Content-Type": "application/json"}
    )
    if res.status_code == 400:
        print(res.json())
    else:
        print(res)
|
|
|
|
|
|
async def close_poll_cmd(guild_id: str, user_id: str, interaction_id: str, issue: str, interaction_token: str,
                         username: str):
    """Close the caller's open poll, pay the winners and announce the result.

    Args:
        guild_id: Guild where the command was used.
        user_id: Caller; only a poll this user created can be closed.
        interaction_id: Interaction id used in the callback URL.
        issue: Text of the winning option, as typed by the caller.
        interaction_token: Interaction token used in the callback URL.
        username: Name stored when the caller has to be inserted.

    Three replies are possible: an ephemeral message when the caller has no
    open poll, an ephemeral message when ``issue`` matches no option of the
    poll, or a public announcement once the poll is closed and every bettor on
    the winning option has been credited ``int(bet * multiplier)`` points.
    """
    if not await guild_exists(conn=conn, guild_id=guild_id):
        await insert_guild(conn=conn, guild_id=guild_id)
    if not await user_exists(conn=conn, user_id=user_id, guild_id=guild_id):
        await insert_user(conn=conn, user_id=user_id, guild_id=guild_id, username=username)

    # The returned row is used as: poll[1] = poll id, poll[2] = question.
    poll: tuple = await get_poll_id_opened(conn=conn, creator=user_id)
    if poll is None:
        body = {
            "type": 4,
            "data": {
                "content": f"<@{user_id}>, tu n'as actuellement aucun sondage en cours.",
                "flags": 64
            }
        }
    else:
        winner_id = await get_vote_id_by_vote_option_and_poll_id(conn=conn, vote_option=issue, poll_id=poll[1])
        if winner_id is None:
            body = {
                "type": 4,
                "data": {
                    "content": f"<@{user_id}>, L'option **{issue}** n'existe pas.",
                    "flags": 64
                }
            }
        else:
            winner_cote = None
            values = await get_bet_value_multipliers(conn=conn, poll_id=poll[1])
            for value in values:
                vote_id = value[0]
                if vote_id != winner_id:
                    continue
                winner_cote = value[1]
                # Bettors are fetched for the winning option only; the lists of
                # the losing options were previously queried and never used.
                # NOTE(review): assumes the lookup helper is read-only — confirm.
                winners = await get_users_and_bet_value_by_vote_id(conn=conn, vote_id=vote_id)
                for winner in winners:
                    await add_user_points(conn=conn,
                                          user_id=winner[0],
                                          bet_value=int(winner[1] * winner_cote))
            await close_poll(conn=conn, poll_id=poll[1])

            if winner_cote is not None:
                # The outer round() only trims binary floating-point noise such
                # as 33.00000000000003; the value itself is unchanged.
                out = round((round(winner_cote, 2) - 1) * 100, 2)
            else:
                out = 0.0
            body = {
                "type": 4,
                "data": {
                    "content": f'@everyone le sondage **{poll[2]}** est terminé ! L\'issue gagnante est **{issue}** '
                               f'avec une côte de {out}%'
                }
            }

    res = requests.post(f"{API_URL}/interactions/{interaction_id}/{interaction_token}/callback", json=body,
                        headers={"Authorization": f"Bot {TOKEN}"})
    if res.status_code == 400:
        print(res.json())
    else:
        print(res)
|
|
|
|
|
|
async def get_event(response: Any):
|
|
match response["t"]:
|
|
# case EventTitle.MESSAGE_CREATE:
|
|
case EventTitle.INTERACTION_CREATE:
|
|
if response["d"]["type"] == InteractionType.MODAL_SUBMIT:
|
|
await create_poll(
|
|
guild_id=response["d"]["guild"]["id"],
|
|
creator_id=response["d"]["member"]["user"]["id"],
|
|
poll_created_id=response["d"]["data"]["custom_id"],
|
|
interaction_id=response["d"]["id"],
|
|
interaction_token=response["d"]["token"],
|
|
components=response["d"]["data"]["components"]
|
|
)
|
|
elif response["d"]["type"] == InteractionType.MESSAGE_COMPONENT:
|
|
custom_id = response["d"]["data"]["custom_id"]
|
|
user_id = response["d"]["member"]["user"]["id"]
|
|
await vote_confirmation(interaction_id=response["d"]["id"],
|
|
interaction_token=response["d"]["token"],
|
|
custom_id=custom_id,
|
|
user_id=user_id,
|
|
guild_id=response["d"]["guild"]["id"],
|
|
username=response["d"]["member"]["user"]["username"])
|
|
elif (response["d"]["type"] == InteractionType.APPLICATION_COMMAND and
|
|
response["d"]["data"]["name"] == 'poll'):
|
|
await create_poll_form(interaction_id=response["d"]["id"],
|
|
user_id=response["d"]["member"]["user"]["id"],
|
|
interaction_token=response["d"]["token"],
|
|
guild_id=response["d"]["guild_id"],
|
|
username=response["d"]["member"]["user"]["username"])
|
|
elif (response["d"]["type"] == InteractionType.APPLICATION_COMMAND and
|
|
response["d"]["data"]["name"] == 'close'):
|
|
await close_poll_cmd(guild_id=response["d"]["guild_id"],
|
|
user_id=response["d"]["member"]["user"]["id"],
|
|
interaction_id=response["d"]["id"],
|
|
interaction_token=response["d"]["token"],
|
|
issue=response["d"]["data"]["options"][0]["value"],
|
|
username=response["d"]["member"]["user"]["username"])
|
|
elif (response["d"]["type"] == InteractionType.APPLICATION_COMMAND and
|
|
response["d"]["data"]["name"] == 'bet'):
|
|
command_option: int = response["d"]["data"]["options"][0]["value"]
|
|
await change_bet_cmd(guild_id=response["d"]["guild_id"],
|
|
command_option=command_option,
|
|
user_id=response["d"]["member"]["user"]["id"],
|
|
username=response["d"]["member"]["user"]["username"],
|
|
interaction_id=response["d"]["id"],
|
|
interaction_token=response["d"]["token"])
|
|
elif (response["d"]["type"] == InteractionType.APPLICATION_COMMAND and
|
|
response["d"]["data"]["name"] == 'profil'):
|
|
command_option: str = ''
|
|
user_avatar: str = ''
|
|
username: str = ''
|
|
try:
|
|
command_option = response["d"]["data"]["options"][0]["value"]
|
|
user_avatar = response["d"]["data"]["resolved"]["users"][str(command_option)]["avatar"]
|
|
username = response["d"]["data"]["resolved"]["users"][str(command_option)]["username"]
|
|
except Exception as e:
|
|
pass
|
|
if command_option == '':
|
|
command_option = response["d"]["member"]["user"]["id"]
|
|
if user_avatar == '':
|
|
user_avatar = response["d"]["member"]["user"]["avatar"]
|
|
username = response["d"]["member"]["user"]["username"]
|
|
await show_user_profile(guild_id=response["d"]["guild_id"],
|
|
interaction_id=response["d"]["id"],
|
|
interaction_token=response["d"]["token"],
|
|
avatar=user_avatar,
|
|
concerned_user_id=command_option,
|
|
username=username)
|
|
|
|
case _:
|
|
print(response)
|
|
|
|
|
|
async def show_user_profile(guild_id: str, interaction_id: str, interaction_token: str,
                            concerned_user_id: str, avatar: str, username: str):
    """Reply to ``/profil`` with an embed describing one user.

    Args:
        guild_id: Guild where the command was used.
        interaction_id: Interaction id used in the callback URL.
        interaction_token: Interaction token used in the callback URL.
        concerned_user_id: User whose profile is shown.
        avatar: Avatar hash of that user; may be empty or ``None``.
        username: Name stored when the user has to be inserted.

    The guild and the user are inserted when missing. The embed shows the
    stored name, the balance and the value of the next bet. The avatar image
    is attached only when an avatar hash is available; previously a missing
    hash produced the URL ``.../avatars/<id>/None``.
    """
    if not await guild_exists(conn=conn, guild_id=guild_id):
        await insert_guild(conn=conn, guild_id=guild_id)
    if not await user_exists(conn=conn, user_id=concerned_user_id, guild_id=guild_id):
        await insert_user(conn=conn, user_id=concerned_user_id, guild_id=guild_id, username=username)

    # The returned row is displayed as: [0] name, [1] balance, [2] next bet value.
    concerned_user = await get_user_by_id(conn=conn, user_id=concerned_user_id, guild_id=guild_id)

    embed = {
        "title": f"Profil de {concerned_user[0]}",
        "description": f"Solde : {concerned_user[1]}\n"
                       f"Valeur du prochain pari: {concerned_user[2]}",
        "color": 11141375
    }
    if avatar:
        embed["image"] = {
            "url": f"https://cdn.discordapp.com/avatars/{concerned_user_id}/{avatar}"
        }

    body = {
        "type": 4,
        "data": {
            "embeds": [embed],
        }
    }

    res = requests.post(f"{API_URL}/interactions/{interaction_id}/{interaction_token}/callback", json=body,
                        headers={"Authorization": f"Bot {TOKEN}"})
    if res.status_code == 400:
        print(res.json())
    else:
        print(res)
|
|
|
|
|
|
async def change_bet_cmd(guild_id: str, command_option: int, user_id: str, username: str,
                         interaction_id: str, interaction_token: str):
    """Handle ``/bet``: set the amount the user will stake on their next vote.

    Args:
        guild_id: Guild where the command was used.
        command_option: Requested bet amount.
        user_id: Caller.
        username: Name stored when the caller has to be inserted.
        interaction_id: Interaction id used in the callback URL.
        interaction_token: Interaction token used in the callback URL.

    The new amount is stored only when the user's points are at least
    ``command_option``; otherwise an "insufficient balance" message is sent.
    Both replies are ephemeral.
    """
    # Guild first, then user. The two inserts used to be swapped: a missing
    # guild triggered insert_user and a missing user triggered insert_guild,
    # so a new user in a known guild was never created.
    if not await guild_exists(conn=conn, guild_id=guild_id):
        await insert_guild(conn=conn, guild_id=guild_id)
    if not await user_exists(conn=conn, user_id=user_id, guild_id=guild_id):
        await insert_user(conn=conn, guild_id=guild_id, user_id=user_id, username=username)

    if await get_user_points(conn=conn, user_id=user_id, guild_id=guild_id) >= command_option:
        await change_bet(conn=conn, guild_id=guild_id, user_id=user_id, new_bet_amount=command_option)
        body = {
            "type": 4,
            "data": {
                "content": f"<@{user_id}>, ton prochain pari vaudra **{command_option}**.",
                "flags": 64
            }
        }
    else:
        body = {
            "type": 4,
            "data": {
                "content": f"<@{user_id}>, solde insuffisant !",
                "flags": 64
            }
        }

    res = requests.post(f"{API_URL}/interactions/{interaction_id}/{interaction_token}/callback", json=body,
                        headers={"Authorization": f"Bot {TOKEN}"})
    if res.status_code == 400:
        print(res.json())
    else:
        print(res)
|
|
|
|
|
|
async def connect():
|
|
async with websockets.connect(GATEWAY_URL) as websocket:
|
|
response = json.loads(await websocket.recv())
|
|
heartbeat_interval = response["d"]["heartbeat_interval"]
|
|
|
|
asyncio.create_task(heartbeat(websocket, heartbeat_interval))
|
|
|
|
await identify(websocket)
|
|
|
|
connected = True
|
|
while connected:
|
|
response = json.loads(await websocket.recv())
|
|
match response["op"]:
|
|
case OpCode.DISPATCH:
|
|
await get_event(response)
|
|
case OpCode.RECONNECT:
|
|
print("Reconnexion...")
|
|
connected = False
|
|
case _:
|
|
pass
|
|
|
|
# Reconnexion
|
|
await websocket.close()
|
|
await asyncio.sleep(5)
|
|
await connect()
|
|
|
|
|
|
async def main():
    """Entry coroutine: run the gateway connection as a task and wait for it."""
    # Command registration is currently switched off:
    # await init_commands()
    await asyncio.create_task(connect())


asyncio.run(main())
|