# Licensed under the GPL: https://www.gnu.org/licenses/old-licenses/gpl-2.0.html # For details: https://github.com/pylint-dev/pylint/blob/main/LICENSE # Copyright (c) https://github.com/pylint-dev/pylint/blob/main/CONTRIBUTORS.txt from __future__ import annotations import csv import operator import platform import sys from collections import Counter from io import StringIO from pathlib import Path from typing import TextIO import pytest from _pytest.config import Config from pylint import checkers from pylint.config.config_initialization import _config_initialization from pylint.lint import PyLinter from pylint.message.message import Message from pylint.testutils.constants import _EXPECTED_RE, _OPERATORS, UPDATE_OPTION # need to import from functional.test_file to avoid cyclic import from pylint.testutils.functional.test_file import ( FunctionalTestFile, NoFileError, parse_python_version, ) from pylint.testutils.output_line import OutputLine from pylint.testutils.reporter_for_tests import FunctionalTestReporter MessageCounter = Counter[tuple[int, str]] PYLINTRC = Path(__file__).parent / "testing_pylintrc" class LintModuleTest: maxDiff = None def __init__( self, test_file: FunctionalTestFile, config: Config | None = None ) -> None: _test_reporter = FunctionalTestReporter() self._linter = PyLinter() self._linter.config.persistent = 0 checkers.initialize(self._linter) # See if test has its own .rc file, if so we use that one rc_file: Path | str = PYLINTRC try: rc_file = test_file.option_file self._linter.disable("suppressed-message") self._linter.disable("locally-disabled") self._linter.disable("useless-suppression") except NoFileError: pass self._test_file = test_file try: args = [test_file.source] except NoFileError: # If we're still raising NoFileError the actual source file doesn't exist args = [""] if config and config.getoption("minimal_messages_config"): with self._open_source_file() as f: messages_to_enable = {msg[1] for msg in self.get_expected_messages(f)} # Always enable fatal errors messages_to_enable.add("astroid-error") messages_to_enable.add("fatal") messages_to_enable.add("syntax-error") args.extend(["--disable=all", f"--enable={','.join(messages_to_enable)}"]) # Add testoptions self._linter._arg_parser.add_argument( "--min_pyver", type=parse_python_version, default=(2, 5) ) self._linter._arg_parser.add_argument( "--max_pyver", type=parse_python_version, default=(4, 0) ) self._linter._arg_parser.add_argument( "--min_pyver_end_position", type=parse_python_version, default=(3, 8) ) self._linter._arg_parser.add_argument( "--requires", type=lambda s: [i.strip() for i in s.split(",")], default=[] ) self._linter._arg_parser.add_argument( "--except_implementations", type=lambda s: [i.strip() for i in s.split(",")], default=[], ) self._linter._arg_parser.add_argument( "--exclude_platforms", type=lambda s: [i.strip() for i in s.split(",")], default=[], ) self._linter._arg_parser.add_argument( "--exclude_from_minimal_messages_config", default=False ) _config_initialization( self._linter, args_list=args, config_file=rc_file, reporter=_test_reporter ) self._check_end_position = ( sys.version_info >= self._linter.config.min_pyver_end_position ) self._config = config def setUp(self) -> None: if self._should_be_skipped_due_to_version(): pytest.skip( f"Test cannot run with Python {sys.version.split(' ', maxsplit=1)[0]}." ) missing = [] for requirement in self._linter.config.requires: try: __import__(requirement) except ImportError: missing.append(requirement) if missing: pytest.skip(f"Requires {','.join(missing)} to be present.") except_implementations = self._linter.config.except_implementations if except_implementations: if platform.python_implementation() in except_implementations: msg = "Test cannot run with Python implementation %r" pytest.skip(msg % platform.python_implementation()) excluded_platforms = self._linter.config.exclude_platforms if excluded_platforms: if sys.platform.lower() in excluded_platforms: pytest.skip(f"Test cannot run on platform {sys.platform!r}") if ( self._config and self._config.getoption("minimal_messages_config") and self._linter.config.exclude_from_minimal_messages_config ): pytest.skip("Test excluded from --minimal-messages-config") def runTest(self) -> None: self._runTest() def _should_be_skipped_due_to_version(self) -> bool: return ( # type: ignore[no-any-return] sys.version_info < self._linter.config.min_pyver or sys.version_info > self._linter.config.max_pyver ) def __str__(self) -> str: return f"{self._test_file.base} ({self.__class__.__module__}.{self.__class__.__name__})" @staticmethod def get_expected_messages(stream: TextIO) -> MessageCounter: """Parses a file and get expected messages. :param stream: File-like input stream. :type stream: enumerable :returns: A dict mapping line,msg-symbol tuples to the count on this line. :rtype: dict """ messages: MessageCounter = Counter() for i, line in enumerate(stream): match = _EXPECTED_RE.search(line) if match is None: continue line = match.group("line") if line is None: lineno = i + 1 elif line.startswith(("+", "-")): lineno = i + 1 + int(line) else: lineno = int(line) version = match.group("version") op = match.group("op") if version: required = parse_python_version(version) if not _OPERATORS[op](sys.version_info, required): continue for msg_id in match.group("msgs").split(","): messages[lineno, msg_id.strip()] += 1 return messages @staticmethod def multiset_difference( expected_entries: MessageCounter, actual_entries: MessageCounter, ) -> tuple[MessageCounter, dict[tuple[int, str], int]]: """Takes two multisets and compares them. A multiset is a dict with the cardinality of the key as the value. """ missing = expected_entries.copy() missing.subtract(actual_entries) unexpected = {} for key, value in list(missing.items()): if value <= 0: missing.pop(key) if value < 0: unexpected[key] = -value return missing, unexpected def _open_expected_file(self) -> TextIO: try: return open(self._test_file.expected_output, encoding="utf-8") except FileNotFoundError: return StringIO("") def _open_source_file(self) -> TextIO: if self._test_file.base == "invalid_encoded_data": return open(self._test_file.source, encoding="utf-8") if "latin1" in self._test_file.base: return open(self._test_file.source, encoding="latin1") return open(self._test_file.source, encoding="utf8") def _get_expected(self) -> tuple[MessageCounter, list[OutputLine]]: with self._open_source_file() as f: expected_msgs = self.get_expected_messages(f) if not expected_msgs: expected_msgs = Counter() with self._open_expected_file() as f: expected_output_lines = [ OutputLine.from_csv(row, self._check_end_position) for row in csv.reader(f, "test") ] return expected_msgs, expected_output_lines def _get_actual(self) -> tuple[MessageCounter, list[OutputLine]]: messages: list[Message] = self._linter.reporter.messages messages.sort(key=lambda m: (m.line, m.symbol, m.msg)) received_msgs: MessageCounter = Counter() received_output_lines = [] for msg in messages: assert ( msg.symbol != "fatal" ), f"Pylint analysis failed because of '{msg.msg}'" received_msgs[msg.line, msg.symbol] += 1 received_output_lines.append( OutputLine.from_msg(msg, self._check_end_position) ) return received_msgs, received_output_lines def _runTest(self) -> None: __tracebackhide__ = True # pylint: disable=unused-variable modules_to_check = [self._test_file.source] self._linter.check(modules_to_check) expected_messages, expected_output = self._get_expected() actual_messages, actual_output = self._get_actual() assert ( expected_messages == actual_messages ), self.error_msg_for_unequal_messages( actual_messages, expected_messages, actual_output ) self._check_output_text(expected_messages, expected_output, actual_output) def error_msg_for_unequal_messages( self, actual_messages: MessageCounter, expected_messages: MessageCounter, actual_output: list[OutputLine], ) -> str: msg = [f'Wrong message(s) raised for "{Path(self._test_file.source).name}":'] missing, unexpected = self.multiset_difference( expected_messages, actual_messages ) if missing: msg.append("\nExpected in testdata:") msg.extend(f" {msg[0]:3}: {msg[1]}" for msg in sorted(missing)) if unexpected: msg.append("\nUnexpected in testdata:") msg.extend(f" {msg[0]:3}: {msg[1]}" for msg in sorted(unexpected)) error_msg = "\n".join(msg) if self._config and self._config.getoption("verbose") > 0: error_msg += "\n\nActual pylint output for this file:\n" error_msg += "\n".join(str(o) for o in actual_output) return error_msg def error_msg_for_unequal_output( self, expected_lines: list[OutputLine], received_lines: list[OutputLine], ) -> str: missing = set(expected_lines) - set(received_lines) unexpected = set(received_lines) - set(expected_lines) error_msg = f'Wrong output for "{Path(self._test_file.expected_output).name}":' sort_by_line_number = operator.attrgetter("lineno") if missing: error_msg += "\n- Missing lines:\n" for line in sorted(missing, key=sort_by_line_number): error_msg += f"{line}\n" if unexpected: error_msg += "\n- Unexpected lines:\n" for line in sorted(unexpected, key=sort_by_line_number): error_msg += f"{line}\n" error_msg += ( "\nYou can update the expected output automatically with:\n'" f"python tests/test_functional.py {UPDATE_OPTION} -k " f'"test_functional[{self._test_file.base}]"\'\n\n' "Here's the update text in case you can't:\n" ) expected_csv = StringIO() writer = csv.writer(expected_csv, dialect="test") for line in sorted(received_lines, key=sort_by_line_number): writer.writerow(line.to_csv()) error_msg += expected_csv.getvalue() return error_msg def _check_output_text( self, _: MessageCounter, expected_output: list[OutputLine], actual_output: list[OutputLine], ) -> None: """This is a function because we want to be able to update the text in LintModuleOutputUpdate. """ assert expected_output == actual_output, self.error_msg_for_unequal_output( expected_output, actual_output )