@@ -135,3 +135,50 @@ def reserve(cls, queues, res, worker=None, timeout=10):
135
135
queue , payload = res .pop (queues , timeout = timeout )
136
136
if payload :
137
137
return cls (queue , payload , res , worker )
138
+
139
+ class TransitionJob (Job ):
140
+ def perform (self ):
141
+ func = None
142
+ #check for callable
143
+ if 'callable' in self ._payload :
144
+ callable_string = self ._payload ['callable' ]
145
+ func = safe_str_to_class (callable_string )
146
+ #check for class.perform
147
+ else :
148
+ klass_string = self ._payload ['class' ]
149
+ klass = safe_str_to_class (klass_string )
150
+ klass .resq = self .resq
151
+ if callable (klass ):
152
+ func = klass
153
+ before_perform = None
154
+ else :
155
+ before_perform = getattr (klass , "before_perform" , None )
156
+ func = getattr (klass , 'perform' )
157
+ args = self ._payload .get ('args' )
158
+
159
+ metadata = dict (args = args )
160
+ if self .enqueue_timestamp :
161
+ metadata ['enqueue_timestamp' ] = self .enqueue_timestamp
162
+
163
+ metadata ["failed" ] = False
164
+ metadata ['perform_timestamp' ] = time .time ()
165
+ check_after = True
166
+ try :
167
+ if before_perform :
168
+ before_perform (klass , metadata )
169
+ return func (* args )
170
+ except :
171
+ check_after = False
172
+ metadata ["failed" ] = True
173
+ if not self .retry (klass , args ):
174
+ metadata ["retried" ] = False
175
+ raise
176
+ else :
177
+ metadata ["retried" ] = True
178
+
179
+ finally :
180
+ after_perform = getattr (klass , "after_perform" , None )
181
+ if after_perform :
182
+ after_perform (klass , metadata )
183
+ delattr (klass , 'resq' )
184
+
0 commit comments