Compare commits
5 Commits
414e407151
...
4f601c8b44
| Author | SHA1 | Date | |
|---|---|---|---|
| 4f601c8b44 | |||
| 72b3a0e14c | |||
| 750697f4f7 | |||
| 97ba8fc01b | |||
| 91f29264f9 |
3
.gitignore
vendored
3
.gitignore
vendored
@ -1,2 +1,5 @@
|
||||
__pycache__/
|
||||
*.py[ocd]
|
||||
|
||||
env/
|
||||
.venv/
|
||||
|
||||
10
Test.py
10
Test.py
@ -1,17 +1,21 @@
|
||||
import time
|
||||
import can
|
||||
|
||||
bustype = 'socketcan'
|
||||
channel = 'can0' #echten can nutzen
|
||||
bustype = "socketcan"
|
||||
channel = "can0" # echten can nutzen
|
||||
|
||||
|
||||
def producer(id):
|
||||
""":param id: Spam the bus with messages including the data id."""
|
||||
bus = can.Bus(channel=channel, interface=bustype)
|
||||
for i in range(10):
|
||||
msg = can.Message(arbitration_id=0x002, data=[id, i, 0, 1, 3, 1, 4, 1], is_extended_id=False)
|
||||
msg = can.Message(
|
||||
arbitration_id=0x002, data=[id, i, 0, 1, 3, 1, 4, 1], is_extended_id=False
|
||||
)
|
||||
bus.send(msg)
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
producer(10) # rausfinden welche id der Slave hat
|
||||
print("I am done with this shit")
|
||||
|
||||
@ -3,13 +3,12 @@ import can
|
||||
|
||||
input("startReading")
|
||||
|
||||
can_interface = 'can0' #echten can nutzen
|
||||
bus = can.interface.Bus(can_interface, bustype='socketcan')
|
||||
can_interface = "can0" # echten can nutzen
|
||||
bus = can.interface.Bus(can_interface, bustype="socketcan")
|
||||
message = bus.recv(5.0)
|
||||
|
||||
if message is None:
|
||||
print('Timeout occurred, no message.')
|
||||
print("Timeout occurred, no message.")
|
||||
else:
|
||||
print('We got a message hell yeah')
|
||||
print("We got a message hell yeah")
|
||||
print("{}".format(message.data)) # hat gesendete data von can benutzen
|
||||
|
||||
|
||||
49
canTest.py
49
canTest.py
@ -5,23 +5,23 @@ import can
|
||||
waitForUserInput = True
|
||||
bus = None
|
||||
|
||||
def sendMessageOnCan(message, bustype='socketcan',channel='can0'):
|
||||
|
||||
def sendMessageOnCan(message, bustype="socketcan", channel="can0"):
|
||||
global bus
|
||||
if bus is None:
|
||||
bus = can.interface.Bus(channel, bustype='socketcan')
|
||||
bus = can.interface.Bus(channel, bustype="socketcan")
|
||||
busEnabled = True
|
||||
|
||||
msg = can.Message(arbitration_id=0x002, data=message, is_extended_id=False)
|
||||
bus.send(msg)
|
||||
time.sleep(1)
|
||||
|
||||
def recive(bustype='socketcan',channel='can0'):
|
||||
|
||||
def recive(bustype="socketcan", channel="can0"):
|
||||
global bus
|
||||
message = bus.recv()
|
||||
|
||||
if message is None:
|
||||
print('Timeout occurred, no message.')
|
||||
print("Timeout occurred, no message.")
|
||||
# timeOut
|
||||
|
||||
else:
|
||||
@ -29,27 +29,30 @@ def recive(bustype='socketcan',channel='can0'):
|
||||
print(message)
|
||||
return message
|
||||
|
||||
def waitForUserInput(waitForUserInput):
|
||||
if(waitForUserInput):
|
||||
|
||||
def waitForUserInput(waitForUserInput, prompt="Ready to start?"):
|
||||
if waitForUserInput:
|
||||
readInput = "n"
|
||||
print("Ready to start ? ")
|
||||
while('y' not in readInput):
|
||||
print(prompt)
|
||||
while "y" not in readInput:
|
||||
print("Enter y to start or c to cancel ")
|
||||
readInput = input(" ")
|
||||
if(readInput == 'c'):
|
||||
if readInput == "c":
|
||||
return -1
|
||||
return 0
|
||||
|
||||
def CanTest(bustype='socketcan',channel='can0'):
|
||||
|
||||
def CanTest(bustype="socketcan", channel="can0"):
|
||||
# -----------------------------------------------------------------------------------------------------#
|
||||
print("====================")
|
||||
print("Start testing the can bus connectivity ")
|
||||
global waitForUserInput
|
||||
if (waitForUserInput(waitForUserInput) != 0):
|
||||
if waitForUserInput(waitForUserInput) != 0:
|
||||
return
|
||||
|
||||
print("testing can for a single message")
|
||||
messageType = 1
|
||||
testMessage = [messageType, 0xc, 0, 0xf, 0xf, 0xe, 0xe]
|
||||
testMessage = [messageType, 0xC, 0, 0xF, 0xF, 0xE, 0xE]
|
||||
testMessageData = [1, 12, 0, 15, 15, 14, 14, 0]
|
||||
|
||||
sendMessageOnCan(testMessage)
|
||||
@ -66,28 +69,34 @@ def CanTest(bustype='socketcan',channel='can0'):
|
||||
count = count + 1
|
||||
|
||||
print(recivedMessage)
|
||||
print("Sending a single message test with answer result {}".format(result))
|
||||
print(
|
||||
"Sending a single message test with result {}".format(
|
||||
"PASSED" if result else "FAILED"
|
||||
)
|
||||
)
|
||||
# -----------------------------------------------------------------------------------------------------#
|
||||
|
||||
print("====================")
|
||||
print("testing for multiple sends")
|
||||
if (waitForUserInput(waitForUserInput) != 0):
|
||||
if waitForUserInput(waitForUserInput) != 0:
|
||||
return
|
||||
|
||||
packete = []
|
||||
sendCountFinally = 0
|
||||
trials = 12
|
||||
for sendCount in range(trials):
|
||||
sendMessageOnCan([1, 0xc, 0, 0xf, 0xf, 0xe, 0xe, sendCount])
|
||||
sendMessageOnCan([1, 0xC, 0, 0xF, 0xF, 0xE, 0xE, sendCount])
|
||||
time.sleep(0.1)
|
||||
packete.append(recive())
|
||||
|
||||
for packet in packete:
|
||||
print(packet.data)
|
||||
|
||||
print("Sending multiple testmessages test with answer result {}".format(len(packete) == len(range(trials))))
|
||||
print(
|
||||
"Sending multiple testmessages test with result {}".format(
|
||||
"PASSED" if len(packete) == len(range(trials)) else "FAILED"
|
||||
)
|
||||
)
|
||||
|
||||
# -----------------------------------------------------------------------------------------------------#
|
||||
print("I am done with this shit")
|
||||
|
||||
|
||||
|
||||
|
||||
8
eProm.py
8
eProm.py
@ -1,12 +1,14 @@
|
||||
import canTest
|
||||
|
||||
|
||||
def ePromTest():
|
||||
print("====================")
|
||||
print("start eprom test")
|
||||
message = [4, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]
|
||||
canTest.waitForUserInput(True)
|
||||
canTest.sendMessageOnCan(message)
|
||||
answer = canTest.recive();
|
||||
print("ePromTest reult {}".format(str(answer.data) == "bytearray(b'iiiiBiii')"))
|
||||
|
||||
answer = canTest.recive()
|
||||
result = answer.data == bytearray(b"iiiBiiii")
|
||||
print("ePromTest result {}".format("PASSED" if result else "FAILED"))
|
||||
|
||||
print("I am done with this shit")
|
||||
|
||||
13
realTest.py
13
realTest.py
@ -5,18 +5,19 @@ import eProm
|
||||
|
||||
|
||||
def startBalancingTest():
|
||||
message = [5, 0xc, 0, 0xf, 0xf, 0xe, 0xe]
|
||||
message = [5, 0xC, 0, 0xF, 0xF, 0xE, 0xE]
|
||||
print("====================")
|
||||
print("startBalancing")
|
||||
canTest.waitForUserInput(True)
|
||||
canTest.sendMessageOnCan(message)
|
||||
|
||||
|
||||
#canTest.CanTest()
|
||||
#eProm.ePromTest()
|
||||
#voltageTest.voltagesTest()
|
||||
canTest.CanTest()
|
||||
eProm.ePromTest()
|
||||
voltageTest.voltagesTest()
|
||||
tempTest.tempTest()
|
||||
|
||||
#startBalancingTest()
|
||||
startBalancingTest()
|
||||
"""
|
||||
print("start Temp test")
|
||||
message = [3, 0xc, 0, 0xf, 0xf, 0xe, 0xe]
|
||||
@ -33,5 +34,3 @@ canTest.sendMessageOnCan(message)
|
||||
answer = canTest.recive();
|
||||
print(answer.data)
|
||||
"""
|
||||
|
||||
|
||||
|
||||
10
requirements.txt
Normal file
10
requirements.txt
Normal file
@ -0,0 +1,10 @@
|
||||
black==23.3.0
|
||||
click==8.1.3
|
||||
msgpack==1.0.5
|
||||
mypy-extensions==1.0.0
|
||||
packaging==23.0
|
||||
pathspec==0.11.1
|
||||
platformdirs==3.2.0
|
||||
python-can==4.1.0
|
||||
typing_extensions==4.5.0
|
||||
wrapt==1.15.0
|
||||
27
tempTest.py
27
tempTest.py
@ -3,12 +3,15 @@ import can
|
||||
import canTest
|
||||
import struct
|
||||
|
||||
bustype = 'socketcan'
|
||||
channel = 'can0'
|
||||
bustype = "socketcan"
|
||||
channel = "can0"
|
||||
|
||||
|
||||
def getTempOverCan():
|
||||
bus = can.Bus(channel=channel, interface=bustype)
|
||||
message = can.Message(arbitration_id=0x002, data=[3, 0, 0, 0, 0, 0, 0], is_extended_id=False)
|
||||
message = can.Message(
|
||||
arbitration_id=0x002, data=[3, 0, 0, 0, 0, 0, 0], is_extended_id=False
|
||||
)
|
||||
bus.send(message)
|
||||
voltages = []
|
||||
runtime = 2
|
||||
@ -19,9 +22,10 @@ def getTempOverCan():
|
||||
voltages.append(msg.data)
|
||||
print("begin {}".format(begin))
|
||||
begin = begin + 1
|
||||
if(begin > runtime):
|
||||
if begin > runtime:
|
||||
return voltages
|
||||
|
||||
|
||||
def verifyNumbers(numberListList):
|
||||
zeroCount = 0
|
||||
max = 0
|
||||
@ -29,7 +33,6 @@ def verifyNumbers(numberListList):
|
||||
for numberList in numberListList:
|
||||
print("Temperatures {}".format(numberList))
|
||||
for number in numberList:
|
||||
|
||||
if number < min:
|
||||
min = number
|
||||
|
||||
@ -39,7 +42,10 @@ def verifyNumbers(numberListList):
|
||||
print("Temperatures {}".format())
|
||||
print("The biggest difference tmep was max{} and min{}".format((max), (min)))
|
||||
|
||||
|
||||
def tempTest():
|
||||
while True:
|
||||
print("====================")
|
||||
print("starting temperature test ")
|
||||
canTest.waitForUserInput(True)
|
||||
Temps = getTempOverCan()
|
||||
@ -47,14 +53,15 @@ def tempTest():
|
||||
changed = []
|
||||
|
||||
for temp in Temps:
|
||||
allTemp.append(struct.unpack("<HHHH", temp)) #Format anpassen
|
||||
allTemp += struct.unpack("<HHHH", temp) # Format anpassen
|
||||
|
||||
print("all Number")
|
||||
print("Raw temperatures:")
|
||||
print(allTemp)
|
||||
|
||||
for te in allTemp:
|
||||
for con in te:
|
||||
changed.append(((con/16)*0.0625))
|
||||
changed.append(((te >> 4) * 0.0625))
|
||||
print("Interpreted temperatures:")
|
||||
print(changed)
|
||||
if canTest.waitForUserInput(True, "Repeat temperature test?") != 0:
|
||||
break
|
||||
print("I am done with this shit")
|
||||
|
||||
|
||||
@ -3,12 +3,15 @@ import can
|
||||
import canTest
|
||||
import struct
|
||||
|
||||
bustype = 'socketcan'
|
||||
channel = 'can0'
|
||||
bustype = "socketcan"
|
||||
channel = "can0"
|
||||
|
||||
|
||||
def getBatteryVoltageOverCan():
|
||||
bus = can.Bus(channel=channel, interface=bustype)
|
||||
message = can.Message(arbitration_id=0x002, data=[2, 0, 0, 0, 0, 0, 0], is_extended_id=False)
|
||||
message = can.Message(
|
||||
arbitration_id=0x002, data=[2, 0, 0, 0, 0, 0, 0], is_extended_id=False
|
||||
)
|
||||
bus.send(message)
|
||||
voltages = []
|
||||
runtime = 3
|
||||
@ -19,9 +22,10 @@ def getBatteryVoltageOverCan():
|
||||
voltages.append(msg.data)
|
||||
# print("begin {}".format(begin))
|
||||
begin = begin + 1
|
||||
if(begin > 4):
|
||||
if begin > 4:
|
||||
return voltages
|
||||
|
||||
|
||||
def verifyNumbers(numberListList):
|
||||
zeroCount = 0
|
||||
max = 0
|
||||
@ -38,10 +42,17 @@ def verifyNumbers(numberListList):
|
||||
max = number
|
||||
|
||||
print("There were 3 expected Zeros and {} detected".format(zeroCount))
|
||||
print("The biggest difference were over {} and under {}".format((max-35000), (35000-min)))
|
||||
print(
|
||||
"The biggest difference were over {} and under {}".format(
|
||||
(max - 35000), (35000 - min)
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def voltagesTest():
|
||||
print("====================")
|
||||
print("starting voltage test pls but 3.500 as reference value")
|
||||
while True:
|
||||
canTest.waitForUserInput(True)
|
||||
voltages = getBatteryVoltageOverCan() # rausfinden welche id der Slave hat
|
||||
allVoltages = []
|
||||
@ -50,4 +61,5 @@ def voltagesTest():
|
||||
|
||||
verifyNumbers(allVoltages)
|
||||
print("I am done with this shit")
|
||||
|
||||
if canTest.waitForUserInput(True, "Repeat voltage test?") != 0:
|
||||
break
|
||||
|
||||
Reference in New Issue
Block a user