Parisi.IO


I heard you like Reptiles

On March 22, 2019, we released Apache NiFi MiNiFi C++ 0.6.0. This release brings more features than I can cover in a single post [1].

There is one in particular I’ve been using quite a bit for rapid prototyping: Python processors [1]. We have a simple example in our code base called SentimentAnalysis [2]. It performs a sentiment analysis on the text content of each incoming flow file and provides a score from 0.0 to 1.0 that indicates whether the text is neutral, positive, or negative. This processor requires nltk and VaderSentiment to be installed via pip.

With the introduction of Python processors I hope that developers can quickly create and deploy features written in Python to MiNiFi C++. The most important aspect is that Python processors are easy to add, remove, and run. The default configuration defines a subdirectory, minifi-python. Simply place your Python processors into this directory. The file name, without its extension, becomes the processor name that you reference as the class in your flow.
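Here is a minimal sketch of that naming convention, not the loader MiNiFi actually uses. The helper name `discover_processors` is hypothetical, and the class prefix is an assumption taken from the class name in the example flow at the end of this post.

```python
from pathlib import Path

# Assumption: the prefix matches the class name used in the example flow.
CLASS_PREFIX = "org.apache.nifi.minifi.processors"


def discover_processors(directory):
    """Map each .py file in `directory` to the class name a flow would reference."""
    return {
        path.stem: f"{CLASS_PREFIX}.{path.stem}"
        for path in sorted(Path(directory).glob("*.py"))
    }
```

Calling `discover_processors("minifi-python")` on a directory that holds SentimentAnalysis.py would give you a single entry keyed by SentimentAnalysis. Files that do not end in .py are ignored.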

To demonstrate this I’ve written a short flow (at the end of this post) that uses our sentiment analyzer. The flow simply pulls data from a directory on my file system using GetFile. These flow files are then sent through the Sentiment Analyzer, which is written in Python, and then logged with LogAttribute.

My test files are short. Here is the example output from LogAttribute with a negatively scored payload.

Negative Analysis

As you can see, the sentiment analysis provides different scores for a more positive payload. VaderSentiment with its default set does a good job at scoring text. I encourage you to read more about nltk and its sentiment analyzers.

Positive Analysis

What is required?

Python processors simply require that you implement the describe, onInitialize, and onTrigger functions [1]. The describe function allows you to provide a description of your processor to the framework. The onInitialize function allows you to specify whether your processor supports dynamic properties and the properties that make up your processor. The onTrigger function is called when your processor is scheduled to run and does the actual work on flow files. The YAML will configure your processor as it would any C++ or Java processor implemented with our JNI capabilities.
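Here is a rough sketch of what those three functions can look like. The method names on the framework objects (`setDescription`, `setSupportsDynamicProperties`, `get`, `addAttribute`, `transfer`) and the `REL_SUCCESS` name follow my recollection of the bundled example, so treat them as assumptions and check them against [1] and [2]. The `Fake*` classes are hypothetical stand-ins that exist only so the sketch can run outside MiNiFi.

```python
# Stand-in for the relationship that MiNiFi makes available to a real script
# (assumption: the framework exposes it under this name).
REL_SUCCESS = "success"


def describe(processor):
    # Tell the framework what this processor does.
    processor.setDescription("Tags each flow file with a greeting attribute")


def onInitialize(processor):
    # Declare capabilities, such as dynamic property support.
    processor.setSupportsDynamicProperties()


def onTrigger(context, session):
    # Called on each scheduled run; there may be no flow file queued.
    flow_file = session.get()
    if flow_file is not None:
        flow_file.addAttribute("greeting", "hello")
        session.transfer(flow_file, REL_SUCCESS)


# Hypothetical fakes, only so the functions above can be exercised locally.
class FakeProcessor:
    def __init__(self):
        self.description = None
        self.dynamic = False

    def setDescription(self, text):
        self.description = text

    def setSupportsDynamicProperties(self):
        self.dynamic = True


class FakeFlowFile:
    def __init__(self):
        self.attributes = {}

    def addAttribute(self, key, value):
        self.attributes[key] = value


class FakeSession:
    def __init__(self, queued):
        self.queued = list(queued)
        self.transferred = []

    def get(self):
        # Return the next queued flow file, or None when the queue is empty.
        return self.queued.pop(0) if self.queued else None

    def transfer(self, flow_file, relationship):
        self.transferred.append((flow_file, relationship))
```

In a real deployment you would keep only the three functions and drop the stand-ins, since the framework supplies the processor, context, session, and flow file objects itself.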

What does it all mean?

It’s a little unfair to couch this as a rapid prototyping feature. I think many will use it as such; however, these processors function in the same way C++ or Java processors do. They’re simply function calls into bound functions. There will be added cost, but it’s likely not beyond that of your I/O. As a result you should be able to use Python processors in your everyday flows.

The example I provided is short but demonstrates how you can access your Python processors. In any case, if a dependency of your Python script is missing, we will not allow that processor to be loaded. In the future we hope to improve namespace references via the flow. If you look at the example flow, below, the class name is defined as org.apache.nifi.minifi.processors.SentimentAnalysis. In future releases we’ll improve how we isolate and reference Python processors.
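If you want to check up front whether a script's dependencies are present, something like the following works with the interpreter that MiNiFi will use. This is only a sketch of a pre-flight check, not how MiNiFi decides whether to load a processor, and the helper name `missing_dependencies` is hypothetical.

```python
import importlib.util


def missing_dependencies(module_names):
    """Return the top-level modules that this interpreter cannot find."""
    return [
        name for name in module_names
        if importlib.util.find_spec(name) is None
    ]
```

For the sentiment example you could call `missing_dependencies(["nltk", "vaderSentiment"])`. An empty list means both packages can be imported.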

Feel free to give it a try, and if you have any issues let me know. I encourage you to use one of our binary releases [3].

[1] https://github.com/apache/nifi-minifi-cpp/blob/master/extensions/script/README.md

[2] https://github.com/apache/nifi-minifi-cpp/blob/master/extensions/pythonprocessors/SentimentAnalysis.py

[3] https://nifi.apache.org/minifi/download.html

MiNiFi Config Version: 3
Flow Controller:
  name: root
  comment: ''
Core Properties:
  flow controller graceful shutdown period: 10 sec
  flow service write delay interval: 500 ms
  administrative yield duration: 30 sec
  bored yield duration: 10 millis
  max concurrent threads: 1
  variable registry properties: ''
FlowFile Repository:
  partitions: 256
  checkpoint interval: 2 mins
  always sync: false
  Swap:
    threshold: 20000
    in period: 5 sec
    in threads: 1
    out period: 5 sec
    out threads: 4
Content Repository:
  content claim max appendable size: 10 MB
  content claim max flow files: 100
  always sync: false
Provenance Repository:
  provenance rollover time: 1 min
  implementation: org.apache.nifi.provenance.MiNiFiPersistentProvenanceRepository
Component Status Repository:
  buffer size: 1440
  snapshot frequency: 1 min
Security Properties:
  keystore: ''
  keystore type: ''
  keystore password: ''
  key password: ''
  truststore: ''
  truststore type: ''
  truststore password: ''
  ssl protocol: ''
  Sensitive Props:
    key:
    algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL
    provider: BC
Processors:
- id: c37e7b38-9b3b-4034-a67a-621328171073
  name: GetFile
  class: org.apache.nifi.minifi.processors.GetFile
  max concurrent tasks: 1
  scheduling strategy: TIMER_DRIVEN
  scheduling period: 1000 ms
  penalization period: 30000 ms
  yield period: 1000 ms
  run duration nanos: 0
  auto-terminated relationships list: []
  Properties:
    Batch Size: '10'
    File Filter: '[^\.].*'
    Ignore Hidden Files: 'true'
    Input Directory: /home/marc/deploy/data
    Keep Source File: 'true'
    Maximum File Age: 0 sec
    Minimum File Age: 0 sec
    Minimum File Size: 0 B
    Polling Interval: 0 sec
    Recurse Subdirectories: 'true'
- id: 43f59213-d9e1-4d3c-b4c3-c745ebfa2916
  name: LogAttribute
  class: org.apache.nifi.minifi.processors.LogAttribute
  max concurrent tasks: 1
  scheduling strategy: TIMER_DRIVEN
  scheduling period: 500 ms
  penalization period: 30000 ms
  yield period: 1000 ms
  run duration nanos: 0
  auto-terminated relationships list:
  - success
  Properties:
    Attributes to Ignore:
    Attributes to Log:
    Log Level: info
    Log Payload: 'true'
    Log Prefix:
- id: f9c446a7-74f7-4a67-b8d2-0c77a664f0fa
  name: SentimentAnalysis
  class: org.apache.nifi.minifi.processors.SentimentAnalysis
  max concurrent tasks: 1
  scheduling strategy: TIMER_DRIVEN
  scheduling period: 1000 ms
  penalization period: 30000 ms
  yield period: 1000 ms
  run duration nanos: 0
  auto-terminated relationships list: []
  Properties: {}
Controller Services: []
Process Groups: []
Input Ports: []
Output Ports: []
Funnels: []
Connections:
- id: e37e50f4-caf5-401c-9e39-5c4a6e677216
  name: GetFile/success/SentimentAnalysis
  source id: c37e7b38-9b3b-4034-a67a-621328171073
  source relationship names:
  - success
  destination id: f9c446a7-74f7-4a67-b8d2-0c77a664f0fa
  max work queue size: 0
  max work queue data size: 10000 B
  flowfile expiration: 60 seconds
  queue prioritizer class: ''
- id: fd8f1e22-ad58-45fd-bbc1-277fd58bb568
  name: SentimentAnalysis/failure/LogAttribute
  source id: f9c446a7-74f7-4a67-b8d2-0c77a664f0fa
  source relationship names:
  - failure
  - success
  destination id: 43f59213-d9e1-4d3c-b4c3-c745ebfa2916
  max work queue size: 0
  max work queue data size: 10000 B
  flowfile expiration: 60 seconds
  queue prioritizer class: ''
Remote Process Groups: []
NiFi Properties Overrides: {}
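
To make the wiring in the flow above easier to see, here is a small sketch that prints each connection by processor name. The ids and relationships are copied by hand from the config. The helper `describe_wiring` is hypothetical and does not parse the YAML itself.

```python
# Processor ids and names copied from the Processors section above.
processors = {
    "c37e7b38-9b3b-4034-a67a-621328171073": "GetFile",
    "43f59213-d9e1-4d3c-b4c3-c745ebfa2916": "LogAttribute",
    "f9c446a7-74f7-4a67-b8d2-0c77a664f0fa": "SentimentAnalysis",
}

# (source id, source relationship names, destination id) per connection.
connections = [
    ("c37e7b38-9b3b-4034-a67a-621328171073", ["success"],
     "f9c446a7-74f7-4a67-b8d2-0c77a664f0fa"),
    ("f9c446a7-74f7-4a67-b8d2-0c77a664f0fa", ["failure", "success"],
     "43f59213-d9e1-4d3c-b4c3-c745ebfa2916"),
]


def describe_wiring(processors, connections):
    """Render each connection as 'Source --relationships--> Destination'."""
    lines = []
    for source, relationships, destination in connections:
        # A KeyError here means a connection points at an undefined processor.
        lines.append(
            f"{processors[source]} --{'/'.join(relationships)}--> "
            f"{processors[destination]}"
        )
    return lines


for line in describe_wiring(processors, connections):
    print(line)
```

Note that both the failure and success relationships of SentimentAnalysis are routed to LogAttribute, so every flow file is logged whether or not it was scored.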