diff --git a/lib/yt/actions/upload.rb b/lib/yt/actions/upload.rb new file mode 100644 index 00000000..76350000 --- /dev/null +++ b/lib/yt/actions/upload.rb @@ -0,0 +1,48 @@ +require 'net/http' +require 'yt/actions/base' + +module Yt + module Actions + module Upload + include Base + + private + + def do_upload(extra_upload_params = {}) + params = upload_params.merge(extra_upload_params) + uri = params[:uri] + http = params[:http] || new_upload_http(uri) + + req = Net::HTTP::Put.new(uri.request_uri) + params.fetch(:headers, {}).each { |k, v| req[k] = v } + req['Authorization'] = "Bearer #{params[:token]}" + + body = params[:body] + if body.nil? + # no body (e.g. status check) + elsif body.respond_to?(:read) + req.body_stream = body + req['Transfer-Encoding'] = 'chunked' + else + req.body = body + end + + response = http.request(req) + block_given? ? yield(response) : response + end + + def upload_params + {} + end + + def new_upload_http(uri) + Net::HTTP.new(uri.host, uri.port).tap do |http| + http.use_ssl = uri.scheme == 'https' + http.open_timeout = 30 + http.read_timeout = 300 + http.start + end + end + end + end +end diff --git a/lib/yt/collections/resumable_upload_sessions.rb b/lib/yt/collections/resumable_upload_sessions.rb new file mode 100644 index 00000000..7a53fdba --- /dev/null +++ b/lib/yt/collections/resumable_upload_sessions.rb @@ -0,0 +1,82 @@ +require 'net/http' +require 'uri' +require 'yt/collections/base' +require 'yt/models/resumable_upload_session' + +module Yt + module Collections + class ResumableUploadSessions < Base + + def insert(body = {}, options = {}) + @remote_url_auth = options[:remote_url_auth] + content_length = resolve_file_size(options) + + @insert_options = options.merge(file_size: content_length) + @headers = headers_for content_length + + @remote_auth = options[:remote_auth] + do_insert body: body, headers: @headers + end + + private + + def attributes_for_new_item(data) + @insert_options.slice(:file_path, :remote_url, :file_size).tap do |attributes| + attributes[:url] = data['Location'] + attributes[:content_type] = @parent.upload_content_type + attributes[:chunk_size] = @insert_options.fetch(:chunk_size, 0) + attributes[:max_retries] = @insert_options.fetch(:max_retries, 10) + attributes[:auth] = @auth + attributes[:remote_url_auth] = @remote_url_auth + attributes[:remote_auth] = @remote_auth + attributes[:remote_url_refresh] = @insert_options[:remote_url_refresh] + end + end + + def insert_params + super.tap do |params| + params[:response_format] = nil + params[:path] = @parent.resumable_upload_path + options = @insert_options.slice(:on_behalf_of_content_owner_channel) + params[:params] = @parent.resumable_upload_params(options).merge uploadType: 'resumable' + end + end + + def resolve_file_size(options) + return options[:file_size] if options[:file_size] + + if options[:file_path] + File.size(options[:file_path]) + elsif options[:remote_url] + uri = URI.parse(options[:remote_url]) + + Net::HTTP.start(uri.host, uri.port, use_ssl: uri.scheme == 'https') do |http| + request = Net::HTTP::Head.new(uri) + token = remote_url_auth_token + request['Authorization'] = "Bearer #{token}" if token.present? + response = http.request(request) + raise "Cannot determine remote file size: HTTP #{response.code}" unless response.is_a?(Net::HTTPSuccess) + response['Content-Length'].to_i + end + end + end + + def headers_for(content_length) + {}.tap do |headers| + headers['X-Upload-Content-Length'] = content_length + headers['X-Upload-Content-Type'] = @parent.upload_content_type + end + end + + # The result is not in the body but in the headers + def extract_data_from(response) + response.header + end + + def remote_url_auth_token + return @remote_url_auth.call if @remote_url_auth + @auth.access_token + end + end + end +end diff --git a/lib/yt/models/account.rb b/lib/yt/models/account.rb index 322d6582..6149ef26 100644 --- a/lib/yt/models/account.rb +++ b/lib/yt/models/account.rb @@ -77,7 +77,7 @@ def upload_video(path_or_url, params = {}) file = URI.open(path_or_url) session = resumable_sessions.insert file.size, upload_body(params) - session.update(body: file) do |data| + session.upload(body: file) do |data| Yt::Video.new( id: data['id'], snippet: data['snippet'], @@ -87,6 +87,25 @@ def upload_video(path_or_url, params = {}) end end + # Uploads a video using the resumable upload protocol with chunked + # uploads. Returns a {ResumableUploadSession} that is already + # initiated and ready for +next_chunk+. + # + # @param path_or_url [String] local path or remote URL to the video file. + # @param params [Hash] video metadata and upload options. + # @option params [String] :title The video's title. + # @option params [String] :description The video's description. + # @option params [Array] :tags The video's tags. + # @option params [Integer] :category_id The video's category ID. + # @option params [String] :privacy_status The video's privacy status. + # @option params [Boolean] :self_declared_made_for_kids The video's made for kids self-declaration. + # @option params [Integer] :chunk_size Bytes per chunk (0 = whole file). + # @option params [Integer] :max_retries Max retries per chunk (default: 10). + # @return [Yt::Models::ResumableUploadSession] initiated session ready for next_chunk. + def resumable_upload_video(path_or_url, params = {}) + resumable_upload_sessions.insert upload_body(params), upload_options(path_or_url, params) + end + # Creates a playlist in the account’s channel. # @return [Yt::Models::Playlist] the newly created playlist. # @param [Hash] params the attributes of the playlist. @@ -164,16 +183,22 @@ def create_playlist(params = {}) # the account’s channel. has_many :subscribers + # @!attribute [r] video_groups + # @return [Yt::Collections::VideoGroups] the video-groups created by the + # account. + has_many :video_groups + # @!attribute [r] resumable_sessions # @private # @return [Yt::Collections::ResumableSessions] the sessions used to # upload videos using the resumable upload protocol. has_many :resumable_sessions - # @!attribute [r] video_groups - # @return [Yt::Collections::VideoGroups] the video-groups created by the - # account. - has_many :video_groups + # @!attribute [r] resumable_upload_sessions + # @private + # @return [Yt::Collections::ResumableUploadSessions] the sessions used to + # upload videos using the resumable upload protocol. + has_many :resumable_upload_sessions ### PRIVATE API ### @@ -211,6 +236,7 @@ def playlist_items_params def upload_path '/upload/youtube/v3/videos' end + # @private # Tells `has_many :resumable_sessions` what params are set for the object # associated to the uploaded file. @@ -218,6 +244,21 @@ def upload_params {part: 'snippet,status'} end + # @private + # Tells `has_many :resumable_upload_sessions` what path to hit to upload + # a file. Separate from `upload_path` so ContentOwner can override + # `upload_path` for references without affecting chunked video uploads. + def resumable_upload_path + '/upload/youtube/v3/videos' + end + + # @private + # Tells `has_many :resumable_upload_sessions` what params are set for the + # object associated to the uploaded file. + def resumable_upload_params(_options = {}) + {part: 'snippet,status'} + end + # @private # Tells `has_many :resumable_sessions` what metadata to set in the object # associated to the uploaded file. @@ -236,6 +277,24 @@ def upload_body(params = {}) end end + # @private + # Tells `has_many :resumable_upload_sessions` how to read the file — + # locally from disk or by ranged GETs against a remote URL. + def upload_options(path_or_url, params = {}) + remote_url_auth = params.delete(:remote_url_auth) + remote_auth = params.delete(:remote_auth) + + params.slice(:file_size, :chunk_size, :on_behalf_of_content_owner_channel).tap do |options| + if path_or_url.match?(%r{\Ahttps?://}) + options[:remote_url] = path_or_url + options[:remote_url_auth] = remote_url_auth if remote_url_auth + options[:remote_auth] = remote_auth if remote_auth + else + options[:file_path] = path_or_url + end + end + end + # @private # Tells `has_many :resumable_sessions` what type of file can be uploaded. def upload_content_type diff --git a/lib/yt/models/authentication.rb b/lib/yt/models/authentication.rb index 18023097..724126a8 100644 --- a/lib/yt/models/authentication.rb +++ b/lib/yt/models/authentication.rb @@ -79,7 +79,7 @@ def expiration_date(options = {}) if options['expires_in'] Time.now + options['expires_in'].seconds else - Time.parse options['expires_at'] rescue nil + Time.parse options['expires_at'].to_s rescue nil end end end diff --git a/lib/yt/models/content_owner.rb b/lib/yt/models/content_owner.rb index 8fa50f0a..c5076db7 100644 --- a/lib/yt/models/content_owner.rb +++ b/lib/yt/models/content_owner.rb @@ -61,7 +61,7 @@ def upload_reference_file(path_or_url, params = {}) file = URI.open(path_or_url) session = resumable_sessions.insert file.size, params - session.update(body: file) do |data| + session.upload(body: file) do |data| Yt::Reference.new id: data['id'], data: data, auth: self end end @@ -100,6 +100,18 @@ def upload_params {part: 'snippet,status', on_behalf_of_content_owner: self.owner_name} end + # @private + # YouTube requires `onBehalfOfContentOwnerChannel` alongside + # `onBehalfOfContentOwner` on `videos.insert`; the caller passes it + # through `:on_behalf_of_content_owner_channel`. + def resumable_upload_params(options = {}) + params = {part: 'snippet,status', on_behalf_of_content_owner: owner_name} + if (channel = options[:on_behalf_of_content_owner_channel]) + params[:on_behalf_of_content_owner_channel] = channel + end + params + end + # @private # Tells `has_many :video_groups` that content_owner.video_groups should # return all the video-groups *on behalf of* the content owner diff --git a/lib/yt/models/resumable_session.rb b/lib/yt/models/resumable_session.rb index ba189245..35967a40 100644 --- a/lib/yt/models/resumable_session.rb +++ b/lib/yt/models/resumable_session.rb @@ -1,4 +1,6 @@ +require 'json' require 'yt/models/base' +require 'yt/actions/upload' module Yt module Models @@ -6,6 +8,8 @@ module Models # Provides methods to upload videos with the resumable upload protocol. # @see https://developers.google.com/youtube/v3/guides/using_resumable_upload_protocol class ResumableSession < Base + include Actions::Upload + # Sets up a resumable session using the URI returned by YouTube def initialize(options = {}) @uri = URI.parse options[:url] @@ -13,8 +17,11 @@ def initialize(options = {}) @headers = options[:headers] end - def update(params = {}) - do_update(params) {|data| yield data} + def upload(params = {}) + body = params[:body] + do_upload headers: upload_headers(body), body: body do |response| + yield JSON.parse(response.body) + end end # Uploads a thumbnail using the current resumable session @@ -23,29 +30,21 @@ def update(params = {}) # @return the new thumbnail resource for the given image. # @see https://developers.google.com/youtube/v3/docs/thumbnails#resource def upload_thumbnail(file) - do_update(body: file) {|data| data['items'].first} + do_upload headers: upload_headers(file), body: file do |response| + data = JSON.parse(response.body) + data['items'].first + end end - private + private - def session_params - URI.decode_www_form(@uri.query || "").to_h + def upload_params + { uri: @uri, token: @auth.access_token } end - # @note: YouTube documentation states that a valid upload returns an HTTP - # code of 201 Created -- however it looks like the actual code is 200. - # To be sure to include both cases, HTTPSuccess is used - def update_params - super.tap do |params| - params[:request_format] = :file - params[:host] = @uri.host - params[:path] = @uri.path - params[:expected_response] = Net::HTTPSuccess - params[:headers] = @headers - params[:camelize_params] = false - params[:params] = session_params - end + def upload_headers(body) + @headers.merge('Content-Length' => body.size.to_s) end end end -end \ No newline at end of file +end diff --git a/lib/yt/models/resumable_upload_session.rb b/lib/yt/models/resumable_upload_session.rb new file mode 100644 index 00000000..4113552c --- /dev/null +++ b/lib/yt/models/resumable_upload_session.rb @@ -0,0 +1,346 @@ +# frozen_string_literal: true + +require 'net/http' +require 'uri' +require 'json' +require 'yt/models/base' +require 'yt/actions/upload' + +module Yt + module Models + # @private + # Provides methods to upload videos with the resumable upload protocol + # using chunked PUTs with Content-Range headers. + # + # Unlike {ResumableSession} which sends the whole file in a single PUT, + # this class uploads in 256 KB-aligned chunks and supports resuming + # after interruptions. + # + # @see https://developers.google.com/youtube/v3/guides/using_resumable_upload_protocol + # + # @example Drive the upload chunk by chunk. + # session = account.resumable_upload_video('video.mp4', + # title: 'My Video', + # chunk_size: 10 * 1024 * 1024 + # ) + # loop do + # bytes_uploaded, video = session.next_chunk + # break video if video + # puts "#{bytes_uploaded}/#{session.file_size} bytes" + # end + # + # @example Upload from a remote URL with progress reporting. + # session = account.resumable_upload_video(drive_url, + # remote_auth: -> { account.access_token }, + # remote_url_auth: -> { user.access_token }, + # title: 'My Video', + # chunk_size: 10 * 1024 * 1024 + # ) + # video = session.perform do |bytes_uploaded, file_size| + # # report progress + # end + class ResumableUploadSession < Base + include Actions::Upload + CHUNK_ALIGNMENT = 256 * 1024 + + attr_reader :uri, :file_size, :bytes_uploaded + + def initialize(options = {}) + @uri = options[:url] ? URI.parse(options[:url]) : nil + @auth = options[:auth] + @file_path = options[:file_path] + @remote_url = options[:remote_url] + @remote_url_auth = options[:remote_url_auth] + @remote_auth = options[:remote_auth] + @remote_url_refresh = options[:remote_url_refresh] + @content_type = options.fetch(:content_type, 'video/*') + @chunk_size = align_chunk_size(options.fetch(:chunk_size, 0)) + @max_retries = options.fetch(:max_retries, 10) + + @file_size = options[:file_size] + + @bytes_uploaded = 0 + @file_handle = nil + @remote_http = nil + @upload_http = nil + @complete = false + end + + # Uploads the next chunk of the file to the session URI. + # + # Returns a two-element array: + # - +[bytes_uploaded, nil]+ when the upload is still in progress + # - +[nil, Yt::Video]+ when the upload is complete + # + # @return [Array(Integer, nil), Array(nil, Yt::Video)] + # @raise [Yt::Errors::RequestError] on permanent failure or expired session + def next_chunk + raise "No session URI — was initiation successful?" unless @uri + raise "Upload already complete" if @complete + + offset = @bytes_uploaded + chunk_end = [offset + effective_chunk_size - 1, @file_size - 1].min + length = chunk_end - offset + 1 + + chunk_data = if @remote_url + read_remote_chunk_with_retries(offset, chunk_end) + else + ensure_file_open + @file_handle.seek(offset) + @file_handle.read(length) + end + + unless chunk_data && chunk_data.bytesize == length + raise "Failed to read #{length} bytes at offset #{offset}" + end + + response = with_retries do + do_upload headers: upload_headers(length, offset, chunk_end), body: chunk_data + end + + handle_chunk_response(response, chunk_end) + end + + # Queries the server for how many bytes have been received. + # Useful after an interruption to find the resume point. + # + # @return [Integer] number of bytes the server has + # @raise [Yt::Errors::RequestError] if session expired or request fails + def check_status + raise "No session URI" unless @uri + + response = with_retries do + do_upload headers: upload_headers(0) + end + + case response.code.to_i + when 200, 201 + @file_size + when 308 + parse_range_header(response) || 0 + when 404 + raise Yt::Errors::RequestError, "Session URI expired (404)" + else + raise Yt::Errors::RequestError, "Status check failed: HTTP #{response.code}" + end + end + + # Uploads all remaining chunks and returns the completed video. + # + # @return [Yt::Video] the uploaded video + def perform + loop do + bytes_uploaded, video = next_chunk + return video if video + yield bytes_uploaded, @file_size if block_given? + end + end + + def complete? + @complete + end + + private + + def upload_params + { uri: @uri, token: remote_auth_token, http: ensure_upload_http } + end + + def upload_headers(length, offset = nil, chunk_end = nil) + { + 'Content-Length' => length.to_s, + 'Content-Range' => offset ? "bytes #{offset}-#{chunk_end}/#{@file_size}" : "bytes */#{@file_size}", + }.tap do |headers| + headers['Content-Type'] = @content_type if offset + end + end + + def handle_chunk_response(response, chunk_end) + code = response.code.to_i + + case code + when 200, 201 + @complete = true + @bytes_uploaded = @file_size + release_resources + + data = JSON.parse(response.body) + video = Yt::Video.new( + id: data['id'], + snippet: data['snippet'], + status: data['status'], + auth: @auth, + ) + [nil, video] + when 308 + @bytes_uploaded = parse_range_header(response) || (chunk_end + 1) + @uri = URI.parse(response['Location']) if response['Location'] + + [@bytes_uploaded, nil] + when 404 + release_resources + raise Yt::Errors::RequestError, "Session URI expired (404). Start a new upload." + else + release_resources + detail = begin + parsed = JSON.parse(response.body) + err = parsed.dig('error', 'errors', 0) || {} + "#{err['reason']}: #{err['message']}" + rescue + response.body.to_s[0, 300] + end + raise Yt::Errors::RequestError, "Upload failed: HTTP #{code} — #{detail}" + end + end + + # "Range: bytes=0-999999" → 1000000 + def parse_range_header(response) + range = response['Range'] + return nil unless range + match = range.match(/bytes=(\d+)-(\d+)/) + return nil unless match + match[2].to_i + 1 + end + + def with_retries + retries = 0 + loop do + begin + response = yield + code = response.code.to_i + return response unless retriable_http_codes.include?(code) + + retries += 1 + raise Yt::Errors::ServerError, "Max retries exceeded (HTTP #{code})" if retries > @max_retries + + wait = retry_delay(retries, response['Retry-After']&.to_i) + sleep(wait) + rescue *server_errors => e + retries += 1 + raise Yt::Errors::ServerError, "Max retries exceeded: #{e.class}" if retries > @max_retries + + wait = retry_delay(retries) + sleep(wait) + end + end + end + + def retry_delay(attempt, server_retry_after = nil) + return server_retry_after if server_retry_after && server_retry_after > 0 + max_delay = [64.0, 1.0 * (2**attempt)].min + rand * max_delay + end + + def read_remote_chunk_with_retries(offset, chunk_end) + attempts = 0 + last_success = true + loop do + if @remote_url_refresh + new_url = @remote_url_refresh.call(last_success) + if new_url && new_url != @remote_url + @remote_url = new_url + @remote_http = finish_http(@remote_http) + end + end + + begin + return read_remote_chunk(offset, chunk_end) + rescue => e + last_success = false + attempts += 1 + raise if attempts > @max_retries + sleep retry_delay(attempts) + end + end + end + + def read_remote_chunk(offset, chunk_end) + uri = URI.parse(@remote_url) + request = Net::HTTP::Get.new(uri) + token = remote_url_auth_token + request['Authorization'] = "Bearer #{token}" if token.present? + request['Range'] = "bytes=#{offset}-#{chunk_end}" + response = ensure_remote_http.request(request) + unless response.is_a?(Net::HTTPSuccess) || response.is_a?(Net::HTTPPartialContent) + raise "Remote read failed: HTTP #{response.code}" + end + response.body + end + + def ensure_file_open + @file_handle ||= File.open(@file_path, 'rb') + end + + def ensure_remote_http + return @remote_http if @remote_http&.started? + uri = URI.parse(@remote_url) + @remote_http = start_http(uri.host, uri.port, use_ssl: uri.scheme == 'https') + end + + def ensure_upload_http + return @upload_http if @upload_http&.started? + @upload_http = start_http(@uri.host, @uri.port, use_ssl: true) + end + + def release_resources + @file_handle&.close + @file_handle = nil + @remote_http = finish_http(@remote_http) + @upload_http = finish_http(@upload_http) + end + + def start_http(host, port, use_ssl:) + http = Net::HTTP.new(host, port) + http.use_ssl = use_ssl + http.open_timeout = 30 + http.read_timeout = 300 + http.keep_alive_timeout = 120 + http.start + http + end + + def finish_http(http) + http.finish if http&.started? + nil + rescue + nil + end + + def effective_chunk_size + chunked? ? @chunk_size : @file_size + end + + def chunked? + @chunk_size > 0 + end + + def align_chunk_size(size) + return 0 if size.nil? || size <= 0 + aligned = ((size + CHUNK_ALIGNMENT - 1) / CHUNK_ALIGNMENT) * CHUNK_ALIGNMENT + [aligned, CHUNK_ALIGNMENT].max + end + + def retriable_http_codes + [500, 502, 503, 504] + end + + def server_errors + [ + OpenSSL::SSL::SSLError, + Errno::ECONNRESET, Errno::ECONNREFUSED, Errno::ETIMEDOUT, + Errno::EHOSTUNREACH, Errno::ENETUNREACH, Errno::EPIPE, + Net::OpenTimeout, Net::ReadTimeout, IOError, SocketError, + ] + end + + def remote_auth_token + @remote_auth&.call || @auth.access_token + end + + def remote_url_auth_token + return @remote_url_auth.call if @remote_url_auth + @remote_auth&.call || @auth.access_token + end + end + end +end diff --git a/lib/yt/models/video.rb b/lib/yt/models/video.rb index 4a381562..d3f133fb 100644 --- a/lib/yt/models/video.rb +++ b/lib/yt/models/video.rb @@ -553,7 +553,7 @@ def upload_thumbnail(path_or_url) file = URI.open(path_or_url) session = resumable_sessions.insert file.size - session.update(body: file) do |data| + session.upload(body: file) do |data| snippet.instance_variable_set :@thumbnails, data['items'].first end end