316 lines
11 KiB
Python
316 lines
11 KiB
Python
import asyncio
|
|
import json
|
|
from typing import Any
|
|
from uuid import uuid4
|
|
|
|
import psycopg2
|
|
import requests
|
|
import websockets
|
|
from websockets import Response
|
|
|
|
from database import user_exists, insert_user, guild_exists, insert_guild
|
|
from enums import OpCode, EventTitle, InteractionType
|
|
|
|
with open('configuration.json', 'r') as file:
|
|
CONFIG = json.load(file)
|
|
|
|
APPLICATION_ID: int = CONFIG["APPLICATION_ID"]
|
|
GATEWAY_URL: str = "wss://gateway.discord.gg/?v=10&encoding=json"
|
|
API_URL: str = "https://discord.com/api/v10"
|
|
TOKEN: str = CONFIG["TOKEN"]
|
|
|
|
try:
|
|
conn = psycopg2.connect(
|
|
database=CONFIG["DB_NAME"],
|
|
user=CONFIG["DB_USER"],
|
|
password=CONFIG["DB_PASSWORD"],
|
|
host=CONFIG["DB_ADDRESS"],
|
|
port=CONFIG["DB_PORT"]
|
|
)
|
|
except Exception as e:
|
|
raise e
|
|
|
|
|
|
async def create_command():
|
|
body: dict = {
|
|
"name": "poll",
|
|
"type": 1,
|
|
"description": "Des votes sur lesquels gamble !",
|
|
}
|
|
res: Response = requests.post(f"{API_URL}/applications/{APPLICATION_ID}/commands", json=body,
|
|
headers={"Authorization": f"Bot {TOKEN}"})
|
|
|
|
|
|
async def init_commands():
|
|
res = requests.get(f"{API_URL}/applications/{APPLICATION_ID}/commands", headers={"Authorization": f"Bot {TOKEN}"})
|
|
commands = json.loads(res.content)
|
|
for command in commands:
|
|
response = requests.delete(f"{API_URL}/applications/{APPLICATION_ID}/commands/{command['id']}",
|
|
headers={"Authorization": f"Bot {TOKEN}"})
|
|
if response.status_code == 204:
|
|
print("Suppression des commandes: Supprimé avec succès !")
|
|
else:
|
|
print("Suppression des commandes: Une erreur est survenue")
|
|
await create_command()
|
|
|
|
|
|
async def create_poll(interaction_id: str, interaction_token: str, components: list[dict]):
|
|
question = next(
|
|
(comp["components"][0]["value"] for comp in components if comp["components"][0]["custom_id"] == "label"),
|
|
"Question du poll"
|
|
)
|
|
|
|
vote_options = ""
|
|
buttons = []
|
|
for index, comp in enumerate(components):
|
|
if "vote" in comp["components"][0]["custom_id"] and comp["components"][0]["value"] != "":
|
|
vote_text = comp["components"][0]["value"]
|
|
vote_options += f"Option {index}: {vote_text}\n"
|
|
|
|
buttons.append({
|
|
"type": 2,
|
|
"label": vote_text,
|
|
"style": 1,
|
|
"custom_id": f"vote_{index}"
|
|
})
|
|
|
|
action_rows = [{"type": 1, "components": buttons[i:i + 5]} for i in range(0, len(buttons), 5)]
|
|
|
|
body = {
|
|
"type": 4,
|
|
"data": {
|
|
"embeds": [
|
|
{
|
|
"title": "Les paris sont ouverts !",
|
|
"description": "Vous pouvez parier vos points, ils ne passeront pas en dessous de 50.",
|
|
"color": 5613215
|
|
},
|
|
{
|
|
"title": f"Sondage: {question}",
|
|
"description": vote_options,
|
|
"color": 16775222
|
|
}
|
|
],
|
|
"components": action_rows
|
|
}
|
|
}
|
|
|
|
res = requests.post(
|
|
f"{API_URL}/interactions/{interaction_id}/{interaction_token}/callback",
|
|
json=body,
|
|
headers={"Authorization": f"Bot {TOKEN}"}
|
|
)
|
|
|
|
if res.status_code == 400:
|
|
print(res.json())
|
|
else:
|
|
print(res)
|
|
|
|
|
|
async def identify(websocket):
|
|
payload: dict = {
|
|
"op": OpCode.IDENTIFY,
|
|
"d": {
|
|
"token": TOKEN,
|
|
"properties": {
|
|
"os": "linux",
|
|
"browser": "gambling",
|
|
"device": "gambling"
|
|
},
|
|
"presence": {
|
|
"activities": [{
|
|
"name": "son argent flamber",
|
|
"type": 3,
|
|
}],
|
|
"status": "dnd",
|
|
"since": 91879201,
|
|
"afk": False
|
|
},
|
|
"intents": 36871,
|
|
}
|
|
}
|
|
await websocket.send(json.dumps(payload))
|
|
|
|
|
|
async def create_poll_form(interaction_id: str, interaction_token: str, guild_id: str):
|
|
poll_id = uuid4()
|
|
if not await guild_exists(conn=conn, guild_id=guild_id):
|
|
await insert_guild(conn=conn, guild_id=guild_id)
|
|
body = {
|
|
"type": 9,
|
|
"data": {
|
|
"title": "Créer un vote",
|
|
"custom_id": poll_id,
|
|
"components": [
|
|
{
|
|
"type": 1,
|
|
"components": [
|
|
{
|
|
"type": 4,
|
|
"custom_id": "label",
|
|
"label": "Question du vote",
|
|
"placeholder": "Shy aime-t-il les robots?",
|
|
"style": 1,
|
|
"min_length": 1,
|
|
"required": True
|
|
}
|
|
]
|
|
},
|
|
{
|
|
"type": 1,
|
|
"components": [
|
|
{
|
|
"type": 4,
|
|
"custom_id": "vote1",
|
|
"label": "Première option",
|
|
"placeholder": "Oui",
|
|
"style": 1,
|
|
"min_length": 1,
|
|
"required": True
|
|
}
|
|
]
|
|
},
|
|
{
|
|
"type": 1,
|
|
"components": [
|
|
{
|
|
"type": 4,
|
|
"custom_id": "vote2",
|
|
"label": "Deuxième option",
|
|
"placeholder": "Non",
|
|
"style": 1,
|
|
"min_length": 1,
|
|
"required": False
|
|
}
|
|
]
|
|
},
|
|
{
|
|
"type": 1,
|
|
"components": [
|
|
{
|
|
"type": 4,
|
|
"custom_id": "vote3",
|
|
"label": "Troisième option",
|
|
"placeholder": "Peut-être",
|
|
"style": 1,
|
|
"min_length": 1,
|
|
"required": False
|
|
}
|
|
]
|
|
},
|
|
{
|
|
"type": 1,
|
|
"components": [
|
|
{
|
|
"type": 4,
|
|
"custom_id": "vote4",
|
|
"label": "Quatrième option",
|
|
"placeholder": "Pas du tout",
|
|
"style": 1,
|
|
"min_length": 1,
|
|
"required": False
|
|
}
|
|
]
|
|
}
|
|
]
|
|
}
|
|
}
|
|
|
|
res = requests.post(f"{API_URL}/interactions/{interaction_id}/{interaction_token}/callback", json=body,
|
|
headers={"Authorization": f"Bot {TOKEN}"})
|
|
if res.status_code == 400:
|
|
print(res.json())
|
|
else:
|
|
print(res)
|
|
|
|
|
|
async def heartbeat(websocket, interval):
|
|
while True:
|
|
await asyncio.sleep(interval / 1000)
|
|
await websocket.send(json.dumps({"op": OpCode.HEARTBEAT, "d": None}))
|
|
|
|
|
|
async def vote_confirmation(interaction_id: str, interaction_token: str, custom_id: str, user_id: str, guild_id: str,
|
|
avatar: str, global_name: str):
|
|
if not await user_exists(conn=conn, user_id=user_id, guild_id=guild_id):
|
|
if not await guild_exists(conn=conn, guild_id=guild_id):
|
|
await insert_guild(conn=conn, guild_id=guild_id)
|
|
await insert_user(conn=conn, user_id=user_id, avatar=avatar, guild_id=guild_id, global_name=global_name)
|
|
body = {
|
|
"type": 4,
|
|
"data": {
|
|
"content": f"<@{user_id}>, tu as misé 50 sur l'option **{custom_id.split('_')[1]}** ! 🎉",
|
|
"flags": 64
|
|
}
|
|
}
|
|
|
|
res = requests.post(
|
|
f"{API_URL}/interactions/{interaction_id}/{interaction_token}/callback",
|
|
json=body,
|
|
headers={"Authorization": f"Bot {TOKEN}", "Content-Type": "application/json"}
|
|
)
|
|
if res.status_code == 400:
|
|
print(res.json())
|
|
else:
|
|
print(res)
|
|
|
|
|
|
async def get_event(response: Any):
|
|
match response["t"]:
|
|
case EventTitle.MESSAGE_CREATE:
|
|
print(f'{response["d"]["author"]["username"]}: {response["d"]["content"]}')
|
|
case EventTitle.INTERACTION_CREATE:
|
|
if response["d"]["type"] == InteractionType.MODAL_SUBMIT:
|
|
await create_poll(
|
|
response["d"]["id"],
|
|
response["d"]["token"],
|
|
response["d"]["data"]["components"]
|
|
)
|
|
elif response["d"]["type"] == InteractionType.MESSAGE_COMPONENT:
|
|
custom_id = response["d"]["data"]["custom_id"]
|
|
user_id = response["d"]["member"]["user"]["id"]
|
|
await vote_confirmation(interaction_id=response["d"]["id"],
|
|
interaction_token=response["d"]["token"],
|
|
custom_id=custom_id,
|
|
user_id=user_id,
|
|
guild_id=response["d"]["guild"]["id"],
|
|
avatar=response["d"]["member"]["user"]["avatar"],
|
|
global_name=response["d"]["member"]["user"]["global_name"])
|
|
elif response["d"]["type"] == InteractionType.APPLICATION_COMMAND:
|
|
print(response["d"])
|
|
await create_poll_form(interaction_id=response["d"]["id"],
|
|
interaction_token=response["d"]["token"],
|
|
guild_id=response["d"]["guild_id"])
|
|
case _:
|
|
print(response)
|
|
|
|
|
|
async def connect():
|
|
async with websockets.connect(GATEWAY_URL) as websocket:
|
|
response = json.loads(await websocket.recv())
|
|
heartbeat_interval = response["d"]["heartbeat_interval"]
|
|
|
|
asyncio.create_task(heartbeat(websocket, heartbeat_interval))
|
|
|
|
await identify(websocket)
|
|
|
|
while True:
|
|
response = json.loads(await websocket.recv())
|
|
match response["op"]:
|
|
case OpCode.DISPATCH:
|
|
await get_event(response)
|
|
case _:
|
|
print(response)
|
|
|
|
|
|
async def main():
|
|
gateway_connect = asyncio.create_task(connect())
|
|
# await init_commands()
|
|
# guild_ids = await get_guilds_ids()
|
|
# for guild_id in guild_ids:
|
|
# await get_guild_members(guild_id)
|
|
await gateway_connect
|
|
|
|
|
|
asyncio.run(main())
|