
Data Warehouse Notes, Part 6: SCD2 Handling in the PSA Layer

2026-04-28 00:30:02 · Basic Resources



Summary

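SCD2 (Slowly Changing Dimension Type 2) is the standard, and most widely used, way to keep change history in dimensional modeling. The snapshot approach rewrites every row on each load. SCD2 writes only the rows that actually changed and uses validity-period columns to record the lifespan of each version, so it provides full history at a fraction of the storage cost. This post walks through the workflow in detail with code.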

These notes use SQL Server, and all sample scripts are written for it. Implementations on other databases will differ somewhat.

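In "PSA Layer (Persistent Staging Area) Explained", the PSA layer was loaded with full snapshots: every ETL run wrote the entire source table into the PSA and marked every record psa_operation='I'. That approach is simple to implement and suits small tables that rarely change.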

In production, however, the snapshot approach has clear limitations:

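Problem | Description
No explicit change history | To find on which day a record changed, and from what value to what, you have to diff consecutive snapshots
Wasted storage | Every run writes the full table again, so data volume grows quickly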
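The sketch below puts rough numbers on the storage difference by counting PSA rows under each approach. It is a simplified model that assumes a fixed table size and a fixed number of changed rows per load. The function names and figures are illustrative, not measurements.

```python
# Simplified row-count model for the PSA layer (illustrative, not measured).


def snapshot_rows(rows_per_load: list[int]) -> int:
    """Full snapshot: every load copies the whole source table again."""
    return sum(rows_per_load)


def scd2_rows(initial_rows: int, new_rows: int, changed_rows: int) -> int:
    """SCD2: one row per initial record, one per new record, and one extra
    version per change. Deletions only close an existing row."""
    return initial_rows + new_rows + changed_rows


# Assumed scenario: 1,000,000 customers, 30 daily loads, no new customers,
# 10,000 changed rows on each of the 29 loads after the first one.
print(snapshot_rows([1_000_000] * 30))        # 30000000
print(scd2_rows(1_000_000, 0, 29 * 10_000))   # 1290000
```

The gap widens with every additional load. SCD2 grows only with the number of changes, not with the number of runs.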

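This post uses the customer, order and product tables of the PSA layer as examples and shows, end to end, how to rebuild the PSA layer with SCD2.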


SCD2 Core Concepts

Core Rules of Type 2

SCD2 attaches three key columns to every history row:

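Column | Type | Description
start_date | DATETIME | Time this version became effective
end_date | DATETIME | Time this version stopped being effective (9999-12-31 marks the current version; the procedures below store 9999-12-31 23:59:59)
is_current | CHAR(1) | Y = current version, N = historical version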

The change rules (four cases):

┌──────────────────────────────────────────────────────────────┐
│                 SCD2 change-detection logic                  │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│  New (in source, not in PSA)                                 │
│  → insert a new row: start_date = now,                       │
│    end_date = 9999-12-31, is_current = 'Y'                   │
│                                                              │
│  Changed (in source and PSA, checksum differs)               │
│  → close the old row: end_date = now - 1 second,             │
│    is_current = 'N'                                          │
│  → insert a new row: start_date = now,                       │
│    end_date = 9999-12-31, is_current = 'Y'                   │
│                                                              │
│  Deleted (in PSA, no longer in source)                       │
│  → close the old row: end_date = now, is_current = 'N'       │
│  → no new row is inserted                                    │
│                                                              │
│  Unchanged (in source and PSA, same checksum)                │
│  → do nothing                                                │
└──────────────────────────────────────────────────────────────┘
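The four cases in the box reduce to set operations on business keys. Here is a minimal Python sketch. It assumes both sides have already been reduced to a map from business key to checksum. The function `classify` is hypothetical and not part of the procedures below.

```python
def classify(source: dict[str, str], current: dict[str, str]) -> dict[str, set[str]]:
    """Split business keys into the four SCD2 cases.

    source:  business key -> checksum of the row in the source table
    current: business key -> checksum of the is_current = 'Y' row in the PSA
    """
    both = source.keys() & current.keys()
    return {
        "new": set(source.keys() - current.keys()),                 # insert first version
        "changed": {k for k in both if source[k] != current[k]},    # close old, insert new
        "deleted": set(current.keys() - source.keys()),             # close old, no new row
        "unchanged": {k for k in both if source[k] == current[k]},  # do nothing
    }


result = classify(
    source={"C001": "b", "C002": "x", "C006": "n"},
    current={"C001": "a", "C002": "x", "C005": "q"},
)
print(sorted(result["new"]), sorted(result["changed"]),
      sorted(result["deleted"]), sorted(result["unchanged"]))
# ['C006'] ['C001'] ['C005'] ['C002']
```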

How Changes Are Detected

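To trace how data changed over time, a data warehouse first has to detect changes in the source system. Source tables often have many columns, so how is that detection done? By comparing the columns one by one?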

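The most common method is a checksum, and it is the core of SCD2 change detection. On every ETL run, an MD5 hash is computed over all business columns of each source row: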

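-- Example: checksum for 张三 (C001)
-- Literals use N'...' so they are hashed as NVARCHAR, like the procedures below
CONVERT(VARCHAR(32), HASHBYTES('MD5',
    CONCAT(
        N'张三',                             -- customer_name
        N'|', N'zhangsan@email.com',         -- email
        N'|', N'13800138001',                -- phone
        N'|', N'北京市朝阳区建国路88号',     -- address
        ...
    )
), 2)
-- Result: a 32-character hex string (a 128-bit MD5 value), e.g. E8B2A5C3D9F1... (illustrative)
-- CONVERT style 2 returns the hex digits without a 0x prefix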

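In other words, all columns to be checked are concatenated into a single string and hashed with MD5. A change in any business column produces a different hash. MD5 collisions are possible in theory but negligible for this purpose. The hash difference triggers the SCD2 change flow, and comparing one hash per row is far cheaper than comparing every column.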
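Below is a Python approximation of the checksum, which can help when checking hash values outside the database. It rests on three assumptions:

- Every value is passed as NVARCHAR. CONCAT with N'' literals yields NVARCHAR, which HASHBYTES hashes as UTF-16LE.
- CONVERT style 2 produces upper-case hex.
- Each value is given as the same string SQL Server would build.

`row_checksum` is a hypothetical helper. The sketch also shows two properties of the pattern. The '|' separator keeps ('ab', 'c') and ('a', 'bc') apart. ISNULL(x, N''), however, makes NULL and an empty string indistinguishable.

```python
import hashlib


def row_checksum(*fields) -> str:
    """Approximate CONVERT(VARCHAR(32), HASHBYTES('MD5',
    CONCAT(ISNULL(f1, N''), N'|', ISNULL(f2, N''), ...)), 2).

    Assumptions: values are NVARCHAR (hashed as UTF-16LE), None stands for
    NULL and is replaced by '' like ISNULL, output is upper-case hex.
    """
    text = "|".join("" if f is None else str(f) for f in fields)
    return hashlib.md5(text.encode("utf-16-le")).hexdigest().upper()


v1 = row_checksum("张三", "zhangsan@email.com", "13800138001", "北京市朝阳区建国路88号")
v2 = row_checksum("张三", "zhangsan@email.com", "13800138001", "北京市朝阳区望京SOHO")

print(len(v1))                                             # 32 hex characters
print(v1 != v2)                                            # True: address changed
print(row_checksum("ab", "c") != row_checksum("a", "bc"))  # True: separator helps
print(row_checksum(None, "x") == row_checksum("", "x"))    # True: NULL == ''
```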


PSA-SCD2 Table Design

Mirroring the three PSA tables created in Part 2, we create tables with an _scd2 suffix to keep them apart from the Part 2 tables.

Customer Table: SCD2 Structure

-- ============================================================
-- PSA-SCD2 customer table
-- Compared with the snapshot table: adds start_date / end_date / is_current
-- ============================================================

USE psa_db;
GO

IF OBJECT_ID('dbo.customers_scd2', 'U') IS NOT NULL
    DROP TABLE dbo.customers_scd2;
GO

CREATE TABLE dbo.customers_scd2 (
    -- PSA system columns
    psa_record_seq     BIGINT IDENTITY(1,1) NOT NULL,
    psa_load_time      DATETIME NOT NULL DEFAULT GETDATE(),
    psa_batch_id       VARCHAR(50) NOT NULL,
    psa_source_table   VARCHAR(100) NOT NULL DEFAULT 'business_db.dbo.customers',
    psa_checksum       VARCHAR(32) NULL,

    -- SCD2 validity columns
    start_date         DATETIME NOT NULL,
    end_date           DATETIME NOT NULL,
    is_current         CHAR(1) NOT NULL DEFAULT 'Y',

    -- Business columns
    customer_id        VARCHAR(50) NOT NULL,
    customer_name      NVARCHAR(100) NOT NULL,
    email              VARCHAR(100) NULL,
    phone              VARCHAR(20) NULL,
    address            NVARCHAR(200) NULL,
    city               NVARCHAR(50) NULL,
    region             NVARCHAR(50) NULL,
    register_date      DATE NOT NULL,
    customer_type      VARCHAR(20) NOT NULL,
    is_active          BIT NULL,
    created_at         DATETIME NULL,
    updated_at         DATETIME NULL
);
GO

CREATE NONCLUSTERED INDEX idx_customers_scd2_bk
    ON dbo.customers_scd2(customer_id, is_current);

CREATE NONCLUSTERED INDEX idx_customers_scd2_period
    ON dbo.customers_scd2(start_date, end_date);

CREATE NONCLUSTERED INDEX idx_customers_scd2_batch
    ON dbo.customers_scd2(psa_batch_id);
GO

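Design note: the SCD2 table has no primary key on the business key, because one customer_id accumulates several historical versions. psa_record_seq is a surrogate IDENTITY column and could serve as the primary key if one is needed. The current version is located by customer_id together with is_current = 'Y'.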
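The table relies on is_current = 'Y' to identify the current version, so it may be worth letting the database enforce "at most one current row per customer_id". In SQL Server this could be a filtered unique index, e.g. CREATE UNIQUE NONCLUSTERED INDEX ... ON dbo.customers_scd2(customer_id) WHERE is_current = 'Y'.

The sketch below shows the same idea with Python's built-in sqlite3 module and a SQLite partial index, as a stand-in for SQL Server. The index name and the trimmed column list are assumptions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE customers_scd2 (
    psa_record_seq INTEGER PRIMARY KEY AUTOINCREMENT,  -- surrogate key
    customer_id    TEXT NOT NULL,                      -- business key, repeats
    address        TEXT,
    start_date     TEXT NOT NULL,
    end_date       TEXT NOT NULL,
    is_current     TEXT NOT NULL DEFAULT 'Y'
);
-- At most one current version per customer_id (partial unique index)
CREATE UNIQUE INDEX ux_customers_scd2_current
    ON customers_scd2(customer_id) WHERE is_current = 'Y';
""")

INSERT_SQL = """INSERT INTO customers_scd2
    (customer_id, address, start_date, end_date, is_current)
    VALUES (?, ?, ?, ?, ?)"""

# Two versions of C001, one closed and one current: both are allowed
conn.execute(INSERT_SQL, ("C001", "建国路88号", "2024-01-15", "2024-01-24 23:59:59", "N"))
conn.execute(INSERT_SQL, ("C001", "望京SOHO", "2024-01-25", "9999-12-31 23:59:59", "Y"))


def add_second_current_row() -> bool:
    """Try to add another current C001 row; return True if the DB accepted it."""
    try:
        conn.execute(INSERT_SQL, ("C001", "duplicate", "2024-01-26", "9999-12-31 23:59:59", "Y"))
    except sqlite3.IntegrityError:
        return False
    return True


print(add_second_current_row())  # False: rejected by the partial unique index
```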

Order Table: SCD2 Structure

-- ============================================================
-- PSA-SCD2 order table
-- The main change to an order is its status (pending→confirmed→shipped/cancelled)
-- SCD2 records the full lifecycle of each order
-- ============================================================

USE psa_db;
GO

IF OBJECT_ID('dbo.orders_scd2', 'U') IS NOT NULL
    DROP TABLE dbo.orders_scd2;
GO

CREATE TABLE dbo.orders_scd2 (
    -- PSA system columns
    psa_record_seq     BIGINT IDENTITY(1,1) NOT NULL,
    psa_load_time      DATETIME NOT NULL DEFAULT GETDATE(),
    psa_batch_id       VARCHAR(50) NOT NULL,
    psa_source_table   VARCHAR(100) NOT NULL DEFAULT 'business_db.dbo.orders',
    psa_checksum       VARCHAR(32) NULL,

    -- SCD2 validity columns
    start_date         DATETIME NOT NULL,
    end_date           DATETIME NOT NULL,
    is_current         CHAR(1) NOT NULL DEFAULT 'Y',

    -- Business columns
    order_id           VARCHAR(50) NOT NULL,
    customer_id        VARCHAR(50) NOT NULL,
    product_id         VARCHAR(50) NOT NULL,
    order_date         DATE NOT NULL,
    quantity           INT NOT NULL,
    unit_price         DECIMAL(10,2) NOT NULL,
    total_amount       DECIMAL(12,2) NOT NULL,
    status             VARCHAR(20) NOT NULL,  -- pending/confirmed/shipped/cancelled
    created_at         DATETIME NULL,
    updated_at         DATETIME NULL
);
GO

CREATE NONCLUSTERED INDEX idx_orders_scd2_bk
    ON dbo.orders_scd2(order_id, is_current);

CREATE NONCLUSTERED INDEX idx_orders_scd2_period
    ON dbo.orders_scd2(start_date, end_date);

CREATE NONCLUSTERED INDEX idx_orders_scd2_batch
    ON dbo.orders_scd2(psa_batch_id);

CREATE NONCLUSTERED INDEX idx_orders_scd2_customer
    ON dbo.orders_scd2(customer_id, is_current);
GO

Product Table: SCD2 Structure

-- ============================================================
-- PSA-SCD2 product table
-- Product changes are mostly price adjustments, listing/delisting, and category changes
-- ============================================================

USE psa_db;
GO

IF OBJECT_ID('dbo.products_scd2', 'U') IS NOT NULL
    DROP TABLE dbo.products_scd2;
GO

CREATE TABLE dbo.products_scd2 (
    -- PSA system columns
    psa_record_seq     BIGINT IDENTITY(1,1) NOT NULL,
    psa_load_time      DATETIME NOT NULL DEFAULT GETDATE(),
    psa_batch_id       VARCHAR(50) NOT NULL,
    psa_source_table   VARCHAR(100) NOT NULL DEFAULT 'business_db.dbo.products',
    psa_checksum       VARCHAR(32) NULL,

    -- SCD2 validity columns
    start_date         DATETIME NOT NULL,
    end_date           DATETIME NOT NULL,
    is_current         CHAR(1) NOT NULL DEFAULT 'Y',

    -- Business columns
    product_id         VARCHAR(50) NOT NULL,
    product_name       NVARCHAR(200) NOT NULL,
    category           NVARCHAR(50) NOT NULL,
    sub_category       NVARCHAR(50) NULL,
    brand              NVARCHAR(50) NULL,
    unit_cost          DECIMAL(10,2) NULL,
    unit_price         DECIMAL(10,2) NOT NULL,
    supplier_id        VARCHAR(50) NULL,
    is_active          BIT NULL,
    created_at         DATETIME NULL,
    updated_at         DATETIME NULL
);
GO

CREATE NONCLUSTERED INDEX idx_products_scd2_bk
    ON dbo.products_scd2(product_id, is_current);

CREATE NONCLUSTERED INDEX idx_products_scd2_period
    ON dbo.products_scd2(start_date, end_date);

CREATE NONCLUSTERED INDEX idx_products_scd2_batch
    ON dbo.products_scd2(psa_batch_id);
GO

SCD2 ETL Stored Procedure: Customer Table

From here on we reach the core of the ETL. The code is long, but all three procedures follow exactly the same logic.

Full Implementation

-- ============================================================
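-- SCD2 load procedure: customers
-- A MERGE classifies new / changed / deleted rows in one pass;
-- a follow-up INSERT adds the new versions of changed rows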
-- ============================================================

USE psa_db;
GO

IF OBJECT_ID('dbo.sp_load_customers_scd2', 'P') IS NOT NULL
    DROP PROCEDURE dbo.sp_load_customers_scd2;
GO

CREATE PROCEDURE dbo.sp_load_customers_scd2
    @batch_id VARCHAR(50)
AS
BEGIN
    SET NOCOUNT ON;

    DECLARE @start_time DATETIME = GETDATE();
    DECLARE @current_date DATETIME = GETDATE();
    DECLARE @far_future DATETIME = '9999-12-31 23:59:59';

    DECLARE @rows_inserted INT = 0;
    DECLARE @rows_closed   INT = 0;
    DECLARE @error_msg NVARCHAR(MAX);

    BEGIN TRY
        -- Log the ETL start
        INSERT INTO etl_db.dbo.etl_log (
            batch_id, layer_name, db_name, table_name,
            start_time, status
        )
        VALUES (
            @batch_id, 'psa_scd2', 'psa_db', 'customers_scd2',
            @start_time, 'RUNNING'
        );

        -- ================================================
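        -- SCD2 MERGE statement
        -- Match rule: target.customer_id = source.customer_id
        --             AND target.is_current = 'Y' (current versions only)
        -- Historical rows never match, so they fall into NOT MATCHED BY SOURCE;
        -- that branch re-checks is_current = 'Y' so they are left untouched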
        -- ================================================
        ;MERGE dbo.customers_scd2 AS target
        USING (
            SELECT
                customer_id,
                customer_name,
                email,
                phone,
                address,
                city,
                region,
                register_date,
                customer_type,
                is_active,
                created_at,
                updated_at,
                -- MD5 checksum of the business columns (created_at/updated_at excluded)
                CONVERT(VARCHAR(32), HASHBYTES('MD5',
                    CONCAT(
                        ISNULL(customer_name, N''),
                        N'|', ISNULL(email, N''),
                        N'|', ISNULL(phone, N''),
                        N'|', ISNULL(address, N''),
                        N'|', ISNULL(city, N''),
                        N'|', ISNULL(region, N''),
                        N'|', ISNULL(CONVERT(NVARCHAR(20), register_date, 120), N''),
                        N'|', ISNULL(customer_type, N''),
                        N'|', CASE WHEN ISNULL(is_active, 0) = 1 THEN N'1' ELSE N'0' END
                    )
                ), 2) AS psa_checksum
            FROM business_db.dbo.customers
        ) AS source
        ON (
            target.customer_id = source.customer_id
            AND target.is_current = 'Y'
        )

        -- Changed attributes → close the old version (the new version is inserted below)
        WHEN MATCHED AND target.psa_checksum <> source.psa_checksum
        THEN UPDATE SET
            target.end_date   = DATEADD(SECOND, -1, @current_date),
            target.is_current = 'N'

        -- New business key → insert the first version
        WHEN NOT MATCHED BY TARGET
        THEN INSERT (
            psa_batch_id, psa_source_table, psa_checksum,
            start_date, end_date, is_current,
            customer_id, customer_name, email, phone,
            address, city, region, register_date,
            customer_type, is_active, created_at, updated_at
        ) VALUES (
            @batch_id,
            'business_db.dbo.customers',
            source.psa_checksum,
            @current_date,
            @far_future,
            'Y',
            source.customer_id,
            source.customer_name,
            source.email,
            source.phone,
            source.address,
            source.city,
            source.region,
            source.register_date,
            source.customer_type,
            source.is_active,
            source.created_at,
            source.updated_at
        )

        -- Row deleted from the source → close the current version (logical delete in the PSA)
        WHEN NOT MATCHED BY SOURCE
             AND target.is_current = 'Y'
        THEN UPDATE SET
            target.end_date   = @current_date,
            target.is_current = 'N';

        -- ================================================
        -- Insert the new versions of the rows just closed by the MERGE
        -- ================================================
        INSERT INTO dbo.customers_scd2 (
            psa_batch_id, psa_source_table, psa_checksum,
            start_date, end_date, is_current,
            customer_id, customer_name, email, phone,
            address, city, region, register_date,
            customer_type, is_active, created_at, updated_at
        )
        SELECT
            @batch_id,
            'business_db.dbo.customers',
            s.psa_checksum,
            @current_date,
            @far_future,
            'Y',
            s.customer_id,
            s.customer_name,
            s.email,
            s.phone,
            s.address,
            s.city,
            s.region,
            s.register_date,
            s.customer_type,
            s.is_active,
            s.created_at,
            s.updated_at
        FROM (
            SELECT
                customer_id,
                customer_name,
                email,
                phone,
                address,
                city,
                region,
                register_date,
                customer_type,
                is_active,
                created_at,
                updated_at,
                CONVERT(VARCHAR(32), HASHBYTES('MD5',
                    CONCAT(
                        ISNULL(customer_name, N''),
                        N'|', ISNULL(email, N''),
                        N'|', ISNULL(phone, N''),
                        N'|', ISNULL(address, N''),
                        N'|', ISNULL(city, N''),
                        N'|', ISNULL(region, N''),
                        N'|', ISNULL(CONVERT(NVARCHAR(20), register_date, 120), N''),
                        N'|', ISNULL(customer_type, N''),
                        N'|', CASE WHEN ISNULL(is_active, 0) = 1 THEN N'1' ELSE N'0' END
                    )
                ), 2) AS psa_checksum
            FROM business_db.dbo.customers
        ) s
        -- Only customers whose old version was just closed by the MERGE (i.e. changed customers)
        INNER JOIN dbo.customers_scd2 t
            ON s.customer_id = t.customer_id
            AND t.is_current = 'N'
            AND t.end_date = DATEADD(SECOND, -1, @current_date)
        -- Guard against inserting a second current version
        WHERE NOT EXISTS (
            SELECT 1
            FROM dbo.customers_scd2
            WHERE customer_id = s.customer_id
            AND is_current = 'Y'
        );


        -- ================================================
        -- Summary statistics
        -- ================================================
        SELECT @rows_inserted = COUNT(*)
        FROM dbo.customers_scd2
        WHERE psa_batch_id = @batch_id;

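        -- Changed versions are closed at @current_date - 1 second and deleted
        -- ones at @current_date, so the window must start one second early
        SELECT @rows_closed = COUNT(*)
        FROM dbo.customers_scd2
        WHERE end_date >= DATEADD(SECOND, -1, @current_date)
          AND end_date < @far_future
          AND psa_batch_id <> @batch_id
          AND is_current = 'N';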

        -- Update the ETL log
        UPDATE etl_db.dbo.etl_log
        SET
            end_time = GETDATE(),
            rows_inserted = ISNULL(@rows_inserted, 0),
            rows_updated = ISNULL(@rows_closed, 0),
            status = 'SUCCESS'
        WHERE batch_id = @batch_id
          AND table_name = 'customers_scd2';

        PRINT N'✅ Customer SCD2 load complete';
        PRINT N'   New/changed versions inserted: ' + ISNULL(CAST(@rows_inserted AS VARCHAR), '0');
        PRINT N'   Historical versions closed: ' + ISNULL(CAST(@rows_closed AS VARCHAR), '0');

    END TRY
    BEGIN CATCH
        SET @error_msg = ERROR_MESSAGE();

        UPDATE etl_db.dbo.etl_log
        SET
            end_time = GETDATE(),
            status = 'FAILED',
            error_message = @error_msg
        WHERE batch_id = @batch_id
          AND table_name = 'customers_scd2';

        THROW;
    END CATCH;
END;
GO
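To see the procedure's logic without the T-SQL plumbing, here is an in-memory Python sketch of one SCD2 load that follows the same rules:

- Changed versions close one second before the run time.
- Deleted keys close at the run time.
- New or changed keys get a version that runs to 9999-12-31 23:59:59.

The row layout and `scd2_load` are illustrative assumptions.

A design note on the procedure above: it finds the rows to re-insert by re-reading the versions the MERGE just closed. A common alternative in SQL Server is to feed the MERGE's OUTPUT clause (with $action) into an outer INSERT ... SELECT, which avoids computing the checksum twice.

```python
from datetime import datetime, timedelta

FAR_FUTURE = datetime(9999, 12, 31, 23, 59, 59)


def scd2_load(history: list[dict], source: dict[str, str], now: datetime) -> None:
    """Apply one SCD2 load to `history` in place.

    history: version rows with keys "key", "checksum", "start", "end", "current"
    source:  business key -> checksum of the row currently in the source table
    """
    current = {r["key"]: r for r in history if r["current"]}
    for key, checksum in source.items():
        row = current.get(key)
        if row is not None and row["checksum"] == checksum:
            continue  # unchanged: nothing to do
        if row is not None:  # changed: close the old version one second early
            row["end"] = now - timedelta(seconds=1)
            row["current"] = False
        # new or changed: open a version that runs to the far future
        history.append({"key": key, "checksum": checksum,
                        "start": now, "end": FAR_FUTURE, "current": True})
    for key, row in current.items():
        if key not in source:  # deleted in source: close at run time, no new row
            row["end"] = now
            row["current"] = False


day1, day2 = datetime(2024, 1, 15), datetime(2024, 1, 25)
history: list[dict] = []
scd2_load(history, {"C001": "a", "C005": "q"}, day1)
scd2_load(history, {"C001": "b", "C006": "n"}, day2)  # C001 changed, C005 deleted, C006 new

for r in history:
    print(r["key"], r["checksum"], r["start"], r["end"], r["current"])
```

Running the same load again with unchanged input adds no rows, which is the property that keeps SCD2 storage proportional to the number of changes.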

SCD2 ETL Stored Procedure: Order Table

-- ============================================================
-- SCD2 load procedure: orders
-- The core change to an order is its status (pending→confirmed→shipped/cancelled)
-- ============================================================

USE psa_db;
GO

IF OBJECT_ID('dbo.sp_load_orders_scd2', 'P') IS NOT NULL
    DROP PROCEDURE dbo.sp_load_orders_scd2;
GO

CREATE PROCEDURE dbo.sp_load_orders_scd2
    @batch_id VARCHAR(50)
AS
BEGIN
    SET NOCOUNT ON;

    DECLARE @current_date DATETIME = GETDATE();
    DECLARE @far_future DATETIME = '9999-12-31 23:59:59';
    DECLARE @rows_inserted INT = 0;
    DECLARE @rows_closed INT = 0;
    DECLARE @error_msg NVARCHAR(MAX);

    BEGIN TRY
        INSERT INTO etl_db.dbo.etl_log (
            batch_id, layer_name, db_name, table_name,
            start_time, status
        )
        VALUES (
            @batch_id, 'psa_scd2', 'psa_db', 'orders_scd2',
            GETDATE(), 'RUNNING'
        );

        ;MERGE dbo.orders_scd2 AS target
        USING (
            SELECT
                order_id,
                customer_id,
                product_id,
                order_date,
                quantity,
                unit_price,
                total_amount,
                status,
                created_at,
                updated_at,
                CONVERT(VARCHAR(32), HASHBYTES('MD5',
                    CONCAT(
                        ISNULL(order_id, N''),
                        N'|', ISNULL(customer_id, N''),
                        N'|', ISNULL(product_id, N''),
                        N'|', ISNULL(CONVERT(NVARCHAR(20), order_date, 120), N''),
                        N'|', ISNULL(CAST(quantity AS NVARCHAR(20)), N''),
                        N'|', ISNULL(CAST(unit_price AS NVARCHAR(20)), N''),
                        N'|', ISNULL(CAST(total_amount AS NVARCHAR(20)), N''),
                        N'|', ISNULL(status, N'')
                    )
                ), 2) AS psa_checksum
            FROM business_db.dbo.orders
        ) AS source
        ON (
            target.order_id = source.order_id
            AND target.is_current = 'Y'
        )

        -- Changed attributes (e.g. order status change) → close the old version
        WHEN MATCHED AND target.psa_checksum <> source.psa_checksum
        THEN UPDATE SET
            target.end_date   = DATEADD(SECOND, -1, @current_date),
            target.is_current = 'N'

        -- New order → insert the first version
        WHEN NOT MATCHED BY TARGET
        THEN INSERT (
            psa_batch_id, psa_source_table, psa_checksum,
            start_date, end_date, is_current,
            order_id, customer_id, product_id, order_date,
            quantity, unit_price, total_amount, status,
            created_at, updated_at
        ) VALUES (
            @batch_id,
            'business_db.dbo.orders',
            source.psa_checksum,
            @current_date,
            @far_future,
            'Y',
            source.order_id,
            source.customer_id,
            source.product_id,
            source.order_date,
            source.quantity,
            source.unit_price,
            source.total_amount,
            source.status,
            source.created_at,
            source.updated_at
        )

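        -- Order physically deleted from the source → close the current version
        -- (a cancellation is a status change and goes through WHEN MATCHED)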
        WHEN NOT MATCHED BY SOURCE
             AND target.is_current = 'Y'
        THEN UPDATE SET
            target.end_date   = @current_date,
            target.is_current = 'N';

        -- =============================================
        -- Insert the new versions of changed orders
        -- =============================================
        INSERT INTO dbo.orders_scd2 (
            psa_batch_id, psa_source_table, psa_checksum,
            start_date, end_date, is_current,
            order_id, customer_id, product_id, order_date,
            quantity, unit_price, total_amount, status,
            created_at, updated_at
        )
        SELECT
            @batch_id,
            'business_db.dbo.orders',
            s.psa_checksum,
            @current_date,
            @far_future,
            'Y',
            s.order_id,
            s.customer_id,
            s.product_id,
            s.order_date,
            s.quantity,
            s.unit_price,
            s.total_amount,
            s.status,
            s.created_at,
            s.updated_at
        FROM (
            SELECT
                order_id,
                customer_id,
                product_id,
                order_date,
                quantity,
                unit_price,
                total_amount,
                status,
                created_at,
                updated_at,
                CONVERT(VARCHAR(32), HASHBYTES('MD5',
                    CONCAT(
                        ISNULL(order_id, N''),
                        N'|', ISNULL(customer_id, N''),
                        N'|', ISNULL(product_id, N''),
                        N'|', ISNULL(CONVERT(NVARCHAR(20), order_date, 120), N''),
                        N'|', ISNULL(CAST(quantity AS NVARCHAR(20)), N''),
                        N'|', ISNULL(CAST(unit_price AS NVARCHAR(20)), N''),
                        N'|', ISNULL(CAST(total_amount AS NVARCHAR(20)), N''),
                        N'|', ISNULL(status, N'')
                    )
                ), 2) AS psa_checksum
            FROM business_db.dbo.orders
        ) s
        INNER JOIN dbo.orders_scd2 t
            ON s.order_id = t.order_id
            AND t.is_current = 'N'
            AND t.end_date = DATEADD(SECOND, -1, @current_date)
        WHERE NOT EXISTS (
            SELECT 1 FROM dbo.orders_scd2
            WHERE order_id = s.order_id AND is_current = 'Y'
        );


        SELECT @rows_inserted = COUNT(*)
        FROM dbo.orders_scd2
        WHERE psa_batch_id = @batch_id;

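        -- Changed versions close at @current_date - 1 second, deleted ones at @current_date
        SELECT @rows_closed = COUNT(*)
        FROM dbo.orders_scd2
        WHERE end_date >= DATEADD(SECOND, -1, @current_date)
          AND end_date < @far_future
          AND psa_batch_id <> @batch_id
          AND is_current = 'N';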

        UPDATE etl_db.dbo.etl_log
        SET
            end_time = GETDATE(),
            rows_inserted = ISNULL(@rows_inserted, 0),
            rows_updated = ISNULL(@rows_closed, 0),
            status = 'SUCCESS'
        WHERE batch_id = @batch_id
          AND table_name = 'orders_scd2';

        PRINT N'✅ Order SCD2 load complete';
        PRINT N'   New/changed versions inserted: ' + ISNULL(CAST(@rows_inserted AS VARCHAR), '0');

    END TRY
    BEGIN CATCH
        SET @error_msg = ERROR_MESSAGE();
        UPDATE etl_db.dbo.etl_log
        SET end_time = GETDATE(), status = 'FAILED', error_message = @error_msg
        WHERE batch_id = @batch_id AND table_name = 'orders_scd2';
        THROW;
    END CATCH;
END;
GO
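Changed versions are closed at DATEADD(SECOND, -1, @current_date), while deleted ones are closed at exactly @current_date. A query that counts the versions closed by a run therefore has to start its window one second before @current_date; a window starting at @current_date only sees deletions.

Here is a small Python check of both windows. It uses assumed timestamps and a hypothetical deleted order (the demo below does not delete any order). It ignores the procedure's extra psa_batch_id and is_current filters.

```python
from datetime import datetime, timedelta

now = datetime(2024, 1, 25)                      # @current_date (assumed)
far_future = datetime(9999, 12, 31, 23, 59, 59)  # @far_future
one_second = timedelta(seconds=1)

# end_date of every version closed by this run
closed_end_dates = [
    now - one_second,  # O003: status changed
    now - one_second,  # O006: status changed
    now,               # hypothetical order deleted from the source
]


def count_closed(end_dates: list[datetime], window_start: datetime) -> int:
    """Count versions with window_start <= end_date < far_future."""
    return sum(1 for d in end_dates if window_start <= d < far_future)


print(count_closed(closed_end_dates, now))               # 1: deletions only
print(count_closed(closed_end_dates, now - one_second))  # 3: changes and deletions
```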

SCD2 ETL Stored Procedure: Product Table

-- ============================================================
-- SCD2 load procedure: products
-- Product changes include price adjustments, category changes, and listing/delisting
-- ============================================================

USE psa_db;
GO

IF OBJECT_ID('dbo.sp_load_products_scd2', 'P') IS NOT NULL
    DROP PROCEDURE dbo.sp_load_products_scd2;
GO

CREATE PROCEDURE dbo.sp_load_products_scd2
    @batch_id VARCHAR(50)
AS
BEGIN
    SET NOCOUNT ON;

    DECLARE @current_date DATETIME = GETDATE();
    DECLARE @far_future DATETIME = '9999-12-31 23:59:59';
    DECLARE @rows_inserted INT = 0;
    DECLARE @rows_closed INT = 0;
    DECLARE @error_msg NVARCHAR(MAX);

    BEGIN TRY
        INSERT INTO etl_db.dbo.etl_log (
            batch_id, layer_name, db_name, table_name,
            start_time, status
        )
        VALUES (
            @batch_id, 'psa_scd2', 'psa_db', 'products_scd2',
            GETDATE(), 'RUNNING'
        );

        ;MERGE dbo.products_scd2 AS target
        USING (
            SELECT
                product_id,
                product_name,
                category,
                sub_category,
                brand,
                unit_cost,
                unit_price,
                supplier_id,
                is_active,
                created_at,
                updated_at,
                CONVERT(VARCHAR(32), HASHBYTES('MD5',
                    CONCAT(
                        ISNULL(product_name, N''),
                        N'|', ISNULL(category, N''),
                        N'|', ISNULL(sub_category, N''),
                        N'|', ISNULL(brand, N''),
                        N'|', ISNULL(CAST(unit_cost AS NVARCHAR(20)), N''),
                        N'|', ISNULL(CAST(unit_price AS NVARCHAR(20)), N''),
                        N'|', ISNULL(CAST(supplier_id AS NVARCHAR(50)), N''),
                        N'|', CASE WHEN ISNULL(is_active, 0) = 1 THEN N'1' ELSE N'0' END
                    )
                ), 2) AS psa_checksum
            FROM business_db.dbo.products
        ) AS source
        ON (
            target.product_id = source.product_id
            AND target.is_current = 'Y'
        )

        WHEN MATCHED AND target.psa_checksum <> source.psa_checksum
        THEN UPDATE SET
            target.end_date   = DATEADD(SECOND, -1, @current_date),
            target.is_current = 'N'

        WHEN NOT MATCHED BY TARGET
        THEN INSERT (
            psa_batch_id, psa_source_table, psa_checksum,
            start_date, end_date, is_current,
            product_id, product_name, category, sub_category,
            brand, unit_cost, unit_price, supplier_id,
            is_active, created_at, updated_at
        ) VALUES (
            @batch_id,
            'business_db.dbo.products',
            source.psa_checksum,
            @current_date,
            @far_future,
            'Y',
            source.product_id,
            source.product_name,
            source.category,
            source.sub_category,
            source.brand,
            source.unit_cost,
            source.unit_price,
            source.supplier_id,
            source.is_active,
            source.created_at,
            source.updated_at
        )

        WHEN NOT MATCHED BY SOURCE
             AND target.is_current = 'Y'
        THEN UPDATE SET
            target.end_date   = @current_date,
            target.is_current = 'N';

        -- ================================================
        -- Insert the new versions of the rows just closed by the MERGE
        -- ================================================
        INSERT INTO dbo.products_scd2 (
            psa_batch_id, psa_source_table, psa_checksum,
            start_date, end_date, is_current,
            product_id, product_name, category, sub_category,
            brand, unit_cost, unit_price, supplier_id,
            is_active, created_at, updated_at
        )
        SELECT
            @batch_id,
            'business_db.dbo.products',
            s.psa_checksum,
            @current_date,
            @far_future,
            'Y',
            s.product_id,
            s.product_name,
            s.category,
            s.sub_category,
            s.brand,
            s.unit_cost,
            s.unit_price,
            s.supplier_id,
            s.is_active,
            s.created_at,
            s.updated_at
        FROM (
            SELECT
                product_id, product_name, category, sub_category,
                brand, unit_cost, unit_price, supplier_id,
                is_active, created_at, updated_at,
                CONVERT(VARCHAR(32), HASHBYTES('MD5',
                    CONCAT(
                        ISNULL(product_name, N''),
                        N'|', ISNULL(category, N''),
                        N'|', ISNULL(sub_category, N''),
                        N'|', ISNULL(brand, N''),
                        N'|', ISNULL(CAST(unit_cost AS NVARCHAR(20)), N''),
                        N'|', ISNULL(CAST(unit_price AS NVARCHAR(20)), N''),
                        N'|', ISNULL(CAST(supplier_id AS NVARCHAR(50)), N''),
                        N'|', CASE WHEN ISNULL(is_active, 0) = 1 THEN N'1' ELSE N'0' END
                    )
                ), 2) AS psa_checksum
            FROM business_db.dbo.products
        ) s
        -- Only products whose old version was just closed by the MERGE
        INNER JOIN dbo.products_scd2 t
            ON s.product_id = t.product_id
            AND t.is_current = 'N'
            AND t.end_date = DATEADD(SECOND, -1, @current_date)
        -- Guard against inserting a second current version
        WHERE NOT EXISTS (
            SELECT 1
            FROM dbo.products_scd2
            WHERE product_id = s.product_id AND is_current = 'Y'
        );



        SELECT @rows_inserted = COUNT(*)
        FROM dbo.products_scd2
        WHERE psa_batch_id = @batch_id;

        UPDATE etl_db.dbo.etl_log
        SET end_time = GETDATE(), rows_inserted = @rows_inserted, status = 'SUCCESS'
        WHERE batch_id = @batch_id AND table_name = 'products_scd2';

        PRINT N'✅ Product SCD2 load complete, rows inserted: ' + CAST(@rows_inserted AS VARCHAR);

    END TRY
    BEGIN CATCH
        SET @error_msg = ERROR_MESSAGE();
        UPDATE etl_db.dbo.etl_log
        SET end_time = GETDATE(), status = 'FAILED', error_message = @error_msg
        WHERE batch_id = @batch_id AND table_name = 'products_scd2';
        THROW;
    END CATCH;
END;
GO
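Keeping price history makes point-in-time lookups possible. The version valid at time ts is the one with start_date <= ts <= end_date (in T-SQL, WHERE @ts BETWEEN start_date AND end_date).

Below is a Python sketch using the P004 version chain from the walkthrough, with assumed timestamps. Note a side effect of the minus-one-second convention: a timestamp that falls strictly between 23:59:59 and the next version's start (e.g. 23:59:59.500) matches no version. A half-open interval that compares against the next version's start_date would avoid that gap.

```python
from datetime import datetime
from typing import Optional

FAR_FUTURE = datetime(9999, 12, 31, 23, 59, 59)

# P004 version chain (timestamps assumed, as in the walkthrough)
p004_versions = [
    {"unit_price": 6999.00, "start": datetime(2024, 1, 15),
     "end": datetime(2024, 1, 24, 23, 59, 59)},
    {"unit_price": 6499.00, "start": datetime(2024, 1, 25), "end": FAR_FUTURE},
]


def as_of(versions: list[dict], ts: datetime) -> Optional[dict]:
    """Return the version with start <= ts <= end, or None if none matches."""
    for v in versions:
        if v["start"] <= ts <= v["end"]:
            return v
    return None


print(as_of(p004_versions, datetime(2024, 1, 20))["unit_price"])        # 6999.0
print(as_of(p004_versions, datetime(2024, 2, 1))["unit_price"])         # 6499.0
print(as_of(p004_versions, datetime(2024, 1, 1)))                       # None
print(as_of(p004_versions, datetime(2024, 1, 24, 23, 59, 59, 500000)))  # None: gap
```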

Walkthrough: A Complete ETL Lifecycle

Step 0: Environment Setup

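Make sure business_db (the business database) and psa_db (the PSA database) exist; the table definitions are in "PSA Layer (Persistent Staging Area) Explained". If you have not yet run the test-data script, run it now. If you already ran it while following that post, TRUNCATE the business tables first and then regenerate the test data. The preparation script: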

-- Clear existing test data
TRUNCATE TABLE [business_db].[dbo].[customers]
TRUNCATE TABLE [business_db].[dbo].[orders]
TRUNCATE TABLE [business_db].[dbo].[products]

TRUNCATE TABLE [psa_db].[dbo].[customers_scd2]
TRUNCATE TABLE [psa_db].[dbo].[orders_scd2]
TRUNCATE TABLE [psa_db].[dbo].[products_scd2]


-- Insert sample business data

USE business_db;
GO

-- Customer data
INSERT INTO dbo.customers (customer_id, customer_name, email, phone, address, city, region, register_date, customer_type, is_active)
VALUES
(N'C001', N'张三', 'zhangsan@email.com', '13800138001', N'北京市朝阳区建国路88号', N'北京', N'华北', '2023-01-15', 'individual', 1),
(N'C002', N'李四科技有限公司', 'lisi@company.com', '13800138002', N'上海市浦东新区张江路100号', N'上海', N'华东', '2023-03-20', 'enterprise', 1),
(N'C003', N'王五', 'wangwu@email.com', '13800138003', N'广州市天河区体育西路103号', N'广州', N'华南', '2023-06-10', 'individual', 1),
(N'C004', N'赵六集团', 'zhaoliu@group.com', '13800138004', N'深圳市南山区科技园南路10号', N'深圳', N'华南', '2023-08-05', 'enterprise', 1),
(N'C005', N'钱七', 'qianqi@email.com', '13800138005', N'杭州市西湖区文三路478号', N'杭州', N'华东', '2024-01-20', 'individual', 1);
GO

-- Product data
INSERT INTO dbo.products (product_id, product_name, category, sub_category, brand, unit_cost, unit_price, supplier_id, is_active)
VALUES
('P001', N'iPhone 15 Pro', N'电子产品', N'手机', N'Apple', 6500.00, 8999.00, 'S001', 1),
('P002', N'MacBook Pro 16', N'电子产品', N'笔记本', N'Apple', 15000.00, 19999.00, 'S001', 1),
('P003', N'AirPods Pro 2', N'电子产品', N'耳机', N'Apple', 1200.00, 1899.00, 'S001', 1),
('P004', N'华为Mate 60 Pro', N'电子产品', N'手机', N'华为', 4500.00, 6999.00, 'S002', 1),
('P005', N'戴森吸尘器V15', N'家用电器', N'吸尘器', N'Dyson', 2800.00, 4999.00, 'S003', 1),
('P006', N'索尼WH-1000XM5', N'电子产品', N'耳机', N'Sony', 1800.00, 2999.00, 'S004', 1),
('P007', N'小米空气净化器', N'家用电器', N'空气净化器', N'小米', 600.00, 1299.00, 'S005', 1),
('P008', N'iPad Air 5', N'电子产品', N'平板', N'Apple', 3200.00, 4799.00, 'S001', 1);
GO

-- Order data
INSERT INTO dbo.orders (order_id, customer_id, product_id, order_date, quantity, unit_price, total_amount, status)
VALUES
('O001', 'C001', 'P001', '2024-01-15', 1, 8999.00, 8999.00, 'shipped'),
('O002', 'C002', 'P002', '2024-01-16', 5, 19999.00, 99995.00, 'confirmed'),
('O003', 'C001', 'P003', '2024-01-17', 2, 1899.00, 3798.00, 'pending'),
('O004', 'C003', 'P004', '2024-01-18', 1, 6999.00, 6999.00, 'shipped'),
('O005', 'C004', 'P005', '2024-01-19', 3, 4999.00, 14997.00, 'confirmed'),
('O006', 'C001', 'P002', '2024-01-20', 1, 19999.00, 19999.00, 'pending'),
('O007', 'C005', 'P006', '2024-01-21', 1, 2999.00, 2999.00, 'cancelled'),
('O008', 'C002', 'P001', '2024-01-22', 10, 8999.00, 89990.00, 'confirmed'),
('O009', 'C003', 'P007', '2024-01-23', 2, 1299.00, 2598.00, 'shipped'),
('O010', 'C004', 'P008', '2024-01-24', 4, 4799.00, 19196.00, 'pending');
GO

-- Verify row counts
SELECT 'orders' AS table_name, COUNT(*) AS row_count FROM dbo.orders
UNION ALL
SELECT 'customers', COUNT(*) FROM dbo.customers
UNION ALL
SELECT 'products', COUNT(*) FROM dbo.products;
GO

Step 1: Initial Load (Day 1)

-- ============================================================
-- Day 1: initial full load
-- ============================================================

DECLARE @batch_id VARCHAR(50);
SET @batch_id = 'BATCH_SCD2_20240115_001';

PRINT N'========== Day 1: initial load ==========';
PRINT N'Batch ID: ' + @batch_id;

EXEC psa_db.dbo.sp_load_customers_scd2 @batch_id;
EXEC psa_db.dbo.sp_load_orders_scd2 @batch_id;
EXEC psa_db.dbo.sp_load_products_scd2 @batch_id;

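Customer table after the day-1 load. start_date is taken from GETDATE(), so a real run shows the actual execution time; the dates below assume the load ran on 2024-01-15: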

customer_id | customer_name | address | start_date | end_date | is_current
C001 | 张三 | 北京市朝阳区建国路88号 | 2024-01-15 | 9999-12-31 | Y
C002 | 李四科技有限公司 | 上海市浦东新区张江路100号 | 2024-01-15 | 9999-12-31 | Y
C003 | 王五 | 广州市天河区体育西路103号 | 2024-01-15 | 9999-12-31 | Y
C004 | 赵六集团 | 深圳市南山区科技园南路10号 | 2024-01-15 | 9999-12-31 | Y
C005 | 钱七 | 杭州市西湖区文三路478号 | 2024-01-15 | 9999-12-31 | Y

Step 2: Simulate Business Changes (Day 2)

-- ============================================================
-- Day 2: simulate changes in the business system
-- ============================================================

USE business_db;
GO

-- Change ①: attribute change, 张三 (C001) moves house
UPDATE dbo.customers
SET address = N'北京市朝阳区望京SOHO',
    updated_at = GETDATE()
WHERE customer_id = 'C001';

-- Change ②: new customer 孙八 (C006)
INSERT INTO dbo.customers
    (customer_id, customer_name, email, phone, address, city, region, register_date, customer_type)
VALUES
    (N'C006', N'孙八', 'sunba@email.com', '13800138006',
     N'成都市高新区天府大道888号', N'成都', N'西南', '2024-01-25', 'individual');

-- Change ③: customer deactivated (soft delete: the row stays in the source, only is_active changes)
UPDATE dbo.customers
SET is_active = 0,
    updated_at = GETDATE()
WHERE customer_id = 'C005';

-- Change ④: order status transition
UPDATE dbo.orders
SET status = 'shipped',
    updated_at = GETDATE()
WHERE order_id IN ('O003', 'O006');

-- Change ⑤: product price adjustment
UPDATE dbo.products
SET unit_price = 6499.00,
    updated_at = GETDATE()
WHERE product_id = 'P004';  -- Huawei Mate 60 Pro price cut

-- Change ⑥: new order
INSERT INTO dbo.orders
    (order_id, customer_id, product_id, order_date, quantity, unit_price, total_amount, status)
VALUES
    ('O011', 'C001', 'P004', '2024-01-25', 2, 6499.00, 12998.00, 'pending');

Summary of business changes:

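Change type | Business key | Change
Attribute change | C001 张三 | address: 建国路88号 → 望京SOHO
New customer | C006 孙八 | new customer created
Soft delete | C005 钱七 | is_active: 1 → 0
Order status | O003/O006 | pending → shipped
Price change | P004 | unit_price: 6999 → 6499
New order | O011 | new order placed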
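To make the four detection rules from the start of the article concrete, here is a minimal Python sketch of the comparison between today's extract and the current PSA versions. It is not the article's stored procedure (that is not shown in this section): `row_checksum` and `classify_changes` are hypothetical names, and SHA-256 over the tracked columns stands in for whatever checksum the SQL Server procedure computes. It also assumes the soft-deleted C005 is absent from the extract (for example because only `is_active = 1` rows are extracted), which is one way to arrive at the "deleted" treatment this article shows for C005.

```python
import hashlib

def row_checksum(row: dict, cols: list) -> str:
    """Hash the tracked columns in a fixed order (stand-in for the SQL-side checksum)."""
    # The \x1f separator keeps ("ab", "c") and ("a", "bc") from hashing the same; None becomes "".
    payload = "\x1f".join("" if row.get(c) is None else str(row[c]) for c in cols)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()

def classify_changes(source: dict, psa_current: dict, cols: list) -> dict:
    """source: business key -> row dict from today's extract.
    psa_current: business key -> checksum of that key's is_current='Y' row in PSA.
    Returns business key -> 'new' | 'changed' | 'unchanged' | 'deleted'."""
    result = {}
    for key, row in source.items():
        if key not in psa_current:
            result[key] = "new"          # in source, not in PSA
        elif row_checksum(row, cols) != psa_current[key]:
            result[key] = "changed"      # in both, checksum differs
        else:
            result[key] = "unchanged"    # in both, checksum equal
    for key in psa_current:
        if key not in source:
            result[key] = "deleted"      # in PSA, gone from source
    return result

cols = ["customer_name", "address"]
day1 = {
    "C001": {"customer_name": "张三", "address": "北京市朝阳区建国路88号"},
    "C002": {"customer_name": "李四科技有限公司", "address": "上海市浦东新区张江路100号"},
    "C005": {"customer_name": "钱七", "address": "杭州市西湖区文三路478号"},
}
psa_current = {key: row_checksum(row, cols) for key, row in day1.items()}

day2 = {
    "C001": {"customer_name": "张三", "address": "北京市朝阳区望京SOHO"},
    "C002": {"customer_name": "李四科技有限公司", "address": "上海市浦东新区张江路100号"},
    "C006": {"customer_name": "孙八", "address": "成都市高新区天府大道888号"},
}
print(classify_changes(day2, psa_current, cols))
# {'C001': 'changed', 'C002': 'unchanged', 'C006': 'new', 'C005': 'deleted'}
```

Only the checksum of the current version needs to be kept per key, which is why a single hash per row is enough to avoid comparing the source table column by column.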

Step 3: Day-2 ETL load

-- ============================================================
-- Day 2: SCD2 incremental load
-- ============================================================

DECLARE @batch_id VARCHAR(50);
SET @batch_id = 'BATCH_SCD2_20240125_002';

PRINT N'========== Day 2: incremental load ==========';
PRINT N'Batch ID: ' + @batch_id;

EXEC psa_db.dbo.sp_load_customers_scd2 @batch_id;
EXEC psa_db.dbo.sp_load_orders_scd2 @batch_id;
EXEC psa_db.dbo.sp_load_products_scd2 @batch_id;

Full version chains after the day-2 load:

[Customer C001: 张三] (attribute change)
├── V1: 建国路88号 | 2024-01-15 → 2024-01-24 23:59:59 | N (expired)
└── V2: 望京SOHO  | 2024-01-25 → 9999-12-31          | Y (current)

[Customer C005: 钱七] (soft delete)
└── V1: 西湖区文三路478号 | 2024-01-15 → 2024-01-25 | N (deleted, no new version)

[Customer C006: 孙八] (new)
└── V1: 天府大道888号 | 2024-01-25 → 9999-12-31 | Y (current)

[Order O003: status transition]
├── V1: status=pending | 2024-01-15 → 2024-01-24 23:59:59 | N (expired)
└── V2: status=shipped | 2024-01-25 → 9999-12-31          | Y (current)

[Product P004: Huawei Mate 60 Pro] (price change)
├── V1: unit_price=6999 | 2024-01-15 → 2024-01-24 23:59:59 | N (expired)
└── V2: unit_price=6499 | 2024-01-25 → 9999-12-31          | Y (current)

[All other customers/orders/products] → unchanged, no new rows ✅
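These chains follow mechanically from the four SCD2 rules. The minimal Python sketch below replays one load at a time over an in-memory history; it is an illustration under stated assumptions, not the article's T-SQL: `Version` and `apply_scd2` are hypothetical names, whole attribute tuples are compared directly instead of a checksum, and only the address is tracked.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta

HIGH_DATE = datetime(9999, 12, 31)  # end_date of the current version

@dataclass
class Version:
    key: str
    attrs: tuple             # tracked attributes, compared directly instead of via a checksum
    start_date: datetime
    end_date: datetime = HIGH_DATE
    is_current: str = "Y"

def apply_scd2(history: list, source: dict, load_time: datetime) -> None:
    """Apply one load of source (key -> attrs) to history, in place."""
    current = {v.key: v for v in history if v.is_current == "Y"}
    for key, attrs in source.items():
        old = current.get(key)
        if old is None:                                   # new key: first version
            history.append(Version(key, attrs, load_time))
        elif old.attrs != attrs:                          # changed: close old, open new
            old.end_date = load_time - timedelta(seconds=1)
            old.is_current = "N"
            history.append(Version(key, attrs, load_time))
        # unchanged: nothing to do
    for key, old in current.items():
        if key not in source:                             # deleted: close at load time, no new row
            old.end_date = load_time
            old.is_current = "N"

history = []
apply_scd2(history, {"C001": ("建国路88号",), "C005": ("西湖区文三路478号",)}, datetime(2024, 1, 15))
apply_scd2(history, {"C001": ("望京SOHO",), "C006": ("天府大道888号",)}, datetime(2024, 1, 25))
for v in history:
    print(v.key, v.attrs[0], v.start_date, v.end_date, v.is_current)
```

The printed rows reproduce the C001, C005 and C006 chains above: C001's old version ends at 2024-01-24 23:59:59, C005 is closed at the load time with no successor, and C006 gets a single current version.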

Querying the SCD2 tables

Query 1: Currently valid data

-- All currently valid customers (historical versions excluded)
SELECT
    customer_id,
    customer_name,
    address,
    is_active,
    start_date AS effective_from
FROM psa_db.dbo.customers_scd2
WHERE is_current = 'Y'
ORDER BY customer_id;

Query 2: Snapshot as of any point in time

-- Customer addresses as of 2024-01-20 (a snapshot from before 张三 moved)
DECLARE @as_of_date DATETIME = '2024-01-20 23:59:59';

SELECT
    customer_id,
    customer_name,
    address,
    city
FROM psa_db.dbo.customers_scd2
WHERE start_date <= @as_of_date
  AND end_date   >= @as_of_date;  -- end_date is inclusive: an expired version ends 1 second before its successor starts
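Because an expired version's end_date is set to one second before its successor's start_date (2024-01-24 23:59:59 in the C001 chain), the boundary comparison on end_date matters. The sketch below uses Python's built-in `sqlite3` module as a stand-in for SQL Server (assumption: dates are stored as ISO-8601 text, which compares correctly as strings; `address_as_of` is a hypothetical helper) and runs the point-in-time predicate with an inclusive (`>=`) and a strict (`>`) end check against C001's two versions.

```python
import sqlite3

# In-memory stand-in for psa_db.dbo.customers_scd2 (only the columns needed here).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE customers_scd2 ("
    "customer_id TEXT, address TEXT, start_date TEXT, end_date TEXT, is_current TEXT)"
)
conn.executemany(
    "INSERT INTO customers_scd2 VALUES (?, ?, ?, ?, ?)",
    [
        ("C001", "建国路88号", "2024-01-15 00:00:00", "2024-01-24 23:59:59", "N"),
        ("C001", "望京SOHO", "2024-01-25 00:00:00", "9999-12-31 00:00:00", "Y"),
    ],
)

def address_as_of(as_of: str, inclusive_end: bool = True) -> list:
    """Return C001's address(es) valid at as_of, with an inclusive or strict end_date check."""
    end_op = ">=" if inclusive_end else ">"   # only these two fixed strings are interpolated
    sql = (
        "SELECT address FROM customers_scd2 "
        f"WHERE customer_id = 'C001' AND start_date <= ? AND end_date {end_op} ?"
    )
    return [row[0] for row in conn.execute(sql, (as_of, as_of))]

for ts in ["2024-01-20 23:59:59", "2024-01-24 23:59:59", "2024-01-24 23:59:59.500", "2024-01-25 00:00:00"]:
    print(ts, address_as_of(ts), address_as_of(ts, inclusive_end=False))
```

With the strict check, 2024-01-24 23:59:59 returns no address at all; with the inclusive check it returns the old one. In both cases an instant between 23:59:59 and midnight (for example 23:59:59.500) falls between the two versions, a side effect of the "minus one second" convention. Storing end_date equal to the successor's start_date and querying with `start_date <= t AND end_date > t` (half-open intervals) is a common alternative that leaves no such gap.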

Query 3: Customer change history (full version chain)

-- Full change history of C001
SELECT
    customer_id,
    customer_name,
    address,
    customer_type,
    is_active,
    start_date,
    end_date,
    CASE
        -- is_current is more robust than comparing end_date to one exact high-date literal
        WHEN is_current = 'Y' THEN '🔵 Current version'
        WHEN is_active = 0    THEN '🔴 Deleted'
        ELSE '⚪ Historical version'
    END AS version_status
FROM psa_db.dbo.customers_scd2
WHERE customer_id = 'C001'
ORDER BY start_date;
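A version chain like the one this query returns is only trustworthy if it has no gaps, no overlaps, and at most one current row. Below is a small Python sketch of such a sanity check, assuming this article's conventions (an expired version ends one second before the next one starts, and the current version ends on 9999-12-31); `check_chain` is a hypothetical helper, and the same rules could equally be written as SQL over `customers_scd2`.

```python
from datetime import datetime, timedelta

HIGH_DATE = datetime(9999, 12, 31)   # assumed end_date of the current version
ONE_SECOND = timedelta(seconds=1)

def check_chain(versions: list) -> list:
    """versions: all rows of ONE business key, each a dict with start_date, end_date, is_current.
    Returns a list of problem descriptions (empty list = chain looks consistent)."""
    problems = []
    chain = sorted(versions, key=lambda v: v["start_date"])
    if sum(v["is_current"] == "Y" for v in chain) > 1:
        problems.append("more than one current version")
    for prev, nxt in zip(chain, chain[1:]):
        # Convention: an expired version ends exactly 1 second before the next one starts.
        if prev["end_date"] != nxt["start_date"] - ONE_SECOND:
            problems.append(f"gap or overlap before {nxt['start_date']}")
    for v in chain:
        # Only the current version may carry the high date, and it must carry it.
        if (v["is_current"] == "Y") != (v["end_date"] == HIGH_DATE):
            problems.append(f"is_current/end_date mismatch at {v['start_date']}")
    return problems

c001 = [
    {"start_date": datetime(2024, 1, 15), "end_date": datetime(2024, 1, 24, 23, 59, 59), "is_current": "N"},
    {"start_date": datetime(2024, 1, 25), "end_date": HIGH_DATE, "is_current": "Y"},
]
print(check_chain(c001))   # []
```

Zero current rows is deliberately allowed, since a deleted key such as C005 ends its chain with a closed version and no successor.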

Query 4: Full order lifecycle

-- Full status history of order O003
SELECT
    order_id,
    customer_id,
    product_id,
    quantity,
    total_amount,
    status,
    start_date AS status_valid_from,
    end_date   AS status_valid_to,
    CASE
        WHEN is_current = 'Y' THEN '🔵 Current status'
        ELSE '⚪ Superseded'
    END AS version_status
FROM psa_db.dbo.orders_scd2
WHERE order_id = 'O003'
ORDER BY start_date;

Query 5: Product price history

-- Price changes of P004
SELECT
    product_id,
    product_name,
    unit_price,
    brand,
    start_date AS valid_from,
    CASE
        WHEN is_current = 'Y' THEN 'current price'
        -- end_date is already the last second of validity (e.g. 2024-01-24 23:59:59), so take its date part as-is
        ELSE CONVERT(VARCHAR(10), end_date, 120)
    END AS valid_to,
    CASE
        WHEN is_current = 'Y' THEN '🔵'
        ELSE '⚪'
    END AS flag
FROM psa_db.dbo.products_scd2
WHERE product_id = 'P004'
ORDER BY start_date;

Query 6: Change statistics report

-- New / changed / deleted rows per batch
SELECT
    batch_id,
    table_name,
    rows_inserted AS rows_new_or_changed,
    rows_updated  AS rows_closed,
    status,
    CONVERT(VARCHAR(19), start_time, 120) AS started_at,
    CONVERT(VARCHAR(19), end_time, 120)   AS ended_at,
    DATEDIFF(SECOND, start_time, end_time) AS duration_sec
FROM etl_db.dbo.etl_log
WHERE layer_name = 'psa_scd2'
ORDER BY start_time DESC;

Snapshot approach vs. SCD2 approach: a full comparison

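Aspect | Snapshot approach (previous article) | SCD2 approach (this article)
Write strategy | full INSERT of all N rows on every run | writes only the rows that changed
Storage model | stacked copies (N batches = N × row count) | version chain (only changes are stored)
Change-tracking granularity | batch level (only "something changed in this batch") | row level (exactly which row changed)
Change content | unknown | old and new versions available for comparison
Historical queries | filter by batch, imprecise | precise start_date/end_date intervals
Identifying new rows | psa_operation='I' | is_current='Y' on the key's first version
Identifying changes | not possible | a different Checksum triggers SCD2
Identifying deletes | psa_operation='D' (requires extra maintenance) | handled naturally (NOT MATCHED BY SOURCE)
Storage cost | O(n × batch_count) | O(n × avg_versions_per_key)
Implementation complexity | low (INSERT ... SELECT) | medium (MERGE + validity-period logic)
Suitable for | small tables, infrequent changes, quick prototypes | large tables, frequent changes, audit and compliance
Downstream consumption | simple (take the latest batch) | must filter on is_current='Y'
Traceability | limited (batch dimension only) | complete (row and time dimensions)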
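The storage-cost row can be checked with a little arithmetic. The sketch below counts rows under each approach; `snapshot_rows` and `scd2_rows` are hypothetical helpers. The first example uses this article's customer table (assumption: the soft-deleted C005 stays physically in the source table, so the day-2 snapshot holds 6 rows); the second uses purely hypothetical numbers (1,000,000 rows, 30 daily loads, 1% of rows changing per load, no new keys).

```python
def snapshot_rows(source_row_counts: list) -> int:
    """Snapshot PSA: every load re-inserts the full source table."""
    return sum(source_row_counts)

def scd2_rows(initial_rows: int, new_per_load: list, changed_per_load: list) -> int:
    """SCD2 PSA: the initial rows, plus one row per new key and one per changed key
    in each later load. Deleted keys only have their end_date closed, adding no rows."""
    return initial_rows + sum(new_per_load) + sum(changed_per_load)

# This article's customers: 5 rows on day 1, 6 on day 2 (C006 added, C001 changed).
print(snapshot_rows([5, 6]), scd2_rows(5, new_per_load=[1], changed_per_load=[1]))    # 11 7

# Hypothetical: 1,000,000 rows, 30 daily loads, 1% of rows change per load, no new keys.
print(snapshot_rows([1_000_000] * 30), scd2_rows(1_000_000, [0] * 29, [10_000] * 29))  # 30000000 1290000
```

The SCD2 count of 7 matches the customer chains shown earlier (five day-1 versions, plus C001 V2 and C006 V1), and the gap between the two approaches widens with every load in which most rows do not change.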

Summary

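This article walked through the overall SCD2 process and a concrete implementation on SQL Server.
SCD2 is the foundational approach to history tracking in a data warehouse. With it, the warehouse gains an efficient "time machine": you can go back to any point in time and see the actual state of any record.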

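BTW:
I wrote this article for two reasons: first, to summarize my past work experience; second, so that if I need to do the same thing in the future, I can point an AI tool at this article's URL and help it generate the code I want more efficiently and accurately.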




Source: https://www.cnblogs.com/aspnetx/p/19940662