Skip to content

Commit

Permalink
remove the cdc from pytest
Browse files Browse the repository at this point in the history
  • Loading branch information
roma-jam committed Dec 20, 2024
1 parent 776deca commit a1e0772
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 188 deletions.
161 changes: 43 additions & 118 deletions device/esp_tinyusb/test_apps/teardown_device/main/test_teardown.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,42 +26,37 @@
static const char *TAG = "teardown";

SemaphoreHandle_t wait_mount = NULL;
SemaphoreHandle_t wait_terminal = NULL;
SemaphoreHandle_t wait_command = NULL;

static uint8_t rx_buf[CONFIG_TINYUSB_CDC_RX_BUFSIZE + 1];
static uint8_t tx_buf[CONFIG_TINYUSB_CDC_TX_BUFSIZE + 1] = { 0 };
#define TEARDOWN_DEVICE_INIT_DELAY_MS 1000
#define TEARDOWN_DEVICE_ATTACH_TIMEOUT_MS 1000
#define TEARDOWN_DEVICE_DETACH_DELAY_MS 1000

#define TEARDOWN_CMD_KEY 0xAA
#define TEARDOWN_RPL_KEY 0x55
#define TEARDOWN_CMD_RPL_SIZE ((TUD_OPT_HIGH_SPEED ? 512 : 64))
#define TEARDOWN_ATTACH_TIMEOUT_MS 2000
#define TEARDOWN_COMMAND_TIMEOUT_MS 3000
#define TEARDOWN_AMOUNT 4
#define TEARDOWN_AMOUNT 10

static const tusb_desc_device_t cdc_device_descriptor = {
.bLength = sizeof(cdc_device_descriptor),
#define TUSB_DESC_TOTAL_LEN (TUD_CONFIG_DESC_LEN)

static uint8_t const test_configuration_descriptor[] = {
// Config number, interface count, string index, total length, attribute, power in mA
TUD_CONFIG_DESCRIPTOR(1, 0, 0, TUSB_DESC_TOTAL_LEN, TUSB_DESC_CONFIG_ATT_SELF_POWERED | TUSB_DESC_CONFIG_ATT_REMOTE_WAKEUP, 100),
};

static const tusb_desc_device_t test_device_descriptor = {
.bLength = sizeof(test_device_descriptor),
.bDescriptorType = TUSB_DESC_DEVICE,
.bcdUSB = 0x0200,
.bDeviceClass = TUSB_CLASS_MISC,
.bDeviceSubClass = MISC_SUBCLASS_COMMON,
.bDeviceProtocol = MISC_PROTOCOL_IAD,
.bMaxPacketSize0 = CFG_TUD_ENDPOINT0_SIZE,
.idVendor = USB_ESPRESSIF_VID,
.idVendor = 0x303A, // This is Espressif VID. This needs to be changed according to Users / Customers
.idProduct = 0x4002,
.bcdDevice = 0x0100,
.bcdDevice = 0x100,
.iManufacturer = 0x01,
.iProduct = 0x02,
.iSerialNumber = 0x03,
.bNumConfigurations = 0x01
};

static const uint16_t cdc_desc_config_len = TUD_CONFIG_DESC_LEN + CFG_TUD_CDC * TUD_CDC_DESC_LEN;
static const uint8_t cdc_desc_configuration[] = {
TUD_CONFIG_DESCRIPTOR(1, 2, 0, cdc_desc_config_len, TUSB_DESC_CONFIG_ATT_REMOTE_WAKEUP, 100),
TUD_CDC_DESCRIPTOR(0, 4, 0x81, 8, 0x02, 0x82, (TUD_OPT_HIGH_SPEED ? 512 : 64)),
};

#if (TUD_OPT_HIGH_SPEED)
static const tusb_desc_device_qualifier_t device_qualifier = {
.bLength = sizeof(tusb_desc_device_qualifier_t),
Expand All @@ -76,32 +71,6 @@ static const tusb_desc_device_qualifier_t device_qualifier = {
};
#endif // TUD_OPT_HIGH_SPEED

static void tinyusb_cdc_rx_callback(int itf, cdcacm_event_t *event)
{
// Something was received
xSemaphoreGive(wait_command);
}

/**
* @brief CDC device line change callback
*
* CDC device signals, that the DTR, RTS states changed
*
* @param[in] itf CDC device index
* @param[in] event CDC event type
*/
void tinyusb_cdc_line_state_changed_callback(int itf, cdcacm_event_t *event)
{
int dtr = event->line_state_changed_data.dtr;
int rts = event->line_state_changed_data.rts;
ESP_LOGD(TAG, "Line state changed on channel %d: DTR:%d, RTS:%d", itf, dtr, rts);

// Terminal:
// dtr==1 && rts==1 - connected
// dtr==0 && rts==0 - disconnected
xSemaphoreGive(wait_terminal);
}

// Invoked when device is mounted
void tud_mount_cb(void)
{
Expand All @@ -112,102 +81,58 @@ void tud_mount_cb(void)
* @brief TinyUSB Teardown specific testcase
*
* Scenario:
* - Installs the tinyUSB driver via esp-tinyusb wrapper with 1xCDC class device
* - Awaits device configuration be the Host (TEARDOWN_ATTACH_TIMEOUT_MS)
* - Awaits the terminal connection (TEARDOWN_ATTACH_TIMEOUT_MS)
* - Expects the command sequence from the Host (TEARDOWN_COMMAND_TIMEOUT_MS)
* - Replies with the response sequence to the Host
* - Awaits terminal disconnection (TEARDOWN_ATTACH_TIMEOUT_MS)
* - Teardowns the tinyUSB driver via esp-tinyusb wrapper
* - Repeats the steps from the step.1 N times (where N = TEARDOWN_AMOUNT)
* - Verifies amount of attempts and memory leakage (attempts should be 0)
*
* command sequence[] = ep_size * 0xAA
* response sequence[] = ep_size * 0x55
*
* Hint: Values 0xAA and 0x55 were selected to verify the buffer memory integrity,
* as the 0xAA and 0x55 are the inversion of each other and the data bits in the same position changes from 1 to 0 in every transaction.
* 1. Install TinyUSB device without any class
* 2. Wait SetConfiguration() (tud_mount_cb)
* 3. If attempts == 0 goto step 8
* 4. Wait TEARDOWN_DEVICE_DETACH_DELAY_MS
* 5. Uninstall TinyUSB device
* 6. Wait TEARDOWN_DEVICE_INIT_DELAY_MS
* 7. Decrease attempts by 1, goto step 3
* 8. Wait TEARDOWN_DEVICE_DETACH_DELAY_MS
* 9. Uninstall TinyUSB device
*/
TEST_CASE("tinyusb_teardown", "[esp_tinyusb][teardown]")
{
size_t rx_size = 0;

wait_mount = xSemaphoreCreateBinary();
TEST_ASSERT_NOT_EQUAL(NULL, wait_mount);
wait_command = xSemaphoreCreateBinary();
TEST_ASSERT_NOT_EQUAL(NULL, wait_command);
wait_terminal = xSemaphoreCreateBinary();
TEST_ASSERT_NOT_EQUAL(NULL, wait_terminal);

// Prep reply
for (int i = 0; i < TEARDOWN_CMD_RPL_SIZE; i++) {
tx_buf[i] = TEARDOWN_RPL_KEY;
}

// TinyUSB driver configuration
const tinyusb_config_t tusb_cfg = {
.device_descriptor = &cdc_device_descriptor,
.device_descriptor = &test_device_descriptor,
.string_descriptor = NULL,
.string_descriptor_count = 0,
.external_phy = false,
#if (TUD_OPT_HIGH_SPEED)
.fs_configuration_descriptor = cdc_desc_configuration,
.hs_configuration_descriptor = cdc_desc_configuration,
.fs_configuration_descriptor = test_configuration_descriptor,
.hs_configuration_descriptor = test_configuration_descriptor,
.qualifier_descriptor = &device_qualifier,
#else
.configuration_descriptor = cdc_desc_configuration,
.configuration_descriptor = test_configuration_descriptor,
#endif // TUD_OPT_HIGH_SPEED
};

// TinyUSB ACM Driver configuration
const tinyusb_config_cdcacm_t acm_cfg = {
.usb_dev = TINYUSB_USBDEV_0,
.cdc_port = TINYUSB_CDC_ACM_0,
.rx_unread_buf_sz = 64,
.callback_rx = &tinyusb_cdc_rx_callback,
.callback_rx_wanted_char = NULL,
.callback_line_state_changed = &tinyusb_cdc_line_state_changed_callback,
.callback_line_coding_changed = NULL
};
TEST_ASSERT_EQUAL(ESP_OK, tinyusb_driver_install(&tusb_cfg));
// Wait for the usb event
TEST_ASSERT_EQUAL(pdTRUE, xSemaphoreTake(wait_mount, pdMS_TO_TICKS(TEARDOWN_DEVICE_ATTACH_TIMEOUT_MS)));

int attempts = TEARDOWN_AMOUNT;
while (attempts) {
while (attempts--) {
// Keep device attached
vTaskDelay(pdMS_TO_TICKS(TEARDOWN_DEVICE_DETACH_DELAY_MS));
TEST_ASSERT_EQUAL(ESP_OK, tinyusb_driver_uninstall());
// Teardown
vTaskDelay(pdMS_TO_TICKS(TEARDOWN_DEVICE_INIT_DELAY_MS));
// Reconnect
TEST_ASSERT_EQUAL(ESP_OK, tinyusb_driver_install(&tusb_cfg));
// Init CDC 0
TEST_ASSERT_FALSE(tusb_cdc_acm_initialized(TINYUSB_CDC_ACM_0));
TEST_ASSERT_EQUAL(ESP_OK, tusb_cdc_acm_init(&acm_cfg));
TEST_ASSERT_TRUE(tusb_cdc_acm_initialized(TINYUSB_CDC_ACM_0));
// Wait for the usb event
ESP_LOGD(TAG, "wait dev mounted...");
TEST_ASSERT_EQUAL(pdTRUE, xSemaphoreTake(wait_mount, pdMS_TO_TICKS(TEARDOWN_ATTACH_TIMEOUT_MS)));
ESP_LOGD(TAG, "wait terminal connection...");
TEST_ASSERT_EQUAL(pdTRUE, xSemaphoreTake(wait_terminal, pdMS_TO_TICKS(TEARDOWN_ATTACH_TIMEOUT_MS)));
// Wait for the command
ESP_LOGD(TAG, "wait command...");
TEST_ASSERT_EQUAL(pdTRUE, xSemaphoreTake(wait_command, pdMS_TO_TICKS(TEARDOWN_COMMAND_TIMEOUT_MS)));
TEST_ASSERT_EQUAL(ESP_OK, tinyusb_cdcacm_read(TINYUSB_CDC_ACM_0, rx_buf, CONFIG_TINYUSB_CDC_RX_BUFSIZE, &rx_size));
for (int i = 0; i < TEARDOWN_CMD_RPL_SIZE; i++) {
TEST_ASSERT_EQUAL(TEARDOWN_CMD_KEY, rx_buf[i]);
}
ESP_LOGD(TAG, "command received");
// Reply the response sequence
ESP_LOGD(TAG, "send response...");
TEST_ASSERT_EQUAL(TEARDOWN_CMD_RPL_SIZE, tinyusb_cdcacm_write_queue(TINYUSB_CDC_ACM_0, tx_buf, TEARDOWN_CMD_RPL_SIZE));
TEST_ASSERT_EQUAL(ESP_OK, tinyusb_cdcacm_write_flush(TINYUSB_CDC_ACM_0, pdMS_TO_TICKS(1000)));
ESP_LOGD(TAG, "wait for terminal disconnection");
TEST_ASSERT_EQUAL(pdTRUE, xSemaphoreTake(wait_terminal, pdMS_TO_TICKS(TEARDOWN_ATTACH_TIMEOUT_MS)));
// Teardown
attempts--;
TEST_ASSERT_EQUAL(ESP_OK, tinyusb_cdcacm_unregister_callback(TINYUSB_CDC_ACM_0, CDC_EVENT_RX));
TEST_ASSERT_EQUAL(ESP_OK, tinyusb_cdcacm_unregister_callback(TINYUSB_CDC_ACM_0, CDC_EVENT_LINE_STATE_CHANGED));
TEST_ASSERT_EQUAL(ESP_OK, tusb_cdc_acm_deinit(TINYUSB_CDC_ACM_0));
TEST_ASSERT_EQUAL(ESP_OK, tinyusb_driver_uninstall());
TEST_ASSERT_EQUAL(pdTRUE, xSemaphoreTake(wait_mount, pdMS_TO_TICKS(TEARDOWN_DEVICE_ATTACH_TIMEOUT_MS)));
}

// Teardown
vTaskDelay(pdMS_TO_TICKS(TEARDOWN_DEVICE_DETACH_DELAY_MS));
TEST_ASSERT_EQUAL(ESP_OK, tinyusb_driver_uninstall());
// Remove primitives
vSemaphoreDelete(wait_mount);
vSemaphoreDelete(wait_command);
// All attempts should be completed
TEST_ASSERT_EQUAL(0, attempts);
}

#endif
Original file line number Diff line number Diff line change
Expand Up @@ -3,77 +3,36 @@

import pytest
from pytest_embedded_idf.dut import IdfDut
import serial
from serial.tools.list_ports import comports
from time import sleep, time
from usb import core, util

def get_tusb_cdc_device(vid, pid):
ports = comports()
for cdc in ports:
if cdc.vid == vid and cdc.pid == pid:
return cdc.device
return None

def wait_for_tusb_cdc(vid, pid, timeout=30):
start_time = time()
while time() - start_time < timeout:
sleep(0.5) # Check every 0.5 seconds
tusb_cdc = get_tusb_cdc_device(vid, pid)
if tusb_cdc:
return tusb_cdc
return None
def find_tusb_dev(vid, pid):
device = core.find(idVendor=vid, idProduct=pid)
return device is not None

def teardown_device(key_len, amount):
TUSB_VID = 0x303A # Espressif TinyUSB VID
TUSB_PID = 0x4002 # Espressif TinyUSB VID

# Command to send and expected response
COMMAND = b'\xAA' * key_len
EXPECTED_RESPONSE = b'\x55' * key_len

# Number of iterations, must be equal to ITERATIONS in the test application
ITERATIONS = amount

for i in range(ITERATIONS):
print(f"Iteration {i+1} of {ITERATIONS}")
def tusb_device_presence(vid, pid, iterations):
for i in range(iterations):
print(f"\Teardown {i + 1}/{iterations}")
# Wait until the device is present
print(f"Waiting for device ...")
while not find_tusb_dev(vid, pid):
sleep(0.5) # Check every 0.5 second
print("Device detected.")

# Wait for the device to appear
print("Waiting for the device to connect...")
tusb_cdc = wait_for_tusb_cdc(TUSB_VID, TUSB_PID)
if not tusb_cdc:
print("Error: Device did not appear within the timeout period.")
assert True
# Wait until the device is removed
print("Waiting for the device to be removed...")
while find_tusb_dev(vid, pid):
sleep(0.5) # Check every 0.5 second
print("Device removed.")

try:
# Open the serial port
with serial.Serial(port=tusb_cdc, baudrate=9600, timeout=1) as cdc:
print(f"Opened port: {tusb_cdc}")
# Send the key command
res = cdc.write(COMMAND)
assert res == key_len
# Get the response
res = cdc.readline()
assert len(res) == key_len
# Explicitly close the cdc
cdc.close()
# Check if the response matches the expected response
if res == EXPECTED_RESPONSE:
print("Response matches expected value.")
else:
print(f"Sent {len(COMMAND)}: {COMMAND.hex().upper()}")
print(f"Received {len(res)}: {res.hex().upper()}")
raise Exception("Error: Response does not match expected value.")

except serial.SerialException as e:
print(f"Error communicating with the serial port: {e}")
raise

# Wait for the device to disconnect
print("Waiting for the device to disconnect...")
while get_tusb_cdc_device(TUSB_VID, TUSB_PID):
sleep(0.1) # Poll every 0.1 second while tinyusb cdc dev still in list

print("Finished all iterations.")
def teardown_device(amount):
TUSB_VID = 0x303A # Espressif TinyUSB VID
TUSB_PID = 0x4002 # Espressif TinyUSB VID
tusb_device_presence(TUSB_VID, TUSB_PID, amount)
print("Monitoring completed.")

@pytest.mark.esp32s2
@pytest.mark.esp32s3
Expand All @@ -84,11 +43,5 @@ def test_usb_teardown_device(dut) -> None:
dut.write('[teardown]')
dut.expect_exact('TinyUSB: TinyUSB Driver installed')
sleep(2) # Some time for the OS to enumerate our USB device
if dut.target == 'esp32p4':
MPS = 512
else:
MPS = 64
# On Linux, the serial port kept opened, while len==MPS, https://github.com/pyserial/pyserial/issues/753
# that can be seen via Beagle: CDC OUT transaction appears only when terminal is closed.
teardown_device(MPS, 4) # Teardown tusb device
teardown_device(10) # Teardown tusb device

0 comments on commit a1e0772

Please sign in to comment.