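Building a Data Warehouse from 0 to 1: The DWS and DWT Layers

Introduction to DWS and DWT

In dimensional modeling (the DWD layer), the fact table is the core; in the wide-table layers, the dimension table becomes the core.

The DWS and DWT layers are collectively called the wide-table layers. Their design ideas are largely the same and are illustrated by the following case.

1. The problem: two requirements, counting the orders per province and summing the order amount per province.

2. The usual approach: both join the province table with the order table, group by province, and then aggregate. The same data is computed twice, and in practice there are many more scenarios like this.

How can the design avoid the repeated computation?

For this scenario we can design a region wide table whose primary key is the region ID and whose fields include order count, order amount, payment count, payment amount, and so on. All of these metrics are computed together and the results stored in the wide table, which avoids recomputing the same data.

3. Summary:

  • Which wide tables to build: one per dimension.
  • Which fields go into a wide table: look at the fact tables from the viewpoint of each dimension, focusing on the aggregated measures of the fact tables.
  • Difference between DWS and DWT: the DWS layer stores each subject's aggregated behavior for the current day, e.g. each region's order count and order amount for the day. The DWT layer stores each subject's cumulative behavior, e.g. each region's order count and order amount over the last 7 days (15, 30, 60 days).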
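The idea of computing every per-region metric once can be sketched in a few lines of Python. This is only an illustration under assumed data: the `orders` rows, the province codes, and the function name `build_area_wide_table` are hypothetical and not part of the warehouse described here.

```python
from collections import defaultdict
from decimal import Decimal

# Hypothetical sample rows: (order_id, province_id, order_amount)
orders = [
    (1, "BJ", Decimal("100.00")),
    (2, "BJ", Decimal("50.50")),
    (3, "SH", Decimal("20.00")),
]

def build_area_wide_table(rows):
    """Aggregate all per-province metrics in a single pass over the orders."""
    wide = defaultdict(lambda: {"order_count": 0, "order_amount": Decimal("0")})
    for _order_id, province_id, amount in rows:
        wide[province_id]["order_count"] += 1
        wide[province_id]["order_amount"] += amount
    return dict(wide)

area_wide = build_area_wide_table(orders)

# Both requirements now read from the same precomputed table
# instead of each running its own join and group by.
for province_id, metrics in sorted(area_wide.items()):
    print(province_id, metrics["order_count"], metrics["order_amount"])
```

Adding a further metric (for example payment count) means adding one more field to the same pass rather than another full scan.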

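Building the DWS Layer

DWD-layer modeling is business-driven: there is one fact table per business line. Whatever dimensions a business process involves are reflected in the dimensional model. Requirements do not need to be considered at this stage; dimensional modeling rarely looks at requirements and is driven mainly by the business.

DWS and DWT wide-table modeling is requirement-driven. Most analysis tasks in a data warehouse are aggregate statistics, usually grouped aggregations that relate a dimension to the measures in a fact table: group by the dimension fields and aggregate the measures (some metrics aggregate by row count).

So before building the DWS and DWT layers, you need to be clear about what the requirements are.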

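Business Terms

  1. User

A user is identified by device: in mobile analytics, each distinct device counts as one distinct user. Android identifies a user by IMEI and iOS by OpenUDID. One phone is one user, keyed by device id; such a user is also called a visitor.

  2. New user

A user who goes online with the app for the first time. A user who opens an app for the first time is a new user; a device that uninstalls and reinstalls is not counted as new again. New users are broken down into daily, weekly, and monthly new users.

  3. Active user

Any user who opens the app is an active user, regardless of how they use it. A device that opens the app several times in one day counts as one active user.

  4. Weekly (monthly) active user

A user who launched the app within a given calendar week (month). Multiple launches within that week (month) count as one active user.

  5. Monthly activity rate

The ratio of monthly active users to the cumulative total of users up to that month.

  6. Silent user

A user who launched the app only once, on the day of installation (or the day after), and never again. This metric reflects the quality of new users and how well users match the app.

  7. Version distribution

New users, active users, and launch counts per app version for each day of the week. It helps compare app versions and understand user habits.

  8. Users returning this week

Users who did not launch the app last week but did launch it this week.

  9. Users active for n consecutive weeks

At least one launch per week for n consecutive weeks.

  10. Loyal user

A user who has been active for 5 or more consecutive weeks.

  11. Continuously active user

A user who has been active for 2 or more consecutive weeks.

  12. Recently churned user

A user who has not launched the app for n consecutive weeks (2 <= n <= 4), and has not launched it in week n+1 either.

  13. Retained user

New users from a given period who are still using the app after some time has passed are retained users; their share of the new users from that period is the retention rate.

For example, suppose there were 200 new users in May. Of these 200, 100 launched the app in June, 80 in July, and 50 in August. The retention rate of the May cohort is then 50% after one month, 40% after two months, and 25% after three months.

  14. User freshness

The ratio of new to returning users among those launching the app each day, i.e. new users as a share of active users.

  15. Session duration

The length of time the app is used per launch.

  16. Daily usage duration

The total time the app is used within one day.

  17. How launches are counted

On iOS, sending the app to the background ends a launch, so the next return counts as a separate launch. On Android, we define that two launches less than 30 seconds apart count as one launch. If a user leaves the app to send a text message or take a call and returns within 30 seconds, the two visits are a continuation rather than independent events, so they count as one usage, i.e. one launch. Most of the industry uses the 30-second threshold, but the interval can be customized.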
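The 30-second rule for Android launches can be sketched as follows. This is a minimal illustration, assuming each usage is available as a `(start, end)` pair of timestamps in seconds; the function name `count_launches` and the sample intervals are hypothetical.

```python
def count_launches(sessions, gap_seconds=30):
    """Count launches from (start, end) usage intervals given in seconds.

    A return to the app less than gap_seconds after the previous usage
    ended is treated as a continuation, not as a new launch.
    """
    launches = 0
    last_end = None
    for start, end in sorted(sessions):
        if last_end is None or start - last_end >= gap_seconds:
            launches += 1
        last_end = end if last_end is None else max(last_end, end)
    return launches

# Hypothetical usage intervals for one device on one day
sessions = [(0, 100), (120, 200), (260, 300)]
launch_count = count_launches(sessions)
print(launch_count)
```

Here the second interval starts 20 seconds after the first ends and is merged into it, while the third starts 60 seconds after the second ends and counts as a new launch. The threshold is a parameter because, as noted above, the interval can be customized.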

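Table Creation and Data Loading

Daily Device Behavior

Daily device behavior is aggregated by device id: for each device, the pages it visited that day and the number of visits to each page.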

drop table if exists dws_uv_detail_daycount;
create external table dws_uv_detail_daycount
(
`mid_id` string COMMENT 'device id',
`brand` string COMMENT 'phone brand',
`model` string COMMENT 'phone model',
`login_count` bigint COMMENT 'activity (launch) count',
`page_stats` array<struct<page_id:string,page_count:bigint>> COMMENT 'page visit statistics'
) COMMENT 'daily device behavior table'
partitioned by(dt string)
stored as parquet
location '/warehouse/gmall/dws/dws_uv_detail_daycount'
tblproperties ("parquet.compression"="lzo");

Data load:

--= Daily device behavior
-- Requirement: per-device daily behavior, i.e. daily launch count and daily visit count per page
with tmp_start as
(
-- compute login_count
select
mid_id,
brand,
model,
count(*) login_count
from dwd_start_log
where dt='2020-06-15'
group by mid_id, brand, model
),
tmp_page as
(
select
mid_id,
brand,
model,
collect_set(
named_struct('page_id', page_id, 'page_count', page_count)
) page_stats
from (
select
mid_id,
brand,
model,
page_id,
count(*) page_count
from dwd_page_log
where dt='2020-06-15'
group by mid_id, brand, model, page_id

) tmp
group by mid_id, brand, model
)
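-- Some devices are missing from the start log but still have page visits in the page log,
-- hence the full outer join
insert overwrite table dws_uv_detail_daycount partition(dt='2020-06-15')
select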
nvl(tmp_start.mid_id, tmp_page.mid_id)
,nvl(tmp_start.brand, tmp_page.brand)
,nvl(tmp_start.model, tmp_page.model)
,tmp_start.login_count
,tmp_page.page_stats
from tmp_start full outer join tmp_page
on tmp_start.mid_id = tmp_page.mid_id;
-- mid_id uniquely identifies a visitor, so the join uses mid_id only
-- and tmp_start.brand = tmp_page.brand
-- and tmp_start.model = tmp_page.model

Daily Member Behavior

Table definition

drop table if exists dws_user_action_daycount;
create external table dws_user_action_daycount
(
user_id string comment 'user id',
login_count bigint comment 'login count',
cart_count bigint comment 'add-to-cart count',
order_count bigint comment 'order count',
order_amount decimal(16,2) comment 'order amount',
payment_count bigint comment 'payment count',
payment_amount decimal(16,2) comment 'payment amount',
order_detail_stats array<struct<sku_id:string,sku_num:bigint,order_count:bigint,order_amount:decimal(20,2)>> comment 'order detail statistics'
) COMMENT 'daily member behavior'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dws/dws_user_action_daycount/'
tblproperties ("parquet.compression"="lzo");

Data load

--= Daily member behavior

with tmp_start as
(
select user_id, count(*) login_count
from dwd_start_log
where dt='2020-06-15'
and user_id is not null
group by user_id
),
tmp_action as
(
select
user_id,
-- sum(if(action_id='cart_add', 1, 0)) cart_count
count(*) cart_count
from dwd_action_log
where dt='2020-06-15'
and action_id = 'cart_add'
and user_id is not null
group by user_id
),
tmp_order as
(
select user_id,
count(*) order_count,
sum(final_total_amount) order_amount
from dwd_fact_order_info
where dt='2020-06-15'
and user_id is not null
and id is not null
group by user_id
),
tmp_payment as
(
select
user_id,
count(*) payment_count,
sum(payment_amount) payment_amount
from dwd_fact_payment_info
where dt='2020-06-15'
and user_id is not null
and id is not null
group by user_id
),
tmp_od as
(
select
user_id,
collect_set(named_struct('sku_id',sku_id,'sku_num',sku_num,'order_count',order_count,'order_amount',order_amount)) order_stats
from (
select
user_id,
sku_id,
sum(sku_num) sku_num,
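count(*) order_count, -- order_count here is the number of order-detail rows (sub-orders) under the parent orders
-- Without the cast, sum() widens decimal(20,2) to decimal(30,2), and the insert then fails
-- because the type no longer matches the decimal(20,2) declared in the table
cast(sum(final_amount_d) as decimal(20, 2)) order_amount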
from dwd_fact_order_detail
where dt='2020-06-15'
and user_id is not null
group by user_id, sku_id
) tmp
group by user_id
)
insert overwrite table dws_user_action_daycount partition(dt='2020-06-15')
select
tmp_start.user_id,
login_count,
nvl(cart_count,0),
nvl(order_count,0),
nvl(order_amount,0.0),
nvl(payment_count,0),
nvl(payment_amount,0.0),
order_stats
from tmp_start
left join tmp_action on tmp_start.user_id = tmp_action.user_id
left join tmp_order on tmp_start.user_id = tmp_order.user_id
left join tmp_payment on tmp_start.user_id = tmp_payment.user_id
left join tmp_od on tmp_start.user_id = tmp_od.user_id;

Daily Product Behavior

Table definition:

drop table if exists dws_sku_action_daycount;
create external table dws_sku_action_daycount
(
sku_id string comment 'sku_id',
order_count bigint comment 'times ordered',
order_num bigint comment 'units ordered',
order_amount decimal(16,2) comment 'amount ordered',
payment_count bigint comment 'times paid',
payment_num bigint comment 'units paid',
payment_amount decimal(16,2) comment 'amount paid',
refund_count bigint comment 'times refunded',
refund_num bigint comment 'units refunded',
refund_amount decimal(16,2) comment 'amount refunded',
cart_count bigint comment 'times added to cart',
favor_count bigint comment 'times favorited',
appraise_good_count bigint comment 'positive reviews',
appraise_mid_count bigint comment 'neutral reviews',
appraise_bad_count bigint comment 'negative reviews',
appraise_default_count bigint comment 'default reviews'
) COMMENT 'daily product behavior'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dws/dws_sku_action_daycount/'
tblproperties ("parquet.compression"="lzo");

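Data load

Note: an order placed at 23:59 may be paid on the following day. Payment metrics therefore need the order details whose payment time is today and whose order date is either yesterday or today.

Approach: every field in this table is numeric, so each subquery can be treated as a full-width row set with the fields it lacks set to 0. The resulting sparse row sets are combined with union all and then grouped and summed by sku_id, which yields the complete result. This also performs better than a full outer join and produces no NULLs, because the missing values have already been replaced with 0.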
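The zero-fill and union approach can be illustrated outside Hive with a small Python sketch. It only mimics the shape of the computation: the three metric names are a subset chosen for brevity, and `merge_sparse` and the sample SKU ids are hypothetical.

```python
from collections import defaultdict

# A subset of the table's metrics, chosen only to keep the example short
METRICS = ("order_count", "payment_count", "refund_count")

def merge_sparse(*sources):
    """Emulate 'union all' of zero-padded rows followed by
    'group by sku_id' with sum() over every metric.

    Each source is (metric_name, {sku_id: value}) and stands for one subquery.
    """
    totals = defaultdict(lambda: dict.fromkeys(METRICS, 0))
    for metric, per_sku in sources:
        for sku_id, value in per_sku.items():
            # Every other metric of this row is implicitly 0, as in the SQL.
            totals[sku_id][metric] += value
    return dict(totals)

daily = merge_sparse(
    ("order_count", {"sku1": 3, "sku2": 1}),
    ("payment_count", {"sku1": 2}),
    ("refund_count", {"sku3": 1}),
)

for sku_id in sorted(daily):
    print(sku_id, daily[sku_id])
```

A SKU that appears in only one source still gets a complete row, with 0 rather than NULL in the metrics it lacks, which is the property the text relies on.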

with tmp_order as (
select
sku_id
,count(*) as order_count
,sum(sku_num) as order_num
,sum(final_amount_d) as order_amount
from dwd_fact_order_detail
where dt='2020-06-15'
group by sku_id
),
tmp_payment as (
select
sku_id
,count(*) payment_count
,sum(sku_num) payment_num
,sum(final_amount_d) payment_amount
from dwd_fact_order_detail
-- An order placed at 23:59 may be paid the next day, so take order details whose
-- payment time is today and whose order date is yesterday or today.
where (dt='2020-06-15' or dt=date_add('2020-06-15', -1))
and order_id in (
select
id
from dwd_fact_order_info
where (dt='2020-06-15' or dt=date_add('2020-06-15', -1))
and date_format(payment_time, 'yyyy-MM-dd')='2020-06-15'
)
group by sku_id
),
tmp_refund as (
select
sku_id
,count(*) refund_count
,sum(refund_num) refund_num
,sum(refund_amount) refund_amount
from dwd_fact_order_refund_info
where dt='2020-06-15'
group by sku_id
),
tmp_cart as
(
select
item sku_id,
count(*) cart_count
from dwd_action_log
where dt='2020-06-15'
and user_id is not null
and action_id='cart_add'
group by item
),tmp_favor as
(
select
item sku_id,
count(*) favor_count
from dwd_action_log
where dt='2020-06-15'
and user_id is not null
and action_id='favor_add'
group by item
),
tmp_appraise as
(
select
sku_id,
sum(if(appraise='1201',1,0)) appraise_good_count,
sum(if(appraise='1202',1,0)) appraise_mid_count,
sum(if(appraise='1203',1,0)) appraise_bad_count,
sum(if(appraise='1204',1,0)) appraise_default_count
from dwd_fact_comment_info
where dt='2020-06-15'
group by sku_id
)

insert overwrite table dws_sku_action_daycount partition(dt='2020-06-15')
select
sku_id,
sum(order_count),
sum(order_num),
sum(order_amount),
sum(payment_count),
sum(payment_num),
sum(payment_amount),
sum(refund_count),
sum(refund_num),
sum(refund_amount),
sum(cart_count),
sum(favor_count),
sum(appraise_good_count),
sum(appraise_mid_count),
sum(appraise_bad_count),
sum(appraise_default_count)
from
(
select
sku_id,
order_count,
order_num,
order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
0 cart_count,
0 favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_order
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
payment_count,
payment_num,
payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
0 cart_count,
0 favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_payment
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
refund_count,
refund_num,
refund_amount,
0 cart_count,
0 favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_refund
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
cart_count,
0 favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_cart
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
0 cart_count,
favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_favor
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
0 cart_count,
0 favor_count,
appraise_good_count,
appraise_mid_count,
appraise_bad_count,
appraise_default_count
from tmp_appraise
)tmp
group by sku_id;

Daily Activity Statistics

Table definition:

drop table if exists dws_activity_info_daycount;
create external table dws_activity_info_daycount(
`id` string COMMENT 'id',
`activity_name` string COMMENT 'activity name',
`activity_type` string COMMENT 'activity type',
`start_time` string COMMENT 'start time',
`end_time` string COMMENT 'end time',
`create_time` string COMMENT 'creation time',
`display_count` bigint COMMENT 'impression count',
`order_count` bigint COMMENT 'order count',
`order_amount` decimal(20,2) COMMENT 'order amount',
`payment_count` bigint COMMENT 'payment count',
`payment_amount` decimal(20,2) COMMENT 'payment amount'
) COMMENT 'daily activity statistics'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dws/dws_activity_info_daycount/'
tblproperties ("parquet.compression"="lzo");

Load logic for the daily activity table:

--= Daily activity statistics
with tmp_op as (
select
activity_id
,sum(if(date_format(create_time, 'yyyy-MM-dd')='2020-06-15', 1, 0)) order_count
,sum(if(date_format(create_time, 'yyyy-MM-dd')='2020-06-15', final_total_amount, 0)) order_amount
,sum(if(date_format(payment_time, 'yyyy-MM-dd')='2020-06-15', 1, 0)) payment_count
,sum(if(date_format(payment_time, 'yyyy-MM-dd')='2020-06-15', final_total_amount, 0)) payment_amount
from dwd_fact_order_info
where (dt='2020-06-15' or dt=date_add('2020-06-15', -1))
and activity_id is not null
group by activity_id
),
tmp_display as (
select
item activity_id
,count(*) display_count
from dwd_display_log
where dt='2020-06-15'
and item_type='activity_id'
group by item
),
tmp_activity as (
select
id
,activity_name
,activity_type
,start_time
,end_time
,create_time
from dwd_dim_activity_info
where dt='2020-06-15'
)
insert overwrite table dws_activity_info_daycount partition(dt='2020-06-15')
select
nvl(tmp_op.activity_id,tmp_display.activity_id),
tmp_activity.activity_name,
tmp_activity.activity_type,
tmp_activity.start_time,
tmp_activity.end_time,
tmp_activity.create_time,
tmp_display.display_count,
tmp_op.order_count,
tmp_op.order_amount,
tmp_op.payment_count,
tmp_op.payment_amount
from tmp_op
full outer join tmp_display on tmp_op.activity_id=tmp_display.activity_id
left join tmp_activity on nvl(tmp_op.activity_id,tmp_display.activity_id)=tmp_activity.id;

Daily Region Statistics

Table definition

drop table if exists dws_area_stats_daycount;
create external table dws_area_stats_daycount(
`id` bigint COMMENT 'id',
`province_name` string COMMENT 'province name',
`area_code` string COMMENT 'area code',
`iso_code` string COMMENT 'ISO code',
`region_id` string COMMENT 'region id',
`region_name` string COMMENT 'region name',
`login_count` bigint COMMENT 'active device count',
`order_count` bigint COMMENT 'order count',
`order_amount` decimal(20,2) COMMENT 'order amount',
`payment_count` bigint COMMENT 'payment count',
`payment_amount` decimal(20,2) COMMENT 'payment amount'
) COMMENT 'daily region statistics table'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dws/dws_area_stats_daycount/'
tblproperties ("parquet.compression"="lzo");

Load statement

with tmp_login as
(
select
area_code,
count(*) login_count
from dwd_start_log
where dt='2020-06-15'
group by area_code
),
tmp_op as
(
select
province_id,
sum(if(date_format(create_time,'yyyy-MM-dd')='2020-06-15',1,0)) order_count,
sum(if(date_format(create_time,'yyyy-MM-dd')='2020-06-15',final_total_amount,0)) order_amount,
sum(if(date_format(payment_time,'yyyy-MM-dd')='2020-06-15',1,0)) payment_count,
sum(if(date_format(payment_time,'yyyy-MM-dd')='2020-06-15',final_total_amount,0)) payment_amount
from dwd_fact_order_info
where (dt='2020-06-15' or dt=date_add('2020-06-15',-1))
group by province_id
)
insert overwrite table dws_area_stats_daycount partition(dt='2020-06-15')
select
pro.id,
pro.province_name,
pro.area_code,
pro.iso_code,
pro.region_id,
pro.region_name,
nvl(tmp_login.login_count,0),
nvl(tmp_op.order_count,0),
nvl(tmp_op.order_amount,0.0),
nvl(tmp_op.payment_count,0),
nvl(tmp_op.payment_amount,0.0)
from dwd_dim_base_province pro
left join tmp_login on pro.area_code=tmp_login.area_code
left join tmp_op on pro.id=tmp_op.province_id;

DWS Layer Load Script

#!/bin/bash

APP=gmall
hive=/opt/module/hive/bin/hive

# Use the date passed as the first argument if there is one; otherwise default to yesterday
if [ -n "$1" ]; then
do_date=$1
else
do_date=$(date -d "-1 day" +%F)
fi

sql="
set mapreduce.job.queuename=hive;
with
tmp_start as
(
select
mid_id,
brand,
model,
count(*) login_count
from ${APP}.dwd_start_log
where dt='$do_date'
group by mid_id,brand,model
),
tmp_page as
(
select
mid_id,
brand,
model,
collect_set(named_struct('page_id',page_id,'page_count',page_count)) page_stats
from
(
select
mid_id,
brand,
model,
page_id,
count(*) page_count
from ${APP}.dwd_page_log
where dt='$do_date'
group by mid_id,brand,model,page_id
)tmp
group by mid_id,brand,model
)
insert overwrite table ${APP}.dws_uv_detail_daycount partition(dt='$do_date')
select
nvl(tmp_start.mid_id,tmp_page.mid_id),
nvl(tmp_start.brand,tmp_page.brand),
nvl(tmp_start.model,tmp_page.model),
tmp_start.login_count,
tmp_page.page_stats
from tmp_start
full outer join tmp_page
on tmp_start.mid_id=tmp_page.mid_id
and tmp_start.brand=tmp_page.brand
and tmp_start.model=tmp_page.model;


with
tmp_login as
(
select
user_id,
count(*) login_count
from ${APP}.dwd_start_log
where dt='$do_date'
and user_id is not null
group by user_id
),
tmp_cart as
(
select
user_id,
count(*) cart_count
from ${APP}.dwd_action_log
where dt='$do_date'
and user_id is not null
and action_id='cart_add'
group by user_id
),tmp_order as
(
select
user_id,
count(*) order_count,
sum(final_total_amount) order_amount
from ${APP}.dwd_fact_order_info
where dt='$do_date'
group by user_id
) ,
tmp_payment as
(
select
user_id,
count(*) payment_count,
sum(payment_amount) payment_amount
from ${APP}.dwd_fact_payment_info
where dt='$do_date'
group by user_id
),
tmp_order_detail as
(
select
user_id,
collect_set(named_struct('sku_id',sku_id,'sku_num',sku_num,'order_count',order_count,'order_amount',order_amount)) order_stats
from
(
select
user_id,
sku_id,
sum(sku_num) sku_num,
count(*) order_count,
cast(sum(final_amount_d) as decimal(20,2)) order_amount
from ${APP}.dwd_fact_order_detail
where dt='$do_date'
group by user_id,sku_id
)tmp
group by user_id
)

insert overwrite table ${APP}.dws_user_action_daycount partition(dt='$do_date')
select
tmp_login.user_id,
login_count,
nvl(cart_count,0),
nvl(order_count,0),
nvl(order_amount,0.0),
nvl(payment_count,0),
nvl(payment_amount,0.0),
order_stats
from tmp_login
left outer join tmp_cart on tmp_login.user_id=tmp_cart.user_id
left outer join tmp_order on tmp_login.user_id=tmp_order.user_id
left outer join tmp_payment on tmp_login.user_id=tmp_payment.user_id
left outer join tmp_order_detail on tmp_login.user_id=tmp_order_detail.user_id;

with
tmp_order as
(
select
sku_id,
count(*) order_count,
sum(sku_num) order_num,
sum(final_amount_d) order_amount
from ${APP}.dwd_fact_order_detail
where dt='$do_date'
group by sku_id
),
tmp_payment as
(
select
sku_id,
count(*) payment_count,
sum(sku_num) payment_num,
sum(final_amount_d) payment_amount
from ${APP}.dwd_fact_order_detail
where (dt='$do_date'
or dt=date_add('$do_date',-1))
and order_id in
(
select
id
from ${APP}.dwd_fact_order_info
where (dt='$do_date'
or dt=date_add('$do_date',-1))
and date_format(payment_time,'yyyy-MM-dd')='$do_date'
)
group by sku_id
),
tmp_refund as
(
select
sku_id,
count(*) refund_count,
sum(refund_num) refund_num,
sum(refund_amount) refund_amount
from ${APP}.dwd_fact_order_refund_info
where dt='$do_date'
group by sku_id
),
tmp_cart as
(
select
item sku_id,
count(*) cart_count
from ${APP}.dwd_action_log
where dt='$do_date'
and user_id is not null
and action_id='cart_add'
group by item
),tmp_favor as
(
select
item sku_id,
count(*) favor_count
from ${APP}.dwd_action_log
where dt='$do_date'
and user_id is not null
and action_id='favor_add'
group by item
),
tmp_appraise as
(
select
sku_id,
sum(if(appraise='1201',1,0)) appraise_good_count,
sum(if(appraise='1202',1,0)) appraise_mid_count,
sum(if(appraise='1203',1,0)) appraise_bad_count,
sum(if(appraise='1204',1,0)) appraise_default_count
from ${APP}.dwd_fact_comment_info
where dt='$do_date'
group by sku_id
)

insert overwrite table ${APP}.dws_sku_action_daycount partition(dt='$do_date')
select
sku_id,
sum(order_count),
sum(order_num),
sum(order_amount),
sum(payment_count),
sum(payment_num),
sum(payment_amount),
sum(refund_count),
sum(refund_num),
sum(refund_amount),
sum(cart_count),
sum(favor_count),
sum(appraise_good_count),
sum(appraise_mid_count),
sum(appraise_bad_count),
sum(appraise_default_count)
from
(
select
sku_id,
order_count,
order_num,
order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
0 cart_count,
0 favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_order
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
payment_count,
payment_num,
payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
0 cart_count,
0 favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_payment
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
refund_count,
refund_num,
refund_amount,
0 cart_count,
0 favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_refund
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
cart_count,
0 favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_cart
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
0 cart_count,
favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_favor
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
0 cart_count,
0 favor_count,
appraise_good_count,
appraise_mid_count,
appraise_bad_count,
appraise_default_count
from tmp_appraise
)tmp
group by sku_id;

with
tmp_login as
(
select
area_code,
count(*) login_count
from ${APP}.dwd_start_log
where dt='$do_date'
group by area_code
),
tmp_op as
(
select
province_id,
sum(if(date_format(create_time,'yyyy-MM-dd')='$do_date',1,0)) order_count,
sum(if(date_format(create_time,'yyyy-MM-dd')='$do_date',final_total_amount,0)) order_amount,
sum(if(date_format(payment_time,'yyyy-MM-dd')='$do_date',1,0)) payment_count,
sum(if(date_format(payment_time,'yyyy-MM-dd')='$do_date',final_total_amount,0)) payment_amount
from ${APP}.dwd_fact_order_info
where (dt='$do_date' or dt=date_add('$do_date',-1))
group by province_id
)
insert overwrite table ${APP}.dws_area_stats_daycount partition(dt='$do_date')
select
pro.id,
pro.province_name,
pro.area_code,
pro.iso_code,
pro.region_id,
pro.region_name,
nvl(tmp_login.login_count,0),
nvl(tmp_op.order_count,0),
nvl(tmp_op.order_amount,0.0),
nvl(tmp_op.payment_count,0),
nvl(tmp_op.payment_amount,0.0)
from ${APP}.dwd_dim_base_province pro
left join tmp_login on pro.area_code=tmp_login.area_code
left join tmp_op on pro.id=tmp_op.province_id;


with
tmp_op as
(
select
activity_id,
sum(if(date_format(create_time,'yyyy-MM-dd')='$do_date',1,0)) order_count,
sum(if(date_format(create_time,'yyyy-MM-dd')='$do_date',final_total_amount,0)) order_amount,
sum(if(date_format(payment_time,'yyyy-MM-dd')='$do_date',1,0)) payment_count,
sum(if(date_format(payment_time,'yyyy-MM-dd')='$do_date',final_total_amount,0)) payment_amount
from ${APP}.dwd_fact_order_info
where (dt='$do_date' or dt=date_add('$do_date',-1))
and activity_id is not null
group by activity_id
),
tmp_display as
(
select
item activity_id,
count(*) display_count
from ${APP}.dwd_display_log
where dt='$do_date'
and item_type='activity_id'
group by item
),
tmp_activity as
(
select
*
from ${APP}.dwd_dim_activity_info
where dt='$do_date'
)
insert overwrite table ${APP}.dws_activity_info_daycount partition(dt='$do_date')
select
nvl(tmp_op.activity_id,tmp_display.activity_id),
tmp_activity.activity_name,
tmp_activity.activity_type,
tmp_activity.start_time,
tmp_activity.end_time,
tmp_activity.create_time,
tmp_display.display_count,
tmp_op.order_count,
tmp_op.order_amount,
tmp_op.payment_count,
tmp_op.payment_amount
from tmp_op
full outer join tmp_display on tmp_op.activity_id=tmp_display.activity_id
left join tmp_activity on nvl(tmp_op.activity_id,tmp_display.activity_id)=tmp_activity.id;
"

$hive -e "$sql"

Running the DWS Load

[dw@hadoop116 ~]$ chmod 777 dwd_to_dws_db.sh 
[dw@hadoop116 ~]$ dwd_to_dws_db.sh 2020-06-15

Building the DWT Layer

Device Topic Wide Table

Table definition

drop table if exists dwt_uv_topic;
create external table dwt_uv_topic
(
`mid_id` string comment 'device id',
`brand` string comment 'phone brand',
`model` string comment 'phone model',
`login_date_first` string comment 'first active date',
`login_date_last` string comment 'last active date',
`login_day_count` bigint comment 'activity count for the current day',
`login_count` bigint comment 'cumulative active days'
) COMMENT 'device topic wide table'
stored as parquet
location '/warehouse/gmall/dwt/dwt_uv_topic'
tblproperties ("parquet.compression"="lzo");

Load statement

insert overwrite table dwt_uv_topic
select
nvl(new.mid_id,old.mid_id),
nvl(new.brand,old.brand),
nvl(new.model,old.model),
if(old.mid_id is null,'2020-06-15',old.login_date_first),
if(new.mid_id is not null,'2020-06-15',old.login_date_last),
if(new.mid_id is not null, new.login_count,0),
nvl(old.login_count,0)+if(new.login_count>0,1,0)
from
(
select
*
from dwt_uv_topic
)old
full outer join
(
select
*
from dws_uv_detail_daycount
where dt='2020-06-15'
)new
on old.mid_id=new.mid_id;
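The merge performed by this statement (full outer join of the existing topic table with today's DWS rows) can be sketched in Python to make the per-column rules explicit. This is an illustration only: `merge_uv_topic`, the dictionary layout, and the sample device ids are hypothetical, and `new` is simplified to map each device to today's launch count.

```python
def merge_uv_topic(old, new, do_date):
    """Merge the existing topic rows with today's per-device launch counts.

    old: {mid_id: topic row}; new: {mid_id: today's login_count}.
    Iterating over the union of keys plays the role of the full outer join.
    """
    merged = {}
    for mid_id in old.keys() | new.keys():
        prev = old.get(mid_id)
        today = new.get(mid_id)
        merged[mid_id] = {
            # A device never seen before gets today as its first active date.
            "login_date_first": do_date if prev is None else prev["login_date_first"],
            # A device seen today gets today as its last active date.
            "login_date_last": do_date if today is not None else prev["login_date_last"],
            "login_day_count": today if today is not None else 0,
            # Cumulative active days grow by at most one per day.
            "login_count": (prev["login_count"] if prev else 0)
                           + (1 if today is not None and today > 0 else 0),
        }
    return merged

old_topic = {
    "m1": {"login_date_first": "2020-06-10", "login_date_last": "2020-06-14",
           "login_day_count": 2, "login_count": 3},
    "m2": {"login_date_first": "2020-06-01", "login_date_last": "2020-06-02",
           "login_day_count": 1, "login_count": 2},
}
today_rows = {"m1": 4, "m3": 1}
topic = merge_uv_topic(old_topic, today_rows, "2020-06-15")
```

In this sample, `m1` is a returning device, `m2` was not active today and keeps its dates, and `m3` is new.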

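Member Topic Wide Table

The DWS layer aggregates data per day, whereas the DWT layer summarizes data accumulated over a period of time. Where do the wide-table fields come from? For each measure in the fact tables related to the dimension, take the first and last occurrence, the cumulative total, and the total over a recent time window.

Table definition: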

drop table if exists dwt_user_topic;
create external table dwt_user_topic
(
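user_id string comment 'user id',
-- The three columns below require aggregating over all login logs
-- For a new user, the first login date is the current day
login_date_first string comment 'first login date',
-- For a user who logged in today, the last login date is the current day
login_date_last string comment 'last login date',
-- Existing user: +1 if they logged in today, otherwise unchanged; new user: 1
login_count bigint comment 'cumulative login days',
-- This kind of statistic is usually kept for the last 7 or 15 days
-- 1. Incremental: today's data alone is not enough for a 30-day figure. Also fetch the record for the day that just left the 30-day window and subtract it (1 if the user logged in that day, otherwise 0), then account for whether the user is active today
-- 2. Recompute the whole 30-day window from scratch; suitable when the data volume is small
login_last_30d_count bigint comment 'login days in the last 30 days',
order_date_first string comment 'first order date',
order_date_last string comment 'last order date',
order_count bigint comment 'cumulative order count',
order_amount decimal(16,2) comment 'cumulative order amount',
order_last_30d_count bigint comment 'order count in the last 30 days',
order_last_30d_amount decimal(16,2) comment 'order amount in the last 30 days',
payment_date_first string comment 'first payment date',
payment_date_last string comment 'last payment date',
payment_count bigint comment 'cumulative payment count',
payment_amount decimal(16,2) comment 'cumulative payment amount',
payment_last_30d_count bigint comment 'payment count in the last 30 days',
payment_last_30d_amount decimal(16,2) comment 'payment amount in the last 30 days'
)COMMENT 'member topic wide table'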
stored as parquet
location '/warehouse/gmall/dwt/dwt_user_topic/'
tblproperties ("parquet.compression"="lzo");

Load logic:

insert overwrite table dwt_user_topic
select
nvl(new.user_id,old.user_id),
if(old.login_date_first is null and new.login_count>0,'2020-06-15',old.login_date_first),
if(new.login_count>0,'2020-06-15',old.login_date_last),
nvl(old.login_count,0)+if(new.login_count>0,1,0),
nvl(new.login_last_30d_count,0),
if(old.order_date_first is null and new.order_count>0,'2020-06-15',old.order_date_first),
if(new.order_count>0,'2020-06-15',old.order_date_last),
nvl(old.order_count,0)+nvl(new.order_count,0),
nvl(old.order_amount,0)+nvl(new.order_amount,0),
nvl(new.order_last_30d_count,0),
nvl(new.order_last_30d_amount,0),
if(old.payment_date_first is null and new.payment_count>0,'2020-06-15',old.payment_date_first),
if(new.payment_count>0,'2020-06-15',old.payment_date_last),
nvl(old.payment_count,0)+nvl(new.payment_count,0),
nvl(old.payment_amount,0)+nvl(new.payment_amount,0),
nvl(new.payment_last_30d_count,0),
nvl(new.payment_last_30d_amount,0)
from
dwt_user_topic old
full outer join
(
select
user_id,
sum(if(dt='2020-06-15',login_count,0)) login_count,
sum(if(dt='2020-06-15',order_count,0)) order_count,
sum(if(dt='2020-06-15',order_amount,0)) order_amount,
sum(if(dt='2020-06-15',payment_count,0)) payment_count,
sum(if(dt='2020-06-15',payment_amount,0)) payment_amount,
sum(if(login_count>0,1,0)) login_last_30d_count,
sum(order_count) order_last_30d_count,
sum(order_amount) order_last_30d_amount,
sum(payment_count) payment_last_30d_count,
sum(payment_amount) payment_last_30d_amount
from dws_user_action_daycount
where dt>=date_add( '2020-06-15',-30)
group by user_id
)new
on old.user_id=new.user_id;
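The comments in the table definition mention two ways to maintain a 30-day figure: adjusting the previous value incrementally, or recomputing the window, which is what the statement above does. The following Python sketch contrasts the two for the login-days metric. It assumes a window of exactly 30 calendar days ending on the run date; note that the filter `dt >= date_add('2020-06-15', -30)` in the query spans 31 daily partitions. The function names and sample dates are hypothetical.

```python
from datetime import date, timedelta

def recompute_30d(active_days, do_date, window=30):
    """Approach 2: rescan the whole window and count the active days."""
    start = do_date - timedelta(days=window - 1)
    return sum(1 for d in active_days if start <= d <= do_date)

def incremental_30d(prev_count, active_days, do_date, window=30):
    """Approach 1: add today's activity and subtract the activity of the
    day that has just dropped out of the window."""
    leaving = do_date - timedelta(days=window)
    return prev_count + (do_date in active_days) - (leaving in active_days)

# Hypothetical login dates of one user
active_days = {date(2020, 5, 16), date(2020, 6, 1), date(2020, 6, 15)}
do_date = date(2020, 6, 15)

yesterday_count = recompute_30d(active_days, do_date - timedelta(days=1))
today_incremental = incremental_30d(yesterday_count, active_days, do_date)
today_recomputed = recompute_30d(active_days, do_date)
print(yesterday_count, today_incremental, today_recomputed)
```

The incremental form reads only two days of data per run but depends on yesterday's value being correct, whereas the recomputation is self-contained and, as the comment says, suits smaller data volumes.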

Product Topic Wide Table

Table definition:

drop table if exists dwt_sku_topic;
create external table dwt_sku_topic
(
sku_id string comment 'sku_id',
spu_id string comment 'spu_id',
order_last_30d_count bigint comment 'times ordered in the last 30 days',
order_last_30d_num bigint comment 'units ordered in the last 30 days',
order_last_30d_amount decimal(16,2) comment 'amount ordered in the last 30 days',
order_count bigint comment 'cumulative times ordered',
order_num bigint comment 'cumulative units ordered',
order_amount decimal(16,2) comment 'cumulative amount ordered',
payment_last_30d_count bigint comment 'times paid in the last 30 days',
payment_last_30d_num bigint comment 'units paid in the last 30 days',
payment_last_30d_amount decimal(16,2) comment 'amount paid in the last 30 days',
payment_count bigint comment 'cumulative times paid',
payment_num bigint comment 'cumulative units paid',
payment_amount decimal(16,2) comment 'cumulative amount paid',
refund_last_30d_count bigint comment 'refund count in the last 30 days',
refund_last_30d_num bigint comment 'units refunded in the last 30 days',
refund_last_30d_amount decimal(16,2) comment 'amount refunded in the last 30 days',
refund_count bigint comment 'cumulative refund count',
refund_num bigint comment 'cumulative units refunded',
refund_amount decimal(16,2) comment 'cumulative amount refunded',
cart_last_30d_count bigint comment 'times added to cart in the last 30 days',
cart_count bigint comment 'cumulative times added to cart',
favor_last_30d_count bigint comment 'times favorited in the last 30 days',
favor_count bigint comment 'cumulative times favorited',
appraise_last_30d_good_count bigint comment 'positive reviews in the last 30 days',
appraise_last_30d_mid_count bigint comment 'neutral reviews in the last 30 days',
appraise_last_30d_bad_count bigint comment 'negative reviews in the last 30 days',
appraise_last_30d_default_count bigint comment 'default reviews in the last 30 days',
appraise_good_count bigint comment 'cumulative positive reviews',
appraise_mid_count bigint comment 'cumulative neutral reviews',
appraise_bad_count bigint comment 'cumulative negative reviews',
appraise_default_count bigint comment 'cumulative default reviews'
)COMMENT 'product topic wide table'
stored as parquet
location '/warehouse/gmall/dwt/dwt_sku_topic/'
tblproperties ("parquet.compression"="lzo");

Data load statement:

insert overwrite table dwt_sku_topic
select
nvl(new.sku_id,old.sku_id),
sku_info.spu_id,
nvl(new.order_count30,0),
nvl(new.order_num30,0),
nvl(new.order_amount30,0),
nvl(old.order_count,0) + nvl(new.order_count,0),
nvl(old.order_num,0) + nvl(new.order_num,0),
nvl(old.order_amount,0) + nvl(new.order_amount,0),
nvl(new.payment_count30,0),
nvl(new.payment_num30,0),
nvl(new.payment_amount30,0),
nvl(old.payment_count,0) + nvl(new.payment_count,0),
nvl(old.payment_num,0) + nvl(new.payment_num,0),
nvl(old.payment_amount,0) + nvl(new.payment_amount,0),
nvl(new.refund_count30,0),
nvl(new.refund_num30,0),
nvl(new.refund_amount30,0),
nvl(old.refund_count,0) + nvl(new.refund_count,0),
nvl(old.refund_num,0) + nvl(new.refund_num,0),
nvl(old.refund_amount,0) + nvl(new.refund_amount,0),
nvl(new.cart_count30,0),
nvl(old.cart_count,0) + nvl(new.cart_count,0),
nvl(new.favor_count30,0),
nvl(old.favor_count,0) + nvl(new.favor_count,0),
nvl(new.appraise_good_count30,0),
nvl(new.appraise_mid_count30,0),
nvl(new.appraise_bad_count30,0),
nvl(new.appraise_default_count30,0) ,
nvl(old.appraise_good_count,0) + nvl(new.appraise_good_count,0),
nvl(old.appraise_mid_count,0) + nvl(new.appraise_mid_count,0),
nvl(old.appraise_bad_count,0) + nvl(new.appraise_bad_count,0),
nvl(old.appraise_default_count,0) + nvl(new.appraise_default_count,0)
from
dwt_sku_topic old
full outer join
(
select
sku_id,
sum(if(dt='2020-06-15', order_count,0 )) order_count,
sum(if(dt='2020-06-15',order_num ,0 )) order_num,
sum(if(dt='2020-06-15',order_amount,0 )) order_amount ,
sum(if(dt='2020-06-15',payment_count,0 )) payment_count,
sum(if(dt='2020-06-15',payment_num,0 )) payment_num,
sum(if(dt='2020-06-15',payment_amount,0 )) payment_amount,
sum(if(dt='2020-06-15',refund_count,0 )) refund_count,
sum(if(dt='2020-06-15',refund_num,0 )) refund_num,
sum(if(dt='2020-06-15',refund_amount,0 )) refund_amount,
sum(if(dt='2020-06-15',cart_count,0 )) cart_count,
sum(if(dt='2020-06-15',favor_count,0 )) favor_count,
sum(if(dt='2020-06-15',appraise_good_count,0 )) appraise_good_count,
sum(if(dt='2020-06-15',appraise_mid_count,0 ) ) appraise_mid_count ,
sum(if(dt='2020-06-15',appraise_bad_count,0 )) appraise_bad_count,
sum(if(dt='2020-06-15',appraise_default_count,0 )) appraise_default_count,
sum(order_count) order_count30 ,
sum(order_num) order_num30,
sum(order_amount) order_amount30,
sum(payment_count) payment_count30,
sum(payment_num) payment_num30,
sum(payment_amount) payment_amount30,
sum(refund_count) refund_count30,
sum(refund_num) refund_num30,
sum(refund_amount) refund_amount30,
sum(cart_count) cart_count30,
sum(favor_count) favor_count30,
sum(appraise_good_count) appraise_good_count30,
sum(appraise_mid_count) appraise_mid_count30,
sum(appraise_bad_count) appraise_bad_count30,
sum(appraise_default_count) appraise_default_count30
from dws_sku_action_daycount
where dt >= date_add ('2020-06-15', -30)
group by sku_id
)new
on new.sku_id = old.sku_id
left join
(select * from dwd_dim_sku_info where dt='2020-06-15') sku_info
on nvl(new.sku_id,old.sku_id)= sku_info.id;
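
A note on the window boundary: `date_add('2020-06-15', -30)` evaluates to 2020-05-16, so the filter `dt >= date_add('2020-06-15', -30)` keeps the load date plus the 30 days before it, which is 31 daily partitions when no later partitions exist. The source does not say whether 30 or 31 days is intended; an offset of -29 would give exactly 30 calendar days including the load date. The shell sketch below makes the arithmetic easy to check. It assumes GNU `date`, and the helper names `window_start` and `window_days` are hypothetical.

```shell
#!/bin/bash
# Sketch: inspect the date window selected by
#   dt >= date_add('<do_date>', -<back>)
# Assumes GNU date. Helper names are illustrative only.

# First day included by the filter: do_date minus <back> days.
window_start() {
    local do_date=$1 back=$2
    date -u -d "$do_date -$back days" +%F
}

# Number of calendar days covered, counting both the first day and do_date.
window_days() {
    local do_date=$1 back=$2 start end
    start=$(date -u -d "$(window_start "$do_date" "$back")" +%s)
    end=$(date -u -d "$do_date" +%s)
    echo $(( (end - start) / 86400 + 1 ))
}
```

For example, `window_start 2020-06-15 30` prints 2020-05-16 and `window_days 2020-06-15 30` prints 31, while an offset of 29 gives 2020-05-17 and 30.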

Activity topic wide table

Table creation statement:

drop table if exists dwt_activity_topic;
create external table dwt_activity_topic(
`id` string COMMENT 'ID',
`activity_name` string COMMENT 'activity name',
`activity_type` string COMMENT 'activity type',
`start_time` string COMMENT 'start time',
`end_time` string COMMENT 'end time',
`create_time` string COMMENT 'creation time',
`display_day_count` bigint COMMENT 'number of impressions for the day',
`order_day_count` bigint COMMENT 'number of orders for the day',
`order_day_amount` decimal(20,2) COMMENT 'order amount for the day',
`payment_day_count` bigint COMMENT 'number of payments for the day',
`payment_day_amount` decimal(20,2) COMMENT 'payment amount for the day',
`display_count` bigint COMMENT 'cumulative number of impressions',
`order_count` bigint COMMENT 'cumulative number of orders',
`order_amount` decimal(20,2) COMMENT 'cumulative order amount',
`payment_count` bigint COMMENT 'cumulative number of payments',
`payment_amount` decimal(20,2) COMMENT 'cumulative payment amount'
) COMMENT 'Activity topic wide table'
stored as parquet
location '/warehouse/gmall/dwt/dwt_activity_topic/'
tblproperties ("parquet.compression"="lzo");

Data load statement:

insert overwrite table dwt_activity_topic
select
nvl(new.id,old.id),
nvl(new.activity_name,old.activity_name),
nvl(new.activity_type,old.activity_type),
nvl(new.start_time,old.start_time),
nvl(new.end_time,old.end_time),
nvl(new.create_time,old.create_time),
nvl(new.display_count,0),
nvl(new.order_count,0),
nvl(new.order_amount,0.0),
nvl(new.payment_count,0),
nvl(new.payment_amount,0.0),
nvl(new.display_count,0)+nvl(old.display_count,0),
nvl(new.order_count,0)+nvl(old.order_count,0),
nvl(new.order_amount,0.0)+nvl(old.order_amount,0.0),
nvl(new.payment_count,0)+nvl(old.payment_count,0),
nvl(new.payment_amount,0.0)+nvl(old.payment_amount,0.0)
from
(
select
*
from dwt_activity_topic
)old
full outer join
(
select
*
from dws_activity_info_daycount
where dt='2020-06-15'
)new
on old.id=new.id;
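
The load above follows one pattern: full outer join the existing DWT rows (`old`) with the day's DWS rows (`new`), take the day's value as is, and add it to the cumulative value with `nvl(..., 0)` on both sides. As a rough illustration outside Hive, the sketch below reproduces that merge for a single measure with `awk`. The tab-separated input layout (`id`, count), the file names, and the function name `merge_cumulative` are assumptions made for the example and are not part of the original pipeline.

```shell
#!/bin/bash
# Sketch: emulate "old full outer join new" with nvl(...,0) sums.
# Input files are tab-separated: <id><TAB><count>.
#   old file: cumulative count per id (current DWT content)
#   new file: the day's count per id (one DWS partition)
# Output: <id><TAB><day_count><TAB><cumulative_count>, sorted by id.

merge_cumulative() {
    local old_file=$1 new_file=$2
    awk -F'\t' -v OFS='\t' '
        # Rows from the old file: carry the cumulative value, day value is 0
        # unless the id also appears in the new file.
        FILENAME == ARGV[1] { total[$1] = $2; day[$1] = 0; next }
        # Rows from the new file: record the day value and add it to the total
        # (an id missing from the old file starts from 0, like nvl(old.x, 0)).
        { day[$1] = $2; total[$1] += $2 }
        END { for (id in total) print id, day[id], total[id] }
    ' "$old_file" "$new_file" | sort
}
```

Calling `merge_cumulative old.tsv new.tsv` prints one row per id found in either file. Because the cumulative column is computed as old plus new, applying the same day's file twice adds that day twice; the Hive statement behaves the same way if it is rerun for a date that has already been loaded.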

Region topic wide table

Table creation statement:

drop table if exists dwt_area_topic;
create external table dwt_area_topic(
`id` bigint COMMENT 'ID',
`province_name` string COMMENT 'province name',
`area_code` string COMMENT 'area code',
`iso_code` string COMMENT 'ISO code',
`region_id` string COMMENT 'region ID',
`region_name` string COMMENT 'region name',
`login_day_count` string COMMENT 'number of active devices for the day',
`login_last_30d_count` string COMMENT 'number of active devices in the last 30 days',
`order_day_count` bigint COMMENT 'number of orders for the day',
`order_day_amount` decimal(16,2) COMMENT 'order amount for the day',
`order_last_30d_count` bigint COMMENT 'number of orders in the last 30 days',
`order_last_30d_amount` decimal(16,2) COMMENT 'order amount in the last 30 days',
`payment_day_count` bigint COMMENT 'number of payments for the day',
`payment_day_amount` decimal(16,2) COMMENT 'payment amount for the day',
`payment_last_30d_count` bigint COMMENT 'number of payments in the last 30 days',
`payment_last_30d_amount` decimal(16,2) COMMENT 'payment amount in the last 30 days'
) COMMENT 'Region topic wide table'
stored as parquet
location '/warehouse/gmall/dwt/dwt_area_topic/'
tblproperties ("parquet.compression"="lzo");

Data load statement:

insert overwrite table dwt_area_topic
select
nvl(old.id,new.id),
nvl(old.province_name,new.province_name),
nvl(old.area_code,new.area_code),
nvl(old.iso_code,new.iso_code),
nvl(old.region_id,new.region_id),
nvl(old.region_name,new.region_name),
nvl(new.login_day_count,0),
nvl(new.login_last_30d_count,0),
nvl(new.order_day_count,0),
nvl(new.order_day_amount,0.0),
nvl(new.order_last_30d_count,0),
nvl(new.order_last_30d_amount,0.0),
nvl(new.payment_day_count,0),
nvl(new.payment_day_amount,0.0),
nvl(new.payment_last_30d_count,0),
nvl(new.payment_last_30d_amount,0.0)
from
(
select
*
from dwt_area_topic
)old
full outer join
(
select
id,
province_name,
area_code,
iso_code,
region_id,
region_name,
sum(if(dt='2020-06-15',login_count,0)) login_day_count,
sum(if(dt='2020-06-15',order_count,0)) order_day_count,
sum(if(dt='2020-06-15',order_amount,0.0)) order_day_amount,
sum(if(dt='2020-06-15',payment_count,0)) payment_day_count,
sum(if(dt='2020-06-15',payment_amount,0.0)) payment_day_amount,
sum(login_count) login_last_30d_count,
sum(order_count) order_last_30d_count,
sum(order_amount) order_last_30d_amount,
sum(payment_count) payment_last_30d_count,
sum(payment_amount) payment_last_30d_amount
from dws_area_stats_daycount
where dt>=date_add('2020-06-15',-30)
group by id,province_name,area_code,iso_code,region_id,region_name
)new
on old.id=new.id;

DWT layer data load script

#!/bin/bash

APP=gmall
hive=/opt/module/hive/bin/hive

# Use the date passed as the first argument; if none is given, default to yesterday
if [ -n "$1" ]; then
    do_date=$1
else
    do_date=$(date -d "-1 day" +%F)
fi

sql="
set mapreduce.job.queuename=hive;
insert overwrite table ${APP}.dwt_uv_topic
select
nvl(new.mid_id,old.mid_id),
nvl(new.model,old.model),
nvl(new.brand,old.brand),
if(old.mid_id is null,'$do_date',old.login_date_first),
if(new.mid_id is not null,'$do_date',old.login_date_last),
if(new.mid_id is not null, new.login_count,0),
nvl(old.login_count,0)+if(new.login_count>0,1,0)
from
(
select
*
from ${APP}.dwt_uv_topic
)old
full outer join
(
select
*
from ${APP}.dws_uv_detail_daycount
where dt='$do_date'
)new
on old.mid_id=new.mid_id;

insert overwrite table ${APP}.dwt_user_topic
select
nvl(new.user_id,old.user_id),
if(old.login_date_first is null and new.login_count>0,'$do_date',old.login_date_first),
if(new.login_count>0,'$do_date',old.login_date_last),
nvl(old.login_count,0)+if(new.login_count>0,1,0),
nvl(new.login_last_30d_count,0),
if(old.order_date_first is null and new.order_count>0,'$do_date',old.order_date_first),
if(new.order_count>0,'$do_date',old.order_date_last),
nvl(old.order_count,0)+nvl(new.order_count,0),
nvl(old.order_amount,0)+nvl(new.order_amount,0),
nvl(new.order_last_30d_count,0),
nvl(new.order_last_30d_amount,0),
if(old.payment_date_first is null and new.payment_count>0,'$do_date',old.payment_date_first),
if(new.payment_count>0,'$do_date',old.payment_date_last),
nvl(old.payment_count,0)+nvl(new.payment_count,0),
nvl(old.payment_amount,0)+nvl(new.payment_amount,0),
nvl(new.payment_last_30d_count,0),
nvl(new.payment_last_30d_amount,0)
from
${APP}.dwt_user_topic old
full outer join
(
select
user_id,
sum(if(dt='$do_date',login_count,0)) login_count,
sum(if(dt='$do_date',order_count,0)) order_count,
sum(if(dt='$do_date',order_amount,0)) order_amount,
sum(if(dt='$do_date',payment_count,0)) payment_count,
sum(if(dt='$do_date',payment_amount,0)) payment_amount,
sum(if(login_count>0,1,0)) login_last_30d_count,
sum(order_count) order_last_30d_count,
sum(order_amount) order_last_30d_amount,
sum(payment_count) payment_last_30d_count,
sum(payment_amount) payment_last_30d_amount
from ${APP}.dws_user_action_daycount
where dt>=date_add( '$do_date',-30)
group by user_id
)new
on old.user_id=new.user_id;

insert overwrite table ${APP}.dwt_sku_topic
select
nvl(new.sku_id,old.sku_id),
sku_info.spu_id,
nvl(new.order_count30,0),
nvl(new.order_num30,0),
nvl(new.order_amount30,0),
nvl(old.order_count,0) + nvl(new.order_count,0),
nvl(old.order_num,0) + nvl(new.order_num,0),
nvl(old.order_amount,0) + nvl(new.order_amount,0),
nvl(new.payment_count30,0),
nvl(new.payment_num30,0),
nvl(new.payment_amount30,0),
nvl(old.payment_count,0) + nvl(new.payment_count,0),
nvl(old.payment_num,0) + nvl(new.payment_num,0),
nvl(old.payment_amount,0) + nvl(new.payment_amount,0),
nvl(new.refund_count30,0),
nvl(new.refund_num30,0),
nvl(new.refund_amount30,0),
nvl(old.refund_count,0) + nvl(new.refund_count,0),
nvl(old.refund_num,0) + nvl(new.refund_num,0),
nvl(old.refund_amount,0) + nvl(new.refund_amount,0),
nvl(new.cart_count30,0),
nvl(old.cart_count,0) + nvl(new.cart_count,0),
nvl(new.favor_count30,0),
nvl(old.favor_count,0) + nvl(new.favor_count,0),
nvl(new.appraise_good_count30,0),
nvl(new.appraise_mid_count30,0),
nvl(new.appraise_bad_count30,0),
nvl(new.appraise_default_count30,0) ,
nvl(old.appraise_good_count,0) + nvl(new.appraise_good_count,0),
nvl(old.appraise_mid_count,0) + nvl(new.appraise_mid_count,0),
nvl(old.appraise_bad_count,0) + nvl(new.appraise_bad_count,0),
nvl(old.appraise_default_count,0) + nvl(new.appraise_default_count,0)
from
(
select
sku_id,
spu_id,
order_last_30d_count,
order_last_30d_num,
order_last_30d_amount,
order_count,
order_num,
order_amount ,
payment_last_30d_count,
payment_last_30d_num,
payment_last_30d_amount,
payment_count,
payment_num,
payment_amount,
refund_last_30d_count,
refund_last_30d_num,
refund_last_30d_amount,
refund_count,
refund_num,
refund_amount,
cart_last_30d_count,
cart_count,
favor_last_30d_count,
favor_count,
appraise_last_30d_good_count,
appraise_last_30d_mid_count,
appraise_last_30d_bad_count,
appraise_last_30d_default_count,
appraise_good_count,
appraise_mid_count,
appraise_bad_count,
appraise_default_count
from ${APP}.dwt_sku_topic
)old
full outer join
(
select
sku_id,
sum(if(dt='$do_date', order_count,0 )) order_count,
sum(if(dt='$do_date',order_num ,0 )) order_num,
sum(if(dt='$do_date',order_amount,0 )) order_amount ,
sum(if(dt='$do_date',payment_count,0 )) payment_count,
sum(if(dt='$do_date',payment_num,0 )) payment_num,
sum(if(dt='$do_date',payment_amount,0 )) payment_amount,
sum(if(dt='$do_date',refund_count,0 )) refund_count,
sum(if(dt='$do_date',refund_num,0 )) refund_num,
sum(if(dt='$do_date',refund_amount,0 )) refund_amount,
sum(if(dt='$do_date',cart_count,0 )) cart_count,
sum(if(dt='$do_date',favor_count,0 )) favor_count,
sum(if(dt='$do_date',appraise_good_count,0 )) appraise_good_count,
sum(if(dt='$do_date',appraise_mid_count,0 ) ) appraise_mid_count ,
sum(if(dt='$do_date',appraise_bad_count,0 )) appraise_bad_count,
sum(if(dt='$do_date',appraise_default_count,0 )) appraise_default_count,
sum(order_count) order_count30 ,
sum(order_num) order_num30,
sum(order_amount) order_amount30,
sum(payment_count) payment_count30,
sum(payment_num) payment_num30,
sum(payment_amount) payment_amount30,
sum(refund_count) refund_count30,
sum(refund_num) refund_num30,
sum(refund_amount) refund_amount30,
sum(cart_count) cart_count30,
sum(favor_count) favor_count30,
sum(appraise_good_count) appraise_good_count30,
sum(appraise_mid_count) appraise_mid_count30,
sum(appraise_bad_count) appraise_bad_count30,
sum(appraise_default_count) appraise_default_count30
from ${APP}.dws_sku_action_daycount
where dt >= date_add ('$do_date', -30)
group by sku_id
)new
on new.sku_id = old.sku_id
left join
(select * from ${APP}.dwd_dim_sku_info where dt='$do_date') sku_info
on nvl(new.sku_id,old.sku_id)= sku_info.id;

insert overwrite table ${APP}.dwt_activity_topic
select
nvl(new.id,old.id),
nvl(new.activity_name,old.activity_name),
nvl(new.activity_type,old.activity_type),
nvl(new.start_time,old.start_time),
nvl(new.end_time,old.end_time),
nvl(new.create_time,old.create_time),
nvl(new.display_count,0),
nvl(new.order_count,0),
nvl(new.order_amount,0.0),
nvl(new.payment_count,0),
nvl(new.payment_amount,0.0),
nvl(new.display_count,0)+nvl(old.display_count,0),
nvl(new.order_count,0)+nvl(old.order_count,0),
nvl(new.order_amount,0.0)+nvl(old.order_amount,0.0),
nvl(new.payment_count,0)+nvl(old.payment_count,0),
nvl(new.payment_amount,0.0)+nvl(old.payment_amount,0.0)
from
(
select
*
from ${APP}.dwt_activity_topic
)old
full outer join
(
select
*
from ${APP}.dws_activity_info_daycount
where dt='$do_date'
)new
on old.id=new.id;

insert overwrite table ${APP}.dwt_area_topic
select
nvl(old.id,new.id),
nvl(old.province_name,new.province_name),
nvl(old.area_code,new.area_code),
nvl(old.iso_code,new.iso_code),
nvl(old.region_id,new.region_id),
nvl(old.region_name,new.region_name),
nvl(new.login_day_count,0),
nvl(new.login_last_30d_count,0),
nvl(new.order_day_count,0),
nvl(new.order_day_amount,0.0),
nvl(new.order_last_30d_count,0),
nvl(new.order_last_30d_amount,0.0),
nvl(new.payment_day_count,0),
nvl(new.payment_day_amount,0.0),
nvl(new.payment_last_30d_count,0),
nvl(new.payment_last_30d_amount,0.0)
from
(
select
*
from ${APP}.dwt_area_topic
)old
full outer join
(
select
id,
province_name,
area_code,
iso_code,
region_id,
region_name,
sum(if(dt='$do_date',login_count,0)) login_day_count,
sum(if(dt='$do_date',order_count,0)) order_day_count,
sum(if(dt='$do_date',order_amount,0.0)) order_day_amount,
sum(if(dt='$do_date',payment_count,0)) payment_day_count,
sum(if(dt='$do_date',payment_amount,0.0)) payment_day_amount,
sum(login_count) login_last_30d_count,
sum(order_count) order_last_30d_count,
sum(order_amount) order_last_30d_amount,
sum(payment_count) payment_last_30d_count,
sum(payment_amount) payment_last_30d_amount
from ${APP}.dws_area_stats_daycount
where dt>=date_add('$do_date',-30)
group by id,province_name,area_code,iso_code,region_id,region_name
)new
on old.id=new.id;
"

$hive -e "$sql"
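
The script takes the load date as an optional first argument and falls back to yesterday. The argument is passed to Hive unchecked, so a malformed date would most likely match no DWS partitions while the DWT tables are still overwritten. A minimal sketch of a stricter date resolution follows. It assumes GNU `date`, and `resolve_do_date` is a hypothetical helper name that is not part of the original script.

```shell
#!/bin/bash
# Sketch: resolve the load date like the script above, but reject arguments
# that are not valid YYYY-MM-DD dates. Assumes GNU date.

resolve_do_date() {
    local arg=$1
    if [ -z "$arg" ]; then
        # No argument: default to yesterday, as in the original script.
        date -d "-1 day" +%F
        return 0
    fi
    # Round-trip through date: a valid YYYY-MM-DD value prints back unchanged.
    if [ "$(date -d "$arg" +%F 2>/dev/null)" = "$arg" ]; then
        echo "$arg"
    else
        echo "invalid date: $arg (expected YYYY-MM-DD)" >&2
        return 1
    fi
}
```

Inside the load script, `do_date=$(resolve_do_date "$1") || exit 1` could stand in for the `if`/`else` block, so that a call with a value such as `20200615` stops before any table is overwritten.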