Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Setup downloads on timescaledb #4979

Draft
wants to merge 15 commits into
base: master
Choose a base branch
from
3 changes: 1 addition & 2 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,8 @@ gem "phlex-rails", "~> 1.2"
gem "discard", "~> 1.4"
gem "user_agent_parser", "~> 2.18"
gem "pghero", "~> 3.6"
gem "timescaledb", "~> 0.3.0"
gem "faraday-multipart", "~> 1.0"
gem "timescaledb", "~> 0.3"
gem "sigstore", "~> 0.1.1"

# Admin dashboard
gem "avo", "~> 3.13"
Expand Down
5 changes: 4 additions & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,10 @@ GEM
faraday-follow_redirects
tailwindcss-rails (3.0.0)
railties (>= 7.0.0)
tailwindcss-ruby
terser (1.2.3)
execjs (>= 0.3.0, < 3)
tilt (2.3.0)
tailwindcss-ruby
tailwindcss-ruby (3.4.14)
tailwindcss-ruby (3.4.14-aarch64-linux)
tailwindcss-ruby (3.4.14-arm64-darwin)
Expand Down
15 changes: 15 additions & 0 deletions app/avo/resources/log_download_resource.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Avo admin resource exposing LogDownload records (log files queued for
# ingestion into the downloads database).
class LogDownloadResource < Avo::BaseResource
  self.title = :id
  self.includes = []

  # Boolean scope filters derived from the enum-backed columns.
  class BackendFilter < ScopeBooleanFilter; end
  class StatusFilter < ScopeBooleanFilter; end

  filter BackendFilter, arguments: { default: LogDownload.backends.transform_values { true } }
  filter StatusFilter, arguments: { default: LogDownload.statuses.transform_values { true } }

  field :id, as: :id
  field :key, as: :text
  field :directory, as: :text
  field :backend, as: :select, enum: LogDownload.backends
  field :status, as: :select, enum: LogDownload.statuses
end
4 changes: 4 additions & 0 deletions app/controllers/avo/log_downloads_controller.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Generated so Rails' resource routes can resolve Avo's LogDownload pages;
# all behavior is inherited. More information on https://docs.avohq.io/2.0/controllers.html
class Avo::LogDownloadsController < Avo::ResourcesController
end
99 changes: 99 additions & 0 deletions app/jobs/fastly_log_downloads_processor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
require "zlib"

# Process log files downloaded from Fastly and insert row by row into the database.
# It works in a similar way to FastlyLogProcessor, but it's optimized for a different
# use case: it processes log files downloaded from Fastly and inserts the raw data into
# the database in batches.
# The counters and other metrics are calculated in a separate job directly in
# the database through the continuous aggregates.
# Check Download::PerMinute, Download::PerHour and other classes as an example.
class FastlyLogDownloadsProcessor
  class LogFileNotFoundError < ::StandardError; end

  extend StatsD::Instrument

  # Rows per Download.insert_all call, to bound memory and statement size.
  BATCH_SIZE = 5000

  attr_accessor :bucket, :key
  attr_reader :processed_count

  def initialize(bucket, key)
    @bucket = bucket
    @key = key
    @processed_count = 0
  end

  # Claims the matching LogDownload, parses its log body and bulk-inserts one
  # row per successful download. Marks the LogDownload "processed" (recording
  # the row count) when any rows were inserted, "failed" otherwise.
  #
  # Returns the number of inserted rows.
  # Raises LogFileNotFoundError when no pending log file could be fetched.
  def perform
    StatsD.increment("fastly_log_downloads_processor.started")
    raise LogFileNotFoundError if body.nil?

    count = 0
    parse_success_downloads.each_slice(BATCH_SIZE) do |batch|
      Download.insert_all batch
      count += batch.size
    end

    # Keep the public reader in sync (it previously always reported 0).
    @processed_count = count

    if count > 0
      element.update(status: "processed", processed_count: count)
    else
      element.update(status: "failed")
    end

    # This value may diverge from numbers from the fastly_log_processor as totals are
    # not aggregated with the number of downloads but each row represents a download.
    StatsD.gauge("fastly_log_downloads_processor.processed_count", count)
    count
  end

  # Raw log file contents (memoized); nil when no LogDownload was claimed.
  def body
    @body ||= element&.body
  end

  # Claims (and locks) the next pending LogDownload for this bucket/key (memoized).
  def element
    @element ||= LogDownload.pop(directory: @bucket, key: @key)
  end

  # Builds one attributes hash per successful gem download line.
  # Malformed lines and 200/304 responses whose path is not a gem download
  # (previously a NoMethodError on the nil match) are skipped.
  def parse_success_downloads
    body.each_line.filter_map do |log_line|
      fragments = log_line.split
      path, response_code = fragments[10, 2]
      next unless path && response_code

      # Only count successful downloads
      # NB: we consider a 304 response a download attempt
      next unless [200, 304].include?(response_code.to_i)

      match = path.match(PATH_PATTERN)
      next unless match

      created_at = Time.parse fragments[4..9].join(" ")
      env = parse_env fragments[12..]

      {created_at:, gem_name: match[:gem_name], gem_version: match[:gem_version], payload: {env:}}
    end
  end

  # Parse the env into a hash of key value pairs
  # example env = "bundler/2.5.9 rubygems/3.3.25 ruby/3.1.0"
  # output = {bundler: "2.5.9", rubygems: "3.3.25", ruby: "3.1.0"}
  # case it says single word like jruby it appends true as the value
  # example env = "jruby"
  # output = {jruby: "true"}
  # also removes some unwanted characters
  def parse_env(output)
    env = output.join(" ").gsub(/command.*|\(.*\)|Ruby, /, "").strip
    # Nothing useful left (e.g. the whole env was "(null)", which the gsub
    # strips — the old `env == "(null)"` check could never match and would
    # have crashed on nil.split if it had).
    return nil if env.empty?

    env.split(" ").map do |info|
      pair = info.split(%r{/|-}, 2)
      pair << "true" if pair.size == 1
      pair
    end.to_h
  end

  statsd_count_success :perform, "fastly_log_downloads_processor.perform"
  statsd_measure :perform, "fastly_log_downloads_processor.job_performance"

  PATH_PATTERN = %r{/gems/(?<gem_name>.*)-(?<gem_version>\d+.*)\.gem}
  private_constant :PATH_PATTERN
end
21 changes: 21 additions & 0 deletions app/jobs/fastly_log_downloads_processor_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Counterpart of FastlyLogProcessorJob that writes flat download rows to the
# Download table in TimescaleDB instead of aggregated counters.
class FastlyLogDownloadsProcessorJob < ApplicationJob
  queue_as :default
  queue_with_priority PRIORITIES.fetch(:stats)

  include GoodJob::ActiveJobExtensions::Concurrency
  # Cap how many of these jobs run at the same time (enqueued jobs are not
  # counted) so the gem_download table is not overloaded with too many
  # concurrent conflicting updates.
  good_job_control_concurrency_with(
    perform_limit: good_job_concurrency_perform_limit(default: 5),
    key: name
  )

  # Delegates the log parsing and row insertion to FastlyLogDownloadsProcessor.
  def perform(bucket:, key:)
    FastlyLogDownloadsProcessor.new(bucket, key).perform
  end
end
20 changes: 20 additions & 0 deletions app/models/download.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Flat download events stored in the downloads (TimescaleDB) database,
# one row per gem download, written by FastlyLogDownloadsProcessor.
class Download < DownloadsDB
  extend Timescaledb::ActsAsHypertable
  include Timescaledb::ContinuousAggregatesHelper

  # Hypertable partitioned on created_at, segmented by gem name and version.
  acts_as_hypertable time_column: 'created_at', segment_by: [:gem_name, :gem_version]

  # Base scopes that the continuous aggregates below are derived from.
  scope :total_downloads, -> { select("count(*) as downloads").order(:created_at) }
  scope :downloads_by_gem, -> { select("gem_name, count(*) as downloads").group(:gem_name).order(:created_at) }
  scope :downloads_by_version, -> { select("gem_name, gem_version, count(*) as downloads").group(:gem_name, :gem_version).order(:created_at) }

  # Materialized rollups for each timeframe x scope combination. Each refresh
  # policy re-materializes the window [now - start_offset, now - end_offset]
  # every schedule_interval (e.g. the minute rollup refreshes every minute,
  # looking back 10 minutes and stopping 1 minute short of "now").
  continuous_aggregates(
    timeframes: [:minute, :hour, :day, :month],
    scopes: [:total_downloads, :downloads_by_gem, :downloads_by_version],
    refresh_policy: {
      minute: { start_offset: "10 minutes", end_offset: "1 minute", schedule_interval: "1 minute" },
      hour: { start_offset: "4 hour", end_offset: "1 hour", schedule_interval: "1 hour" },
      day: { start_offset: "3 day", end_offset: "1 day", schedule_interval: "1 day" },
      month: { start_offset: "3 month", end_offset: "1 day", schedule_interval: "1 day" }
    })
end
5 changes: 5 additions & 0 deletions app/models/downloads_db.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Abstract base class for models persisted in the separate "downloads"
# (TimescaleDB) database rather than the primary application database.
class DownloadsDB < ApplicationRecord
  self.abstract_class = true

  connects_to database: { writing: :downloads }
end
33 changes: 33 additions & 0 deletions app/models/log_download.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Mimic LogTicket model to store the log files but for the downloads database.
# It will be backfilled with the log files from the main database to the downloads database.
# There will be a background job to process the log files
class LogDownload < DownloadsDB
  enum backend: { s3: 0, local: 1 }
  enum status: %i[pending processing failed processed].index_with(&:to_s)

  scope :latest_created_at, -> { order(created_at: :desc).select(:created_at).pluck(:created_at).first }

  # Atomically claims the oldest pending record (optionally narrowed by key
  # and/or directory): locks the row, flips it to "processing" and returns it.
  # Returns nil when the queue is empty.
  def self.pop(key: nil, directory: nil)
    candidates = pending.limit(1).order("created_at ASC")
    candidates = candidates.where(key: key) if key
    candidates = candidates.where(directory: directory) if directory
    claimed = candidates.lock(true).sole
    claimed.update_column(:status, "processing")
    claimed
  rescue ActiveRecord::RecordNotFound
    nil # no record in queue found by `sole` call
  end

  # Storage backend holding this log file, chosen by the backend enum (memoized).
  def fs
    @fs ||= s3? ? RubygemFs::S3.new(bucket: directory) : RubygemFs::Local.new(directory)
  end

  # Raw contents of the log file.
  def body
    fs.get(key)
  end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# frozen_string_literal: true

module Maintenance
  # Backfills LogDownload rows (downloads database) from LogTicket rows
  # (primary database): migrates from past to present, ordered by created_at,
  # 500 processed tickets per iteration, unioned with up to 500 tickets that
  # are still pending.
  class BackfillLogDownloadsFromLogTicketsTask < MaintenanceTasks::Task
    def collection
      # Resume after the newest created_at already copied into LogDownload.
      # This must be `>`: with the original `<` the query kept re-selecting
      # the same already-migrated oldest tickets instead of the next 500.
      scope = LogTicket.processed.order(created_at: :asc)
      last_created_at = LogDownload.latest_created_at
      scope = scope.where("created_at > ?", last_created_at) if last_created_at
      scope
        .limit(500)
        .union(LogTicket.pending.order(created_at: :asc).limit(500))
    end

    def process(batch)
      # MaintenanceTasks yields one record per iteration for a relation
      # collection; Array() also keeps this working if a relation/array is
      # passed. Duplicate (key, directory) rows are skipped by insert_all
      # thanks to the unique index.
      rows = Array(batch).map { |ticket| ticket.slice(:id, :status, :directory, :key, :created_at) }
      LogDownload.insert_all(rows)
    end
  end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# frozen_string_literal: true

module Maintenance
  # Helper to keep backfilling LogTickets to TimescaleDB downloads table.
  # It migrates the data from the old LogTicket table to the new LogDownload
  # table in the background. Once every pending LogDownload has been
  # processed, this one-time task will be removed.
  class BackfillLogTicketsToTimescaleDownloadsTask < MaintenanceTasks::Task
    # Every log file still waiting to be ingested.
    def collection
      LogDownload.pending
    end

    # Parses one log file and inserts its download rows.
    def process(element)
      FastlyLogDownloadsProcessor.new(element.directory, element.key).perform
    end
  end
end
3 changes: 2 additions & 1 deletion config/initializers/zeitwerk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@

Rails.autoloaders.once.inflector.inflect(
"http" => "HTTP",
"oidc" => "OIDC"
"oidc" => "OIDC",
"downloads_db" => "DownloadsDB"
)
29 changes: 29 additions & 0 deletions db/downloads_migrate/20240708184547_create_downloads.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Creates the downloads hypertable in the TimescaleDB database together with
# its compression settings and continuous aggregates.
class CreateDownloads < ActiveRecord::Migration[7.1]

  # Timescale's aggregate/compression DDL cannot run inside a transaction.
  disable_ddl_transaction!

  def self.up
    # Make the migration re-runnable: tear everything down first if present.
    self.down if Download.table_exists?

    hypertable_options = {
      time_column: 'created_at',
      chunk_time_interval: '1 day',
      compress_segmentby: 'gem_name, gem_version',
      compress_orderby: 'created_at DESC',
      compression_interval: '7 days'
    }

    # No surrogate id column: rows are plain time-series events keyed by
    # (created_at, gem_name, gem_version).
    create_table(:downloads, id: false, hypertable: hypertable_options) do |t|
      t.timestamptz :created_at, null: false
      t.text :gem_name, :gem_version, null: false
      t.jsonb :payload
    end

    # Materialized views defined on the Download model (see continuous_aggregates there).
    Download.create_continuous_aggregates
  end
  def self.down
    # Aggregates depend on the table, so drop them first.
    Download.drop_continuous_aggregates

    drop_table(:downloads, force: :cascade, if_exists: true) if Download.table_exists?
  end
end
16 changes: 16 additions & 0 deletions db/downloads_migrate/20240823181725_create_log_downloads.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Mimic LogTicket table to store the log files but for the downloads database
# It will be used to store the log files to be processed during the migration
class CreateLogDownloads < ActiveRecord::Migration[7.1]
  def change
    create_table :log_downloads do |t|
      t.string :key        # object key of the log file within the bucket/directory
      t.string :directory  # S3 bucket or local directory holding the file
      t.integer :backend   # enum storage backend — see LogDownload (s3: 0, local: 1)
      t.string :status, default: "pending"
      t.integer :processed_count, default: 0
      t.timestamps
    end

    # A log file is identified uniquely by its key within a directory;
    # also lets insert_all skip duplicates during the backfill.
    add_index :log_downloads, [:key, :directory], unique: true
  end
end
Loading
Loading