You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

233 lines
7.7 KiB

import json
import subprocess
import re
import logging
from dataclasses import dataclass
from collections import defaultdict
from utils import decrypt_encrypted_key
import pandas as pd
from pandas import DataFrame
from sklearn.feature_extraction import DictVectorizer
# Install the default handler on the root logger so records from `logger`
# below are actually emitted.
logging.basicConfig()
# Logger shared by the functions in this module; INFO and above are emitted.
logger = logging.getLogger('main')
logger.setLevel(logging.INFO)
# Path of the sqlcipher command-line binary that query_signal_db runs.
SQLCIPHER_PATH = "/usr/bin/sqlcipher"
# the decryption key for your signal DB lives in this config file
# (paths kept for reference only -- presumably the usual Signal Desktop
# locations on Linux; `expanduser` is not among the imports visible here)
# CONFIG_PATH = expanduser("~/.config/Signal/config.json")
# DB_PATH = expanduser("~/.config/Signal/sql/db.sqlite")
@dataclass
class Message:
    """One text message read from the Signal database.

    Instances are built by `get_messages`.
    """

    # service id of the account that sent the message
    sender: str
    # id of the conversation (group chat or DM) the message belongs to
    conversation: str
    # the message's `timestamp` field, passed through unchanged
    # (presumably milliseconds since the epoch -- TODO confirm)
    timestamp: int
    # text of the message
    body: str
def get_decryption_key(path, password=None):
    """Return the database key described by the Signal config file at `path`.

    The config is a JSON object that must contain exactly one of:

    * `key` -- the key itself, returned unchanged; `password` must not be
      supplied in this case.
    * `encryptedKey` -- an encrypted key, which is passed together with
      `password` to `decrypt_encrypted_key`; `password` is required.

    Raises `Exception` when both or neither entry is present, or when
    `password` does not fit the entry that was found.
    """
    # JSON is UTF-8 by specification, so don't depend on the locale default.
    with open(path, encoding='utf-8') as fh:
        config = json.load(fh)

    has_plain_key = 'key' in config
    has_encrypted_key = 'encryptedKey' in config

    if has_plain_key and has_encrypted_key:
        raise Exception('config has both `key` and `encryptedKey` in it')

    if has_plain_key:
        if password is not None:
            raise Exception('you cannot use the -p option with your signal config')
        return config['key']

    if has_encrypted_key:
        if password is None:
            raise Exception('you must use the -p option to pass the password in for your signal db')
        return decrypt_encrypted_key(password, config['encryptedKey'])

    # Neither entry is present: list what was found to help debugging.
    dict_keys = ', '.join(str(x) for x in config)
    raise Exception(f'no `key` or `encryptedKey` found for db in config.json, only saw: {dict_keys}')
def query_signal_db(db, key, query):
    """Run `query` against the signal DB and yield one dictionary per row.

    The sqlcipher binary at `SQLCIPHER_PATH` is run on `db` with a
    `PRAGMA key` statement followed by `query`. Every non-empty line of its
    output is assumed to be a json document and is decoded with `json.loads`.

    This is a generator: sqlcipher is not started until the first item is
    requested, so failures surface when the result is iterated.

    Raises `Exception` when the output does not start with the `ok` line
    (presumably sqlcipher's acknowledgement of the PRAGMA) or when sqlcipher
    exits with a non-zero status; the message carries the exit status and
    whatever sqlcipher wrote to stderr.
    """
    # NOTE(review): `key` and `query` are spliced into the SQL text verbatim,
    # so only trusted values may be passed in.
    result = subprocess.run(
        [SQLCIPHER_PATH,
         '-list', '-noheader', db,
         f"PRAGMA key = \"x'{key}'\";{query}"],
        capture_output=True)
    output = result.stdout
    diagnostics = result.stderr.decode('utf8', errors='replace').strip()

    if not output.startswith(b'ok\n'):
        prefix = output[:3]
        raise Exception(
            f'unexpected result from sqlcipher: {prefix}... '
            f'(exit status {result.returncode}, stderr: {diagnostics!r})')
    if result.returncode != 0:
        # e.g. a wrong key: the PRAGMA is acknowledged but the query fails.
        raise Exception(
            f'sqlcipher exited with status {result.returncode}: {diagnostics}')

    # Split on '\n' only: str.splitlines() would also break on characters
    # such as U+2028, which may appear unescaped inside a json string.
    for line in output[3:].decode('utf8').split('\n'):
        if line:
            yield json.loads(line)
def get_signal_uuid(db, key):
    """Return the uuid stored under the `uuid_id` entry of the `items` table.

    The stored value is the uuid followed by a dot and a numeric suffix
    (`.2` in the original author's database). The suffix looks like the id
    of the linked device -- TODO confirm -- so any all-digit suffix is
    accepted and stripped, not just `.2`.

    Raises `Exception` when the entry is missing or its value does not have
    the `<uuid>.<digits>` form.
    """
    rows = query_signal_db(db,
                           key,
                           "select json from items where id='uuid_id'")
    # A bare next() would leak StopIteration, which turns into an opaque
    # RuntimeError when this function is called from inside a generator.
    item = next(rows, None)
    if item is None:
        raise Exception('no `uuid_id` entry found in the signal db')

    value = item['value']
    uuid, dot, suffix = value.rpartition('.')
    if not (dot and uuid and suffix.isdigit()):
        raise Exception(f'unexpected format for `uuid_id` value: {value!r}')
    return uuid
def get_messages(db, key):
    """Yield a `Message` for every message with a body in the signal db.

    `db` is the path to the signal sqlite db and `key` its decryption key.
    Rows without a `body` entry are skipped, as are rows whose `type` is
    neither `incoming` nor `outgoing`. Incoming messages take their sender
    from `sourceServiceId`; outgoing ones are attributed to the uuid
    returned by `get_signal_uuid`.
    """
    own_uuid = get_signal_uuid(db, key)
    rows = query_signal_db(db, key, 'select json from messages;')
    for row in rows:
        conversation_id = row['conversationId']
        if 'body' not in row:
            continue
        text = row['body']
        sent_at = row['timestamp']
        kind = row['type']
        if kind == 'incoming':
            sender = row['sourceServiceId']
        elif kind == 'outgoing':
            sender = own_uuid
        else:
            # any other message type produces nothing
            continue
        yield Message(sender, conversation_id, sent_at, text)
def get_conversations(db, key, group_only=False):
    """Return an iterator over every conversation in the signal db.

    A conversation represents a group chat or DM.
    `db` is the path to the signal sqlite db and `key` its decryption key.
    Each item is the decoded json of one row of the `conversations` table.
    With `group_only` set, only conversations whose `type` is `group` are
    produced.
    """
    conversations = query_signal_db(db, key, 'select json from conversations;')
    if not group_only:
        return conversations
    return (convo for convo in conversations
            if convo['type'] == 'group')
def get_conversation_names(db, key):
    """Return a dictionary that maps a group conversation's ID to its name.

    Groups without a (non-empty) name are left out.
    """
    names = {}
    for group in get_conversations(db, key, group_only=True):
        title = group.get('name')
        if title:
            names[group['id']] = title
    return names
def get_service_ids(db, key):
    """Return the set of distinct sender ids over all messages in the db."""
    senders = set()
    for message in get_messages(db, key):
        senders.add(message.sender)
    return senders
def get_profile_names(db, key):
    """Map the `serviceId` of every private conversation to its profile name.

    The name is None when the conversation has no `profileName` entry.
    """
    names = {}
    for convo in get_conversations(db, key):
        if convo['type'] != 'private':
            continue
        names[convo['serviceId']] = convo.get('profileName')
    return names
def get_membership(db, key):
    """Return one `{'name': ..., 'members': [...]}` dict per group conversation.

    `members` lists the `aci` of every entry in the group's `membersV2`
    list; it is empty when the group has no `membersV2` entry.
    """
    memberships = []
    for convo in get_conversations(db, key):
        if convo['type'] != 'group':
            continue
        group_name = convo.get('name')
        member_ids = [member['aci'] for member in convo.get('membersV2', [])]
        memberships.append({'name': group_name, 'members': member_ids})
    return memberships
def print_messages_by_user(db, key, regex):
    """Print every message sent by the first user whose profile name matches `regex`.

    Output, in order: two summary lines (number of distinct senders, number
    of known profile names), a blank line, the matched profile name with its
    service id, then one line per message prefixed with the group name in
    brackets (`DM` when the conversation is not a named group).

    Raises `AssertionError` when some sender has no profile entry or when no
    profile name matches `regex`. The checks are raised explicitly rather
    than written as `assert` statements so that they still run under
    `python -O`, where a skipped check would let the function print the
    messages of an unrelated sender.
    """
    unique_service_ids = get_service_ids(db, key)
    profile_names = get_profile_names(db, key)
    # Inverted lookup; when two accounts share a profile name the later one wins.
    names_to_sids = {name: sid for sid, name in profile_names.items()}
    convo_names = get_conversation_names(db, key)
    print(f'incoming messages from {len(unique_service_ids)} unique service ids')
    print(f'profile names for {len(profile_names)} service ids')

    unknown = [sid for sid in unique_service_ids if sid not in profile_names]
    if unknown:
        raise AssertionError(
            f'{len(unknown)} sender(s) have no profile entry, e.g. {unknown[0]!r}')
    print()

    pattern = re.compile(regex)
    match = next(
        ((name, sid) for name, sid in names_to_sids.items()
         if name and pattern.search(name)),
        None)
    if match is None:
        raise AssertionError(f'no profile name matches {regex!r}')
    username, target_sid = match
    print(username, target_sid)

    for m in get_messages(db, key):
        if m.sender == target_sid:
            convo = convo_names.get(m.conversation, 'DM')
            print(f'[{convo}]:', m.body)
def print_messages_by_chat(db, key, regex):
    """Print every message of the first named group whose name matches `regex`.

    Each line shows the sender's profile name (None when unknown) followed
    by the message body. Nothing is printed when no group name matches.
    """
    profile_names = get_profile_names(db, key)
    group_names = get_conversation_names(db, key)

    # Sentinel distinguishes "no group matched" from any real conversation id.
    no_match = object()
    target_cid = next(
        (cid for cid, title in group_names.items() if re.search(regex, title)),
        no_match)
    if target_cid is no_match:
        return

    for message in get_messages(db, key):
        if message.conversation == target_cid:
            name = profile_names.get(message.sender)
            print(f'{name}: ', message.body)
def get_message_counts(db, key, chat_list=None):
    """Count messages per sender for each conversation.

    `chat_list`, when given, is a collection of conversation ids; messages
    from any other conversation are ignored. With `chat_list=None` every
    conversation is counted.

    Returns `(v, convo_ids, counts)`:

    * `v` -- the fitted `DictVectorizer`; `v.get_feature_names_out()` gives
      the sender id of each column.
    * `convo_ids` -- tuple of conversation ids, one per row.
    * `counts` -- dense matrix of message counts (rows follow `convo_ids`).

    Raises `ValueError` when no message matches, since there is nothing to
    build a table from.
    """
    # Build the lookup once so the membership test is O(1) per message.
    allowed = None if chat_list is None else frozenset(chat_list)

    counts = defaultdict(lambda: defaultdict(int))
    for message in get_messages(db, key):
        if allowed is None or message.conversation in allowed:
            counts[message.conversation][message.sender] += 1

    if not counts:
        # zip(*...) below would otherwise fail with a cryptic unpacking error.
        raise ValueError('no messages found for the requested conversations')

    convo_ids, per_convo_counts = zip(*counts.items())
    v = DictVectorizer(sparse=False)
    return v, convo_ids, v.fit_transform(per_convo_counts)
def resolve_filter_list(convo_id_to_name, filter_file):
    """Translate the group names listed in `filter_file` into conversation ids.

    `filter_file` holds one group name per line; empty and whitespace-only
    lines are ignored. Each name is looked up in `convo_id_to_name` (a
    mapping from conversation id to name). Names without a match are
    reported with a warning and skipped.

    Returns the list of matching conversation ids, in file order.
    """
    name_to_convo_id = {name: cid for cid, name in convo_id_to_name.items()}
    # Group names are free text and may be non-ASCII; read them as UTF-8
    # instead of relying on the locale default.
    with open(filter_file, encoding='utf-8') as fh:
        chat_list = [line.rstrip('\n') for line in fh]

    collecting = []
    for name in chat_list:
        if not name.strip():
            continue
        if cid := name_to_convo_id.get(name):
            collecting.append(cid)
        else:
            logger.warning(
                'you are either not a member of the group "%s" or you '
                'misspelled the name in %s', name, filter_file)
    return collecting
def dump_message_count_table(db, key, output, filter_file=None):
    """Write a conversation-by-person table of message counts to `output` as CSV.

    Rows are labelled with group names and columns with profile names.
    Conversations without a known group name are left out of the file.
    When `filter_file` is given, only the groups it lists are counted.
    """
    convo_id_to_name = get_conversation_names(db, key)
    chat_list = (resolve_filter_list(convo_id_to_name, filter_file)
                 if filter_file is not None else None)

    v, convoIds, message_counts = get_message_counts(db, key, chat_list)
    profile_names = get_profile_names(db, key)

    row_labels = list(map(convo_id_to_name.get, convoIds))
    column_labels = [profile_names.get(sid) for sid in v.get_feature_names_out()]
    table = DataFrame(message_counts,
                      index=row_labels,
                      columns=column_labels)

    logger.info(f'writing message count table to {output}')
    # Rows whose label is missing belong to conversations with no group name.
    named_rows = ~pd.isna(table.index)
    table[named_rows].to_csv(output)