619 lines
14 KiB
C
619 lines
14 KiB
C
/* $OpenBSD: rrdp.c,v 1.33 2024/02/16 11:46:57 tb Exp $ */
|
|
/*
|
|
* Copyright (c) 2020 Nils Fisher <nils_fisher@hotmail.com>
|
|
* Copyright (c) 2021 Claudio Jeker <claudio@openbsd.org>
|
|
*
|
|
* Permission to use, copy, modify, and distribute this software for any
|
|
* purpose with or without fee is hereby granted, provided that the above
|
|
* copyright notice and this permission notice appear in all copies.
|
|
*
|
|
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
|
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
|
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
|
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
|
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
|
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
|
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
|
*/
|
|
#include <sys/queue.h>
|
|
#include <sys/stat.h>
|
|
|
|
#include <err.h>
|
|
#include <errno.h>
|
|
#include <fcntl.h>
|
|
#include <limits.h>
|
|
#include <poll.h>
|
|
#include <string.h>
|
|
#include <unistd.h>
|
|
#include <imsg.h>
|
|
|
|
#include <expat.h>
|
|
#include <openssl/sha.h>
|
|
|
|
#include "extern.h"
|
|
#include "rrdp.h"
|
|
|
|
#define MAX_SESSIONS 12
|
|
#define READ_BUF_SIZE (32 * 1024)
|
|
|
|
static struct msgbuf msgq;
|
|
|
|
#define RRDP_STATE_REQ 0x01
|
|
#define RRDP_STATE_WAIT 0x02
|
|
#define RRDP_STATE_PARSE 0x04
|
|
#define RRDP_STATE_PARSE_ERROR 0x08
|
|
#define RRDP_STATE_PARSE_DONE 0x10
|
|
#define RRDP_STATE_HTTP_DONE 0x20
|
|
#define RRDP_STATE_DONE (RRDP_STATE_PARSE_DONE | RRDP_STATE_HTTP_DONE)
|
|
|
|
struct rrdp {
|
|
TAILQ_ENTRY(rrdp) entry;
|
|
unsigned int id;
|
|
char *notifyuri;
|
|
char *local;
|
|
char *last_mod;
|
|
|
|
struct pollfd *pfd;
|
|
int infd;
|
|
int state;
|
|
int aborted;
|
|
unsigned int file_pending;
|
|
unsigned int file_failed;
|
|
enum http_result res;
|
|
enum rrdp_task task;
|
|
|
|
char hash[SHA256_DIGEST_LENGTH];
|
|
SHA256_CTX ctx;
|
|
|
|
struct rrdp_session *repository;
|
|
struct rrdp_session *current;
|
|
XML_Parser parser;
|
|
struct notification_xml *nxml;
|
|
struct snapshot_xml *sxml;
|
|
struct delta_xml *dxml;
|
|
};
|
|
|
|
static TAILQ_HEAD(, rrdp) states = TAILQ_HEAD_INITIALIZER(states);
|
|
|
|
char *
|
|
xstrdup(const char *s)
|
|
{
|
|
char *r;
|
|
if ((r = strdup(s)) == NULL)
|
|
err(1, "strdup");
|
|
return r;
|
|
}
|
|
|
|
/*
|
|
* Report back that a RRDP request finished.
|
|
* ok should only be set to 1 if the cache is now up-to-date.
|
|
*/
|
|
static void
|
|
rrdp_done(unsigned int id, int ok)
|
|
{
|
|
enum rrdp_msg type = RRDP_END;
|
|
struct ibuf *b;
|
|
|
|
b = io_new_buffer();
|
|
io_simple_buffer(b, &type, sizeof(type));
|
|
io_simple_buffer(b, &id, sizeof(id));
|
|
io_simple_buffer(b, &ok, sizeof(ok));
|
|
io_close_buffer(&msgq, b);
|
|
}
|
|
|
|
/*
|
|
* Request an URI to be fetched via HTTPS.
|
|
* The main process will respond with a RRDP_HTTP_INI which includes
|
|
* the file descriptor to read from. RRDP_HTTP_FIN is sent at the
|
|
* end of the request with the HTTP status code and last modified timestamp.
|
|
* If the request should not set the If-Modified-Since: header then last_mod
|
|
* should be set to NULL, else it should point to a proper date string.
|
|
*/
|
|
static void
|
|
rrdp_http_req(unsigned int id, const char *uri, const char *last_mod)
|
|
{
|
|
enum rrdp_msg type = RRDP_HTTP_REQ;
|
|
struct ibuf *b;
|
|
|
|
b = io_new_buffer();
|
|
io_simple_buffer(b, &type, sizeof(type));
|
|
io_simple_buffer(b, &id, sizeof(id));
|
|
io_str_buffer(b, uri);
|
|
io_str_buffer(b, last_mod);
|
|
io_close_buffer(&msgq, b);
|
|
}
|
|
|
|
/*
|
|
* Send the session state to the main process so it gets stored.
|
|
*/
|
|
static void
|
|
rrdp_state_send(struct rrdp *s)
|
|
{
|
|
enum rrdp_msg type = RRDP_SESSION;
|
|
struct ibuf *b;
|
|
|
|
b = io_new_buffer();
|
|
io_simple_buffer(b, &type, sizeof(type));
|
|
io_simple_buffer(b, &s->id, sizeof(s->id));
|
|
rrdp_session_buffer(b, s->current);
|
|
io_close_buffer(&msgq, b);
|
|
}
|
|
|
|
/*
|
|
* Inform parent to clear the RRDP repository before start of snapshot.
|
|
*/
|
|
static void
|
|
rrdp_clear_repo(struct rrdp *s)
|
|
{
|
|
enum rrdp_msg type = RRDP_CLEAR;
|
|
struct ibuf *b;
|
|
|
|
b = io_new_buffer();
|
|
io_simple_buffer(b, &type, sizeof(type));
|
|
io_simple_buffer(b, &s->id, sizeof(s->id));
|
|
io_close_buffer(&msgq, b);
|
|
}
|
|
|
|
/*
|
|
* Send a blob of data to the main process to store it in the repository.
|
|
*/
|
|
void
|
|
rrdp_publish_file(struct rrdp *s, struct publish_xml *pxml,
|
|
unsigned char *data, size_t datasz)
|
|
{
|
|
enum rrdp_msg type = RRDP_FILE;
|
|
struct ibuf *b;
|
|
|
|
/* only send files if the fetch did not fail already */
|
|
if (s->file_failed == 0) {
|
|
b = io_new_buffer();
|
|
io_simple_buffer(b, &type, sizeof(type));
|
|
io_simple_buffer(b, &s->id, sizeof(s->id));
|
|
io_simple_buffer(b, &pxml->type, sizeof(pxml->type));
|
|
if (pxml->type != PUB_ADD)
|
|
io_simple_buffer(b, &pxml->hash, sizeof(pxml->hash));
|
|
io_str_buffer(b, pxml->uri);
|
|
io_buf_buffer(b, data, datasz);
|
|
io_close_buffer(&msgq, b);
|
|
s->file_pending++;
|
|
}
|
|
}
|
|
|
|
static void
|
|
rrdp_new(unsigned int id, char *local, char *notify, struct rrdp_session *state)
|
|
{
|
|
struct rrdp *s;
|
|
|
|
if ((s = calloc(1, sizeof(*s))) == NULL)
|
|
err(1, NULL);
|
|
|
|
s->infd = -1;
|
|
s->id = id;
|
|
s->local = local;
|
|
s->notifyuri = notify;
|
|
s->repository = state;
|
|
if ((s->current = calloc(1, sizeof(*s->current))) == NULL)
|
|
err(1, NULL);
|
|
|
|
s->state = RRDP_STATE_REQ;
|
|
if ((s->parser = XML_ParserCreate("US-ASCII")) == NULL)
|
|
err(1, "XML_ParserCreate");
|
|
|
|
s->nxml = new_notification_xml(s->parser, s->repository, s->current,
|
|
notify);
|
|
|
|
TAILQ_INSERT_TAIL(&states, s, entry);
|
|
}
|
|
|
|
static void
|
|
rrdp_free(struct rrdp *s)
|
|
{
|
|
if (s == NULL)
|
|
return;
|
|
|
|
TAILQ_REMOVE(&states, s, entry);
|
|
|
|
free_notification_xml(s->nxml);
|
|
free_snapshot_xml(s->sxml);
|
|
free_delta_xml(s->dxml);
|
|
|
|
if (s->parser)
|
|
XML_ParserFree(s->parser);
|
|
if (s->infd != -1)
|
|
close(s->infd);
|
|
free(s->notifyuri);
|
|
free(s->local);
|
|
free(s->last_mod);
|
|
rrdp_session_free(s->repository);
|
|
rrdp_session_free(s->current);
|
|
|
|
free(s);
|
|
}
|
|
|
|
static struct rrdp *
|
|
rrdp_get(unsigned int id)
|
|
{
|
|
struct rrdp *s;
|
|
|
|
TAILQ_FOREACH(s, &states, entry)
|
|
if (s->id == id)
|
|
break;
|
|
return s;
|
|
}
|
|
|
|
static void
|
|
rrdp_failed(struct rrdp *s)
|
|
{
|
|
unsigned int id = s->id;
|
|
|
|
/* reset file state before retrying */
|
|
s->file_failed = 0;
|
|
|
|
if (s->task == DELTA && !s->aborted) {
|
|
/* fallback to a snapshot as per RFC8182 */
|
|
free_delta_xml(s->dxml);
|
|
s->dxml = NULL;
|
|
rrdp_clear_repo(s);
|
|
s->sxml = new_snapshot_xml(s->parser, s->current, s);
|
|
s->task = SNAPSHOT;
|
|
s->state = RRDP_STATE_REQ;
|
|
logx("%s: delta sync failed, fallback to snapshot", s->local);
|
|
} else {
|
|
/*
|
|
* TODO: update state to track recurring failures
|
|
* and fall back to rsync after a while.
|
|
* This should probably happen in the main process.
|
|
*/
|
|
rrdp_free(s);
|
|
rrdp_done(id, 0);
|
|
}
|
|
}
|
|
|
|
static void
|
|
rrdp_finished(struct rrdp *s)
|
|
{
|
|
unsigned int id = s->id;
|
|
|
|
/* check if all parts of the process have finished */
|
|
if ((s->state & RRDP_STATE_DONE) != RRDP_STATE_DONE)
|
|
return;
|
|
|
|
/* still some files pending */
|
|
if (s->file_pending > 0)
|
|
return;
|
|
|
|
if (s->state & RRDP_STATE_PARSE_ERROR || s->aborted) {
|
|
rrdp_failed(s);
|
|
return;
|
|
}
|
|
|
|
if (s->res == HTTP_OK) {
|
|
XML_Parser p = s->parser;
|
|
|
|
/*
|
|
* Finalize parsing on success to be sure that
|
|
* all of the XML is correct. Needs to be done here
|
|
* since the call would most probably fail for non
|
|
* successful data fetches.
|
|
*/
|
|
if (XML_Parse(p, NULL, 0, 1) != XML_STATUS_OK) {
|
|
warnx("%s: XML error at line %llu: %s", s->local,
|
|
(unsigned long long)XML_GetCurrentLineNumber(p),
|
|
XML_ErrorString(XML_GetErrorCode(p)));
|
|
rrdp_failed(s);
|
|
return;
|
|
}
|
|
|
|
/* If a file caused an error fail the update */
|
|
if (s->file_failed > 0) {
|
|
rrdp_failed(s);
|
|
return;
|
|
}
|
|
|
|
switch (s->task) {
|
|
case NOTIFICATION:
|
|
s->task = notification_done(s->nxml, s->last_mod);
|
|
s->last_mod = NULL;
|
|
switch (s->task) {
|
|
case NOTIFICATION:
|
|
logx("%s: repository not modified (%s#%lld)",
|
|
s->local, s->repository->session_id,
|
|
s->repository->serial);
|
|
rrdp_state_send(s);
|
|
rrdp_free(s);
|
|
rrdp_done(id, 1);
|
|
break;
|
|
case SNAPSHOT:
|
|
logx("%s: downloading snapshot (%s#%lld)",
|
|
s->local, s->current->session_id,
|
|
s->current->serial);
|
|
rrdp_clear_repo(s);
|
|
s->sxml = new_snapshot_xml(p, s->current, s);
|
|
s->state = RRDP_STATE_REQ;
|
|
break;
|
|
case DELTA:
|
|
logx("%s: downloading %lld deltas (%s#%lld)",
|
|
s->local,
|
|
s->repository->serial - s->current->serial,
|
|
s->current->session_id, s->current->serial);
|
|
s->dxml = new_delta_xml(p, s->current, s);
|
|
s->state = RRDP_STATE_REQ;
|
|
break;
|
|
}
|
|
break;
|
|
case SNAPSHOT:
|
|
rrdp_state_send(s);
|
|
rrdp_free(s);
|
|
rrdp_done(id, 1);
|
|
break;
|
|
case DELTA:
|
|
if (notification_delta_done(s->nxml)) {
|
|
/* finished */
|
|
rrdp_state_send(s);
|
|
rrdp_free(s);
|
|
rrdp_done(id, 1);
|
|
} else {
|
|
/* reset delta parser for next delta */
|
|
free_delta_xml(s->dxml);
|
|
s->dxml = new_delta_xml(p, s->current, s);
|
|
s->state = RRDP_STATE_REQ;
|
|
}
|
|
break;
|
|
}
|
|
} else if (s->res == HTTP_NOT_MOD && s->task == NOTIFICATION) {
|
|
logx("%s: notification file not modified (%s#%lld)", s->local,
|
|
s->repository->session_id, s->repository->serial);
|
|
/* no need to update state file */
|
|
rrdp_free(s);
|
|
rrdp_done(id, 1);
|
|
} else {
|
|
rrdp_failed(s);
|
|
}
|
|
}
|
|
|
|
static void
|
|
rrdp_abort_req(struct rrdp *s)
|
|
{
|
|
unsigned int id = s->id;
|
|
|
|
s->aborted = 1;
|
|
if (s->state == RRDP_STATE_REQ) {
|
|
/* nothing is pending, just abort */
|
|
rrdp_free(s);
|
|
rrdp_done(id, 1);
|
|
return;
|
|
}
|
|
if (s->state == RRDP_STATE_WAIT)
|
|
/* wait for HTTP_INI which will progress the state */
|
|
return;
|
|
|
|
/*
|
|
* RRDP_STATE_PARSE or later, close infd, abort parser but
|
|
* wait for HTTP_FIN and file_pending to drop to 0.
|
|
*/
|
|
if (s->infd != -1) {
|
|
close(s->infd);
|
|
s->infd = -1;
|
|
s->state |= RRDP_STATE_PARSE_DONE | RRDP_STATE_PARSE_ERROR;
|
|
}
|
|
rrdp_finished(s);
|
|
}
|
|
|
|
static void
|
|
rrdp_input_handler(int fd)
|
|
{
|
|
static struct ibuf *inbuf;
|
|
struct rrdp_session *state;
|
|
char *local, *notify, *last_mod;
|
|
struct ibuf *b;
|
|
struct rrdp *s;
|
|
enum rrdp_msg type;
|
|
enum http_result res;
|
|
unsigned int id;
|
|
int ok;
|
|
|
|
b = io_buf_recvfd(fd, &inbuf);
|
|
if (b == NULL)
|
|
return;
|
|
|
|
io_read_buf(b, &type, sizeof(type));
|
|
io_read_buf(b, &id, sizeof(id));
|
|
|
|
switch (type) {
|
|
case RRDP_START:
|
|
if (ibuf_fd_avail(b))
|
|
errx(1, "received unexpected fd");
|
|
io_read_str(b, &local);
|
|
io_read_str(b, ¬ify);
|
|
state = rrdp_session_read(b);
|
|
rrdp_new(id, local, notify, state);
|
|
break;
|
|
case RRDP_HTTP_INI:
|
|
s = rrdp_get(id);
|
|
if (s == NULL)
|
|
errx(1, "http ini, rrdp session %u does not exist", id);
|
|
if (s->state != RRDP_STATE_WAIT)
|
|
errx(1, "%s: bad internal state", s->local);
|
|
s->infd = ibuf_fd_get(b);
|
|
if (s->infd == -1)
|
|
errx(1, "expected fd not received");
|
|
s->state = RRDP_STATE_PARSE;
|
|
if (s->aborted) {
|
|
rrdp_abort_req(s);
|
|
break;
|
|
}
|
|
break;
|
|
case RRDP_HTTP_FIN:
|
|
io_read_buf(b, &res, sizeof(res));
|
|
io_read_str(b, &last_mod);
|
|
if (ibuf_fd_avail(b))
|
|
errx(1, "received unexpected fd");
|
|
|
|
s = rrdp_get(id);
|
|
if (s == NULL)
|
|
errx(1, "http fin, rrdp session %u does not exist", id);
|
|
if (!(s->state & RRDP_STATE_PARSE))
|
|
errx(1, "%s: bad internal state", s->local);
|
|
s->state |= RRDP_STATE_HTTP_DONE;
|
|
s->res = res;
|
|
free(s->last_mod);
|
|
s->last_mod = last_mod;
|
|
rrdp_finished(s);
|
|
break;
|
|
case RRDP_FILE:
|
|
s = rrdp_get(id);
|
|
if (s == NULL)
|
|
errx(1, "file, rrdp session %u does not exist", id);
|
|
if (ibuf_fd_avail(b))
|
|
errx(1, "received unexpected fd");
|
|
io_read_buf(b, &ok, sizeof(ok));
|
|
if (ok != 1)
|
|
s->file_failed++;
|
|
s->file_pending--;
|
|
if (s->file_pending == 0)
|
|
rrdp_finished(s);
|
|
break;
|
|
case RRDP_ABORT:
|
|
if (ibuf_fd_avail(b))
|
|
errx(1, "received unexpected fd");
|
|
s = rrdp_get(id);
|
|
if (s != NULL)
|
|
rrdp_abort_req(s);
|
|
break;
|
|
default:
|
|
errx(1, "unexpected message %d", type);
|
|
}
|
|
ibuf_free(b);
|
|
}
|
|
|
|
static void
|
|
rrdp_data_handler(struct rrdp *s)
|
|
{
|
|
char buf[READ_BUF_SIZE];
|
|
XML_Parser p = s->parser;
|
|
ssize_t len;
|
|
|
|
len = read(s->infd, buf, sizeof(buf));
|
|
if (len == -1) {
|
|
warn("%s: read failure", s->local);
|
|
rrdp_abort_req(s);
|
|
return;
|
|
}
|
|
if ((s->state & RRDP_STATE_PARSE) == 0)
|
|
errx(1, "%s: bad parser state", s->local);
|
|
if (len == 0) {
|
|
/* parser stage finished */
|
|
close(s->infd);
|
|
s->infd = -1;
|
|
|
|
if (s->task != NOTIFICATION) {
|
|
char h[SHA256_DIGEST_LENGTH];
|
|
|
|
SHA256_Final(h, &s->ctx);
|
|
if (memcmp(s->hash, h, sizeof(s->hash)) != 0) {
|
|
s->state |= RRDP_STATE_PARSE_ERROR;
|
|
warnx("%s: bad message digest", s->local);
|
|
}
|
|
}
|
|
|
|
s->state |= RRDP_STATE_PARSE_DONE;
|
|
rrdp_finished(s);
|
|
return;
|
|
}
|
|
|
|
/* parse and maybe hash the bytes just read */
|
|
if (s->task != NOTIFICATION)
|
|
SHA256_Update(&s->ctx, buf, len);
|
|
if ((s->state & RRDP_STATE_PARSE_ERROR) == 0 &&
|
|
XML_Parse(p, buf, len, 0) != XML_STATUS_OK) {
|
|
warnx("%s: parse error at line %llu: %s", s->local,
|
|
(unsigned long long)XML_GetCurrentLineNumber(p),
|
|
XML_ErrorString(XML_GetErrorCode(p)));
|
|
s->state |= RRDP_STATE_PARSE_ERROR;
|
|
}
|
|
}
|
|
|
|
void
|
|
proc_rrdp(int fd)
|
|
{
|
|
struct pollfd pfds[MAX_SESSIONS + 1];
|
|
struct rrdp *s, *ns;
|
|
size_t i;
|
|
|
|
if (pledge("stdio recvfd", NULL) == -1)
|
|
err(1, "pledge");
|
|
|
|
msgbuf_init(&msgq);
|
|
msgq.fd = fd;
|
|
|
|
for (;;) {
|
|
i = 1;
|
|
memset(&pfds, 0, sizeof(pfds));
|
|
TAILQ_FOREACH(s, &states, entry) {
|
|
if (i >= MAX_SESSIONS + 1) {
|
|
/* not enough sessions, wait for better times */
|
|
s->pfd = NULL;
|
|
continue;
|
|
}
|
|
/* request new assets when there are free sessions */
|
|
if (s->state == RRDP_STATE_REQ) {
|
|
const char *uri;
|
|
switch (s->task) {
|
|
case NOTIFICATION:
|
|
rrdp_http_req(s->id, s->notifyuri,
|
|
s->repository->last_mod);
|
|
break;
|
|
case SNAPSHOT:
|
|
case DELTA:
|
|
uri = notification_get_next(s->nxml,
|
|
s->hash, sizeof(s->hash),
|
|
s->task);
|
|
SHA256_Init(&s->ctx);
|
|
rrdp_http_req(s->id, uri, NULL);
|
|
break;
|
|
}
|
|
s->state = RRDP_STATE_WAIT;
|
|
}
|
|
s->pfd = pfds + i++;
|
|
s->pfd->fd = s->infd;
|
|
s->pfd->events = POLLIN;
|
|
}
|
|
|
|
/*
|
|
* Update main fd last.
|
|
* The previous loop may have enqueue messages.
|
|
*/
|
|
pfds[0].fd = fd;
|
|
pfds[0].events = POLLIN;
|
|
if (msgq.queued)
|
|
pfds[0].events |= POLLOUT;
|
|
|
|
if (poll(pfds, i, INFTIM) == -1) {
|
|
if (errno == EINTR)
|
|
continue;
|
|
err(1, "poll");
|
|
}
|
|
|
|
if (pfds[0].revents & POLLHUP)
|
|
break;
|
|
if (pfds[0].revents & POLLOUT) {
|
|
switch (msgbuf_write(&msgq)) {
|
|
case 0:
|
|
errx(1, "write: connection closed");
|
|
case -1:
|
|
err(1, "write");
|
|
}
|
|
}
|
|
if (pfds[0].revents & POLLIN)
|
|
rrdp_input_handler(fd);
|
|
|
|
TAILQ_FOREACH_SAFE(s, &states, entry, ns) {
|
|
if (s->pfd == NULL)
|
|
continue;
|
|
if (s->pfd->revents != 0)
|
|
rrdp_data_handler(s);
|
|
}
|
|
}
|
|
|
|
exit(0);
|
|
}
|