Package ckbot :: Module posable
[hide private]
[frames | no frames]

Source Code for Module ckbot.posable

  1  """File posable.py 
  2   
  3  Provides basic functionality for "posable programming", i.e. programming a robot by recording its poses and then playing them back. 
  4   
  5  Main Classes and Functions 
  6  ========================== 
  7   
  8  class PoseRecorder -- provides most of the functionality, allowing poses to be recorded and played back. 
  9   
 10  function recordFromCluster -- an interactive interface on top of a PoseRecorder, allowing users to easily record poses. 
 11  """ 
 12   
 13  from time import time as now, sleep 
 14  from sys import stdout as STDOUT 
 15  from numpy import asarray 
 16  from cmd import Cmd 
 17  from re import split as re_split 
 18  from scipy.interpolate import interp1d 
 19   
class PoseRecorder( object ):
    """Concrete class PoseRecorder

    Provides a convenient interface for 'poseable programming' by allowing
    positions of servos to be recorded and played back.

    Convenience methods for saving and loading .csv files are also provided

    For typical usage, see the source code for recordFromCluster()

    ATTRIBUTES:
      servos -- sequence of modules being recorded / played back
      plan -- list of rows [t, pos_0, pos_1, ...]; one row per recorded pose,
        with one position per entry of servos
      record_time -- set to None by __init__() and reset()
    """

    def __init__(self, mods):
        """Initialize the PoseRecorder to record a list of modules

        All modules are made slack (see off()) as part of construction.

        INPUT:
          mods -- sequence of modules with .get_pos(), .set_pos() and .go_slack() methods
        """
        self.servos = mods
        self.record_time = None
        self.reset()
        self.off()

    def getPose(self):
        """collect positions of all servos"""
        return [ m.get_pos() for m in self.servos ]

    def _set_pose(self, pose):
        """(private) set positions of all servos"""
        for m, v in zip(self.servos, pose):
            m.set_pos(v)

    def off(self):
        """Emergency Stop for all servos -- .go_slack()-s all modules, ignoring any exceptions"""
        for m in self.servos:
            # Best effort: one failing module must not prevent the others
            # from being made slack, so failures are reported and skipped.
            try:
                m.go_slack()
            except Exception as ex:
                print("Exception in %s.go_slack(): %s" % (str(m), str(ex)))

    def reset( self ):
        """Reset the recording, allowing a new recording to be started"""
        self.plan = []
        self.record_time = None

    def snap( self, t = None ):
        """Add a snapshot of the current pose to the recording

        INPUT:
          t -- number / None -- timestamp for the snapshot. The first
            snapshot of a recording is always stored at t=0, whatever value
            is passed. When t is None, later snapshots are stored one time
            unit after the previous one.
        """
        if not self.plan:
            t = 0
        if t is None:
            t = self.plan[-1][0]+1
        self.plan.append( [t]+self.getPose() )

    def snapClosed( self, dt = 1 ):
        """Add the first pose at the end of the recording, with specified delay.
        This allows loops to be cleanly repeated.

        INPUT:
          dt -- float -- delay in seconds between last and first pose

        RAISES:
          IndexError -- if nothing has been recorded yet
        """
        if not self.plan:
            raise IndexError("No recoding found; call .snap() at least once")
        self.plan.append( [self.plan[-1][0]+dt]+self.plan[0][1:] )

    def show( self, stream=STDOUT, delim=" ", fmt="%5d", hfmt="%5s", rdel="\n", slc=slice(None)):
        """Write the current recording out on a stream in a text-based format

        INPUT:
          stream -- output stream object. Must support .write()
          delim -- str -- delimiter used between columns
          fmt -- str -- format string for numbers (except time)
          hfmt -- str -- format string for column headings
          rdel -- str -- row delimiter
          slc -- slice -- range of pose sequence to show
        """
        # slc selects ROWS of the recording. The header always names every
        # servo so that it lines up with the columns of the rows below it.
        header = ["%5s" % "t"] + [hfmt % m.name for m in self.servos]
        stream.write( delim.join(header) + rdel )
        for pose in self.plan[slc]:
            # NOTE: the time column is always written as an integer ("%5d")
            row = ["%5d" % pose[0]] + [fmt % v for v in pose[1:]]
            stream.write( delim.join(row) + rdel )

    def unsnap(self):
        """Drop the last pose from the recording"""
        if self.plan:
            self.plan.pop(-1)

    def appendFrom( self, stream, cdel=",", dt=1 ):
        r"""append a recording from a textual representation

        INPUT:
          stream -- list -- a recording with one line per list entry
                 -- file -- a file containing a recording (anything with
                    a .readlines() method)
          cdel -- str -- column delimiter
          dt -- float -- gap in time between last entry and newly loaded data

        RAISES:
          ValueError -- if a row does not hold a time plus one value per
            servo, or holds a non-numeric entry. In that case the existing
            recording is left unchanged.

        Caveats:
          (1) columns must match the list of servos for which the
              PoseRecorder was configured.
          (2) the first (header) row of the file is ignored
          (3) blank rows and rows starting with '#' are skipped

        Example: load from a literal multiline string. The empty entry that
        precedes the first newline plays the role of the ignored header row.
        >>> pr.appendFrom('''
        0, 1000, 0, 1000
        1, 0, 1000, 0
        '''.split("\n"))
        """
        if hasattr(stream, "readlines"):
            stream = stream.readlines()
        else:
            stream = list(stream)
        t0 = 0 if not self.plan else (dt+self.plan[-1][0])
        # Parse everything first and extend the plan only once the whole
        # input has validated, so a bad row cannot leave a partial recording.
        rows = []
        for nl, line in enumerate(stream[1:]):
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            val = [float(v.strip()) for v in line.split(cdel)]
            if len(val) != len(self.servos)+1:
                raise ValueError("%d: Found %d values instead of %d"
                    % (nl+1, len(val)-1, len(self.servos)))
            val[0] += t0
            rows.append(val)
        self.plan.extend(rows)

    def loadCSV( self, stream ):
        """Syntactic sugar for loading a .csv file using .appendFrom()

        Any existing recording is discarded first.
        """
        self.reset()
        self.appendFrom(stream)

    def saveCSV( self, stream=STDOUT ):
        """Syntactic sugar for using .show() to write a .csv formatted recording

        INPUT:
          stream -- a file object or a file path. The '.csv' will be added if missing

        NOTE: every stream other than this module's STDOUT is closed when
        writing ends, including a file object supplied by the caller.

        Typical usage:
          pr.saveCSV('motion.csv')
        """
        if not hasattr(stream, "write"):
            # Not a stream, so treat it as a path
            path = stream
            if not path.endswith(".csv"):
                path = path + ".csv"
            stream = open(path, "w")
        try:
            self.show(stream, delim=", ", fmt="%d", hfmt='"%s"')
        finally:
            # Close even when show() fails so the file handle is not leaked
            if stream is not STDOUT:
                stream.close()

    def playback( self, period=None, count=1, rate = 0.03 ):
        """Play back the current recording one or more times.

        Poses between recorded timestamps are linearly interpolated and
        rounded before being sent to the servos.

        INPUT:
          period -- float / None -- duration for entire recording
            if period is None, the recording timestamps are used
          count -- integer -- number of times to play
          rate -- float -- delay between commands sent (sec)

        RAISES:
          ValueError -- if nothing has been recorded yet
          KeyboardInterrupt -- re-raised after all servos were made slack
        """
        if not self.plan:
            raise ValueError("No recording -- .snap() poses first!")
        # Keep timestamps as floats; truncating them to integers could
        # collapse distinct poses onto the same time.
        gait = asarray(self.plan, float)
        t_first = gait[0,0]
        dur = gait[-1,0] - t_first
        gaitfun = interp1d( gait[:,0], gait[:,1:].T ) # Gait interpolater function
        if period is None:
            period = dur
        t0 = now()
        t1 = t0
        try:
            while t1-t0 < period*count:
                t1 = now()
                # phi is the phase within the current repetition, in [0,1)
                phi = ((t1-t0)/period) % 1.0
                # Offset by the first timestamp so that recordings which do
                # not start at t=0 stay inside the interpolation range.
                goal = gaitfun(t_first + phi*dur).round()
                print("\rphi: %.2f:  %s" % (
                    phi, " ".join([ "%6d" % g for g in goal ])))
                self._set_pose( goal )
                sleep(rate)
        except KeyboardInterrupt:
            self.off()
            raise
200
class PoseRecorderCLI( Cmd ):
    """Concrete class PoseRecorderCLI provides a cmd.Cmd based commandline
    interface to a PoseRecorder.

    Start one and give the 'help' command to get more information

    NOTE: the docstrings of the do_* methods double as the help text that
    the 'help' command shows to the user.
    """

    def __init__(self, pr, clust=None):
        """
        INPUTS:
          pr -- PoseRecorder -- PoseRecorder to control
          clust -- Cluster / None -- Cluster; to allow the general
             command interface
        """
        Cmd.__init__(self)
        self.prompt = "PoseRecorder >> "
        assert isinstance(pr, PoseRecorder), "pr must be a PoseRecorder"
        self.pr = pr
        # Playback settings, changed by the 'duration' and 'count' commands
        self.pr_duration = None
        self.pr_count = 1
        self.clust = clust
        # Modules that the 'cmd' command is broadcast to
        if clust is None:
            self.tgt = []
        else:
            self.tgt = clust.values()

    def run(self):
        """Run the interactive command loop until the user quits"""
        self.cmdloop()

    def emptyline(self):
        # Default Cmd handling of an empty line, followed by a listing of
        # the current recording.
        Cmd.emptyline(self)
        self.do_show()

    def do_target(self, line):
        """Specify target for commands

        Target may be either * or a space-separate list of module names
        """
        if self.clust is None:
            print("No Cluster specified; 'target' command is not available")
            return
        line = line.strip()
        if line == "*":
            self.tgt = self.clust.values()
            return
        # split() without arguments tolerates repeated whitespace between names
        names = line.split()
        if not names:
            print("No module names given")
            return
        try:
            self.tgt = [getattr(self.clust.at, nm) for nm in names]
        except AttributeError as ae:
            # Targets are left unchanged when any of the names is unknown
            print("Unknown module: %s" % ae)
            return

    def do_cmd(self, line):
        """Specify command to broadcast to targets"""
        if self.clust is None:
            print("No Cluster specified; 'cmd' command is not available")
            return
        sp = re_split(r"\s+", line, 1)
        cmd = sp[0]
        val = None
        if len(sp) > 1:
            try:
                val = int(sp[1])
            except ValueError as ve:
                # A malformed argument must not terminate the command loop
                print(ve)
                return
        #
        res = []
        for m in self.tgt:
            try:
                if val is None:
                    res.append(getattr(m, cmd)())
                else:
                    res.append(getattr(m, cmd)(val))
            except AttributeError:
                print("Module %s does not support '%s'" % (m.name, cmd))
                res.append(None)
        #
        # Results exist only for the current targets, so label the columns
        # with the targets' names rather than with every cluster module.
        print(" ".join([ "%6s" % m.name for m in self.tgt ]))
        print(" ".join([ "%6s" % str(v) for v in res ]))

    def do_show(self, line=None):
        """Show the current recording in text form"""
        print("# duration  %s  count  %s" % (self.pr_duration, self.pr_count))
        self.pr.show()

    def do_pose(self, line=None):
        """Append the current pose to the recording"""
        self.pr.snap()

    def do_reset(self, line=None):
        """Reset the recording"""
        self.pr.reset()

    def do_off(self, line=None):
        """Emergency stop the recording -- go_slack() all modules"""
        self.pr.off()

    def do_closeLoop(self, line):
        """closeLoop <dt> -- Close a loop in the recording, with delay <dt>
        """
        try:
            f = float(line)
        except ValueError:
            f = 1
        try:
            self.pr.snapClosed(f)
        except IndexError as ie:
            # Nothing recorded yet -- report it and keep the loop running
            print(ie)

    def do_drop(self, line):
        """Drop the last pose from the recording"""
        self.pr.unsnap()

    def do_save(self, line):
        """save<file> Save recording to a .csv file"""
        try:
            self.pr.saveCSV(line)
        except IOError as ioe:
            print(ioe)

    def do_load(self, line):
        """Append recording from a .csv file
          load <file>
        """
        try:
            # 'with' releases the file handle as soon as the lines are read
            with open(line, "r") as src:
                rows = src.readlines()
        except IOError as ioe:
            print(ioe)
            return
        try:
            self.pr.appendFrom(rows)
        except ValueError as ve:
            # Malformed file contents -- report and keep the loop running
            print(ve)

    def do_count(self, line):
        """Set number of times to loop"""
        try:
            self.pr_count = int(line)
        except ValueError as ve:
            print(ve)

    def do_duration(self, line):
        """Set duration of one playback period (empty for automatic)"""
        if not line:
            self.pr_duration = None
            return
        try:
            self.pr_duration = float(line)
        except ValueError as ve:
            print(ve)

    def do_run(self, line=None):
        """Run the recording. Set duration and cycle count with 'duration' and 'count' commands"""
        try:
            self.pr.playback(self.pr_duration, self.pr_count)
        except KeyboardInterrupt:
            self.pr.off()
        except ValueError as ve:
            # e.g. nothing recorded yet -- report and keep the loop running
            print(ve)

    def do_inline(self, line=None):
        "Show inline code for recording"
        STDOUT.write(".appendFrom('''")
        self.pr.show(delim=",", rdel="; ")
        STDOUT.write("'''.split(';'))\n")

    def do_EOF(self, line=None):
        # End of input behaves like 'quit'
        return self.do_quit()

    def do_quit(self, line=None):
        "quit"
        # A true return value tells Cmd.cmdloop() to stop
        return True

    def do_exit(self, line=None):
        "quit"
        return True
366
def recordFromCluster( c ):
    """Interactively prompt user and record poses from all modules of a cluster

    Each <Enter> stores the current pose; entering 'q' ends the recording,
    which is then shown. Answering anything but 'y' to the confirmation
    prompt discards the recording and starts over.

    INPUT:
      c -- Cluster to record from
    OUTPUT: PoseRecorder with the recording
    """
    # raw_input exists only on Python 2; fall back to input elsewhere
    try:
        read_line = raw_input
    except NameError:
        read_line = input
    p = PoseRecorder(c.values())
    while True:
        p.reset()
        while True:
            # Compare by value: 'is' tests object identity, which is not a
            # reliable way to compare a freshly read string with a literal.
            if read_line("<Enter> to store <q><Enter> when done: ").strip() == 'q':
                break
            p.snap()
        p.show()
        if read_line("Have successfuly recorded? (y/n): ").strip() == 'y':
            break
    return p
384