1
0
mirror of https://github.com/systemd/systemd synced 2026-04-12 18:14:51 +02:00

Compare commits

..

No commits in common. "b0083b2a5ed613bc8b2aba9cb922b061331b7beb" and "9bd72b612b76fab62ff6275c48ec19ced918e662" have entirely different histories.

18 changed files with 95 additions and 732 deletions

View File

@ -729,7 +729,7 @@
<option>--console=interactive</option>, <option>--console=read-only</option>, and
<option>--console=native</option> modes.</para>
<xi:include href="version-info.xml" xpointer="v261"/></listitem>
<xi:include href="version-info.xml" xpointer="v262"/></listitem>
</varlistentry>
<varlistentry>

View File

@ -73,14 +73,6 @@
<arg choice="plain"><replaceable>CMDLINE</replaceable></arg>
</cmdsynopsis>
<cmdsynopsis>
<command>varlinkctl</command>
<arg choice="opt" rep="repeat">OPTIONS</arg>
<arg choice="plain">serve</arg>
<arg choice="plain"><replaceable>METHOD</replaceable></arg>
<arg choice="req" rep="repeat"><replaceable>CMDLINE</replaceable></arg>
</cmdsynopsis>
<cmdsynopsis>
<command>varlinkctl</command>
<arg choice="opt" rep="repeat">OPTIONS</arg>
@ -189,28 +181,6 @@
<xi:include href="version-info.xml" xpointer="v255"/></listitem>
</varlistentry>
<varlistentry>
<term><command>serve</command> <replaceable>METHOD</replaceable> <replaceable>CMDLINE…</replaceable></term>
<listitem><para>Run a Varlink server that accepts protocol upgrade requests for the specified method
and connects the upgraded connection to the standard input and output of the specified command. This
can act as a server-side counterpart to <command>call</command> <option>--upgrade</option>.</para>
<para>The listening socket must be passed via socket activation (i.e. the
<varname>$LISTEN_FDS</varname> protocol), making this command suitable for use in socket-activated
service units. When a client calls the specified method with the upgrade flag, the server sends a
reply confirming the upgrade, then forks and executes the given command line with the upgraded
connection on its standard input and output.</para>
<para>This effectively turns any command that speaks a protocol over standard input/output into a
Varlink service, discoverable via the service registry and authenticated via socket credentials.
Because each connection is handled by a forked child process, the service unit can apply systemd's
sandboxing options (such as <varname>ProtectSystem=</varname>, etc.) and does not operate in the
caller's environment.</para>
<xi:include href="version-info.xml" xpointer="v261"/></listitem>
</varlistentry>
<varlistentry>
<term><command>list-registry</command></term>
@ -563,46 +533,6 @@ method Extend(
<programlisting># varlinkctl call ssh-exec:somehost:systemd-creds org.varlink.service.GetInfo '{}'</programlisting>
</example>
<example>
<title>Serving a Sandboxed Decompressor via Protocol Upgrade</title>
<para>The following socket and service units expose <command>xz</command> decompression as a Varlink
service. Clients connect and send compressed data over the upgraded connection, receiving decompressed
output in return.</para>
<programlisting># /etc/systemd/system/varlink-decompress-xz.socket
[Socket]
ListenStream=/run/varlink/registry/com.example.Decompress.XZ
[Install]
WantedBy=sockets.target
# /etc/systemd/system/varlink-decompress-xz.service
[Service]
ExecStart=varlinkctl serve com.example.Decompress.XZ xz -d
DynamicUser=yes
PrivateNetwork=yes
ProtectSystem=strict
ProtectHome=yes
NoNewPrivileges=yes
SystemCallFilter=~@privileged @resources
MemoryMax=256M</programlisting>
<para>A client can then decompress data through this service:</para>
<programlisting>$ echo "hello" | xz | varlinkctl call --upgrade \
unix:/run/varlink/registry/com.example.Decompress.XZ \
com.example.Decompress.XZ '{}'
hello</programlisting>
<para>For quick testing without unit files, <command>systemd-socket-activate</command> can be used
to provide the listening socket:</para>
<programlisting>$ systemd-socket-activate -l /tmp/decompress.sock -- varlinkctl serve com.example.Decompress.XZ xz -d &amp;
$ echo "hello" | xz | varlinkctl call --upgrade unix:/tmp/decompress.sock com.example.Decompress.XZ '{}'
hello</programlisting>
</example>
</refsect1>
<refsect1>

View File

@ -9,6 +9,8 @@ SplitArtifacts=yes
[Build]
Environment=SYSTEMD_REPART_OVERRIDE_FSTYPE=squashfs
Incremental=relaxed
CacheOnly=metadata
[Content]
BaseTrees=%O/minimal-base

View File

@ -9,6 +9,8 @@ SplitArtifacts=yes
[Build]
Environment=SYSTEMD_REPART_OVERRIDE_FSTYPE=squashfs
Incremental=relaxed
CacheOnly=metadata
[Content]
BaseTrees=%O/minimal-base

View File

@ -5,6 +5,7 @@ Format=directory
[Build]
Environment=SYSTEMD_REQUIRED_DEPS_ONLY=1
Incremental=relaxed
[Content]
Bootable=no

View File

@ -11,9 +11,6 @@ Packages=
iproute
nmap
VolatilePackages=
systemd-libs
RemoveFiles=
# Arch Linux doesn't split their gcc-libs package so we manually remove
# unneeded stuff here to make sure it doesn't end up in the image.

View File

@ -12,6 +12,3 @@ Packages=
iproute
iproute-tc
nmap-ncat
VolatilePackages=
systemd-libs

View File

@ -12,7 +12,3 @@ Packages=
iproute2
mount
ncat
VolatilePackages=
libsystemd0
libudev1

View File

@ -16,7 +16,3 @@ Packages=
patterns-base-minimal_base
sed
xz
VolatilePackages=
libsystemd0
libudev1

View File

@ -1094,6 +1094,5 @@ global:
LIBSYSTEMD_261 {
global:
sd_varlink_call_and_upgrade;
sd_varlink_reply_and_upgrade;
sd_varlink_set_sentinel;
} LIBSYSTEMD_260;

View File

@ -401,16 +401,6 @@ static int varlink_idl_format_symbol(
fputs("\n", f);
}
if ((symbol->symbol_flags & (SD_VARLINK_REQUIRES_UPGRADE|SD_VARLINK_SUPPORTS_UPGRADE)) != 0) {
fputs(colors[COLOR_COMMENT], f);
if (FLAGS_SET(symbol->symbol_flags, SD_VARLINK_REQUIRES_UPGRADE))
fputs("# [Requires 'upgrade' flag]", f);
else
fputs("# [Supports 'upgrade' flag]", f);
fputs(colors[COLOR_RESET], f);
fputs("\n", f);
}
fputs(colors[COLOR_SYMBOL_TYPE], f);
fputs("method ", f);
fputs(colors[COLOR_IDENTIFIER], f);
@ -1955,10 +1945,6 @@ int varlink_idl_validate_method_call(const sd_varlink_symbol *method, sd_json_va
if (FLAGS_SET(method->symbol_flags, SD_VARLINK_REQUIRES_MORE) && !FLAGS_SET(flags, SD_VARLINK_METHOD_MORE))
return -EBADE;
/* Same for upgrade */
if (FLAGS_SET(method->symbol_flags, SD_VARLINK_REQUIRES_UPGRADE) && !FLAGS_SET(flags, SD_VARLINK_METHOD_UPGRADE))
return -EBADE;
return varlink_idl_validate_symbol(method, v, SD_VARLINK_INPUT, reterr_bad_field);
}

View File

@ -844,49 +844,10 @@ static int varlink_write(sd_varlink *v) {
#define VARLINK_FDS_MAX (16U*1024U)
static bool varlink_may_protocol_upgrade(sd_varlink *v) {
return v->protocol_upgrade || (v->server && FLAGS_SET(v->server->flags, SD_VARLINK_SERVER_UPGRADABLE));
}
/* When a protocol upgrade might happen, peek at the socket data to find the \0 message
* boundary and return a read size that won't consume past it. This prevents over-reading
* raw post-upgrade data into the varlink input buffer. Falls back to byte-by-byte for
* non-socket fds where MSG_PEEK is not available. */
static ssize_t varlink_peek_upgrade_boundary(sd_varlink *v, void *p, size_t rs) {
assert(v);
if (!varlink_may_protocol_upgrade(v))
return rs;
if (v->prefer_read)
return 1;
ssize_t peeked = recv(v->input_fd, p, rs, MSG_PEEK|MSG_DONTWAIT);
if (peeked < 0) {
if (errno == ENOTSOCK) {
v->prefer_read = true;
return 1; /* Not a socket, fall back to byte-to-byte */
} else if (!ERRNO_IS_TRANSIENT(errno))
return -errno;
/* Transient error, this should not happen but fall back to byte-to-byte */
return 1;
}
/* EOF, the real recv() will also get it so what we return does not matter */
if (peeked == 0)
return rs;
void *nul_chr = memchr(p, 0, peeked);
if (nul_chr)
return (ssize_t) ((char*) nul_chr - (char*) p) + 1;
return peeked;
}
static int varlink_read(sd_varlink *v) {
struct iovec iov;
struct msghdr mh;
ssize_t rs;
size_t rs;
ssize_t n;
void *p;
@ -934,15 +895,12 @@ static int varlink_read(sd_varlink *v) {
p = v->input_buffer + v->input_buffer_index + v->input_buffer_size;
/* When a protocol upgrade is requested we can't consume any post-upgrade data from the socket buffer */
if (v->protocol_upgrade)
rs = 1;
else
rs = MALLOC_SIZEOF_SAFE(v->input_buffer) - (v->input_buffer_index + v->input_buffer_size);
/* When a protocol upgrade is requested we can't consume any post-upgrade data from the socket
* buffer. Use MSG_PEEK to find the \0 message boundary and only consume up to it. For non-socket
* fds (pipes) MSG_PEEK is not available, so fall back to byte-by-byte reading. */
rs = varlink_peek_upgrade_boundary(v, p, rs);
if (rs < 0)
return varlink_log_errno(v, rs, "Failed to peek upgrade boundary: %m");
if (v->allow_fd_passing_input > 0) {
iov = IOVEC_MAKE(p, rs);
@ -1586,8 +1544,6 @@ static int varlink_dispatch_method(sd_varlink *v) {
(flags & SD_VARLINK_METHOD_ONEWAY) ? VARLINK_PROCESSING_METHOD_ONEWAY :
VARLINK_PROCESSING_METHOD);
v->protocol_upgrade = FLAGS_SET(flags, SD_VARLINK_METHOD_UPGRADE);
assert(v->server);
/* First consult user supplied method implementations */
@ -1610,15 +1566,11 @@ static int varlink_dispatch_method(sd_varlink *v) {
r = varlink_idl_validate_method_call(v->current_method, parameters, flags, &bad_field);
if (r == -EBADE) {
bool missing_upgrade = FLAGS_SET(v->current_method->symbol_flags, SD_VARLINK_REQUIRES_UPGRADE) &&
!FLAGS_SET(flags, SD_VARLINK_METHOD_UPGRADE);
varlink_log_errno(v, r, "Method %s() called without '%s' flag, but flag needs to be set.",
method, missing_upgrade ? "upgrade" : "more");
varlink_log_errno(v, r, "Method %s() called without 'more' flag, but flag needs to be set.",
method);
if (v->state == VARLINK_PROCESSING_METHOD) {
r = sd_varlink_error(v, missing_upgrade ? SD_VARLINK_ERROR_EXPECTED_UPGRADE
: SD_VARLINK_ERROR_EXPECTED_MORE, NULL);
r = sd_varlink_error(v, SD_VARLINK_ERROR_EXPECTED_MORE, NULL);
/* If we didn't manage to enqueue an error response, then fail the
* connection completely. Otherwise ignore the error from
* sd_varlink_error() here, as it is synthesized from the function's
@ -2433,56 +2385,6 @@ _public_ int sd_varlink_call(
return sd_varlink_call_full(v, method, parameters, ret_parameters, ret_error_id, NULL);
}
static int varlink_handle_upgrade_fds(sd_varlink *v, int *ret_input_fd, int *ret_output_fd) {
int r;
assert(v);
assert(ret_input_fd || ret_output_fd);
/* Ensure no post-upgrade data was consumed into our input buffer (we ensure this via MSG_PEEK or
* byte-to-byte) and refuse the upgrade rather than silently losing the data. */
if (v->input_buffer_size != 0)
return varlink_log_errno(v, SYNTHETIC_ERRNO(EPROTO),
"Unexpected buffered data during protocol upgrade, refusing.");
/* Pass the connection fds to the caller, it owns them now. Reset to blocking mode
* since callers of the upgraded protocol will generally expect normal blocking
* semantics. */
r = fd_nonblock(v->input_fd, false);
if (r < 0)
return varlink_log_errno(v, r, "Failed to set input fd to blocking mode: %m");
if (v->input_fd != v->output_fd) {
r = fd_nonblock(v->output_fd, false);
if (r < 0)
return varlink_log_errno(v, r, "Failed to set output fd to blocking mode: %m");
}
/* For bidirectional sockets (input_fd == output_fd), dup the fd so that callers
* always get two independent fds they can close separately. */
if (v->input_fd == v->output_fd) {
v->output_fd = fcntl(v->input_fd, F_DUPFD_CLOEXEC, 3);
if (v->output_fd < 0)
return varlink_log_errno(v, errno, "Failed to dup upgraded connection fd: %m");
}
/* Hand out requested fds, shut down unwanted directions. */
if (ret_input_fd)
*ret_input_fd = TAKE_FD(v->input_fd);
else {
(void) shutdown(v->input_fd, SHUT_RD);
v->input_fd = safe_close(v->input_fd);
}
if (ret_output_fd)
*ret_output_fd = TAKE_FD(v->output_fd);
else {
(void) shutdown(v->output_fd, SHUT_WR);
v->output_fd = safe_close(v->output_fd);
}
return 0;
}
_public_ int sd_varlink_call_and_upgrade(
sd_varlink *v,
const char *method,
@ -2534,12 +2436,45 @@ _public_ int sd_varlink_call_and_upgrade(
goto finish;
}
/* Even if setting up the fds fails we must disconnect: the server already accepted the
* upgrade, so the other side is speaking raw protocol while we expect JSON. */
r = varlink_handle_upgrade_fds(v, ret_input_fd, ret_output_fd);
/* Pass the connection fds to the caller, it owns them now. Reset to blocking mode
* since callers of the upgraded protocol will generally expect normal blocking
* semantics. */
r = fd_nonblock(v->input_fd, false);
if (r < 0) {
varlink_set_state(v, VARLINK_DISCONNECTED);
goto finish;
varlink_log_errno(v, r, "Failed to set input fd to blocking mode: %m");
goto disconnect;
}
if (v->input_fd != v->output_fd) {
r = fd_nonblock(v->output_fd, false);
if (r < 0) {
varlink_log_errno(v, r, "Failed to set output fd to blocking mode: %m");
goto disconnect;
}
}
/* Hand out the fds to the caller. When the caller doesn't want one direction, shut it
* down: but avoid closing the underlying fd if the other direction still needs it
* (i.e. when input_fd == output_fd). */
bool same_fd = v->input_fd == v->output_fd;
if (ret_input_fd)
*ret_input_fd = TAKE_FD(v->input_fd);
else {
(void) shutdown(v->input_fd, SHUT_RD);
if (same_fd && ret_output_fd)
TAKE_FD(v->input_fd); /* don't close yet, output branch needs it */
else
v->input_fd = safe_close(v->input_fd);
}
if (ret_output_fd)
*ret_output_fd = TAKE_FD(v->output_fd);
else {
(void) shutdown(v->output_fd, SHUT_WR);
if (same_fd && ret_input_fd)
TAKE_FD(v->output_fd);
else
v->output_fd = safe_close(v->output_fd);
}
varlink_set_state(v, VARLINK_DISCONNECTED);
@ -2553,6 +2488,10 @@ _public_ int sd_varlink_call_and_upgrade(
return 1;
disconnect:
/* If we fail after the server already accepted the upgrade, nothing can be done but disconnect.
* The other side is speaking raw protocol while we expect JSON. */
varlink_set_state(v, VARLINK_DISCONNECTED);
finish:
v->protocol_upgrade = false;
assert(v->n_pending == 1);
@ -2867,97 +2806,6 @@ _public_ int sd_varlink_replyb(sd_varlink *v, ...) {
return sd_varlink_reply(v, parameters);
}
_public_ int sd_varlink_reply_and_upgrade(sd_varlink *v, sd_json_variant *parameters, int *ret_input_fd, int *ret_output_fd) {
int r;
assert_return(v, -EINVAL);
assert_return(ret_input_fd || ret_output_fd, -EINVAL);
if (v->state == VARLINK_DISCONNECTED)
return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected.");
if (!IN_SET(v->state,
VARLINK_PROCESSING_METHOD,
VARLINK_PENDING_METHOD))
return varlink_log_errno(v, SYNTHETIC_ERRNO(EBUSY), "Connection busy.");
/* Verify the client actually requested a protocol upgrade */
if (!v->protocol_upgrade)
return varlink_log_errno(v, SYNTHETIC_ERRNO(EPROTO),
"Method call did not request a protocol upgrade.");
/* Ensure we did not buffer any data beyond the upgrade request. Check this before sending the
* reply so that we can return a normal error (the framework will send an error reply to the
* client). In normal operation this cannot happen because the client waits for our reply before
* sending raw data, and we set protocol_upgrade=true in dispatch to limit subsequent reads to
* single bytes. But a misbehaving client could pipeline data early. */
if (v->input_buffer_size > 0)
return varlink_log_errno(v, SYNTHETIC_ERRNO(EBADMSG),
"Unexpected buffered data from client during protocol upgrade.");
/* Validate parameters BEFORE sanitization (same validation as sd_varlink_reply(), but upgrade
* replies never carry the 'continues' flag so we always pass flags=0) */
if (v->current_method) {
const char *bad_field = NULL;
r = varlink_idl_validate_method_reply(v->current_method, parameters, /* flags= */ 0, &bad_field);
if (r < 0)
/* Please adjust test/units/end.sh when updating the log message. */
varlink_log_errno(v, r, "Return parameters for method reply %s() didn't pass validation on field '%s', ignoring: %m",
v->current_method->name, strna(bad_field));
}
_cleanup_(sd_json_variant_unrefp) sd_json_variant *m = NULL;
r = sd_json_buildo(&m, JSON_BUILD_PAIR_VARIANT_NON_EMPTY("parameters", parameters));
if (r < 0)
return varlink_log_errno(v, r, "Failed to build json message: %m");
r = varlink_enqueue_json(v, m);
if (r < 0)
return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
/* Flush the reply to the socket before stealing the fds. The reply must be fully written
* before the caller starts speaking the upgraded protocol. */
for (;;) {
r = varlink_write(v);
if (r < 0) {
varlink_log_errno(v, r, "Failed to flush reply: %m");
goto disconnect;
}
if (v->output_buffer_size == 0 && !v->output_queue)
break;
if (v->write_disconnected) {
r = varlink_log_errno(v, SYNTHETIC_ERRNO(ECONNRESET),
"Write disconnected during upgrade reply flush.");
goto disconnect;
}
r = fd_wait_for_event(v->output_fd, POLLOUT, USEC_INFINITY);
if (ERRNO_IS_NEG_TRANSIENT(r))
continue;
if (r < 0) {
varlink_log_errno(v, r, "Failed to wait for writable fd: %m");
goto disconnect;
}
assert(r > 0);
handle_revents(v, r);
}
/* Detach from the event loop before stealing the fds */
varlink_detach_event_sources(v);
/* Now hand the original FDs over to the caller, from this point on we have nothing to do with the
* connection anymore, it's up to the caller and we close the connection below */
r = varlink_handle_upgrade_fds(v, ret_input_fd, ret_output_fd);
disconnect:
/* This also sets the connection state to VARLINK_DISCONNECTED */
sd_varlink_close(v);
return r < 0 ? r : 1;
}
_public_ int sd_varlink_reset_fds(sd_varlink *v) {
assert_return(v, -EINVAL);
@ -3752,8 +3600,7 @@ _public_ int sd_varlink_server_new(sd_varlink_server **ret, sd_varlink_server_fl
SD_VARLINK_SERVER_ALLOW_FD_PASSING_OUTPUT|
SD_VARLINK_SERVER_FD_PASSING_INPUT_STRICT|
SD_VARLINK_SERVER_HANDLE_SIGINT|
SD_VARLINK_SERVER_HANDLE_SIGTERM|
SD_VARLINK_SERVER_UPGRADABLE)) == 0, -EINVAL);
SD_VARLINK_SERVER_HANDLE_SIGTERM)) == 0, -EINVAL);
s = new(sd_varlink_server, 1);
if (!s)
@ -4710,7 +4557,6 @@ _public_ int sd_varlink_error_to_errno(const char *error, sd_json_variant *param
{ SD_VARLINK_ERROR_INVALID_PARAMETER, -EINVAL },
{ SD_VARLINK_ERROR_PERMISSION_DENIED, -EACCES },
{ SD_VARLINK_ERROR_EXPECTED_MORE, -EBADE },
{ SD_VARLINK_ERROR_EXPECTED_UPGRADE, -EPROTOTYPE },
};
int r;

View File

@ -52,9 +52,7 @@ __extension__ typedef enum _SD_ENUM_TYPE_S64(sd_varlink_symbol_type_t) {
__extension__ typedef enum _SD_ENUM_TYPE_S64(sd_varlink_symbol_flags_t) {
SD_VARLINK_SUPPORTS_MORE = 1 << 0, /* Call supports "more" flag */
SD_VARLINK_REQUIRES_MORE = 1 << 1, /* Call requires "more" flag */
SD_VARLINK_SUPPORTS_UPGRADE = 1 << 2, /* Call supports "upgrade" flag */
SD_VARLINK_REQUIRES_UPGRADE = 1 << 3, /* Call requires "upgrade" flag */
_SD_VARLINK_SYMBOL_FLAGS_MAX = (1 << 4) - 1,
_SD_VARLINK_SYMBOL_FLAGS_MAX = (1 << 2) - 1,
_SD_VARLINK_SYMBOL_FLAGS_INVALID = -EINVAL,
_SD_ENUM_FORCE_S64(SD_VARLINK_SYMBOL_FLAGS)
} sd_varlink_symbol_flags_t;

View File

@ -72,7 +72,6 @@ __extension__ typedef enum _SD_ENUM_TYPE_S64(sd_varlink_server_flags_t) {
SD_VARLINK_SERVER_FD_PASSING_INPUT_STRICT = 1 << 7, /* Reject input messages with fds if fd passing is disabled (needs kernel v6.16+) */
SD_VARLINK_SERVER_HANDLE_SIGINT = 1 << 8, /* Exit cleanly on SIGINT */
SD_VARLINK_SERVER_HANDLE_SIGTERM = 1 << 9, /* Exit cleanly on SIGTERM */
SD_VARLINK_SERVER_UPGRADABLE = 1 << 10, /* Server has upgrade methods; avoid consuming post-upgrade data during reads */
_SD_ENUM_FORCE_S64(SD_VARLINK_SERVER)
} sd_varlink_server_flags_t;
@ -138,9 +137,8 @@ int sd_varlink_callb(sd_varlink *v, const char *method, sd_json_variant **ret_pa
sd_varlink_callb((v), (method), (ret_parameters), (ret_error_id), SD_JSON_BUILD_OBJECT(__VA_ARGS__))
/* Send method call with upgrade, wait for reply, then steal the connection fds for raw I/O.
* For bidirectional sockets ret_input_fd and ret_output_fd will be separate (dupped) fds
* referring to the same underlying socket. ret_parameters and ret_error_id are borrowed
* references valid only until v is closed or unreffed.
* For bidirectional sockets ret_input_fd and ret_output_fd will be the same fd.
* ret_parameters and ret_error_id are borrowed references valid only until v is closed or unreffed.
* Returns > 0 if the connection was upgraded, 0 if a Varlink error occurred (and ret_error_id was set),
* or < 0 on local failure. */
int sd_varlink_call_and_upgrade(sd_varlink *v, const char *method, sd_json_variant *parameters, sd_json_variant **ret_parameters, const char **ret_error_id, int *ret_input_fd, int *ret_output_fd);
@ -170,18 +168,6 @@ int sd_varlink_replyb(sd_varlink *v, ...);
#define sd_varlink_replybo(v, ...) \
sd_varlink_replyb((v), SD_JSON_BUILD_OBJECT(__VA_ARGS__))
/* Send a final reply to an upgrade request, then steal the connection fds for raw I/O.
* The fds are returned in blocking mode. The varlink connection is disconnected afterwards.
* For bidirectional sockets ret_input_fd and ret_output_fd will be separate (dupped) fds
* referring to the same underlying socket. For pipe pairs (e.g. ssh-exec transport) they
* will differ. Either ret pointer may be NULL.
*
* Note: this call synchronously blocks until the reply is flushed to the socket. This is
* usually fine as flush is fast but a misbehaving/adversary client that stops reading
* could stall the caller. So do not use in servers that multiplex many varlink
* connections. */
int sd_varlink_reply_and_upgrade(sd_varlink *v, sd_json_variant *parameters, int *ret_input_fd, int *ret_output_fd);
/* Enqueue a (final) error */
int sd_varlink_error(sd_varlink *v, const char *error_id, sd_json_variant *parameters);
int sd_varlink_errorb(sd_varlink *v, const char *error_id, ...);
@ -336,7 +322,6 @@ _SD_DEFINE_POINTER_CLEANUP_FUNC(sd_varlink_server, sd_varlink_server_unref);
#define SD_VARLINK_ERROR_INVALID_PARAMETER "org.varlink.service.InvalidParameter"
#define SD_VARLINK_ERROR_PERMISSION_DENIED "org.varlink.service.PermissionDenied"
#define SD_VARLINK_ERROR_EXPECTED_MORE "org.varlink.service.ExpectedMore"
#define SD_VARLINK_ERROR_EXPECTED_UPGRADE "org.varlink.service.ExpectedUpgrade"
_SD_END_DECLARATIONS;

View File

@ -11,11 +11,9 @@
#include "sd-varlink.h"
#include "fd-util.h"
#include "io-util.h"
#include "json-util.h"
#include "memfd-util.h"
#include "rm-rf.h"
#include "socket-util.h"
#include "tests.h"
#include "tmpfile-util.h"
#include "varlink-util.h"
@ -727,7 +725,7 @@ static int reply_notify_then_error(sd_varlink *link, sd_json_variant *parameters
TEST(notify_then_error) {
_cleanup_(sd_event_unrefp) sd_event *e = NULL;
ASSERT_OK(sd_event_new(&e));
ASSERT_OK(sd_event_default(&e));
_cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
ASSERT_OK(sd_varlink_server_new(&s, 0));
@ -754,186 +752,4 @@ TEST(notify_then_error) {
ASSERT_OK(sd_event_loop(e));
}
static int method_upgrade(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
_cleanup_close_ int input_fd = -EBADF, output_fd = -EBADF;
int r;
ASSERT_TRUE(FLAGS_SET(flags, SD_VARLINK_METHOD_UPGRADE));
r = sd_varlink_reply_and_upgrade(link, /* parameters= */ NULL, &input_fd, &output_fd);
if (r < 0)
return r;
/* After upgrade, do raw I/O: read until EOF, reverse, write back.
* The client shuts down its write side after sending, so we get a clean EOF. */
char buf[64] = {};
ssize_t n = ASSERT_OK(loop_read(input_fd, buf, sizeof(buf) - 1, /* do_poll= */ true));
ASSERT_GT(n, 0);
/* Reverse the received bytes */
for (ssize_t i = 0; i < n / 2; i++)
SWAP_TWO(buf[i], buf[n - 1 - i]);
ASSERT_OK(loop_write(output_fd, buf, n));
return 0;
}
static int method_upgrade_without_flag(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
int input_fd = -EBADF, output_fd = -EBADF;
/* Calling reply_and_upgrade without the client requesting it should fail with -EPROTO */
ASSERT_ERROR(sd_varlink_reply_and_upgrade(link, /* parameters= */ NULL, &input_fd, &output_fd), EPROTO);
sd_event_exit(sd_varlink_get_event(link), EXIT_SUCCESS);
return sd_varlink_reply(link, /* parameters= */ NULL);
}
static void *upgrade_thread(void *arg) {
_cleanup_(sd_varlink_flush_close_unrefp) sd_varlink *c = NULL;
_cleanup_close_ int input_fd = -EBADF, output_fd = -EBADF;
sd_json_variant *o = NULL;
const char *error_id = NULL;
ASSERT_OK(sd_varlink_connect_address(&c, arg));
ASSERT_OK(sd_varlink_set_description(c, "upgrade-client"));
ASSERT_OK(sd_varlink_call_and_upgrade(c, "io.test.Upgrade", /* parameters= */ NULL, &o, &error_id, &input_fd, &output_fd));
ASSERT_NULL(error_id);
ASSERT_GE(input_fd, 0);
ASSERT_GE(output_fd, 0);
ASSERT_NE(input_fd, output_fd); /* library dups for bidirectional sockets */
/* Send a test string, shut down write side so server sees EOF, then read the reversed reply */
static const char msg[] = "Hello!";
ASSERT_OK(loop_write(output_fd, msg, strlen(msg)));
ASSERT_OK_ERRNO(shutdown(output_fd, SHUT_WR));
char buf[64] = {};
ssize_t n = ASSERT_OK(loop_read(input_fd, buf, strlen(msg), /* do_poll= */ true));
ASSERT_EQ((size_t) n, strlen(msg));
ASSERT_STREQ(buf, "!olleH");
/* Also test that a regular call (without upgrade flag) correctly rejects reply_and_upgrade on
* the server side, and still works as a normal call */
_cleanup_(sd_varlink_flush_close_unrefp) sd_varlink *c2 = NULL;
ASSERT_OK(sd_varlink_connect_address(&c2, arg));
ASSERT_OK(sd_varlink_set_description(c2, "no-upgrade-client"));
ASSERT_OK(sd_varlink_call(c2, "io.test.UpgradeWithoutFlag", /* parameters= */ NULL, &o, &error_id));
ASSERT_NULL(error_id);
return NULL;
}
TEST(upgrade) {
_cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
_cleanup_(rm_rf_physical_and_freep) char *tmpdir = NULL;
_cleanup_(sd_event_unrefp) sd_event *e = NULL;
pthread_t t;
const char *sp;
ASSERT_OK(mkdtemp_malloc("/tmp/varlink-test-XXXXXX", &tmpdir));
sp = strjoina(tmpdir, "/socket");
ASSERT_OK(sd_event_new(&e));
ASSERT_OK(sd_varlink_server_new(&s, SD_VARLINK_SERVER_UPGRADABLE));
ASSERT_OK(sd_varlink_server_set_description(s, "upgrade-server"));
ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.Upgrade", method_upgrade));
ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.UpgradeWithoutFlag", method_upgrade_without_flag));
ASSERT_OK(sd_varlink_server_listen_address(s, sp, 0600));
ASSERT_OK(sd_varlink_server_attach_event(s, e, 0));
ASSERT_OK(-pthread_create(&t, NULL, upgrade_thread, (void*) sp));
/* Run the event loop until no more connections (the thread will disconnect when done) */
ASSERT_OK(sd_event_loop(e));
ASSERT_OK(-pthread_join(t, NULL));
}
static int method_upgrade_and_exit(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
sd_event *event = ASSERT_PTR(userdata);
int r = method_upgrade(link, parameters, flags, /* userdata= */ NULL);
/* Exit the event loop after the upgrade is handled. We can't use sd_varlink_get_event()
* here because the connection is already disconnected after reply_and_upgrade. */
(void) sd_event_exit(event, r < 0 ? r : EXIT_SUCCESS);
return r;
}
static void *upgrade_pipelining_thread(void *arg) {
union sockaddr_union sa = {};
_cleanup_close_ int fd = -EBADF;
/* Connect a raw socket and pipeline: upgrade JSON + \0 + raw data in a single write.
* This tests that the server's byte-by-byte reading (SD_VARLINK_SERVER_UPGRADABLE)
* doesn't consume the raw data into the varlink input buffer. */
fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0);
ASSERT_FD(fd);
int addrlen = sockaddr_un_set_path(&sa.un, arg);
ASSERT_OK(addrlen);
ASSERT_OK_ERRNO(connect(fd, &sa.sa, addrlen));
/* Build pipelined message: upgrade JSON + \0 + raw payload, all in one write */
static const char upgrade_msg[] = "{\"method\":\"io.test.Upgrade\",\"upgrade\":true}";
static const char raw_payload[] = "Pipelined!";
char send_buf[sizeof(upgrade_msg) + sizeof(raw_payload)]; /* includes \0 from upgrade_msg as delimiter */
memcpy(send_buf, upgrade_msg, sizeof(upgrade_msg)); /* copies trailing \0 = varlink delimiter */
memcpy(send_buf + sizeof(upgrade_msg), raw_payload, sizeof(raw_payload) - 1);
size_t total = sizeof(upgrade_msg) + strlen(raw_payload);
ASSERT_OK(loop_write(fd, send_buf, total));
/* Shut down write side so server's method_upgrade sees EOF after raw payload */
ASSERT_OK_ERRNO(shutdown(fd, SHUT_WR));
/* Read everything: upgrade reply (JSON + \0) + reversed raw payload. The server closes
* the connection after writing, so loop_read() reads until EOF and gets it all. */
char buf[256] = {};
ssize_t n = ASSERT_OK(loop_read(fd, buf, sizeof(buf) - 1, /* do_poll= */ true));
ASSERT_GT(n, 0);
/* Split at the \0 delimiter between JSON reply and raw payload */
char *delim = memchr(buf, 0, n);
ASSERT_NOT_NULL(delim);
char *raw = delim + 1;
size_t raw_size = (size_t) n - (size_t)(raw - buf);
ASSERT_EQ(raw_size, strlen(raw_payload));
ASSERT_STREQ(strndupa_safe(raw, raw_size), "!denilepiP");
return NULL;
}
TEST(upgrade_pipelining) {
_cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
_cleanup_(rm_rf_physical_and_freep) char *tmpdir = NULL;
_cleanup_(sd_event_unrefp) sd_event *e = NULL;
pthread_t t;
const char *sp;
ASSERT_OK(mkdtemp_malloc("/tmp/varlink-test-XXXXXX", &tmpdir));
sp = strjoina(tmpdir, "/socket");
ASSERT_OK(sd_event_new(&e));
ASSERT_OK(sd_varlink_server_new(&s, SD_VARLINK_SERVER_UPGRADABLE|SD_VARLINK_SERVER_INHERIT_USERDATA));
ASSERT_OK(sd_varlink_server_set_description(s, "upgrade-pipelining-server"));
ASSERT_OK(sd_varlink_server_bind_method(s, "io.test.Upgrade", method_upgrade_and_exit));
ASSERT_OK(sd_varlink_server_listen_address(s, sp, 0600));
ASSERT_OK(sd_varlink_server_attach_event(s, e, 0));
sd_varlink_server_set_userdata(s, e);
ASSERT_OK(-pthread_create(&t, NULL, upgrade_pipelining_thread, (void*) sp));
ASSERT_OK(sd_event_loop(e));
ASSERT_OK(-pthread_join(t, NULL));
}
DEFINE_TEST_MAIN(LOG_DEBUG);

View File

@ -672,6 +672,15 @@ static int varlink_call_and_upgrade(const char *url, const char *method, sd_json
if (!isempty(error_id))
return log_error_errno(SYNTHETIC_ERRNO(EBADE), "Upgrade via %s() failed with error: %s", method, error_id);
/* For bidirectional sockets input_fd == output_fd. Dup immediately so that _cleanup_close_
* on both variables can never double-close the same fd. Note that on fcntl() failure
* output_fd is overwritten with -1, so only input_fd holds the real fd at cleanup time. */
if (input_fd == output_fd) {
output_fd = fcntl(input_fd, F_DUPFD_CLOEXEC, 3);
if (output_fd < 0)
return log_error_errno(errno, "Failed to dup upgraded connection fd: %m");
}
if (!strv_isempty(exec_cmdline)) {
/* --exec mode: place the upgraded connection on stdin/stdout so that the child
* process can just read/write naturally. */
@ -1160,155 +1169,6 @@ static int verb_list_registry(int argc, char *argv[], uintptr_t _data, void *use
return 0;
}
/* Build a minimal IDL from a qualified method name so that introspection works. The parsed interface is
* returned to the caller who must keep it alive for the lifetime of the server
* (sd_varlink_server_add_interface() borrows the pointer). */
static int varlink_server_add_interface_from_method(sd_varlink_server *s, const char *method, sd_varlink_interface **ret_interface) {
assert(s);
assert(method);
assert(ret_interface);
const char *dot = strrchr(method, '.');
assert(dot);
_cleanup_free_ char *interface_name = strndup(method, dot - method);
if (!interface_name)
return log_oom();
/* Note that we do not need to put the upgrade flag comment here, it is added automatically
* by varlink_idl_format_symbol() because of the SD_VARLINK_REQUIRES_UPGRADE flag. */
_cleanup_free_ char *idl_text = strjoin(
"interface ", interface_name, "\n"
"\n"
"method ", dot + 1, " () -> ()\n");
if (!idl_text)
return log_oom();
_cleanup_(sd_varlink_interface_freep) sd_varlink_interface *iface = NULL;
int r = sd_varlink_idl_parse(idl_text, /* reterr_line= */ NULL, /* reterr_column= */ NULL, &iface);
if (r < 0)
return log_error_errno(r, "Failed to parse IDL for method '%s': %m", method);
/* Mark the method as requiring the upgrade flag so introspection shows the annotation */
assert(iface->symbols[0] && iface->symbols[0]->symbol_type == SD_VARLINK_METHOD);
((sd_varlink_symbol*) iface->symbols[0])->symbol_flags |= SD_VARLINK_REQUIRES_UPGRADE;
r = sd_varlink_server_add_interface(s, iface);
if (r < 0)
return r;
*ret_interface = TAKE_PTR(iface);
return 0;
}
static int method_serve_upgrade(sd_varlink *link, sd_json_variant *parameters, sd_varlink_method_flags_t flags, void *userdata) {
char **exec_cmdline = ASSERT_PTR(userdata);
_cleanup_close_ int input_fd = -EBADF, output_fd = -EBADF;
int r;
if (!FLAGS_SET(flags, SD_VARLINK_METHOD_UPGRADE))
return sd_varlink_error(link, SD_VARLINK_ERROR_EXPECTED_UPGRADE, NULL);
r = sd_varlink_reply_and_upgrade(link, /* parameters= */ NULL, &input_fd, &output_fd);
if (r < 0)
return log_error_errno(r, "Failed to upgrade connection: %m");
/* Copy exec_cmdline before forking: pidref_safe_fork() calls rename_process() which
* overwrites the argv area that exec_cmdline points into. */
_cleanup_strv_free_ char **cmdline_copy = strv_copy(exec_cmdline);
if (!cmdline_copy)
return log_oom();
r = pidref_safe_fork_full(
"(serve)",
(int[]) { input_fd, output_fd, STDERR_FILENO },
/* except_fds= */ NULL, /* n_except_fds= */ 0,
FORK_RESET_SIGNALS|FORK_CLOSE_ALL_FDS|FORK_REARRANGE_STDIO|FORK_DETACH|FORK_LOG,
/* ret= */ NULL);
if (r < 0)
return r;
if (r == 0) {
execvp(cmdline_copy[0], cmdline_copy);
log_error_errno(errno, "Failed to execute '%s': %m", cmdline_copy[0]);
_exit(EXIT_FAILURE);
}
return 0;
}
VERB(verb_serve, "serve", "METHOD CMDLINE…", 3, VERB_ANY, 0, "Serve a command via varlink protocol upgrade");
static int verb_serve(int argc, char *argv[], uintptr_t _data, void *userdata) {
_cleanup_(sd_varlink_server_unrefp) sd_varlink_server *s = NULL;
_cleanup_(sd_event_unrefp) sd_event *event = NULL;
const char *method;
char **exec_cmdline;
int r, n;
assert(argc >= 3); /* Guaranteed by verb dispatch table */
method = argv[1];
exec_cmdline = argv + 2;
r = varlink_idl_qualified_symbol_name_is_valid(method);
if (r < 0)
return log_error_errno(r, "Failed to validate method name '%s': %m", method);
if (r == 0)
return log_error_errno(SYNTHETIC_ERRNO(EINVAL), "Not a valid qualified method name: '%s'", method);
/* Require socket activation */
n = sd_listen_fds(/* unset_environment= */ true);
if (n < 0)
return log_error_errno(n, "Failed to determine passed file descriptors: %m");
if (n == 0)
return log_error_errno(SYNTHETIC_ERRNO(EINVAL), "No file descriptors passed via socket activation.");
if (n > 1)
return log_error_errno(SYNTHETIC_ERRNO(EINVAL), "Expected exactly one socket activation fd, got %d.", n);
r = sd_event_default(&event);
if (r < 0)
return log_error_errno(r, "Failed to get event loop: %m");
r = sd_varlink_server_new(&s, SD_VARLINK_SERVER_INHERIT_USERDATA|SD_VARLINK_SERVER_UPGRADABLE);
if (r < 0)
return log_error_errno(r, "Failed to allocate varlink server: %m");
_cleanup_free_ char *description = strjoin("serve:", method);
if (!description)
return log_oom();
r = sd_varlink_server_set_description(s, description);
if (r < 0)
return log_error_errno(r, "Failed to set server description: %m");
r = sd_varlink_server_bind_method(s, method, method_serve_upgrade);
if (r < 0)
return log_error_errno(r, "Failed to bind method '%s': %m", method);
_cleanup_(sd_varlink_interface_freep) sd_varlink_interface *iface = NULL;
r = varlink_server_add_interface_from_method(s, method, &iface);
if (r < 0)
return log_error_errno(r, "Failed to add interface for method '%s': %m", method);
sd_varlink_server_set_userdata(s, exec_cmdline);
r = sd_varlink_server_listen_fd(s, SD_LISTEN_FDS_START);
if (r < 0)
return log_error_errno(r, "Failed to listen on socket activation fd: %m");
r = sd_varlink_server_attach_event(s, event, SD_EVENT_PRIORITY_NORMAL);
if (r < 0)
return log_error_errno(r, "Failed to attach varlink server to event loop: %m");
(void) sd_notify(/* unset_environment= */ false, "READY=1");
r = sd_event_loop(event);
if (r < 0)
return log_error_errno(r, "Failed to run event loop: %m");
return 0;
}
static int run(int argc, char *argv[]) {
int r;

View File

@ -215,22 +215,6 @@ static int verify_vc_display_mode(int fd) {
return mode != KD_TEXT ? -EBUSY : 0;
}
static int verify_vc_support_font(int fd) {
struct console_font_op cfo = {
.op = KD_FONT_OP_GET,
.width = UINT_MAX,
.height = UINT_MAX,
.charcount = UINT_MAX,
};
assert(fd >= 0);
if (ioctl(fd, KDFONTOP, &cfo) < 0)
return ERRNO_IS_NOT_SUPPORTED(errno) ? 0 : -errno;
return 1;
}
static int toggle_utf8_vc(const char *name, int fd, bool utf8) {
int r;
struct termios tc = {};
@ -331,7 +315,7 @@ static int keyboard_load_and_wait(const char *vc, Context *c, bool utf8) {
return 1; /* Report that we did something */
}
static int font_load_and_wait(int fd, const char *vc, Context *c) {
static int font_load_and_wait(const char *vc, Context *c) {
const char* args[9];
unsigned i = 0;
int r;
@ -356,16 +340,6 @@ static int font_load_and_wait(int fd, const char *vc, Context *c) {
return 0; /* Report that we skipped this */
}
/* May be called on the dummy console (e.g. during keymap setup with fbcon deferred takeover). Font
* changes are not supported here and will fail. */
r = verify_vc_support_font(fd);
if (r < 0)
return log_error_errno(r, "Failed to check '%s' has font support: %m", vc);
if (r == 0) {
log_notice("'%s' has no font support, skipping.", vc);
return 0; /* Report that we skipped this */
}
args[i++] = KBD_SETFONT;
args[i++] = "-C";
args[i++] = vc;
@ -397,8 +371,9 @@ static int font_load_and_wait(int fd, const char *vc, Context *c) {
_exit(EXIT_FAILURE);
}
/* setfont returns EX_OSERR when ioctl(KDFONTOP/PIO_FONTX/PIO_FONTX) fails. Let's be generous and not
* treat this as an error. */
/* setfont returns EX_OSERR when ioctl(KDFONTOP/PIO_FONTX/PIO_FONTX) fails. This might mean various
* things, but in particular lack of a graphical console. Let's be generous and not treat this as an
* error. */
r = pidref_wait_for_terminate_and_check(KBD_SETFONT, &pidref, WAIT_LOG_ABNORMAL);
if (r < 0)
return r; /* WAIT_LOG_ABNORMAL means we already have logged about these kinds of errors */
@ -429,7 +404,7 @@ static void setup_remaining_vcs(int src_fd, unsigned src_idx, bool utf8) {
struct unimapdesc unimapd;
_cleanup_free_ struct unipair* unipairs = NULL;
_cleanup_free_ void *fontbuf = NULL;
int r;
int log_level = LOG_WARNING, r;
assert(src_fd >= 0);
@ -440,7 +415,14 @@ static void setup_remaining_vcs(int src_fd, unsigned src_idx, bool utf8) {
/* get metadata of the current font (width, height, count) */
r = ioctl(src_fd, KDFONTOP, &cfo);
if (r < 0) {
log_warning_errno(errno, "KD_FONT_OP_GET failed while trying to get the font metadata: %m");
/* We might be called to operate on the dummy console (to setup keymap
* mainly) when fbcon deferred takeover is used for example. In such case,
* setting font is not supported and is expected to fail. */
if (errno == ENOSYS)
log_level = LOG_DEBUG;
log_full_errno(log_level, errno,
"KD_FONT_OP_GET failed while trying to get the font metadata: %m");
} else {
/* verify parameter sanity first */
if (cfo.width > 32 || cfo.height > 32 || cfo.charcount > 512)
@ -476,7 +458,7 @@ static void setup_remaining_vcs(int src_fd, unsigned src_idx, bool utf8) {
}
if (cfo.op != KD_FONT_OP_SET)
log_warning("Fonts will not be copied to remaining consoles");
log_full(log_level, "Fonts will not be copied to remaining consoles");
for (unsigned i = 1; i <= 63; i++) {
char ttyname[sizeof("/dev/tty63")];
@ -674,7 +656,7 @@ static int run(int argc, char **argv) {
(void) toggle_utf8_vc(vc, fd, utf8);
int setfont_status = font_load_and_wait(fd, vc, &c);
int setfont_status = font_load_and_wait(vc, &c);
int loadkeys_status = keyboard_load_and_wait(vc, &c, utf8);
if (idx > 0) {

View File

@ -257,9 +257,6 @@ systemd-run --wait --pipe --user --machine testuser@ \
varlinkctl --more call "/run/user/$testuser_uid/systemd/io.systemd.Manager" io.systemd.Unit.List '{}'
# test --upgrade (protocol upgrade)
# The basic --upgrade proxy test is covered by the "varlinkctl serve" tests below (which use
# serve+rev/gunzip as the server). The tests here exercise features that need the Python
# server: file-input (defer fallback), ssh-exec transport (pipe pairs) and --exec mode.
UPGRADE_SOCKET="$(mktemp -d)/upgrade.sock"
UPGRADE_SERVER="$(mktemp)"
cat >"$UPGRADE_SERVER" <<'PYEOF'
@ -323,6 +320,15 @@ if sock:
PYEOF
chmod +x "$UPGRADE_SERVER"
# Start the server in the background, wait for readiness via sd_notify
systemd-notify --fork -q -- python3 "$UPGRADE_SERVER" "$UPGRADE_SOCKET"
# Test proxy mode: pipe data through --upgrade, passing parameters and validate
result="$(echo "hello world" | varlinkctl call --upgrade "unix:$UPGRADE_SOCKET" io.systemd.test.Reverse '{"foo":"bar"}')"
echo "$result" | grep "<<< UPGRADED >>>" >/dev/null
echo "$result" | grep '"foo": "bar"' >/dev/null
echo "$result" | grep "dlrow olleh" >/dev/null
# Test --upgrade with stdin redirected from a regular file (epoll can't poll regular files,
# so this exercises the sd_event_add_defer fallback path)
UPGRADE_SOCKET2="$(mktemp -d)/upgrade.sock"
@ -364,39 +370,3 @@ rm -f "$EXEC_RESULT"
rm -f "$UPGRADE_SOCKET" "$UPGRADE_SOCKET2" "$UPGRADE_SERVER" /tmp/test-upgrade-input
rm -rf "$(dirname "$UPGRADE_SOCKET")" "$(dirname "$UPGRADE_SOCKET2")"
# Test varlinkctl serve: expose a stdio command via varlink protocol upgrade with socket activation.
# This is the "inetd for varlink" pattern: any stdio tool becomes a varlink service.
SERVE_SOCKET="$(mktemp -d)/serve.sock"
# Test 1: serve rev: proves bidirectional data flow through the upgrade
SERVE_PID=$(systemd-notify --fork -- \
systemd-socket-activate -l "$SERVE_SOCKET" -- \
varlinkctl serve io.systemd.test.Reverse rev)
# Verify introspection works on the serve endpoint and shows the upgrade annotation
varlinkctl introspect "unix:$SERVE_SOCKET" io.systemd.test | grep "method Reverse" >/dev/null
varlinkctl introspect "unix:$SERVE_SOCKET" io.systemd.test | grep "Requires 'upgrade' flag" >/dev/null
result="$(echo "hello world" | varlinkctl call --upgrade "unix:$SERVE_SOCKET" io.systemd.test.Reverse '{}')"
echo "$result" | grep "dlrow olleh" >/dev/null
kill "$SERVE_PID" 2>/dev/null || true
wait "$SERVE_PID" 2>/dev/null || true
rm -f "$SERVE_SOCKET"
# Test 2: decompress via serve: the "sandboxed decompressor" use-case (the real thing would be a proper
# unit with real sandboxing).
# Pipe gzip-compressed data through a varlinkctl serve + gunzip endpoint and verify round-trip.
SERVE_PID=$(systemd-notify --fork -- \
systemd-socket-activate -l "$SERVE_SOCKET" -- \
varlinkctl serve io.systemd.Compress.Decompress gunzip)
SERVE_TMPDIR="$(mktemp -d)"
echo "untrusted data decompressed safely via varlink serve" | gzip > "$SERVE_TMPDIR/compressed.gz"
result="$(varlinkctl call --upgrade "unix:$SERVE_SOCKET" io.systemd.Compress.Decompress '{}' < "$SERVE_TMPDIR/compressed.gz")"
echo "$result" | grep "untrusted data decompressed safely" >/dev/null
kill "$SERVE_PID" 2>/dev/null || true
wait "$SERVE_PID" 2>/dev/null || true
rm -f "$SERVE_SOCKET"
rm -rf "$(dirname "$SERVE_SOCKET")" "$SERVE_TMPDIR"