Short introduction: Expiration vs Expires
RabbitMQ does support per-message TTL (as well as TTL for the queue), the behavior is documented here: https://www.rabbitmq.com/ttl.html#per-message-ttl-in-publishers. The trick is to set the expiration Message Property (https://www.rabbitmq.com/publishers.html#message-properties) when the message is published (in milliseconds).
Celery on the other hand allows you to set the expires parameter (https://docs.celeryproject.org/en/stable/reference/celery.app.task.html) in seconds or as a datetime. The difference from the native RabbitMQ functionality is that the message remains in the queue after expiration. The expired message is delivered to the worker, which then reads the expires header to determine that the message has expired and rejects the message.
tl;dr: expiration != expires
How to pass a message property in Celery
This method is not documented in Celery. I figured it out by trial and error because I wanted a native TTL myself.
The send_task method (celery.app.base.Celery.send_task), which is called for example by apply_async, accepts the **options parameter. All **options unknown to Celery are then passed in the celery.app.amqp.Queues->send_task_message( ... ) method as **kwargs and then as message properties.
So if we can set the message property, there is nothing easier than setting the native expiration:
my_awesome_task.apply_async(args=(11,), expiration=42)

- Note that Celery automatically converts 42 seconds to 42000 milliseconds (which is correct).
- Expiration (in properties) and Expires (in headers) can be combined, the two functionalities are not affected in any way.