Celery Integration
Getting Started with Celery
In order to be able to support celery you need to configure both your webapp and your workers
Warning
If json is used to serialize your celery tasks, the log context in use when executing a task (through apply_async
or delay
) should only contain JSON-serializable data. You can use modify_context_before_task_publish to ensure this is the case.
Replace your requirements
First of all, make sure your django-structlog
installation knows you use celery
in order to validate compatibility with your installed version. See Installing “Extras” for more information.
Replace django-structlog
with django-structlog[celery]
in your requirements.txt
.
django-structlog[celery]==X.Y.Z
Enable celery integration in your web app
In your settings.py
MIDDLEWARE = [
# ...
'django_structlog.middlewares.RequestMiddleware',
]
DJANGO_STRUCTLOG_CELERY_ENABLED = True
Initialize Celery Worker with DjangoStructLogInitStep
In your celery AppConfig’s module.
import logging
import structlog
from celery import Celery
from celery.signals import setup_logging
from django_structlog.celery.steps import DjangoStructLogInitStep
app = Celery("your_celery_project")
# A step to initialize django-structlog
app.steps['worker'].add(DjangoStructLogInitStep)
Warning
If you use celery
’s task_protocol v1, django-structlog
will not be able to transfer metadata to child task.
Ex:
app = Celery("your_celery_project", task_protocol=1)
Configure celery’s logger
In the same file as before
@setup_logging.connect
def receiver_setup_logging(loglevel, logfile, format, colorize, **kwargs): # pragma: no cover
logging.config.dictConfig(
{
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"json_formatter": {
"()": structlog.stdlib.ProcessorFormatter,
"processor": structlog.processors.JSONRenderer(),
},
"plain_console": {
"()": structlog.stdlib.ProcessorFormatter,
"processor": structlog.dev.ConsoleRenderer(),
},
"key_value": {
"()": structlog.stdlib.ProcessorFormatter,
"processor": structlog.processors.KeyValueRenderer(key_order=['timestamp', 'level', 'event', 'logger']),
},
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"formatter": "plain_console",
},
"json_file": {
"class": "logging.handlers.WatchedFileHandler",
"filename": "logs/json.log",
"formatter": "json_formatter",
},
"flat_line_file": {
"class": "logging.handlers.WatchedFileHandler",
"filename": "logs/flat_line.log",
"formatter": "key_value",
},
},
"loggers": {
"django_structlog": {
"handlers": ["console", "flat_line_file", "json_file"],
"level": "INFO",
},
"django_structlog_demo_project": {
"handlers": ["console", "flat_line_file", "json_file"],
"level": "INFO",
},
}
}
)
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.stdlib.filter_by_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.UnicodeDecoder(),
structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
],
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=True,
)
Signals
modify_context_before_task_publish
You can connect to modify_context_before_task_publish
signal in order to modify the metadata before it is stored in the task’s message.
By example you can strip down the context
to keep only some of the keys:
@receiver(signals.modify_context_before_task_publish)
def receiver_modify_context_before_task_publish(sender, signal, context, task_routing_key=None, task_properties=None, **kwargs):
keys_to_keep = {"request_id", "parent_task_id"}
new_dict = {key_to_keep: context[key_to_keep] for key_to_keep in keys_to_keep if key_to_keep in context}
context.clear()
context.update(new_dict)
bind_extra_task_metadata
You can optionally connect to bind_extra_task_metadata
signal in order to bind more metadata to the logger or override existing bound metadata. This is called
in celery’s receiver_task_pre_run
.
from django_structlog.celery import signals
import structlog
@receiver(signals.bind_extra_task_metadata)
def receiver_bind_extra_request_metadata(sender, signal, task=None, logger=None, **kwargs):
structlog.contextvars.bind_contextvars(correlation_id=task.request.correlation_id)