Saltar al contenido

Creación de módulos adaptadores personalizados

Un módulo de operador es la interfaz que interactúa con fuentes de datos externas al recibir datos de una fuente de datos o al enviar datos a un destino de datos.

SAP HANA Streaming Analytics admite dos tipos de operador: basado en filas y en flujo.

Los operadores basados ​​y reciben reciben un conjunto de datos en un formato consecutivo, como un operador de base de datos. Estos transportistas trabajan con AdapterRow casos, que son contenedores para uno o más registros o capas que fluyen de un módulo (portador, formateador o conector de Streaming Analytics) al siguiente. Puede agregar varios registros como objetos dentro de una sola lista AdapterRow objeto. El es AdapterRow tiene una marca de tiempo y banderas de bloque que controlan cómo se comunican sus registros hacia y desde Streaming Analytics.

Los operadores basados ​​en transmisión tratan los datos en transmisión como un operador de socket. Estos transportistas trabajan con ByteStream o ByteBuffer casos, que indican un flujo continuo de datos.

En este tutorial, crearemos un módulo de portador en una fila según le convenga MQTT.

Antes de comenzar, puede consultar el $STREAMING_HOME/adapters/framework/examples/src directorio para el código fuente de los portadores de muestra.

El código fuente completo del módulo de transporte está disponible en el Appendix Alt

Primero, configuraremos el Proyecto de adaptadores personalizados.

  1. Comience abriendo su IDE y creando un nuevo proyecto java llamado mqtt-input
  2. Crea un paquete com.sap
  3. Crea una clase de Java llamada MqttTransporter.java
  4. Crea una clase de Java llamada MqttCB.java. El código para este archivo está disponible en la sección de apéndice de este tutorial.
  5. Agregaremos algo ahora .jar dependencias en nuestra ruta de clases:
    • Java OPS Biblioteca
    • Las otras dependencias serán del Kit de herramientas de adaptadores y se pueden encontrar en %STREAMING_HOME%adaptersframeworklibj
      • Commons-configuration-<version>.jar
      • Streaming-client.jar
      • Streaming-system.jar
      • Streaming-adapter-framework.jar

Entonces tambien MqttTransporter extensión de la Transporter sonó.

Comenzaremos definiendo una serie de variables de instancia, a las que se les asignarán valores en el init() método (más sobre eso más adelante).

  • MqttClient client;
  • String topic;
  • MqttCB cb;

Después de hacer esto, necesitaremos implementar algunos métodos abstractos en Transporter. Cubriremos los métodos en el mismo orden en que los llamará el marco del adaptador.

Es el primer método abstracto que aplicaremos void init(). El objetivo de este método es preparar el módulo para las acciones de las que es responsable. Usaremos este método para iniciar varias variables globales, así como para tocar los parámetros definidos por el usuario para el adaptador.

  1. Primero, queremos obtener el valor del parámetro Tema. Este valor lo establece el desarrollador de transmisión mientras configura el adaptador en Studio.

    Podemos encontrar el valor del Contenido llamando a:

    utility.getParameters().getString("MQTTInputTransporterParamet
    ers.Topic");
    

    El es MQTTInputTransporterParameters un prefijo se define en nuestro archivo de configuración de adaptador.

  2. A continuación, crea MqttClient. El constructor construye serverURI la dirección del servidor al que conectarse, especificada como URI y clientId – un identificador de cliente que es exclusivo del servidor al que está conectado.

    Usaremos el MosquittoServerAddress definido de forma única por el desarrollador de streaming y streaming

    client = new
    MqttClient(utility.getParameters().getString("MQTTInputTranspo
    rterParameters.MosquittoServerAddress"), "MQTT_ESP");
    
  3. Conecta el MqttClient le client.connect();

  4. Suscríbete al MqttClient con el sujeto con client.subscribe(topic);

  5. Instanciar MqttCB objetar y asignarnoslo MqttClient. MqttCB es personalizado MqttCallback clase escrita para este acondicionador. El código de grabación está disponible en la sección del apéndice de este tutorial.

    cb = new MqttCB();
    client.setCallback(cb);
    

Es el segundo método abstracto que debemos aplicar void start(). El propósito de este método es realizar las tareas necesarias al iniciar el adaptador. Para nuestros propósitos, no es necesario
Incluya las instrucciones de este método para que lo dejemos en blanco.

Es el tercer método de implementación más importante. void execute(). Cuando el marco del adaptador llama a este método, se espera que se ejecute continuamente hasta que se le solicite al adaptador que se detenga o hasta que el adaptador complete su trabajo.

  1. En consecuencia, envolveremos nuestra funcionalidad en un bucle que se repite hasta que se emita una solicitud de detención para el adaptador. Es una guía para seguir este bucle y completar el método para cambiar el adaptador. RunState hacer.

    while(!utility.isStopRequested())
    {
    //steps b-d
    }
    utility.setAdapterState(RunState.RS_DONE);
    
  2. Aunque no se ha pedido que se detenga el adaptador, constantemente comprobaremos si hay nuevos. MQTT mensajes. El es takeNewMsg() el método volverá null si no hay mensajes nuevos, o sacar el mensaje de la cola de mensajes y devolverlo. Cuando se reciba un mensaje nuevo, lo procesaremos dentro del if declaración.

    String msg;
    if ((msg = cb.takeNewMsg()) != null){
    //steps c-d
    }
    
  3. Una vez que hayamos recibido un mensaje, necesitamos crear un mensaje AdapterRow y envíalo a nuestro Formatter módulo.

    AdapterRow row = utility.createRow(cb.getRcvdMsg());
    utility.sendRow(row);
    

El cuarto método es la violación. void stop(). Su propósito es realizar las tareas necesarias cuando el adaptador está parado. Usaremos este método para nuestro MqttClient emitiéndolo

client.disconnect();

Es el quinto método y el último método. void destroy(). Su propósito es realizar cualquier tarea de limpieza para su portadora de entrada o salida. Para nuestros propósitos, no es necesario incluir ninguna instrucción en este método por lo que lo dejamos en blanco.

Hecho

Inicie sesión para responder la pregunta