The following blog post is reposted from a data warehouse case study; it is extremely detailed 👍👍👍👍👍
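Offline Data Warehouse
A data warehouse (Data WareHouse) is a strategic collection of data from all systems that supports every decision-making process in an enterprise.
Analyzing the data in a data warehouse helps an enterprise improve business processes, control costs, improve product quality, and so on.
A data warehouse is not the final destination of the data; it prepares the data for that destination through cleansing, transformation, classification, restructuring, merging, splitting, aggregation, and so on.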
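1 Project Overview
1.1 Project Requirements
- Build the user-behavior data collection platform
- Build the business data collection platform
- Dimensional modeling of the data warehouse
- Analysis by subject: users, traffic, members, products, sales, regions, activities, etc.
- Use an ad-hoc query tool for on-demand metric analysis
- Monitor cluster performance and raise alerts on anomalies
- Metadata management
- Data quality monitoring
1.2 Technology Selection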
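How to choose the data-collection technology:
Collection framework | Main function |
---|---|
Sqoop | Import/export between the big-data platform and relational databases |
DataX | Import/export between the big-data platform and relational databases |
Flume | Good at collecting and parsing log data |
Logstash | Good at collecting and parsing log data |
Maxwell | Commonly used to parse MySQL binlog data in real time |
Canal | Commonly used to parse MySQL binlog data in real time |
waterDrop | Data import/export tool |
Message middleware selection:
Open-source MQ | Overview |
---|---|
RabbitMQ | Implemented by LShift in Erlang; multi-protocol; broker architecture; heavyweight |
ZeroMQ | Implemented by iMatix, the original designer of AMQP; lightweight messaging kernel; brokerless design; written in C++ |
Kafka | Implemented by LinkedIn in Scala; supports parallel data loading into Hadoop |
ActiveMQ | Apache's JMS implementation; supports broker and P2P deployment; multi-protocol; written in Java |
Redis | Key-value NoSQL database that also offers MQ features |
MemcacheQ | Message queue built by Chinese developers on the memcache protocol; written in C/C++ |
Persistent storage framework selection:
Framework | Main use |
---|---|
HDFS | Distributed file storage system |
HBase | Key-value NoSQL database |
Kudu | HBase-like storage open-sourced by Cloudera |
Hive | MapReduce-based data warehouse tool |
Offline computing framework selection (Hive engine):
Framework | Description |
---|---|
MapReduce | The earliest distributed computing framework |
Spark | One-stop solution for both batch and stream processing |
Flink | One-stop solution for both batch and stream processing |
Analytical database selection:
Comparison | Druid | Kylin | Presto | Impala | ES |
---|---|---|---|---|---|
Sub-second response | √ | √ | × | × | × |
Tens of billions of rows | √ | √ | √ | √ | √ |
SQL support | √ | √ | √ | √ | √ (plugin required) |
Offline | √ | √ | √ | √ | √ |
Real-time | √ | √ | × | × | × |
Exact distinct count | × | √ | √ | √ | × |
Multi-table join | × | √ | √ | √ | × |
JDBC for BI | × | √ | √ | √ | × |
Other selections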
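How to size the cluster (assuming each server has 8 TB of disk and 128 GB of memory):
- 1 million daily active users, each producing 100 log entries per day on average: 1 million * 100 = 100 million entries
- Each entry is about 1 KB, and there are 100 million entries per day: 100 million KB / 1024 / 1024 ≈ 100 GB
- No server expansion for half a year: 100 GB * 180 days ≈ 18 TB
- 3 replicas: 18 TB * 3 = 54 TB
- Reserve 20% to 30% headroom: 54 TB / 0.7 ≈ 77 TB
- Conclusion: 77 TB / 8 TB per server ≈ 9.6, so about 10 servers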
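The arithmetic above can be reproduced with a small calculator. This is a sketch that uses the same inputs as the list. Like the estimate, it treats 1 TB as 1000 GB and rounds the daily volume up to 100 GB before sizing. The class and method names are illustrative only.

```java
/** Back-of-the-envelope HDFS cluster sizing, mirroring the estimate above (illustrative names). */
public class ClusterSizing {

    /** Raw daily log volume in GB, using 1 GB = 1024 * 1024 KB. */
    static double dailyVolumeGb(long dailyActiveUsers, long logsPerUser, double logSizeKb) {
        return dailyActiveUsers * logsPerUser * logSizeKb / 1024 / 1024;
    }

    /** Servers needed; TB is treated as 1000 GB, as in the rough estimate. */
    static int serversNeeded(double dailyGb, int days, int replicas,
                             double headroom, double diskPerServerTb) {
        double rawTb = dailyGb * days / 1000;               // 100 GB * 180 days = 18 TB
        double replicatedTb = rawTb * replicas;             // 18 TB * 3 = 54 TB
        double totalTb = replicatedTb / (1 - headroom);     // 54 TB / 0.7 ~ 77 TB
        return (int) Math.ceil(totalTb / diskPerServerTb);  // 77 TB / 8 TB ~ 9.6 -> 10
    }

    public static void main(String[] args) {
        double daily = dailyVolumeGb(1_000_000L, 100L, 1.0);
        System.out.printf("daily volume: %.1f GB%n", daily); // about 95.4 GB, rounded up to ~100 GB
        System.out.println("servers needed: " + serversNeeded(100, 180, 3, 0.3, 8));
    }
}
```

The headroom choice matters: with 20% headroom instead of 30%, the same inputs give 54 / 0.8 / 8 ≈ 8.4, so 9 servers rather than 10.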
Because resources are limited, this walkthrough uses 3 servers.
Service | Sub-service | cdh01.cm | cdh02.cm | cdh03.cm |
---|---|---|---|---|
HDFS | NameNode DataNode SecondaryNameNode | √ √ | √ | √ √ |
YARN | NodeManager ResourceManager | √ | √ √ | √ |
ZooKeeper | ZooKeeper Server | √ | √ | √ |
Flume | Flume, Flume (consuming Kafka) | √ | √ | √ |
Kafka | Kafka | √ | √ | √ |
Hive | Hive | √ | | |
MySQL | MySQL | √ | | |
Sqoop | Sqoop | √ | | |
Presto | Coordinator Worker | √ | √ | √ |
DolphinScheduler | DolphinScheduler | √ | | |
Druid | Druid | √ | √ | √ |
Kylin | Kylin | √ | | |
HBase | HMaster HRegionServer | √ √ | √ | √ |
Superset | Superset | √ | | |
Atlas | Atlas | √ | | |
Solr | Solr | √ | | |
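2 Data Generation Module
This module focuses on collecting user-behavior data. Why collect it?
For a business, users are revenue. User habits and similar data need to be collected so they can be analyzed in downstream big-data products such as a user-profile tagging system. User information is usually analyzed offline. Later, the results can be stored in an inverted-index ecosystem such as Elasticsearch and matched against user habits with real-time computation to deliver personalized recommendations. Going further, deep learning can be used to recommend items to similar users.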
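2.1 Basic format of tracking data
Common fields: fields that almost every Android phone reports
Business fields: fields reported by the tracking code, specific to each business type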
{
"ap":"xxxxx",//project data source: app or pc
"cm": { //common fields
"mid": "", // (String) unique device ID
"uid": "", // (String) user ID
"vc": "1", // (String) versionCode, app version number
"vn": "1.0", // (String) versionName, app version name
"l": "zh", // (String) language, system language
"sr": "", // (String) channel number: the channel the app was installed from
"os": "7.1.1", // (String) Android system version
"ar": "CN", // (String) area
"md": "BBB100-1", // (String) model, phone model
"ba": "blackberry", // (String) brand, phone brand
"sv": "V2.2.1", // (String) sdkVersion
"g": "", // (String) gmail
"hw": "1620x1080", // (String) heightXwidth, screen height and width
"t": "1506047606608", // (String) time the log was produced on the client
"nw": "WIFI", // (String) network type
"ln": 0, // (double) lng, longitude
"la": 0 // (double) lat, latitude
},
"et": [ //events
{
"ett": "1506047605364", //time the event was produced on the client
"en": "display", //event name
"kv": { //event result, custom key-value pairs
"goodsid": "236",
"action": "1",
"extend1": "1",
"place": "2",
"category": "75"
}
}
]
}
Sample log (server timestamp | log). The server timestamp can be used to measure how long the network transfer took:
1540934156385| {
"ap": "gmall", //data warehouse (project) name
"cm": {
"uid": "1234",
"vc": "2",
"vn": "1.0",
"l": "EN",
"sr": "",
"os": "7.1.1",
"ar": "CN",
"md": "BBB100-1",
"ba": "blackberry",
"sv": "V2.2.1",
"g": "abc@gmail.com",
"hw": "1620x1080",
"t": "1506047606608",
"nw": "WIFI",
"ln": 0,
"la": 0
},
"et": [
{
"ett": "1506047605364", //time the event was produced on the client
"en": "display", //event name
"kv": { //event result, custom key-value pairs
"goodsid": "236",
"action": "1",
"extend1": "1",
"place": "2",
"category": "75"
}
},{
"ett": "1552352626835",
"en": "active_background",
"kv": {
"active_source": "1"
}
}
]
}
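The "server timestamp | log" layout lets you compare the server receive time with the client time `t` in the common fields. The difference is a rough indicator of transfer delay, and it also includes any time the client buffered the log. The sketch below is a minimal illustration, not part of the project. It uses a regex instead of a JSON library to stay dependency-free, and it assumes `t` holds 13-digit epoch milliseconds as in the samples. The sample values above are synthetic, so their difference is not a realistic delay.

```java
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Sketch: server receive time minus client log time for a "serverTime|json" line. */
public class ServerClientDelay {

    // Matches "t":"1506047606608", allowing spaces; "ett" does not match because
    // the pattern requires a quote immediately before the t.
    private static final Pattern CLIENT_TIME = Pattern.compile("\"t\"\\s*:\\s*\"(\\d{13})\"");

    /** Server time minus client time in ms; empty if the line is not "13-digit time|json". */
    static OptionalLong delayMillis(String line) {
        int sep = line.indexOf('|');
        String serverTime = sep < 0 ? "" : line.substring(0, sep).trim();
        if (!serverTime.matches("\\d{13}")) {
            return OptionalLong.empty();
        }
        Matcher m = CLIENT_TIME.matcher(line.substring(sep + 1));
        if (!m.find()) {
            return OptionalLong.empty(); // no client time field
        }
        return OptionalLong.of(Long.parseLong(serverTime) - Long.parseLong(m.group(1)));
    }

    public static void main(String[] args) {
        String line = "1540934156385| {\"cm\":{\"t\": \"1540934150000\"},\"et\":[]}";
        System.out.println(delayMillis(line).getAsLong() + " ms"); // 6385 ms
    }
}
```

Returning `OptionalLong` rather than a sentinel such as -1 keeps a negative result (client clock ahead of the server) distinguishable from a malformed line.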
2.2 Tracking event log data
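2.2.1 Product list page
Event name: loading
Tag | Meaning |
---|---|
action | Action: start loading=1, load succeeded=2, load failed=3 |
loading_time | Loading duration: time from the start of the pull-down until the API returns data (report 0 when loading starts; report the duration only when loading succeeds or fails) |
loading_way | Loading source: 1 = read from cache, 2 = fetch fresh data from the API (reported only when loading succeeds) |
extend1 | Extension field Extend1 |
extend2 | Extension field Extend2 |
type | Loading type: automatic=1, user pull-down=2, bottom loading=3 (triggered by tapping the bottom hint bar or the back-to-top button) |
type1 | Load failure code: the failure status code (empty means loading succeeded) |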
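2.2.2 Product click
- Event tag: display
Tag | Meaning |
---|---|
action | Action: product impression=1, product click=2 |
goodsid | Product ID (issued by the server) |
place | Position (the first item is 0, the second is 1, and so on) |
extend1 | Impression type: 1 = first impression, 2 = repeat impression |
category | Category ID (defined by the server) |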
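2.2.3 Product detail page
- Event tag: newsdetail
Tag | Meaning |
---|---|
entry | Page entry source: app home page=1, push=2, related recommendation on a detail page=3 |
action | Action: start loading=1, load succeeded=2 (pv), load failed=3, leave page=4 |
goodsid | Product ID (issued by the server) |
show_style | Product layout: 0 = no image, 1 = one large image, 2 = two images, 3 = three small images, 4 = one small image, 5 = one large and two small images |
news_staytime | Page dwell time: measured from when the product starts loading until the user closes the page. If the user jumps to another page midway, timing pauses and resumes on return to the detail page. If the user stays away for more than 10 minutes, this measurement is discarded and not reported. If the user leaves before the page loads successfully, report empty. |
loading_time | Loading duration: time from when the page starts loading until the API returns data (report 0 when loading starts; report the duration only when loading succeeds or fails) |
type1 | Load failure code: the failure status code (empty means loading succeeded) |
category | Category ID (defined by the server) |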
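2.2.4 Ads
- Event name: ad
Tag | Meaning |
---|---|
entry | Entry: product list page=1, app home page=2, product detail page=3 |
action | Action: ad shown=1, ad clicked=2 |
contentType | Type: 1 = product, 2 = marketing campaign |
displayMills | Display duration in milliseconds |
itemId | Product id |
activityId | Marketing campaign id |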
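2.2.5 Notifications
- Event tag: notification
Tag | Meaning |
---|---|
action | Action: notification created=1, notification popped up=2, notification clicked=3, persistent notification shown=4 (not reported repeatedly; at most once a day) |
type | Notification id: alert=1, weather forecast (morning=2, evening=3), persistent=4 |
ap_time | Time the client popped up the notification |
content | Reserved field |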
2.2.6 User background activity
- Event tag: active_background
Tag | Meaning |
---|---|
active_source | 1=upgrade, 2=download, 3=plugin_upgrade |
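2.2.7 Comments
- Description: comment table (comment)
No. | Field | Description | Type | Length | Nullable | Default |
---|---|---|---|---|---|---|
1 | comment_id | Comment ID | int | 10,0 | | |
2 | userid | User id | int | 10,0 | √ | 0 |
3 | p_comment_id | Parent comment id (0 = top-level comment, non-zero = reply) | int | 10,0 | √ | |
4 | content | Comment content | string | 1000 | √ | |
5 | addtime | Creation time | string | | √ | |
6 | other_id | Related id of the comment | int | 10,0 | √ | |
7 | praise_count | Number of likes | int | 10,0 | √ | 0 |
8 | reply_count | Number of replies | int | 10,0 | √ | 0 |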
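2.2.8 Favorites
- Description: favorites table (favorites)
No. | Field | Description | Type | Length | Nullable | Default |
---|---|---|---|---|---|---|
1 | id | Primary key | int | 10,0 | | |
2 | course_id | Product id | int | 10,0 | √ | 0 |
3 | userid | User ID | int | 10,0 | √ | 0 |
4 | add_time | Creation time | string | | √ | |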
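2.2.9 Likes
- Description: table of all likes (praise)
No. | Field | Description | Type | Length | Nullable | Default |
---|---|---|---|---|---|---|
1 | id | Primary key id | int | 10,0 | | |
2 | userid | User id | int | 10,0 | √ | |
3 | target_id | Id of the liked object | int | 10,0 | √ | |
4 | type | Like type: 1 = Q&A like, 2 = Q&A comment like, 3 = article like, 4 = comment like | int | 10,0 | √ | |
5 | add_time | Creation time | string | | √ | |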
2.2.10 Error log
Tag | Meaning |
---|---|
errorBrief | Error summary |
errorDetail | Error details |
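2.3 Tracking startup log data
- Event tag: start
Tag | Meaning |
---|---|
entry | Entry: push=1, widget=2, icon=3, notification=4, lockscreen_widget=5 |
open_ad_type | Splash ad type: native splash ad=1, interstitial splash ad=2 |
action | Status: success=1, failure=2 |
loading_time | Loading duration: time from the start of the pull-down until the API returns data (report 0 when loading starts; report the duration only when loading succeeds or fails) |
detail | Failure code (empty if none) |
extend1 | Failure message (empty if none) |
en | Log type: start |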
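2.4 Data generation script
The following example skips the log-generation process shown in the red box of the figure and builds the logFile directly with a Java program.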
2.4.1 Data generation format
- Startup log
{"action":"1","ar":"MX","ba":"Sumsung","detail":"201","en":"start","entry":"4","extend1":"","g":"69021X1Q@gmail.com","hw":"1080*1920","l":"pt","la":"-11.0","ln":"-70.0","loading_time":"9","md":"sumsung-5","mid":"244","nw":"3G","open_ad_type":"1","os":"8.2.3","sr":"D","sv":"V2.1.3","t":"1589612165914","uid":"244","vc":"16","vn":"1.2.1"}
- Event log (because of a conversion issue, the figure lacks the "timestamp|" prefix)
1589695383284|{"cm":{"ln":"-79.4","sv":"V2.5.3","os":"8.0.6","g":"81614U54@gmail.com","mid":"245","nw":"WIFI","l":"pt","vc":"6","hw":"1080*1920","ar":"MX","uid":"245","t":"1589627025851","la":"-39.6","md":"HTC-7","vn":"1.3.5","ba":"HTC","sr":"N"},"ap":"app","et":[{"ett":"1589650631883","en":"display","kv":{"goodsid":"53","action":"2","extend1":"2","place":"3","category":"50"}},{"ett":"1589690866312","en":"newsdetail","kv":{"entry":"3","goodsid":"54","news_staytime":"1","loading_time":"6","action":"4","showtype":"0","category":"78","type1":""}},{"ett":"1589641734037","en":"loading","kv":{"extend2":"","loading_time":"0","action":"1","extend1":"","type":"2","type1":"201","loading_way":"2"}},{"ett":"1589687684878","en":"ad","kv":{"activityId":"1","displayMills":"92030","entry":"3","action":"5","contentType":"0"}},{"ett":"1589632980772","en":"active_background","kv":{"active_source":"1"}},{"ett":"1589682030324","en":"error","kv":{"errorDetail":"java.lang.NullPointerException\n at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\n at cn.lift.dfdf.web.AbstractBaseController.validInbound","errorBrief":"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)"}},{"ett":"1589675065650","en":"comment","kv":{"p_comment_id":2,"addtime":"1589624299628","praise_count":509,"other_id":6,"comment_id":7,"reply_count":35,"userid":3,"content":"关色芦候佰间纶珊斑禁尹赞涤仇彭企呵姜毅"}},{"ett":"1589631359459","en":"favorites","kv":{"course_id":7,"id":0,"add_time":"1589681240066","userid":7}},{"ett":"1589616574187","en":"praise","kv":{"target_id":1,"id":7,"type":3,"add_time":"1589642497314","userid":8}}]}
2.4.2 Create the Maven projects
data-producer:pom.xml
<!-- Unify version numbers -->
<properties>
<slf4j.version>1.7.20</slf4j.version>
<logback.version>1.0.7</logback.version>
</properties>
<dependencies> <!-- Alibaba's open-source JSON parsing library -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.51</version>
</dependency> <!-- Logging framework -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.10</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- Main class name -->
<mainClass>com.heaton.bigdata.datawarehouse.app.App</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
data-producer:logback.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false"> <!-- Where log files are stored. Do not use a relative path in the Logback configuration -->
<property name="LOG_HOME" value="/root/logs/"/> <!-- Console output -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder
class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <!-- Output format: %d is the date, %thread the thread name, %-5level the level left-aligned to 5 characters, %msg the log message, %n a newline -->
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
</encoder>
</appender> <!-- Create one log file per day; stores the event logs -->
<appender name="FILE"
class="ch.qos.logback.core.rolling.RollingFileAppender"> <!-- <File>${LOG_HOME}/app.log</File> sets the path used while the log stays below ${log.max.size}; note that a web project saves it under Tomcat's bin directory -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <!-- Name of the output log file -->
<FileNamePattern>${LOG_HOME}/app-%d{yyyy-MM-dd}.log</FileNamePattern> <!-- Number of days to keep log files -->
<MaxHistory>30</MaxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%msg%n</pattern>
</encoder> <!-- Maximum size of a log file -->
<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<MaxFileSize>10MB</MaxFileSize>
</triggeringPolicy>
</appender> <!-- Asynchronous logging -->
<appender name="ASYNC_FILE"
class="ch.qos.logback.classic.AsyncAppender"> <!-- Do not drop logs. By default, once the queue is 80% full, TRACE, DEBUG and INFO events are discarded -->
<discardingThreshold>0</discardingThreshold> <!-- Change the default queue depth; this affects performance. The default is 256 -->
<queueSize>512</queueSize> <!-- Attach the target appender; at most one can be added -->
<appender-ref ref="FILE"/>
</appender> <!-- Log output level -->
<root level="INFO">
<appender-ref ref="STDOUT"/>
<appender-ref ref="ASYNC_FILE"/>
</root>
</configuration>
data-flume:pom.xml
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
hive-function:pom.xml
<dependencies>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>2.1.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
2.4.3 Event beans
data-producer project
2.4.3.1 Common log class
import lombok.Data;
2.4.3.2 Startup log class
import lombok.Data;
2.4.3.3 Error log class
import lombok.Data;
2.4.3.4 Product click log class
import lombok.Data;
2.4.3.5 Product detail class
import lombok.Data;
2.4.3.6 Product list class
import lombok.Data;
2.4.3.7 Ad class
import lombok.Data;
2.4.3.8 Notification log class
import lombok.Data;
2.4.3.9 User background activity class
import lombok.Data;
2.4.3.10 User comment class
import lombok.Data;
2.4.3.11 User favorites class
import lombok.Data;
2.4.3.12 User like class
import lombok.Data;
2.4.4 Main class
import com.alibaba.fastjson.JSON;
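The main class's source is not reproduced here. The class below is a hypothetical, dependency-free sketch of what it has to do. The real project builds the logs with fastjson and Lombok beans and writes them through Logback to /root/logs. This sketch instead prints startup logs and "server time|json" event logs to stdout at a configurable rate. The class name, the argument order (delay in ms, then record count) and the reduced field set are assumptions.

```java
import java.util.Random;

/**
 * Hypothetical stand-in for the data-producer main class: prints startup logs and
 * "serverTime|json" event logs. Field names follow sections 2.1 to 2.3; values are random.
 */
public class LogGeneratorSketch {

    private static final Random RANDOM = new Random();

    /** A subset of the common fields ("cm") from section 2.1, as JSON members. */
    static String commonFields(int mid) {
        return "\"mid\":\"" + mid + "\",\"uid\":\"" + mid + "\",\"vc\":\"1\",\"vn\":\"1.0\","
                + "\"l\":\"zh\",\"os\":\"7.1.1\",\"ar\":\"CN\",\"nw\":\"WIFI\","
                + "\"t\":\"" + System.currentTimeMillis() + "\"";
    }

    /** Startup log: common fields plus start-event fields in one flat object. */
    static String startLog(int mid) {
        return "{" + commonFields(mid)
                + ",\"en\":\"start\",\"entry\":\"" + (1 + RANDOM.nextInt(5)) + "\""
                + ",\"action\":\"1\",\"loading_time\":\"" + RANDOM.nextInt(20) + "\"}";
    }

    /** Event log: 13-digit server time, '|', then {"cm":{...},"ap":"app","et":[...]}. */
    static String eventLog(int mid) {
        long now = System.currentTimeMillis();
        String display = "{\"ett\":\"" + now + "\",\"en\":\"display\",\"kv\":{"
                + "\"goodsid\":\"" + RANDOM.nextInt(100) + "\","
                + "\"action\":\"" + (1 + RANDOM.nextInt(2)) + "\","
                + "\"extend1\":\"1\",\"place\":\"" + RANDOM.nextInt(10) + "\","
                + "\"category\":\"" + RANDOM.nextInt(100) + "\"}}";
        return now + "|{\"cm\":{" + commonFields(mid) + "},\"ap\":\"app\",\"et\":[" + display + "]}";
    }

    public static void main(String[] args) throws InterruptedException {
        // Hypothetical arguments: args[0] = delay between records in ms, args[1] = record count.
        long delayMs = args.length > 0 ? Long.parseLong(args[0]) : 0L;
        int count = args.length > 1 ? Integer.parseInt(args[1]) : 10;
        for (int i = 0; i < count; i++) {
            int mid = RANDOM.nextInt(500);
            // Mix both log types in one stream, which is why Flume must split them later.
            System.out.println(i % 2 == 0 ? startLog(mid) : eventLog(mid));
            Thread.sleep(delayMs);
        }
    }
}
```

Under these assumptions, `java LogGeneratorSketch 2000 1000` would emit one record every 2 seconds, 1000 records in total. The event logs deliberately contain no "start" substring, because the interceptors in section 4.2 classify lines with `log.contains("start")`.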
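2.4.5 Test run
Note: the log simulator must run on 2 servers. The simulated stream contains both startup logs and event logs (each carrying the common fields), so a Flume interceptor is needed to route them, and two flume-ng agents do this work.
Package the project and upload it to the 2 server nodes to generate data for the later tests; here it is placed in the test folder under the user's home directory.
Arguments control the generation rate and volume (below: one record every 2 seconds, 1000 records in total).
# Control the interval and the number of records
Use www.json.cn to inspect the data format.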
3 Create Kafka Topics
- Create the startup log topic: topic_start
- Create the event log topic: topic_event
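4 Flume Preparation
There are two groups of Flume agents.
Group 1: collects the server logs and uses Kafka channels to send them to different Kafka topics; interceptors separate startup logs from event logs.
Group 2: consumes the Kafka data, buffers it in file channels, and finally writes it to HDFS.
4.1 Flume: File -> Kafka configuration
- vim /root/test/file-flume-kafka.conf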
# 1 Define components
a1.sources=r1
a1.channels=c1 c2
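# 2 Source configuration: type is the source type, positionFile records the read position, filegroups lists the files to read (app.+ matches names starting with "app"), channels lists the target channels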
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /root/test/flume/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /root/logs/app.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2
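# 3 Interceptors: both are custom; multiplexing is the selector type that routes by header, header names the header that carries the log type, and mapping maps header values to channels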
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.heaton.bigdata.flume.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.heaton.bigdata.flume.LogTypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2
# 4 Channel configuration: Kafka channels
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092
a1.channels.c1.kafka.topic = topic_start
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer
a1.channels.c2.type =org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092
a1.channels.c2.kafka.topic = topic_event
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer
Create this Flume configuration file on the 2 server nodes that produce logs.
LogETLInterceptor and LogTypeInterceptor are custom interceptors.
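How the configuration routes each line can be simulated in plain Java without Flume classes. In this sketch, i1 drops malformed lines using the same rules as `LogUtils` in 4.2. i2 decides the `topic` header. The multiplexing selector then maps `topic_start` to c1 and `topic_event` to c2, and in the real agent the Kafka channels write those to the matching topics. The class and method names are illustrative, not part of the project.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java simulation of interceptors i1, i2 and the multiplexing selector above. */
public class FlumeRoutingSketch {

    static boolean looksLikeJsonObject(String s) {
        String t = s.trim();
        return t.startsWith("{") && t.endsWith("}");
    }

    /** Same rules as LogUtils.validateStart / validateEvent in section 4.2. */
    static boolean isValid(String log) {
        if (log == null) {
            return false;
        }
        if (log.contains("start")) {
            return looksLikeJsonObject(log);
        }
        String[] parts = log.split("\\|");
        return parts.length == 2
                && parts[0].length() == 13
                && parts[0].chars().allMatch(Character::isDigit)
                && looksLikeJsonObject(parts[1]);
    }

    /** Returns channel name -> lines delivered to it; invalid lines are dropped. */
    static Map<String, List<String>> route(List<String> lines) {
        // a1.sources.r1.selector.mapping.topic_start = c1, topic_event = c2
        Map<String, String> mapping = new LinkedHashMap<>();
        mapping.put("topic_start", "c1");
        mapping.put("topic_event", "c2");

        Map<String, List<String>> channels = new LinkedHashMap<>();
        channels.put("c1", new ArrayList<>());
        channels.put("c2", new ArrayList<>());
        for (String line : lines) {
            if (!isValid(line)) {
                continue; // i1 returns null, so the event never reaches a channel
            }
            String topic = line.contains("start") ? "topic_start" : "topic_event"; // i2 header
            channels.get(mapping.get(topic)).add(line);
        }
        return channels;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "{\"en\":\"start\",\"action\":\"1\"}",
                "1589695383284|{\"cm\":{},\"ap\":\"app\",\"et\":[]}",
                "not a log line");
        Map<String, List<String>> routed = route(lines);
        System.out.println("c1=" + routed.get("c1").size() + ", c2=" + routed.get("c2").size()); // c1=1, c2=1
    }
}
```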
4.2 Custom interceptors
data-flume project
LogUtils
import org.apache.commons.lang.math.NumberUtils;
public class LogUtils {
public static boolean validateEvent(String log) {
/** server time | json
1588319303710|{
"cm":{
"ln":"-51.5","sv":"V2.0.7","os":"8.0.8","g":"L1470998@gmail.com","mid":"13",
"nw":"4G","l":"en","vc":"7","hw":"640*960","ar":"MX","uid":"13","t":"1588291826938",
"la":"-38.2","md":"Huawei-14","vn":"1.3.6","ba":"Huawei","sr":"Y"
},
"ap":"app",
"et":[{
"ett":"1588228193191","en":"ad","kv":{"activityId":"1","displayMills":"113201","entry":"3","action":"5","contentType":"0"}
},{
"ett":"1588300304713","en":"notification","kv":{"ap_time":"1588277440794","action":"2","type":"3","content":""}
},{
"ett":"1588249203743","en":"active_background","kv":{"active_source":"3"}
},{
"ett":"1588254200122","en":"favorites","kv":{"course_id":5,"id":0,"add_time":"1588264138625","userid":0}
},{
"ett":"1588281152824","en":"praise","kv":{"target_id":4,"id":3,"type":3,"add_time":"1588307696417","userid":8}
}]
}
*/
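if (log == null) {
return false;
}
// 1 Split into server time and JSON
String[] logContents = log.split("\\|");
// 2 Expect exactly two parts
if (logContents.length != 2) {
return false;
}
// 3 Server time must be a 13-digit millisecond timestamp
if (logContents[0].length() != 13 || !NumberUtils.isDigits(logContents[0])) {
return false;
}
// 4 The JSON part must look like an object
if (!logContents[1].trim().startsWith("{")
|| !logContents[1].trim().endsWith("}")) {
return false;
}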
return true;
}
public static boolean validateStart(String log) {
/**
{
"action":"1","ar":"MX","ba":"HTC","detail":"201","en":"start","entry":"4","extend1":"",
"g":"4Z174142@gmail.com","hw":"750*1134","l":"pt","la":"-29.7","ln":"-48.1","loading_time":"0",
"md":"HTC-18","mid":"14","nw":"3G","open_ad_type":"2","os":"8.0.8","sr":"D","sv":"V2.8.2",
"t":"1588251833523","uid":"14","vc":"15","vn":"1.2.9"
}
*/
if (log == null) {
return false;
}
// Check that the log looks like a JSON object
if (!log.trim().startsWith("{") || !log.trim().endsWith("}")) {
return false;
}
return true;
}
}
LogETLInterceptor
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
public class LogETLInterceptor implements Interceptor {
@Override
public void initialize() {
// Nothing to initialize
}
@Override
public Event intercept(Event event) {
// 1 Read the event body
byte[] body = event.getBody();
String log = new String(body, Charset.forName("UTF-8"));
// 2 Validate with the rule that matches the log type (startup log vs. event log)
if (log.contains("start")) {
if (LogUtils.validateStart(log)) {
return event;
}
} else {
if (LogUtils.validateEvent(log)) {
return event;
}
}
// 3 Validation failed: return null so the event is dropped
return null;
}
@Override
public List<Event> intercept(List<Event> events) {
ArrayList<Event> interceptors = new ArrayList<>();
for (Event event : events) {
Event intercept1 = intercept(event);
if (intercept1 != null) {
interceptors.add(intercept1);
}
}
return interceptors;
}
@Override
public void close() {
// Nothing to close
}
public static class Builder implements Interceptor.Builder {
@Override
public Interceptor build() {
return new LogETLInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
LogTypeInterceptor
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class LogTypeInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
// Determine the log type from the body and record it in the header
// 1 Read the body
byte[] body = event.getBody();
String log = new String(body, Charset.forName("UTF-8"));
// 2 Get the headers
Map<String, String> headers = event.getHeaders();
// 3 Set the "topic" header according to the log type
if (log.contains("start")) {
headers.put("topic", "topic_start");
} else {
headers.put("topic", "topic_event");
}
return event;
}
@Override
public List<Event> intercept(List<Event> events) {
ArrayList<Event> interceptors = new ArrayList<>();
for (Event event : events) {
Event intercept1 = intercept(event);
interceptors.add(intercept1);
}
return interceptors;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder {
@Override
public Interceptor build() {
return new LogTypeInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
Package the project and put the jar into Flume's lib directory on all nodes.
CDH path for reference: /opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/flume-ng/lib
4.3 Flume start/stop script
vim /root/log-kafka-flume.sh
#! /bin/bash
case $1 in
"start"){
for i in cdh02.cm cdh03.cm
do
echo " --------Starting the collection flume on $i-------"
ssh $i "nohup flume-ng agent --conf-file /root/test/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/root/test/file-flume-kafka.log 2>&1 &"
done
};;
"stop"){
for i in cdh02.cm cdh03.cm
do
echo " --------Stopping the collection flume on $i-------"
ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs kill"
done
};;
esac
4.4 Flume: Kafka -> HDFS configuration
Prepare this on the third server (cdh01.cm, the node that does not generate logs):
vim /root/test/kafka-flume-hdfs.conf
## Components
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2
## Kafka-source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers= cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092
a1.sources.r1.kafka.topics = topic_start
## Kafka- source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092
a1.sources.r2.kafka.topics = topic_event
## channel1
a1.channels.c1.type = file
## Checkpoint (index) directory
a1.channels.c1.checkpointDir=/root/test/flume/checkpoint/behavior1
## Data (persistence) directory
a1.channels.c1.dataDirs = /root/test/flume/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
## channel2
a1.channels.c2.type = file
## Checkpoint (index) directory
a1.channels.c2.checkpointDir=/root/test/flume/checkpoint/behavior2
## Data (persistence) directory
a1.channels.c2.dataDirs = /root/test/flume/data/behavior2/
a1.channels.c2.maxFileSize = 2146435071
a1.channels.c2.capacity = 1000000
a1.channels.c2.keep-alive = 6
## HDFS-sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=/origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
## HDFS-sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
## Avoid generating lots of small files
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollInterval = 50
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0
## Write Snappy-compressed output files instead of raw files
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = snappy
a1.sinks.k2.hdfs.codeC = snappy
## Wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2
4.5 Flume start/stop script
Prepare this on the third server as well.
vim /root/test/kafka-hdfs-flume.sh
#! /bin/bash
case $1 in
"start"){
for i in cdh01.cm
do
echo " --------Starting the consumer flume on $i-------"
ssh $i "nohup flume-ng agent --conf-file /root/test/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/root/test/kafka-flume-hdfs.log 2>&1 &"
done
};;
"stop"){
for i in cdh01.cm
do
echo " --------Stopping the consumer flume on $i-------"
ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs kill"
done
};;
esac
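The `rollInterval`, `rollSize` and `rollCount` settings in section 4.4 decide when the HDFS sink closes a file and starts a new one. The sketch below models that decision as a plain function. It is a simplification for illustration, not Flume's source. A file rolls as soon as any enabled threshold is reached, and a value of 0 disables that threshold, which is why `rollCount = 0` turns off count-based rolling. The 134217728-byte `rollSize` is 128 MB.

```java
/**
 * Simplified model (not Flume's code) of when the HDFS sink rolls a file:
 * any enabled threshold triggers a roll, and 0 disables a threshold.
 */
public class HdfsRollSketch {

    static boolean shouldRoll(long secondsOpen, long bytesWritten, long eventsWritten,
                              long rollInterval, long rollSize, long rollCount) {
        boolean byTime = rollInterval > 0 && secondsOpen >= rollInterval;
        boolean bySize = rollSize > 0 && bytesWritten >= rollSize;
        boolean byCount = rollCount > 0 && eventsWritten >= rollCount;
        return byTime || bySize || byCount;
    }

    public static void main(String[] args) {
        long rollSize = 134_217_728L; // 128 MB
        // Settings of sink k1: rollInterval = 10, rollSize = 128 MB, rollCount = 0
        System.out.println(shouldRoll(5, 64L << 20, 1_000_000, 10, rollSize, 0)); // false: count check disabled
        System.out.println(shouldRoll(10, 1L << 20, 10, 10, rollSize, 0));        // true: 10 s elapsed
        System.out.println(shouldRoll(3, rollSize, 10, 10, rollSize, 0));          // true: 128 MB reached
    }
}
```

With k1's 10-second interval, the time threshold usually fires long before 128 MB accumulates on a low-volume stream, so the interval effectively decides file size. The 50-second interval on k2 yields fewer, larger files.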
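5 Business Data
This module mainly supports enterprise reporting and decision-making. It provides data for analysis, addressing the problem that reports cannot be produced quickly over large data volumes, and it supports fast presentation for ad-hoc business requests. Separating offline from real-time workloads, the offline approach manages and presents the data clearly and lays a solid foundation for a later real-time solution.
5.1 E-commerce business process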
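5.2 SKU and SPU
- SKU (Stock Keeping Unit): originally the basic unit of inventory, now commonly used as shorthand for a unified product code; every product has a unique SKU number.
- SPU (Standard Product Unit): the smallest unit of aggregated product information; a reusable, easily searchable set of standardized information.
- In short: the Black Shark 3 phone is an SPU; an Armor Gray unit with 256 GB of storage is a SKU.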
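5.3 Business table structure
5.3.1 Order table (order_info)
5.3.2 Order detail table (order_detail)
5.3.3 SKU table (sku_info)
5.3.4 User table (user_info)
5.3.5 Level-1 product category table (base_category1)
5.3.6 Level-2 product category table (base_category2)
5.3.7 Level-3 product category table (base_category3)
5.3.8 Payment table (payment_info)
5.3.9 Province table (base_province)
5.3.10 Region table (base_region)
5.3.11 Brand table (base_trademark)
5.3.12 Order status table (order_status_log)
5.3.13 SPU table (spu_info)
5.3.14 Product comment table (comment_info)
5.3.15 Refund table (order_refund_info)
5.3.16 Shopping cart table (cart_info)
5.3.17 Product favorites table (favor_info)
5.3.18 Coupon usage table (coupon_use)
5.3.19 Coupon table (coupon_info)
5.3.20 Activity table (activity_info)
5.3.21 Activity-order link table (activity_order)
5.3.22 Discount rule table (activity_rule)
5.3.23 Code dictionary table (base_dic)
5.3.24 Activity SKU table (activity_sku)
5.4 Date table structure
5.4.1 Date table (date_info)
5.4.2 Holiday table (holiday_info)
5.4.3 Holiday year table (holiday_year)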
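6 Sync Strategies and Warehouse Layering
Sync strategies include full tables, incremental tables, new-and-changed tables, and special tables:
- Full table: one partition per day, storing the complete data.
- Incremental table: each day's new data goes into one partition, storing only newly added data.
- New-and-changed table: each day's new and changed data goes into one partition, storing newly added and modified data.
- Special table: not partitioned; stored only once.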
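6.1 Full strategy
A complete copy of the data is stored every day as one partition.
Suitable when: the table is small and rows can be both added and modified.
Examples: brand table, code dictionary table, product category tables, discount rule table, activity table, product table, shopping cart table, favorites table, SKU/SPU tables.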
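6.2 Incremental strategy
Each day's incremental data is stored as one partition.
Suitable when: the table is large and rows are only ever added.
Examples: refund table, order status table, payment table, order detail table, activity-order link table, product comment table.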
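6.3 New-and-changed strategy
Each day, the rows whose creation time or modification time falls on that day are stored as one partition.
Suitable when: the table is large and rows are both added and modified.
Examples: user table, order table, coupon usage table.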
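The three partitioned strategies differ only in which source rows land in partition `dt`. The sketch below expresses that as a predicate over a row's creation and modification dates. The enum, method and parameter names are illustrative assumptions, not part of the project. In practice such a filter would typically be written as the WHERE clause of the extraction query.

```java
import java.time.LocalDate;

/** Which source rows go into the partition for date dt under each sync strategy (illustrative). */
public class SyncStrategy {

    enum Mode { FULL, INCREMENTAL, NEW_AND_CHANGED }

    /**
     * @param createDate  date the row was created
     * @param operateDate date the row was last modified, or null if never modified
     * @param dt          partition date being loaded
     */
    static boolean inPartition(Mode mode, LocalDate createDate, LocalDate operateDate, LocalDate dt) {
        switch (mode) {
            case FULL:
                return true;                                             // every row currently in the table
            case INCREMENTAL:
                return dt.equals(createDate);                            // only rows created on dt
            case NEW_AND_CHANGED:
                return dt.equals(createDate) || dt.equals(operateDate);  // created or modified on dt
            default:
                throw new IllegalArgumentException("unknown mode: " + mode);
        }
    }

    public static void main(String[] args) {
        LocalDate dt = LocalDate.of(2020, 5, 16);
        LocalDate created = LocalDate.of(2020, 5, 1);
        // An order created earlier and updated on dt:
        System.out.println(inPartition(Mode.INCREMENTAL, created, dt, dt));     // false
        System.out.println(inPartition(Mode.NEW_AND_CHANGED, created, dt, dt)); // true
    }
}
```

The example shows why an order table, whose rows keep changing status after creation, needs the new-and-changed strategy: a plain incremental load would miss the update.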
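6.4 Special strategy
Some special dimension tables need not follow the strategies above: they are synced into the warehouse only once and are not updated while the data stays unchanged.
Suitable when: the table data almost never changes.
1. Real-world dimensions: dimensions that do not change (e.g., gender, region, ethnicity, political affiliation, shoe size) can be stored once as fixed values.
2. Date dimension: one or several years of dates can be loaded at once.
3. Region dimension: province table, region table.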
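6.5 Sync strategy for the business tables analyzed
Because special tables may still change slowly (for example, a region table can change when borders shift after a territorial conflict), they also use the partitioned full-sync strategy.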
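6.6 Warehouse layering
Why layer the warehouse:
- Simplicity: a complex task is split into layers, each handling its own part, which makes problems easier to locate.
- Less duplicate development: with standardized layers, intermediate-layer data greatly reduces repeated computation and increases reuse of results.
- Data isolation: whether for data anomalies or sensitive data, the raw data is decoupled from the statistical data.
- Dimensional modeling is usually done in the DWD layer.
ODS layer: raw data layer; stores the original data.
DWD layer: cleans the ODS data (removes nulls and dirty data, converts types, etc.), degenerates dimensions, and masks sensitive data to protect privacy.
DWS layer: daily aggregates built on DWD.
DWT layer: subject-level aggregates built on DWS.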
7 Syncing Data with Sqoop
Sqoop caveats:
In Hive, NULL is stored on disk as "\N", whereas in MySQL it is stored as NULL. To keep the two sides consistent:
- When exporting data, use --input-null-string and --input-null-non-string
- When importing data, use --null-string and --null-non-string
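The mismatch is easiest to see in a text-format Hive table. There, NULL is written as the two characters `\N`, while an empty string is written as nothing, so Sqoop must be told which token stands for NULL. The sketch below serializes one row in that convention, using the tab delimiter declared in this post's DDL. The names are illustrative. Parquet, which this project writes, stores NULL natively, so these options matter mainly for text files.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Serializes one row the way a text-format Hive table expects (illustrative sketch). */
public class HiveTextRowSketch {

    // Hive's default NULL marker in text files: the two characters '\' and 'N'.
    static final String HIVE_NULL = "\\N";

    static String toHiveTextRow(List<?> values) {
        return values.stream()
                .map(v -> v == null ? HIVE_NULL : v.toString())
                .collect(Collectors.joining("\t"));
    }

    public static void main(String[] args) {
        // A MySQL row (1, NULL, 'Huawei') becomes: 1<TAB>\N<TAB>Huawei
        System.out.println(toHiveTextRow(Arrays.asList(1, null, "Huawei")));
    }
}
```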
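In this example, Sqoop extracts MySQL data and writes it to HDFS as Parquet files, and Hive ODS tables are then created over that data.
DolphinScheduler schedules the scripts.
MySQL-to-Hive type mapping used with Sqoop:
MySQL type | Hive ODS type | Hive DWD to ADS type |
---|---|---|
tinyint | tinyint | tinyint |
int | int | int |
bigint | bigint | bigint |
varchar | string | string |
datetime | bigint | string |
bit | boolean | int |
double | double | double |
decimal | decimal | decimal |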
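In this mapping, a MySQL datetime arrives in ODS as bigint and becomes a string from DWD onward, while bit goes from boolean to int. Below is a minimal sketch of those two conversions. It assumes the bigint holds epoch milliseconds (like the `t` fields in the logs) and that the business time zone is Asia/Shanghai; check both assumptions against the actual Sqoop output.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Sketch of the ODS -> DWD conversions implied by the type mapping (illustrative names). */
public class OdsTypeConversion {

    // Assumption: business time zone of the source MySQL data.
    private static final DateTimeFormatter DATETIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.of("Asia/Shanghai"));

    /** ODS bigint (assumed epoch milliseconds) -> DWD string; NULL stays NULL. */
    static String millisToDateTime(Long epochMillis) {
        return epochMillis == null ? null : DATETIME.format(Instant.ofEpochMilli(epochMillis));
    }

    /** ODS boolean (from MySQL bit) -> DWD int; NULL stays NULL. */
    static Integer booleanToInt(Boolean flag) {
        return flag == null ? null : (flag ? 1 : 0);
    }

    public static void main(String[] args) {
        System.out.println(millisToDateTime(1589627025851L)); // 2020-05-16 19:03:45
        System.out.println(booleanToInt(Boolean.TRUE));       // 1
    }
}
```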
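8 Building the ODS Layer
8.1 Creating the ODS tables
Create the ods database in Hive, create a Hive data source in DolphinScheduler, and select that Hive database when creating the DAG.
Create the dwd, dws, dwt and ads databases at the same time.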
base_dic
drop table if exists ods.mall__base_dic;
CREATE EXTERNAL TABLE `ods.mall__base_dic`(
`dic_code` string COMMENT '编号',
`dic_name` string COMMENT '编码名称',
`parent_code` string COMMENT '父编号',
`create_time` bigint COMMENT '创建日期',
`operate_time` bigint COMMENT '修改日期'
) COMMENT '编码字典表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/base_dic/'
tblproperties ("parquet.compression"="snappy");
base_trademark
drop table if exists ods.mall__base_trademark;
CREATE EXTERNAL TABLE `ods.mall__base_trademark`(
`tm_id` string COMMENT '品牌id',
`tm_name` string COMMENT '品牌名称'
) COMMENT '品牌表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/base_trademark/'
tblproperties ("parquet.compression"="snappy");
base_category3
drop table if exists ods.mall__base_category3;
CREATE EXTERNAL TABLE `ods.mall__base_category3`(
`id` bigint COMMENT '编号',
`name` string COMMENT '三级分类名称',
`category2_id` bigint COMMENT '二级分类编号'
) COMMENT '三级分类表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/base_category3/'
tblproperties ("parquet.compression"="snappy");
base_category2
drop table if exists ods.mall__base_category2;
CREATE EXTERNAL TABLE `ods.mall__base_category2`(
`id` bigint COMMENT '编号',
`name` string COMMENT '二级分类名称',
`category1_id` bigint COMMENT '一级分类编号'
) COMMENT '二级分类表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/base_category2/'
tblproperties ("parquet.compression"="snappy");
base_category1
drop table if exists ods.mall__base_category1;
CREATE EXTERNAL TABLE `ods.mall__base_category1`(
`id` bigint COMMENT '编号',
`name` string COMMENT '分类名称'
) COMMENT '一级分类表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/base_category1/'
tblproperties ("parquet.compression"="snappy");
activity_rule
drop table if exists ods.mall__activity_rule;
CREATE EXTERNAL TABLE `ods.mall__activity_rule`(
`id` int COMMENT '编号',
`activity_id` int COMMENT '类型',
`condition_amount` decimal(16,2) COMMENT '满减金额',
`condition_num` bigint COMMENT '满减件数',
`benefit_amount` decimal(16,2) COMMENT '优惠金额',
`benefit_discount` bigint COMMENT '优惠折扣',
`benefit_level` bigint COMMENT '优惠级别'
) COMMENT '优惠规则'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/activity_rule/'
tblproperties ("parquet.compression"="snappy");
activity_info
drop table if exists ods.mall__activity_info;
CREATE EXTERNAL TABLE `ods.mall__activity_info`(
`id` bigint COMMENT '活动id',
`activity_name` string COMMENT '活动名称',
`activity_type` string COMMENT '活动类型',
`start_time` bigint COMMENT '开始时间',
`end_time` bigint COMMENT '结束时间',
`create_time` bigint COMMENT '创建时间'
) COMMENT '活动表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/activity_info/'
tblproperties ("parquet.compression"="snappy");
activity_sku
drop table if exists ods.mall__activity_sku;
CREATE EXTERNAL TABLE `ods.mall__activity_sku`(
`id` bigint COMMENT '编号',
`activity_id` bigint COMMENT '活动id',
`sku_id` bigint COMMENT 'sku_id',
`create_time` bigint COMMENT '创建时间'
) COMMENT '活动参与商品'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/activity_sku/'
tblproperties ("parquet.compression"="snappy");
cart_info
drop table if exists ods.mall__cart_info;
CREATE EXTERNAL TABLE `ods.mall__cart_info`(
`id` bigint COMMENT '编号',
`user_id` bigint COMMENT '用户id',
`sku_id` bigint COMMENT 'sku_id',
`cart_price` decimal(10,2) COMMENT '放入购物车时价格',
`sku_num` bigint COMMENT '数量',
`sku_name` string COMMENT 'sku名称',
`create_time` bigint COMMENT '创建时间',
`operate_time` bigint COMMENT '修改时间',
`is_ordered` bigint COMMENT '是否已经下单',
`order_time` bigint COMMENT '下单时间'
) COMMENT '购物车表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/cart_info/'
tblproperties ("parquet.compression"="snappy");
favor_info
drop table if exists ods.mall__favor_info;
CREATE EXTERNAL TABLE `ods.mall__favor_info`(
`id` bigint COMMENT '编号',
`user_id` bigint COMMENT '用户id',
`sku_id` bigint COMMENT 'sku_id',
`spu_id` bigint COMMENT '商品id',
`is_cancel` string COMMENT '是否已取消 0 正常 1 已取消',
`create_time` bigint COMMENT '创建时间',
`cancel_time` bigint COMMENT '修改时间'
) COMMENT '商品收藏表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/favor_info/'
tblproperties ("parquet.compression"="snappy");
coupon_info
drop table if exists ods.mall__coupon_info;
CREATE EXTERNAL TABLE `ods.mall__coupon_info`(
`id` bigint COMMENT '购物券编号',
`coupon_name` string COMMENT '购物券名称',
`coupon_type` string COMMENT '购物券类型 1 现金券 2 折扣券 3 满减券 4 满件打折券',
`condition_amount` decimal(10,2) COMMENT '满额数',
`condition_num` bigint COMMENT '满件数',
`activity_id` bigint COMMENT '活动编号',
`benefit_amount` decimal(16,2) COMMENT '减金额',
`benefit_discount` bigint COMMENT '折扣',
`create_time` bigint COMMENT '创建时间',
`range_type` string COMMENT '范围类型 1、商品 2、品类 3、品牌',
`spu_id` bigint COMMENT '商品id',
`tm_id` bigint COMMENT '品牌id',
`category3_id` bigint COMMENT '品类id',
`limit_num` int COMMENT '最多领用次数',
`operate_time` bigint COMMENT '修改时间',
`expire_time` bigint COMMENT '过期时间'
) COMMENT '优惠券表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/coupon_info/'
tblproperties ("parquet.compression"="snappy")sku_info
drop table if exists ods.mall__sku_info;
CREATE EXTERNAL TABLE `ods.mall__sku_info`(
`id` bigint COMMENT 'SKU ID',
`spu_id` bigint COMMENT 'SPU ID',
`price` decimal(10,0) COMMENT 'Price',
`sku_name` string COMMENT 'SKU name',
`sku_desc` string COMMENT 'Product specification description',
`weight` decimal(10,2) COMMENT 'Weight',
`tm_id` bigint COMMENT 'Brand',
`category3_id` bigint COMMENT 'Level-3 category ID',
`create_time` bigint COMMENT 'Creation time'
) COMMENT 'Stock keeping unit (SKU) table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/sku_info/'
tblproperties ("parquet.compression"="snappy");
- spu_info
drop table if exists ods.mall__spu_info;
CREATE EXTERNAL TABLE `ods.mall__spu_info`(
`id` bigint COMMENT 'Product ID',
`spu_name` string COMMENT 'Product name',
`category3_id` bigint COMMENT 'Level-3 category ID',
`tm_id` bigint COMMENT 'Brand ID'
) COMMENT 'Product table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/spu_info/'
tblproperties ("parquet.compression"="snappy");
- base_province
drop table if exists ods.mall__base_province;
CREATE EXTERNAL TABLE `ods.mall__base_province`(
`id` bigint COMMENT 'id',
`name` string COMMENT 'Province name',
`region_id` string COMMENT 'Region ID',
`area_code` string COMMENT 'Administrative area code',
`iso_code` string COMMENT 'ISO code'
) COMMENT 'Province table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/base_province/'
tblproperties ("parquet.compression"="snappy");
- base_region
drop table if exists ods.mall__base_region;
CREATE EXTERNAL TABLE `ods.mall__base_region`(
`id` string COMMENT 'Region ID',
`region_name` string COMMENT 'Region name'
) COMMENT 'Region table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/base_region/'
tblproperties ("parquet.compression"="snappy");
- refund_info
drop table if exists ods.mall__order_refund_info;
CREATE EXTERNAL TABLE `ods.mall__order_refund_info`(
`id` bigint COMMENT 'ID',
`user_id` bigint COMMENT 'User ID',
`order_id` bigint COMMENT 'Order ID',
`sku_id` bigint COMMENT 'SKU ID',
`refund_type` string COMMENT 'Refund type',
`refund_num` bigint COMMENT 'Number of items returned',
`refund_amount` decimal(16,2) COMMENT 'Refund amount',
`refund_reason_type` string COMMENT 'Reason type',
`create_time` bigint COMMENT 'Creation time'
) COMMENT 'Refund table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/order_refund_info/'
tblproperties ("parquet.compression"="snappy");
- order_status_log
drop table if exists ods.mall__order_status_log;
CREATE EXTERNAL TABLE `ods.mall__order_status_log`(
`id` bigint COMMENT 'ID',
`order_id` bigint COMMENT 'Order ID',
`order_status` string COMMENT 'Order status',
`operate_time` bigint COMMENT 'Operation time'
) COMMENT 'Order status table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/order_status_log/'
tblproperties ("parquet.compression"="snappy");
- payment_info
drop table if exists ods.mall__payment_info;
CREATE EXTERNAL TABLE `ods.mall__payment_info`(
`id` bigint COMMENT 'ID',
`out_trade_no` string COMMENT 'External trade number',
`order_id` bigint COMMENT 'Order ID',
`user_id` bigint COMMENT 'User ID',
`alipay_trade_no` string COMMENT 'Alipay transaction number',
`total_amount` decimal(16,2) COMMENT 'Payment amount',
`subject` string COMMENT 'Transaction subject',
`payment_type` string COMMENT 'Payment method',
`payment_time` bigint COMMENT 'Payment time'
) COMMENT 'Payment transaction table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/payment_info/'
tblproperties ("parquet.compression"="snappy");
- order_detail
drop table if exists ods.mall__order_detail;
CREATE EXTERNAL TABLE `ods.mall__order_detail`(
`id` bigint COMMENT 'ID',
`order_id` bigint COMMENT 'Order ID',
`user_id` bigint COMMENT 'User ID',
`sku_id` bigint COMMENT 'sku_id',
`sku_name` string COMMENT 'SKU name',
`order_price` decimal(10,2) COMMENT 'Purchase price (SKU price at order time)',
`sku_num` string COMMENT 'Quantity purchased',
`create_time` bigint COMMENT 'Creation time'
) COMMENT 'Order detail table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/order_detail/'
tblproperties ("parquet.compression"="snappy");
- activity_order
drop table if exists ods.mall__activity_order;
CREATE EXTERNAL TABLE `ods.mall__activity_order`(
`id` bigint COMMENT 'ID',
`activity_id` bigint COMMENT 'Activity ID',
`order_id` bigint COMMENT 'Order ID',
`create_time` bigint COMMENT 'Occurrence date'
) COMMENT 'Activity-order association table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/activity_order/'
tblproperties ("parquet.compression"="snappy");
- comment_info
drop table if exists ods.mall__comment_info;
CREATE EXTERNAL TABLE `ods.mall__comment_info`(
`id` bigint COMMENT 'ID',
`user_id` bigint COMMENT 'User ID',
`sku_id` bigint COMMENT 'SKU ID',
`spu_id` bigint COMMENT 'Product ID',
`order_id` bigint COMMENT 'Order ID',
`appraise` string COMMENT 'Rating: 1 positive, 2 neutral, 3 negative',
`comment_txt` string COMMENT 'Review text',
`create_time` bigint COMMENT 'Creation time'
) COMMENT 'Product review table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/comment_info/'
tblproperties ("parquet.compression"="snappy");
- coupon_use
drop table if exists ods.mall__coupon_use;
CREATE EXTERNAL TABLE `ods.mall__coupon_use`(
`id` bigint COMMENT 'ID',
`coupon_id` bigint COMMENT 'Coupon ID',
`user_id` bigint COMMENT 'User ID',
`order_id` bigint COMMENT 'Order ID',
`coupon_status` string COMMENT 'Coupon status',
`get_time` bigint COMMENT 'Claim time',
`using_time` bigint COMMENT 'Use time',
`used_time` bigint COMMENT 'Expiration time'
) COMMENT 'Coupon usage table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/coupon_use/'
tblproperties ("parquet.compression"="snappy");
- user_info
drop table if exists ods.mall__user_info;
CREATE EXTERNAL TABLE `ods.mall__user_info`(
`id` bigint COMMENT 'ID',
`name` string COMMENT 'User name',
`email` string COMMENT 'Email',
`user_level` string COMMENT 'User level',
`birthday` bigint COMMENT 'Birthday',
`gender` string COMMENT 'Gender: M male, F female',
`create_time` bigint COMMENT 'Creation time',
`operate_time` bigint COMMENT 'Modification time'
) COMMENT 'User table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/user_info/'
tblproperties ("parquet.compression"="snappy");
- order_info
drop table if exists ods.mall__order_info;
CREATE EXTERNAL TABLE `ods.mall__order_info`(
`id` bigint COMMENT 'ID',
`final_total_amount` decimal(16,2) COMMENT 'Total amount',
`order_status` string COMMENT 'Order status',
`user_id` bigint COMMENT 'User ID',
`out_trade_no` string COMMENT 'Order trade number (for third-party payment)',
`create_time` bigint COMMENT 'Creation time',
`operate_time` bigint COMMENT 'Operation time',
`province_id` int COMMENT 'Region',
`benefit_reduce_amount` decimal(16,2) COMMENT 'Discount amount',
`original_total_amount` decimal(16,2) COMMENT 'Original amount',
`feight_fee` decimal(16,2) COMMENT 'Shipping fee'
) COMMENT 'Order table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/order_info/'
tblproperties ("parquet.compression"="snappy");
- start_log
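This is the tracking (instrumentation) startup log table.
drop table if exists ods.mall__start_log;
- event_log
This is the tracking (instrumentation) event log table.
drop table if exists ods.mall__event_log;
- date_info
This is the date table.
drop table if exists ods.mall__date_info;
8.2 MySQL data extraction
Base Sqoop extraction script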
#!/bin/bash
db_date=${date}
mysql_db_name=${db_name}
mysql_db_addr=${db_addr}
mysql_db_user=${db_user}
mysql_db_password=${db_password}
# Use the input date if one is given; otherwise default to the day before today
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
echo "Date: $db_date"
echo "MySQL database: $mysql_db_name"
import_data() {
/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/sqoop import \
--connect jdbc:mysql://$mysql_db_addr:3306/$mysql_db_name?tinyInt1isBit=false \
--username $mysql_db_user \
--password $mysql_db_password \
--target-dir /origin_data/$mysql_db_name/$1/$db_date \
--delete-target-dir \
--num-mappers 1 \
--null-string '' \
--null-non-string '\\N' \
--fields-terminated-by "\t" \
--query "$2"' and $CONDITIONS;' \
--as-parquetfile
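}
DolphinScheduler global parameters
Parameter | Description |
---|---|
date | Business date; defaults to yesterday if not passed |
db_name | Database name |
db_addr | Database IP address |
db_user | Database user |
db_password | Database password |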
The data in the source database starts on 2020-03-15.
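Every script repeats the same "use the input date, else yesterday" block, and backfilling from the 2020-03-15 start date means running the import once per day. Below is a minimal sketch of both pieces as reusable Bash functions. It assumes GNU `date` (for `-d`). The names `resolve_db_date` and `date_range` are hypothetical and do not come from the original scripts.

```bash
#!/usr/bin/env bash
# Hypothetical helpers; they require GNU date for the -d option.

# Print the given date if it is a valid YYYY-MM-DD, or yesterday when empty.
resolve_db_date() {
  local input=$1
  if [ -z "$input" ]; then
    # Same default as the original scripts: the day before today
    date -d "-1 day" +%F
    return 0
  fi
  # Enforce the YYYY-MM-DD shape, then let GNU date reject impossible dates
  if [[ ! $input =~ ^[0-9]{4}-[0-9]{2}-[0-9]{2}$ ]] || ! date -d "$input" +%F >/dev/null 2>&1; then
    echo "invalid date: $input" >&2
    return 1
  fi
  echo "$input"
}

# Print every day from $1 to $2 inclusive, one per line (nothing if $1 > $2).
date_range() {
  local current end
  current=$(TZ=UTC date -d "$1" +%F) || return 1
  end=$(TZ=UTC date -d "$2" +%F) || return 1
  # With the dashes removed, YYYYMMDD compares correctly as a base-10 integer
  while (( 10#${current//-/} <= 10#${end//-/} )); do
    echo "$current"
    current=$(TZ=UTC date -d "$current + 1 day" +%F)
  done
}
```

Used together, `db_date=$(resolve_db_date "${date}") || exit 1` could replace the if/else block in each script. A backfill could then loop over `date_range 2020-03-15 <end date>` and invoke the assembled import script once per day with `date` set. This assumes the other global parameters are also supplied as environment variables when the script runs outside DolphinScheduler.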
Append each of the following import snippets to the base script above, then run the combined script.
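Every snippet passes a query that already ends in a WHERE clause (`where 1=1` for full loads). This is required because `import_data` appends ` and $CONDITIONS;` inside single quotes. The shell expands `$2` but leaves `$CONDITIONS` as a literal token, which Sqoop then replaces with its split predicate. The same rule is why the OR conditions in the incremental-and-changed snippets are wrapped in parentheses.

The sketch below reproduces that string building without calling Sqoop. `build_query` and `day_range_predicate` are hypothetical helpers. The second one builds a half-open range filter as an alternative to `date_format(col,'%Y-%m-%d')='$db_date'`. MySQL cannot use an index on a column that is wrapped in a function, so the range form can be served from an index on the time column, assuming such an index exists and the column is a DATETIME.

```bash
#!/usr/bin/env bash
# Hypothetical helpers mirroring how import_data builds its --query argument.

# $1 is expanded by the shell; ' and $CONDITIONS;' is single-quoted, so the
# literal token $CONDITIONS is passed through for Sqoop to substitute.
build_query() {
  echo "$1"' and $CONDITIONS;'
}

# Index-friendly equivalent of date_format(col,'%Y-%m-%d')='<day>' for a
# DATETIME column: col >= day and col < next day. Requires GNU date.
day_range_predicate() {
  local col=$1 day=$2 next
  next=$(TZ=UTC date -d "$day + 1 day" +%F) || return 1
  echo "(${col} >= '${day}' and ${col} < '${next}')"
}

# Example: the OR group stays intact because it is parenthesized
build_query "select id from user_info where ($(day_range_predicate create_time 2020-03-15) or $(day_range_predicate operate_time 2020-03-15))"
```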
Full-load table snippet
import_data "base_dic" "select
dic_code,
dic_name,
parent_code,
create_time,
operate_time
from base_dic
where 1=1"
import_data "base_trademark" "select
tm_id,
tm_name
from base_trademark
where 1=1"
import_data "base_category3" "select
id,
name,
category2_id
from base_category3 where 1=1"
import_data "base_category2" "select
id,
name,
category1_id
from base_category2 where 1=1"
import_data "base_category1" "select
id,
name
from base_category1 where 1=1"
import_data "activity_rule" "select
id,
activity_id,
condition_amount,
condition_num,
benefit_amount,
benefit_discount,
benefit_level
from activity_rule
where 1=1"
import_data "activity_info" "select
id,
activity_name,
activity_type,
start_time,
end_time,
create_time
from activity_info
where 1=1"
import_data "activity_sku" "select
id,
activity_id,
sku_id,
create_time
FROM
activity_sku
where 1=1"
import_data "cart_info" "select
id,
user_id,
sku_id,
cart_price,
sku_num,
sku_name,
create_time,
operate_time,
is_ordered,
order_time
from cart_info
where 1=1"
import_data "favor_info" "select
id,
user_id,
sku_id,
spu_id,
is_cancel,
create_time,
cancel_time
from favor_info
where 1=1"
import_data "coupon_info" "select
id,
coupon_name,
coupon_type,
condition_amount,
condition_num,
activity_id,
benefit_amount,
benefit_discount,
create_time,
range_type,
spu_id,
tm_id,
category3_id,
limit_num,
operate_time,
expire_time
from coupon_info
where 1=1"
import_data "sku_info" "select
id,
spu_id,
price,
sku_name,
sku_desc,
weight,
tm_id,
category3_id,
create_time
from sku_info where 1=1"
import_data "spu_info" "select
id,
spu_name,
category3_id,
tm_id
from spu_info
where 1=1"
Special table snippet
import_data "base_province" "select
id,
name,
region_id,
area_code,
iso_code
from base_province
where 1=1"
import_data "base_region" "select
id,
region_name
from base_region
where 1=1"
import_data "date_info" "select
date_id,
week_id,
week_day,
day,
month,
quarter,
year,
is_workday,
holiday_id
from date_info
where 1=1"
Incremental table snippet
import_data "order_refund_info" "select
id,
user_id,
order_id,
sku_id,
refund_type,
refund_num,
refund_amount,
refund_reason_type,
create_time
from order_refund_info
where
date_format(create_time,'%Y-%m-%d')='$db_date'"
import_data "order_status_log" "select
id,
order_id,
order_status,
operate_time
from order_status_log
where
date_format(operate_time,'%Y-%m-%d')='$db_date'"
import_data "payment_info" "select
id,
out_trade_no,
order_id,
user_id,
alipay_trade_no,
total_amount,
subject,
payment_type,
payment_time
from payment_info
where
DATE_FORMAT(payment_time,'%Y-%m-%d')='$db_date'"
import_data "order_detail" "select
od.id,
od.order_id,
oi.user_id,
od.sku_id,
od.sku_name,
od.order_price,
od.sku_num,
od.create_time
from order_detail od
join order_info oi
on od.order_id=oi.id
where
DATE_FORMAT(od.create_time,'%Y-%m-%d')='$db_date'"
import_data "activity_order" "select
id,
activity_id,
order_id,
create_time
from activity_order
where
date_format(create_time,'%Y-%m-%d')='$db_date'"
import_data "comment_info" "select
id,
user_id,
sku_id,
spu_id,
order_id,
appraise,
comment_txt,
create_time
from comment_info
where date_format(create_time,'%Y-%m-%d')='$db_date'"
Incremental and changed table snippet
import_data "coupon_use" "select
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time
from coupon_use
where (date_format(get_time,'%Y-%m-%d')='$db_date'
or date_format(using_time,'%Y-%m-%d')='$db_date'
or date_format(used_time,'%Y-%m-%d')='$db_date')"
import_data "user_info" "select
id,
name,
birthday,
gender,
email,
user_level,
create_time,
operate_time
from user_info
where (DATE_FORMAT(create_time,'%Y-%m-%d')='$db_date'
or DATE_FORMAT(operate_time,'%Y-%m-%d')='$db_date')"
import_data "order_info" "select
id,
final_total_amount,
order_status,
user_id,
out_trade_no,
create_time,
operate_time,
province_id,
benefit_reduce_amount,
original_total_amount,
feight_fee
from order_info
where (date_format(create_time,'%Y-%m-%d')='$db_date'
or date_format(operate_time,'%Y-%m-%d')='$db_date')"
8.3 Loading data into the ODS layer
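Only $table_name needs to change in the script.
Note that the two tracking-log tables (start_log and event_log) use different data export directories.
#!/bin/bash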
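The load script is truncated after its first line. As an illustration only, a load step for the business tables might look like the sketch below. It assumes the data sits where `import_data` put it (`/origin_data/<db_name>/<table>/<date>`) and that the target table is `ods.mall__<table>`. The function name `ods_load_sql` and the database name `mall` in the example are hypothetical. The two tracking-log tables are deliberately left out, because their export directory differs and is not shown here. Check that your Hive version accepts the directory layout Sqoop's Parquet mode produces before relying on this.

```bash
#!/usr/bin/env bash
# Hypothetical sketch, not the original 8.3 script: build the HiveQL that
# moves one day of Sqoop output into the matching ODS partition. Assumes the
# /origin_data/<db>/<table>/<date> layout from import_data's --target-dir.
ods_load_sql() {
  local db_name=$1 table_name=$2 db_date=$3
  # LOAD DATA INPATH moves (does not copy) the files out of the HDFS source
  # dir; OVERWRITE replaces whatever the partition held before.
  echo "load data inpath '/origin_data/${db_name}/${table_name}/${db_date}' overwrite into table ods.mall__${table_name} partition(dt='${db_date}');"
}

# Example with an assumed database name "mall": print the statement only
ods_load_sql mall cart_info 2020-03-15
```

In a real script the output would be passed to Hive, for example `$hive -e "$(ods_load_sql "$mysql_db_name" "$table_name" "$db_date")"`.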
9 Building the DWD layer
9.1 Building the DWD layer (startup and event logs)
9.1.1 Startup log table
Create the table
drop table if exists dwd.mall__start_log;
CREATE EXTERNAL TABLE `dwd.mall__start_log`(
`mid_id` string COMMENT 'Unique device ID',
`user_id` string COMMENT 'User ID',
`version_code` string COMMENT 'App version code',
`version_name` string COMMENT 'App version name',
`lang` string COMMENT 'System language',
`source` string COMMENT 'Channel ID',
`os` string COMMENT 'OS version',
`area` string COMMENT 'Region',
`model` string COMMENT 'Phone model',
`brand` string COMMENT 'Phone brand',
`sdk_version` string COMMENT 'sdkVersion',
`gmail` string COMMENT 'gmail',
`height_width` string COMMENT 'Screen height and width',
`app_time` string COMMENT 'Time the client generated the log',
`network` string COMMENT 'Network type',
`lng` string COMMENT 'Longitude',
`lat` string COMMENT 'Latitude',
`entry` string COMMENT 'Entry: push=1, widget=2, icon=3, notification=4, lockscreen_widget=5',
`open_ad_type` string COMMENT 'Splash ad type: native splash ad=1, interstitial splash ad=2',
`action` string COMMENT 'Status: success=1, failure=2',
`loading_time` string COMMENT 'Loading duration',
`detail` string COMMENT 'Failure code',
`extend1` string COMMENT 'Failure message'
) COMMENT 'Startup log table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/start_log/'
tblproperties ("parquet.compression"="snappy");
Data import
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=start_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name
# Use the input date if one is given; otherwise default to the day before today
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
get_json_object(line,'$.mid') mid_id,
get_json_object(line,'$.uid') user_id,
get_json_object(line,'$.vc') version_code,
get_json_object(line,'$.vn') version_name,
get_json_object(line,'$.l') lang,
get_json_object(line,'$.sr') source,
get_json_object(line,'$.os') os,
get_json_object(line,'$.ar') area,
get_json_object(line,'$.md') model,
get_json_object(line,'$.ba') brand,
get_json_object(line,'$.sv') sdk_version,
get_json_object(line,'$.g') gmail,
get_json_object(line,'$.hw') height_width,
get_json_object(line,'$.t') app_time,
get_json_object(line,'$.nw') network,
get_json_object(line,'$.ln') lng,
get_json_object(line,'$.la') lat,
get_json_object(line,'$.entry') entry,
get_json_object(line,'$.open_ad_type') open_ad_type,
get_json_object(line,'$.action') action,
get_json_object(line,'$.loading_time') loading_time,
get_json_object(line,'$.detail') detail,
get_json_object(line,'$.extend1') extend1
from $hive_origin_table_name
where dt='$db_date';
"
$hive -e "$sql"
9.1.2 Event log table
Create the table
drop table if exists dwd.mall__event_log;
CREATE EXTERNAL TABLE `dwd.mall__event_log`(
`mid_id` string COMMENT 'Unique device ID',
`user_id` string COMMENT 'User ID',
`version_code` string COMMENT 'App version code',
`version_name` string COMMENT 'App version name',
`lang` string COMMENT 'System language',
`source` string COMMENT 'Channel ID',
`os` string COMMENT 'OS version',
`area` string COMMENT 'Region',
`model` string COMMENT 'Phone model',
`brand` string COMMENT 'Phone brand',
`sdk_version` string COMMENT 'sdkVersion',
`gmail` string COMMENT 'gmail',
`height_width` string COMMENT 'Screen height and width',
`app_time` string COMMENT 'Time the client generated the log',
`network` string COMMENT 'Network type',
`lng` string COMMENT 'Longitude',
`lat` string COMMENT 'Latitude',
`event_name` string COMMENT 'Event name',
`event_json` string COMMENT 'Event details',
`server_time` string COMMENT 'Server time'
) COMMENT 'Event log table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/event_log/'
tblproperties ("parquet.compression"="snappy");
9.1.2.1 Writing the UDF and UDTF
udf
import org.apache.commons.lang.StringUtils;
Result:
13
1588319303710
[{"ett":"1588228193191","en":"ad","kv":{"activityId":"1","displayMills":"113201","entry":"3","action":"5","contentType":"0"}},{"ett":"1588300304713","en":"notification","kv":{"ap_time":"1588277440794","action":"2","type":"3","content":""}},{"ett":"1588249203743","en":"active_background","kv":{"active_source":"3"}},{"ett":"1588225856101","en":"comment","kv":{"p_comment_id":0,"addtime":"1588263895040","praise_count":231,"other_id":5,"comment_id":5,"reply_count":62,"userid":7,"content":"骸汞"}},{"ett":"1588254200122","en":"favorites","kv":{"course_id":5,"id":0,"add_time":"1588264138625","userid":0}},{"ett":"1588281152824","en":"praise","kv":{"target_id":4,"id":3,"type":3,"add_time":"1588307696417","userid":8}}]
- udtf
import org.apache.commons.lang.StringUtils;
9.1.2.2 Using the UDF permanently
- Upload the UDF resources
Upload the hive-function-1.0-SNAPSHOT JAR to /user/hive/jars on HDFS
hadoop dfs -mkdir /user/hive/jars
Create the permanent UDF in Hive
create function base_analizer as 'com.heaton.bigdata.udf.BaseFieldUDF' using jar 'hdfs://cdh01.cm:8020/user/hive/jars/hive-function-1.0-SNAPSHOT.jar';
9.1.2.3 Using the UDF in DolphinScheduler
When you create a SQL task in the DAG editor, you can select the corresponding UDF there and use it. However, in DolphinScheduler 1.2.0, saving the function association currently has no effect.
As a workaround, use the UDF management feature to upload the JAR to HDFS and register a temporary function in the script; this works just as well.
Temporary function statement:
create temporary function base_analizer as 'com.heaton.bigdata.udf.BaseFieldUDF' using jar 'hdfs://cdh01.cm:8020/dolphinscheduler/dolphinscheduler/udfs/hive-function-1.0-SNAPSHOT.jar';
9.1.2.4 Data import
#!/bin/bash
9.1.3 Product click table
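From this table through 9.1.10, each DWD table is carved out of `dwd.mall__event_log` by `event_name`. The load scripts differ only in the target table, the event name, and the `$.kv.*` fields they extract. Below is a sketch of a hypothetical generator for that INSERT statement; the name `build_event_sql` does not come from the original. It emits the 17 shared columns, then one `get_json_object` per field in the order given, then `server_time`. The field order you pass must therefore match the table's column order, and each JSON key is assumed to equal its column name, as it does in the scripts shown here.

```bash
#!/usr/bin/env bash
# Hypothetical generator for the 9.1.x event-table load statements.
# Usage: build_event_sql <table_name> <event_name> <db_date> <kv_field>...
build_event_sql() {
  local table_name=$1 event_name=$2 db_date=$3
  shift 3
  # Columns copied unchanged from dwd.mall__event_log
  local common="mid_id,user_id,version_code,version_name,lang,source,os,area,model,brand,sdk_version,gmail,height_width,app_time,network,lng,lat"
  local kv_cols="" field
  for field in "$@"; do
    # \$ keeps the JSON path's $ literal; ${field} is expanded
    kv_cols+="get_json_object(event_json,'\$.kv.${field}') ${field},"
  done
  printf "insert overwrite table dwd.mall__%s\nPARTITION (dt='%s')\nselect %s,%s server_time\nfrom dwd.mall__event_log\nwhere dt='%s' and event_name='%s';\n" \
    "$table_name" "$db_date" "$common" "$kv_cols" "$db_date" "$event_name"
}

# Example: the statement behind 9.1.3 (product click table)
build_event_sql display_log display 2020-03-15 action goodsid place extend1 category
```

Inside a script this would become `sql=$(build_event_sql display_log display "$db_date" action goodsid place extend1 category)` followed by `$hive -e "$sql"`. Typed columns such as the `int` fields of the comment table still rely on Hive's implicit cast, exactly as in the original scripts.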
Create the table
drop table if exists dwd.mall__display_log;
CREATE EXTERNAL TABLE `dwd.mall__display_log`(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`action` string,
`goodsid` string,
`place` string,
`extend1` string,
`category` string,
`server_time` string
) COMMENT 'Product click table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/display_log/'
tblproperties ("parquet.compression"="snappy");
Data import
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=display_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name
# Use the input date if one is given; otherwise default to the day before today
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.goodsid') goodsid,
get_json_object(event_json,'$.kv.place') place,
get_json_object(event_json,'$.kv.extend1') extend1,
get_json_object(event_json,'$.kv.category') category,
server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='display';
"
$hive -e "$sql"
9.1.4 Product list table
Create the table
drop table if exists dwd.mall__loading_log;
CREATE EXTERNAL TABLE `dwd.mall__loading_log`(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`action` string,
`loading_time` string,
`loading_way` string,
`extend1` string,
`extend2` string,
`type` string,
`type1` string,
`server_time` string
) COMMENT 'Product list table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/loading_log/'
tblproperties ("parquet.compression"="snappy");
Data import
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=loading_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name
# Use the input date if one is given; otherwise default to the day before today
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.loading_time') loading_time,
get_json_object(event_json,'$.kv.loading_way') loading_way,
get_json_object(event_json,'$.kv.extend1') extend1,
get_json_object(event_json,'$.kv.extend2') extend2,
get_json_object(event_json,'$.kv.type') type,
get_json_object(event_json,'$.kv.type1') type1,
server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='loading';
"
$hive -e "$sql"
9.1.5 Ad table
Create the table
drop table if exists dwd.mall__ad_log;
CREATE EXTERNAL TABLE `dwd.mall__ad_log`(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`entry` string,
`action` string,
`contentType` string,
`displayMills` string,
`itemId` string,
`activityId` string,
`server_time` string
) COMMENT 'Ad table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/ad_log/'
tblproperties ("parquet.compression"="snappy");
Data import
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=ad_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name
# Use the input date if one is given; otherwise default to the day before today
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.entry') entry,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.contentType') contentType,
get_json_object(event_json,'$.kv.displayMills') displayMills,
get_json_object(event_json,'$.kv.itemId') itemId,
get_json_object(event_json,'$.kv.activityId') activityId,
server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='ad';
"
$hive -e "$sql"
9.1.6 Notification table
Create the table
drop table if exists dwd.mall__notification_log;
CREATE EXTERNAL TABLE `dwd.mall__notification_log`(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`action` string,
`noti_type` string,
`ap_time` string,
`content` string,
`server_time` string
) COMMENT 'Notification table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/notification_log/'
tblproperties ("parquet.compression"="snappy");
Data import
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=notification_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name
# Use the input date if one is given; otherwise default to the day before today
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.noti_type') noti_type,
get_json_object(event_json,'$.kv.ap_time') ap_time,
get_json_object(event_json,'$.kv.content') content,
server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='notification';
"
$hive -e "$sql"
9.1.7 User background activity table
Create the table
drop table if exists dwd.mall__active_background_log;
CREATE EXTERNAL TABLE `dwd.mall__active_background_log`(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`active_source` string,
`server_time` string
) COMMENT 'User background activity table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/active_background_log/'
tblproperties ("parquet.compression"="snappy");
Data import
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=active_background_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name
# Use the input date if one is given; otherwise default to the day before today
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.active_source') active_source,
server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='active_background';
"
$hive -e "$sql"
9.1.8 Comment table
Create the table
drop table if exists dwd.mall__comment_log;
CREATE EXTERNAL TABLE `dwd.mall__comment_log`(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`comment_id` int,
`userid` int,
`p_comment_id` int,
`content` string,
`addtime` string,
`other_id` int,
`praise_count` int,
`reply_count` int,
`server_time` string
) COMMENT 'Comment table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/comment_log/'
tblproperties ("parquet.compression"="snappy");
Data import
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=comment_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name
# Use the input date if one is given; otherwise default to the day before today
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.comment_id') comment_id,
get_json_object(event_json,'$.kv.userid') userid,
get_json_object(event_json,'$.kv.p_comment_id') p_comment_id,
get_json_object(event_json,'$.kv.content') content,
get_json_object(event_json,'$.kv.addtime') addtime,
get_json_object(event_json,'$.kv.other_id') other_id,
get_json_object(event_json,'$.kv.praise_count') praise_count,
get_json_object(event_json,'$.kv.reply_count') reply_count,
server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='comment';
"
$hive -e "$sql"
9.1.9 Favorites table
Create the table
drop table if exists dwd.mall__favorites_log;
CREATE EXTERNAL TABLE `dwd.mall__favorites_log`(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`id` int,
`course_id` int,
`userid` int,
`add_time` string,
`server_time` string
) COMMENT 'Favorites table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/favorites_log/'
tblproperties ("parquet.compression"="snappy");
Data import
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=favorites_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name
# Use the date passed in as ${date} if it is set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.id') id,
get_json_object(event_json,'$.kv.course_id') course_id,
get_json_object(event_json,'$.kv.userid') userid,
get_json_object(event_json,'$.kv.add_time') add_time,
server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='favorites';
"
$hive -e "$sql"
9.1.10 Likes table
Create table
drop table if exists dwd.mall__praise_log;
CREATE EXTERNAL TABLE `dwd.mall__praise_log`(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`id` string,
`userid` string,
`target_id` string,
`type` string,
`add_time` string,
`server_time` string
) COMMENT '点赞表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/praise_log/'
tblproperties ("parquet.compression"="snappy");
Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=praise_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name
# Use the date passed in as ${date} if it is set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.id') id,
get_json_object(event_json,'$.kv.userid') userid,
get_json_object(event_json,'$.kv.target_id') target_id,
get_json_object(event_json,'$.kv.type') type,
get_json_object(event_json,'$.kv.add_time') add_time,
server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='praise';
"
$hive -e "$sql"
9.1.11 Error log table
Create table
drop table if exists dwd.mall__error_log;
CREATE EXTERNAL TABLE `dwd.mall__error_log`(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`errorBrief` string,
`errorDetail` string,
`server_time` string
) COMMENT '错误日志表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/error_log/'
tblproperties ("parquet.compression"="snappy");
Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=error_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name
# Use the date passed in as ${date} if it is set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.errorBrief') errorBrief,
get_json_object(event_json,'$.kv.errorDetail') errorDetail,
server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='error';
"
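$hive -e "$sql"
9.2 Building the DWD layer (business database)
When this layer is first built, incremental tables need dynamic partitions so that rows are split by time and written into the partition they belong to.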
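Most load scripts in this section write one static partition with `PARTITION (dt='$db_date')`. With a dynamic partition, Hive instead takes the partition value from the last column of the `SELECT`, so each row can land in the partition for its own date. The minimal sketch below only prints such a statement; the tables `dwd.demo_fact` and `ods.demo_src`, the columns `id`, `amount`, `create_time` and the function `build_dynamic_insert` are hypothetical. `hive.exec.dynamic.partition.mode=nonstrict` is needed because no static partition value is given.

```shell
#!/bin/bash
# Sketch only: print a Hive INSERT that fills partitions dynamically.
# dwd.demo_fact, ods.demo_src and the columns id, amount, create_time are
# hypothetical placeholders, not tables from this project.
build_dynamic_insert() {
  local target=$1 src=$2 db_date=$3
  cat <<EOF
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table ${target}
PARTITION (dt)
select
id,
amount,
create_time,
from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd') dt
from ${src}
where dt='${db_date}';
EOF
}

# The last SELECT column (dt) decides which partition each row is written to
build_dynamic_insert dwd.demo_fact ods.demo_src 2020-06-14
```

In a real job the printed string would be passed to `$hive -e`, the same way the other scripts run their SQL.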
Fact / Dimension | Time | User | Region | Product | Coupon | Activity | Code | Measure |
---|---|---|---|---|---|---|---|---|
Order | √ | √ | √ | √ | items/amount | |||
Order detail | √ | √ | √ | items/amount | ||||
Payment | √ | √ | count/amount | |||||
Add to cart | √ | √ | √ | items/amount | ||||
Favorite | √ | √ | √ | count | ||||
Review | √ | √ | √ | count | ||||
Refund | √ | √ | √ | items/amount | ||||
Coupon claim | √ | √ | √ | count |
9.2.1 Product (SKU) dimension table (full load)
Create table
drop table if exists dwd.mall__dim_sku_info;
CREATE EXTERNAL TABLE `dwd.mall__dim_sku_info`(
`id` string COMMENT '商品 id',
`spu_id` string COMMENT 'spuid',
`price` double COMMENT '商品价格',
`sku_name` string COMMENT '商品名称',
`sku_desc` string COMMENT '商品描述',
`weight` double COMMENT '重量',
`tm_id` string COMMENT '品牌 id',
`tm_name` string COMMENT '品牌名称',
`category3_id` string COMMENT '三级分类 id',
`category2_id` string COMMENT '二级分类 id',
`category1_id` string COMMENT '一级分类 id',
`category3_name` string COMMENT '三级分类名称',
`category2_name` string COMMENT '二级分类名称',
`category1_name` string COMMENT '一级分类名称',
`spu_name` string COMMENT 'spu 名称',
`create_time` string COMMENT '创建时间'
) COMMENT '商品维度表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/dim_sku_info/'
tblproperties ("parquet.compression"="snappy");
Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=dim_sku_info
hive_table_name=$APP2.mall__$table_name
# Use the date passed in as ${date} if it is set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
sku.id,
sku.spu_id,
sku.price,
sku.sku_name,
sku.sku_desc,
sku.weight,
sku.tm_id,
ob.tm_name,
sku.category3_id,
c2.id category2_id,
c1.id category1_id,
c3.name category3_name,
c2.name category2_name,
c1.name category1_name,
spu.spu_name,
from_unixtime(cast(sku.create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time
from
(
select * from ods.mall__sku_info where dt='$db_date'
)sku
join
(
select * from ods.mall__base_trademark where dt='$db_date'
)ob on sku.tm_id=ob.tm_id
join
(
select * from ods.mall__spu_info where dt='$db_date'
)spu on spu.id = sku.spu_id
join
(
select * from ods.mall__base_category3 where dt='$db_date'
)c3 on sku.category3_id=c3.id
join
(
select * from ods.mall__base_category2 where dt='$db_date'
)c2 on c3.category2_id=c2.id
join
(
select * from ods.mall__base_category1 where dt='$db_date'
)c1 on c2.category1_id=c1.id;
"
$hive -e "$sql"
9.2.2 Coupon information dimension table (full load)
Create table
drop table if exists dwd.mall__dim_coupon_info;
CREATE EXTERNAL TABLE `dwd.mall__dim_coupon_info`(
`id` string COMMENT '购物券编号',
`coupon_name` string COMMENT '购物券名称',
`coupon_type` string COMMENT '购物券类型 1 现金券 2 折扣券 3 满减券 4 满件打折券',
`condition_amount` string COMMENT '满额数',
`condition_num` string COMMENT '满件数',
`activity_id` string COMMENT '活动编号',
`benefit_amount` string COMMENT '减金额',
`benefit_discount` string COMMENT '折扣',
`create_time` string COMMENT '创建时间',
`range_type` string COMMENT '范围类型 1、商品 2、品类 3、品牌',
`spu_id` string COMMENT '商品 id',
`tm_id` string COMMENT '品牌 id',
`category3_id` string COMMENT '品类 id',
`limit_num` string COMMENT '最多领用次数',
`operate_time` string COMMENT '修改时间',
`expire_time` string COMMENT '过期时间'
) COMMENT '优惠券信息维度表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/dim_coupon_info/'
tblproperties ("parquet.compression"="snappy");
Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=dim_coupon_info
hive_table_name=$APP2.mall__$table_name
# Use the date passed in as ${date} if it is set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
id,
coupon_name,
coupon_type,
condition_amount,
condition_num,
activity_id,
benefit_amount,
benefit_discount,
from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time,
range_type,
spu_id,
tm_id,
category3_id,
limit_num,
from_unixtime(cast(operate_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') operate_time,
from_unixtime(cast(expire_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') expire_time
from ods.mall__coupon_info
where dt='$db_date';
"
$hive -e "$sql"
9.2.3 Activity dimension table (full load)
Create table
drop table if exists dwd.mall__dim_activity_info;
CREATE EXTERNAL TABLE `dwd.mall__dim_activity_info`(
`id` string COMMENT '编号',
`activity_name` string COMMENT '活动名称',
`activity_type` string COMMENT '活动类型',
`condition_amount` string COMMENT '满减金额',
`condition_num` string COMMENT '满减件数',
`benefit_amount` string COMMENT '优惠金额',
`benefit_discount` string COMMENT '优惠折扣',
`benefit_level` string COMMENT '优惠级别',
`start_time` string COMMENT '开始时间',
`end_time` string COMMENT '结束时间',
`create_time` string COMMENT '创建时间'
) COMMENT '活动维度表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/dim_activity_info/'
tblproperties ("parquet.compression"="snappy");
Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=dim_activity_info
hive_table_name=$APP2.mall__$table_name
# Use the date passed in as ${date} if it is set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
info.id,
info.activity_name,
info.activity_type,
rule.condition_amount,
rule.condition_num,
rule.benefit_amount,
rule.benefit_discount,
rule.benefit_level,
from_unixtime(cast(info.start_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') start_time,
from_unixtime(cast(info.end_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') end_time,
from_unixtime(cast(info.create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time
from
(
select * from ods.mall__activity_info where dt='$db_date'
)info
left join
(
select * from ods.mall__activity_rule where dt='$db_date'
)rule on info.id = rule.activity_id;
"
$hive -e "$sql"
9.2.4 Region dimension table (special)
Create table
drop table if exists dwd.mall__dim_base_province;
CREATE EXTERNAL TABLE `dwd.mall__dim_base_province`(
`id` string COMMENT 'id',
`province_name` string COMMENT '省市名称',
`area_code` string COMMENT '地区编码',
`iso_code` string COMMENT 'ISO 编码',
`region_id` string COMMENT '地区 id',
`region_name` string COMMENT '地区名称'
) COMMENT '地区维度表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/dim_base_province/'
tblproperties ("parquet.compression"="snappy");
Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=dim_base_province
hive_table_name=$APP2.mall__$table_name
# Use the date passed in as ${date} if it is set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
bp.id,
bp.name,
bp.area_code,
bp.iso_code,
bp.region_id,
br.region_name
from ods.mall__base_province bp
join ods.mall__base_region br
on bp.region_id=br.id;
"
$hive -e "$sql"
9.2.5 Date dimension table (special)
Create table
drop table if exists dwd.mall__dim_date_info;
CREATE EXTERNAL TABLE `dwd.mall__dim_date_info`(
`date_id` string COMMENT '日',
`week_id` int COMMENT '周',
`week_day` int COMMENT '周的第几天',
`day` int COMMENT '每月的第几天',
`month` int COMMENT '第几月',
`quarter` int COMMENT '第几季度',
`year` int COMMENT '年',
`is_workday` int COMMENT '是否是工作日',
`holiday_id` int COMMENT '是否是节假日'
) COMMENT '时间维度表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/dim_date_info/'
tblproperties ("parquet.compression"="snappy");
Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=dim_date_info
hive_table_name=$APP2.mall__$table_name
# Use the date passed in as ${date} if it is set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
date_id,
week_id,
week_day,
day,
month,
quarter,
year,
is_workday,
holiday_id
from ods.mall__date_info
"
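$hive -e "$sql"
9.2.6 User dimension table (new and changed rows, slowly changing dimension, zipper table)
9.2.6.1 Introduction to zipper tables
A zipper table records the life cycle of each piece of information. When a record's life cycle ends, a new record is started and the current date is written into its effective start date.
If the information is still valid today, its effective end date is set to a very large sentinel value (for example 9999-99-99). The table below shows how Zhang San's phone number changed over time: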
User ID | Name | Phone number | Start date | End date |
---|---|---|---|---|
1 | Zhang San | 134XXXX5050 | 2019-01-01 | 2019-01-02 |
1 | Zhang San | 139XXXX3232 | 2019-01-03 | 2020-01-01 |
1 | Zhang San | 137XXXX7676 | 2020-01-02 | 9999-99-99 |
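Suitable for: data that changes, but where most of it stays the same (a slowly changing dimension).
For example, user information does change, but only a small fraction of users change on any given day, so storing a full copy every day would be inefficient.
How to use a zipper table: filtering on effective start date <= some date AND effective end date >= that date returns a full snapshot of the data as of that date.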
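The point-in-time filter can be tried without a cluster. This sketch copies Zhang San's rows from the example table into a TSV string and keeps the rows whose validity interval contains the requested day; in Hive the equivalent is a `where start_date <= 'D' and end_date >= 'D'` filter. The helper `snapshot_at` is hypothetical. A plain string comparison is enough because `yyyy-MM-dd` dates sort chronologically and the `9999-99-99` sentinel sorts after every real date.

```shell
#!/bin/bash
# Zhang San's zipper rows as TSV: id, name, phone, start_date, end_date
zipper_rows=$(printf '%s\t%s\t%s\t%s\t%s\n' \
  1 "Zhang San" 134XXXX5050 2019-01-01 2019-01-02 \
  1 "Zhang San" 139XXXX3232 2019-01-03 2020-01-01 \
  1 "Zhang San" 137XXXX7676 2020-01-02 9999-99-99)

# Hypothetical helper: print the rows that were valid on the given day
snapshot_at() {
  local day=$1
  # yyyy-MM-dd strings compare chronologically, so string comparison suffices
  printf '%s\n' "$zipper_rows" | awk -F'\t' -v d="$day" '$4 <= d && $5 >= d'
}

# Which phone number was valid on 2019-06-01?
snapshot_at 2019-06-01
```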
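- How a zipper table is formed
- Build process:
The user data as of the current day and the rows that changed in MySQL that day are combined into a new temporary zipper table.
The temporary zipper table then overwrites the old zipper table data.
This works around the fact that Hive cannot update rows in place.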
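The build process can be modeled in bash and awk on small TSV files. This is a simplified sketch, not the project's job: the file layout, the two data columns and the function `merge_zipper` are assumptions. It follows the same rule as the temporary-table script later in this section: today's rows become open versions starting today, and the open version (`end_date = 9999-99-99`) of every changed id is closed with the previous day, like `date_add(ui.dt,-1)`.

```shell
#!/bin/bash
# Sketch of the daily zipper-table merge (hypothetical TSV layouts):
#   zipper rows:     id  phone  start_date  end_date
#   today's changes: id  phone
merge_zipper() {
  local old_file=$1 changes_file=$2 day=$3
  local prev_day
  prev_day=$(date -d "$day -1 day" +%F)   # GNU date, as used by the load scripts
  {
    # Existing versions: close the open version of every id that changed today
    awk -F'\t' -v OFS='\t' -v prev="$prev_day" '
      FILENAME == ARGV[1] { changed[$1] = 1; next }
      ($1 in changed) && $4 == "9999-99-99" { $4 = prev }
      { print }
    ' "$changes_file" "$old_file"
    # New or changed rows become open versions starting today
    awk -F'\t' -v OFS='\t' -v d="$day" '{ print $1, $2, d, "9999-99-99" }' "$changes_file"
  } | LC_ALL=C sort -t$'\t' -k1,1 -k3,3
}

# Usage: user 1 changes phone number on 2019-01-03, user 2 does not change
tmp=$(mktemp -d)
printf '1\t134XXXX5050\t2019-01-01\t9999-99-99\n2\t135XXXX0000\t2019-01-01\t9999-99-99\n' > "$tmp/old.tsv"
printf '1\t139XXXX3232\n' > "$tmp/changes.tsv"
merge_zipper "$tmp/old.tsv" "$tmp/changes.tsv" 2019-01-03
```

Writing the merged result somewhere new and only then replacing the old data plays the role of the `_tmp` table: the old zipper data is not overwritten while it is still being read.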
9.2.6.2 User dimension table
Rows in the user table can be inserted or modified on any day, which makes it a slowly changing dimension, so the user dimension data is stored in a zipper table.
Create table
drop table if exists dwd.mall__dim_user_info_his;
CREATE EXTERNAL TABLE `dwd.mall__dim_user_info_his`(
`id` string COMMENT '用户 id',
`name` string COMMENT '姓名',
`birthday` string COMMENT '生日',
`gender` string COMMENT '性别',
`email` string COMMENT '邮箱',
`user_level` string COMMENT '用户等级',
`create_time` string COMMENT '创建时间',
`operate_time` string COMMENT '操作时间',
`start_date` string COMMENT '有效开始日期',
`end_date` string COMMENT '有效结束日期'
) COMMENT '用户拉链表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/dim_user_info_his/'
tblproperties ("parquet.compression"="snappy");
Create the temporary table (same structure as the main table)
drop table if exists dwd.mall__dim_user_info_his_tmp;
CREATE EXTERNAL TABLE `dwd.mall__dim_user_info_his_tmp`(
`id` string COMMENT '用户 id',
`name` string COMMENT '姓名',
`birthday` string COMMENT '生日',
`gender` string COMMENT '性别',
`email` string COMMENT '邮箱',
`user_level` string COMMENT '用户等级',
`create_time` string COMMENT '创建时间',
`operate_time` string COMMENT '操作时间',
`start_date` string COMMENT '有效开始日期',
`end_date` string COMMENT '有效结束日期'
) COMMENT '用户拉链表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/dim_user_info_his_tmp/'
tblproperties ("parquet.compression"="snappy");
First, initialize the main table (run this only once)
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=dim_user_info_his
hive_table_name=$APP2.mall__$table_name
# Use the date passed in as ${date} if it is set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
select
id,
name,
from_unixtime(cast(birthday/1000 as bigint),'yyyy-MM-dd HH:mm:ss') birthday,
gender,
email,
user_level,
from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time,
from_unixtime(cast(operate_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') operate_time,
'$db_date',
'9999-99-99'
from ods.mall__user_info oi
where oi.dt='$db_date';
"
$hive -e "$sql"
Compute and load the temporary table (run after the main table has been initialized)
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=dim_user_info_his_tmp
hive_table_name=$APP2.mall__$table_name
# Use the date passed in as ${date} if it is set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
select
*
from
( --all user rows received for the current date
select
cast(id as string) id,
name,
from_unixtime(cast(birthday/1000 as bigint),'yyyy-MM-dd HH:mm:ss') birthday,
gender,
email,
user_level,
from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time,
from_unixtime(cast(operate_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') operate_time,
'$db_date' start_date,
'9999-99-99' end_date
from ods.mall__user_info where dt='$db_date'
union all
--existing zipper rows, closing the open version of users who changed today
select
uh.id,
uh.name,
uh.birthday,
uh.gender,
uh.email,
uh.user_level,
uh.create_time,
uh.operate_time,
uh.start_date,
if(ui.id is not null and uh.end_date='9999-99-99', date_add(ui.dt,-1),uh.end_date) end_date
from dwd.mall__dim_user_info_his uh left join
(
--all user rows received for the current date
select
cast(id as string) id,
name,
from_unixtime(cast(birthday/1000 as bigint),'yyyy-MM-dd HH:mm:ss') birthday,
gender,
email,
user_level,
from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time,
from_unixtime(cast(operate_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') operate_time,
dt
from ods.mall__user_info
where dt='$db_date'
) ui on uh.id=ui.id
)his
order by his.id, start_date;
"
$hive -e "$sql"
Load data (overwrite the main table with the temporary table)
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=dim_user_info_his
hive_table_name=$APP2.mall__$table_name
# Use the date passed in as ${date} if it is set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
select * from dwd.mall__dim_user_info_his_tmp;
"
$hive -e "$sql"
9.2.7 Order detail fact table (transaction fact table, new rows)
Create table
drop table if exists dwd.mall__fact_order_detail;
CREATE EXTERNAL TABLE `dwd.mall__fact_order_detail`(
`id` bigint COMMENT '编号',
`order_id` bigint COMMENT '订单编号',
`user_id` bigint COMMENT '用户id',
`sku_id` bigint COMMENT 'sku_id',
`sku_name` string COMMENT 'sku名称',
`order_price` decimal(10,2) COMMENT '购买价格(下单时sku价格)',
`sku_num` string COMMENT '购买个数',
`create_time` bigint COMMENT '创建时间',
`province_id` string COMMENT '省份ID',
`total_amount` decimal(20,2) COMMENT '订单总金额'
) COMMENT '订单明细表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/fact_order_detail/'
tblproperties ("parquet.compression"="snappy");
Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=fact_order_detail
hive_table_name=$APP2.mall__$table_name
# Use the date passed in as ${date} if it is set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
od.id,
od.order_id,
od.user_id,
od.sku_id,
od.sku_name,
od.order_price,
od.sku_num,
od.create_time,
oi.province_id,
od.order_price*od.sku_num
from (select * from ods.mall__order_detail where dt='$db_date' ) od
join (select * from ods.mall__order_info where dt='$db_date' ) oi
on od.order_id=oi.id;
"
$hive -e "$sql"
9.2.7 Payment fact table (transaction fact table, new rows)
Create table
drop table if exists dwd.mall__fact_payment_info;
CREATE EXTERNAL TABLE `dwd.mall__fact_payment_info`(
`id` string COMMENT '',
`out_trade_no` string COMMENT '对外业务编号',
`order_id` string COMMENT '订单编号',
`user_id` string COMMENT '用户编号',
`alipay_trade_no` string COMMENT '支付宝交易流水编号',
`payment_amount` decimal(16,2) COMMENT '支付金额',
`subject` string COMMENT '交易内容',
`payment_type` string COMMENT '支付类型',
`payment_time` string COMMENT '支付时间',
`province_id` string COMMENT '省份 ID'
) COMMENT '支付事实表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/fact_payment_info/'
tblproperties ("parquet.compression"="snappy");
Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=fact_payment_info
hive_table_name=$APP2.mall__$table_name
# Use the date passed in as ${date} if it is set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
pi.id,
pi.out_trade_no,
pi.order_id,
pi.user_id,
pi.alipay_trade_no,
pi.total_amount,
pi.subject,
pi.payment_type,
from_unixtime(cast(pi.payment_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') payment_time,
oi.province_id
from
(
select * from ods.mall__payment_info where dt='$db_date'
)pi
join
(
select id, province_id from ods.mall__order_info where dt='$db_date'
)oi
on pi.order_id = oi.id;
"
$hive -e "$sql"
9.2.8 Refund fact table (transaction fact table, new rows)
Create table
drop table if exists dwd.mall__fact_order_refund_info;
CREATE EXTERNAL TABLE `dwd.mall__fact_order_refund_info`(
`id` string COMMENT '编号',
`user_id` string COMMENT '用户 ID',
`order_id` string COMMENT '订单 ID',
`sku_id` string COMMENT '商品 ID',
`refund_type` string COMMENT '退款类型',
`refund_num` bigint COMMENT '退款件数',
`refund_amount` decimal(16,2) COMMENT '退款金额',
`refund_reason_type` string COMMENT '退款原因类型',
`create_time` string COMMENT '退款时间'
) COMMENT '退款事实表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/fact_order_refund_info/'
tblproperties ("parquet.compression"="snappy");
Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=fact_order_refund_info
hive_table_name=$APP2.mall__$table_name
# Use the date passed in as ${date} if it is set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
id,
user_id,
order_id,
sku_id,
refund_type,
refund_num,
refund_amount,
refund_reason_type,
from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time
from ods.mall__order_refund_info
where dt='$db_date';
"
$hive -e "$sql"
9.2.9 Review fact table (transaction fact table, new rows)
Create table
drop table if exists dwd.mall__fact_comment_info;
CREATE EXTERNAL TABLE `dwd.mall__fact_comment_info`(
`id` string COMMENT '编号',
`user_id` string COMMENT '用户 ID',
`sku_id` string COMMENT '商品 sku',
`spu_id` string COMMENT '商品 spu',
`order_id` string COMMENT '订单 ID',
`appraise` string COMMENT '评价',
`create_time` string COMMENT '评价时间'
) COMMENT '评价事实表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/fact_comment_info/'
tblproperties ("parquet.compression"="snappy");
Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=fact_comment_info
hive_table_name=$APP2.mall__$table_name
# Use the date passed in as ${date} if it is set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
id,
user_id,
sku_id,
spu_id,
order_id,
appraise,
from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time
from ods.mall__comment_info
where dt='$db_date';
"
$hive -e "$sql"
9.2.10 Add-to-cart fact table (periodic snapshot fact table, full load)
Create table
drop table if exists dwd.mall__fact_cart_info;
CREATE EXTERNAL TABLE `dwd.mall__fact_cart_info`(
`id` string COMMENT '编号',
`user_id` string COMMENT '用户 id',
`sku_id` string COMMENT 'skuid',
`cart_price` string COMMENT '放入购物车时价格',
`sku_num` string COMMENT '数量',
`sku_name` string COMMENT 'sku 名称 (冗余)',
`create_time` string COMMENT '创建时间',
`operate_time` string COMMENT '修改时间',
`is_ordered` string COMMENT '是否已经下单。1 为已下单;0 为未下单',
`order_time` string COMMENT '下单时间'
) COMMENT '加购事实表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/fact_cart_info/'
tblproperties ("parquet.compression"="snappy");
Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=fact_cart_info
hive_table_name=$APP2.mall__$table_name
# Use the date passed in as ${date} if it is set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
id,
user_id,
sku_id,
cart_price,
sku_num,
sku_name,
from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time,
from_unixtime(cast(operate_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') operate_time,
is_ordered,
from_unixtime(cast(order_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') order_time
from ods.mall__cart_info
where dt='$db_date';
"
$hive -e "$sql"
9.2.11 Favorites fact table (periodic snapshot fact table, full load)
Create table
drop table if exists dwd.mall__fact_favor_info;
CREATE EXTERNAL TABLE `dwd.mall__fact_favor_info`(
`id` string COMMENT '编号',
`user_id` string COMMENT '用户 id',
`sku_id` string COMMENT 'skuid',
`spu_id` string COMMENT 'spuid',
`is_cancel` string COMMENT '是否取消',
`create_time` string COMMENT '收藏时间',
`cancel_time` string COMMENT '取消时间'
) COMMENT '收藏事实表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/fact_favor_info/'
tblproperties ("parquet.compression"="snappy");
Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=fact_favor_info
hive_table_name=$APP2.mall__$table_name
# Use the date passed in as ${date} if it is set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
id,
user_id,
sku_id,
spu_id,
is_cancel,
from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time,
from_unixtime(cast(cancel_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') cancel_time
from ods.mall__favor_info
where dt='$db_date';
"
$hive -e "$sql"
9.2.12 Coupon usage fact table (accumulating snapshot fact table, new and changed rows)
Create table
drop table if exists dwd.mall__fact_coupon_use;
CREATE EXTERNAL TABLE `dwd.mall__fact_coupon_use`(
`id` string COMMENT '编号',
`coupon_id` string COMMENT '优惠券 ID',
`user_id` string COMMENT 'userid',
`order_id` string COMMENT '订单 id',
`coupon_status` string COMMENT '优惠券状态',
`get_time` string COMMENT '领取时间',
`using_time` string COMMENT '使用时间(下单)',
`used_time` string COMMENT '使用时间(支付)'
) COMMENT '优惠券领用事实表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/fact_coupon_use/'
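tblproperties ("parquet.compression"="snappy");
dt is the partition derived from the coupon claim time get_time.
get_time is when the coupon was claimed: the row must exist from that moment on, and the order time and payment time are filled in later as those events happen.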
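The accumulate-and-repartition step can also be sketched with awk, treating an empty TSV field as NULL. The function `merge_coupon_use`, the column layout and the status strings (`unused`, `using`) are hypothetical. It reproduces the full outer join with the `if(new.x is null, old.x, new.x)` pattern and appends get_time as the partition value `dt`; it does not model how the Hive job limits the old side to the partitions named by the day's get_time values.

```shell
#!/bin/bash
# Sketch of the accumulating-snapshot merge for coupon usage.
# Hypothetical TSV layout; times are already yyyy-MM-dd and an empty field means NULL:
#   id  coupon_status  get_time  using_time  used_time
merge_coupon_use() {
  local stored_file=$1 daily_file=$2
  awk -F'\t' -v OFS='\t' '
    # Stored DWD rows: the "old" side of the full outer join
    FILENAME == ARGV[1] { for (i = 1; i <= 5; i++) row[$1, i] = $i; ids[$1] = 1; next }
    # Daily extract rows: the "new" side, whose non-empty fields win
    { for (i = 1; i <= 5; i++) if ($i != "") row[$1, i] = $i; ids[$1] = 1 }
    END {
      for (id in ids) {
        line = row[id, 1]
        for (i = 2; i <= 5; i++) line = line OFS row[id, i]
        # Extra last column: the dt partition, taken from get_time
        print line, row[id, 3]
      }
    }
  ' "$stored_file" "$daily_file" | LC_ALL=C sort
}

# Usage: coupon 1 (claimed 2020-06-01) is used in an order on 2020-06-02,
# and coupon 2 is newly claimed on 2020-06-02
tmp=$(mktemp -d)
printf '1\tunused\t2020-06-01\t\t\n' > "$tmp/stored.tsv"
printf '1\tusing\t2020-06-01\t2020-06-02\t\n2\tunused\t2020-06-02\t\t\n' > "$tmp/daily.tsv"
merge_coupon_use "$tmp/stored.tsv" "$tmp/daily.tsv"
```

Because the partition comes from get_time, coupon 1 stays in its 2020-06-01 partition even though it changed on 2020-06-02, which is why the Hive job overwrites partitions by claim date rather than by load date.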
Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=fact_coupon_use
hive_table_name=$APP2.mall__$table_name
# Use the date passed in as ${date} if it is set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table $hive_table_name
PARTITION (dt)
select
if(new.id is null,old.id,new.id) id,
if(new.coupon_id is null,old.coupon_id,new.coupon_id) coupon_id,
if(new.user_id is null,old.user_id,new.user_id) user_id,
if(new.order_id is null,old.order_id,new.order_id) order_id,
if(new.coupon_status is null,old.coupon_status,new.coupon_status) coupon_status,
if(new.get_time is null,old.get_time,from_unixtime(cast(new.get_time/1000 as bigint),'yyyy-MM-dd')) get_time,
if(new.using_time is null,old.using_time,from_unixtime(cast(new.using_time/1000 as bigint),'yyyy-MM-dd')) using_time,
if(new.used_time is null,old.used_time,from_unixtime(cast(new.used_time/1000 as bigint),'yyyy-MM-dd')) used_time,
if(new.get_time is null,old.get_time,from_unixtime(cast(new.get_time/1000 as bigint),'yyyy-MM-dd')) dt
from
(
select
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time
from dwd.mall__fact_coupon_use
where dt in
(
select
from_unixtime(cast(get_time/1000 as bigint),'yyyy-MM-dd')
from ods.mall__coupon_use
where dt='$db_date'
)
)old
full outer join
(
select
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time
from ods.mall__coupon_use
where dt='$db_date'
)new
on old.id=new.id;
"
$hive -e "$sql"
9.2.13 Order fact table (accumulating snapshot fact table, new and changed rows)
Create table
drop table if exists dwd.mall__fact_order_info;
CREATE EXTERNAL TABLE `dwd.mall__fact_order_info`(
`id` string COMMENT '订单编号',
`order_status` string COMMENT '订单状态',
`user_id` string COMMENT '用户 id',
`out_trade_no` string COMMENT '支付流水号',
`create_time` string COMMENT '创建时间(未支付状态)',
`payment_time` string COMMENT '支付时间(已支付状态)',
`cancel_time` string COMMENT '取消时间(已取消状态)',
`finish_time` string COMMENT '完成时间(已完成状态)',
`refund_time` string COMMENT '退款时间(退款中状态)',
`refund_finish_time` string COMMENT '退款完成时间(退款完成状态)',
`province_id` string COMMENT '省份 ID',
`activity_id` string COMMENT '活动 ID',
`original_total_amount` string COMMENT '原价金额',
`benefit_reduce_amount` string COMMENT '优惠金额',
`feight_fee` string COMMENT '运费',
`final_total_amount` decimal(10,2) COMMENT '订单金额'
) COMMENT '订单事实表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/fact_order_info/'
tblproperties ("parquet.compression"="snappy");
Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=fact_order_info
hive_table_name=$APP2.mall__$table_name
# Use the date passed in as ${date} if it is set; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
if(new.id is null,old.id,new.id),
if(new.order_status is null,old.order_status,new.order_status),
if(new.user_id is null,old.user_id,new.user_id),
if(new.out_trade_no is null,old.out_trade_no,new.out_trade_no),
if(new.tms['1001'] is null,old.create_time,new.tms['1001']),--1001 = created (unpaid)
if(new.tms['1002'] is null,old.payment_time,new.tms['1002']),--old.* already holds formatted time strings, so copy it unchanged
if(new.tms['1003'] is null,old.cancel_time,new.tms['1003']),
if(new.tms['1004'] is null,old.finish_time,new.tms['1004']),
if(new.tms['1005'] is null,old.refund_time,new.tms['1005']),
if(new.tms['1006'] is null,old.refund_finish_time,new.tms['1006']),
if(new.province_id is null,old.province_id,new.province_id),
if(new.activity_id is null,old.activity_id,new.activity_id),
if(new.original_total_amount is null,old.original_total_amount,new.original_total_amount),
if(new.benefit_reduce_amount is null,old.benefit_reduce_amount,new.benefit_reduce_amount),
if(new.feight_fee is null,old.feight_fee,new.feight_fee),
if(new.final_total_amount is null,old.final_total_amount,new.final_total_amount)
from
(
select
id,
order_status,
user_id,
out_trade_no,
create_time,
payment_time,
cancel_time,
finish_time,
refund_time,
refund_finish_time,
province_id,
activity_id,
original_total_amount,
benefit_reduce_amount,
feight_fee,
final_total_amount
from dwd.mall__fact_order_info
where dt in
(
select
from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd')
from ods.mall__order_info
where dt='$db_date'
)
)old
full outer join
(
select
info.id,
info.order_status,
info.user_id,
info.out_trade_no,
info.province_id,
act.activity_id,
log.tms,
info.original_total_amount,
info.benefit_reduce_amount,
info.feight_fee,
info.final_total_amount
from
(
select
order_id,
str_to_map(concat_ws(',',collect_set(concat(order_status,'=',from_unixtime(cast(operate_time/1000 as bigint),'yyyy-MM-dd')))),',','=')
tms
from ods.mall__order_status_log
where dt='$db_date'
group by order_id
)log
join
(
select * from ods.mall__order_info where dt='$db_date'
)info
on log.order_id=info.id
left join
(
select * from ods.mall__activity_order where dt='$db_date'
)act
on log.order_id=act.order_id
)new
on old.id=new.id;
"
$hive -e "$sql"
10 Building the DWS Layer
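Tables in this layer are no longer compressed. Compression saves disk space but adds CPU cost every time the data is read. DWS tables are queried frequently, so compute efficiency takes priority here, and the DDL below omits the parquet.compression property used in the DWD layer. This layer mainly holds per-day aggregates for each subject area.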
10.1 Daily Device Activity (user behavior logs)
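The load script for this table collapses each device's start-up log rows for the day into a single row. Every descriptive column becomes the '|'-joined set of distinct values seen that day (concat_ws('|', collect_set(col))), and count(*) becomes login_count.

Below is a Python sketch of that aggregation, assuming rows are plain dicts. Hive's collect_set drops NULLs and does not guarantee element order, so the sketch sorts values only to make its output deterministic:

```python
from collections import defaultdict


def daily_device_rollup(start_log_rows, columns):
    """Aggregate one day's start-up log rows per mid_id.

    Each column in `columns` becomes '|'.join(distinct non-null values), like
    concat_ws('|', collect_set(col)); login_count is the number of rows (count(*)).
    """
    grouped = defaultdict(list)
    for row in start_log_rows:
        grouped[row["mid_id"]].append(row)

    result = {}
    for mid_id, rows in grouped.items():
        out = {"mid_id": mid_id, "login_count": len(rows)}
        for col in columns:
            # collect_set drops NULLs and duplicates; sorted() is only for a stable demo.
            distinct = sorted({r[col] for r in rows if r.get(col) is not None})
            out[col] = "|".join(distinct)  # an empty set yields '', as concat_ws does
        result[mid_id] = out
    return result
```

A device used by two accounts in one day therefore shows up as user_id 'u1|u2' with one row in the table.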
Create the table
drop table if exists dws.mall__uv_detail_daycount;
CREATE EXTERNAL TABLE `dws.mall__uv_detail_daycount`(
`mid_id` string COMMENT '设备唯一标识',
`user_id` string COMMENT '用户标识',
`version_code` string COMMENT '程序版本号',
`version_name` string COMMENT '程序版本名',
`lang` string COMMENT '系统语言',
`source` string COMMENT '渠道号',
`os` string COMMENT '安卓系统版本',
`area` string COMMENT '区域',
`model` string COMMENT '手机型号',
`brand` string COMMENT '手机品牌',
`sdk_version` string COMMENT 'sdkVersion',
`gmail` string COMMENT 'gmail',
`height_width` string COMMENT '屏幕宽高',
`app_time` string COMMENT '客户端日志产生时的时间',
`network` string COMMENT '网络模式',
`lng` string COMMENT '经度',
`lat` string COMMENT '纬度',
`login_count` bigint COMMENT '活跃次数'
) COMMENT '每日设备行为表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dws/mall/uv_detail_daycount/';
Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dws
table_name=uv_detail_daycount
hive_table_name=$APP2.mall__$table_name
# Use the date passed in through ${date} if there is one; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
mid_id,
concat_ws('|', collect_set(user_id)) user_id,
concat_ws('|', collect_set(version_code)) version_code,
concat_ws('|', collect_set(version_name)) version_name,
concat_ws('|', collect_set(lang))lang,
concat_ws('|', collect_set(source)) source,
concat_ws('|', collect_set(os)) os,
concat_ws('|', collect_set(area)) area,
concat_ws('|', collect_set(model)) model,
concat_ws('|', collect_set(brand)) brand,
concat_ws('|', collect_set(sdk_version)) sdk_version,
concat_ws('|', collect_set(gmail)) gmail,
concat_ws('|', collect_set(height_width)) height_width,
concat_ws('|', collect_set(app_time)) app_time,
concat_ws('|', collect_set(network)) network,
concat_ws('|', collect_set(lng)) lng,
concat_ws('|', collect_set(lat)) lat,
count(*) login_count
from dwd.mall__start_log
where dt='$db_date'
group by mid_id;
"
$hive -e "$sql"
10.2 Daily Member Activity (business data)
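The load script for this table combines four per-user aggregates (logins, cart additions, orders, payments) without chaining full outer joins. It works in three steps:
1. Each CTE is padded with zeros for the measures it does not own.
2. The four results are stacked with union all.
3. A final group by user_id with sum() folds them into one row per user.

A Python sketch of the same pivot, assuming each source is a list of (user_id, {measure: value}) pairs (a hypothetical shape chosen for the example):

```python
from collections import defaultdict

MEASURES = [
    "login_count", "cart_count", "cart_amount",
    "order_count", "order_amount", "payment_count", "payment_amount",
]


def union_all_pivot(*sources):
    """Zero-fill every source to the full measure list, stack them, then sum per user."""
    stacked = []
    for source in sources:
        for user_id, measures in source:
            # Equivalent of selecting `0 cart_count, 0 order_count, ...` for missing measures.
            stacked.append((user_id, {m: measures.get(m, 0) for m in MEASURES}))

    # group by user_id + sum(...) over the stacked rows
    totals = defaultdict(lambda: dict.fromkeys(MEASURES, 0))
    for user_id, measures in stacked:
        for m in MEASURES:
            totals[user_id][m] += measures[m]
    return dict(totals)
```

A user who only logged in still gets a row, with zeros for every other measure. Compared with a chain of full outer joins, this usually needs fewer join stages and no nvl() juggling of the join keys.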
Create the table
drop table if exists dws.mall__user_action_daycount;
CREATE EXTERNAL TABLE `dws.mall__user_action_daycount`(
user_id string comment '用户 id',
login_count bigint comment '登录次数',
cart_count bigint comment '加入购物车次数',
cart_amount double comment '加入购物车金额',
order_count bigint comment '下单次数',
order_amount decimal(16,2) comment '下单金额',
payment_count bigint comment '支付次数',
payment_amount decimal(16,2) comment '支付金额'
) COMMENT '每日会员行为表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dws/mall/user_action_daycount/';
Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dws
table_name=user_action_daycount
hive_table_name=$APP2.mall__$table_name
# Use the date passed in through ${date} if there is one; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
with
tmp_login as
(
select
user_id,
count(*) login_count
from dwd.mall__start_log
where dt='$db_date'
and user_id is not null
group by user_id
),
tmp_cart as
(
select
user_id,
count(*) cart_count,
sum(cart_price*sku_num) cart_amount
from dwd.mall__fact_cart_info
where dt='$db_date'
and user_id is not null
and date_format(create_time,'yyyy-MM-dd')='$db_date'
group by user_id
),
tmp_order as
(
select
user_id,
count(*) order_count,
sum(final_total_amount) order_amount
from dwd.mall__fact_order_info
where dt='$db_date'
group by user_id
) ,
tmp_payment as
(
select
user_id,
count(*) payment_count,
sum(payment_amount) payment_amount
from dwd.mall__fact_payment_info
where dt='$db_date'
group by user_id
)
insert overwrite table $hive_table_name partition(dt='$db_date')
select
user_actions.user_id,
sum(user_actions.login_count),
sum(user_actions.cart_count),
sum(user_actions.cart_amount),
sum(user_actions.order_count),
sum(user_actions.order_amount),
sum(user_actions.payment_count),
sum(user_actions.payment_amount)
from
(
select
user_id,
login_count,
0 cart_count,
0 cart_amount,
0 order_count,
0 order_amount,
0 payment_count,
0 payment_amount
from
tmp_login
union all
select
user_id,
0 login_count,
cart_count,
cart_amount,
0 order_count,
0 order_amount,
0 payment_count,
0 payment_amount
from
tmp_cart
union all
select
user_id,
0 login_count,
0 cart_count,
0 cart_amount,
order_count,
order_amount,
0 payment_count,
0 payment_amount
from tmp_order
union all
select
user_id,
0 login_count,
0 cart_count,
0 cart_amount,
0 order_count,
0 order_amount,
payment_count,
payment_amount
from tmp_payment
) user_actions
group by user_id;
"
$hive -e "$sql"
10.3 Daily Product Activity (business data)
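Two idioms carry most of this table's load script:
- tmp_appraise counts good, medium, bad and default reviews in a single pass using conditional sums such as sum(if(appraise='1201',1,0)).
- tmp_payment keeps only the order-detail rows whose order was paid on $db_date. It reads today's and yesterday's order partitions, because an order created late yesterday can be paid today.

A Python sketch of both, assuming in-memory rows. The code-to-name mapping follows the script; the function names are hypothetical:

```python
from collections import defaultdict
from datetime import date, timedelta

APPRAISE_CODES = {"1201": "good", "1202": "mid", "1203": "bad", "1204": "default"}


def appraise_counts(comment_rows):
    """Per sku, one sum(if(appraise=code,1,0)) column per appraisal code."""
    result = defaultdict(lambda: dict.fromkeys(APPRAISE_CODES.values(), 0))
    for sku_id, appraise in comment_rows:
        row = result[sku_id]  # every sku seen gets a row, even if all counts stay 0
        name = APPRAISE_CODES.get(appraise)
        if name is not None:  # unknown codes add 0 to every column
            row[name] += 1
    return dict(result)


def paid_today_detail(detail_rows, order_rows, db_date):
    """Keep detail rows whose order has a payment_time on db_date."""
    prev_day = (date.fromisoformat(db_date) - timedelta(days=1)).isoformat()
    paid_ids = {
        o["id"]
        for o in order_rows
        # (dt='$db_date' or dt=date_add('$db_date',-1))
        if o["dt"] in (db_date, prev_day)
        # date_format(payment_time,'yyyy-MM-dd')='$db_date'; NULL never matches
        and (o.get("payment_time") or "")[:10] == db_date
    }
    return [d for d in detail_rows if d["order_id"] in paid_ids]
```

Like the script's filter, the sketch only sees orders stored in those two partitions.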
Create the table
drop table if exists dws.mall__sku_action_daycount;
CREATE EXTERNAL TABLE `dws.mall__sku_action_daycount`(
sku_id string comment 'sku_id',
order_count bigint comment '被下单次数',
order_num bigint comment '被下单件数',
order_amount decimal(16,2) comment '被下单金额',
payment_count bigint comment '被支付次数',
payment_num bigint comment '被支付件数',
payment_amount decimal(16,2) comment '被支付金额',
refund_count bigint comment '被退款次数',
refund_num bigint comment '被退款件数',
refund_amount decimal(16,2) comment '被退款金额',
cart_count bigint comment '被加入购物车次数',
cart_num bigint comment '被加入购物车件数',
favor_count bigint comment '被收藏次数',
appraise_good_count bigint comment '好评数',
appraise_mid_count bigint comment '中评数',
appraise_bad_count bigint comment '差评数',
appraise_default_count bigint comment '默认评价数'
) COMMENT '每日商品行为表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dws/mall/sku_action_daycount/';
Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dws
table_name=sku_action_daycount
hive_table_name=$APP2.mall__$table_name
# Use the date passed in through ${date} if there is one; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
with
tmp_order as
(
select
cast(sku_id as string) sku_id,
count(*) order_count,
sum(sku_num) order_num,
sum(total_amount) order_amount
from dwd.mall__fact_order_detail
where dt='$db_date'
group by sku_id
),
tmp_payment as
(
select
cast(sku_id as string) sku_id,
count(*) payment_count,
sum(sku_num) payment_num,
sum(total_amount) payment_amount
from dwd.mall__fact_order_detail
where dt='$db_date'
and order_id in
(
select
id
from dwd.mall__fact_order_info
where (dt='$db_date' or dt=date_add('$db_date',-1))
and date_format(payment_time,'yyyy-MM-dd')='$db_date'
)
group by sku_id
),
tmp_refund as
(
select
cast(sku_id as string) sku_id,
count(*) refund_count,
sum(refund_num) refund_num,
sum(refund_amount) refund_amount
from dwd.mall__fact_order_refund_info
where dt='$db_date'
group by sku_id
),
tmp_cart as
(
select
cast(sku_id as string) sku_id,
count(*) cart_count,
sum(sku_num) cart_num
from dwd.mall__fact_cart_info
where dt='$db_date'
and date_format(create_time,'yyyy-MM-dd')='$db_date'
group by sku_id
),
tmp_favor as
(
select
cast(sku_id as string) sku_id,
count(*) favor_count
from dwd.mall__fact_favor_info
where dt='$db_date'
and date_format(create_time,'yyyy-MM-dd')='$db_date'
group by sku_id
),
tmp_appraise as
(
select
cast(sku_id as string) sku_id,
sum(if(appraise='1201',1,0)) appraise_good_count,
sum(if(appraise='1202',1,0)) appraise_mid_count,
sum(if(appraise='1203',1,0)) appraise_bad_count,
sum(if(appraise='1204',1,0)) appraise_default_count
from dwd.mall__fact_comment_info
where dt='$db_date'
group by sku_id
)
insert overwrite table $hive_table_name partition(dt='$db_date')
select
sku_id,
sum(order_count),
sum(order_num),
sum(order_amount),
sum(payment_count),
sum(payment_num),
sum(payment_amount),
sum(refund_count),
sum(refund_num),
sum(refund_amount),
sum(cart_count),
sum(cart_num),
sum(favor_count),
sum(appraise_good_count),
sum(appraise_mid_count),
sum(appraise_bad_count),
sum(appraise_default_count)
from
(
select
sku_id,
order_count,
order_num,
order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
0 cart_count,
0 cart_num,
0 favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_order
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
payment_count,
payment_num,
payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
0 cart_count,
0 cart_num,
0 favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_payment
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
refund_count,
refund_num,
refund_amount,
0 cart_count,
0 cart_num,
0 favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_refund
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
cart_count,
cart_num,
0 favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_cart
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
0 cart_count,
0 cart_num,
favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_favor
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
0 cart_count,
0 cart_num,
0 favor_count,
appraise_good_count,
appraise_mid_count,
appraise_bad_count,
appraise_default_count
from tmp_appraise
)tmp
group by sku_id;
"
$hive -e "$sql"
10.4 Daily Coupon Statistics (business data)
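The per-coupon counts in this table use the same conditional-sum idiom, keyed on the date part of each timestamp. Each coupon-use row adds:
- 1 to get_count if the coupon was claimed on $db_date,
- 1 to using_count if it was applied to an order that day,
- 1 to used_count if that order was paid that day.

The left join to dwd.mall__dim_coupon_info keeps the counts even for coupons missing from that day's dimension snapshot; their descriptive columns come back NULL.

A Python sketch, assuming dict rows whose timestamps are 'yyyy-MM-dd HH:mm:ss' strings or None. Only coupon_name stands in for the dimension columns:

```python
from collections import defaultdict

EVENTS = {"get_time": "get_count", "using_time": "using_count", "used_time": "used_count"}


def coupon_daycount(coupon_use_rows, coupon_dim, db_date):
    """Count claim/use/pay events that happened on db_date, per coupon."""
    counts = defaultdict(lambda: dict.fromkeys(EVENTS.values(), 0))
    for row in coupon_use_rows:
        c = counts[row["coupon_id"]]
        for time_col, count_col in EVENTS.items():
            ts = row.get(time_col)
            # date_format(ts,'yyyy-MM-dd') = db_date; a NULL timestamp never matches
            if ts is not None and ts[:10] == db_date:
                c[count_col] += 1

    result = {}
    for coupon_id, c in counts.items():
        dim = coupon_dim.get(coupon_id)  # left join on coupon_id = id
        result[coupon_id] = {"coupon_name": dim["coupon_name"] if dim else None, **c}
    return result
```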
Create the table
drop table if exists dws.mall__coupon_use_daycount;
CREATE EXTERNAL TABLE `dws.mall__coupon_use_daycount`(
`coupon_id` string COMMENT '优惠券 ID',
`coupon_name` string COMMENT '购物券名称',
`coupon_type` string COMMENT '购物券类型 1 现金券 2 折扣券 3 满减券 4 满件打折券',
`condition_amount` string COMMENT '满额数',
`condition_num` string COMMENT '满件数',
`activity_id` string COMMENT '活动编号',
`benefit_amount` string COMMENT '减金额',
`benefit_discount` string COMMENT '折扣',
`create_time` string COMMENT '创建时间',
`range_type` string COMMENT '范围类型 1、商品 2、品类 3、品牌',
`spu_id` string COMMENT '商品 id',
`tm_id` string COMMENT '品牌 id',
`category3_id` string COMMENT '品类 id',
`limit_num` string COMMENT '最多领用次数',
`get_count` bigint COMMENT '领用次数',
`using_count` bigint COMMENT '使用(下单)次数',
`used_count` bigint COMMENT '使用(支付)次数'
) COMMENT '每日优惠券统计表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dws/mall/coupon_use_daycount/';
Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dws
table_name=coupon_use_daycount
hive_table_name=$APP2.mall__$table_name
# Use the date passed in through ${date} if there is one; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name partition(dt='$db_date')
select
cu.coupon_id,
ci.coupon_name,
ci.coupon_type,
ci.condition_amount,
ci.condition_num,
ci.activity_id,
ci.benefit_amount,
ci.benefit_discount,
ci.create_time,
ci.range_type,
ci.spu_id,
ci.tm_id,
ci.category3_id,
ci.limit_num,
cu.get_count,
cu.using_count,
cu.used_count
from
(
select
coupon_id,
sum(if(date_format(get_time,'yyyy-MM-dd')='$db_date',1,0))
get_count,
sum(if(date_format(using_time,'yyyy-MM-dd')='$db_date',1,0))
using_count,
sum(if(date_format(used_time,'yyyy-MM-dd')='$db_date',1,0))
used_count
from dwd.mall__fact_coupon_use
where dt='$db_date'
group by coupon_id
)cu
left join
(
select
*
from dwd.mall__dim_coupon_info
where dt='$db_date'
)ci on cu.coupon_id=ci.id;
"
$hive -e "$sql"
10.5 Daily Promotion Statistics (business data)
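Unlike the coupon script, this table's load script uses an inner join to dwd.mall__dim_activity_info. An activity_id that appears on orders but is missing from that day's activity snapshot is therefore dropped, rather than kept with NULL attributes. Orders are read from today's and yesterday's partitions. order_count counts only orders created on $db_date, and payment_count counts only orders paid on $db_date.

A Python sketch of that logic; the function name and row shape are hypothetical:

```python
from collections import defaultdict
from datetime import date, timedelta


def activity_daycount(order_rows, activity_dim, db_date):
    """Per activity: orders created on db_date and orders paid on db_date."""
    prev_day = (date.fromisoformat(db_date) - timedelta(days=1)).isoformat()
    counts = defaultdict(lambda: {"order_count": 0, "payment_count": 0})
    for o in order_rows:
        # (dt='$db_date' or dt=date_add('$db_date',-1)) and activity_id is not null
        if o["dt"] not in (db_date, prev_day) or o.get("activity_id") is None:
            continue
        c = counts[o["activity_id"]]
        if (o.get("create_time") or "")[:10] == db_date:
            c["order_count"] += 1
        if (o.get("payment_time") or "")[:10] == db_date:
            c["payment_count"] += 1
    # inner join: activities missing from the dimension are dropped
    return {
        act_id: {"activity_name": activity_dim[act_id]["activity_name"], **c}
        for act_id, c in counts.items()
        if act_id in activity_dim
    }
```

If activity statistics must never be lost, a left join, as in the coupon table, is the alternative. It returns NULL activity attributes instead of dropping the row.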
Create the table
drop table if exists dws.mall__activity_info_daycount;
CREATE EXTERNAL TABLE `dws.mall__activity_info_daycount`(
`id` string COMMENT '编号',
`activity_name` string COMMENT '活动名称',
`activity_type` string COMMENT '活动类型',
`start_time` string COMMENT '开始时间',
`end_time` string COMMENT '结束时间',
`create_time` string COMMENT '创建时间',
`order_count` bigint COMMENT '下单次数',
`payment_count` bigint COMMENT '支付次数'
) COMMENT '每日活动统计表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dws/mall/activity_info_daycount/';
Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dws
table_name=activity_info_daycount
hive_table_name=$APP2.mall__$table_name
# Use the date passed in through ${date} if there is one; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name partition(dt='$db_date')
select
oi.activity_id,
ai.activity_name,
ai.activity_type,
ai.start_time,
ai.end_time,
ai.create_time,
oi.order_count,
oi.payment_count
from
(
select
activity_id,
sum(if(date_format(create_time,'yyyy-MM-dd')='$db_date',1,0))
order_count,
sum(if(date_format(payment_time,'yyyy-MM-dd')='$db_date',1,0))
payment_count
from dwd.mall__fact_order_info
where (dt='$db_date' or dt=date_add('$db_date',-1))
and activity_id is not null
group by activity_id
)oi
join
(
select
*
from dwd.mall__dim_activity_info
where dt='$db_date'
)ai
on oi.activity_id=ai.id;
"
$hive -e "$sql"
10.6 Daily Purchase Activity (business data)
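This table's load script enriches each (user_id, sku_id) order aggregate with user and product attributes. User attributes come from the zipper (slowly changing dimension) table dwd.mall__dim_user_info_his, where end_date='9999-99-99' selects each user's current version. Both joins are inner joins, so an aggregate without a current user row or a matching SKU row is dropped.

A Python sketch, assuming dict rows and showing only one attribute from each dimension. The function names are hypothetical:

```python
CURRENT_END_DATE = "9999-99-99"  # open-ended marker for the current version in the zipper table


def current_versions(zipper_rows):
    """Keep only each user's current (open) version: end_date = '9999-99-99'."""
    return {r["id"]: r for r in zipper_rows if r["end_date"] == CURRENT_END_DATE}


def enrich_sales(order_detail_agg, zipper_rows, sku_dim):
    """Inner-join order aggregates to the current user version and the SKU dimension."""
    users = current_versions(zipper_rows)
    out = []
    for row in order_detail_agg:
        user = users.get(row["user_id"])
        sku = sku_dim.get(row["sku_id"])
        if user is None or sku is None:  # inner joins drop unmatched rows
            continue
        out.append({**row, "user_level": user["user_level"], "sku_name": sku["sku_name"]})
    return out
```

Only the open version of a user is used, so a user whose level changed earlier appears with the new level on every purchase row for the day.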
Create the table
drop table if exists dws.mall__sale_detail_daycount;
CREATE EXTERNAL TABLE `dws.mall__sale_detail_daycount`(
user_id string comment '用户 id',
sku_id string comment '商品 id',
user_gender string comment '用户性别',
user_age string comment '用户年龄',
user_level string comment '用户等级',
order_price decimal(10,2) comment '商品价格',
sku_name string comment '商品名称',
sku_tm_id string comment '品牌 id',
sku_category3_id string comment '商品三级品类 id',
sku_category2_id string comment '商品二级品类 id',
sku_category1_id string comment '商品一级品类 id',
sku_category3_name string comment '商品三级品类名称',
sku_category2_name string comment '商品二级品类名称',
sku_category1_name string comment '商品一级品类名称',
spu_id string comment '商品 spu',
sku_num int comment '购买个数',
order_count bigint comment '当日下单单数',
order_amount decimal(16,2) comment '当日下单金额'
) COMMENT '每日购买行为表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dws/mall/sale_detail_daycount/';
Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dws
table_name=sale_detail_daycount
hive_table_name=$APP2.mall__$table_name
# Use the date passed in through ${date} if there is one; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name partition(dt='$db_date')
select
op.user_id,
op.sku_id,
ui.gender,
months_between('$db_date', ui.birthday)/12 age,
ui.user_level,
si.price,
si.sku_name,
si.tm_id,
si.category3_id,
si.category2_id,
si.category1_id,
si.category3_name,
si.category2_name,
si.category1_name,
si.spu_id,
op.sku_num,
op.order_count,
op.order_amount
from
(
select
user_id,
sku_id,
sum(sku_num) sku_num,
count(*) order_count,
sum(total_amount) order_amount
from dwd.mall__fact_order_detail
where dt='$db_date'
group by user_id, sku_id
)op
join
(
select
*
from dwd.mall__dim_user_info_his
where end_date='9999-99-99'
)ui on op.user_id = ui.id
join
(
select
*
from dwd.mall__dim_sku_info
where dt='$db_date'
)si on op.sku_id = si.id;
"
$hive -e "$sql"
11 Building the DWT Layer
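This layer rolls the daily DWS data up into cumulative subject-area wide tables. The tables are unpartitioned and uncompressed, and each daily run overwrites the whole table.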
11.1 Device Subject Wide Table
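Each DWT load full-outer-joins the current wide table (old) with the day's DWS partition (new) and then overwrites the table with the result. For devices, the merge follows four rules:
- The first-active date is set only when the device has not been seen before.
- The last-active date moves to $db_date whenever the device appears that day.
- The day's activity count is taken from new, or 0 if the device was inactive.
- The cumulative active-day count grows by one for each active day.

A Python sketch of one daily step, assuming dict rows keyed by mid_id (a hypothetical shape):

```python
def merge_uv_topic(old, new, db_date):
    """old: {mid_id: topic row}; new: {mid_id: {'login_count': n}} from the day's DWS partition."""
    merged = {}
    for mid_id in old.keys() | new.keys():
        o, n = old.get(mid_id), new.get(mid_id)
        merged[mid_id] = {
            # first seen today only if there is no old row
            "login_date_first": db_date if o is None else o["login_date_first"],
            # active today -> last-active date moves forward
            "login_date_last": db_date if n is not None else o["login_date_last"],
            "login_day_count": n["login_count"] if n is not None else 0,
            # cumulative active days: +1 if the device was active today
            "login_count": (o["login_count"] if o else 0)
            + (1 if n is not None and n["login_count"] > 0 else 0),
        }
    return merged
```

Calling this once per day in date order reproduces the table's history. Every date in the merge comes from the db_date argument, never from a fixed literal.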
Create the table
drop table if exists dwt.mall__uv_topic;
CREATE EXTERNAL TABLE `dwt.mall__uv_topic`(
`mid_id` string COMMENT '设备唯一标识',
`user_id` string COMMENT '用户标识',
`version_code` string COMMENT '程序版本号',
`version_name` string COMMENT '程序版本名',
`lang` string COMMENT '系统语言',
`source` string COMMENT '渠道号',
`os` string COMMENT '安卓系统版本',
`area` string COMMENT '区域',
`model` string COMMENT '手机型号',
`brand` string COMMENT '手机品牌',
`sdk_version` string COMMENT 'sdkVersion',
`gmail` string COMMENT 'gmail',
`height_width` string COMMENT '屏幕宽高',
`app_time` string COMMENT '客户端日志产生时的时间',
`network` string COMMENT '网络模式',
`lng` string COMMENT '经度',
`lat` string COMMENT '纬度',
`login_date_first` string comment '首次活跃时间',
`login_date_last` string comment '末次活跃时间',
`login_day_count` bigint comment '当日活跃次数',
`login_count` bigint comment '累积活跃天数'
) COMMENT '设备主题宽表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwt/mall/uv_topic/';
Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwt
table_name=uv_topic
hive_table_name=$APP2.mall__$table_name
# Use the date passed in through ${date} if there is one; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
select
nvl(new.mid_id,old.mid_id),
nvl(new.user_id,old.user_id),
nvl(new.version_code,old.version_code),
nvl(new.version_name,old.version_name),
nvl(new.lang,old.lang),
nvl(new.source,old.source),
nvl(new.os,old.os),
nvl(new.area,old.area),
nvl(new.model,old.model),
nvl(new.brand,old.brand),
nvl(new.sdk_version,old.sdk_version),
nvl(new.gmail,old.gmail),
nvl(new.height_width,old.height_width),
nvl(new.app_time,old.app_time),
nvl(new.network,old.network),
nvl(new.lng,old.lng),
nvl(new.lat,old.lat),
if(old.mid_id is null,'$db_date',old.login_date_first),
if(new.mid_id is not null,'$db_date',old.login_date_last),
if(new.mid_id is not null, new.login_count,0),
nvl(old.login_count,0)+if(new.login_count>0,1,0)
from
(
select
*
from dwt.mall__uv_topic
)old
full outer join
(
select
*
from dws.mall__uv_detail_daycount
where dt='$db_date'
)new
on old.mid_id=new.mid_id;
"
$hive -e "$sql"
11.2 Member Subject Wide Table
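The member table mixes two kinds of measures:
- Cumulative totals, computed as the old value plus the day's value.
- Rolling 30-day figures, recomputed from scratch every day from the daily DWS partitions.

For the window, "last 30 days" is read here as $db_date plus the 29 days before it. That reading is an assumption. Under it, the lower partition bound is date_add('$db_date', -29), since a bound of -30 spans 31 partitions. An upper bound of dt<='$db_date' keeps re-runs for past dates from picking up later partitions.

A Python sketch of the window arithmetic; the function names and the tuple layout are hypothetical:

```python
from datetime import date, timedelta


def window_start(db_date, days=30):
    """First partition of a `days`-day window that ends on db_date (inclusive)."""
    return (date.fromisoformat(db_date) - timedelta(days=days - 1)).isoformat()


def user_last_30d(daily_rows, db_date):
    """daily_rows: (dt, user_id, login_count, order_count, order_amount) from the DWS table."""
    start = window_start(db_date)
    stats = {}
    for dt, user_id, login_count, order_count, order_amount in daily_rows:
        if not (start <= dt <= db_date):  # ISO 'yyyy-MM-dd' strings compare correctly
            continue
        s = stats.setdefault(user_id, {"login_last_30d_count": 0,
                                       "order_last_30d_count": 0,
                                       "order_last_30d_amount": 0})
        s["login_last_30d_count"] += 1 if login_count > 0 else 0  # active days, not logins
        s["order_last_30d_count"] += order_count
        s["order_last_30d_amount"] += order_amount
    return stats
```

Note that login_last_30d_count counts active days, matching sum(if(login_count>0,1,0)) in the script, while the order measures are plain sums.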
Create the table
drop table if exists dwt.mall__user_topic;
CREATE EXTERNAL TABLE `dwt.mall__user_topic`(
user_id string comment '用户 id',
login_date_first string comment '首次登录时间',
login_date_last string comment '末次登录时间',
login_count bigint comment '累积登录天数',
login_last_30d_count bigint comment '最近 30 日登录天数',
order_date_first string comment '首次下单时间',
order_date_last string comment '末次下单时间',
order_count bigint comment '累积下单次数',
order_amount decimal(16,2) comment '累积下单金额',
order_last_30d_count bigint comment '最近 30 日下单次数',
order_last_30d_amount decimal(16,2) comment '最近 30 日下单金额',
payment_date_first string comment '首次支付时间',
payment_date_last string comment '末次支付时间',
payment_count bigint comment '累积支付次数',
payment_amount decimal(16,2) comment '累积支付金额',
payment_last_30d_count bigint comment '最近 30 日支付次数',
payment_last_30d_amount decimal(16,2) comment '最近 30 日支付金额'
) COMMENT '会员主题宽表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwt/mall/user_topic/';
Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwt
table_name=user_topic
hive_table_name=$APP2.mall__$table_name
# Use the date passed in through ${date} if there is one; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
select
nvl(new.user_id,old.user_id),
if(old.login_date_first is null and
new.login_count>0,'$db_date',old.login_date_first),
if(new.login_count>0,'$db_date',old.login_date_last),
nvl(old.login_count,0)+if(new.login_count>0,1,0),
nvl(new.login_last_30d_count,0),
if(old.order_date_first is null and
new.order_count>0,'$db_date',old.order_date_first),
if(new.order_count>0,'$db_date',old.order_date_last),
nvl(old.order_count,0)+nvl(new.order_count,0),
nvl(old.order_amount,0)+nvl(new.order_amount,0),
nvl(new.order_last_30d_count,0),
nvl(new.order_last_30d_amount,0),
if(old.payment_date_first is null and
new.payment_count>0,'$db_date',old.payment_date_first),
if(new.payment_count>0,'$db_date',old.payment_date_last),
nvl(old.payment_count,0)+nvl(new.payment_count,0),
nvl(old.payment_amount,0)+nvl(new.payment_amount,0),
nvl(new.payment_last_30d_count,0),
nvl(new.payment_last_30d_amount,0)
from
dwt.mall__user_topic old
full outer join
(
select
user_id,
sum(if(dt='$db_date',login_count,0)) login_count,
sum(if(dt='$db_date',order_count,0)) order_count,
sum(if(dt='$db_date',order_amount,0)) order_amount,
sum(if(dt='$db_date',payment_count,0)) payment_count,
sum(if(dt='$db_date',payment_amount,0)) payment_amount,
sum(if(login_count>0,1,0)) login_last_30d_count,
sum(order_count) order_last_30d_count,
sum(order_amount) order_last_30d_amount,
sum(payment_count) payment_last_30d_count,
sum(payment_amount) payment_last_30d_amount
from dws.mall__user_action_daycount
where dt>=date_add('$db_date',-29) and dt<='$db_date'
group by user_id
)new
on old.user_id=new.user_id;
"
$hive -e "$sql"
11.3 Product Subject Wide Table
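The product table applies the same old-plus-new merge to 16 measures. Hand-written lists that long make copy-paste slips easy to miss, such as adding a count column to a num or amount column. Generating the select-list entries from one list of measure names pairs every column with itself by construction.

A Python sketch of such a generator, assuming the measure names used by dws.mall__sku_action_daycount. The helper names are hypothetical and not part of the project:

```python
MEASURES = [
    "order_count", "order_num", "order_amount",
    "payment_count", "payment_num", "payment_amount",
    "refund_count", "refund_num", "refund_amount",
    "cart_count", "cart_num", "favor_count",
    "appraise_good_count", "appraise_mid_count",
    "appraise_bad_count", "appraise_default_count",
]


def gen_cumulative_exprs(measures):
    """One `nvl(old.m,0) + nvl(new.m,0)` expression per measure; each name is paired with itself."""
    return [f"nvl(old.{m},0) + nvl(new.{m},0)" for m in measures]


def gen_window_exprs(measures, suffix="30"):
    """One `nvl(new.m30,0)` expression per measure for the rolling-window columns."""
    return [f"nvl(new.{m}{suffix},0)" for m in measures]


if __name__ == "__main__":
    for expr in gen_cumulative_exprs(MEASURES):
        print(expr + ",")
```

The generated expressions still have to be placed in the column order of dwt.mall__sku_topic, which interleaves the window and cumulative columns for each measure group.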
Create the table
drop table if exists dwt.mall__sku_topic;
CREATE EXTERNAL TABLE `dwt.mall__sku_topic`(
sku_id string comment 'sku_id',
spu_id string comment 'spu_id',
order_last_30d_count bigint comment '最近 30 日被下单次数',
order_last_30d_num bigint comment '最近 30 日被下单件数',
order_last_30d_amount decimal(16,2) comment '最近 30 日被下单金额',
order_count bigint comment '累积被下单次数',
order_num bigint comment '累积被下单件数',
order_amount decimal(16,2) comment '累积被下单金额',
payment_last_30d_count bigint comment '最近 30 日被支付次数',
payment_last_30d_num bigint comment '最近 30 日被支付件数',
payment_last_30d_amount decimal(16,2) comment '最近 30 日被支付金额',
payment_count bigint comment '累积被支付次数',
payment_num bigint comment '累积被支付件数',
payment_amount decimal(16,2) comment '累积被支付金额',
refund_last_30d_count bigint comment '最近三十日退款次数',
refund_last_30d_num bigint comment '最近三十日退款件数',
refund_last_30d_amount decimal(10,2) comment '最近三十日退款金额',
refund_count bigint comment '累积退款次数',
refund_num bigint comment '累积退款件数',
refund_amount decimal(10,2) comment '累积退款金额',
cart_last_30d_count bigint comment '最近 30 日被加入购物车次数',
cart_last_30d_num bigint comment '最近 30 日被加入购物车件数',
cart_count bigint comment '累积被加入购物车次数',
cart_num bigint comment '累积被加入购物车件数',
favor_last_30d_count bigint comment '最近 30 日被收藏次数',
favor_count bigint comment '累积被收藏次数',
appraise_last_30d_good_count bigint comment '最近 30 日好评数',
appraise_last_30d_mid_count bigint comment '最近 30 日中评数',
appraise_last_30d_bad_count bigint comment '最近 30 日差评数',
appraise_last_30d_default_count bigint comment '最近 30 日默认评价数',
appraise_good_count bigint comment '累积好评数',
appraise_mid_count bigint comment '累积中评数',
appraise_bad_count bigint comment '累积差评数',
appraise_default_count bigint comment '累积默认评价数'
) COMMENT '商品主题宽表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwt/mall/sku_topic/';
Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwt
table_name=sku_topic
hive_table_name=$APP2.mall__$table_name
# Use the date passed in through ${date} if there is one; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
select
nvl(new.sku_id,old.sku_id), sku_info.spu_id,
nvl(new.order_count30,0),
nvl(new.order_num30,0),
nvl(new.order_amount30,0),
nvl(old.order_count,0) + nvl(new.order_count,0),
nvl(old.order_num,0) + nvl(new.order_num,0),
nvl(old.order_amount,0) + nvl(new.order_amount,0),
nvl(new.payment_count30,0),
nvl(new.payment_num30,0),
nvl(new.payment_amount30,0),
nvl(old.payment_count,0) + nvl(new.payment_count,0),
nvl(old.payment_num,0) + nvl(new.payment_num,0),
nvl(old.payment_amount,0) + nvl(new.payment_amount,0),
nvl(new.refund_count30,0),
nvl(new.refund_num30,0),
nvl(new.refund_amount30,0),
nvl(old.refund_count,0) + nvl(new.refund_count,0),
nvl(old.refund_num,0) + nvl(new.refund_num,0),
nvl(old.refund_amount,0) + nvl(new.refund_amount,0),
nvl(new.cart_count30,0),
nvl(new.cart_num30,0),
nvl(old.cart_count,0) + nvl(new.cart_count,0),
nvl(old.cart_num,0) + nvl(new.cart_num,0),
nvl(new.favor_count30,0),
nvl(old.favor_count,0) + nvl(new.favor_count,0),
nvl(new.appraise_good_count30,0),
nvl(new.appraise_mid_count30,0),
nvl(new.appraise_bad_count30,0),
nvl(new.appraise_default_count30,0) ,
nvl(old.appraise_good_count,0) + nvl(new.appraise_good_count,0),
nvl(old.appraise_mid_count,0) + nvl(new.appraise_mid_count,0),
nvl(old.appraise_bad_count,0) + nvl(new.appraise_bad_count,0),
nvl(old.appraise_default_count,0) + nvl(new.appraise_default_count,0)
from
(
select
sku_id,
spu_id,
order_last_30d_count,
order_last_30d_num,
order_last_30d_amount,
order_count,
order_num,
order_amount ,
payment_last_30d_count,
payment_last_30d_num,
payment_last_30d_amount,
payment_count,
payment_num,
payment_amount,
refund_last_30d_count,
refund_last_30d_num,
refund_last_30d_amount,
refund_count,
refund_num,
refund_amount,
cart_last_30d_count,
cart_last_30d_num,
cart_count,
cart_num,
favor_last_30d_count,
favor_count,
appraise_last_30d_good_count,
appraise_last_30d_mid_count,
appraise_last_30d_bad_count,
appraise_last_30d_default_count,
appraise_good_count,
appraise_mid_count,
appraise_bad_count,
appraise_default_count
from dwt.mall__sku_topic
)old
full outer join
(
select
sku_id,
sum(if(dt='$db_date', order_count,0 )) order_count,
sum(if(dt='$db_date',order_num ,0 )) order_num,
sum(if(dt='$db_date',order_amount,0 )) order_amount ,
sum(if(dt='$db_date',payment_count,0 )) payment_count,
sum(if(dt='$db_date',payment_num,0 )) payment_num,
sum(if(dt='$db_date',payment_amount,0 )) payment_amount,
sum(if(dt='$db_date',refund_count,0 )) refund_count,
sum(if(dt='$db_date',refund_num,0 )) refund_num,
sum(if(dt='$db_date',refund_amount,0 )) refund_amount,
sum(if(dt='$db_date',cart_count,0 )) cart_count,
sum(if(dt='$db_date',cart_num,0 )) cart_num,
sum(if(dt='$db_date',favor_count,0 )) favor_count,
sum(if(dt='$db_date',appraise_good_count,0 )) appraise_good_count,
sum(if(dt='$db_date',appraise_mid_count,0 ) ) appraise_mid_count ,
sum(if(dt='$db_date',appraise_bad_count,0 )) appraise_bad_count,
sum(if(dt='$db_date',appraise_default_count,0 )) appraise_default_count,
sum(order_count) order_count30 ,
sum(order_num) order_num30,
sum(order_amount) order_amount30,
sum(payment_count) payment_count30,
sum(payment_num) payment_num30,
sum(payment_amount) payment_amount30,
sum(refund_count) refund_count30,
sum(refund_num) refund_num30,
sum(refund_amount) refund_amount30,
sum(cart_count) cart_count30,
sum(cart_num) cart_num30,
sum(favor_count) favor_count30,
sum(appraise_good_count) appraise_good_count30,
sum(appraise_mid_count) appraise_mid_count30,
sum(appraise_bad_count) appraise_bad_count30,
sum(appraise_default_count) appraise_default_count30
from dws.mall__sku_action_daycount
where dt >= date_add ('$db_date', -30)
group by sku_id
)new
on new.sku_id = old.sku_id
left join
(
select * from dwd.mall__dim_sku_info where dt='$db_date'
) sku_info
on nvl(new.sku_id,old.sku_id)= sku_info.id;
"
$hive -e "$sql"

11.4 Coupon topic wide table
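The coupon and activity topic tables, like the SKU topic table above, are refreshed the same way. The current cumulative table (`old`) is `full outer join`ed with today's counts from DWS (`new`), a missing side is treated as 0 through `nvl(...,0)`, and `insert overwrite` replaces the table. The Python sketch below models that merge in memory for the coupon table. It is illustrative only: `merge_topic` and the sample data are hypothetical, not part of the project.

```python
# Illustrative model of the DWT refresh; names and data are hypothetical.
from typing import Dict, Tuple

# (get_count, using_count, used_count) for one coupon
Counts = Tuple[int, int, int]
# today's counts followed by cumulative counts, in dwt.mall__coupon_topic column order
TopicRow = Tuple[int, int, int, int, int, int]


def merge_topic(old: Dict[str, Counts], new: Dict[str, Counts]) -> Dict[str, TopicRow]:
    """old: cumulative counts per coupon_id; new: today's counts per coupon_id."""
    zero: Counts = (0, 0, 0)
    merged: Dict[str, TopicRow] = {}
    # A full outer join keeps every key that appears on either side
    for coupon_id in old.keys() | new.keys():
        o = old.get(coupon_id, zero)  # nvl(old.x, 0)
        n = new.get(coupon_id, zero)  # nvl(new.x, 0)
        totals = tuple(a + b for a, b in zip(o, n))  # old total + today's count
        merged[coupon_id] = n + totals  # day columns come only from the new side
    return merged


old = {"c1": (10, 4, 3), "c3": (7, 7, 7)}
new = {"c1": (2, 1, 1), "c2": (5, 0, 0)}
print(merge_topic(old, new)["c1"])  # (2, 1, 1, 12, 5, 4)
```

A coupon with no activity today (`c3`) keeps its totals with zero day counts, and a coupon seen for the first time (`c2`) starts its totals from today's counts.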
Create table
drop table if exists dwt.mall__coupon_topic;
CREATE EXTERNAL TABLE `dwt.mall__coupon_topic`(
`coupon_id` string COMMENT '优惠券 ID',
`get_day_count` bigint COMMENT '当日领用次数',
`using_day_count` bigint COMMENT '当日使用(下单)次数',
`used_day_count` bigint COMMENT '当日使用(支付)次数',
`get_count` bigint COMMENT '累积领用次数',
`using_count` bigint COMMENT '累积使用(下单)次数',
`used_count` bigint COMMENT '累积使用(支付)次数'
) COMMENT '优惠券主题宽表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwt/mall/coupon_topic/';

Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwt
table_name=coupon_topic
hive_table_name=$APP2.mall__$table_name
# Use the date passed in through ${date} if there is one; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
select
nvl(new.coupon_id,old.coupon_id),
nvl(new.get_count,0),
nvl(new.using_count,0),
nvl(new.used_count,0),
nvl(old.get_count,0)+nvl(new.get_count,0),
nvl(old.using_count,0)+nvl(new.using_count,0),
nvl(old.used_count,0)+nvl(new.used_count,0)
from
(
select
*
from dwt.mall__coupon_topic
)old
full outer join
(
select
coupon_id,
get_count,
using_count,
used_count
from dws.mall__coupon_use_daycount
where dt='$db_date'
)new
on old.coupon_id=new.coupon_id;
"
$hive -e "$sql"

11.5 Activity topic wide table
Create table
drop table if exists dwt.mall__activity_topic;
CREATE EXTERNAL TABLE `dwt.mall__activity_topic`(
`id` string COMMENT '活动 id',
`activity_name` string COMMENT '活动名称',
`order_day_count` bigint COMMENT '当日下单次数',
`payment_day_count` bigint COMMENT '当日支付次数',
`order_count` bigint COMMENT '累积下单次数',
`payment_count` bigint COMMENT '累积支付次数'
) COMMENT '活动主题宽表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwt/mall/activity_topic/';

Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwt
table_name=activity_topic
hive_table_name=$APP2.mall__$table_name
# Use the date passed in through ${date} if there is one; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
select
nvl(new.id,old.id),
nvl(new.activity_name,old.activity_name),
nvl(new.order_count,0),
nvl(new.payment_count,0),
nvl(old.order_count,0)+nvl(new.order_count,0),
nvl(old.payment_count,0)+nvl(new.payment_count,0)
from
(
select
*
from dwt.mall__activity_topic
)old
full outer join
(
select
id,
activity_name,
order_count,
payment_count
from dws.mall__activity_info_daycount
where dt='$db_date'
)new
on old.id=new.id;
"
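$hive -e "$sql"

12 Building the ADS layer
This is the final layer, which serves the reporting requirements directly. Whether a table is compressed depends on how its data will be exported and how large it is. ADS tables are not partitioned and are written once a day.
12.1 Device topic
12.1.1 Active devices (daily, weekly, monthly)
Daily active: the number of devices active on the given day
Weekly active: the number of devices active in the week (Monday to Sunday) containing that day
Monthly active: the number of devices active in the month containing that day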
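The script below derives all three counts from `login_date_last` in `dwt.mall__uv_topic`, presumably each device's most recent active date as of the run. Weeks run Monday to Sunday: Hive's `next_day(d,'MO')` returns the first Monday after `d`, so subtracting 7 and 1 days gives that week's Monday and Sunday. A small Python model of the same logic, with hypothetical function names and sample data:

```python
# Illustrative model only; names and data are hypothetical.
from datetime import date, timedelta
from typing import Dict, Tuple


def next_monday(d: date) -> date:
    """Same result as Hive next_day(d, 'MO'): the first Monday strictly after d."""
    return d + timedelta(days=7 - d.weekday())  # weekday(): Monday == 0


def week_bounds(d: date) -> Tuple[date, date]:
    """Monday and Sunday of d's week: next_day(d,'MO') - 7 and next_day(d,'MO') - 1."""
    nxt = next_monday(d)
    return nxt - timedelta(days=7), nxt - timedelta(days=1)


def active_counts(last_login: Dict[str, date], d: date):
    """last_login maps mid_id -> login_date_last."""
    monday, sunday = week_bounds(d)
    day = sum(1 for v in last_login.values() if v == d)
    week = sum(1 for v in last_login.values() if monday <= v <= sunday)
    month = sum(1 for v in last_login.values() if (v.year, v.month) == (d.year, d.month))
    is_weekend = "Y" if d == sunday else "N"  # last day of the week
    is_monthend = "Y" if (d + timedelta(days=1)).day == 1 else "N"  # last_day(d) = d
    return day, week, month, is_weekend, is_monthend


last_login = {
    "m1": date(2020, 3, 11),
    "m2": date(2020, 3, 9),
    "m3": date(2020, 3, 2),
    "m4": date(2020, 2, 28),
}
print(active_counts(last_login, date(2020, 3, 11)))  # (1, 2, 3, 'N', 'N')
```

The `is_weekend`/`is_monthend` flags let a report pick the row that holds the final weekly or monthly figure.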
Create table
drop table if exists ads.mall__uv_count;
CREATE EXTERNAL TABLE `ads.mall__uv_count`(
`dt` string COMMENT '统计日期',
`day_count` bigint COMMENT '当日用户数量',
`wk_count` bigint COMMENT '当周用户数量',
`mn_count` bigint COMMENT '当月用户数量',
`is_weekend` string COMMENT 'Y,N 是否是周末,用于得到本周最终结果',
`is_monthend` string COMMENT 'Y,N 是否是月末,用于得到本月最终结果'
) COMMENT '活跃设备数表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/uv_count/'
tblproperties ("parquet.compression"="snappy");

Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=uv_count
hive_table_name=$APP2.mall__$table_name
# Use the date passed in through ${date} if there is one; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date' dt,
daycount.ct,
wkcount.ct,
mncount.ct,
if(date_add(next_day('$db_date','MO'),-1)='$db_date','Y','N') ,
if(last_day('$db_date')='$db_date','Y','N')
from
(
select
'$db_date' dt,
count(*) ct
from dwt.mall__uv_topic
where login_date_last='$db_date'
)daycount join
(
select
'$db_date' dt,
count (*) ct
from dwt.mall__uv_topic
where login_date_last>=date_add(next_day('$db_date','MO'),-7)
and login_date_last<= date_add(next_day('$db_date','MO'),-1)
) wkcount on daycount.dt=wkcount.dt
join
(
select
'$db_date' dt,
count (*) ct
from dwt.mall__uv_topic
where
date_format(login_date_last,'yyyy-MM')=date_format('$db_date','yyyy-MM')
)mncount on daycount.dt=mncount.dt;
"
$hive -e "$sql"

12.1.2 New devices per day
Create table
drop table if exists ads.mall__new_mid_count;
CREATE EXTERNAL TABLE `ads.mall__new_mid_count`(
`create_date` string comment '创建时间' ,
`new_mid_count` bigint comment '新增设备数量'
) COMMENT '每日新增设备表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/new_mid_count/'
tblproperties ("parquet.compression"="snappy");

Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=new_mid_count
hive_table_name=$APP2.mall__$table_name
# Use the date passed in through ${date} if there is one; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
login_date_first,
count(*)
from dwt.mall__uv_topic
where login_date_first='$db_date'
group by login_date_first;
"
$hive -e "$sql"

12.1.3 Silent users
Silent users: devices that were started only on the day they were installed, with that day at least 7 days ago
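The script below turns this definition into two conditions: `login_date_first = login_date_last` (the device was never active after its first day) and `login_date_last <= date_add(d,-7)`. A minimal Python sketch over hypothetical `(first, last)` date pairs:

```python
# Illustrative model only; names and data are hypothetical.
from datetime import date, timedelta
from typing import Dict, Tuple


def silent_count(devices: Dict[str, Tuple[date, date]], d: date) -> int:
    """devices maps mid_id -> (login_date_first, login_date_last)."""
    cutoff = d - timedelta(days=7)  # date_add(d, -7)
    return sum(
        1
        for first, last in devices.values()
        if first == last and last <= cutoff  # active only on its first day, 7+ days ago
    )


devices = {
    "a": (date(2020, 3, 1), date(2020, 3, 1)),    # silent
    "b": (date(2020, 3, 15), date(2020, 3, 15)),  # installed too recently
    "c": (date(2020, 3, 1), date(2020, 3, 10)),   # came back later
    "e": (date(2020, 3, 13), date(2020, 3, 13)),  # exactly 7 days before d: silent
}
print(silent_count(devices, date(2020, 3, 20)))  # 2
```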
Create table
drop table if exists ads.mall__silent_count;
CREATE EXTERNAL TABLE `ads.mall__silent_count`(
`dt` string COMMENT '统计日期',
`silent_count` bigint COMMENT '沉默设备数'
) COMMENT '沉默用户数表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/silent_count/'
tblproperties ("parquet.compression"="snappy");

Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=silent_count
hive_table_name=$APP2.mall__$table_name
# Use the date passed in through ${date} if there is one; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date',
count(*)
from dwt.mall__uv_topic
where login_date_first=login_date_last
and login_date_last<=date_add('$db_date',-7);
"
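$hive -e "$sql"

12.1.4 Returning users this week
Returning users this week: devices that were not active last week but are active this week, excluding devices that are new this week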
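The script below combines two sources. `dwt.mall__uv_topic` supplies devices whose last active date falls in the current Monday-to-Sunday week and whose first active date is before that Monday. `dws.mall__uv_detail_daycount` supplies every device active during the previous week, and the `left join ... where last_wk.mid_id is null` removes those. The same set logic in Python, with hypothetical names and sample data:

```python
# Illustrative model only; names and data are hypothetical.
from datetime import date, timedelta
from typing import Dict, Iterable, Tuple


def this_monday(d: date) -> date:
    """Monday of d's week, same as Hive date_add(next_day(d,'MO'), -7)."""
    return d - timedelta(days=d.weekday())


def back_count(
    devices: Dict[str, Tuple[date, date]],
    daily_active: Iterable[Tuple[str, date]],
    d: date,
) -> int:
    """devices: mid_id -> (login_date_first, login_date_last);
    daily_active: (mid_id, dt) rows, one per device per active day."""
    monday = this_monday(d)
    sunday = monday + timedelta(days=6)
    last_monday, last_sunday = monday - timedelta(days=7), monday - timedelta(days=1)
    active_last_week = {mid for mid, dt in daily_active if last_monday <= dt <= last_sunday}
    return sum(
        1
        for mid, (first, last) in devices.items()
        if monday <= last <= sunday      # active this week
        and first < monday               # not a new device this week
        and mid not in active_last_week  # the LEFT JOIN ... IS NULL anti-join
    )


d = date(2020, 3, 11)  # week of 2020-03-09 .. 2020-03-15
devices = {
    "a": (date(2020, 2, 1), date(2020, 3, 10)),
    "b": (date(2020, 2, 1), date(2020, 3, 10)),
    "c": (date(2020, 3, 9), date(2020, 3, 10)),
    "e": (date(2020, 2, 1), date(2020, 3, 1)),
}
daily_active = [("a", date(2020, 2, 25)), ("b", date(2020, 3, 5))]
print(back_count(devices, daily_active, d))  # 1 (only "a")
```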
Create table
drop table if exists ads.mall__back_count;
CREATE EXTERNAL TABLE `ads.mall__back_count`(
`wk_dt` string COMMENT '统计日期所在周',
`wastage_count` bigint COMMENT '回流设备数'
) COMMENT '本周回流用户数表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/back_count/'
tblproperties ("parquet.compression"="snappy");

Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=back_count
hive_table_name=$APP2.mall__$table_name
# Use the date passed in through ${date} if there is one; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date',
count(*)
from
(
select
mid_id
from dwt.mall__uv_topic
where login_date_last>=date_add(next_day('$db_date','MO'),-7)
and login_date_last<= date_add(next_day('$db_date','MO'),-1)
and login_date_first<date_add(next_day('$db_date','MO'),-7)
)current_wk
left join
(
select
mid_id
from dws.mall__uv_detail_daycount
where dt>=date_add(next_day('$db_date','MO'),-7*2)
and dt<= date_add(next_day('$db_date','MO'),-7-1)
group by mid_id
)last_wk
on current_wk.mid_id=last_wk.mid_id
where last_wk.mid_id is null;
"
$hive -e "$sql"

12.1.5 Churned users
Churned users: devices that have not been active in the last 7 days
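The script below encodes "not active in the last 7 days" as `login_date_last <= date_add(d,-7)`. The last 7 days are `d-6` through `d`, so a device qualifies when its most recent active date is `d-7` or earlier. A tiny Python check of that boundary, with hypothetical data:

```python
# Illustrative model only; names and data are hypothetical.
from datetime import date, timedelta
from typing import Dict


def wastage_count(last_login: Dict[str, date], d: date) -> int:
    """last_login maps mid_id -> login_date_last."""
    cutoff = d - timedelta(days=7)  # date_add(d, -7)
    return sum(1 for last in last_login.values() if last <= cutoff)


last_login = {
    "a": date(2020, 3, 13),  # d-7: nothing on d-6..d, counted
    "b": date(2020, 3, 14),  # d-6: still inside the 7-day window
    "c": date(2020, 3, 1),
}
print(wastage_count(last_login, date(2020, 3, 20)))  # 2
```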
Create table
drop table if exists ads.mall__wastage_count;
CREATE EXTERNAL TABLE `ads.mall__wastage_count`(
`dt` string COMMENT '统计日期',
`wastage_count` bigint COMMENT '流失设备数'
) COMMENT '流失用户数表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/wastage_count/'
tblproperties ("parquet.compression"="snappy");

Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=wastage_count
hive_table_name=$APP2.mall__$table_name
# Use the date passed in through ${date} if there is one; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date',
count(*)
from
(
select
mid_id
from dwt.mall__uv_topic
where login_date_last<=date_add('$db_date',-7)
group by mid_id
)t1;
"
$hive -e "$sql"

12.1.6 Retention rate
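For the cohort of devices first seen on `d-n`, the n-day retention is the number of those devices still active on `d` divided by the cohort size. The script below computes n = 1, 2, 3 with `union all` and multiplies by 100 to store a percentage. A Python sketch of the same calculation, with hypothetical names and data; `None` stands in for the NULL that Hive returns when the cohort is empty and the division is by zero:

```python
# Illustrative model only; names and data are hypothetical.
from datetime import date, timedelta
from typing import Dict, List, Optional, Tuple

Row = Tuple[date, date, int, int, int, Optional[float]]


def retention_rows(devices: Dict[str, Tuple[date, date]], d: date, max_days: int = 3) -> List[Row]:
    """devices: mid_id -> (login_date_first, login_date_last).
    Returns (stat_date, create_date, retention_day, retention_count, new_mid_count, ratio)."""
    rows: List[Row] = []
    for n in range(1, max_days + 1):
        create_date = d - timedelta(days=n)
        cohort = [last for first, last in devices.values() if first == create_date]
        retained = sum(1 for last in cohort if last == d)  # still active on d
        ratio = round(retained / len(cohort) * 100, 2) if cohort else None
        rows.append((d, create_date, n, retained, len(cohort), ratio))
    return rows


devices = {
    "a": (date(2020, 3, 9), date(2020, 3, 10)),
    "b": (date(2020, 3, 9), date(2020, 3, 9)),
    "c": (date(2020, 3, 8), date(2020, 3, 10)),
}
for row in retention_rows(devices, date(2020, 3, 10)):
    print(row[2:])  # (1, 1, 2, 50.0), then (2, 1, 1, 100.0), then (3, 0, 0, None)
```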
Create table
drop table if exists ads.mall__user_retention_day_rate;
CREATE EXTERNAL TABLE `ads.mall__user_retention_day_rate`(
`stat_date` string comment '统计日期',
`create_date` string comment '设备新增日期',
`retention_day` int comment '截止当前日期留存天数',
`retention_count` bigint comment '留存数量',
`new_mid_count` bigint comment '设备新增数量',
`retention_ratio` decimal(10,2) comment '留存率'
) COMMENT '留存率表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/user_retention_day_rate/'
tblproperties ("parquet.compression"="snappy");

Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=user_retention_day_rate
hive_table_name=$APP2.mall__$table_name
# Use the date passed in through ${date} if there is one; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date',--statistics date
date_add('$db_date',-1),--date the devices were added
1,--days retained
sum(if(login_date_first=date_add('$db_date',-1) and
login_date_last='$db_date',1,0)),--devices added on db_date-1 that are active on db_date
sum(if(login_date_first=date_add('$db_date',-1),1,0)),--devices added on db_date-1
sum(if(login_date_first=date_add('$db_date',-1) and
login_date_last='$db_date',1,0))/sum(if(login_date_first=date_add('$db_date',-1),1,0))*100
from dwt.mall__uv_topic
union all
select
'$db_date',--statistics date
date_add('$db_date',-2),--date the devices were added
2,--days retained
sum(if(login_date_first=date_add('$db_date',-2) and
login_date_last='$db_date',1,0)),--devices added on db_date-2 that are active on db_date
sum(if(login_date_first=date_add('$db_date',-2),1,0)),--devices added on db_date-2
sum(if(login_date_first=date_add('$db_date',-2) and
login_date_last='$db_date',1,0))/sum(if(login_date_first=date_add('$db_date',-2),1,0))*100
from dwt.mall__uv_topic
union all
select
'$db_date',--statistics date
date_add('$db_date',-3),--date the devices were added
3,--days retained
sum(if(login_date_first=date_add('$db_date',-3) and
login_date_last='$db_date',1,0)),--devices added on db_date-3 that are active on db_date
sum(if(login_date_first=date_add('$db_date',-3),1,0)),--devices added on db_date-3
sum(if(login_date_first=date_add('$db_date',-3) and
login_date_last='$db_date',1,0))/sum(if(login_date_first=date_add('$db_date',-3),1,0))*100
from dwt.mall__uv_topic;
"
$hive -e "$sql"

12.1.7 Users active in each of the last three consecutive weeks
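The script below collects the distinct devices active in each of the three Monday-to-Sunday weeks ending with the week that contains `d`. It stacks them with `union all` and keeps the devices that appear `count(*)=3` times, which is the intersection of the three weekly sets. A Python sketch of that logic, with hypothetical names and inputs:

```python
# Illustrative model only; names and data are hypothetical.
from datetime import date, timedelta
from typing import Iterable, List, Set, Tuple


def continuity_wk_count(daily_active: Iterable[Tuple[str, date]], d: date) -> Tuple[str, int]:
    """daily_active: (mid_id, dt) rows, as in the per-day device table."""
    rows = list(daily_active)
    monday = d - timedelta(days=d.weekday())  # date_add(next_day(d,'MO'), -7)
    weekly: List[Set[str]] = []
    for k in range(3):  # this week, last week, the week before
        start = monday - timedelta(days=7 * k)
        end = start + timedelta(days=6)
        weekly.append({mid for mid, dt in rows if start <= dt <= end})
    # Same label format as the wk_dt column: first Monday _ last Sunday
    label = f"{monday - timedelta(days=14)}_{monday + timedelta(days=6)}"
    # UNION ALL + HAVING count(*)=3 equals the intersection of the weekly sets
    return label, len(weekly[0] & weekly[1] & weekly[2])


daily_active = [
    ("a", date(2020, 3, 10)), ("a", date(2020, 3, 3)), ("a", date(2020, 2, 25)),
    ("b", date(2020, 3, 10)), ("b", date(2020, 3, 3)),
    ("c", date(2020, 3, 9)), ("c", date(2020, 3, 8)), ("c", date(2020, 2, 24)),
]
print(continuity_wk_count(daily_active, date(2020, 3, 11)))  # ('2020-02-24_2020-03-15', 2)
```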
Create table
drop table if exists ads.mall__continuity_wk_count;
CREATE EXTERNAL TABLE `ads.mall__continuity_wk_count`(
`dt` string COMMENT '统计日期,一般用结束周周日日期,如果每天计算一次,可用当天日期',
`wk_dt` string COMMENT '持续时间',
`continuity_count` bigint COMMENT '活跃次数'
) COMMENT '最近连续三周活跃用户数表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/continuity_wk_count/'
tblproperties ("parquet.compression"="snappy");

Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=continuity_wk_count
hive_table_name=$APP2.mall__$table_name
# Use the date passed in through ${date} if there is one; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date',
concat(date_add(next_day('$db_date','MO'),-7*3),'_',date_add(next_day('$db_date','MO'),-1)),
count(*)
from
(
select
mid_id
from
(
select
mid_id
from dws.mall__uv_detail_daycount
where dt>=date_add(next_day('$db_date','monday'),-7)
and dt<=date_add(next_day('$db_date','monday'),-1)
group by mid_id
union all
select
mid_id
from dws.mall__uv_detail_daycount
where dt>=date_add(next_day('$db_date','monday'),-7*2)
and dt<=date_add(next_day('$db_date','monday'),-7-1)
group by mid_id
union all
select
mid_id
from dws.mall__uv_detail_daycount
where dt>=date_add(next_day('$db_date','monday'),-7*3)
and dt<=date_add(next_day('$db_date','monday'),-7*2-1)
group by mid_id
)t1
group by mid_id
having count(*)=3
)t2
"
$hive -e "$sql"

12.1.8 Users active on three consecutive days within the last seven days
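The script below uses a gaps-and-islands trick. Within each device, `rank()` numbers the active days in date order, and `date_sub(dt, rank)` stays constant across a run of consecutive days, so grouping by `(mid_id, date_dif)` with `count(*)>=3` finds runs of at least three days. This relies on `dws.mall__uv_detail_daycount` holding at most one row per device per day, which is presumably true of a daily summary table. A Python sketch of the same idea, with hypothetical names and data:

```python
# Illustrative model only; names and data are hypothetical.
from collections import Counter, defaultdict
from datetime import date, timedelta
from typing import DefaultDict, Iterable, List, Tuple


def continuity_uv_count(daily_active: Iterable[Tuple[str, date]], d: date) -> Tuple[str, int]:
    start = d - timedelta(days=6)  # the last 7 days: d-6 .. d
    days_by_mid: DefaultDict[str, List[date]] = defaultdict(list)
    for mid, dt in daily_active:
        if start <= dt <= d:
            days_by_mid[mid].append(dt)
    qualified = 0
    for days in days_by_mid.values():
        islands: Counter = Counter()
        for rank, dt in enumerate(sorted(set(days)), start=1):
            # dt - rank is the same value for every day in a consecutive run
            islands[dt - timedelta(days=rank)] += 1
        if max(islands.values()) >= 3:
            qualified += 1
    return f"{start}_{d}", qualified


daily_active = [
    ("a", date(2020, 3, 5)), ("a", date(2020, 3, 6)), ("a", date(2020, 3, 7)),
    ("b", date(2020, 3, 5)), ("b", date(2020, 3, 6)), ("b", date(2020, 3, 8)), ("b", date(2020, 3, 9)),
    ("c", date(2020, 3, 2)), ("c", date(2020, 3, 3)), ("c", date(2020, 3, 4)), ("c", date(2020, 3, 5)),
    ("e", date(2020, 3, 8)), ("e", date(2020, 3, 9)), ("e", date(2020, 3, 10)),
]
print(continuity_uv_count(daily_active, date(2020, 3, 10)))  # ('2020-03-04_2020-03-10', 2)
```

Device `c` has four consecutive days, but only two of them fall inside the window, so it does not qualify.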
Create table
drop table if exists ads.mall__continuity_uv_count;
CREATE EXTERNAL TABLE `ads.mall__continuity_uv_count`(
`dt` string COMMENT '统计日期',
`wk_dt` string COMMENT '最近 7 天日期',
`continuity_count` bigint
) COMMENT '最近七天内连续三天活跃用户数表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/continuity_uv_count/'
tblproperties ("parquet.compression"="snappy");

Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=continuity_uv_count
hive_table_name=$APP2.mall__$table_name
# Use the date passed in through ${date} if there is one; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date',
concat(date_add('$db_date',-6),'_','$db_date'),
count(*)
from
(
select
mid_id
from
(
select
mid_id
from
(
select
mid_id,
date_sub(dt,rank) date_dif
from
(
select
mid_id,
dt,
rank() over(partition by mid_id order by dt) rank
from dws.mall__uv_detail_daycount
where dt>=date_add('$db_date',-6) and
dt<='$db_date'
)t1
)t2
group by mid_id,date_dif
having count(*)>=3
)t3
group by mid_id
)t4;
"
$hive -e "$sql"

12.2 Member topic
12.2.1 Member topic summary
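The script below derives every column from `dwt.mall__user_topic` in a single pass. The activity rate is active members over all members, the payment rate is paying members over all members, and "freshness" is new members over active members. Unlike the retention table, these ratios are stored as fractions rather than percentages. A Python sketch with a hypothetical `User` record; `None` mirrors the NULL Hive returns on division by zero:

```python
# Illustrative model only; names and data are hypothetical.
from datetime import date
from typing import List, NamedTuple, Optional


class User(NamedTuple):
    login_date_first: date
    login_date_last: date
    payment_date_first: Optional[date]
    payment_count: int


def ratio(a: int, b: int) -> Optional[float]:
    return a / b if b else None  # Hive returns NULL for x / 0


def user_topic_row(users: List[User], d: date) -> tuple:
    day_users = sum(1 for u in users if u.login_date_last == d)
    day_new_users = sum(1 for u in users if u.login_date_first == d)
    day_new_payment_users = sum(1 for u in users if u.payment_date_first == d)
    payment_users = sum(1 for u in users if u.payment_count > 0)
    total = len(users)
    return (
        day_users, day_new_users, day_new_payment_users, payment_users, total,
        ratio(day_users, total),          # day_users2users (activity rate)
        ratio(payment_users, total),      # payment_users2users (payment rate)
        ratio(day_new_users, day_users),  # day_new_users2users (freshness)
    )


users = [
    User(date(2020, 3, 10), date(2020, 3, 10), date(2020, 3, 10), 1),
    User(date(2020, 3, 1), date(2020, 3, 10), None, 0),
    User(date(2020, 3, 1), date(2020, 3, 5), date(2020, 3, 2), 2),
    User(date(2020, 3, 1), date(2020, 3, 1), None, 0),
]
print(user_topic_row(users, date(2020, 3, 10)))  # (2, 1, 1, 2, 4, 0.5, 0.5, 0.5)
```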
Create table
drop table if exists ads.mall__user_topic;
CREATE EXTERNAL TABLE `ads.mall__user_topic`(
`dt` string COMMENT '统计日期',
`day_users` string COMMENT '活跃会员数',
`day_new_users` string COMMENT '新增会员数',
`day_new_payment_users` string COMMENT '新增消费会员数',
`payment_users` string COMMENT '总付费会员数',
`users` string COMMENT '总会员数',
`day_users2users` decimal(10,2) COMMENT '会员活跃率',
`payment_users2users` decimal(10,2) COMMENT '会员付费率',
`day_new_users2users` decimal(10,2) COMMENT '会员新鲜度'
) COMMENT '会员主题信息表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/user_topic/'
tblproperties ("parquet.compression"="snappy");

Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=user_topic
hive_table_name=$APP2.mall__$table_name
# Use the date passed in through ${date} if there is one; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date',
sum(if(login_date_last='$db_date',1,0)),
sum(if(login_date_first='$db_date',1,0)),
sum(if(payment_date_first='$db_date',1,0)),
sum(if(payment_count>0,1,0)),
count(*),
sum(if(login_date_last='$db_date',1,0))/count(*),
sum(if(payment_count>0,1,0))/count(*),
sum(if(login_date_first='$db_date',1,0))/sum(if(login_date_last='$db_date',1,0))
from dwt.mall__user_topic
"
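$hive -e "$sql"

12.2.2 Funnel analysis
Compute the conversion rates along the funnel "visit -> add to cart -> order -> payment".
Approach: count the users who performed each action, then divide each step by the previous one. The visitor count is read from ads.mall__uv_count, so that table must already be loaded for the same date.
Create table
drop table if exists ads.mall__user_action_convert_day;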
CREATE EXTERNAL TABLE `ads.mall__user_action_convert_day`(
`dt` string COMMENT '统计日期',
`total_visitor_m_count` bigint COMMENT '总访问人数',
`cart_u_count` bigint COMMENT '加入购物车的人数',
`visitor2cart_convert_ratio` decimal(10,2) COMMENT '访问到加入购物车转化率',
`order_u_count` bigint COMMENT '下单人数',
`cart2order_convert_ratio` decimal(10,2) COMMENT '加入购物车到下单转化率',
`payment_u_count` bigint COMMENT '支付人数',
`order2payment_convert_ratio` decimal(10,2) COMMENT '下单到支付的转化率'
) COMMENT '漏斗分析表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/user_action_convert_day/'
tblproperties ("parquet.compression"="snappy");

Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=user_action_convert_day
hive_table_name=$APP2.mall__$table_name
# Use the date passed in through ${date} if there is one; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date',
uv.day_count,
ua.cart_count,
cast(ua.cart_count/uv.day_count as decimal(10,2)) visitor2cart_convert_ratio,
ua.order_count,
cast(ua.order_count/ua.cart_count as decimal(10,2)) cart2order_convert_ratio,
ua.payment_count,
cast(ua.payment_count/ua.order_count as decimal(10,2)) order2payment_convert_ratio
from
(
select
dt,
sum(if(cart_count>0,1,0)) cart_count,
sum(if(order_count>0,1,0)) order_count,
sum(if(payment_count>0,1,0)) payment_count
from dws.mall__user_action_daycount
where dt='$db_date'
group by dt
)ua join ads.mall__uv_count uv on uv.dt=ua.dt;
"
$hive -e "$sql"

12.3 Product topic
12.3.1 Product counts
Create table
drop table if exists ads.mall__product_info;
CREATE EXTERNAL TABLE `ads.mall__product_info`(
`dt` string COMMENT '统计日期',
`sku_num` string COMMENT 'sku 个数',
`spu_num` string COMMENT 'spu 个数'
) COMMENT '商品个数信息表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/product_info/'
tblproperties ("parquet.compression"="snappy");

Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=product_info
hive_table_name=$APP2.mall__$table_name
# Use the date passed in through ${date} if there is one; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date' dt,
sku_num,
spu_num
from
(
select
'$db_date' dt,
count(*) sku_num
from
dwt.mall__sku_topic
) tmp_sku_num
join
(
select
'$db_date' dt,
count(*) spu_num
from
(
select
spu_id
from
dwt.mall__sku_topic
group by
spu_id
) tmp_spu_id
) tmp_spu_num
on
tmp_sku_num.dt=tmp_spu_num.dt;
"
$hive -e "$sql"

12.3.2 Product sales ranking
Create table
drop table if exists ads.mall__product_sale_topN;
CREATE EXTERNAL TABLE `ads.mall__product_sale_topN`(
`dt` string COMMENT '统计日期',
`sku_id` string COMMENT '商品 ID',
`payment_amount` decimal(16,2) COMMENT '支付金额'
) COMMENT '商品销量排名表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/product_sale_topN/'
tblproperties ("parquet.compression"="snappy");

Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=product_sale_topN
hive_table_name=$APP2.mall__$table_name
# Use the date passed in through ${date} if there is one; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date' dt,
sku_id,
payment_amount
from
dws.mall__sku_action_daycount
where
dt='$db_date'
order by payment_amount desc
limit 10;
"
$hive -e "$sql"

12.3.3 Product favorites ranking
Create table
drop table if exists ads.mall__product_favor_topN;
CREATE EXTERNAL TABLE `ads.mall__product_favor_topN`(
`dt` string COMMENT '统计日期',
`sku_id` string COMMENT '商品 ID',
`favor_count` bigint COMMENT '收藏量'
) COMMENT '商品收藏排名表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/product_favor_topN/'
tblproperties ("parquet.compression"="snappy");

Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=product_favor_topN
hive_table_name=$APP2.mall__$table_name
# Use the date passed in through ${date} if there is one; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date' dt,
sku_id,
favor_count
from
dws.mall__sku_action_daycount
where
dt='$db_date'
order by favor_count desc
limit 10;
"
$hive -e "$sql"

12.3.4 Add-to-cart ranking
Create table
drop table if exists ads.mall__product_cart_topN;
CREATE EXTERNAL TABLE `ads.mall__product_cart_topN`(
`dt` string COMMENT '统计日期',
`sku_id` string COMMENT '商品 ID',
`cart_num` bigint COMMENT '加入购物车数量'
) COMMENT '商品加入购物车排名表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/product_cart_topN/'
tblproperties ("parquet.compression"="snappy");

Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=product_cart_topN
hive_table_name=$APP2.mall__$table_name
# Use the date passed in through ${date} if there is one; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date' dt,
sku_id,
cart_num
from
dws.mall__sku_action_daycount
where
dt='$db_date'
order by cart_num desc
limit 10;
"
$hive -e "$sql"

12.3.5 Product refund rate ranking (last 30 days)
Create table
drop table if exists ads.mall__product_refund_topN;
CREATE EXTERNAL TABLE `ads.mall__product_refund_topN`(
`dt` string COMMENT '统计日期',
`sku_id` string COMMENT '商品 ID',
`refund_ratio` decimal(10,2) COMMENT '退款率'
) COMMENT '商品退款率排名(最近 30 天)表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/product_refund_topN/'
tblproperties ("parquet.compression"="snappy");

Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=product_refund_topN
hive_table_name=$APP2.mall__$table_name
# Use the date passed in through ${date} if there is one; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date',
sku_id,
refund_last_30d_count/payment_last_30d_count*100 refund_ratio
from dwt.mall__sku_topic
order by refund_ratio desc
limit 10;
"
$hive -e "$sql"

12.3.6 Product negative review rate
Create table
drop table if exists ads.mall__appraise_bad_topN;
CREATE EXTERNAL TABLE `ads.mall__appraise_bad_topN`(
`dt` string COMMENT '统计日期',
`sku_id` string COMMENT '商品 ID',
`appraise_bad_ratio` decimal(10,2) COMMENT '差评率'
) COMMENT '商品差评率表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/appraise_bad_topN/'
tblproperties ("parquet.compression"="snappy");

Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=appraise_bad_topN
hive_table_name=$APP2.mall__$table_name
# Use the date passed in through ${date} if there is one; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date' dt,
sku_id,
appraise_bad_count/(appraise_good_count+appraise_mid_count+appraise_bad_count+appraise_default_count) appraise_bad_ratio
from
dws.mall__sku_action_daycount
where
dt='$db_date'
order by appraise_bad_ratio desc
limit 10;
"
$hive -e "$sql"

12.4 Marketing topic
12.4.1 Order statistics
Create table
drop table if exists ads.mall__order_daycount;
CREATE EXTERNAL TABLE `ads.mall__order_daycount`(
dt string comment '统计日期',
order_count bigint comment '单日下单笔数',
order_amount bigint comment '单日下单金额',
order_users bigint comment '单日下单用户数'
) COMMENT '下单数目统计表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/order_daycount/'
tblproperties ("parquet.compression"="snappy");

Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=order_daycount
hive_table_name=$APP2.mall__$table_name
# Use the date passed in through ${date} if there is one; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date',
sum(order_count),
sum(order_amount),
sum(if(order_count>0,1,0))
from dws.mall__user_action_daycount
where dt='$db_date';
"
$hive -e "$sql"

12.4.2 Payment statistics
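In the script below, `payment_avg_time` is the mean gap in minutes between `create_time` and `payment_time` over the day's paid orders, with unpaid orders (null `payment_time`) filtered out first. A Python sketch of that arithmetic, using hypothetical `(create_time, payment_time)` pairs:

```python
# Illustrative model only; names and data are hypothetical.
from datetime import datetime
from typing import Iterable, Optional, Tuple


def payment_avg_minutes(orders: Iterable[Tuple[datetime, Optional[datetime]]]) -> Optional[float]:
    """orders: (create_time, payment_time) pairs; payment_time is None when unpaid."""
    # WHERE payment_time IS NOT NULL
    paid = [(created, paid_at) for created, paid_at in orders if paid_at is not None]
    if not paid:
        return None  # the SQL also yields NULL when there are no paid orders
    # sum(unix_timestamp(payment_time) - unix_timestamp(create_time)) / count(*) / 60
    total_seconds = sum((paid_at - created).total_seconds() for created, paid_at in paid)
    return total_seconds / len(paid) / 60


orders = [
    (datetime(2020, 3, 10, 10, 0), datetime(2020, 3, 10, 10, 30)),  # 30 minutes
    (datetime(2020, 3, 10, 11, 0), datetime(2020, 3, 10, 11, 10)),  # 10 minutes
    (datetime(2020, 3, 10, 12, 0), None),                           # unpaid, ignored
]
print(payment_avg_minutes(orders))  # 20.0
```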
Create table
drop table if exists ads.mall__payment_daycount;
CREATE EXTERNAL TABLE `ads.mall__payment_daycount`(
dt string comment '统计日期',
order_count bigint comment '单日支付笔数',
order_amount bigint comment '单日支付金额',
payment_user_count bigint comment '单日支付人数',
payment_sku_count bigint comment '单日支付商品数',
payment_avg_time double comment '下单到支付的平均时长,取分钟数'
) COMMENT '支付信息统计表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/payment_daycount/'
tblproperties ("parquet.compression"="snappy");

Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=payment_daycount
hive_table_name=$APP2.mall__$table_name
# Use the date passed in through ${date} if there is one; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
tmp_payment.dt,
tmp_payment.payment_count,
tmp_payment.payment_amount,
tmp_payment.payment_user_count,
tmp_skucount.payment_sku_count,
tmp_time.payment_avg_time
from
(
select
'$db_date' dt,
sum(payment_count) payment_count,
sum(payment_amount) payment_amount,
sum(if(payment_count>0,1,0)) payment_user_count
from dws.mall__user_action_daycount
where dt='$db_date'
)tmp_payment
join
(
select
'$db_date' dt,
sum(if(payment_count>0,1,0)) payment_sku_count
from dws.mall__sku_action_daycount
where dt='$db_date'
)tmp_skucount on tmp_payment.dt=tmp_skucount.dt
join
(
select
'$db_date' dt,
sum(unix_timestamp(payment_time)-unix_timestamp(create_time))/count(*)/60
payment_avg_time
from dwd.mall__fact_order_info
where dt='$db_date'
and payment_time is not null
)tmp_time on tmp_payment.dt=tmp_time.dt
"
$hive -e "$sql"

12.4.3 Repurchase rate
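The script below first sums each user's orders per brand and first-level category over the month of `d`. It then counts the users with at least 1, 2 and 3 orders. The single and multiple repurchase rates are the 2+ and 3+ counts divided by the 1+ count. A Python sketch of the same two-step aggregation, with category names omitted and hypothetical inputs:

```python
# Illustrative model only; names and data are hypothetical.
from collections import defaultdict
from typing import DefaultDict, Dict, Iterable, List, Optional, Tuple

Key = Tuple[str, str]  # (tm_id, category1_id)
Stats = Tuple[int, int, Optional[float], int, Optional[float]]


def repurchase(rows: Iterable[Tuple[str, str, str, int]]) -> Dict[Key, Stats]:
    """rows: (user_id, tm_id, category1_id, order_count) for each day of the month."""
    # Step 1: total orders per user within each brand / category (the inner GROUP BY)
    per_user: DefaultDict[Tuple[str, str, str], int] = defaultdict(int)
    for user_id, tm_id, category1_id, order_count in rows:
        per_user[(tm_id, category1_id, user_id)] += order_count
    # Step 2: count users with 1+, 2+ and 3+ orders per brand / category
    counts: DefaultDict[Key, List[int]] = defaultdict(lambda: [0, 0, 0])
    for (tm_id, category1_id, _user_id), total in per_user.items():
        c = counts[(tm_id, category1_id)]
        c[0] += int(total >= 1)
        c[1] += int(total >= 2)
        c[2] += int(total >= 3)
    result: Dict[Key, Stats] = {}
    for key, (buy, twice, thrice) in counts.items():
        # None mirrors the NULL Hive returns when dividing by zero
        result[key] = (
            buy,
            twice, twice / buy if buy else None,
            thrice, thrice / buy if buy else None,
        )
    return result


rows = [
    ("u1", "tm1", "c1", 1), ("u1", "tm1", "c1", 1),  # u1: 2 orders this month
    ("u2", "tm1", "c1", 3),
    ("u3", "tm1", "c1", 1),
    ("u4", "tm1", "c1", 0),  # a row without orders does not make u4 a buyer
]
print(repurchase(rows)[("tm1", "c1")])  # (3, 2, 0.6666666666666666, 1, 0.3333333333333333)
```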
Create table
drop table if exists ads.mall__sale_tm_category1_stat_mn;
CREATE EXTERNAL TABLE `ads.mall__sale_tm_category1_stat_mn`(
tm_id string comment '品牌 id',
category1_id string comment '1 级品类 id ',
category1_name string comment '1 级品类名称 ',
buycount bigint comment '购买人数',
buy_twice_last bigint comment '两次以上购买人数',
buy_twice_last_ratio decimal(10,2) comment '单次复购率',
buy_3times_last bigint comment '三次以上购买人数',
buy_3times_last_ratio decimal(10,2) comment '多次复购率',
stat_mn string comment '统计月份',
stat_date string comment '统计日期'
) COMMENT '复购率表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/sale_tm_category1_stat_mn/'
tblproperties ("parquet.compression"="snappy");

Load data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=sale_tm_category1_stat_mn
hive_table_name=$APP2.mall__$table_name
# Use the date passed in through ${date} if there is one; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
mn.sku_tm_id,
mn.sku_category1_id,
mn.sku_category1_name,
sum(if(mn.order_count>=1,1,0)) buycount,
sum(if(mn.order_count>=2,1,0)) buyTwiceLast,
sum(if(mn.order_count>=2,1,0))/sum( if(mn.order_count>=1,1,0)) buyTwiceLastRatio,
sum(if(mn.order_count>=3,1,0)) buy3timeLast ,
sum(if(mn.order_count>=3,1,0))/sum( if(mn.order_count>=1,1,0)) buy3timeLastRatio,
date_format('$db_date' ,'yyyy-MM') stat_mn,
'$db_date' stat_date
from
(
select
user_id,
sd.sku_tm_id,
sd.sku_category1_id,
sd.sku_category1_name,
sum(order_count) order_count
from dws.mall__sale_detail_daycount sd
where date_format(dt,'yyyy-MM')=date_format('$db_date' ,'yyyy-MM')
group by user_id, sd.sku_tm_id, sd.sku_category1_id, sd.sku_category1_name
) mn
group by mn.sku_tm_id, mn.sku_category1_id, mn.sku_category1_name;
"
$hive -e "$sql"