Cree operadores flexibles personalizados para analizar el flujo de eventos

Vea el proyecto completo en el editor CCL. Debería ver algo como esto:


CREATE INPUT STREAM MACHINEDATA SCHEMA ( MACHINEID string , EVENT_TIME msdate , EVENT_NAME string , EVENT_DESCRIPTION string , EVENT_VALUE string ) ; CREATE REFERENCE MACHINE_REF SCHEMA ( MACHINEID string , MACHINETYPE string , MAX_TEMP decimal(4,2) , MIN_TEMP decimal(4,2) , LOCATION string , TEMP_UNIT string ) PRIMARY KEY ( MACHINEID ) PROPERTIES service = 'hanadb' , source = 'MACHINE_REF' , sourceSchema = 'STREAMING' ; /**@SIMPLEQUERY=FILTER*/ CREATE OUTPUT STREAM ACTIVITY_HIST AS SELECT * FROM MACHINEDATA WHERE MACHINEDATA.EVENT_NAME = 'DOOR' ; /**@SIMPLEQUERY=JOIN*/ CREATE OUTPUT STREAM DEVICE_EVENTS AS SELECT MACHINEDATA.MACHINEID MACHINEID , MACHINEDATA.EVENT_TIME EVENT_TIME , MACHINEDATA.EVENT_NAME EVENT_NAME , MACHINEDATA.EVENT_DESCRIPTION EVENT_DESCRIPTION , MACHINEDATA.EVENT_VALUE EVENT_VALUE , MACHINE_REF.MACHINETYPE MACHINETYPE , MACHINE_REF.MAX_TEMP MAX_TEMP , MACHINE_REF.MIN_TEMP MIN_TEMP , MACHINE_REF.LOCATION LOCATION , MACHINE_REF.TEMP_UNIT TEMP_UNIT FROM MACHINEDATA INNER JOIN MACHINE_REF ON MACHINEDATA.MACHINEID = MACHINE_REF.MACHINEID ; /**@SIMPLEQUERY=AGGREGATE*/ CREATE OUTPUT WINDOW AVG_TEMP PRIMARY KEY DEDUCED KEEP ALL AS SELECT DEVICE_EVENTS.MACHINEID MACHINEID , LAST ( DEVICE_EVENTS.EVENT_TIME ) EVENT_TIME , avg ( to_decimal(DEVICE_EVENTS.EVENT_VALUE, 4, 2) ) AVG_TEMP , DEVICE_EVENTS.MAX_TEMP MAX_TEMP , DEVICE_EVENTS.MIN_TEMP MIN_TEMP , DEVICE_EVENTS.LOCATION LOCATION , DEVICE_EVENTS.TEMP_UNIT TEMP_UNIT FROM DEVICE_EVENTS KEEP 30 SEC GROUP FILTER DEVICE_EVENTS.EVENT_NAME = 'TEMP' GROUP BY DEVICE_EVENTS.MACHINEID ; /**@SIMPLEQUERY=PATTERN*/ CREATE OUTPUT STREAM ALARM_POWER AS SELECT A.MACHINEID MACHINEID , A.EVENT_TIME EVENT_TIME , A.LOCATION LOCATION , 'POWER' ALARM_TYPE , 'POWER Out for more than 20 seconds' ALARM_DESC FROM DEVICE_EVENTS A, DEVICE_EVENTS B MATCHING [ 20 SEC : A , ! B ] ON A.MACHINEID = B.MACHINEID AND A.EVENT_VALUE = 'Power off' AND B.EVENT_VALUE = 'Power on' ; CREATE FLEX POWER_OUTAGES IN DEVICE_EVENTS OUT OUTPUT STREAM POWER_OUTAGES SCHEMA ( MACHINEID string , POWER_OFF_TIME msdate , POWER_ON_TIME msdate , DURATION_MIN double ) BEGIN DECLARE dictionary(string, msdate) offtime; msdate offts; END; ON DEVICE_EVENTS { if (DEVICE_EVENTS.EVENT_VALUE = 'Power off') { offtime[DEVICE_EVENTS.MACHINEID] := DEVICE_EVENTS.EVENT_TIME; } if (DEVICE_EVENTS.EVENT_VALUE = 'Power on' AND not isnull(offtime[DEVICE_EVENTS.MACHINEID])) { offts := offtime[DEVICE_EVENTS.MACHINEID]; output [ MACHINEID = DEVICE_EVENTS.MACHINEID; POWER_OFF_TIME = offts; POWER_ON_TIME = DEVICE_EVENTS.EVENT_TIME; DURATION_MIN = cast(double,DEVICE_EVENTS.EVENT_TIME - offts)/60000000;]; } } ; END; /* Dashboard */ CREATE FLEX DASHBOARD IN DEVICE_EVENTS OUT OUTPUT WINDOW DASHBOARD SCHEMA ( MACHINEID string , POWER_STATUS integer, CURR_TEMP float, MAX_TEMP float ) PRIMARY KEY (MACHINEID) KEEP ALL BEGIN DECLARE typeof(DASHBOARD) outrec; dictionary(string, typeof(DASHBOARD)) prev; END; ON DEVICE_EVENTS { if (not isnull(prev[DEVICE_EVENTS.MACHINEID])){ outrec := prev[DEVICE_EVENTS.MACHINEID]; } outrec.MACHINEID := DEVICE_EVENTS.MACHINEID; if(DEVICE_EVENTS.EVENT_NAME = 'POWER') { if (DEVICE_EVENTS.EVENT_VALUE = 'Power on') { outrec.POWER_STATUS := 0;} else {outrec.POWER_STATUS := 10;} } if (DEVICE_EVENTS.EVENT_NAME = 'TEMP') { outrec.CURR_TEMP := to_float(DEVICE_EVENTS.EVENT_VALUE); outrec.MAX_TEMP := to_float(DEVICE_EVENTS.MAX_TEMP); } output setOpcode(outrec,upsert); prev[DEVICE_EVENTS.MACHINEID] := outrec; } ; END; CREATE OUTPUT WINDOW ALARM_TEMP PRIMARY KEY DEDUCED KEEP ALL AS SELECT AVG_TEMP.MACHINEID MACHINEID , AVG_TEMP.EVENT_TIME EVENT_TIME , AVG_TEMP.LOCATION LOCATION , 'TEMP' ALARM_TYPE , 'Machine not maintaining temperature' ALARM_DESC FROM AVG_TEMP WHERE AVG_TEMP.AVG_TEMP > AVG_TEMP.MAX_TEMP ; ATTACH OUTPUT ADAPTER HANA_Output1 TYPE hana_out TO ACTIVITY_HIST PROPERTIES service = 'hanadb' , sourceSchema = 'STREAMING' , table = 'ACTIVITY_HIST' ;

Hecho

Inicie sesión para responder la pregunta

Deja un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *