import asyncio import json from typing import Any from uuid import uuid4 import psycopg2 import requests import websockets from database import (user_exists, insert_user, guild_exists, insert_guild, insert_poll, insert_vote_options, bet, get_user_vote, change_bet) from enums import OpCode, EventTitle, InteractionType with open('configuration.json', 'r') as file: CONFIG = json.load(file) APPLICATION_ID: int = CONFIG["APPLICATION_ID"] GATEWAY_URL: str = "wss://gateway.discord.gg/?v=10&encoding=json" API_URL: str = "https://discord.com/api/v10" TOKEN: str = CONFIG["TOKEN"] try: conn = psycopg2.connect( database=CONFIG["DB_NAME"], user=CONFIG["DB_USER"], password=CONFIG["DB_PASSWORD"], host=CONFIG["DB_ADDRESS"], port=CONFIG["DB_PORT"] ) except Exception as e: raise e async def create_command(): bodies = [ { "name": "poll", "type": 1, "description": "Des votes sur lesquels gamble !", }, { "name": "bet", "type": 1, "description": "Pour pouvoir planifier le prochain pari !", "required": True, "min_value": 0, "options": [ { "name": "somme", "description": "La somme du prochain pari", "type": 3 } ] }, { # à améliorer en /profil <@user_id> "name": "me", "type": 1, "description": "Afficher des informations sur vous", } ] for body in bodies: requests.post(f"{API_URL}/applications/{APPLICATION_ID}/commands", json=body, headers={"Authorization": f"Bot {TOKEN}"}) async def init_commands(): res = requests.get(f"{API_URL}/applications/{APPLICATION_ID}/commands", headers={"Authorization": f"Bot {TOKEN}"}) commands = json.loads(res.content) for command in commands: response = requests.delete(f"{API_URL}/applications/{APPLICATION_ID}/commands/{command['id']}", headers={"Authorization": f"Bot {TOKEN}"}) if response.status_code == 204: print("Suppression des commandes: Supprimé avec succès !") else: print("Suppression des commandes: Une erreur est survenue") await create_command() async def create_poll(guild_id: str, creator_id: str, poll_created_id: str, interaction_id: str, interaction_token: str, components: list[dict]): question = next( (comp["components"][0]["value"] for comp in components if comp["components"][0]["custom_id"] == "label"), "Question du poll" ) vote_options = "" buttons = [] for index, comp in enumerate(components): if "vote" in comp["components"][0]["custom_id"] and comp["components"][0]["value"] != "": vote_text = comp["components"][0]["value"] vote_options += f"Option {index}: {vote_text}\n" buttons.append({ "type": 2, "label": vote_text, "style": 1, "custom_id": f"{poll_created_id}_{vote_text}_{index}" }) action_rows = [{"type": 1, "components": buttons[i:i + 5]} for i in range(0, len(buttons), 5)] body = { "type": 4, "data": { "embeds": [ { "title": "Les paris sont ouverts !", "description": "Vous pouvez parier vos points, ils ne passeront pas en dessous de 50.", "color": 5613215 }, { "title": f"Sondage: {question}", "description": vote_options, "color": 16775222 } ], "components": action_rows } } res = requests.post( f"{API_URL}/interactions/{interaction_id}/{interaction_token}/callback", json=body, headers={"Authorization": f"Bot {TOKEN}"} ) if res.status_code == 400: print(res.json()) else: await insert_poll(conn=conn, creator=creator_id, poll_id=poll_created_id, guild_id=guild_id, question=question) for option in buttons: await insert_vote_options(conn=conn, vote_id=option["custom_id"], vote_option=option["label"]) async def identify(websocket): payload: dict = { "op": OpCode.IDENTIFY, "d": { "token": TOKEN, "properties": { "os": "linux", "browser": "gambling", "device": "gambling" }, "presence": { "activities": [{ "name": "son argent flamber", "type": 3, }], "status": "dnd", "since": 91879201, "afk": False }, "intents": 36871, } } await websocket.send(json.dumps(payload)) async def create_poll_form(interaction_id: str, interaction_token: str, guild_id: str): poll_id = uuid4() if not await guild_exists(conn=conn, guild_id=guild_id): await insert_guild(conn=conn, guild_id=guild_id) body = { "type": 9, "data": { "title": "Créer un vote", "custom_id": str(poll_id), "components": [ { "type": 1, "components": [ { "type": 4, "custom_id": "label", "label": "Question du vote", "placeholder": "Shy aime-t-il les robots?", "style": 1, "min_length": 1, "required": True } ] }, { "type": 1, "components": [ { "type": 4, "custom_id": "vote1", "label": "Première option", "placeholder": "Oui", "style": 1, "min_length": 1, "required": True } ] }, { "type": 1, "components": [ { "type": 4, "custom_id": "vote2", "label": "Deuxième option", "placeholder": "Non", "style": 1, "min_length": 1, "required": False } ] }, { "type": 1, "components": [ { "type": 4, "custom_id": "vote3", "label": "Troisième option", "placeholder": "Peut-être", "style": 1, "min_length": 1, "required": False } ] }, { "type": 1, "components": [ { "type": 4, "custom_id": "vote4", "label": "Quatrième option", "placeholder": "Pas du tout", "style": 1, "min_length": 1, "required": False } ] } ] } } res = requests.post(f"{API_URL}/interactions/{interaction_id}/{interaction_token}/callback", json=body, headers={"Authorization": f"Bot {TOKEN}"}) if res.status_code == 400: print(res.json()) else: print(res) async def heartbeat(websocket, interval): while True: await asyncio.sleep(interval / 1000) await websocket.send(json.dumps({"op": OpCode.HEARTBEAT, "d": None})) async def vote_confirmation(interaction_id: str, interaction_token: str, custom_id: str, user_id: str, guild_id: str, avatar: str, global_name: str): user_vote = await get_user_vote(conn=conn, user_id=user_id, poll_id=custom_id.split("_")[0]) if user_vote: body = { "type": 4, "data": { "content": f"<@{user_id}>, tu as déjà voté pour **{user_vote[0].split('_')[1]}**", "flags": 64 } } else: if not await user_exists(conn=conn, user_id=user_id, guild_id=guild_id): if not await guild_exists(conn=conn, guild_id=guild_id): await insert_guild(conn=conn, guild_id=guild_id) await insert_user(conn=conn, user_id=user_id, avatar=avatar, guild_id=guild_id, global_name=global_name) await bet(conn=conn, guild_id=guild_id, user_id=user_id, vote_id=custom_id, poll_id=custom_id.split("_")[0]) body = { "type": 4, "data": { "content": f"<@{user_id}>, tu as misé 50 sur l'option **{custom_id.split('_')[1]}** ! 🎉", "flags": 64 } } res = requests.post( f"{API_URL}/interactions/{interaction_id}/{interaction_token}/callback", json=body, headers={"Authorization": f"Bot {TOKEN}", "Content-Type": "application/json"} ) if res.status_code == 400: print(res.json()) else: print(res) async def get_event(response: Any): match response["t"]: case EventTitle.MESSAGE_CREATE: print(f'{response["d"]["author"]["username"]}: {response["d"]["content"]}') case EventTitle.INTERACTION_CREATE: if response["d"]["type"] == InteractionType.MODAL_SUBMIT: await create_poll( guild_id=response["d"]["guild"]["id"], creator_id=response["d"]["member"]["user"]["id"], poll_created_id=response["d"]["data"]["custom_id"], interaction_id=response["d"]["id"], interaction_token=response["d"]["token"], components=response["d"]["data"]["components"] ) elif response["d"]["type"] == InteractionType.MESSAGE_COMPONENT: custom_id = response["d"]["data"]["custom_id"] user_id = response["d"]["member"]["user"]["id"] await vote_confirmation(interaction_id=response["d"]["id"], interaction_token=response["d"]["token"], custom_id=custom_id, user_id=user_id, guild_id=response["d"]["guild"]["id"], avatar=response["d"]["member"]["user"]["avatar"], global_name=response["d"]["member"]["user"]["global_name"]) elif (response["d"]["type"] == InteractionType.APPLICATION_COMMAND and response["d"]["data"]["name"] == 'poll'): await create_poll_form(interaction_id=response["d"]["id"], interaction_token=response["d"]["token"], guild_id=response["d"]["guild_id"]) elif (response["d"]["type"] == InteractionType.APPLICATION_COMMAND and response["d"]["data"]["name"] == 'bet'): try: command_option: str = response["d"]["data"]["options"][0]["value"] if await guild_exists(conn=conn, guild_id=response["d"]["guild_id"]): if command_option.isnumeric() and await user_exists(conn=conn, user_id=response["d"]["member"]["user"][ "id"], guild_id=response["d"]["guild_id"]): await change_bet(conn=conn, guild_id=response["d"]["guild_id"], user_id=response["d"]["member"]["user"]["id"], new_bet_amount=response["d"]["data"]["name"]) else: await bet_error_message("Vous devez rentrer un nombre") except Exception: await bet_error_message("Vous avez oublié de donner une somme !") case _: print(response) async def bet_error_message(message: str): print(message) async def connect(): async with websockets.connect(GATEWAY_URL) as websocket: response = json.loads(await websocket.recv()) heartbeat_interval = response["d"]["heartbeat_interval"] asyncio.create_task(heartbeat(websocket, heartbeat_interval)) await identify(websocket) while True: response = json.loads(await websocket.recv()) match response["op"]: case OpCode.DISPATCH: await get_event(response) case _: print(response) async def main(): gateway_connect = asyncio.create_task(connect()) # await init_commands() # guild_ids = await get_guilds_ids() # for guild_id in guild_ids: # await get_guild_members(guild_id) await gateway_connect asyncio.run(main())