top of page
Search
Writer's pictureMarco G J

Automated RESULTS Forest Cover Polygon WFS retrieval from Timber Mark Opening ID in QGIS

Updated: Dec 6, 2024

For forestry professionals seeking fast, accurate, and official forest cover data extraction in QGIS, we provide a python script that ensures direct access to the most current and authoritative RESULTS data, simplifying workflows and reducing data redundancy. It leverages existing data structures rather than creating duplicates.


All kinds of attribute-based and location-based filters can be performed on the RESULTS authoritative database in order to track and evaluate post-harvest forestry practices and compliance.


We share here a simple Python script for the quasi-instant retrieval of forest cover polygons for any TIMBER_MARK by accessing the current official RESULTS database from the BC Government's WFS service.


Use the RESULTS OPENINGS WFS layer to identify a TIMBER_MARK's OPENING_ID, then use the latter to retrieve matching forest cover polygons from the RESULTS Forest Cover WFS layer.


Two polygon-type spatial layers are outputted. One corresponds to all openings associated with a timber mark. The other corresponds to all forest cover polygons associated with all those openings.




import traceback
from qgis.core import (
    QgsProject,
    QgsVectorLayer,
    QgsFeature,
    QgsWkbTypes,
    QgsField
)
from qgis.utils import iface
from qgis.PyQt.QtWidgets import QInputDialog, QMessageBox
from qgis.PyQt.QtCore import QVariant, QTimer, QObject, pyqtSignal
from urllib.parse import quote
class WFSLoader(QObject):
    layer_loaded = pyqtSignal(QgsVectorLayer)
    def __init__(self, url, typename, srsname, filter_expression, layer_name, max_retries=5, retry_interval=2000):
        super().__init__()
        self.url = url
        self.typename = typename
        self.srsname = srsname
        self.filter_expression = filter_expression
        self.layer_name = layer_name
        self.layer = None
        self.attempts = 0
        self.max_retries = max_retries
        self.retry_interval = retry_interval
    def load_layer(self):
        try:
            encoded_filter = quote(self.filter_expression)
            # Use WFS 1.1.0 and sortBy=OPENING_ID to avoid natural order issues
            uri = (f"{self.url}?service=WFS&version=1.1.0&request=GetFeature&typename={self.typename}"
                   f"&srsname={self.srsname}&sortBy=OPENING_ID&CQL_FILTER={encoded_filter}")
            print(f"[DEBUG] Loading WFS layer from URI: {uri}")
            self.layer = QgsVectorLayer(uri, self.layer_name, "WFS")
            if not self.layer.isValid():
                print(f"[ERROR] Layer {self.layer_name} is invalid.")
                QMessageBox.critical(None, "Layer Load Error", f"Failed to load layer {self.layer_name}.")
                self.layer_loaded.emit(None)
                return
            QgsProject.instance().addMapLayer(self.layer)
            self.layer.triggerRepaint()
            self.layer.updateExtents()
            # Start a retry-based checking process instead of a single-shot 2s timer
            self.check_layer_loaded()
        except Exception:
            print("[EXCEPTION] Exception in load_layer:")
            print(traceback.format_exc())
            self.layer_loaded.emit(None)
    def check_layer_loaded(self):
        # Retry logic to ensure features are loaded
        try:
            if self.layer is None:
                print("[DEBUG] No layer to check.")
                self.layer_loaded.emit(None)
                return
            feature_count = self.layer.featureCount()
            if feature_count > 0:
                print(f"[DEBUG] {self.layer_name} loaded successfully with {feature_count} features.")
                self.layer_loaded.emit(self.layer)
            else:
                self.attempts += 1
                if self.attempts < self.max_retries:
                    print(f"[DEBUG] {self.layer_name} feature count is 0. Retrying {self.attempts}/{self.max_retries}...")
                    QTimer.singleShot(self.retry_interval, self.check_layer_loaded)
                else:
                    print(f"[WARNING] {self.layer_name} still has no features after {self.max_retries} attempts.")
                    QMessageBox.warning(None, "No Features", f"No features in {self.layer_name} with given filter.")
                    self.layer_loaded.emit(self.layer)
        except Exception:
            print("[EXCEPTION] Exception in check_layer_loaded:")
            print(traceback.format_exc())
            self.layer_loaded.emit(None)
def layer_to_memory(source_layer, mem_name):
    if source_layer is None:
        print("[ERROR] source_layer is None in layer_to_memory")
        return None
    print(f"[DEBUG] Converting {source_layer.name()} to memory layer: {mem_name}")
    geom_type = source_layer.wkbType()
    gt = QgsWkbTypes.geometryType(geom_type)
    is_multi = QgsWkbTypes.isMultiType(geom_type)
    has_z = QgsWkbTypes.hasZ(geom_type)
    has_m = QgsWkbTypes.hasM(geom_type)
    if gt == QgsWkbTypes.PointGeometry:
        base = "Point"
    elif gt == QgsWkbTypes.LineGeometry:
        base = "LineString"
    elif gt == QgsWkbTypes.PolygonGeometry:
        base = "Polygon"
    else:
        base = "Polygon"
    if is_multi:
        base = "Multi" + base
    if has_z:
        base += "Z"
    if has_m:
        base += "M"
    geom_str = base
    crs = source_layer.crs().authid()
    mem_layer = QgsVectorLayer(f"{geom_str}?crs={crs}", mem_name, "memory")
    if not mem_layer.isValid():
        print("[ERROR] Memory layer is not valid.")
        return None
    prov = mem_layer.dataProvider()
    prov.addAttributes(source_layer.fields())
    mem_layer.updateFields()
    feats = []
    for f in source_layer.getFeatures():
        new_f = QgsFeature(mem_layer.fields())
        new_f.setGeometry(f.geometry())
        for idx, field in enumerate(source_layer.fields()):
            new_f.setAttribute(idx, f[field.name()])
        feats.append(new_f)
    prov.addFeatures(feats)
    mem_layer.updateExtents()
    QgsProject.instance().addMapLayer(mem_layer)
    feature_count = mem_layer.featureCount()
    print(f"[DEBUG] Memory layer {mem_name} created with {feature_count} features.")
    return mem_layer
def add_fields_from_openings(forest_mem, opening_dict):
    if forest_mem is None:
        print("[ERROR] forest_mem is None in add_fields_from_openings")
        return None
    print("[DEBUG] Adding TIMBER_MARK and CUT_BLOCK_ID fields to forest_mem layer.")
    prov = forest_mem.dataProvider()
    if not forest_mem.isEditable():
        forest_mem.startEditing()
    prov.addAttributes([QgsField("TIMBER_MARK", QVariant.String), QgsField("CUT_BLOCK_ID", QVariant.String)])
    forest_mem.updateFields()
    tm_idx = forest_mem.fields().indexOf("TIMBER_MARK")
    cb_idx = forest_mem.fields().indexOf("CUT_BLOCK_ID")
    for f in forest_mem.getFeatures():
        oid_val = f["OPENING_ID"]
        fid = f.id()
        if oid_val in opening_dict:
            tmark, cblock = opening_dict[oid_val]
            forest_mem.changeAttributeValue(fid, tm_idx, tmark)
            forest_mem.changeAttributeValue(fid, cb_idx, cblock)
    forest_mem.commitChanges()
    return forest_mem
def check_layer_status(opening_layer, forest_cover_layer):
    # If FOREST_COVER has features, zoom to it
    if forest_cover_layer and forest_cover_layer.featureCount() > 0:
        print("[DEBUG] Zooming to forest_cover_layer.")
        iface.setActiveLayer(forest_cover_layer)
        iface.zoomToActiveLayer()
def process_layers(opening_mem_layer, opening_dict):
    opening_ids = set(f['OPENING_ID'] for f in opening_mem_layer.getFeatures())
    if len(opening_ids) == 0:
        print("[WARNING] No OPENING_IDs found in the opening_mem_layer.")
        return
    max_ids = 100
    if len(opening_ids) > max_ids:
        opening_ids = set(list(opening_ids)[:max_ids])
    # Attempt to convert OIDs to int if needed
    try:
        opening_ids_int = [int(oid) for oid in opening_ids]
        opening_ids_list = ','.join(str(oid) for oid in opening_ids_int)
    except:
        opening_ids_list = ','.join(str(oid) for oid in opening_ids)
    forest_cover_filter = f"OPENING_ID IN ({opening_ids_list})"
    forest_cover_layer_name = f"RSLT_FOREST_COVER_INV_SVW_{layer_suffix}"
    def on_forest_cover_layer_loaded(layer):
        if layer is None:
            print("[ERROR] Forest cover layer failed to load.")
            return
        # Debug: print fields from forest cover layer
        fc_field_names = [f.name() for f in layer.fields()]
        print(f"[DEBUG] Forest Cover Layer Fields: {fc_field_names}")
        print(f"[DEBUG] Forest Cover Feature Count: {layer.featureCount()}")
        # Convert FOREST_COVER to memory
        forest_mem = layer_to_memory(layer, layer.name() + "_mem")
        if forest_mem is None:
            print("[ERROR] Failed to create FOREST_COVER memory layer. Keeping WFS layer.")
            return
        # If memory creation succeeded, remove WFS layer
        QgsProject.instance().removeMapLayer(layer.id())
        # Add required fields from openings
        enhanced_layer = add_fields_from_openings(forest_mem, opening_dict)
        check_layer_status(opening_mem_layer, enhanced_layer)
    print("[DEBUG] Loading Forest Cover WFS Layer with filter:", forest_cover_filter)
    forest_cover_loader = WFSLoader(
        "https://openmaps.gov.bc.ca/geo/pub/WHSE_FOREST_VEGETATION.RSLT_FOREST_COVER_INV_SVW/wfs",
        "pub:WHSE_FOREST_VEGETATION.RSLT_FOREST_COVER_INV_SVW",
        "EPSG:3005",
        forest_cover_filter,
        forest_cover_layer_name
    )
    forest_cover_loader.layer_loaded.connect(on_forest_cover_layer_loaded)
    forest_cover_loader.load_layer()
def on_opening_layer_loaded(layer):
    if layer is None:
        print("[ERROR] Opening layer failed to load.")
        return
    # Check fields for the openings layer
    ofield_names = [f.name() for f in layer.fields()]
    print(f"[DEBUG] Opening Layer Fields: {ofield_names}")
    print(f"[DEBUG] Opening Layer Feature Count: {layer.featureCount()}")
    # Convert OPENING WFS to memory
    opening_mem = layer_to_memory(layer, layer.name() + "_mem")
    if opening_mem is None:
        print("[ERROR] Failed to create OPENING memory layer. Keeping WFS layer.")
        return
    # Remove original WFS layer since memory is created successfully
    QgsProject.instance().removeMapLayer(layer.id())
    field_names = [f.name() for f in opening_mem.fields()]
    required_fields = ["OPENING_ID", "TIMBER_MARK", "CUT_BLOCK_ID"]
    for rf in required_fields:
        if rf not in field_names:
            QMessageBox.critical(None, "Missing Fields",
                                 f"OPENING layer missing required field: {rf}")
            return
    opening_dict = {}
    for f in opening_mem.getFeatures():
        oid = f["OPENING_ID"]
        tmark = f["TIMBER_MARK"]
        cblock = f["CUT_BLOCK_ID"]
        if oid is not None:
            opening_dict[oid] = (tmark, cblock)
    # Now we proceed to get the forest cover layer
    process_layers(opening_mem, opening_dict)
# Prompt user for TIMBER_MARK and CUT_BLOCK_ID
timber_mark, ok = QInputDialog.getText(None, "Input", "Enter TIMBER_MARK (required):")
if not ok or not timber_mark.strip():
    raise Exception("No TIMBER_MARK provided")
timber_mark = timber_mark.strip()
cut_block_id, ok2 = QInputDialog.getText(None, "Input", "Enter CUT_BLOCK_ID (optional):")
if not ok2:
    cut_block_id = ""
cut_block_id = cut_block_id.strip()
if cut_block_id:
    opening_filter = f"TIMBER_MARK = '{timber_mark}' AND CUT_BLOCK_ID = '{cut_block_id}'"
    layer_suffix = f"{timber_mark}_{cut_block_id}"
else:
    opening_filter = f"TIMBER_MARK = '{timber_mark}'"
    layer_suffix = timber_mark
opening_layer_name = f"RSLT_OPENING_SVW_{layer_suffix}"
print("[DEBUG] Loading Opening WFS Layer with filter:", opening_filter)
# Load the OPENING layer
opening_loader = WFSLoader(
    "https://openmaps.gov.bc.ca/geo/pub/WHSE_FOREST_VEGETATION.RSLT_OPENING_SVW/wfs",
    "pub:WHSE_FOREST_VEGETATION.RSLT_OPENING_SVW",
    "EPSG:3005",
    opening_filter,
    opening_layer_name
)
opening_loader.layer_loaded.connect(on_opening_layer_loaded)
opening_loader.load_layer()

8 views0 comments

Comments


bottom of page