Function bodies 405 total
DagBuilder class · ruby · L100-L144 (45 LOC)lib/turbofan/dag.rb
class DagBuilder
attr_reader :dag
def initialize
@dag = Dag.new
@trigger_input_override = nil
end
def trigger_input
@trigger_input_override || DagProxy.new(:trigger)
end
def fan_out(proxy, batch_size: nil, **rest)
raise ArgumentError, "unknown keyword: group (use batch_size: instead)" if rest.key?(:group)
raise ArgumentError, "unknown keyword: concurrency (use batch_size: instead)" if rest.key?(:concurrency)
raise ArgumentError, "unknown keyword(s): #{rest.keys.join(", ")}" if rest.any?
raise ArgumentError, "fan_out expects a DagProxy, got #{proxy.class}" unless proxy.is_a?(DagProxy)
raise ArgumentError, "fan_out requires batch_size: parameter" unless batch_size
raise ArgumentError, "batch_size must be a positive integer" unless batch_size.is_a?(Integer) && batch_size > 0
step = @dag.steps.find { |s| s.name == proxy.step_name }
raise ArgumentError, "step :#{proxy.step_name} not found in DAG" initialize method · ruby · L103-L106 (4 LOC)lib/turbofan/dag.rb
def initialize
@dag = Dag.new
@trigger_input_override = nil
endtrigger_input method · ruby · L108-L110 (3 LOC)lib/turbofan/dag.rb
def trigger_input
@trigger_input_override || DagProxy.new(:trigger)
endfan_out method · ruby · L112-L124 (13 LOC)lib/turbofan/dag.rb
def fan_out(proxy, batch_size: nil, **rest)
raise ArgumentError, "unknown keyword: group (use batch_size: instead)" if rest.key?(:group)
raise ArgumentError, "unknown keyword: concurrency (use batch_size: instead)" if rest.key?(:concurrency)
raise ArgumentError, "unknown keyword(s): #{rest.keys.join(", ")}" if rest.any?
raise ArgumentError, "fan_out expects a DagProxy, got #{proxy.class}" unless proxy.is_a?(DagProxy)
raise ArgumentError, "fan_out requires batch_size: parameter" unless batch_size
raise ArgumentError, "batch_size must be a positive integer" unless batch_size.is_a?(Integer) && batch_size > 0
step = @dag.steps.find { |s| s.name == proxy.step_name }
raise ArgumentError, "step :#{proxy.step_name} not found in DAG" unless step
step.fan_out = true
step.batch_size = batch_size
proxy
endvalidate_unique_name! method · ruby · L128-L132 (5 LOC)lib/turbofan/dag.rb
def validate_unique_name!(name)
return unless @dag.steps.any? { |s| s.name == name }
raise ArgumentError, "duplicate step name #{name.inspect}"
endvalidate_schema_edge! method · ruby · L134-L139 (6 LOC)lib/turbofan/dag.rb
def validate_schema_edge!(source_proxy, target_class)
return if source_proxy.step_name == :trigger
unless source_proxy.schema
raise SchemaIncompatibleError,
"Step :#{source_proxy.step_name} has no output schema"
endstart method · ruby · L6-L12 (7 LOC)lib/turbofan/deploy/execution.rb
def self.start(sfn_client, state_machine_arn:, input:)
response = sfn_client.start_execution(
state_machine_arn: state_machine_arn,
input: input
)
response.execution_arn
endRepobility · MCP-ready · https://repobility.com
describe method · ruby · L14-L22 (9 LOC)lib/turbofan/deploy/execution.rb
def self.describe(sfn_client, execution_arn:)
response = sfn_client.describe_execution(execution_arn: execution_arn)
{
status: response.status,
start_date: response.start_date,
stop_date: response.stop_date,
name: response.name
}
endwait_for_completion method · ruby · L24-L33 (10 LOC)lib/turbofan/deploy/execution.rb
def self.wait_for_completion(sfn_client, execution_arn:, timeout: 600, poll_interval: 10)
deadline = Time.now + timeout
attempt = 0
loop do
info = describe(sfn_client, execution_arn: execution_arn)
case info[:status]
when "SUCCEEDED" then return info
when "FAILED", "TIMED_OUT", "ABORTED"
raise "Execution #{info[:status]}: #{execution_arn}"
endstep_statuses method · ruby · L42-L65 (24 LOC)lib/turbofan/deploy/execution.rb
def self.step_statuses(sfn_client, execution_arn:)
events = collect_events(sfn_client, execution_arn)
statuses = {}
current_running = nil
events.each do |event|
case event.type
when "TaskStateEntered"
name = event.state_entered_event_details.name
statuses[name] = {
status: "RUNNING",
started_at: event.timestamp
}
current_running = name
when "TaskFailed"
statuses[current_running][:status] = "FAILED" if current_running && statuses[current_running]
when "TaskStateExited"
name = event.state_exited_event_details.name
next unless statuses.key?(name)
statuses[name][:status] = "SUCCEEDED" unless statuses[name][:status] == "FAILED"
statuses[name][:ended_at] = event.timestamp
current_running = nil if current_running == name
endcollect_events function · ruby · L71-L79 (9 LOC)lib/turbofan/deploy/execution.rb
def self.collect_events(sfn_client, execution_arn)
events = []
params = {execution_arn: execution_arn, reverse_order: false}
loop do
response = sfn_client.get_execution_history(**params)
events.concat(response.events)
break unless response.next_token
params[:next_token] = response.next_token
endImageBuilder class · ruby · L8-L16 (9 LOC)lib/turbofan/deploy/image_builder.rb
class ImageBuilder
def self.content_tag(step_dir, schemas_dir)
digest = Digest::SHA256.new
[step_dir, schemas_dir].each do |dir|
Dir.glob("#{dir}/**/*").select { |f| File.file?(f) }.sort.each do |f|
digest.update(Pathname.new(f).relative_path_from(dir).to_s)
digest.update(File.binread(f))
end
endcontent_tag method · ruby · L9-L15 (7 LOC)lib/turbofan/deploy/image_builder.rb
def self.content_tag(step_dir, schemas_dir)
digest = Digest::SHA256.new
[step_dir, schemas_dir].each do |dir|
Dir.glob("#{dir}/**/*").select { |f| File.file?(f) }.sort.each do |f|
digest.update(Pathname.new(f).relative_path_from(dir).to_s)
digest.update(File.binread(f))
endimage_exists? method · ruby · L20-L28 (9 LOC)lib/turbofan/deploy/image_builder.rb
def self.image_exists?(ecr_client, repository_name, image_tag)
ecr_client.describe_images(
repository_name: repository_name,
image_ids: [{image_tag: image_tag}]
)
true
rescue Aws::ECR::Errors::ImageNotFoundException
false
endbuild method · ruby · L30-L35 (6 LOC)lib/turbofan/deploy/image_builder.rb
def self.build(step_dir, schemas_dir, tag:, repository_uri:)
cmd = ["docker", "build", "--build-context", "schemas=#{schemas_dir}"]
proxy_ca = ENV.fetch("TURBOFAN_PROXY_CA", "/usr/local/share/ca-certificates/proxy-ca.crt")
if File.exist?(proxy_ca)
cmd.push("--build-context", "proxy-ca=#{File.dirname(proxy_ca)}")
endRepobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
push function · ruby · L43-L45 (3 LOC)lib/turbofan/deploy/image_builder.rb
def self.push(tag:, repository_uri:)
run_cmd("docker", "push", "#{repository_uri}:#{tag}")
endauthenticate_ecr function · ruby · L47-L59 (13 LOC)lib/turbofan/deploy/image_builder.rb
def self.authenticate_ecr(ecr_client)
auth = ecr_client.get_authorization_token.authorization_data.first
password = Base64.decode64(auth.authorization_token).split(":").last
registry = auth.proxy_endpoint
_out, status = Open3.capture2(
"docker", "login", "--username", "AWS", "--password-stdin", registry,
stdin_data: password
)
raise "ECR authentication failed" unless status.success?
registry.sub(%r{\Ahttps?://}, "")
endbuild_and_push function · ruby · L61-L67 (7 LOC)lib/turbofan/deploy/image_builder.rb
def self.build_and_push(step_dir:, schemas_dir:, ecr_client:, repository_name:, repository_uri:, tag: nil)
tag ||= content_tag(step_dir, schemas_dir)
if image_exists?(ecr_client, repository_name, tag)
puts "Image #{repository_name}:#{tag} already exists, skipping build"
return tag
endbuild_and_push_all function · ruby · L74-L83 (10 LOC)lib/turbofan/deploy/image_builder.rb
def self.build_and_push_all(step_configs:)
results = {}
threads = step_configs.map do |config|
Thread.new do
tag = build_and_push(**config)
[config[:step_dir], tag]
rescue => e
step_name = File.basename(config[:step_dir])
raise "Build failed for step '#{step_name}': #{e.message}"
endgarbage_collect function · ruby · L92-L100 (9 LOC)lib/turbofan/deploy/image_builder.rb
def self.garbage_collect(ecr_client, repository_name, keep:)
images = []
params = {repository_name: repository_name}
loop do
response = ecr_client.describe_images(**params)
images.concat(response.image_details)
break unless response.next_token
params[:next_token] = response.next_token
endrun_cmd function · ruby · L115-L118 (4 LOC)lib/turbofan/deploy/image_builder.rb
def self.run_cmd(*cmd)
success = system(*cmd)
raise "Command failed: #{cmd.first(3).join(" ")}" unless success
endPipelineLoader class · ruby · L3-L32 (30 LOC)lib/turbofan/deploy/pipeline_loader.rb
class PipelineLoader
LoadResult = Struct.new(:pipeline, :steps, :step_dirs, keyword_init: true)
def self.load(pipeline_file, turbofans_root:)
Turbofan.schemas_path = File.join(turbofans_root, "schemas")
config_file = File.join(turbofans_root, "config", "turbofan.rb")
Kernel.load(File.expand_path(config_file)) if File.exist?(config_file)
raise "Pipeline file not found: #{pipeline_file}" unless File.exist?(pipeline_file)
before = Set.new
ObjectSpace.each_object(Class) do |c|
class_name = Turbofan::GET_CLASS_NAME.bind_call(c)
before << c if class_name && c < Pipeline
end
Kernel.load(File.expand_path(pipeline_file))
components = Turbofan.discover_components
new_pipelines = components[:pipelines].values.reject { |c| before.include?(c) }
raise "No pipeline class found after loading #{pipeline_file}" if new_pipelines.empty?
raise "Multiple pipeline classes load method · ruby · L6-L18 (13 LOC)lib/turbofan/deploy/pipeline_loader.rb
def self.load(pipeline_file, turbofans_root:)
Turbofan.schemas_path = File.join(turbofans_root, "schemas")
config_file = File.join(turbofans_root, "config", "turbofan.rb")
Kernel.load(File.expand_path(config_file)) if File.exist?(config_file)
raise "Pipeline file not found: #{pipeline_file}" unless File.exist?(pipeline_file)
before = Set.new
ObjectSpace.each_object(Class) do |c|
class_name = Turbofan::GET_CLASS_NAME.bind_call(c)
before << c if class_name && c < Pipeline
endCitation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
resolve_steps method · ruby · L34-L44 (11 LOC)lib/turbofan/deploy/pipeline_loader.rb
def self.resolve_steps(dag, components, turbofans_root)
steps = {}
step_dirs = {}
dag.steps.each do |dag_step|
step_dir = File.join(turbofans_root, "steps", dag_step.name.to_s)
klass = components[:steps][dag_step.name]
raise "No loaded class for step :#{dag_step.name}" unless klass
raise "Step directory not found: #{step_dir}" unless klass.turbofan_external? || Dir.exist?(step_dir)
steps[dag_step.name] = klass
step_dirs[dag_step.name] = step_dir
endStackManager class · ruby · L5-L34 (30 LOC)lib/turbofan/deploy/stack_manager.rb
class StackManager
UPDATABLE_STATES = %i[create_complete update_complete update_rollback_complete].freeze
CHANGESET_TIMEOUT = 300
STACK_TIMEOUT = 1800
def self.backoff_sleep(attempt, base:, max:, jitter: true)
delay = [base * (2**attempt), max].min
delay += rand(0.0..1.0) if jitter
sleep(delay)
end
def self.stack_output(cf_client, stack_name, output_key)
outputs = cf_client.describe_stacks(stack_name: stack_name).stacks.first.outputs
entry = outputs.find { |o| o.output_key == output_key }
raise "Output #{output_key} not found on stack #{stack_name}" unless entry
entry.output_value
end
def self.detect_state(cf_client, stack_name)
response = cf_client.describe_stacks(stack_name: stack_name)
status = response.stacks.first.stack_status
if status.end_with?("_IN_PROGRESS")
:in_progress
else
status.downcase.to_sym
end
backoff_sleep method · ruby · L10-L14 (5 LOC)lib/turbofan/deploy/stack_manager.rb
def self.backoff_sleep(attempt, base:, max:, jitter: true)
delay = [base * (2**attempt), max].min
delay += rand(0.0..1.0) if jitter
sleep(delay)
endstack_output method · ruby · L16-L21 (6 LOC)lib/turbofan/deploy/stack_manager.rb
def self.stack_output(cf_client, stack_name, output_key)
outputs = cf_client.describe_stacks(stack_name: stack_name).stacks.first.outputs
entry = outputs.find { |o| o.output_key == output_key }
raise "Output #{output_key} not found on stack #{stack_name}" unless entry
entry.output_value
enddetect_state method · ruby · L23-L31 (9 LOC)lib/turbofan/deploy/stack_manager.rb
def self.detect_state(cf_client, stack_name)
response = cf_client.describe_stacks(stack_name: stack_name)
status = response.stacks.first.stack_status
if status.end_with?("_IN_PROGRESS")
:in_progress
else
status.downcase.to_sym
enddeploy method · ruby · L36-L52 (17 LOC)lib/turbofan/deploy/stack_manager.rb
def self.deploy(cf_client, stack_name:, template_body:, parameters: [], s3_client: nil, artifacts: [])
state = detect_state(cf_client, stack_name)
case state
when :does_not_exist
changeset_type = "CREATE"
when *UPDATABLE_STATES
changeset_type = "UPDATE"
when :rollback_complete, :delete_failed
cf_client.delete_stack(stack_name: stack_name)
wait_for_stack(cf_client, stack_name: stack_name, target_states: ["DELETE_COMPLETE"])
changeset_type = "CREATE"
when :in_progress
raise "Another operation is in progress on stack #{stack_name}"
else
raise "Unhandled stack state: #{state} for #{stack_name}"
endcreate_changeset function · ruby · L79-L90 (12 LOC)lib/turbofan/deploy/stack_manager.rb
def self.create_changeset(cf_client, stack_name:, template_body:, type:, parameters: [], s3_client: nil)
changeset_name = "turbofan-deploy-#{Time.now.to_i}"
template_param = if template_body.bytesize > 51_200
s3 = s3_client || Aws::S3::Client.new
bucket = Turbofan.config.bucket
key = "turbofan-cfn-templates/#{stack_name}/#{changeset_name}.json"
s3.put_object(bucket: bucket, key: key, body: template_body)
region = cf_client.config.region
{template_url: "https://#{bucket}.s3.#{region}.amazonaws.com/#{key}"}
else
{template_body: template_body}
endwait_for_changeset function · ruby · L103-L114 (12 LOC)lib/turbofan/deploy/stack_manager.rb
def self.wait_for_changeset(cf_client, stack_name:, changeset_name:)
deadline = Time.now + CHANGESET_TIMEOUT
attempt = 0
loop do
raise "Timed out waiting for changeset #{changeset_name}" if Time.now > deadline
response = cf_client.describe_change_set(stack_name: stack_name, change_set_name: changeset_name)
case response.status
when "CREATE_COMPLETE"
return :create_complete
when "FAILED"
return :failed
endProvenance: Repobility (https://repobility.com) — every score reproducible from /scan/
wait_for_stack function · ruby · L120-L131 (12 LOC)lib/turbofan/deploy/stack_manager.rb
def self.wait_for_stack(cf_client, stack_name:, target_states:)
deadline = Time.now + STACK_TIMEOUT
attempt = 0
loop do
raise "Timed out waiting for stack #{stack_name}" if Time.now > deadline
response = cf_client.describe_stacks(stack_name: stack_name)
status = response.stacks.first.stack_status
return if target_states.include?(status)
if status.match?(/_FAILED|ROLLBACK_COMPLETE/)
reason = failure_reason(cf_client, stack_name)
raise "Stack #{stack_name} entered failure state: #{status}#{reason}"
endfailure_reason function · ruby · L140-L151 (12 LOC)lib/turbofan/deploy/stack_manager.rb
def self.failure_reason(cf_client, stack_name)
events = cf_client.describe_stack_events(stack_name: stack_name).stack_events
failed = events.select { |e|
e.resource_status&.include?("_FAILED") &&
e.resource_status_reason && !e.resource_status_reason.match?(/cancelled/i)
}
return "" if failed.empty?
reasons = failed.first(5).map { |e| "#{e.logical_resource_id}: #{e.resource_status_reason}" }
"\n #{reasons.join("\n ")}"
rescue StandardError
""
enddry_run function · ruby · L154-L164 (11 LOC)lib/turbofan/deploy/stack_manager.rb
def self.dry_run(cf_client, stack_name:, template_body:)
state = detect_state(cf_client, stack_name)
changeset_type = (state == :does_not_exist) ? "CREATE" : "UPDATE"
changeset_name = create_changeset(cf_client, stack_name: stack_name, template_body: template_body, type: changeset_type)
result = wait_for_changeset(cf_client, stack_name: stack_name, changeset_name: changeset_name)
if result == :failed
reason = cf_client.describe_change_set(stack_name: stack_name, change_set_name: changeset_name).status_reason
puts " Changeset failed: #{reason}"
else
describe_changes(cf_client, stack_name: stack_name, changeset_name: changeset_name)
enddescribe_changes function · ruby · L168-L173 (6 LOC)lib/turbofan/deploy/stack_manager.rb
def self.describe_changes(cf_client, stack_name:, changeset_name:)
response = cf_client.describe_change_set(stack_name: stack_name, change_set_name: changeset_name)
response.changes.each do |change|
rc = change.resource_change
puts " #{rc.action} #{rc.resource_type} #{rc.logical_resource_id}"
endversion method · ruby · L10-L12 (3 LOC)lib/turbofan/extensions.rb
def self.version
"v#{Turbofan.config.duckdb_version}"
endrepo_url method · ruby · L14-L17 (4 LOC)lib/turbofan/extensions.rb
def self.repo_url(ext)
repo = COMMUNITY.include?(ext.to_sym) ? COMMUNITY_REPO : CORE_REPO
"#{repo}/#{version}/#{PLATFORM}/#{ext}.duckdb_extension.gz"
endinstall_path method · ruby · L19-L21 (3 LOC)lib/turbofan/extensions.rb
def self.install_path
"/root/.duckdb/extensions/#{version}/#{PLATFORM}"
endASL class · ruby · L5-L55 (51 LOC)lib/turbofan/generators/asl.rb
class ASL
BATCH_RESOURCE = "arn:aws:states:::batch:submitJob.sync"
SNS_RESOURCE = "arn:aws:states:::sns:publish"
# Maximum items per Batch ArrayProperties. Not enforced here;
# the chunking Lambda is responsible for respecting this limit.
MAX_ARRAY_SIZE = 10_000
def initialize(pipeline:, stage:, steps: {})
@pipeline = pipeline
@stage = stage
@steps = steps
end
def generate
dag = @pipeline.turbofan_dag
sorted = dag.sorted_steps
pipeline_name = @pipeline.turbofan_name
prefix = "turbofan-#{pipeline_name}-#{@stage}"
topic_arn = "arn:aws:sns:${AWS::Region}:${AWS::AccountId}:#{prefix}-notifications"
# Detect forks (steps with >1 children) and compute join points
forks = {} # fork_name => [branch_child_names]
join_info = {} # join_name => [branch_child_names]
sorted.each_with_index do |step, idx|
children = dag.children_of(step.Repobility · MCP-ready · https://repobility.com
initialize method · ruby · L12-L16 (5 LOC)lib/turbofan/generators/asl.rb
def initialize(pipeline:, stage:, steps: {})
@pipeline = pipeline
@stage = stage
@steps = steps
endgenerate method · ruby · L18-L37 (20 LOC)lib/turbofan/generators/asl.rb
def generate
dag = @pipeline.turbofan_dag
sorted = dag.sorted_steps
pipeline_name = @pipeline.turbofan_name
prefix = "turbofan-#{pipeline_name}-#{@stage}"
topic_arn = "arn:aws:sns:${AWS::Region}:${AWS::AccountId}:#{prefix}-notifications"
# Detect forks (steps with >1 children) and compute join points
forks = {} # fork_name => [branch_child_names]
join_info = {} # join_name => [branch_child_names]
sorted.each_with_index do |step, idx|
children = dag.children_of(step.name)
next unless children.size > 1
forks[step.name] = children
join_step = find_join_point(dag, children, sorted, idx)
join_info[join_step.name] = children if join_step
endto_json function · ruby · L154-L156 (3 LOC)lib/turbofan/generators/asl.rb
def to_json(*)
JSON.generate(generate)
endbase_env function · ruby · L160-L170 (11 LOC)lib/turbofan/generators/asl.rb
def base_env(pipeline_name)
[
{"Name" => "TURBOFAN_EXECUTION_ID", "Value.$" => "$$.Execution.Id"},
{"Name" => "TURBOFAN_STAGE", "Value" => @stage},
{"Name" => "TURBOFAN_PIPELINE", "Value" => pipeline_name},
{"Name" => "TURBOFAN_BUCKET", "Value" => Turbofan.config.bucket},
{"Name" => "TURBOFAN_BUCKET_PREFIX", "Value" => Naming.bucket_prefix(pipeline_name, @stage)},
{"Name" => "AWS_REGION", "Value" => "${AWS::Region}"},
{"Name" => "AWS_DEFAULT_REGION", "Value" => "${AWS::Region}"}
]
endresolve_job_refs function · ruby · L172-L178 (7 LOC)lib/turbofan/generators/asl.rb
def resolve_job_refs(prefix, step_name)
step_class = @steps[step_name]
suffix = if step_class&.turbofan_sizes&.any?
"#{step_name}-#{step_class.turbofan_sizes.keys.first}"
else
step_name
endfind_prev function · ruby · L185-L189 (5 LOC)lib/turbofan/generators/asl.rb
def find_prev(sorted, index, visited)
return nil if index == 0
(index - 1).downto(0).each do |i|
return sorted[i] unless visited.include?(sorted[i].name)
endfind_join_point function · ruby · L193-L204 (12 LOC)lib/turbofan/generators/asl.rb
def find_join_point(dag, fork_children, sorted, fork_index)
# Count fork children that have at least one descendant (non-dead-end branches)
active_children = fork_children.count { |child| dag.children_of(child).any? }
active_children = 1 if active_children == 0
sorted[(fork_index + 1)..].each do |step|
next if fork_children.include?(step.name)
ancestors = all_ancestors(dag, step.name)
fork_ancestor_count = fork_children.count { |child| ancestors.include?(child) }
return step if fork_ancestor_count >= active_children
endall_ancestors function · ruby · L208-L215 (8 LOC)lib/turbofan/generators/asl.rb
def all_ancestors(dag, step_name)
visited = Set.new
queue = dag.parents_of(step_name).dup
while (parent = queue.shift)
next if visited.include?(parent)
visited << parent
queue.concat(dag.parents_of(parent))
endRepobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
branch_steps_for function · ruby · L219-L226 (8 LOC)lib/turbofan/generators/asl.rb
def branch_steps_for(dag, branch_start, join_point, sorted)
reachable = Set.new
queue = [branch_start]
while (step_name = queue.shift)
next if reachable.include?(step_name) || step_name == join_point
reachable << step_name
dag.children_of(step_name).each { |c| queue << c }
endbuild_branch_chain function · ruby · L230-L247 (18 LOC)lib/turbofan/generators/asl.rb
def build_branch_chain(pipeline_name, chain, fork_step_name)
states = {}
chain.each_with_index do |step, idx|
last_in_branch = (idx == chain.size - 1)
prev_step_name = (idx == 0) ? fork_step_name : chain[idx - 1].name
state = build_branch_state(pipeline_name, step, prev_step_name)
if last_in_branch
state["End"] = true
else
state.delete("End")
state["Next"] = chain[idx + 1].name.to_s
state["ResultSelector"] = {
"JobId.$" => "$.JobId",
"JobName.$" => "$.JobName",
"Status.$" => "$.Status"
}
state["ResultPath"] = "$.steps.#{step.name}"
endnotification_states function · ruby · L256-L277 (22 LOC)lib/turbofan/generators/asl.rb
def notification_states(topic_arn, pipeline_name)
{
"NotifySuccess" => {
"Type" => "Task",
"Resource" => SNS_RESOURCE,
"Parameters" => {
"TopicArn" => topic_arn,
"Message" => "Pipeline #{pipeline_name} completed successfully."
},
"End" => true
},
"NotifyFailure" => {
"Type" => "Task",
"Resource" => SNS_RESOURCE,
"Parameters" => {
"TopicArn" => topic_arn,
"Message" => "Pipeline #{pipeline_name} failed."
},
"End" => true
}
}
end