1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
#!/usr/bin/env python3
"""
matrix-redact: Delete all messages and media a user has sent in a matrix room.
"""
import random
import math
import asyncio
import argparse
import getpass
import sys
import subprocess
from nio import(
AsyncClient,
MatrixRoom,
LoginResponse,
RoomInfo,
responses
)
from typing import Optional
async def select_room(client: AsyncClient, sync_response) -> MatrixRoom:
rooms = []
for room_id, room in client.rooms.items():
if room_id in sync_response.rooms.join:
rooms.append({
"room_id": room_id,
"display_name": room.display_name,
})
print(f"{rooms[-1]['room_id']} | {rooms[-1]['display_name']}")
selection = input("\nManually type the full room ID to which redaction will be performed: ")
for room in rooms:
if room['room_id'] == selection:
print(f"{selection} will be redacted")
return room
print(f"{selection} is invalid. ")
return None
async def client_login() -> Optional[AsyncClient]:
uid = input("Full user ID (eg. @user:matrix.server.com): ")
upass = getpass.getpass(f"Enter the password for {uid}: ")
user, hserv = uid[1:].split(':', 1)
client = AsyncClient(f"https://{hserv}", uid)
if not isinstance(await client.login(upass), LoginResponse):
print("could not login, returning None")
return None
print("login was successful, returning client.")
return client
# TODO: redact_room
async def redact_room() -> None:
# TODO: ensure user is 100% sure (verify correct room, etc.). This is a permanent action
return None
async def main(args) -> None:
print("\n" + "="*80)
print("WARNING: THIS PROGRAM PERFORMS A PERMANENT DESTRUCTIVE ACTION IN A PROVIDED MATRIX ROOM.")
print("""
This program will (applies only to a provided room and user ID):
- Delete all user-uploaded media (images, files, etc.)
- Delete all messages you've sent
- Delete any other content you've posted
- Operate *irreversibly* on the room ID you provide
- Act as the logged-in user on the specified homeserver
THIS ACTION CANNOT BE UNDONE.
Make absolutely sure you understand what this program does before proceeding.
We take zero liability for any misuse of this program.
""")
confirmation = input("Type 'YES I UNDERSTAND' to continue: ")
if confirmation != "YES I UNDERSTAND":
print("Exiting.")
sys.exit(1)
x = random.randint(2, 12)
a = random.randint(2, 12)
b = a * x
print(f"Solve this: {a} * x = {b}")
if int(input("Answer: ").strip()) != x:
print("Incorrect math solution. Exiting.")
sys.exit(1)
client = None
try:
client = await client_login()
if not client:
print("Could not login.")
sys.exit(1)
sync_resp = await client.sync(timeout=30000, full_state=True)
room = await select_room(client, sync_resp)
if not room:
sys.exit(1)
except Exception as e:
print(f"Error: {e}")
finally:
if client:
print("logging out.")
await client.logout()
await client.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="matrix-mcnt: Matrix Unread Message Count"
)
asyncio.run(main(parser.parse_args()))
|