#
# pyjControl.py
from subprocess import call
from pulsectl import Pulse
import sys, time
import pyjulius
import Queue
class MyWords():
    """The julius client part: listen for words and collect them in wordList.

    After a capture, ``wordList`` maps the first word of each recognized
    sentence (e.g. 'clem', 'pulse') to the second words heard for it.  The
    'clem' and 'pulse' entries are lists of words; any other entry stays a
    comma-joined string, exactly as it was accumulated.
    """

    # wordList keys whose comma-joined value is converted into a list
    _LIST_KEYS = ('clem', 'pulse')

    # seconds to wait for a julius result before polling the queue again
    _POLL_SECONDS = 0.1

    def __init__(self):
        # Make wordList readable even before any capture method has run.
        self.wordList = {}

    def _split_commands(self):
        """Turn the comma-joined 'clem'/'pulse' entries into lists of words."""
        for key in self._LIST_KEYS:
            if key in self.wordList:
                self.wordList[key] = str(self.wordList[key]).split(",")

    def getWord(self):
        """Fill wordList with a fixed set of dummy words (no julius needed)."""
        # make a wordlist for testing
        self.wordList = {'pulse': 'down', 'clem': 'up,pause'}
        self._split_commands()

    def getWords(self):
        """Collect words from a julius server until the word "go" is heard.

        Connects to julius on localhost:10500; if the connection fails a hint
        is printed and the process exits with status 1.  Each recognized
        sentence is split on spaces: the first word selects the wordList
        entry and the second word is appended to it.  Sentences with fewer
        than two words (other than "go") are ignored.  Ctrl-C also ends the
        capture.  The client is always stopped and disconnected on the way
        out, even if an unexpected error occurs.
        """
        self.wordList = {}
        # open the client
        client = pyjulius.Client('localhost', 10500)
        try:
            client.connect()
        except pyjulius.ConnectionError:
            print('Start julius as module first! ')
            sys.exit(1)
        # Start listening to the server
        client.start()
        try:
            while True:
                try:
                    # Wait briefly instead of spinning on a non-blocking get.
                    result = client.results.get(True, self._POLL_SECONDS)
                except Queue.Empty:
                    continue
                if not isinstance(result, pyjulius.Sentence):
                    continue
                # result.score can be used to limit results - future work maybe
                words = str(result).split(" ")
                if words[0] == "go":
                    break
                if len(words) < 2:
                    # Nothing to store without a second word.
                    continue
                target, command = words[0], words[1]
                if target in self.wordList:
                    # repeat of a known first word: extend its list
                    self.wordList[target] = self.wordList[target] + "," + command
                else:
                    self.wordList[target] = command
        except KeyboardInterrupt:
            pass
        finally:
            client.stop()        # send the stop signal
            client.join()        # wait for the thread to die
            client.disconnect()  # disconnect from julius
        # end - we should have words in wordlist
        self._split_commands()
class MyClem():
    """D-Bus control of the Clementine music player (MPRIS2 interfaces).

    Attributes:
        level: volume value sent by setCvol() (the script uses 0.6 for 60%).
        dict: song fields filled in by getSonginfo().
        metadata: raw Metadata property from the last getSonginfo() call.
    """

    _PLAYER_IFACE = 'org.mpris.MediaPlayer2.Player'
    _PROPS_IFACE = 'org.freedesktop.DBus.Properties'

    # (key in self.dict, key in the player metadata); copied as-is when present
    _PLAIN_FIELDS = (
        ('rating', 'rating'),
        ('useCount', 'xesam:useCount'),
        ('trackNumber', 'xesam:trackNumber'),
        ('year', 'year'),
        ('discNumber', 'xesam:discNumber'),
        ('lastUsed', 'xesam:lastUsed'),
        ('length', 'mpris:length'),
        ('artUrl', 'mpris:artUrl'),
        ('autoRating', 'xesam:autoRating'),
        ('contentCreated', 'xesam:contentCreated'),
        ('userRating', 'xesam:userRating'),
        ('bitrate', 'bitrate'),
        ('url', 'xesam:url'),
    )

    def __init__(self):
        self.level = 0.4
        # Imported here so the module can be loaded without dbus installed.
        import dbus
        self.dict = {}
        self.wordList = {}
        self.player = dbus.SessionBus().get_object(
            'org.mpris.MediaPlayer2.clementine', '/org/mpris/MediaPlayer2')
        self.interface = dbus.Interface(
            self.player, dbus_interface='org.mpris.MediaPlayer2.Player')
        self.property_interface = dbus.Interface(
            self.player, dbus_interface='org.freedesktop.DBus.Properties')

    @staticmethod
    def _first_item(metadata, key):
        """Return metadata[key][0], or None if it is missing or unusable."""
        try:
            return metadata[key][0]
        except (KeyError, IndexError, TypeError):
            return None

    @staticmethod
    def _fields_from_metadata(metadata):
        """Build the song-field dict from a metadata mapping.

        'title', 'artist' and 'album' are always present ('Unknown' when the
        metadata lacks them); every other field appears only when the
        metadata provides it.
        """
        fields = {}
        for name, source in (('title', 'xesam:title'), ('album', 'xesam:album')):
            if source in metadata:
                fields[name] = '%s' % (metadata[source])
            else:
                fields[name] = 'Unknown'

        # Only the first listed artist / genre is kept.
        artist = MyClem._first_item(metadata, 'xesam:artist')
        fields['artist'] = 'Unknown' if artist is None else '%s' % (artist)
        genre = MyClem._first_item(metadata, 'xesam:genre')
        if genre is not None:
            fields['genre'] = genre

        if 'mpris:trackid' in metadata:
            fields['trackid'] = "%s" % metadata['mpris:trackid']
        for name, source in MyClem._PLAIN_FIELDS:
            if source in metadata:
                fields[name] = metadata[source]
        return fields

    def getSonginfo(self):
        """Read the player's Metadata property and refresh self.dict from it.

        self.dict is emptied first, so fields missing for the current track
        do not keep values from an earlier call.
        """
        self.metadata = self.player.Get(self._PLAYER_IFACE, 'Metadata',
                                        dbus_interface=self._PROPS_IFACE)
        self.dict.clear()
        self.dict.update(self._fields_from_metadata(self.metadata))

    def getCvol(self):
        """Return the player's Volume property times 100, as a string."""
        volume = self.player.Get(self._PLAYER_IFACE, 'Volume',
                                 dbus_interface=self._PROPS_IFACE)
        return ('%s' % (volume * 100))

    def setCvol(self):
        """Set the player's Volume property to self.level."""
        self.property_interface.Set(self._PLAYER_IFACE, 'Volume', self.level)

    def setSong(self, x):
        """Play the song at playlist index x via '/usr/bin/clementine -k<x>'."""
        # plays song by index from playlist
        call(['/usr/bin/clementine', "-k%d" % (int(x))])

    def _song_lines(self):
        """Return the lines printSong() prints, built from self.dict."""
        info = self.dict
        track_id = '%s' % (info.get('trackid', 'Unknown'))
        parts = track_id.split("/")
        # The sixth '/'-separated piece is shown; fall back to the whole id
        # when the id has fewer pieces than that.
        short_id = parts[5] if len(parts) > 5 else track_id
        return [
            'title : %s' % (info.get('title', 'Unknown')),
            'artist : %s' % (info.get('artist', 'Unknown')),
            'album : %s' % (info.get('album', 'Unknown')),
            'location : %s' % (info.get('url', 'Unknown')),
            'trackid : %s' % (short_id),
        ]

    def printSong(self):
        """Print title, artist, album, location and track id of the song.

        If no song info has been fetched yet, getSonginfo() is called first
        so this method works on its own.
        """
        if not self.dict:
            self.getSonginfo()
        for line in self._song_lines():
            print(line)

    def printVol(self):
        """Print the volume string returned by getCvol()."""
        print('volume : %s' % (self.getCvol()))

    def returns(self):
        """Print the keys currently stored in self.dict."""
        print("dict\nand return the following keys")
        for key in self.dict:
            print("dict : %s" % (key))
class MyPulse():
    """PulseAudio volume setting: step every sink's volume up or down."""

    def __init__(self):
        pass

    def _shift_all_sinks(self, delta):
        """Apply volume_change_all_chans(sink, delta) to each listed sink."""
        # A short-lived Pulse client is opened per call and closed by `with`.
        with Pulse('volume-increaser') as pulse:
            for sink in pulse.sink_list():
                pulse.volume_change_all_chans(sink, delta)

    def volume_up(self):
        """Change the volume of all sinks by +0.1."""
        self._shift_all_sinks(0.1)

    def volume_down(self):
        """Change the volume of all sinks by -0.1."""
        self._shift_all_sinks(-0.1)
# spoken word -> name of the player method to call on the Clementine object
# (Next(), Previous(), Play(), Stop(), Pause())
_CLEM_TRANSPORT = {
    'previous': 'Previous',
    'next': 'Next',
    'pause': 'Pause',
    'play': 'Play',
}

# amount the Clementine volume level moves per 'louder' / 'down' command
_CLEM_VOLUME_STEP = 0.1


def _run_pulse_command(pulse, word):
    """Apply one 'pulse' word to the PulseAudio controller."""
    if word == 'louder':
        pulse.volume_up()
    elif word == 'down':
        pulse.volume_down()


def _run_clem_command(clem, word):
    """Apply one 'clem' word to the Clementine controller."""
    if word == 'song':
        clem.printSong()
    elif word in _CLEM_TRANSPORT:
        getattr(clem.player, _CLEM_TRANSPORT[word])()
    elif word == 'louder':
        # keep the level inside 0.0 .. 1.0 before sending it
        clem.level = min(1.0, clem.level + _CLEM_VOLUME_STEP)
        clem.setCvol()
    elif word == 'down':
        clem.level = max(0.0, clem.level - _CLEM_VOLUME_STEP)
        clem.setCvol()


def _run_commands(words, runner, controller):
    """Print and run each word; a failing command is reported, not fatal."""
    for word in words:
        print(word)
        try:
            runner(controller, word)
        except Exception as err:
            # Best effort: report the failure and carry on with the next
            # command instead of silently dropping the rest of the list.
            print('command %r failed: %s' % (word, err))


def main():
    """Collect command words, then apply them to PulseAudio and Clementine."""
    print("Say Go when done with commands")
    MyW = MyWords()
    MyL = MyPulse()
    MyC = MyClem()
    # set volume to 60% use 0.6
    MyC.level = 0.6
    MyC.setCvol()
    # main can loop if client cooperates
    # MyW.getWords()  # getwords uses julius
    MyW.getWord()  # getword uses dummy words
    print(MyW.wordList)
    # A group that was never spoken is simply absent from wordList.
    _run_commands(MyW.wordList.get('pulse', []), _run_pulse_command, MyL)
    _run_commands(MyW.wordList.get('clem', []), _run_clem_command, MyC)
    time.sleep(3)


if __name__ == '__main__':
    main()
"""
a bash script to keep julius running
#!/bin/bash
while true
do
julius -input mic -C /home/al/src/pyAls/Sample.jconf -module 10500
done
*******************************************************************
# make a wordlist for testing
self.wordList = {}
def getWords(self):
# for testing
self.wordList = {'pulse': 'down','clem': 'up,pause'}
x = []
try:
x = str(self.wordList['clem']).split(",")
self.wordList['clem'] = x
x = []
except: pass
try:
x = str(self.wordList['pulse']).split(",")
self.wordList['pulse'] = x
except: pass
"""
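# Blog-page residue from the original post: "No comments:" / "Post a Comment".
# Kept as a comment so the file remains valid Python.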