Sync and async

Sync and async functions

By default, each time a function is invoked, it is executed synchronously. This guarantees the atomicity property, meaning that no other commands are executed on Redis while the function runs. The atomicity property has several advantages:

  • Multiple keys can be updated at the same time, guaranteeing that other clients see the complete update rather than a partial update.
  • The data in Redis is not changed while it is being processed.

However, the main disadvantage of the atomicity property is that Redis is blocked for the entire invocation, which prevents it from serving other clients.
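
For illustration, here is a minimal sketch of a synchronous function that updates two keys in a single atomic step (the function name update_pair and the key names are hypothetical). Because the function runs synchronously, other clients can never observe a state in which only one of the keys has been updated:

#!js api_version=1.0 name=lib

redis.registerFunction('update_pair', function(client, val){
    // Both writes happen inside the same atomic invocation, so other
    // clients never see one key updated without the other.
    client.call('set', 'key1', val);
    client.call('set', 'key2', val);
    return 'OK';
});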

The triggers and functions feature of Redis Stack aims to give function writers more flexibility by allowing functions to be invoked in the background. When a function is invoked in the background, it cannot directly access the Redis keyspace. To interact with the Redis keyspace from the background, the function must block Redis and enter an atomic section, where the atomicity property is once again guaranteed.

To run a function in the background, register it with the registerAsyncFunction API. The coroutine is invoked on a background thread and does not block the Redis process. Here is an example:

#!js api_version=1.0 name=lib

redis.registerAsyncFunction('test', async function(){
    return 'test';
});

The simple function shown above will return the value 'test' and will execute on a background thread without blocking Redis. This allows Redis to continue accepting commands from other clients while the function is running.
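
Assuming the library has been loaded (for example with the TFUNCTION LOAD command), the function can be invoked with TFCALLASYNC; given the function body above, the expected reply is the string it returns:

127.0.0.1:6379> TFCALLASYNC lib.test 0
"test"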

The coroutine also accepts an optional client argument, which differs from the client used in synchronous functions. This client argument does not allow direct invocation of Redis commands. Instead, it provides the capability to block Redis and enter an atomic section where the atomicity property is once again guaranteed. Here's an example that demonstrates invoking a ping command from within an async coroutine:

#!js api_version=1.0 name=lib

redis.registerAsyncFunction('test', async function(client){
    return client.block(function(redis_client){
        return redis_client.call('ping');
    });
});

Running this function will return a pong reply:

127.0.0.1:6379> TFCALLASYNC lib.test 0
"PONG"

Notice that we used TFCALLASYNC to invoke these functions. Async functions can only be invoked using TFCALLASYNC.

Now let's look at a more complex example. Assume we want to write a function that counts the number of hashes in Redis that have a name property with some value. As a first attempt, we'll write a synchronous function that uses the SCAN command to scan the key space:

#!js api_version=1.0 name=lib

redis.registerFunction('test', function(client, expected_name){
    var count = 0;
    var cursor = '0';
    do{
        var res = client.call('scan', cursor);
        cursor = res[0];
        var keys = res[1];
        keys.forEach((key) => {
            if (client.call('hget', key, 'name') == expected_name) {
                count += 1;
            }
        });
    } while(cursor != '0');
    return count;
});

While this function works, it has the potential to block Redis for a long time. So let's modify this function to run in the background as a coroutine:

#!js api_version=1.0 name=lib

redis.registerAsyncFunction('test', async function(async_client, expected_name){
    var count = 0;
    var cursor = '0';
    do{
        async_client.block((client)=>{
            var res = client.call('scan', cursor);
            cursor = res[0];
            var keys = res[1];
            keys.forEach((key) => {
                if (client.call('hget', key, 'name') == expected_name) {
                    count += 1;
                }
            });
        });
    } while(cursor != '0');
    return count;
});

Both implementations return the same result, but the second function runs in the background and blocks Redis only to process the next batch of keys returned from the SCAN command. Other commands are processed in between SCAN batches. Notice that the coroutine approach allows the key space to change while it is being scanned; the function writer needs to decide whether this is acceptable.

Start sync and move to async

The previous example, although functional, has a drawback in terms of performance. Even though Redis is not blocked, it still takes time to return the reply to the user. However, if we modify the requirement slightly and agree to obtain an approximate value, we can achieve much better performance in most cases. This can be done by implementing result caching using a key named <name>_count and setting an expiration time on that key, which triggers recalculation of the value periodically. Here's the updated code:

#!js api_version=1.0 name=lib

redis.registerAsyncFunction('test', async function(async_client, expected_name){
    // check the cache first
    var cached_value = async_client.block((client)=>{
        return client.call('get', expected_name + '_count');
    });

    if (cached_value != null) {
        return cached_value;
    }


    var count = 0;
    var cursor = '0';
    do{
        async_client.block((client)=>{
            var res = client.call('scan', cursor);
            cursor = res[0];
            var keys = res[1];
            keys.forEach((key) => {
                if (client.call('hget', key, 'name') == expected_name) {
                    count += 1;
                }
            });
        });
    } while(cursor != '0');

    // set count in the cache with a 5 second expiration
    async_client.block((client)=>{
        client.call('set', expected_name + '_count', count);
        client.call('expire', expected_name + '_count', 5);
    });

    return count;
});

The above code works as expected: it first checks the cache, and if a cached value exists, it is returned. Otherwise, it performs the calculation and updates the cache. But the example is still not optimal. The callback is a coroutine, which means it is always executed on a background thread, and moving to a background thread is inherently costly. The better approach is to check the cache synchronously and move to the background only if the value is not there. Triggers and functions lets you start synchronously and then move to asynchronous execution using the executeAsync function when required. The new code:

#!js api_version=1.0 name=lib

redis.registerAsyncFunction('test', function(client, expected_name){
    // check the cache first
    var cached_value = client.call('get', expected_name + '_count');
    if (cached_value != null) {
        return cached_value;
    }

    // cache is not set, move to background
    return client.executeAsync(async function(async_client) {
        var count = 0;
        var cursor = '0';
        do{
            async_client.block((client)=>{
                var res = client.call('scan', cursor);
                cursor = res[0];
                var keys = res[1];
                keys.forEach((key) => {
                    if (client.call('hget', key, 'name') == expected_name) {
                        count += 1;
                    }
                });
            });
        } while(cursor != '0');

        // set count in the cache with a 5 second expiration
        async_client.block((client)=>{
            client.call('set', expected_name + '_count', count);
            client.call('expire', expected_name + '_count', 5);
        });

        return count;
    });
});

executeAsync returns a Promise object. When triggers and functions sees that the function returns a Promise, it waits for the promise to be resolved and returns its result to the client. The above implementation will be much faster in the case of a cache hit.

Notice that even though we registered a synchronous function (not a coroutine), we still used registerAsyncFunction. This is because our function has the potential to block the client and take the execution to the background. If we had used registerFunction, triggers and functions would not have allowed the function to block the client and would have ignored the returned promise object.

Also notice that it is not always possible to wait for a promise to be resolved. If the command is called inside a MULTI/EXEC block, it is not possible to block the client and wait for the promise; in such cases the client gets an error. You can check whether blocking the client is allowed using the client.isBlockAllowed() function, which returns true if it is OK to wait for a promise to be resolved and false if it is not.
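
The following sketch shows one way to use this check (the function name ping_maybe_async and the fallback behavior are illustrative): when blocking is not allowed, the function replies synchronously; otherwise it moves the work to the background with executeAsync:

#!js api_version=1.0 name=lib

redis.registerAsyncFunction('ping_maybe_async', function(client){
    // Inside MULTI/EXEC it is not possible to block the client and wait
    // for a promise, so reply synchronously in that case.
    if (!client.isBlockAllowed()) {
        return client.call('ping');
    }
    // Otherwise move to the background and block Redis only for the call.
    return client.executeAsync(async function(async_client) {
        return async_client.block((redis_client) => {
            return redis_client.call('ping');
        });
    });
});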

Call blocking commands

Redis has a few commands that block the client and execute asynchronously when some condition holds (commands like BLPOP). In general, such commands are not supposed to be called inside a script, and calling them results in running their non-blocking logic. For example, BLPOP basically runs LPOP and returns an empty result if the list is empty.

RedisGears allows running blocking commands using the client.callAsync API. client.callAsync executes the blocking command and returns a promise object that is resolved when the command invocation finishes. Notice that client.callAsync allows calling any command, not just blocking ones, but it always returns a promise object that is resolved later, so using it for regular commands is less efficient.

Example:

#!js api_version=1.0 name=lib

redis.registerAsyncFunction('my_blpop', async function(client, key, expected_val) {
    var res = null
    do {
        res = await client.block((c) => {
            return c.callAsync('blpop', key, '0');
        })
    } while (res[1] != expected_val);
    return res;
});

The function above keeps popping elements from the requested list until it encounters the requested value. If the list is empty, it waits until elements are added to the list.

RedisGears also provides the client.callAsyncRaw API, which is the same as client.callAsync but does not decode the replies as UTF-8.
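
As a sketch (the function name my_blpop_raw is illustrative), the previous example can be adapted to return the popped reply without UTF-8 decoding:

#!js api_version=1.0 name=lib

redis.registerAsyncFunction('my_blpop_raw', async function(client, key) {
    // Same as callAsync, but the reply is returned without being decoded
    // as UTF-8.
    return await client.block((c) => {
        return c.callAsyncRaw('blpop', key, '0');
    });
});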

Notice: There is no guarantee about when the promise returned from client.callAsync will be resolved, so the function writer should not make any assumptions about atomicity guarantees.

Block Redis failure

Blocking Redis might fail for a few reasons:

  • Redis reaches an OOM state and neither the redis.functionFlags.NO_WRITES nor the redis.functionFlags.ALLOW_OOM flag is set (see function flags for more information, and the registration sketch after this list).
  • The redis.functionFlags.NO_WRITES flag is not set and the Redis instance has changed roles and is now a replica.
  • The ACL user that invoked the function was deleted.
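
For example, a read-only function can declare the NO_WRITES flag when it is registered, so that blocking remains possible under OOM and on a replica. This is a sketch based on the registration options described in the function flags documentation:

#!js api_version=1.0 name=lib

redis.registerFunction('my_ping',
    function(client){
        return client.call('ping');
    },
    {
        // NO_WRITES declares that the function performs no writes, so it
        // can still block Redis under OOM or when the instance is a replica.
        flags: [redis.functionFlags.NO_WRITES]
    }
);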

The failure results in an exception that the function writer can choose to handle, or leave to be caught by the triggers and functions feature.
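
A minimal sketch of handling such a failure (the function name safe_set and the error reply are illustrative): the exception raised by block is caught and turned into a regular reply instead of propagating:

#!js api_version=1.0 name=lib

redis.registerAsyncFunction('safe_set', async function(async_client, key, val){
    try {
        return async_client.block((client) => {
            return client.call('set', key, val);
        });
    } catch (e) {
        // Blocking failed (for example because of OOM or a role change);
        // report the error to the caller instead of re-throwing it.
        return 'failed to block Redis: ' + e;
    }
});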

Block Redis timeout

Blocking Redis for a long time is discouraged and is considered an unsafe operation. The triggers and functions feature attempts to protect the function writer and times out the blocking function if it runs for too long. The timeout can be set as a module configuration, alongside the fatal failure policy that indicates how to handle the timeout. The policy can be one of the following:

  • Abort - Stop the function invocation even at the cost of losing the atomicity property.
  • Kill - Keep the atomicity property and do not stop the function invocation. In this case there is a risk of an external process killing the Redis server, thinking that the shard is not responding.