Atlas Scientific w/ EZO Sensors on Raspberry Pi and MQTT Atlas IoT Monitoring Software

Working with pi’s is pretty easy and I can always cycle them into something else if I go another route with carrier boards but I am fully commited to their EZO ecosystem. I’m happy with the sensor package. I probably didnt need individual screens for each nutrient tank but its what we were used to with previous solutions. The only thing that really hurts financially was my decision to use their peristatic pumps rather than driving cheaper pumps with a relay board. Ultimately it’ll be easier and I’ll have better quantity and time logging of the pumps, things I don’t know how to do with gpio.

1 Like

I agree with you, Atlas Scientific’s EZO ecosystem is great. It basically includes everything I need or would ever consider adding, with only one exception. It would be nice if they offered a full-spectrum quantum PAR sensor. I know their EZO-RGB can measure LUX, which can supposedly be converted to PAR, but I’m not sure if there are any drawbacks to doing that.

I also looked into what changes I’d need to make if I had to switch to their AtlasDesktop platform because they abounded AtlasIoT for the RPi. From what I can tell, the RPi would have to be replaced with a PC stick, and the i3 InterLink or any other carrier boards currently in use would need to be swapped for USB EZO carrier boards. If those are the only changes, it wouldn’t be too bad.

Regarding peristaltic pumps, I’m thinking about purchasing one or two, and if I like how they perform, I’ll get more. I have two hydroponic systems that would each require five pumps if I were to automate all of the dosing. Atlas sells two kits with the regular-sized pumps and all the necessary components. From what I can see, the only significant difference between the kits is that one is a single pump kit and the other is a triple pump kit. The triple kit also has a larger enclosure and uses just one data cable and one power supply. However, I don’t understand the pricing. Normally, buying more of something results in a slight discount, but buying three single pump kits is actually $72 cheaper than purchasing the triple pump kit. Am I missing something?

You also mentioned that you considered using a cheaper pump to save money but felt the benefits of the Atlas pumps justified the higher cost. After using them, do you still feel the same way?

Lastly, I recently switched to using dry nutrients that I premix before adding to my solution. They usually dissolve completely, but sometimes I notice small remnants left behind. Do you think this could pose an issue for these pumps, or do you believe they’ll perform as needed without any problems?

For the triple pump kit I’ve come to figure Atlas charges about triple what you would expect for each component, expecially wires and cables. You need to consider there is a sensor bridge($10) inside, a higher output power supply, and a project box, possibly with screen printing or stickers. Plus the labor to assemble it. So it seems accurate to their pricing. I use to mix dry ferts at 4:1 with water but I would get precipitates after a day. 6:1 is stable but I ultimately use 10:1 so I can consolidate to parts A and B rather than separating epsom salts. Plut its makes my dosing really easy to figure out as I can directly copy grams of my formulas to ml of liquid dose. I use a tool called Hydro Buddy to make my formulas. Took a while to learn to use it but I have my lab analysis of my water supply built into it and I use nutrient formulas from colleges. You may have better luck mixing all your dry ferts into individual wet mixes before combining. At least separating anything with calcium and magnesium, and never mix your calcium nitrate with anything but water before adding to the reservoir.
As for the pumps I’ve been smashing my head against the wall trying to figure out how to send commands to them. While the atlasiot service is running their sample code is interfered with buy the service polling all the sensors and pumps. I dont know enough about python to code anything myself and I cant find a way to send commands with mqtt. It was great being able to calibrate the pumps with a UI but I haven’t found a path to automations at all. Adding microcontrollers and controlling them with esphome looks like the easiest path right now. I have an email out to their support team looking for some direction.

Okay. I just wanted to make sure that I wasn’t missing something just in case I decide to buy three individual units. Right about now, a little extra wiring might be worth saving $72.

How do you measure the ratio, by weight.? I use a magnetic mixer with a 1000 mL glass flask to to premix parts A and B. I fill it up with RO water in between 500-800 mL depending on how grams of dry mix I’m using. How would I apply the 10:1 ratio to see where I’m at?

I’m familiar with the Hydro Buddy but never used it. My nute mixtures are premade from CropSalt and I use the scale they provide as a reference. I then adjust accordingly based on the PPM reading.

I forgot all about this but actually did it before and got it to work. I only have a pH and DO sensor so I could use AtlasIoT for everything but wanted to know if I could configure everything seperate and could both be run at the same time or while in use would AtlasIoT lock everything else out. It took me a while to figure it out but I eventually did. I’ll look for the scripts I used for the RPi and you should be able to alter them to work for yoy.

So there’s no automation with the pumps when using AtlasIoT? What options does it actually provide? I would have assumed the pumps would be able to use an attached pH or Conductivity sensor to control the dosing. However, from what you’re saying it sounds like they don’t. If not, then you should be able to use HA to automate based on your sensor readings and send the MQTT command to the pump to turn on until the desired level is met and then shutoff. I’ll post it as soon as I find it and makes sure it works.

You should get a response back from Atlas by their tech Charles. He does all of the coding for the Atlas IoT RPi and Desktop version. He recently emailed me and said he’s currently bringing the Desktop version up-to-date and afterwards he’ll implement the updates and changes into the RPi version.

I found the scripts that work and I know how to get the pH reading but anything else I can’t do. I’ve been trying to shutoff the led on my pH sensor using the L, 0 command as stated in the docs but it’s returning the pH value instead. I didn’t use the image version of AtlasIoT, instead I installed it manually with the Nginx auto-restart so I’m wondering now if AtlasIoT is actually locking me out or due to the constant polling and restarting when i kill the process.

I bastardize things and use gallons and g and ml due to containers i use but I’ll translate it to all metric to make it easier. 10:1 isnt exactly acurate since i focus on nutrient quantity. I take 100g of dry part A, add water to the 1000ml mark and mix. I use my formulas to figure out how many grams of A and B I need in the reservoir. (in my case my reservoirs are 40gal each). If you need 10g of part A you would add 100ml. General guideline is good growing conditions should be around 20% uptake a take so the daily dosing would be 20ml for this example. Adjust based on your EC target and plant health. In theory if things are all balanced out your pH shouldnt move much if your EC is correct for the system but the internet loves to argue about that. I currently use Masterblend premade but I supplement with Potassium Nitrate, Zinc EDTA, Manganese EDTA and Iron EDDHA. Important note here Iron EDTA that is widely used fully precipitates out by 6.5pH and up, and starts leaving above 6. Oh and EDDHA is VERY red.

There’s an on screen calibration, continuous dispensing, volume dispensing, volume over time dispensing, and constant flow rate. I mostly use volume over time and distribute a dose over the full daylight cycle since plants pull nitrogen out of the water within minutes. This way they get an even nitrogen dose all day and that alone is a big upgrade for me. There is no logic paths built in for automation from a control aspect. There are alarm functions for the inputs which look as though integrating actions would be a straight forward. I’m fairly confident throwing commands at the components from a 2nd script while the atlasiot or sample code environments are active would be a bad idea.
I installed manually as well, I had a lot of issues with the image on my first my first test run. I had to tweak things myself to launch the browser on startup and the RPI5’s start the browser before the atlas service gets running and I get a 502 bad gateway error from nginx. Goes away when I reload or x out the browser and it reloads. I haven’t figured out a way to delay the browser loading on startup yet.

So I should be able to help you a little. I used a shell script that is run when the RPi starts with a sleep command. That should solve your 502 bad gateway error.

#!/bin/bash
sleep 15
chromium-browser http://localhost:5000

I was also able to get the single commands of “R” and “i” on the pH sensor to work when AtlasIoT is running but I’m having problems with the others. I tried both with the AtlasIoT service killed and running. All you have to do is create the script, change the permissions to “Anyone”, and execute it. The script it not written to be triggered via MQTT from Home Assistant. I wrote them just to test them within the RPi to make sure they work.

Edit: The device info script does NOT work with the AtlasIoT service running. Below are the three commands to kill, check the sttatus, and restart the service. To see the difference, try running the device info script with the service running and then after you kill it. When the service is running the response will either be the pH value or an error.

kill - sudo systemctl stop AtlasIoT
restart - sudo systemctl start AtlasIoT
status - sudo systemctl status AtlasIoT
import smbus
import time

# I2C configuration
I2C_BUS = 1  # I2C bus 1 on most Raspberry Pi models
I2C_ADDRESS = 0x63  # Default I2C address of the pH EZO sensor

def read_ph():
    try:
        bus = smbus.SMBus(I2C_BUS)
        
        # Send the 'R' command to request a reading
        bus.write_byte(I2C_ADDRESS, ord('R'))
        time.sleep(1.5)  # Wait for the sensor to process
        
        # Read 7 bytes of data
        data = bus.read_i2c_block_data(I2C_ADDRESS, 0, 7)
        
        # Check the status byte
        if data[0] == 1:  # Successful reading
            # Decode the pH value
            ph_string = ''.join(chr(b) for b in data[1:] if chr(b).isdigit() or chr(b) == '.')
            return float(ph_string)
        else:
            print(f"Error: Status byte {data[0]}")
            return None
    except Exception as e:
        print(f"Error reading pH: {e}")
        return None

# Test the function
ph_value = read_ph()
if ph_value is not None:
    print(f"pH: {ph_value}")
import smbus
import time

# I2C configuration
I2C_BUS = 1  # I2C bus 1 on most Raspberry Pi models
I2C_ADDRESS = 0x63  # Default I2C address of the pH EZO sensor

def send_command(command):
    try:
        # Send the command to the device as a single byte
        bus.write_byte(I2C_ADDRESS, ord(command))  # Use write_byte instead of write_i2c_block_data
        time.sleep(0.3)  # Allow the sensor to process the command
        print(f"Command '{command}' sent successfully.")
    except Exception as e:
        print(f"Error sending command '{command}': {e}")

def read_response():
    try:
        # Increase response length to 16 or more bytes
        response = bus.read_i2c_block_data(I2C_ADDRESS, 0, 16)  # Adjusted to 16 bytes
        # Clean up the response and join the data
        response_str = "".join(chr(b) for b in response if b != 0).strip()
        return response_str
    except Exception as e:
        print(f"Error reading response: {e}")
        return None

def get_device_info():
    # Send 'i' command to get device info
    send_command("i")
    # Read the response
    response = read_response()
    if response:
        print(f"Raw Device Info: {response}")
        # Split response into parts
        response_parts = response.split(',')
        
        # Clean up the first part to remove the status byte
        cleaned_device_info = response_parts[1:]  # Remove the first item (status byte)
        
        # Output the cleaned information
        print(f"Device Info (cleaned): {cleaned_device_info}")
    else:
        print("No response received.")

if __name__ == '__main__':
    bus = smbus.SMBus(I2C_BUS)  # Re-initialize the bus
    get_device_info()

I’ve tried to alter them for the “Status” and the “L,?” command which go through but I’m not getting a response. See what you can come up with and let me know.

I also found this but haven’t had a chance to really try anything from it.

Okay. I’m making progress. This file is found on the Atlas github page…

…but you could just copy the code below and make a file name AtlasI2C.py.

#!/usr/bin/python

import io
import sys
import fcntl
import time
import copy
import string


class AtlasI2C:

    # the timeout needed to query readings and calibrations
    LONG_TIMEOUT = 1.5
    # timeout for regular commands
    SHORT_TIMEOUT = .3
    # the default bus for I2C on the newer Raspberry Pis, 
    # certain older boards use bus 0
    DEFAULT_BUS = 1
    # the default address for the sensor
    DEFAULT_ADDRESS = 98
    LONG_TIMEOUT_COMMANDS = ("R", "CAL")
    SLEEP_COMMANDS = ("SLEEP", )

    def __init__(self, address=None, moduletype = "", name = "", bus=None):
        '''
        open two file streams, one for reading and one for writing
        the specific I2C channel is selected with bus
        it is usually 1, except for older revisions where its 0
        wb and rb indicate binary read and write
        '''
        self._address = address or self.DEFAULT_ADDRESS
        self.bus = bus or self.DEFAULT_BUS
        self._long_timeout = self.LONG_TIMEOUT
        self._short_timeout = self.SHORT_TIMEOUT
        self.file_read = io.open(file="/dev/i2c-{}".format(self.bus), 
                                 mode="rb", 
                                 buffering=0)
        self.file_write = io.open(file="/dev/i2c-{}".format(self.bus),
                                  mode="wb", 
                                  buffering=0)
        self.set_i2c_address(self._address)
        self._name = name
        self._module = moduletype

	
    @property
    def long_timeout(self):
        return self._long_timeout

    @property
    def short_timeout(self):
        return self._short_timeout

    @property
    def name(self):
        return self._name
        
    @property
    def address(self):
        return self._address
        
    @property
    def moduletype(self):
        return self._module
        
        
    def set_i2c_address(self, addr):
        '''
        set the I2C communications to the slave specified by the address
        the commands for I2C dev using the ioctl functions are specified in
        the i2c-dev.h file from i2c-tools
        '''
        I2C_SLAVE = 0x703
        fcntl.ioctl(self.file_read, I2C_SLAVE, addr)
        fcntl.ioctl(self.file_write, I2C_SLAVE, addr)
        self._address = addr

    def write(self, cmd):
        '''
        appends the null character and sends the string over I2C
        '''
        cmd += "\00"
        self.file_write.write(cmd.encode('latin-1'))

    def handle_raspi_glitch(self, response):
        '''
        Change MSB to 0 for all received characters except the first 
        and get a list of characters
        NOTE: having to change the MSB to 0 is a glitch in the raspberry pi, 
        and you shouldn't have to do this!
        '''
        if self.app_using_python_two():
            return list(map(lambda x: chr(ord(x) & ~0x80), list(response)))
        else:
            return list(map(lambda x: chr(x & ~0x80), list(response)))
            
    def app_using_python_two(self):
        return sys.version_info[0] < 3

    def get_response(self, raw_data):
        if self.app_using_python_two():
            response = [i for i in raw_data if i != '\x00']
        else:
            response = raw_data

        return response

    def response_valid(self, response):
        valid = True
        error_code = None
        if(len(response) > 0):
            
            if self.app_using_python_two():
                error_code = str(ord(response[0]))
            else:
                error_code = str(response[0])
                
            if error_code != '1': #1:
                valid = False

        return valid, error_code

    def get_device_info(self):
        if(self._name == ""):
            return self._module + " " + str(self.address)
        else:
            return self._module + " " + str(self.address) + " " + self._name
        
    def read(self, num_of_bytes=31):
        '''
        reads a specified number of bytes from I2C, then parses and displays the result
        '''
        
        raw_data = self.file_read.read(num_of_bytes)
        response = self.get_response(raw_data=raw_data)
        #print(response)
        is_valid, error_code = self.response_valid(response=response)

        if is_valid:
            char_list = self.handle_raspi_glitch(response[1:])
            result = "Success " + self.get_device_info() + ": " +  str(''.join(char_list))
            #result = "Success: " +  str(''.join(char_list))
        else:
            result = "Error " + self.get_device_info() + ": " + error_code

        return result

    def get_command_timeout(self, command):
        timeout = None
        if command.upper().startswith(self.LONG_TIMEOUT_COMMANDS):
            timeout = self._long_timeout
        elif not command.upper().startswith(self.SLEEP_COMMANDS):
            timeout = self.short_timeout

        return timeout

    def query(self, command):
        '''
        write a command to the board, wait the correct timeout, 
        and read the response
        '''
        self.write(command)
        current_timeout = self.get_command_timeout(command=command)
        if not current_timeout:
            return "sleep mode"
        else:
            time.sleep(current_timeout)
            return self.read()

    def close(self):
        self.file_read.close()
        self.file_write.close()

    def list_i2c_devices(self):
        '''
        save the current address so we can restore it after
        '''
        prev_addr = copy.deepcopy(self._address)
        i2c_devices = []
        for i in range(0, 128):
            try:
                self.set_i2c_address(i)
                self.read(1)
                i2c_devices.append(i)
            except IOError:
                pass
        # restore the address we were using
        self.set_i2c_address(prev_addr)

        return i2c_devices

Put it in the same directory as the following script. Run the script and you should get responses for several tests commands I tried.

import time
from AtlasI2C import AtlasI2C  # Assuming the second script is saved as atlas_i2c.py

# I2C configuration
I2C_BUS = 1  # I2C bus 1 on most Raspberry Pi models
I2C_ADDRESS = 0x63  # Default I2C address of the pH EZO sensor

def send_command(command):
    try:
        # Initialize the AtlasI2C object
        atlas = AtlasI2C(address=I2C_ADDRESS, bus=I2C_BUS)

        # Send the command and get the response
        response = atlas.query(command)
        print(f"Command '{command}' sent successfully.")
        print(f"Response: {response}")

    except Exception as e:
        print(f"Error sending command '{command}': {e}")

def test_command(command):
    # Send the specified command
    send_command(command)

if __name__ == '__main__':
    # Test the 'Status' command
    print("Testing 'Status' command:")
    test_command("Status")

    # Test the 'i' command
    print("\nTesting 'i' command:")
    test_command("i")

    # Test the 'R' command
    print("\nTesting 'R' command:")
    test_command("R")

    # Test the 'L,?' command
    print("\nTesting 'L,?' command:")
    test_command("L,?")

You should be able to look at my script and see what you need to change to test your other sensors. It should just be the I2C address and the test commands at the bottom.

I have not tried this with the AtlasIoT service running. I “killed” the process and then got these working. Now that they’re working I’m going to try to send commands via MQTT from Home Assistant and see if I could get a response that could be used in an automation. I’m currently working 12-hour shifts doing security so i don’t have a lot of time to try so if you beat me to it, let me know.

I got it working.

Create a script on your RPi with the following code. Change the permissions to ANYONE. Then execute in a terminal window. It will continue to print a statement showing that it is running. You can always change this later and have it run in the background. You’ll also need to create a service to start the script on boot. Add your info for the ip, username, and password.

import paho.mqtt.client as mqtt
import time
from AtlasI2C import AtlasI2C  # Import the AtlasI2C class from your module

# Define MQTT settings with username and password
MQTT_BROKER = "your home assistant ip"  # IP or hostname of your MQTT broker
MQTT_PORT = 1883
MQTT_USERNAME = "your mqtt user name"  # Replace with your MQTT username
MQTT_PASSWORD = "your mqtt password"  # Replace with your MQTT password
MQTT_TOPIC_COMMAND = "ezo/command"  # Topic to receive commands
MQTT_TOPIC_RESPONSE = "ezo/response"  # Topic to send responses back

# Callback when a message is received
def on_message(client, userdata, msg):
    print("Message received callback triggered")
    payload = msg.payload.decode()
    print(f"Received message: {payload}")
    
    if ":" in payload:
        address_str, command = payload.split(":", 1)
        try:
            address = int(address_str)
            print(f"Using address: {address}")
        except ValueError:
            print("Invalid address format.")
            return
        
        ezo_sensor = AtlasI2C(address=address)
        try:
            response = ezo_sensor.query(command)
            client.publish(MQTT_TOPIC_RESPONSE, response)
            print(f"Response sent: {response}")
        except Exception as e:
            print(f"Error querying sensor: {e}")
            client.publish(MQTT_TOPIC_RESPONSE, f"Error: {e}")
    else:
        print("Invalid message format. Expected 'address:command'.")

# Create MQTT client instance
client = mqtt.Client()
client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)  # Set username and password
client.on_message = on_message

# Connect to MQTT broker
client.connect(MQTT_BROKER, MQTT_PORT, 60)

# Subscribe to the command topic
client.subscribe(MQTT_TOPIC_COMMAND)

# Loop forever to handle incoming messages
client.loop_start()

# Add periodic print statements
while True:
    print("Script is running...")
    time.sleep(60)  # Sleep for 60 seconds

Make the the AtlasIoT script from my previous comment is in the same folder as this script.

In Home Assistant create a MQTT sensor in your configuration file or the file you point to for this. Depending on how you have it set up it will look like one of the below examples. You might have to restart HA.

- sensor:
  - name: "EZO Sensor Response"
    state_topic: "ezo/response"
    qos: 1

sensor:
  - platform: mqtt
    name: "EZO Sensor Response"
    state_topic: "ezo/response"
    qos: 1

Then use the Developer Tools or whatever you want to run the below action. Enter the decimal for the sensor followed by a colon followed by the command. For example, for a pH sensor use 99 not the 0x63. The below action gets the status of the LED, Change it to 99:Status or 99:R or 99:i or whatever command you want from the datasheet. Change the 99 for whatever sensor you want. After you send the command check the state of the sensor you created and it should have a response if the command you sent provides one.

action: mqtt.publish
data:
  topic: "ezo/command"
  payload: "99:L,?"

You could probably get away with using only one sensor and add a delay in between commands in your automation. For example, an automation to control dosage of pH up and pH down would use a time trigger to check to publish a MQTT command to check the pH every 30 minutes. Then use conditions based on whether the level is to high or to low that send a command to trigger the pump to add a certain amount. Then in 30 minutes when its had time to mix it’ll check again and trigger the pump again if needed. You might have to strip away some of the response and tweak a fee other things but I think the biggest issue is solved.

Here’s how to start the script on system boot and automatically restart it if it stops.

Created a Systemd Service File:

sudo nano /etc/systemd/system/mqtt_ezo_script.service

Add the following content to the service file but with the correct path to your script and the working directory that it is in:

INI
[Unit]
Description=MQTT EZO Script Service
After=network.target

[Service]
ExecStart=/usr/bin/python3 /path/to/script.py
WorkingDirectory=/path/to/  # Ensure this directory exists if necessary
StandardOutput=inherit
StandardError=inherit
Restart=always
RestartSec=5
User=pi

[Install]
WantedBy=multi-user.target

Reloaded the Systemd Daemon:

sudo systemctl daemon-reload

Enabled the Service to Start on Boot:

sudo systemctl enable mqtt_ezo_script.service

Started the Service Immediately:

sudo systemctl start mqtt_ezo_script.service

Verified the Service Status:

sudo systemctl status mqtt_ezo_script.service

To temporarily stop the service so you could edit your script and do whatever and not worry about it restarting.

sudo systemctl stop mqtt_ezo_script.service

To start it back up.

sudo systemctl start mqtt_ezo_script.service

The AtlasIoT service needs to be stopped for the script and all of the sensor commands to work. I thought about adding a command to the script that stops and then restarts the AtlasIoT service to work around sending commands via MQTT but it seems like too much trouble. Maybe I’ll send an email to Atlas tech support to see if there’s a way for both to function at the same time. However, if I had your setup I don’t even think i would use the AtlasIoT and instead handle everything in Home Assistant. Instead of having the AtlasIoT UI on the touchscreens, I’d replace it with HA in kiosk mode. I’m sure you could design a better UI and have it display and function the same as AtlasIoT but with additional functionality to handle the pumps or anything else you might want. Then you’ll no longer have to do it from your computer or wherever you primarily work on HA at. You could have four different views, one for each screen, that all use the same exact yaml by using the decluttering card and passing a variable from each view/screen so the everything goes to the correct rpi.

I actually started making one several months ago just to see if i could duplicate the UI of AtlasIoT. I didn’t put too much time into it but as you can see it looks similar.