Celery Integration

Getting Started with Celery

To support celery, you need to configure both your web app and your workers.

Warning

If json is used to serialize your celery tasks, the log context in use when executing a task (through apply_async or delay) should only contain JSON-serializable data. You can use modify_context_before_task_publish to ensure this is the case.

Replace your requirements

First of all, make sure your django-structlog installation knows you use celery in order to validate compatibility with your installed version. See Installing “Extras” for more information.

Replace django-structlog with django-structlog[celery] in your requirements.txt.

django-structlog[celery]==X.Y.Z

Enable celery integration in your web app

In your settings.py

# Keep your existing middlewares and make sure django-structlog's
# RequestMiddleware is part of the list.
MIDDLEWARE = [
    # ...
    'django_structlog.middlewares.RequestMiddleware',
]

# Enable django-structlog's celery integration in the web app.
DJANGO_STRUCTLOG_CELERY_ENABLED = True

Initialize Celery Worker with DjangoStructLogInitStep

In your celery AppConfig’s module.

import logging

import structlog
from celery import Celery
from celery.signals import setup_logging
from django_structlog.celery.steps import DjangoStructLogInitStep

# Create the celery application for your project.
app = Celery("your_celery_project")

# A step to initialize django-structlog
# (registered on the 'worker' steps of the celery app).
app.steps['worker'].add(DjangoStructLogInitStep)

Warning

If you use celery’s task_protocol v1, django-structlog will not be able to transfer metadata to child tasks.

Ex:

# With task_protocol=1, django-structlog cannot transfer metadata to child tasks.
app = Celery("your_celery_project", task_protocol=1)

Configure celery’s logger

In the same file as before:

@setup_logging.connect
def receiver_setup_logging(loglevel, logfile, format, colorize, **kwargs):  # pragma: no cover
    """Configure standard-library logging and structlog for the celery worker.

    Connected to celery's ``setup_logging`` signal. The signal arguments
    (``loglevel``, ``logfile``, ``format``, ``colorize`` and any extra
    keyword arguments) are accepted but not used: the configuration below
    is fixed.

    Three formatters are declared (JSON, console and key/value), each backed
    by ``structlog.stdlib.ProcessorFormatter``, and wired to a console
    handler and two file handlers writing to ``logs/json.log`` and
    ``logs/flat_line.log``.
    """
    # ``import logging`` alone does not guarantee that the ``logging.config``
    # submodule has been loaded; import it explicitly so that ``dictConfig``
    # is always reachable.
    import logging.config

    logging.config.dictConfig(
        {
            "version": 1,
            "disable_existing_loggers": False,
            "formatters": {
                "json_formatter": {
                    "()": structlog.stdlib.ProcessorFormatter,
                    "processor": structlog.processors.JSONRenderer(),
                },
                "plain_console": {
                    "()": structlog.stdlib.ProcessorFormatter,
                    "processor": structlog.dev.ConsoleRenderer(),
                },
                "key_value": {
                    "()": structlog.stdlib.ProcessorFormatter,
                    "processor": structlog.processors.KeyValueRenderer(
                        key_order=['timestamp', 'level', 'event', 'logger']
                    ),
                },
            },
            "handlers": {
                "console": {
                    "class": "logging.StreamHandler",
                    "formatter": "plain_console",
                },
                "json_file": {
                    "class": "logging.handlers.WatchedFileHandler",
                    "filename": "logs/json.log",
                    "formatter": "json_formatter",
                },
                "flat_line_file": {
                    "class": "logging.handlers.WatchedFileHandler",
                    "filename": "logs/flat_line.log",
                    "formatter": "key_value",
                },
            },
            "loggers": {
                # Both loggers send INFO and above to every handler.
                "django_structlog": {
                    "handlers": ["console", "flat_line_file", "json_file"],
                    "level": "INFO",
                },
                "django_structlog_demo_project": {
                    "handlers": ["console", "flat_line_file", "json_file"],
                    "level": "INFO",
                },
            },
        }
    )

    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,
            structlog.stdlib.filter_by_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.stdlib.add_logger_name,
            structlog.stdlib.add_log_level,
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.UnicodeDecoder(),
            # Hands the event dict over to the ProcessorFormatter instances
            # declared in the logging configuration above.
            structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
        ],
        logger_factory=structlog.stdlib.LoggerFactory(),
        cache_logger_on_first_use=True,
    )

Signals

modify_context_before_task_publish

You can connect to the modify_context_before_task_publish signal in order to modify the metadata before it is stored in the task’s message.

For example, you can strip down the context to keep only some of the keys:

@receiver(signals.modify_context_before_task_publish)
def receiver_modify_context_before_task_publish(
    sender, signal, context, task_routing_key=None, task_properties=None, **kwargs
):
    """Strip ``context`` down to a fixed set of keys, in place.

    Only ``request_id`` and ``parent_task_id`` are retained (when present);
    every other entry is removed from ``context``.
    """
    allowed = {"request_id", "parent_task_id"}

    # Collect the entries to keep before emptying the mapping.
    retained = {}
    for name in allowed:
        if name in context:
            retained[name] = context[name]

    # Mutate the mapping that was passed in rather than rebinding it.
    context.clear()
    context.update(retained)

bind_extra_task_metadata

You can optionally connect to the bind_extra_task_metadata signal in order to bind more metadata to the logger or override existing bound metadata. This is called in celery’s receiver_task_pre_run.

from django_structlog.celery import signals
import structlog

@receiver(signals.bind_extra_task_metadata)
def receiver_bind_extra_request_metadata(sender, signal, task=None, logger=None, **kwargs):
    """Bind the task's correlation id to structlog's context variables.

    Reads ``task.request.correlation_id`` and binds it under the
    ``correlation_id`` key.
    """
    task_correlation_id = task.request.correlation_id
    structlog.contextvars.bind_contextvars(correlation_id=task_correlation_id)