#!/usr/bin/python3 # -*- coding: utf-8 -*- """ Handle sending out of signed schleuder keyrings (to anyone, not just subscribers). This code is (c) 2020 by Rote Hilfe Heidelberg, distributed under the GNU Affero General Public Licence. See http://www.gnu.org/licenses/agpl-3.0.html Dependency: python3-gpg Here's how this is intended to work: (1) send out a mail to the schleuder to get its keyring: send-keys request keys (this uses LIST_REQUEST_ADDRESS and SUBSCRIBED_USER; both need to be adapted when you use this somewhere else). (2) When you get back the mail, pipe it to send-keys to generate a signed template: | send-keys create template (this is for mutt or similar MUAs; others may need to save the mail and then run create template). Copy keyring.asc to TEMPLATE_FILENAME (/var/share/send-keys/keyring-template.txt) on the system that will distribute the material. (3) To send a signed copy to addr@example.org, run: send-keys send addr@example.org on the distributing system. (4) On the distributing system, you can run this as a cgi; better: run send-keys serve PORT (it binds to localhost) and reverse-proxy it. You will have to set the path we're running at in SCRIPT_PATH. All this assumes there's a working sendmail (or equivalent) on both the local (signing) and distributing box. If you don't have that already, check nullmailer. """ import glob import hashlib import io import os import shutil import struct import subprocess import sys import tempfile import traceback from http import server import email from email import message # new API, migrate there from email import policy # new API, migrate there from email.encoders import encode_7or8bit, encode_noop from email.mime.application import MIMEApplication from email.mime.message import MIMEMessage from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from email.parser import Parser import gpg def ws_path(rel): """returns a path rel within our working space. 
""" return os.path.join("/var/share/send-keys", rel) # name of the formatted mail to be sent (create writes there) TEMPLATE_FILENAME = ws_path("keyring-template.txt") # name of a file just containing the keyring (again, written by create) KEYRING_FILENAME = ws_path("keyring.txt") # name of a directory containing extra keys (as name.key) EXTRA_KEYS_DIR = ws_path("extra-keys") # directory for webkeys WEBKEY_DIR = ws_path("webkeys") # address of the *request* endpoint of the list LIST_REQUEST_ADDRESS = "rh-alle-request@cryptolists.so36.net" # address of a user subscribed to the list; this is the person # that will receive the keyring SUBSCRIBED_USER = "datenschutzbeauftragte_r@rote-hilfe.de" # address the keyring mail is supposed to be sent from FROM_USER = "schluessel@rote-hilfe.de" # key id that will be used to sign the mail SIGNING_USER = "schluessel@rote-hilfe.de" # HTTP path relative to the embedding server this script is visible at. SCRIPT_PATH = "/sk" class NoKey(Exception): pass def get_key_for(ctx, address, secret=False): """returns a a key for address. This will return a non-expired key that has a non-revoked uid for address. If no such thing exists, some suitable exception is being raised. """ for key in ctx.keylist(address, secret=secret): if key.expired: continue for uid in key.uids: if uid.revoked: continue if uid.email==address: return key raise NoKey("Cannot find a non-expired, non-revoked key for %s"% address) def format_signed_mail(signers, payload): """returns bytes for a signed PGP/MIME mail containing the MIME message payload. You'll get back a MIMEMultipart that you still have to furnish with mail headers, as well as the normalised payload and the detached signature for non-mail use. 
""" normalised_payload = payload.as_string( ).replace('\n', '\r\n').encode("ascii") with gpg.Context(armor=True, signers=signers) as ctx: signature, r = ctx.sign( normalised_payload, mode=gpg.constants.sig.mode.DETACH) sig_att = MIMEApplication(_data=signature, _subtype="pgp-signature", _encoder=encode_7or8bit) sig_att['Content-Description'] = 'OpenPGP signature' msg = MIMEMultipart('signed', protocol='application/pgp-signature') msg.attach(payload) msg.attach(sig_att) return msg, normalised_payload, signature def make_template(data): with gpg.Context() as ctx: signers = [get_key_for(ctx, SIGNING_USER)] payload = MIMEApplication(_data=data, _subtype='pgp-keys', _encoder=encode_noop) payload['Content-Description'] = "Rote Hilfe PGP keyring" payload['Content-Disposition'] = "inline" msg, normalised, detached_sig = format_signed_mail(signers, payload) msg["Subject"] = "Rote Hilfe Keyring" msg["From"] = FROM_USER msg["Reply-To"] = SUBSCRIBED_USER msg["To"] = "%%%%recipient%%%%" return msg.as_string() def add_extra_keys(ctx): """adds keys from EXTRA_LEYS_DIR to the gpg context ctx. """ if not os.path.exists(EXTRA_KEYS_DIR): return for f_name in glob.glob(EXTRA_KEYS_DIR+"/*.key"): with open(f_name, "r", encoding="utf-8") as f: ctx.key_import(f) def attachments_to_keyring(in_file): """turns the keys attached to an (encrypted) mail in in_file into a keyring and returns that ascii-armored. 
""" # locate the ciphertext msg = email.message_from_binary_file(in_file) for part in msg.walk(): if part["Content-Description"]=="OpenPGP encrypted message": break else: raise Exception("No ciphertext found") # decrypt it, building the actual message keys = [] with gpg.Context() as ctx: cleartext, _, _ = ctx.decrypt(part.get_payload().encode("ascii")) with tempfile.TemporaryDirectory() as home: with gpg.Context(armor=True, offline=True, home_dir=home) as ctx: msg = email.message_from_bytes(cleartext) for part in msg.walk(): if part.get_content_type()=="application/pgp-keys": ctx.key_import(io.BytesIO(part.get_payload().encode("ascii"))) add_extra_keys(ctx) return ctx.key_export_minimal() def send_mail(mail_text, sender=FROM_USER): """hands over mail_text as-is to sendmail. """ sendmail = subprocess.Popen(["/usr/sbin/sendmail", "-oi", "-odq", "-f", sender, "-t"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) sendmail.communicate(mail_text.encode("ascii")) def send_keyring(recipient): with open(TEMPLATE_FILENAME, "rb") as f: tpl = f.read().decode("ascii") tpl = tpl.replace("%%%%recipient%%%%", recipient) send_mail(tpl) _ZB_SYMBOLS = "ybndrfg8ejkmcpqxot1uwisza345h769" def zbase32(data): """returns the bytes in data z-base-32 (RFC6189) encoded. The encoded data comes as a string. """ data = list(data) res, cur_bits, cur_in = [], 0, 0 while data or cur_in>=5: if cur_in<5: cur_bits = ((cur_bits<<8)+data.pop(0)) & 0xffff cur_in += 8 symbol_ind = (cur_bits>>(cur_in-5)) & 31 res.append(_ZB_SYMBOLS[symbol_ind]) cur_in -= 5 if cur_in>0: res.append(_ZB_SYMBOLS[(cur_bits << (5-cur_in)) & 31]) return "".join(res) def zbase32_dec(enc): """returns the z-base-32-decoded (RFC6109) enc as bytes. 
""" if isinstance(enc, str): enc = enc.encode("ascii") byte_to_bits = dict((b, i) for i, b in enumerate(_ZB_SYMBOLS.encode("ascii"))) res, accum, cur_bits = [], 0, 0 for char in enc: accum, cur_bits = (accum<<5)+byte_to_bits[char], cur_bits+5 if cur_bits>=8: res.append((accum >> (cur_bits-8)) & 0xff) accum, cur_bits = accum & 0xff, cur_bits-8 if cur_bits: res.append(accum & ((1<<(cur_bits+1))-1)) return bytes(res) def export_webkey(key, home, dir): """writes webkeys for the non-revoked uids on key to dir. This only includes uids with the same domain as SIGNING_USER. """ _, for_domain = SIGNING_USER.lower().split("@") for uid in key.uids: local_part, domain = uid.address.lower().split("@") if domain!=for_domain: continue dest_name = os.path.join(dir, zbase32(hashlib.sha1(local_part.encode("utf-8")).digest())) with gpg.Context(home_dir=home) as ctx: with open(dest_name, "wb") as f: f.write(ctx.key_export_minimal(pattern=uid.address)) def make_web_keys(keyring): """writes the keys in the (serialised) keying as webkeys according to draft-koch-openpgp-webkey-service-13. """ if not os.path.isdir(WEBKEY_DIR): return with tempfile.TemporaryDirectory() as home: with gpg.Context(home_dir=home) as ctx: ctx.key_import(keyring) for key in ctx.keylist(): export_webkey(key, home, WEBKEY_DIR) def send_key_request_mail(): """sends a mail to request all keys from LIST_REQUEST_ADDRESS as SUBSCRIBED_USER. This is the PGP/MIME version, and both this (inline signature) and some code with a detatched signature didn't work with schleuder. I have no idea why it doesn't recognise either version (both are, as far as I can see RFC 3156 compliant). I'm using inline PGP for now (see below) but let this stand because this is what we should eventually use. This is signed and encrypted as SUBSCRIBED_USER as required by Schleuder. 
""" list_address = LIST_REQUEST_ADDRESS.replace("-request@", "@") mail_body = ("x-list-name: %s\nx-list-keys\nx-get-key: *\n"%list_address) inner_mail = MIMEText(_text=mail_body, _subtype="plain") with gpg.Context(armor=True) as ctx: signers = [get_key_for(ctx, SUBSCRIBED_USER, secret=True)] with gpg.Context(armor=True, signers=signers) as ctx: recipients = [get_key_for(ctx, LIST_REQUEST_ADDRESS)] crypted, _, _ = ctx.encrypt(inner_mail.as_string().encode("ascii"), recipients, always_trust=True, sign=True) payload = MIMEApplication(_data=crypted, _subtype='octet-stream; name="encrypted.asc"', _encoder=encode_7or8bit) payload['Content-Description'] = 'OpenPGP encrypted message' payload['Content-Disposition'] = 'attachment; filename="encrypted.asc"' payload.set_charset('us-ascii') control = MIMEApplication(_data='Version: 1\n', _subtype='pgp-encrypted', _encoder=encode_7or8bit) control.set_charset('us-ascii') msg = MIMEMultipart('encrypted', protocol='application/pgp-encrypted') msg.attach(control) msg.attach(payload) msg["Subject"] = "Get keys" msg["From"] = SUBSCRIBED_USER msg["To"] = LIST_REQUEST_ADDRESS send_mail(msg.as_string(), sender=SUBSCRIBED_USER) def parse_command_line(): import argparse parser = argparse.ArgumentParser() parser.add_argument("action", type=str, choices=["create", "send", "request", "serve"]) parser.add_argument("arg", type=str) return parser.parse_args() def cli_main(): """executed when I think I'm running as a command line programme. 
""" args = parse_command_line() if args.action=="create": keyring = attachments_to_keyring(sys.stdin.buffer) with open(KEYRING_FILENAME, "wb") as f: f.write(keyring) make_web_keys(keyring) tpl = make_template(keyring) with open(TEMPLATE_FILENAME, "wb") as f: f.write(tpl.encode("ascii")) elif args.action=="send": send_keyring(args.arg) elif args.action=="request": send_key_request_mail() elif args.action=="serve": run_server(args.arg) #################### CGI/web server stuff below here import cgi import re # The initial empty line is important in cgi mode (it's the end of # the headers) COMMON_HEADER = """ """ COMMON_FOOTER = f"""

[Bei Rückfragen] | [Quelltext]

""" FORM_TEMPLATE = COMMON_HEADER+f""" Schlüsselbund bestellen

Schlüsselbund bestellen

Wenn ihr unten eine Mailadresse angebt und die kleine Frage beantwortet (das machen wir, damit nicht wildlaufende Robots hunderten Menschen unerwünschte Mail schicken), bekommt ihr einen signierten Schlüsselbund zugeschickt mit allen Schlüsseln, die ihr für den Kontakt mit der Roten Hilfe brauchen könntet.

Infos zur zuverlässigen Verifizierung mithilfe einer Papierausgabe der Zeitung der Roten Hilfe (Ups: Durch ein Versehen ist der Fingerprint in der RHZ 3/2020 falsch – sorry) findet ihr auf der Kontakt-Seite der Roten Hilfe.


Wenn ihr eurem HTTPS traut, könnt ihr den Schlüsselbund auch einfach runterladen.

Ergänzung zu unseren Datenschutzhinweisen: Wenn ihr hier eine Mail verschickt, bleibt auf unserem Server für bis zu vier Tage ein Log über die Verschickung der Mail mit Zeit und Zieladresse, bei Fehlern auch länger (aber das ist dann wahrscheinlich nicht eure Adresse). Wie lange Relais (also Maschinen, die die Mails zwischen uns und euch bearbeiten) solche Logs behalten, liegt natürlich nicht in unserer Hand.

"""+COMMON_FOOTER SUCCESS_TEMPLATE = COMMON_HEADER+f""" Schlüsselbund verschickt

Schlüsselbund verschickt

Mail ist asynchron, es kann also etwas dauern, bis der Schlüsselbund ankommt (ca. 1 Minute ist zu erwarten). Wenn wirklich auch in einer Stunde oder noch nichts angekommen ist, probiers nochmal, am besten mit einer anderen Adresse.

"""+COMMON_FOOTER ERROR_TEMPLATE = COMMON_HEADER+f""" Hat nicht geklappt

Hat nicht geklappt

Ich habe die Mail nicht verschickt, weil es einen Fehler gab. Vielleicht hilft dir Folgendes:

{{}}

Probiers nochmal.

"""+COMMON_FOOTER def send_to_client_cgi(s): """sends a string to stdout, encoding it in utf-8. Use this as send_to_client when running as a CGI. """ sys.stdout.buffer.write(s.encode("utf-8")) def serve_form(send_to_client): """writes a form to enter a mail address to stdout. """ send_to_client(FORM_TEMPLATE.format()) def handle_form(send_to_client, form): """interprets the form by sending a mail if the captcha's right. """ try: if form.getfirst("captcha", "")=="Solidaritat": dest_addr = form.getfirst("dest_address", "") # so some basic sanitation; dest_addr is directly copied into a # mail header dest_addr = re.sub( "[^ -~]+", "", dest_addr.encode("ascii", "ignore" ).decode("ascii")[:100]) send_keyring(dest_addr) send_to_client(SUCCESS_TEMPLATE.format()) else: send_to_client(ERROR_TEMPLATE.format( "Die richtige Lösung fürs Textcha ist „Solidaritat”.")) except Exception as ex: traceback.print_exc() send_to_client(ERROR_TEMPLATE.format( str(ex).replace("<", "<"))) def cgi_main(): """executed when I think I'm running as a CGI. """ send_to_client_cgi("Content-type: text/html;charset=utf-8\n") if os.environ["REQUEST_METHOD"]=="POST": handle_form(send_to_client_cgi, cgi.FieldStorage()) else: serve_form(send_to_client_cgi) class HTTPRequestHandler(server.BaseHTTPRequestHandler): """The request handler we use when run in stand-alone server (non-cgi) mode. """ def _serve_content(self, response_code, content_type, content, encoding=None, additional_headers={}): """serves content, encoding strings if necessary """ if encoding and isinstance(content, str): content = content.encode(encoding) self.send_response(response_code) self.send_header("Content-type", content_type) self.send_header("Content-length", "%d"%len(content)) for key, value in additional_headers.items(): self.send_header(key, value) self.end_headers() self.wfile.write(content) def _serve_error(self, err_code, msg): """serves an error page with an http response code errCode. 
""" self._serve_content(err_code, "text/html; charset=utf-8", ERROR_TEMPLATE.format(msg), encoding="utf-8") def _handle_with_content_producer(self, content_producer, *args): """calls content_producer and serves out what it passes to its argument. content_producer must take one function(s) that arranges for s to be served to the client. args will be passed as positional arguments to content_producer. """ try: buf = [] content_producer(buf.append, *args) self._serve_content(200, "text/html;charset=utf-8", "".join(buf), encoding="utf-8") except Exception as msg: traceback.print_exc() self._serve_error(500, str(msg)) def _do_404(self): self._serve_error(404, "{} does not exist on this server".format(self.path)) def do_GET(self): if self.path==SCRIPT_PATH: self._handle_with_content_producer(serve_form) elif self.path==SCRIPT_PATH+"/source": with open(__file__, encoding="utf-8") as f: self._serve_content(200, "text/plain;charset=utf-8", f.read(), encoding="utf-8") elif self.path==SCRIPT_PATH+"/keyring": with open(KEYRING_FILENAME) as f: self._serve_content(200, "text/plain;charset=ascii", f.read(), encoding="ascii") else: self._do_404() def do_POST(self): if self.path!=SCRIPT_PATH: self._do_404() payload = "" if "content-length" in self.headers: payload = self.rfile.read(int(self.headers["content-length"])) # hack to make cgi.FieldStorage parse our payload os.environ["REQUEST_METHOD"] = "POST" self._handle_with_content_producer( handle_form, cgi.FieldStorage( io.BytesIO(payload), headers=self.headers)) def run_server(listen_port): """runs an http server standing in for the CGIs on listen_port """ listen_port = int(listen_port) https = server.ThreadingHTTPServer(('localhost', listen_port), HTTPRequestHandler) https.serve_forever() if __name__=="__main__": if "REQUEST_METHOD" in os.environ: cgi_main() else: cli_main()