Schedules#
Launch plans can be set to run automatically on a schedule using the Flyte Native Scheduler. For workflows that depend on knowing the kick-off time, Flyte supports passing in the scheduled time (not the actual time, which may be a few seconds off) as an argument to the workflow.
Check out a demo of how the Native Scheduler works:
Note
Native scheduler doesn’t support AWS syntax.
Note
To clone and run the example code on this page, see the Flytesnacks repo.
Consider the following example workflow:
from datetime import datetime
from flytekit import task, workflow
@task
def format_date(run_date: datetime) -> str:
return run_date.strftime("%Y-%m-%d %H:%M")
@workflow
def date_formatter_wf(kickoff_time: datetime):
formatted_kickoff_time = format_date(run_date=kickoff_time)
print(formatted_kickoff_time)
The date_formatter_wf
workflow can be scheduled using either the CronSchedule
or the FixedRate
object.
Cron schedules#
Cron expression strings use this syntax. An incorrect cron schedule expression would lead to failure in triggering the schedule.
from flytekit import CronSchedule, LaunchPlan # noqa: E402
# creates a launch plan that runs every minute.
cron_lp = LaunchPlan.get_or_create(
name="my_cron_scheduled_lp",
workflow=date_formatter_wf,
schedule=CronSchedule(
# Note that the ``kickoff_time_input_arg`` matches the workflow input we defined above: kickoff_time
# But in case you are using the AWS scheme of schedules and not using the native scheduler then switch over the schedule parameter with cron_expression
schedule="*/1 * * * *", # Following schedule runs every min
kickoff_time_input_arg="kickoff_time",
),
)
The kickoff_time_input_arg
corresponds to the workflow input kickoff_time
.
Specifying this argument means that Flyte will pass in the kick-off time of the
cron schedule into the kickoff_time
argument of the date_formatter_wf
workflow.
Fixed rate intervals#
If you prefer to use an interval rather than a cron scheduler to schedule your workflows, you can use the fixed-rate scheduler. A fixed-rate scheduler runs at the specified interval.
Here’s an example:
from datetime import timedelta # noqa: E402
from flytekit import FixedRate, LaunchPlan # noqa: E402
@task
def be_positive(name: str) -> str:
return f"You're awesome, {name}"
@workflow
def positive_wf(name: str):
reminder = be_positive(name=name)
print(f"{reminder}")
fixed_rate_lp = LaunchPlan.get_or_create(
name="my_fixed_rate_lp",
workflow=positive_wf,
# Note that the workflow above doesn't accept any kickoff time arguments.
# We just omit the ``kickoff_time_input_arg`` from the FixedRate schedule invocation
schedule=FixedRate(duration=timedelta(minutes=10)),
fixed_inputs={"name": "you"},
)
This fixed-rate scheduler runs every ten minutes. Similar to a cron scheduler, a fixed-rate scheduler also accepts kickoff_time_input_arg
(which is omitted in this example).
Activating a schedule#
After initializing your launch plan, activate the specific version of the launch plan so that the schedule runs.
flytectl update launchplan -p flyteexamples -d development {{ name_of_lp }} --version <foo> --activate
Verify if your launch plan was activated:
flytectl get launchplan -p flytesnacks -d development
Deactivating a schedule#
You can archive/deactivate the launch plan to deschedule any scheduled job associated with it.
flytectl update launchplan -p flyteexamples -d development {{ name_of_lp }} --version <foo> --archive