From 4dfe562b26aa17299fb0151a50c47798dc760228 Mon Sep 17 00:00:00 2001 From: henrikek Date: Tue, 27 Jan 2026 22:39:05 +0100 Subject: [PATCH] XMLSchemaValidator with support for local xsd files --- ESSArch_Core/essxml/util.py | 56 ++++++++++++++----- .../fixity/validation/backends/xml.py | 3 + ESSArch_Core/ip/utils.py | 10 +++- ESSArch_Core/util.py | 12 ++-- 4 files changed, 60 insertions(+), 21 deletions(-) diff --git a/ESSArch_Core/essxml/util.py b/ESSArch_Core/essxml/util.py index e82bbff11..05f99beea 100644 --- a/ESSArch_Core/essxml/util.py +++ b/ESSArch_Core/essxml/util.py @@ -519,32 +519,58 @@ def parse_file(filepath, fid, relpath=None, algorithm='SHA-256', rootdir='', pro return fileinfo -def download_imported_https_schemas(schema, dst): +def download_imported_schemas(schema_tree, dst, rootdir=None): + """ + Recursively download imported schemas, supports both remote and local. + """ logger = logging.getLogger('essarch') - from ESSArch_Core.ip.utils import download_schema - for url in schema.xpath('//*[local-name()="import"]/@schemaLocation'): - protocol = urlparse(url) - if protocol == 'http': + + for el in schema_tree.xpath('//*[local-name()="import"]'): + location = el.get('schemaLocation') + if not location: + continue + + parsed = urlparse(location) + scheme = parsed.scheme.lower() + + # Remote schema + if scheme in ('http', 'https'): + from ESSArch_Core.ip.utils import download_schema + new_path = download_schema(dst, logger, location) + el.attrib['schemaLocation'] = Path(new_path).as_uri() + continue + + # Absolute local path + if os.path.isabs(location): + el.attrib['schemaLocation'] = Path(location).as_uri() continue - new_path = download_schema(dst, logger, url) - new_path = Path(new_path) - el = url.getparent() - el.attrib['schemaLocation'] = new_path.as_uri() - return schema + # Relative local path → resolve using rootdir + if rootdir: + resolved = os.path.abspath(os.path.join(rootdir, location)) + if os.path.isfile(resolved): + el.attrib['schemaLocation'] = Path(resolved).as_uri() + continue + + # logger.warning('Unresolved schemaLocation: %s', location) + + return schema_tree def validate_against_schema(xmlfile, schema=None, rootdir=None): - doc = etree.ElementTree(file=xmlfile) + """ + Validate an XML file against a schema. Downloads remote schemas if needed. + """ + doc = etree.parse(xmlfile) if schema: - xmlschema = etree.parse(schema) + xmlschema_tree = etree.parse(schema) else: - xmlschema = getSchemas(doc=doc) + xmlschema_tree = getSchemas(doc=doc, rootdir=rootdir) with tempfile.TemporaryDirectory() as tempdir: - xmlschema = download_imported_https_schemas(xmlschema, tempdir) - xmlschema = etree.XMLSchema(xmlschema) + xmlschema_tree = download_imported_schemas(xmlschema_tree, tempdir, rootdir=rootdir) + xmlschema = etree.XMLSchema(xmlschema_tree) xmlschema.assertValid(doc) if rootdir is None: diff --git a/ESSArch_Core/fixity/validation/backends/xml.py b/ESSArch_Core/fixity/validation/backends/xml.py index b9efbf5ff..d4249bb28 100644 --- a/ESSArch_Core/fixity/validation/backends/xml.py +++ b/ESSArch_Core/fixity/validation/backends/xml.py @@ -331,6 +331,9 @@ def validate(self, filepath, expected=None): logger.debug('Validating schema of {xml}'.format(xml=filepath)) rootdir = self.options.get('rootdir') + if not rootdir: + # Use parent directory of the file, or '.' if it's a directory itself + rootdir = str(Path(filepath).parent) etree.clear_error_log() started = timezone.now() relpath = Path(os.path.relpath(filepath, rootdir)).as_posix() diff --git a/ESSArch_Core/ip/utils.py b/ESSArch_Core/ip/utils.py index ff10c9296..5a9733a62 100644 --- a/ESSArch_Core/ip/utils.py +++ b/ESSArch_Core/ip/utils.py @@ -335,6 +335,14 @@ def download_schemas(ip, logger, verify): @retry(retry=retry_if_exception_type(RequestException), reraise=True, stop=stop_after_attempt(5), wait=wait_fixed(60)) def download_schema(dirname, logger, schema, verify=None): + """ + Download a schema from a URL or return local file path. + """ + # If schema is a local file, just return its absolute path + if os.path.isfile(schema): + logger.info('Using local schema file: {}'.format(schema)) + return os.path.abspath(schema) + if verify is None: verify = settings.REQUESTS_VERIFY @@ -344,7 +352,7 @@ def download_schema(dirname, logger, schema, verify=None): r = requests.get(schema, stream=True, verify=verify) r.raise_for_status() with open(dst, 'wb') as f: - for chunk in r: + for chunk in r.iter_content(chunk_size=8192): f.write(chunk) f.flush() # Flush Python buffer os.fsync(f.fileno()) # Flush OS buffer to disk diff --git a/ESSArch_Core/util.py b/ESSArch_Core/util.py index bedde04fc..d3602c8c0 100644 --- a/ESSArch_Core/util.py +++ b/ESSArch_Core/util.py @@ -191,7 +191,7 @@ def get_value_from_path(root, path): return el.text -def getSchemas(doc=None, filename=None, base_url=None, visited=None): +def getSchemas(doc=None, filename=None, base_url=None, visited=None, rootdir=None): """ Creates a schema based on the schemas specified in the provided XML file's schemaLocation attribute @@ -219,7 +219,7 @@ def getSchemas(doc=None, filename=None, base_url=None, visited=None): schema_root = etree.Element(xsd_NS + "schema", nsmap=NSMAP) schema_root.attrib["elementFormDefault"] = "qualified" - def process_schema_location(ns, loc, current_base_url): + def process_schema_location(ns, loc, current_base_url, rootdir=None): # Resolve schemaLocation against current base URL if current_base_url and not (loc.startswith('http://') or loc.startswith('https://') or os.path.isabs(loc)): resolved_loc = os.path.abspath(os.path.join(current_base_url, loc)) @@ -245,6 +245,8 @@ def process_schema_location(ns, loc, current_base_url): imported_doc = etree.parse(response) new_base_url = resolved_loc.rsplit('/', 1)[0] else: + if not os.path.isabs(resolved_loc) and rootdir: + resolved_loc = os.path.abspath(os.path.join(rootdir, resolved_loc)) imported_doc = etree.parse(resolved_loc) new_base_url = os.path.dirname(resolved_loc) except Exception as e: @@ -261,14 +263,14 @@ def process_schema_location(ns, loc, current_base_url): nested_ns = elem.get("namespace") nested_loc = elem.get("schemaLocation") if nested_loc: - process_schema_location(nested_ns, nested_loc, new_base_url) + process_schema_location(nested_ns, nested_loc, new_base_url, rootdir=rootdir) for elem in nested_includes: # usually does NOT have a namespace attribute nested_loc = elem.get("schemaLocation") if nested_loc: # includes are from the same namespace as the including schema - process_schema_location(ns, nested_loc, new_base_url) + process_schema_location(ns, nested_loc, new_base_url, rootdir=rootdir) # Get all xsi:schemaLocation attributes in the original doc schema_locations = set(doc.xpath("//*/@xsi:schemaLocation", namespaces={'xsi': xsi_NS})) @@ -276,7 +278,7 @@ def process_schema_location(ns, loc, current_base_url): for schema_location in schema_locations: ns_locs = schema_location.split() for ns, loc in zip(ns_locs[::2], ns_locs[1::2]): - process_schema_location(ns, loc, base_url) + process_schema_location(ns, loc, base_url, rootdir=rootdir) return schema_root