Skip to content

Commit

Permalink
Merge pull request #15 from mkindahl/copyable-worker-args
Browse files Browse the repository at this point in the history
Pass worker data correctly
  • Loading branch information
mkindahl authored Mar 5, 2023
2 parents 52faea7 + 4678a79 commit ae600eb
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 25 deletions.
1 change: 0 additions & 1 deletion expected/inval.out
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ CREATE TABLE db_worker.cpu(_time timestamptz, _tags jsonb, _fields jsonb);
\set VERBOSITY terse
\x on
SELECT pg_sleep(1) FROM worker_launch('db_worker', 4711::text);
NOTICE: background worker started
-[ RECORD 1 ]
pg_sleep |

Expand Down
1 change: 0 additions & 1 deletion expected/worker.out
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ CREATE TABLE db_worker.system(_time timestamp, host text, uptime int, _tags json
\set VERBOSITY terse
\x on
SELECT pg_sleep(1) FROM worker_launch('db_worker', 4711::text);
NOTICE: background worker started
-[ RECORD 1 ]
pg_sleep |

Expand Down
15 changes: 11 additions & 4 deletions influx.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,19 @@ static void StartBackgroundWorkers(const char *database_name,
const char *role_name,
const char *service_name, int worker_count) {
MemoryContext oldcontext = MemoryContextSwitchTo(TopMemoryContext);
WorkerArgs args = {.namespace = schema_name ? pstrdup(schema_name) : NULL,
.database = database_name ? pstrdup(database_name) : NULL,
.role = role_name ? pstrdup(role_name) : NULL,
.service = service_name ? pstrdup(service_name) : NULL};
WorkerArgs args = {0};
int i;
BackgroundWorker worker;

if (schema_name)
strncpy(args.namespace, schema_name, sizeof(args.namespace));
if (database_name)
strncpy(args.database, database_name, sizeof(args.database));
if (role_name)
strncpy(args.role, role_name, sizeof(args.role));
if (service_name)
strncpy(args.service, service_name, sizeof(args.service));

elog(LOG, "starting influx workers");

InfluxWorkerInit(&worker, &args);
Expand Down
2 changes: 0 additions & 2 deletions metric.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,6 @@ static void PrepareRecord(Relation rel, Oid *argtypes, PreparedInsert record) {
const Oid relid = RelationGetRelid(rel);
int i;

elog(NOTICE, "preparing statement for %s", SPI_getrelname(rel));

/* Using the tuple descriptor and the parsed package, build the
* insert statement and collect the null array for the prepare
* call. */
Expand Down
4 changes: 2 additions & 2 deletions network.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ struct SocketMethod UdpSendSocket = {
int SocketPort(struct sockaddr* addr, socklen_t addrlen) {
switch (addr->sa_family) {
case AF_INET:
return ((struct sockaddr_in*)addr)->sin_port;
return ntohs(((struct sockaddr_in*)addr)->sin_port);
case AF_INET6:
return ((struct sockaddr_in6*)addr)->sin6_port;
return ntohs(((struct sockaddr_in6*)addr)->sin6_port);
default:
return -1;
}
Expand Down
25 changes: 14 additions & 11 deletions worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,13 @@ void InfluxWorkerMain(Datum arg) {
crash. */
namespace_id = get_namespace_oid(args->namespace, false);

ereport(LOG,
(errmsg("worker listening on port %d (service %s)",
SocketPort((struct sockaddr *)&sockaddr, sizeof(sockaddr)),
args->service),
errdetail("database=%s, namespace=%s, user=%s", args->database,
args->namespace, args->role)));
ereport(
LOG,
(errmsg("worker listening on port %d",
SocketPort((struct sockaddr *)&sockaddr, sizeof(sockaddr))),
errdetail(
"Connected to database %s as user %s. Metrics written to schema %s.",
args->database, args->role, args->namespace)));

pgstat_report_activity(STATE_RUNNING, "reading events");

Expand Down Expand Up @@ -255,15 +256,18 @@ Datum worker_launch(PG_FUNCTION_ARGS) {
BackgroundWorkerHandle *handle;
BgwHandleStatus status;
pid_t pid;
WorkerArgs args = {.service = service};
WorkerArgs args = {0};

/* Check that we have a valid namespace id */
if (get_namespace_name(nspid) == NULL)
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_SCHEMA),
errmsg("schema with OID %d does not exist", nspid)));

args.namespace = get_namespace_name(nspid);
args.database = get_database_name(MyDatabaseId);
strncpy(args.role, GetUserNameFromId(GetUserId(), true), sizeof(args.role));
strncpy(args.service, service, sizeof(args.service));
strncpy(args.namespace, get_namespace_name(nspid), sizeof(args.namespace));
strncpy(args.database, get_database_name(MyDatabaseId),
sizeof(args.database));

InfluxWorkerInit(&worker, &args);

Expand Down Expand Up @@ -293,8 +297,7 @@ Datum worker_launch(PG_FUNCTION_ARGS) {
errhint("Kill all remaining database processes and restart the "
"database.")));

ereport(NOTICE,
(errmsg("background worker started"), errdetail("pid=%d", pid)));
ereport(LOG, (errmsg("background worker started"), errdetail("pid=%d", pid)));

Assert(status == BGWH_STARTED);
PG_RETURN_INT32(pid);
Expand Down
8 changes: 4 additions & 4 deletions worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
#define INFLUX_FUNCTION_NAME "InfluxWorkerMain"

typedef struct WorkerArgs {
const char *role;
const char *namespace;
const char *database;
const char *service;
char role[32];
char namespace[32];
char database[32];
char service[32];
} WorkerArgs;

void InfluxWorkerInit(BackgroundWorker *worker, WorkerArgs *args);
Expand Down

0 comments on commit ae600eb

Please sign in to comment.