diff --git a/dsmr_parser/parsers.py b/dsmr_parser/parsers.py index d49d2bd..017b877 100644 --- a/dsmr_parser/parsers.py +++ b/dsmr_parser/parsers.py @@ -32,6 +32,7 @@ def __init__(self, telegram_specification, apply_checksum_validation=True): object["obis_reference"]: re.compile(object["obis_reference"], re.DOTALL | re.MULTILINE) for object in self.telegram_specification['objects'] } + self._telegram_encryption_active = None def parse(self, telegram_data, encryption_key="", authentication_key="", throw_ex=False): # noqa: C901 """ @@ -46,38 +47,11 @@ def parse(self, telegram_data, encryption_key="", authentication_key="", throw_e :raises ParseError: :raises InvalidChecksumError: """ - - if "general_global_cipher" in self.telegram_specification: - if self.telegram_specification["general_global_cipher"]: - enc_key = unhexlify(encryption_key) - auth_key = unhexlify(authentication_key) - telegram_data = unhexlify(telegram_data) - apdu = XDlmsApduFactory.apdu_from_bytes(apdu_bytes=telegram_data) - if apdu.security_control.security_suite != 0: - logger.warning("Untested security suite") - if apdu.security_control.authenticated and not apdu.security_control.encrypted: - logger.warning("Untested authentication only") - if not apdu.security_control.authenticated and not apdu.security_control.encrypted: - logger.warning("Untested not encrypted or authenticated") - if apdu.security_control.compressed: - logger.warning("Untested compression") - if apdu.security_control.broadcast_key: - logger.warning("Untested broadcast key") - telegram_data = apdu.to_plain_apdu(enc_key, auth_key).decode("ascii") - else: - try: - if unhexlify(telegram_data[0:2])[0] == GeneralGlobalCipher.TAG: - raise RuntimeError("Looks like a general_global_cipher frame " - "but telegram specification is not matching!") - except Exception: - pass - else: - try: - if unhexlify(telegram_data[0:2])[0] == GeneralGlobalCipher.TAG: - raise RuntimeError( - "Looks like a general_global_cipher frame but telegram specification is not matching!") - except Exception: - pass + telegram_data = self.decrypt_telegram_data( + telegram_data=telegram_data, + encryption_key=encryption_key, + authentication_key=authentication_key + ) if self.apply_checksum_validation and self.telegram_specification['checksum_support']: self.validate_checksum(telegram_data) @@ -112,6 +86,42 @@ def parse(self, telegram_data, encryption_key="", authentication_key="", throw_e return telegram + def decrypt_telegram_data(self, encryption_key, authentication_key, telegram_data): + """ + Check if telegram data is encrypted and decrypt if applicable. + """ + # if self._telegram_encryption_active is False: + # # If encryption is not working, stop trying and logging warnings. + # return telegram_data + + if self.telegram_specification.get("general_global_cipher"): + enc_key = unhexlify(encryption_key) + auth_key = unhexlify(authentication_key) + telegram_data = unhexlify(telegram_data) + apdu = XDlmsApduFactory.apdu_from_bytes(apdu_bytes=telegram_data) + if apdu.security_control.security_suite != 0: + logger.warning("Untested security suite") + if apdu.security_control.authenticated and not apdu.security_control.encrypted: + logger.warning("Untested authentication only") + if not apdu.security_control.authenticated and not apdu.security_control.encrypted: + logger.warning("Untested not encrypted or authenticated") + if apdu.security_control.compressed: + logger.warning("Untested compression") + if apdu.security_control.broadcast_key: + logger.warning("Untested broadcast key") + telegram_data = apdu.to_plain_apdu(enc_key, auth_key).decode("ascii") + self._telegram_encryption_active = True + else: + try: + if unhexlify(telegram_data[0:2])[0] == GeneralGlobalCipher.TAG: + logger.warning("Looks like a general_global_cipher frame " + "but telegram specification is not matching!") + except Exception: + pass + self._telegram_encryption_active = False + + return telegram_data + @staticmethod def validate_checksum(telegram): """ diff --git a/test/clients/test_serialreader.py b/test/clients/test_serialreader.py deleted file mode 100644 index 3457f4c..0000000 --- a/test/clients/test_serialreader.py +++ /dev/null @@ -1,29 +0,0 @@ -import unittest -import tempfile -from unittest import mock - -from dsmr_parser import telegram_specifications -from dsmr_parser.clients.filereader import FileReader -from dsmr_parser.clients.serial_ import SerialReader -from dsmr_parser.clients.settings import SERIAL_SETTINGS_V5 - -from test.example_telegrams import TELEGRAM_V5 - - -class SerialReaderTest(unittest.TestCase): - - @mock.patch('dsmr_parser.clients.serial_.serial.Serial') - def test_read_as_object(self, mock_serial): - serial_handle_mock = mock_serial.return_value - # mock_serial.return_value.in_waiting = 1024 - mock_serial.return_value.read.return_value = [b'Telegram data...', b''] # Return data, then empty bytes - - - serial_reader = SerialReader( - device='/dev/ttyUSB0', - serial_settings=SERIAL_SETTINGS_V5, - telegram_specification=telegram_specifications.V5 - ) - - for telegram in serial_reader.read(): - print(telegram) # see 'Telegram object' docs below \ No newline at end of file diff --git a/test/test_parse_v5.py b/test/test_parse_v5.py index a321b21..db9d588 100644 --- a/test/test_parse_v5.py +++ b/test/test_parse_v5.py @@ -21,7 +21,7 @@ def test_parse(self): telegram = parser.parse(TELEGRAM_V5, throw_ex=True) except Exception as ex: assert False, f"parse trigged an exception {ex}" - print('test: ', type(telegram.P1_MESSAGE_HEADER), telegram.P1_MESSAGE_HEADER.__dict__) + # P1_MESSAGE_HEADER (1-3:0.2.8) assert isinstance(telegram.P1_MESSAGE_HEADER, CosemObject) assert telegram.P1_MESSAGE_HEADER.unit is None