Airflow Dagrun for each datum instead of scheduled

The current problem that I am facing is that I have documents in a MongoDB collection which each need to be processed and updated by tasks which need to run in an acyclic dependency graph. If a task upstream fails to process a document, then none of the dependent tasks may process that document, as that document has not been updated with the prerequisite information.

If I were to use Airflow, this leaves me with two solutions:

  1. Trigger a DAG for each document, passing in the document ID with --conf. The problem is that this is not how Airflow is intended to be used; I would never be running a scheduled process, and based on the rate at which documents appear in the collection, I would be creating around 1440 DagRuns per day.

  2. Run a DAG every period to process all documents created in the collection during that period. This follows how Airflow is expected to work, but the problem is that if a task fails to process a single document, none of the dependent tasks may process any of the other documents. Also, if one document takes much longer than the others to be processed by a task, all of the other documents must wait on it before continuing down the DAG.

Is there a better method than Airflow? Or is there a better way to handle this in Airflow than the two methods I currently see?

From the knowledge I gained in my attempt to answer this question, I've come to the conclusion that Airflow is just not the tool for the job.

Airflow is designed for scheduled, idempotent DAGs. A DagRun must also have a unique execution_date; this means that running the same DAG at the exact same start time (in the case that we receive two documents at the same moment) is quite literally impossible. Of course, we can schedule the next DagRun immediately in succession, but this limitation should demonstrate that any attempt to use Airflow in this fashion will always be, to an extent, a hack.
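
For illustration, triggering runs programmatically forces you to invent distinct execution dates yourself. This is only a sketch of the hack described above, using Airflow's experimental trigger API; the DAG id, conf key, and document IDs are placeholders:

# Sketch: DagRuns created externally still need unique execution dates,
# so near-simultaneous documents require artificially offset timestamps.
from datetime import timedelta
from airflow.api.common.experimental.trigger_dag import trigger_dag
from airflow.utils import timezone

for i, document_id in enumerate(["doc-a", "doc-b"]):  # hypothetical document IDs
    trigger_dag(
        dag_id="process_document",
        run_id="manual__{}".format(document_id),
        conf={"document_id": document_id},
        execution_date=timezone.utcnow() + timedelta(seconds=i),  # forced to differ
    )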

The most viable solution I've found is to instead use Prefect, which was developed with the intention of overcoming some of the limitations of Airflow:

"Prefect assumes that flows can be run at any time, for any reason."

Prefect's equivalent of a DAG is a Flow; one key feature we can take advantage of is the ease of parametrization. Then, with some threads, we're able to run a Flow for each element in a stream. Here is an example streaming ETL pipeline:

import time
from prefect import task, Flow, Parameter
from threading import Thread


def stream():
    for x in range(10):
        yield x
        time.sleep(1)


@task
def extract(x):
    # If 'x' referenced a document, in this step we could load that document
    return x


@task
def transform(x):
    return x * 2


@task
def load(y):
    print("Received y: {}".format(y))


with Flow("ETL") as flow:
    x_param = Parameter('x')
    e = extract(x_param)
    t = transform(e)
    l = load(t)

for x in stream():
    thread = Thread(target=flow.run, kwargs={"x": x})
    thread.start()

You could change trigger_rule from "all_success" to "all_done"

https://github.com/apache/airflow/blob/62b21d747582d9d2b7cdcc34a326a8a060e2a8dd/airflow/example_dags/example_latest_only_with_trigger.py#L40

You could also create a branch that handles failed documents, with trigger_rule set to "one_failed", to process those documents differently (e.g. move them to a "failed" folder and send a notification).
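
A minimal sketch of how those two trigger rules could be wired together; the DAG id, task names, and callables are hypothetical placeholders, not code from this answer:

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule


def process_documents(**_):
    pass  # process this period's documents; may raise for some of them


def continue_pipeline(**_):
    pass  # downstream work that should run even if processing partially failed


def handle_failures(**_):
    pass  # e.g. move failed documents to a "failed" folder and notify


with DAG("process_documents",
         start_date=datetime(2019, 10, 1),
         schedule_interval="@hourly",
         catchup=False) as dag:
    process = PythonOperator(task_id="process", python_callable=process_documents)

    downstream = PythonOperator(
        task_id="downstream",
        python_callable=continue_pipeline,
        trigger_rule=TriggerRule.ALL_DONE,  # run whether upstream succeeded or failed
    )

    failed_branch = PythonOperator(
        task_id="handle_failures",
        python_callable=handle_failures,
        trigger_rule=TriggerRule.ONE_FAILED,  # only runs if an upstream task failed
    )

    process >> [downstream, failed_branch]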

I would be making 1440 Dagruns per day.

With a good Airflow architecture, this is quite possible. Choking points might be:

  1. executor - use the Celery Executor instead of the Local Executor, for example
  2. backend database - monitor and tune as necessary (indexes, proper storage, etc.)
  3. webserver - for thousands of DagRuns and tasks, perhaps only use the webserver for dev/qa environments, and not for production where you have a higher rate of task/DagRun submissions; you could use the CLI instead.

Another approach is scaling out by running multiple Airflow instances - partition the documents into, say, ten buckets, and assign each partition's documents to just one Airflow instance.
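
A trivial sketch of that partitioning, assuming documents are routed by hashing their IDs (the bucket count and routing function are assumptions, not part of the answer):

# Hypothetical routing: hash each document ID into one of ten buckets,
# and let each Airflow instance own exactly one bucket.
import hashlib

def bucket_for(document_id, n_buckets=10):
    digest = hashlib.md5(document_id.encode("utf-8")).hexdigest()
    return int(digest, 16) % n_buckets  # 0..9 -> which Airflow instance handles it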

I'd process the heavier tasks in parallel and feed successful operations downstream. As far as I know, you can't feed successes asynchronously to downstream tasks, so you would still need to wait for every thread to finish before moving downstream, but this would still be far more acceptable than spawning one DAG for each record. Something along these lines:

Task 1: read from Mongo, filtering by some timestamp (remember idempotence), and feed the results to the downstream tasks (e.g. via XCom);
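
A rough sketch of what that first task's callable might look like; the Mongo connection, collection, and field names are assumptions. With provide_context=True the schedule interval bounds arrive as keyword arguments, and the returned list is pushed to XCom automatically by the PythonOperator:

# Hypothetical callable for Task 1: pull this interval's documents
# and return their IDs so they land in XCom for the downstream task.
from pymongo import MongoClient

def read_documents(execution_date, next_execution_date, **_):
    client = MongoClient("mongodb://localhost:27017")  # assumed connection string
    collection = client["mydb"]["documents"]           # assumed db/collection names
    cursor = collection.find(
        {"created_at": {"$gte": execution_date, "$lt": next_execution_date}},
        {"_id": 1},
    )
    return [str(doc["_id"]) for doc in cursor]  # the return value becomes the XCom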

Task 2: do the heavy lifting in parallel via a PythonOperator, or even better via a KubernetesPodOperator, e.g.:

import threading
from queue import Queue

# Placeholders: stuff_done() is whatever processes a single document, and
# xcom_pull / xcom_push stand for the task instance's XCom calls
# (e.g. context["ti"].xcom_pull / xcom_push inside the PythonOperator callable).

def thread_fun(ret):
    # Drain the shared queue, collecting successfully processed documents.
    while not job_queue.empty():
        job = job_queue.get()
        try:
            ret.append(stuff_done(job))
        except Exception:
            pass  # a failed document simply isn't appended to the results
        finally:
            job_queue.task_done()
    return ret

# Create workers and queue
threads = []
ret = []  # a mutable object shared by all workers
job_queue = Queue(maxsize=0)

# appropriate_thread_nr = however many worker threads you want
for thr_nr in range(appropriate_thread_nr):
    worker = threading.Thread(
        target=thread_fun,
        args=(ret,)
    )
    worker.setDaemon(True)
    threads.append(worker)

# Populate queue with the documents fed by the upstream task
for row in xcom_pull(task_ids=upstream_task):
    job_queue.put(row)

# Start threads
for thr in threads:
    thr.start()

# Wait for them to finish their jobs
for thr in threads:
    thr.join()

# Feed only the successful results downstream
xcom_push(ret)

Task 3: do more stuff with the output of the previous task, and so on.

We have built a system that queries MongoDB for a list and generates one Python file per item, each containing a single DAG (note: giving each DAG its own Python file helps Airflow scheduler efficiency with its current design). The generator DAG runs hourly, right before the scheduled hourly run of all the generated DAGs.
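
A condensed sketch of that generator idea, not the answerer's actual code; the connection string, collection, template, processing function, and paths are all assumptions:

# Hypothetical generator task: query MongoDB and write one small DAG file
# per document into a folder that Airflow scans as part of DAGS_FOLDER.
import os

from pymongo import MongoClient

DAG_TEMPLATE = '''
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from my_pipeline import process_document  # hypothetical processing function

with DAG("process_doc_{doc_id}",
         start_date=datetime(2019, 10, 1),
         schedule_interval="@hourly",
         catchup=False) as dag:
    PythonOperator(
        task_id="process",
        python_callable=process_document,
        op_kwargs={{"document_id": "{doc_id}"}},
    )
'''

def generate_dag_files(dags_folder="/opt/airflow/dags/generated"):
    client = MongoClient("mongodb://localhost:27017")
    collection = client["mydb"]["documents"]
    for doc in collection.find({"processed": False}, {"_id": 1}):
        doc_id = str(doc["_id"])
        path = os.path.join(dags_folder, "process_doc_{}.py".format(doc_id))
        with open(path, "w") as f:
            f.write(DAG_TEMPLATE.format(doc_id=doc_id))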

Comments
  • This perfectly outlines my problem of understanding how to do something in Airflow for each element in a list (a file in a directory, a row in a database). Both solutions feel wrong for the reasons you outline. The answers here don't address these problems. What did you do in the end?
  • I didn't; the system still uses a poorly optimized COLSCAN to find a valid document to process. Thanks for putting a bounty on this question; hopefully that will attract more novel solutions. One thing I did find was Prefect, which was designed by someone originally involved with Airflow who was frustrated by its limitations; it might be a better option for tasks such as this.
  • The Prefect docs perfectly describe this problem and claim to solve it. I haven't tried it so I can't advocate but it sounds good from their description.
  • Another concern with the second approach is that I would have to wait for all documents to be processed by a given task before any of them can be processed by a dependent task; for certain documents, processing by a given task may take much longer than for others, which means the rest of the DAG is held up for every document by that single slow one.
  • I guess for option 1 I'm just nervous about how it doesn't seem to be "the correct way" to use Airflow. I'd be creating multiple dagruns scheduled to run at essentially the same time, so any idea of scheduling doesn't line up with this paradigm.