2023-04-01 18:53:28 +02:00
|
|
|
import random
|
|
|
|
import time
|
|
|
|
import can
|
|
|
|
|
|
|
|
# Presumably meant as a switch for the interactive start prompt.
# NOTE(review): the function `waitForUserInput` defined further down in this
# file rebinds this very name, so once the module has loaded this boolean is
# no longer reachable — consider renaming one of the two.
waitForUserInput = True
|
|
|
|
# Shared CAN bus handle. It stays None until sendMessageOnCan() opens the bus
# on its first call; recive() reads frames from the same handle.
bus = None
|
|
|
|
|
|
|
|
|
2023-04-01 18:55:55 +02:00
|
|
|
def sendMessageOnCan(message, bustype="socketcan", channel="can0"):
    """Send ``message`` as a standard-ID CAN frame with arbitration id 0x002.

    The module-level ``bus`` is opened on the first call and reused by every
    later call, so ``bustype`` and ``channel`` only matter while no bus is
    open yet.

    Args:
        message: Payload handed to ``can.Message`` as ``data``.
        bustype: python-can interface name used when the bus must be opened.
        channel: Channel name used when the bus must be opened.
    """
    global bus
    if bus is None:
        # Use the caller's interface; it used to be hard-coded to "socketcan",
        # which silently ignored the ``bustype`` argument.
        bus = can.interface.Bus(channel, bustype=bustype)

    msg = can.Message(arbitration_id=0x002, data=message, is_extended_id=False)
    bus.send(msg)
|
|
|
|
|
2023-04-01 18:53:28 +02:00
|
|
|
|
2023-04-01 18:55:55 +02:00
|
|
|
def recive(bustype="socketcan", channel="can0", timeout=None):
    """Read one frame from the shared CAN bus, print it and return it.

    If no bus has been opened yet, it is opened here with ``bustype`` and
    ``channel`` instead of failing on a ``None`` handle.

    Args:
        bustype: python-can interface name used when the bus must be opened.
        channel: Channel name used when the bus must be opened.
        timeout: Seconds to wait for a frame. ``None`` (the default) keeps the
            previous behaviour of blocking until a frame arrives.

    Returns:
        The received ``can.Message``, or ``None`` when the wait timed out.
    """
    global bus
    if bus is None:
        bus = can.interface.Bus(channel, bustype=bustype)

    message = bus.recv(timeout)
    if message is None:
        print("Timeout occurred, no message.")
    else:
        print(message)
    return message
|
|
|
|
|
|
|
|
|
2023-04-01 20:51:35 +02:00
|
|
|
def waitForUserInput(waitForUserInput, prompt="Ready to start?"):
    """Optionally block until the operator confirms or cancels.

    Args:
        waitForUserInput: When falsy, nothing is printed or read and 0 is
            returned immediately.
        prompt: Text printed once before the first question.

    Returns:
        0 when the run may proceed, -1 when the operator cancelled or no more
        input can be read.
    """
    if not waitForUserInput:
        return 0

    print(prompt)
    while True:
        print("Enter y to start or c to cancel ")
        try:
            # Normalise so that " Y" or "C " behave like "y" / "c".
            answer = input(" ").strip().lower()
        except EOFError:
            # stdin is closed: no answer will ever arrive, so cancel instead
            # of crashing the whole test run.
            return -1
        if answer in ("c", "cancel"):
            return -1
        # Match on the leading character; a plain substring test would also
        # accept answers such as "nay" or "maybe".
        if answer.startswith("y"):
            return 0
|
|
|
|
|
|
|
|
|
|
|
|
def _payloadMatches(received, expected):
    """Return True when ``received`` is a non-empty prefix of ``expected``."""
    if not 0 < len(received) <= len(expected):
        # An empty reply must not count as a match, and an over-long reply
        # cannot be a prefix of the expected payload.
        return False
    return all(got == want for got, want in zip(received, expected))


def CanTest(bustype="socketcan", channel="can0"):
    """Run the interactive CAN connectivity checks and print the verdicts.

    Two checks are performed, each preceded by a y/c prompt:

    1. A single frame is sent and the reply payload is compared with the
       expected bytes.
    2. Twelve frames are sent 0.1 s apart; the check passes only when every
       one of them produced a reply.

    Args:
        bustype: python-can interface name forwarded to the send/receive
            helpers.
        channel: Channel name forwarded to the send/receive helpers.
    """
    # ------------------------------------------------------------------ #
    print("====================")
    print("Start testing the can bus connectivity ")
    # The module-level name ``waitForUserInput`` is bound to the function
    # itself, so the flag that used to be passed here was always truthy;
    # pass True explicitly.
    if waitForUserInput(True) != 0:
        return

    print("testing can for a single message")
    testMessage = [1, 0xC, 0, 0xF, 0xF, 0xE, 0xE]
    # One byte longer than what is sent — presumably the peer pads its reply
    # to 8 bytes; TODO confirm against the device under test.
    testMessageData = [1, 12, 0, 15, 15, 14, 14, 0]

    sendMessageOnCan(testMessage, bustype=bustype, channel=channel)
    reply = recive(bustype=bustype, channel=channel)

    if reply is None:
        # No frame came back: report a failure instead of crashing on .data.
        result = False
    else:
        recivedMessage = reply.data
        result = _payloadMatches(recivedMessage, testMessageData)
        print(recivedMessage)

    print(
        "Sending a single message test with result {}".format(
            "PASSED" if result else "FAILED"
        )
    )

    # ------------------------------------------------------------------ #
    print("====================")
    print("testing for multiple sends")
    if waitForUserInput(True) != 0:
        return

    trials = 12
    packete = []
    for sendCount in range(trials):
        sendMessageOnCan(
            [1, 0xC, 0, 0xF, 0xF, 0xE, 0xE, sendCount],
            bustype=bustype,
            channel=channel,
        )
        time.sleep(0.1)
        packet = recive(bustype=bustype, channel=channel)
        # Keep only real replies so the count below reflects what actually
        # arrived rather than the number of loop iterations.
        if packet is not None:
            packete.append(packet)

    for packet in packete:
        print(packet.data)

    print(
        "Sending multiple testmessages test with result {}".format(
            "PASSED" if len(packete) == trials else "FAILED"
        )
    )

    # ------------------------------------------------------------------ #
    print("I am done with this shit")
|