mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
iec61850-9-2: allow publish/subscribe-only mode
This commit is contained in:
parent
6d49820218
commit
b811c8248d
1 changed files with 55 additions and 33 deletions
|
@ -22,7 +22,7 @@
|
|||
|
||||
#include <libiec61850/stack_config.h>
|
||||
|
||||
#if CONFIG_IEC61850_SAMPLED_VALUES_SUPPORT == 1
|
||||
//#if CONFIG_IEC61850_SAMPLED_VALUES_SUPPORT == 1
|
||||
|
||||
#include <string.h>
|
||||
#include <pthread.h>
|
||||
|
@ -140,6 +140,8 @@ int iec61850_sv_parse(struct node *n, json_t *json)
|
|||
json_error_t err;
|
||||
|
||||
/* Default values */
|
||||
i->publisher.enabled = false;
|
||||
i->subscriber.enabled = false;
|
||||
i->publisher.smpmod = -1; /* do not set smpmod */
|
||||
i->publisher.smprate = -1; /* do not set smpmod */
|
||||
i->publisher.confrev = 1;
|
||||
|
@ -168,6 +170,8 @@ int iec61850_sv_parse(struct node *n, json_t *json)
|
|||
ether_aton_r(dst_address, &i->dst_address);
|
||||
|
||||
if (json_pub) {
|
||||
i->publisher.enabled = true;
|
||||
|
||||
ret = json_unpack_ex(json_pub, &err, 0, "{ s: o, s: s, s?: i, s?: s, s?: i, s?: i, s?: i }",
|
||||
"fields", &json_mapping,
|
||||
"svid", &svid,
|
||||
|
@ -201,6 +205,8 @@ int iec61850_sv_parse(struct node *n, json_t *json)
|
|||
}
|
||||
|
||||
if (json_sub) {
|
||||
i->subscriber.enabled = true;
|
||||
|
||||
ret = json_unpack_ex(json_sub, &err, 0, "{ s: o }",
|
||||
"fields", &json_mapping
|
||||
);
|
||||
|
@ -225,16 +231,19 @@ char * iec61850_sv_print(struct node *n)
|
|||
buf = strf("interface=%s, app_id=%#x, dst_address=%s", i->interface, i->app_id, ether_ntoa(&i->dst_address));
|
||||
|
||||
/* Publisher part */
|
||||
strcatf(&buf, ", pub.svid=%s, pub.vlan_prio=%d, pub.vlan_id=%#x, pub.confrev=%d, pub.#fields=%zu",
|
||||
i->publisher.svid,
|
||||
i->publisher.vlan_priority,
|
||||
i->publisher.vlan_id,
|
||||
i->publisher.confrev,
|
||||
list_length(&i->publisher.mapping)
|
||||
);
|
||||
if (i->publisher.enabled) {
|
||||
strcatf(&buf, ", pub.svid=%s, pub.vlan_prio=%d, pub.vlan_id=%#x, pub.confrev=%d, pub.#fields=%zu",
|
||||
i->publisher.svid,
|
||||
i->publisher.vlan_priority,
|
||||
i->publisher.vlan_id,
|
||||
i->publisher.confrev,
|
||||
list_length(&i->publisher.mapping)
|
||||
);
|
||||
}
|
||||
|
||||
/* Subscriber part */
|
||||
strcatf(&buf, ", sub.#fields=%zu", list_length(&i->subscriber.mapping));
|
||||
if (i->subscriber.enabled)
|
||||
strcatf(&buf, ", sub.#fields=%zu", list_length(&i->subscriber.mapping));
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
@ -266,28 +275,32 @@ int iec61850_sv_start(struct node *n)
|
|||
// if (s->publisher.smprate >= 0)
|
||||
// SV_ASDU_setSmpRate(i->publisher.asdu, i->publisher.smprate);
|
||||
|
||||
SVPublisher_setupComplete(i->publisher.publisher);
|
||||
/* Start publisher */
|
||||
if (i->publisher.enabled)
|
||||
SVPublisher_setupComplete(i->publisher.publisher);
|
||||
|
||||
/* Initialize subscriber */
|
||||
struct iec61850_receiver *r = iec61850_receiver_create(IEC61850_RECEIVER_SV, i->interface);
|
||||
/* Start subscriber */
|
||||
if (i->publisher.enabled) {
|
||||
struct iec61850_receiver *r = iec61850_receiver_create(IEC61850_RECEIVER_SV, i->interface);
|
||||
|
||||
i->subscriber.receiver = r->sv;
|
||||
i->subscriber.subscriber = SVSubscriber_create(i->dst_address.ether_addr_octet, i->app_id);
|
||||
i->subscriber.receiver = r->sv;
|
||||
i->subscriber.subscriber = SVSubscriber_create(i->dst_address.ether_addr_octet, i->app_id);
|
||||
|
||||
/* Install a callback handler for the subscriber */
|
||||
SVSubscriber_setListener(i->subscriber.subscriber, iec61850_sv_listener, n);
|
||||
/* Install a callback handler for the subscriber */
|
||||
SVSubscriber_setListener(i->subscriber.subscriber, iec61850_sv_listener, n);
|
||||
|
||||
/* Connect the subscriber to the receiver */
|
||||
SVReceiver_addSubscriber(i->subscriber.receiver, i->subscriber.subscriber);
|
||||
/* Connect the subscriber to the receiver */
|
||||
SVReceiver_addSubscriber(i->subscriber.receiver, i->subscriber.subscriber);
|
||||
|
||||
/* Initialize pool and queue to pass samples between threads */
|
||||
ret = pool_init(&i->subscriber.pool, 1024, SAMPLE_LEN(n->samplelen), &memtype_hugepage);
|
||||
if (ret)
|
||||
return ret;
|
||||
/* Initialize pool and queue to pass samples between threads */
|
||||
ret = pool_init(&i->subscriber.pool, 1024, SAMPLE_LEN(n->samplelen), &memtype_hugepage);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
ret = queue_signalled_init(&i->subscriber.queue, 1024, &memtype_hugepage, 0);
|
||||
if (ret)
|
||||
return ret;
|
||||
ret = queue_signalled_init(&i->subscriber.queue, 1024, &memtype_hugepage, 0);
|
||||
if (ret)
|
||||
return ret;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -296,7 +309,8 @@ int iec61850_sv_stop(struct node *n)
|
|||
{
|
||||
struct iec61850_sv *i = (struct iec61850_sv *) n->_vd;
|
||||
|
||||
SVReceiver_removeSubscriber(i->subscriber.receiver, i->subscriber.subscriber);
|
||||
if (i->subscriber.enabled)
|
||||
SVReceiver_removeSubscriber(i->subscriber.receiver, i->subscriber.subscriber);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -307,17 +321,19 @@ int iec61850_sv_destroy(struct node *n)
|
|||
struct iec61850_sv *i = (struct iec61850_sv *) n->_vd;
|
||||
|
||||
/* Deinitialize publisher */
|
||||
if (i->publisher.publisher)
|
||||
if (i->publisher.enabled && i->publisher.publisher)
|
||||
SVPublisher_destroy(i->publisher.publisher);
|
||||
|
||||
/* Deinitialise subscriber */
|
||||
ret = queue_signalled_destroy(&i->subscriber.queue);
|
||||
if (ret)
|
||||
return ret;
|
||||
if (i->subscriber.enabled) {
|
||||
ret = queue_signalled_destroy(&i->subscriber.queue);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
ret = pool_destroy(&i->subscriber.pool);
|
||||
if (ret)
|
||||
return ret;
|
||||
ret = pool_destroy(&i->subscriber.pool);
|
||||
if (ret)
|
||||
return ret;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -328,6 +344,9 @@ int iec61850_sv_read(struct node *n, struct sample *smps[], unsigned cnt)
|
|||
struct iec61850_sv *i = (struct iec61850_sv *) n->_vd;
|
||||
struct sample *smpt[cnt];
|
||||
|
||||
if (!i->subscriber.enabled)
|
||||
return 0;
|
||||
|
||||
pulled = queue_signalled_pull_many(&i->subscriber.queue, (void **) smpt, cnt);
|
||||
|
||||
sample_copy_many(smps, smpt, pulled);
|
||||
|
@ -339,6 +358,9 @@ int iec61850_sv_write(struct node *n, struct sample *smps[], unsigned cnt)
|
|||
{
|
||||
struct iec61850_sv *i = (struct iec61850_sv *) n->_vd;
|
||||
|
||||
if (!i->publisher.enabled)
|
||||
return 0;
|
||||
|
||||
for (unsigned j = 0; j < cnt; j++) {
|
||||
unsigned offset = 0;
|
||||
for (unsigned k = 0; k < MIN(smps[j]->length, list_length(&i->publisher.mapping)); k++) {
|
||||
|
|
Loading…
Add table
Reference in a new issue