Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
301 changes: 275 additions & 26 deletions pynq/pl_server/remote_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
bitstream_hash
)
from pynq.remote import (
remote_device_pb2_grpc, mmio_pb2_grpc, buffer_pb2_grpc,
remote_device_pb2, mmio_pb2, buffer_pb2,
remote_device_pb2_grpc, remote_device_pb2,
mmio_pb2_grpc, mmio_pb2,
buffer_pb2_grpc, buffer_pb2,
gpio_pb2_grpc, gpio_pb2,
)

import grpc
Expand Down Expand Up @@ -184,6 +186,7 @@ def __init__(self, index=0, ip_addr=None, port=PYNQ_PORT, tag="remote{}"):
'device': remote_device_pb2_grpc.RemoteDeviceStub(self.client.channel),
'mmio': mmio_pb2_grpc.MmioStub(self.client.channel),
'buffer': buffer_pb2_grpc.RemoteBufferStub(self.client.channel),
'gpio': gpio_pb2_grpc.GpioStub(self.client.channel),
}

self.arch = self.get_arch()
Expand Down Expand Up @@ -515,48 +518,294 @@ def allocate(self, shape, dtype, cacheable=1, **kwargs):


class RemoteGPIO:
"""Remote GPIO placeholder class

Placeholder implementation for GPIO operations on remote devices.
GPIO functionality is not yet implemented for remote PYNQ devices.
"""Internal Helper class to wrap Linux's GPIO Sysfs API.

This GPIO class does not handle PL I/O without the use of
device tree overlays.

Attributes
----------
index : int
The index of the GPIO, starting from the GPIO base.
direction : str
Input/output direction of the GPIO.
"""

def __init__(self, gpio_index=None, direction=None):
"""Initialize RemoteGPIO object
def __init__(self, gpio_index, direction, device=None):
"""Return a new RemoteGPIO object.

Parameters
----------
gpio_index : int, optional
GPIO pin index number
direction : str, optional
GPIO direction ('in' or 'out')
gpio_index : int
The index of the RemoteGPIO using Linux's GPIO Sysfs API.
direction : 'str'
Input/output direction of the GPIO.
device : RemoteDevice
Device object for RemoteGPIO operations
"""
if device is None:
raise RuntimeError("RemoteGPIO requires a RemoteDevice instance")
self.device = device
self.gpio_index = gpio_index
self.direction = direction
warnings.warn("GPIO operations are not yet implemented for remote devices")
self._direction = direction
self._stub = device._stub['gpio']

response = self._stub.get_gpio(
gpio_pb2.GetGpioRequest(index=gpio_index, direction=direction)
)
self._gpio_id = response.gpio_id

def read(self):
raise RuntimeError("GPIO operations are not yet implemented for remote devices")
"""The method to read a value from the GPIO.

Returns
-------
int
An integer read from the GPIO

"""
if self.direction != 'in':
raise AttributeError("Cannot read from GPIO output.")

response = self._stub.read(
gpio_pb2.GpioReadRequest(gpio_id=self._gpio_id)
)
return response.value

def write(self, value):
raise RuntimeError("GPIO operations are not yet implemented for remote devices")
"""The method to write a value into the GPIO.

Parameters
----------
value : int
An integer value, either 0 or 1

Returns
-------
None

"""
if self.direction != 'out':
raise AttributeError("Cannot write to GPIO input.")

if value not in (0, 1):
raise ValueError("Can only write integer 0 or 1.")

response = self._stub.write(
gpio_pb2.GpioWriteRequest(gpio_id=self._gpio_id, value=value)
)
return

def unexport(self):
"""The method to unexport the GPIO using Linux's GPIO Sysfs API.

Returns
-------
None

"""
response = self._stub.unexport(
gpio_pb2.GpioUnexportRequest(gpio_id=self._gpio_id)
)

def release(self):
"""Release GPIO resources
"""The method to release the GPIO.

Returns
-------
None

"""
self.unexport()

No-op for remote GPIO placeholder
def is_exported(self):
"""The method to check if a GPIO is still exported using
Linux's GPIO Sysfs API.

Returns
-------
bool
True if the GPIO is still loaded.

"""
pass
response = self._stub.is_exported(
gpio_pb2.GpioIsExportedRequest(gpio_id=self._gpio_id)
)
return bool(response.is_exported)

@property
def index(self):
return self.gpio_index

@property
def direction(self):
return self._direction

@property
def path(self):
warnings.warn("This property is not implemented for remote devices.")

# Add class methods to match the GPIO API
@staticmethod
def get_gpio_pin(gpio_user_index, target_label=None):
"""Get GPIO pin by user index
def get_gpio_base_path(target_label=None, device=None):
"""This method returns the path to the Remote GPIO base using Linux's
GPIO Sysfs API. This path relates to the target device, not the
host machine.

This is a static method. To use:

>>> from pynq import GPIO

>>> from pynq.pl_server import RemoteDevice

Placeholder method to prevent attribute errors in remote device context.
>>> device = RemoteDevice(ip_addr='192.168.2.99')

>>> gpio = GPIO.get_gpio_base_path(device=device)

Parameters
----------
target_label : str
The label of the GPIO driver to look for, as defined in a
device tree entry.
device : RemoteDevice
Remote device object for RemoteGPIO operations

Returns
-------
str
The path to the Remote GPIO base.

"""
if device is None:
raise RuntimeError("get_gpio_base_path requires a RemoteDevice instance.")
stub = device._stub['gpio']

response = stub.get_gpio_base_path(
gpio_pb2.GetGpioBasePathRequest(target_label=target_label)
)
if response.base_path == "":
return None
return response.base_path

@staticmethod
def get_gpio_base(target_label=None, device=None):
"""This method returns the GPIO base using Linux's GPIO Sysfs API.

This is a static method. To use:

>>> from pynq import GPIO

>>> from pynq.pl_server import RemoteDevice

>>> device = RemoteDevice(ip_addr='192.168.2.99')

>>> gpio = GPIO.get_gpio_base(device)

Note
----
For path '/sys/class/gpio/gpiochip138/', this method returns 138.

Parameters
----------
target_label : str
The label of the GPIO driver to look for, as defined in a
device tree entry.
device : RemoteDevice
Remote device object for RemoteGPIO operations

Returns
-------
int
The GPIO index of the base.

"""
warnings.warn("GPIO operations are not yet implemented for remote devices")
return gpio_user_index # Just return the index to prevent errors
if device is None:
raise RuntimeError("get_gpio_base requires a RemoteDevice instance.")

base_path = RemoteGPIO.get_gpio_base_path(target_label=target_label, device=device)
if base_path is not None:
return int(''.join(x for x in base_path if x.isdigit()))

@staticmethod
def get_gpio_pin(gpio_user_index, target_label=None, device=None):
"""This method returns a GPIO instance for PS GPIO pins.

Users only need to specify an index starting from 0; this static
method will map this index to the correct Linux GPIO pin number.

Note
----
The GPIO pin number can be calculated using:
GPIO pin number = GPIO base + GPIO offset + user index
e.g. The GPIO base is 138, and pin 54 is the base GPIO offset.
Then the Linux GPIO pin would be (138 + 54 + 0) = 192.

Parameters
----------
gpio_user_index : int
The index specified by users, starting from 0.
target_label : str
The label of the GPIO driver to look for, as defined in a
device tree entry.

Returns
-------
int
The Linux Sysfs GPIO pin number.

"""
if device is None:
raise RuntimeError("get_gpio_pin requires a RemoteDevice instance.")

if target_label is not None:
GPIO_OFFSET = 0
else:
if device.arch == "aarch64":
GPIO_OFFSET = 78
else:
GPIO_OFFSET = 54
return (RemoteGPIO.get_gpio_base(target_label=target_label, device=device) +
GPIO_OFFSET +
gpio_user_index)

@staticmethod
def get_gpio_npins(target_label=None, device=None):
"""This method returns the number of GPIO pins for the GPIO base
using Linux's GPIO Sysfs API.

This is a static method. To use:

>>> from pynq import GPIO

>>> from pynq.pl_server import RemoteDevice

>>> device = RemoteDevice(ip_addr='192.168.2.99')

>>> gpio = GPIO.get_gpio_npins()

Parameters
----------
target_label : str
The label of the GPIO driver to look for, as defined in a
device tree entry.
device : RemoteDevice
Remote device object for RemoteGPIO operations

Returns
-------
int
The number of GPIO pins for the GPIO base.

"""
if device is None:
raise RuntimeError("get_gpio_npins requires a RemoteDevice instance.")

stub = device._stub['gpio']
response = stub.get_gpio_npins(
gpio_pb2.GetGpioNPinsRequest(
target_label=target_label)
)
return response.npins


class RemoteInterrupt:
"""Remote Interrupt placeholder class
Expand Down
54 changes: 54 additions & 0 deletions pynq/remote/gpio_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading