Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
f248b9d
fix: add venv and local_testing to .gitignore Rapid local performance…
mattsan-dev Feb 2, 2026
e58d438
feat: add command-line interface for title-boundary pipeline with arg…
mattsan-dev Feb 2, 2026
c3f82f4
feat: implement file downloader for title-boundary GML files with pro…
mattsan-dev Feb 2, 2026
0b2c2dc
feat: add GML converter with multiple output formats including CSV an…
mattsan-dev Feb 2, 2026
ec0a0de
feat: add GML extractor for title-boundary datasets with ZIP archive …
mattsan-dev Feb 2, 2026
92558de
feat: add Makefile for title boundary pipeline setup and management R…
mattsan-dev Feb 2, 2026
52be8ab
feat: add pipeline configuration management for title-boundary datase…
mattsan-dev Feb 2, 2026
f391856
feat: update .gitignore to include local testing scripts and README R…
mattsan-dev Feb 2, 2026
82857ad
feat: enhance Makefile and CLI for improved pipeline comparison and a…
mattsan-dev Feb 6, 2026
bfeb931
feat: update pipeline configuration files and add README for local te…
mattsan-dev Feb 6, 2026
1ef4f09
Refactor pipeline scripts: remove old main.py, pipeline_report.py, an…
mattsan-dev Feb 6, 2026
ecd0b4f
refactor: improve code formatting and readability across multiple fil…
mattsan-dev Feb 9, 2026
f6aca1f
style: flake8 improve code formatting and readability in GML converte…
mattsan-dev Feb 9, 2026
bf2fe7b
fix: improve pipeline report formatting and update flake8 ignore rule…
mattsan-dev Feb 9, 2026
83a5c93
Add Polars phases for data processing pipeline
mattsan-dev Feb 17, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ demodata/
.eggs
*.gfs
.venv
/venv
.direnv
var/cache
/collection
Expand All @@ -35,4 +36,16 @@ docs/modules.rst

# don't store data folder for use as storage for notebooks
notebooks/data/
notebooks/.ipynb_checkpoints
notebooks/.ipynb_checkpoints

# local_testing
/local_testing/cache/
/local_testing/converted/
/local_testing/extracted/
/local_testing/output/
/local_testing/polars_phases/
/local_testing/raw/
/local_testing/reports/
/local_testing/specification/
/local_testing/venv/

270 changes: 189 additions & 81 deletions digital_land/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,32 @@
from digital_land.phase.save import SavePhase
from digital_land.pipeline import run_pipeline, Lookups, Pipeline
from digital_land.pipeline.process import convert_tranformed_csv_to_pq
from digital_land.phase_polars import run_polars_pipeline
from digital_land.phase_polars import (
ConvertPhase as PolarsConvertPhase,
NormalisePhase as PolarsNormalisePhase,
ConcatFieldPhase as PolarsConcatFieldPhase,
FilterPhase as PolarsFilterPhase,
MapPhase as PolarsMapPhase,
PatchPhase as PolarsPatchPhase,
HarmonisePhase as PolarsHarmonisePhase,
DefaultPhase as PolarsDefaultPhase,
MigratePhase as PolarsMigratePhase,
OrganisationPhase as PolarsOrganisationPhase,
FieldPrunePhase as PolarsFieldPrunePhase,
EntityPrunePhase as PolarsEntityPrunePhase,
FactPrunePhase as PolarsFactPrunePhase,
EntityReferencePhase as PolarsEntityReferencePhase,
FactReferencePhase as PolarsFactReferencePhase,
EntityPrefixPhase as PolarsEntityPrefixPhase,
EntityLookupPhase as PolarsEntityLookupPhase,
FactLookupPhase as PolarsFactLookupPhase,
SavePhase as PolarsSavePhase,
PivotPhase as PolarsPivotPhase,
FactCombinePhase as PolarsFactCombinePhase,
FactorPhase as PolarsFactorPhase,
PriorityPhase as PolarsPriorityPhase,
)
from digital_land.schema import Schema
from digital_land.update import add_source_endpoint
from digital_land.configuration.main import Config
Expand Down Expand Up @@ -237,6 +263,7 @@ def pipeline_run(
resource=None,
output_log_dir=None,
converted_path=None,
use_polars=False,
):
# set up paths
cache_dir = Path(cache_dir)
Expand Down Expand Up @@ -302,87 +329,168 @@ def pipeline_run(
if "entry-date" not in default_values:
default_values["entry-date"] = entry_date

# TODO Migrate all of this into a function in the Pipeline function
run_pipeline(
ConvertPhase(
path=input_path,
dataset_resource_log=dataset_resource_log,
converted_resource_log=converted_resource_log,
output_path=converted_path,
),
NormalisePhase(skip_patterns=skip_patterns),
ParsePhase(),
ConcatFieldPhase(concats=concats, log=column_field_log),
FilterPhase(filters=pipeline.filters(resource)),
MapPhase(
fieldnames=intermediate_fieldnames,
columns=columns,
log=column_field_log,
),
FilterPhase(filters=pipeline.filters(resource, endpoints=endpoints)),
PatchPhase(
issues=issue_log,
patches=patches,
),
HarmonisePhase(
field_datatype_map=specification.get_field_datatype_map(),
issues=issue_log,
dataset=dataset,
valid_category_values=valid_category_values,
),
DefaultPhase(
default_fields=default_fields,
default_values=default_values,
issues=issue_log,
),
# TBD: move migrating columns to fields to be immediately after map
# this will simplify harmonisation and remove intermediate_fieldnames
# but effects brownfield-land and other pipelines which operate on columns
MigratePhase(
fields=specification.schema_field[schema],
migrations=pipeline.migrations(),
),
OrganisationPhase(organisation=organisation, issues=issue_log),
FieldPrunePhase(fields=specification.current_fieldnames(schema)),
EntityReferencePhase(
dataset=dataset,
prefix=specification.dataset_prefix(dataset),
issues=issue_log,
),
EntityPrefixPhase(dataset=dataset),
EntityLookupPhase(
lookups=lookups,
redirect_lookups=redirect_lookups,
issue_log=issue_log,
operational_issue_log=operational_issue_log,
entity_range=[entity_range_min, entity_range_max],
),
SavePhase(
default_output_path("harmonised", input_path),
fieldnames=intermediate_fieldnames,
enabled=save_harmonised,
),
EntityPrunePhase(dataset_resource_log=dataset_resource_log),
PriorityPhase(config=config, providers=organisations),
PivotPhase(),
FactCombinePhase(issue_log=issue_log, fields=combine_fields),
FactorPhase(),
FactReferencePhase(
field_typology_map=specification.get_field_typology_map(),
field_prefix_map=specification.get_field_prefix_map(),
),
FactLookupPhase(
lookups=lookups,
redirect_lookups=redirect_lookups,
issue_log=issue_log,
odp_collections=specification.get_odp_collections(),
),
FactPrunePhase(),
SavePhase(
output_path,
fieldnames=specification.factor_fieldnames(),
),
)
if use_polars:
# ── Polars-based pipeline ──────────────────────────────────────────
run_polars_pipeline(
PolarsConvertPhase(
path=input_path,
dataset_resource_log=dataset_resource_log,
converted_resource_log=converted_resource_log,
output_path=converted_path,
),
PolarsNormalisePhase(skip_patterns=skip_patterns),
# ParsePhase is not needed – ConvertPhase already produces a DataFrame
PolarsConcatFieldPhase(concats=concats, log=column_field_log),
PolarsFilterPhase(filters=pipeline.filters(resource)),
PolarsMapPhase(
fieldnames=intermediate_fieldnames,
columns=columns,
log=column_field_log,
),
PolarsFilterPhase(filters=pipeline.filters(resource, endpoints=endpoints)),
PolarsPatchPhase(
issues=issue_log,
patches=patches,
),
PolarsHarmonisePhase(
field_datatype_map=specification.get_field_datatype_map(),
issues=issue_log,
dataset=dataset,
valid_category_values=valid_category_values,
),
PolarsDefaultPhase(
default_fields=default_fields,
default_values=default_values,
issues=issue_log,
),
PolarsMigratePhase(
fields=specification.schema_field[schema],
migrations=pipeline.migrations(),
),
PolarsOrganisationPhase(organisation=organisation, issues=issue_log),
PolarsFieldPrunePhase(fields=specification.current_fieldnames(schema)),
PolarsEntityReferencePhase(
dataset=dataset,
prefix=specification.dataset_prefix(dataset),
issues=issue_log,
),
PolarsEntityPrefixPhase(dataset=dataset),
PolarsEntityLookupPhase(
lookups=lookups,
redirect_lookups=redirect_lookups,
issue_log=issue_log,
operational_issue_log=operational_issue_log,
entity_range=[entity_range_min, entity_range_max],
),
PolarsSavePhase(
default_output_path("harmonised", input_path),
fieldnames=intermediate_fieldnames,
enabled=save_harmonised,
),
PolarsEntityPrunePhase(dataset_resource_log=dataset_resource_log),
PolarsPriorityPhase(config=config, providers=organisations),
PolarsPivotPhase(),
PolarsFactCombinePhase(issue_log=issue_log, fields=combine_fields),
PolarsFactorPhase(),
PolarsFactReferencePhase(
field_typology_map=specification.get_field_typology_map(),
field_prefix_map=specification.get_field_prefix_map(),
),
PolarsFactLookupPhase(
lookups=lookups,
redirect_lookups=redirect_lookups,
issue_log=issue_log,
odp_collections=specification.get_odp_collections(),
),
PolarsFactPrunePhase(),
PolarsSavePhase(
output_path,
fieldnames=specification.factor_fieldnames(),
),
)
else:
# ── Original streaming pipeline ────────────────────────────────────
# TODO Migrate all of this into a function in the Pipeline function
run_pipeline(
ConvertPhase(
path=input_path,
dataset_resource_log=dataset_resource_log,
converted_resource_log=converted_resource_log,
output_path=converted_path,
),
NormalisePhase(skip_patterns=skip_patterns),
ParsePhase(),
ConcatFieldPhase(concats=concats, log=column_field_log),
FilterPhase(filters=pipeline.filters(resource)),
MapPhase(
fieldnames=intermediate_fieldnames,
columns=columns,
log=column_field_log,
),
FilterPhase(filters=pipeline.filters(resource, endpoints=endpoints)),
PatchPhase(
issues=issue_log,
patches=patches,
),
HarmonisePhase(
field_datatype_map=specification.get_field_datatype_map(),
issues=issue_log,
dataset=dataset,
valid_category_values=valid_category_values,
),
DefaultPhase(
default_fields=default_fields,
default_values=default_values,
issues=issue_log,
),
# TBD: move migrating columns to fields to be immediately after map
# this will simplify harmonisation and remove intermediate_fieldnames
# but affects brownfield-land and other pipelines which operate on columns
MigratePhase(
fields=specification.schema_field[schema],
migrations=pipeline.migrations(),
),
OrganisationPhase(organisation=organisation, issues=issue_log),
FieldPrunePhase(fields=specification.current_fieldnames(schema)),
EntityReferencePhase(
dataset=dataset,
prefix=specification.dataset_prefix(dataset),
issues=issue_log,
),
EntityPrefixPhase(dataset=dataset),
EntityLookupPhase(
lookups=lookups,
redirect_lookups=redirect_lookups,
issue_log=issue_log,
operational_issue_log=operational_issue_log,
entity_range=[entity_range_min, entity_range_max],
),
SavePhase(
default_output_path("harmonised", input_path),
fieldnames=intermediate_fieldnames,
enabled=save_harmonised,
),
EntityPrunePhase(dataset_resource_log=dataset_resource_log),
PriorityPhase(config=config, providers=organisations),
PivotPhase(),
FactCombinePhase(issue_log=issue_log, fields=combine_fields),
FactorPhase(),
FactReferencePhase(
field_typology_map=specification.get_field_typology_map(),
field_prefix_map=specification.get_field_prefix_map(),
),
FactLookupPhase(
lookups=lookups,
redirect_lookups=redirect_lookups,
issue_log=issue_log,
odp_collections=specification.get_odp_collections(),
),
FactPrunePhase(),
SavePhase(
output_path,
fieldnames=specification.factor_fieldnames(),
),
)

# In the FactCombinePhase, when combine_fields has some values, we check for duplicates and combine values.
# If we have done this then we will not call duplicate_reference_check as we have already carried out a
Expand Down
41 changes: 0 additions & 41 deletions digital_land/phase_polars/README.md

This file was deleted.

Loading
Loading