Vea el proyecto completo en el editor CCL. Debería ver algo como esto:
/*
 * MACHINEDATA: input stream of raw machine events.
 * Every column except EVENT_TIME is a string; EVENT_VALUE is converted to a
 * numeric type downstream where a temperature reading is needed.
 */
CREATE INPUT STREAM MACHINEDATA
    SCHEMA (
        MACHINEID         string,
        EVENT_TIME        msdate,
        EVENT_NAME        string,
        EVENT_DESCRIPTION string,
        EVENT_VALUE       string
    );
/*
 * MACHINE_REF: reference (lookup) element keyed by MACHINEID.
 * It is read through the 'hanadb' service from source MACHINE_REF in schema
 * STREAMING and supplies per-machine attributes (type, temperature limits,
 * location, temperature unit).
 */
CREATE REFERENCE MACHINE_REF
    SCHEMA (
        MACHINEID   string,
        MACHINETYPE string,
        MAX_TEMP    decimal(4,2),
        MIN_TEMP    decimal(4,2),
        LOCATION    string,
        TEMP_UNIT   string
    )
    PRIMARY KEY ( MACHINEID )
    PROPERTIES
        service      = 'hanadb',
        source       = 'MACHINE_REF',
        sourceSchema = 'STREAMING';
/*
 * ACTIVITY_HIST: passes through, unchanged, only those MACHINEDATA events
 * whose EVENT_NAME is 'DOOR'.
 */
/**@SIMPLEQUERY=FILTER*/
CREATE OUTPUT STREAM ACTIVITY_HIST
AS
    SELECT *
    FROM MACHINEDATA
    WHERE MACHINEDATA.EVENT_NAME = 'DOOR';
/*
 * DEVICE_EVENTS: enriches every MACHINEDATA event with the matching
 * MACHINE_REF row (joined on MACHINEID). Because the join is INNER, events
 * with no matching reference row produce no output.
 */
/**@SIMPLEQUERY=JOIN*/
CREATE OUTPUT STREAM DEVICE_EVENTS
AS
    SELECT
        /* columns carried over from the raw event */
        MACHINEDATA.MACHINEID         MACHINEID,
        MACHINEDATA.EVENT_TIME        EVENT_TIME,
        MACHINEDATA.EVENT_NAME        EVENT_NAME,
        MACHINEDATA.EVENT_DESCRIPTION EVENT_DESCRIPTION,
        MACHINEDATA.EVENT_VALUE       EVENT_VALUE,
        /* columns looked up from the reference data */
        MACHINE_REF.MACHINETYPE       MACHINETYPE,
        MACHINE_REF.MAX_TEMP          MAX_TEMP,
        MACHINE_REF.MIN_TEMP          MIN_TEMP,
        MACHINE_REF.LOCATION          LOCATION,
        MACHINE_REF.TEMP_UNIT         TEMP_UNIT
    FROM MACHINEDATA
        INNER JOIN MACHINE_REF
            ON MACHINEDATA.MACHINEID = MACHINE_REF.MACHINEID;
/*
 * AVG_TEMP: per-machine average temperature over the events retained by the
 * 30-second KEEP policy on DEVICE_EVENTS.
 * - GROUP FILTER admits only 'TEMP' events into the aggregation.
 * - EVENT_VALUE is a string, so it is converted with to_decimal(.., 4, 2)
 *   before averaging.
 * - EVENT_TIME is the time of the last event in the group.
 * The primary key is deduced (presumably from the GROUP BY column MACHINEID).
 */
/**@SIMPLEQUERY=AGGREGATE*/
CREATE OUTPUT WINDOW AVG_TEMP
    PRIMARY KEY DEDUCED
    KEEP ALL
AS
    SELECT
        DEVICE_EVENTS.MACHINEID                               MACHINEID,
        LAST ( DEVICE_EVENTS.EVENT_TIME )                     EVENT_TIME,
        avg ( to_decimal(DEVICE_EVENTS.EVENT_VALUE, 4, 2) )   AVG_TEMP,
        DEVICE_EVENTS.MAX_TEMP                                MAX_TEMP,
        DEVICE_EVENTS.MIN_TEMP                                MIN_TEMP,
        DEVICE_EVENTS.LOCATION                                LOCATION,
        DEVICE_EVENTS.TEMP_UNIT                               TEMP_UNIT
    FROM DEVICE_EVENTS KEEP 30 SEC
    GROUP FILTER DEVICE_EVENTS.EVENT_NAME = 'TEMP'
    GROUP BY DEVICE_EVENTS.MACHINEID;
/*
 * ALARM_POWER: raises a 'POWER' alarm when a machine reports 'Power off'
 * (event A) and no 'Power on' (event B) for the same MACHINEID follows
 * within the 20-second pattern interval.
 * The alarm row carries the machine id, time and location of the
 * 'Power off' event.
 */
/**@SIMPLEQUERY=PATTERN*/
CREATE OUTPUT STREAM ALARM_POWER
AS
    SELECT
        A.MACHINEID                          MACHINEID,
        A.EVENT_TIME                         EVENT_TIME,
        A.LOCATION                           LOCATION,
        'POWER'                              ALARM_TYPE,
        'POWER Out for more than 20 seconds' ALARM_DESC
    FROM DEVICE_EVENTS A, DEVICE_EVENTS B
    MATCHING [ 20 SEC : A , ! B ]
        ON  A.MACHINEID   = B.MACHINEID
        AND A.EVENT_VALUE = 'Power off'
        AND B.EVENT_VALUE = 'Power on';
/*
 * POWER_OUTAGES: pairs each 'Power off' event with the next 'Power on' event
 * for the same machine and emits one row per completed outage, containing
 * the off time, the on time and the outage duration.
 */
CREATE FLEX POWER_OUTAGES
IN DEVICE_EVENTS
OUT OUTPUT STREAM POWER_OUTAGES
SCHEMA (
    MACHINEID string ,
    POWER_OFF_TIME msdate ,
    POWER_ON_TIME msdate ,
    DURATION_MIN double )
BEGIN
    DECLARE
        /* Time of the most recent unmatched 'Power off', keyed by MACHINEID. */
        dictionary(string, msdate) offtime;
        /* Scratch copy of the off time for the outage being emitted. */
        msdate offts;
    END;
    ON DEVICE_EVENTS {
        /* Remember when the machine lost power. A repeated 'Power off'
           overwrites the stored time with the later one. */
        if (DEVICE_EVENTS.EVENT_VALUE = 'Power off') {
            offtime[DEVICE_EVENTS.MACHINEID] := DEVICE_EVENTS.EVENT_TIME;
        }
        /* Power restored and a matching 'Power off' is on record:
           emit the completed outage. */
        if (DEVICE_EVENTS.EVENT_VALUE = 'Power on' AND not isnull(offtime[DEVICE_EVENTS.MACHINEID])) {
            offts := offtime[DEVICE_EVENTS.MACHINEID];
            output [
                MACHINEID = DEVICE_EVENTS.MACHINEID;
                POWER_OFF_TIME = offts;
                POWER_ON_TIME = DEVICE_EVENTS.EVENT_TIME;
                /* NOTE(review): the divisor 60000000 assumes the msdate
                   difference is expressed in microseconds - confirm. */
                DURATION_MIN = cast(double,DEVICE_EVENTS.EVENT_TIME - offts)/60000000;];
            /* Forget the consumed off time so that a second 'Power on'
               without a new 'Power off' does not emit a duplicate outage
               with a stale POWER_OFF_TIME. */
            remove(offtime, DEVICE_EVENTS.MACHINEID);
        }
    } ;
END;
/*
 * Dashboard
 * Keeps one row per machine holding its latest power status, current
 * temperature and maximum allowed temperature. Each incoming event updates
 * only the fields it carries; the other fields keep the values from the
 * machine's previous row.
 */
CREATE FLEX DASHBOARD
IN DEVICE_EVENTS
OUT OUTPUT WINDOW DASHBOARD
SCHEMA (
    MACHINEID string ,
    POWER_STATUS integer,
    CURR_TEMP float,
    MAX_TEMP float )
PRIMARY KEY (MACHINEID)
KEEP ALL
BEGIN
    DECLARE
        /* Row being built for the current event. */
        typeof(DASHBOARD) outrec;
        /* Last row emitted for each machine, keyed by MACHINEID. */
        dictionary(string, typeof(DASHBOARD)) prev;
    END;
    ON DEVICE_EVENTS {
        if (not isnull(prev[DEVICE_EVENTS.MACHINEID])) {
            /* Start from this machine's previous row. */
            outrec := prev[DEVICE_EVENTS.MACHINEID];
        } else {
            /* First event for this machine: outrec is shared across events,
               so clear the non-key fields to avoid leaking the values of
               whichever machine was processed last. */
            outrec.POWER_STATUS := null;
            outrec.CURR_TEMP := null;
            outrec.MAX_TEMP := null;
        }
        outrec.MACHINEID := DEVICE_EVENTS.MACHINEID;
        if (DEVICE_EVENTS.EVENT_NAME = 'POWER') {
            /* 'Power on' maps to 0; any other POWER value maps to 10. */
            if (DEVICE_EVENTS.EVENT_VALUE = 'Power on') {
                outrec.POWER_STATUS := 0;
            } else {
                outrec.POWER_STATUS := 10;
            }
        }
        if (DEVICE_EVENTS.EVENT_NAME = 'TEMP') {
            /* EVENT_VALUE arrives as a string; MAX_TEMP comes from the
               reference data joined into DEVICE_EVENTS. */
            outrec.CURR_TEMP := to_float(DEVICE_EVENTS.EVENT_VALUE);
            outrec.MAX_TEMP := to_float(DEVICE_EVENTS.MAX_TEMP);
        }
        /* Upsert so the window holds exactly one row per MACHINEID. */
        output setOpcode(outrec,upsert);
        prev[DEVICE_EVENTS.MACHINEID] := outrec;
    } ;
END;
/*
 * ALARM_TEMP: holds a 'TEMP' alarm row for every AVG_TEMP row whose average
 * temperature exceeds the machine's MAX_TEMP.
 */
CREATE OUTPUT WINDOW ALARM_TEMP
    PRIMARY KEY DEDUCED
    KEEP ALL
AS
    SELECT
        AVG_TEMP.MACHINEID                    MACHINEID,
        AVG_TEMP.EVENT_TIME                   EVENT_TIME,
        AVG_TEMP.LOCATION                     LOCATION,
        'TEMP'                                ALARM_TYPE,
        'Machine not maintaining temperature' ALARM_DESC
    FROM AVG_TEMP
    /* the limit is below the measured average, i.e. AVG_TEMP > MAX_TEMP */
    WHERE AVG_TEMP.MAX_TEMP < AVG_TEMP.AVG_TEMP;
/*
 * HANA_Output1: attaches a hana_out output adapter to ACTIVITY_HIST,
 * targeting table ACTIVITY_HIST in schema STREAMING through the 'hanadb'
 * service.
 */
ATTACH OUTPUT ADAPTER HANA_Output1
    TYPE hana_out
    TO ACTIVITY_HIST
    PROPERTIES
        service      = 'hanadb',
        sourceSchema = 'STREAMING',
        table        = 'ACTIVITY_HIST';