Package ckbot :: Module hitec
[hide private]
[frames] | no frames]

Source Code for Module ckbot.hitec

  1  """ 
  2  The CKBot.hitec python module provides classes used to communicate with CKBot robot modules connected the hitec HMI serial bus (rather than CAN) using a protocol similar to the Robotics Bus Protocol. These classes are primarily used to interface with the CKBot.logical python module found in the Modlab CKBot repository. For more information on 
  3  the low-level interface, please refer to the Hitec HMI Manual found at #J TODO. 
  4   
  5   
  6  Main uses of this module: 
  7  (*) control CKBot robot modules (specifically the servos) connected to a the hitec HMI serial bus 
  8  (*) mimics the behaviour of the CAN Bus, except using the hitec hitec HMI serial bus  as a communications channel 
  9   
 10  Example 1 - hitec robot module only:  
 11    #>>> import logical 
 12    #>>> nodes = [0x01, 0x06] 
 13    #>>> p = hitec.Protocol(nodes = nodes) 
 14    #>>> p.send_cmd([0x02,7,0]) 
 15   
 16  Example 2 - Integrate with CKBot.logical python module: 
 17    >>> import logical 
 18    >>> nodes = [0x01, 0x06, 0x03] 
 19    >>> p = hitec.Protocol(nodes = nodes) 
 20    >>> c = logical.Cluster(p) 
 21    >>> c.populate(3,{0x01:'head', 0x06:'mid', 0x03:'tail'}) 
 22    >>> c.at.head.set_pos(1000) 
 23   
 24  Both examples sets the position of the 0xD1 robot module, to 1000 (10 degrees from the  neutral position) 
 25  """ 
 26   
 27  from time import time, sleep 
 28  from sys import platform as SYS_PLATFORM, stdout 
 29  from struct import pack, unpack 
 30  from array import array 
 31  from serial import Serial 
 32  from subprocess import Popen, PIPE 
 33  from random import uniform 
 34   
 35  from ckmodule import Module, AbstractNodeAdaptor, AbstractProtocol, AbstractProtocolError, AbstractBusError, AbstractBus, AbstractServoModule, progress 
 36  from port2port import newConnection 
37 38 -def crop( val, lower, upper ):
39 return max(min(val,upper),lower)
40
41 -class HiTec( object ):
42 """Namespace containing useful constants and functions""" 43 CMD_READ_EEPROM = 0xE1 44 CMD_WRITE_EEPROM = 0xE2 45 CMD_READ_MEMORY = 0XE3 46 CMD_WRITE_MEMORY = 0XE4 47 CMD_READ_POSITION = 0XE5 48 CMD_SET_POSITION = 0XE6 49 CMD_GET_ID = 0XE7 50 CMD_GET_PULSEWITDTH_VOLTAGE = 0XE8 51 CMD_SET_SERVO_SPEED = 0XE9 52 CMD_SELECT_CNTRL_PARAMETER = 0XEA 53 CMD_GO_STOP = 0XEB 54 CMD_RELEASE = 0XEF 55 CMD_SPEED = 0XFF 56 ID_EADDR = 0x29 57 EEPROM_LEN = 0x2D 58 SYNC = 0x80 59 # Scale and offset for converting CKBot angles to and from hitec units 60 MAX_POS = 2450 61 MIN_POS = 550 62 MIN_ANG = -9000 63 MAX_ANG = 9000 64 SCL = float(MAX_POS - MIN_POS)/(MAX_ANG - MIN_ANG) 65 OFS = (MAX_POS - MIN_POS)/2 + MIN_POS 66 SYNC_CHR = pack('B',SYNC) 67 FRAME_SIZE = 7 68 69 @classmethod
70 - def ang2hitec(cls, ang ):
71 """Convert CKBot angle values to hitec units""" 72 return int(ang * cls.SCL + cls.OFS)
73 74 @classmethod
75 - def hitec2ang(cls, hitec ):
76 """Convert hitec units to CKBot angles""" 77 return int((hitec-cls.OFS) / cls.SCL)
78
79 -class BusError( AbstractBusError ):
80 - def __init__(self,msg=""):
81 AbstractBusError.__init__(self,"HITEC bus error : " + msg)
82
83 -class ProtocolError( AbstractProtocolError ):
84 - def __init__(self,msg=""):
85 AbstractProtocolError.__init__(self,"HITEC Bus error : "+msg)
86
87 -class Bus(AbstractBus):
88 """ 89 Concrete class that provides the functionality 90 needed to send messages to a Hitec servos 91 over their serial connection. 92 93 It is responsible for correctly formatting certain inputs 94 to hitec-understandable formats as documented in the Hitec 95 datasheet 96 """ 97 """ 98 Utilizes the protocol along with nid to create 99 an interface for a specific module 100 """
101 - def __init__(self, port = 'tty={ "baudrate":19200, "stopbits":2, "parity":"N", "timeout":0.5 }',*args,**kw):
102 """ 103 Initialize hitec bus 104 105 INPUT: 106 port -- serial port string, or None to autoconfig 107 108 ATTRIBUTES: 109 ser -- serial handle 110 """ 111 AbstractBus.__init__(self,*args,**kw) 112 self.ser = newConnection(port) 113 # Initialize receive buffer 114 self.rxbuf = ''
115 ###self.stats = dict(out=0,inp=0,sync=0,retry=0,frame=0,fail=0,empty=0) 116
117 - def close(self):
118 """ 119 Close communication with servo bus 120 """ 121 self.ser.close() 122 self.rxbuf = ''
123
124 - def reset(self):
125 """ 126 Reset communication with servo bus 127 ... note: this can be useful when serial fails after a power reset 128 """ 129 self.ser.close() 130 self.ser.open() 131 self.rxbuf = ''
132
133 - def read( self ):
134 """ read a single frame off the serial or returns None,None 135 136 Makes sure that the framing and sync byte are in place. 137 If a fragment is received -- stores it for future use, and 138 returns None,None 139 """ 140 b = self.rxbuf 141 SZ = HiTec.FRAME_SIZE 142 # Precondition: len(b) < SZ 143 while self.ser.inWaiting(): 144 b = b + self.ser.read( SZ - len(b) ) 145 # If not in sync --> skip a byte 146 if not b.startswith(HiTec.SYNC_CHR): 147 b = b[1:] 148 ###self.stats['frame'] += 1 149 continue 150 # If message was not completed --> failed 151 if len(b) < SZ: 152 self.rxbuf = b 153 ###self.stats['fail'] += 1 154 return None, None 155 assert len(b) == SZ 156 break 157 # Return the message 158 self.rxbuf = '' 159 if not b: 160 ###self.stats['empty'] += 1 161 return None, None 162 ###self.stats['inp'] += 1 163 return unpack('3B',b[1:4]), unpack('>1H',b[-2:])[0]
164
165 - def write(self, cmd, tx0, tx1 ):
166 """Formats a message, sends it and returns a copy of header""" 167 S = HiTec.SYNC 168 chk = (256 - (S + cmd + tx0 + tx1)) & 0xFF 169 msg = pack('7B',S,cmd,tx0,tx1,chk,0,0) 170 self.ser.write(msg) 171 ###self.stats['out'] += 1 172 return (cmd,tx0,tx1)
173
174 - def sync_write_read( self, pkt ):
175 """Synchronously send packet and read (same length) response""" 176 self.ser.flushInput() 177 self.ser.flushOutput() 178 N = len(pkt) 179 self.ser.write(pkt) 180 return self.ser.read(N)
181
182 - def send_cmd_sync( self, cmd, tx0, tx1, pause=0.01, retries=4 ):
183 """Send a command in synchronous form, waiting for reply 184 185 If reply times out -- retries until retry count runs out 186 """ 187 for k in xrange(retries): 188 hdr0 = self.write(cmd,tx0,tx1) 189 sleep(pause) 190 hdr1 = True 191 while hdr1: 192 hdr1, dat = self.read() 193 if hdr1 == hdr0: 194 ###self.stats['sync'] += 1 195 return dat 196 # fall through if no data in serial 197 ###self.stats['retry'] += 1 198 # fall through if no more retries 199 return None
200
201 -class EEPROM( object ):
202 """Concrete class representing the EEPROM in a servo""" 203
204 - def __init__(self, bus):
205 self.mem = None 206 self.bus = bus
207
208 - def readAll(self):
209 if self.mem is None: 210 self.mem = array('B',[0]*HiTec.EEPROM_LEN) 211 for addr in xrange(len(self.mem)): 212 val = self._bus_get(addr) 213 self.mem[addr] = val
214
215 - def _bus_put(self, addr, data):
216 """(private) 217 Write a value to servo EEPROM location. 218 219 WARNING: does not update checksum!!! 220 INPUT: 221 addr -- EEPROM address 222 data -- Desired EEPROM value 223 """ 224 self.bus.write( HiTec.CMD_WRITE_EEPROM ,addr, data )
225
226 - def _bus_get(self,addr, **kw):
227 """(private) 228 Read a value from servo EEPROM location. 229 230 WARNING: this is a synchronous operation that may take many 231 milliseconds to complete 232 233 INPUT: 234 addr -- EEPROM address 235 **kw -- passed on to Bus.send_cmd_sync 236 """ 237 res = self.bus.send_cmd_sync( 238 HiTec.CMD_READ_EEPROM, addr, 0, **kw ) 239 if res is None: 240 raise ProtocolError('Failed to read EEPROM address 0x%02x' % addr ) 241 if (res & 0xFF) != 0x03: 242 print "!!! > expected 0x03 in 2nd byte, got 0x%02x at 0x%02x" % (res & 0xFF,addr) 243 return (res >> 8) & 0xFF
244
245 - def __setitem__(self,addr,value):
246 # Make sure we have cached EEPROM 247 if self.mem is None: 248 self.readAll() 249 # perform operation 250 self.mem[addr] = value
251
252 - def __getitem__(self,addr):
253 # Make sure we have cached EEPROM 254 if self.mem is None: 255 self.readAll() 256 # perform operation 257 return self.mem[addr]
258
259 - def writeAll( self ):
260 """ 261 Write EEPROM out into servo, validating as we go 262 """ 263 # Make sure we have cached EEPROM 264 if self.mem is None: 265 self.readAll() 266 # Compute checksum 267 self.mem[-1] = 256 - (sum(self.mem[:-1]) & 0xFF) 268 # Write 269 for addr,val in enumerate(self.mem): 270 self._bus_put( addr, val ) 271 if self._bus_get(addr) != val: 272 raise BusError('Failed to write 0x%02x to EEPROM address 0x%02x' % (val, addr))
273
274 275 276 -class IncompleteMessage(object):
277 """ internal concrete class representing incomplete messages 278 """
279 - def __init__(self, nid, msg, ts=None, retries=0 ):
280 if ts is None: 281 ts = time() 282 self.nid = nid 283 self.msg = msg 284 self.ts = ts 285 self.retries = retries 286 self.promise = []
287
288 -class Protocol(AbstractProtocol):
289 """ 290 TBD 291 """
292 - def __init__(self, bus=None, nodes=None,*args,**kw):
293 """ 294 Initialize a hitec.Protocol 295 296 INPUT: 297 bus -- hitec.Bus -- Serial bus used to communicate with Hitec servos 298 nodes -- list -- list of hitec ids 299 300 ATTRIBUTES: 301 heartbeats -- dictionary -- nid : (timestamp, last message) 302 pnas -- dictionary -- table of NodeID to ProtocolNodeAdaptor mappings 303 """ 304 AbstractProtocol.__init__(self,*args,**kw) 305 if bus is None: 306 self.bus = Bus() 307 else: 308 self.bus = bus 309 310 self.heartbeats = {} # Gets populated by update 311 self._incm = [] 312 self.pnas = {} 313 self.ping_period = 3 314 if nodes is None: 315 progress("Scanning bus for nodes\n") 316 nodes = self.scan() 317 for nid in nodes: 318 self.generatePNA(nid) 319 progress("HiTec nodes: %s\n" % repr(list(nodes)))
320
321 - def scan( self ):
322 """Do a low-level scan for all nodes""" 323 # Build a "scan all" packet 324 def ping(nid): 325 msg = [HiTec.SYNC,HiTec.CMD_SET_SERVO_SPEED,nid,255,0,0,0] 326 msg[4] = 256 - (0xFF & sum(msg)) 327 return pack('7B',*msg)
328 pall = "".join([ ping(x) for x in xrange(128) ]) 329 330 # do the ping-all 331 while True: 332 res = self.bus.sync_write_read(pall) 333 if len(res) == len(pall): 334 break 335 progress('Ping packet response was truncated (got %d not %d)-- retrying' % (len(res),len(pall))) 336 # 337 found = set() 338 for k in xrange(128): 339 if ord(res[k*7+5]) or ord(res[k*7+6]): 340 found.add(k) 341 return found
342
343 - def hintNodes( self, nodes ):
344 n1 = set(nodes) 345 n0 = set(self.pnas.keys()) 346 for nid in n0-n1: 347 # corrupt the pna, just in case someone has access to it 348 self.pnas[nid].nid = "<<INVALID>>" 349 del self.pnas[nid] 350 for nid in n1-n0: 351 self.generatePNA(nid)
352
353 - def write( self, *args, **kw ):
354 """Write out a command --> redirected to Bus.write""" 355 return self.bus.write( *args, **kw )
356
357 - def read( self ):
358 """Read a response; redirected to Bus.read """ 359 return self.bus.read()
360
361 - def send_cmd_sync( self, *argv, **kw ):
362 return self.bus.send_cmd_sync( *argv, **kw )
363
364 - def request( self, cmd, tx0, tx1, nid=None, **kw ):
365 """Write out a command and record incomplete reply 366 367 return promise 368 """ 369 hdr = self.bus.write( cmd, tx0, tx1, **kw ) 370 inc = IncompleteMessage( nid, hdr ) 371 self._incm.append( inc ) 372 return inc.promise
373
374 - def _completeResponses(self, now, timeout = 0.05, retries = 4):
375 """(private) 376 Retries another request of incomplete message if timed out, and 377 completes any request for which a response arrived. 378 379 After several retries puts a ProtocolError in the result 380 381 INPUT: 382 now -- current time 383 timeout -- retry rate in seconds 384 retries -- number of retries 385 OUTPUT: 386 num_failed -- number of failed requests 387 """ 388 num_failed = 0 389 next = [] 390 while self._incm: 391 # Get the first incomplete msg 392 inc = self._incm.pop(0) 393 # If hasn't timed out --> keep 394 if now - inc.ts < timeout: 395 # find heartbeat; if not found -- put in the future 396 ts,dat = self.heartbeats.get(inc.nid,[now+1,None]) 397 # if response is newer than request --> fulfill promise 398 if now>ts: 399 inc.promise[:] = (dat,) 400 else: # otherwise --> leave request for later 401 next.append(inc) 402 continue 403 # If we are out of retry attempts --> emit error 404 if inc.retries >= retries: 405 inc.promise[:] = [ProtocolError("Timeout on node 0x%x" % inc.nid)] 406 num_failed += 1 407 continue 408 # assert: we should retry 409 self.bus.write( *inc.msg ) 410 inc.ts = now 411 inc.retries += 1 412 next.append(inc) 413 # Store new list of incomplete messages 414 self._incm = next 415 return num_failed
416
417 - def _pingAsNeeded( self, now ):
418 for nid,pna in self.pnas.iteritems(): 419 ts = self.heartbeats.get(nid,[0])[0] # we assume 0 is so far in past that timeout always occurs 420 if (now - ts) > self.ping_period * uniform(0.75,1.0): 421 pna.ping()
422
423 - def update(self):
424 """ 425 Complete any outstanding async requests and collect heartbeats 426 """ 427 now = time() 428 429 # Only process for a timeslice at the longest 430 timeslice = 0.05 431 while time() - now < timeslice: 432 msg, dat = self.read() 433 if msg is None: 434 break 435 # If was a set_speed --> may be a heartbeat / response 436 cmd,nid,spd = msg 437 if cmd == HiTec.CMD_SET_SERVO_SPEED and dat: 438 self.heartbeats[nid] = (now, dat) 439 440 # Clean / process any incomplete messages that can be completed 441 self._completeResponses(now) 442 self._pingAsNeeded(now) 443 return len(self._incm)
444
445 - def generatePNA(self, nid):
446 """ 447 Generates a pololu.ProtocolNodeAdaptor, associating a pololu protocol with 448 a specific node id and returns it 449 """ 450 pna = ProtocolNodeAdaptor(self, nid) 451 self.pnas[nid] = pna 452 return pna
453
454 -class ProtocolNodeAdaptor(AbstractNodeAdaptor):
455 """ 456 Utilizes the protocol along with nid to create 457 an interface for a specific module 458 """
459 - def __init__(self, protocol, nid = None):
460 AbstractNodeAdaptor.__init__(self) 461 self.p = protocol 462 self.nid = nid 463 self.eeprom = None 464 self._speed = 0xFF
465
466 - def set_pos(self, tgt): #J: Need to modify this
467 """ 468 Sends a position command 469 """ 470 self.p.write(self.nid 471 , (tgt >> 8) & 0xFF 472 , tgt & 0xFF 473 )
474
475 - def get_typecode( self ): # Change this!!! ... looks at nid and returns string accordingly
476 if self.nid < 64: 477 return "HitecServoModule" 478 else: 479 return "HitecMotorModule" 480
481 - def _onlyOne(self):
482 """(private) make sure there is only one servo on the bus 483 """ 484 # Check if more than on module is on the bus 485 if len(self.p.heartbeats) > 1: 486 raise BusError('Operation is not allowed if more than one module is on the Bus')
487
488 - def getID(self):
489 """ 490 Get current module ID ... note: only works with single module bus 491 492 OUTPUT: 493 ret -- module ID 494 """ 495 self._onlyOne() 496 # Get current version and id 497 vid = self.p.send_cmd_sync(HiTec.CMD_GET_ID, 0x00, 0x00) 498 ID = vid & 0xFF 499 if self.nid is not None and ID != self.nid: 500 raise BusError('getID returned ID 0x%02x, not 0x%02x' %(ID, self.nid)) 501 return ID
502
503 - def getEEPROM( self ):
504 self._onlyOne() 505 if self.eeprom is None: 506 self.eeprom = EEPROM( self.p.bus ) 507 self.eeprom.readAll() 508 return self.eeprom
509
510 - def setID(self, new_id):
511 """ 512 Sets module ID ... note: only works with single module bus 513 514 INPUT: 515 new_id -- desired servo ID 516 """ 517 new_id = int(new_id) & 0xFF 518 ep = self.getEEPROM() 519 ep[HiTec.ID_EADDR] = new_id 520 ep.writeAll() 521 self.nid = new_id
522
523 - def ping(self):
524 """ 525 Send query to servo without having protocol expect a reply 526 """ 527 self.p.write(HiTec.CMD_SET_SERVO_SPEED, self.nid, self._speed)
528
529 - def set_speed(self, speed):
530 """ 531 Set speed of for servo and get position 532 533 INPUT: 534 speed -- set speed 535 536 """ 537 self._speed = speed 538 self.p.write(HiTec.CMD_SET_SERVO_SPEED, self.nid, speed)
539
540 - def get_speed(self):
541 """ 542 Get speed value for servo ... this just returns the last speed set, 543 if that has not been set, it returns the default value: 0xFF 544 545 OUTPUT: 546 ret -- servo speed between [0 - 255] 547 """ 548 return self._speed
549
550 - def get_sync(self, tic=0.01):
551 """ 552 Synchronous wrapper on top of get_async 553 554 Returns the get_async result 555 """ 556 promise = self.get_async() 557 while not promise: 558 sleep(tic) 559 self.p.update() 560 return self.async_parse(promise)
561 562 @classmethod
563 - def async_parse(cls,promise,barf=True):
564 """ 565 Parse the result of an asynchronous get. 566 567 When barf is True, this may raise a deferred exception if the 568 operation timed out or did not complete. Otherwise the exception 569 object will be returned. 570 """ 571 if not promise: 572 raise ProtocolError("Asynchronous operation did not complete") 573 dat = promise[0] 574 if barf and isinstance(dat,Exception): 575 raise dat 576 return dat
577
578 - def get_async(self):
579 """ 580 Send out an asynchronous get request 581 582 OUTPUT: 583 ret -- promise that will hold result after update() 584 """ 585 return self.p.request( HiTec.CMD_SET_SERVO_SPEED 586 , self.nid 587 , self._speed 588 , nid=self.nid 589 )
590
591 - def go(self):
592 """ 593 Set servo to go state 594 ... if module is in stop state it will immediately move to 595 position commanded 596 """ 597 self.p.write( HiTec.CMD_GO_STOP, 0, 1 )
598
599 - def stop(self):
600 """ 601 Set servo to stop state 602 ... will freeze module motion until go state set or until power reset 603 """ 604 self.p.write( HiTec.CMD_GO_STOP, 0, 0 )
605
606 - def release(self):
607 """ 608 Release servo 609 ... will turn on PID and make servo go slack 610 """ 611 self.p.write(HiTec.CMD_RELEASE, 0x00, 0x00)
612
613 ## Safety/range values goes in here 614 615 -class HitecModule( Module ):
616 - def __init__(self, node_id, typecode, pna, *argv, **kwarg):
617 Module.__init__(self, node_id, typecode, pna) 618 self._attr.update( 619 go_slack="1R", 620 is_slack="1R" 621 ) 622 self.slack = False 623 self.eeprom = None
624
625 - def getEEPROM( self ):
626 """ 627 Access the module's EEPROM. 628 629 WARNING: can only be used when there is a SINGLE servo on bus 630 631 WARNING: data is only written when the .write() method is called 632 on the .eeprom object. Without that, changes will be lost 633 """ 634 self.eeprom = self.pna.getEEPROM()
635
636 - def is_slack(self):
637 """ 638 Returns true if the module is slack, none if go_slack has not been called yet. 639 """ 640 return self.slack
641
642 - def go_slack(self):
643 """ 644 Sets *ALL* servos on the Bus slack 645 """ 646 # Send the command via the PNA 647 self.pna.release() 648 # Module should now be slack 649 self.slack = True
650
651 -class ServoModule( AbstractServoModule, HitecModule ):
652 """ 653 TBD 654 """
655 - def __init__(self, node_id, typecode, pna, *argv, **kwarg):
656 AbstractServoModule.__init__(self, node_id, typecode, pna) 657 HitecModule.__init__(self, node_id, typecode, pna) 658 self._attr.update( 659 set_pos="2W", 660 set_pos_RAW="2W", 661 set_speed="2W", 662 get_speed="1R", 663 get_pos="1R", 664 ) 665 self.zero_ofs = 0.0 666 self.gain = 1.0
667
668 - def set_pos(self, pos):
669 """ 670 Set module position 671 """ 672 self.set_pos_RAW(crop(HiTec.ang2hitec(self.zero_ofs+self.gain*pos),HiTec.MIN_POS,HiTec.MAX_POS))
673
674 - def set_pos_RAW(self, pos):
675 """ 676 Set module position in native (HiTec) units 677 678 WARNING: no checks! 679 """ 680 # Send Position Command 681 self.slack = False 682 self.pna.set_pos(pos)
683
684 - def get_pos(self):
685 """ 686 Get module position 687 """ 688 return HiTec.hitec2ang(self.pna.get_sync())
689
690 - def set_speed(self, speed):
691 """ 692 Set module speed 693 """ 694 self.pna.set_speed(speed)
695
696 - def get_speed(self):
697 """ 698 Return latest speed set to module, else return default speed 699 """ 700 return self.pna.get_speed()
701
702 -class MotorModule( HitecModule ):
703 """ 704 TBD 705 """ 706 SPEED_LOWER = -450 707 SPEED_UPPER = 450 708 RPM_CONVERSION = 1 709
710 - def __init__(self, node_id, typecode, pna, *argv, **kwarg):
711 HitecModule.__init__(self, node_id, typecode, pna) 712 self._attr.update( 713 set_speed="2W", 714 get_speed="1R", 715 set_speed_UNSAFE="2W" 716 ) 717 self._speed = 0; 718 self.zero_pos = None 719 self.promise = None
720
721 - def set_speed(self,val):
722 """ 723 Sets speed of the module, with safety checks. 724 725 INPUT: 726 val -- units in between SPEED_LOWER and SPEED_UPPER 727 """ 728 self.set_speed_UNSAFE(crop(val,self.SPEED_LOWER,self.SPEED_UPPER))
729
730 - def _ensure_zero_pos(self):
731 """(private) 732 733 Asynchronously request the zero position from the servo, 734 and parse the response if it appeared. 735 736 """ 737 # If no pot position was ever requested --> make async request 738 if self.promise is None: 739 self.promise = self.pna.get_async() 740 return # without setting a zero_pos 741 742 # If async request for pot position has not completed --> warn 743 if not self.promise: 744 progress("WARNING: 0x%02x get zero_pos has not completed\n" % self.node_id ) 745 return # without setting a zero_pos 746 # Parse response 747 pos = self.pna.async_parse(self.promise, False) 748 # If failed --> retry 749 if isinstance(pos,Exception): 750 progress("WARNING: 0x%02x get zero_pos failed. Retrying...\n" % self.node_id ) 751 self.promise = self.pna.get_async() 752 return # without setting a zero_pos 753 # 754 self.zero_pos = pos
755
756 - def set_speed_UNSAFE(self, val):
757 """ 758 Sets speed of the module, without any validity checking 759 760 INPUT: 761 val -- units in RPM 762 763 Do not use values outside the range SPEED_LOWER to SPEED_UPPER, 764 otherwise the command may be ignored 765 """ 766 # If motor is uncalibrated --> try to calibrate pot zero value 767 if self.zero_pos is None: 768 self._ensure_zero_pos() 769 # If calibration isn't done --> skip this call, doing nothing 770 if self.zero_pos is None: 771 return 772 ofs = int(self.RPM_CONVERSION*val) 773 self.pna.set_pos(self.zero_pos + ofs)
774