Wat is een ETL-pipeline?

ETL staat voor Extract, Transform, Load. Het is het proces dat data verplaatst van een bronsysteem naar een doelsysteem, waarbij de data onderweg wordt omgezet naar een bruikbaar formaat.

  • Extract: Data ophalen uit de bron (REST API, SQL-database, SFTP-bestand)
  • Transform: Data valideren, cleanen, omrekenen en klaarmaken voor opslag
  • Load: Data wegschrijven naar de doeldatabase (Azure SQL)

In de context van Power BI is de ETL-pipeline de verbinding tussen het bronsysteem (Exact Online, OnsDB, WiseTech LSP) en het Azure SQL datawarehouse waarop Power BI draait. Een betrouwbare ETL-pipeline is de basis voor een betrouwbaar dashboard.

De vier stappen: extract, validate, stage, log

Argus BI hanteert een uitgebreide variant van ETL waarbij validatie en logging als aparte stappen worden behandeld:

  1. Extract: Data ophalen via REST API of directe SQL-verbinding
  2. Validate: Controleer volledigheid, datatypen, vereiste velden en referentiële integriteit
  3. Stage: Gecleande data laden naar Azure SQL via UPSERT (INSERT of UPDATE op basis van primaire sleutel)
  4. Log: Sla run-metadata op: start- en eindtijd, aantal records, status, eventuele foutmelding

De validatiestap als afzonderlijke fase — vóór het laden naar de database — is essentieel. Als de validatie faalt, worden er geen records geladen en blijft de database consistent met de vorige succesvolle run.

Stap 1: Data extraheren via REST API

De meeste bronsystemen (Exact Online, AFAS, BettyBlocks) bieden een REST API met JSON-responses. Het basispatroon voor extractie in Python:

import requests

def extract_page(url: str, headers: dict) -> dict:
    response = requests.get(url, headers=headers, timeout=30)
    response.raise_for_status() # Gooit exception bij 4xx/5xx
    return response.json()

def extract_all_pages(endpoint: str, access_token: str) -> list:
    headers = {"Authorization": f"Bearer {access_token}"}
    all_records = []
    url = endpoint
    while url:
        data = extract_page(url, headers)
        all_records.extend(data.get("d", {}).get("results", []))
        url = data.get("d", {}).get("__next") # OData paginering
    return all_records

Let op de timeout=30: zonder timeout kan een traag-reagerende API de pipeline voor onbepaalde tijd blokkeren. Stel altijd een timeout in.

Stap 2: Data valideren

Validatie in de ETL-pipeline werkt op twee niveaus: structurele validatie (heeft het record de vereiste velden?) en semantische validatie (zijn de waarden logisch?).

import pandas as pd

def validate_records(df: pd.DataFrame, required_columns: list, entity: str):
    # Structurele validatie
    missing = [c for c in required_columns if c not in df.columns]
    if missing:
        raise ValueError(f"{entity}: ontbrekende kolommen: {missing}")

    # Volledigheidscheck
    null_counts = df[required_columns].isnull().sum()
    critical_nulls = null_counts[null_counts > 0]
    if not critical_nulls.empty:
        raise ValueError(f"{entity}: NULL-waarden in: {critical_nulls.to_dict()}")

    # Recordaantal check (verwacht minimaal 1 record)
    if len(df) == 0:
        raise ValueError(f"{entity}: geen records ontvangen van API")

Stap 3: Data laden in Azure SQL

Voor het laden naar Azure SQL gebruiken we SQLAlchemy als abstractielaag en een MERGE-statement voor UPSERT-functionaliteit. Het basispatroon:

from sqlalchemy import create_engine
import pandas as pd

def upsert_to_sql(df: pd.DataFrame, table: str, pk_column: str, engine):
    # Laad naar staging-tabel
    staging_table = f"staging_{table}"
    df.to_sql(staging_table, engine, if_exists='replace', index=False)

    # MERGE naar doeltabel
    merge_sql = f"""
        MERGE dbo.{table} AS target
        USING staging.{staging_table} AS source
        ON target.{pk_column} = source.{pk_column}
        WHEN MATCHED THEN UPDATE SET ...
        WHEN NOT MATCHED THEN INSERT (...);
    """
    with engine.connect() as conn:
        conn.execute(merge_sql)

Credentials voor Azure SQL worden nooit hardcoded in de code. Ze worden opgehaald uit Azure Key Vault via de azure-keyvault-secrets bibliotheek, waarbij de Azure Function gebruikt maakt van een Managed Identity voor toegang tot Key Vault.

Stap 4: Logging en monitoring

Elke run van de ETL-pipeline wordt gelogd in een aparte logtabel in Azure SQL:

CREATE TABLE etl.pipeline_log (
    run_id UNIQUEIDENTIFIER DEFAULT NEWID() PRIMARY KEY,
    pipeline_name NVARCHAR(100) NOT NULL,
    entity_name NVARCHAR(100) NOT NULL,
    start_time DATETIME2 NOT NULL,
    end_time DATETIME2,
    records_extracted INT,
    records_loaded INT,
    status NVARCHAR(20), -- 'SUCCESS', 'FAILED', 'PARTIAL'
    error_message NVARCHAR(MAX)
);

In Power BI kun je een aparte "monitoring"-pagina bouwen die de logtabel weergeeft: wanneer liep de laatste succesvolle run, hoeveel records zijn geladen, en zijn er recente fouten? Dit maakt het eenvoudig voor beheerders om de pipeline-gezondheid te monitoren.

Scheduling: Azure Functions timer trigger

Azure Functions met een timer trigger is de meest elegante manier om een Python ETL-pipeline te schedulen. Geen aparte server, geen cron-daemon, volledig beheerd door Azure.

De timer trigger wordt geconfigureerd met een NCRONTAB-expressie:

# function.json
{
  "bindings": [
    {
      "name": "timer",
      "type": "timerTrigger",
      "direction": "in",
      "schedule": "0 0 3 * * *" // Dagelijks om 03:00 UTC
    }
  ]
}

Voor de meeste ETL-pipelines is een nachtelijke run om 03:00 UTC (05:00 Nederlandse tijd, vóór de werkdag) de standaardkeuze. Power BI Service haalt bij de ochtend-refresh de vers gevulde Azure SQL-database op.

Foutafhandeling en retry-logica

REST APIs hebben tijdelijke storingen: 429 (rate limit), 503 (service unavailable), 504 (gateway timeout). Zonder retry-logica faalt de gehele pipeline bij de eerste tijdelijke fout.

De tenacity-bibliotheek biedt een elegante decorator voor retry met exponential backoff:

from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    retry=retry_if_exception_type(requests.HTTPError)
)
def fetch_with_retry(url: str, headers: dict) -> dict:
    return extract_page(url, headers)

Incrementele laadstrategie

Een volledige laadstrategie (alle records opnieuw laden bij elke run) is eenvoudig maar schaalt niet bij grote datasets. Incrementeel laden beperkt elke run tot nieuwe en gewijzigde records.

Het basispatroon: sla de datum en tijd van de laatste succesvolle run op in de logtabel. Filter bij extractie op de tijdstempelkolom:

last_run = get_last_successful_run("gl_transactions")
endpoint = (
    f"https://start.exactonline.nl/api/v1/{division}/financialtransactions/GLTransactions"
    f"?$filter=Modified gt datetime'{last_run.isoformat()}Z'"
)

Let op: niet alle APIs ondersteunen tijdstempelfilters. Bij APIs zonder tijdstempel is een volledige laadstrategie de enige optie, of moet je een hashvergelijking implementeren om gewijzigde records te detecteren.

ETL-pipeline laten bouwen?

Argus BI bouwt betrouwbare Python ETL-pipelines voor Exact Online, AFAS, OnsDB, WiseTech LSP en andere systemen. Volledig op Azure, met monitoring en retry-logica.

Bespreek je ETL-koppeling

Veelgestelde vragen

Welke Python-bibliotheken zijn het meest geschikt voor ETL?

De kernstack: requests (HTTP), pandas (transformaties), SQLAlchemy (database-abstractie), azure-functions (scheduling), azure-keyvault-secrets (credentials), tenacity (retry-logica). Voor grotere datasets is pyodbc als alternatief voor SQLAlchemy soms sneller vanwege minder overhead.

Hoe implementeer je retry-logica bij tijdelijke API-fouten?

Gebruik exponential backoff via de tenacity-bibliotheek: na de eerste fout wacht je 1 seconde, na de tweede 2 seconden, na de derde 4 seconden. Na 3 mislukte pogingen logt de pipeline de fout en stopt. Dit voorkomt dat een tijdelijke API-storing de hele nacht de pipeline blokkeert.

Wat is het verschil tussen een volledige en incrementele laadstrategie?

Een volledige laadstrategie laadt elke run alle data opnieuw. Eenvoudig maar inefficiënt bij grote datasets. Een incrementele laadstrategie laadt alleen nieuwe en gewijzigde records via een tijdstempelfilter. Incrementeel is veel sneller en belast de bron-API minder, maar vereist een betrouwbaar Modified-tijdstempel in de brondata.

Hoe plan je een Python ETL-job in op Azure?

De meest gebruikte aanpak is Azure Functions met een timer trigger. Je definieert een NCRONTAB-expressie (bijv. 0 0 3 * * * voor dagelijks om 03:00 UTC) in function.json. Azure Functions beheert de scheduling volledig. De code kan worden geïmplementeerd via GitHub Actions CI/CD voor automatische deploys.