
Rammeverket der du kun trenger én pipeline for alle dataintegrasjoneen dine - slik bygde jeg GULP
Hos altfor mange datateam ser ingestionlaget omtrent slik ut: én notebook per datasett, én pipeline per kilde, og en vedlikeholdsbyrde som vokser proporsjonalt med antall integrasjoner. Etter hvert som teamet skalerer, er det ikke uvanlig å ha 30–50 notebooks som er 90% identiske, men med noe ulike løsninger ut ifra hvem på teamet som bygde integrasjonen. Her er også servernavn, API-endepunkter, tabeller og SQL-spørringer hardkodet inn slik at det er vanskelig å få oversikt over ulike parameterne som styrer den faktiske integrasjonen. Når noe feiler i produksjon, og du som Data Engineer må inn i koden for å debugge blir standardspørsmålet: "Okey, hvordan var det vi hadde løst det her?". Dette skaper store kognitive laster vi gjerne kunne vært foruten.
Som Hermine Grang sa så bra:
"There must be another way!"
Derfor lagde jeg GULP = Generic Unified Load Pipeline (repolink), som er mitt forsøk på å rydde opp i dette rotet. Ideen er enkel: ha én statisk pipeline, og la konfigurasjonen gjøre jobben.
Obs: Dette rammeverket er bygget med Databricks, Azure Data Factory og Azure Data Lake Storage som stack, men akkurat de samme prinsippene kan benyttes for andre teknologier også.
Problemet: ingestionspredning
La oss si at du henter data fra fem SQL-kilder, to REST-APIer og en filserver. Med en tradisjonell tilnærming ender du opp med noe som dette:
notebooks/extract_source_a_dataset_b.py # kopiér-lim fra forrigeextract_source_a_dataset_c.pyextract_source_b_dataset_x.pyextract_source_b_dataset_y.py...
Når du skal endre noe logikk som brukes på tvers, må du inn i alle notebooks. Hvordan sikrer du at du løser det likt overalt? Og hvordan sørger du for at alle notebooks faktisk blir oppdatert? Når du legger til en ny kilde, kopierer du en eksisterende og håper du husker alle stedene du må bytte ut navn og verdier. Konfigurasjonen sitter i koden, og koden er konfigurasjonen.
Det fundamentale problemet er at runtimelogikk og datasettkonfigurasjonen er vevd sammen. GULP skiller dem.
Hva mener vi med extract og load?
Før vi ser på løsningen er det verdt å definere begrepene vi bruker.
Extract er steget der data hentes fra en kilde, for eksempel en SQL-database, et REST-API eller en filserver. Ingestion skjer typisk inkrementelt: istedenfor å hente alt hver gang, henter vi kun data som er nyere enn forrige kjøring. For å styre dette bruker vi to tidsstempler last_batch_timestamp (når forrige kjøring skjedde) og current_batch_timestamp (nåværende kjøring) slik at extractoren vet nøyaktig hvilket tidsvindu den skal hente data for. Resultatet skrives til landingssonen i data laken, partisjonert på batchtidsstempel:
landing/<source>/<dataset>/batch_timestamp=<tidsstempel>/<uuid>.parquet
Load er steget der de landede rådataene skrives som en tabell i dataplattformen din.
Det som bevisst mangler her er et transformsteg. GULP handler om å få rådata trygt inn i plattformen din. Transformasjoner hører hjemme nedstrøms, i et separat lag. Dette holder rammeverket smalt og lett å resonnere rundt.
En ExtractLoadConfig følger et deklarativt tankesett: den beskriver hva som skal skje: hvilken kilde, hvilket datasett, hvilken tabell som skal lastes, og ikke hvordan det teknisk gjennomføres. Det er runtimens ansvar.
Ideen illustrert med ett diagram
Hele rammeverket kan oppsummeres i én flyt:
Utvikleren skriver en YAML-config og laster den opp. ADF- og Databricks-runtimen vet ingenting om hvilken kilde de kjører mot, men det vet konfigurasjonen. En pipeline, mange dataintegrasjoner.
De tre delene
GULP er bygget av tre komponenter som kan forstås uavhengig av hverandre.
libs/extract-and-load: modellene
Dette biblioteket definerer alle typede config-modeller med Pydantic v2. Toppnivåmodellen er ExtractLoadConfig:
class ExtractLoadConfig(BaseConfig):kind: Literal["ExtractLoadConfig"]metadata: Metadataspec: ExtractLoadSpecclass ExtractLoadSpec()common: Common # informasjon som brukes på tvers av extract og loadextract: ExtractConfig # informasjon som kun brukes av extractload: LoadConfig # informasjon som kun brukes av load
ExtractConfig er en diskriminert union (ref) der kind-feltet bestemmer hvilken implementasjon som brukes:
ExtractConfig = Annotated[SqlExtractConfig| DummyApiExtractConfig,Field(discriminator="kind"),]
Biblioteket inneholder også BaseExtractor, som er en abstrakt klasse som håndhever felles logikk for alle extractorer, som validering av tidsstempler og bygging av outputstien i ADLS. Som eksempel på en konkret implementasjon viser GULP DummyApiExtractor, som henter JSON fra et offentlig test-API og skriver resultatet som Parquet til ADLS.
libs/manage-extract-and-load-configs: CLIet
Dette er grensesnittet utviklere bruker til daglig. Det installeres som et uv-verktøy og eksponerer to kommandoer:
manage-extract-and-load-configs upload --source-path bundles/extract_load/fixtures/extract_load_configsmanage-extract-and-load-configs destroy --source-path bundles/extract_load/fixtures/extract_load_configs
Configene støtter Jinja2-interpolering, slik at samme template kan bruke ulike verdier i ulike miljøer. For eksempel forskjellige servernavn i sandbox og produksjon. Miljøverdiene hentes fra environment_configs/<env>.yml, der aktivt miljø styres av miljøvariabelen EL_ENV.
Under panseret gjør upload tre ting i rekkefølge:
- Rendrer Jinja2-templaten med miljøvariabler fra
environment_configs/<env>.yml - Validerer resultatet mot
ExtractLoadConfig-modellen - Laster opp filen som JSON* til ADLS som
<branch-navn>/<confignavn>.json
*Grunnen til at configene lagres som JSON er rett og slett fordi ADF ikke støtter å lese YAML filer. Ja, du leste rett...
bundles/extract_load: runtime-notebooken
Databricks bundle (DAB) inneholder én parametrisert notebook. Den tar fire inputparametere:
extract_load_config_name: navnet på configen som skal kjøres-branch_name: hvilken branch i ADLS configen skal hentes fra (mer om dette i senere)current_batch_timestamp: tidsstempelet for denne kjøringenlast_batch_timestamp: tidsstempelet for forrige kjøring
Og gjør deretter én ting: resolver extractor-klassen basert på config-typen og kjører den:
extractor_factory = {DummyApiExtractConfig: DummyApiExtractor,}extract_load_config = ExtractLoadConfig.from_adls(file_path=f"{branch_name}/{extract_load_config_name}")extractor = extractor_factory[type(extract_load_config.spec.extract)](extract_and_load_config=extract_load_config,current_batch_timestamp=current_batch_timestamp,last_batch_timestamp=last_batch_timestamp,)output_path = extractor.run()
Notebooken vet ingenting om hva som faktisk skjer inne i run(), for det er extractorens ansvar. Å støtte en ny kilde er derfor kun et spørsmål om å registrere en ny linje i extractor_factory.
Steg for steg: fra YAML-template til Parquet i landing
La oss nå se hvordan de tre delene spiller sammen i praksis: fra en ny config skrives til dataen ligger i landing.
1. Lag en YAML-config
Opprett bundles/extract_load/fixtures/extract_load_configs/source_x/el.source_x_dataset_y.yml:
kind: ExtractAndLoadConfigmetadata:name: source_x_dataset_ydescription: Extract and load config for source x dataset yspec:common:source: source_xdataset: dataset_yextract:kind: DummyApiExtractConfigmetadata:name: source_x_dataset_ydescription: Extract config for source x dataset yspec:feed: posts
For en SQL-kilde med Jinja-interpolering ser det slik ut:
kind: ExtractAndLoadConfigmetadata:name: source_a_dataset_bspec:common:source: source_adataset: dataset_bextract:kind: SqlExtractConfigmetadata:name: source_a_dataset_bspec:server: "{{ source_a.dataset_b.server }}"database: database_atable: table_b
Placeholders som {{ source_a.dataset_b.server }} blir fylt inn fra environment_configs/sandbox.yml:
source_a:dataset_b:server: sandbox-sql.company.internal
2. Last opp med CLI (lokalt og som en del av CI/CD)
Denne kjøres lokalt under utvikling, men legges også til som en del av CI/CD oppsett for opplastning av configer til ADSL i ulike miljøer:
manage-extract-and-load-configs upload \--source-path bundles/extract_load/fixtures/extract_load_configs# Uploaded 2 files to container 'extract-load-configs'.# Total bytes: 4096
Configen ligger nå i ADLS som:
# devextract-load-configs/my_feature_branch/el.source_x_dataset_y.jsonel.source_a_dataset_b.json# prodextract-load-configs/v0.0.1/el.source_x_dataset_y.jsonel.source_a_dataset_b.json
3. ADF og Databricks kjører
Akkurat som at Databricks kun inneholder én notebook, inneholder også orkestratoren vår, ADF, én master-pipeline. pl_master_extract_load_pipeline er inngangspunktet for all ingestion (extract + load). En ADF-trigger sparker den i gang med config-navnet som parameter, her el.source_x_dataset_y. Pipelinen leser denne configen fra ADLS, slår opp når denne configen ble kjørt sist (forrige batchtidsstempel), og ruter til enten en ADF Copy Activity (for kilder som har direkte støtte i ADF) eller Databricksnotebooken.
Det som holder det ærlig: validering og branchnavngivning
To designvalg fortjener ekstra oppmerksomhet.
Pydanticvalidering i CLIet
Det vanlige problemet med config-drevne pipelines er at feilen oppdages i produksjon, og ikke når konfigurasjonen skrives. GULP legger valideringen i CLI-laget. Før en config-fil noensinne når ADLS, rendres den og parses av Pydantic. Mangler et påkrevd felt, er typen feil, eller er kind ugyldig, får du feilmeldingen lokalt:
1 validation error for ExtractLoadConfigspec.extract.kindInput should be 'SqlExtractConfig' or 'DummyApiExtractConfig'
Det er ingen runtimeoverraskelser for noe som kunne vært fanget opp lokalt.
Branchbasert navngivning i ADLS
Husker du branch_name-parameteren fra notebooken? Her er grunnen til at den er der. Configfiler lagres under <branch-navn>/ i ADLS. Det betyr at du kan laste opp en config fra en feature-branch, teste den i Databricks med branch_name=min-feature-branch, og ikke berøre noe som kjører fra main. Opprydding skjer med destroy-kommandoen.
Legge til en ny extract-type på ~20 linjer
La oss si at du vil hente data fra en filserver. Du trenger tre ting.
1. Definer config-modellen i libs/extract-and-load/src/extract_and_load/models/extract_config.py:
class SftpExtractConfig(BaseModel):kind: Literal["SftpExtractConfig"]metadata: Metadataclass Spec(BaseModel):host: strpath: strspec: Spec
2. Legg den til i den diskriminerte unionen:
ExtractConfig = Annotated[SqlExtractConfig | DummyApiExtractConfig | SftpExtractConfig,Field(discriminator="kind"),]
3. Skriv en extractor som arver BaseExtractor og implementerer run().
4. Registrer den i notebookens factory-dict:
extractor_factory = {DummyApiExtractConfig: DummyApiExtractor,SftpExtractConfig: SftpExtractor,}
Det er det. Resten av rammeverket: CLI-validering, ADLS-opplasting, ADF-orkestrering, batchtidsstempler fungerer uforandret.
Begrensninger og veien videre
GULP v0.0.1 er et fundament, ikke et ferdig produkt. Noen åpne punkter:
LoadConfiger en generisk plassholder. Typede load-typer er ikke implementert ennå.- Ingen automatiserte tester er inkludert i denne utgivelsen.
- Feilhåndtering og varsling ved kjøretidsfeil i Databricks er ikke spesifisert i rammeverket.
Kodebasen er åpen kildekode under MIT-lisens og tilgjengelig på GitHub. Alle kommentarer og bidrag mottas med åpne armer!