5
5
sys .exit ("multiprocessing was not available" )
6
6
import os , datetime , time , signal , sys
7
7
from pyres import ResQ
8
-
9
8
from pyres .exceptions import NoQueueError
10
9
from pyres .worker import Worker
11
- class Minion (multiprocessing .Process ):
12
- def __init__ (self , conn , queue ):
10
+ class Minion (multiprocessing .Process , Worker ):
11
+ def __init__ (self , conn , queues = [] ):
13
12
self .conn = conn
14
- self .q = queue
13
+ self .queues = queues
15
14
super (Minion ,self ).__init__ (name = 'Minion' )
16
15
17
- def run (self ):
16
+ def _check_message (self ):
17
+ if self .conn .poll ():
18
+ message = self .conn .recv ()
19
+ self .process_message (message )
20
+
21
+ def process_message (self , message ):
22
+ pass
23
+
24
+ def work (self , interval = 5 ):
25
+ self .startup ()
18
26
while True :
19
- job = self .q .get ()
20
- print 'pid: %s is running %s ' % (self .pid ,job )
27
+ self ._check_messages ()
28
+ if self ._shutdown :
29
+ print 'shutdown scheduled'
30
+ break
31
+ job = self .reserve ()
32
+ if job :
33
+ self .process (job )
34
+ else :
35
+ if interval == 0 :
36
+ break
37
+ time .sleep (interval )
38
+ self .unregister_worker ()
39
+
40
+ def run (self ):
41
+ self .work ()
42
+ #while True:
43
+ # job = self.q.get()
44
+ # print 'pid: %s is running %s ' % (self.pid,job)
21
45
22
- class Khan (object ):
46
+ class Khan (Worker ):
47
+ _command_map = {
48
+ 'ADD' : '_add_minion' ,
49
+ 'REMOVE' : '_remove_minion' ,
50
+ 'SHUTDOWN' : '_schedule_shutdown'
51
+ }
23
52
_workers = {}
24
- def __init__ (self , pool_size = 5 ):
53
+ def __init__ (self , pool_size = 5 , queue_list = [], server = 'localhost:6379' , password = None ):
25
54
self .pool_size = pool_size
55
+ self .resq = ResQ (server , password = password )
56
+ self ._workers = list ()
57
+
58
+ def _check_command (self ):
59
+ if not self ._shutdown :
60
+ command = self .resq .redis .pop ('resque:khan:%s' % self )
61
+ if command :
62
+ self .process_command (command )
63
+ self ._check_command ()
64
+
65
+ def process_command (self , command ):
66
+ #available commands, shutdown, add 1, remove 1
67
+ command = self ._command_map .get (command , None )
68
+ if command :
69
+ fn = getattr (self , command )
70
+ if fn :
71
+ fn ()
72
+
73
+ def _add_minion (self ):
74
+ parent_conn , child_conn = multiprocessing .Pipe ()
75
+ m = Minion (child_conn , q )
76
+ m .start ()
77
+ self ._workers .append ((parent_conn , m ))
78
+
79
+ def _remove_minion (self ):
80
+ self ._workers
26
81
def run ():
82
+ self .startup ()
27
83
q = multiprocessing .Queue ()
28
84
for i in range (pool_size ):
29
- parent_conn , child_conn = multiprocessing .Pipe ()
30
- m = Minion (child_conn , q )
31
- print m .pid
32
- m .start ()
33
- print m .pid
34
- self ._workers [m .pid ] = parent_conn
85
+ self ._add_minion ()
86
+ while True :
87
+ self ._check_commands ()
88
+ if self ._shutdown :
89
+ #send signals to each child
90
+ break
91
+ time .sleep (5 )
92
+ self .unregister_worker ()
93
+
35
94
95
+ #if __name__ == "__main__":
96
+ # k = Khan()
97
+ # k.run()
36
98
37
99
if __name__ == "__main__" :
38
- k = Khan ()
39
- k .run ()
100
+ from optparse import OptionParser
101
+ parser = OptionParser (usage = "%prog [options] queue list" )
102
+ parser .add_option ("-s" , dest = "server" , default = "localhost:6379" )
103
+ (options ,args ) = parser .parse_args ()
104
+ if len (args ) < 1 :
105
+ parser .print_help ()
106
+ parser .error ("Please give the horde at least one queue." )
107
+ khan = Khan (queue_list = args , server = options .dest )
108
+ khan .run ()
109
+ #Worker.run(queues, options.server)
0 commit comments