Raspberry Breadstick

A long, lean, delicious development board with a unique form factor and high-quality components

Limited items in stock

View Purchasing Options
Jan 16, 2024

Project update 6 of 8

Everything Everywhere All At Once: A Multitasking Tutorial

by Michael Rangen

This week, we’re showcasing how asynchronous programming with the asyncio library can revolutionize your projects. Using the asyncio library allows multiple tasks to run concurrently, significantly improving the efficiency and performance of your Breadstick. Here’s a breakdown of how it works:

Asynchronous Loops:

It’s easy to write a program that reads one sensor and drives one actuator, all you have to do is read, write, and repeat. Processors execute lines of code extremely quickly, so it’s common to put a delay in the code loop. The typical time.delay(seconds) function is a "busy" delay. It prevents our processor from doing anything useful until it’s finished being delayed. Asyncio lets us turn the, "wait here and do nothing for a bit" message into "go do something else for a bit!" This lets you set up a bunch of individual loops, each with its unique delay before its code is run again. For instance, I wanted to update the LED strip 30 times per second but only wanted to have the device print the current variable values to the computer screen every one second; asyncio makes this very easy to do.

Task Switching with ‘await asyncio.sleep(seconds)’

Allows the processor to efficiently switch between tasks, maximizing productivity and minimizing idle time.

await asyncio.sleep(5) translates to, "You can go do something else, but come back after five seconds has passed."

Global Dictionary for Communication

This allows the individual loops to have a shared place to read and write data to.

Practical Uses

Allows you to accomplish more complex tasks by scheduling how often each task should take place.

Watch it in action in this demo video:

'''
Raspberry Breadstick Multitasking with asyncio
This code creates a bunch of async loops that are revisited by the processor after a defined amount of time has passed.
Think of each async fuction as its own main loop running in a little container.
await asyncio.sleep(period) in each of those self-contained loops tells the processor it can go do something else for a bit.
A global dictionary is used as a way to share information across each of these self-contained loops.
'''

from board import *
import keypad
import asyncio
from digitalio import DigitalInOut
from adafruit_simplemath import map_range

#DotStar LED Libraries
from adafruit_dotstar import DotStar
from adafruit_fancyled.adafruit_fancyled import CRGB, CHSV

#I2C Acceleromter and Gyroscope
import busio
from adafruit_lsm6ds.lsm6ds3trc import LSM6DS3TRC as LSM6DS
from adafruit_lsm6ds import Rate, AccelRange, GyroRange

#Servo
from pwmio import PWMOut
from adafruit_motor.servo import Servo

#Potentiometer
from analogio import AnalogIn



async def status_update(frequency):
    '''Prints current values in the global shared dictionary.'''
    period = 1/frequency
    global shared
    while True:
        print()
        for key in shared.keys():
            print(key, ":", shared[key])
        await asyncio.sleep(period)

async def servo_update(frequency, pin, var):
    '''Updates servo angle.'''
    period = 1/frequency
    pwm = PWMOut(pin, duty_cycle=2 ** 15, frequency=50)
    servo = Servo(pwm)
    new_angle = 0
    global shared
    while True:
        new_angle = map_range(shared[var],0,65535,0,180)
        servo.angle = new_angle
        await asyncio.sleep(period)

async def pot_update(frequency,pin,var):
    '''Reads new potentiometer value.'''
    period = 1/frequency
    global shared
    adc = AnalogIn(pin)
    while True:
        shared[var] = adc.value
        await asyncio.sleep(period)


async def led_update(frequency):
    '''Updates LED Strip.'''
    period = 1/frequency
    global shared
    leds = DotStar(DOTSTAR_CLOCK, DOTSTAR_DATA, 24, brightness=0.25, auto_write=False)
    offset = 1
    while True:
        offset +=map_range(shared['gyro_z'],-10,10,-1,1)
        leds.fill((0,0,0))
        for i in range(map_range(shared['pot2'],0,65535,0,24)):
            c = CRGB(CHSV(i/24/5+offset,1.0,0.1))
            leds[i] = c.pack()
        leds.show()
        await asyncio.sleep(period)

async def imu_update(frequency):
    '''Reads latest X,Y,Z values from the Gyroscope and Accelerometer.'''
    period = 1/frequency
    global shared

    #Setup I2C Accelerometer and Gyroscope
    i2c = busio.I2C(IMU_SCL, IMU_SDA)
    IMU = LSM6DS(i2c)
    IMU.accelerometer_range = AccelRange.RANGE_4G
    print("Accelerometer range set to: %d G" % AccelRange.string[IMU.accelerometer_range])
    IMU.gyro_range = GyroRange.RANGE_1000_DPS
    print("Gyro range set to: %d DPS" % GyroRange.string[IMU.gyro_range])
    IMU.accelerometer_data_rate = Rate.RATE_1_66K_HZ
    print("Accelerometer rate set to: %d HZ" % Rate.string[IMU.accelerometer_data_rate])
    IMU.gyro_data_rate = Rate.RATE_1_66K_HZ
    print("Gyro rate set to: %d HZ" % Rate.string[IMU.gyro_data_rate])

    while True:
        shared['acc_x'], shared['acc_y'], shared['acc_z'] = IMU.acceleration
        shared['gyro_x'], shared['gyro_y'], shared['gyro_z'] = IMU.gyro
        await asyncio.sleep(period)

async def main():
    '''Main Program Loop.  Gets called below by asyncio.run(main()).'''
    led_task = asyncio.create_task(led_update(30))
    imu_task = asyncio.create_task(imu_update(30))
    servo_task = asyncio.create_task(servo_update(50,D9,'pot1'))
    pot1_task = asyncio.create_task(pot_update(50,A18,'pot1'))
    pot2_task = asyncio.create_task(pot_update(50,A17,'pot2'))
    status_task = asyncio.create_task(status_update(1))

    await led_task
    await imu_task
    await servo_task
    await pot1_task
    await pot2_task
    await status_task

#Dictionary of global values accessible to async functions
shared = {}
shared['pot1'] = 0
shared['pot2'] = 0
shared['acc_x'] = 0
shared['acc_y'] = 0
shared['acc_z'] = 0
shared['gyro_x'] = 0
shared['gyro_y'] = 0
shared['gyro_z'] = 0

#Run main loop
asyncio.run(main())



Additional Learning

For a deeper understanding of async programming, check out "Python Asynchronous Programming - AsyncIO & Async/Await" by Tech With Tim.


Sign up to receive future updates for Raspberry Breadstick.

Subscribe to the Crowd Supply newsletter, highlighting the latest creators and projects