Parisi.IO


Welcome

My name is Marc Parisi

I am a Software Engineer. Scroll down to learn more.



About

I am a Principal Software Engineer, focusing on big data infrastructure development and embedded systems. Below is a snapshot of my experience, which you can click to learn more.

  • 2017-01-30

    Hortonworks/Cloudera Principal Software Engineer

    Working on Apache NiFi MiNiFi C++ to support getting data from a myriad of different devices, operating systems, and use cases into a multitude of formats.

  • 2016-09-01

    Miner & Kasch

Supported Bloomberg and Merck on Accumulo-related activities, including creating a graph store and extending the C++ client for Accumulo.

  • 2011-10-01

    DoD IC (Various contractors)

Managed query and compliance activities.

  • 2010-06-01

    G2 Inc

Software team lead for the Fightclub project, which used Apache Accumulo (Cloudbase) to create a query system across a large cluster of data.

  • 2005-01-01

    Diversified Technology

Developed a test harness for a single-board computer manufacturer using a distributed framework.



MarcOnTech


I heard you like Reptiles

On March 22, 2019 we released Apache NiFi MiNiFi C++ 0.6.0. This brings a lot of features that I can’t begin to cover in a single post [1].

There is one in particular I’ve been using quite a bit for rapid prototyping: Python processors [1]. We have a simple example in our code base called SentimentAnalysis [2]. This is a simple processor that performs sentiment analysis on the incoming text of a flow file’s content. It provides a score from 0.0 to 1.0 that indicates whether the text is neutral, positive, or negative. This processor requires nltk and VaderSentiment to be installed via pip.

With the introduction of Python processors I hope that developers can quickly create and deploy features written in Python to MiNiFi C++. The most important aspect is that Python processors are easy to add, remove, and run. The default configuration defines a subdirectory, minifi-python. Simply place your Python processors into this directory. The file name becomes the processor name you reference as the class in your flow.

To demonstrate this I’ve written a short flow (at the end of this post) that uses our sentiment analyzer. The flow simply pulls data from a directory on my file system, using GetFile. These flow files are then sent through the sentiment analyzer, which is written in Python, and then logged with LogAttribute.

My test files are short. Here is example output from LogAttribute for a negatively scored payload.

Negative Analysis

As you can see, the sentiment analysis provides different scores for a more positive payload. VaderSentiment with its default set does a good job of scoring text. I encourage you to read more about nltk and its sentiment analyzers.

Positive Analysis

What is required?

Python processors simply require that you implement describe, onInitialize, and onTrigger functions [1]. The describe function allows you to provide a description of your processor to the framework. The onInitialize function allows you to specify whether your processor supports dynamic properties, along with the properties that make up your processor. The YAML will configure your processor as it would any C++ processor, or any Java processor implemented with our JNI capabilities.
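
A minimal sketch of these three entry points is below. The function and method names follow the scripting README; the StubProcessor class is only a stand-in for the processor object the agent supplies at runtime, so the sketch can be exercised outside the agent.

```python
# Sketch of the three entry points a MiNiFi C++ Python processor implements.
# StubProcessor stands in for the framework object the agent normally passes in.

class StubProcessor:
    """Stand-in for the processor object the agent supplies."""
    def __init__(self):
        self.description = None
        self.supports_dynamic = False

    def setDescription(self, description):
        self.description = description

    def setSupportsDynamicProperties(self):
        self.supports_dynamic = True


def describe(processor):
    # Register a human-readable description with the framework.
    processor.setDescription("Demonstration processor: annotates flow files.")


def onInitialize(processor):
    # Declare dynamic-property support (and, in a real processor, its properties).
    processor.setSupportsDynamicProperties()


def onTrigger(context, session):
    # In a real processor: flow_file = session.get(); read or write content;
    # then session.transfer(flow_file, REL_SUCCESS).
    pass


proc = StubProcessor()
describe(proc)
onInitialize(proc)
```

In the agent, the file name of this script (e.g. a hypothetical Annotate.py) would become the class name referenced in the flow YAML.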

What does it all mean?

It’s a little unfair to couch this as a rapid prototyping feature. I think many will use it as such; however, these processors function in the same way C++ or Java processors do. They’re simply calls into bound functions. There will be added cost, but it’s likely not beyond that of your I/O. As a result, you should be able to use Python processors in your everyday flows.

The example I provided is short but demonstrates how you can access your Python processors. In any case, if a dependency for your Python script does not exist, we will not allow that processor to be loaded. In the future we hope to improve namespace references via the flow. If you look at the example flow, below, the class name is defined as org.apache.nifi.minifi.processors.SentimentAnalysis. In future releases we’ll improve how we isolate and reference Python processors.

Feel free to give it a try, and if you have any issues let me know. I encourage you to use one of our binary releases [3].

[1] https://github.com/apache/nifi-minifi-cpp/blob/master/extensions/script/README.md

[2] https://github.com/apache/nifi-minifi-cpp/blob/master/extensions/pythonprocessors/SentimentAnalysis.py

[3] https://nifi.apache.org/minifi/download.html

MiNiFi Config Version: 3
Flow Controller:
  name: root
  comment: ''
Core Properties:
  flow controller graceful shutdown period: 10 sec
  flow service write delay interval: 500 ms
  administrative yield duration: 30 sec
  bored yield duration: 10 millis
  max concurrent threads: 1
  variable registry properties: ''
FlowFile Repository:
  partitions: 256
  checkpoint interval: 2 mins
  always sync: false
  Swap:
    threshold: 20000
    in period: 5 sec
    in threads: 1
    out period: 5 sec
    out threads: 4
Content Repository:
  content claim max appendable size: 10 MB
  content claim max flow files: 100
  always sync: false
Provenance Repository:
  provenance rollover time: 1 min
  implementation: org.apache.nifi.provenance.MiNiFiPersistentProvenanceRepository
Component Status Repository:
  buffer size: 1440
  snapshot frequency: 1 min
Security Properties:
  keystore: ''
  keystore type: ''
  keystore password: ''
  key password: ''
  truststore: ''
  truststore type: ''
  truststore password: ''
  ssl protocol: ''
  Sensitive Props:
    key:
    algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL
    provider: BC
Processors:
- id: c37e7b38-9b3b-4034-a67a-621328171073
  name: GetFile
  class: org.apache.nifi.minifi.processors.GetFile
  max concurrent tasks: 1
  scheduling strategy: TIMER_DRIVEN
  scheduling period: 1000 ms
  penalization period: 30000 ms
  yield period: 1000 ms
  run duration nanos: 0
  auto-terminated relationships list: []
  Properties:
    Batch Size: '10'
    File Filter: '[^\.].*'
    Ignore Hidden Files: 'true'
    Input Directory: /home/marc/deploy/data
    Keep Source File: 'true'
    Maximum File Age: 0 sec
    Minimum File Age: 0 sec
    Minimum File Size: 0 B
    Polling Interval: 0 sec
    Recurse Subdirectories: 'true'
- id: 43f59213-d9e1-4d3c-b4c3-c745ebfa2916
  name: LogAttribute
  class: org.apache.nifi.minifi.processors.LogAttribute
  max concurrent tasks: 1
  scheduling strategy: TIMER_DRIVEN
  scheduling period: 500 ms
  penalization period: 30000 ms
  yield period: 1000 ms
  run duration nanos: 0
  auto-terminated relationships list:
  - success
  Properties:
    Attributes to Ignore:
    Attributes to Log:
    Log Level: info
    Log Payload: 'true'
    Log Prefix:
- id: f9c446a7-74f7-4a67-b8d2-0c77a664f0fa
  name: SentimentAnalysis
  class: org.apache.nifi.minifi.processors.SentimentAnalysis
  max concurrent tasks: 1
  scheduling strategy: TIMER_DRIVEN
  scheduling period: 1000 ms
  penalization period: 30000 ms
  yield period: 1000 ms
  run duration nanos: 0
  auto-terminated relationships list: []
  Properties: {}
Controller Services: []
Process Groups: []
Input Ports: []
Output Ports: []
Funnels: []
Connections:
- id: e37e50f4-caf5-401c-9e39-5c4a6e677216
  name: GetFile/success/SentimentAnalysis
  source id: c37e7b38-9b3b-4034-a67a-621328171073
  source relationship names:
  - success
  destination id: f9c446a7-74f7-4a67-b8d2-0c77a664f0fa
  max work queue size: 0
  max work queue data size: 10000 B
  flowfile expiration: 60 seconds
  queue prioritizer class: ''
- id: fd8f1e22-ad58-45fd-bbc1-277fd58bb568
  name: SentimentAnalysis/failure/LogAttribute
  source id: f9c446a7-74f7-4a67-b8d2-0c77a664f0fa
  source relationship names:
  - failure
  - success
  destination id: 43f59213-d9e1-4d3c-b4c3-c745ebfa2916
  max work queue size: 0
  max work queue data size: 10000 B
  flowfile expiration: 60 seconds
  queue prioritizer class: ''
Remote Process Groups: []
NiFi Properties Overrides: {}

Batteries not included

Managing Resources

I’ve been hacking away at a robotic car built with a Raspberry Pi. I had no way of monitoring the battery life, so it occurred to me that we need such a mechanism in MiNiFi C++. Configuring MiNiFi C++ for the correct number of threads and timing can be difficult, so we’ve created a controller service that can monitor battery life on Linux and adjust the thread pool settings accordingly. The controller service is titled LinuxPowerManagerService, configured as we see in the image below.

LinuxPowerManagerService is a proof of concept: the first controller service that monitors and adjusts thread pools within the agent in response to battery capacity and status. To configure it, we specify the battery capacity and status paths, along with the trigger and low battery thresholds. The trigger threshold is the battery capacity at which we begin reducing threads and incurring wait in our scheduling agents between processor executions. The low battery threshold is the point at which we respond more aggressively to reduce consumed resources.

The battery status path specifies the current state of the battery, typically charging or discharging. This is important because the LinuxPowerManagerService attempts to reduce resources if and only if we are still in a discharging state. The wait period is the frequency at which we make adjustments to the internal MiNiFi C++ thread pools. If the low battery threshold has not been reached, this wait period is also the period between any adjustments to thread resources.
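
As a sketch, the service can be declared in the agent's YAML alongside the flow. The id below is a placeholder, the paths are the standard Linux sysfs battery locations, and the property names are from my reading of the sources; check the agent documentation for the exact keys.

```yaml
Controller Services:
- id: 00000000-0000-0000-0000-000000000001   # placeholder id
  name: LinuxPowerManagerService
  class: LinuxPowerManagerService
  Properties:
    # standard Linux sysfs battery paths
    Battery Capacity Path: /sys/class/power_supply/BAT0/capacity
    Battery Status Path: /sys/class/power_supply/BAT0/status
    # begin throttling at 50% capacity; throttle aggressively below 15%
    Trigger Threshold: '50'
    Low Battery Threshold: '15'
    Wait Period: 500 ms
```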

Avoiding Starvation

The ThreadPoolManager controller service requires that we do not starve processors. In testing I found that the reduction in thread pool threads, and the increased time slicing we incur with the manager, results in increased yield in the flow. This may or may not be desirable in our flow. Testing concluded that as we reduce our thread pool we see a reduction in speed. The controller service avoids starvation by leaving one thread available to do all work. This means that if you have a simple flow (say GenerateFlowFile -> LogAttribute), your reduction in CPU consumed may be up to 30% of a rather small amount.
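
The throttling policy described above can be illustrated in a few lines of Python. This is purely illustrative: the real logic lives in the agent's C++ thread pool management, and the thresholds and scaling curve here are hypothetical.

```python
# Illustrative sketch of capacity-based thread throttling with a starvation
# guard: full threads above the trigger threshold, one thread at or below the
# low-battery threshold, and a linear ramp in between (never below one thread).

def target_threads(max_threads, capacity, trigger=50, low=15):
    """Return the number of scheduler threads to keep for a battery capacity (%)."""
    if capacity > trigger:
        return max_threads            # healthy battery or wall power: no throttling
    if capacity <= low:
        return 1                      # aggressive reduction, but keep one thread
    # linear reduction between the low and trigger thresholds
    frac = (capacity - low) / (trigger - low)
    return max(1, round(1 + frac * (max_threads - 1)))
```

For example, with eight threads and the hypothetical defaults, an 80% battery keeps all eight threads, a 30% battery drops to four, and anything at or below 15% runs on a single thread.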

Recovering

Below I’ve attempted to demonstrate, in htop, the reduction in threads followed by the increase in the number of threads. The increase originates from plugging the battery in, resulting in a charging state. The second image shows the agent after ten minutes of execution, once we’ve dropped below our threshold. The final image demonstrates the increase in threads after I accidentally plugged the computer back in. Once charging, the agent increases the number of threads incrementally. If the agent enters a discharging state, it immediately resumes reducing threads from the current count.

Demonstrating Threads
Reducing Threads
Increasing threads while charging

Battery Management in action

Conclusion

In this article we’ve taken a quick dive into monitoring battery state and capacity. Capacity is intended to be the current energy level of our battery or batteries. If we reach our threshold, the agent will automatically reduce the number of threads devoted to processor execution. In doing so we’ll also see an increase in the sleep and yield times between processor onTrigger calls.


Tonight, We Bank in Hell!!

I was inspecting sources for some banks because I was curious whether it was normal for one of my banks to open up a ton of sources. This isn’t necessarily a security concern, but I was quite interested in why some feel the need to pull information from, or link to, more sources than others. I initially began watching this on all websites I visited… but one of the first sites to pique my interest was a banking website. Note that in some cases these sources are legitimate and/or owned by the bank. In others….

CITIBANK
SUNTRUST BANK
BANK OF AMERICA
WELLS FARGO
REGIONS BANK
M&T BANK
BB&T BANK

Visualizing Sensors

Visualizing Temperature and Humidity Sensors to monitor my furnace

As I’ve previously discussed in ‘Navigating IoT,’ I think that IoT is such a general initialism that we can’t fully capture what is possible. One of the primary areas technologists refer to is sensor capture. Unlike most posts that discuss this, I had an actual reason to capture the data: I have wireless sensors that read temperature and humidity, but no way of capturing data from them. With a SenseHAT on a few well-placed Raspberry Pis, I was able to get this information into Grafana and visualize it very easily.

SenseHAT on Raspberry PI

Getting Started

This how-to will step you through setting up sensors on your Raspberry PI using Apache MiNiFi C++. My first step was to assemble the RPIs with SenseHATs. This is as simple as connecting the SenseHAT to the GPIO ports on the RPI [1].

I next built a Raspberry Pi image using a custom branch with some I2C capabilities [2]. This branch contains intermediate code used for a Raspberry-Pi-driven car; however, it also contains a processor called SenseHAT. To get this up and running I included a third-party library named RTIMULIB2. I chose the SenseHAT because I knew getting the processor running would be simple with this library. Alternatively, you could use ExecuteScript with a Python script, but the responsiveness of the C++ calls was much higher and required very little code.

As you can see from the onSchedule and onTrigger functions below, there is very little to getting this running. With this processor included, I built MiNiFi on the Pi and installed it. I bootstrapped the agent without the execute script, lib archive, or expression language capabilities, and enabled my custom extension with cmake -DENABLE_I2C=true ..

void SenseHAT::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {

  imu = RTIMU::createIMU(&settings);
  if (imu) {
    imu->IMUInit();
    imu->setGyroEnable(true);
    imu->setAccelEnable(true);
  } else {
    throw std::runtime_error("RTIMU could not be initialized");
  }

  humidity_sensor_ = RTHumidity::createHumidity(&settings);
  if (humidity_sensor_) {
    humidity_sensor_->humidityInit();
  } else {
    throw std::runtime_error("RTHumidity could not be initialized");
  }

  pressure_sensor_ = RTPressure::createPressure(&settings);
  if (pressure_sensor_) {
    pressure_sensor_->pressureInit();
  } else {
    throw std::runtime_error("RTPressure could not be initialized");
  }

}

void SenseHAT::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {

  auto flow_file_ = session->create();
  flow_file_->setSize(0);

  if (imu->IMURead()) {
    RTIMU_DATA imuData = imu->getIMUData();
    auto vector = imuData.accel;
    std::string degrees = RTMath::displayDegrees("acceleration", vector);
    flow_file_->addAttribute("ACCELERATION", degrees);
  }

  RTIMU_DATA data;

  bool have_sensor = false;

  if (humidity_sensor_->humidityRead(data)) {
    if (data.humidityValid) {
      have_sensor = true;
      std::stringstream ss;
      ss << std::fixed << std::setprecision(2) << data.humidity;
      flow_file_->addAttribute("HUMIDITY", ss.str());
    }
  }

  if (pressure_sensor_->pressureRead(data)) {
    if (data.pressureValid) {
      have_sensor = true;
      {
        std::stringstream ss;
        ss << std::fixed << std::setprecision(2) << data.pressure;
        flow_file_->addAttribute("PRESSURE", ss.str());
      }

      if (data.temperatureValid) {
        std::stringstream ss;
        ss << std::fixed << std::setprecision(2) << data.temperature;
        flow_file_->addAttribute("TEMPERATURE", ss.str());
      }
    }
  }

  if (have_sensor) {
    WriteCallback callback("SenseHAT");
    session->write(flow_file_, &callback);
    session->transfer(flow_file_, Success);
  }
}

I installed the MiNiFi agent in the root Pi directory under ~/deploy/bin/. Once installed, I created a flow that moved flow files created by the SenseHAT processor directly to a NiFi instance through Site-to-Site. The NiFi instance is located on AWS, so all Pis could send data to it, using the s2s.host attribute as a differentiator. Note that once I created the Pi’s image, I copied the SD card to three others, which I placed around my house. My goal was to get temperature and humidity readings in certain places: one in my basement, one upstairs in a utility room containing the furnace and furnace probe, and one in my office.

I then converted these attributes to JSON and passed them along to InfluxDB by way of MQTT and Mosquitto. I used a setup similar to the one found in this guide [3]. The image below depicts the movement of data from Site-to-Site to PublishMQTT. I then used the Python script found in the guide as a framework for my own.

Site2Site to InfluxDB

My variant of the guide’s Python script is below. I ran it in the background, collecting data from Mosquitto and inserting it into InfluxDB. Grafana can use InfluxDB as a data source, querying the appropriate fields as necessary.

When finished, I found the mean humidity and temperature to be higher than expected. The reason was an outlier in my office. All temperatures on the SenseHAT register warmer than the ambient temperature due to the heat of the processor; however, the one in my office registered much higher because it sits on the highest level, in the room with the poorest airflow. The temperatures are in Celsius. Note that there is an overall increase throughout the night. This is because my furnace runs longer as the outdoor temperature decreases. The stark rise is when the furnace is running, followed by a drop as the thermostat temperature settles. Humidity rises and falls as the whole-house humidifier runs concurrently with the furnace.

#!/usr/bin/env python3
import paho.mqtt.client as mqtt
import datetime
import time
import json
from influxdb import InfluxDBClient

def on_connect(client, userdata, flags, rc):
    client.subscribe("sensors")

def on_message(client, userdata, msg):
    # Use utc as timestamp
    print("oh i got something")
    receiveTime=datetime.datetime.utcnow()
    message=msg.payload.decode("utf-8")
    parsedJson=False
    try:
        val  = json.loads(message)
        parsedJson=True
        print("good json")
    except:
        parsedJson=False

    if parsedJson:
        json_body = [
            {
                "measurement": "temperature",
                "time": receiveTime,
                "fields": {
                    "temperature": float(val['TEMPERATURE']),
                    "s2s.host": val['s2s.host']
                }
            }
        ]

        dbclient.write_points(json_body)

        json_body = [
            {
                "measurement": "humidity",
                "time": receiveTime,
                "fields": {
                    "humidity": float(val['HUMIDITY']),
                    "s2s.host": val['s2s.host']
                }
            }
        ]

        dbclient.write_points(json_body)

# Set up a client for InfluxDB
dbclient = InfluxDBClient('localhost', 8086, 'root', 'root', 'sensors')

# Initialize the MQTT client that should connect to the Mosquitto broker
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
connOK=False
while(connOK == False):
    try:
        client.connect("localhost", 1883, 60)
        connOK = True
    except:
        connOK = False
    time.sleep(2)

# Blocking loop to the Mosquitto broker
client.loop_forever()

Office Temp and Humidity

I learned that there was only a minor difference in air temperature, generally 1-2 degrees Fahrenheit, but enough to be felt. The basement is obviously cooler. Humidity in the basement wasn’t higher, but this will likely be more stark in late spring when the A/C isn’t running. The above-grade levels have lower humidity but higher temperatures.

With only a little bit of effort, I was able to capture and visualize temperature differentials and capture statistics. Using Apache MiNiFi C++, I was able to capture the SenseHAT data and send it to NiFi, where I could do what I needed. The reason I chose MiNiFi over a simple Python script that connected to a remote Mosquitto instance is primarily one of provenance and controllability. With Apache MiNiFi I can control the agent through C&C (Command and Control) capabilities. This will be especially useful as I can use command and control interfaces to update a flow on the agents when needed. Command and control became especially useful when I was running into heating issues on my office sensor and had to change the run time characteristics of the SenseHAT processor to run less often. I could do this remotely without any downtime. Since the agents were self-registering with C&C, this also meant doing so without having to SSH into the Pi. I’ll go into further detail regarding my usage of command and control in a subsequent blog post.

[1] https://www.adafruit.com/product/2738

[2] https://github.com/phrocker/nifi-minifi-cpp/tree/PiCar

[3] https://larsbergqvist.wordpress.com/2017/03/02/influxdb-and-grafana-for-sensor-time-series/


Navigating IoT

Navigating the Internet of Things.

The Internet of Things is a vague description that encompasses all devices that could possibly be networked. If a device has a networking card or a radio, it is a thing on the internet. In some cases, it may be confusing to know what IoT means because we really can’t exclude any networked device from this description.

In search of the etymology of the initialism, I found myself rummaging through many opinions on its origin [1]. All correct, I’m sure, but it leads me to believe that we are free to define IoT in the realm of our own organization. IoT may or may not be important to one’s job, but it’s important to recognize that all devices (phones, cars, remote controls, etc.) create data and send it across the internet.

Projects like the one I work on, Apache MiNiFi, aim to corral data generated by these devices; however, the question remains: what do I do with it? In many cases these devices are single-purpose collectors and provide little utility outside of their intended use case. In other cases, IoT devices may have significant compute capacity, allowing them to be remote havens for code execution. What I find perplexing is: does the IoT revolution mean we are supposed to buy more devices, or use the ones we already have?

IoT is a very simple term by nature, but the advertising implies that we somehow need to purchase a service from some of the big players to use these devices; however, in many cases protocols already exist. Philips Hue devices already have a protocol and communicate with your hub and thus some centralized service. Only in rare cases will custom services or software be needed. If you are building a new device, it may be advantageous to use software to corral your data if it means reduced development time and improved provenance of data. What doesn’t make sense is investing resources in massive infrastructure and complicated software.

Be mindful that getting data from your devices may not dramatically improve your overall system experience. Being at the front of the IoT revolution may mean that you can build more business intelligence, but it may also mean you spend significant time and money to get very small amounts of meaningless data. Like with all fads, just because there is data to be obtained, doesn’t mean the return on investment is there.

The beauty of IoT being as general as it is means that the paradigms may apply to infrastructure within a controlled environment. Perhaps it means you can get the status of manufacturing hardware sooner, and apply machine learning to estimate hardware failure before it occurs. I visited a ‘big data’ conference in the Midwest and listened intently, in the IoT track, to a company that did just that. Their presentation was one of caution: they said that the companies never shut down the equipment based on a warning, even after the software repeatedly identified premature failure. Listening and repairing would have saved time and money, but the companies simply stayed with their ways. The IoT revolution didn’t revolutionize manufacturing for this company because IoT, while a penetrating paradigm, isn’t a panacea for everyone.

Much like the ‘big data’ movement, technology is only meaningful when it is something everyone sees as a commodity. Do you need IoT or ‘big data’ to perform your work? For me that means trusting the results and information enough to accept downtime or accept cost; however, for everyone this clearly won’t be the case. When you navigate IoT, it’s important to consider that technologists are applying a nomenclature referring to the connectedness of devices that typically weren’t connected previously. It doesn’t necessarily mean that you will have lower costs or more information, and even if it does, it doesn’t mean that your organization will trust that information.

Will IoT spell greater medical breakthroughs or manufacturing improvements? Not by itself, I think; however, we don’t always know what to expect from our infrastructure. At this same conference, I heard a talk by a veterinarian on medically linked devices for pets. These helped predict health markers indicating the well-being of an animal that can’t speak or provide details. As we imagine what is possible, more devices will be created that leverage the IoT paradigm.

Security will always be a concern of mine, so I don’t anticipate this ever going away. Since the internet of things links things that weren’t linked before, we risk everyday devices failing due to vulnerabilities or exposing PII without our permission. IoT can mean great utility, but it can also mean major security vulnerabilities. I would caution that advancement of technology should never take a back seat to security; however, for this reason I opt not to include smart locks or thermostats in my home. I have smart lights, which can provide a threat vector into my home network, but I don’t want someone else potentially controlling my thermostat, or someone entering my house with a simple Bluetooth hack. When I look at the IoT realm I see many possible devices, and the security threats limit what I am willing to accept; however, it is quite possible that we can mitigate these concerns when leveraging infrastructure in an organization.

In conclusion, the Internet of Things is an interconnected network of a cacophony of devices, some old and some new. What’s important is that these devices, often serving a single purpose or few purposes, provide meaningful data to some, but a flutter of noise to others. You must evaluate what is important to your organization. Having an IoT infrastructure and collecting massive amounts of data may not ultimately be useful for your company. The security risks should also be weighed, and you must determine whether the return on investment is worth the risk. There are many applications, from manufacturing to medical and anywhere in between. Navigating what will be useful and worth the cost must come from honest discussion about whether your organization is willing to make changes based on the data collected. Otherwise, IoT is just noise.

[1] https://www.redbite.com/the-origin-of-the-internet-of-things/



Contact

Enter your comment to contact me, and I will get back to you as soon as possible.

E-Mail

marc@parisi.io

E-Mail

phrocker@apache.org
