数据开发基本都是从陌生到熟悉,但是写多了就会发现各种好用的工具/函数,也会发现各种坑,本文分享了作者从拿到数据到数据开发到数据监控的一些实操经验。
写在前面
各种join的用法篇
|
|
|
|
|
|
|
|
|
|
|
|
read zhule_b;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-- better way to perform join, select small range of data first.
SELECT A.*, B.*
FROM
(SELECT * FROM A WHERE ds='20180101') A
JOIN
(SELECT * FROM B WHERE ds='20180101') B
ON a.key = b.key;
-
The left table in a LEFT OUTER JOIN operation must be a large table.
-
The right table in a RIGHT OUTER JOIN operation must be a large table.
-
MAPJOIN cannot be used in a FULL OUTER JOIN operation.
-
The left or right table in an INNER JOIN operation can be a large table.
SELECT /*+ MAPJOIN(b) */
a.*
FROM test_a a
JOIN test_b b
ON a.user_key = b.user_key
;
//就是在sql语句前加一个标记说这是mapjoin,把小表别名写在括号里
Left Join
-
一定要保留左表的内容是,可以选择用left join,例如加入key_attrs
-
Right Join和Left Join没有本质区别,建议定义好左表后都利用Left Join来执行
-
如果右表有重复数据的情况,那么最终left join的结果会有重复
Left Semi Join
-
当右表没有重复数据时,和Join是一致的,只会保留相同的列下来
-
left semi join并不会返回右表B中的任何数据,所以你没法在where条件中指定关于右表B的任何筛选条件,下面得例子能够有更加清晰的对比(例子引用于开源论坛):
employee (2 columns - e_id and e_name)
10, Tom
20, Jerry
30, Emily
employee_department_mapping (2 columns - e_id and d_name)
10, IT
10, Sales
20, HR
-- inner join results
SELECT e.e_id, e.e_name, d.d_name FROM
employee e INNER JOIN employee_department_mapping d
on e.e_id = d.e_id
-- results
10, Tom, IT
10, Tom, Sales
20, Jerry, HR
-- left semi join results
SELECT e.e_id, e.e_name, d.d_name FROM
employee e LEFT SEMI JOIN employee_department_mapping d
on e.e_id = d.e_id
-- results
10, Tom, IT
20, Jerry, HR
-
最好用的场景就是找出两表的差异部分;
-
算法日常调度时可以用于每日新增修改商品的提取,将关键字段放到ON条件中就行
Full Join
-
在有增删改情况下更新下游最新数据时,非常好用,但是知道的人比较少
SELECT COALESCE(a.main_image_url,b.main_image_url) AS main_image_url
,COALESCE(a.embedding,b.embedding) AS embedding
FROM today_feat a
FULL JOIN yest_feat b
ON a.main_image_url = b.main_image_url
其中full jion的效果如下,正好满足new,old,updated feature的更新,配合COALESCE很丝滑:
合理设置Mapper和Reducepriority
set odps.instance.priority
set odps.sql.mapper.split.size
-- original sql
CREATE TABLE if not EXISTS tmp_zhl_test LIFECYCLE 1 AS
SELECT sig, venture, seller_id, COUNT(product_id) as cnt
FROM sku_main_image_sig
WHERE LENGTH(sig) > 10 --some bad cases may have weird sigs like '#NEXT#'
GROUP BY sig, venture, seller_id
HAVING cnt>2
;
set odps.sql.reducer.instances
set odps.sql.mapper(reducer).memory
在Python UDF中使用第三方库
Upload&Call Package
-
需要下载第三方库的安装包xxx.whl,可以直接下载到自己的电脑上面,这样可以在离线环境验证多个版本的一致性(下面介绍)。一般来说我们需要去看安装包需要的python版本号以及兼容机器环境,一般来说都是cp37-cp37m or py2.py3-none-any在中间,然后末尾是x86_64的安装包;
-
本地直接将xxx.whl转换为xxx.zip,利用命令「mv xxx.whl xxx.zip」就行
-
将zip资源文件上传到ODPS对应的环境
-
在你的UDF中,利用下面的代码指定资源包的路径和引用(直接copy就行)
def include_package_path(res_name, lib_name):
archive_files = get_cache_archive(res_name)
dir_names = sorted([os.path.dirname(os.path.normpath(f.name)) for f in archive_files
if '.dist_info' not in f.name], key=lambda v: len(v))
for dir_name in dir_names:
if dir_name.endswith(lib_name):
sys.path.insert(0, os.path.dirname(dir_name))
break
elif os.path.exists(os.path.join(dir_name, lib_name + '.py')):
sys.path.insert(0, os.path.abspath(dir_name))
break
class PostProcess(BaseUDTF):
def __init__(self):
include_package_path('opencv_python-3.4.0.zip','cv2')
include_package_path('numpy-1.16.6.zip','numpy')
-
python UDF写完后,就可以在创建函数里面的Resources里直接将你的资源名写进去,这样在流程启动后,你的资源才会被有效调用起来。
-
python UDF默认的版本是2.x的,如果你的python版本是3.x,那么需要在ODPS运行前加入下面的指令;同时,部分功能是需要打开沙箱的,所以如果报错的话,可以加入第二行的沙箱命令。
set odps.sql.python.version = cp37; --use python 3.7, default is 2.x version
set odps.isolation.session.enable = true;
Solve Compatibility Issue
-
在本地可以用类似conda的工具搭建一个虚拟环境
-
在本地用pip或者conda install来安装你需要的三方库
-
查询你下载的三方库以及依赖库的版本,比如python-opencv的话可以打印cv2.__version__
-
把对应版本的xxx.whl包按照上面的方法现在下来并且上传到ODPS资源中进行依赖
发布任务时的一些额外建议
-
发布任务配置时,可以灵活使用exclude和extra来去掉或添加你想要的依赖。其中exclude可以去掉你中间产出的临时表,而extra可以帮你增加虽然不在代码里但是希望依赖的上游表(这在汇总不同上游表数据写入下游对应分区并且希望同时产出下游数据时很有用)。
--exclude input or output tables (especially those tmp tables)--@exclude_input=lsiqg_iqc_sku_product_detection_result--@exclude_output=lsmp_sku_image_url_bizdate
-- include input or output tables (especially those separate venture tables)--@extra_input=lsiqg_iqc_sku_product_detection_result--@extra_output=lsmp_sku_image_url_bizdate
-
如果在SQL代码过程中你需要使用临时表来过渡中间产出的数据(避免SQL嵌套过于严重,影响运行效率),建议一定在临时表中加入一个时间戳,ex. lsiqg_iqc_input_tmp_${bizdate}不然在补数据或者遇到任务堵塞两个任务同时在调度时,或产生overwrite的一系列问题。 -
如果存在上游表存在多个分区,但是每个分区处理逻辑一样的话(比如不同国家的分区表处理逻辑其实一样),非常建议在第一步里就把不同分区表的数据汇总起来,可以重新增加一个分区(如venture)来存放融合后的数据。如下示例:
INSERT OVERWRITE TABLE sku_main_image_sig PARTITION (ds = '${bizdate}',venture)
SELECT id
,image_url
,venture
FROM (
SELECT id
,image_url
,'ID' AS venture
FROM auction_image_id
WHERE ds = MAX_PT('auction_image_id')
UNION
SELECT id
,image_url
,'PH' AS venture
FROM auction_image_ph
WHERE ds = MAX_PT('auction_image_ph')
UNION
SELECT id
,image_url
,'VN' AS venture
FROM auction_image_vn
WHERE ds = MAX_PT('auction_image_vn')
UNION
SELECT id
,image_url
,'SG' AS venture
FROM auction_image_sg
WHERE ds = MAX_PT('auction_image_sg')
UNION
SELECT id
,image_url
,'MY' AS venture
FROM auction_image_my
WHERE ds = MAX_PT('auction_image_my')
UNION
SELECT id
,image_url
,'TH' AS venture
FROM auction_image_th
WHERE ds = MAX_PT('auction_image_th')
) union_table
;
-
对于重要的数据表,一定要设置监控,防止数据丢失、不正常产出等问题,具体的方法又可以分两类:
-
设置任务基线(baseline)来保证任务优先级,这样调度的时间更有保障
-
设置warning的短信/电话或者DQC的监控规则来具体监控数据
简单的任务可以直接在任务中心查看详情中设置:
写在最后
-
拿到数据第一时间验证数据的重复性,有效性;如果是组内问题就反馈,上游链路问题就自己过滤;
-
写完数据的每一部分都先验证合理性,这样会提高最终数据的成功率;
-
一般节点上线后,会主动去观察3-4天,确保输出是符合预期的(如果会发现应该稳定的数据反而猛然增加or减少,那很可能是数据逻辑有问题);
-
定义合理的数据监控,可以避免数据为空,数据波动过大,数据字段不合理等问题;
Enjoy Data Engineering!!
作者|周慧玲(逐乐)
文章评论