← back to dataPlor__turbofan

Function bodies 405 total

All specs Real LLM only Function bodies
TurbofanTempTest class · ruby · L2-L4 (3 LOC)
examples/compute_environments/turbofan_temp_test/definition.rb
  class TurbofanTempTest
    include Turbofan::ComputeEnvironment
  end
HelloPolyglot class · ruby · L7-L18 (12 LOC)
examples/pipelines/hello_polyglot/turbofan.rb
class HelloPolyglot
  include Turbofan::Pipeline

  pipeline_name "hello_polyglot"
  compute_environment ComputeEnvironments::TurbofanTempTest

  pipeline do
    r = fan_out(hello_ruby(trigger_input), batch_size: 1)
    p = fan_out(hello_python(r), batch_size: 1)
    n = fan_out(hello_node(p), batch_size: 1)
    fan_out(hello_rust(n), batch_size: 1)
  end
HomeWorkDetectionWorkers class · ruby · L1-L8 (8 LOC)
examples/pipelines/home_work_detection_workers/turbofan.rb
class HomeWorkDetectionWorkers
  include Turbofan::Pipeline

  pipeline_name "home_work_detection_workers"

  pipeline do
    # Add steps with: turbofan add step STEP_NAME
  end
DetectLocations class · ruby · L1-L14 (14 LOC)
examples/steps/detect_locations/worker.rb
class DetectLocations
  include Turbofan::Step

  family :c
  cpu 2
  uses :duckdb

  input_schema "detect_locations_input.json"
  output_schema "detect_locations_output.json"

  def call(inputs, context)
    # TODO: implement
  end
end
call method · ruby · L11-L13 (3 LOC)
examples/steps/detect_locations/worker.rb
  def call(inputs, context)
    # TODO: implement
  end
main function · javascript · L5-L34 (30 LOC)
examples/steps/hello_node/index.mjs
async function main() {
  const bucket = process.env.TURBOFAN_BUCKET;
  const executionId = process.env.TURBOFAN_EXECUTION_ID;
  const stepName = process.env.TURBOFAN_STEP_NAME;
  const arrayIndex = process.env.AWS_BATCH_JOB_ARRAY_INDEX;

  // Read input
  const inputKey = `${executionId}/${stepName}/input/${arrayIndex}.json`;
  const getResponse = await s3.send(
    new GetObjectCommand({ Bucket: bucket, Key: inputKey })
  );
  const body = await getResponse.Body.transformToString();
  const data = JSON.parse(body);

  // Append greeting
  data.output.push("Hello from Node");

  // Write output
  const outputKey = `${executionId}/${stepName}/output/${arrayIndex}.json`;
  await s3.send(
    new PutObjectCommand({
      Bucket: bucket,
      Key: outputKey,
      Body: JSON.stringify(data),
    })
  );

  // Print result to stdout
  console.log(JSON.stringify(data));
}
HelloNode class · ruby · L1-L10 (10 LOC)
examples/steps/hello_node/worker.rb
class HelloNode
  include Turbofan::Step

  compute_environment ComputeEnvironments::TurbofanTempTest
  cpu 1
  ram 2

  input_schema "hello_polyglot.json"
  output_schema "hello_polyglot.json"
end
Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
main function · python · L9-L28 (20 LOC)
examples/steps/hello_python/main.py
def main():
    bucket = os.environ["TURBOFAN_BUCKET"]
    execution_id = os.environ["TURBOFAN_EXECUTION_ID"]
    step_name = os.environ["TURBOFAN_STEP_NAME"]
    array_index = os.environ["AWS_BATCH_JOB_ARRAY_INDEX"]

    # Read input
    input_key = f"{execution_id}/{step_name}/input/{array_index}.json"
    response = s3.get_object(Bucket=bucket, Key=input_key)
    data = json.loads(response["Body"].read())

    # Append greeting
    data["output"].append("Hello from Python")

    # Write output
    output_key = f"{execution_id}/{step_name}/output/{array_index}.json"
    s3.put_object(Bucket=bucket, Key=output_key, Body=json.dumps(data))

    # Print result to stdout
    print(json.dumps(data))
HelloPython class · ruby · L1-L10 (10 LOC)
examples/steps/hello_python/worker.rb
class HelloPython
  include Turbofan::Step

  compute_environment ComputeEnvironments::TurbofanTempTest
  cpu 1
  ram 2

  input_schema "hello_polyglot.json"
  output_schema "hello_polyglot.json"
end
HelloRuby class · ruby · L1-L8 (8 LOC)
examples/steps/hello_ruby/worker.rb
class HelloRuby
  include Turbofan::Step

  if defined?(ComputeEnvironments::TurbofanTempTest)
    compute_environment ComputeEnvironments::TurbofanTempTest
    cpu 1
    ram 2
  end
call function · ruby · L13-L16 (4 LOC)
examples/steps/hello_ruby/worker.rb
  def call(inputs, context)
    output = inputs.first["output"] + ["Hello from Ruby"]
    {"output" => output}
  end
main function · rust · L6-L45 (40 LOC)
examples/steps/hello_rust/src/main.rs
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
    let s3 = Client::new(&config);

    let bucket = env::var("TURBOFAN_BUCKET")?;
    let execution_id = env::var("TURBOFAN_EXECUTION_ID")?;
    let step_name = env::var("TURBOFAN_STEP_NAME")?;
    let array_index = env::var("AWS_BATCH_JOB_ARRAY_INDEX")?;

    // Read input
    let input_key = format!("{execution_id}/{step_name}/input/{array_index}.json");
    let get_result = s3
        .get_object()
        .bucket(&bucket)
        .key(&input_key)
        .send()
        .await?;
    let body = get_result.body.collect().await?.into_bytes();
    let mut data: Value = serde_json::from_slice(&body)?;

    // Append greeting
    if let Some(output) = data.get_mut("output").and_then(|v| v.as_array_mut()) {
        output.push(Value::String("Hello from Rust".to_string()));
    }

    // Write output
    let output_key = format!("{execution_i
HelloRust class · ruby · L1-L10 (10 LOC)
examples/steps/hello_rust/worker.rb
class HelloRust
  include Turbofan::Step

  compute_environment ComputeEnvironments::TurbofanTempTest
  cpu 1
  ram 2

  input_schema "hello_polyglot.json"
  output_schema "hello_polyglot.json"
end
run method · ruby · L6-L14 (9 LOC)
lib/turbofan/check/dag_check.rb
      def self.run(pipeline:)
        dag = pipeline.turbofan_dag
        dag.sorted_steps
        Result.new(passed: true, errors: [], warnings: [], report: nil)
      rescue ArgumentError => e
        Result.new(passed: false, errors: [e.message], warnings: [], report: nil)
      rescue TSort::Cyclic => e
        Result.new(passed: false, errors: ["Cyclic dependency detected: #{e.message}"], warnings: [], report: nil)
      end
run method · ruby · L6-L35 (30 LOC)
lib/turbofan/check/instance_check.rb
      def self.run(steps:)
        report = {}
        warnings = []

        steps.each do |step_name, step_class|
          duckdb = step_class.turbofan_needs_duckdb?

          if step_class.turbofan_sizes.empty?
            report[step_name] = build_single_report(step_class, duckdb)
            check_narrow_pool(report[step_name][:instance_types], step_name, warnings)
          else
            sizes_report = {}
            step_class.turbofan_sizes.each do |size_name, derived|
              size_cpu = derived[:cpu]
              size_ram = derived[:ram]
              size_cpu ||= [size_ram / 2, 1].max if size_ram
              size_ram ||= size_cpu * 2 if size_cpu

              selector_result = InstanceSelector.select(
                cpu: size_cpu,
                ram: size_ram,
                duckdb: duckdb
              )
              sizes_report[size_name] = {
                instance_types: selector_result.instance_types,
                waste: build_waste_hash(selector_
Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
build_single_report function · ruby · L43-L62 (20 LOC)
lib/turbofan/check/instance_check.rb
      def self.build_single_report(step_class, duckdb)
        cpu = step_class.turbofan_default_cpu
        ram = step_class.turbofan_default_ram
        return {instance_types: [], waste: {}, spot_availability: nil, note: "no cpu/ram set"} unless cpu || ram

        # Default missing dimension based on what was specified
        cpu ||= [(ram / 8.0).ceil, 1].max
        ram ||= cpu * 2

        selector_result = InstanceSelector.select(
          cpu: cpu,
          ram: ram,
          duckdb: duckdb
        )
        {
          instance_types: selector_result.instance_types,
          waste: build_waste_hash(selector_result),
          spot_availability: selector_result.spot_availability
        }
      end
build_waste_hash function · ruby · L65-L68 (4 LOC)
lib/turbofan/check/instance_check.rb
      def self.build_waste_hash(selector_result)
        selector_result.details.each_with_object({}) do |detail, hash|
          hash[detail[:type]] = detail[:waste]
        end
check_narrow_pool function · ruby · L72-L76 (5 LOC)
lib/turbofan/check/instance_check.rb
      def self.check_narrow_pool(instance_types, step_name, warnings)
        return if instance_types.size >= NARROW_POOL_THRESHOLD

        warnings << "Step :#{step_name} has a narrow instance pool (#{instance_types.size} types)"
      end
run method · ruby · L4-L12 (9 LOC)
lib/turbofan/check/pipeline_check.rb
      def self.run(pipeline:, steps:)
        errors = []
        warnings = []

        # 1. Pipeline name must be present
        name = pipeline.turbofan_name
        if name.nil? || name.to_s.strip.empty?
          errors << "Pipeline name is not set (turbofan_name is blank)"
        end
run method · ruby · L9-L25 (17 LOC)
lib/turbofan/check/resource_check.rb
      def self.run(pipeline:, steps:, resources:)
        errors = []
        warnings = []

        fan_out_step_names = detect_fan_out_steps(pipeline)

        steps.each do |step_name, step_class|
          keys = step_class.turbofan_resource_keys
          next if keys.empty?

          keys.each do |key|
            next if BUILT_IN_RESOURCES.include?(key)

            unless resources[key]
              errors << "Step :#{step_name} uses :#{key} but no matching Resource with `key :#{key}` was found"
              next
            end
detect_fan_out_steps function · ruby · L39-L44 (6 LOC)
lib/turbofan/check/resource_check.rb
      def self.detect_fan_out_steps(pipeline)
        dag = pipeline.turbofan_dag
        dag.steps.select(&:fan_out?).map(&:name).to_set
      rescue ArgumentError, Turbofan::SchemaIncompatibleError
        Set.new
      end
run method · ruby · L4-L18 (15 LOC)
lib/turbofan/check/router_check.rb
      def self.run(steps:, routers:)
        errors = []

        routers.each do |step_name, router_class|
          step_class = steps[step_name]
          next unless step_class

          step_sizes = step_class.turbofan_sizes.keys
          router_sizes = router_class.turbofan_sizes
          extra = router_sizes - step_sizes

          next if extra.empty?

          errors << "Router for :#{step_name} declares sizes #{extra.map { |s| ":#{s}" }} not found on step"
        end
CLI class · ruby · L4-L18 (15 LOC)
lib/turbofan/cli/add.rb
  class CLI < Thor
    module Add
      def self.call(step_name, duckdb: true, compute_environment: :compute, cpu: 1, extensions: [])
        Dir.chdir(Turbofan::CLI.project_root) do
          step_dir = File.join("turbofans", "steps", step_name)
          schemas_dir = File.join("turbofans", "schemas")
          class_name = step_name.split("_").map(&:capitalize).join

          FileUtils.mkdir_p(step_dir)
          FileUtils.mkdir_p(schemas_dir)
          CLI::New.write_step(step_dir, class_name, duckdb: duckdb, step_name: step_name, compute_environment: compute_environment, cpu: cpu, extensions: extensions)
          CLI::New.write_schema(schemas_dir, step_name)
        end
      end
    end
Powered by Repobility — scan your code at https://repobility.com
call method · ruby · L6-L16 (11 LOC)
lib/turbofan/cli/add.rb
      def self.call(step_name, duckdb: true, compute_environment: :compute, cpu: 1, extensions: [])
        Dir.chdir(Turbofan::CLI.project_root) do
          step_dir = File.join("turbofans", "steps", step_name)
          schemas_dir = File.join("turbofans", "schemas")
          class_name = step_name.split("_").map(&:capitalize).join

          FileUtils.mkdir_p(step_dir)
          FileUtils.mkdir_p(schemas_dir)
          CLI::New.write_step(step_dir, class_name, duckdb: duckdb, step_name: step_name, compute_environment: compute_environment, cpu: cpu, extensions: extensions)
          CLI::New.write_schema(schemas_dir, step_name)
        end
CLI class · ruby · L4-L31 (28 LOC)
lib/turbofan/cli/add_router.rb
  class CLI < Thor
    module AddRouter
      def self.call(step_name)
        Dir.chdir(Turbofan::CLI.project_root) do
          class_name = step_name.split("_").map(&:capitalize).join
          step_dir = File.join("turbofans", "steps", step_name)
          router_dir = File.join(step_dir, "router")

          FileUtils.mkdir_p(router_dir)
          write_router(router_dir, class_name)
          write_gemfile(router_dir)
        end
      end

      def self.write_router(dir, class_name)
        File.write(File.join(dir, "router.rb"), <<~RUBY)
          class #{class_name}Router
            include Turbofan::Router

            sizes :s, :m, :l

            def route(input)
              # TODO: implement routing logic
              :m
            end
          end
        RUBY
      end
call method · ruby · L6-L15 (10 LOC)
lib/turbofan/cli/add_router.rb
      def self.call(step_name)
        Dir.chdir(Turbofan::CLI.project_root) do
          class_name = step_name.split("_").map(&:capitalize).join
          step_dir = File.join("turbofans", "steps", step_name)
          router_dir = File.join(step_dir, "router")

          FileUtils.mkdir_p(router_dir)
          write_router(router_dir, class_name)
          write_gemfile(router_dir)
        end
write_router method · ruby · L18-L29 (12 LOC)
lib/turbofan/cli/add_router.rb
      def self.write_router(dir, class_name)
        File.write(File.join(dir, "router.rb"), <<~RUBY)
          class #{class_name}Router
            include Turbofan::Router

            sizes :s, :m, :l

            def route(input)
              # TODO: implement routing logic
              :m
            end
          end
route method · ruby · L25-L28 (4 LOC)
lib/turbofan/cli/add_router.rb
            def route(input)
              # TODO: implement routing logic
              :m
            end
write_gemfile method · ruby · L33-L39 (7 LOC)
lib/turbofan/cli/add_router.rb
      def self.write_gemfile(dir)
        File.write(File.join(dir, "Gemfile"), <<~RUBY)
          source "https://rubygems.org"
          gem "turbofan"
          # Keep lightweight — this runs on Lambda
        RUBY
      end
CLI class · ruby · L2-L21 (20 LOC)
lib/turbofan/cli/ce.rb
  class CLI < Thor
    module Ce
      def self.new_ce(name)
        Dir.chdir(Turbofan::CLI.project_root) do
          class_name = Turbofan::Naming.pascal_case(name)
          ce_dir = File.join("turbofans", "compute_environments")
          FileUtils.mkdir_p(ce_dir)

          File.write(File.join(ce_dir, "#{name}.rb"), <<~RUBY)
            module ComputeEnvironments
              class #{class_name}
                include Turbofan::ComputeEnvironment

                instance_types %w[optimal]
                max_vcpus 256
              end
            end
          RUBY
        end
      end
new_ce method · ruby · L4-L18 (15 LOC)
lib/turbofan/cli/ce.rb
      def self.new_ce(name)
        Dir.chdir(Turbofan::CLI.project_root) do
          class_name = Turbofan::Naming.pascal_case(name)
          ce_dir = File.join("turbofans", "compute_environments")
          FileUtils.mkdir_p(ce_dir)

          File.write(File.join(ce_dir, "#{name}.rb"), <<~RUBY)
            module ComputeEnvironments
              class #{class_name}
                include Turbofan::ComputeEnvironment

                instance_types %w[optimal]
                max_vcpus 256
              end
            end
Repobility analyzer · published findings · https://repobility.com
deploy method · ruby · L23-L36 (14 LOC)
lib/turbofan/cli/ce.rb
      def self.deploy(stage:)
        require "aws-sdk-cloudformation"
        load_all_definitions
        cf_client = Aws::CloudFormation::Client.new
        Turbofan::ComputeEnvironment.discover.each do |ce_class|
          template_body = ce_class.generate_template(stage: stage)
          stack_name = ce_class.stack_name(stage)
          Turbofan::Deploy::StackManager.deploy(
            cf_client,
            stack_name: stack_name,
            template_body: template_body,
            parameters: []
          )
        end
list function · ruby · L39-L43 (5 LOC)
lib/turbofan/cli/ce.rb
      def self.list
        load_all_definitions
        Turbofan::ComputeEnvironment.discover.each do |ce_class|
          puts ce_class.name
        end
load_all_definitions function · ruby · L46-L49 (4 LOC)
lib/turbofan/cli/ce.rb
      def self.load_all_definitions
        Dir.glob(File.join("turbofans", "compute_environments", "*.rb")).each do |path|
          Kernel.load(File.expand_path(path))
        end
CLI class · ruby · L2-L54 (53 LOC)
lib/turbofan/cli/check.rb
  class CLI < Thor
    module Check
      def self.call(pipeline_name:, stage:, load_result: nil)
        Turbofan::CLI::Ce.load_all_definitions
        turbofans_root = "turbofans"
        pipeline_file = File.join(turbofans_root, "pipelines", "#{pipeline_name}.rb")
        load_result ||= Turbofan::Deploy::PipelineLoader.load(pipeline_file, turbofans_root: turbofans_root)

        pipeline = load_result.pipeline
        steps = load_result.steps

        all_errors = []
        all_warnings = []

        # Run PipelineCheck
        pipeline_result = Turbofan::Check::PipelineCheck.run(pipeline: pipeline, steps: steps)
        all_errors.concat(pipeline_result.errors)
        all_warnings.concat(pipeline_result.warnings)

        # Run DagCheck
        dag_result = Turbofan::Check::DagCheck.run(pipeline: pipeline)
        all_errors.concat(dag_result.errors)
        all_warnings.concat(dag_result.warnings)

        # Run ResourceCheck
        resources = Turbofan.discover_components[:r
call method · ruby · L4-L45 (42 LOC)
lib/turbofan/cli/check.rb
      def self.call(pipeline_name:, stage:, load_result: nil)
        Turbofan::CLI::Ce.load_all_definitions
        turbofans_root = "turbofans"
        pipeline_file = File.join(turbofans_root, "pipelines", "#{pipeline_name}.rb")
        load_result ||= Turbofan::Deploy::PipelineLoader.load(pipeline_file, turbofans_root: turbofans_root)

        pipeline = load_result.pipeline
        steps = load_result.steps

        all_errors = []
        all_warnings = []

        # Run PipelineCheck
        pipeline_result = Turbofan::Check::PipelineCheck.run(pipeline: pipeline, steps: steps)
        all_errors.concat(pipeline_result.errors)
        all_warnings.concat(pipeline_result.warnings)

        # Run DagCheck
        dag_result = Turbofan::Check::DagCheck.run(pipeline: pipeline)
        all_errors.concat(dag_result.errors)
        all_warnings.concat(dag_result.warnings)

        # Run ResourceCheck
        resources = Turbofan.discover_components[:resources]
        resource_result = 
discover_routers function · ruby · L65-L71 (7 LOC)
lib/turbofan/cli/check.rb
      def self.discover_routers(steps)
        routers = {}
        steps.each_key do |step_name|
          router_path = File.join("turbofans", "steps", step_name.to_s, "router.rb")
          next unless File.exist?(router_path)
          Kernel.load(router_path)
        end
Database class · ruby · L10-L15 (6 LOC)
lib/turbofan/cli/cost.rb
      class Database
        def self.open = raise(LoadError, "duckdb gem not installed")
      end

      Error = Class.new(StandardError) unless defined?(DuckDB::Error)
    end
CLI class · ruby · L20-L121 (102 LOC)
lib/turbofan/cli/cost.rb
  class CLI < Thor
    module Cost
      def self.call(pipeline_name:, stage:)
        dash_name = pipeline_name.tr("_", "-")
        now = Time.now
        billing_period = now.strftime("%Y-%m")

        db = DuckDB::Database.open
        conn = db.connect

        exec_rows = query_executions(conn, dash_name, billing_period)
        step_rows = query_steps(conn, dash_name, billing_period)
        total_rows = query_total(conn, dash_name, billing_period)

        if exec_rows.empty? && step_rows.empty?
          $stdout.puts "No cost data found for #{dash_name} (#{stage})."
          return
        end

        print_header(dash_name, stage, now, billing_period)
        print_executions(exec_rows)
        print_steps(step_rows, total_rows)
        print_total(total_rows, now)
        export_parquet(conn, dash_name, billing_period, now)
      rescue DuckDB::Error => e
        warn e.message
        $stdout.puts "No cost data found for #{dash_name} (#{stage})."
      ensure
        conn
Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
call method · ruby · L22-L37 (16 LOC)
lib/turbofan/cli/cost.rb
      def self.call(pipeline_name:, stage:)
        dash_name = pipeline_name.tr("_", "-")
        now = Time.now
        billing_period = now.strftime("%Y-%m")

        db = DuckDB::Database.open
        conn = db.connect

        exec_rows = query_executions(conn, dash_name, billing_period)
        step_rows = query_steps(conn, dash_name, billing_period)
        total_rows = query_total(conn, dash_name, billing_period)

        if exec_rows.empty? && step_rows.empty?
          $stdout.puts "No cost data found for #{dash_name} (#{stage})."
          return
        end
query_executions method · ruby · L51-L67 (17 LOC)
lib/turbofan/cli/cost.rb
      def self.query_executions(conn, pipeline_name, billing_period)
        conn.query(<<~SQL, pipeline_name, billing_period).to_a
          SELECT
            resource_tags_turbofan_pipeline AS pipeline,
            resource_tags_turbofan_execution AS execution,
            SUM(line_item_unblended_cost) AS cost,
            MIN(line_item_usage_start_date) AS min_time,
            MAX(line_item_usage_end_date) AS max_time
          FROM 'cur/*.parquet'
          WHERE resource_tags_turbofan_managed = 'true'
            AND resource_tags_turbofan_pipeline = $1
            AND billing_period = $2
          GROUP BY resource_tags_turbofan_pipeline, resource_tags_turbofan_execution
          ORDER BY min_time DESC
          LIMIT 10
        SQL
      end
query_steps method · ruby · L70-L83 (14 LOC)
lib/turbofan/cli/cost.rb
      def self.query_steps(conn, pipeline_name, billing_period)
        conn.query(<<~SQL, pipeline_name, billing_period).to_a
          SELECT
            resource_tags_turbofan_step AS step,
            SUM(line_item_unblended_cost) AS cost
          FROM 'cur/*.parquet'
          WHERE resource_tags_turbofan_managed = 'true'
            AND resource_tags_turbofan_pipeline = $1
            AND resource_tags_turbofan_execution IS NOT NULL
            AND billing_period = $2
          GROUP BY resource_tags_turbofan_step
          ORDER BY cost DESC
        SQL
      end
query_total method · ruby · L86-L97 (12 LOC)
lib/turbofan/cli/cost.rb
      def self.query_total(conn, pipeline_name, billing_period)
        conn.query(<<~SQL, pipeline_name, billing_period).to_a
          SELECT
            SUM(line_item_unblended_cost) AS cost
          FROM 'cur/*.parquet'
          WHERE resource_tags_turbofan_managed = 'true'
            AND resource_tags_turbofan_pipeline = $1
            AND resource_tags_turbofan_step IS NOT NULL
            AND resource_tags_turbofan_execution IS NOT NULL
            AND billing_period = $2
        SQL
      end
print_header method · ruby · L100-L106 (7 LOC)
lib/turbofan/cli/cost.rb
      def self.print_header(pipeline_name, stage, now, billing_period)
        period_start = "#{billing_period}-01"
        period_end = now.strftime("%Y-%m-%d")
        $stdout.puts "Pipeline: #{pipeline_name} (#{stage})"
        $stdout.puts "Period: #{period_start} to #{period_end}"
        $stdout.puts ""
      end
print_executions method · ruby · L109-L119 (11 LOC)
lib/turbofan/cli/cost.rb
      def self.print_executions(rows)
        $stdout.puts "Recent Executions:"
        rows.each do |row|
          execution = row["execution"]
          cost = format("%.2f", row["cost"])
          min_time = row["min_time"].to_s
          max_time = row["max_time"].to_s
          duration = format_duration(min_time, max_time)
          date_part = min_time[0, 16]
          $stdout.puts "  #{date_part}  #{execution}  $#{cost}  (#{duration})"
        end
print_steps method · ruby · L124-L133 (10 LOC)
lib/turbofan/cli/cost.rb
      def self.print_steps(rows, total_rows)
        total_cost = total_rows.first&.fetch("cost", 0).to_f
        $stdout.puts "By Step (current month):"
        rows.each do |row|
          step_name = row["step"]
          cost = row["cost"].to_f
          cost_str = format("%.2f", cost)
          pct = (total_cost > 0) ? (cost / total_cost * 100).round : 0
          $stdout.puts "  #{step_name}    $#{cost_str}  (#{pct}%)"
        end
print_total function · ruby · L138-L142 (5 LOC)
lib/turbofan/cli/cost.rb
      def self.print_total(total_rows, now)
        total_cost = total_rows.first&.fetch("cost", 0).to_f
        month_name = now.strftime("%b %Y")
        $stdout.puts "Total (#{month_name}):     $#{format("%.2f", total_cost)}"
      end
Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
export_parquet function · ruby · L145-L165 (21 LOC)
lib/turbofan/cli/cost.rb
      def self.export_parquet(conn, pipeline_name, billing_period, now)
        filename = "cost-#{now.strftime("%Y-%m-%dT%H%M%S")}.parquet"
        conn.query(<<~SQL, pipeline_name, billing_period)
          CREATE OR REPLACE TEMP VIEW cost_export AS
          SELECT
            resource_tags_turbofan_pipeline AS pipeline,
            resource_tags_turbofan_step AS step,
            resource_tags_turbofan_execution AS execution,
            SUM(line_item_unblended_cost) AS cost
          FROM 'cur/*.parquet'
          WHERE resource_tags_turbofan_managed = 'true'
            AND resource_tags_turbofan_pipeline = $1
            AND billing_period = $2
          GROUP BY ALL
          ORDER BY cost DESC
        SQL
        # COPY TO does not support parameterized filenames; sanitize instead
        safe_filename = filename.gsub(/[^a-zA-Z0-9._\-\/]/, "_")
        conn.query("COPY cost_export TO '#{safe_filename}' (FORMAT PARQUET)")
        $stdout.puts "Saved to #{filename}"
      end
format_duration function · ruby · L168-L179 (12 LOC)
lib/turbofan/cli/cost.rb
      def self.format_duration(min_time_str, max_time_str)
        return "0m" if min_time_str.empty? || max_time_str.empty?
        min_t = Time.parse(min_time_str)
        max_t = Time.parse(max_time_str)
        seconds = (max_t - min_t).to_i
        hours = seconds / 3600
        minutes = (seconds % 3600) / 60
        if hours > 0
          "#{hours}h #{minutes.to_s.rjust(2, "0")}m"
        else
          "#{minutes}m"
        end
CLI class · ruby · L2-L28 (27 LOC)
lib/turbofan/cli/deploy/preflight.rb
  class CLI < Thor
    module Deploy
      module Preflight
        def self.buildkit_available?
          system("docker", "buildx", "version", out: File::NULL, err: File::NULL) == true
        end

        def self.aws_credentials_valid?
          Aws::STS::Client.new.get_caller_identity
          true
        rescue Aws::STS::Errors::ServiceError
          false
        end

        def self.git_clean?
          `git status --porcelain`.strip.empty?
        end

        def self.warn_running_executions(sfn_client, state_machine_arn)
          running = sfn_client.list_executions(state_machine_arn: state_machine_arn, status_filter: "RUNNING")
          if running.executions.any?
            puts "WARNING: #{running.executions.size} execution(s) currently running."
            puts "Running executions will continue with previous job definitions."
          end
        end
      end
    end
page 1 / 9next ›