Automated Metadata Harvesting Workflows

Manual curation of spatial metadata breaks down quickly once a catalogue holds more than a few hundred records. Distributed OGC-compliant endpoints return records in multiple formats, across inconsistent schema versions, with no reliable change notification mechanism — so the only production-viable approach is a programmatic pipeline that discovers, extracts, validates, transforms, and synchronises records on a schedule. This page details the full architecture for such a pipeline, covering CSW (Catalogue Service for the Web) and OGC API – Records endpoints, with concrete Python code, fault-tolerance patterns, and incremental synchronisation strategies.

This guide is part of the Spatial Metadata & Catalog Integration section, which covers the full lifecycle from raw metadata authoring through validated publication.


Pipeline Architecture Overview

The harvesting pipeline follows five deterministic stages. Each stage must be idempotent — safe to re-run after a failure without duplicating or corrupting stored records.

[Figure: Automated Metadata Harvesting Pipeline. A left-to-right flow of five stages: Discover (GetCapabilities / conformance) → Paginate (GetRecords / limit + next) → Extract & Sanitise (defusedxml / Pydantic) → Validate & Map (XSD / JSON Schema) → Synchronise (upsert / dead-letter). Validation failures are routed to the dead-letter queue.]

Prerequisites & Architecture Context

This guide assumes familiarity with HTTP request/response mechanics and Python 3.10+. You should understand what a CSW GetRecords operation returns before reading the implementation sections — the OGC Standards Architecture & Service Fundamentals overview explains the OGC service contract model, request binding types, and how capabilities documents advertise supported operations.

Dependencies (install into an isolated virtual environment):

pip install requests lxml defusedxml pydantic tenacity

Runtime environment checklist:

  • Python 3.10+ with the packages above
  • Network access to target CSW/OGC API endpoints, including firewall rules for outbound HTTPS and any proxy configuration needed in secure agency environments
  • An endpoint inventory: base URLs, supported protocol versions (CSW 2.0.2 or OGC API – Records 1.0), authentication mechanisms (API keys, OAuth2, mTLS), and declared pagination limits
  • Local copies of XSD/JSON Schema files for your target profiles (ISO 19115-1, ISO 19139, Dublin Core, DCAT) — do not fetch these over the network at validation time
  • Persistent storage (PostgreSQL/PostGIS or Elasticsearch) configured for upsert operations and UUID-based deduplication
  • An observability stack (Prometheus/Grafana, CloudWatch, or OpenTelemetry) ready to receive structured harvest metrics

Specification Deep-Dive

CSW 2.0.2 Core Operations

CSW exposes four operations relevant to harvesting:

| Operation | HTTP verb | Key parameters | Returns |
|---|---|---|---|
| GetCapabilities | GET | SERVICE=CSW&VERSION=2.0.2&REQUEST=GetCapabilities | Capabilities XML listing supported schemas, operations, and pagination limits |
| GetRecords | POST (preferred) / GET | SERVICE, VERSION, REQUEST, TYPENAMES (required); OUTPUTSCHEMA, startPosition, maxRecords (optional, but set them explicitly when harvesting) | GetRecordsResponse containing matched/returned counts and metadata elements |
| GetRecordById | GET | SERVICE, VERSION, REQUEST, Id, OUTPUTSCHEMA | Single metadata record by persistent identifier |
| Harvest | POST | SOURCE, RESOURCETYPE, RESOURCEFORMAT | Instructs the catalogue to pull a remote record (server-side harvest) |

TYPENAMES specifies the record type — use csw:Record for Dublin Core or gmd:MD_Metadata for ISO 19139. OUTPUTSCHEMA controls the serialisation — http://www.isotc211.org/2005/gmd for ISO 19139 or http://www.opengis.net/cat/csw/2.0.2 for CSW’s native Dublin Core flavour.
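
As a rough illustration of how TYPENAMES and OUTPUTSCHEMA pair up, the sketch below builds a KVP GetRecords URL for either flavour. The `getrecords_url` helper and the example endpoint are hypothetical, and it assumes the endpoint URL carries no query string of its own. Some servers may also require a NAMESPACE parameter when TYPENAMES uses the gmd prefix.

```python
from urllib.parse import urlencode

CSW_NS = "http://www.opengis.net/cat/csw/2.0.2"
GMD_NS = "http://www.isotc211.org/2005/gmd"


def getrecords_url(endpoint: str, iso: bool = True, start: int = 1, page_size: int = 100) -> str:
    """Build a KVP GetRecords URL for ISO 19139 (iso=True) or Dublin Core output."""
    params = {
        "SERVICE": "CSW",
        "VERSION": "2.0.2",
        "REQUEST": "GetRecords",
        # Record type and serialisation are chosen as a matching pair
        "TYPENAMES": "gmd:MD_Metadata" if iso else "csw:Record",
        "OUTPUTSCHEMA": GMD_NS if iso else CSW_NS,
        "ELEMENTSETNAME": "full",
        "RESULTTYPE": "results",
        "startPosition": start,   # 1-based
        "maxRecords": page_size,
    }
    return f"{endpoint}?{urlencode(params)}"


# Hypothetical endpoint, second page of a Dublin Core harvest
url = getrecords_url("https://example.com/csw", iso=False, start=101)
```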

OGC API – Records (Part 1: Core)

OGC API – Records replaces the XML-over-HTTP binding with a RESTful JSON API. The key endpoints for harvesting:

| Endpoint | Method | Purpose |
|---|---|---|
| /conformance | GET | Lists conformance classes the server implements |
| /collections | GET | Lists available record collections |
| /collections/{collectionId}/items | GET | Paginated record list; supports limit, datetime, bbox, and offset where the server implements it |
| /collections/{collectionId}/items/{recordId} | GET | Single record by ID |

Pagination uses next link relations in the response’s links array. Unlike CSW’s startPosition integer, OGC API – Records cursors may be opaque tokens — do not assume they are numeric offsets.

Version Divergence Reference

| Concern | CSW 2.0.2 | OGC API – Records 1.0 |
|---|---|---|
| Transport encoding | KVP GET or XML POST | REST + JSON (GeoJSON conformance available) |
| Pagination mechanism | startPosition + maxRecords integers | limit + next link relation (token or offset) |
| Metadata format negotiation | OUTPUTSCHEMA parameter | Accept header or f=json / f=xml query param |
| Change notification | None (polling only) | None currently; STAC-style feeds via extensions |
| Auth support | Basic / IP allowlist typical | Bearer token, API key, OAuth2 |

Python Implementation

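The implementation below provides one harvester class per protocol: CSWHarvester for CSW 2.0.2 and OGCAPIRecordsHarvester for OGC API – Records. It includes retry logic for GET requests, incremental filtering through a since checkpoint, and content-hash fingerprinting. Dead-letter routing happens later, at the validation stage.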

"""
harvest.py — Production CSW / OGC API – Records metadata harvester.
Requires: requests, lxml, defusedxml, pydantic, tenacity (Python 3.10+)
"""

from __future__ import annotations

import hashlib
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Generator, Optional

import defusedxml.ElementTree as ET
import requests
from lxml import etree
from pydantic import BaseModel, Field, field_validator
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential_jitter,
)

logger = logging.getLogger(__name__)

# --- Namespace map for ISO 19139 / CSW responses ---
NS = {
    "csw": "http://www.opengis.net/cat/csw/2.0.2",
    "gmd": "http://www.isotc211.org/2005/gmd",
    "gco": "http://www.isotc211.org/2005/gco",
    "ows": "http://www.opengis.net/ows",
}


# ── Pydantic model for lightweight pre-validation ──────────────────────────────

class RawRecord(BaseModel):
    identifier: str
    title: str
    modified: Optional[str] = None
    abstract: Optional[str] = None
    bbox_wgs84: Optional[list[float]] = None   # [minX, minY, maxX, maxY]
    raw_xml: Optional[str] = None
    raw_json: Optional[dict] = None
    source_url: str
    # Use pydantic's own Field (not dataclasses.field) for default factories on models
    harvest_ts: str = Field(default_factory=lambda: datetime.now(timezone.utc).isoformat())

    @field_validator("bbox_wgs84")
    @classmethod
    def bbox_must_be_four_floats(cls, v: list[float] | None) -> list[float] | None:
        if v is not None and len(v) != 4:
            raise ValueError("bbox_wgs84 must contain exactly 4 values")
        return v


# ── HTTP session with retry ────────────────────────────────────────────────────

@retry(
    # Retry transient network failures as well as HTTP error statuses
    retry=retry_if_exception_type(
        (requests.HTTPError, requests.ConnectionError, requests.Timeout)
    ),
    stop=stop_after_attempt(4),
    wait=wait_exponential_jitter(initial=2, max=30),
    reraise=True,
)
def _get(session: requests.Session, url: str, **kwargs) -> requests.Response:
    import time  # local import keeps this helper self-contained

    resp = session.get(url, timeout=30, **kwargs)
    if resp.status_code == 429:
        # Respect Retry-After before tenacity's own back-off fires.
        # The header may be an HTTP-date instead of seconds; fall back to 10 s.
        try:
            retry_after = int(resp.headers.get("Retry-After", 10))
        except ValueError:
            retry_after = 10
        logger.warning("Rate-limited by %s; sleeping %ds", url, retry_after)
        time.sleep(retry_after)
    resp.raise_for_status()
    return resp


# ── CSW 2.0.2 harvester ───────────────────────────────────────────────────────

@dataclass
class CSWHarvester:
    endpoint: str
    page_size: int = 100
    output_schema: str = "http://www.isotc211.org/2005/gmd"
    session: requests.Session = field(default_factory=requests.Session)
    # Checkpoint: last successful harvest ISO 8601 timestamp (None = full harvest)
    since: Optional[str] = None

    def capabilities(self) -> etree._Element:
        """Parse GetCapabilities and return the root element."""
        params = {
            "SERVICE": "CSW",
            "VERSION": "2.0.2",
            "REQUEST": "GetCapabilities",
        }
        resp = _get(self.session, self.endpoint, params=params)
        return ET.fromstring(resp.content)

    def _build_post_body(self, start_position: int) -> bytes:
        """Construct a GetRecords POST body with optional temporal filter."""
        constraint = ""
        if self.since:
            # OGC Filter 1.1 temporal constraint on apiso:Modified
            constraint = f"""
            <csw:Constraint version="1.1.0">
              <ogc:Filter xmlns:ogc="http://www.opengis.net/ogc">
                <ogc:PropertyIsGreaterThanOrEqualTo>
                  <ogc:PropertyName>apiso:Modified</ogc:PropertyName>
                  <ogc:Literal>{self.since}</ogc:Literal>
                </ogc:PropertyIsGreaterThanOrEqualTo>
              </ogc:Filter>
            </csw:Constraint>"""

        return f"""<?xml version="1.0" encoding="UTF-8"?>
<csw:GetRecords
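    xmlns:csw="http://www.opengis.net/cat/csw/2.0.2"
    xmlns:gmd="http://www.isotc211.org/2005/gmd"
    xmlns:apiso="http://www.opengis.net/cat/csw/apiso/1.0"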
    service="CSW" version="2.0.2"
    resultType="results"
    startPosition="{start_position}"
    maxRecords="{self.page_size}"
    outputSchema="{self.output_schema}">
  <csw:Query typeNames="gmd:MD_Metadata">
    <csw:ElementSetName>full</csw:ElementSetName>
    {constraint}
  </csw:Query>
</csw:GetRecords>""".encode()

    def harvest(self) -> Generator[RawRecord, None, None]:
        """Yield RawRecord objects page by page until the catalogue is exhausted."""
        start = 1
        matched: Optional[int] = None

        while True:
            body = self._build_post_body(start)
            resp = self.session.post(
                self.endpoint,
                data=body,
                headers={"Content-Type": "application/xml"},
                timeout=60,
            )
            resp.raise_for_status()

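            # Use defusedxml for safe parsing of untrusted catalogue responses
            root = ET.fromstring(resp.content)

            # Some servers report failures as ows:ExceptionReport with HTTP 200
            if root.tag.endswith("ExceptionReport"):
                logger.error("Service exception returned by %s", self.endpoint)
                break

            search_results = root.find("csw:SearchResults", NS)
            if search_results is None:
                logger.error("No SearchResults element in response from %s", self.endpoint)
                break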

            if matched is None:
                matched = int(search_results.attrib.get("numberOfRecordsMatched", 0))
                logger.info("Catalogue reports %d matched records", matched)

            returned = int(search_results.attrib.get("numberOfRecordsReturned", 0))
            if returned == 0:
                break  # exhausted; some servers return 0 before matched reaches 0

            for md_elem in search_results.findall("gmd:MD_Metadata", NS):
                record = _extract_iso19139_record(md_elem, self.endpoint)
                if record:
                    yield record

            start += returned
            if matched and start > matched:
                break


def _extract_iso19139_record(elem: etree._Element, source_url: str) -> Optional[RawRecord]:
    """Extract core fields from a gmd:MD_Metadata element."""
    def text(path: str) -> Optional[str]:
        node = elem.find(path, NS)
        return node.text.strip() if node is not None and node.text else None

    identifier = text("gmd:fileIdentifier/gco:CharacterString")
    title = text(
        "gmd:identificationInfo/gmd:MD_DataIdentification"
        "/gmd:citation/gmd:CI_Citation/gmd:title/gco:CharacterString"
    )
    if not identifier or not title:
        logger.debug("Skipping record missing fileIdentifier or title")
        return None

    raw_xml = ET.tostring(elem, encoding="unicode")
    return RawRecord(
        identifier=identifier,
        title=title,
        modified=text("gmd:dateStamp/gco:DateTime") or text("gmd:dateStamp/gco:Date"),
        abstract=text(
            "gmd:identificationInfo/gmd:MD_DataIdentification/gmd:abstract/gco:CharacterString"
        ),
        raw_xml=raw_xml,
        source_url=source_url,
    )


# ── OGC API – Records harvester ───────────────────────────────────────────────

@dataclass
class OGCAPIRecordsHarvester:
    base_url: str          # e.g. https://example.com/ogcapi
    collection_id: str
    page_size: int = 100
    session: requests.Session = field(default_factory=requests.Session)
    since: Optional[str] = None  # ISO 8601 datetime for incremental harvests

    def harvest(self) -> Generator[RawRecord, None, None]:
        """Yield RawRecord objects by following next link relations."""
        params: dict = {"limit": self.page_size, "f": "json"}
        if self.since:
            # datetime filter: open-ended range from since to now
            params["datetime"] = f"{self.since}/.."

        url = f"{self.base_url}/collections/{self.collection_id}/items"

        while url:
            resp = _get(self.session, url, params=params)
            payload = resp.json()
            params = {}  # subsequent pages use the full next URL; clear params

            for feature in payload.get("features", []):
                record = _extract_ogcapi_record(feature, self.base_url)
                if record:
                    yield record

            # Follow next link; absent means we've reached the end
            url = next(
                (lnk["href"] for lnk in payload.get("links", []) if lnk.get("rel") == "next"),
                None,
            )


def _extract_ogcapi_record(feature: dict, source_url: str) -> Optional[RawRecord]:
    """Extract core fields from an OGC API – Records GeoJSON feature."""
    props = feature.get("properties", {})
    identifier = feature.get("id") or props.get("identifier")
    title = props.get("title")
    if not identifier or not title:
        return None

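    bbox_raw = feature.get("bbox")
    if bbox_raw and len(bbox_raw) == 6:
        # 3D bbox [minX, minY, minZ, maxX, maxY, maxZ]: drop the Z values,
        # because RawRecord accepts exactly four numbers
        bbox_raw = [bbox_raw[0], bbox_raw[1], bbox_raw[3], bbox_raw[4]]
    return RawRecord(
        identifier=str(identifier),
        title=str(title),
        modified=props.get("updated") or props.get("datetime"),
        abstract=props.get("description"),
        bbox_wgs84=bbox_raw if bbox_raw and len(bbox_raw) == 4 else None,
        raw_json=feature,
        source_url=source_url,
    )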


# ── Content-hash deduplication ────────────────────────────────────────────────

def record_fingerprint(record: RawRecord) -> str:
    """SHA-256 of the record's raw content, for deduplication."""
    content = record.raw_xml or json.dumps(record.raw_json, sort_keys=True)
    return hashlib.sha256(content.encode()).hexdigest()

Persisting to PostgreSQL with Upsert

import psycopg2

UPSERT_SQL = """
INSERT INTO harvested_records
    (identifier, source_url, title, abstract, modified, bbox_wgs84, fingerprint, raw_payload, harvest_ts)
VALUES
    (%(identifier)s, %(source_url)s, %(title)s, %(abstract)s, %(modified)s,
     ST_MakeEnvelope(%(min_x)s, %(min_y)s, %(max_x)s, %(max_y)s, 4326),
     %(fingerprint)s, %(raw_payload)s, %(harvest_ts)s)
ON CONFLICT (identifier, source_url)
DO UPDATE SET
    title        = EXCLUDED.title,
    abstract     = EXCLUDED.abstract,
    modified     = EXCLUDED.modified,
    bbox_wgs84   = EXCLUDED.bbox_wgs84,
    fingerprint  = EXCLUDED.fingerprint,
    raw_payload  = EXCLUDED.raw_payload,
    harvest_ts   = EXCLUDED.harvest_ts
WHERE harvested_records.fingerprint != EXCLUDED.fingerprint;
"""

def persist(conn: psycopg2.extensions.connection, record: RawRecord) -> None:
    fp = record_fingerprint(record)
    bbox = record.bbox_wgs84 or [None, None, None, None]
    raw = record.raw_xml or json.dumps(record.raw_json)
    with conn.cursor() as cur:
        cur.execute(UPSERT_SQL, {
            "identifier": record.identifier,
            "source_url": record.source_url,
            "title": record.title,
            "abstract": record.abstract,
            "modified": record.modified,
            "min_x": bbox[0], "min_y": bbox[1], "max_x": bbox[2], "max_y": bbox[3],
            "fingerprint": fp,
            "raw_payload": raw,
            "harvest_ts": record.harvest_ts,
        })
    conn.commit()

Error Handling & Edge Cases

XXE injection in legacy CSW responses. Some older catalogues embed external entity references in their XML. Always parse with defusedxml — never with lxml.etree.fromstring directly on untrusted input.

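Namespace fragmentation. ISO 19139 documents occasionally arrive unprefixed or with custom prefixes. A custom prefix alone is harmless, because find() matches on the namespace URI, but a missing or different namespace URI is not. If search_results.findall("gmd:MD_Metadata", NS) returns nothing, inspect the raw bytes: print(resp.content[:2000]). When namespace URIs are inconsistent across sources, compare local names instead, for example with etree.QName(el.tag).localname.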
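
A minimal sketch of namespace-agnostic matching, using only the standard library so it runs anywhere. The `local_name` and `find_by_local_name` helpers are illustrative names, not part of the harvester above. On untrusted catalogue responses, swap the import for defusedxml.ElementTree.

```python
import xml.etree.ElementTree as ET  # use defusedxml.ElementTree for untrusted input


def local_name(tag: str) -> str:
    """Return a tag without its '{namespace-uri}' part."""
    return tag.rsplit("}", 1)[-1]


def find_by_local_name(root: ET.Element, name: str) -> list[ET.Element]:
    """Collect the root and all descendants whose local name matches."""
    return [
        el for el in root.iter()
        if isinstance(el.tag, str) and local_name(el.tag) == name
    ]


# Two MD_Metadata elements in different namespaces: a URI-based lookup
# with the gmd namespace would find only the first one.
doc = ET.fromstring(
    '<r xmlns:x="http://www.isotc211.org/2005/gmd" xmlns:y="urn:custom">'
    "<x:MD_Metadata/><y:MD_Metadata/></r>"
)
matches = find_by_local_name(doc, "MD_Metadata")
```

Matching on local names ignores the namespace entirely, so it can over-match. Treat it as a fallback for known-inconsistent sources, not as the default lookup.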

numberOfRecordsMatched drift. Catalogues being updated concurrently may return a different numberOfRecordsMatched on each page. Drive the loop on actual returned-records-per-page, not on the declared total. Break when returned == 0.
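
The loop shape described above can be sketched independently of any HTTP code. Here `fetch_page` is a hypothetical callable standing in for one GetRecords request, and the `max_pages` cap is an added safety assumption for servers that never return an empty page.

```python
from typing import Callable, Iterator


def paginate(
    fetch_page: Callable[[int, int], list],
    page_size: int = 100,
    max_pages: int = 10_000,
) -> Iterator:
    """Yield items page by page and stop on the first empty page."""
    start = 1  # CSW startPosition is 1-based
    for _ in range(max_pages):  # hard cap guards against endless loops
        items = fetch_page(start, page_size)
        if not items:
            return
        yield from items
        start += len(items)  # advance by what actually arrived


# Fake catalogue holding five records that caps every page at two,
# regardless of the page size requested.
DATA = ["a", "b", "c", "d", "e"]


def fake_fetch(start: int, size: int) -> list:
    return DATA[start - 1 : start - 1 + min(size, 2)]


harvested = list(paginate(fake_fetch, page_size=100))
```

Advancing by the number of records received, not by the requested page size, keeps the cursor correct when the server silently caps pages.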

OGC API – Records opaque cursors. The next link href value may include a server-generated token rather than a simple offset integer. Never reconstruct the next URL manually from the current page number — always follow the href verbatim from the links array.
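
One way to follow the link without interpreting it is sketched below. The `next_page_url` helper is a hypothetical name. Resolving the href against the current URL is an added precaution for servers that emit relative links; absolute hrefs pass through unchanged.

```python
from typing import Optional
from urllib.parse import urljoin


def next_page_url(payload: dict, current_url: str) -> Optional[str]:
    """Return the next-page URL from a response's links array, or None."""
    for link in payload.get("links", []):
        if link.get("rel") == "next" and link.get("href"):
            # Never rebuild the URL from page numbers: take the href as given,
            # resolving it only if the server sent a relative reference.
            return urljoin(current_url, link["href"])
    return None


page = {
    "links": [
        {"rel": "self", "href": "https://example.com/collections/c/items?limit=10"},
        {"rel": "next", "href": "https://example.com/collections/c/items?cursor=Zm9v"},
    ]
}
following = next_page_url(page, "https://example.com/collections/c/items?limit=10")
```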

Empty result sets vs. service exceptions. A CSW server that cannot complete a query may return an ows:ExceptionReport element with HTTP 200 rather than a 4xx/5xx code. After parsing, always check whether the root element tag contains ExceptionReport before treating the response as valid records.
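
A small guard along these lines can run right after parsing. This is a sketch: `ServiceExceptionError` and `raise_on_exception_report` are illustrative names. It matches on local names so that both OWS namespace versions are caught, and it uses the standard library parser only to stay self-contained (use defusedxml.ElementTree on real responses).

```python
import xml.etree.ElementTree as ET  # use defusedxml.ElementTree for untrusted input


class ServiceExceptionError(RuntimeError):
    """Raised when a catalogue returns an ExceptionReport, even with HTTP 200."""


def raise_on_exception_report(root: ET.Element) -> None:
    """Raise if the parsed response is an ExceptionReport; otherwise do nothing."""
    if root.tag.rsplit("}", 1)[-1] != "ExceptionReport":
        return
    messages = [
        el.text.strip()
        for el in root.iter()
        if el.tag.rsplit("}", 1)[-1] == "ExceptionText" and el.text
    ]
    raise ServiceExceptionError("; ".join(messages) or "unspecified service exception")


REPORT = b"""<ows:ExceptionReport xmlns:ows="http://www.opengis.net/ows" version="1.2.0">
  <ows:Exception exceptionCode="InvalidParameterValue" locator="typeNames">
    <ows:ExceptionText>Invalid typeNames value</ows:ExceptionText>
  </ows:Exception>
</ows:ExceptionReport>"""

try:
    raise_on_exception_report(ET.fromstring(REPORT))
    error_message = None
except ServiceExceptionError as exc:
    error_message = str(exc)
```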

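Character encoding mismatches. Pass resp.content (bytes) to the XML parser rather than resp.text, so the parser honours the encoding named in the document's own XML declaration instead of the encoding requests guessed from the HTTP headers. Decode manually only when the declaration is missing or wrong, for example a document that claims UTF-8 but contains ISO-8859-1 bytes.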


Schema Validation & Profile Mapping

Raw harvested records must clear at least two validation layers before being promoted to the search index. The approach aligns with the patterns described in Schema Validation for Spatial Records, which covers XSD-based validation in detail.

from lxml import etree

def validate_iso19139(raw_xml: str, xsd_path: str) -> list[str]:
    """
    Validate a raw ISO 19139 XML string against a local XSD.
    Returns a list of error messages; empty list means valid.
    """
    # Parsing the XSD is expensive; cache the XMLSchema object when validating many records
    with open(xsd_path, "rb") as f:
        schema_doc = etree.parse(f)
    schema = etree.XMLSchema(schema_doc)
    # Harvested XML is untrusted: disable entity resolution and network access
    parser = etree.XMLParser(resolve_entities=False, no_network=True)
    try:
        doc = etree.fromstring(raw_xml.encode(), parser)
    except etree.XMLSyntaxError as exc:
        return [f"XML parse error: {exc}"]
    schema.validate(doc)
    return [str(e) for e in schema.error_log]

For mapping to ISO 19115 metadata standards, enforce these mandatory elements at minimum before writing to the index:

| ISO 19115 element | ISO 19139 XPath | Mandatory? |
|---|---|---|
| title | gmd:identificationInfo/…/gmd:title/gco:CharacterString | Yes |
| abstract | gmd:identificationInfo/…/gmd:abstract/gco:CharacterString | Yes |
| fileIdentifier | gmd:fileIdentifier/gco:CharacterString | Yes |
| dateStamp | gmd:dateStamp/gco:DateTime (or gco:Date) | Yes |
| language | gmd:language/gmd:LanguageCode | Conditional |
| spatialRepresentationType | gmd:identificationInfo/…/gmd:spatialRepresentationType | Conditional |
| extent/geographicBoundingBox | gmd:identificationInfo/…/gmd:extent/gmd:EX_Extent/… | Strongly recommended |

Records failing mandatory element checks go to a dead-letter table. Do not discard them — they may be valid but use a non-standard element path resolvable by manual review.
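
The mandatory-element check and dead-letter routing might look roughly like this. The XPaths for title and abstract are the full paths used in the extraction code earlier on this page. `missing_mandatory` and `triage` are hypothetical helper names, and the in-memory lists stand in for the real index and dead-letter table.

```python
import xml.etree.ElementTree as ET  # use defusedxml.ElementTree for untrusted input

NS = {
    "gmd": "http://www.isotc211.org/2005/gmd",
    "gco": "http://www.isotc211.org/2005/gco",
}
_IDENT = "gmd:identificationInfo/gmd:MD_DataIdentification"
MANDATORY_PATHS = {
    "fileIdentifier": "gmd:fileIdentifier/gco:CharacterString",
    "title": f"{_IDENT}/gmd:citation/gmd:CI_Citation/gmd:title/gco:CharacterString",
    "abstract": f"{_IDENT}/gmd:abstract/gco:CharacterString",
    "dateStamp": "gmd:dateStamp/*",  # accepts gco:DateTime or gco:Date
}


def missing_mandatory(md: ET.Element) -> list[str]:
    """Return the names of mandatory elements that are absent or empty."""
    missing = []
    for name, path in MANDATORY_PATHS.items():
        node = md.find(path, NS)
        if node is None or not (node.text or "").strip():
            missing.append(name)
    return missing


def triage(records: list[ET.Element]) -> tuple[list, list]:
    """Split records into (accepted, dead_letter); keep the reasons for review."""
    accepted, dead_letter = [], []
    for md in records:
        gaps = missing_mandatory(md)
        if gaps:
            dead_letter.append((md, gaps))  # retained, never discarded
        else:
            accepted.append(md)
    return accepted, dead_letter


# A record with identifier and date stamp but no title or abstract
SAMPLE = ET.fromstring(
    '<gmd:MD_Metadata xmlns:gmd="http://www.isotc211.org/2005/gmd" '
    'xmlns:gco="http://www.isotc211.org/2005/gco">'
    "<gmd:fileIdentifier><gco:CharacterString>test-001</gco:CharacterString></gmd:fileIdentifier>"
    "<gmd:dateStamp><gco:Date>2024-01-15</gco:Date></gmd:dateStamp>"
    "</gmd:MD_Metadata>"
)
accepted, dead_letter = triage([SAMPLE])
```

Storing the list of missing element names next to each dead-lettered record gives reviewers a starting point when the content is present under a non-standard path.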


Testing & Compliance Verification

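import unittest
from unittest.mock import MagicMock

# Assumes the harvester code above is saved as harvest.py on the import path
from harvest import CSWHarvester, RawRecord, record_fingerprint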

class TestCSWHarvester(unittest.TestCase):

    def _make_session(self, xml_body: bytes) -> MagicMock:
        session = MagicMock()
        response = MagicMock()
        response.content = xml_body
        response.raise_for_status.return_value = None
        session.post.return_value = response
        return session

    MINIMAL_CSW_RESPONSE = b"""<?xml version="1.0"?>
    <csw:GetRecordsResponse xmlns:csw="http://www.opengis.net/cat/csw/2.0.2"
                            xmlns:gmd="http://www.isotc211.org/2005/gmd"
                            xmlns:gco="http://www.isotc211.org/2005/gco">
      <csw:SearchResults numberOfRecordsMatched="1" numberOfRecordsReturned="1" nextRecord="0">
        <gmd:MD_Metadata>
          <gmd:fileIdentifier><gco:CharacterString>test-001</gco:CharacterString></gmd:fileIdentifier>
          <gmd:dateStamp><gco:Date>2024-01-15</gco:Date></gmd:dateStamp>
          <gmd:identificationInfo>
            <gmd:MD_DataIdentification>
              <gmd:citation>
                <gmd:CI_Citation>
                  <gmd:title><gco:CharacterString>Test Dataset</gco:CharacterString></gmd:title>
                </gmd:CI_Citation>
              </gmd:citation>
              <gmd:abstract><gco:CharacterString>A test record.</gco:CharacterString></gmd:abstract>
            </gmd:MD_DataIdentification>
          </gmd:identificationInfo>
        </gmd:MD_Metadata>
      </csw:SearchResults>
    </csw:GetRecordsResponse>"""

    def test_single_record_harvest(self):
        harvester = CSWHarvester(
            endpoint="https://example.com/csw",
            session=self._make_session(self.MINIMAL_CSW_RESPONSE),
        )
        records = list(harvester.harvest())
        self.assertEqual(len(records), 1)
        self.assertEqual(records[0].identifier, "test-001")
        self.assertEqual(records[0].title, "Test Dataset")

    def test_missing_identifier_skipped(self):
        broken = self.MINIMAL_CSW_RESPONSE.replace(
            b"<gco:CharacterString>test-001</gco:CharacterString>", b""
        )
        harvester = CSWHarvester(
            endpoint="https://example.com/csw",
            session=self._make_session(broken),
        )
        records = list(harvester.harvest())
        self.assertEqual(len(records), 0)

    def test_fingerprint_deterministic(self):
        record = RawRecord(
            identifier="x", title="T", source_url="https://example.com", raw_xml="<a/>"
        )
        self.assertEqual(record_fingerprint(record), record_fingerprint(record))

Performance & Scaling Notes

Page size tuning. Start with maxRecords=100. Some CSW servers cap responses at 50 or 200 records regardless of what you request. Where the server advertises a limit in GetCapabilities, for example as an ows:Constraint named MaxRecordDefault, read it before issuing paginated requests. Not every server publishes this constraint, so keep a sensible fallback.
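
Reading the advertised limit could be sketched as follows. The helper names are hypothetical, the sample capabilities fragment is heavily trimmed, and the code assumes the limit appears as an ows:Value child of the constraint. It falls back to the requested size when nothing is advertised.

```python
import xml.etree.ElementTree as ET  # use defusedxml.ElementTree for untrusted input
from typing import Optional


def _local(tag: str) -> str:
    return tag.rsplit("}", 1)[-1]


def advertised_max_records(capabilities_xml: bytes) -> Optional[int]:
    """Return the MaxRecordDefault constraint value, or None if not advertised."""
    root = ET.fromstring(capabilities_xml)
    for constraint in root.iter():
        if _local(constraint.tag) != "Constraint":
            continue
        if constraint.get("name") != "MaxRecordDefault":
            continue
        for value in constraint.iter():
            text = (value.text or "").strip()
            if _local(value.tag) == "Value" and text.isdigit():
                return int(text)
    return None


def effective_page_size(requested: int, capabilities_xml: bytes) -> int:
    """Never request more per page than the server says it will return."""
    limit = advertised_max_records(capabilities_xml)
    return min(requested, limit) if limit else requested


CAPS = b"""<csw:Capabilities xmlns:csw="http://www.opengis.net/cat/csw/2.0.2"
                  xmlns:ows="http://www.opengis.net/ows">
  <ows:OperationsMetadata>
    <ows:Constraint name="MaxRecordDefault"><ows:Value>50</ows:Value></ows:Constraint>
  </ows:OperationsMetadata>
</csw:Capabilities>"""

page_size = effective_page_size(100, CAPS)
```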

Connection pooling. Reuse a single requests.Session across all pages for a given endpoint. Sessions maintain HTTP keep-alive connections, reducing TCP handshake overhead on large harvests. Create a new session per endpoint rather than per page.

Parallelising across endpoints. Use concurrent.futures.ThreadPoolExecutor to harvest multiple independent endpoints in parallel. Each harvester instance should own its own session — do not share sessions across threads.
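
A minimal sketch of that fan-out, with failures isolated per endpoint. `harvest_all` is a hypothetical helper, and each job is assumed to be a zero-argument callable that builds its own harvester (and therefore its own session) and returns an iterable of records.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Iterable


def harvest_all(
    jobs: dict[str, Callable[[], Iterable]],
    max_workers: int = 4,
) -> tuple[dict[str, list], dict[str, str]]:
    """Run one job per endpoint in parallel; return (results, failures)."""
    results: dict[str, list] = {}
    failures: dict[str, str] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # list(job()) drains each generator inside its worker thread
        futures = {
            pool.submit(lambda job=job: list(job())): name
            for name, job in jobs.items()
        }
        for future in as_completed(futures):
            name = futures[future]
            try:
                results[name] = future.result()
            except Exception as exc:  # one bad endpoint must not stop the rest
                failures[name] = repr(exc)
    return results, failures


# Stand-in jobs: in practice each would wrap CSWHarvester(...).harvest()
def healthy():
    yield from ["rec-1", "rec-2"]


def broken():
    raise ConnectionError("endpoint down")


results, failures = harvest_all({"agency-a": healthy, "agency-b": broken})
```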

Incremental harvest scheduling. Run incremental harvests (filter by modified >= last_checkpoint) daily. Run full re-harvests weekly or after catalogue migrations, because incremental syncs cannot detect deleted records or identifier changes.
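
Checkpoint handling for incremental runs can be as simple as a JSON file keyed by endpoint. The sketch below is one possible layout, not a prescribed one: the file name and helper names are assumptions. It stores the time the run started, not the time it finished, so records modified while the harvest was in progress are picked up next time.

```python
import json
import os
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional


def load_checkpoint(path: Path, endpoint: str) -> Optional[str]:
    """Return the last successful harvest timestamp, or None for a full harvest."""
    if not path.exists():
        return None
    return json.loads(path.read_text()).get(endpoint)


def save_checkpoint(path: Path, endpoint: str, run_started: datetime) -> None:
    """Record the run start time; write atomically so a crash cannot corrupt it."""
    state = json.loads(path.read_text()) if path.exists() else {}
    state[endpoint] = run_started.astimezone(timezone.utc).isoformat()
    tmp = path.with_suffix(".tmp")
    tmp.write_text(json.dumps(state, indent=2))
    os.replace(tmp, path)  # atomic rename on the same filesystem


checkpoint_file = Path(tempfile.mkdtemp()) / "checkpoints.json"
endpoint = "https://example.com/csw"

first_run = load_checkpoint(checkpoint_file, endpoint)  # None: do a full harvest
save_checkpoint(
    checkpoint_file, endpoint, datetime(2024, 1, 15, 12, 0, tzinfo=timezone.utc)
)
since = load_checkpoint(checkpoint_file, endpoint)  # pass as since= on the next run
```

Call save_checkpoint only after the run has completed without errors, so a failed run is retried from the previous checkpoint.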

Caching capabilities responses. Store the parsed capabilities document in a local file or Redis key with a 24-hour TTL. Querying GetCapabilities on every run adds latency and may count against rate limits on public catalogues.

Elasticsearch bulk indexing. If downstream storage is Elasticsearch, batch harvested records into _bulk API requests of 500–1000 documents rather than indexing one at a time. Normalise spatial extents to WGS84 before indexing, and store as geo_shape to support bounding-box queries. For DCAT-AP publication from the index, see DCAT-AP for Spatial Data Portals.
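
Batching for the _bulk API can be sketched without any client library. The helpers below only build the newline-delimited request body. The index name "metadata" and the use of the record identifier as the document _id are assumptions for illustration.

```python
import json
from itertools import islice
from typing import Iterable, Iterator


def batched(items: Iterable[dict], size: int = 500) -> Iterator[list[dict]]:
    """Group an iterable into lists of at most `size` items."""
    iterator = iter(items)
    while batch := list(islice(iterator, size)):
        yield batch


def bulk_body(batch: list[dict], index: str) -> str:
    """Build an NDJSON body: one action line followed by one document line."""
    lines = []
    for doc in batch:
        lines.append(json.dumps({"index": {"_index": index, "_id": doc["identifier"]}}))
        lines.append(json.dumps(doc))
    return "\n".join(lines) + "\n"  # the _bulk API requires a trailing newline


docs = ({"identifier": f"rec-{i}", "title": f"Dataset {i}"} for i in range(1200))
sizes = [len(batch) for batch in batched(docs, size=500)]
body = bulk_body([{"identifier": "rec-0", "title": "Dataset 0"}], index="metadata")
```

Because batched consumes a generator lazily, it can sit directly on top of a harvester's harvest() output without holding the whole catalogue in memory.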


Gotchas & Frequently Asked Questions

How do I handle CSW endpoints that return different record counts than numberOfRecordsMatched?

Always drive pagination from the actual records returned per response rather than trusting numberOfRecordsMatched. Some CSW implementations return stale counts when the catalogue is being updated concurrently. Terminate the loop when the response body contains zero records or when startPosition exceeds numberOfRecordsMatched by more than one page.

Should I run full re-harvests or incremental harvests in production?

Run incremental harvests daily using the modified timestamp filter, and schedule a full re-harvest weekly or after major catalogue migrations. Full re-harvests catch deletions and identifier changes that incremental syncs miss, but are too resource-intensive to run every cycle. Always store a checkpoint timestamp after each successful incremental run.

What is the safest deduplication key for records harvested from multiple source catalogues?

Prefer the source’s declared persistent identifier — fileIdentifier in ISO 19139, or id in OGC API – Records. When that is absent or inconsistent across mirrors, fall back to a SHA-256 hash of the canonicalised, namespace-normalised XML or JSON body. Never deduplicate on title alone — duplicate titles are common across jurisdictions and thematic datasets.
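
The fallback hash is only useful if equivalent documents serialise identically. A sketch of one way to canonicalise before hashing, using the standard library's C14N 2.0 support for XML (Python 3.8+) and sorted-key serialisation for JSON. The function names are illustrative, and ET.canonicalize parses with the standard parser, so screen untrusted input first.

```python
import hashlib
import json
import xml.etree.ElementTree as ET


def canonical_xml_hash(xml_text: str) -> str:
    """SHA-256 over a canonical form: sorted attributes, stripped whitespace,
    and prefixes rewritten so only namespace URIs matter."""
    canonical = ET.canonicalize(
        xml_data=xml_text, strip_text=True, rewrite_prefixes=True
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def canonical_json_hash(obj: dict) -> str:
    """SHA-256 over JSON with sorted keys and fixed separators."""
    canonical = json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


# Same record as served by two mirrors: different prefix, attribute order, whitespace
mirror_a = '<a:x xmlns:a="urn:1" q="1" p="2"> <a:y>t</a:y> </a:x>'
mirror_b = '<b:x p="2" q="1" xmlns:b="urn:1"><b:y>t</b:y></b:x>'
same_xml = canonical_xml_hash(mirror_a) == canonical_xml_hash(mirror_b)

same_json = canonical_json_hash({"id": "1", "title": "T"}) == canonical_json_hash(
    {"title": "T", "id": "1"}
)
```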

How do I prevent a single broken endpoint from stalling the entire harvest run?

Wrap each endpoint in its own circuit breaker. tenacity handles per-request retries but does not track breaker state, so keep that state in the harvester or use a dedicated circuit-breaker library. After three consecutive 5xx responses, mark the endpoint as OPEN and skip it for a configurable back-off window (e.g. 30 minutes). Route all endpoint state changes to a structured log so on-call engineers can distinguish transient outages from persistent misconfiguration. Harvest other endpoints normally while the broken one is paused.
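
The breaker state itself is small enough to sketch with the standard library. This is a minimal illustration, not a full implementation: the `CircuitBreaker` class is hypothetical, the clock is injectable so the behaviour can be tested without waiting, and after the cool-down it simply lets the next call through as a trial.

```python
import time
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class CircuitBreaker:
    failure_threshold: int = 3           # consecutive failures before opening
    cooldown_seconds: float = 30 * 60    # back-off window while OPEN
    clock: Callable[[], float] = time.monotonic
    _failures: int = 0
    _opened_at: Optional[float] = None

    def allow(self) -> bool:
        """True if the endpoint may be called now."""
        if self._opened_at is None:
            return True  # CLOSED
        # OPEN until the cool-down elapses, then allow a trial call
        return self.clock() - self._opened_at >= self.cooldown_seconds

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None  # back to CLOSED

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._opened_at = self.clock()  # (re)open and restart the window


# Simulated clock so the example runs instantly
now = [0.0]
breaker = CircuitBreaker(failure_threshold=3, cooldown_seconds=60, clock=lambda: now[0])
for _ in range(3):
    breaker.record_failure()
open_after_failures = not breaker.allow()
now[0] = 61.0
trial_allowed = breaker.allow()
```

Keep one breaker instance per endpoint, check allow() before each harvest run, and log every transition between states.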

How do I map harvested records to INSPIRE spatial data service classifications?

Extract the srv:serviceType and srv:serviceTypeVersion elements from the ISO 19139 service metadata block. Map these against the INSPIRE spatial data service type code list. Reject or quarantine records where serviceType is absent, and log them to the dead-letter queue for manual classification. The OGC Standards Architecture & Service Fundamentals overview lists the canonical service type identifiers used across OGC service families.
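
A rough sketch of the extraction and quarantine decision. It assumes the service type is carried as a gco:LocalName under srv:SV_ServiceIdentification and checks it against the INSPIRE spatial data service type values (discovery, view, download, transformation, invoke, other). `classify_service` is a hypothetical helper name.

```python
import xml.etree.ElementTree as ET  # use defusedxml.ElementTree for untrusted input
from typing import Optional

NS = {
    "gmd": "http://www.isotc211.org/2005/gmd",
    "gco": "http://www.isotc211.org/2005/gco",
    "srv": "http://www.isotc211.org/2005/srv",
}
INSPIRE_SERVICE_TYPES = {
    "discovery", "view", "download", "transformation", "invoke", "other",
}
SERVICE_TYPE_PATH = (
    "gmd:identificationInfo/srv:SV_ServiceIdentification"
    "/srv:serviceType/gco:LocalName"
)


def classify_service(md: ET.Element) -> Optional[str]:
    """Return the INSPIRE service type, or None if absent or unrecognised."""
    node = md.find(SERVICE_TYPE_PATH, NS)
    value = (node.text or "").strip().lower() if node is not None else ""
    return value if value in INSPIRE_SERVICE_TYPES else None


SERVICE_RECORD = ET.fromstring(
    '<gmd:MD_Metadata xmlns:gmd="http://www.isotc211.org/2005/gmd" '
    'xmlns:gco="http://www.isotc211.org/2005/gco" '
    'xmlns:srv="http://www.isotc211.org/2005/srv">'
    "<gmd:identificationInfo><srv:SV_ServiceIdentification>"
    "<srv:serviceType><gco:LocalName>view</gco:LocalName></srv:serviceType>"
    "</srv:SV_ServiceIdentification></gmd:identificationInfo>"
    "</gmd:MD_Metadata>"
)
service_type = classify_service(SERVICE_RECORD)
```

A None result is the signal to quarantine the record and send it to the dead-letter queue for manual classification.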

