Manual curation of spatial metadata breaks down quickly once a catalogue holds more than a few hundred records. Distributed OGC-compliant endpoints return records in multiple formats, across inconsistent schema versions, with no reliable change notification mechanism — so the only production-viable approach is a programmatic pipeline that discovers, extracts, validates, transforms, and synchronises records on a schedule. This page details the full architecture for such a pipeline, covering CSW (Catalogue Service for the Web) and OGC API – Records endpoints, with concrete Python code, fault-tolerance patterns, and incremental synchronisation strategies.
This guide is part of the Spatial Metadata & Catalog Integration section, which covers the full lifecycle from raw metadata authoring through validated publication.
The harvesting pipeline follows five deterministic stages. Each stage must be idempotent — safe to re-run after a failure without duplicating or corrupting stored records.
This guide assumes familiarity with HTTP request/response mechanics and Python 3.10+. You should understand what a CSW GetRecords operation returns before reading the implementation sections — the OGC Standards Architecture & Service Fundamentals overview explains the OGC service contract model, request binding types, and how capabilities documents advertise supported operations.
Dependencies (install into an isolated virtual environment):
pip install requests lxml defusedxml pydantic tenacity psycopg2-binary
Runtime environment checklist:
CSW exposes four operations relevant to harvesting:
| Operation | HTTP verb | Required parameters | Returns |
|---|---|---|---|
| GetCapabilities | GET | SERVICE=CSW&VERSION=2.0.2&REQUEST=GetCapabilities | Capabilities XML listing supported schemas, operations, and pagination limits |
| GetRecords | POST (preferred) / GET | SERVICE, VERSION, REQUEST, TYPENAMES, OUTPUTSCHEMA, startPosition, maxRecords | GetRecordsResponse containing matched/returned counts and metadata elements |
| GetRecordById | GET | SERVICE, VERSION, REQUEST, Id, OUTPUTSCHEMA | Single metadata record by persistent identifier |
| Harvest | POST | SOURCE, RESOURCETYPE, RESOURCEFORMAT | Instructs the catalogue to pull a remote record (server-side harvest) |
TYPENAMES specifies the record type — use csw:Record for Dublin Core or gmd:MD_Metadata for ISO 19139. OUTPUTSCHEMA controls the serialisation — http://www.isotc211.org/2005/gmd for ISO 19139 or http://www.opengis.net/cat/csw/2.0.2 for CSW’s native Dublin Core flavour.
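As a minimal sketch of how these parameters combine in a KVP GET request, the helper below builds a GetRecords URL for ISO 19139 output. The function name `build_getrecords_url` and the example endpoint are illustrative assumptions, not part of any library.

```python
from urllib.parse import urlencode

ISO_GMD = "http://www.isotc211.org/2005/gmd"


def build_getrecords_url(endpoint: str, start_position: int = 1, max_records: int = 100) -> str:
    """Return a CSW 2.0.2 GetRecords KVP URL requesting full ISO 19139 records."""
    params = {
        "SERVICE": "CSW",
        "VERSION": "2.0.2",
        "REQUEST": "GetRecords",
        "TYPENAMES": "gmd:MD_Metadata",   # record type to query
        "OUTPUTSCHEMA": ISO_GMD,          # serialisation of the returned records
        "RESULTTYPE": "results",
        "ELEMENTSETNAME": "full",
        "startPosition": start_position,  # 1-based in CSW
        "maxRecords": max_records,
    }
    return f"{endpoint}?{urlencode(params)}"


url = build_getrecords_url("https://example.com/csw", start_position=101, max_records=50)
```

Some servers may additionally expect a NAMESPACE parameter declaring the gmd prefix; check the capabilities document of the target catalogue before relying on this exact parameter set.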
OGC API – Records replaces the XML-over-HTTP binding with a RESTful JSON API. The key endpoints for harvesting:
| Endpoint | Method | Purpose |
|---|---|---|
| /conformance | GET | Lists conformance classes the server implements |
| /collections | GET | Lists available record collections |
| /collections/{collectionId}/items | GET | Paginated record list; supports limit, offset, datetime, bbox |
| /collections/{collectionId}/items/{recordId} | GET | Single record by ID |
Pagination uses next link relations in the response’s links array. Unlike CSW’s startPosition integer, OGC API – Records cursors may be opaque tokens — do not assume they are numeric offsets.
| Concern | CSW 2.0.2 | OGC API – Records 1.0 |
|---|---|---|
| Transport encoding | KVP GET or XML POST | REST + JSON (GeoJSON conformance available) |
| Pagination mechanism | startPosition + maxRecords integers | limit + next link relation (token or offset) |
| Metadata format negotiation | OUTPUTSCHEMA parameter | Accept header or f=json / f=xml query param |
| Change notification | None (polling only) | None currently; STAC-style feeds via extensions |
| Auth support | Basic / IP allowlist typical | Bearer token, API key, OAuth2 |
The implementation below is structured as a class-based harvester. It handles both CSW 2.0.2 and OGC API – Records endpoints, with retry logic, cursor tracking, and dead-letter routing built in.
"""
harvest.py: production CSW / OGC API – Records metadata harvester.
Requires: requests, lxml, defusedxml, pydantic, tenacity (Python 3.10+)
"""
from __future__ import annotations

import hashlib
import json
import logging
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Generator, Optional

import defusedxml.ElementTree as ET
import requests
from lxml import etree
from pydantic import BaseModel, Field, field_validator
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential_jitter,
)

logger = logging.getLogger(__name__)

# --- Namespace map for ISO 19139 / CSW responses ---
NS = {
    "csw": "http://www.opengis.net/cat/csw/2.0.2",
    "gmd": "http://www.isotc211.org/2005/gmd",
    "gco": "http://www.isotc211.org/2005/gco",
    "ows": "http://www.opengis.net/ows",
}


# ── Pydantic model for lightweight pre-validation ──────────────────────────────
class RawRecord(BaseModel):
    identifier: str
    title: str
    modified: Optional[str] = None
    abstract: Optional[str] = None
    bbox_wgs84: Optional[list[float]] = None  # [minX, minY, maxX, maxY]
    raw_xml: Optional[str] = None
    raw_json: Optional[dict] = None
    source_url: str
    # pydantic's Field (not dataclasses.field) is required for a default factory here
    harvest_ts: str = Field(default_factory=lambda: datetime.now(timezone.utc).isoformat())

    @field_validator("bbox_wgs84")
    @classmethod
    def bbox_must_be_four_floats(cls, v: list[float] | None) -> list[float] | None:
        if v is not None and len(v) != 4:
            raise ValueError("bbox_wgs84 must contain exactly 4 values")
        return v


# ── HTTP session with retry ────────────────────────────────────────────────────
@retry(
    # Retry transport failures as well as HTTP error statuses
    retry=retry_if_exception_type(
        (requests.HTTPError, requests.ConnectionError, requests.Timeout)
    ),
    stop=stop_after_attempt(4),
    wait=wait_exponential_jitter(initial=2, max=30),
    reraise=True,
)
def _get(session: requests.Session, url: str, **kwargs) -> requests.Response:
    resp = session.get(url, timeout=30, **kwargs)
    if resp.status_code == 429:
        # Respect Retry-After before tenacity's own back-off fires.
        # The header may also be an HTTP-date; fall back to 10 s in that case.
        try:
            retry_after = int(resp.headers.get("Retry-After", 10))
        except ValueError:
            retry_after = 10
        logger.warning("Rate-limited by %s; sleeping %ds", url, retry_after)
        time.sleep(retry_after)
    resp.raise_for_status()
    return resp
# ── CSW 2.0.2 harvester ───────────────────────────────────────────────────────
@dataclass
class CSWHarvester:
    endpoint: str
    page_size: int = 100
    output_schema: str = "http://www.isotc211.org/2005/gmd"
    session: requests.Session = field(default_factory=requests.Session)
    # Checkpoint: last successful harvest ISO 8601 timestamp (None = full harvest)
    since: Optional[str] = None

    def capabilities(self) -> etree._Element:
        """Parse GetCapabilities and return the root element."""
        params = {
            "SERVICE": "CSW",
            "VERSION": "2.0.2",
            "REQUEST": "GetCapabilities",
        }
        resp = _get(self.session, self.endpoint, params=params)
        return ET.fromstring(resp.content)

    def _build_post_body(self, start_position: int) -> bytes:
        """Construct a GetRecords POST body with optional temporal filter."""
        constraint = ""
        if self.since:
            # OGC Filter 1.1 temporal constraint on apiso:Modified
            constraint = f"""
    <csw:Constraint version="1.1.0">
      <ogc:Filter xmlns:ogc="http://www.opengis.net/ogc">
        <ogc:PropertyIsGreaterThanOrEqualTo>
          <ogc:PropertyName>apiso:Modified</ogc:PropertyName>
          <ogc:Literal>{self.since}</ogc:Literal>
        </ogc:PropertyIsGreaterThanOrEqualTo>
      </ogc:Filter>
    </csw:Constraint>"""
        # The gmd and apiso prefixes are declared so that the QNames used in
        # typeNames and PropertyName resolve on strict servers.
        return f"""<?xml version="1.0" encoding="UTF-8"?>
<csw:GetRecords
    xmlns:csw="http://www.opengis.net/cat/csw/2.0.2"
    xmlns:gmd="http://www.isotc211.org/2005/gmd"
    xmlns:apiso="http://www.opengis.net/cat/csw/apiso/1.0"
    service="CSW" version="2.0.2"
    resultType="results"
    startPosition="{start_position}"
    maxRecords="{self.page_size}"
    outputSchema="{self.output_schema}">
  <csw:Query typeNames="gmd:MD_Metadata">
    <csw:ElementSetName>full</csw:ElementSetName>
    {constraint}
  </csw:Query>
</csw:GetRecords>""".encode()

    def harvest(self) -> Generator[RawRecord, None, None]:
        """Yield RawRecord objects page by page until the catalogue is exhausted."""
        start = 1  # CSW startPosition is 1-based
        matched: Optional[int] = None
        while True:
            body = self._build_post_body(start)
            resp = self.session.post(
                self.endpoint,
                data=body,
                headers={"Content-Type": "application/xml"},
                timeout=60,
            )
            resp.raise_for_status()
            # Use defusedxml for safe parsing of untrusted catalogue responses
            root = ET.fromstring(resp.content)
            # Some servers report failures as ows:ExceptionReport with HTTP 200
            if root.tag.endswith("ExceptionReport"):
                logger.error("Service exception returned by %s", self.endpoint)
                break
            search_results = root.find("csw:SearchResults", NS)
            if search_results is None:
                logger.error("No SearchResults element in response from %s", self.endpoint)
                break
            if matched is None:
                matched = int(search_results.attrib.get("numberOfRecordsMatched", 0))
                logger.info("Catalogue reports %d matched records", matched)
            returned = int(search_results.attrib.get("numberOfRecordsReturned", 0))
            if returned == 0:
                break  # exhausted; some servers send an empty page before start passes matched
            for md_elem in search_results.findall("gmd:MD_Metadata", NS):
                record = _extract_iso19139_record(md_elem, self.endpoint)
                if record:
                    yield record
            start += returned
            if matched and start > matched:
                break
def _extract_iso19139_record(elem: etree._Element, source_url: str) -> Optional[RawRecord]:
    """Extract core fields from a gmd:MD_Metadata element."""

    def text(path: str) -> Optional[str]:
        # Return the stripped text at `path`, or None if the node is absent or empty
        node = elem.find(path, NS)
        return node.text.strip() if node is not None and node.text else None

    identifier = text("gmd:fileIdentifier/gco:CharacterString")
    title = text(
        "gmd:identificationInfo/gmd:MD_DataIdentification"
        "/gmd:citation/gmd:CI_Citation/gmd:title/gco:CharacterString"
    )
    if not identifier or not title:
        logger.debug("Skipping record missing fileIdentifier or title")
        return None
    raw_xml = ET.tostring(elem, encoding="unicode")
    return RawRecord(
        identifier=identifier,
        title=title,
        modified=text("gmd:dateStamp/gco:DateTime") or text("gmd:dateStamp/gco:Date"),
        abstract=text(
            "gmd:identificationInfo/gmd:MD_DataIdentification/gmd:abstract/gco:CharacterString"
        ),
        raw_xml=raw_xml,
        source_url=source_url,
    )
# ── OGC API – Records harvester ───────────────────────────────────────────────
@dataclass
class OGCAPIRecordsHarvester:
    base_url: str  # e.g. https://example.com/ogcapi
    collection_id: str
    page_size: int = 100
    session: requests.Session = field(default_factory=requests.Session)
    since: Optional[str] = None  # ISO 8601 datetime for incremental harvests

    def harvest(self) -> Generator[RawRecord, None, None]:
        """Yield RawRecord objects by following next link relations."""
        params: dict = {"limit": self.page_size, "f": "json"}
        if self.since:
            # datetime filter: open-ended range from since to now
            params["datetime"] = f"{self.since}/.."
        url = f"{self.base_url}/collections/{self.collection_id}/items"
        while url:
            resp = _get(self.session, url, params=params)
            payload = resp.json()
            params = {}  # subsequent pages use the full next URL; clear params
            for feature in payload.get("features", []):
                record = _extract_ogcapi_record(feature, self.base_url)
                if record:
                    yield record
            # Follow next link; absent means we've reached the end
            url = next(
                (lnk["href"] for lnk in payload.get("links", []) if lnk.get("rel") == "next"),
                None,
            )
def _extract_ogcapi_record(feature: dict, source_url: str) -> Optional[RawRecord]:
    """Extract core fields from an OGC API – Records GeoJSON feature."""
    props = feature.get("properties", {})
    identifier = feature.get("id") or props.get("identifier")
    title = props.get("title")
    if not identifier or not title:
        return None
    bbox_raw = feature.get("bbox")
    if bbox_raw and len(bbox_raw) == 6:
        # A 3D GeoJSON bbox is [minX, minY, minZ, maxX, maxY, maxZ]; keep the 2D footprint
        bbox_raw = [bbox_raw[0], bbox_raw[1], bbox_raw[3], bbox_raw[4]]
    return RawRecord(
        identifier=str(identifier),
        title=str(title),
        modified=props.get("updated") or props.get("datetime"),
        abstract=props.get("description"),
        # Anything other than four values would fail RawRecord validation, so drop it
        bbox_wgs84=bbox_raw if bbox_raw and len(bbox_raw) == 4 else None,
        raw_json=feature,
        source_url=source_url,
    )


# ── Content-hash deduplication ────────────────────────────────────────────────
def record_fingerprint(record: RawRecord) -> str:
    """SHA-256 of the record's raw content, for deduplication."""
    content = record.raw_xml or json.dumps(record.raw_json, sort_keys=True)
    return hashlib.sha256(content.encode()).hexdigest()
import psycopg2

UPSERT_SQL = """
INSERT INTO harvested_records
    (identifier, source_url, title, abstract, modified, bbox_wgs84, fingerprint, raw_payload, harvest_ts)
VALUES
    (%(identifier)s, %(source_url)s, %(title)s, %(abstract)s, %(modified)s,
     ST_MakeEnvelope(%(min_x)s, %(min_y)s, %(max_x)s, %(max_y)s, 4326),
     %(fingerprint)s, %(raw_payload)s, %(harvest_ts)s)
ON CONFLICT (identifier, source_url)
DO UPDATE SET
    title = EXCLUDED.title,
    abstract = EXCLUDED.abstract,
    modified = EXCLUDED.modified,
    bbox_wgs84 = EXCLUDED.bbox_wgs84,
    fingerprint = EXCLUDED.fingerprint,
    raw_payload = EXCLUDED.raw_payload,
    harvest_ts = EXCLUDED.harvest_ts
WHERE harvested_records.fingerprint != EXCLUDED.fingerprint;
"""


def persist(conn: psycopg2.extensions.connection, record: RawRecord) -> None:
    """Upsert one record; rows whose fingerprint is unchanged are left untouched."""
    fp = record_fingerprint(record)
    # A missing bbox becomes four NULLs, so ST_MakeEnvelope yields NULL
    bbox = record.bbox_wgs84 or [None, None, None, None]
    raw = record.raw_xml or json.dumps(record.raw_json)
    with conn.cursor() as cur:
        cur.execute(UPSERT_SQL, {
            "identifier": record.identifier,
            "source_url": record.source_url,
            "title": record.title,
            "abstract": record.abstract,
            "modified": record.modified,
            "min_x": bbox[0], "min_y": bbox[1], "max_x": bbox[2], "max_y": bbox[3],
            "fingerprint": fp,
            "raw_payload": raw,
            "harvest_ts": record.harvest_ts,
        })
    conn.commit()
XXE injection in legacy CSW responses. Some older catalogues embed external entity references in their XML. Always parse with defusedxml — never with lxml.etree.fromstring directly on untrusted input.
Namespace fragmentation. ISO 19139 documents occasionally use unprefixed or custom-prefixed namespaces, and some bind a different namespace URI altogether. If elem.find("gmd:MD_Metadata", NS) returns None, inspect the raw bytes: print(resp.content[:2000]). When namespace usage is inconsistent across sources, match on the element's local name instead, for example via etree.QName(elem).localname in lxml.
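A rough sketch of local-name matching follows. It uses the standard-library ElementTree only to stay self-contained; the helper names `local_name` and `find_by_local_name` are illustrative, and untrusted catalogue responses should still be parsed with defusedxml before being passed in.

```python
import xml.etree.ElementTree as ET


def local_name(tag: str) -> str:
    """Strip the '{namespace-uri}' part that ElementTree puts in front of a tag."""
    return tag.rsplit("}", 1)[-1]


def find_by_local_name(root: ET.Element, name: str) -> list[ET.Element]:
    """Return every descendant (and the root itself) whose local name equals `name`."""
    return [
        el for el in root.iter()
        if isinstance(el.tag, str) and local_name(el.tag) == name
    ]


# Trusted literal for illustration: the same element under two different namespaces
doc = ET.fromstring(
    '<r xmlns:x="urn:custom">'
    '<x:MD_Metadata/>'
    '<MD_Metadata xmlns="http://www.isotc211.org/2005/gmd"/>'
    '</r>'
)
hits = find_by_local_name(doc, "MD_Metadata")
```

Matching on local names trades precision for tolerance, so it is best kept as a fallback for sources that fail the strict namespace-aware lookup.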
numberOfRecordsMatched drift. Catalogues being updated concurrently may return a different numberOfRecordsMatched on each page. Drive the loop on actual returned-records-per-page, not on the declared total. Break when returned == 0.
OGC API – Records opaque cursors. The next link href value may include a server-generated token rather than a simple offset integer. Never reconstruct the next URL manually from the current page number — always follow the href verbatim from the links array.
Empty result sets vs. service exceptions. A CSW server that cannot complete a query may return an ows:ExceptionReport element with HTTP 200 rather than a 4xx/5xx code. After parsing, always check whether the root element tag contains ExceptionReport before treating the response as valid records.
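One way to implement this check is sketched below. `CSWServiceError` and `raise_on_exception_report` are hypothetical names introduced for illustration, and the standard-library parser is used only on a trusted literal.

```python
import xml.etree.ElementTree as ET


class CSWServiceError(RuntimeError):
    """Raised when a response body is an ows:ExceptionReport."""


def raise_on_exception_report(root: ET.Element) -> None:
    """Raise CSWServiceError if the parsed root is an ExceptionReport; otherwise do nothing."""
    if root.tag.rsplit("}", 1)[-1] != "ExceptionReport":
        return
    # Collect every ExceptionText regardless of which OWS namespace version is used
    messages = [
        (el.text or "").strip()
        for el in root.iter()
        if isinstance(el.tag, str) and el.tag.rsplit("}", 1)[-1] == "ExceptionText"
    ]
    detail = "; ".join(m for m in messages if m) or "unspecified service exception"
    raise CSWServiceError(detail)


report = ET.fromstring(
    '<ows:ExceptionReport xmlns:ows="http://www.opengis.net/ows">'
    '<ows:Exception exceptionCode="InvalidParameterValue">'
    '<ows:ExceptionText>bad outputSchema</ows:ExceptionText>'
    '</ows:Exception></ows:ExceptionReport>'
)
```

Calling the helper immediately after parsing each page keeps a server-side failure from being mistaken for an empty result set.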
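Character encoding mismatches. Pass resp.content (bytes) rather than resp.text to the XML parser, so that decoding follows the encoding named in the XML declaration instead of whatever requests inferred from the HTTP headers. Expect UTF-8; where a document carries no usable declaration and resp.encoding reports latin-1, decode it explicitly as ISO-8859-1 before parsing.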
Raw harvested records must clear at least two validation layers before being promoted to the search index. The approach aligns with the patterns described in Schema Validation for Spatial Records, which covers XSD-based validation in detail.
from lxml import etree


def validate_iso19139(raw_xml: str, xsd_path: str) -> list[str]:
    """
    Validate a raw ISO 19139 XML string against a local XSD.
    Returns a list of error messages; empty list means valid.
    """
    # Parse by path so relative xs:import / xs:include locations resolve
    schema_doc = etree.parse(xsd_path)
    schema = etree.XMLSchema(schema_doc)
    # Harvested XML is untrusted: disable entity resolution and network access
    parser = etree.XMLParser(resolve_entities=False, no_network=True)
    try:
        doc = etree.fromstring(raw_xml.encode(), parser)
    except etree.XMLSyntaxError as exc:
        return [f"XML parse error: {exc}"]
    schema.validate(doc)
    return [str(e) for e in schema.error_log]
For mapping to ISO 19115 metadata standards, enforce these mandatory elements at minimum before writing to the index:
| ISO 19115 element | ISO 19139 XPath | Mandatory? |
|---|---|---|
| title | gmd:identificationInfo/…/gmd:title/gco:CharacterString | Yes |
| abstract | gmd:identificationInfo/…/gmd:abstract/gco:CharacterString | Yes |
| fileIdentifier | gmd:fileIdentifier/gco:CharacterString | Yes |
| dateStamp | gmd:dateStamp/gco:DateTime | Yes |
| language | gmd:language/gmd:LanguageCode | Conditional |
| spatialRepresentationType | gmd:identificationInfo/…/gmd:spatialRepresentationType | Conditional |
| extent/geographicBoundingBox | gmd:identificationInfo/…/gmd:extent/gmd:EX_Extent/… | Strongly recommended |
Records failing mandatory element checks go to a dead-letter table. Do not discard them — they may be valid but use a non-standard element path resolvable by manual review.
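The mandatory-element gate might look roughly like the sketch below. The XPaths reuse the gmd:MD_DataIdentification paths from the harvester code, and two in-memory lists stand in for the search index queue and the dead-letter table; both are assumptions made for illustration.

```python
import xml.etree.ElementTree as ET

NS = {
    "gmd": "http://www.isotc211.org/2005/gmd",
    "gco": "http://www.isotc211.org/2005/gco",
}
_IDENT = "gmd:identificationInfo/gmd:MD_DataIdentification"
MANDATORY_PATHS = {
    "fileIdentifier": "gmd:fileIdentifier/gco:CharacterString",
    "title": f"{_IDENT}/gmd:citation/gmd:CI_Citation/gmd:title/gco:CharacterString",
    "abstract": f"{_IDENT}/gmd:abstract/gco:CharacterString",
}
# dateStamp may be serialised as either gco:DateTime or gco:Date
DATESTAMP_PATHS = ("gmd:dateStamp/gco:DateTime", "gmd:dateStamp/gco:Date")


def missing_mandatory(md: ET.Element) -> list[str]:
    """Return the names of mandatory elements that are absent or empty."""
    def has_text(path: str) -> bool:
        node = md.find(path, NS)
        return node is not None and bool((node.text or "").strip())

    missing = [name for name, path in MANDATORY_PATHS.items() if not has_text(path)]
    if not any(has_text(p) for p in DATESTAMP_PATHS):
        missing.append("dateStamp")
    return missing


def route(md: ET.Element, accepted: list, dead_letter: list) -> None:
    """Send a record to the index queue, or to the dead-letter store with its reasons."""
    missing = missing_mandatory(md)
    if missing:
        # Keep the raw XML so a reviewer can resolve non-standard element paths later
        dead_letter.append({"reasons": missing, "raw_xml": ET.tostring(md, encoding="unicode")})
    else:
        accepted.append(md)
```

Storing the list of failed checks next to the raw payload lets manual review start from the specific missing element rather than from a generic rejection.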
import unittest
from unittest.mock import MagicMock

# Assumes the harvester code above is saved as harvest.py
from harvest import CSWHarvester, RawRecord, record_fingerprint


class TestCSWHarvester(unittest.TestCase):
    def _make_session(self, xml_body: bytes) -> MagicMock:
        # Stub session whose POST always returns the given XML body
        session = MagicMock()
        response = MagicMock()
        response.content = xml_body
        response.raise_for_status.return_value = None
        session.post.return_value = response
        return session

    MINIMAL_CSW_RESPONSE = b"""<?xml version="1.0"?>
<csw:GetRecordsResponse xmlns:csw="http://www.opengis.net/cat/csw/2.0.2"
    xmlns:gmd="http://www.isotc211.org/2005/gmd"
    xmlns:gco="http://www.isotc211.org/2005/gco">
  <csw:SearchResults numberOfRecordsMatched="1" numberOfRecordsReturned="1" nextRecord="0">
    <gmd:MD_Metadata>
      <gmd:fileIdentifier><gco:CharacterString>test-001</gco:CharacterString></gmd:fileIdentifier>
      <gmd:dateStamp><gco:Date>2024-01-15</gco:Date></gmd:dateStamp>
      <gmd:identificationInfo>
        <gmd:MD_DataIdentification>
          <gmd:citation>
            <gmd:CI_Citation>
              <gmd:title><gco:CharacterString>Test Dataset</gco:CharacterString></gmd:title>
            </gmd:CI_Citation>
          </gmd:citation>
          <gmd:abstract><gco:CharacterString>A test record.</gco:CharacterString></gmd:abstract>
        </gmd:MD_DataIdentification>
      </gmd:identificationInfo>
    </gmd:MD_Metadata>
  </csw:SearchResults>
</csw:GetRecordsResponse>"""

    def test_single_record_harvest(self):
        harvester = CSWHarvester(
            endpoint="https://example.com/csw",
            session=self._make_session(self.MINIMAL_CSW_RESPONSE),
        )
        records = list(harvester.harvest())
        self.assertEqual(len(records), 1)
        self.assertEqual(records[0].identifier, "test-001")
        self.assertEqual(records[0].title, "Test Dataset")

    def test_missing_identifier_skipped(self):
        # Empty the fileIdentifier so the extractor has nothing to key on
        broken = self.MINIMAL_CSW_RESPONSE.replace(
            b"<gco:CharacterString>test-001</gco:CharacterString>", b""
        )
        harvester = CSWHarvester(
            endpoint="https://example.com/csw",
            session=self._make_session(broken),
        )
        records = list(harvester.harvest())
        self.assertEqual(len(records), 0)

    def test_fingerprint_deterministic(self):
        record = RawRecord(
            identifier="x", title="T", source_url="https://example.com", raw_xml="<a/>"
        )
        self.assertEqual(record_fingerprint(record), record_fingerprint(record))


if __name__ == "__main__":
    unittest.main()
Page size tuning. Start with maxRecords=100. Some CSW servers cap responses at 50 or 200 records regardless of what you request — read the value from GetCapabilities under ows:Constraint name="MaxRecordDefault" before issuing paginated requests.
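A minimal sketch of reading that constraint is shown below. It assumes the OWS 1.0 namespace and an ows:Value child element, which is a common layout but not guaranteed for every server; `max_record_default` is an illustrative name.

```python
import xml.etree.ElementTree as ET

OWS = "http://www.opengis.net/ows"


def max_record_default(capabilities: ET.Element, fallback: int = 100) -> int:
    """Return the advertised MaxRecordDefault, or `fallback` if it is absent or malformed."""
    for constraint in capabilities.iter(f"{{{OWS}}}Constraint"):
        if constraint.get("name") != "MaxRecordDefault":
            continue
        value = constraint.find(f"{{{OWS}}}Value")
        text = (value.text or "").strip() if value is not None else ""
        if text.isdigit():
            return int(text)
    return fallback


# Trusted literal standing in for a fragment of a capabilities document
caps = ET.fromstring(
    '<csw:Capabilities xmlns:csw="http://www.opengis.net/cat/csw/2.0.2" '
    'xmlns:ows="http://www.opengis.net/ows">'
    '<ows:OperationsMetadata><ows:Operation name="GetRecords">'
    '<ows:Constraint name="MaxRecordDefault"><ows:Value>50</ows:Value></ows:Constraint>'
    '</ows:Operation></ows:OperationsMetadata></csw:Capabilities>'
)
page_size = min(100, max_record_default(caps))
```

Taking the minimum of the requested and advertised values keeps the harvester's page arithmetic aligned with what the server will actually return.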
Connection pooling. Reuse a single requests.Session across all pages for a given endpoint. Sessions maintain HTTP keep-alive connections, reducing TCP handshake overhead on large harvests. Create a new session per endpoint rather than per page.
Parallelising across endpoints. Use concurrent.futures.ThreadPoolExecutor to harvest multiple independent endpoints in parallel. Each harvester instance should own its own session — do not share sessions across threads.
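The fan-out could be sketched as follows. `harvest_all` and `_drain` are hypothetical helpers; each job is a zero-argument callable such as the bound harvest method of a harvester instance that owns its own session.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Iterable


def _drain(job: Callable[[], Iterable]) -> list:
    """Run one harvest job to completion inside a worker thread."""
    return list(job())


def harvest_all(
    jobs: dict[str, Callable[[], Iterable]], max_workers: int = 4
) -> tuple[dict[str, list], dict[str, Exception]]:
    """Run independent harvest jobs in parallel; return (results, failures) keyed by job name."""
    results: dict[str, list] = {}
    failures: dict[str, Exception] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(_drain, job): name for name, job in jobs.items()}
        for future in as_completed(futures):
            name = futures[future]
            try:
                results[name] = future.result()
            except Exception as exc:  # one failing endpoint must not stop the others
                failures[name] = exc
    return results, failures
```

With the classes from harvest.py, a call might look like harvest_all({"national": CSWHarvester(endpoint=...).harvest, "regional": OGCAPIRecordsHarvester(...).harvest}). Collecting whole result lists is only reasonable for modest catalogues; larger ones would persist records inside each worker instead.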
Incremental harvest scheduling. Run incremental harvests (filter by modified >= last_checkpoint) daily. Run full re-harvests weekly or after catalogue migrations, because incremental syncs cannot detect deleted records or identifier changes.
Caching capabilities responses. Store the parsed capabilities document in a local file or Redis key with a 24-hour TTL. Querying GetCapabilities on every run adds latency and may count against rate limits on public catalogues.
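A file-based variant of this cache might look like the sketch below. It stores the raw capabilities XML rather than a parsed tree, which is a simplifying assumption; `cached_capabilities` and its injectable `now` clock are illustrative names.

```python
import json
import time
from pathlib import Path
from typing import Callable


def cached_capabilities(
    cache_path: Path,
    fetch: Callable[[], str],
    ttl_seconds: float = 24 * 3600,
    now: Callable[[], float] = time.time,
) -> str:
    """Return the cached capabilities XML if it is younger than the TTL, else refetch it."""
    if cache_path.exists():
        entry = json.loads(cache_path.read_text(encoding="utf-8"))
        if now() - entry["fetched_at"] < ttl_seconds:
            return entry["body"]
    body = fetch()  # e.g. a function that issues GetCapabilities and returns resp.text
    cache_path.write_text(
        json.dumps({"fetched_at": now(), "body": body}), encoding="utf-8"
    )
    return body
```

Passing the fetch step as a callable keeps the cache independent of the HTTP layer, so the same helper can sit in front of CSW and OGC API – Records endpoints.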
Elasticsearch bulk indexing. If downstream storage is Elasticsearch, batch harvested records into _bulk API requests of 500–1000 documents rather than indexing one at a time. Normalise spatial extents to WGS84 before indexing, and store as geo_shape to support bounding-box queries. For DCAT-AP publication from the index, see DCAT-AP for Spatial Data Portals.
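The batching step can be sketched without any client library by producing the newline-delimited bodies that the _bulk endpoint accepts. The index name and the assumption that every document carries an "identifier" key are illustrative.

```python
import json
from itertools import islice
from typing import Iterable, Iterator


def bulk_bodies(docs: Iterable[dict], index: str, batch_size: int = 500) -> Iterator[str]:
    """Yield NDJSON request bodies for the _bulk API, `batch_size` documents at a time."""
    it = iter(docs)
    while batch := list(islice(it, batch_size)):
        lines = []
        for doc in batch:
            # Action line first, then the document source on the next line
            lines.append(json.dumps({"index": {"_index": index, "_id": doc["identifier"]}}))
            lines.append(json.dumps(doc))
        # The bulk format requires a trailing newline after the last line
        yield "\n".join(lines) + "\n"


docs = [{"identifier": f"rec-{i}", "title": f"Dataset {i}"} for i in range(3)]
bodies = list(bulk_bodies(docs, index="spatial-metadata", batch_size=2))
```

Each body would be POSTed to the cluster's _bulk endpoint with the Content-Type application/x-ndjson. Using the record identifier as _id makes re-indexing after a re-harvest overwrite rather than duplicate.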
Always drive pagination from the actual records returned per response rather than trusting numberOfRecordsMatched. Some CSW implementations return stale counts when the catalogue is being updated concurrently. Terminate the loop when the response body contains zero records or when startPosition exceeds numberOfRecordsMatched by more than one page.
Run incremental harvests daily using the modified timestamp filter, and schedule a full re-harvest weekly or after major catalogue migrations. Full re-harvests catch deletions and identifier changes that incremental syncs miss, but are too resource-intensive to run every cycle. Always store a checkpoint timestamp after each successful incremental run.
Prefer the source’s declared persistent identifier — fileIdentifier in ISO 19139, or id in OGC API – Records. When that is absent or inconsistent across mirrors, fall back to a SHA-256 hash of the canonicalised, namespace-normalised XML or JSON body. Never deduplicate on title alone — duplicate titles are common across jurisdictions and thematic datasets.
Implement per-endpoint circuit breakers alongside the tenacity retry logic; tenacity handles per-request retries but does not track endpoint health across requests. After three consecutive 5xx responses, mark the endpoint as OPEN and skip it for a configurable back-off window (e.g. 30 minutes). Route all endpoint state changes to a structured log so on-call engineers can distinguish transient outages from persistent misconfiguration. Harvest other endpoints normally while the broken one is paused.
Extract the srv:serviceType and srv:serviceTypeVersion elements from the ISO 19139 service metadata block. Map these against the INSPIRE spatial data service type code list. Reject or quarantine records where serviceType is absent, and log them to the dead-letter queue for manual classification. The OGC Standards Architecture & Service Fundamentals overview lists the canonical service type identifiers used across OGC service families.