-
Notifications
You must be signed in to change notification settings - Fork 84
Expand file tree
/
Copy pathjob.rb
More file actions
191 lines (163 loc) · 6.81 KB
/
job.rb
File metadata and controls
191 lines (163 loc) · 6.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
module CloudCrowd
# A chunk of work that will be farmed out into many WorkUnits to be processed
# in parallel by each active CloudCrowd::Worker. Jobs are defined by a list
# of inputs (usually public urls to files), an action (the name of a script that
# CloudCrowd knows how to run), and, eventually, a corresponding list of outputs.
class Job < ActiveRecord::Base
include ModelStatus
# Default age, in days, used by Job.cleanup_all when no <tt>:days</tt> option
# is given.
CLEANUP_GRACE_PERIOD = 7 # That's a week.
# Destroying a Job destroys its WorkUnits along with it.
has_many :work_units, :dependent => :destroy
validates_presence_of :status, :inputs, :action, :options
# Lifecycle hooks: choose the starting status before a new record is
# validated, queue the first WorkUnits once the record has been created, and
# clean up stored assets before the record is destroyed.
before_validation :set_initial_status, :on => :create
after_create :queue_for_workers
before_destroy :cleanup_assets
# Jobs that were last updated more than N days ago.
# NOTE(review): the hash-returning lambda is the finder-options form of
# +scope+; confirm it is still supported by the ActiveRecord version in use.
scope :older_than, lambda {|num| {:conditions => ['updated_at < ?', num.days.ago]} }
# Create a Job from the parsed hash of an incoming JSON request. The 'inputs'
# and 'options' entries are re-serialized to JSON strings for storage
# ('options' falls back to an empty hash when absent); 'action', 'email' and
# 'callback_url' are copied over untouched. Returns the result of +create+.
def self.create_from_request(h)
  options = h['options'] || {}
  attributes = {
    :inputs       => h['inputs'].to_json,
    :action       => h['action'],
    :options      => options.to_json,
    :email        => h['email'],
    :callback_url => h['callback_url']
  }
  create(attributes)
end
# Destroy every Job in the +complete+ scope that was last updated more than
# <tt>opts[:days]</tt> days ago (CLEANUP_GRACE_PERIOD days when the option is
# missing). Jobs are loaded and destroyed in batches of 100.
def self.cleanup_all(opts = {})
  max_age = opts[:days] || CLEANUP_GRACE_PERIOD
  expired = complete.older_than(max_age)
  expired.find_in_batches(:batch_size => 100) do |batch|
    batch.each(&:destroy)
  end
end
# Called after work units are marked finished. Does nothing (returns nil)
# while any work unit is still incomplete. Otherwise the Job moves to its
# next status and the finished units' outputs are gathered. If the Job is now
# merging, the gathered outputs are queued as the single input of the merge
# step and the result of that call is returned. If the Job is now complete,
# the outputs and elapsed time are saved, a status line is printed (except
# when RACK_ENV is 'test'), and the callback, if any, is fired in a new
# thread. Returns the Job itself in the non-merging case.
def check_for_completion
  return nil unless all_work_units_complete?

  # Order matters: the status must advance before we ask merging?/complete?.
  set_next_status
  gathered = gather_outputs_from_work_units
  return queue_for_workers([gathered]) if merging?

  if complete?
    update_attributes(:outputs => gathered, :time => time_taken)
    unless ENV['RACK_ENV'] == 'test'
      puts "Job ##{id} (#{action}) #{display_status}."
    end
    Thread.new { fire_callback } if callback_url
  end

  self
end
# Move this Job to its next status and persist it with +update_attribute+.
# The checks run in priority order: any failed WorkUnit means FAILED; a Job
# that was splitting moves on to PROCESSING; a mergeable Job moves to MERGING;
# anything else has SUCCEEDED.
def set_next_status
  next_status =
    if any_work_units_failed?
      FAILED
    elsif splitting?
      PROCESSING
    elsif mergeable?
      MERGING
    else
      SUCCEEDED
    end
  update_attribute(:status, next_status)
end
# Post the Job's JSON to its <tt>callback_url</tt> (sent as the +job+
# parameter). The <tt>callback_url</tt> may include HTTP basic authentication,
# if you like:
#   http://user:password@example.com/job_complete
# If the callback URL returns a '201 Created' HTTP status code, CloudCrowd
# will assume that the resource has been successfully created, and the Job
# will be destroyed in a separate thread.
#
# The callback is best-effort: any StandardError raised while posting
# (RestClient errors, but also connection-level failures such as a refused
# connection, which RestClient does not wrap) is logged with its class and
# message instead of being raised. Returns the cleanup Thread when one was
# started, otherwise nil.
def fire_callback
  response = RestClient.post(callback_url, {:job => self.to_json})
  Thread.new { self.destroy } if response && response.code == 201
rescue StandardError => e
  # Keep the failure visible; this usually runs in a background thread where
  # an unrescued exception would otherwise just end the thread.
  puts "Job ##{id} (#{action}) failed to fire callback: #{callback_url} (#{e.class}: #{e.message})"
end
# Hand this Job to a fresh AssetStore so its stored files (S3 or the
# filesystem) can be removed. Registered as a +before_destroy+ hook, so
# destroying a Job cleans up its assets first. Returns whatever
# <tt>AssetStore#cleanup</tt> returns.
# TODO: Convert this into a 'cleanup' work unit that gets run by a worker.
def cleanup_assets
  store = AssetStore.new
  store.cleanup(self)
end
# True when no WorkUnit of this Job remains in the +incomplete+ scope.
def all_work_units_complete?
  remaining = work_units.incomplete.count
  remaining < 1
end
# True when at least one WorkUnit of this Job is in the +failed+ scope.
def any_work_units_failed?
  failures = work_units.failed.count
  failures >= 1
end
# This job is splittable if its Action class has a public +split+ instance
# method (own or inherited). Asks the class directly instead of building and
# scanning the full list of public instance methods. Propagates whatever
# +action_class+ raises when the Action cannot be resolved.
def splittable?
  action_class.public_method_defined?(:split)
end
# A Job is done splitting when it is splittable and none of its WorkUnits
# remain in the +splitting+ scope. Jobs that are not splittable never are.
def done_splitting?
  return false unless splittable?

  work_units.splitting.count < 1
end
# This job is mergeable if it is currently processing and its Action class
# has a public +merge+ instance method (own or inherited). The Action class
# is only consulted while the Job is processing. Asks the class directly
# instead of building and scanning the full list of public instance methods.
def mergeable?
  processing? && action_class.public_method_defined?(:merge)
end
# Look up (and memoize) the class registered in <tt>CloudCrowd.actions</tt>
# under this Job's action name. Raises Error::ActionNotFound when no such
# action is registered.
def action_class
  @action_class ||= CloudCrowd.actions[action]
  unless @action_class
    raise Error::ActionNotFound, "no action named: '#{action}' could be found"
  end
  @action_class
end
# How complete is this Job, as a whole-number percentage?
# A merging Job reports 99, a complete Job (or one without any work units)
# reports 100; otherwise it is the rounded share of work units in the
# +complete+ scope.
# Unfortunately, with the current processing sequence, the percentage can
# pull a fast one and go backwards. This happens when there's a single large
# input that takes a long time to split, and when it finally does it creates
# a whole swarm of work units. This seems unavoidable.
def percent_complete
  if merging?
    99
  elsif complete?
    100
  else
    total = work_units.count
    if total <= 0
      100
    else
      finished = work_units.complete.count
      (finished / total.to_f * 100).round
    end
  end
end
# How long has this Job taken? Returns the stored +time+ when one has been
# recorded, otherwise the time elapsed between +created_at+ and now.
def time_taken
  time || (Time.now - created_at)
end
# Generate a stable six-digit hex color code for this Job: six characters
# taken from near the end of the MD5 hex digest of the Job's id (the final
# character of the digest is skipped). The value is memoized.
def color
  @color ||= begin
    digest = Digest::MD5.hexdigest(id.to_s)
    digest[-7, 6]
  end
end
# Serialize this Job to a JSON string with its id, color, display status,
# percent complete, number of work units and time taken. The parsed
# 'outputs' and the 'email' are added only when present. +opts+ is accepted
# but not used.
def to_json(opts={})
  summary = {
    'id'               => id,
    'color'            => color,
    'status'           => display_status,
    'percent_complete' => percent_complete,
    'work_units'       => work_units.count,
    'time_taken'       => time_taken
  }
  summary.update('outputs' => JSON.parse(outputs)) if outputs
  summary.update('email' => email) if email
  summary.to_json
end
private
# When the WorkUnits are all finished, gather all their outputs together
# before removing them from the database entirely. Returns the gathered
# outputs (each unit's +parsed_output+) as a JSON array string.
#
# The +complete+ scope is built once and used for both steps, so the units
# that get destroyed are the same ones whose outputs were collected.
def gather_outputs_from_work_units
  units = work_units.complete
  outs  = units.map { |u| u.parsed_output }
  units.destroy_all
  outs.to_json
end
# When starting a new job, or moving to a new stage, start one WorkUnit per
# input for this Job's action at its current status. Without an explicit
# +input+ list, the Job's stored JSON +inputs+ are parsed and used.
# Returns the Job.
def queue_for_workers(input=nil)
  items = input || JSON.parse(inputs)
  items.each do |item|
    WorkUnit.start(self, action, item, status)
  end
  self
end
# A Job starts out SPLITTING when its action is splittable, and PROCESSING
# otherwise. Returns the status that was assigned.
def set_initial_status
  self.status = if splittable?
    SPLITTING
  else
    PROCESSING
  end
end
end
end