Un módulo de operador es la interfaz que interactúa con fuentes de datos externas al recibir datos de una fuente de datos o al enviar datos a un destino de datos.
SAP HANA Streaming Analytics admite dos tipos de operador: basado en filas y en flujo.
Los operadores basados y reciben reciben un conjunto de datos en un formato consecutivo, como un operador de base de datos. Estos transportistas trabajan con AdapterRow
casos, que son contenedores para uno o más registros o capas que fluyen de un módulo (portador, formateador o conector de Streaming Analytics) al siguiente. Puede agregar varios registros como objetos dentro de una sola lista AdapterRow
objeto. El es AdapterRow
tiene una marca de tiempo y banderas de bloque que controlan cómo se comunican sus registros hacia y desde Streaming Analytics.
Los operadores basados en transmisión tratan los datos en transmisión como un operador de socket. Estos transportistas trabajan con ByteStream
o ByteBuffer
casos, que indican un flujo continuo de datos.
En este tutorial, crearemos un módulo de portador en una fila según le convenga MQTT
.
Antes de comenzar, puede consultar el $STREAMING_HOME/adapters/framework/examples/src
directorio para el código fuente de los portadores de muestra.
El código fuente completo del módulo de transporte está disponible en el
Appendix
Alt
Primero, configuraremos el Proyecto de adaptadores personalizados.
- Comience abriendo su IDE y creando un nuevo proyecto java llamado
mqtt-input
- Crea un paquete
com.sap
- Crea una clase de Java llamada
MqttTransporter.java
- Crea una clase de Java llamada
MqttCB.java
. El código para este archivo está disponible en la sección de apéndice de este tutorial. - Agregaremos algo ahora
.jar
dependencias en nuestra ruta de clases:- Java OPS Biblioteca
- Las otras dependencias serán del Kit de herramientas de adaptadores y se pueden encontrar en
%STREAMING_HOME%adaptersframeworklibj
Commons-configuration-<version>.jar
Streaming-client.jar
Streaming-system.jar
Streaming-adapter-framework.jar
Entonces tambien MqttTransporter
extensión de la Transporter
sonó.
Comenzaremos definiendo una serie de variables de instancia, a las que se les asignarán valores en el init()
método (más sobre eso más adelante).
MqttClient client
;String topic
;MqttCB cb
;
Después de hacer esto, necesitaremos implementar algunos métodos abstractos en Transporter. Cubriremos los métodos en el mismo orden en que los llamará el marco del adaptador.
Es el primer método abstracto que aplicaremos void init()
. El objetivo de este método es preparar el módulo para las acciones de las que es responsable. Usaremos este método para iniciar varias variables globales, así como para tocar los parámetros definidos por el usuario para el adaptador.
Primero, queremos obtener el valor del parámetro Tema. Este valor lo establece el desarrollador de transmisión mientras configura el adaptador en Studio.
Podemos encontrar el valor del Contenido llamando a:
utility.getParameters().getString("MQTTInputTransporterParamet ers.Topic");
El es
MQTTInputTransporterParameters
un prefijo se define en nuestro archivo de configuración de adaptador.A continuación, crea
MqttClient
. El constructor construyeserverURI
la dirección del servidor al que conectarse, especificada comoURI
yclientId
– un identificador de cliente que es exclusivo del servidor al que está conectado.Usaremos el
MosquittoServerAddress
definido de forma única por el desarrollador de streaming y streamingclient = new MqttClient(utility.getParameters().getString("MQTTInputTranspo rterParameters.MosquittoServerAddress"), "MQTT_ESP");
Conecta el
MqttClient
leclient.connect();
Suscríbete al
MqttClient
con el sujeto conclient.subscribe(topic);
Instanciar
MqttCB
objetar y asignarnosloMqttClient
.MqttCB
es personalizadoMqttCallback
clase escrita para este acondicionador. El código de grabación está disponible en la sección del apéndice de este tutorial.cb = new MqttCB(); client.setCallback(cb);
Es el segundo método abstracto que debemos aplicar void start()
. El propósito de este método es realizar las tareas necesarias al iniciar el adaptador. Para nuestros propósitos, no es necesario
Incluya las instrucciones de este método para que lo dejemos en blanco.
Es el tercer método de implementación más importante. void execute()
. Cuando el marco del adaptador llama a este método, se espera que se ejecute continuamente hasta que se le solicite al adaptador que se detenga o hasta que el adaptador complete su trabajo.
En consecuencia, envolveremos nuestra funcionalidad en un bucle que se repite hasta que se emita una solicitud de detención para el adaptador. Es una guía para seguir este bucle y completar el método para cambiar el adaptador.
RunState
hacer.while(!utility.isStopRequested()) { //steps b-d } utility.setAdapterState(RunState.RS_DONE);
Aunque no se ha pedido que se detenga el adaptador, constantemente comprobaremos si hay nuevos.
MQTT
mensajes. El estakeNewMsg()
el método volveránull
si no hay mensajes nuevos, o sacar el mensaje de la cola de mensajes y devolverlo. Cuando se reciba un mensaje nuevo, lo procesaremos dentro delif
declaración.String msg; if ((msg = cb.takeNewMsg()) != null){ //steps c-d }
Una vez que hayamos recibido un mensaje, necesitamos crear un mensaje
AdapterRow
y envíalo a nuestroFormatter
módulo.AdapterRow row = utility.createRow(cb.getRcvdMsg()); utility.sendRow(row);
El cuarto método es la violación. void stop()
. Su propósito es realizar las tareas necesarias cuando el adaptador está parado. Usaremos este método para nuestro MqttClient
emitiéndolo
client.disconnect();
Es el quinto método y el último método. void destroy()
. Su propósito es realizar cualquier tarea de limpieza para su portadora de entrada o salida. Para nuestros propósitos, no es necesario incluir ninguna instrucción en este método por lo que lo dejamos en blanco.