Wat is een ETL-pipeline?
ETL staat voor Extract, Transform, Load. Het is het proces dat data verplaatst van een bronsysteem naar een doelsysteem, waarbij de data onderweg wordt omgezet naar een bruikbaar formaat.
- Extract: Data ophalen uit de bron (REST API, SQL-database, SFTP-bestand)
- Transform: Data valideren, cleanen, omrekenen en klaarmaken voor opslag
- Load: Data wegschrijven naar de doeldatabase (Azure SQL)
In de context van Power BI is de ETL-pipeline de verbinding tussen het bronsysteem (Exact Online, OnsDB, WiseTech LSP) en het Azure SQL datawarehouse waarop Power BI draait. Een betrouwbare ETL-pipeline is de basis voor een betrouwbaar dashboard.
De vier stappen: extract, validate, stage, log
Argus BI hanteert een uitgebreide variant van ETL waarbij validatie en logging als aparte stappen worden behandeld:
- Extract: Data ophalen via REST API of directe SQL-verbinding
- Validate: Controleer volledigheid, datatypen, vereiste velden en referentiële integriteit
- Stage: Gecleande data laden naar Azure SQL via UPSERT (INSERT of UPDATE op basis van primaire sleutel)
- Log: Sla run-metadata op: start- en eindtijd, aantal records, status, eventuele foutmelding
De validatiestap als afzonderlijke fase — vóór het laden naar de database — is essentieel. Als de validatie faalt, worden er geen records geladen en blijft de database consistent met de vorige succesvolle run.
Stap 1: Data extraheren via REST API
De meeste bronsystemen (Exact Online, AFAS, BettyBlocks) bieden een REST API met JSON-responses. Het basispatroon voor extractie in Python:
def extract_page(url: str, headers: dict) -> dict:
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status() # Gooit exception bij 4xx/5xx
return response.json()
def extract_all_pages(endpoint: str, access_token: str) -> list:
headers = {"Authorization": f"Bearer {access_token}"}
all_records = []
url = endpoint
while url:
data = extract_page(url, headers)
all_records.extend(data.get("d", {}).get("results", []))
url = data.get("d", {}).get("__next") # OData paginering
return all_records
Let op de timeout=30: zonder timeout kan een traag-reagerende API de pipeline voor onbepaalde tijd blokkeren. Stel altijd een timeout in.
Stap 2: Data valideren
Validatie in de ETL-pipeline werkt op twee niveaus: structurele validatie (heeft het record de vereiste velden?) en semantische validatie (zijn de waarden logisch?).
def validate_records(df: pd.DataFrame, required_columns: list, entity: str):
# Structurele validatie
missing = [c for c in required_columns if c not in df.columns]
if missing:
raise ValueError(f"{entity}: ontbrekende kolommen: {missing}")
# Volledigheidscheck
null_counts = df[required_columns].isnull().sum()
critical_nulls = null_counts[null_counts > 0]
if not critical_nulls.empty:
raise ValueError(f"{entity}: NULL-waarden in: {critical_nulls.to_dict()}")
# Recordaantal check (verwacht minimaal 1 record)
if len(df) == 0:
raise ValueError(f"{entity}: geen records ontvangen van API")
Stap 3: Data laden in Azure SQL
Voor het laden naar Azure SQL gebruiken we SQLAlchemy als abstractielaag en een MERGE-statement voor UPSERT-functionaliteit. Het basispatroon:
import pandas as pd
def upsert_to_sql(df: pd.DataFrame, table: str, pk_column: str, engine):
# Laad naar staging-tabel
staging_table = f"staging_{table}"
df.to_sql(staging_table, engine, if_exists='replace', index=False)
# MERGE naar doeltabel
merge_sql = f"""
MERGE dbo.{table} AS target
USING staging.{staging_table} AS source
ON target.{pk_column} = source.{pk_column}
WHEN MATCHED THEN UPDATE SET ...
WHEN NOT MATCHED THEN INSERT (...);
"""
with engine.connect() as conn:
conn.execute(merge_sql)
Credentials voor Azure SQL worden nooit hardcoded in de code. Ze worden opgehaald uit Azure Key Vault via de azure-keyvault-secrets bibliotheek, waarbij de Azure Function gebruikt maakt van een Managed Identity voor toegang tot Key Vault.
Stap 4: Logging en monitoring
Elke run van de ETL-pipeline wordt gelogd in een aparte logtabel in Azure SQL:
run_id UNIQUEIDENTIFIER DEFAULT NEWID() PRIMARY KEY,
pipeline_name NVARCHAR(100) NOT NULL,
entity_name NVARCHAR(100) NOT NULL,
start_time DATETIME2 NOT NULL,
end_time DATETIME2,
records_extracted INT,
records_loaded INT,
status NVARCHAR(20), -- 'SUCCESS', 'FAILED', 'PARTIAL'
error_message NVARCHAR(MAX)
);
In Power BI kun je een aparte "monitoring"-pagina bouwen die de logtabel weergeeft: wanneer liep de laatste succesvolle run, hoeveel records zijn geladen, en zijn er recente fouten? Dit maakt het eenvoudig voor beheerders om de pipeline-gezondheid te monitoren.
Scheduling: Azure Functions timer trigger
Azure Functions met een timer trigger is de meest elegante manier om een Python ETL-pipeline te schedulen. Geen aparte server, geen cron-daemon, volledig beheerd door Azure.
De timer trigger wordt geconfigureerd met een NCRONTAB-expressie:
{
"bindings": [
{
"name": "timer",
"type": "timerTrigger",
"direction": "in",
"schedule": "0 0 3 * * *" // Dagelijks om 03:00 UTC
}
]
}
Voor de meeste ETL-pipelines is een nachtelijke run om 03:00 UTC (05:00 Nederlandse tijd, vóór de werkdag) de standaardkeuze. Power BI Service haalt bij de ochtend-refresh de vers gevulde Azure SQL-database op.
Foutafhandeling en retry-logica
REST APIs hebben tijdelijke storingen: 429 (rate limit), 503 (service unavailable), 504 (gateway timeout). Zonder retry-logica faalt de gehele pipeline bij de eerste tijdelijke fout.
De tenacity-bibliotheek biedt een elegante decorator voor retry met exponential backoff:
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10),
retry=retry_if_exception_type(requests.HTTPError)
)
def fetch_with_retry(url: str, headers: dict) -> dict:
return extract_page(url, headers)
Incrementele laadstrategie
Een volledige laadstrategie (alle records opnieuw laden bij elke run) is eenvoudig maar schaalt niet bij grote datasets. Incrementeel laden beperkt elke run tot nieuwe en gewijzigde records.
Het basispatroon: sla de datum en tijd van de laatste succesvolle run op in de logtabel. Filter bij extractie op de tijdstempelkolom:
endpoint = (
f"https://start.exactonline.nl/api/v1/{division}/financialtransactions/GLTransactions"
f"?$filter=Modified gt datetime'{last_run.isoformat()}Z'"
)
Let op: niet alle APIs ondersteunen tijdstempelfilters. Bij APIs zonder tijdstempel is een volledige laadstrategie de enige optie, of moet je een hashvergelijking implementeren om gewijzigde records te detecteren.
ETL-pipeline laten bouwen?
Argus BI bouwt betrouwbare Python ETL-pipelines voor Exact Online, AFAS, OnsDB, WiseTech LSP en andere systemen. Volledig op Azure, met monitoring en retry-logica.
Bespreek je ETL-koppelingVeelgestelde vragen
Welke Python-bibliotheken zijn het meest geschikt voor ETL?
De kernstack: requests (HTTP), pandas (transformaties), SQLAlchemy (database-abstractie), azure-functions (scheduling), azure-keyvault-secrets (credentials), tenacity (retry-logica). Voor grotere datasets is pyodbc als alternatief voor SQLAlchemy soms sneller vanwege minder overhead.
Hoe implementeer je retry-logica bij tijdelijke API-fouten?
Gebruik exponential backoff via de tenacity-bibliotheek: na de eerste fout wacht je 1 seconde, na de tweede 2 seconden, na de derde 4 seconden. Na 3 mislukte pogingen logt de pipeline de fout en stopt. Dit voorkomt dat een tijdelijke API-storing de hele nacht de pipeline blokkeert.
Wat is het verschil tussen een volledige en incrementele laadstrategie?
Een volledige laadstrategie laadt elke run alle data opnieuw. Eenvoudig maar inefficiënt bij grote datasets. Een incrementele laadstrategie laadt alleen nieuwe en gewijzigde records via een tijdstempelfilter. Incrementeel is veel sneller en belast de bron-API minder, maar vereist een betrouwbaar Modified-tijdstempel in de brondata.
Hoe plan je een Python ETL-job in op Azure?
De meest gebruikte aanpak is Azure Functions met een timer trigger. Je definieert een NCRONTAB-expressie (bijv. 0 0 3 * * * voor dagelijks om 03:00 UTC) in function.json. Azure Functions beheert de scheduling volledig. De code kan worden geïmplementeerd via GitHub Actions CI/CD voor automatische deploys.