技术开发 频道

使用Oracle内建功能构建ETL流程



    定时调度 – DBMS_SCHEDULER

    首先我们将使用proc_file_watcher存储过程每5分钟对某一个目录进行扫描等待文件。如果文件存在,调用proc_txn_product对文件进行处理,否则睡眠5分钟。这个文件扫描过程在周二早上4点钟被激活,如果超过8点文件还是没有到,可以抛出用户异常或者调用UTL_SMTP包发邮件通知相关责任人。
create or replace procedure proc_file_watcher is v_exists boolean; v_file_length number; v_blocksize number; begin <<L_sleeping_child>> if to_char(sysdate,'hh24')>='08' then --超时,可以调用UTL_SMTP null; else utl_file.fgetattr('SOURCE_DIR','product1.dat',v_exists,v_file_length,v_blocksize); if v_exists then dbms_output.put_line('File there!'); proc_txn_product; else dbms_output.put_line('404 Error'); dbms_lock.sleep(300); goto L_sleeping_child; end if; end if; end proc_file_watcher;
    现在,我们只需要将这个文件扫描程序加入调度,每个周二早上4点钟开始运行即可。
    --创建程序
SQL> ed Wrote file afiedt.buf 1 BEGIN 2 DBMS_SCHEDULER.CREATE_PROGRAM( 3 program_name=>'STENNY.STP_PROC_FILE_WATCHER', 4 program_action=>'STENNY.PROC_FILE_WATCHER', 5 program_type=>'STORED_PROCEDURE', 6 comments=>'Firing the ETL process if file arrives', 7 enabled=>TRUE); 8* END; SQL> / PL/SQL procedure successfully completed. --创建调度 SQL> ed Wrote file afiedt.buf 1 BEGIN 2 SYS.DBMS_SCHEDULER.CREATE_SCHEDULE( 3 repeat_interval => 'FREQ=WEEKLY;BYDAY=TUE;BYHOUR=8;BYMINUTE=0;BYSECOND=0', 4 start_date => to_timestamp_tz('2004-04-27 US/Central', 'YYYY-MM-DD TZR'), 5 comments => 'Tuesday AM Schedule', 6 schedule_name => '"STENNY"."SCS_TXN_PROD"'); 7* END; SQL> / PL/SQL procedure successfully completed. 创建工作 SQL> ed Wrote file afiedt.buf 1 BEGIN 2 SYS.DBMS_SCHEDULER.CREATE_JOB( 3 job_name => 'STENNY.SCJ_TXN_PROD', 4 program_name => 'STENNY.STP_PROC_FILE_WATCHER', 5 schedule_name => 'STENNY.SCS_TXN_PROD', 6 comments => 'Start the ETL process on Tuesday', 7 auto_drop => FALSE, 8 enabled => TRUE); 9* END; SQL> / PL/SQL procedure successfully completed. --进行测试 SQL> select count(*) from stenny.stg_product; COUNT(*) ---------- 0 SQL> EXEC DBMS_SCHEDULER.RUN_JOB('STENNY.SCJ_TXN_PROD',FALSE); PL/SQL procedure successfully completed. SQL> select count(*) from stenny.stg_product; COUNT(*) ----------
    附录A, 创建测试表的语句
--stg_excep Create table stg_excep as select * from stenny_ext_product where 1=2; --stg_product CREATE TABLE STG_PRODUCT ( PRODUCT_ID NUMBER, PRODUCT_NAME VARCHAR2(20), LOC_ID NUMBER ); --loc_std CREATE TABLE LOC_STD ( LOC_ID NUMBER, LOC_NAME VARCHAR2(20) ); INSERT INTO LOC_STD ( LOC_ID, LOC_NAME ) VALUES ( 1, 'JiangSu'); INSERT INTO LOC_STD ( LOC_ID, LOC_NAME ) VALUES ( 2, 'ZheJiang'); INSERT INTO LOC_STD ( LOC_ID, LOC_NAME ) VALUES ( 3, 'SiChuan'); INSERT INTO LOC_STD ( LOC_ID, LOC_NAME ) VALUES ( 4, 'YunNan');
0
相关文章