image for Rammeverket der du kun trenger én pipeline for alle dataintegrasjoneen dine - slik bygde jeg GULP

Rammeverket der du kun trenger én pipeline for alle dataintegrasjoneen dine - slik bygde jeg GULP

Hos altfor mange datateam ser ingestionlaget omtrent slik ut: én notebook per datasett, én pipeline per kilde, og en vedlikeholdsbyrde som vokser proporsjonalt med antall integrasjoner. Etter hvert som teamet skalerer, er det ikke uvanlig å ha 30–50 notebooks som er 90% identiske, men med noe ulike løsninger ut ifra hvem på teamet som bygde integrasjonen. Her er også servernavn, API-endepunkter, tabeller og SQL-spørringer hardkodet inn slik at det er vanskelig å få oversikt over ulike parameterne som styrer den faktiske integrasjonen. Når noe feiler i produksjon, og du som Data Engineer må inn i koden for å debugge blir standardspørsmålet: "Okey, hvordan var det vi hadde løst det her?". Dette skaper store kognitive laster vi gjerne kunne vært foruten.

Som Hermine Grang sa så bra:

"There must be another way!"

Derfor lagde jeg GULP = Generic Unified Load Pipeline (repolink), som er mitt forsøk på å rydde opp i dette rotet. Ideen er enkel: ha én statisk pipeline, og la konfigurasjonen gjøre jobben.

Obs: Dette rammeverket er bygget med Databricks, Azure Data Factory og Azure Data Lake Storage som stack, men akkurat de samme prinsippene kan benyttes for andre teknologier også.

Problemet: ingestionspredning

La oss si at du henter data fra fem SQL-kilder, to REST-APIer og en filserver. Med en tradisjonell tilnærming ender du opp med noe som dette:

notebooks/
extract_source_a_dataset_b.py # kopiér-lim fra forrige
extract_source_a_dataset_c.py
extract_source_b_dataset_x.py
extract_source_b_dataset_y.py
...

Når du skal endre noe logikk som brukes på tvers, må du inn i alle notebooks. Hvordan sikrer du at du løser det likt overalt? Og hvordan sørger du for at alle notebooks faktisk blir oppdatert? Når du legger til en ny kilde, kopierer du en eksisterende og håper du husker alle stedene du må bytte ut navn og verdier. Konfigurasjonen sitter i koden, og koden er konfigurasjonen.

Det fundamentale problemet er at runtimelogikk og datasettkonfigurasjonen er vevd sammen. GULP skiller dem.

Hva mener vi med extract og load?

Før vi ser på løsningen er det verdt å definere begrepene vi bruker.

Extract er steget der data hentes fra en kilde, for eksempel en SQL-database, et REST-API eller en filserver. Ingestion skjer typisk inkrementelt: istedenfor å hente alt hver gang, henter vi kun data som er nyere enn forrige kjøring. For å styre dette bruker vi to tidsstempler last_batch_timestamp (når forrige kjøring skjedde) og current_batch_timestamp (nåværende kjøring) slik at extractoren vet nøyaktig hvilket tidsvindu den skal hente data for. Resultatet skrives til landingssonen i data laken, partisjonert på batchtidsstempel:

landing/<source>/<dataset>/batch_timestamp=<tidsstempel>/<uuid>.parquet

Load er steget der de landede rådataene skrives som en tabell i dataplattformen din.

Det som bevisst mangler her er et transformsteg. GULP handler om å få rådata trygt inn i plattformen din. Transformasjoner hører hjemme nedstrøms, i et separat lag. Dette holder rammeverket smalt og lett å resonnere rundt.

En ExtractLoadConfig følger et deklarativt tankesett: den beskriver hva som skal skje: hvilken kilde, hvilket datasett, hvilken tabell som skal lastes, og ikke hvordan det teknisk gjennomføres. Det er runtimens ansvar.

Ideen illustrert med ett diagram

Hele rammeverket kan oppsummeres i én flyt:

Utvikleren skriver en YAML-config og laster den opp. ADF- og Databricks-runtimen vet ingenting om hvilken kilde de kjører mot, men det vet konfigurasjonen. En pipeline, mange dataintegrasjoner.

De tre delene

GULP er bygget av tre komponenter som kan forstås uavhengig av hverandre.

libs/extract-and-load: modellene

Dette biblioteket definerer alle typede config-modeller med Pydantic v2. Toppnivåmodellen er ExtractLoadConfig:

class ExtractLoadConfig(BaseConfig):
kind: Literal["ExtractLoadConfig"]
metadata: Metadata
spec: ExtractLoadSpec
class ExtractLoadSpec()
common: Common # informasjon som brukes på tvers av extract og load
extract: ExtractConfig # informasjon som kun brukes av extract
load: LoadConfig # informasjon som kun brukes av load

ExtractConfig er en diskriminert union (ref) der kind-feltet bestemmer hvilken implementasjon som brukes:

ExtractConfig = Annotated[
SqlExtractConfig
| DummyApiExtractConfig,
Field(discriminator="kind"),
]

Biblioteket inneholder også BaseExtractor, som er en abstrakt klasse som håndhever felles logikk for alle extractorer, som validering av tidsstempler og bygging av outputstien i ADLS. Som eksempel på en konkret implementasjon viser GULP DummyApiExtractor, som henter JSON fra et offentlig test-API og skriver resultatet som Parquet til ADLS.

libs/manage-extract-and-load-configs: CLIet

Dette er grensesnittet utviklere bruker til daglig. Det installeres som et uv-verktøy og eksponerer to kommandoer:

manage-extract-and-load-configs upload --source-path bundles/extract_load/fixtures/extract_load_configs
manage-extract-and-load-configs destroy --source-path bundles/extract_load/fixtures/extract_load_configs

Configene støtter Jinja2-interpolering, slik at samme template kan bruke ulike verdier i ulike miljøer. For eksempel forskjellige servernavn i sandbox og produksjon. Miljøverdiene hentes fra environment_configs/<env>.yml, der aktivt miljø styres av miljøvariabelen EL_ENV.

Under panseret gjør upload tre ting i rekkefølge:

  1. Rendrer Jinja2-templaten med miljøvariabler fra environment_configs/<env>.yml
  2. Validerer resultatet mot ExtractLoadConfig-modellen
  3. Laster opp filen som JSON* til ADLS som <branch-navn>/<confignavn>.json

*Grunnen til at configene lagres som JSON er rett og slett fordi ADF ikke støtter å lese YAML filer. Ja, du leste rett...

bundles/extract_load: runtime-notebooken
Databricks bundle (DAB) inneholder én parametrisert notebook. Den tar fire inputparametere:

  • extract_load_config_name: navnet på configen som skal kjøres-
  • branch_name: hvilken branch i ADLS configen skal hentes fra (mer om dette i senere)
  • current_batch_timestamp: tidsstempelet for denne kjøringen
  • last_batch_timestamp: tidsstempelet for forrige kjøring

Og gjør deretter én ting: resolver extractor-klassen basert på config-typen og kjører den:

extractor_factory = {
DummyApiExtractConfig: DummyApiExtractor,
}
extract_load_config = ExtractLoadConfig.from_adls(
file_path=f"{branch_name}/{extract_load_config_name}"
)
extractor = extractor_factory[type(extract_load_config.spec.extract)](
extract_and_load_config=extract_load_config,
current_batch_timestamp=current_batch_timestamp,
last_batch_timestamp=last_batch_timestamp,
)
output_path = extractor.run()

Notebooken vet ingenting om hva som faktisk skjer inne i run(), for det er extractorens ansvar. Å støtte en ny kilde er derfor kun et spørsmål om å registrere en ny linje i extractor_factory.

Steg for steg: fra YAML-template til Parquet i landing

La oss nå se hvordan de tre delene spiller sammen i praksis: fra en ny config skrives til dataen ligger i landing.

1. Lag en YAML-config

Opprett bundles/extract_load/fixtures/extract_load_configs/source_x/el.source_x_dataset_y.yml:

kind: ExtractAndLoadConfig
metadata:
name: source_x_dataset_y
description: Extract and load config for source x dataset y
spec:
common:
source: source_x
dataset: dataset_y
extract:
kind: DummyApiExtractConfig
metadata:
name: source_x_dataset_y
description: Extract config for source x dataset y
spec:
feed: posts

For en SQL-kilde med Jinja-interpolering ser det slik ut:

kind: ExtractAndLoadConfig
metadata:
name: source_a_dataset_b
spec:
common:
source: source_a
dataset: dataset_b
extract:
kind: SqlExtractConfig
metadata:
name: source_a_dataset_b
spec:
server: "{{ source_a.dataset_b.server }}"
database: database_a
table: table_b

Placeholders som {{ source_a.dataset_b.server }} blir fylt inn fra environment_configs/sandbox.yml:

source_a:
dataset_b:
server: sandbox-sql.company.internal

2. Last opp med CLI (lokalt og som en del av CI/CD)

Denne kjøres lokalt under utvikling, men legges også til som en del av CI/CD oppsett for opplastning av configer til ADSL i ulike miljøer:

manage-extract-and-load-configs upload \
--source-path bundles/extract_load/fixtures/extract_load_configs
# Uploaded 2 files to container 'extract-load-configs'.
# Total bytes: 4096

Configen ligger nå i ADLS som:

# dev
extract-load-configs/
my_feature_branch/
el.source_x_dataset_y.json
el.source_a_dataset_b.json
# prod
extract-load-configs/
v0.0.1/
el.source_x_dataset_y.json
el.source_a_dataset_b.json

3. ADF og Databricks kjører

Akkurat som at Databricks kun inneholder én notebook, inneholder også orkestratoren vår, ADF, én master-pipeline. pl_master_extract_load_pipeline er inngangspunktet for all ingestion (extract + load). En ADF-trigger sparker den i gang med config-navnet som parameter, her el.source_x_dataset_y. Pipelinen leser denne configen fra ADLS, slår opp når denne configen ble kjørt sist (forrige batchtidsstempel), og ruter til enten en ADF Copy Activity (for kilder som har direkte støtte i ADF) eller Databricksnotebooken.

Det som holder det ærlig: validering og branchnavngivning

To designvalg fortjener ekstra oppmerksomhet.

Pydanticvalidering i CLIet

Det vanlige problemet med config-drevne pipelines er at feilen oppdages i produksjon, og ikke når konfigurasjonen skrives. GULP legger valideringen i CLI-laget. Før en config-fil noensinne når ADLS, rendres den og parses av Pydantic. Mangler et påkrevd felt, er typen feil, eller er kind ugyldig, får du feilmeldingen lokalt:

1 validation error for ExtractLoadConfigspec.extract.kind
Input should be 'SqlExtractConfig' or 'DummyApiExtractConfig'

Det er ingen runtimeoverraskelser for noe som kunne vært fanget opp lokalt.

Branchbasert navngivning i ADLS

Husker du branch_name-parameteren fra notebooken? Her er grunnen til at den er der. Configfiler lagres under <branch-navn>/ i ADLS. Det betyr at du kan laste opp en config fra en feature-branch, teste den i Databricks med branch_name=min-feature-branch, og ikke berøre noe som kjører fra main. Opprydding skjer med destroy-kommandoen.

Legge til en ny extract-type på ~20 linjer

La oss si at du vil hente data fra en filserver. Du trenger tre ting.

1. Definer config-modellen i libs/extract-and-load/src/extract_and_load/models/extract_config.py:

class SftpExtractConfig(BaseModel):
kind: Literal["SftpExtractConfig"]
metadata: Metadata
class Spec(BaseModel):
host: str
path: str
spec: Spec

2. Legg den til i den diskriminerte unionen:

ExtractConfig = Annotated[
SqlExtractConfig | DummyApiExtractConfig | SftpExtractConfig,
Field(discriminator="kind"),
]

3. Skriv en extractor som arver BaseExtractor og implementerer run().

4. Registrer den i notebookens factory-dict:

extractor_factory = {
DummyApiExtractConfig: DummyApiExtractor,
SftpExtractConfig: SftpExtractor,
}

Det er det. Resten av rammeverket: CLI-validering, ADLS-opplasting, ADF-orkestrering, batchtidsstempler fungerer uforandret.

Begrensninger og veien videre

GULP v0.0.1 er et fundament, ikke et ferdig produkt. Noen åpne punkter:

  • LoadConfig er en generisk plassholder. Typede load-typer er ikke implementert ennå.
  • Ingen automatiserte tester er inkludert i denne utgivelsen.
  • Feilhåndtering og varsling ved kjøretidsfeil i Databricks er ikke spesifisert i rammeverket.

Kodebasen er åpen kildekode under MIT-lisens og tilgjengelig på GitHub. Alle kommentarer og bidrag mottas med åpne armer!

Rammeverket der du kun trenger én pipeline for alle dataintegrasjoneen dine - slik bygde jeg GULP