Data Warehouse Notes, Part 6: Handling SCD2 in the PSA Layer
2026-04-28 00:30:02
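Abstract
SCD2 (Slowly Changing Dimension Type 2) is the standard and most widely used technique in dimensional modeling for handling historical changes. Unlike the snapshot approach, SCD2 writes only the rows that actually changed and uses validity-period columns to record the lifetime of each version, which gives full history tracking at a much lower storage cost. This article walks through the workflow with detailed code.
These notes use SQL Server, and all sample scripts are written for it; implementations on other databases will differ slightly.
In "PSA Layer (Persistent Staging Area) Explained", the PSA layer loaded data as full snapshots: every ETL run wrote the entire source table into the PSA, and every record was marked psa_operation='I'. That approach is simple to implement and suits small data sets that rarely change.
In production, however, the limits of the snapshot approach are obvious:
| Problem | Description |
|---|---|
| No change history | You cannot tell directly on which day a row changed, or from what value to what value |
| Wasted storage | Every run writes the full table, so the data volume grows quickly |
Using the PSA customer, order, and product tables as examples, this article shows end to end how to rebuild the PSA layer with SCD2.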
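SCD2 Core Concepts
The Three Core Rules of Type 2
SCD2 adds three key columns to every version row:
| Column | Type | Description |
|---|---|---|
| start_date | DATETIME | When this version takes effect |
| end_date | DATETIME | When this version expires (9999-12-31 means currently valid) |
| is_current | CHAR(1) | Y = current version, N = historical version |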
The three change rules, plus the no-change case:
┌────────────────────────────────────────────────────────────────┐
│ SCD2 change detection logic                                    │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│ New (in source, not in PSA)                                    │
│   → insert a row: start_date = now, end_date = 9999-12-31,     │
│     is_current = 'Y'                                           │
│                                                                │
│ Changed (in source and PSA, checksums differ)                  │
│   → expire old row: end_date = now - 1 second,                 │
│     is_current = 'N'                                           │
│   → insert new row: start_date = now, end_date = 9999-12-31,   │
│     is_current = 'Y'                                           │
│                                                                │
│ Deleted (not in source, in PSA)                                │
│   → expire old row: end_date = now, is_current = 'N'           │
│   → no new row is inserted                                     │
│                                                                │
│ Unchanged (in source and PSA, checksums match)                 │
│   → do nothing                                                 │
└────────────────────────────────────────────────────────────────┘
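The four cases in the box can be sketched as a single classification query. This is only an illustration of the decision logic, not the procedure used later in this article: the two inline row sets and their checksum values are invented for the example, and the PSA side is assumed to hold current versions only.

```sql
-- Hypothetical sample data: (customer_id, psa_checksum) pairs with made-up values.
WITH src AS (
    SELECT customer_id, psa_checksum
    FROM (VALUES ('C001', 'aaa'), ('C002', 'bbb'), ('C006', 'fff'))
         AS v (customer_id, psa_checksum)
),
psa_current AS (
    SELECT customer_id, psa_checksum
    FROM (VALUES ('C001', 'aaa'), ('C002', 'xxx'), ('C005', 'eee'))
         AS v (customer_id, psa_checksum)
)
SELECT
    COALESCE(s.customer_id, p.customer_id) AS customer_id,
    CASE
        WHEN p.customer_id IS NULL THEN 'NEW'        -- in source, not in PSA
        WHEN s.customer_id IS NULL THEN 'DELETED'    -- in PSA, not in source
        WHEN s.psa_checksum <> p.psa_checksum THEN 'CHANGED'
        ELSE 'UNCHANGED'
    END AS scd2_action
FROM src AS s
FULL OUTER JOIN psa_current AS p
    ON s.customer_id = p.customer_id
ORDER BY COALESCE(s.customer_id, p.customer_id);
```

With this sample data, C001 is UNCHANGED, C002 is CHANGED, C005 is DELETED, and C006 is NEW.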
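How Changes Are Detected
To replay the change history, the data warehouse first has to recognize that source data changed. Source tables often have many columns, so how is a change detected? By comparing column by column?
The most common approach is a checksum, which is the heart of SCD2 change detection. On every ETL run, an MD5 hash is computed over all business columns of each source row:
-- Example: checksum calculation for 张三
CONVERT(VARCHAR(32), HASHBYTES('MD5',
CONCAT(
N'张三', -- customer_name
N'|', 'zhangsan@email.com', -- email
N'|', '13800138001', -- phone
N'|', N'北京市朝阳区建国路88号', -- address
...
)
), 2)
-- Result: a 32-character hexadecimal MD5 string (style 2 omits the 0x prefix)
In other words, the columns to be watched are concatenated into one string and hashed with MD5. A change to any business column produces a different hash (collisions are negligible in practice), which triggers the SCD2 change flow, so source changes are detected without comparing every column one by one.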
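The '|' separator between columns is doing real work. A small sketch with invented two-column values shows why: without a delimiter, different rows can concatenate to the same string and therefore hash to the same value.

```sql
-- Without a delimiter, ('ab', 'c') and ('a', 'bc') both concatenate to 'abc'.
SELECT
    CONVERT(VARCHAR(32), HASHBYTES('MD5', CONCAT(N'ab', N'c')), 2)       AS no_delimiter_row1,
    CONVERT(VARCHAR(32), HASHBYTES('MD5', CONCAT(N'a', N'bc')), 2)       AS no_delimiter_row2,
    -- With a delimiter the inputs are 'ab|c' and 'a|bc', which differ.
    CONVERT(VARCHAR(32), HASHBYTES('MD5', CONCAT(N'ab', N'|', N'c')), 2) AS delimited_row1,
    CONVERT(VARCHAR(32), HASHBYTES('MD5', CONCAT(N'a', N'|', N'bc')), 2) AS delimited_row2;
-- no_delimiter_row1 equals no_delimiter_row2; delimited_row1 differs from delimited_row2.
```

One caveat on the algorithm: Microsoft's HASHBYTES documentation lists MD5 as deprecated starting with SQL Server 2016. If that matters in your environment, 'SHA2_256' is a drop-in alternative, provided the checksum column is widened to VARCHAR(64) to hold the longer hex string.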
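PSA-SCD2 Table Design
These are the same three PSA tables created in Part 2; here they are created with an scd2 suffix to distinguish them from the Part 2 tables.
Customer Table: SCD2 Structure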
-- ============================================================
-- PSA-SCD2 customer table
-- Compared with the snapshot approach: adds start_date / end_date / is_current
-- ============================================================
USE psa_db;
GO
IF OBJECT_ID('dbo.customers_scd2', 'U') IS NOT NULL
DROP TABLE dbo.customers_scd2;
GO
CREATE TABLE dbo.customers_scd2 (
-- PSA system columns
psa_record_seq BIGINT IDENTITY(1,1) NOT NULL,
psa_load_time DATETIME NOT NULL DEFAULT GETDATE(),
psa_batch_id VARCHAR(50) NOT NULL,
psa_source_table VARCHAR(100) NOT NULL DEFAULT 'business_db.dbo.customers',
psa_checksum VARCHAR(32) NULL,
-- SCD2 validity columns
start_date DATETIME NOT NULL,
end_date DATETIME NOT NULL,
is_current CHAR(1) NOT NULL DEFAULT 'Y',
-- Business columns
customer_id VARCHAR(50) NOT NULL,
customer_name NVARCHAR(100) NOT NULL,
email VARCHAR(100) NULL,
phone VARCHAR(20) NULL,
address NVARCHAR(200) NULL,
city NVARCHAR(50) NULL,
region NVARCHAR(50) NULL,
register_date DATE NOT NULL,
customer_type VARCHAR(20) NOT NULL,
is_active BIT NULL,
created_at DATETIME NULL,
updated_at DATETIME NULL
);
GO
CREATE NONCLUSTERED INDEX idx_customers_scd2_bk
ON dbo.customers_scd2(customer_id, is_current);
CREATE NONCLUSTERED INDEX idx_customers_scd2_period
ON dbo.customers_scd2(start_date, end_date);
CREATE NONCLUSTERED INDEX idx_customers_scd2_batch
ON dbo.customers_scd2(psa_batch_id);
GO
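Design note: the SCD2 table defines no primary key constraint, because the same business key has multiple historical versions. The current version is located with (customer_id, is_current='Y').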
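Even without a primary key, the "one current version per business key" rule can be enforced by the database. One option, which is not part of the original design, is a filtered unique index. The sketch below uses a hypothetical temporary table and index name so that it can run on its own; creating a filtered index assumes the session uses the usual SET options (QUOTED_IDENTIFIER and ANSI_NULLS ON, among others), which is the default in SSMS.

```sql
-- Hypothetical demo table; only the columns needed for the illustration.
CREATE TABLE #customers_scd2_demo (
    customer_id VARCHAR(50) NOT NULL,
    start_date  DATETIME    NOT NULL,
    end_date    DATETIME    NOT NULL,
    is_current  CHAR(1)     NOT NULL
);

-- At most one row per customer_id may have is_current = 'Y'.
CREATE UNIQUE NONCLUSTERED INDEX ux_customers_scd2_demo_current
    ON #customers_scd2_demo (customer_id)
    WHERE is_current = 'Y';

-- One historical and one current version: allowed.
INSERT INTO #customers_scd2_demo (customer_id, start_date, end_date, is_current)
VALUES
    ('C001', '2024-01-15T00:00:00', '2024-01-24T23:59:59', 'N'),
    ('C001', '2024-01-25T00:00:00', '9999-12-31T23:59:59', 'Y');

-- A second current version for the same key is rejected by the index.
BEGIN TRY
    INSERT INTO #customers_scd2_demo (customer_id, start_date, end_date, is_current)
    VALUES ('C001', '2024-01-26T00:00:00', '9999-12-31T23:59:59', 'Y');
END TRY
BEGIN CATCH
    PRINT ERROR_MESSAGE();  -- duplicate-key error raised by the unique index
END CATCH;

DROP TABLE #customers_scd2_demo;
```

The load procedures in this article close the old version before inserting the new one, so they would not trip such an index; it acts as a safety net against bugs or manual fixes that leave two current rows.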
Order Table: SCD2 Structure
-- ============================================================
-- PSA-SCD2 order table
-- Orders change mainly through status transitions (pending→confirmed→shipped→cancelled)
-- SCD2 records the full lifecycle of each order
-- ============================================================
USE psa_db;
GO
IF OBJECT_ID('dbo.orders_scd2', 'U') IS NOT NULL
DROP TABLE dbo.orders_scd2;
GO
CREATE TABLE dbo.orders_scd2 (
-- PSA system columns
psa_record_seq BIGINT IDENTITY(1,1) NOT NULL,
psa_load_time DATETIME NOT NULL DEFAULT GETDATE(),
psa_batch_id VARCHAR(50) NOT NULL,
psa_source_table VARCHAR(100) NOT NULL DEFAULT 'business_db.dbo.orders',
psa_checksum VARCHAR(32) NULL,
-- SCD2 validity columns
start_date DATETIME NOT NULL,
end_date DATETIME NOT NULL,
is_current CHAR(1) NOT NULL DEFAULT 'Y',
-- Business columns
order_id VARCHAR(50) NOT NULL,
customer_id VARCHAR(50) NOT NULL,
product_id VARCHAR(50) NOT NULL,
order_date DATE NOT NULL,
quantity INT NOT NULL,
unit_price DECIMAL(10,2) NOT NULL,
total_amount DECIMAL(12,2) NOT NULL,
status VARCHAR(20) NOT NULL, -- pending/confirmed/shipped/cancelled
created_at DATETIME NULL,
updated_at DATETIME NULL
);
GO
CREATE NONCLUSTERED INDEX idx_orders_scd2_bk
ON dbo.orders_scd2(order_id, is_current);
CREATE NONCLUSTERED INDEX idx_orders_scd2_period
ON dbo.orders_scd2(start_date, end_date);
CREATE NONCLUSTERED INDEX idx_orders_scd2_batch
ON dbo.orders_scd2(psa_batch_id);
CREATE NONCLUSTERED INDEX idx_orders_scd2_customer
ON dbo.orders_scd2(customer_id, is_current);
GO
Product Table: SCD2 Structure
-- ============================================================
-- PSA-SCD2 product table
-- Product changes are mostly price adjustments, listing/delisting, and category changes
-- ============================================================
USE psa_db;
GO
IF OBJECT_ID('dbo.products_scd2', 'U') IS NOT NULL
DROP TABLE dbo.products_scd2;
GO
CREATE TABLE dbo.products_scd2 (
-- PSA system columns
psa_record_seq BIGINT IDENTITY(1,1) NOT NULL,
psa_load_time DATETIME NOT NULL DEFAULT GETDATE(),
psa_batch_id VARCHAR(50) NOT NULL,
psa_source_table VARCHAR(100) NOT NULL DEFAULT 'business_db.dbo.products',
psa_checksum VARCHAR(32) NULL,
-- SCD2 validity columns
start_date DATETIME NOT NULL,
end_date DATETIME NOT NULL,
is_current CHAR(1) NOT NULL DEFAULT 'Y',
-- Business columns
product_id VARCHAR(50) NOT NULL,
product_name NVARCHAR(200) NOT NULL,
category NVARCHAR(50) NOT NULL,
sub_category NVARCHAR(50) NULL,
brand NVARCHAR(50) NULL,
unit_cost DECIMAL(10,2) NULL,
unit_price DECIMAL(10,2) NOT NULL,
supplier_id VARCHAR(50) NULL,
is_active BIT NULL,
created_at DATETIME NULL,
updated_at DATETIME NULL
);
GO
CREATE NONCLUSTERED INDEX idx_products_scd2_bk
ON dbo.products_scd2(product_id, is_current);
CREATE NONCLUSTERED INDEX idx_products_scd2_period
ON dbo.products_scd2(start_date, end_date);
CREATE NONCLUSTERED INDEX idx_products_scd2_batch
ON dbo.products_scd2(psa_batch_id);
GO
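SCD2 ETL Stored Procedure: Customer Table
This is where the key ETL work starts. The code is long, but every procedure follows the same logic.
Full Implementation
-- ============================================================
-- SCD2 load procedure: customer table
-- A MERGE statement decides inserts / updates / deletes in one pass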
-- ============================================================
USE psa_db;
GO
IF OBJECT_ID('dbo.sp_load_customers_scd2', 'P') IS NOT NULL
DROP PROCEDURE dbo.sp_load_customers_scd2;
GO
CREATE PROCEDURE dbo.sp_load_customers_scd2
@batch_id VARCHAR(50)
AS
BEGIN
SET NOCOUNT ON;
DECLARE @start_time DATETIME = GETDATE();
DECLARE @current_date DATETIME = GETDATE();
DECLARE @far_future DATETIME = '9999-12-31 23:59:59';
DECLARE @rows_inserted INT = 0;
DECLARE @rows_closed INT = 0;
DECLARE @error_msg NVARCHAR(MAX);
BEGIN TRY
-- Log the ETL start
INSERT INTO etl_db.dbo.etl_log (
batch_id, layer_name, db_name, table_name,
start_time, status
)
VALUES (
@batch_id, 'psa_scd2', 'psa_db', 'customers_scd2',
@start_time, 'RUNNING'
);
-- ================================================
-- SCD2 MERGE statement
-- Match condition: ON target(customer_id) = source(customer_id)
-- AND target.is_current = 'Y' (match the current version only)
-- ================================================
;MERGE dbo.customers_scd2 AS target
USING (
SELECT
customer_id,
customer_name,
email,
phone,
address,
city,
region,
register_date,
customer_type,
is_active,
created_at,
updated_at,
-- Compute the MD5 checksum over the business columns
CONVERT(VARCHAR(32), HASHBYTES('MD5',
CONCAT(
ISNULL(customer_name, N''),
N'|', ISNULL(email, N''),
N'|', ISNULL(phone, N''),
N'|', ISNULL(address, N''),
N'|', ISNULL(city, N''),
N'|', ISNULL(region, N''),
N'|', ISNULL(CONVERT(NVARCHAR(20), register_date, 120), N''),
N'|', ISNULL(customer_type, N''),
N'|', CASE WHEN ISNULL(is_active, 0) = 1 THEN N'1' ELSE N'0' END
)
), 2) AS psa_checksum
FROM business_db.dbo.customers
) AS source
ON (
target.customer_id = source.customer_id
AND target.is_current = 'Y'
)
-- ② Attributes changed → close the old version
WHEN MATCHED AND target.psa_checksum <> source.psa_checksum
THEN UPDATE SET
target.end_date = DATEADD(SECOND, -1, @current_date),
target.is_current = 'N'
-- ③ New row → insert it
WHEN NOT MATCHED BY TARGET
THEN INSERT (
psa_batch_id, psa_source_table, psa_checksum,
start_date, end_date, is_current,
customer_id, customer_name, email, phone,
address, city, region, register_date,
customer_type, is_active, created_at, updated_at
) VALUES (
@batch_id,
'business_db.dbo.customers',
source.psa_checksum,
@current_date,
@far_future,
'Y',
source.customer_id,
source.customer_name,
source.email,
source.phone,
source.address,
source.city,
source.region,
source.register_date,
source.customer_type,
source.is_active,
source.created_at,
source.updated_at
)
-- ④ Row removed from the source → close the old version
WHEN NOT MATCHED BY SOURCE
AND target.is_current = 'Y'
THEN UPDATE SET
target.end_date = @current_date,
target.is_current = 'N';
-- ================================================
-- Insert the new version for each old row that was just closed
-- ================================================
INSERT INTO dbo.customers_scd2 (
psa_batch_id, psa_source_table, psa_checksum,
start_date, end_date, is_current,
customer_id, customer_name, email, phone,
address, city, region, register_date,
customer_type, is_active, created_at, updated_at
)
SELECT
@batch_id,
'business_db.dbo.customers',
s.psa_checksum,
@current_date,
@far_future,
'Y',
s.customer_id,
s.customer_name,
s.email,
s.phone,
s.address,
s.city,
s.region,
s.register_date,
s.customer_type,
s.is_active,
s.created_at,
s.updated_at
FROM (
SELECT
customer_id,
customer_name,
email,
phone,
address,
city,
region,
register_date,
customer_type,
is_active,
created_at,
updated_at,
CONVERT(VARCHAR(32), HASHBYTES('MD5',
CONCAT(
ISNULL(customer_name, N''),
N'|', ISNULL(email, N''),
N'|', ISNULL(phone, N''),
N'|', ISNULL(address, N''),
N'|', ISNULL(city, N''),
N'|', ISNULL(region, N''),
N'|', ISNULL(CONVERT(NVARCHAR(20), register_date, 120), N''),
N'|', ISNULL(customer_type, N''),
N'|', CASE WHEN ISNULL(is_active, 0) = 1 THEN N'1' ELSE N'0' END
)
), 2) AS psa_checksum
FROM business_db.dbo.customers
) s
-- Insert only for old rows the MERGE just closed (i.e. customers that were updated)
INNER JOIN dbo.customers_scd2 t
ON s.customer_id = t.customer_id
AND t.is_current = 'N'
AND t.end_date = DATEADD(SECOND, -1, @current_date)
-- Avoid duplicate inserts
WHERE NOT EXISTS (
SELECT 1
FROM dbo.customers_scd2
WHERE customer_id = s.customer_id
AND is_current = 'Y'
);
-- ================================================
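-- Summary statistics
-- ================================================
SELECT @rows_inserted = COUNT(*)
FROM dbo.customers_scd2
WHERE psa_batch_id = @batch_id;
-- Rows closed by this run: updates end at @current_date - 1 second,
-- deletes end at @current_date, so the range must cover both.
SELECT @rows_closed = COUNT(*)
FROM dbo.customers_scd2
WHERE end_date >= DATEADD(SECOND, -1, @current_date)
AND end_date <= @current_date
AND psa_batch_id <> @batch_id
AND is_current = 'N';
-- Update the ETL log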
UPDATE etl_db.dbo.etl_log
SET
end_time = GETDATE(),
rows_inserted = ISNULL(@rows_inserted, 0),
rows_updated = ISNULL(@rows_closed, 0),
status = 'SUCCESS'
WHERE batch_id = @batch_id
AND table_name = 'customers_scd2';
PRINT N'✅ Customer table SCD2 load complete';
PRINT N' Inserted/changed in this batch: ' + ISNULL(CAST(@rows_inserted AS VARCHAR), '0');
PRINT N' Historical versions closed: ' + ISNULL(CAST(@rows_closed AS VARCHAR), '0');
END TRY
BEGIN CATCH
SET @error_msg = ERROR_MESSAGE();
UPDATE etl_db.dbo.etl_log
SET
end_time = GETDATE(),
status = 'FAILED',
error_message = @error_msg
WHERE batch_id = @batch_id
AND table_name = 'customers_scd2';
THROW;
END CATCH;
END;
GO
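The procedure above closes old versions in the MERGE and inserts their replacements in a second statement. Because the two statements are not wrapped in a transaction, a failure between them could leave a customer with no current row. One alternative, shown here as a sketch rather than as the author's method, is T-SQL's composable DML: the MERGE's OUTPUT rows feed an outer INSERT, so the close and the re-insert happen in a single statement. The temporary tables `#src` and `#dim`, their single business column, and the sample rows are hypothetical.

```sql
-- Hypothetical demo tables with one business column (address).
CREATE TABLE #src (customer_id VARCHAR(50) NOT NULL, address NVARCHAR(200) NULL);
CREATE TABLE #dim (
    customer_id  VARCHAR(50)   NOT NULL,
    address      NVARCHAR(200) NULL,
    psa_checksum VARCHAR(32)   NOT NULL,
    start_date   DATETIME      NOT NULL,
    end_date     DATETIME      NOT NULL,
    is_current   CHAR(1)       NOT NULL
);

DECLARE @now        DATETIME = GETDATE();
DECLARE @far_future DATETIME = '9999-12-31T23:59:59';

-- Seed: C001 already has a current version; the source now holds a changed
-- address for C001 and a brand-new customer C006.
INSERT INTO #dim (customer_id, address, psa_checksum, start_date, end_date, is_current)
VALUES ('C001', N'old address',
        CONVERT(VARCHAR(32), HASHBYTES('MD5', N'old address'), 2),
        '2024-01-15T00:00:00', @far_future, 'Y');
INSERT INTO #src (customer_id, address)
VALUES ('C001', N'new address'), ('C006', N'brand new');

-- The outer INSERT adds the new version for every row the MERGE closed
-- because of a change. Deletes also report 'UPDATE' but carry NULL source
-- columns, so they are filtered out.
INSERT INTO #dim (customer_id, address, psa_checksum, start_date, end_date, is_current)
SELECT customer_id, address, psa_checksum, @now, @far_future, 'Y'
FROM (
    MERGE #dim AS target
    USING (
        SELECT customer_id, address,
               CONVERT(VARCHAR(32), HASHBYTES('MD5', ISNULL(address, N'')), 2) AS psa_checksum
        FROM #src
    ) AS source
    ON target.customer_id = source.customer_id
       AND target.is_current = 'Y'
    WHEN MATCHED AND target.psa_checksum <> source.psa_checksum THEN
        UPDATE SET target.end_date = DATEADD(SECOND, -1, @now),
                   target.is_current = 'N'
    WHEN NOT MATCHED BY TARGET THEN
        INSERT (customer_id, address, psa_checksum, start_date, end_date, is_current)
        VALUES (source.customer_id, source.address, source.psa_checksum,
                @now, @far_future, 'Y')
    WHEN NOT MATCHED BY SOURCE AND target.is_current = 'Y' THEN
        UPDATE SET target.end_date = @now,
                   target.is_current = 'N'
    OUTPUT $action, source.customer_id, source.address, source.psa_checksum
) AS changes (merge_action, customer_id, address, psa_checksum)
WHERE merge_action = 'UPDATE'
  AND customer_id IS NOT NULL;

-- Expect three rows: C001 old (N), C001 new (Y), C006 (Y).
SELECT customer_id, address, is_current FROM #dim ORDER BY customer_id, start_date;

DROP TABLE #src;
DROP TABLE #dim;
```

SQL Server places restrictions on the target of such an outer INSERT (for example, it cannot have enabled triggers or take part in foreign key relationships), so check those against your tables before adopting the pattern. Wrapping the original two statements in an explicit transaction is the simpler fix if those restrictions apply.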
SCD2 ETL Stored Procedure: Order Table
-- ============================================================
-- SCD2 load procedure: order table
-- The core change for orders is the status transition (pending→confirmed→shipped/cancelled)
-- ============================================================
USE psa_db;
GO
IF OBJECT_ID('dbo.sp_load_orders_scd2', 'P') IS NOT NULL
DROP PROCEDURE dbo.sp_load_orders_scd2;
GO
CREATE PROCEDURE dbo.sp_load_orders_scd2
@batch_id VARCHAR(50)
AS
BEGIN
SET NOCOUNT ON;
DECLARE @current_date DATETIME = GETDATE();
DECLARE @far_future DATETIME = '9999-12-31 23:59:59';
DECLARE @rows_inserted INT = 0;
DECLARE @rows_closed INT = 0;
DECLARE @error_msg NVARCHAR(MAX);
BEGIN TRY
INSERT INTO etl_db.dbo.etl_log (
batch_id, layer_name, db_name, table_name,
start_time, status
)
VALUES (
@batch_id, 'psa_scd2', 'psa_db', 'orders_scd2',
GETDATE(), 'RUNNING'
);
;MERGE dbo.orders_scd2 AS target
USING (
SELECT
order_id,
customer_id,
product_id,
order_date,
quantity,
unit_price,
total_amount,
status,
created_at,
updated_at,
CONVERT(VARCHAR(32), HASHBYTES('MD5',
CONCAT(
ISNULL(order_id, N''),
N'|', ISNULL(customer_id, N''),
N'|', ISNULL(product_id, N''),
N'|', ISNULL(CONVERT(NVARCHAR(20), order_date, 120), N''),
N'|', ISNULL(CAST(quantity AS NVARCHAR(20)), N''),
N'|', ISNULL(CAST(unit_price AS NVARCHAR(20)), N''),
N'|', ISNULL(CAST(total_amount AS NVARCHAR(20)), N''),
N'|', ISNULL(status, N'')
)
), 2) AS psa_checksum
FROM business_db.dbo.orders
) AS source
ON (
target.order_id = source.order_id
AND target.is_current = 'Y'
)
-- Attributes changed (order status transition)
WHEN MATCHED AND target.psa_checksum <> source.psa_checksum
THEN UPDATE SET
target.end_date = DATEADD(SECOND, -1, @current_date),
target.is_current = 'N'
-- New order
WHEN NOT MATCHED BY TARGET
THEN INSERT (
psa_batch_id, psa_source_table, psa_checksum,
start_date, end_date, is_current,
order_id, customer_id, product_id, order_date,
quantity, unit_price, total_amount, status,
created_at, updated_at
) VALUES (
@batch_id,
'business_db.dbo.orders',
source.psa_checksum,
@current_date,
@far_future,
'Y',
source.order_id,
source.customer_id,
source.product_id,
source.order_date,
source.quantity,
source.unit_price,
source.total_amount,
source.status,
source.created_at,
source.updated_at
)
-- Order deleted from the source (a cancellation is a status change and is handled by the branch above)
WHEN NOT MATCHED BY SOURCE
AND target.is_current = 'Y'
THEN UPDATE SET
target.end_date = @current_date,
target.is_current = 'N';
-- =============================================
-- Insert the new version for each row that was updated
-- =============================================
INSERT INTO dbo.orders_scd2 (
psa_batch_id, psa_source_table, psa_checksum,
start_date, end_date, is_current,
order_id, customer_id, product_id, order_date,
quantity, unit_price, total_amount, status,
created_at, updated_at
)
SELECT
@batch_id,
'business_db.dbo.orders',
s.psa_checksum,
@current_date,
@far_future,
'Y',
s.order_id,
s.customer_id,
s.product_id,
s.order_date,
s.quantity,
s.unit_price,
s.total_amount,
s.status,
s.created_at,
s.updated_at
FROM (
SELECT
order_id,
customer_id,
product_id,
order_date,
quantity,
unit_price,
total_amount,
status,
created_at,
updated_at,
CONVERT(VARCHAR(32), HASHBYTES('MD5',
CONCAT(
ISNULL(order_id, N''),
N'|', ISNULL(customer_id, N''),
N'|', ISNULL(product_id, N''),
N'|', ISNULL(CONVERT(NVARCHAR(20), order_date, 120), N''),
N'|', ISNULL(CAST(quantity AS NVARCHAR(20)), N''),
N'|', ISNULL(CAST(unit_price AS NVARCHAR(20)), N''),
N'|', ISNULL(CAST(total_amount AS NVARCHAR(20)), N''),
N'|', ISNULL(status, N'')
)
), 2) AS psa_checksum
FROM business_db.dbo.orders
) s
INNER JOIN dbo.orders_scd2 t
ON s.order_id = t.order_id
AND t.is_current = 'N'
AND t.end_date = DATEADD(SECOND, -1, @current_date)
WHERE NOT EXISTS (
SELECT 1 FROM dbo.orders_scd2
WHERE order_id = s.order_id AND is_current = 'Y'
);
SELECT @rows_inserted = COUNT(*)
FROM dbo.orders_scd2
WHERE psa_batch_id = @batch_id;
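-- Rows closed by this run: updates end at @current_date - 1 second,
-- deletes end at @current_date, so the range must cover both.
SELECT @rows_closed = COUNT(*)
FROM dbo.orders_scd2
WHERE end_date >= DATEADD(SECOND, -1, @current_date)
AND end_date <= @current_date
AND psa_batch_id <> @batch_id
AND is_current = 'N';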
UPDATE etl_db.dbo.etl_log
SET
end_time = GETDATE(),
rows_inserted = ISNULL(@rows_inserted, 0),
rows_updated = ISNULL(@rows_closed, 0),
status = 'SUCCESS'
WHERE batch_id = @batch_id
AND table_name = 'orders_scd2';
PRINT N'✅ Order table SCD2 load complete';
PRINT N' Inserted/changed: ' + ISNULL(CAST(@rows_inserted AS VARCHAR), '0');
END TRY
BEGIN CATCH
SET @error_msg = ERROR_MESSAGE();
UPDATE etl_db.dbo.etl_log
SET end_time = GETDATE(), status = 'FAILED', error_message = @error_msg
WHERE batch_id = @batch_id AND table_name = 'orders_scd2';
THROW;
END CATCH;
END;
GO
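SCD2 ETL Stored Procedure: Product Table
-- ============================================================
-- SCD2 load procedure: product table
-- Product changes include price adjustments, category changes, and listing/delisting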
-- ============================================================
USE psa_db;
GO
IF OBJECT_ID('dbo.sp_load_products_scd2', 'P') IS NOT NULL
DROP PROCEDURE dbo.sp_load_products_scd2;
GO
CREATE PROCEDURE dbo.sp_load_products_scd2
@batch_id VARCHAR(50)
AS
BEGIN
SET NOCOUNT ON;
DECLARE @current_date DATETIME = GETDATE();
DECLARE @far_future DATETIME = '9999-12-31 23:59:59';
DECLARE @rows_inserted INT = 0;
DECLARE @rows_closed INT = 0;
DECLARE @error_msg NVARCHAR(MAX);
BEGIN TRY
INSERT INTO etl_db.dbo.etl_log (
batch_id, layer_name, db_name, table_name,
start_time, status
)
VALUES (
@batch_id, 'psa_scd2', 'psa_db', 'products_scd2',
GETDATE(), 'RUNNING'
);
;MERGE dbo.products_scd2 AS target
USING (
SELECT
product_id,
product_name,
category,
sub_category,
brand,
unit_cost,
unit_price,
supplier_id,
is_active,
created_at,
updated_at,
CONVERT(VARCHAR(32), HASHBYTES('MD5',
CONCAT(
ISNULL(product_name, N''),
N'|', ISNULL(category, N''),
N'|', ISNULL(sub_category, N''),
N'|', ISNULL(brand, N''),
N'|', ISNULL(CAST(unit_cost AS NVARCHAR(20)), N''),
N'|', ISNULL(CAST(unit_price AS NVARCHAR(20)), N''),
N'|', ISNULL(CAST(supplier_id AS NVARCHAR(50)), N''),
N'|', CASE WHEN ISNULL(is_active, 0) = 1 THEN N'1' ELSE N'0' END
)
), 2) AS psa_checksum
FROM business_db.dbo.products
) AS source
ON (
target.product_id = source.product_id
AND target.is_current = 'Y'
)
WHEN MATCHED AND target.psa_checksum <> source.psa_checksum
THEN UPDATE SET
target.end_date = DATEADD(SECOND, -1, @current_date),
target.is_current = 'N'
WHEN NOT MATCHED BY TARGET
THEN INSERT (
psa_batch_id, psa_source_table, psa_checksum,
start_date, end_date, is_current,
product_id, product_name, category, sub_category,
brand, unit_cost, unit_price, supplier_id,
is_active, created_at, updated_at
) VALUES (
@batch_id,
'business_db.dbo.products',
source.psa_checksum,
@current_date,
@far_future,
'Y',
source.product_id,
source.product_name,
source.category,
source.sub_category,
source.brand,
source.unit_cost,
source.unit_price,
source.supplier_id,
source.is_active,
source.created_at,
source.updated_at
)
WHEN NOT MATCHED BY SOURCE
AND target.is_current = 'Y'
THEN UPDATE SET
target.end_date = @current_date,
target.is_current = 'N';
-- ================================================
-- Insert the new version for each old row that was just closed
-- ================================================
INSERT INTO dbo.products_scd2 (
psa_batch_id, psa_source_table, psa_checksum,
start_date, end_date, is_current,
product_id, product_name, category, sub_category,
brand, unit_cost, unit_price, supplier_id,
is_active, created_at, updated_at
)
SELECT
@batch_id,
'business_db.dbo.products',
s.psa_checksum,
@current_date,
@far_future,
'Y',
s.product_id,
s.product_name,
s.category,
s.sub_category,
s.brand,
s.unit_cost,
s.unit_price,
s.supplier_id,
s.is_active,
s.created_at,
s.updated_at
FROM (
SELECT
product_id, product_name, category, sub_category,
brand, unit_cost, unit_price, supplier_id,
is_active, created_at, updated_at,
CONVERT(VARCHAR(32), HASHBYTES('MD5',
CONCAT(
ISNULL(product_name, N''),
N'|', ISNULL(category, N''),
N'|', ISNULL(sub_category, N''),
N'|', ISNULL(brand, N''),
N'|', ISNULL(CAST(unit_cost AS NVARCHAR(20)), N''),
N'|', ISNULL(CAST(unit_price AS NVARCHAR(20)), N''),
N'|', ISNULL(CAST(supplier_id AS NVARCHAR(50)), N''),
N'|', CASE WHEN ISNULL(is_active, 0) = 1 THEN N'1' ELSE N'0' END
)
), 2) AS psa_checksum
FROM business_db.dbo.products
) s
INNER JOIN dbo.products_scd2 t
ON s.product_id = t.product_id
AND t.is_current = 'N'
AND t.end_date = DATEADD(SECOND, -1, @current_date)
WHERE NOT EXISTS (
SELECT 1
FROM dbo.products_scd2
WHERE product_id = s.product_id AND is_current = 'Y'
);
SELECT @rows_inserted = COUNT(*)
FROM dbo.products_scd2
WHERE psa_batch_id = @batch_id;
UPDATE etl_db.dbo.etl_log
SET end_time = GETDATE(), rows_inserted = @rows_inserted, status = 'SUCCESS'
WHERE batch_id = @batch_id AND table_name = 'products_scd2';
PRINT N'✅ Product table SCD2 load complete, rows inserted: ' + CAST(@rows_inserted AS VARCHAR);
END TRY
BEGIN CATCH
SET @error_msg = ERROR_MESSAGE();
UPDATE etl_db.dbo.etl_log
SET end_time = GETDATE(), status = 'FAILED', error_message = @error_msg
WHERE batch_id = @batch_id AND table_name = 'products_scd2';
THROW;
END CATCH;
END;
GO
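Scenario Walkthrough: A Complete ETL Lifecycle
Step 0: Environment Setup
Make sure business_db (the business database) and psa_db (the PSA database) exist; see "PSA Layer (Persistent Staging Area) Explained" for the table definitions. The load procedures also write to etl_db.dbo.etl_log, so that table must exist as well. If you have not yet run the script that adds test data, run it first. If you already ran it while following that article, TRUNCATE the business tables and regenerate the test data with the script below:
-- Clear existing test data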
TRUNCATE TABLE [business_db].[dbo].[customers]
TRUNCATE TABLE [business_db].[dbo].[orders]
TRUNCATE TABLE [business_db].[dbo].[products]
TRUNCATE TABLE [psa_db].[dbo].[customers_scd2]
TRUNCATE TABLE [psa_db].[dbo].[orders_scd2]
TRUNCATE TABLE [psa_db].[dbo].[products_scd2]
-- Insert sample business data
USE business_db;
GO
-- Customer data
INSERT INTO dbo.customers (customer_id, customer_name, email, phone, address, city, region, register_date, customer_type, is_active)
VALUES
(N'C001', N'张三', 'zhangsan@email.com', '13800138001', N'北京市朝阳区建国路88号', N'北京', N'华北', '2023-01-15', 'individual', 1),
(N'C002', N'李四科技有限公司', 'lisi@company.com', '13800138002', N'上海市浦东新区张江路100号', N'上海', N'华东', '2023-03-20', 'enterprise', 1),
(N'C003', N'王五', 'wangwu@email.com', '13800138003', N'广州市天河区体育西路103号', N'广州', N'华南', '2023-06-10', 'individual', 1),
(N'C004', N'赵六集团', 'zhaoliu@group.com', '13800138004', N'深圳市南山区科技园南路10号', N'深圳', N'华南', '2023-08-05', 'enterprise', 1),
(N'C005', N'钱七', 'qianqi@email.com', '13800138005', N'杭州市西湖区文三路478号', N'杭州', N'华东', '2024-01-20', 'individual', 1);
GO
-- Product data
INSERT INTO dbo.products (product_id, product_name, category, sub_category, brand, unit_cost, unit_price, supplier_id, is_active)
VALUES
('P001', N'iPhone 15 Pro', N'电子产品', N'手机', N'Apple', 6500.00, 8999.00, 'S001', 1),
('P002', N'MacBook Pro 16', N'电子产品', N'笔记本', N'Apple', 15000.00, 19999.00, 'S001', 1),
('P003', N'AirPods Pro 2', N'电子产品', N'耳机', N'Apple', 1200.00, 1899.00, 'S001', 1),
('P004', N'华为Mate 60 Pro', N'电子产品', N'手机', N'华为', 4500.00, 6999.00, 'S002', 1),
('P005', N'戴森吸尘器V15', N'家用电器', N'吸尘器', N'Dyson', 2800.00, 4999.00, 'S003', 1),
('P006', N'索尼WH-1000XM5', N'电子产品', N'耳机', N'Sony', 1800.00, 2999.00, 'S004', 1),
('P007', N'小米空气净化器', N'家用电器', N'空气净化器', N'小米', 600.00, 1299.00, 'S005', 1),
('P008', N'iPad Air 5', N'电子产品', N'平板', N'Apple', 3200.00, 4799.00, 'S001', 1);
GO
-- Order data
INSERT INTO dbo.orders (order_id, customer_id, product_id, order_date, quantity, unit_price, total_amount, status)
VALUES
('O001', 'C001', 'P001', '2024-01-15', 1, 8999.00, 8999.00, 'shipped'),
('O002', 'C002', 'P002', '2024-01-16', 5, 19999.00, 99995.00, 'confirmed'),
('O003', 'C001', 'P003', '2024-01-17', 2, 1899.00, 3798.00, 'pending'),
('O004', 'C003', 'P004', '2024-01-18', 1, 6999.00, 6999.00, 'shipped'),
('O005', 'C004', 'P005', '2024-01-19', 3, 4999.00, 14997.00, 'confirmed'),
('O006', 'C001', 'P002', '2024-01-20', 1, 19999.00, 19999.00, 'pending'),
('O007', 'C005', 'P006', '2024-01-21', 1, 2999.00, 2999.00, 'cancelled'),
('O008', 'C002', 'P001', '2024-01-22', 10, 8999.00, 89990.00, 'confirmed'),
('O009', 'C003', 'P007', '2024-01-23', 2, 1299.00, 2598.00, 'shipped'),
('O010', 'C004', 'P008', '2024-01-24', 4, 4799.00, 19196.00, 'pending');
GO
-- Verify the data
SELECT 'orders' AS table_name, COUNT(*) AS row_count FROM dbo.orders
UNION ALL
SELECT 'customers', COUNT(*) FROM dbo.customers
UNION ALL
SELECT 'products', COUNT(*) FROM dbo.products;
GO
Step 1: Initial Load (Day One)
-- ============================================================
-- Day one: initial full load
-- ============================================================
DECLARE @batch_id VARCHAR(50);
SET @batch_id = 'BATCH_SCD2_20240115_001';
PRINT N'========== Day-one initial load ==========';
PRINT N'Batch ID: ' + @batch_id;
EXEC psa_db.dbo.sp_load_customers_scd2 @batch_id;
EXEC psa_db.dbo.sp_load_orders_scd2 @batch_id;
EXEC psa_db.dbo.sp_load_products_scd2 @batch_id;
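Customer table after the day-one load (dates are illustrative; the procedure stamps start_date with GETDATE() at run time):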
| customer_id | customer_name | address | start_date | end_date | is_current |
|---|---|---|---|---|---|
| C001 | 张三 | 北京市朝阳区建国路88号 | 2024-01-15 | 9999-12-31 | Y |
| C002 | 李四科技有限公司 | 上海市浦东新区张江路100号 | 2024-01-15 | 9999-12-31 | Y |
| C003 | 王五 | 广州市天河区体育西路103号 | 2024-01-15 | 9999-12-31 | Y |
| C004 | 赵六集团 | 深圳市南山区科技园南路10号 | 2024-01-15 | 9999-12-31 | Y |
| C005 | 钱七 | 杭州市西湖区文三路478号 | 2024-01-15 | 9999-12-31 | Y |
Step 2: Simulate Business Changes (Day Two)
-- ============================================================
-- Day two: simulate changes in the business system
-- ============================================================
USE business_db;
GO
-- Change ①: customer attribute change (张三 moves)
UPDATE dbo.customers
SET address = N'北京市朝阳区望京SOHO',
updated_at = GETDATE()
WHERE customer_id = 'C001';
-- Change ②: new customer (孙八)
INSERT INTO dbo.customers
(customer_id, customer_name, email, phone, address, city, region, register_date, customer_type)
VALUES
(N'C006', N'孙八', 'sunba@email.com', '13800138006',
N'成都市高新区天府大道888号', N'成都', N'西南', '2024-01-25', 'individual');
-- Change ③: customer deactivated (soft delete via is_active)
UPDATE dbo.customers
SET is_active = 0,
updated_at = GETDATE()
WHERE customer_id = 'C005';
-- Change ④: order status transition
UPDATE dbo.orders
SET status = 'shipped',
updated_at = GETDATE()
WHERE order_id IN ('O003', 'O006');
-- Change ⑤: product price adjustment
UPDATE dbo.products
SET unit_price = 6499.00,
updated_at = GETDATE()
WHERE product_id = 'P004'; -- price cut for 华为Mate 60 Pro
-- Change ⑥: new order
INSERT INTO dbo.orders
(order_id, customer_id, product_id, order_date, quantity, unit_price, total_amount, status)
VALUES
('O011', 'C001', 'P004', '2024-01-25', 2, 6499.00, 12998.00, 'pending');
Summary of business changes:
| Change type | Business key | Change |
|---|---|---|
| Attribute change | C001 张三 | address: 建国路88号 → 望京SOHO |
| New customer | C006 孙八 | New customer registered |
| Soft delete | C005 钱七 | is_active: 1 → 0 |
| Order status | O003/O006 | pending → shipped |
| Price change | P004 | unit_price: 6999 → 6499 |
| New order | O011 | New order |
Step 3: Day-Two ETL Load
-- ============================================================
-- Day two: SCD2 incremental load
-- ============================================================
DECLARE @batch_id VARCHAR(50);
SET @batch_id = 'BATCH_SCD2_20240125_002';
PRINT N'========== Day-two incremental load ==========';
PRINT N'Batch ID: ' + @batch_id;
EXEC psa_db.dbo.sp_load_customers_scd2 @batch_id;
EXEC psa_db.dbo.sp_load_orders_scd2 @batch_id;
EXEC psa_db.dbo.sp_load_products_scd2 @batch_id;
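Full version chains after the day-two load (dates are illustrative; the procedures stamp rows with GETDATE() at run time):
【Customer C001, 张三】(attribute change)
├── V1: 建国路88号 | 2024-01-15 → 2024-01-24 23:59:59 | N (expired)
└── V2: 望京SOHO | 2024-01-25 → 9999-12-31 | Y (current)
【Customer C005, 钱七】(soft delete via is_active)
├── V1: is_active=1 | 2024-01-15 → 2024-01-24 23:59:59 | N (expired)
└── V2: is_active=0 | 2024-01-25 → 9999-12-31 | Y (current)
Note: C005 still exists in the source table, so the MERGE treats the is_active flip as an attribute change and writes a new version. The NOT MATCHED BY SOURCE branch, which expires a row without inserting a new one, fires only when a row is physically deleted from the source.
【Customer C006, 孙八】(new)
└── V1: 天府大道888号 | 2024-01-25 → 9999-12-31 | Y (current)
【Order O003, status transition】
├── V1: status=pending | 2024-01-15 → 2024-01-24 23:59:59 | N (expired)
└── V2: status=shipped | 2024-01-25 → 9999-12-31 | Y (current)
【Product P004, 华为Mate 60 Pro】(price change)
├── V1: unit_price=6999 | 2024-01-15 → 2024-01-24 23:59:59 | N (expired)
└── V2: unit_price=6499 | 2024-01-25 → 9999-12-31 | Y (current)
【All unchanged customers, orders, and products】→ no new rows are written ✅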
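After a load like this, a couple of sanity checks can confirm that the version chains are well formed: at most one current row per business key, and no overlapping validity periods. The sketch below runs against a hypothetical table variable (`@versions` and its rows are invented for the example); against the real table the same two queries would read from psa_db.dbo.customers_scd2.

```sql
-- Hypothetical version rows mirroring the chains shown in this article.
DECLARE @versions TABLE (
    customer_id VARCHAR(50) NOT NULL,
    start_date  DATETIME    NOT NULL,
    end_date    DATETIME    NOT NULL,
    is_current  CHAR(1)     NOT NULL
);
INSERT INTO @versions (customer_id, start_date, end_date, is_current)
VALUES
    ('C001', '2024-01-15T00:00:00', '2024-01-24T23:59:59', 'N'),
    ('C001', '2024-01-25T00:00:00', '9999-12-31T23:59:59', 'Y'),
    ('C006', '2024-01-25T00:00:00', '9999-12-31T23:59:59', 'Y');

-- Check 1: business keys with more than one current version.
SELECT customer_id, COUNT(*) AS current_rows
FROM @versions
WHERE is_current = 'Y'
GROUP BY customer_id
HAVING COUNT(*) > 1;

-- Check 2: versions whose end_date reaches into the next version's start_date.
SELECT customer_id, start_date, end_date, next_start
FROM (
    SELECT customer_id, start_date, end_date,
           LEAD(start_date) OVER (PARTITION BY customer_id
                                  ORDER BY start_date) AS next_start
    FROM @versions
) AS chained
WHERE next_start IS NOT NULL
  AND end_date >= next_start;
```

Both queries return no rows for this sample; any row they do return points at a key that needs attention. LEAD requires SQL Server 2012 or later.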
Query Methods
Query 1: Current Data
-- Current version of every customer (historical versions excluded)
SELECT
customer_id,
customer_name,
address,
is_active,
start_date AS effective_from
FROM psa_db.dbo.customers_scd2
WHERE is_current = 'Y'
ORDER BY customer_id;
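Query 2: Snapshot at Any Point in Time
-- Customer addresses as of 2024-01-20 (the snapshot before 张三's change)
DECLARE @as_of_date DATETIME = '2024-01-20 23:59:59';
SELECT
customer_id,
customer_name,
address,
city
FROM psa_db.dbo.customers_scd2
WHERE start_date <= @as_of_date
AND end_date >= @as_of_date; -- inclusive: a changed row's end_date is its last valid second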
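How the upper bound is compared matters at the exact instant a version expires, because the load procedure sets a changed row's end_date to one second before the new row's start_date. A small sketch with a hypothetical table variable (`@versions` is invented; the timestamps are the illustrative ones used in this article) compares a strict and an inclusive predicate:

```sql
-- Hypothetical two-version history for C001.
DECLARE @versions TABLE (
    customer_id VARCHAR(50)   NOT NULL,
    address     NVARCHAR(200) NULL,
    start_date  DATETIME      NOT NULL,
    end_date    DATETIME      NOT NULL
);
INSERT INTO @versions (customer_id, address, start_date, end_date)
VALUES
    ('C001', N'北京市朝阳区建国路88号', '2024-01-15T00:00:00', '2024-01-24T23:59:59'),
    ('C001', N'北京市朝阳区望京SOHO',   '2024-01-25T00:00:00', '9999-12-31T23:59:59');

DECLARE @as_of DATETIME = '2024-01-24T23:59:59';

-- Strict upper bound: V1 ends exactly at @as_of and V2 has not started,
-- so no row qualifies.
SELECT customer_id, address
FROM @versions
WHERE start_date <= @as_of AND end_date > @as_of;

-- Inclusive upper bound: V1 (建国路88号) is returned.
SELECT customer_id, address
FROM @versions
WHERE start_date <= @as_of AND end_date >= @as_of;
```

Even with the inclusive form, an as-of time that falls inside the one-second gap (for example 23:59:59.500) matches neither version. A half-open convention, where the old row's end_date equals the new row's start_date and queries use end_date > @as_of, would close that gap, at the cost of changing the end_date rule in the load procedures.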
Query 3: Customer Change History (Full Version Chain)
-- Full change history of C001
SELECT
customer_id,
customer_name,
address,
customer_type,
is_active,
start_date,
end_date,
CASE
WHEN end_date = '9999-12-31 23:59:59' THEN N'🔵 current version'
WHEN is_active = 0 THEN N'🔴 deleted'
ELSE N'⚪ historical version'
END AS version_status
FROM psa_db.dbo.customers_scd2
WHERE customer_id = 'C001'
ORDER BY start_date;
Query 4: Full Order Lifecycle
-- Full status history of order O003
SELECT
order_id,
customer_id,
product_id,
quantity,
total_amount,
status,
start_date AS 状态生效时间,
end_date AS 状态失效时间,
CASE
WHEN end_date = '9999-12-31 23:59:59' THEN N'🔵 current status'
ELSE N'⚪ superseded'
END AS version_status
FROM psa_db.dbo.orders_scd2
WHERE order_id = 'O003'
ORDER BY start_date;
Query 5: Product Price History
-- Price history of P004
SELECT
product_id,
product_name,
unit_price,
brand,
start_date AS 生效日期,
CASE
WHEN end_date = '9999-12-31 23:59:59' THEN N'current price'
-- end_date is already the last valid second of the version, so no day is subtracted
ELSE CONVERT(VARCHAR(10), end_date, 120)
END AS 失效日期,
CASE
WHEN end_date = '9999-12-31 23:59:59' THEN N'🔵'
ELSE N'⚪'
END AS flag
FROM psa_db.dbo.products_scd2
WHERE product_id = 'P004'
ORDER BY start_date;
Query 6: Change Statistics Report
-- Inserted/changed and closed row counts per batch
SELECT
batch_id,
table_name,
rows_inserted AS 新增或变更行数,
rows_updated AS 关闭的历史行数,
status,
CONVERT(VARCHAR(19), start_time, 120) AS 开始时间,
CONVERT(VARCHAR(19), end_time, 120) AS 结束时间,
DATEDIFF(SECOND, start_time, end_time) AS 耗时_秒
FROM etl_db.dbo.etl_log
WHERE layer_name = 'psa_scd2'
ORDER BY start_time DESC;
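Snapshot vs. SCD2: Full Comparison
| Dimension | Snapshot (earlier article) | SCD2 (this article) |
|---|---|---|
| Write strategy | Full INSERT of N rows on every run | Writes only the rows that changed |
| Storage model | Stacked snapshots (N batches = N × row count) | Version chains (only changes are stored) |
| Change-tracking granularity | Batch level (you only know "something changed in this batch") | Row level (you know exactly which row changed) |
| Change content | Unknown | Old and new versions can be compared in full |
| History queries | Filter by batch, imprecise | Exact start_date/end_date intervals |
| Detecting inserts | psa_operation='I' | is_current='Y' and first insert for the key |
| Detecting changes | Not possible | A differing checksum triggers SCD2 |
| Detecting deletes | psa_operation='D' (extra maintenance needed) | Built in (NOT MATCHED BY SOURCE) |
| Storage cost | O(n × batch_count) | O(n × avg_change_count) |
| Implementation complexity | Low (INSERT SELECT) | Medium (MERGE + validity-period logic) |
| Best suited for | Small tables, infrequent changes, quick prototypes | Large tables, frequent changes, audit and compliance |
| Downstream consumption | Simple (take the latest batch) | Must filter on is_current='Y' |
| Traceability | Limited (batch dimension) | Full (row and time dimensions) |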
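Summary
This article showed the overall SCD2 process and a concrete implementation on SQL Server.
SCD2 is the cornerstone of history tracking in a data warehouse. With it, the warehouse gains an efficient "time machine": you can go back to any point in time and see the state each row had at that moment.
BTW:
I wrote this article for two reasons: first, to summarize past work experience; second, so that if I need to do the same thing again, I can point an AI tool at this article's URL and have it generate the code I want more efficiently and accurately.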
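Related Links
- "Data Warehouse Notes, Part 1: Definition, History, and Significance of the Data Warehouse"
- "Data Warehouse Notes, Part 2: PSA Layer (Persistent Staging Area) Explained"
- "Data Warehouse Notes, Part 3: Slowly Changing Dimensions" (preview: how the Star Schema layer consumes PSA-SCD2 data)
- "Data Warehouse Notes, Part 4: Star Schema Layer"
- "Data Warehouse Notes, Part 5: Data Mart Layer"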