AWS::S3 is not threadsafe. Hell, it’s not even reusable; most methods go through a class constant. To use it in threaded code, each S3 operation needs to be isolated in its own address space. Fork to the rescue!
require 'aws/s3'
require 'timeout'

def s3(key, data, bucket, opts)
  begin
    fork_to do
      # Fresh connection in each subprocess: no AWS::S3 state is
      # shared between threads.
      AWS::S3::Base.establish_connection!(
        :access_key_id     => KEY,
        :secret_access_key => SECRET
      )
      AWS::S3::S3Object.store key, data, bucket, opts
    end
  rescue Timeout::Error
    raise SubprocessTimedOut
  end
end
def fork_to(timeout = 4)
  r, w, pid = nil, nil, nil
  begin
    # Open pipe
    r, w = IO.pipe

    # Start subprocess
    pid = fork do
      # Child
      begin
        r.close
        val = begin
          Timeout.timeout(timeout) do
            # Run block
            yield
          end
        rescue Exception => e
          e
        end
        w.write Marshal.dump val
        w.close
      ensure
        # YOU SHALL NOT PASS
        # Skip at_exit handlers.
        exit!
      end
    end

    # Parent
    w.close
    Timeout.timeout(timeout) do
      # Read value from pipe
      begin
        val = Marshal.load r.read
      rescue ArgumentError
        # Marshal data too short
        # Subprocess likely exited without writing.
        raise Timeout::Error
      end

      # Return or raise value from subprocess.
      case val
      when Exception
        raise val
      else
        return val
      end
    end
  ensure
    if pid
      Process.kill "TERM", pid rescue nil
      Process.kill "KILL", pid rescue nil
      Process.waitpid pid rescue nil
    end
    r.close rescue nil
    w.close rescue nil
  end
end
There’s a lot of bookkeeping here. In a nutshell, we run the given block in a forked subprocess and hand its result back to the parent over a pipe. The rest is just timeouts and process accounting: subprocesses have a tendency to get tied up, leaving dangling pipes or zombies floating around. I know there are weak points and race conditions here, but with robust retry code this approach is suitable for production.
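By “retry code” I mean something along these lines. This is only a minimal sketch: with_retries is a hypothetical helper (not part of the code above), the retry count and sleep are arbitrary, and in practice you’d probably also rescue whatever transient S3 errors your gem version raises.

# Hypothetical retry wrapper: re-run the block a few times when a
# subprocess times out. Tune the count and backoff to taste.
def with_retries(tries = 3)
  begin
    yield
  rescue SubprocessTimedOut
    tries -= 1
    raise if tries.zero?
    sleep 1
    retry
  end
end

# Usage: wrap each upload so a stuck subprocess gets retried.
with_retries { s3 key, data, bucket, opts }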
Using this approach, I can typically keep ~8 S3 uploads running concurrently (on a fairly busy 6-core HT Nehalem) and obtain ~sixfold throughput compared to locking S3 operations with a mutex.
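For reference, a driver for those concurrent uploads might look roughly like this. The eight-thread pool matches the figure above, but uploads (an array of [key, data, bucket, opts] tuples) and the pool structure are assumptions for illustration, not code from this post.

require 'thread'

CONCURRENCY = 8
queue = Queue.new
uploads.each { |u| queue << u }  # each u: [key, data, bucket, opts]

threads = CONCURRENCY.times.map do
  Thread.new do
    begin
      # Non-blocking pop raises ThreadError once the queue is drained.
      while u = queue.pop(true)
        s3(*u)
      end
    rescue ThreadError
      # Queue empty; this worker exits.
    end
  end
end
threads.each(&:join)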