Vai al catalogo
PIPELINE-DATI • Setup 24–72h su esempio

Pipeline

Dai file sparsi a dati affidabili: import, validazione, caricamento e report qualità. Ogni giorno, in automatico.

Richiedi stima Torna al catalogo
CodePulse Execution Environment
SCRIPT ENGINE (PYTHON)
import pandas as pd
from sqlalchemy import create_engine
import numpy as np

def execute_etl_pipeline(sales_csv_path, sql_conn_string):
    # 1. EXTRACT: Lettura massiva con Types pre-ottimizzati per risparmiare RAM
    dtypes = {'user_id': 'int32', 'total': 'float32', 'status': 'category'}
    df_sales = pd.read_csv(sales_csv_path, dtype=dtypes, engine='c')
    
    engine = create_engine(sql_conn_string)
    df_users = pd.read_sql("SELECT id, country, acqusition_channel FROM users", con=engine)
    
    # 2. TRANSFORM: Data Cleansing & Business Logic
    # Drop ordini annullati e valori Null
    df_clean = df_sales[df_sales['status'] == 'Shipped'].dropna(subset=['total'])
    
    # Riconciliazione Numerica e Normalizzazione stringhe
    df_clean['total'] = df_clean['total'].apply(lambda x: np.round(abs(x), 2))
    
    # Table JOINs 
    df_master = pd.merge(df_clean, df_users, left_on='user_id', right_on='id', how='left')
    
    # Creazione Aggregati Dimensionali (Data Warehouse)
    df_cube = df_master.groupby(['country', 'acqusition_channel']).agg({
        'total': ['sum', 'mean'],
        'user_id': 'count'
    }).reset_index()
    df_cube.columns = ['Paese', 'Sorgente', 'Vendite_Totali', 'Scontrino_Medio', 'Num_Ordini']
    
    # 3. LOAD: Scrittura in Bulk sul Data Warehouse (es. Snowflake o BigQuery)
    df_cube.to_sql('vw_analytics_sales_cube', con=engine, if_exists='replace', index=False, chunksize=10000)
    
    return f"Pipeline completata. {len(df_clean)} records validati e trasferiti."
FLUSSO DATI IN INGRESSO
db_ecommerce.sql (1.2 MLN Righe)
fornitori_esterni_dropship.csv (Mancano i prefissi +39)
costi_magazzino.json
MONITOR ATTIVITÀ
Sistema in standby. In attesa del comando di esecuzione...

Il problema

Dati da più fonti, formati incoerenti e controlli assenti: errori che si propagano nei report e nelle decisioni.

Cosa fa

  • Importa dati (CSV/Excel/JSON) e applica validazioni (schema, tipi, duplicati, range).
  • Trasforma e normalizza (mapping campi, codifiche, date, deduplica).
  • Carica su database o produce output puliti + report qualità (scarti, motivi, conteggi).

Esecuzione

Prima Import manuali e dati “quasi giusti”, difficili da fidarsi.
Dopo Pipeline ripetibile, controlli visibili e qualità misurabile nel tempo.

IO Schema

Input: File o export periodici, Regole di validazione/mapping
Output: DB o output pulito, Report qualità, Log

Pacchetti

Starter
€ 149
Import/clean base + output standard

  • Import/clean base
  • Output standard
Stima rapida
Business
€ 2.390
2–3 sorgenti + mapping complesso + deduplica + 30 gg supporto

  • 2–3 sorgenti
  • Mapping complesso
  • Deduplica
  • 30 gg supporto
Stima rapida
Su misura
da € 4.900
DB, schedulazione, versioning, environments, SLA

  • DB
  • Schedulazione
  • Versioning
  • Environments
  • SLA
Stima rapida
Pipeline Verifica gratuita
Dati alla mano