482 lines
19 KiB
Python
482 lines
19 KiB
Python
import asyncio
|
|
import json
|
|
from typing import Any
|
|
from uuid import uuid4
|
|
|
|
import psycopg2
|
|
import requests
|
|
import websockets
|
|
|
|
from database import (user_exists, insert_user, guild_exists, insert_guild,
|
|
insert_poll, insert_vote_options, bet, get_user_vote, change_bet,
|
|
is_last_poll_created_opened, get_poll_id_opened, close_poll, get_bet_value,
|
|
get_bet_value_multipliers, get_vote_id_by_vote_option_and_poll_id)
|
|
from enums import OpCode, EventTitle, InteractionType
|
|
|
|
# Load the bot settings once at import time; every constant below and the
# database connection are derived from this mapping.
# JSON documents are UTF-8, so decode explicitly instead of relying on the
# platform default encoding.
with open('configuration.json', 'r', encoding='utf-8') as file:
    CONFIG = json.load(file)

# Application id interpolated into the command-registration URLs.
APPLICATION_ID: int = CONFIG["APPLICATION_ID"]
# Gateway websocket endpoint and REST base URL used by the functions below.
GATEWAY_URL: str = "wss://gateway.discord.gg/?v=10&encoding=json"
API_URL: str = "https://discord.com/api/v10"
# Bot token sent in the "Authorization: Bot ..." header and in IDENTIFY.
TOKEN: str = CONFIG["TOKEN"]

# Single module-wide PostgreSQL connection handed to every database helper.
# A connection failure propagates unchanged: the previous
# `except Exception as e: raise e` wrapper re-raised the same exception and
# added nothing.
conn = psycopg2.connect(
    database=CONFIG["DB_NAME"],
    user=CONFIG["DB_USER"],
    password=CONFIG["DB_PASSWORD"],
    host=CONFIG["DB_ADDRESS"],
    port=CONFIG["DB_PORT"],
)
|
|
|
|
|
|
async def create_command():
    """Register the bot's four slash commands (poll, bet, close, me).

    Each command definition is POSTed to the application's ``/commands``
    endpoint, one request per command. The responses are not inspected.
    """
    poll_command = {
        "name": "poll",
        "type": 1,
        "description": "Des votes sur lesquels gamble !",
    }
    bet_command = {
        "name": "bet",
        "type": 1,
        "description": "Pour pouvoir planifier le prochain pari !",
        "options": [
            {
                "name": "somme",
                "required": True,
                "min_value": 0,
                "description": "La somme du prochain pari",
                "type": 4
            }
        ]
    }
    close_command = {
        "name": "close",
        "type": 1,
        "description": "Pour clôturer un sondage",
        "options": [
            {
                "name": "option",
                "description": "L'issue du sondage",
                "required": True,
                "type": 3
            }
        ]
    }
    # To be improved into /profil <@user_id>
    me_command = {
        "name": "me",
        "type": 1,
        "description": "Afficher des informations sur vous",
    }

    endpoint = f"{API_URL}/applications/{APPLICATION_ID}/commands"
    auth_headers = {"Authorization": f"Bot {TOKEN}"}
    for definition in (poll_command, bet_command, close_command, me_command):
        requests.post(endpoint, json=definition, headers=auth_headers)
|
|
|
|
|
|
async def init_commands():
    """Delete every command currently registered, then register them again.

    The current commands are fetched from the application's ``/commands``
    endpoint and deleted one by one; the outcome of each deletion is printed.
    ``create_command`` is then awaited to recreate the command set.
    """
    commands_url = f"{API_URL}/applications/{APPLICATION_ID}/commands"
    auth_headers = {"Authorization": f"Bot {TOKEN}"}

    listing = requests.get(commands_url, headers=auth_headers)
    for registered in json.loads(listing.content):
        deletion = requests.delete(f"{commands_url}/{registered['id']}", headers=auth_headers)
        # 204 is the only status treated as a successful deletion.
        if deletion.status_code != 204:
            print("Suppression des commandes: Une erreur est survenue")
            continue
        print("Suppression des commandes: Supprimé avec succès !")

    await create_command()
|
|
|
|
|
|
async def create_poll(guild_id: str, creator_id: str, poll_created_id: str, interaction_id: str,
                      interaction_token: str,
                      components: list[dict]):
    """Answer a submitted poll form with the poll message and persist the poll.

    Args:
        guild_id: Guild the poll belongs to (stored with the poll).
        creator_id: User who submitted the form (stored as the poll creator).
        poll_created_id: Identifier of the poll; prefix of every button id.
        interaction_id: Id of the interaction being answered.
        interaction_token: Token of the interaction being answered.
        components: Submitted form rows; the first component of each row is
            read for its ``custom_id`` and ``value``.

    The poll and its options are written to the database only when the
    interaction callback is accepted. Previously only HTTP 400 was treated as
    a failure, so any other rejected callback still stored a poll whose
    message was never posted.
    """
    fields = [row["components"][0] for row in components]
    question = next(
        (field["value"] for field in fields if field["custom_id"] == "label"),
        "Question du poll"
    )

    option_lines = []
    buttons = []
    seen_texts = set()  # option texts already used, to drop duplicates
    for index, field in enumerate(fields):
        if "vote" not in field["custom_id"]:
            continue
        vote_text = field["value"]
        if vote_text == "" or vote_text in seen_texts:
            continue
        seen_texts.add(vote_text)
        option_lines.append(f"Option {index}: {vote_text}\n")
        buttons.append({
            "type": 2,
            "label": vote_text,
            "style": 1,
            # Button id format "<poll id>_<option text>_<row index>"; the
            # vote handler splits it on "_" to recover the poll id and label.
            "custom_id": f"{poll_created_id}_{vote_text}_{index}"
        })
    vote_options = "".join(option_lines)

    # Buttons are grouped five per action row.
    action_rows = [{"type": 1, "components": buttons[i:i + 5]} for i in range(0, len(buttons), 5)]

    body = {
        "type": 4,
        "data": {
            "embeds": [
                {
                    "title": "Les paris sont ouverts !",
                    "description": "Vous pouvez parier vos points, ils ne passeront pas en dessous de 50.",
                    "color": 5613215
                },
                {
                    "title": f"Sondage: {question}",
                    "description": vote_options,
                    "color": 16775222
                }
            ],
            "components": action_rows
        }
    }

    res = requests.post(
        f"{API_URL}/interactions/{interaction_id}/{interaction_token}/callback",
        json=body,
        headers={"Authorization": f"Bot {TOKEN}"}
    )

    if not res.ok:
        # Keep the detailed error body for 400; other failures may not carry
        # JSON, so print the response object as the sibling handlers do.
        print(res.json() if res.status_code == 400 else res)
        return

    await insert_poll(conn=conn, creator=creator_id, poll_id=poll_created_id, guild_id=guild_id,
                      question=question)
    for option in buttons:
        await insert_vote_options(conn=conn, poll_id=poll_created_id, vote_id=option["custom_id"],
                                  vote_option=option["label"])
|
|
|
|
|
|
async def identify(websocket):
    """Send the IDENTIFY payload (token, client properties, presence, intents) on *websocket*."""
    client_properties = {
        "os": "linux",
        "browser": "gambling",
        "device": "gambling"
    }
    activity = {
        "name": "son argent flamber",
        "type": 3,
    }
    presence = {
        "activities": [activity],
        "status": "dnd",
        "since": 91879201,
        "afk": False
    }
    identity = {
        "token": TOKEN,
        "properties": client_properties,
        "presence": presence,
        "intents": 36871,
    }
    message: dict = {"op": OpCode.IDENTIFY, "d": identity}
    await websocket.send(json.dumps(message))
|
|
|
|
|
|
async def create_poll_form(interaction_id: str, interaction_token: str, guild_id: str, user_id: str, username: str,
                           avatar: str):
    """Reply to the ``poll`` command with the poll-creation form, or a refusal.

    The guild and the user are inserted into the database first when they are
    not known yet. If the user's last created poll is still open, the reply
    is a message asking them to close it; otherwise the reply is a form with
    one question field and four option fields, whose ``custom_id`` is a fresh
    UUID. The callback response is printed.
    """

    def text_input_row(field_id: str, caption: str, hint: str, mandatory: bool) -> dict:
        """Build one form row holding a single text input."""
        return {
            "type": 1,
            "components": [
                {
                    "type": 4,
                    "custom_id": field_id,
                    "label": caption,
                    "placeholder": hint,
                    "style": 1,
                    "min_length": 1,
                    "required": mandatory
                }
            ]
        }

    poll_id = uuid4()

    guild_known = await guild_exists(conn=conn, guild_id=guild_id)
    if not guild_known:
        await insert_guild(conn=conn, guild_id=guild_id)
    user_known = await user_exists(conn=conn, user_id=user_id, guild_id=guild_id)
    if not user_known:
        await insert_user(conn=conn, user_id=user_id, guild_id=guild_id, username=username, avatar=avatar)

    if await is_last_poll_created_opened(conn=conn, creator=user_id):
        # The creator still has an open poll: refuse and point to /close.
        body = {
            "type": 4,
            "data": {
                "content": f"<@{user_id}>, il faut d'abord clôturer ton sondage avant de pouvoir en créer un nouveau. "
                           f"(commande: **/close**)",
                "flags": 64
            }
        }
    else:
        form_rows = [
            text_input_row("label", "Question du vote", "Shy aime-t-il les robots?", True),
            text_input_row("vote1", "Première option", "Oui", True),
            text_input_row("vote2", "Deuxième option", "Non", False),
            text_input_row("vote3", "Troisième option", "Peut-être", False),
            text_input_row("vote4", "Quatrième option", "Pas du tout", False),
        ]
        body = {
            "type": 9,
            "data": {
                "title": "Créer un vote",
                "custom_id": str(poll_id),
                "components": form_rows
            }
        }

    callback_url = f"{API_URL}/interactions/{interaction_id}/{interaction_token}/callback"
    res = requests.post(callback_url, json=body, headers={"Authorization": f"Bot {TOKEN}"})
    # A 400 carries a JSON error body worth showing; otherwise show the response object.
    print(res.json() if res.status_code == 400 else res)
|
|
|
|
|
|
async def heartbeat(websocket, interval):
    """Send a heartbeat frame on *websocket* every *interval* milliseconds, forever.

    The frame always carries ``"d": None``.
    """
    delay_seconds = interval / 1000
    beat = json.dumps({"op": OpCode.HEARTBEAT, "d": None})
    while True:
        await asyncio.sleep(delay_seconds)
        await websocket.send(beat)
|
|
|
|
|
|
def _vote_label(vote_id: str) -> str:
    """Return the option text embedded in a ``<poll id>_<text>_<index>`` button id."""
    # The poll id is a UUID string (no underscore) and the index is the last
    # field, so the label is everything between the first and the last
    # underscore. Taking only `split("_")[1]` truncated labels that contain
    # an underscore themselves.
    return vote_id.split("_", 1)[1].rsplit("_", 1)[0]


async def vote_confirmation(interaction_id: str, interaction_token: str, custom_id: str, user_id: str,
                            guild_id: str,
                            avatar: str, username: str):
    """Record a vote from a poll button click and reply to the voter.

    Args:
        interaction_id: Id of the interaction being answered.
        interaction_token: Token of the interaction being answered.
        custom_id: Id of the clicked button, ``<poll id>_<text>_<index>``.
        user_id: Voter's user id.
        guild_id: Guild in which the button was clicked.
        avatar: Voter's avatar, stored when the user is first inserted.
        username: Voter's username, stored when the user is first inserted.

    If the user already voted on this poll, they are told which option they
    chose. Otherwise the user (and the guild, if needed) is inserted when
    unknown, and a bet with the user's current bet value is recorded on the
    clicked option. The callback response is printed.
    """
    poll_id = custom_id.split("_")[0]
    user_vote = await get_user_vote(conn=conn, user_id=user_id, poll_id=poll_id)
    if user_vote is not None:
        body = {
            "type": 4,
            "data": {
                "content": f"<@{user_id}>, tu as déjà voté pour **{_vote_label(user_vote[0])}**",
                "flags": 64
            }
        }
    else:
        if not await user_exists(conn=conn, user_id=user_id, guild_id=guild_id):
            if not await guild_exists(conn=conn, guild_id=guild_id):
                await insert_guild(conn=conn, guild_id=guild_id)
            await insert_user(conn=conn, user_id=user_id, avatar=avatar, guild_id=guild_id, username=username)
        bet_value = await get_bet_value(conn=conn, user_id=user_id, guild_id=guild_id)
        await bet(conn=conn, guild_id=guild_id, user_id=user_id, vote_id=custom_id,
                  poll_id=poll_id, bet_value=bet_value)
        # NOTE(review): the message announces a fixed stake of 50 although the
        # stored bet uses `bet_value`; confirm what get_bet_value returns and
        # interpolate the real amount if it is a plain number.
        body = {
            "type": 4,
            "data": {
                "content": f"<@{user_id}>, tu as misé 50 sur l'option **{_vote_label(custom_id)}** ! 🎉",
                "flags": 64
            }
        }

    res = requests.post(
        f"{API_URL}/interactions/{interaction_id}/{interaction_token}/callback",
        json=body,
        headers={"Authorization": f"Bot {TOKEN}", "Content-Type": "application/json"}
    )
    if res.status_code == 400:
        print(res.json())
    else:
        print(res)
|
|
|
|
|
|
async def close_poll_cmd(guild_id: str, user_id: str, interaction_id: str, issue: str, interaction_token: str,
                         username: str,
                         avatar: str):
    """Close the caller's open poll with *issue* as the winning option.

    Args:
        guild_id: Guild in which the command was used.
        user_id: User running the command; must be the creator of an open poll.
        interaction_id: Id of the interaction being answered.
        issue: Text of the option declared as the winner.
        interaction_token: Token of the interaction being answered.
        username: Caller's username, stored when the user is first inserted.
        avatar: Caller's avatar, stored when the user is first inserted.

    Replies with an error message when the caller has no open poll or when
    *issue* is not one of the poll's options. Otherwise the poll is closed
    and the result is announced. The callback response is printed.
    """
    if not await guild_exists(conn=conn, guild_id=guild_id):
        await insert_guild(conn=conn, guild_id=guild_id)
    if not await user_exists(conn=conn, user_id=user_id, guild_id=guild_id):
        await insert_user(conn=conn, user_id=user_id, guild_id=guild_id, username=username, avatar=avatar)

    # Only poll[1] (poll id) and poll[2] (question) are read below.
    poll: tuple = await get_poll_id_opened(conn=conn, creator=user_id)
    if poll is None:
        body = {
            "type": 4,
            "data": {
                "content": f"<@{user_id}>, tu n'as actuellement aucun sondage en cours.",
                "flags": 64
            }
        }
    else:
        winner_id = await get_vote_id_by_vote_option_and_poll_id(conn=conn, vote_option=issue, poll_id=poll[1])
        if winner_id is None:
            body = {
                "type": 4,
                "data": {
                    "content": f"<@{user_id}>, L'option **{issue}** n'existe pas.",
                    "flags": 64
                }
            }
        else:
            winner_cote = None
            values = await get_bet_value_multipliers(conn=conn, poll_id=poll[1])
            # Each row is read as (vote id, multiplier, ...).
            for value in values or ():
                if value[0] == winner_id:
                    winner_cote = value[1]
                    # TODO: get users where vote_id
                    # TODO: users.points + (value[1] * users.vote.bet_value)
                # TODO (losing options): get users where vote_id
                # TODO (losing options): users.points - users.vote.bet_value
            await close_poll(conn=conn, poll_id=poll[1])

            announcement = f'@everyone le sondage **{poll[2]}** est terminé ! L\'issue gagnante est **{issue}**'
            if winner_cote is not None:
                # Round the final percentage rather than the multiplier, so
                # float artifacts such as 10.000000000000009 are not shown.
                announcement += f' avec une côte de {round((winner_cote - 1) * 100)}%'
            # When no multiplier row matches the winning option, the odds are
            # left out; round(None, 2) used to raise TypeError at this point,
            # after the poll had already been closed.
            body = {
                "type": 4,
                "data": {
                    "content": announcement
                }
            }

    res = requests.post(f"{API_URL}/interactions/{interaction_id}/{interaction_token}/callback", json=body,
                        headers={"Authorization": f"Bot {TOKEN}"})
    if res.status_code == 400:
        print(res.json())
    else:
        print(res)
|
|
|
|
|
|
async def get_event(response: Any):
|
|
match response["t"]:
|
|
case EventTitle.MESSAGE_CREATE:
|
|
print(f'{response["d"]["author"]["username"]}: {response["d"]["content"]}')
|
|
case EventTitle.INTERACTION_CREATE:
|
|
if response["d"]["type"] == InteractionType.MODAL_SUBMIT:
|
|
await create_poll(
|
|
guild_id=response["d"]["guild"]["id"],
|
|
creator_id=response["d"]["member"]["user"]["id"],
|
|
poll_created_id=response["d"]["data"]["custom_id"],
|
|
interaction_id=response["d"]["id"],
|
|
interaction_token=response["d"]["token"],
|
|
components=response["d"]["data"]["components"]
|
|
)
|
|
elif response["d"]["type"] == InteractionType.MESSAGE_COMPONENT:
|
|
custom_id = response["d"]["data"]["custom_id"]
|
|
user_id = response["d"]["member"]["user"]["id"]
|
|
await vote_confirmation(interaction_id=response["d"]["id"],
|
|
interaction_token=response["d"]["token"],
|
|
custom_id=custom_id,
|
|
user_id=user_id,
|
|
guild_id=response["d"]["guild"]["id"],
|
|
avatar=response["d"]["member"]["user"]["avatar"],
|
|
username=response["d"]["member"]["user"]["username"])
|
|
elif (response["d"]["type"] == InteractionType.APPLICATION_COMMAND and
|
|
response["d"]["data"]["name"] == 'poll'):
|
|
await create_poll_form(interaction_id=response["d"]["id"],
|
|
user_id=response["d"]["member"]["user"]["id"],
|
|
interaction_token=response["d"]["token"],
|
|
guild_id=response["d"]["guild_id"],
|
|
avatar=response["d"]["member"]["user"]["avatar"],
|
|
username=response["d"]["member"]["user"]["username"])
|
|
elif (response["d"]["type"] == InteractionType.APPLICATION_COMMAND and
|
|
response["d"]["data"]["name"] == 'close'):
|
|
await close_poll_cmd(guild_id=response["d"]["guild_id"],
|
|
user_id=response["d"]["member"]["user"]["id"],
|
|
interaction_id=response["d"]["id"],
|
|
interaction_token=response["d"]["token"],
|
|
issue=response["d"]["data"]["options"][0]["value"],
|
|
avatar=response["d"]["member"]["user"]["avatar"],
|
|
username=response["d"]["member"]["user"]["username"])
|
|
elif (response["d"]["type"] == InteractionType.APPLICATION_COMMAND and
|
|
response["d"]["data"]["name"] == 'bet'):
|
|
try:
|
|
command_option: str = response["d"]["data"]["options"][0]["value"]
|
|
if await guild_exists(conn=conn, guild_id=response["d"]["guild_id"]):
|
|
if command_option.isnumeric() and await user_exists(conn=conn,
|
|
user_id=response["d"]["member"]["user"][
|
|
"id"],
|
|
guild_id=response["d"]["guild_id"]):
|
|
await change_bet(conn=conn,
|
|
guild_id=response["d"]["guild_id"],
|
|
user_id=response["d"]["member"]["user"]["id"],
|
|
new_bet_amount=response["d"]["data"]["name"])
|
|
else:
|
|
await bet_error_message("Vous devez rentrer un nombre")
|
|
except Exception:
|
|
await bet_error_message("Vous avez oublié de donner une somme !")
|
|
case _:
|
|
print(response)
|
|
|
|
|
|
async def bet_error_message(message: str):
    """Report a ``bet`` command error by printing *message* to standard output."""
    text = message
    print(text)
|
|
|
|
|
|
async def connect():
|
|
async with websockets.connect(GATEWAY_URL) as websocket:
|
|
response = json.loads(await websocket.recv())
|
|
heartbeat_interval = response["d"]["heartbeat_interval"]
|
|
|
|
asyncio.create_task(heartbeat(websocket, heartbeat_interval))
|
|
|
|
await identify(websocket)
|
|
|
|
while True:
|
|
response = json.loads(await websocket.recv())
|
|
match response["op"]:
|
|
case OpCode.DISPATCH:
|
|
await get_event(response)
|
|
case _:
|
|
print(response)
|
|
|
|
|
|
async def main():
    """Start the gateway connection as a task and wait for it to finish."""
    gateway_task = asyncio.create_task(connect())

    # Start-up steps currently disabled:
    # await init_commands()
    # guild_ids = await get_guilds_ids()
    # for guild_id in guild_ids:
    #     await get_guild_members(guild_id)

    await gateway_task
|
|
|
|
|
|
# Start the bot only when this file is executed as a script, so that importing
# the module (e.g. from a test) does not open the gateway connection.
if __name__ == "__main__":
    asyncio.run(main())
|