import json
import subprocess
import re
import logging
from dataclasses import dataclass
from collections import defaultdict

import pandas as pd
from pandas import DataFrame
from sklearn.feature_extraction import DictVectorizer

from utils import decrypt_encrypted_key

logging.basicConfig()
logger = logging.getLogger('main')
logger.setLevel(logging.INFO)

SQLCIPHER_PATH = "/usr/bin/sqlcipher"

# the decryption key for your signal DB lives in this config file
# CONFIG_PATH = expanduser("~/.config/Signal/config.json")
# DB_PATH = expanduser("~/.config/Signal/sql/db.sqlite")


@dataclass
class Message:
    """One message body together with who sent it, where, and when."""

    sender: str        # service id of the author (the host's own uuid for outgoing)
    conversation: str  # id of the conversation the message belongs to
    timestamp: int     # copied verbatim from the message's `timestamp` field
    body: str          # text of the message


def get_decryption_key(path, password=None):
    """Return the database key stored in the Signal config file at `path`.

    The config must contain exactly one of `key` (returned as-is) or
    `encryptedKey` (decrypted with `password` via `decrypt_encrypted_key`).

    Raises:
        Exception: if both or neither entry is present, if a password is
            given for a plain `key`, or if no password is given for an
            `encryptedKey`.
    """
    with open(path) as fh:
        config = json.load(fh)

    if 'key' in config and 'encryptedKey' in config:
        raise Exception('config has both `key` and `encryptedKey` in it')

    if 'key' in config:
        if password is not None:
            raise Exception('you cannot use the -p option with your signal config')
        return config['key']
    elif 'encryptedKey' in config:
        if password is None:
            raise Exception('you must use the -p option to pass the password in for your signal db')
        return decrypt_encrypted_key(password, config['encryptedKey'])
    else:
        dict_keys = ', '.join(str(x) for x in config.keys())
        raise Exception(f'no `key` or `encryptedKey` found for db in config.json, only saw: {dict_keys}')


def query_signal_db(db, key, query):
    """Run `query` against the signal DB and yield one dictionary per row.

    The query is executed through the `sqlcipher` binary at SQLCIPHER_PATH.
    Every non-empty output line is assumed to be a JSON document.

    Raises:
        Exception: if the output does not start with an `ok` line.
    """
    result = subprocess.run(
        [SQLCIPHER_PATH, '-list', '-noheader', db,
         f"PRAGMA key = \"x'{key}'\";{query}"],
        capture_output=True)
    messages = result.stdout

    # The output is expected to begin with `ok` on its own line (presumably
    # the response to the PRAGMA key statement); anything else is an error.
    prefix = messages[:3]
    if prefix != b'ok\n':
        raise Exception(f'unexpected result from sqlcipher: {prefix}...')
    messages = messages[3:]

    messages = messages.decode('utf8').split('\n')
    for line in messages:
        if line == '':
            continue
        yield json.loads(line)


def get_signal_uuid(db, key):
    """Return the `value` of the `uuid_id` item with its `.2` suffix removed."""
    uuid = next(
        query_signal_db(db, key, "select json from items where id='uuid_id'")
    )['value']
    assert uuid.endswith('.2')
    return uuid[:-2]  # for some reason mine has .2 at the end. version?


def get_messages(db, key):
    """Yield a `Message` for every incoming or outgoing message with a body.

    Given a path to a signal sqlite db and a decryption key, walks the
    `messages` table. Rows without a `body` are skipped. Outgoing messages
    are attributed to the uuid returned by `get_signal_uuid`; incoming ones
    to their `sourceServiceId`. Rows of any other type are ignored.
    """
    host_uuid = get_signal_uuid(db, key)
    for message in query_signal_db(db, key, 'select json from messages;'):
        cid = message['conversationId']
        if 'body' not in message:
            continue
        body = message['body']
        timestamp = message['timestamp']
        # A row has a single type, so the two cases are mutually exclusive.
        if message['type'] == 'incoming':
            yield Message(message['sourceServiceId'], cid, timestamp, body)
        elif message['type'] == 'outgoing':
            yield Message(host_uuid, cid, timestamp, body)


def get_conversations(db, key, group_only=False):
    """Return every conversation in the signal db.

    A conversation represents a group chat or DM. Given a path to a signal
    sqlite db and a decryption key, returns a generator that yields one
    dictionary per row of the `conversations` table. With `group_only=True`
    only conversations whose `type` is `'group'` are yielded.
    """
    results = query_signal_db(db, key, 'select json from conversations;')
    if group_only:
        return (g for g in results if g['type'] == 'group')
    else:
        return results


def get_conversation_names(db, key):
    """Return a dictionary that maps from group conversation ID to its name.

    Groups without a (truthy) name are left out.
    """
    messages = get_conversations(db, key, group_only=True)
    return {message['id']: message.get('name')
            for message in messages if message.get('name')}


def get_service_ids(db, key):
    """Return the set of distinct senders across all messages."""
    messages = get_messages(db, key)
    return {m.sender for m in messages}


def get_profile_names(db, key):
    """Return a dictionary mapping `serviceId` to `profileName`.

    Only conversations of type `'private'` are considered; the name is
    `None` when a conversation has no `profileName`.
    """
    convos = get_conversations(db, key)
    privates = (c for c in convos if c['type'] == 'private')
    return {c['serviceId']: c.get('profileName') for c in privates}


def get_membership(db, key):
    """Return a list of `{'name': ..., 'members': [...]}` dicts, one per group.

    `members` holds the `aci` of every entry in the group's `membersV2`
    list (empty when the group has no such list).
    """
    convos = get_conversations(db, key)
    groups = [{'name': c.get('name'),
               'members': [m['aci'] for m in c.get('membersV2', [])]}
              for c in convos if c['type'] == 'group']
    return groups


def print_messages_by_user(db, key, regex):
    """Print every message sent by the first user whose profile name matches.

    `regex` is searched (`re.search`) against each known profile name. Each
    message is prefixed with the name of the group it was sent in, or `DM`
    when the conversation is not a named group.
    """
    unique_service_ids = get_service_ids(db, key)
    profile_names = get_profile_names(db, key)
    names_to_sids = {val: key for key, val in profile_names.items()}
    convo_names = get_conversation_names(db, key)

    print(f'incoming messages from {len(unique_service_ids)} unique service ids')
    print(f'profile names for {len(profile_names)} service ids')

    # Sanity check: every sender must have a profile entry.
    for known_sid in unique_service_ids:
        assert known_sid in profile_names

    print()
    username = None
    sid = None
    for name in names_to_sids:
        if name and re.search(regex, name):
            username = name
            sid = names_to_sids[name]
            print(username, sid)
            break

    assert username is not None

    for m in get_messages(db, key):
        if m.sender == sid:
            convo = convo_names.get(m.conversation, 'DM')
            print(f'[{convo}]:', m.body)


def print_messages_by_chat(db, key, regex):
    """Print every message in the first named group whose name matches.

    `regex` is searched (`re.search`) against each group name. Returns
    silently when no group matches. Each line is prefixed with the sender's
    profile name (`None` when unknown).
    """
    profile_names = get_profile_names(db, key)

    for cid, name in get_conversation_names(db, key).items():
        if re.search(regex, name):
            # `cid` persists in the code below
            break
    else:
        return

    for message in get_messages(db, key):
        if message.conversation != cid:
            continue
        name = profile_names.get(message.sender)
        print(f'{name}: ', message.body)


def get_message_counts(db, key, chat_list=None):
    """Count messages per (conversation, sender) and vectorize the result.

    Args:
        db: path to the signal sqlite db.
        key: decryption key for the db.
        chat_list: optional collection of conversation ids; when given,
            only messages from those conversations are counted.

    Returns:
        A tuple `(v, convoIds, counts)` where `v` is the fitted
        `DictVectorizer`, `convoIds` is a tuple of conversation ids (one
        per row) and `counts` is the dense matrix produced by
        `v.fit_transform`, with one column per sender.

    Raises:
        ValueError: if no message matches, since there is nothing to
            vectorize.
    """
    messages = get_messages(db, key)
    # Build the lookup once; testing membership in a list is linear per message.
    allowed = None if chat_list is None else set(chat_list)

    convos = defaultdict(lambda: defaultdict(int))
    for message in messages:
        if allowed is None or message.conversation in allowed:
            convos[message.conversation][message.sender] += 1
            # to record presence instead of volume:
            # convos[convo][user] = max(convos[convo][user], 1)

    if not convos:
        # zip(*[]) yields nothing, so the unpacking below would otherwise fail
        # with an opaque "not enough values to unpack" ValueError.
        raise ValueError('no messages found for the requested conversations')

    convoIds, messageCounts = zip(*convos.items())
    v = DictVectorizer(sparse=False)
    return v, convoIds, v.fit_transform(messageCounts)


def resolve_filter_list(convo_id_to_name, filter_file):
    """Translate the group names listed in `filter_file` to conversation ids.

    The file holds one group name per line. A trailing empty or
    whitespace-only line is ignored. Names that do not resolve to a known
    conversation are reported with a warning and skipped.

    Returns:
        The list of conversation ids, in file order.
    """
    name_to_convo_id = {val: key for key, val in convo_id_to_name.items()}

    with open(filter_file) as fh:
        chat_list = fh.read().split('\n')

    # Drop the empty element produced by a final newline (or a trailing
    # whitespace-only line).
    if chat_list and (chat_list[-1] == '' or chat_list[-1].isspace()):
        chat_list.pop()

    collecting = []
    for name in chat_list:
        if cid := name_to_convo_id.get(name):
            collecting.append(cid)
        else:
            # Logger.warn is a deprecated alias; warning() is the supported name.
            logger.warning(f'you are either not a member of the group "{name}" or you mispelled the name in chat_list.txt')
    return collecting


def dump_message_count_table(db, key, output, filter_file=None):
    """Write a conversation-by-person message count table to `output` as CSV.

    Rows are labelled with group names and columns with profile names.
    Rows whose conversation has no group name are dropped. When
    `filter_file` is given, only the groups it lists are included.
    """
    convo_id_to_name = get_conversation_names(db, key)

    if filter_file is None:
        chat_list = None
    else:
        chat_list = resolve_filter_list(convo_id_to_name, filter_file)

    v, convoIds, message_counts = get_message_counts(db, key, chat_list)
    profile_names = get_profile_names(db, key)

    convo_labels = [convo_id_to_name.get(cid) for cid in convoIds]
    people_labels = [profile_names.get(x) for x in v.get_feature_names_out()]

    df = DataFrame(message_counts, columns=people_labels, index=convo_labels)
    logger.info(f'writing message count table to {output}')
    # Conversations without a name get a NaN index label; leave them out.
    df[~pd.isna(df.index)].to_csv(output)