Function bodies 405 total
TurbofanTempTest class · ruby · L2-L4 (3 LOC)examples/compute_environments/turbofan_temp_test/definition.rb
class TurbofanTempTest
include Turbofan::ComputeEnvironment
endHelloPolyglot class · ruby · L7-L18 (12 LOC)examples/pipelines/hello_polyglot/turbofan.rb
class HelloPolyglot
include Turbofan::Pipeline
pipeline_name "hello_polyglot"
compute_environment ComputeEnvironments::TurbofanTempTest
pipeline do
r = fan_out(hello_ruby(trigger_input), batch_size: 1)
p = fan_out(hello_python(r), batch_size: 1)
n = fan_out(hello_node(p), batch_size: 1)
fan_out(hello_rust(n), batch_size: 1)
endHomeWorkDetectionWorkers class · ruby · L1-L8 (8 LOC)examples/pipelines/home_work_detection_workers/turbofan.rb
class HomeWorkDetectionWorkers
include Turbofan::Pipeline
pipeline_name "home_work_detection_workers"
pipeline do
# Add steps with: turbofan add step STEP_NAME
endDetectLocations class · ruby · L1-L14 (14 LOC)examples/steps/detect_locations/worker.rb
class DetectLocations
include Turbofan::Step
family :c
cpu 2
uses :duckdb
input_schema "detect_locations_input.json"
output_schema "detect_locations_output.json"
def call(inputs, context)
# TODO: implement
end
endcall method · ruby · L11-L13 (3 LOC)examples/steps/detect_locations/worker.rb
def call(inputs, context)
# TODO: implement
endmain function · javascript · L5-L34 (30 LOC)examples/steps/hello_node/index.mjs
async function main() {
const bucket = process.env.TURBOFAN_BUCKET;
const executionId = process.env.TURBOFAN_EXECUTION_ID;
const stepName = process.env.TURBOFAN_STEP_NAME;
const arrayIndex = process.env.AWS_BATCH_JOB_ARRAY_INDEX;
// Read input
const inputKey = `${executionId}/${stepName}/input/${arrayIndex}.json`;
const getResponse = await s3.send(
new GetObjectCommand({ Bucket: bucket, Key: inputKey })
);
const body = await getResponse.Body.transformToString();
const data = JSON.parse(body);
// Append greeting
data.output.push("Hello from Node");
// Write output
const outputKey = `${executionId}/${stepName}/output/${arrayIndex}.json`;
await s3.send(
new PutObjectCommand({
Bucket: bucket,
Key: outputKey,
Body: JSON.stringify(data),
})
);
// Print result to stdout
console.log(JSON.stringify(data));
}HelloNode class · ruby · L1-L10 (10 LOC)examples/steps/hello_node/worker.rb
# Turbofan step declaration for the hello_node example.
# Declares the compute environment, resources and schemas only; no `call`
# method is defined in this class.
class HelloNode
  include Turbofan::Step

  compute_environment ComputeEnvironments::TurbofanTempTest
  cpu 1
  ram 2 # NOTE(review): unit is not visible from here — confirm against Turbofan::Step

  # Input and output are validated against the same schema file.
  input_schema "hello_polyglot.json"
  output_schema "hello_polyglot.json"
end
Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
main function · python · L9-L28 (20 LOC)examples/steps/hello_python/main.py
def main():
bucket = os.environ["TURBOFAN_BUCKET"]
execution_id = os.environ["TURBOFAN_EXECUTION_ID"]
step_name = os.environ["TURBOFAN_STEP_NAME"]
array_index = os.environ["AWS_BATCH_JOB_ARRAY_INDEX"]
# Read input
input_key = f"{execution_id}/{step_name}/input/{array_index}.json"
response = s3.get_object(Bucket=bucket, Key=input_key)
data = json.loads(response["Body"].read())
# Append greeting
data["output"].append("Hello from Python")
# Write output
output_key = f"{execution_id}/{step_name}/output/{array_index}.json"
s3.put_object(Bucket=bucket, Key=output_key, Body=json.dumps(data))
# Print result to stdout
print(json.dumps(data))HelloPython class · ruby · L1-L10 (10 LOC)examples/steps/hello_python/worker.rb
# Turbofan step declaration for the hello_python example.
# Declares the compute environment, resources and schemas only; no `call`
# method is defined in this class.
class HelloPython
  include Turbofan::Step

  compute_environment ComputeEnvironments::TurbofanTempTest
  cpu 1
  ram 2 # NOTE(review): unit is not visible from here — confirm against Turbofan::Step

  # Input and output are validated against the same schema file.
  input_schema "hello_polyglot.json"
  output_schema "hello_polyglot.json"
end
HelloRuby class · ruby · L1-L8 (8 LOC)examples/steps/hello_ruby/worker.rb
class HelloRuby
include Turbofan::Step
if defined?(ComputeEnvironments::TurbofanTempTest)
compute_environment ComputeEnvironments::TurbofanTempTest
cpu 1
ram 2
endcall function · ruby · L13-L16 (4 LOC)examples/steps/hello_ruby/worker.rb
def call(inputs, context)
output = inputs.first["output"] + ["Hello from Ruby"]
{"output" => output}
endmain function · rust · L6-L45 (40 LOC)examples/steps/hello_rust/src/main.rs
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
let s3 = Client::new(&config);
let bucket = env::var("TURBOFAN_BUCKET")?;
let execution_id = env::var("TURBOFAN_EXECUTION_ID")?;
let step_name = env::var("TURBOFAN_STEP_NAME")?;
let array_index = env::var("AWS_BATCH_JOB_ARRAY_INDEX")?;
// Read input
let input_key = format!("{execution_id}/{step_name}/input/{array_index}.json");
let get_result = s3
.get_object()
.bucket(&bucket)
.key(&input_key)
.send()
.await?;
let body = get_result.body.collect().await?.into_bytes();
let mut data: Value = serde_json::from_slice(&body)?;
// Append greeting
if let Some(output) = data.get_mut("output").and_then(|v| v.as_array_mut()) {
output.push(Value::String("Hello from Rust".to_string()));
}
// Write output
let output_key = format!("{execution_iHelloRust class · ruby · L1-L10 (10 LOC)examples/steps/hello_rust/worker.rb
# Turbofan step declaration for the hello_rust example.
# Declares the compute environment, resources and schemas only; no `call`
# method is defined in this class.
class HelloRust
  include Turbofan::Step

  compute_environment ComputeEnvironments::TurbofanTempTest
  cpu 1
  ram 2 # NOTE(review): unit is not visible from here — confirm against Turbofan::Step

  # Input and output are validated against the same schema file.
  input_schema "hello_polyglot.json"
  output_schema "hello_polyglot.json"
end
run method · ruby · L6-L14 (9 LOC)lib/turbofan/check/dag_check.rb
def self.run(pipeline:)
dag = pipeline.turbofan_dag
dag.sorted_steps
Result.new(passed: true, errors: [], warnings: [], report: nil)
rescue ArgumentError => e
Result.new(passed: false, errors: [e.message], warnings: [], report: nil)
rescue TSort::Cyclic => e
Result.new(passed: false, errors: ["Cyclic dependency detected: #{e.message}"], warnings: [], report: nil)
endrun method · ruby · L6-L35 (30 LOC)lib/turbofan/check/instance_check.rb
def self.run(steps:)
report = {}
warnings = []
steps.each do |step_name, step_class|
duckdb = step_class.turbofan_needs_duckdb?
if step_class.turbofan_sizes.empty?
report[step_name] = build_single_report(step_class, duckdb)
check_narrow_pool(report[step_name][:instance_types], step_name, warnings)
else
sizes_report = {}
step_class.turbofan_sizes.each do |size_name, derived|
size_cpu = derived[:cpu]
size_ram = derived[:ram]
size_cpu ||= [size_ram / 2, 1].max if size_ram
size_ram ||= size_cpu * 2 if size_cpu
selector_result = InstanceSelector.select(
cpu: size_cpu,
ram: size_ram,
duckdb: duckdb
)
sizes_report[size_name] = {
instance_types: selector_result.instance_types,
waste: build_waste_hash(selector_Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
build_single_report function · ruby · L43-L62 (20 LOC)lib/turbofan/check/instance_check.rb
def self.build_single_report(step_class, duckdb)
cpu = step_class.turbofan_default_cpu
ram = step_class.turbofan_default_ram
return {instance_types: [], waste: {}, spot_availability: nil, note: "no cpu/ram set"} unless cpu || ram
# Default missing dimension based on what was specified
cpu ||= [(ram / 8.0).ceil, 1].max
ram ||= cpu * 2
selector_result = InstanceSelector.select(
cpu: cpu,
ram: ram,
duckdb: duckdb
)
{
instance_types: selector_result.instance_types,
waste: build_waste_hash(selector_result),
spot_availability: selector_result.spot_availability
}
endbuild_waste_hash function · ruby · L65-L68 (4 LOC)lib/turbofan/check/instance_check.rb
def self.build_waste_hash(selector_result)
selector_result.details.each_with_object({}) do |detail, hash|
hash[detail[:type]] = detail[:waste]
endcheck_narrow_pool function · ruby · L72-L76 (5 LOC)lib/turbofan/check/instance_check.rb
def self.check_narrow_pool(instance_types, step_name, warnings)
return if instance_types.size >= NARROW_POOL_THRESHOLD
warnings << "Step :#{step_name} has a narrow instance pool (#{instance_types.size} types)"
endrun method · ruby · L4-L12 (9 LOC)lib/turbofan/check/pipeline_check.rb
def self.run(pipeline:, steps:)
errors = []
warnings = []
# 1. Pipeline name must be present
name = pipeline.turbofan_name
if name.nil? || name.to_s.strip.empty?
errors << "Pipeline name is not set (turbofan_name is blank)"
endrun method · ruby · L9-L25 (17 LOC)lib/turbofan/check/resource_check.rb
def self.run(pipeline:, steps:, resources:)
errors = []
warnings = []
fan_out_step_names = detect_fan_out_steps(pipeline)
steps.each do |step_name, step_class|
keys = step_class.turbofan_resource_keys
next if keys.empty?
keys.each do |key|
next if BUILT_IN_RESOURCES.include?(key)
unless resources[key]
errors << "Step :#{step_name} uses :#{key} but no matching Resource with `key :#{key}` was found"
next
enddetect_fan_out_steps function · ruby · L39-L44 (6 LOC)lib/turbofan/check/resource_check.rb
def self.detect_fan_out_steps(pipeline)
dag = pipeline.turbofan_dag
dag.steps.select(&:fan_out?).map(&:name).to_set
rescue ArgumentError, Turbofan::SchemaIncompatibleError
Set.new
endrun method · ruby · L4-L18 (15 LOC)lib/turbofan/check/router_check.rb
def self.run(steps:, routers:)
errors = []
routers.each do |step_name, router_class|
step_class = steps[step_name]
next unless step_class
step_sizes = step_class.turbofan_sizes.keys
router_sizes = router_class.turbofan_sizes
extra = router_sizes - step_sizes
next if extra.empty?
errors << "Router for :#{step_name} declares sizes #{extra.map { |s| ":#{s}" }} not found on step"
endCLI class · ruby · L4-L18 (15 LOC)lib/turbofan/cli/add.rb
class CLI < Thor
module Add
# Scaffolds a new step inside the project.
#
# Working from the project root, creates `turbofans/steps/<step_name>` and
# `turbofans/schemas`, then delegates to CLI::New to write the step and its
# schema.
#
# @param step_name [String] snake_case step name; also used as the directory name
# @param duckdb [Boolean] forwarded unchanged to CLI::New.write_step
# @param compute_environment [Symbol] forwarded unchanged to CLI::New.write_step
# @param cpu [Integer] forwarded unchanged to CLI::New.write_step
# @param extensions [Array] forwarded unchanged to CLI::New.write_step
# @return whatever CLI::New.write_schema returns
def self.call(step_name, duckdb: true, compute_environment: :compute, cpu: 1, extensions: [])
  Dir.chdir(Turbofan::CLI.project_root) do
    step_path = File.join("turbofans", "steps", step_name)
    schema_path = File.join("turbofans", "schemas")
    # "my_step" -> "MyStep"
    camelized = step_name.split("_").map(&:capitalize).join

    FileUtils.mkdir_p([step_path, schema_path])

    CLI::New.write_step(
      step_path,
      camelized,
      duckdb: duckdb,
      step_name: step_name,
      compute_environment: compute_environment,
      cpu: cpu,
      extensions: extensions
    )
    CLI::New.write_schema(schema_path, step_name)
  end
end
endPowered by Repobility — scan your code at https://repobility.com
call method · ruby · L6-L16 (11 LOC)lib/turbofan/cli/add.rb
def self.call(step_name, duckdb: true, compute_environment: :compute, cpu: 1, extensions: [])
Dir.chdir(Turbofan::CLI.project_root) do
step_dir = File.join("turbofans", "steps", step_name)
schemas_dir = File.join("turbofans", "schemas")
class_name = step_name.split("_").map(&:capitalize).join
FileUtils.mkdir_p(step_dir)
FileUtils.mkdir_p(schemas_dir)
CLI::New.write_step(step_dir, class_name, duckdb: duckdb, step_name: step_name, compute_environment: compute_environment, cpu: cpu, extensions: extensions)
CLI::New.write_schema(schemas_dir, step_name)
endCLI class · ruby · L4-L31 (28 LOC)lib/turbofan/cli/add_router.rb
class CLI < Thor
module AddRouter
# Scaffolds a router for an existing step.
#
# Working from the project root, creates `turbofans/steps/<step_name>/router`
# and writes the router class and its Gemfile into that directory.
#
# NOTE(review): write_router places `router.rb` inside the `router/`
# directory, whereas Check.discover_routers looks for
# `turbofans/steps/<step_name>/router.rb` — confirm the two paths agree.
#
# @param step_name [String] snake_case step name
# @return whatever write_gemfile returns
def self.call(step_name)
  Dir.chdir(Turbofan::CLI.project_root) do
    # "my_step" -> "MyStep"; write_router appends the "Router" suffix.
    camelized = step_name.split("_").map(&:capitalize).join
    target_dir = File.join("turbofans", "steps", step_name, "router")

    FileUtils.mkdir_p(target_dir)
    write_router(target_dir, camelized)
    write_gemfile(target_dir)
  end
end
def self.write_router(dir, class_name)
File.write(File.join(dir, "router.rb"), <<~RUBY)
class #{class_name}Router
include Turbofan::Router
sizes :s, :m, :l
def route(input)
# TODO: implement routing logic
:m
end
end
RUBY
endcall method · ruby · L6-L15 (10 LOC)lib/turbofan/cli/add_router.rb
def self.call(step_name)
Dir.chdir(Turbofan::CLI.project_root) do
class_name = step_name.split("_").map(&:capitalize).join
step_dir = File.join("turbofans", "steps", step_name)
router_dir = File.join(step_dir, "router")
FileUtils.mkdir_p(router_dir)
write_router(router_dir, class_name)
write_gemfile(router_dir)
endwrite_router method · ruby · L18-L29 (12 LOC)lib/turbofan/cli/add_router.rb
def self.write_router(dir, class_name)
File.write(File.join(dir, "router.rb"), <<~RUBY)
class #{class_name}Router
include Turbofan::Router
sizes :s, :m, :l
def route(input)
# TODO: implement routing logic
:m
end
endroute method · ruby · L25-L28 (4 LOC)lib/turbofan/cli/add_router.rb
def route(input)
# TODO: implement routing logic
:m
endwrite_gemfile method · ruby · L33-L39 (7 LOC)lib/turbofan/cli/add_router.rb
def self.write_gemfile(dir)
File.write(File.join(dir, "Gemfile"), <<~RUBY)
source "https://rubygems.org"
gem "turbofan"
# Keep lightweight — this runs on Lambda
RUBY
endCLI class · ruby · L2-L21 (20 LOC)lib/turbofan/cli/ce.rb
class CLI < Thor
module Ce
def self.new_ce(name)
Dir.chdir(Turbofan::CLI.project_root) do
class_name = Turbofan::Naming.pascal_case(name)
ce_dir = File.join("turbofans", "compute_environments")
FileUtils.mkdir_p(ce_dir)
File.write(File.join(ce_dir, "#{name}.rb"), <<~RUBY)
module ComputeEnvironments
class #{class_name}
include Turbofan::ComputeEnvironment
instance_types %w[optimal]
max_vcpus 256
end
end
RUBY
end
endnew_ce method · ruby · L4-L18 (15 LOC)lib/turbofan/cli/ce.rb
def self.new_ce(name)
Dir.chdir(Turbofan::CLI.project_root) do
class_name = Turbofan::Naming.pascal_case(name)
ce_dir = File.join("turbofans", "compute_environments")
FileUtils.mkdir_p(ce_dir)
File.write(File.join(ce_dir, "#{name}.rb"), <<~RUBY)
module ComputeEnvironments
class #{class_name}
include Turbofan::ComputeEnvironment
instance_types %w[optimal]
max_vcpus 256
end
endRepobility analyzer · published findings · https://repobility.com
deploy method · ruby · L23-L36 (14 LOC)lib/turbofan/cli/ce.rb
def self.deploy(stage:)
require "aws-sdk-cloudformation"
load_all_definitions
cf_client = Aws::CloudFormation::Client.new
Turbofan::ComputeEnvironment.discover.each do |ce_class|
template_body = ce_class.generate_template(stage: stage)
stack_name = ce_class.stack_name(stage)
Turbofan::Deploy::StackManager.deploy(
cf_client,
stack_name: stack_name,
template_body: template_body,
parameters: []
)
endlist function · ruby · L39-L43 (5 LOC)lib/turbofan/cli/ce.rb
def self.list
load_all_definitions
Turbofan::ComputeEnvironment.discover.each do |ce_class|
puts ce_class.name
endload_all_definitions function · ruby · L46-L49 (4 LOC)lib/turbofan/cli/ce.rb
def self.load_all_definitions
Dir.glob(File.join("turbofans", "compute_environments", "*.rb")).each do |path|
Kernel.load(File.expand_path(path))
endCLI class · ruby · L2-L54 (53 LOC)lib/turbofan/cli/check.rb
class CLI < Thor
module Check
def self.call(pipeline_name:, stage:, load_result: nil)
Turbofan::CLI::Ce.load_all_definitions
turbofans_root = "turbofans"
pipeline_file = File.join(turbofans_root, "pipelines", "#{pipeline_name}.rb")
load_result ||= Turbofan::Deploy::PipelineLoader.load(pipeline_file, turbofans_root: turbofans_root)
pipeline = load_result.pipeline
steps = load_result.steps
all_errors = []
all_warnings = []
# Run PipelineCheck
pipeline_result = Turbofan::Check::PipelineCheck.run(pipeline: pipeline, steps: steps)
all_errors.concat(pipeline_result.errors)
all_warnings.concat(pipeline_result.warnings)
# Run DagCheck
dag_result = Turbofan::Check::DagCheck.run(pipeline: pipeline)
all_errors.concat(dag_result.errors)
all_warnings.concat(dag_result.warnings)
# Run ResourceCheck
resources = Turbofan.discover_components[:rcall method · ruby · L4-L45 (42 LOC)lib/turbofan/cli/check.rb
def self.call(pipeline_name:, stage:, load_result: nil)
Turbofan::CLI::Ce.load_all_definitions
turbofans_root = "turbofans"
pipeline_file = File.join(turbofans_root, "pipelines", "#{pipeline_name}.rb")
load_result ||= Turbofan::Deploy::PipelineLoader.load(pipeline_file, turbofans_root: turbofans_root)
pipeline = load_result.pipeline
steps = load_result.steps
all_errors = []
all_warnings = []
# Run PipelineCheck
pipeline_result = Turbofan::Check::PipelineCheck.run(pipeline: pipeline, steps: steps)
all_errors.concat(pipeline_result.errors)
all_warnings.concat(pipeline_result.warnings)
# Run DagCheck
dag_result = Turbofan::Check::DagCheck.run(pipeline: pipeline)
all_errors.concat(dag_result.errors)
all_warnings.concat(dag_result.warnings)
# Run ResourceCheck
resources = Turbofan.discover_components[:resources]
resource_result = discover_routers function · ruby · L65-L71 (7 LOC)lib/turbofan/cli/check.rb
def self.discover_routers(steps)
routers = {}
steps.each_key do |step_name|
router_path = File.join("turbofans", "steps", step_name.to_s, "router.rb")
next unless File.exist?(router_path)
Kernel.load(router_path)
endDatabase class · ruby · L10-L15 (6 LOC)lib/turbofan/cli/cost.rb
# Stand-in Database whose `open` always fails.
# Presumably defined only when the real duckdb gem could not be loaded —
# confirm against the surrounding guard.
class Database
  # @raise [LoadError] always
  def self.open
    raise LoadError, "duckdb gem not installed"
  end
end
Error = Class.new(StandardError) unless defined?(DuckDB::Error)
endCLI class · ruby · L20-L121 (102 LOC)lib/turbofan/cli/cost.rb
class CLI < Thor
module Cost
def self.call(pipeline_name:, stage:)
dash_name = pipeline_name.tr("_", "-")
now = Time.now
billing_period = now.strftime("%Y-%m")
db = DuckDB::Database.open
conn = db.connect
exec_rows = query_executions(conn, dash_name, billing_period)
step_rows = query_steps(conn, dash_name, billing_period)
total_rows = query_total(conn, dash_name, billing_period)
if exec_rows.empty? && step_rows.empty?
$stdout.puts "No cost data found for #{dash_name} (#{stage})."
return
end
print_header(dash_name, stage, now, billing_period)
print_executions(exec_rows)
print_steps(step_rows, total_rows)
print_total(total_rows, now)
export_parquet(conn, dash_name, billing_period, now)
rescue DuckDB::Error => e
warn e.message
$stdout.puts "No cost data found for #{dash_name} (#{stage})."
ensure
connHi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
call method · ruby · L22-L37 (16 LOC)lib/turbofan/cli/cost.rb
def self.call(pipeline_name:, stage:)
dash_name = pipeline_name.tr("_", "-")
now = Time.now
billing_period = now.strftime("%Y-%m")
db = DuckDB::Database.open
conn = db.connect
exec_rows = query_executions(conn, dash_name, billing_period)
step_rows = query_steps(conn, dash_name, billing_period)
total_rows = query_total(conn, dash_name, billing_period)
if exec_rows.empty? && step_rows.empty?
$stdout.puts "No cost data found for #{dash_name} (#{stage})."
return
endquery_executions method · ruby · L51-L67 (17 LOC)lib/turbofan/cli/cost.rb
def self.query_executions(conn, pipeline_name, billing_period)
conn.query(<<~SQL, pipeline_name, billing_period).to_a
SELECT
resource_tags_turbofan_pipeline AS pipeline,
resource_tags_turbofan_execution AS execution,
SUM(line_item_unblended_cost) AS cost,
MIN(line_item_usage_start_date) AS min_time,
MAX(line_item_usage_end_date) AS max_time
FROM 'cur/*.parquet'
WHERE resource_tags_turbofan_managed = 'true'
AND resource_tags_turbofan_pipeline = $1
AND billing_period = $2
GROUP BY resource_tags_turbofan_pipeline, resource_tags_turbofan_execution
ORDER BY min_time DESC
LIMIT 10
SQL
endquery_steps method · ruby · L70-L83 (14 LOC)lib/turbofan/cli/cost.rb
def self.query_steps(conn, pipeline_name, billing_period)
conn.query(<<~SQL, pipeline_name, billing_period).to_a
SELECT
resource_tags_turbofan_step AS step,
SUM(line_item_unblended_cost) AS cost
FROM 'cur/*.parquet'
WHERE resource_tags_turbofan_managed = 'true'
AND resource_tags_turbofan_pipeline = $1
AND resource_tags_turbofan_execution IS NOT NULL
AND billing_period = $2
GROUP BY resource_tags_turbofan_step
ORDER BY cost DESC
SQL
endquery_total method · ruby · L86-L97 (12 LOC)lib/turbofan/cli/cost.rb
def self.query_total(conn, pipeline_name, billing_period)
conn.query(<<~SQL, pipeline_name, billing_period).to_a
SELECT
SUM(line_item_unblended_cost) AS cost
FROM 'cur/*.parquet'
WHERE resource_tags_turbofan_managed = 'true'
AND resource_tags_turbofan_pipeline = $1
AND resource_tags_turbofan_step IS NOT NULL
AND resource_tags_turbofan_execution IS NOT NULL
AND billing_period = $2
SQL
endprint_header method · ruby · L100-L106 (7 LOC)lib/turbofan/cli/cost.rb
def self.print_header(pipeline_name, stage, now, billing_period)
period_start = "#{billing_period}-01"
period_end = now.strftime("%Y-%m-%d")
$stdout.puts "Pipeline: #{pipeline_name} (#{stage})"
$stdout.puts "Period: #{period_start} to #{period_end}"
$stdout.puts ""
endprint_executions method · ruby · L109-L119 (11 LOC)lib/turbofan/cli/cost.rb
def self.print_executions(rows)
$stdout.puts "Recent Executions:"
rows.each do |row|
execution = row["execution"]
cost = format("%.2f", row["cost"])
min_time = row["min_time"].to_s
max_time = row["max_time"].to_s
duration = format_duration(min_time, max_time)
date_part = min_time[0, 16]
$stdout.puts " #{date_part} #{execution} $#{cost} (#{duration})"
endprint_steps method · ruby · L124-L133 (10 LOC)lib/turbofan/cli/cost.rb
def self.print_steps(rows, total_rows)
total_cost = total_rows.first&.fetch("cost", 0).to_f
$stdout.puts "By Step (current month):"
rows.each do |row|
step_name = row["step"]
cost = row["cost"].to_f
cost_str = format("%.2f", cost)
pct = (total_cost > 0) ? (cost / total_cost * 100).round : 0
$stdout.puts " #{step_name} $#{cost_str} (#{pct}%)"
endprint_total function · ruby · L138-L142 (5 LOC)lib/turbofan/cli/cost.rb
def self.print_total(total_rows, now)
total_cost = total_rows.first&.fetch("cost", 0).to_f
month_name = now.strftime("%b %Y")
$stdout.puts "Total (#{month_name}): $#{format("%.2f", total_cost)}"
endProvenance: Repobility (https://repobility.com) — every score reproducible from /scan/
export_parquet function · ruby · L145-L165 (21 LOC)lib/turbofan/cli/cost.rb
def self.export_parquet(conn, pipeline_name, billing_period, now)
filename = "cost-#{now.strftime("%Y-%m-%dT%H%M%S")}.parquet"
conn.query(<<~SQL, pipeline_name, billing_period)
CREATE OR REPLACE TEMP VIEW cost_export AS
SELECT
resource_tags_turbofan_pipeline AS pipeline,
resource_tags_turbofan_step AS step,
resource_tags_turbofan_execution AS execution,
SUM(line_item_unblended_cost) AS cost
FROM 'cur/*.parquet'
WHERE resource_tags_turbofan_managed = 'true'
AND resource_tags_turbofan_pipeline = $1
AND billing_period = $2
GROUP BY ALL
ORDER BY cost DESC
SQL
# COPY TO does not support parameterized filenames; sanitize instead
safe_filename = filename.gsub(/[^a-zA-Z0-9._\-\/]/, "_")
conn.query("COPY cost_export TO '#{safe_filename}' (FORMAT PARQUET)")
$stdout.puts "Saved to #{filename}"
endformat_duration function · ruby · L168-L179 (12 LOC)lib/turbofan/cli/cost.rb
def self.format_duration(min_time_str, max_time_str)
return "0m" if min_time_str.empty? || max_time_str.empty?
min_t = Time.parse(min_time_str)
max_t = Time.parse(max_time_str)
seconds = (max_t - min_t).to_i
hours = seconds / 3600
minutes = (seconds % 3600) / 60
if hours > 0
"#{hours}h #{minutes.to_s.rjust(2, "0")}m"
else
"#{minutes}m"
endCLI class · ruby · L2-L28 (27 LOC)lib/turbofan/cli/deploy/preflight.rb
class CLI < Thor
module Deploy
module Preflight
# Probes for Docker BuildKit by running `docker buildx version` with its
# output discarded.
#
# @return [Boolean] true only when the command exits successfully; false when
#   it fails or cannot be executed at all (`system` returns nil)
def self.buildkit_available?
  probe = %w[docker buildx version]
  outcome = system(*probe, out: File::NULL, err: File::NULL)
  outcome.equal?(true)
end
# Reports whether the ambient AWS credentials can make an STS
# GetCallerIdentity call.
#
# @return [Boolean] true when the call succeeds; false when STS rejects it or
#   when no credentials are configured at all
def self.aws_credentials_valid?
  Aws::STS::Client.new.get_caller_identity
  true
rescue Aws::STS::Errors::ServiceError, Aws::Errors::MissingCredentialsError
  # MissingCredentialsError is raised client-side while signing, so it is not
  # an STS ServiceError and has to be listed explicitly; otherwise a machine
  # with no credentials would crash this check instead of failing it.
  false
end
# Reports whether `git status --porcelain` printed nothing, i.e. the working
# tree has no pending changes.
#
# NOTE(review): the command's exit status is not inspected, so a failing git
# (e.g. outside a repository) that prints nothing to stdout is also reported
# as clean — confirm this is intended.
#
# @return [Boolean]
def self.git_clean?
  porcelain = %x(git status --porcelain)
  porcelain.strip.empty?
end
# Prints a warning when the state machine has executions in RUNNING status.
#
# The count is taken from the single `list_executions` response received
# here; no further pages are requested.
#
# @param sfn_client [#list_executions] Step Functions client
# @param state_machine_arn [String] ARN of the state machine to inspect
# @return [nil]
def self.warn_running_executions(sfn_client, state_machine_arn)
  response = sfn_client.list_executions(
    state_machine_arn: state_machine_arn,
    status_filter: "RUNNING"
  )
  return unless response.executions.any?

  puts "WARNING: #{response.executions.size} execution(s) currently running."
  puts "Running executions will continue with previous job definitions."
end
end
endpage 1 / 9next ›