Added RTMP protocol (#163)

This commit is contained in:
Alfred E. Heggestad 2018-10-24 11:18:52 +02:00 committed by Richard Aas
parent 7394ee407a
commit 68c0323fbd
19 changed files with 3334 additions and 0 deletions

View file

@ -32,6 +32,7 @@ MODULES += bfcp
MODULES += aes srtp
MODULES += odict
MODULES += json
MODULES += rtmp
INSTALL := install
ifeq ($(DESTDIR),)

View file

@ -41,6 +41,7 @@ extern "C" {
#include "re_net.h"
#include "re_odict.h"
#include "re_json.h"
#include "re_rtmp.h"
#include "re_rtp.h"
#include "re_sdp.h"
#include "re_uri.h"

View file

@ -45,3 +45,12 @@ int odict_entry_debug(struct re_printf *pf, const struct odict_entry *e);
bool odict_type_iscontainer(enum odict_type type);
bool odict_type_isreal(enum odict_type type);
const char *odict_type_name(enum odict_type type);
/* Helpers */
const struct odict_entry *odict_get_type(const struct odict *o,
enum odict_type type, const char *key);
const char *odict_string(const struct odict *o, const char *key);
bool odict_get_number(const struct odict *o, uint64_t *num, const char *key);
bool odict_get_boolean(const struct odict *o, bool *value, const char *key);

136
include/re_rtmp.h Normal file
View file

@ -0,0 +1,136 @@
/**
* @file re_rtmp.h Interface to Real Time Messaging Protocol (RTMP)
*
* Copyright (C) 2010 Creytiv.com
*/
/** RTMP Protocol values */
enum {
RTMP_PORT = 1935,
};
/** RTMP Stream IDs */
enum {
/* User Control messages SHOULD use message stream ID 0
(known as the control stream) */
RTMP_CONTROL_STREAM_ID = 0
};
/** RTMP Packet types */
enum rtmp_packet_type {
RTMP_TYPE_SET_CHUNK_SIZE = 1, /**< Set Chunk Size */
RTMP_TYPE_ACKNOWLEDGEMENT = 3, /**< Acknowledgement */
RTMP_TYPE_USER_CONTROL_MSG = 4, /**< User Control Messages */
RTMP_TYPE_WINDOW_ACK_SIZE = 5, /**< Window Acknowledgement Size */
RTMP_TYPE_SET_PEER_BANDWIDTH = 6, /**< Set Peer Bandwidth */
RTMP_TYPE_AUDIO = 8, /**< Audio Message */
RTMP_TYPE_VIDEO = 9, /**< Video Message */
RTMP_TYPE_DATA = 18, /**< Data Message */
RTMP_TYPE_AMF0 = 20, /**< Action Message Format (AMF) */
};
/** RTMP AMF types */
enum rtmp_amf_type {
RTMP_AMF_TYPE_ROOT = -1, /**< Special internal type */
RTMP_AMF_TYPE_NUMBER = 0x00, /**< Number Type */
RTMP_AMF_TYPE_BOOLEAN = 0x01, /**< Boolean Type */
RTMP_AMF_TYPE_STRING = 0x02, /**< String Type */
RTMP_AMF_TYPE_OBJECT = 0x03, /**< Object Type */
RTMP_AMF_TYPE_NULL = 0x05, /**< Null type */
RTMP_AMF_TYPE_ECMA_ARRAY = 0x08, /**< ECMA 'associative' Array */
RTMP_AMF_TYPE_OBJECT_END = 0x09, /**< Object End Type */
RTMP_AMF_TYPE_STRICT_ARRAY = 0x0a, /**< Array with ordinal indices */
};
/** RTMP Event types */
enum rtmp_event_type {
RTMP_EVENT_STREAM_BEGIN = 0, /**< Stream begin */
RTMP_EVENT_STREAM_EOF = 1, /**< Stream End-Of-File */
RTMP_EVENT_STREAM_DRY = 2, /**< No more data on the stream */
RTMP_EVENT_SET_BUFFER_LENGTH = 3, /**< Set buffer size in [ms] */
RTMP_EVENT_STREAM_IS_RECORDED = 4, /**< Stream is recorded */
RTMP_EVENT_PING_REQUEST = 6, /**< Ping Request from server */
RTMP_EVENT_PING_RESPONSE = 7, /**< Ping Response to server */
};
/* forward declarations */
struct dnsc;
struct odict;
struct tcp_sock;
/*
* RTMP High-level API (connection, stream)
*/
/* conn */
struct rtmp_conn;
typedef void (rtmp_estab_h)(void *arg);
typedef void (rtmp_command_h)(const struct odict *msg, void *arg);
typedef void (rtmp_close_h)(int err, void *arg);
int rtmp_connect(struct rtmp_conn **connp, struct dnsc *dnsc, const char *uri,
rtmp_estab_h *estabh, rtmp_command_h *cmdh,
rtmp_close_h *closeh, void *arg);
int rtmp_accept(struct rtmp_conn **connp, struct tcp_sock *ts,
rtmp_command_h *cmdh, rtmp_close_h *closeh, void *arg);
int rtmp_control(const struct rtmp_conn *conn,
enum rtmp_packet_type type, ...);
struct tcp_conn *rtmp_conn_tcpconn(const struct rtmp_conn *conn);
const char *rtmp_conn_stream(const struct rtmp_conn *conn);
int rtmp_conn_debug(struct re_printf *pf, const struct rtmp_conn *conn);
typedef void (rtmp_resp_h)(bool success, const struct odict *msg,
void *arg);
/* amf */
int rtmp_amf_command(const struct rtmp_conn *conn, uint32_t stream_id,
const char *command,
unsigned body_propc, ...);
int rtmp_amf_request(struct rtmp_conn *conn, uint32_t stream_id,
const char *command,
rtmp_resp_h *resph, void *arg, unsigned body_propc, ...);
int rtmp_amf_reply(struct rtmp_conn *conn, uint32_t stream_id, bool success,
const struct odict *req,
unsigned body_propc, ...);
int rtmp_amf_data(struct rtmp_conn *conn, uint32_t stream_id,
const char *command, unsigned body_propc, ...);
/* stream */
struct rtmp_stream;
typedef void (rtmp_control_h)(enum rtmp_event_type event, struct mbuf *mb,
void *arg);
typedef void (rtmp_audio_h)(uint32_t timestamp,
const uint8_t *pld, size_t len, void *arg);
typedef void (rtmp_video_h)(uint32_t timestamp,
const uint8_t *pld, size_t len, void *arg);
int rtmp_stream_alloc(struct rtmp_stream **strmp, struct rtmp_conn *conn,
uint32_t stream_id, rtmp_command_h *cmdh,
rtmp_control_h *ctrlh, rtmp_audio_h *auh,
rtmp_video_h *vidh, rtmp_command_h *datah,
void *arg);
int rtmp_stream_create(struct rtmp_stream **strmp, struct rtmp_conn *conn,
rtmp_resp_h *resph, rtmp_command_h *cmdh,
rtmp_control_h *ctrlh, rtmp_audio_h *auh,
rtmp_video_h *vidh, rtmp_command_h *datah,
void *arg);
int rtmp_play(struct rtmp_stream *strm, const char *name);
int rtmp_publish(struct rtmp_stream *strm, const char *name);
int rtmp_send_audio(struct rtmp_stream *strm, uint32_t timestamp,
const uint8_t *pld, size_t len);
int rtmp_send_video(struct rtmp_stream *strm, uint32_t timestamp,
const uint8_t *pld, size_t len);
struct rtmp_stream *rtmp_stream_find(const struct rtmp_conn *conn,
uint32_t stream_id);
const char *rtmp_event_name(enum rtmp_event_type event);

89
src/odict/get.c Normal file
View file

@ -0,0 +1,89 @@
/**
* @file get.c Ordered Dictionary -- high level accessors
*
* Copyright (C) 2010 Creytiv.com
*/
#include "re_types.h"
#include "re_fmt.h"
#include "re_mem.h"
#include "re_list.h"
#include "re_hash.h"
#include "re_odict.h"
const struct odict_entry *odict_get_type(const struct odict *o,
enum odict_type type, const char *key)
{
const struct odict_entry *entry;
if (!o || !key)
return NULL;
entry = odict_lookup(o, key);
if (!entry)
return NULL;
if (entry->type != type)
return NULL;
return entry;
}
const char *odict_string(const struct odict *o, const char *key)
{
const struct odict_entry *entry;
entry = odict_get_type(o, ODICT_STRING, key);
if (!entry)
return NULL;
return entry->u.str;
}
bool odict_get_number(const struct odict *o, uint64_t *num, const char *key)
{
const struct odict_entry *entry;
if (!o || !key)
return false;
entry = odict_lookup(o, key);
if (!entry)
return false;
switch (entry->type) {
case ODICT_DOUBLE:
if (num)
*num = (uint64_t)entry->u.dbl;
break;
case ODICT_INT:
if (num)
*num = entry->u.integer;
break;
default:
return false;
}
return true;
}
bool odict_get_boolean(const struct odict *o, bool *value, const char *key)
{
const struct odict_entry *entry;
entry = odict_get_type(o, ODICT_BOOL, key);
if (!entry)
return false;
if (value)
*value = entry->u.boolean;
return true;
}

View file

@ -7,3 +7,4 @@
SRCS += odict/entry.c
SRCS += odict/odict.c
SRCS += odict/type.c
SRCS += odict/get.c

126
src/rtmp/README.md Normal file
View file

@ -0,0 +1,126 @@
RTMP module
-----------
This module implements Real Time Messaging Protocol (RTMP) [1].
Functional overview:
-------------------
```
RTMP Specification v1.0 .......... YES
RTMP with TCP transport .......... YES
RTMPS (RTMP over TLS) ............ NO
RTMPE (RTMP over Adobe Encryption) NO
RTMPT (RTMP over HTTP) ........... NO
RTMFP (RTMP over UDP) ............ NO
Transport:
Client ........................... YES
Server ........................... YES
IPv4 ............................. YES
IPv6 ............................. YES
DNS Resolving A/AAAA ............. YES
RTMP Components:
RTMP Handshake ................... YES
RTMP Header encoding and decoding. YES
RTMP Chunking .................... YES
RTMP Dechunking .................. YES
AMF0 (Action Message Format) ..... YES
AMF3 (Action Message Format) ..... NO
Send and receive audio/video ..... YES
Regular and extended timestamp ... YES
Multiple streams ................. YES
```
TODO:
----
- [x] improve AMF encoding API
- [x] implement AMF transaction matching
- [x] add support for Data Message
- [x] add support for AMF Strict Array (type 10)
- [ ] add support for TLS encryption
- [x] add support for extended timestamp
Protocol stack:
--------------
.-------. .-------. .-------.
| AMF | | Audio | | Video |
'-------' '-------' '-------'
| | |
+----------+----------'
|
.-------.
| RTMP |
'-------'
|
|
.-------.
| TCP |
'-------'
Message Sequence:
----------------
```
Client Server
|----------------- TCP Connect -------------->|
| |
| |
| |
|<-------------- 3-way Handshake ------------>|
| |
| |
| |
|----------- Command Message(connect) ------->| chunkid=3, streamid=0, tid=1
| |
|<------- Window Acknowledgement Size --------| chunkid=2, streamid=0
| |
|<----------- Set Peer Bandwidth -------------| chunkid=2, streamid=0
| |
|-------- Window Acknowledgement Size ------->|
| |
|<------ User Control Message(StreamBegin) ---| chunkid=2, streamid=0
| |
|<------------ Command Message ---------------| chunkid=3, streamid=0, tid=1
| (_result- connect response) |
```
Interop:
-------
- Wowza Streaming Engine 4.7.1
- Youtube service
- FFmpeg's RTMP module
References:
----------
[1] http://wwwimages.adobe.com/www.adobe.com/content/dam/acom/en/devnet/rtmp/pdf/rtmp_specification_1.0.pdf
[2] https://wwwimages2.adobe.com/content/dam/acom/en/devnet/flv/video_file_format_spec_v10_1.pdf
[3] https://en.wikipedia.org/wiki/Action_Message_Format
[4] https://wwwimages2.adobe.com/content/dam/acom/en/devnet/pdf/amf0-file-format-specification.pdf

163
src/rtmp/amf.c Normal file
View file

@ -0,0 +1,163 @@
/**
* @file rtmp/amf.c Real Time Messaging Protocol (RTMP) -- AMF Commands
*
* Copyright (C) 2010 Creytiv.com
*/
#include <string.h>
#include <re_types.h>
#include <re_fmt.h>
#include <re_mem.h>
#include <re_mbuf.h>
#include <re_net.h>
#include <re_sa.h>
#include <re_list.h>
#include <re_tcp.h>
#include <re_sys.h>
#include <re_odict.h>
#include <re_rtmp.h>
#include "rtmp.h"
int rtmp_command_header_encode(struct mbuf *mb, const char *name, uint64_t tid)
{
int err;
if (!mb || !name)
return EINVAL;
err = rtmp_amf_encode_string(mb, name);
err |= rtmp_amf_encode_number(mb, tid);
return err;
}
int rtmp_amf_command(const struct rtmp_conn *conn, uint32_t stream_id,
const char *command, unsigned body_propc, ...)
{
struct mbuf *mb;
va_list ap;
int err;
if (!conn || !command)
return EINVAL;
mb = mbuf_alloc(512);
if (!mb)
return ENOMEM;
err = rtmp_amf_encode_string(mb, command);
if (err)
goto out;
if (body_propc) {
va_start(ap, body_propc);
err = rtmp_amf_vencode_object(mb, RTMP_AMF_TYPE_ROOT,
body_propc, &ap);
va_end(ap);
if (err)
goto out;
}
err = rtmp_send_amf_command(conn, 0, RTMP_CHUNK_ID_CONN,
RTMP_TYPE_AMF0,
stream_id, mb->buf, mb->end);
if (err)
goto out;
out:
mem_deref(mb);
return err;
}
int rtmp_amf_reply(struct rtmp_conn *conn, uint32_t stream_id, bool success,
const struct odict *req,
unsigned body_propc, ...)
{
struct mbuf *mb;
va_list ap;
uint64_t tid;
int err;
if (!conn || !req)
return EINVAL;
if (!odict_get_number(req, &tid, "1"))
return EPROTO;
if (tid == 0)
return EPROTO;
mb = mbuf_alloc(512);
if (!mb)
return ENOMEM;
err = rtmp_command_header_encode(mb,
success ? "_result" : "_error", tid);
if (err)
goto out;
if (body_propc) {
va_start(ap, body_propc);
err = rtmp_amf_vencode_object(mb, RTMP_AMF_TYPE_ROOT,
body_propc, &ap);
va_end(ap);
if (err)
goto out;
}
err = rtmp_send_amf_command(conn, 0, RTMP_CHUNK_ID_CONN,
RTMP_TYPE_AMF0,
stream_id, mb->buf, mb->end);
if (err)
goto out;
out:
mem_deref(mb);
return err;
}
int rtmp_amf_data(struct rtmp_conn *conn, uint32_t stream_id,
const char *command, unsigned body_propc, ...)
{
struct mbuf *mb;
va_list ap;
int err;
if (!conn || !command)
return EINVAL;
mb = mbuf_alloc(512);
if (!mb)
return ENOMEM;
err = rtmp_amf_encode_string(mb, command);
if (err)
goto out;
if (body_propc) {
va_start(ap, body_propc);
err = rtmp_amf_vencode_object(mb, RTMP_AMF_TYPE_ROOT,
body_propc, &ap);
va_end(ap);
if (err)
goto out;
}
err = rtmp_send_amf_command(conn, 0, RTMP_CHUNK_ID_CONN,
RTMP_TYPE_DATA,
stream_id, mb->buf, mb->end);
if (err)
goto out;
out:
mem_deref(mb);
return err;
}

235
src/rtmp/amf_dec.c Normal file
View file

@ -0,0 +1,235 @@
/**
* @file rtmp/amf_dec.c Real Time Messaging Protocol (RTMP) -- AMF Decoding
*
* Copyright (C) 2010 Creytiv.com
*/
#include <string.h>
#include <re_types.h>
#include <re_fmt.h>
#include <re_mem.h>
#include <re_mbuf.h>
#include <re_net.h>
#include <re_sa.h>
#include <re_list.h>
#include <re_sys.h>
#include <re_odict.h>
#include <re_rtmp.h>
#include "rtmp.h"
enum {
AMF_HASH_SIZE = 32
};
static int amf_decode_value(struct odict *dict, const char *key,
struct mbuf *mb);
static int amf_decode_object(struct odict *dict, struct mbuf *mb)
{
char *key = NULL;
uint16_t len;
int err = 0;
while (mbuf_get_left(mb) > 0) {
if (mbuf_get_left(mb) < 2)
return ENODATA;
len = ntohs(mbuf_read_u16(mb));
if (len == 0) {
uint8_t val;
if (mbuf_get_left(mb) < 1)
return ENODATA;
val = mbuf_read_u8(mb);
if (val == RTMP_AMF_TYPE_OBJECT_END)
return 0;
else
return EBADMSG;
}
if (mbuf_get_left(mb) < len)
return ENODATA;
err = mbuf_strdup(mb, &key, len);
if (err)
return err;
err = amf_decode_value(dict, key, mb);
key = mem_deref(key);
if (err)
return err;
}
return 0;
}
static int amf_decode_value(struct odict *dict, const char *key,
struct mbuf *mb)
{
union {
uint64_t i;
double f;
} num;
struct odict *object = NULL;
char *str = NULL;
uint32_t i, array_len;
uint8_t type;
uint16_t len;
bool boolean;
int err = 0;
if (mbuf_get_left(mb) < 1)
return ENODATA;
type = mbuf_read_u8(mb);
switch (type) {
case RTMP_AMF_TYPE_NUMBER:
if (mbuf_get_left(mb) < 8)
return ENODATA;
num.i = sys_ntohll(mbuf_read_u64(mb));
err = odict_entry_add(dict, key, ODICT_DOUBLE, num.f);
break;
case RTMP_AMF_TYPE_BOOLEAN:
if (mbuf_get_left(mb) < 1)
return ENODATA;
boolean = !!mbuf_read_u8(mb);
err = odict_entry_add(dict, key, ODICT_BOOL, boolean);
break;
case RTMP_AMF_TYPE_STRING:
if (mbuf_get_left(mb) < 2)
return ENODATA;
len = ntohs(mbuf_read_u16(mb));
if (mbuf_get_left(mb) < len)
return ENODATA;
err = mbuf_strdup(mb, &str, len);
if (err)
return err;
err = odict_entry_add(dict, key, ODICT_STRING, str);
mem_deref(str);
break;
case RTMP_AMF_TYPE_NULL:
err = odict_entry_add(dict, key, ODICT_NULL);
break;
case RTMP_AMF_TYPE_ECMA_ARRAY:
if (mbuf_get_left(mb) < 4)
return ENODATA;
array_len = ntohl(mbuf_read_u32(mb));
(void)array_len; /* ignore array length */
/* fallthrough */
case RTMP_AMF_TYPE_OBJECT:
err = odict_alloc(&object, 32);
if (err)
return err;
err = amf_decode_object(object, mb);
if (err) {
mem_deref(object);
return err;
}
err = odict_entry_add(dict, key, ODICT_OBJECT, object);
mem_deref(object);
break;
case RTMP_AMF_TYPE_STRICT_ARRAY:
if (mbuf_get_left(mb) < 4)
return ENODATA;
array_len = ntohl(mbuf_read_u32(mb));
if (!array_len)
return EPROTO;
err = odict_alloc(&object, 32);
if (err)
return err;
for (i=0; i<array_len; i++) {
char ix[32];
re_snprintf(ix, sizeof(ix), "%u", i);
err = amf_decode_value(object, ix, mb);
if (err) {
mem_deref(object);
return err;
}
}
err = odict_entry_add(dict, key, ODICT_ARRAY, object);
mem_deref(object);
break;
default:
err = EPROTO;
break;
}
return err;
}
int rtmp_amf_decode(struct odict **msgp, struct mbuf *mb)
{
struct odict *msg;
unsigned ix = 0;
int err;
if (!msgp || !mb)
return EINVAL;
err = odict_alloc(&msg, AMF_HASH_SIZE);
if (err)
return err;
/* decode all entries on root-level */
while (mbuf_get_left(mb) > 0) {
char key[16];
re_snprintf(key, sizeof(key), "%u", ix++);
/* note: key is the numerical index */
err = amf_decode_value(msg, key, mb);
if (err)
goto out;
}
out:
if (err)
mem_deref(msg);
else
*msgp = msg;
return err;
}

245
src/rtmp/amf_enc.c Normal file
View file

@ -0,0 +1,245 @@
/**
* @file rtmp/amf_enc.c Real Time Messaging Protocol (RTMP) -- AMF Encoding
*
* Copyright (C) 2010 Creytiv.com
*/
#include <string.h>
#include <re_types.h>
#include <re_fmt.h>
#include <re_mem.h>
#include <re_mbuf.h>
#include <re_net.h>
#include <re_sa.h>
#include <re_list.h>
#include <re_sys.h>
#include <re_odict.h>
#include <re_rtmp.h>
#include "rtmp.h"
static int rtmp_amf_encode_key(struct mbuf *mb, const char *key)
{
size_t len;
int err;
len = str_len(key);
if (len > 65535)
return EOVERFLOW;
err = mbuf_write_u16(mb, htons((uint16_t)len));
err |= mbuf_write_str(mb, key);
return err;
}
static int rtmp_amf_encode_object_start(struct mbuf *mb)
{
return mbuf_write_u8(mb, RTMP_AMF_TYPE_OBJECT);
}
static int rtmp_amf_encode_array_start(struct mbuf *mb,
uint8_t type, uint32_t length)
{
int err;
err = mbuf_write_u8(mb, type);
err |= mbuf_write_u32(mb, htonl(length));
return err;
}
static int rtmp_amf_encode_object_end(struct mbuf *mb)
{
int err;
err = mbuf_write_u16(mb, 0);
err |= mbuf_write_u8(mb, RTMP_AMF_TYPE_OBJECT_END);
return err;
}
static bool container_has_key(enum rtmp_amf_type type)
{
switch (type) {
case RTMP_AMF_TYPE_OBJECT: return true;
case RTMP_AMF_TYPE_ECMA_ARRAY: return true;
case RTMP_AMF_TYPE_STRICT_ARRAY: return false;
default: return false;
}
}
int rtmp_amf_encode_number(struct mbuf *mb, double val)
{
const union {
uint64_t i;
double f;
} num = {
.f = val
};
int err;
if (!mb)
return EINVAL;
err = mbuf_write_u8(mb, RTMP_AMF_TYPE_NUMBER);
err |= mbuf_write_u64(mb, sys_htonll(num.i));
return err;
}
int rtmp_amf_encode_boolean(struct mbuf *mb, bool boolean)
{
int err;
if (!mb)
return EINVAL;
err = mbuf_write_u8(mb, RTMP_AMF_TYPE_BOOLEAN);
err |= mbuf_write_u8(mb, !!boolean);
return err;
}
int rtmp_amf_encode_string(struct mbuf *mb, const char *str)
{
size_t len;
int err;
if (!mb || !str)
return EINVAL;
len = str_len(str);
if (len > 65535)
return EOVERFLOW;
err = mbuf_write_u8(mb, RTMP_AMF_TYPE_STRING);
err |= mbuf_write_u16(mb, htons((uint16_t)len));
err |= mbuf_write_str(mb, str);
return err;
}
int rtmp_amf_encode_null(struct mbuf *mb)
{
if (!mb)
return EINVAL;
return mbuf_write_u8(mb, RTMP_AMF_TYPE_NULL);
}
/*
* NUMBER double
* BOOLEAN bool
* STRING const char *
* OBJECT const char *key sub-count
* NULL NULL
* ARRAY const char *key sub-count
*/
int rtmp_amf_vencode_object(struct mbuf *mb, enum rtmp_amf_type container,
unsigned propc, va_list *ap)
{
bool encode_key;
unsigned i;
int err = 0;
if (!mb || !propc || !ap)
return EINVAL;
encode_key = container_has_key(container);
switch (container) {
case RTMP_AMF_TYPE_OBJECT:
err = rtmp_amf_encode_object_start(mb);
break;
case RTMP_AMF_TYPE_ECMA_ARRAY:
case RTMP_AMF_TYPE_STRICT_ARRAY:
err = rtmp_amf_encode_array_start(mb, container, propc);
break;
case RTMP_AMF_TYPE_ROOT:
break;
default:
return ENOTSUP;
}
if (err)
return err;
for (i=0; i<propc; i++) {
int type = va_arg(*ap, int);
const char *str;
int subcount;
double dbl;
bool b;
/* add key if ARRAY or OBJECT container */
if (encode_key) {
const char *key;
key = va_arg(*ap, const char *);
if (!key)
return EINVAL;
err = rtmp_amf_encode_key(mb, key);
if (err)
return err;
}
switch (type) {
case RTMP_AMF_TYPE_NUMBER:
dbl = va_arg(*ap, double);
err = rtmp_amf_encode_number(mb, dbl);
break;
case RTMP_AMF_TYPE_BOOLEAN:
b = va_arg(*ap, int);
err = rtmp_amf_encode_boolean(mb, b);
break;
case RTMP_AMF_TYPE_STRING:
str = va_arg(*ap, const char *);
err = rtmp_amf_encode_string(mb, str);
break;
case RTMP_AMF_TYPE_NULL:
err = rtmp_amf_encode_null(mb);
break;
case RTMP_AMF_TYPE_OBJECT:
case RTMP_AMF_TYPE_ECMA_ARRAY:
case RTMP_AMF_TYPE_STRICT_ARRAY:
/* recursive */
subcount = va_arg(*ap, int);
err = rtmp_amf_vencode_object(mb, type, subcount, ap);
break;
default:
return ENOTSUP;
}
if (err)
return err;
}
if (encode_key)
err = rtmp_amf_encode_object_end(mb);
return err;
}

87
src/rtmp/chunk.c Normal file
View file

@ -0,0 +1,87 @@
/**
* @file rtmp/chunk.c Real Time Messaging Protocol (RTMP) -- Chunking
*
* Copyright (C) 2010 Creytiv.com
*/
#include <string.h>
#include <re_types.h>
#include <re_fmt.h>
#include <re_mem.h>
#include <re_mbuf.h>
#include <re_net.h>
#include <re_sa.h>
#include <re_tcp.h>
#include <re_list.h>
#include <re_rtmp.h>
#include "rtmp.h"
/*
* Stateless RTMP chunker
*/
int rtmp_chunker(unsigned format, uint32_t chunk_id,
uint32_t timestamp, uint32_t timestamp_delta,
uint8_t msg_type_id, uint32_t msg_stream_id,
const uint8_t *payload, size_t payload_len,
size_t max_chunk_sz, struct tcp_conn *tc)
{
const uint8_t *pend = payload + payload_len;
struct rtmp_header hdr;
struct mbuf *mb;
size_t chunk_sz;
int err;
if (!payload || !payload_len || !max_chunk_sz || !tc)
return EINVAL;
mb = mbuf_alloc(payload_len + 256);
if (!mb)
return ENOMEM;
memset(&hdr, 0, sizeof(hdr));
hdr.format = format;
hdr.chunk_id = chunk_id;
hdr.timestamp = timestamp;
hdr.timestamp_delta = timestamp_delta;
hdr.length = (uint32_t)payload_len;
hdr.type_id = msg_type_id;
hdr.stream_id = msg_stream_id;
chunk_sz = min(payload_len, max_chunk_sz);
err = rtmp_header_encode(mb, &hdr);
err |= mbuf_write_mem(mb, payload, chunk_sz);
if (err)
goto out;
payload += chunk_sz;
hdr.format = 3;
while (payload < pend) {
const size_t len = pend - payload;
chunk_sz = min(len, max_chunk_sz);
err = rtmp_header_encode(mb, &hdr);
err |= mbuf_write_mem(mb, payload, chunk_sz);
if (err)
goto out;
payload += chunk_sz;
}
mb->pos = 0;
err = tcp_send(tc, mb);
if (err)
goto out;
out:
mem_deref(mb);
return err;
}

997
src/rtmp/conn.c Normal file
View file

@ -0,0 +1,997 @@
/**
* @file rtmp/conn.c Real Time Messaging Protocol (RTMP) -- NetConnection
*
* Copyright (C) 2010 Creytiv.com
*/
#include <string.h>
#include <re_types.h>
#include <re_fmt.h>
#include <re_mem.h>
#include <re_mbuf.h>
#include <re_net.h>
#include <re_sa.h>
#include <re_list.h>
#include <re_tcp.h>
#include <re_sys.h>
#include <re_odict.h>
#include <re_dns.h>
#include <re_rtmp.h>
#include "rtmp.h"
enum {
WINDOW_ACK_SIZE = 2500000
};
static int req_connect(struct rtmp_conn *conn);
static void conn_destructor(void *data)
{
struct rtmp_conn *conn = data;
list_flush(&conn->ctransl);
list_flush(&conn->streaml);
mem_deref(conn->dnsq6);
mem_deref(conn->dnsq4);
mem_deref(conn->dnsc);
mem_deref(conn->tc);
mem_deref(conn->mb);
mem_deref(conn->dechunk);
mem_deref(conn->uri);
mem_deref(conn->app);
mem_deref(conn->host);
mem_deref(conn->stream);
}
static int handle_amf_command(struct rtmp_conn *conn, uint32_t stream_id,
struct mbuf *mb)
{
struct odict *msg = NULL;
const char *name;
int err;
err = rtmp_amf_decode(&msg, mb);
if (err)
return err;
name = odict_string(msg, "0");
if (conn->is_client &&
(0 == str_casecmp(name, "_result") ||
0 == str_casecmp(name, "_error"))) {
/* forward response to transaction layer */
rtmp_ctrans_response(&conn->ctransl, msg);
}
else {
struct rtmp_stream *strm;
if (stream_id == 0) {
if (conn->cmdh)
conn->cmdh(msg, conn->arg);
}
else {
strm = rtmp_stream_find(conn, stream_id);
if (strm) {
if (strm->cmdh)
strm->cmdh(msg, strm->arg);
}
}
}
mem_deref(msg);
return 0;
}
static int handle_user_control_msg(struct rtmp_conn *conn, struct mbuf *mb)
{
struct rtmp_stream *strm;
enum rtmp_event_type event;
uint32_t value;
int err;
if (mbuf_get_left(mb) < 6)
return EBADMSG;
event = ntohs(mbuf_read_u16(mb));
value = ntohl(mbuf_read_u32(mb));
switch (event) {
case RTMP_EVENT_STREAM_BEGIN:
case RTMP_EVENT_STREAM_EOF:
case RTMP_EVENT_STREAM_DRY:
case RTMP_EVENT_STREAM_IS_RECORDED:
case RTMP_EVENT_SET_BUFFER_LENGTH:
if (value != RTMP_CONTROL_STREAM_ID) {
strm = rtmp_stream_find(conn, value);
if (strm && strm->ctrlh)
strm->ctrlh(event, mb, strm->arg);
}
break;
case RTMP_EVENT_PING_REQUEST:
err = rtmp_control(conn, RTMP_TYPE_USER_CONTROL_MSG,
RTMP_EVENT_PING_RESPONSE, value);
if (err)
return err;
break;
default:
break;
}
return 0;
}
static int handle_data_message(struct rtmp_conn *conn, uint32_t stream_id,
struct mbuf *mb)
{
struct rtmp_stream *strm;
struct odict *msg;
int err;
err = rtmp_amf_decode(&msg, mb);
if (err)
return err;
strm = rtmp_stream_find(conn, stream_id);
if (strm && strm->datah)
strm->datah(msg, strm->arg);
mem_deref(msg);
return 0;
}
static int rtmp_dechunk_handler(const struct rtmp_header *hdr,
struct mbuf *mb, void *arg)
{
struct rtmp_conn *conn = arg;
struct rtmp_stream *strm;
uint32_t val;
uint32_t was;
uint8_t limit;
int err = 0;
switch (hdr->type_id) {
case RTMP_TYPE_SET_CHUNK_SIZE:
if (mbuf_get_left(mb) < 4)
return EBADMSG;
val = ntohl(mbuf_read_u32(mb));
val = val & 0x7fffffff;
rtmp_dechunker_set_chunksize(conn->dechunk, val);
break;
case RTMP_TYPE_ACKNOWLEDGEMENT:
if (mbuf_get_left(mb) < 4)
return EBADMSG;
val = ntohl(mbuf_read_u32(mb));
(void)val;
break;
case RTMP_TYPE_AMF0:
err = handle_amf_command(conn, hdr->stream_id, mb);
break;
case RTMP_TYPE_WINDOW_ACK_SIZE:
if (mbuf_get_left(mb) < 4)
return EBADMSG;
was = ntohl(mbuf_read_u32(mb));
if (was != 0)
conn->window_ack_size = was;
break;
case RTMP_TYPE_SET_PEER_BANDWIDTH:
if (mbuf_get_left(mb) < 5)
return EBADMSG;
was = ntohl(mbuf_read_u32(mb));
limit = mbuf_read_u8(mb);
(void)limit;
if (was != 0)
conn->window_ack_size = was;
err = rtmp_control(conn, RTMP_TYPE_WINDOW_ACK_SIZE,
(uint32_t)WINDOW_ACK_SIZE);
break;
case RTMP_TYPE_USER_CONTROL_MSG:
err = handle_user_control_msg(conn, mb);
break;
/* XXX: common code for audio+video */
case RTMP_TYPE_AUDIO:
strm = rtmp_stream_find(conn, hdr->stream_id);
if (strm) {
if (strm->auh) {
strm->auh(hdr->timestamp,
mb->buf, mb->end,
strm->arg);
}
}
break;
case RTMP_TYPE_VIDEO:
strm = rtmp_stream_find(conn, hdr->stream_id);
if (strm) {
if (strm->vidh) {
strm->vidh(hdr->timestamp,
mb->buf, mb->end,
strm->arg);
}
}
break;
case RTMP_TYPE_DATA:
err = handle_data_message(conn, hdr->stream_id, mb);
break;
default:
break;
}
return err;
}
static struct rtmp_conn *rtmp_conn_alloc(bool is_client,
rtmp_estab_h *estabh,
rtmp_command_h *cmdh,
rtmp_close_h *closeh,
void *arg)
{
struct rtmp_conn *conn;
int err;
conn = mem_zalloc(sizeof(*conn), conn_destructor);
if (!conn)
return NULL;
conn->is_client = is_client;
conn->state = RTMP_STATE_UNINITIALIZED;
conn->send_chunk_size = RTMP_DEFAULT_CHUNKSIZE;
conn->window_ack_size = WINDOW_ACK_SIZE;
err = rtmp_dechunker_alloc(&conn->dechunk, RTMP_DEFAULT_CHUNKSIZE,
rtmp_dechunk_handler, conn);
if (err)
goto out;
/* must be above 2 */
conn->chunk_id_counter = RTMP_CHUNK_ID_CONN + 1;
conn->estabh = estabh;
conn->cmdh = cmdh;
conn->closeh = closeh;
conn->arg = arg;
out:
if (err)
return mem_deref(conn);
return conn;
}
static inline void set_state(struct rtmp_conn *conn,
enum rtmp_handshake_state state)
{
conn->state = state;
}
static int send_packet(struct rtmp_conn *conn, const uint8_t *pkt, size_t len)
{
struct mbuf *mb;
int err;
if (!conn || !pkt || !len)
return EINVAL;
mb = mbuf_alloc(len);
if (!mb)
return ENOMEM;
(void)mbuf_write_mem(mb, pkt, len);
mb->pos = 0;
err = tcp_send(conn->tc, mb);
if (err)
goto out;
out:
mem_deref(mb);
return err;
}
static int handshake_start(struct rtmp_conn *conn)
{
uint8_t sig[1+RTMP_HANDSHAKE_SIZE];
int err;
sig[0] = RTMP_PROTOCOL_VERSION;
sig[1] = 0;
sig[2] = 0;
sig[3] = 0;
sig[4] = 0;
sig[5] = VER_MAJOR;
sig[6] = VER_MINOR;
sig[7] = VER_PATCH;
sig[8] = 0;
rand_bytes(sig + 9, sizeof(sig) - 9);
err = send_packet(conn, sig, sizeof(sig));
if (err)
return err;
set_state(conn, RTMP_STATE_VERSION_SENT);
return 0;
}
static void conn_close(struct rtmp_conn *conn, int err)
{
rtmp_close_h *closeh;
conn->tc = mem_deref(conn->tc);
conn->dnsq6 = mem_deref(conn->dnsq6);
conn->dnsq4 = mem_deref(conn->dnsq4);
closeh = conn->closeh;
if (closeh) {
conn->closeh = NULL;
closeh(err, conn->arg);
}
}
static void tcp_estab_handler(void *arg)
{
struct rtmp_conn *conn = arg;
int err = 0;
if (conn->is_client) {
err = handshake_start(conn);
}
if (err)
conn_close(conn, err);
}
/* Send AMF0 Command or Data */
int rtmp_send_amf_command(const struct rtmp_conn *conn,
unsigned format, uint32_t chunk_id,
uint8_t type_id,
uint32_t msg_stream_id,
const uint8_t *cmd, size_t len)
{
if (!conn || !cmd || !len)
return EINVAL;
return rtmp_chunker(format, chunk_id, 0, 0, type_id, msg_stream_id,
cmd, len, conn->send_chunk_size,
conn->tc);
}
static void connect_resp_handler(bool success, const struct odict *msg,
void *arg)
{
struct rtmp_conn *conn = arg;
rtmp_estab_h *estabh;
(void)msg;
if (!success) {
conn_close(conn, EPROTO);
return;
}
conn->connected = true;
estabh = conn->estabh;
if (estabh) {
conn->estabh = NULL;
estabh(conn->arg);
}
}
static int send_connect(struct rtmp_conn *conn)
{
const int ac = 0x0400; /* AAC */
const int vc = 0x0080; /* H264 */
return rtmp_amf_request(conn, RTMP_CONTROL_STREAM_ID, "connect",
connect_resp_handler, conn,
1,
RTMP_AMF_TYPE_OBJECT, 8,
RTMP_AMF_TYPE_STRING, "app", conn->app,
RTMP_AMF_TYPE_STRING, "flashVer", "LNX 9,0,124,2",
RTMP_AMF_TYPE_STRING, "tcUrl", conn->uri,
RTMP_AMF_TYPE_BOOLEAN, "fpad", false,
RTMP_AMF_TYPE_NUMBER, "capabilities", 15.0,
RTMP_AMF_TYPE_NUMBER, "audioCodecs", (double)ac,
RTMP_AMF_TYPE_NUMBER, "videoCodecs", (double)vc,
RTMP_AMF_TYPE_NUMBER, "videoFunction", 1.0);
}
static int client_handle_packet(struct rtmp_conn *conn, struct mbuf *mb)
{
uint8_t s0;
uint8_t s1[RTMP_HANDSHAKE_SIZE];
int err = 0;
switch (conn->state) {
case RTMP_STATE_VERSION_SENT:
if (mbuf_get_left(mb) < (1+RTMP_HANDSHAKE_SIZE))
return ENODATA;
s0 = mbuf_read_u8(mb);
if (s0 != RTMP_PROTOCOL_VERSION)
return EPROTO;
(void)mbuf_read_mem(mb, s1, sizeof(s1));
err = send_packet(conn, s1, sizeof(s1));
if (err)
return err;
set_state(conn, RTMP_STATE_ACK_SENT);
break;
case RTMP_STATE_ACK_SENT:
if (mbuf_get_left(mb) < RTMP_HANDSHAKE_SIZE)
return ENODATA;
/* S2 (ignored) */
mbuf_advance(mb, RTMP_HANDSHAKE_SIZE);
conn->send_chunk_size = 4096;
err = rtmp_control(conn, RTMP_TYPE_SET_CHUNK_SIZE,
conn->send_chunk_size);
if (err)
return err;
err = send_connect(conn);
if (err)
return err;
set_state(conn, RTMP_STATE_HANDSHAKE_DONE);
break;
case RTMP_STATE_HANDSHAKE_DONE:
err = rtmp_dechunker_receive(conn->dechunk, mb);
if (err)
return err;
break;
default:
return EPROTO;
}
return 0;
}
static int server_handle_packet(struct rtmp_conn *conn, struct mbuf *mb)
{
uint8_t c0;
uint8_t c1[RTMP_HANDSHAKE_SIZE];
int err = 0;
switch (conn->state) {
case RTMP_STATE_UNINITIALIZED:
if (mbuf_get_left(mb) < 1)
return ENODATA;
c0 = mbuf_read_u8(mb);
if (c0 != RTMP_PROTOCOL_VERSION)
return EPROTO;
/* Send S0 + S1 */
err = handshake_start(conn);
if (err)
return err;
break;
case RTMP_STATE_VERSION_SENT:
if (mbuf_get_left(mb) < RTMP_HANDSHAKE_SIZE)
return ENODATA;
(void)mbuf_read_mem(mb, c1, sizeof(c1));
/* Copy C1 to S2 */
err = send_packet(conn, c1, sizeof(c1));
if (err)
return err;
set_state(conn, RTMP_STATE_ACK_SENT);
break;
case RTMP_STATE_ACK_SENT:
if (mbuf_get_left(mb) < RTMP_HANDSHAKE_SIZE)
return ENODATA;
/* C2 (ignored) */
mbuf_advance(mb, RTMP_HANDSHAKE_SIZE);
conn->send_chunk_size = 4096;
err = rtmp_control(conn, RTMP_TYPE_SET_CHUNK_SIZE,
conn->send_chunk_size);
if (err)
return err;
set_state(conn, RTMP_STATE_HANDSHAKE_DONE);
break;
case RTMP_STATE_HANDSHAKE_DONE:
err = rtmp_dechunker_receive(conn->dechunk, mb);
if (err)
return err;
break;
default:
return EPROTO;
}
return 0;
}
static void tcp_recv_handler(struct mbuf *mb_pkt, void *arg)
{
struct rtmp_conn *conn = arg;
int err;
conn->total_bytes += mbuf_get_left(mb_pkt);
/* re-assembly of fragments */
if (conn->mb) {
const size_t len = mbuf_get_left(mb_pkt), pos = conn->mb->pos;
if ((mbuf_get_left(conn->mb) + len) > RTMP_MESSAGE_LEN_MAX) {
err = EOVERFLOW;
goto out;
}
conn->mb->pos = conn->mb->end;
err = mbuf_write_mem(conn->mb,
mbuf_buf(mb_pkt), mbuf_get_left(mb_pkt));
if (err)
goto out;
conn->mb->pos = pos;
}
else {
conn->mb = mem_ref(mb_pkt);
}
while (mbuf_get_left(conn->mb) > 0) {
size_t pos;
uint32_t nrefs;
pos = conn->mb->pos;
mem_ref(conn);
if (conn->is_client)
err = client_handle_packet(conn, conn->mb);
else
err = server_handle_packet(conn, conn->mb);
nrefs = mem_nrefs(conn);
mem_deref(conn);
if (nrefs == 1)
return;
if (!conn->tc)
return;
if (err) {
/* rewind */
conn->mb->pos = pos;
if (err == ENODATA)
err = 0;
break;
}
if (conn->mb->pos >= conn->mb->end) {
conn->mb = mem_deref(conn->mb);
break;
}
}
if (err)
goto out;
if (conn->total_bytes >= (conn->last_ack + conn->window_ack_size)) {
conn->last_ack = conn->total_bytes;
err = rtmp_control(conn, RTMP_TYPE_ACKNOWLEDGEMENT,
(uint32_t)conn->total_bytes);
if (err)
goto out;
}
out:
if (err)
conn_close(conn, err);
}
static void tcp_close_handler(int err, void *arg)
{
struct rtmp_conn *conn = arg;
if (conn->is_client && !conn->connected && conn->srvc > 0) {
err = req_connect(conn);
if (!err)
return;
}
conn_close(conn, err);
}
static int req_connect(struct rtmp_conn *conn)
{
const struct sa *addr;
int err = EINVAL;
while (conn->srvc > 0) {
--conn->srvc;
addr = &conn->srvv[conn->srvc];
conn->send_chunk_size = RTMP_DEFAULT_CHUNKSIZE;
conn->window_ack_size = WINDOW_ACK_SIZE;
conn->state = RTMP_STATE_UNINITIALIZED;
conn->last_ack = 0;
conn->total_bytes = 0;
conn->mb = mem_deref(conn->mb);
conn->tc = mem_deref(conn->tc);
rtmp_dechunker_set_chunksize(conn->dechunk,
RTMP_DEFAULT_CHUNKSIZE);
err = tcp_connect(&conn->tc, addr, tcp_estab_handler,
tcp_recv_handler, tcp_close_handler, conn);
if (!err)
break;
}
return err;
}
static bool rr_handler(struct dnsrr *rr, void *arg)
{
struct rtmp_conn *conn = arg;
if (conn->srvc >= ARRAY_SIZE(conn->srvv))
return true;
switch (rr->type) {
case DNS_TYPE_A:
sa_set_in(&conn->srvv[conn->srvc++], rr->rdata.a.addr,
conn->port);
break;
case DNS_TYPE_AAAA:
sa_set_in6(&conn->srvv[conn->srvc++], rr->rdata.aaaa.addr,
conn->port);
break;
}
return false;
}
static void query_handler(int err, const struct dnshdr *hdr, struct list *ansl,
struct list *authl, struct list *addl, void *arg)
{
struct rtmp_conn *conn = arg;
(void)hdr;
(void)authl;
(void)addl;
dns_rrlist_apply2(ansl, conn->host, DNS_TYPE_A, DNS_TYPE_AAAA,
DNS_CLASS_IN, true, rr_handler, conn);
/* wait for other (A/AAAA) query to complete */
if (conn->dnsq4 || conn->dnsq6)
return;
if (conn->srvc == 0) {
err = err ? err : EDESTADDRREQ;
goto out;
}
err = req_connect(conn);
if (err)
goto out;
return;
out:
conn_close(conn, err);
}
/**
* Connect to an RTMP server
*
* @param connp Pointer to allocated RTMP connection object
* @param dnsc DNS Client for resolving FQDN uris
* @param uri RTMP uri to connect to
* @param estabh Established handler
* @param cmdh Incoming command handler
* @param closeh Close handler
* @param arg Handler argument
*
* @return 0 if success, otherwise errorcode
*
* Example URIs:
*
* rtmp://a.rtmp.youtube.com/live2/my-stream
*/
int rtmp_connect(struct rtmp_conn **connp, struct dnsc *dnsc, const char *uri,
rtmp_estab_h *estabh, rtmp_command_h *cmdh,
rtmp_close_h *closeh, void *arg)
{
struct rtmp_conn *conn;
struct pl pl_host;
struct pl pl_port;
struct pl pl_app;
struct pl pl_stream;
int err;
if (!connp || !uri)
return EINVAL;
if (re_regex(uri, strlen(uri), "rtmp://[^:/]+[:]*[0-9]*/[^/]+/[^]+",
&pl_host, NULL, &pl_port, &pl_app, &pl_stream))
return EINVAL;
conn = rtmp_conn_alloc(true, estabh, cmdh, closeh, arg);
if (!conn)
return ENOMEM;
conn->port = pl_isset(&pl_port) ? pl_u32(&pl_port) : RTMP_PORT;
err = pl_strdup(&conn->app, &pl_app);
err |= pl_strdup(&conn->stream, &pl_stream);
err |= str_dup(&conn->uri, uri);
if (err)
goto out;
if (0 == sa_set(&conn->srvv[0], &pl_host, conn->port)) {
conn->srvc = 1;
err = req_connect(conn);
if (err)
goto out;
}
else {
#ifdef HAVE_INET6
struct sa tmp;
#endif
if (!dnsc) {
err = EINVAL;
goto out;
}
err = pl_strdup(&conn->host, &pl_host);
if (err)
goto out;
conn->dnsc = mem_ref(dnsc);
err = dnsc_query(&conn->dnsq4, dnsc, conn->host, DNS_TYPE_A,
DNS_CLASS_IN, true, query_handler, conn);
if (err)
goto out;
#ifdef HAVE_INET6
if (0 == net_default_source_addr_get(AF_INET6, &tmp)) {
err = dnsc_query(&conn->dnsq6, dnsc, conn->host,
DNS_TYPE_AAAA, DNS_CLASS_IN,
true, query_handler, conn);
if (err)
goto out;
}
#endif
}
out:
if (err)
mem_deref(conn);
else
*connp = conn;
return err;
}
/**
* Accept an incoming TCP connection creating an RTMP Server connection
*
* @param connp Pointer to allocated RTMP connection object
* @param ts TCP socket with pending connection
* @param cmdh Incoming command handler
* @param closeh Close handler
* @param arg Handler argument
*
* @return 0 if success, otherwise errorcode
*/
int rtmp_accept(struct rtmp_conn **connp, struct tcp_sock *ts,
rtmp_command_h *cmdh, rtmp_close_h *closeh, void *arg)
{
struct rtmp_conn *conn;
int err;
if (!connp || !ts)
return EINVAL;
conn = rtmp_conn_alloc(false, NULL, cmdh, closeh, arg);
if (!conn)
return ENOMEM;
err = tcp_accept(&conn->tc, ts, tcp_estab_handler,
tcp_recv_handler, tcp_close_handler, conn);
if (err)
goto out;
out:
if (err)
mem_deref(conn);
else
*connp = conn;
return err;
}
int rtmp_conn_send_msg(const struct rtmp_conn *conn,
unsigned format, uint32_t chunk_id,
uint32_t timestamp, uint32_t timestamp_delta,
uint8_t msg_type_id, uint32_t msg_stream_id,
const uint8_t *payload, size_t payload_len)
{
if (!conn || !payload || !payload_len)
return EINVAL;
return rtmp_chunker(format, chunk_id, timestamp, timestamp_delta,
msg_type_id, msg_stream_id, payload, payload_len,
conn->send_chunk_size,
conn->tc);
}
unsigned rtmp_conn_assign_chunkid(struct rtmp_conn *conn)
{
if (!conn)
return 0;
return ++conn->chunk_id_counter;
}
uint64_t rtmp_conn_assign_tid(struct rtmp_conn *conn)
{
if (!conn)
return 0;
return ++conn->tid_counter;
}
/**
* Get the underlying TCP connection from an RTMP connection
*
* @param conn RTMP Connection
*
* @return TCP-Connection
*/
struct tcp_conn *rtmp_conn_tcpconn(const struct rtmp_conn *conn)
{
return conn ? conn->tc : NULL;
}
/**
* Get the RTMP connection stream name from rtmp_connect
*
* @param conn RTMP Connection
*
* @return RTMP Stream name or NULL
*/
const char *rtmp_conn_stream(const struct rtmp_conn *conn)
{
return conn ? conn->stream : NULL;
}
static const char *rtmp_handshake_name(enum rtmp_handshake_state state)
{
switch (state) {
case RTMP_STATE_UNINITIALIZED: return "UNINITIALIZED";
case RTMP_STATE_VERSION_SENT: return "VERSION_SENT";
case RTMP_STATE_ACK_SENT: return "ACK_SENT";
case RTMP_STATE_HANDSHAKE_DONE: return "HANDSHAKE_DONE";
default: return "?";
}
}
int rtmp_conn_debug(struct re_printf *pf, const struct rtmp_conn *conn)
{
int err = 0;
if (!conn)
return 0;
err |= re_hprintf(pf, "role: %s\n",
conn->is_client ? "Client" : "Server");
err |= re_hprintf(pf, "state: %s\n",
rtmp_handshake_name(conn->state));
err |= re_hprintf(pf, "connected: %d\n", conn->connected);
err |= re_hprintf(pf, "chunk_size: send=%u\n",
conn->send_chunk_size);
err |= re_hprintf(pf, "bytes: %zu\n", conn->total_bytes);
err |= re_hprintf(pf, "streams: %u\n",
list_count(&conn->streaml));
if (conn->is_client) {
err |= re_hprintf(pf, "uri: %s\n", conn->uri);
err |= re_hprintf(pf, "app: %s\n", conn->app);
err |= re_hprintf(pf, "stream: %s\n", conn->stream);
}
err |= re_hprintf(pf, "%H\n", rtmp_dechunker_debug, conn->dechunk);
return err;
}

106
src/rtmp/control.c Normal file
View file

@ -0,0 +1,106 @@
/**
* @file rtmp/control.c Real Time Messaging Protocol (RTMP) -- Control
*
* Copyright (C) 2010 Creytiv.com
*/
#include <re_types.h>
#include <re_fmt.h>
#include <re_mem.h>
#include <re_mbuf.h>
#include <re_net.h>
#include <re_sa.h>
#include <re_list.h>
#include <re_rtmp.h>
#include "rtmp.h"
/**
* Send an RTMP control message
*
* @param conn RTMP connection
* @param type RTMP Packet type
* @param ... Optional packet arguments
*
* @return 0 if success, otherwise errorcode
*/
int rtmp_control(const struct rtmp_conn *conn, enum rtmp_packet_type type, ...)
{
struct mbuf *mb;
uint32_t u32;
uint16_t event;
va_list ap;
int err = 0;
if (!conn)
return EINVAL;
mb = mbuf_alloc(8);
if (!mb)
return ENOMEM;
va_start(ap, type);
switch (type) {
case RTMP_TYPE_SET_CHUNK_SIZE:
case RTMP_TYPE_WINDOW_ACK_SIZE:
case RTMP_TYPE_ACKNOWLEDGEMENT:
u32 = va_arg(ap, uint32_t);
err = mbuf_write_u32(mb, htonl(u32));
break;
case RTMP_TYPE_USER_CONTROL_MSG:
event = va_arg(ap, unsigned);
err = mbuf_write_u16(mb, htons(event));
err |= mbuf_write_u32(mb, htonl(va_arg(ap, uint32_t)));
break;
case RTMP_TYPE_SET_PEER_BANDWIDTH:
err = mbuf_write_u32(mb, htonl(va_arg(ap, uint32_t)));
err |= mbuf_write_u8(mb, va_arg(ap, unsigned));
break;
default:
err = ENOTSUP;
break;
}
va_end(ap);
if (err)
goto out;
err = rtmp_conn_send_msg(conn, 0, RTMP_CHUNK_ID_CONTROL, 0, 0, type,
RTMP_CONTROL_STREAM_ID, mb->buf, mb->end);
if (err)
goto out;
out:
mem_deref(mb);
return err;
}
/**
* Get the event name as a string
*
* @param event RTMP Event type
*
* @return Name of the event as a string
*/
const char *rtmp_event_name(enum rtmp_event_type event)
{
switch (event) {
case RTMP_EVENT_STREAM_BEGIN: return "StreamBegin";
case RTMP_EVENT_STREAM_EOF: return "StreamEOF";
case RTMP_EVENT_STREAM_DRY: return "StreamDry";
case RTMP_EVENT_SET_BUFFER_LENGTH: return "SetBufferLength";
case RTMP_EVENT_STREAM_IS_RECORDED: return "StreamIsRecorded";
case RTMP_EVENT_PING_REQUEST: return "PingRequest";
case RTMP_EVENT_PING_RESPONSE: return "PingResponse";
default: return "?";
}
}

138
src/rtmp/ctrans.c Normal file
View file

@ -0,0 +1,138 @@
/**
* @file rtmp/ctrans.c Real Time Messaging Protocol -- AMF Client Transactions
*
* Copyright (C) 2010 Creytiv.com
*/
#include <string.h>
#include <re_types.h>
#include <re_fmt.h>
#include <re_mem.h>
#include <re_mbuf.h>
#include <re_net.h>
#include <re_sa.h>
#include <re_list.h>
#include <re_tcp.h>
#include <re_sys.h>
#include <re_odict.h>
#include <re_rtmp.h>
#include "rtmp.h"
struct rtmp_ctrans {
struct le le;
uint64_t tid;
rtmp_resp_h *resph;
void *arg;
};
static void ctrans_destructor(void *data)
{
struct rtmp_ctrans *ct = data;
list_unlink(&ct->le);
}
static struct rtmp_ctrans *rtmp_ctrans_find(const struct list *ctransl,
uint64_t tid)
{
struct le *le;
for (le = list_head(ctransl); le; le = le->next) {
struct rtmp_ctrans *ct = le->data;
if (tid == ct->tid)
return ct;
}
return NULL;
}
int rtmp_amf_request(struct rtmp_conn *conn, uint32_t stream_id,
const char *command,
rtmp_resp_h *resph, void *arg, unsigned body_propc, ...)
{
struct rtmp_ctrans *ct = NULL;
struct mbuf *mb;
va_list ap;
int err;
if (!conn || !command || !resph)
return EINVAL;
mb = mbuf_alloc(512);
if (!mb)
return ENOMEM;
ct = mem_zalloc(sizeof(*ct), ctrans_destructor);
if (!ct) {
err = ENOMEM;
goto out;
}
ct->tid = rtmp_conn_assign_tid(conn);
ct->resph = resph;
ct->arg = arg;
err = rtmp_command_header_encode(mb, command, ct->tid);
if (err)
goto out;
if (body_propc) {
va_start(ap, body_propc);
err = rtmp_amf_vencode_object(mb, RTMP_AMF_TYPE_ROOT,
body_propc, &ap);
va_end(ap);
if (err)
goto out;
}
err = rtmp_send_amf_command(conn, 0, RTMP_CHUNK_ID_CONN,
RTMP_TYPE_AMF0,
stream_id, mb->buf, mb->end);
if (err)
goto out;
list_append(&conn->ctransl, &ct->le, ct);
out:
mem_deref(mb);
if (err)
mem_deref(ct);
return err;
}
int rtmp_ctrans_response(const struct list *ctransl,
const struct odict *msg)
{
struct rtmp_ctrans *ct;
uint64_t tid;
bool success;
rtmp_resp_h *resph;
void *arg;
if (!ctransl || !msg)
return EINVAL;
success = (0 == str_casecmp(odict_string(msg, "0"), "_result"));
if (!odict_get_number(msg, &tid, "1"))
return EPROTO;
ct = rtmp_ctrans_find(ctransl, tid);
if (!ct)
return ENOENT;
resph = ct->resph;
arg = ct->arg;
mem_deref(ct);
resph(success, msg, arg);
return 0;
}

264
src/rtmp/dechunk.c Normal file
View file

@ -0,0 +1,264 @@
/**
* @file rtmp/dechunk.c Real Time Messaging Protocol (RTMP) -- Dechunking
*
* Copyright (C) 2010 Creytiv.com
*/
#include <string.h>
#include <re_types.h>
#include <re_fmt.h>
#include <re_list.h>
#include <re_mem.h>
#include <re_mbuf.h>
#include <re_net.h>
#include <re_sa.h>
#include <re_rtmp.h>
#include "rtmp.h"
enum {
MAX_CHUNKS = 64,
};
struct rtmp_chunk {
struct le le;
struct rtmp_header hdr;
struct mbuf *mb;
};
/** Defines the RTMP Dechunker */
struct rtmp_dechunker {
struct list chunkl; /* struct rtmp_chunk */
size_t chunk_sz;
rtmp_dechunk_h *chunkh;
void *arg;
};
static void destructor(void *data)
{
struct rtmp_dechunker *rd = data;
list_flush(&rd->chunkl);
}
static void chunk_destructor(void *data)
{
struct rtmp_chunk *chunk = data;
list_unlink(&chunk->le);
mem_deref(chunk->mb);
}
static struct rtmp_chunk *create_chunk(struct list *chunkl,
const struct rtmp_header *hdr)
{
struct rtmp_chunk *chunk;
chunk = mem_zalloc(sizeof(*chunk), chunk_destructor);
if (!chunk)
return NULL;
chunk->hdr = *hdr;
list_append(chunkl, &chunk->le, chunk);
return chunk;
}
static struct rtmp_chunk *find_chunk(const struct list *chunkl,
uint32_t chunk_id)
{
struct le *le;
for (le = list_head(chunkl); le; le = le->next) {
struct rtmp_chunk *chunk = le->data;
if (chunk_id == chunk->hdr.chunk_id)
return chunk;
}
return NULL;
}
/*
* Stateful RTMP de-chunker for receiving complete messages
*/
int rtmp_dechunker_alloc(struct rtmp_dechunker **rdp, size_t chunk_sz,
rtmp_dechunk_h *chunkh, void *arg)
{
struct rtmp_dechunker *rd;
if (!rdp || !chunk_sz || !chunkh)
return EINVAL;
rd = mem_zalloc(sizeof(*rd), destructor);
if (!rd)
return ENOMEM;
rd->chunk_sz = chunk_sz;
rd->chunkh = chunkh;
rd->arg = arg;
*rdp = rd;
return 0;
}
int rtmp_dechunker_receive(struct rtmp_dechunker *rd, struct mbuf *mb)
{
struct rtmp_header hdr;
struct rtmp_chunk *chunk;
size_t chunk_sz, left, msg_len;
int err;
if (!rd || !mb)
return EINVAL;
err = rtmp_header_decode(&hdr, mb);
if (err)
return err;
/* find preceding chunk, from chunk id */
chunk = find_chunk(&rd->chunkl, hdr.chunk_id);
if (!chunk) {
/* only type 0 can create a new chunk stream */
if (hdr.format == 0) {
if (list_count(&rd->chunkl) > MAX_CHUNKS)
return EOVERFLOW;
chunk = create_chunk(&rd->chunkl, &hdr);
if (!chunk)
return ENOMEM;
}
else
return ENOENT;
}
/* only types 0-2 can create a new buffer */
switch (hdr.format) {
case 0:
case 1:
case 2:
if (hdr.format == 0) {
/* copy the whole header */
chunk->hdr = hdr;
}
else if (hdr.format == 1) {
chunk->hdr.timestamp_delta = hdr.timestamp_delta;
chunk->hdr.length = hdr.length;
chunk->hdr.type_id = hdr.type_id;
chunk->hdr.timestamp += hdr.timestamp_delta;
}
else if (hdr.format == 2) {
chunk->hdr.timestamp_delta = hdr.timestamp_delta;
chunk->hdr.timestamp += hdr.timestamp_delta;
}
msg_len = chunk->hdr.length;
chunk_sz = min(msg_len, rd->chunk_sz);
if (mbuf_get_left(mb) < chunk_sz)
return ENODATA;
mem_deref(chunk->mb);
chunk->mb = mbuf_alloc(msg_len);
if (!chunk->mb)
return ENOMEM;
err = mbuf_read_mem(mb, chunk->mb->buf, chunk_sz);
if (err)
return err;
chunk->mb->pos = chunk_sz;
chunk->mb->end = chunk_sz;
chunk->hdr.format = hdr.format;
break;
case 3:
if (!chunk->mb)
return EPROTO;
left = mbuf_get_space(chunk->mb);
chunk_sz = min(left, rd->chunk_sz);
if (mbuf_get_left(mb) < chunk_sz)
return ENODATA;
err = mbuf_read_mem(mb, mbuf_buf(chunk->mb), chunk_sz);
if (err)
return err;
chunk->mb->pos += chunk_sz;
chunk->mb->end += chunk_sz;
break;
}
if (chunk->mb->pos >= chunk->mb->size) {
struct mbuf *buf;
chunk->mb->pos = 0;
buf = chunk->mb;
chunk->mb = NULL;
err = rd->chunkh(&chunk->hdr, buf, rd->arg);
mem_deref(buf);
}
return err;
}
void rtmp_dechunker_set_chunksize(struct rtmp_dechunker *rd, size_t chunk_sz)
{
if (!rd || !chunk_sz)
return;
rd->chunk_sz = chunk_sz;
}
int rtmp_dechunker_debug(struct re_printf *pf, const struct rtmp_dechunker *rd)
{
struct le *le;
int err;
if (!rd)
return 0;
err = re_hprintf(pf, "Dechunker Debug:\n");
err |= re_hprintf(pf, "chunk list: (%u)\n", list_count(&rd->chunkl));
for (le = rd->chunkl.head; le; le = le->next) {
const struct rtmp_chunk *msg = le->data;
err |= re_hprintf(pf, ".. %H\n",
rtmp_header_print, &msg->hdr);
}
err |= re_hprintf(pf, "\n");
return err;
}

256
src/rtmp/hdr.c Normal file
View file

@ -0,0 +1,256 @@
/**
* @file rtmp/hdr.c Real Time Messaging Protocol (RTMP) -- Headers
*
* Copyright (C) 2010 Creytiv.com
*/
#include <string.h>
#include <re_types.h>
#include <re_fmt.h>
#include <re_mem.h>
#include <re_mbuf.h>
#include <re_net.h>
#include <re_sa.h>
#include <re_list.h>
#include <re_sys.h>
#include <re_rtmp.h>
#include "rtmp.h"
enum {
RTMP_CHUNK_ID_MIN = 3,
RTMP_CHUNK_ID_MAX = 65599, /* 65535 + 64 */
RTMP_CHUNK_OFFSET = 64,
};
static int mbuf_write_u24_hton(struct mbuf *mb, uint32_t u24)
{
int err = 0;
err |= mbuf_write_u8(mb, u24 >> 16);
err |= mbuf_write_u8(mb, u24 >> 8);
err |= mbuf_write_u8(mb, u24 >> 0);
return err;
}
static uint32_t mbuf_read_u24_ntoh(struct mbuf *mb)
{
uint32_t u24;
u24 = (uint32_t)mbuf_read_u8(mb) << 16;
u24 |= (uint32_t)mbuf_read_u8(mb) << 8;
u24 |= (uint32_t)mbuf_read_u8(mb) << 0;
return u24;
}
static int encode_basic_hdr(struct mbuf *mb, unsigned fmt,
uint32_t chunk_id)
{
uint8_t v;
int err = 0;
if (chunk_id >= 320) {
const uint16_t cs_id = chunk_id - RTMP_CHUNK_OFFSET;
v = fmt<<6 | 1;
err |= mbuf_write_u8(mb, v);
err |= mbuf_write_u16(mb, htons(cs_id));
}
else if (chunk_id >= RTMP_CHUNK_OFFSET) {
const uint8_t cs_id = chunk_id - RTMP_CHUNK_OFFSET;
v = fmt<<6 | 0;
err |= mbuf_write_u8(mb, v);
err |= mbuf_write_u8(mb, cs_id);
}
else {
v = fmt<<6 | chunk_id;
err |= mbuf_write_u8(mb, v);
}
return err;
}
static int decode_basic_hdr(struct rtmp_header *hdr, struct mbuf *mb)
{
uint8_t cs_id;
uint8_t v;
if (mbuf_get_left(mb) < 1)
return ENODATA;
v = mbuf_read_u8(mb);
hdr->format = v>>6;
cs_id = v & 0x3f;
switch (cs_id) {
case 0:
if (mbuf_get_left(mb) < 1)
return ENODATA;
hdr->chunk_id = mbuf_read_u8(mb) + RTMP_CHUNK_OFFSET;
break;
case 1:
if (mbuf_get_left(mb) < 2)
return ENODATA;
hdr->chunk_id = ntohs(mbuf_read_u16(mb)) + RTMP_CHUNK_OFFSET;
break;
default:
hdr->chunk_id = cs_id;
break;
}
return 0;
}
int rtmp_header_encode(struct mbuf *mb, const struct rtmp_header *hdr)
{
bool ext_ts;
int err = 0;
if (!mb || !hdr)
return EINVAL;
err = encode_basic_hdr(mb, hdr->format, hdr->chunk_id);
if (err)
return err;
switch (hdr->format) {
case 0:
ext_ts = (hdr->timestamp >= 0x00ffffff);
err |= mbuf_write_u24_hton(mb,
ext_ts ? 0xffffff : hdr->timestamp);
err |= mbuf_write_u24_hton(mb, hdr->length);
err |= mbuf_write_u8(mb, hdr->type_id);
err |= mbuf_write_u32(mb, sys_htoll(hdr->stream_id));
if (ext_ts) {
err |= mbuf_write_u32(mb, htonl(hdr->timestamp));
}
break;
case 1:
err |= mbuf_write_u24_hton(mb, hdr->timestamp_delta);
err |= mbuf_write_u24_hton(mb, hdr->length);
err |= mbuf_write_u8(mb, hdr->type_id);
break;
case 2:
err |= mbuf_write_u24_hton(mb, hdr->timestamp_delta);
break;
case 3:
break;
}
return err;
}
int rtmp_header_decode(struct rtmp_header *hdr, struct mbuf *mb)
{
int err;
if (!hdr || !mb)
return EINVAL;
memset(hdr, 0, sizeof(*hdr));
err = decode_basic_hdr(hdr, mb);
if (err)
return err;
switch (hdr->format) {
case 0:
if (mbuf_get_left(mb) < 11)
return ENODATA;
hdr->timestamp = mbuf_read_u24_ntoh(mb);
hdr->length = mbuf_read_u24_ntoh(mb);
hdr->type_id = mbuf_read_u8(mb);
hdr->stream_id = sys_ltohl(mbuf_read_u32(mb));
if (hdr->timestamp == 0x00ffffff) {
if (mbuf_get_left(mb) < 4)
return ENODATA;
hdr->timestamp = ntohl(mbuf_read_u32(mb));
}
break;
case 1:
if (mbuf_get_left(mb) < 7)
return ENODATA;
hdr->timestamp_delta = mbuf_read_u24_ntoh(mb);
hdr->length = mbuf_read_u24_ntoh(mb);
hdr->type_id = mbuf_read_u8(mb);
break;
case 2:
if (mbuf_get_left(mb) < 3)
return ENODATA;
hdr->timestamp_delta = mbuf_read_u24_ntoh(mb);
break;
case 3:
/* no payload */
break;
}
return 0;
}
int rtmp_header_print(struct re_printf *pf, const struct rtmp_header *hdr)
{
if (!hdr)
return 0;
return re_hprintf(pf,
"fmt %u, chunk %u, "
"timestamp %5u, ts_delta %2u,"
" len %3u, type %2u (%-14s) stream_id %u",
hdr->format, hdr->chunk_id, hdr->timestamp,
hdr->timestamp_delta, hdr->length, hdr->type_id,
rtmp_packet_type_name(hdr->type_id), hdr->stream_id);
}
const char *rtmp_packet_type_name(enum rtmp_packet_type type)
{
switch (type) {
case RTMP_TYPE_SET_CHUNK_SIZE: return "Set Chunk Size";
case RTMP_TYPE_ACKNOWLEDGEMENT: return "Acknowledgement";
case RTMP_TYPE_USER_CONTROL_MSG: return "User Control Message";
case RTMP_TYPE_WINDOW_ACK_SIZE: return "Window Acknowledgement Size";
case RTMP_TYPE_SET_PEER_BANDWIDTH:return "Set Peer Bandwidth";
case RTMP_TYPE_AUDIO: return "Audio Message";
case RTMP_TYPE_VIDEO: return "Video Message";
case RTMP_TYPE_DATA: return "Data Message";
case RTMP_TYPE_AMF0: return "AMF";
default: return "?";
}
}

16
src/rtmp/mod.mk Normal file
View file

@ -0,0 +1,16 @@
#
# mod.mk
#
# Copyright (C) 2010 Creytiv.com
#
SRCS += rtmp/amf.c
SRCS += rtmp/amf_dec.c
SRCS += rtmp/amf_enc.c
SRCS += rtmp/chunk.c
SRCS += rtmp/conn.c
SRCS += rtmp/control.c
SRCS += rtmp/ctrans.c
SRCS += rtmp/dechunk.c
SRCS += rtmp/hdr.c
SRCS += rtmp/stream.c

176
src/rtmp/rtmp.h Normal file
View file

@ -0,0 +1,176 @@
/**
* @file rtmp.h Real Time Messaging Protocol (RTMP) -- Internal API
*
* Copyright (C) 2010 Creytiv.com
*/
enum {
RTMP_PROTOCOL_VERSION = 3,
RTMP_DEFAULT_CHUNKSIZE = 128,
RTMP_HANDSHAKE_SIZE = 1536,
RTMP_MESSAGE_LEN_MAX = 524288,
};
/* Chunk IDs */
enum {
RTMP_CHUNK_ID_CONTROL = 2,
RTMP_CHUNK_ID_CONN = 3,
};
/** Defines the RTMP Handshake State */
enum rtmp_handshake_state {
RTMP_STATE_UNINITIALIZED = 0,
RTMP_STATE_VERSION_SENT,
RTMP_STATE_ACK_SENT,
RTMP_STATE_HANDSHAKE_DONE
};
/**
* Defines an RTMP Connection
*/
struct rtmp_conn {
struct list streaml;
struct rtmp_dechunker *dechunk;
struct tcp_conn *tc;
struct mbuf *mb; /* TCP reassembly buffer */
enum rtmp_handshake_state state;
size_t total_bytes;
size_t last_ack;
uint32_t window_ack_size;
uint32_t send_chunk_size;
unsigned chunk_id_counter;
bool is_client;
bool connected;
rtmp_estab_h *estabh;
rtmp_command_h *cmdh;
rtmp_close_h *closeh;
void *arg;
/* client specific: */
struct dnsc *dnsc;
struct dns_query *dnsq4;
struct dns_query *dnsq6;
struct list ctransl;
struct sa srvv[16];
unsigned srvc;
uint64_t tid_counter;
uint16_t port;
char *app;
char *uri;
char *stream;
char *host;
};
/**
* Defines an RTMP Stream
*/
struct rtmp_stream {
struct le le;
const struct rtmp_conn *conn; /**< Pointer to parent connection */
bool created;
uint32_t stream_id;
unsigned chunk_id_audio;
unsigned chunk_id_video;
unsigned chunk_id_data;
rtmp_audio_h *auh;
rtmp_video_h *vidh;
rtmp_command_h *datah;
rtmp_command_h *cmdh;
rtmp_resp_h *resph;
rtmp_control_h *ctrlh;
void *arg;
};
struct rtmp_header {
unsigned format:2; /* type 0-3 */
uint32_t chunk_id; /* from 3-65599 */
uint32_t timestamp; /* 24-bit or 32-bit */
uint32_t timestamp_delta; /* 24-bit */
uint32_t length; /* 24-bit */
uint8_t type_id; /* enum rtmp_packet_type */
uint32_t stream_id;
};
/* Command */
int rtmp_command_header_encode(struct mbuf *mb, const char *name,
uint64_t tid);
/* Connection */
int rtmp_conn_send_msg(const struct rtmp_conn *conn, unsigned format,
uint32_t chunk_id, uint32_t timestamp,
uint32_t timestamp_delta, uint8_t msg_type_id,
uint32_t msg_stream_id,
const uint8_t *payload, size_t payload_len);
int rtmp_send_amf_command(const struct rtmp_conn *conn,
unsigned format, uint32_t chunk_id,
uint8_t type_id,
uint32_t msg_stream_id,
const uint8_t *cmd, size_t len);
unsigned rtmp_conn_assign_chunkid(struct rtmp_conn *conn);
uint64_t rtmp_conn_assign_tid(struct rtmp_conn *conn);
/* Client Transaction */
struct rtmp_ctrans;
int rtmp_ctrans_response(const struct list *ctransl,
const struct odict *msg);
/*
* RTMP Chunk
*/
int rtmp_chunker(unsigned format, uint32_t chunk_id,
uint32_t timestamp, uint32_t timestamp_delta,
uint8_t msg_type_id, uint32_t msg_stream_id,
const uint8_t *payload, size_t payload_len,
size_t max_chunk_sz, struct tcp_conn *tc);
/*
* RTMP Header
*/
int rtmp_header_encode(struct mbuf *mb, const struct rtmp_header *hdr);
int rtmp_header_decode(struct rtmp_header *hdr, struct mbuf *mb);
int rtmp_header_print(struct re_printf *pf, const struct rtmp_header *hdr);
const char *rtmp_packet_type_name(enum rtmp_packet_type type);
/*
* RTMP De-chunker
*/
struct rtmp_dechunker;
typedef int (rtmp_dechunk_h)(const struct rtmp_header *hdr,
struct mbuf *mb, void *arg);
int rtmp_dechunker_alloc(struct rtmp_dechunker **rdp, size_t chunk_sz,
rtmp_dechunk_h *chunkh, void *arg);
int rtmp_dechunker_receive(struct rtmp_dechunker *rd, struct mbuf *mb);
void rtmp_dechunker_set_chunksize(struct rtmp_dechunker *rd, size_t chunk_sz);
int rtmp_dechunker_debug(struct re_printf *pf,
const struct rtmp_dechunker *rd);
/*
* AMF (Action Message Format)
*/
int rtmp_amf_encode_number(struct mbuf *mb, double val);
int rtmp_amf_encode_boolean(struct mbuf *mb, bool boolean);
int rtmp_amf_encode_string(struct mbuf *mb, const char *str);
int rtmp_amf_encode_null(struct mbuf *mb);
int rtmp_amf_vencode_object(struct mbuf *mb, enum rtmp_amf_type container,
unsigned propc, va_list *ap);
int rtmp_amf_decode(struct odict **msgp, struct mbuf *mb);

288
src/rtmp/stream.c Normal file
View file

@ -0,0 +1,288 @@
/**
* @file rtmp/stream.c Real Time Messaging Protocol (RTMP) -- NetStream
*
* Copyright (C) 2010 Creytiv.com
*/
#include <string.h>
#include <re_types.h>
#include <re_fmt.h>
#include <re_mem.h>
#include <re_mbuf.h>
#include <re_net.h>
#include <re_sa.h>
#include <re_list.h>
#include <re_tcp.h>
#include <re_sys.h>
#include <re_odict.h>
#include <re_rtmp.h>
#include "rtmp.h"
static void destructor(void *data)
{
struct rtmp_stream *strm = data;
list_unlink(&strm->le);
if (strm->created) {
rtmp_amf_command(strm->conn, 0, "deleteStream",
3,
RTMP_AMF_TYPE_NUMBER, 0.0,
RTMP_AMF_TYPE_NULL,
RTMP_AMF_TYPE_NUMBER, (double)strm->stream_id);
}
}
/**
* Allocate a new RTMP Stream object
*
* @param strmp Pointer to allocated RTMP Stream
* @param conn RTMP Connection
* @param stream_id Stream id
* @param cmdh Command handler
* @param ctrlh Control handler
* @param auh Audio handler
* @param vidh Video handler
* @param datah Data handler
* @param arg Handler argument
*
* @return 0 if success, otherwise errorcode
*/
int rtmp_stream_alloc(struct rtmp_stream **strmp, struct rtmp_conn *conn,
uint32_t stream_id, rtmp_command_h *cmdh,
rtmp_control_h *ctrlh, rtmp_audio_h *auh,
rtmp_video_h *vidh, rtmp_command_h *datah,
void *arg)
{
struct rtmp_stream *strm;
if (!strmp || !conn)
return EINVAL;
strm = mem_zalloc(sizeof(*strm), destructor);
if (!strm)
return ENOMEM;
strm->conn = conn;
strm->stream_id = stream_id;
strm->cmdh = cmdh;
strm->ctrlh = ctrlh;
strm->auh = auh;
strm->vidh = vidh;
strm->datah = datah;
strm->arg = arg;
strm->chunk_id_audio = rtmp_conn_assign_chunkid(conn);
strm->chunk_id_video = rtmp_conn_assign_chunkid(conn);
strm->chunk_id_data = rtmp_conn_assign_chunkid(conn);
list_append(&conn->streaml, &strm->le, strm);
*strmp = strm;
return 0;
}
static void createstream_handler(bool success, const struct odict *msg,
void *arg)
{
struct rtmp_stream *strm = arg;
uint64_t num;
if (!success)
goto out;
if (!odict_get_number(msg, &num, "3")) {
success = false;
goto out;
}
strm->stream_id = (uint32_t)num;
if (strm->stream_id == 0) {
success = false;
goto out;
}
strm->created = true;
out:
if (strm->resph)
strm->resph(success, msg, strm->arg);
}
/**
* Create a new RTMP Stream by sending "createStream" to the RTMP Server.
*
* @param strmp Pointer to allocated RTMP Stream
* @param conn RTMP Connection
* @param resph RTMP Response handler
* @param cmdh Command handler
* @param ctrlh Control handler
* @param auh Audio handler
* @param vidh Video handler
* @param datah Data handler
* @param arg Handler argument
*
* @return 0 if success, otherwise errorcode
*/
int rtmp_stream_create(struct rtmp_stream **strmp, struct rtmp_conn *conn,
rtmp_resp_h *resph, rtmp_command_h *cmdh,
rtmp_control_h *ctrlh, rtmp_audio_h *auh,
rtmp_video_h *vidh, rtmp_command_h *datah,
void *arg)
{
struct rtmp_stream *strm;
int err;
if (!strmp || !conn)
return EINVAL;
err = rtmp_stream_alloc(&strm, conn, (uint32_t)-1,
cmdh, ctrlh, auh, vidh, datah, arg);
if (err)
return err;
strm->resph = resph;
err = rtmp_amf_request(conn, 0,
"createStream", createstream_handler, strm,
1,
RTMP_AMF_TYPE_NULL);
if (err)
goto out;
out:
if (err)
mem_deref(strm);
else
*strmp = strm;
return err;
}
/**
* Start playing an RTMP Stream by sending "play" to the RTMP Server
*
* @param strm RTMP Stream
* @param name Stream name
*
* @return 0 if success, otherwise errorcode
*/
int rtmp_play(struct rtmp_stream *strm, const char *name)
{
if (!strm || !name)
return EINVAL;
return rtmp_amf_command(strm->conn, strm->stream_id, "play",
4,
RTMP_AMF_TYPE_NUMBER, 0.0,
RTMP_AMF_TYPE_NULL,
RTMP_AMF_TYPE_STRING, name,
RTMP_AMF_TYPE_NUMBER, -2000.0);
}
/**
* Start publishing an RTMP Stream by sending "publish" to the RTMP Server
*
* @param strm RTMP Stream
* @param name Stream name
*
* @return 0 if success, otherwise errorcode
*/
int rtmp_publish(struct rtmp_stream *strm, const char *name)
{
if (!strm || !name)
return EINVAL;
return rtmp_amf_command(strm->conn, strm->stream_id, "publish",
4,
RTMP_AMF_TYPE_NUMBER, 0.0,
RTMP_AMF_TYPE_NULL,
RTMP_AMF_TYPE_STRING, name,
RTMP_AMF_TYPE_STRING, "live");
}
/**
* Send audio packet on the RTMP Stream
*
* @param strm RTMP Stream
* @param timestamp Timestamp in [milliseconds]
* @param pld Audio payload
* @param len Payload length
*
* @return 0 if success, otherwise errorcode
*/
int rtmp_send_audio(struct rtmp_stream *strm, uint32_t timestamp,
const uint8_t *pld, size_t len)
{
uint32_t chunk_id;
if (!strm || !pld || !len)
return EINVAL;
chunk_id = strm->chunk_id_audio;
return rtmp_conn_send_msg(strm->conn, 0, chunk_id, timestamp, 0,
RTMP_TYPE_AUDIO, strm->stream_id, pld, len);
}
/**
* Send video packet on the RTMP Stream
*
* @param strm RTMP Stream
* @param timestamp Timestamp in [milliseconds]
* @param pld Video payload
* @param len Payload length
*
* @return 0 if success, otherwise errorcode
*/
int rtmp_send_video(struct rtmp_stream *strm, uint32_t timestamp,
const uint8_t *pld, size_t len)
{
uint32_t chunk_id;
if (!strm || !pld || !len)
return EINVAL;
chunk_id = strm->chunk_id_video;
return rtmp_conn_send_msg(strm->conn, 0, chunk_id, timestamp, 0,
RTMP_TYPE_VIDEO, strm->stream_id, pld, len);
}
/**
* Find an RTMP Stream by stream id
*
* @param conn RTMP Connection
* @param stream_id Stream id
*
* @return RTMP Stream if found, or NULL if not found
*/
struct rtmp_stream *rtmp_stream_find(const struct rtmp_conn *conn,
uint32_t stream_id)
{
struct le *le;
if (!conn)
return NULL;
for (le = list_head(&conn->streaml); le; le = le->next) {
struct rtmp_stream *strm = le->data;
if (stream_id == strm->stream_id)
return strm;
}
return NULL;
}