#!/usr/bin/env python3 """ matrix-redact: Delete all messages and media a user has sent in a matrix room. """ import random import math import asyncio import argparse import getpass import sys import subprocess from datetime import datetime from nio import( AsyncClient, MatrixRoom, LoginResponse, RoomInfo, MessageDirection, RedactedEvent, RedactionEvent, RoomMessagesError, RoomRedactError, RoomMemberEvent, RoomCreateEvent, RoomEncryptionEvent, RoomGuestAccessEvent, RoomHistoryVisibilityEvent, RoomJoinRulesEvent, PowerLevelsEvent, responses ) from typing import Optional, List async def select_room(client: AsyncClient, sync_response) -> MatrixRoom: rooms = [] for room_id, room in client.rooms.items(): if room_id in sync_response.rooms.join: rooms.append({ "room_id": room_id, "display_name": room.display_name, }) print(f"{rooms[-1]['room_id']} | {rooms[-1]['display_name']}") selection = input("\nManually type the full room ID to which redaction will be performed: ") for room in rooms: if room['room_id'] == selection: print(f"{selection} will be redacted") return room print(f"{selection} is invalid. ") return None async def client_login() -> Optional[AsyncClient]: uid = input("Full user ID (eg. @user:matrix.server.com): ") upass = getpass.getpass(f"Enter the password for {uid}: ") user, hserv = uid[1:].split(':', 1) client = AsyncClient(f"https://{hserv}", uid) if not isinstance(await client.login(upass), LoginResponse): print("could not login, returning None") return None print("login was successful, returning client.") return client async def get_all_events(client: AsyncClient, room: MatrixRoom): sync_resp = await client.sync(timeout=30000, full_state=True) current_token = sync_resp.rooms.join[room['room_id']].timeline.prev_batch events = [] while True: try: print(f"getting messages in {room['room_id']}") resp = await client.room_messages( room['room_id'], current_token, limit=500, direction=MessageDirection.back, ) except Exception as e: print(f"could not get room message: {e}") break if isinstance(resp, RoomMessagesError): print(f"Could not get room message: {resp}") break current_token = resp.end if len(resp.chunk) == 0: print("no more messages to fetch.") break for event in resp.chunk: event_id = event.event_id if (event_id in {e.event_id for e in events} or not hasattr(event, 'sender') or event.sender != client.user_id or isinstance(event, RedactionEvent) or isinstance(event, RedactedEvent) or isinstance(event, (RoomMemberEvent, RoomCreateEvent, RoomEncryptionEvent, RoomGuestAccessEvent, RoomHistoryVisibilityEvent, RoomJoinRulesEvent, PowerLevelsEvent))): continue events.append(event) if len(events) > 1: return events return None async def redact_room(client: AsyncClient, room: MatrixRoom, dry_run) -> None: print(f"\nAll events sent by '{client.user_id}' in " + f"'{room['room_id']}' ({room['display_name']}) will be deleted.") if not input("\n\nIs this room correct? (Type 'Continue') to proceed: ") == "Continue": return None events = await get_all_events(client, room) events_redacted = 0 if not events: print("Could not get events.") return None print(f"Total number of events to redact: {len(events)}") for event in events: print(f"'{event.event_id}'") try: if not dry_run: redact_resp = await client.room_redact( room['room_id'], event.event_id, reason="" ) if isinstance(redact_resp, RoomRedactError): print(f"Could not redact event {event} in {room['room_id']}: {redact_resp}") continue events_redacted += 1 print('\r', end='') print(f"Progress: {events_redacted}/{len(events)}" + f" ({(events_redacted / len(events)) * 100:.2f}%)\t ", end='', flush=True) except Exception as e: print(f"Error redacting event {event}: {e}") if events_redacted: print(f"Total messages redacted: {events_redacted}") else: print("Could not redact any messages, room is probably redacted already") async def main(args) -> None: print("\n" + "="*80) print("WARNING: THIS PROGRAM PERFORMS A PERMANENT DESTRUCTIVE ACTION IN A PROVIDED MATRIX ROOM.\n") print("Make absolutely sure you understand what this program does before proceeding.\n\n") confirmation = input("Type 'YES I UNDERSTAND' to continue: ") if confirmation != "YES I UNDERSTAND": print("Exiting.") sys.exit(1) client = None try: client = await client_login() if not client: print("Could not login.") sys.exit(1) sync_resp = await client.sync(timeout=30000, full_state=True) room = await select_room(client, sync_resp) if room: await redact_room(client, room, args.dry) except Exception as e: print(f"Error: {e}") finally: if client: print("logging out.") await client.logout() await client.close() if __name__ == "__main__": parser = argparse.ArgumentParser( description="matrix-mcnt: Matrix Unread Message Count" ) parser.add_argument( "--dry", help="Dry run (doesn't actually delete messages)", action="store_true" ) asyncio.run(main(parser.parse_args()))