← back to dataPlor__turbofan

Function bodies 405 total

All specs Real LLM only Function bodies
DagBuilder class · ruby · L100-L144 (45 LOC)
lib/turbofan/dag.rb
  class DagBuilder
    attr_reader :dag

    def initialize
      @dag = Dag.new
      @trigger_input_override = nil
    end

    def trigger_input
      @trigger_input_override || DagProxy.new(:trigger)
    end

    def fan_out(proxy, batch_size: nil, **rest)
      raise ArgumentError, "unknown keyword: group (use batch_size: instead)" if rest.key?(:group)
      raise ArgumentError, "unknown keyword: concurrency (use batch_size: instead)" if rest.key?(:concurrency)
      raise ArgumentError, "unknown keyword(s): #{rest.keys.join(", ")}" if rest.any?
      raise ArgumentError, "fan_out expects a DagProxy, got #{proxy.class}" unless proxy.is_a?(DagProxy)
      raise ArgumentError, "fan_out requires batch_size: parameter" unless batch_size
      raise ArgumentError, "batch_size must be a positive integer" unless batch_size.is_a?(Integer) && batch_size > 0
      step = @dag.steps.find { |s| s.name == proxy.step_name }
      raise ArgumentError, "step :#{proxy.step_name} not found in DAG" 
initialize method · ruby · L103-L106 (4 LOC)
lib/turbofan/dag.rb
    # Prepares a builder that starts from a freshly created Dag and has no
    # trigger-input override set.
    def initialize
      @trigger_input_override = nil
      @dag = Dag.new
    end
trigger_input method · ruby · L108-L110 (3 LOC)
lib/turbofan/dag.rb
    # Returns the proxy representing the pipeline's trigger input: the
    # override when one is set, otherwise a new DagProxy for :trigger.
    def trigger_input
      return @trigger_input_override if @trigger_input_override

      DagProxy.new(:trigger)
    end
fan_out method · ruby · L112-L124 (13 LOC)
lib/turbofan/dag.rb
    # Flags the step behind +proxy+ as a fan-out step and records its batch size.
    #
    # proxy      - DagProxy whose step_name must match a step already in the DAG.
    # batch_size - required positive Integer.
    # rest       - swallows any other keyword so it can be rejected with a
    #              targeted message (:group and :concurrency point to batch_size:).
    #
    # Checks run in this order: unknown keywords, proxy type, batch_size
    # presence, batch_size validity, step lookup.
    #
    # Returns the proxy that was passed in.
    # Raises ArgumentError when any check fails.
    def fan_out(proxy, batch_size: nil, **rest)
      if rest.key?(:group)
        raise ArgumentError, "unknown keyword: group (use batch_size: instead)"
      elsif rest.key?(:concurrency)
        raise ArgumentError, "unknown keyword: concurrency (use batch_size: instead)"
      elsif !rest.empty?
        raise ArgumentError, "unknown keyword(s): #{rest.keys.join(", ")}"
      end

      unless proxy.is_a?(DagProxy)
        raise ArgumentError, "fan_out expects a DagProxy, got #{proxy.class}"
      end
      raise ArgumentError, "fan_out requires batch_size: parameter" unless batch_size

      size_ok = batch_size.is_a?(Integer) && batch_size.positive?
      raise ArgumentError, "batch_size must be a positive integer" unless size_ok

      target = @dag.steps.find { |candidate| candidate.name == proxy.step_name }
      raise ArgumentError, "step :#{proxy.step_name} not found in DAG" unless target

      target.fan_out = true
      target.batch_size = batch_size
      proxy
    end
validate_unique_name! method · ruby · L128-L132 (5 LOC)
lib/turbofan/dag.rb
    # Rejects a step name that is already used by a step in the DAG.
    #
    # Returns nil when the name is free.
    # Raises ArgumentError when a step with the same name exists.
    def validate_unique_name!(name)
      name_taken = @dag.steps.any? { |existing| existing.name == name }
      raise ArgumentError, "duplicate step name #{name.inspect}" if name_taken
    end
validate_schema_edge! method · ruby · L134-L139 (6 LOC)
lib/turbofan/dag.rb
    def validate_schema_edge!(source_proxy, target_class)
      return if source_proxy.step_name == :trigger
      unless source_proxy.schema
        raise SchemaIncompatibleError,
          "Step :#{source_proxy.step_name} has no output schema"
      end
start method · ruby · L6-L12 (7 LOC)
lib/turbofan/deploy/execution.rb
      # Starts an execution of a state machine and returns the execution ARN.
      #
      # sfn_client        - object responding to #start_execution
      #                     (NOTE(review): presumably an AWS Step Functions client - confirm).
      # state_machine_arn - forwarded unchanged to #start_execution.
      # input             - forwarded unchanged to #start_execution.
      #
      # Returns the execution_arn of the response.
      def self.start(sfn_client, state_machine_arn:, input:)
        sfn_client
          .start_execution(state_machine_arn: state_machine_arn, input: input)
          .execution_arn
      end
Repobility · MCP-ready · https://repobility.com
describe method · ruby · L14-L22 (9 LOC)
lib/turbofan/deploy/execution.rb
      # Looks up an execution and returns a plain Hash summary of it.
      #
      # sfn_client    - object responding to #describe_execution.
      # execution_arn - forwarded unchanged to #describe_execution.
      #
      # Returns a Hash with the keys :status, :start_date, :stop_date and :name,
      # each read from the response attribute of the same name.
      def self.describe(sfn_client, execution_arn:)
        execution = sfn_client.describe_execution(execution_arn: execution_arn)
        %i[status start_date stop_date name].to_h do |field|
          [field, execution.public_send(field)]
        end
      end
wait_for_completion method · ruby · L24-L33 (10 LOC)
lib/turbofan/deploy/execution.rb
      def self.wait_for_completion(sfn_client, execution_arn:, timeout: 600, poll_interval: 10)
        deadline = Time.now + timeout
        attempt = 0
        loop do
          info = describe(sfn_client, execution_arn: execution_arn)
          case info[:status]
          when "SUCCEEDED" then return info
          when "FAILED", "TIMED_OUT", "ABORTED"
            raise "Execution #{info[:status]}: #{execution_arn}"
          end
step_statuses method · ruby · L42-L65 (24 LOC)
lib/turbofan/deploy/execution.rb
      def self.step_statuses(sfn_client, execution_arn:)
        events = collect_events(sfn_client, execution_arn)
        statuses = {}
        current_running = nil

        events.each do |event|
          case event.type
          when "TaskStateEntered"
            name = event.state_entered_event_details.name
            statuses[name] = {
              status: "RUNNING",
              started_at: event.timestamp
            }
            current_running = name
          when "TaskFailed"
            statuses[current_running][:status] = "FAILED" if current_running && statuses[current_running]
          when "TaskStateExited"
            name = event.state_exited_event_details.name
            next unless statuses.key?(name)

            statuses[name][:status] = "SUCCEEDED" unless statuses[name][:status] == "FAILED"
            statuses[name][:ended_at] = event.timestamp
            current_running = nil if current_running == name
          end
collect_events function · ruby · L71-L79 (9 LOC)
lib/turbofan/deploy/execution.rb
      def self.collect_events(sfn_client, execution_arn)
        events = []
        params = {execution_arn: execution_arn, reverse_order: false}
        loop do
          response = sfn_client.get_execution_history(**params)
          events.concat(response.events)
          break unless response.next_token
          params[:next_token] = response.next_token
        end
ImageBuilder class · ruby · L8-L16 (9 LOC)
lib/turbofan/deploy/image_builder.rb
    class ImageBuilder
      def self.content_tag(step_dir, schemas_dir)
        digest = Digest::SHA256.new
        [step_dir, schemas_dir].each do |dir|
          Dir.glob("#{dir}/**/*").select { |f| File.file?(f) }.sort.each do |f|
            digest.update(Pathname.new(f).relative_path_from(dir).to_s)
            digest.update(File.binread(f))
          end
        end
content_tag method · ruby · L9-L15 (7 LOC)
lib/turbofan/deploy/image_builder.rb
      def self.content_tag(step_dir, schemas_dir)
        digest = Digest::SHA256.new
        [step_dir, schemas_dir].each do |dir|
          Dir.glob("#{dir}/**/*").select { |f| File.file?(f) }.sort.each do |f|
            digest.update(Pathname.new(f).relative_path_from(dir).to_s)
            digest.update(File.binread(f))
          end
image_exists? method · ruby · L20-L28 (9 LOC)
lib/turbofan/deploy/image_builder.rb
      # Reports whether an image with the given tag is present in a repository.
      #
      # ecr_client      - object responding to #describe_images.
      # repository_name - name of the repository to query.
      # image_tag       - tag to look for.
      #
      # Returns true when #describe_images succeeds, false when it raises
      # Aws::ECR::Errors::ImageNotFoundException. Any other error propagates.
      def self.image_exists?(ecr_client, repository_name, image_tag)
        lookup = {repository_name: repository_name, image_ids: [{image_tag: image_tag}]}
        begin
          ecr_client.describe_images(**lookup)
        rescue Aws::ECR::Errors::ImageNotFoundException
          return false
        end
        true
      end
build method · ruby · L30-L35 (6 LOC)
lib/turbofan/deploy/image_builder.rb
      def self.build(step_dir, schemas_dir, tag:, repository_uri:)
        cmd = ["docker", "build", "--build-context", "schemas=#{schemas_dir}"]
        proxy_ca = ENV.fetch("TURBOFAN_PROXY_CA", "/usr/local/share/ca-certificates/proxy-ca.crt")
        if File.exist?(proxy_ca)
          cmd.push("--build-context", "proxy-ca=#{File.dirname(proxy_ca)}")
        end
Repobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
push function · ruby · L43-L45 (3 LOC)
lib/turbofan/deploy/image_builder.rb
      # Pushes the image "<repository_uri>:<tag>" by running `docker push`
      # through run_cmd.
      def self.push(tag:, repository_uri:)
        image_ref = "#{repository_uri}:#{tag}"
        run_cmd("docker", "push", image_ref)
      end
authenticate_ecr function · ruby · L47-L59 (13 LOC)
lib/turbofan/deploy/image_builder.rb
      # Logs the local Docker client in to the registry described by the first
      # authorization entry returned by the ECR client.
      #
      # ecr_client - object responding to #get_authorization_token; the first
      #              element of its authorization_data supplies the Base64
      #              authorization_token and the proxy_endpoint.
      #
      # Returns the registry endpoint with any leading "http://" or "https://"
      # removed.
      # Raises RuntimeError when `docker login` exits unsuccessfully.
      def self.authenticate_ecr(ecr_client)
        auth = ecr_client.get_authorization_token.authorization_data.first
        # The decoded token is "<user>:<password>". Split on the first colon
        # only, so a password that itself contains ":" is passed on whole.
        password = Base64.decode64(auth.authorization_token).split(":", 2).last
        registry = auth.proxy_endpoint

        # The password goes in via stdin so it never appears in the process
        # argument list.
        _out, status = Open3.capture2(
          "docker", "login", "--username", "AWS", "--password-stdin", registry,
          stdin_data: password
        )
        raise "ECR authentication failed" unless status.success?

        registry.sub(%r{\Ahttps?://}, "")
      end
build_and_push function · ruby · L61-L67 (7 LOC)
lib/turbofan/deploy/image_builder.rb
      def self.build_and_push(step_dir:, schemas_dir:, ecr_client:, repository_name:, repository_uri:, tag: nil)
        tag ||= content_tag(step_dir, schemas_dir)

        if image_exists?(ecr_client, repository_name, tag)
          puts "Image #{repository_name}:#{tag} already exists, skipping build"
          return tag
        end
build_and_push_all function · ruby · L74-L83 (10 LOC)
lib/turbofan/deploy/image_builder.rb
      def self.build_and_push_all(step_configs:)
        results = {}
        threads = step_configs.map do |config|
          Thread.new do
            tag = build_and_push(**config)
            [config[:step_dir], tag]
          rescue => e
            step_name = File.basename(config[:step_dir])
            raise "Build failed for step '#{step_name}': #{e.message}"
          end
garbage_collect function · ruby · L92-L100 (9 LOC)
lib/turbofan/deploy/image_builder.rb
      def self.garbage_collect(ecr_client, repository_name, keep:)
        images = []
        params = {repository_name: repository_name}
        loop do
          response = ecr_client.describe_images(**params)
          images.concat(response.image_details)
          break unless response.next_token
          params[:next_token] = response.next_token
        end
run_cmd function · ruby · L115-L118 (4 LOC)
lib/turbofan/deploy/image_builder.rb
      # Runs an external command via Kernel#system.
      #
      # cmd - the program followed by its arguments.
      #
      # Returns nil on success.
      # Raises RuntimeError when system returns false or nil; the message
      # names at most the first three words of the command.
      def self.run_cmd(*cmd)
        return if system(*cmd)

        raise "Command failed: #{cmd.first(3).join(" ")}"
      end
PipelineLoader class · ruby · L3-L32 (30 LOC)
lib/turbofan/deploy/pipeline_loader.rb
    class PipelineLoader
      LoadResult = Struct.new(:pipeline, :steps, :step_dirs, keyword_init: true)

      def self.load(pipeline_file, turbofans_root:)
        Turbofan.schemas_path = File.join(turbofans_root, "schemas")

        config_file = File.join(turbofans_root, "config", "turbofan.rb")
        Kernel.load(File.expand_path(config_file)) if File.exist?(config_file)

        raise "Pipeline file not found: #{pipeline_file}" unless File.exist?(pipeline_file)

        before = Set.new
        ObjectSpace.each_object(Class) do |c|
          class_name = Turbofan::GET_CLASS_NAME.bind_call(c)
          before << c if class_name && c < Pipeline
        end

        Kernel.load(File.expand_path(pipeline_file))

        components = Turbofan.discover_components
        new_pipelines = components[:pipelines].values.reject { |c| before.include?(c) }
        raise "No pipeline class found after loading #{pipeline_file}" if new_pipelines.empty?
        raise "Multiple pipeline classes 
load method · ruby · L6-L18 (13 LOC)
lib/turbofan/deploy/pipeline_loader.rb
      def self.load(pipeline_file, turbofans_root:)
        Turbofan.schemas_path = File.join(turbofans_root, "schemas")

        config_file = File.join(turbofans_root, "config", "turbofan.rb")
        Kernel.load(File.expand_path(config_file)) if File.exist?(config_file)

        raise "Pipeline file not found: #{pipeline_file}" unless File.exist?(pipeline_file)

        before = Set.new
        ObjectSpace.each_object(Class) do |c|
          class_name = Turbofan::GET_CLASS_NAME.bind_call(c)
          before << c if class_name && c < Pipeline
        end
Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
resolve_steps method · ruby · L34-L44 (11 LOC)
lib/turbofan/deploy/pipeline_loader.rb
      def self.resolve_steps(dag, components, turbofans_root)
        steps = {}
        step_dirs = {}
        dag.steps.each do |dag_step|
          step_dir = File.join(turbofans_root, "steps", dag_step.name.to_s)
          klass = components[:steps][dag_step.name]
          raise "No loaded class for step :#{dag_step.name}" unless klass
          raise "Step directory not found: #{step_dir}" unless klass.turbofan_external? || Dir.exist?(step_dir)
          steps[dag_step.name] = klass
          step_dirs[dag_step.name] = step_dir
        end
StackManager class · ruby · L5-L34 (30 LOC)
lib/turbofan/deploy/stack_manager.rb
    class StackManager
      UPDATABLE_STATES = %i[create_complete update_complete update_rollback_complete].freeze
      CHANGESET_TIMEOUT = 300
      STACK_TIMEOUT = 1800

      def self.backoff_sleep(attempt, base:, max:, jitter: true)
        delay = [base * (2**attempt), max].min
        delay += rand(0.0..1.0) if jitter
        sleep(delay)
      end

      def self.stack_output(cf_client, stack_name, output_key)
        outputs = cf_client.describe_stacks(stack_name: stack_name).stacks.first.outputs
        entry = outputs.find { |o| o.output_key == output_key }
        raise "Output #{output_key} not found on stack #{stack_name}" unless entry
        entry.output_value
      end

      def self.detect_state(cf_client, stack_name)
        response = cf_client.describe_stacks(stack_name: stack_name)
        status = response.stacks.first.stack_status

        if status.end_with?("_IN_PROGRESS")
          :in_progress
        else
          status.downcase.to_sym
        end
     
backoff_sleep method · ruby · L10-L14 (5 LOC)
lib/turbofan/deploy/stack_manager.rb
      # Sleeps for an exponentially growing delay.
      #
      # attempt - exponent applied to 2 (delay is base * 2**attempt).
      # base    - delay used for attempt 0.
      # max     - cap applied to the exponential delay before jitter is added.
      # jitter  - when truthy, adds rand(0.0..1.0) on top of the capped delay,
      #           so the actual sleep may exceed max by up to that amount.
      #
      # Returns whatever sleep returns.
      def self.backoff_sleep(attempt, base:, max:, jitter: true)
        exponential = base * (2**attempt)
        capped = [exponential, max].min
        pause = jitter ? capped + rand(0.0..1.0) : capped
        sleep(pause)
      end
stack_output method · ruby · L16-L21 (6 LOC)
lib/turbofan/deploy/stack_manager.rb
      # Reads one output value from the first stack returned for +stack_name+.
      #
      # cf_client  - object responding to #describe_stacks.
      # stack_name - stack to query.
      # output_key - key of the output to return.
      #
      # Returns the output_value of the matching output.
      # Raises RuntimeError when no output has that key.
      def self.stack_output(cf_client, stack_name, output_key)
        stack = cf_client.describe_stacks(stack_name: stack_name).stacks.first
        match = stack.outputs.find { |candidate| candidate.output_key == output_key }
        return match.output_value if match

        raise "Output #{output_key} not found on stack #{stack_name}"
      end
detect_state method · ruby · L23-L31 (9 LOC)
lib/turbofan/deploy/stack_manager.rb
      def self.detect_state(cf_client, stack_name)
        response = cf_client.describe_stacks(stack_name: stack_name)
        status = response.stacks.first.stack_status

        if status.end_with?("_IN_PROGRESS")
          :in_progress
        else
          status.downcase.to_sym
        end
deploy method · ruby · L36-L52 (17 LOC)
lib/turbofan/deploy/stack_manager.rb
      def self.deploy(cf_client, stack_name:, template_body:, parameters: [], s3_client: nil, artifacts: [])
        state = detect_state(cf_client, stack_name)

        case state
        when :does_not_exist
          changeset_type = "CREATE"
        when *UPDATABLE_STATES
          changeset_type = "UPDATE"
        when :rollback_complete, :delete_failed
          cf_client.delete_stack(stack_name: stack_name)
          wait_for_stack(cf_client, stack_name: stack_name, target_states: ["DELETE_COMPLETE"])
          changeset_type = "CREATE"
        when :in_progress
          raise "Another operation is in progress on stack #{stack_name}"
        else
          raise "Unhandled stack state: #{state} for #{stack_name}"
        end
create_changeset function · ruby · L79-L90 (12 LOC)
lib/turbofan/deploy/stack_manager.rb
      def self.create_changeset(cf_client, stack_name:, template_body:, type:, parameters: [], s3_client: nil)
        changeset_name = "turbofan-deploy-#{Time.now.to_i}"
        template_param = if template_body.bytesize > 51_200
          s3 = s3_client || Aws::S3::Client.new
          bucket = Turbofan.config.bucket
          key = "turbofan-cfn-templates/#{stack_name}/#{changeset_name}.json"
          s3.put_object(bucket: bucket, key: key, body: template_body)
          region = cf_client.config.region
          {template_url: "https://#{bucket}.s3.#{region}.amazonaws.com/#{key}"}
        else
          {template_body: template_body}
        end
wait_for_changeset function · ruby · L103-L114 (12 LOC)
lib/turbofan/deploy/stack_manager.rb
      def self.wait_for_changeset(cf_client, stack_name:, changeset_name:)
        deadline = Time.now + CHANGESET_TIMEOUT
        attempt = 0
        loop do
          raise "Timed out waiting for changeset #{changeset_name}" if Time.now > deadline
          response = cf_client.describe_change_set(stack_name: stack_name, change_set_name: changeset_name)
          case response.status
          when "CREATE_COMPLETE"
            return :create_complete
          when "FAILED"
            return :failed
          end
Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
wait_for_stack function · ruby · L120-L131 (12 LOC)
lib/turbofan/deploy/stack_manager.rb
      def self.wait_for_stack(cf_client, stack_name:, target_states:)
        deadline = Time.now + STACK_TIMEOUT
        attempt = 0
        loop do
          raise "Timed out waiting for stack #{stack_name}" if Time.now > deadline
          response = cf_client.describe_stacks(stack_name: stack_name)
          status = response.stacks.first.stack_status
          return if target_states.include?(status)
          if status.match?(/_FAILED|ROLLBACK_COMPLETE/)
            reason = failure_reason(cf_client, stack_name)
            raise "Stack #{stack_name} entered failure state: #{status}#{reason}"
          end
failure_reason function · ruby · L140-L151 (12 LOC)
lib/turbofan/deploy/stack_manager.rb
      # Builds a short, indented summary of why resources of a stack failed.
      #
      # Considers stack events whose resource_status contains "_FAILED" and
      # that carry a reason not mentioning "cancelled" (case-insensitive), and
      # reports at most the first five of them.
      #
      # Returns "" when there is nothing to report, otherwise a String that
      # starts with a newline and lists "<logical id>: <reason>" per line.
      # Best effort: any StandardError raised along the way yields "".
      def self.failure_reason(cf_client, stack_name)
        stack_events = cf_client.describe_stack_events(stack_name: stack_name).stack_events
        failures = stack_events.select do |event|
          next false unless event.resource_status&.include?("_FAILED")

          explanation = event.resource_status_reason
          explanation && !explanation.match?(/cancelled/i)
        end
        return "" if failures.empty?

        summary = failures.take(5).map do |event|
          "#{event.logical_resource_id}: #{event.resource_status_reason}"
        end
        "\n  #{summary.join("\n  ")}"
      rescue StandardError
        ""
      end
dry_run function · ruby · L154-L164 (11 LOC)
lib/turbofan/deploy/stack_manager.rb
      def self.dry_run(cf_client, stack_name:, template_body:)
        state = detect_state(cf_client, stack_name)
        changeset_type = (state == :does_not_exist) ? "CREATE" : "UPDATE"
        changeset_name = create_changeset(cf_client, stack_name: stack_name, template_body: template_body, type: changeset_type)
        result = wait_for_changeset(cf_client, stack_name: stack_name, changeset_name: changeset_name)
        if result == :failed
          reason = cf_client.describe_change_set(stack_name: stack_name, change_set_name: changeset_name).status_reason
          puts "  Changeset failed: #{reason}"
        else
          describe_changes(cf_client, stack_name: stack_name, changeset_name: changeset_name)
        end
describe_changes function · ruby · L168-L173 (6 LOC)
lib/turbofan/deploy/stack_manager.rb
      def self.describe_changes(cf_client, stack_name:, changeset_name:)
        response = cf_client.describe_change_set(stack_name: stack_name, change_set_name: changeset_name)
        response.changes.each do |change|
          rc = change.resource_change
          puts "  #{rc.action}  #{rc.resource_type}  #{rc.logical_resource_id}"
        end
version method · ruby · L10-L12 (3 LOC)
lib/turbofan/extensions.rb
    # Returns the configured DuckDB version prefixed with "v".
    def self.version
      duckdb_release = Turbofan.config.duckdb_version
      "v#{duckdb_release}"
    end
repo_url method · ruby · L14-L17 (4 LOC)
lib/turbofan/extensions.rb
    # Builds the download URL of the gzipped extension file for +ext+.
    #
    # ext - extension name (anything responding to #to_sym); names listed in
    #       COMMUNITY are served from COMMUNITY_REPO, all others from CORE_REPO.
    #
    # Returns "<repo>/<version>/<PLATFORM>/<ext>.duckdb_extension.gz".
    def self.repo_url(ext)
      base = if COMMUNITY.include?(ext.to_sym)
        COMMUNITY_REPO
      else
        CORE_REPO
      end
      "#{base}/#{version}/#{PLATFORM}/#{ext}.duckdb_extension.gz"
    end
install_path method · ruby · L19-L21 (3 LOC)
lib/turbofan/extensions.rb
    # Returns the directory under /root/.duckdb/extensions for the current
    # version and PLATFORM.
    def self.install_path
      release = version
      platform = PLATFORM
      "/root/.duckdb/extensions/#{release}/#{platform}"
    end
ASL class · ruby · L5-L55 (51 LOC)
lib/turbofan/generators/asl.rb
    class ASL
      BATCH_RESOURCE = "arn:aws:states:::batch:submitJob.sync"
      SNS_RESOURCE = "arn:aws:states:::sns:publish"
      # Maximum items per Batch ArrayProperties. Not enforced here;
      # the chunking Lambda is responsible for respecting this limit.
      MAX_ARRAY_SIZE = 10_000

      def initialize(pipeline:, stage:, steps: {})
        @pipeline = pipeline
        @stage = stage
        @steps = steps
      end

      def generate
        dag = @pipeline.turbofan_dag
        sorted = dag.sorted_steps
        pipeline_name = @pipeline.turbofan_name

        prefix = "turbofan-#{pipeline_name}-#{@stage}"
        topic_arn = "arn:aws:sns:${AWS::Region}:${AWS::AccountId}:#{prefix}-notifications"

        # Detect forks (steps with >1 children) and compute join points
        forks = {}     # fork_name => [branch_child_names]
        join_info = {} # join_name => [branch_child_names]

        sorted.each_with_index do |step, idx|
          children = dag.children_of(step.
Repobility · MCP-ready · https://repobility.com
initialize method · ruby · L12-L16 (5 LOC)
lib/turbofan/generators/asl.rb
      # Stores the inputs the generator works from.
      #
      # pipeline - kept in @pipeline.
      # stage    - kept in @stage.
      # steps    - kept in @steps; defaults to an empty Hash
      #            (NOTE(review): presumably step name => step class - confirm).
      def initialize(pipeline:, stage:, steps: {})
        @pipeline, @stage, @steps = pipeline, stage, steps
      end
generate method · ruby · L18-L37 (20 LOC)
lib/turbofan/generators/asl.rb
      def generate
        dag = @pipeline.turbofan_dag
        sorted = dag.sorted_steps
        pipeline_name = @pipeline.turbofan_name

        prefix = "turbofan-#{pipeline_name}-#{@stage}"
        topic_arn = "arn:aws:sns:${AWS::Region}:${AWS::AccountId}:#{prefix}-notifications"

        # Detect forks (steps with >1 children) and compute join points
        forks = {}     # fork_name => [branch_child_names]
        join_info = {} # join_name => [branch_child_names]

        sorted.each_with_index do |step, idx|
          children = dag.children_of(step.name)
          next unless children.size > 1

          forks[step.name] = children
          join_step = find_join_point(dag, children, sorted, idx)
          join_info[join_step.name] = children if join_step
        end
to_json function · ruby · L154-L156 (3 LOC)
lib/turbofan/generators/asl.rb
      # Serializes the result of #generate as a JSON string. Any arguments are
      # accepted and ignored.
      def to_json(*)
        definition = generate
        JSON.generate(definition)
      end
base_env function · ruby · L160-L170 (11 LOC)
lib/turbofan/generators/asl.rb
      # Builds the list of environment-variable entries shared by job states.
      #
      # pipeline_name - used for TURBOFAN_PIPELINE and for the bucket prefix.
      #
      # Returns an Array of Hashes. The execution id entry uses a "Value.$"
      # path ("$$.Execution.Id"); every other entry is a "Name"/"Value" pair.
      def base_env(pipeline_name)
        execution_id = {"Name" => "TURBOFAN_EXECUTION_ID", "Value.$" => "$$.Execution.Id"}
        fixed_values = [
          ["TURBOFAN_STAGE", @stage],
          ["TURBOFAN_PIPELINE", pipeline_name],
          ["TURBOFAN_BUCKET", Turbofan.config.bucket],
          ["TURBOFAN_BUCKET_PREFIX", Naming.bucket_prefix(pipeline_name, @stage)],
          ["AWS_REGION", "${AWS::Region}"],
          ["AWS_DEFAULT_REGION", "${AWS::Region}"]
        ]
        [execution_id] + fixed_values.map { |key, value| {"Name" => key, "Value" => value} }
      end
resolve_job_refs function · ruby · L172-L178 (7 LOC)
lib/turbofan/generators/asl.rb
      def resolve_job_refs(prefix, step_name)
        step_class = @steps[step_name]
        suffix = if step_class&.turbofan_sizes&.any?
          "#{step_name}-#{step_class.turbofan_sizes.keys.first}"
        else
          step_name
        end
find_prev function · ruby · L185-L189 (5 LOC)
lib/turbofan/generators/asl.rb
      def find_prev(sorted, index, visited)
        return nil if index == 0
        (index - 1).downto(0).each do |i|
          return sorted[i] unless visited.include?(sorted[i].name)
        end
find_join_point function · ruby · L193-L204 (12 LOC)
lib/turbofan/generators/asl.rb
      def find_join_point(dag, fork_children, sorted, fork_index)
        # Count fork children that have at least one descendant (non-dead-end branches)
        active_children = fork_children.count { |child| dag.children_of(child).any? }
        active_children = 1 if active_children == 0

        sorted[(fork_index + 1)..].each do |step|
          next if fork_children.include?(step.name)

          ancestors = all_ancestors(dag, step.name)
          fork_ancestor_count = fork_children.count { |child| ancestors.include?(child) }
          return step if fork_ancestor_count >= active_children
        end
all_ancestors function · ruby · L208-L215 (8 LOC)
lib/turbofan/generators/asl.rb
      def all_ancestors(dag, step_name)
        visited = Set.new
        queue = dag.parents_of(step_name).dup
        while (parent = queue.shift)
          next if visited.include?(parent)
          visited << parent
          queue.concat(dag.parents_of(parent))
        end
Repobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
branch_steps_for function · ruby · L219-L226 (8 LOC)
lib/turbofan/generators/asl.rb
      def branch_steps_for(dag, branch_start, join_point, sorted)
        reachable = Set.new
        queue = [branch_start]
        while (step_name = queue.shift)
          next if reachable.include?(step_name) || step_name == join_point
          reachable << step_name
          dag.children_of(step_name).each { |c| queue << c }
        end
build_branch_chain function · ruby · L230-L247 (18 LOC)
lib/turbofan/generators/asl.rb
      def build_branch_chain(pipeline_name, chain, fork_step_name)
        states = {}
        chain.each_with_index do |step, idx|
          last_in_branch = (idx == chain.size - 1)
          prev_step_name = (idx == 0) ? fork_step_name : chain[idx - 1].name
          state = build_branch_state(pipeline_name, step, prev_step_name)
          if last_in_branch
            state["End"] = true
          else
            state.delete("End")
            state["Next"] = chain[idx + 1].name.to_s
            state["ResultSelector"] = {
              "JobId.$" => "$.JobId",
              "JobName.$" => "$.JobName",
              "Status.$" => "$.Status"
            }
            state["ResultPath"] = "$.steps.#{step.name}"
          end
notification_states function · ruby · L256-L277 (22 LOC)
lib/turbofan/generators/asl.rb
      # Builds the two terminal notification states of the state machine.
      #
      # topic_arn     - used as the "TopicArn" parameter of both states.
      # pipeline_name - interpolated into the success and failure messages.
      #
      # Returns a Hash with the keys "NotifySuccess" and "NotifyFailure", each a
      # Task state on SNS_RESOURCE that ends the execution ("End" => true).
      def notification_states(topic_arn, pipeline_name)
        messages = {
          "NotifySuccess" => "Pipeline #{pipeline_name} completed successfully.",
          "NotifyFailure" => "Pipeline #{pipeline_name} failed."
        }
        messages.transform_values do |message|
          {
            "Type" => "Task",
            "Resource" => SNS_RESOURCE,
            "Parameters" => {"TopicArn" => topic_arn, "Message" => message},
            "End" => true
          }
        end
      end
‹ prevpage 4 / 9next ›