diff --git a/Pipfile b/Pipfile
index c29b2292..fe61303a 100644
--- a/Pipfile
+++ b/Pipfile
@@ -23,6 +23,7 @@ lockfile = "*"
 defusedxml = "*"
 "flask-rq2" = "*"
 simplejson = "*"
+asn1crypto = "*"
 
 [dev-packages]
 bandit = "*"
diff --git a/Pipfile.lock b/Pipfile.lock
index a4ed6380..b69b40cf 100644
--- a/Pipfile.lock
+++ b/Pipfile.lock
@@ -1,7 +1,7 @@
 {
     "_meta": {
         "hash": {
-            "sha256": "7aff94ddfb4f3f3ebf7f7910f3ade4eebd546b297cf72863a618824f87ec76fc"
+            "sha256": "03d5c2a739febe9a3c10d599ad5825ef603130098ecd73ce9833310d1eaed253"
         },
         "pipfile-spec": 6,
         "requires": {
@@ -36,6 +36,7 @@
                 "sha256:2f1adbb7546ed199e3c90ef23ec95c5cf3585bac7d11fb7eb562a3fe89c64e87",
                 "sha256:9d5c20441baf0cb60a4ac34cc447c6c189024b6b4c6cd7877034f4965c464e49"
             ],
+            "index": "pypi",
             "version": "==0.24.0"
         },
         "certifi": {
diff --git a/atst/utils/pdf_verification.py b/atst/utils/pdf_verification.py
new file mode 100644
index 00000000..ba6fc489
--- /dev/null
+++ b/atst/utils/pdf_verification.py
@@ -0,0 +1,241 @@
+import hashlib
+from OpenSSL import crypto
+from asn1crypto import cms, pem, core
+from atst.domain.authnid.crl import CRLCache, CRLRevocationException
+from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.primitives.asymmetric import padding
+
+
+class PDFSignature:
+    """
+    One digital signature embedded in a PDF, located by the offset of its
+    "/ByteRange" entry.
+
+    `pdf` is the complete PDF as bytes; `byte_range_start` is the index in
+    `pdf` at which this signature's "/ByteRange" key begins.
+    """
+
+    def __init__(self, byte_range_start=None, pdf=None):
+        self.pdf = pdf
+        self.byte_range_start = byte_range_start
+        # Both are derived purely from `pdf`, so parse once and cache.
+        self._signed_binary_data = None
+        self._signers_cert = None
+
+    @property
+    def byte_range(self):
+        """
+        This returns an array of 4 numbers that represent the byte range of
+        the PDF binary file that is signed by the certificate, read as two
+        (offset, length) pairs.
+
+        E.G: [0, 2045, 3012, 2000]
+
+        Bytes 0 to 2044 - represent part A of the signed file
+        Bytes 2045 to 3011 - would contain the signature and certificate information
+        Bytes 3012 to 5011 - represent part B of the signed file
+        """
+        start = self.pdf.find(b"[", self.byte_range_start)
+        stop = self.pdf.find(b"]", start)
+
+        return [int(i, 10) for i in self.pdf[start + 1 : stop].split()]
+
+    @property
+    def signed_binary_data(self):
+        """
+        This is the binary data stored in the signature: the hex string that
+        sits between the two signed byte ranges, decoded and loaded as a CMS
+        ContentInfo. Its "content" is returned and cached.
+        """
+        if self._signed_binary_data is None:
+            br = self.byte_range
+            # Skip the one-byte delimiter on each side of the hex string.
+            contents = self.pdf[br[0] + br[1] + 1 : br[2] - 1]
+            data = bytes(
+                int(contents[i : i + 2], 16) for i in range(0, len(contents), 2)
+            )
+            self._signed_binary_data = cms.ContentInfo.load(data)["content"]
+
+        return self._signed_binary_data
+
+    @property
+    def signers_cert(self):
+        """
+        This returns the certificate used to sign the PDF, PEM armored, or
+        None if no embedded certificate has the signer's serial number.
+        """
+        if self._signers_cert is None:
+            for cert in self.signed_binary_data["certificates"]:
+                if (
+                    self.signers_serial
+                    == cert.native["tbs_certificate"]["serial_number"]
+                ):
+                    self._signers_cert = pem.armor("CERTIFICATE", cert.dump())
+                    break
+
+        return self._signers_cert
+
+    @property
+    def signers_serial(self):
+        """
+        Return the signers serial from their certificate
+        """
+        return self.signed_binary_data["signer_infos"][0]["sid"].native["serial_number"]
+
+    @property
+    def hashing_algorithm(self):
+        """
+        This is the hashing algorithm used to generate the hash of binary file content
+        which is then signed by the certificate.
+
+        E.G. sha256, sha1
+        """
+        return self.signed_binary_data["digest_algorithms"][0]["algorithm"].native
+
+    @property
+    def cert_common_name(self):
+        """
+        This returns the common name on the certificate. This might be a name or
+        a DOD ID for example.
+        """
+        return (
+            crypto.load_certificate(crypto.FILETYPE_PEM, self.signers_cert)
+            .get_subject()
+            .commonName
+        )
+
+    @property
+    def encrypted_hash_of_signed_document(self):
+        """
+        This is the hash of the PDF binary data stored in the signature (the
+        "message_digest" signed attribute), or None if there is no such
+        attribute. We calculate the hash ourselves and then compare to this
+        so we can see if data has changed.
+        """
+        for attr in self.signed_binary_data["signer_infos"][0]["signed_attrs"]:
+            if attr["type"].native == "message_digest":
+                return attr["values"].native[0]
+
+        return None
+
+    @property
+    def binary_data(self):
+        """
+        Take the byte range and return the binary data for that range.
+        """
+        br = self.byte_range
+        data1 = self.pdf[br[0] : br[0] + br[1]]
+        data2 = self.pdf[br[2] : br[2] + br[3]]
+
+        return data1 + data2
+
+    @property
+    def hashed_binary_data(self):
+        """
+        Takes the data in the byte range and hashes it using
+        the hashing algorithm specified in the signed PDF. We
+        can later compare this to the encrypted_hash_of_signed_document.
+        Returns the hashlib hash object, not the digest.
+        """
+        return getattr(hashlib, self.hashing_algorithm)(self.binary_data)
+
+    @property
+    def is_cert_valid(self):
+        """
+        Takes the signing certificate and runs it through the CRLCache
+        checker. Returns the result of the check, or False if the check
+        raises CRLRevocationException.
+        """
+        try:
+            cache = CRLCache(
+                "ssl/server-certs/ca-chain.pem",
+                crl_locations=["ssl/client-certs/client-ca.der.crl"],
+            )
+            return cache.crl_check(self.signers_cert)
+        except CRLRevocationException:
+            return False
+
+    @property
+    def is_signature_valid(self):
+        """
+        Get signed PDF signature and determine if it was actually signed
+        by the certificate that it claims it was. Returns a boolean.
+        """
+        public_key = (
+            crypto.load_certificate(crypto.FILETYPE_PEM, self.signers_cert)
+            .get_pubkey()
+            .to_cryptography_key()
+        )
+        signer_info = self.signed_binary_data["signer_infos"][0]
+        attrs = signer_info["signed_attrs"]
+
+        if attrs is not None and not isinstance(attrs, core.Void):
+            # The attributes are stored with a context-specific tag, but the
+            # signature is calculated over their encoding as a SET OF
+            # (tag 0x31), so replace the first byte before verifying.
+            signed_data = b"\x31" + attrs.dump()[1:]
+        else:
+            signed_data = self.binary_data
+
+        try:
+            public_key.verify(
+                bytes(signer_info["signature"]),
+                signed_data,
+                padding.PKCS1v15(),
+                getattr(hashes, self.hashing_algorithm.upper())(),
+            )
+            return True
+        except Exception:
+            # Deliberately broad: a signature that cannot be verified for any
+            # reason must be reported as invalid rather than crash the check.
+            return False
+
+    @property
+    def to_dict(self):
+        """
+        Run every check and return the results, along with the signer's
+        details, as a plain dictionary.
+        """
+        is_cert_valid = self.is_cert_valid
+        is_signature_valid = self.is_signature_valid
+        hashed = self.hashed_binary_data
+        is_hash_valid = hashed.digest() == self.encrypted_hash_of_signed_document
+
+        return {
+            "cert_common_name": self.cert_common_name,
+            "hashed_binary_data": hashed.hexdigest(),
+            "hashing_algorithm": self.hashing_algorithm,
+            "is_valid": is_cert_valid and is_hash_valid and is_signature_valid,
+            "is_valid_cert": is_cert_valid,
+            "is_valid_hash": is_hash_valid,
+            "is_valid_signature": is_signature_valid,
+            "signers_serial": self.signers_serial,
+        }
+
+
+def pdf_signature_validations(pdf=None):
+    """
+    Validate every signature found in `pdf` (the PDF file as bytes).
+
+    The "result" is "OK" only if at least one signature is present and
+    all of them are valid; otherwise (including an empty or missing
+    `pdf`) it is "FAILURE".
+    """
+    if not pdf:
+        return {"result": "FAILURE", "signature_count": 0, "signatures": []}
+
+    signatures = []
+    # The search starts at index 1, so a marker at index 0 is never matched.
+    position = pdf.find(b"/ByteRange", 1)
+
+    while position != -1:
+        signatures.append(PDFSignature(byte_range_start=position, pdf=pdf).to_dict)
+        position = pdf.find(b"/ByteRange", position + 1)
+
+    all_valid = bool(signatures) and all(sig["is_valid"] for sig in signatures)
+
+    return {
+        "result": "OK" if all_valid else "FAILURE",
+        "signature_count": len(signatures),
+        "signatures": signatures,
+    }
diff --git a/tests/fixtures/sally-darth-signed.pdf b/tests/fixtures/sally-darth-signed.pdf
new file mode 100644
index 00000000..67c2cf6c
Binary files /dev/null and b/tests/fixtures/sally-darth-signed.pdf differ
diff --git a/tests/fixtures/signed-expired-cert.pdf b/tests/fixtures/signed-expired-cert.pdf
new file mode 100644
index 00000000..550c2365
Binary files /dev/null and b/tests/fixtures/signed-expired-cert.pdf differ
diff --git a/tests/fixtures/signed-pdf-not-dod.pdf b/tests/fixtures/signed-pdf-not-dod.pdf
new file mode 100644
index 00000000..8858f933
Binary files /dev/null and b/tests/fixtures/signed-pdf-not-dod.pdf differ
diff --git a/tests/utils/test_pdf_verification.py b/tests/utils/test_pdf_verification.py
new file mode 100644
index 00000000..648a84ec
--- /dev/null
+++ b/tests/utils/test_pdf_verification.py
@@ -0,0 +1,156 @@
+import pytest
+from atst.utils.pdf_verification import pdf_signature_validations
+
+
+def read_fixture(name):
+    """Return the bytes of the named file in tests/fixtures, closing it after."""
+    with open(f"tests/fixtures/{name}", "rb") as fixture:
+        return fixture.read()
+
+
+def test_unsigned_pdf():
+    unsigned_pdf = read_fixture("sample.pdf")
+    result = pdf_signature_validations(pdf=unsigned_pdf)
+
+    assert result == {"result": "FAILURE", "signature_count": 0, "signatures": []}
+
+
+def test_valid_signed_pdf():
+    valid_signed_pdf = read_fixture("sally-darth-signed.pdf")
+    result = pdf_signature_validations(pdf=valid_signed_pdf)
+
+    assert result == {
+        "result": "OK",
+        "signature_count": 2,
+        "signatures": [
+            {
+                "cert_common_name": "WILLIAMS.SALLY.3453453453",
+                "hashed_binary_data": "b879a15e19eece534dc63019d3fe539ff4a3efbf8e8f5403a8bdae26a9b713ea",
+                "hashing_algorithm": "sha256",
+                "is_valid": True,
+                "is_valid_cert": True,
+                "is_valid_hash": True,
+                "is_valid_signature": True,
+                "signers_serial": 9_662_248_800_192_484_626,
+            },
+            {
+                "cert_common_name": "VADER.DARTH.9012345678",
+                "hashed_binary_data": "d98339766c20a369219f236220d7b450111554acc902e242d015dd6d306c7809",
+                "hashing_algorithm": "sha256",
+                "is_valid": True,
+                "is_valid_cert": True,
+                "is_valid_hash": True,
+                "is_valid_signature": True,
+                "signers_serial": 9_662_248_800_192_484_627,
+            },
+        ],
+    }
+
+
+def test_signed_pdf_thats_been_modified():
+    valid_signed_pdf = read_fixture("sally-darth-signed.pdf")
+    modified_pdf = valid_signed_pdf.replace(b"PDF-1.6", b"PDF-1.7")
+    result = pdf_signature_validations(pdf=modified_pdf)
+
+    assert result == {
+        "result": "FAILURE",
+        "signature_count": 2,
+        "signatures": [
+            {
+                "cert_common_name": "WILLIAMS.SALLY.3453453453",
+                "hashed_binary_data": "d1fb3c955b57f139331586276ba4abca90ecc5d36b53fe6bbbbbd8707d7124bb",
+                "hashing_algorithm": "sha256",
+                "is_valid": False,
+                "is_valid_cert": True,
+                "is_valid_hash": False,
+                "is_valid_signature": True,
+                "signers_serial": 9_662_248_800_192_484_626,
+            },
+            {
+                "cert_common_name": "VADER.DARTH.9012345678",
+                "hashed_binary_data": "75ef47824de4b5477c75665c5a90e39a2b8a8985422cf2f7f641661a7b5217a8",
+                "hashing_algorithm": "sha256",
+                "is_valid": False,
+                "is_valid_cert": True,
+                "is_valid_hash": False,
+                "is_valid_signature": True,
+                "signers_serial": 9_662_248_800_192_484_627,
+            },
+        ],
+    }
+
+
+def test_signed_pdf_not_on_chain():
+    signed_pdf_not_on_chain = read_fixture("signed-pdf-not-dod.pdf")
+    result = pdf_signature_validations(pdf=signed_pdf_not_on_chain)
+
+    assert result == {
+        "result": "FAILURE",
+        "signature_count": 1,
+        "signatures": [
+            {
+                "cert_common_name": "John B Harris",
+                "hashed_binary_data": "3f0047e6cb5b9bb089254b20d174445c3ba4f513",
+                "hashing_algorithm": "sha1",
+                "is_valid": False,
+                "is_valid_cert": False,
+                "is_valid_hash": True,
+                "is_valid_signature": True,
+                "signers_serial": 514,
+            }
+        ],
+    }
+
+
+@pytest.mark.skip(reason="Need fixture file")
+def test_signed_pdf_dod_revoked():
+    signed_pdf_dod_revoked = read_fixture("signed-pdf-dod_revoked.pdf")
+    result = pdf_signature_validations(pdf=signed_pdf_dod_revoked)
+
+    assert result == {
+        "result": "FAILURE",
+        "signature_count": 1,
+        "signatures": [
+            {
+                "cert_common_name": None,
+                "hashed_binary_data": None,
+                "hashing_algorithm": None,
+                "is_valid": None,
+                "is_valid_cert": None,
+                "is_valid_hash": None,
+                "is_valid_signature": None,
+                "signers_serial": None,
+            }
+        ],
+    }
+
+
+def test_signed_dod_pdf_signer_cert_expired():
+    #
+    # TODO: Is this good enough? Do we want an expired DOD certificate? This test is using
+    # a fake DOD certificate.
+    #
+    signed_pdf_expired_cert = read_fixture("signed-expired-cert.pdf")
+    result = pdf_signature_validations(pdf=signed_pdf_expired_cert)
+
+    assert result == {
+        "result": "FAILURE",
+        "signature_count": 1,
+        "signatures": [
+            {
+                "cert_common_name": "Bob Alice",
+                "hashed_binary_data": "bcfad46c89b1695325f5b6e73b589d086e3925ab384def6fcb13904991e69077",
+                "hashing_algorithm": "sha256",
+                "is_valid": False,
+                "is_valid_cert": False,
+                "is_valid_hash": True,
+                "is_valid_signature": True,
+                "signers_serial": -180_673_825_300_246_991_177_196,
+            }
+        ],
+    }
+
+
+@pytest.mark.skip(reason="TODO")
+def test_crl_check_unavailable():
+    pass